Skip to content

Instantly share code, notes, and snippets.

@jasonforte
Created August 31, 2025 11:13
Show Gist options
  • Select an option

  • Save jasonforte/f9b4267c9644964537322953d02777b5 to your computer and use it in GitHub Desktop.

Select an option

Save jasonforte/f9b4267c9644964537322953d02777b5 to your computer and use it in GitHub Desktop.
How to Summarize Images with Bedrock Data Automation
# Sample .env contents consumed by load_dotenv() in the script below.
# Fill in the ARNs/bucket names from your own AWS account before running.
AWS_REGION = 'eu-west-1'
# ARN of the Bedrock Data Automation project to run — TODO fill in
BEDROCK_DATA_AUTOMATION_PROJECT_ARN = ''
# Bucket the source images are uploaded to — TODO fill in
BEDROCK_DATA_AUTOMATION_INPUT_BUCKET = ''
# Bucket where BDA writes its job results — TODO fill in
BEDROCK_DATA_AUTOMATION_OUTPUT_BUCKET = ''
# ARN of the BDA inference profile to bill/run against — TODO fill in
BEDROCK_DATA_AUTOMATION_PROFILE_ARN = ''
import os
import random
import string
import boto3
import time
import json
from dotenv import load_dotenv

# Populate os.environ from the local .env file (see sample above).
load_dotenv()

# Load the required environment variables.
# NOTE(review): os.environ.get returns None when a variable is missing;
# the boto3 calls below will then fail at call time rather than here.
region_name = os.environ.get('AWS_REGION')
bda_project_arn = os.environ.get('BEDROCK_DATA_AUTOMATION_PROJECT_ARN')
bda_input_bucket = os.environ.get('BEDROCK_DATA_AUTOMATION_INPUT_BUCKET')
bda_output_bucket = os.environ.get('BEDROCK_DATA_AUTOMATION_OUTPUT_BUCKET')
bda_profile_arn = os.environ.get('BEDROCK_DATA_AUTOMATION_PROFILE_ARN')

# Load the boto3 clients for this script.
bedrock_data_automation = boto3.client('bedrock-data-automation-runtime', region_name=region_name)
s3 = boto3.client('s3', region_name=region_name)
def randomize_filename(filepath, length=6):
    """Return the basename of *filepath* with a random suffix before the extension.

    The suffix (lowercase letters + digits, ``length`` characters) makes
    repeated uploads of the same file land on distinct S3 keys.
    """
    base = os.path.basename(filepath)
    stem, extension = os.path.splitext(base)
    alphabet = string.ascii_lowercase + string.digits
    suffix = ''.join(random.choices(alphabet, k=length))
    return f"{stem}-{suffix}{extension}"
def upload_to_s3(filepath, client=''):
    """Upload *filepath* to the BDA input bucket under a per-client prefix.

    Parameters:
        filepath: local path of the file to upload.
        client: tenant identifier used as the S3 key prefix; an empty
            string falls back to 'unknown'.

    Returns:
        The ``s3://`` URI of the uploaded object.
    """
    if client == '':
        client = 'unknown'
    # Bug fix: randomize_filename already returns a bare basename, so the
    # old .lstrip('./') was a no-op for normal names and corrupted dotfiles
    # (".env" -> "env-xxxxxx") by stripping their leading dot.
    randomized_name = randomize_filename(filepath)
    s3_key = f'{client}/{randomized_name}'
    with open(filepath, 'rb') as fp:
        s3.upload_fileobj(fp, bda_input_bucket, s3_key)
    s3_location = f's3://{bda_input_bucket}/{s3_key}'
    print(f'Uploaded image to s3: {s3_location}')
    return s3_location
def invoke_data_automation(input_s3_uri, client=''):
    """Trigger an async Bedrock Data Automation job for *input_s3_uri*.

    Parameters:
        input_s3_uri: ``s3://`` URI of the uploaded input object.
        client: tenant identifier recorded as a ``tenant_id`` tag; an
            empty string falls back to 'unknown'.

    Returns:
        The invocation ARN of the started job.
    """
    if client == '':
        client = 'unknown'
    # Bug fix: replace only the first occurrence (the bucket name at the
    # start of the URI). An unbounded replace would also rewrite a key that
    # happens to contain the input bucket's name.
    output_s3_uri = input_s3_uri.replace(bda_input_bucket, bda_output_bucket, 1)
    res = bedrock_data_automation.invoke_data_automation_async(
        inputConfiguration={
            's3Uri': input_s3_uri
        },
        outputConfiguration={
            's3Uri': output_s3_uri
        },
        dataAutomationConfiguration={
            'dataAutomationProjectArn': bda_project_arn
        },
        dataAutomationProfileArn=bda_profile_arn,
        tags=[
            {
                'key': 'tenant_id',
                'value': client
            }
        ]
    )
    return res['invocationArn']
def get_data_automation_status(invocation_arn):
    """Fetch the current status record for one BDA invocation."""
    print(f'Checking for BDA status: invocation_arn={invocation_arn}')
    response = bedrock_data_automation.get_data_automation_status(
        invocationArn=invocation_arn,
    )
    return response
def wait_for_success(invocation_arn, timeout=10):
    """Poll BDA until the invocation succeeds or the poll budget runs out.

    Parameters:
        invocation_arn: ARN returned by invoke_data_automation.
        timeout: maximum number of status polls; each unsuccessful poll
            sleeps 10 seconds before the next attempt.

    Returns:
        The status dict when status is 'Success', otherwise a
        human-readable failure string. NOTE(review): callers must check
        the type — see process_image_for_summary.
    """
    poll_seconds = 10
    for _ in range(timeout):
        result = get_data_automation_status(invocation_arn)
        if result['status'] == 'Success':
            return result
        print(f'Status {result["status"]} waiting 10 seconds...')
        time.sleep(poll_seconds)
    # Bug fix: the message used to claim 5 * timeout seconds even though
    # each unsuccessful poll sleeps 10 seconds.
    return f"no success, waited {poll_seconds * timeout} seconds"
def get_s3_output(output_s3_uri):
    """Fetch the object referenced by an ``s3://bucket/key`` URI from S3."""
    print('Fetching resulting object:', output_s3_uri)
    # 's3://bucket/a/b' splits to ['s3:', '', 'bucket', 'a', 'b'].
    _, _, bucket, *key_parts = output_s3_uri.split('/')
    return s3.get_object(Bucket=bucket, Key="/".join(key_parts))
def parse_bda_metadata(bda_metadata):
    """Yield each BDA segment with its output paths resolved to JSON documents.

    NOTE: mutates the segment dicts in place — the 'standard_output_path'
    and 'custom_output_path' S3 URIs are replaced by the parsed objects
    they point at.
    """
    for segment in bda_metadata['segment_metadata']:
        for path_key in ('standard_output_path', 'custom_output_path'):
            if path_key in segment:
                obj = get_s3_output(segment[path_key])
                segment[path_key] = json.load(obj['Body'])
        yield {'type': 'segment', 'value': segment}
def parse_bda_output(bda_output):
    """Flatten a BDA job result into a stream of metadata and segment records.

    Emits one ``{'type': 'metadata', ...}`` record per output entry,
    immediately followed by that entry's resolved segments.
    """
    for entry in bda_output['output_metadata']:
        yield {'type': 'metadata', 'value': entry}
        yield from parse_bda_metadata(entry)
def process_image_for_summary(filepath):
    """Run the full workflow: upload, invoke BDA, poll, fetch and print results."""
    s3_uri = upload_to_s3(filepath)
    arn = invoke_data_automation(s3_uri)
    status = wait_for_success(arn)
    # NOTE(review): wait_for_success returns a string on timeout, which
    # makes the subscript below raise — confirm the intended failure mode.
    raw = get_s3_output(status['outputConfiguration']['s3Uri'])
    job_output = json.load(raw['Body'])
    for record in parse_bda_output(job_output):
        kind = record['type']
        if kind == 'metadata':
            print('## Metadata:\n', json.dumps(record['value'], indent=2))
        elif kind == 'segment':
            print('## Segment:\n', json.dumps(record['value'], indent=2))
# PEP 621 project metadata (pyproject.toml) for this gist's script.
[project]
name = "250824-bda"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"boto3>=1.40.16",
# Provides dotenv.load_dotenv used to read the .env file above.
"dotenv>=0.9.9",
]
# Extra tools for local development only.
[dependency-groups]
dev = [
"ipython>=9.4.0",
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment