This guide explains how to set up a pipeline in which files uploaded to folders in multiple S3 buckets trigger specific DBT Core jobs. The pipeline uses AWS Lambda to validate each uploaded data file, send an email notification when validation fails, and trigger the corresponding DBT job through Control-M.
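The Lambda function later in this guide reads the bucket name and object key from an EventBridge event, so each bucket needs to send its "Object Created" notifications to EventBridge and a rule must route them to the function. The sketch below shows one way to wire this up with boto3; it assumes EventBridge notifications are already enabled on the buckets, and the rule name, target id, function ARN, and bucket names are placeholders.

import json
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Match "Object Created" events from the provider buckets.
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {"bucket": {"name": ["bucket1", "bucket2", "bucket3"]}},
}

rule = events.put_rule(
    Name="s3-upload-triggers-dbt",  # placeholder rule name
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)

# Send matching events to the validation Lambda.
events.put_targets(
    Rule="s3-upload-triggers-dbt",
    Targets=[{
        "Id": "file-validation-lambda",  # placeholder target id
        "Arn": "arn:aws:lambda:<region>:<account-id>:function:<function-name>",
    }],
)

# Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName="<function-name>",
    StatementId="AllowEventBridgeInvoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)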
Store the Control-M API credentials in AWS Secrets Manager so the Lambda function can retrieve them at runtime. Example secret name: ControlMApiCredentials
Secret structure:
{
  "username": "your_username",
  "password": "your_password"
}

Next, create an SNS topic that will receive the validation-failure notifications:
>> Go to the Amazon SNS Console.
>> Click Create Topic.
>> Choose a topic type (e.g., Standard or FIFO):
>> Standard is sufficient for most use cases.
>> Provide a name for the topic (e.g., FileValidationNotifications).
>> Click Create Topic.
>> On the SNS topic details page, click Create Subscription.
>> Set the protocol to Email.
>> Enter the email address where notifications should be sent.
>> Click Create Subscription.
>> Check your inbox for a subscription confirmation email and click the link to confirm the subscription.
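If you prefer to script this step, the same topic and email subscription can be created with boto3; a minimal sketch, with the email address as a placeholder:

import boto3

sns = boto3.client("sns")

# Create the (Standard) topic and keep its ARN for the Lambda configuration below.
topic = sns.create_topic(Name="FileValidationNotifications")

# Subscribe an email endpoint; the recipient still has to confirm the subscription email.
sns.subscribe(
    TopicArn=topic["TopicArn"],
    Protocol="email",
    Endpoint="ops-team@example.com",  # placeholder address
)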
Attach a policy like the following to the Lambda function's execution role so it can read the secret and publish to the SNS topic:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": "arn:aws:secretsmanager:<region>:<account-id>:secret:ControlMApiCredentials-*"
    },
    {
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:<region>:<account-id>:<topic-name>"
    }
  ]
}

The Lambda function below validates each uploaded CSV file, sends an SNS notification when validation fails, and triggers the matching Control-M job. Note that requests and pandas are not included in the default Lambda Python runtime, so they must be packaged with the function (for example, in a layer).

import boto3
import requests
import pandas as pd
import io
import logging
import json
# Constants
SECRET_NAME = "ControlMApiCredentials"
REGION_NAME = "your-aws-region" # e.g., us-east-1
CONTROL_M_API_URL = "https://control-m-api.example.com/automation-api"
S3_CLIENT = boto3.client("s3")
SNS_CLIENT = boto3.client("sns")
SNS_TOPIC_ARN = "arn:aws:sns:region:account-id:YourSNSTopic"
# Mapping of S3 buckets to Control-M jobs
BUCKET_TO_JOB = {
"bucket1": {"folder": "ProviderA_Folder", "job": "ProviderA_Job"},
"bucket2": {"folder": "ProviderB_Folder", "job": "ProviderB_Job"},
"bucket3": {"folder": "ProviderC_Folder", "job": "ProviderC_Job"},
}
# Logger setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_secrets():
    """
    Retrieve credentials from AWS Secrets Manager.
    """
    try:
        client = boto3.client("secretsmanager", region_name=REGION_NAME)
        secret_value = client.get_secret_value(SecretId=SECRET_NAME)
        return json.loads(secret_value["SecretString"])
    except Exception as e:
        logger.error(f"Error retrieving secrets: {str(e)}")
        raise RuntimeError("Failed to retrieve secrets from AWS Secrets Manager.")

def validate_csv(bucket, key):
    """
    Validate the CSV file for content and schema.
    """
    try:
        # Download file from S3
        response = S3_CLIENT.get_object(Bucket=bucket, Key=key)
        csv_content = response["Body"].read().decode("utf-8")

        # Load CSV into pandas for validation
        df = pd.read_csv(io.StringIO(csv_content))

        # Check if file is empty
        if df.empty:
            return False, "CSV file is empty."

        # Check for required columns
        required_columns = ["column1", "column2", "column3"]
        if not all(col in df.columns for col in required_columns):
            return False, f"Missing required columns: {set(required_columns) - set(df.columns)}."

        # Additional data quality checks (e.g., null values, data types)
        if df.isnull().any().any():
            return False, "CSV contains null values."

        return True, "CSV validation passed."
    except Exception as e:
        logger.error(f"Error validating CSV: {str(e)}")
        return False, f"Error validating CSV: {str(e)}"

def send_notification(bucket_name, file_name, error_message):
    """
    Send a detailed SNS notification.
    """
    message = (
        f"Validation failed for file: {file_name} in bucket: {bucket_name}.\n"
        f"Error: {error_message}"
    )
    SNS_CLIENT.publish(
        TopicArn=SNS_TOPIC_ARN,
        Message=message,
        Subject="S3 File Validation Failed"
    )

def lambda_handler(event, context):
    """
    Lambda function triggered by S3 upload events (delivered via EventBridge).
    """
    try:
        # Extract bucket name and file path from the S3 event
        bucket_name = event["detail"]["bucket"]["name"]
        file_path = event["detail"]["object"]["key"]
        logger.info(f"New file uploaded: {bucket_name}/{file_path}")

        # Determine the corresponding job based on the bucket name
        if bucket_name not in BUCKET_TO_JOB:
            raise ValueError(f"Unknown bucket: {bucket_name}")

        # Validate the CSV file
        logger.info(f"Processing file: s3://{bucket_name}/{file_path}")
        is_valid, validation_message = validate_csv(bucket_name, file_path)
        if not is_valid:
            logger.error(f"Validation failed: {validation_message}")
            send_notification(bucket_name, file_path, validation_message)
            return {"status": "failed", "message": validation_message}

        # Trigger the Control-M job for this bucket
        logger.info("Validation passed. Triggering Control-M job.")
        job_details = BUCKET_TO_JOB[bucket_name]
        folder = job_details["folder"]
        job = job_details["job"]

        # Retrieve credentials from Secrets Manager
        secrets = get_secrets()
        username = secrets["username"]
        password = secrets["password"]

        # Authenticate with the Control-M API
        auth_url = f"{CONTROL_M_API_URL}/session/login"
        auth_payload = {"username": username, "password": password}
        auth_response = requests.post(auth_url, json=auth_payload)
        auth_response.raise_for_status()
        session_token = auth_response.json().get("token")

        # Trigger the Control-M job
        run_job_url = f"{CONTROL_M_API_URL}/run/job"
        headers = {"Authorization": f"Bearer {session_token}"}
        job_payload = {
            "folder": folder,
            "job": job,
            "variables": {
                "S3_BUCKET": bucket_name,
                "S3_FILE_PATH": file_path,
            },
        }
        run_response = requests.post(run_job_url, headers=headers, json=job_payload)
        run_response.raise_for_status()
        logger.info(f"Control-M job triggered successfully: {run_response.json()}")
        return {"message": "Control-M job triggered successfully.", "job_details": run_response.json()}

    except requests.exceptions.RequestException as e:
        logger.error(f"Error interacting with Control-M API: {e}")
        return {"error": f"Control-M API error: {e}"}
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise
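
To exercise the handler locally before wiring it to EventBridge, it can be called with a trimmed-down event that contains only the fields it reads; the bucket and key below are placeholder values.

if __name__ == "__main__":
    # Minimal EventBridge-style S3 event with only the fields lambda_handler uses.
    sample_event = {
        "detail": {
            "bucket": {"name": "bucket1"},
            "object": {"key": "ProviderA_Folder/sample_upload.csv"},
        }
    }
    print(lambda_handler(sample_event, None))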