You have data files from two to three different suppliers. These files are uploaded to an AWS S3 bucket and then loaded into the corresponding database tables in Snowflake using Snowpipe. Snowpipe is triggered automatically whenever new files arrive in the S3 bucket, via S3 event notifications published to Amazon SNS (Simple Notification Service). After the data lands in the Snowflake tables, a stored procedure is triggered for further processing and distribution into other tables.
- Suppliers upload files to an AWS S3 bucket.
- Amazon SNS (Simple Notification Service) sends a notification to Snowflake about the new files.
- Snowpipe automatically loads the data from S3 into the corresponding staging tables in Snowflake.
- Once Snowpipe loads the data, a stream detects the new or changed data in the staging tables.
- A task tied to the stream runs a stored procedure to process the new data.
- The stored procedure transforms, cleans, and distributes the data into the appropriate target tables.
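The stage, pipe, stream, and procedure below all reference a staging table named supplier_staging_table, which is assumed to already exist. A minimal sketch (the three column names and types are hypothetical and should be aligned with the suppliers' CSV layout):

CREATE OR REPLACE TABLE supplier_staging_table (
  column1 STRING,  -- hypothetical column names; match these to the CSV fields
  column2 STRING,
  column3 STRING
);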
CREATE OR REPLACE STAGE my_stage
  URL = 's3://my-bucket/supplier-data/'
  STORAGE_INTEGRATION = my_integration;

CREATE OR REPLACE PIPE supplier_pipe
  AUTO_INGEST = TRUE
AS
  COPY INTO supplier_staging_table
  FROM @my_stage
  FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"');

- Configure S3 Event Notifications to publish to an SNS topic.
- In Snowflake, associate the SNS topic with Snowpipe for automatic triggers.
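One way to make that association for S3 stages (a sketch; the SNS topic ARN below is a placeholder you would replace with your own) is to recreate the pipe with the AWS_SNS_TOPIC parameter and then confirm it is listening:

CREATE OR REPLACE PIPE supplier_pipe
  AUTO_INGEST = TRUE
  AWS_SNS_TOPIC = 'arn:aws:sns:us-east-1:123456789012:supplier-uploads'  -- placeholder ARN
AS
  COPY INTO supplier_staging_table
  FROM @my_stage
  FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"');

-- Confirm the pipe is RUNNING and subscribed to notifications
SELECT SYSTEM$PIPE_STATUS('supplier_pipe');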
Create a stream on the staging table to track new or modified rows:

CREATE OR REPLACE STREAM supplier_staging_stream
  ON TABLE supplier_staging_table;

Define a stored procedure to process the new data detected by the stream and distribute it into the appropriate target tables:
CREATE OR REPLACE PROCEDURE process_supplier_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
-- Insert new data into the target table
INSERT INTO processed_table (column1, column2, column3)
SELECT column1, column2, column3
FROM supplier_staging_stream
WHERE metadata$action = 'INSERT';
-- Perform additional transformations or processing as needed
RETURN 'Data processed successfully';
END;
$$;

Use a Snowflake Task to automate the execution of the procedure whenever new data arrives:
CREATE OR REPLACE TASK process_supplier_task
  WAREHOUSE = my_warehouse
  SCHEDULE = '1 MINUTE' -- Check every minute
  WHEN SYSTEM$STREAM_HAS_DATA('supplier_staging_stream')
AS
  CALL process_supplier_data();

The WHEN clause checks the stream before each scheduled run, so the procedure executes only when there is new data. This ensures efficient use of compute resources.
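Note that newly created tasks are suspended by default, so the task has to be resumed before the schedule takes effect; a manual run is also handy while testing:

-- Tasks are created in a suspended state; resume to activate the schedule
ALTER TASK process_supplier_task RESUME;

-- Optional: trigger a single ad-hoc run while testing the pipeline
EXECUTE TASK process_supplier_task;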
- Data Arrival:
  - Files arrive in S3 → SNS triggers Snowpipe.
- Data Loading:
  - Snowpipe loads the files into the supplier_staging_table.
- Change Detection:
  - Stream supplier_staging_stream tracks new or modified rows.
- Data Processing:
  - Task process_supplier_task runs every minute, calling the stored procedure process_supplier_data whenever the stream has data.
  - The procedure processes the stream's data and inserts it into target tables.
- Performance Tuning:
  - Use a Snowflake multi-cluster warehouse for process_supplier_task if you anticipate high concurrency or large data volumes.
- Error Handling:
  - Add error-catching logic to the stored procedure to log or handle exceptions (a sketch follows this list).
- Cost Efficiency:
  - Ensure tasks and warehouses are sized appropriately and use auto-suspend to minimize compute costs.
- Monitoring:
  - Use Snowflake's TASK_HISTORY table function and stream metadata to monitor the pipeline (additional stream checks also follow this list), for example:
    SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'PROCESS_SUPPLIER_TASK'));
- End-to-End Testing:
  - Validate the pipeline with test files to ensure correctness.
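As a sketch of the error-handling suggestion above, the same procedure body can be wrapped in a Snowflake Scripting exception handler:

CREATE OR REPLACE PROCEDURE process_supplier_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
  -- Insert new data into the target table
  INSERT INTO processed_table (column1, column2, column3)
  SELECT column1, column2, column3
  FROM supplier_staging_stream
  WHERE metadata$action = 'INSERT';
  RETURN 'Data processed successfully';
EXCEPTION
  WHEN OTHER THEN
    -- Catch any error raised by the statements above; an INSERT into a
    -- logging/audit table could be added here before returning
    RETURN 'Error while processing supplier data: ' || SQLERRM;
END;
$$;

Because the exception is caught, a bad file surfaces as an error message in the procedure's return value rather than as a failed task run.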
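And for the monitoring point, two quick stream-side checks (using the objects defined earlier):

-- Does the stream currently have unconsumed changes?
SELECT SYSTEM$STREAM_HAS_DATA('supplier_staging_stream');

-- Inspect stream properties, including staleness
SHOW STREAMS LIKE 'supplier_staging_stream';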
- Automation: Fully automated end-to-end pipeline with Snowpipe, streams, and tasks.
- Scalability: Snowflake warehouses (including multi-cluster warehouses) scale to handle large data volumes.
- Efficiency: Tasks run only when there’s new data, saving compute resources.
- Modularity: Streams and tasks decouple the stages, making the pipeline easier to maintain and extend.