Snowflake Tasks | Types | Examples | snowflake tasks and streams

Hello Data Pros, and welcome back to another exciting episode of our Snowflake learning series! In our previous video, we covered Snowflake Streams and demonstrated their role in change data capture.

Today, we will explore Snowflake Tasks, which are essential for automating a variety of data processes within Snowflake.



------------------- Follow the link in the description for the SQLs used in this lab -------------------

---------------------------------------------------------------------------
---------------- Setting Up Foundational Objects for the Lab---------------
---------------------------------------------------------------------------

--Create Database and Schema
CREATE OR REPLACE DATABASE order_db;
CREATE OR REPLACE SCHEMA order_db.order_schema;

-- Create Tables
USE SCHEMA order_db.order_schema;

CREATE OR REPLACE TABLE order_raw (
    order_id INT,
    order_date DATE,
    cust_fname VARCHAR(50),
    cust_lname VARCHAR(50),
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10, 2),
    discounts DECIMAL(10, 2)
);

CREATE OR REPLACE TABLE order_analytics (
    order_id INT,
    order_date DATE,
    cust_name VARCHAR(100),
    product_id INT,
    total_price DECIMAL(10, 2)
);

CREATE OR REPLACE NOTIFICATION INTEGRATION my_email_integration
  TYPE=EMAIL
  ENABLED=TRUE
  ALLOWED_RECIPIENTS=('tuftech007@outlook.com');
 

CREATE OR REPLACE STAGE order_raw_ext_stg
  URL='s3://s3explore/'
  CREDENTIALS=(AWS_KEY_ID='your_AWS_KEY_ID' AWS_SECRET_KEY='your_AWS_SECRET_KEY');
 
--Create Stream
CREATE OR REPLACE STREAM order_raw_stream ON TABLE order_raw;


---------------------------------------------------------------------------
-------------------------- Standalone TASK setup --------------------------
---------------------------------------------------------------------------


CREATE OR REPLACE TASK load_order_analytics
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 0/5 * * * * UTC'        --- or SCHEDULE = '5 MINUTE'
  AS
  INSERT INTO order_analytics
  SELECT
      order_id,
      order_date,
      CONCAT(cust_fname, ' ', cust_lname) AS cust_name,
      product_id,
      (quantity * unit_price) - discounts AS total_price
  FROM order_raw_stream;



ALTER TASK load_order_analytics RESUME;


---------------------------------------------------------------------------
----------------------Multiple TASKs Chained Together----------------------
---------------------------------------------------------------------------

-- Task 1: copy_data
CREATE OR REPLACE TASK copy_data
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  AS
  COPY INTO order_raw
  FROM @order_raw_ext_stg
  FILE_FORMAT = (TYPE = 'CSV');
 

-- Task 2: transform_data
CREATE OR REPLACE TASK transform_data
  WAREHOUSE = compute_wh
  AFTER copy_data
  WHEN SYSTEM$STREAM_HAS_DATA('order_raw_stream')
  AS
  INSERT INTO order_analytics
  SELECT
      order_id,
      order_date,
      CONCAT(cust_fname, ' ', cust_lname) AS cust_name,
      product_id,
      (quantity * unit_price) - discounts AS total_price
  FROM order_raw_stream;


-- Task 3: send_email_report
CREATE OR REPLACE TASK send_email_report
  WAREHOUSE = COMPUTE_WH
  AFTER transform_data
  AS
  BEGIN
    CALL SYSTEM$SEND_EMAIL(
      'my_email_integration',
      'tuftech007@outlook.com',
      'Regular ORDER Data Processing Report',
      'The Regular ORDER data processing is complete.'
    );
  END;


-- Resume the child tasks first, then the root task (copy_data), which owns the schedule
ALTER TASK send_email_report RESUME;
ALTER TASK transform_data RESUME;
ALTER TASK copy_data RESUME;


SELECT * FROM order_analytics;






---------------------------------------------------------------------------
--------------------------External Table Approach--------------------------
---------------------------------------------------------------------------



CREATE OR REPLACE EXTERNAL TABLE order_raw_ext_table (
    order_id INT AS (VALUE:c1::INT),
    order_date DATE AS (VALUE:c2::DATE),
    cust_fname VARCHAR(50) AS (VALUE:c3::VARCHAR(50)),
    cust_lname VARCHAR(50) AS (VALUE:c4::VARCHAR(50)),
    product_id INT AS (VALUE:c5::INT),
    quantity INT AS (VALUE:c6::INT),
    unit_price DECIMAL(10, 2) AS (VALUE:c7::DECIMAL(10, 2)),
    discounts DECIMAL(10, 2) AS (VALUE:c8::DECIMAL(10, 2))
)
WITH LOCATION = @order_raw_ext_stg
AUTO_REFRESH = TRUE
FILE_FORMAT = (TYPE = 'CSV');

-- Verify that the external table is created and accessible
SELECT * FROM order_raw_ext_table;

-- Display all external tables in the current database
SHOW EXTERNAL TABLES;

-- Instructions for setting up event notifications in AWS S3:
-- 1. Copy the notification_channel value (an SQS queue ARN) from the output of SHOW EXTERNAL TABLES.
-- 2. Log in to the AWS Console and navigate to your S3 Bucket properties.
-- 3. Go to the Event Notifications section and click Create event notification.
-- 4. Name the event as desired.
-- 5. For Event types, select ObjectCreated (All) and ObjectRemoved.
-- 6. For Destination, select 'SQS queue'. Then select 'Enter SQS queue ARN' and paste the ARN you copied earlier into the 'SQS queue' field.
-- 7. Save the changes.


-- To activate AUTO_REFRESH, perform a manual refresh once. Subsequent refreshes will be automatic.
ALTER EXTERNAL TABLE order_raw_ext_table REFRESH;


--Create Stream
CREATE OR REPLACE STREAM order_raw_ext_table_stream
ON EXTERNAL TABLE order_raw_ext_table
INSERT_ONLY = TRUE;


-- Task 1: transform_data
CREATE OR REPLACE TASK transform_data_using_ext_tbl
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('order_raw_ext_table_stream')
  AS
  INSERT INTO order_analytics
  SELECT
      order_id,
      order_date,
      CONCAT(cust_fname, ' ', cust_lname) AS cust_name,
      product_id,
      (quantity * unit_price) - discounts AS total_price
  FROM order_raw_ext_table_stream;


-- Task 2: send_email_report
CREATE OR REPLACE TASK send_email_report_ext_tbl
  WAREHOUSE = COMPUTE_WH
  AFTER transform_data_using_ext_tbl
  AS
  BEGIN
    CALL SYSTEM$SEND_EMAIL(
      'my_email_integration',
      'tuftech007@outlook.com',
      'Regular ORDER Data Processing Report',
      'The Regular ORDER data processing is complete.'
    );
  END;



-- Resume the child task first, then the root task that owns the schedule
ALTER TASK send_email_report_ext_tbl RESUME;
ALTER TASK transform_data_using_ext_tbl RESUME;








So, what exactly are Snowflake Tasks? In simple terms, a Task is a way to schedule and run SQL statements automatically. You can think of it as a scheduler for your Snowflake environment.

You can also link multiple tasks together to run in a specific sequence!

Tasks help you automate recurring processes, ensuring your data workflows run smoothly without manual intervention.

Let's jump into the UI and explore how to create Tasks in Snowflake. I’ve already set up some foundational objects required for this lab.

Shifting our focus to task creation: first, you need to define what you want your task to do. This could be loading data into a table, transforming data, or even calling a stored procedure.

Next, you’ll define the schedule or interval at which you want this task to execute. As you can see, the schedule can be set using a cron expression, which gives you more flexibility in terms of timing than a fixed interval such as '5 MINUTE'.
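As a rough illustration of the two schedule styles, the sketch below changes the schedule of the load_order_analytics task from this lab. The specific time and time zone in the cron line are made-up values, and suspending the task before altering it is shown here as the safe pattern.

```sql
-- Suspend the task before changing its definition.
ALTER TASK load_order_analytics SUSPEND;

-- Interval style: run every 5 minutes.
ALTER TASK load_order_analytics SET SCHEDULE = '5 MINUTE';

-- Cron style (illustrative values): 09:00, Monday to Friday.
-- Fields: minute, hour, day of month, month, day of week, then a time zone.
ALTER TASK load_order_analytics
  SET SCHEDULE = 'USING CRON 0 9 * * 1-5 America/Los_Angeles';

-- Re-enable the task once the schedule is set.
ALTER TASK load_order_analytics RESUME;
```

Only the last SET SCHEDULE takes effect; both are shown just to compare the syntax.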

Optionally, you can specify exactly which warehouse to use for running the task. Depending on whether you include the WAREHOUSE parameter or not, tasks are classified as either user-managed or serverless.

If you include the WAREHOUSE parameter, it's considered a user-managed task, meaning it will execute on the warehouse you’ve defined. If you skip the WAREHOUSE parameter, it becomes a serverless task, and Snowflake will automatically allocate the necessary compute resources.
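To make the difference concrete, here is a minimal sketch of a serverless version of the standalone task from the lab. The task name load_order_analytics_serverless and the 'XSMALL' starting size are assumptions for illustration; the only structural change is that the WAREHOUSE parameter is gone.

```sql
-- Serverless task: no WAREHOUSE parameter, so Snowflake manages the compute.
-- USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE is an optional hint for the first runs.
CREATE OR REPLACE TASK load_order_analytics_serverless  -- hypothetical name
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'   -- illustrative size
  SCHEDULE = '5 MINUTE'
  AS
  INSERT INTO order_analytics
  SELECT
      order_id,
      order_date,
      CONCAT(cust_fname, ' ', cust_lname) AS cust_name,
      product_id,
      (quantity * unit_price) - discounts AS total_price
  FROM order_raw_stream;
```

Treat this as an alternative to load_order_analytics rather than something to run alongside it, since both would consume the same stream. The role creating it also needs the EXECUTE MANAGED TASK privilege on the account.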

Both approaches have their own pros and cons!

Serverless compute is roughly 1.5 times more expensive than a user-managed warehouse. However, please be aware that a user-managed warehouse is billed for a minimum of 60 seconds each time it starts, even if you use it for just a few seconds. After that first minute, you’re billed on a per-second basis. Serverless compute, on the other hand, is billed per second right from the start.

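For this reason, short and frequently executed tasks are often recommended to be created as serverless tasks. The break-even point is around 40 seconds: a warehouse that wakes up just for the task is billed for at least 60 seconds, and 60 divided by the 1.5 multiplier is 40, so anything shorter tends to be cheaper on serverless compute.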
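The arithmetic behind that 40-second figure can be sketched with a plain query. It assumes the 1.5x multiplier quoted above and a warehouse that starts only for this task and suspends right after, so treat the numbers as a rough comparison, not a pricing calculator.

```sql
-- Compare billed time for a few example runtimes (in seconds).
-- Warehouse: at least 60 seconds per start, then per second.
-- Serverless: per second from the start, weighted by the assumed 1.5x multiplier.
SELECT
    column1                AS runtime_seconds,
    GREATEST(column1, 60)  AS warehouse_billed_seconds,
    column1 * 1.5          AS serverless_equivalent_seconds
FROM VALUES (10), (40), (90);
-- 10 s run: 60 vs 15   (serverless cheaper)
-- 40 s run: 60 vs 60   (break-even)
-- 90 s run: 90 vs 135  (warehouse cheaper)
```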

Since users select the warehouse size for user-managed tasks, there's a chance they might choose an inappropriate size. With serverless tasks, Snowflake automatically picks the optimal warehouse size for you.

Additionally, serverless tasks are ideal for time-critical tasks with strict SLA requirements because they’re dedicated solely to the task at hand. In contrast, user-managed warehouses might handle other workloads as well, which could lead to wait times and delays.

Now, let’s run this to create the task. Once the Task is created, it will be in a suspended state by default. You should enable it using the ALTER TASK ... RESUME statement.
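A short sketch of the task lifecycle, using the load_order_analytics task from the lab: check its state, resume it, optionally trigger a run without waiting for the schedule, and suspend it again when you are done experimenting.

```sql
-- The "state" column shows whether the task is suspended or started.
SHOW TASKS LIKE 'load_order_analytics';

-- Enable the schedule.
ALTER TASK load_order_analytics RESUME;

-- Optional: trigger a single run right away instead of waiting for the schedule.
EXECUTE TASK load_order_analytics;

-- Stop future scheduled runs (useful for avoiding charges after the lab).
ALTER TASK load_order_analytics SUSPEND;
```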

One of the powerful features of Snowflake Tasks is that they can be chained together. This means you can create complex workflows by linking multiple Tasks.

For instance, you might have a Task that loads data from an external stage to a table, followed by another Task that transforms the data, and a third Task that sends email notifications. Each Task can be set to start only after the previous Task is completed successfully, ensuring a smooth and reliable workflow.

To chain Tasks, you use the AFTER clause in the CREATE TASK statement.
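Every task in a chain has to be resumed before the chain runs end to end. Instead of resuming each one by hand, one option is the SYSTEM$TASK_DEPENDENTS_ENABLE function, sketched here against the copy_data root task from the lab; the fully qualified name assumes the order_db.order_schema objects created earlier.

```sql
-- Resume the root task and every task that depends on it in one call.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('order_db.order_schema.copy_data');

-- Confirm that all three tasks now show a started state.
SHOW TASKS IN SCHEMA order_db.order_schema;
```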

This should give you a better understanding, but there’s still room to enhance this workflow. For instance, you can add a check to see if the stream has data. With this approach, the warehouse will only start if there’s data to process, potentially leading to significant cost savings.
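If you want to see what the WHEN condition would evaluate to at a given moment, the same function can be called by hand. This is a small sketch against the order_raw_stream stream from the lab.

```sql
-- Returns TRUE when the stream holds unconsumed change records, FALSE otherwise.
SELECT SYSTEM$STREAM_HAS_DATA('order_raw_stream');

-- Peek at how many change records are waiting.
-- A plain SELECT does not consume the stream; only DML that reads from it does.
SELECT COUNT(*) AS pending_rows FROM order_raw_stream;
```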

You can also achieve a similar result by creating external tables and setting up cloud file notifications. The code for this approach is available in the description, so feel free to check it out if you’re interested.

Snowflake Tasks also offer robust monitoring and error-handling capabilities.

You can monitor the status of your Tasks using the Snowflake UI as well as SQL commands, making it easy to manage and troubleshoot any issues that arise.
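On the SQL side, one common starting point is the TASK_HISTORY table function. The query below is a minimal sketch; the one-hour window and the limit of 20 rows are arbitrary choices.

```sql
-- Recent task runs in the current database, newest first.
SELECT
    name,
    state,            -- e.g. SUCCEEDED, FAILED, SKIPPED
    scheduled_time,
    completed_time,
    error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 20
))
ORDER BY scheduled_time DESC;
```

Runs skipped because the WHEN condition was false show up here too, which makes it easy to confirm that the stream check is doing its job.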

If a Task fails, you can set it to retry automatically.
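As a sketch of how that could be configured, the statements below set two task parameters on the copy_data root task from the lab. The values 2 and 5 are illustrative, not recommendations.

```sql
-- Suspend the root task before changing its parameters.
ALTER TASK copy_data SUSPEND;

-- Retry a failed run of the task graph up to 2 times (illustrative value).
ALTER TASK copy_data SET TASK_AUTO_RETRY_ATTEMPTS = 2;

-- Suspend the task automatically after 5 consecutive failed runs (illustrative value).
ALTER TASK copy_data SET SUSPEND_TASK_AFTER_NUM_FAILURES = 5;

ALTER TASK copy_data RESUME;
```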

Overall, Tasks are a powerful tool for automating and streamlining your workflows in Snowflake.

That's all for today! Please stay tuned for our next video where we’ll explore more advanced Snowflake features!

If you found this video helpful, please give it a thumbs up, and hit that subscribe button to stay up-to-date with the latest technologies!

Thanks for watching!
















