Airflow DAGs, Operators, Tasks & Providers
Hello Data Pros,
In our last blog, we demonstrated the step-by-step installation of Apache Airflow on a Windows PC, and successfully executed our very first Airflow DAG!
Now, it's time to dive deeper!
In this video, we'll learn about the Airflow configuration file!
Explore each section inside a DAG!
Understand various operator types!
Experience the power of provider packages!
Let's begin right away!
As we already know, Airflow DAGs are written in Python.
Every Airflow setup has a ‘dags folder’. You can set this folder's path in the Airflow configuration file, named airflow.cfg.
In addition to the dags folder, this configuration file has many other settings that you can customize to meet your needs.
For example, to enable my Airflow instance to send email notifications, I added another container to my Docker Compose file. This new container locally hosts a simple SMTP server. I then updated the Airflow configuration file to use the corresponding Docker service name as the SMTP host.
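For reference, here's a minimal sketch of the relevant airflow.cfg section, assuming the SMTP container is exposed under a hypothetical Docker Compose service name like smtp-server (your service name, port and sender address may differ):

[smtp]
# smtp_host is assumed to match the Docker Compose service name of the SMTP container
smtp_host = smtp-server
smtp_port = 25
smtp_starttls = False
smtp_ssl = False
smtp_mail_from = airflow@example.com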
Let's now take a closer look at each section inside a DAG!
In general, we begin with import statements.
We should always import the DAG class.
In addition, import the Airflow operators that you plan to use in your tasks.
You may also need to import any functions you use, whether they are built-in or user-defined.
For example, in my case, I've developed a custom Python function called clean_data, which is placed in the plugins folder. Because I intend to use the clean_data function in this DAG, I've included it in the imports section.
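As a rough sketch, the imports section could look like this (the clean_data import assumes a clean_data.py module sitting in the plugins folder, which Airflow adds to the Python path):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# user-defined function, assumed to live in plugins/clean_data.py
from clean_data import clean_data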
Next, create a DAG object and configure the DAG-level parameters, such as the start date, end date, schedule interval, catchup and backfill behavior, and more.
For a complete list of DAG parameters and their purpose, please refer to the official Airflow documentation link in the video description.
Next, we create tasks.
A task is created by instantiating a specific operator and providing the necessary task-level parameters.
The parameters differ depending on the operator used, so always refer to the relevant documentation for the list of task-level parameters you can use.
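Putting the DAG object and a couple of tasks together, here's a hedged sketch (the dag_id follows the exchange_rate_pipeline.py file listed at the end of this post, while the dates, schedule, command and callable are illustrative assumptions, not the exact code from the video):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from clean_data import clean_data  # assumed helper from the plugins folder

with DAG(
    dag_id="exchange_rate_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # called 'schedule' in Airflow 2.4 and later
    catchup=False,
) as dag:

    # action operator: runs a shell command (illustrative URL)
    extract = BashOperator(
        task_id="extract_rates",
        bash_command="curl -s -o /tmp/exchange_rates.csv https://example.com/rates.csv",
    )

    # action operator: runs the user-defined Python function
    transform = PythonOperator(
        task_id="clean_rates",
        python_callable=clean_data,
    )

    # the dependency between these tasks is defined in the next step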
At the end, we define the task dependencies!
There are several methods for establishing dependencies between tasks:
Using bitshift operators (>> and <<).
Using the set_upstream and set_downstream methods.
Or using the chain function!
In addition, when using the TaskFlow API, dependencies are automatically inferred based on the sequence of task function calls. We’ll cover the TaskFlow API in our later videos.
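Here's a small self-contained sketch of the first three approaches, using hypothetical extract, transform and load tasks (EmptyOperator needs Airflow 2.3 or later; older versions used DummyOperator instead):

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="dependency_demo", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    # 1. bitshift operators
    extract >> transform >> load

    # 2. set_upstream / set_downstream, equivalent to the line above
    # transform.set_upstream(extract)
    # transform.set_downstream(load)

    # 3. chain function, also equivalent
    # chain(extract, transform, load)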
Let's try to execute this DAG in the Airflow UI.
It has completed successfully, and I can validate the respective logs as well.
Alright, let's understand the difference between an operator and a task.
In simpler terms, think of an operator as a blueprint or design template, while tasks are implementations of the blueprint.
In Python or object-oriented programming terms, an operator is a class, and tasks are objects created from that class!
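In code, that distinction looks roughly like this: BashOperator is the class, and each instantiation below creates a separate task (the task ids and commands are just placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="blueprint_demo", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    # one operator class, two task objects created from it
    say_hello = BashOperator(task_id="say_hello", bash_command="echo hello")
    say_bye = BashOperator(task_id="say_bye", bash_command="echo bye")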
At a high level, we can categorize these operators into three main groups:
1. Action Operators!
These operators execute a specific function or task. For instance, the BashOperator, which is used for running Bash commands!
The PythonOperator, which lets you run Python code!
The AzureDataFactoryRunPipelineOperator, which is used to execute an Azure Data Factory pipeline!
2. Transfer Operators!
These operators are used for moving data from one place to another. An excellent example is the S3ToRedshiftOperator. It does exactly what it sounds like: it moves data from Amazon S3 to Amazon Redshift (an example sketch follows after the sensor operators below).
3. Sensor Operators!
Sensors wait for a specific condition to be met, before triggering the subsequent tasks in the workflow.
An example of a sensor is the S3KeySensor, which waits for one or more files to be created in an S3 bucket.
Another is the RedshiftClusterSensor, which waits for a Redshift cluster to reach a specific status.
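To make the transfer and sensor categories concrete, here's a hedged sketch of one task of each kind (both need the apache-airflow-providers-amazon package; the bucket, key, table and connection names are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(dag_id="s3_to_redshift_demo", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    # sensor: wait until the file shows up in the bucket
    wait_for_file = S3KeySensor(
        task_id="wait_for_file",
        bucket_name="my-bucket",
        bucket_key="exchange_rates/rates.csv",
        aws_conn_id="aws_default",
        poke_interval=60,
    )

    # transfer: copy the file from S3 into a Redshift table
    load_to_redshift = S3ToRedshiftOperator(
        task_id="load_to_redshift",
        s3_bucket="my-bucket",
        s3_key="exchange_rates/rates.csv",
        schema="public",
        table="exchange_rates",
        redshift_conn_id="redshift_default",
        aws_conn_id="aws_default",
        copy_options=["CSV", "IGNOREHEADER 1"],
    )

    wait_for_file >> load_to_redshift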
It's worth mentioning that Airflow offers a vast number of operators. This list highlights just a few of the most common ones.
Not all provider packages are included in the default Airflow installation. For instance, if we attempt to use the GithubOperator in our code, we may encounter an issue.
Let’s go ahead and try!
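Here's a minimal sketch of what such a DAG might look like, roughly along the lines of the git_repo_dag.py file listed at the end of this post (the repository name, method and task id are placeholders):

from datetime import datetime

from airflow import DAG
# this import fails until the GitHub provider package is installed
from airflow.providers.github.operators.github import GithubOperator

with DAG(dag_id="git_repo_dag", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    list_repo_tags = GithubOperator(
        task_id="list_repo_tags",
        github_method="get_repo",
        github_method_args={"full_name_or_id": "apache/airflow"},
        result_processor=lambda repo: print(list(repo.get_tags())),
    )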
When you return to the UI, you should observe a "DAG Import Error" similar to the one displayed here.
This error is caused by a missing provider package. While some provider packages are included with Airflow, you may encounter situations where you need to install additional ones.
Please refer to this link in the video description for the complete list of Airflow provider packages.
In our setup, you can simply add the missing provider package name over here in the Dockerfile.
Alternatively, we can create a requirements file and list all the required packages in it, one after another.
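Here's a hedged sketch of both options (the Airflow image tag below is only a placeholder; pin whichever version your setup actually uses):

# Dockerfile, option 1: install the provider directly
FROM apache/airflow:2.7.1
RUN pip install --no-cache-dir apache-airflow-providers-github

# Dockerfile, option 2: install everything listed in a requirements file
# FROM apache/airflow:2.7.1
# COPY requirements.txt /requirements.txt
# RUN pip install --no-cache-dir -r /requirements.txt
#
# where requirements.txt contains one package per line, for example:
#   apache-airflow-providers-github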
For this change to take effect, we need to rebuild the Docker image.
Let's restart the Docker containers.
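On my setup that boils down to something like the following commands (flags may differ slightly depending on your Docker Compose version):

docker compose down     # stop the running containers
docker compose build    # rebuild the image so the new provider package gets installed
docker compose up -d    # start everything again in the background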
Now I no longer see an error, and my DAG executes successfully.
This plug-and-play extensibility, and the availability of a wide range of provider packages, make Airflow an exceptionally powerful and versatile platform.
That's all for today! Please stay tuned for our next video where we’ll explore Airflow Variables and Connections.
Please do like the video and subscribe to our channel.
If you’ve any questions or thoughts, please feel free to leave them in the comments section below.
# exchange_rate_pipeline.py
# git_repo_dag.py
# clean_data.py
# dockerfile
# docker-compose.yml