Airflow: writing custom operators and publishing them as a package, Part 2.
Welcome back to the second part of this series on building and packaging a custom Airflow operator. In the previous part, we set up our local packaging environment with Poetry, set up an environment for running Airflow in Docker, and verified that Airflow was working in that Docker environment. In this article, we will build a simple operator in our local environment and then package it with Poetry. Let us get started!
Table of Contents:
- Setting up the plugins folder for building the operator
- Details of the custom operator
- Implementing the simple custom operator
- Testing our custom operator in an Airflow DAG
- Packaging the operator (with optional formatting using the black package)
1. Setting up your plugins folder for building the operator:
We will build our simple operator in our Airflow Docker setup. We have already set up the dev environment along with the required folders; the docker-compose file we ran created the plugins folder for us. To build a custom operator, we now need to create an operators folder within the dev folder.
The hooks folder is something I created. Hooks are how Airflow talks to and connects with external systems such as a Snowflake or MySQL database. We won't be writing any hooks in this article. To learn more about connections and hooks in Airflow, you can visit the official docs here.
Next, within the operators folder, create an empty __init__.py file.
With this, we are ready to build our simple test operator.
2. Details of the custom operator:
The simple custom operator that we are going to build extends the functionality of the PythonOperator, a native Airflow operator, so that it reports the time taken to complete its execution. For more in-depth details on building custom operators, you can visit the official docs here.
One potential use case for such an operator is timing a long-running function, for example a training loop or some other computation-heavy Python function.
3. Implementing the simple custom operator:
To extend the functionality of the PythonOperator as discussed in the previous section, we will subclass it. In the operators folder, we will create a Python file named timed_python_operator.py, which will contain the code for our custom operator.
The code is as follows:
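Here is a minimal sketch of timed_python_operator.py that matches the description below (the exact log wording is illustrative):

import time

from airflow.operators.python import PythonOperator


class TimedPythonOperator(PythonOperator):
    """A PythonOperator that also reports how long its execution took."""

    def __init__(self, *args, **kwargs):
        # No extra parameters; just call the superclass.
        super().__init__(*args, **kwargs)

    def execute(self, context):
        # Start the clock, delegate to PythonOperator, then report the elapsed time.
        start = time.time()
        return_value = super().execute(context)
        elapsed = time.time() - start
        self.log.info("TimedPythonOperator finished in %.2f seconds", elapsed)
        return return_value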
We subclass the PythonOperator to create our new operator and name it “TimedPythonOperator”. For this simple operator, we don’t add any additional parameters, so our __init__ function just calls the superclass. We implement the timing logic in the execute function: we import the time module, start tracking the time, and call PythonOperator’s execute function through “super().execute(context)”, making sure to pass the context along. At the end, our custom TimedPythonOperator reports the elapsed time, which can be retrieved from the logs. If needed, the elapsed time could also be pushed to a database or another destination for further analysis.
4. Testing our custom operator in an Airflow DAG:
We will test our TimedPythonOperator with an Airflow DAG. For this, I created a DAG under the dags folder named custom_op_test_dag.py. The following is the code present in the DAG.
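A sketch of custom_op_test_dag.py along those lines (the start date and the import path are assumptions based on my setup; adjust the import to wherever your operators folder is mounted):

from datetime import datetime

from airflow import DAG
# Assumes the operators folder is importable, e.g. because it sits under the
# plugins folder that Airflow adds to sys.path.
from operators.timed_python_operator import TimedPythonOperator


def say_hello():
    for _ in range(10):
        print("I running using TimedPythonOperator custom operator")


with DAG(
    dag_id="custom_op_test_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    say_hello_task = TimedPythonOperator(
        task_id="say_hello",
        python_callable=say_hello,
    )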
In the above DAG, we are running a Python function that prints “I running using TimedPythonOperator custom operator” 10 times. Since it is run using the TimedPythonOperator, the time taken for the say_hello() function to execute is logged at the end of the run.
Once I triggered the DAG, I was able to see the following output:
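In the task log, after the ten printed lines, you should see an elapsed-time entry along these lines (the exact wording depends on the log message in your operator, and the timing will vary):

INFO - TimedPythonOperator finished in 0.01 seconds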
5. Packaging the operator:
Once you are done testing your operator on your local setup, you are ready to package it up. Poetry has already given us the folder airflow_custom_operator along with its __init__.py. Now it's time to copy our operators folder from dev/operators to airflow_custom_operator/operators.
1. First, before we make the copy, we want to delete the __pycache__ folder. You can do this manually if you choose to, but I have a Makefile that clears such folders for me. Before deleting the __pycache__ folder, I stop my local Airflow instance to make sure the folder doesn't get recreated.
2. Second, we copy the dev/operators folder to airflow_custom_operator/operators.
This is how my setup looks after completing the above-mentioned steps:
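Roughly, the package side of the project now looks like this (the dev folder stays where it is; only the operators folder is copied over):

airflow_custom_operator/
    __init__.py
    operators/
        __init__.py
        timed_python_operator.py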
3. Now, before we build our package using Poetry, we want to make sure that the package, once built, can be registered by Airflow as a provider of our operator. To ensure that Airflow registers our package, we need to modify airflow_custom_operator/__init__.py.
Note:
airflow_custom_operator/__init__.py is not the same as airflow_custom_operator/operators/__init__.py
We update airflow_custom_operator/__init__.py with a function called get_provider_info(), which simply returns metadata about our operator package.
The following image shows what has been updated in the airflow_custom_operator/__init__.py file:
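A sketch of what that update typically looks like (the name, description, and version values below are placeholders; use your own package metadata):

# airflow_custom_operator/__init__.py

def get_provider_info():
    # Metadata that identifies this package to Airflow as a provider.
    return {
        "package-name": "airflow-custom-operator",  # placeholder: match the name in pyproject.toml
        "name": "Airflow Custom Operator",
        "description": "A PythonOperator that reports its execution time.",
        "versions": ["0.1.0"],
    }

Note that Airflow discovers get_provider_info() through an apache_airflow_provider entry point declared in the package metadata, so make sure your pyproject.toml declares one if it doesn't already.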
That concludes the three important steps before we can build our package. Next, we have an optional fourth step.
4. Before I actually package the operator, I want to make sure the Python files are properly formatted. For that, I will use the black formatter. You can learn more about this formatter here.
I will add the black package to our project with the following command:
>poetry add black
Then I will run the following command to format all our Python files:
> poetry run black airflow_custom_operator dev tests
Here are the results from the above command:
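Typical black output looks something like this (the file names and counts will of course depend on your project):

reformatted airflow_custom_operator/operators/timed_python_operator.py
reformatted dev/dags/custom_op_test_dag.py
All done! ✨ 🍰 ✨
2 files reformatted, 4 files left unchanged.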
Now we are finally ready to build our package. To build it, run the following command from the airflow-custom-operator folder:
airflow-custom-operator>poetry build
It took less than 2 seconds for the package to be built:
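The build drops a source distribution and a wheel into the dist/ folder; the output looks roughly like this (the version comes from your pyproject.toml):

Building airflow-custom-operator (0.1.0)
  - Building sdist
  - Built airflow_custom_operator-0.1.0.tar.gz
  - Building wheel
  - Built airflow_custom_operator-0.1.0-py3-none-any.whl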
You can install the package locally using:
>poetry install
If you are looking to publish it to PyPI using Poetry, you can follow this tutorial. You should be ready to ship your package in no time.
We didn't write any unit tests for our custom operator, since it is a simple operator that doesn't really need a whole lot of test cases. There are plenty of articles out there that teach you how to develop test cases for Airflow operators. If you would like me to also write about testing Airflow operators, please let me know in the comments below.
Thank you everyone for reading; that is all for this series on Airflow. In this two-part series on building a custom Airflow operator, we looked at how to set up our environment to develop a custom operator, test it with DAGs, and finally package it up using Poetry. I will continue to write about Airflow in the future, along with other Machine Learning and MLOps-related topics, all with detailed code and screenshots for you to follow along.
Next, I will be writing an article on Comet ML, its features (with code and demonstrations), and how it benefits Data Scientists and MLOps teams. Until next time, take care.
More content to follow. Please clap if the article was helpful and comment if you have any questions. If you want to connect, learn and grow with me, or collaborate, you can reach me at any of the following:
Linkedin:- https://www.linkedin.com/in/virajdatt-kohir/
Twitter:- https://twitter.com/kvirajdatt
GitHub:- https://github.com/Virajdatt
GoodReads:- https://www.goodreads.com/user/show/114768501-virajdatt-kohir
:)