Airflow Tutorial - Variables and Connections



Hello Data Pros, 

In our last blog, we covered the fundamental concepts of Apache Airflow, including DAGs, tasks, and operators!

In addition, we demonstrated the importance of the airflow configuration file, and the provider packages!

 

Today, we'll learn about the power of Airflow Variables and Connections and how to use them effectively within your DAGs.

 

Let's start with Apache Airflow Variables!

Variables are like little storage containers for values that you can reuse across your DAGs.

Instead of hardcoding values, you can store them as variables and reference them by name whenever needed.

 

Technically, each variable is a key-value pair. You can think of the key as the variable name and the value as the data it holds.

 

There are two types of Airflow variables!

One, regular variables, where the value can be any string.

Two, JSON variables, where the value is a JSON string.

 

Let's say I have a DAG that includes a hard-coded support email address, and a web endpoint with an access key, which is sensitive information.

 

For these cases, it is recommended to create variables to improve security and code maintainability.

 

To create and manage variables, go to the Airflow UI, click on 'Admin,' and then select 'Variables.'

Here you can create and manage your variables.

 

Let's create our first variable, support_email.

Please note that the variables we create here can be used across multiple dags.

Next, create a web_api_key variable that holds the web endpoint along with its sensitive access key.

Did you notice that this value was automatically masked by Airflow? I'll explain later how Airflow handles this.

 

Finally, just to show you what a JSON-type variable looks like, let's create a variable named process_interval.

As you can see, this holds the values for the start date and end date.
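For reference, the value of the process_interval JSON variable might look something like this (the dates shown here are just placeholder values):

{"start_date": "2024-01-01", "end_date": "2024-01-31"}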

 

Now it's time to implement these variables in our DAGs!

Please use the var.value.get template macro inside templated fields to fetch the values of the required variables.
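For example, these Jinja expressions resolve to the stored values when used inside templated fields (a minimal sketch; the complete DAG code is included at the end of this post):

{{ var.value.get('support_email') }}               # fetches a regular variable
{{ var.json.process_interval.start_date }}         # fetches a key inside a JSON variable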

Let’s try to execute the dag.

Cool! This works as expected!

 

Variables offer many advantages,

They make implementing changes easy.

For instance, if you need to change the email address, you can easily do so by changing the support_email variable. This approach is much easier than having to modify each DAG, as you would if the email address had been hardcoded.

 

You can set different values for variables in different airflow environments, such as dev, test, and production.

Airflow variables are stored securely in the Airflow database, and can also be encrypted if required.

Furthermore, if your variable name includes specific keywords, such as 'password', 'secret', or 'api_key', Airflow automatically masks its value in both the Airflow UI and the logs, to protect sensitive information.

 

Variables provide the flexibility to dynamically change a DAG's behavior at runtime, catering to requirements such as processing data for different dates, or even different sets of files.

For example, utilizing the start_date and end_date values from the process_interval JSON variable allows us to dynamically adjust the DAG to process different sets of data.

 

Next, let's talk about Airflow Connections.

Airflow is often used for transferring data to and from external systems like databases, cloud providers, or even third-party APIs.

Hence, it offers a robust "Connection" feature, designed to securely store the details required to connect to external systems.

A Connection is essentially a set of parameters, such as username, password, and hostname, along with the type of system it connects to, and a unique name called the connection ID.
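If you ever need to inspect a connection's details from Python code, here's a minimal sketch (it assumes a connection with the ID snowflake_conn_id already exists; we'll create one shortly):

from airflow.hooks.base import BaseHook

# Fetch the stored connection by its connection ID
conn = BaseHook.get_connection('snowflake_conn_id')

# The Connection object exposes the saved parameters
print(conn.conn_type, conn.host, conn.login, conn.schema)  # avoid printing conn.password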

 

To create and manage connections, you can go to the Airflow UI, click on 'Admin,' and then select 'Connections.'

Here you can define various types of connections.

 

In case you cannot find the specific connection type, please refer back to the provider packages section in our previous video.

Basically, you'll need to install the relevant provider packages and then restart the Airflow instance.

 

Consider a scenario where I need to connect to a Snowflake database and perform a specific task as part of my workflow.

I should first create a Snowflake connection, and then refer to its connection ID when configuring the task.

 

The specific details you need to supply may vary depending on the connection type.

After filling in the details, remember to save them. You can also test the connection before using it in your dags.

 

Just like variables, connections also offer several advantages:

They can be reused across different dags.

We can set a particular connection id with different values for different environments (like development, testing and production).

Centralized connection management makes implementing changes easy.

Additionally, they provide a high level of security.

 

 

Let's see what Airflow code with connections looks like.

In the dag, I can simply use the connection ID as a task parameter.
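Here is the relevant part of the task definition (the full DAG is included in the code section below):

load_table = SnowflakeOperator(
    task_id='load_table',
    sql='./sqls/profit_uk.sql',             # SQL file to run against Snowflake
    snowflake_conn_id='snowflake_conn_id',  # the connection ID created in the Airflow UI
    dag=dag
)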

 

As additional information, this SQL file is located in the sqls subfolder within the main dags folder.

Of course, we can directly write the SQL code here, but this approach makes the code more modular and manageable.
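In other words, the folder layout looks roughly like this (the DAG file name here is just an assumed example):

dags/
    load_profit_uk.py        <- DAG definition (file name assumed for illustration)
    sqls/
        profit_uk.sql        <- SQL file referenced by the SnowflakeOperator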

 

Let’s try to execute the dag.

 

All works as expected!

 

Please note that operators working with two different systems typically require two connection IDs.

 

For example, the SnowflakeToSlackOperator uses both a Snowflake connection ID and a Slack connection ID.
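Here's a rough sketch of how that could look; the parameter names and connection IDs shown are indicative and may vary across provider package versions:

from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator

notify_slack = SnowflakeToSlackOperator(
    task_id='notify_slack',
    sql='SELECT COUNT(*) AS row_count FROM SLEEKMART_OMS.TRAINING.profit_uk',
    slack_message='UK profit table has been refreshed',
    snowflake_conn_id='snowflake_conn_id',  # Snowflake connection created in the UI
    slack_conn_id='slack_conn_id',          # Slack connection (assumed ID)
    dag=dag,
)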

 

In summary, both Airflow variables and connections are powerful tools that can help you make your workflows more efficient, secure, and maintainable!

 

That's all for today! Please stay tuned for our next video where we’ll explore Airflow Sensors and hooks.

Please do like the video and subscribe to our channel.

If you have any questions or thoughts, please feel free to leave them in the comments section below.



############################################

#           Variables Example    #

############################################


# Import

from airflow import DAG

from airflow.operators.bash import BashOperator

from airflow.operators.email import EmailOperator

from airflow.utils.dates import days_ago


# Define the DAG

dag = DAG(

    'live_exchange_rates',

    default_args={'start_date': days_ago(1)},

    schedule_interval='0 21 * * *',

    catchup=False

)


# Define the Tasks

fetch_exchange_rates = BashOperator(

    task_id='fetch_exchange_rates',

    bash_command="curl '{{ var.value.get('web_api_key') }}' -o xrate.json",

    cwd='/tmp',

    dag=dag,

)


send_email_task = EmailOperator(

    task_id='send_email',

    to="{{ var.value.get('support_email') }}",

    subject='Live Exchange Rate Download - Successful',

    html_content='Live Exchange Rate data has been successfully downloaded',

    dag=dag,

)




# Define the Dependencies

fetch_exchange_rates >> send_email_task



############################################

#          Connections Example    #

############################################

# Imports

from airflow import DAG

from airflow.utils.dates import days_ago

from airflow.operators.email import EmailOperator

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


# Define the DAG

dag = DAG(

    'load_profit_uk',

    default_args={'start_date': days_ago(1)},

    schedule_interval='0 21 * * *',

    catchup=False

)


# Define the Task

load_table = SnowflakeOperator(

    task_id='load_table',

    sql='./sqls/profit_uk.sql',

    snowflake_conn_id='snowflake_conn_id',

    dag=dag

)


send_email = EmailOperator(

    task_id='send_email',

    to="{{ var.value.get('support_email') }}",

    subject='UK profit table load - Successful',

    html_content='UK Sales table to Profit table Load Completed',

    dag=dag,

)



# Define the Dependencies

load_table >> send_email


############################################

#   profit_uk.sql/json variable example    #

############################################


create or replace table SLEEKMART_OMS.TRAINING.profit_uk 

as ( 

  SELECT 

    sales_date, SUM(quantity_sold * unit_sell_price) as total_revenue,

    SUM(quantity_sold * unit_purchase_cost) as total_cost,

    SUM(quantity_sold * unit_sell_price) - SUM(quantity_sold * unit_purchase_cost) as total_profit

  FROM 

    SLEEKMART_OMS.TRAINING.sales_uk

  WHERE

    sales_date BETWEEN '{{ var.json.process_interval.start_date }}' AND '{{ var.json.process_interval.end_date }}'

  GROUP BY 

    sales_date

)

