Datasets and data-aware scheduling - Apache Airflow

 


Hello Data Pros, 

In our last blog, we explored Airflow hooks and discussed how they differ from Airflow operators.

 

Today, we'll focus on Airflow datasets and demonstrate how data-aware scheduling and cross-DAG dependencies work.

 

Data-aware scheduling is a feature of Apache Airflow (available since version 2.4) that runs DAGs when the datasets they depend on are updated.

This stands in contrast to the time-based scheduling that we're already familiar with through our previous examples.

 

A dataset is a logical representation of the underlying data.

You can create a dataset in Airflow simply by instantiating the Dataset class.

 

Within the dataset, you can use the complete URI of your physical data, or just a descriptive string.

Even if you give the complete URI, Airflow does not try to access or validate the data that the URI points to.

 

Instead, the URI is treated as a plain string identifier, and is used to establish producer and consumer relationships between your DAGs.

 

You might be wondering what key benefits the datasets feature offers!

Well, datasets can help resolve many common use cases in Airflow:

For example, imagine a data engineering team with a DAG that creates a file, and an analytics team with another DAG that processes that file. The datasets feature can seamlessly connect such producer and consumer DAGs.

 

With the help of datasets, you can easily implement cross-DAG dependencies. Even if your DAGs don't depend on files, you can create a dependency between them just by defining a dummy dataset.

 

In addition, datasets can help reduce Airflow running costs, because dataset-based scheduling does not occupy a worker slot while waiting. This is a key difference from sensors and similar approaches, which hold a worker or triggerer slot.

 

 

Let’s explore a pair of sample Airflow DAGs to understand this feature better.

 

This is my producer DAG, where the data engineering team ingests the exchange rate data into an S3 bucket.

 

When you define a task with an 'outlets' parameter, Airflow treats the task as a producer of that dataset and marks the dataset as updated each time the task completes successfully. This holds true even if the task doesn't actually create or update the referenced dataset.

 

And this is the code for the consumer DAG.

As soon as the latest exchange rate file is loaded into the S3 bucket, the data analyst team wants to transform the data and load it into a Snowflake table.

 

Any DAG that’s scheduled on a dataset is considered a consumer DAG, even if that DAG doesn't access the referenced dataset.
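
To make the producer and consumer relationship concrete, here is a toy model in plain Python. It is not Airflow's implementation: `consumers_by_uri`, `register_consumer`, and `on_task_success` are hypothetical names used only for illustration. The sketch shows that the dataset URI works purely as a matching key between a task's outlets and a DAG's schedule, and that nothing ever reads the underlying data.

```python
from collections import defaultdict

# Toy model only: maps a dataset URI (a plain string) to the DAGs scheduled on it.
consumers_by_uri = defaultdict(list)


def register_consumer(dag_id, schedule_uris):
    """Record that dag_id is scheduled on each of the given dataset URIs."""
    for uri in schedule_uris:
        consumers_by_uri[uri].append(dag_id)


def on_task_success(outlet_uris):
    """Return the DAG ids whose schedule mentions any URI in the task's outlets."""
    triggered = []
    for uri in outlet_uris:
        for dag_id in consumers_by_uri.get(uri, []):
            if dag_id not in triggered:
                triggered.append(dag_id)
    return triggered


register_consumer("data_consumer_dag", ["s3://sleek-data/oms/xrate.json"])

# The producer task lists the same string in its outlets, so the consumer matches.
print(on_task_success(["s3://sleek-data/oms/xrate.json"]))
# A different string matches nothing, because only the identifier is compared.
print(on_task_success(["s3://sleek-data/oms/other.json"]))
```

The first call prints the consumer DAG id and the second prints an empty list, which is why the producer and consumer must use exactly the same URI string.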

 

In our example, the data analyst team uses a SQL file to transform and load the data into Snowflake.

 

Let’s explore these two DAGs in the Airflow UI!

As you can see, the consumer DAG shows a dataset as its schedule, instead of time-based scheduling details.

This DAG will be triggered automatically whenever the producer task completes successfully.

 

 

For the purpose of this demo, we'll trigger the producer DAG manually.

Here you go: as soon as the producer DAG succeeded, it automatically triggered the consumer DAG.

 

Everything proceeded smoothly as expected.

 

You can find more information about all the datasets in your Airflow environment on the Datasets tab, including a history of when each dataset was updated.

 

You can also navigate to the 'Browse' section and select 'DAG Dependencies' to visualize all cross-DAG dependencies in your Airflow environment.

 

It’s worth mentioning that you can trigger a DAG based on more than one producer DAG or dataset.

 

Simply list all the datasets the DAG depends on, separated by commas, in its schedule. In this scenario, the DAG runs once every listed dataset has been updated at least once since the DAG's last run.
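
In a DAG definition, this takes the form `schedule=[Dataset("uri_a"), Dataset("uri_b")]`, where `uri_a` and `uri_b` are placeholders. The waiting rule itself can be sketched in plain Python. This is a simplified model under stated assumptions, not Airflow's scheduler code: the `ConsumerDag` class and the second URI `s3://sleek-data/oms/orders.json` are hypothetical and appear only for illustration.

```python
class ConsumerDag:
    """Toy model of a DAG scheduled on several datasets."""

    def __init__(self, dag_id, dataset_uris):
        self.dag_id = dag_id
        self.required = set(dataset_uris)
        self.updated = set()  # datasets updated since the last run
        self.run_count = 0

    def dataset_updated(self, uri):
        """Record an update; start a run once every required dataset has one."""
        if uri not in self.required:
            return False
        self.updated.add(uri)
        if self.updated == self.required:
            self.run_count += 1
            self.updated.clear()  # the next run needs fresh updates of all datasets
            return True
        return False


consumer = ConsumerDag(
    "data_consumer_dag",
    ["s3://sleek-data/oms/xrate.json", "s3://sleek-data/oms/orders.json"],
)

# Two updates of the same dataset are not enough to start a run.
print(consumer.dataset_updated("s3://sleek-data/oms/xrate.json"))   # False
print(consumer.dataset_updated("s3://sleek-data/oms/xrate.json"))   # False
# The run starts only when the second dataset has been updated as well.
print(consumer.dataset_updated("s3://sleek-data/oms/orders.json"))  # True
```

Note that repeated updates of one dataset do not start extra runs; the consumer waits until the remaining datasets have caught up.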

 

That's all for today! Stay tuned for our next blog, where we’ll explore more advanced Airflow features!

Please do like the video and subscribe to our youtube channel.

If you’ve any questions or thoughts, please feel free to leave them in the comments section below.

Thanks for watching!

 

# Producer DAG: downloads the exchange rate file and stores it in S3
from airflow import DAG, Dataset
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
from airflow.utils.dates import days_ago
from airflow.models import Variable

# Time-based schedule: runs every day at 23:00
dag = DAG(
    "data_producer_dag",
    default_args={"start_date": days_ago(1)},
    schedule_interval="0 23 * * *",
)

http_to_s3_task = HttpToS3Operator(
    task_id="data_producer_task",
    # The endpoint is read from an Airflow Variable
    endpoint=Variable.get("web_api_key"),
    s3_bucket="sleek-data",
    s3_key="oms/xrate.json",
    aws_conn_id="aws_conn",
    http_conn_id=None,
    replace=True,
    dag=dag,
    # Marks this dataset as updated whenever the task succeeds
    outlets=[Dataset("s3://sleek-data/oms/xrate.json")],
)



# Consumer DAG: transforms the exchange rate file and loads it into Snowflake
# Imports
from airflow import DAG, Dataset
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Define the DAG
dag = DAG(
    'data_consumer_dag',
    default_args={'start_date': days_ago(1)},
    # Dataset-based schedule: the URI must match the producer task's outlet
    schedule=[Dataset("s3://sleek-data/oms/xrate.json")],
    # We usually code a cron-based (or time-based) schedule_interval as below:
    # schedule_interval="0 23 * * *",
)

# Define the Task
load_table = SnowflakeOperator(
    task_id='data_consumer_task',
    sql='./sqls/xrate_sf.sql',
    snowflake_conn_id='snowflake_conn',
    dag=dag
)



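-- sqls/xrate_sf.sql: flattens the staged JSON file and appends it to the history table
INSERT INTO exchange_rate_hist
SELECT
T1.$1:timestamp::TIMESTAMP_NTZ AS timestamp,
T1.$1:base::VARCHAR AS base_currency,
-- $2 and $5 are the KEY and VALUE columns of the FLATTEN output
T2.$2 AS exchange_currency,
T2.$5 AS exchange_rate
FROM @MY_S3_STAGE/xrate.json (file_format => 'my_json_format') T1,
     lateral flatten(input => $1, path => 'rates') T2;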
