This guide explains how to set up a pipeline where files uploaded to multiple S3 bucket folders trigger specific DBT jobs in DBT Core. The pipeline uses AWS EventBridge for event routing and AWS Lambda for triggering DBT jobs.
- S3 Bucket Folders: Up to 7 folders in an S3 bucket (e.g., folder1/, folder2/, ... folder7/).
- EventBridge Rules: Filter S3 events (file uploads) based on folder prefixes and file extensions.
- AWS Lambda: Dynamically triggers DBT jobs based on the folder the file was uploaded to.
- DBT Core API: The Lambda function makes an HTTP request to DBT Core to start the corresponding job.
- Files are uploaded to S3 bucket folders (folder1/, folder2/, ... folder7/).
- EventBridge filters events and sends them to a target (Lambda).
- Lambda determines the appropriate DBT job based on the folder and triggers it using the DBT Core API.
- DBT Core processes the job, loading and transforming the data in Snowflake.
- Go to the S3 bucket in the AWS Management Console.
- Navigate to Properties → Event Notifications.
- Add a new notification:
  - Event Types: s3:ObjectCreated:*.
  - Destination: Send to EventBridge.
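This console step can also be scripted. Below is a minimal sketch using boto3; the bucket name is taken from the rule example later in this guide and should be replaced with your own:

```python
import boto3

s3 = boto3.client("s3")

# Enable delivery of all S3 event notifications for this bucket to EventBridge.
# Note: this call replaces the bucket's existing notification configuration.
s3.put_bucket_notification_configuration(
    Bucket="acc-song-bg-files-uploads-test",  # replace with your bucket name
    NotificationConfiguration={"EventBridgeConfiguration": {}},
)
```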
Create a separate EventBridge rule for each folder, filtering on the folder prefix in the object key. For example, the rule pattern for folder1/:
```json
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["acc-song-bg-files-uploads-test"]
    },
    "object": {
      "key": [{ "prefix": "folder1/" }]
    }
  }
}
```

This pattern matches the native "Object Created" events that S3 sends to EventBridge once the bucket notification above is enabled, which is also the event shape the Lambda function below expects. Repeat this process for each folder (folder2/ to folder7/), updating the prefix accordingly.
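The rules can also be created programmatically. The sketch below assumes the Lambda function from the next step already exists; the rule names, Lambda ARN, and folder list are illustrative placeholders:

```python
import json
import boto3

events = boto3.client("events")
lambda_arn = "arn:aws:lambda:eu-west-1:123456789012:function:dbt-job-trigger"  # placeholder ARN

for folder in ["folder1/", "folder2/"]:  # extend up to folder7/ as needed
    pattern = {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": ["acc-song-bg-files-uploads-test"]},
            "object": {"key": [{"prefix": folder}]},
        },
    }
    rule_name = f"s3-upload-{folder.strip('/')}"
    # Create (or update) the rule with the folder-specific event pattern.
    events.put_rule(Name=rule_name, EventPattern=json.dumps(pattern), State="ENABLED")
    # Point the rule at the Lambda function.
    events.put_targets(Rule=rule_name, Targets=[{"Id": "dbt-trigger-lambda", "Arn": lambda_arn}])
```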
Write a Lambda function to dynamically trigger DBT jobs based on the folder.
```python
import logging

import requests

# Base DBT API endpoint
DBT_API_SVC_URL = "http://10.100.49.185:9102/dbt_job_trigger"

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Mapping of folder prefixes to DBT job IDs
FOLDER_TO_JOB = {
    "folder1/": 12345,
    "folder2/": 23456,
}


def lambda_handler(event, context):
    try:
        # Extract bucket name and object key from the S3 "Object Created" event
        s3_bucket_name = event['detail']['bucket']['name']
        s3_object_key = event['detail']['object']['key']
        logger.info(f"New file uploaded: {s3_bucket_name}/{s3_object_key}")

        # Determine the matching DBT job from the folder prefix
        matching_job_id = None
        for folder, job_id in FOLDER_TO_JOB.items():
            if s3_object_key.startswith(folder):
                matching_job_id = job_id
                break

        if not matching_job_id:
            logger.warning(f"No matching DBT job found for file: {s3_object_key}")
            return {"message": "No matching DBT job for the folder."}

        # Prepare the API payload
        payload = {
            "job_id": matching_job_id,
            "bucket_name": s3_bucket_name,
            "object_key": s3_object_key
        }
        headers = {"Content-Type": "application/json"}

        # Trigger the DBT job via the API (time out rather than letting the Lambda hang)
        response = requests.post(DBT_API_SVC_URL, json=payload, headers=headers, timeout=30)

        if response.status_code == 200:
            logger.info(f"Triggered DBT job {matching_job_id} for folder {folder}")
            return {"message": f"DBT job {matching_job_id} triggered successfully."}
        else:
            logger.error(f"Failed to trigger DBT job: {response.status_code} - {response.text}")
            return {"error": response.text}

    except KeyError as e:
        logger.error(f"Key error: {str(e)}")
        return {"error": f"Invalid event structure: {str(e)}"}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": f"Failed to make API request: {str(e)}"}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise
```
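To sanity-check the handler before deploying, you can call it locally with a minimal event that mimics the native S3 "Object Created" payload. Only the fields the handler reads are included; the object key is an illustrative placeholder:

```python
# Minimal local test event: only the fields the handler actually reads are included.
sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "acc-song-bg-files-uploads-test"},
        "object": {"key": "folder1/sample_upload.csv"},  # illustrative object key
    },
}

print(lambda_handler(sample_event, None))
```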
- EventBridge Permissions:
  - Attach a policy allowing EventBridge to invoke the Lambda function (see the sketch after this list).
- CloudWatch Logging:
  - Ensure the Lambda execution role allows writing logs to CloudWatch.
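A minimal sketch of the resource-based permission, assuming a function and rule named as in the earlier examples (the function name and rule ARN are placeholders; repeat with a distinct StatementId for each rule):

```python
import boto3

lambda_client = boto3.client("lambda")

# Allow the EventBridge rule to invoke the function.
lambda_client.add_permission(
    FunctionName="dbt-job-trigger",  # placeholder function name
    StatementId="AllowEventBridgeInvoke-folder1",  # must be unique per statement
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn="arn:aws:events:eu-west-1:123456789012:rule/s3-upload-folder1",  # placeholder rule ARN
)
```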
- Upload Test Files:
  - Upload files to different folders (folder1/, folder2/, etc.) in the S3 bucket (see the sketch after this list).
- Verify EventBridge Rules:
  - Confirm the rules route the events correctly to Lambda.
- Check Lambda Logs:
  - View logs in CloudWatch to confirm the correct DBT job was triggered.
- Check DBT Core:
  - Verify job execution in DBT Core’s job history.
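A quick way to drop a test file into each folder, assuming a local sample file exists (the file name is a placeholder):

```python
import boto3

s3 = boto3.client("s3")
bucket = "acc-song-bg-files-uploads-test"

# Upload one sample file per folder; each upload should trigger the matching rule.
for folder in ["folder1/", "folder2/"]:
    s3.upload_file("sample.csv", bucket, f"{folder}sample.csv")  # placeholder local file
```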
- Dynamic Folder and Job Mapping:
  - Use a DynamoDB table to store the folder-to-job mapping for easier updates (see the sketch after this list).
- Step Functions for Orchestration:
  - Replace Lambda with AWS Step Functions if the workflow involves multiple steps.
- Error Notifications:
  - Configure SNS or a Dead Letter Queue (DLQ) for Lambda to handle failures.
- Additional File Validation:
  - Add validation logic in Lambda to check file formats or metadata before triggering the DBT job.
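If the folder-to-job mapping moves to DynamoDB, the hard-coded FOLDER_TO_JOB dictionary can be replaced by a lookup. A minimal sketch, assuming a hypothetical table named dbt_folder_job_mapping with folder_prefix as its partition key and a job_id attribute:

```python
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("dbt_folder_job_mapping")  # hypothetical table name


def get_job_id_for_key(s3_object_key: str):
    """Look up the DBT job ID for the folder prefix of an uploaded object."""
    folder_prefix = s3_object_key.split("/", 1)[0] + "/"  # e.g. "folder1/"
    response = table.get_item(Key={"folder_prefix": folder_prefix})
    item = response.get("Item")
    return item["job_id"] if item else None
```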
- Scalable: Easily supports more folders or jobs by adding rules or updating the mapping.
- Event-Driven: Efficiently triggers DBT jobs only when new files arrive.
- Centralized Logic: Lambda handles all folder-to-job mapping in one place, reducing duplication.