Apache Airflow TaskFlow API | Airflow Decorators

 Hello Data Pros, welcome back to another episode of our Apache Airflow series!




Today, we're taking a deep dive into the world of the TaskFlow API!

 

The TaskFlow API was introduced in Airflow starting from version 2.0, aiming to simplify the DAG authoring experience by significantly reducing the number of code lines developers need to write! This also makes the code more modular, understandable, and easily maintainable!

 

The TaskFlow API is built on the concept of Python decorators! So to understand the TaskFlow API, you first need to know the basics of decorators.

 

Think of a decorator as a special function that takes another function as its input and returns a modified version of that original function.

 

It's like wrapping a gift: the decorator adds extra features to the original function.

 

A decorator is applied using the @ symbol above a function definition.

 

In our case, 'decorator_example' decorates the 'say_hello' function.
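
Here's a minimal sketch of what that might look like; the names decorator_example and say_hello follow the video, but the exact function bodies are assumptions for illustration.

def decorator_example(func):
    # The decorator receives the original function...
    def wrapper():
        print("Extra feature added by the decorator")  # the "gift wrap"
        func()  # ...and still calls the original function
    return wrapper

@decorator_example  # equivalent to: say_hello = decorator_example(say_hello)
def say_hello():
    print("Hello Data Pros!")

say_hello()
# Output:
# Extra feature added by the decorator
# Hello Data Pros!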

 

Let's execute and observe the results. When I call the original function, you can see that the decorator's feature is also in effect.

For added clarity, let's remove the decoration from the function definition. Now, the original function behaves as usual.

 

Decorators are useful for applying common features to multiple functions without code repetition.

This also keeps the code clean and organized by separating extra features and complex logic from the core function, as the sketch below shows.
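
For instance, a single reusable decorator, like the hypothetical log_call below (an illustrative example, not from the video), can be applied to any number of functions without repeating the logging logic in each one.

import functools

def log_call(func):
    # Reusable feature: log every call, no matter which function is decorated
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@log_call
def extract_orders():
    return "orders"

@log_call
def extract_customers():
    return "customers"

extract_orders()     # prints: Calling extract_orders
extract_customers()  # prints: Calling extract_customers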

 

Airflow has adopted this concept, and introduced various pre-built decorators.

Among them, one of the most popular and widely used decorators is @task, which serves as an efficient alternative to the traditional PythonOperator.

 

Let's see this in action!

Here's a DAG coded in the traditional approach!

And here is the same DAG coded with the @task decorator.

 

The first noticeable difference is the significant reduction in the number of code lines.

 

In the traditional approach, you have to explicitly code the XCom logic for pushing and pulling data between tasks, and you also have to explicitly define the dependencies between the tasks.

 

However, with the TaskFlow API, none of these boilerplate code lines are necessary. They are cleverly hidden, or wrapped, inside the @task decorator by Airflow.
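
As a rough, self-contained sketch of that idea (the DAG id and schedule here are placeholders, not from the video): simply passing one decorated task's return value into another is enough for Airflow to create both the XCom and the dependency.

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def extract():
    return "Raw order data"

@task
def transform(raw_data):
    return f"Processed: {raw_data}"

demo_dag = DAG("taskflow_wiring_demo", start_date=days_ago(1), schedule_interval=None, catchup=False)

with demo_dag:
    raw = extract()   # returns an XComArg placeholder; the real value is pushed as an XCom at runtime
    transform(raw)    # the XCom pull and the extract >> transform dependency are wired automatically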

 

It's time to validate this in the Airflow UI.

 

As you can see, both these DAGs have produced the exact same results!

 

Just like @task, you can experiment with other prebuilt decorators based on your project's use case.
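
For example, Airflow 2 also ships a @dag decorator. A minimal sketch of using it in place of the DAG constructor (the DAG id and task body here are illustrative assumptions) might look like this:

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@task
def say_hello():
    print("Hello Data Pros!")

@dag(schedule_interval="0 21 * * *", start_date=days_ago(1), catchup=False)
def decorated_dag():
    say_hello()

example_dag = decorated_dag()  # calling the decorated function creates and registers the DAG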

 

Before we wrap up the video, I'd like to break a common misconception: it's not always this or that! You can in fact create a hybrid DAG that combines the TaskFlow API with traditional operators!

As always, all the code lines mentioned in this video are available in the link provided in the description.

 

In summary, the TaskFlow API emerges as an invaluable feature in Airflow, promising significant reductions in development and maintenance efforts for your project!

 

That's all for today! Please stay tuned for our next video, where we'll explore more advanced Airflow features!



1) TaskFlow API:
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def extract():
    # Extract logic here
    return "Raw order data"

@task
def transform(raw_data):
    # Transform logic here
    return f"Processed: {raw_data}"

@task
def validate(processed_data):
    # Validate logic here
    return f"Validated: {processed_data}"

@task
def load(validated_data):
    # Load logic here
    print(f"Data loaded successfully: {validated_data}")

dag = DAG(
    'taskflow_api',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

with dag:
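    # Chaining the calls handles the XComs and sets extract >> transform >> validate >> load automatically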
    load_task = load(validate(transform(extract())))



2) Traditional:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def extract(ti=None, **kwargs):
    # Extract logic here
    raw_data = "Raw order data"
    ti.xcom_push(key="raw_data", value=raw_data)

def transform(ti=None, **kwargs):
    # Transformation logic here
    raw_data = ti.xcom_pull(task_ids="extract", key="raw_data")
    processed_data = f"Processed: {raw_data}"
    ti.xcom_push(key="processed_data", value=processed_data)

def validate(ti=None, **kwargs):
    # Validation logic here
    processed_data = ti.xcom_pull(task_ids="transform", key="processed_data")
    validated_data = f"Validated: {processed_data}"
    ti.xcom_push(key="validated_data", value=validated_data)

def load(ti=None, **kwargs):
    # Load logic here
    validated_data = ti.xcom_pull(task_ids="validate", key="validated_data")
    print(f"Data loaded successfully: {validated_data}")

dag = DAG(
    'traditional_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,
)

validate_task = PythonOperator(
    task_id="validate",
    python_callable=validate,
    dag=dag,
)

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
    dag=dag,
)

# Set Dependencies
extract_task >> transform_task >> validate_task >> load_task

 


3) Hybrid:

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

@task
def extract():
    # Extract logic here
    return "Raw order data"

@task
def transform(raw_data):
    # Transform logic here
    return f"Processed: {raw_data}"

@task
def validate(processed_data):
    # Validate logic here
    return f"Validated: {processed_data}"

@task
def load(validated_data):
    # Load logic here
    print(f"Data loaded successfully: {validated_data}")

dag = DAG(
    'hybrid_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

with dag:
    load_task = load(validate(transform(extract())))

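    # Traditional operators can sit alongside TaskFlow tasks in the same DAG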
    snowflake_task = SnowflakeOperator(
        task_id='Snowflake_task',
        sql='select 1',
        snowflake_conn_id='snowflake_conn',
    )

    load_task >> snowflake_task




