DBT - Cloud Pipeline Using AWS S3, EventBridge and Lambda #DBT #DBT_AWS_S3 #DBT_Pipeline #DBT_AWS_EVENT_BRIDGE #AWS_EVENT_BRIDGE #AWS_LAMBDA

Multi-Folder S3 to DBT Core Pipeline Using EventBridge and Lambda

This guide explains how to set up a pipeline where files uploaded to multiple S3 bucket folders trigger specific DBT jobs in DBT Core. The pipeline uses AWS EventBridge for event routing and AWS Lambda for triggering DBT jobs.


Architecture Overview

  1. S3 Bucket Folders: Up to 7 folders in an S3 bucket (e.g., folder1/, folder2/, ... folder7/).
  2. EventBridge Rules: Filter S3 events (file uploads) based on folder prefixes and file extensions.
  3. AWS Lambda: Dynamically triggers DBT jobs based on the folder the file was uploaded to.
  4. DBT Core API: The Lambda function makes an HTTP request to DBT Core to start the corresponding job.

Pipeline Workflow

  1. Files are uploaded to S3 bucket folders (folder1/, folder2/, ... folder7/).
  2. EventBridge filters events and sends them to a target (Lambda).
  3. Lambda determines the appropriate DBT job based on the folder and triggers it using the DBT Core API.
  4. DBT Core processes the job, loading and transforming the data in Snowflake.

Implementation Steps

1. Set Up S3 Event Notifications

  1. Go to the S3 bucket in the AWS Management Console.
  2. Navigate to Properties → Event Notifications.
  3. Under Amazon EventBridge, enable Send notifications to Amazon EventBridge for all events in this bucket:
    • Event Types: with EventBridge delivery enabled, S3 forwards all bucket events (including s3:ObjectCreated:*); narrowing to object-created events and specific folder prefixes is done in the EventBridge rules (Step 2).
    • Destination: the default EventBridge event bus. A boto3 sketch for enabling this setting follows these steps.
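
If you prefer to script this step, the same setting can be enabled with boto3. This is a minimal sketch, assuming the example bucket name from Step 2 and credentials that allow s3:PutBucketNotificationConfiguration.

import boto3

s3 = boto3.client("s3")

# Turn on EventBridge delivery for the bucket; S3 then sends all bucket events
# (including object-created events) to the default event bus in this account/region.
s3.put_bucket_notification_configuration(
    Bucket="acc-song-bg-files-uploads-test",
    NotificationConfiguration={"EventBridgeConfiguration": {}}
)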

2. Create EventBridge Rules

Create separate EventBridge rules for each folder.

Example Rule for folder1/:

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["acc-song-bg-files-uploads-test"]
    },
    "object": {
      "key": [{ "prefix": "folder1/" }]
    }
  }
}

Repeat this process for each folder (folder2/ to folder7/), updating the prefix accordingly. The Object Created pattern matches the native S3-to-EventBridge delivery enabled in Step 1, and its detail payload (bucket.name and object.key) is what the Lambda function in Step 3 reads.
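
If you would rather create the seven rules programmatically than click through the console, a boto3 sketch along these lines should work. The rule names and the Lambda ARN are placeholders, not values from the original setup.

import json
import boto3

events = boto3.client("events")
LAMBDA_ARN = "arn:aws:lambda:eu-west-1:123456789012:function:dbt-job-trigger"  # placeholder

for i in range(1, 8):
    folder = f"folder{i}/"
    pattern = {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": ["acc-song-bg-files-uploads-test"]},
            "object": {"key": [{"prefix": folder}]}
        }
    }
    rule_name = f"dbt-trigger-folder{i}"

    # Create (or update) the rule on the default event bus
    events.put_rule(Name=rule_name, EventPattern=json.dumps(pattern), State="ENABLED")

    # Point the rule at the Lambda function
    events.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "dbt-trigger-lambda", "Arn": LAMBDA_ARN}]
    )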


3. Create Lambda Function

Write a Lambda function to dynamically trigger DBT jobs based on the folder.

Python Code for Lambda Function

import requests  # not included in the Lambda runtime; bundle it in the deployment package or a layer
import logging

# Base DBT API endpoint (internal service that triggers DBT Core jobs)
DBT_API_SVC_URL = "http://10.100.49.185:9102/dbt_job_trigger"

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Mapping of S3 folder prefixes to DBT job IDs (extend with folder3/ ... folder7/ as needed)
FOLDER_TO_JOB = {
    "folder1/": 12345,
    "folder2/": 23456
}

def lambda_handler(event, context):
    try:
        # Extract bucket name and file key from the S3 event
        s3_bucket_name = event['detail']['bucket']['name']
        s3_object_key = event['detail']['object']['key']
        
        logger.info(f"New file uploaded: {s3_bucket_name}/{s3_object_key}")

        # Determine the folder from the key
        matching_job_id = None
        for folder, job_id in FOLDER_TO_JOB.items():
            if s3_object_key.startswith(folder):
                matching_job_id = job_id
                break

        if not matching_job_id:
            logger.warning(f"No matching DBT job found for file: {s3_object_key}")
            return {"message": "No matching DBT job for the folder."}

        # Prepare the API payload
        payload = {
            "job_id": matching_job_id,
            "bucket_name": s3_bucket_name,
            "object_key": s3_object_key
        }
        headers = {"Content-Type": "application/json"}

        # Trigger the DBT job via API
        response = requests.post(DBT_API_SVC_URL, json=payload, headers=headers, timeout=30)
        if response.status_code == 200:
            logger.info(f"Triggered DBT job {matching_job_id} for folder {folder}")
            return {"message": f"DBT job {matching_job_id} triggered successfully."}
        else:
            logger.error(f"Failed to trigger DBT job: {response.status_code} - {response.text}")
            return {"error": response.text}

    except KeyError as e:
        logger.error(f"Key error: {str(e)}")
        return {"error": f"Invalid event structure: {str(e)}"}
    except requests.exceptions.RequestException as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": f"Failed to make API request: {str(e)}"}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise
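
For a quick local sanity check before deploying, you can feed the handler a synthetic EventBridge "Object Created" payload. The trimmed event below only contains the fields the handler reads (a real event carries more, such as region, time, and object size), and running it will attempt a real HTTP call to DBT_API_SVC_URL.

if __name__ == "__main__":
    # Minimal synthetic event mirroring the fields the handler actually reads
    sample_event = {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {
            "bucket": {"name": "acc-song-bg-files-uploads-test"},
            "object": {"key": "folder1/2025/01/sales.csv"}
        }
    }
    print(lambda_handler(sample_event, None))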

4. Grant Lambda Permissions

  1. EventBridge Permissions:

    • Attach a resource-based policy (Lambda permission) allowing EventBridge to invoke the Lambda function (see the sketch after this list).
  2. CloudWatch Logging:

    • Ensure the Lambda execution role allows writing logs to CloudWatch.
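
The EventBridge permission from item 1 is a resource-based policy statement on the function; a minimal boto3 sketch follows, with placeholder function name, statement ID, and rule ARN. For item 2, attaching the AWS managed policy AWSLambdaBasicExecutionRole to the execution role is usually sufficient.

import boto3

lambda_client = boto3.client("lambda")

# Allow an EventBridge rule to invoke the function (repeat per rule, or use a
# SourceArn that covers all of the dbt-trigger-* rules). All names are placeholders.
lambda_client.add_permission(
    FunctionName="dbt-job-trigger",
    StatementId="AllowEventBridgeFolder1",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn="arn:aws:events:eu-west-1:123456789012:rule/dbt-trigger-folder1"
)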

5. Test the Pipeline

  1. Upload Test Files:

    • Upload files to different folders (folder1/, folder2/, etc.) in the S3 bucket (a boto3 upload sketch follows this list).
  2. Verify EventBridge Rules:

    • Confirm the rules route the events correctly to Lambda.
  3. Check Lambda Logs:

    • View logs in CloudWatch to confirm the correct DBT job was triggered.
  4. Check DBT Core:

    • Verify job execution in DBT Core’s job history.
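
A quick way to exercise the whole pipeline, as mentioned in item 1 above, is to upload a small test object into one of the watched prefixes. A minimal boto3 sketch (file name and contents are arbitrary):

import boto3

s3 = boto3.client("s3")

# Drop a small test file under folder1/; this should fire the folder1 rule,
# invoke the Lambda, and trigger the mapped DBT job.
s3.put_object(
    Bucket="acc-song-bg-files-uploads-test",
    Key="folder1/test_upload.csv",
    Body=b"id,name\n1,test\n"
)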

Scaling and Enhancements

  1. Dynamic Folder and Job Mapping:

    • Use a DynamoDB table to store the folder-to-job mapping for easier updates (see the sketch after this list).
  2. Step Functions for Orchestration:

    • Replace Lambda with AWS Step Functions if the workflow involves multiple steps.
  3. Error Notifications:

    • Configure SNS or a Dead Letter Queue (DLQ) for Lambda to handle failures.
  4. Additional File Validation:

    • Add validation logic in Lambda to check file formats or metadata before triggering the DBT job.
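
For item 1, the hard-coded FOLDER_TO_JOB dictionary could be replaced with a lookup against a small DynamoDB table, so new folders are onboarded by inserting a row instead of redeploying the Lambda. This is a sketch only; the table name dbt_folder_job_map and its folder key attribute are assumptions, not part of the original setup.

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("dbt_folder_job_map")  # hypothetical table: folder (partition key) -> job_id

def lookup_job_id(object_key: str):
    # Use the first path segment of the S3 key (e.g. "folder1/") as the lookup key
    folder = object_key.split("/", 1)[0] + "/"
    response = table.get_item(Key={"folder": folder})
    item = response.get("Item")
    return int(item["job_id"]) if item else None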

Advantages of This Approach

  • Scalable: Easily supports more folders or jobs by adding rules or updating the mapping.
  • Event-Driven: Efficiently triggers DBT jobs only when new files arrive.
  • Centralized Logic: Lambda handles all folder-to-job mapping in one place, reducing duplication.
