Manage flow of tasks - Airflow Tutorial: Trigger Rules, Conditional Branching, Setup and Teardown, Latest Only, Depends On Past

 Hello Data Pros, and welcome back to another exciting episode of our Apache Airflow series!


**** Code lines at the End ****

Today, we'll explore how to manage the flow of tasks in Airflow—a critical step in orchestrating efficient data pipelines!

 

With the default Airflow settings, a task is executed only when all its upstream dependencies complete successfully. However, in real-world projects, customizing this default behaviour becomes essential to cover many use cases.

 

For example, you might need to dynamically pick and run a specific branch depending on the outcome of a preceding task, while skipping the remaining branches.

The BranchPythonOperator provides this feature by letting you select a branch through a user-defined Python function. Within this function, you implement the logic that determines the appropriate branch, and you must make sure it returns the task ID of the downstream task to be executed next.
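
As a minimal sketch of such a function, the snippet below keeps the branching decision in plain Python so it can be tried without an Airflow installation. The function name choose_branch, the size_threshold_gb parameter, and the 10 GB default are illustrative assumptions; the returned strings must match task IDs that actually exist downstream of the branching task.

```python
def choose_branch(file_size_gb: float, size_threshold_gb: float = 10.0) -> str:
    """Return the task ID of the branch that should run next."""
    # Large files go to the parallel branch, everything else to the serial branch
    if file_size_gb > size_threshold_gb:
        return "parallel_transform"
    return "serial_transform"


if __name__ == "__main__":
    print(choose_branch(12.5))
    print(choose_branch(0.3))
```

In a DAG, the callable handed to the branching operator would first fetch the file size (for example from XCom) and then apply the same decision.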

 

All the code lines I've demonstrated in this video are provided in the description as a link.

Now, let's move on to executing it in the Airflow UI. Everything ran smoothly, and this is exactly the behaviour I wanted.

 

In another scenario, you may want to trigger a specific task only if all of its predecessor tasks have failed. This is where Airflow trigger rules come into play!

So far, we haven't explicitly coded trigger rules in our DAGs. That's because, by default, Airflow applies the all_success trigger rule to your tasks.

However, you can modify this behaviour by setting the trigger_rule parameter on your tasks, as demonstrated here.

With over 10 different types of trigger rules available, you have the flexibility to customize task runs according to your specific use case.
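
To make the difference between a few of these rules concrete, here is a simplified plain-Python model of how they could be evaluated once all upstream tasks have finished. It is only a sketch for reasoning about the rules, not Airflow's internal implementation; the function should_run and the state sets are hypothetical names, and only six of the rules are covered.

```python
from typing import Sequence

# Simplified model of a few trigger rules; not Airflow's internal implementation.
FAILED_STATES = {"failed", "upstream_failed"}
FINISHED_STATES = {"success", "skipped"} | FAILED_STATES


def should_run(trigger_rule: str, upstream_states: Sequence[str]) -> bool:
    """Decide whether a task may run, given the final states of its upstream tasks."""
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_failed":
        return all(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == "all_done":
        return all(s in FINISHED_STATES for s in upstream_states)
    if trigger_rule == "one_failed":
        return any(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if trigger_rule == "none_failed":
        return all(s not in FAILED_STATES for s in upstream_states)
    raise ValueError(f"Rule not covered by this sketch: {trigger_rule}")


if __name__ == "__main__":
    states = ["failed", "failed", "failed"]
    for rule in ("all_success", "all_failed", "all_done"):
        print(rule, should_run(rule, states))
```

With three failed upstream tasks, the default all_success rule would block the task, while all_failed would let it run.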

Now, let's validate it in the Airflow UI.

Great, it works perfectly!

 

 

As we progress, consider a scenario where you must perform initial setup and final cleanup tasks—such as starting a cluster at the beginning of the pipeline and stopping it at the end. These two steps are necessary regardless of the outcomes of the tasks in between.

 

This dependency definition code sets my create_cluster task as the setup task, and the delete_cluster task as its teardown task.
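
The idea is similar to a try/finally block in ordinary Python. The sketch below is only an analogy, assuming the steps run one after another; the function run_pipeline and its log are hypothetical and do not use Airflow at all.

```python
def run_pipeline(work_steps):
    """Run setup, the work steps, and teardown; return an ordered log of what ran."""
    log = []
    log.append("create_cluster")  # setup; if this raised, no teardown would be attempted
    try:
        for name, step in work_steps:
            step()
            log.append(name)
    except Exception as exc:
        log.append(f"failed: {exc}")
    finally:
        log.append("delete_cluster")  # teardown runs even if a work step failed
    return log


def failing_query():
    raise RuntimeError("Failure in Query 2")


if __name__ == "__main__":
    steps = [
        ("run_query1", lambda: None),
        ("run_query2", failing_query),
        ("run_query3", lambda: None),
    ]
    print(run_pipeline(steps))
```

Even though the second step raises, the cleanup step is still recorded last, which mirrors what a teardown task is meant to guarantee.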

 

Now, let's witness it in the UI.

 

Cool, this is exactly what we expected!

 

 

Moving further, for backfill scenarios, where historical data is processed, you may find it valuable to selectively skip certain steps in the pipeline.

For example, you might want to avoid sending a job completion email when the pipeline runs as part of a backfill. This prevents your mailbox from being flooded with emails, especially if the backfill spans a large number of days.

 

This is achieved by creating a dummy task with the LatestOnlyOperator. Any task that you place downstream of this dummy task will be executed only for the most recent scheduled run, and not for backfill runs.
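
One way to picture the check is as a time window: a run counts as the latest one only if the current time falls inside the schedule interval that follows it. The sketch below is a simplified plain-Python model of that idea, assuming a fixed schedule interval; the function is_latest_run, its parameters, and the dates are hypothetical, and this is not the operator's actual code.

```python
from datetime import datetime, timedelta


def is_latest_run(interval_end: datetime, schedule_delta: timedelta, now: datetime) -> bool:
    """Return True if `now` falls in the window that follows this run's data interval."""
    return interval_end < now <= interval_end + schedule_delta


if __name__ == "__main__":
    now = datetime(2024, 1, 10, 12, 0)
    one_day = timedelta(days=1)
    # Data-interval end times of three daily runs created by catch-up (illustrative dates)
    interval_ends = (
        datetime(2024, 1, 8, 2),
        datetime(2024, 1, 9, 2),
        datetime(2024, 1, 10, 2),
    )
    for interval_end in interval_ends:
        print(interval_end.date(), is_latest_run(interval_end, one_day, now))
```

For the three catch-up runs in this example, only the last one passes the check, so only that run would go on to the tasks placed after the latest-only task.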

 

For the purpose of this demo, I've set up a catch-up window of just 3 days. Let's observe how it works in the UI.

As you can see, the email step was triggered only for the last run, which is for today's date.

 

Lastly, imagine a scenario where a task should run only if its instance in the previous DAG run completed successfully. For instance, you might not want to execute today's task if yesterday's run failed and the root cause has not yet been addressed.

 

All you have to do is set the depends_on_past parameter to True on the specific task.
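
The rule this parameter enforces can be modelled in a few lines of plain Python. This is a simplified sketch, assuming the previous instance of the task has already finished; the function name may_run_with_depends_on_past is hypothetical and the code does not call Airflow.

```python
from typing import Optional


def may_run_with_depends_on_past(previous_state: Optional[str]) -> bool:
    """Return True if a task with depends_on_past=True may run in the current DAG run.

    previous_state is the state of the same task in the previous DAG run,
    or None when there is no previous run (the first run is allowed to proceed).
    """
    if previous_state is None:
        return True
    # A previous instance that succeeded or was skipped does not block the current one
    return previous_state in {"success", "skipped"}


if __name__ == "__main__":
    for state in (None, "success", "failed"):
        print(state, may_run_with_depends_on_past(state))
```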

 

That's all for today! Stay tuned for our next video, where we'll explore more advanced Airflow features!


1) Branching:


import os

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.dates import days_ago

# Function to get file size
def get_file_size(file_path, ti):
    # Get file size in bytes from the file metadata, without reading the file into memory
    file_size_bytes = os.path.getsize(file_path)

    # Convert bytes to gigabytes
    file_size_gb = file_size_bytes / (1024 ** 3)

    # Push file size to XCom
    ti.xcom_push(key='file_size', value=file_size_gb)

# Function to decide the branch
def decide_branch(ti):
    file_size = ti.xcom_pull(task_ids='check_file_size', key='file_size')
    return 'parallel_transform' if file_size > 10 else 'serial_transform'

# Serial transformation task
def serial_transform():
    # Add your serial transformation logic here
    print("Executing serial transformation")

# Serial load task
def serial_load():
    # Add your serial load logic here
    print("Executing serial load")

# Parallel transformation task
def parallel_transform():
    # Add your parallel transformation logic here
    print("Executing parallel transformation")

# Parallel load task
def parallel_load():
    # Add your parallel load logic here
    print("Executing parallel load")

dag = DAG(
    'conditional_branching',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 23 * * *',
    catchup=False
)

# Task to check file size
# (Airflow 2 passes context values such as ti automatically, so provide_context is not needed)
check_file_size = PythonOperator(
    task_id='check_file_size',
    python_callable=get_file_size,
    op_args=['/tmp/source_file_extrat.dat'],
    dag=dag,
)

# Task to decide the branch
decide_branch_task = BranchPythonOperator(
    task_id='decide_branch',
    python_callable=decide_branch,
    dag=dag,
)

# Define tasks for serial execution
serial_transform_task = PythonOperator(
    task_id='serial_transform',
    python_callable=serial_transform,
    dag=dag,
)

serial_load_task = PythonOperator(
    task_id='serial_load',
    python_callable=serial_load,
    dag=dag,
)

# Define tasks for parallel execution
parallel_transform_task = PythonOperator(
    task_id='parallel_transform',
    python_callable=parallel_transform,
    dag=dag,
)

parallel_load_task = PythonOperator(
    task_id='parallel_load',
    python_callable=parallel_load,
    dag=dag,
)

# Set up task dependencies
check_file_size >> decide_branch_task

# Serial branch
decide_branch_task >> serial_transform_task >> serial_load_task

# Parallel branch
decide_branch_task >> parallel_transform_task >> parallel_load_task


2) Trigger Rules:


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


# Helper used by the failing tasks below (a lambda cannot contain a raise statement)
def raise_exception(message):
    raise Exception(message)


dag = DAG(
    'trigger_rules',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

# Define tasks
task_a = PythonOperator(
    task_id='task_a',
    python_callable=lambda: raise_exception("Failure in Query a"),
    dag=dag,
)

task_b = PythonOperator(
    task_id='task_b',
    python_callable=lambda: raise_exception("Failure in Query b"),
    dag=dag,
)

task_c = PythonOperator(
    task_id='task_c',
    python_callable=lambda: raise_exception("Failure in Query c"),
    dag=dag,
)

task_d = PythonOperator(
    task_id='task_d',
    python_callable=lambda: print("Executing Task D"),
    dag=dag,
    trigger_rule='all_failed',
)

task_e = PythonOperator(
    task_id='task_e',
    python_callable=lambda: print("Executing Task E"),
    dag=dag,
)

# Define task dependencies
task_a >> task_d
task_b >> task_d
task_c >> task_d
task_d >> task_e



3) Setup and Tear Down:

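from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


# Helper used by the failing task below (a lambda cannot contain a raise statement)
def raise_exception(message):
    raise Exception(message)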


dag = DAG(
    'setup_teardown',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

# Define tasks
create_cluster = PythonOperator(
    task_id='create_cluster',
    python_callable=lambda: print("Creating Cluster"),
    dag=dag,
)

run_query1 = PythonOperator(
    task_id='run_query1',
    python_callable=lambda: print("Running Query 1"),
    dag=dag,
)

# run_query2 = PythonOperator(
#     task_id='run_query2',
#     python_callable=lambda: print("Running Query 2"),
#     dag=dag,
# )

run_query2 = PythonOperator(
    task_id='run_query2',
    python_callable=lambda: raise_exception("Failure in Query 2"),
    dag=dag,
)


run_query3 = PythonOperator(
    task_id='run_query3',
    python_callable=lambda: print("Running Query 3"),
    dag=dag,
)

delete_cluster = PythonOperator(
    task_id='delete_cluster',
    python_callable=lambda: print("Deleting Cluster"),
    dag=dag,
)

# Define task dependencies
create_cluster >> [run_query1, run_query2]
[run_query1, run_query2] >> run_query3
run_query3 >> delete_cluster.as_teardown(setups=create_cluster)


4) Latest Only:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'LatestOnly',
    default_args={'start_date': days_ago(3)},
    schedule_interval='0 2 * * *',
    catchup=True,
)

# Define tasks
task_a = PythonOperator(
    task_id='task_a',
    python_callable=lambda: print("Executing Task A"),
    dag=dag,
)

task_b = PythonOperator(
    task_id='task_b',
    python_callable=lambda: print("Executing Task B"),
    dag=dag,
)

# Send email based on success
send_email_completed = EmailOperator(
    task_id="send_email_completed",
    to="{{ var.value.get('support_email') }}",
    subject="UK Sales Data Load - Successful",
    html_content="UK Sales Data Load Completed Successfully.",
    dag=dag,
)

# Send email based on failure
send_email_failed = EmailOperator(
    task_id="send_email_failed",
    to="{{ var.value.get('support_email') }}",
    subject="UK Sales Data Load - Failed",
    html_content="UK Sales Data Load Failed. Please check the logs for more details.",
    dag=dag,
    trigger_rule="all_failed",
)

# Task for LatestOnlyOperator branch
latest_only = LatestOnlyOperator(
    task_id="latest_only",
    dag=dag,
)

# Connect tasks
task_a >> task_b >> send_email_failed
task_b >> latest_only >> send_email_completed



5) Depends on Past:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


dag = DAG(
    'depends_on_past',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False,
)

# Define tasks
task_a = PythonOperator(
    task_id="task_a",
    python_callable=lambda: print("Executing Task A"),
    dag=dag,
)

task_b = PythonOperator(
    task_id="task_b",
    python_callable=lambda: print("Executing Task B"),
    depends_on_past=True,
    dag=dag,
)


# Connect tasks
task_a >> task_b
