SleekData, the ultimate hub for all things data. Whether you're a data engineer, data scientist or aspiring data professional, you've come to the perfect place.
Join us as we explore the power of Azure, AWS, Bigdata, dbt (data build tool), Snowflake, Databricks, Terraform, Power BI, and more.
Manage the Flow of Tasks - Airflow Tutorial: Trigger Rules, Conditional Branching, Setup/Teardown, Latest Only, Depends On Past
Hello Data Pros, and welcome back to another exciting episode of our Apache Airflow series!
**** Code lines at the End ****
Today, we'll explore how to manage the flow of tasks in Airflow—a critical step in orchestrating efficient data pipelines!
With the default Airflow settings, a task is executed only when all of its upstream dependencies complete successfully. However, in real-world projects, customizing this default behaviour becomes essential to address a wide range of use cases.
For example, you might need to dynamically pick and run a specific branch depending on the outcome of a preceding task, while skipping the remaining branches.
The Branch Python Operator facilitates this by allowing you to select a branch through a user-defined Python function. Within this function, you implement the logic that determines the appropriate branch, and make sure it returns the task ID of the downstream task to be executed next.
All the code lines I've demonstrated in this video are provided in the description as a link.
Now, let's move on to executing it in the Airflow UI. Everything proceeded smoothly, and this is exactly what I needed to accomplish.
In another scenario, you may want to trigger a specific task only if all of its predecessor tasks have failed. This is where Airflow trigger rules come into play!
So far, we haven't explicitly coded trigger rules in our DAGs; that's because, by default, Airflow applies the all_success trigger rule to your tasks.
However, you can modify this behaviour by specifying the trigger_rule parameter on your tasks, as demonstrated here.
With over 10 different types of trigger rules available, you have the flexibility to customize task runs according to your specific use case.
Now, let's validate it in the Airflow UI.
Great, it works perfectly!
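For reference, here's a minimal sketch of a DAG that uses the all failed trigger rule; the DAG id, task names, and callables below are illustrative placeholders, not the exact code from the video:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id='trigger_rule_demo',  # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    extract_a = DummyOperator(task_id='extract_a')
    extract_b = DummyOperator(task_id='extract_b')

    # Runs only if every upstream task has failed; the default rule would be 'all_success'.
    alert_on_failure = PythonOperator(
        task_id='alert_on_failure',
        python_callable=lambda: print('all upstream tasks failed!'),
        trigger_rule=TriggerRule.ALL_FAILED,  # equivalent to trigger_rule='all_failed'
    )

    [extract_a, extract_b] >> alert_on_failure

Since the placeholder upstream tasks always succeed here, the alert task would simply be skipped, which is exactly the all failed behaviour. Other rules such as one_success, one_failed, none_failed, and all_done follow the same pattern.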
As we progress, consider a scenario where you must perform initial setup and final cleanup tasks—such as starting a cluster at the beginning of the pipeline and stopping it at the end. These two steps are necessary regardless of the outcomes of the tasks in between.
This dependency definition code over here sets one of my tasks, named create cluster, as the setup task, and the delete cluster task as the teardown task.
Now, let's witness it in the UI.
Cool, this is exactly what we expected!
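For reference, a minimal sketch of that setup and teardown pattern might look like the following; it assumes Airflow 2.7 or later, and the cluster callables are placeholders rather than real cluster operations:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Placeholder callables standing in for real cluster operations.
def _create_cluster():
    print('starting cluster')

def _run_job():
    print('running the main workload')

def _delete_cluster():
    print('stopping cluster')

with DAG(
    dag_id='setup_teardown_demo',  # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    create_cluster = PythonOperator(task_id='create_cluster', python_callable=_create_cluster)
    run_job = PythonOperator(task_id='run_job', python_callable=_run_job)
    delete_cluster = PythonOperator(task_id='delete_cluster', python_callable=_delete_cluster)

    # Mark create_cluster as the setup task and delete_cluster as the teardown task;
    # the teardown still runs even if run_job fails in between.
    create_cluster >> run_job >> delete_cluster.as_teardown(setups=create_cluster)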
Moving further, for backfill scenarios, where historical data is processed, you may find it valuable to selectively skip certain steps in the pipeline.
For example, you might want to avoid sending a job completion email when the pipeline is run for a backfill scenario. This approach prevents flooding your mailbox with a large number of emails, especially if the backfill spans over a significant number of days.
This feature is achieved by creating a dummy task with the Latest Only Operator. Any task that you place downstream of this dummy task will be executed only for the latest scheduled run, and not for backfill runs.
For the purpose of this demo, I've set up a catch-up window of just 3 days. Let's observe how it works in the UI.
As you can see, the email step was triggered only for the last run, which is for today's date.
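For reference, a minimal sketch of that pattern might look like this; the DAG id and task names are illustrative, and the three-day start date mirrors the catch-up window from the demo:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='latest_only_demo',  # illustrative DAG id
    start_date=days_ago(3),     # small catch-up window, as in the demo
    schedule_interval='@daily',
    catchup=True,
) as dag:

    process_data = DummyOperator(task_id='process_data')

    # Gate task: anything placed downstream of it runs only for the latest scheduled run.
    latest_only = LatestOnlyOperator(task_id='latest_only')

    send_completion_email = PythonOperator(
        task_id='send_completion_email',
        python_callable=lambda: print('sending completion email'),
    )

    # The email task is skipped for the backfill runs and executed only for the latest run.
    process_data >> latest_only >> send_completion_email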
Lastly, imagine a scenario where a task should only run if its previous execution instance has completed successfully. Perhaps, you wouldn't want to execute today's task if yesterday's run has failed, and the root cause has not yet been addressed.
All you have to do is set the depends_on_past parameter to True on the specific task.
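For reference, a minimal sketch of a task configured this way might look like this; the DAG id and task name are illustrative:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

with DAG(
    dag_id='depends_on_past_demo',  # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Today's instance will not start unless yesterday's instance of this task succeeded.
    load_incremental_data = PythonOperator(
        task_id='load_incremental_data',
        python_callable=lambda: print('loading the daily increment'),
        depends_on_past=True,
    )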
That's all for today! Stay tuned for our next video, where we'll explore more advanced Airflow features!
1) Branching:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
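Building on the imports above, a minimal sketch of such a branching DAG might look like this; the branch names and the weekday condition are illustrative, not necessarily the exact ones from the video:

def _choose_branch():
    # Return the task_id of the downstream task that should run next.
    if datetime.now().weekday() < 5:
        return 'weekday_processing'
    return 'weekend_processing'

with DAG(
    dag_id='branching_demo',  # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    start = DummyOperator(task_id='start')

    choose_branch = BranchPythonOperator(
        task_id='choose_branch',
        python_callable=_choose_branch,
    )

    weekday_processing = PythonOperator(
        task_id='weekday_processing',
        python_callable=lambda: print('running the weekday branch'),
    )

    weekend_processing = PythonOperator(
        task_id='weekend_processing',
        python_callable=lambda: print('running the weekend branch'),
    )

    # Only the branch whose task_id is returned above runs; the other branch is marked as skipped.
    start >> choose_branch >> [weekday_processing, weekend_processing]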