Airflow DAGs, Operators, Tasks & Providers


Hello Data Pros, 

In our last blog, we demonstrated the step-by-step installation of Apache Airflow on a Windows PC and successfully executed our very first Airflow dag!

Now, it's time to dive deeper!

In this video, we'll learn about the Airflow configuration file!

Explore each section inside a dag!

Understand the various operator types!

And experience the power of provider packages!

Let's begin right away!

 

As we already know, Airflow dags are coded in the Python language.

 

Every Airflow setup has a ‘dags folder’. You can set this folder's path in the Airflow configuration file, named airflow.cfg.

 

In addition to the dags folder, this configuration file has many other settings that you can customize to meet your needs.

 

For example, to enable my Airflow instance to send email notifications, I added another container to my Docker Compose file. This new container locally hosts a simple SMTP server. I then updated the Airflow configuration file to use the corresponding Docker service name as the SMTP host.
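
For reference, here's a minimal sketch of the relevant airflow.cfg entries. The exact paths and values are assumptions based on the docker-compose file shown at the end of this post, where the SMTP container's service name is sleek-smtp:

# airflow.cfg (excerpt - values are illustrative)

[core]
# Folder that Airflow scans for dag files
dags_folder = /opt/airflow/dags

[smtp]
# Use the SMTP container's Docker service name as the host
smtp_host = sleek-smtp
smtp_port = 25
smtp_starttls = False
smtp_ssl = False
smtp_mail_from = airflow@example.com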

 

Let's now take a closer look at each section inside a dag!

In general, we begin with import statements.

We should always import the DAG class.

In addition, import the Airflow operators that you plan to use in your tasks.

 

We may also need to import any functions we use, whether they are built-in or user-defined.

 

For example, in my case, I've developed a custom Python function called clean_data, which is placed in the plugins folder. Because I intend to use the clean_data function in this dag, I've included it in the imports section.
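
As a rough sketch, assuming clean_data.py sits directly in the plugins folder (which Airflow adds to the Python path), the import looks like this:

# clean_data.py lives in the plugins folder, which Airflow places on sys.path,
# so the function can be imported by module name:
from clean_data import clean_data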

 

Next, create a dag object and configure the dag-level parameters, such as the start date, end date, schedule interval, catchup behaviour, and more.

For a complete list of dag parameters and their purpose, please refer to this official Airflow documentation link in the video description.
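
As an illustration, here is a minimal sketch of this step, using the same dag-level parameters as the exchange_rate_etl pipeline at the end of this post. The with-block shown here is an equivalent alternative to passing dag=... to every task:

from datetime import datetime, timedelta
from airflow import DAG

with DAG(
    'exchange_rate_etl',
    start_date=datetime(2023, 10, 1),
    end_date=datetime(2023, 12, 31),
    schedule_interval='0 22 * * *',
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    catchup=False,
) as dag:
    # tasks defined inside this block are attached to the dag automatically
    ...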

 

Next, we create tasks.

A task is created by instantiating a specific operator and providing the necessary task-level parameters.

The parameters differ depending on the operator used, so always refer to the relevant operator's documentation for the list of task-level parameters you can use.

 

At the end, we define the task dependencies!

There are several methods for establishing dependencies between tasks:

 

Using bitshift operators.

Using the set_upstream and set_downstream methods.

Or using the chain function, as sketched below.
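
Here's a minimal sketch of the three styles, using the three tasks from the pipeline at the end of this post. All three express the same dependency chain, so you would pick just one:

from airflow.models.baseoperator import chain

# 1. Bitshift operators
download_task >> clean_data_task >> send_email_task

# 2. set_upstream / set_downstream methods
clean_data_task.set_upstream(download_task)      # same as download_task >> clean_data_task
clean_data_task.set_downstream(send_email_task)  # same as clean_data_task >> send_email_task

# 3. The chain helper
chain(download_task, clean_data_task, send_email_task)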

 

In addition, when using the TaskFlow API, dependencies are automatically inferred based on the sequence of task function calls. We’ll cover the TaskFlow API in our later videos.

 

Let's try executing this dag in the Airflow UI.

It has completed successfully, and I can verify the corresponding logs as well.

 

 

Alright, let's understand the difference between an operator and a task.

In simpler terms, think of an operator as a blueprint or design template, while tasks are concrete instances of that blueprint.

In Python, or object-oriented programming terms, an operator is a class, and tasks are objects created from that class!
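
A quick sketch to make that concrete, assuming a dag object like the one defined earlier (the task names here are made up for illustration):

from airflow.operators.bash import BashOperator

# BashOperator is the class (the blueprint)...
print_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
list_files = BashOperator(task_id='list_files', bash_command='ls -l', dag=dag)
# ...while print_date and list_files are two task objects created from it.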

 

At a high level, we can categorize these operators into three main groups:

1. Action Operators!

These operators perform a specific action. For instance, the BashOperator, which is used for running Bash commands!

The PythonOperator, which lets you run Python code!

And the AzureDataFactoryRunPipelineOperator, which is used to execute an Azure Data Factory pipeline!

 

2. Transfer Operators!

These operators are used for moving data from one place to another. An excellent example is the S3ToRedshiftOperator. It does exactly what it sounds like: it moves data from Amazon S3 to Amazon Redshift (see the sketch after the sensor operators below).

 

3. Sensor Operators!

Sensors wait for a specific condition to be met before triggering the subsequent tasks in the workflow.

An example of a sensor is the S3KeySensor, which waits for one or more files to appear in an S3 bucket.

The RedshiftClusterSensor waits for a Redshift cluster to reach a specific status.
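
To make the transfer and sensor categories concrete, here's a rough sketch assuming the Amazon provider package is installed; the bucket, key, schema, table and connection IDs are placeholders:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

# Sensor: wait until a file lands in the S3 bucket
wait_for_file = S3KeySensor(
    task_id='wait_for_xrate_file',
    bucket_name='my-data-bucket',
    bucket_key='xrate/xrate.csv',
    poke_interval=60,    # check every 60 seconds
    timeout=60 * 60,     # give up after an hour
    dag=dag,
)

# Transfer: copy the S3 file into a Redshift table
load_to_redshift = S3ToRedshiftOperator(
    task_id='load_xrate_to_redshift',
    s3_bucket='my-data-bucket',
    s3_key='xrate/xrate.csv',
    schema='public',
    table='exchange_rates',
    copy_options=['CSV'],
    redshift_conn_id='redshift_default',
    aws_conn_id='aws_default',
    dag=dag,
)

wait_for_file >> load_to_redshift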

 

It's worth mentioning that Airflow offers a vast number of operators. This list highlights just a few of the most common ones.

 

Not all provider packages are included in the default Airflow installation. For instance, if we attempt to use the GithubOperator in our code, we may encounter an issue.

 

Let’s go ahead and try!

 

When you return to the UI, you should observe a "DAG Import Error" similar to the one displayed here.

 

This error is caused by a missing provider package. While some provider packages are included with Airflow, you may encounter situations where you need to install additional ones.

 

Please refer to this link in the video description for the complete list of Airflow provider packages.

 

In our setup, you can simply add the missing provider package name here in the Dockerfile.

Alternatively, we can create a requirements file and list all the required packages one per line, as shown below.
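
For example, a minimal requirements.txt covering the GithubOperator would contain just the GitHub provider; add one package per line for anything else you need:

# requirements.txt
apache-airflow-providers-github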

 

For this change to take effect, we need to rebuild the Docker image.

Let's restart the Docker containers.

 

Now I no longer see an error, and my dag executes successfully.

 

This plug-and-play extensibility, along with the availability of a wide range of provider packages, makes Airflow an exceptionally powerful and versatile platform.

 

That's all for today! Please stay tuned for our next video where we’ll explore Airflow Variables and Connections.

Please do like the video and subscribe to our channel.

If you have any questions or thoughts, please feel free to leave them in the comments section below.



# exchange_rate_pipeline.py

# Imports
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from clean_data import clean_data

# Define or Instantiate DAG
dag = DAG(
    'exchange_rate_etl',
    start_date=datetime(2023, 10, 1),
    end_date=datetime(2023, 12, 31),
    schedule_interval='0 22 * * *',
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    catchup=False
)

# Define or Instantiate Tasks
download_task = BashOperator(
    task_id='download_file',
    bash_command='curl -o xrate.csv "https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata"',
    cwd='/tmp',
    dag=dag,
)

clean_data_task = PythonOperator(
    task_id='clean_data',
    python_callable=clean_data,
    dag=dag,
)

send_email_task = EmailOperator(
    task_id='send_email',
    to='sleekdatasolutions@gmail.com',
    subject='Exchange Rate Download - Successful',
    html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.',
    dag=dag,
)

# Define Task Dependencies
download_task >> clean_data_task >> send_email_task



# git_repo_dag.py

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.github.operators.github import GithubOperator
from airflow.operators.dummy import DummyOperator
import logging

# Define the DAG
dag = DAG(
    'git_repo_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 21 * * *',
    catchup=False
)

# Start Dummy Operator
start = DummyOperator(task_id='start', dag=dag)

# List GitRepository Tags
list_repo_tags = GithubOperator(
    task_id="list_repo_tags",
    github_method="get_repo",
    github_method_args={"full_name_or_id": "sunapana2003/airflow-demo"},
    result_processor=lambda repo: logging.info(list(repo.get_tags())),
    dag=dag,
)

# End Dummy Operator
end = DummyOperator(task_id='end', dag=dag)

# Define task dependencies
start >> list_repo_tags >> end


# clean_data.py

import os
import pandas as pd

def clean_data():

    # Load raw data into DataFrame  
    data = pd.read_csv('/tmp/xrate.csv', header=None)

    # Cleanse Data: fill missing values with a type-appropriate default per column
    default_values = {
        'i': 0,    # integer columns
        'f': 0.0,  # float columns
        'O': '',   # object / string columns
    }

    fill_values = {col: default_values.get(data[col].dtype.kind, '') for col in data.columns}
    cleaned_data = data.fillna(value=fill_values)

    # Get the current date components
    now = pd.Timestamp.now()
    year = now.year
    month = now.month
    day = now.day

    # Create the directory path if it doesn't exist
    data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}'
    os.makedirs(data_dir, exist_ok=True)

    # Save the cleaned data to a new file
    cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)

# dockerfile

FROM apache/airflow:latest
USER root

RUN apt-get update && \
    apt-get -y install git && \
    apt-get clean

USER airflow

# Install provider packages from requirements.txt
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt


# docker-compose.yml

version: '3'

services:
  sleek-airflow:
    image: sleek-airflow:latest
    volumes:
      - ./airflow:/opt/airflow
    ports:
      - "8080:8080"
    command: airflow standalone
  sleek-smtp:
    image: bytemark/smtp
    restart: always


