Airflow Tutorial - Sensors | What Are Airflow Sensors | How Sensors Work | Examples | S3KeySensor

 


Hello Data Pros, 

In our previous blog, we explored the power of Airflow Variables and Connections and how to use them effectively in your DAGs.

Today, we're going to deep dive into the world of Airflow sensors!

 

As we mentioned already, Airflow sensors are a special type of operator designed to wait for a specific condition to be met!

At regular intervals, they check to see if the condition is met.

Once it's met, the corresponding task is marked successful, allowing its downstream tasks to execute.

Sensors make your DAGs more event-driven, covering use cases such as when a task needs to wait for a file to be created, a database table to be updated, or an external API to become available.
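To make this concrete, here's a minimal sketch of a hypothetical custom sensor: every sensor ultimately implements a poke method that Airflow calls at regular intervals, and the task succeeds once that method returns True. Built-in sensors such as S3KeySensor follow this same pattern under the hood.

import os
from airflow.sensors.base import BaseSensorOperator

class LocalFileSensor(BaseSensorOperator):
    """Hypothetical custom sensor that waits until a local file exists."""

    def __init__(self, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Airflow calls poke() once per poke_interval;
        # the task is marked successful as soon as this returns True
        self.log.info("Checking for %s", self.filepath)
        return os.path.exists(self.filepath)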

 

Here is a simple DAG consisting of two tasks (the full code is shown at the end of this post). The first task uses S3KeySensor, which waits for a file to become available in an AWS S3 bucket. Once the file is ready, the next task loads it into a Snowflake table.

 

Let's see how it works in the Airflow UI.

Though it's scheduled to run daily at 11 p.m., for the purpose of this demo I'll trigger it manually.

The first task is currently running, and if we check the corresponding log, it's poking, or checking for a specific file!

I'll pause the video for 5 minutes and return.

The task remains running, consistently poking at 1-minute intervals.

Unfortunately, the required file is not yet available in the S3 bucket.

Let's go ahead and upload the file.

Once the file is successfully uploaded, the very next poke detects it and marks the first task as completed.

As a result, the downstream task is allowed to execute and load the file into the Snowflake table.

 

Great, all worked well, and the data has been loaded into the Snowflake table successfully.

 

In addition to the basic parameters, you can use these extra parameters, which are common to almost all sensors.

 

Let's start with poke_interval. This parameter lets you define the interval, in seconds, at which the sensor checks for the condition; the default is 60 seconds.
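For example, here's a minimal, hypothetical DAG that uses the built-in FileSensor with a 30-second poke interval instead of the default 60 seconds (the DAG id and file path are placeholders):

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.sensors.filesystem import FileSensor

demo_dag = DAG(
    'poke_interval_demo',                     # hypothetical DAG id
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
)

wait_for_local_file = FileSensor(
    task_id='wait_for_local_file',
    filepath='/tmp/incoming/data.csv',        # placeholder path, resolved via the default 'fs_default' connection
    poke_interval=30,                         # check every 30 seconds instead of the default 60
    dag=demo_dag,
)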

Next, we have mode, which can be either poke or reschedule.

To understand these modes, let's revisit our previous execution.

By default, the sensor operates in 'poke' mode!

As you can see, one of my worker slots was fully occupied throughout the sensor's execution, even though this task spends the majority of its time sleeping between checks.

 

This can be problematic in production, especially when running a hundred or more sensors from different DAGs simultaneously.

 

These sensors can fully occupy all your worker slots, leaving no room for additional DAG runs.

 

One solution to address this issue is to switch to 'reschedule' mode.

 

In 'reschedule' mode, the sensor occupies a worker slot only while it's actively checking for file availability and releases it between pokes, rather than holding a slot for the entire execution duration.
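As a rough sketch, switching modes is just one extra parameter on the sensor; this mirrors the wait_for_s3_file task from the full code at the end of this post:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='sleekdata',
    bucket_key='oms/employee_details.csv',
    aws_conn_id='aws_conn',
    poke_interval=60,          # poke once a minute
    mode='reschedule',         # release the worker slot between pokes (default is 'poke')
    dag=dag,                   # the DAG object defined in the full example below
)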

 

While this does improve worker slot usage to some extent, there is still room for improvement. Deferrable operators and triggers, which we'll discuss in our next video, offer further optimization.

 

Next, let's talk about the 'timeout' parameter.

What if the condition that your sensor is checking is never going to happen? Will Airflow keep checking indefinitely?

The answer is no! By default, after checking for 7 days, Airflow will fail the task, and subsequently the DAG.

 

You can customize this duration as needed using the timeout parameter.

Next, we have soft_fail.

This parameter is related to the timeout we just discussed. If you don't set soft_fail, the sensor task will fail after the timeout period, causing the overall DAG to fail.

 

However, when you set soft_fail to True, the sensor task is simply marked as skipped after the timeout period, and the overall status of the Airflow DAG is marked as success.
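As a sketch, both behaviours come down to two parameters on the sensor; again this mirrors the wait_for_s3_file task from the full code below, with illustrative values:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='sleekdata',
    bucket_key='oms/employee_details.csv',
    aws_conn_id='aws_conn',
    timeout=60 * 60 * 5,       # give up after 5 hours instead of the default 7 days
    soft_fail=True,            # on timeout, mark the task skipped instead of failed
    dag=dag,                   # the DAG object defined in the full example below
)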

 

Airflow offers a wide range of sensors to address a vast number of use cases. Here I've listed some of the most popular ones below!
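For reference, here are a few commonly used sensors and where they typically live; import paths can vary slightly across Airflow and provider versions:

# Core sensors (ship with Airflow itself)
from airflow.sensors.filesystem import FileSensor             # wait for a file on the local filesystem
from airflow.sensors.external_task import ExternalTaskSensor  # wait for a task in another DAG
from airflow.sensors.date_time import DateTimeSensor          # wait until a specific date and time
from airflow.sensors.python import PythonSensor               # wait for a Python callable to return True

# Provider sensors (require the matching provider package)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor    # wait for a key in an S3 bucket
from airflow.providers.http.sensors.http import HttpSensor         # wait for an HTTP endpoint to respond
from airflow.providers.common.sql.sensors.sql import SqlSensor     # wait for a SQL query to return results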

 

That's all for today! Please stay tuned for our next video where we’ll explore Deferrable Operators & Triggers.

Please do like the video and subscribe to our channel.

If you have any questions or thoughts, please feel free to leave them in the comments section below.

Thanks for watching!


from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

dag = DAG(
    's3_to_snowflake_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 23 * * *',
    catchup=False
)

# Wait for the file in S3
wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_name='sleekdata',
    bucket_key='oms/employee_details.csv',
    aws_conn_id='aws_conn',
    poke_interval=60 * 10,   # check every 10 minutes
    mode="reschedule",       # release the worker slot between pokes
    timeout=60 * 60 * 5,     # give up after 5 hours
    soft_fail=True,          # mark the task skipped (not failed) on timeout
    dag=dag
)

# Parameter       | Default Value              #
# ---------------------------------------------#
# poke_interval   | 60 Seconds                 #
# mode            | poke                       #
# timeout         | 7 days (60 * 60 * 24 * 7)  #
# soft_fail       | False                      #

# Load the file from S3 to Snowflake
load_table = CopyFromExternalStageToSnowflakeOperator(
    task_id="load_s3_file_to_table",
    snowflake_conn_id="snowflake_conn",
    files=["employee_details.csv"],
    table="SLEEKMART_OMS.L1_LANDING.employee_details",
    stage='my_s3_stage',
    file_format="(type = 'CSV',field_delimiter = ',', skip_header = 1)",
    dag=dag
)

# Set the dependencies
wait_for_file >> load_table

