Skip to content

Instantly share code, notes, and snippets.

@kanchokanchev
Last active January 3, 2025 15:37
Show Gist options
  • Select an option

  • Save kanchokanchev/b161fb948907b81af96583b76781657d to your computer and use it in GitHub Desktop.

Select an option

Save kanchokanchev/b161fb948907b81af96583b76781657d to your computer and use it in GitHub Desktop.
Snowflake - Real-Time or Near Real-Time Data Pipeline #Snowflake #Snowflake_Pipeline

Real-Time or Near Real-Time Data Pipeline in Snowflake

Scenario Overview

You have data files from 2-3 different suppliers. These files are uploaded to an AWS S3 bucket and are then loaded into the respective database tables in Snowflake using Snowpipe. The Snowpipe is triggered automatically whenever new files arrive in the S3 bucket via S3 SNS (Simple Notification Service). After loading the data into Snowflake tables, a stored procedure is triggered for further data processing and distribution into other tables.


Solution Architecture

1. Data Arrival

  • Suppliers upload files to an AWS S3 bucket.
  • Amazon SNS (Simple Notification Service) sends a notification to Snowflake about the new files.

2. Data Loading

  • Snowpipe automatically loads the data from S3 into the corresponding staging tables in Snowflake.

3. Triggering a Procedure

  • Once Snowpipe loads the data, a stream detects the new or changed data in the staging tables.
  • A task tied to the stream runs a stored procedure to process the new data.

4. Further Processing

  • The stored procedure transforms, cleans, and distributes the data into the appropriate target tables.

Implementation Steps

1. Configure Snowpipe to Load Data from S3

Set Up External Stage:

CREATE OR REPLACE STAGE my_stage
URL = 's3://my-bucket/supplier-data/'
STORAGE_INTEGRATION = my_integration;

Define Snowpipe:

CREATE OR REPLACE PIPE supplier_pipe
AUTO_INGEST = TRUE
AS
COPY INTO supplier_staging_table
FROM @my_stage
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"');

Link Snowpipe to S3 Events:

  • Configure S3 Event Notifications to publish to an SNS topic.
  • In Snowflake, associate the SNS topic with Snowpipe for automatic triggers.

2. Set Up Streams to Capture Changes

Create a stream on the staging table to track new or modified rows:

CREATE OR REPLACE STREAM supplier_staging_stream
ON TABLE supplier_staging_table

3. Create Stored Procedures for Data Processing

Define a stored procedure to process the new data detected by the stream and distribute it into the appropriate target tables:

CREATE OR REPLACE PROCEDURE process_supplier_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- Insert new data into the target table
    INSERT INTO processed_table (column1, column2, column3)
    SELECT column1, column2, column3
    FROM supplier_staging_stream
    WHERE metadata$action = 'INSERT';

    -- Perform additional transformations or processing as needed

    RETURN 'Data processed successfully';
END;
$$;

4. Create a Task to Trigger the Procedure

Use a Snowflake Task to automate the execution of the procedure whenever new data arrives:

CREATE OR REPLACE TASK process_supplier_task
WAREHOUSE = my_warehouse
SCHEDULE = '1 MINUTE' -- Check every minute
AS
CALL process_supplier_data();

Bind the Task to the Stream:

The task queries the stream, ensuring it runs only when there’s new data. This ensures efficient use of compute resources.


Workflow Summary

  1. Data Arrival:

    • Files arrive in S3 → SNS triggers Snowpipe.
  2. Data Loading:

    • Snowpipe loads the files into the supplier_staging_table.
  3. Change Detection:

    • Stream supplier_staging_stream tracks new or modified rows.
  4. Data Processing:

    • Task process_supplier_task runs every minute, calling the stored procedure process_supplier_data.
    • The procedure processes the stream’s data and inserts it into target tables.

Key Considerations

  1. Performance Tuning:

    • Use a Snowflake multi-cluster warehouse for the process_supplier_task if you anticipate high concurrency or large data volumes.
  2. Error Handling:

    • Add error-catching logic to the stored procedure to log or handle exceptions.
  3. Cost Efficiency:

    • Ensure tasks and warehouses are sized appropriately and use suspend-on-idle to minimize compute costs.
  4. Monitoring:

    • Use Snowflake’s TASK_HISTORY and STREAMS metadata views to monitor the pipeline:
      SELECT * FROM INFORMATION_SCHEMA.TASK_HISTORY WHERE TASK_NAME = 'process_supplier_task';
  5. End-to-End Testing:

    • Validate the pipeline with test files to ensure correctness.

Benefits of This Approach

  • Automation: Fully automated end-to-end pipeline with Snowpipe, streams, and tasks.
  • Scalability: The multi-cluster architecture ensures scalability for large volumes.
  • Efficiency: Tasks run only when there’s new data, saving compute resources.
  • Modularity: Streams and tasks decouple the stages, making the pipeline easier to maintain and extend.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment