This is a code snippet accompanying the video posted on YouTube.
Reach out to MakeOps for support or guidance.
# Environment variables for the script below; fill in the values from your
# own AWS account before running.
AWS_REGION='eu-west-1'
BEDROCK_DATA_AUTOMATION_PROJECT_ARN=''
BEDROCK_DATA_AUTOMATION_INPUT_BUCKET=''
BEDROCK_DATA_AUTOMATION_OUTPUT_BUCKET=''
BEDROCK_DATA_AUTOMATION_PROFILE_ARN=''
import json
import os
import random
import string
import time

import boto3
from dotenv import load_dotenv

load_dotenv()

# Load the required environment variables.
region_name = os.environ.get('AWS_REGION')
bda_project_arn = os.environ.get('BEDROCK_DATA_AUTOMATION_PROJECT_ARN')
bda_input_bucket = os.environ.get('BEDROCK_DATA_AUTOMATION_INPUT_BUCKET')
bda_output_bucket = os.environ.get('BEDROCK_DATA_AUTOMATION_OUTPUT_BUCKET')
bda_profile_arn = os.environ.get('BEDROCK_DATA_AUTOMATION_PROFILE_ARN')

# Load the boto3 clients for this script.
bedrock_data_automation = boto3.client('bedrock-data-automation-runtime', region_name=region_name)
s3 = boto3.client('s3', region_name=region_name)
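# Note: boto3 resolves AWS credentials from its usual sources (environment
# variables, the shared credentials file, or an attached IAM role); only the
# region comes from the .env file above.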

def randomize_filename(filepath, length=6):
    """Randomize the name of the file to avoid name conflicts."""
    filename = os.path.basename(filepath)
    name, ext = os.path.splitext(filename)
    # Generate a random string using lowercase letters and digits.
    random_string = ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
    # Construct the new filename.
    return f"{name}-{random_string}{ext}"

def upload_to_s3(filepath, client=''):
    '''Upload the file to S3 under a per-client prefix.'''
    if client == '':
        client = 'unknown'
    with open(filepath, 'rb') as fp:
        # randomize_filename() already returns a bare filename, so no further
        # path stripping is needed here.
        randomized_name = randomize_filename(filepath)
        s3_key = f'{client}/{randomized_name}'
        s3.upload_fileobj(fp, bda_input_bucket, s3_key)
    s3_location = f's3://{bda_input_bucket}/{s3_key}'
    print(f'Uploaded image to S3: {s3_location}')
    return s3_location

def invoke_data_automation(input_s3_uri, client=''):
    '''Trigger the data automation service.'''
    if client == '':
        client = 'unknown'
    # Mirror the input key into the output bucket.
    output_s3_uri = input_s3_uri.replace(bda_input_bucket, bda_output_bucket)
    res = bedrock_data_automation.invoke_data_automation_async(
        inputConfiguration={
            's3Uri': input_s3_uri
        },
        outputConfiguration={
            's3Uri': output_s3_uri
        },
        dataAutomationConfiguration={
            'dataAutomationProjectArn': bda_project_arn
        },
        dataAutomationProfileArn=bda_profile_arn,
        tags=[
            {
                'key': 'tenant_id',
                'value': client
            }
        ]
    )
    return res['invocationArn']
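# The invoke call above is asynchronous: it only starts the job and returns an
# invocation ARN, which the helpers below poll until the job completes.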

def get_data_automation_status(invocation_arn):
    '''Fetch the current status of a BDA invocation.'''
    print(f'Checking for BDA status: invocation_arn={invocation_arn}')
    return bedrock_data_automation.get_data_automation_status(invocationArn=invocation_arn)

def wait_for_success(invocation_arn, timeout=10):
    '''Wait for the success response from BDA, polling every 10 seconds.'''
    for _ in range(timeout):
        result = get_data_automation_status(invocation_arn)
        if result['status'] == 'Success':
            return result
        if result['status'] in ('ServiceError', 'ClientError'):
            # These statuses are terminal, so stop polling early.
            raise RuntimeError(f"BDA invocation failed with status {result['status']}")
        print(f'Status {result["status"]} waiting 10 seconds...')
        time.sleep(10)
    # Each loop iteration sleeps 10 seconds, so the total wait is 10 * timeout.
    raise TimeoutError(f'No success after waiting {10 * timeout} seconds')

def get_s3_output(output_s3_uri):
    '''Get the resulting object from S3.'''
    print('Fetching resulting object:', output_s3_uri)
    # Split "s3://bucket/key/parts" into its bucket and key components.
    s3_bucket = output_s3_uri.split('/')[2]
    s3_key = "/".join(output_s3_uri.split('/')[3:])
    return s3.get_object(Bucket=s3_bucket, Key=s3_key)
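# The job metadata JSON that BDA writes (fetched in process_image_for_summary
# below) nests lists: each entry in 'output_metadata' carries a
# 'segment_metadata' list whose items point, via 'standard_output_path' and
# 'custom_output_path', at further JSON documents in the output bucket. The
# parsers below fetch those documents and replace each path with its contents.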

def parse_bda_metadata(bda_metadata):
    '''Resolve a segment's output paths into the parsed JSON they point at.'''
    for bda_segment in bda_metadata['segment_metadata']:
        if 'standard_output_path' in bda_segment:
            output_obj = get_s3_output(bda_segment['standard_output_path'])
            bda_segment['standard_output_path'] = json.load(output_obj['Body'])
        if 'custom_output_path' in bda_segment:
            output_obj = get_s3_output(bda_segment['custom_output_path'])
            bda_segment['custom_output_path'] = json.load(output_obj['Body'])
        yield {'type': 'segment', 'value': bda_segment}


def parse_bda_output(bda_output):
    '''Yield the top-level metadata entries and their resolved segments.'''
    for output_metadata in bda_output['output_metadata']:
        yield {'type': 'metadata', 'value': output_metadata}
        yield from parse_bda_metadata(output_metadata)

def process_image_for_summary(filepath):
    '''Full workflow: upload the file, run BDA on it, and print the results.'''
    uploaded_obj = upload_to_s3(filepath)
    invocation_arn = invoke_data_automation(uploaded_obj)
    result = wait_for_success(invocation_arn)
    output_obj = get_s3_output(result['outputConfiguration']['s3Uri'])
    output = json.load(output_obj['Body'])
    for parsed_output in parse_bda_output(output):
        if parsed_output['type'] == 'metadata':
            print('## Metadata:\n', json.dumps(parsed_output['value'], indent=2))
        if parsed_output['type'] == 'segment':
            print('## Segment:\n', json.dumps(parsed_output['value'], indent=2))
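As a usage sketch, the whole workflow can be driven from a main guard at the
bottom of the script; the image path here is hypothetical, so point it at a
real local file:

if __name__ == '__main__':
    # Hypothetical example path; replace with a real image on disk.
    process_image_for_summary('./example-invoice.png')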

[project]
name = "250824-bda"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.40.16",
    "dotenv>=0.9.9",
]

[dependency-groups]
dev = [
    "ipython>=9.4.0",
]
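The [dependency-groups] table suggests the project is managed with uv; if so,
uv sync installs the dependencies (including the dev group), and the script can
then be run inside that environment, for example from the ipython shell listed
in the dev group.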