Airflow Tutorial - Hooks | Hooks vs Operators | Airflow Hooks Example | When and How to Use


 


Hello Data Pros, 

In our last blog, we uncovered the need for Airflow XComs and demonstrated how to leverage them effectively in your DAGs!

Today, we're shifting our focus to Airflow hooks! 

We’re going to cover what hooks are, how they differ from Airflow operators, and lastly, when and how to use hooks in your DAGs!

Let's dive right in!

 

Technically, Hooks are pre-built Python classes.

They simplify our interactions with external systems and services.

For instance, the popular S3Hook, which is part of the AWS provider package, offers various methods to interact with S3 Storage.

 

For example, the create_bucket method creates an Amazon S3 bucket!

The load_string method can load a string value as a file in S3!

And the delete_objects method can be used to delete one or more S3 files.

 

Now, let's dive into the source code of this Hook!

As you can see, it's low-level Python code. And if the AWS provider package did not include this hook, you might find yourself having to write all this complex code on your own.

But thanks to Airflow and its provider ecosystem, this code has already been developed and tested. This means you can simply import the appropriate hook and start using its methods immediately, without worrying too much about the complex code underneath.
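
To make that concrete, here's a minimal sketch of calling a few S3Hook methods directly. The connection id aws_conn, the bucket name, and the keys are placeholder values for illustration only, and it assumes the Amazon provider package is installed.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 'aws_conn' is a placeholder; use the id of your own AWS connection
s3_hook = S3Hook(aws_conn_id='aws_conn')

# Create an S3 bucket (example name)
s3_hook.create_bucket(bucket_name='sleek-data')

# Upload a string value as a file in S3
s3_hook.load_string('hello from airflow', key='demo/hello.txt', bucket_name='sleek-data')

# Delete one or more S3 files
s3_hook.delete_objects(bucket='sleek-data', keys=['demo/hello.txt'])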

 

You might be wondering about the distinction between Hooks and Operators!

Well, both Hooks and Operators are essentially Python classes. However, the main difference lies in their level of abstraction or encapsulation! Hooks wrap around or abstract low-level Python code!

On the other hand, Operators wrap around or abstract Hooks!

 

For instance, here's an operator from the AWS provider package. This operator uses and wraps around the copy_object method from the S3Hook.
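
As a rough sketch, this is how such an operator might be used in a task, so the copy_object call happens entirely inside the operator. The import path assumes a recent version of the Amazon provider package, and the bucket and key names are placeholders.

from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

# Copies a file within S3; internally this wraps the S3Hook's copy_object method
copy_file = S3CopyObjectOperator(
    task_id='copy_customer_file',
    source_bucket_name='sleek-data',
    source_bucket_key='oms/customer.csv',
    dest_bucket_name='sleek-data',
    dest_bucket_key='oms/customer_backup.csv',
    aws_conn_id='aws_conn',
)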

 

Now, the next natural question is: When should we use an operator, and when should we use a hook?

 

If an operator is available for the task you want to do, use the operator.

If an operator is not available, the next option is to look for hooks that can get your work done.

If neither an operator nor a hook is available, then you should write your own Python code to accomplish the task.

 

Now, let's see how to use a hook directly in your dag code.

Please consider this use case, where I want to perform custom transformations on an S3 file, and load the transformed file back to S3.

 

I've defined a custom transformation function, utilizing pre-built methods of the S3Hook.

I read the file using the read_key method,

apply necessary transformations,

and with the load_string method, I upload the transformed file back to S3.

 

 

Within the actual task, I call the custom transformation function, and pass the bucket name, source file name, and target file name as parameters.

 

Let's try to execute the DAG.

 

It’s completed successfully, and the transformed file has been uploaded to S3 as expected.
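
If you want to confirm the upload from code as well, the same S3Hook offers a check_for_key method. Here's a small sketch, reusing the placeholder connection, bucket, and key names from the DAG below.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_hook = S3Hook(aws_conn_id='aws_conn')

# Returns True if the transformed file now exists in the bucket
exists = s3_hook.check_for_key(key='oms/customer_transformed.csv', bucket_name='sleek-data')
print(f"Transformed file present: {exists}")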

 

That's all for today! Stay tuned for our next video, where we’ll explore more advanced Airflow features!

Please do like the video and subscribe to our channel.

If you’ve any questions or thoughts, please feel free to leave them in the comments section below.



from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.dates import days_ago

# Define your custom transformation function using Hooks
def custom_transformation(bucketname, sourcekey, destinationkey):
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    # Read the source file from S3
    content = s3_hook.read_key(bucket_name=bucketname, key=sourcekey)

    # Apply custom transformations (here, simply upper-casing the content)
    transformed_content = content.upper()

    # Load the transformed file back to S3 (replace=True lets re-runs overwrite it)
    s3_hook.load_string(transformed_content, bucket_name=bucketname, key=destinationkey, replace=True)

# Define DAG
dag = DAG(
    'cust_s3_trans',
    default_args={'start_date': days_ago(1)},
    schedule_interval='0 23 * * *',
    catchup=False
)

# Define your tasks
transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=custom_transformation,
    op_args=['sleek-data', 'oms/customer.csv', 'oms/customer_transformed.csv'],
    dag=dag,
)

# Define Dependencies
transform_task



