DBT - Cloud Pipeline Using AWS S3, Lambda With Data Validation and Control-M #DBT #DBT_AWS_S3 #DBT_Pipeline #Control_M #AWS_LAMBDA #AWS_SNS_TOPIC

Multi-Folder S3 to DBT Cloud Pipeline Using Lambda With Data Files Validation and Control-M

This guide explains how to set up a pipeline in which files uploaded to multiple S3 bucket folders trigger specific DBT jobs. An AWS Lambda function validates each uploaded data file, sends an email notification when validation fails, and triggers the corresponding DBT job through Control-M.


Prerequisites

1. Store Secrets in AWS Secrets Manager

Create a secret in Secrets Manager containing the credentials for the Control-M API.

Example secret name: ControlMApiCredentials

Secret structure:

{
  "username": "your_username",
  "password": "your_password"
}
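
The secret can also be created programmatically. A minimal boto3 sketch, using the secret name from this guide (the region and credential values are placeholders):

import boto3
import json

# One-time setup: store the Control-M API credentials as a secret
client = boto3.client("secretsmanager", region_name="us-east-1")
client.create_secret(
    Name="ControlMApiCredentials",
    SecretString=json.dumps({
        "username": "your_username",
        "password": "your_password",
    }),
)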

2. Set Up SNS

A. Create an SNS Topic

>> Go to the Amazon SNS Console.
>> Click Create Topic.
>> Choose a topic type (Standard or FIFO); Standard is sufficient for most use cases.
>> Provide a name for the topic (e.g., FileValidationNotifications).
>> Click Create Topic.
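
The topic can also be created with boto3; a minimal sketch using the example name above (the region is an assumption):

import boto3

sns = boto3.client("sns", region_name="us-east-1")
topic = sns.create_topic(Name="FileValidationNotifications")  # Standard topic by default
print(topic["TopicArn"])  # use this ARN in the Lambda configuration below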

B. Add Email Subscriptions to the Topic

>> In the SNS topic details page, click Create Subscription.
>> Set the protocol to Email.
>> Enter the email address where notifications should be sent.
>> Click Create Subscription.
>> Check your inbox for a subscription confirmation email and click the link to confirm the subscription.
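
Equivalently, the subscription can be added with boto3; a sketch assuming the topic ARN from the previous step and a hypothetical recipient address:

import boto3

sns = boto3.client("sns", region_name="us-east-1")
sns.subscribe(
    TopicArn="arn:aws:sns:us-east-1:123456789012:FileValidationNotifications",
    Protocol="email",
    Endpoint="alerts@example.com",  # hypothetical recipient
)
# The subscription remains "PendingConfirmation" until the recipient
# clicks the link in the confirmation email.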

3. Ensure the Lambda execution role has the following permissions. In addition to Secrets Manager and SNS access, the function downloads objects from S3 for validation, so an s3:GetObject statement is required; the role also needs the standard CloudWatch Logs permissions (e.g., the AWSLambdaBasicExecutionRole managed policy).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:ControlMApiCredentials-*"
    },
    {
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:<region>:<account-id>:<topic-name>"
    },
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::<bucket-name>/*"
    }
  ]
}
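
If the role is managed programmatically, the policy above can be attached inline. A boto3 sketch, assuming the document is saved as policy.json and using a hypothetical role name:

import boto3

iam = boto3.client("iam")

# Load the policy document shown above
with open("policy.json") as f:
    policy_document = f.read()

iam.put_role_policy(
    RoleName="file-validation-lambda-role",  # hypothetical role name
    PolicyName="FileValidationPipelinePolicy",
    PolicyDocument=policy_document,
)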

4. Lambda Function Code

Note that requests and pandas are not included in the default Lambda Python runtime; bundle them in the deployment package or attach them as Lambda layers (AWS publishes a managed layer for pandas).

import boto3
import requests
import pandas as pd
import io
import logging
import json

# Constants
SECRET_NAME = "ControlMApiCredentials"
REGION_NAME = "your-aws-region"  # e.g., us-east-1
CONTROL_M_API_URL = "https://control-m-api.example.com/automation-api"

S3_CLIENT = boto3.client("s3")
SNS_CLIENT = boto3.client("sns")
SNS_TOPIC_ARN = "arn:aws:sns:region:account-id:YourSNSTopic"  # ARN of the SNS topic created above

# Mapping of S3 buckets to Control-M jobs
BUCKET_TO_JOB = {
    "bucket1": {"folder": "ProviderA_Folder", "job": "ProviderA_Job"},
    "bucket2": {"folder": "ProviderB_Folder", "job": "ProviderB_Job"},
    "bucket3": {"folder": "ProviderC_Folder", "job": "ProviderC_Job"},
}

# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_secrets():
    """
    Retrieve credentials from AWS Secrets Manager.
    """
    try:
        client = boto3.client("secretsmanager", region_name=REGION_NAME)
        secret_value = client.get_secret_value(SecretId=SECRET_NAME)
        return json.loads(secret_value["SecretString"])
    except Exception as e:
        logger.error(f"Error retrieving secrets: {str(e)}")
        raise RuntimeError("Failed to retrieve secrets from AWS Secrets Manager.")


def validate_csv(bucket, key):
    """
    Validate the CSV file for content and schema.
    """
    try:
        # Download file from S3
        response = S3_CLIENT.get_object(Bucket=bucket, Key=key)
        csv_content = response["Body"].read().decode("utf-8")

        # Load CSV into pandas for validation
        df = pd.read_csv(io.StringIO(csv_content))

        # Check if file is empty
        if df.empty:
            return False, "CSV file is empty."

        # Check for required columns
        required_columns = ["column1", "column2", "column3"]
        if not all(col in df.columns for col in required_columns):
            return False, f"Missing required columns: {set(required_columns) - set(df.columns)}."

        # Additional data quality checks (e.g., null values, data types)
        if df.isnull().any().any():
            return False, "CSV contains null values."

        return True, "CSV validation passed."

    except Exception as e:
        logger.error(f"Error validating CSV: {str(e)}")
        return False, f"Error validating CSV: {str(e)}"


def send_notification(bucket_name, file_name, error_message):
    """
    Send a detailed SNS notification.
    """
    message = (
        f"Validation failed for file: {file_name} in bucket: {bucket_name}.\n"
        f"Error: {error_message}"
    )
    SNS_CLIENT.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=message,
        Subject="S3 File Validation Failed"
    )


def lambda_handler(event, context):
    """
    Lambda function triggered by S3 events.
    """
    try:
        # Extract bucket name and object key from the S3 event
        # (assumes the EventBridge "Object Created" event shape)
        bucket_name = event["detail"]["bucket"]["name"]
        file_path = event["detail"]["object"]["key"]
        logger.info(f"New file uploaded: {bucket_name}/{file_path}")

        # Determine the corresponding job based on the bucket name
        if bucket_name not in BUCKET_TO_JOB:
            raise ValueError(f"Unknown bucket: {bucket_name}")

        # Validate the CSV file
        is_valid, validation_message = validate_csv(bucket_name, file_path)
        if not is_valid:
            logger.error(f"Validation failed: {validation_message}")
            send_notification(bucket_name, file_path, validation_message)
            return {"status": "failed", "message": validation_message}

        # Validation passed; trigger the Control-M job
        logger.info("Validation passed. Triggering Control-M job.")

        job_details = BUCKET_TO_JOB[bucket_name]
        folder = job_details["folder"]
        job = job_details["job"]

        # Retrieve credentials from Secrets Manager
        secrets = get_secrets()
        username = secrets["username"]
        password = secrets["password"]

        # Authenticate with Control-M API
        auth_url = f"{CONTROL_M_API_URL}/session/login"
        auth_payload = {"username": username, "password": password}
        auth_response = requests.post(auth_url, json=auth_payload, timeout=30)
        auth_response.raise_for_status()
        session_token = auth_response.json().get("token")

        # Trigger the Control-M job
        run_job_url = f"{CONTROL_M_API_URL}/run/job"
        headers = {"Authorization": f"Bearer {session_token}"}
        job_payload = {
            "folder": folder,
            "job": job,
            "variables": {
                "S3_BUCKET": bucket_name,
                "S3_FILE_PATH": file_path,
            },
        }
        run_response = requests.post(run_job_url, headers=headers, json=job_payload, timeout=30)
        run_response.raise_for_status()

        logger.info(f"Control-M job triggered successfully: {run_response.json()}")
        return {"message": "Control-M job triggered successfully.", "job_details": run_response.json()}

    except requests.exceptions.RequestException as e:
        logger.error(f"Error interacting with Control-M API: {e}")
        return {"error": f"Control-M API error: {e}"}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise
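
The handler above reads the bucket name and object key from event["detail"], which matches the EventBridge "Object Created" event that S3 emits when EventBridge notifications are enabled on a bucket. A minimal event pattern for an EventBridge rule that routes uploads from the three mapped buckets to the Lambda (bucket names taken from BUCKET_TO_JOB; adjust to your own):

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["bucket1", "bucket2", "bucket3"]
    }
  }
}

For a quick local smoke test, the handler can be invoked with a hand-built event of the same shape (bucket and key are hypothetical):

sample_event = {
    "detail": {
        "bucket": {"name": "bucket1"},
        "object": {"key": "ProviderA_Folder/data.csv"},
    }
}
print(lambda_handler(sample_event, None))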