|
import os |
|
import boto3 |
|
import uuid |
|
|
|
from dotenv import load_dotenv |
|
# Run python-dotenv's loader before any environment lookups so the reads
# below can see whatever it adds to the process environment.
load_dotenv()

# Agent coordinates come from the environment; each is None when its
# variable is unset.
aws_region_name = os.getenv('AWS_REGION_NAME')
bedrock_agent_id = os.getenv('BEDROCK_AGENT_ID')
bedrock_agent_alias_id = os.getenv('BEDROCK_AGENT_ALIAS_ID')

# Single module-level runtime client, shared by every invoke_agent() call.
bedrock_agent = boto3.client(
    'bedrock-agent-runtime',
    region_name=aws_region_name,
)
|
|
|
|
|
# Maps each sample category to its text file, ./samples/<category>.txt.
# The paths are relative, so they resolve against the current working
# directory of the process.
SAMPLE_MESSAGES = {
    category: f'./samples/{category}.txt'
    for category in ('logistics', 'spam', 'accounts', 'management')
}
|
|
|
|
|
def load_message(message_type):
    """Return the text of the sample message registered for *message_type*.

    Args:
        message_type: A key of ``SAMPLE_MESSAGES``.

    Returns:
        The entire contents of the mapped sample file as a string.

    Raises:
        AssertionError: If *message_type* is not a key of ``SAMPLE_MESSAGES``.
        OSError: If the mapped file cannot be opened or read.
    """
    if message_type not in SAMPLE_MESSAGES:
        # Raised explicitly instead of via `assert` so the check still runs
        # under `python -O`. The exception type and message are unchanged,
        # so existing callers observe the same failure.
        raise AssertionError(
            f'invalid message type: must be one of {SAMPLE_MESSAGES.keys()}'
        )

    # Pin the codec so decoding does not depend on the platform locale.
    # NOTE(review): assumes the sample files are UTF-8 -- confirm.
    with open(SAMPLE_MESSAGES[message_type], 'r', encoding='utf-8') as fp:
        return fp.read()
|
|
|
|
|
def invoke_agent(prompt, session_id=None):
    """Send *prompt* to the configured Bedrock agent.

    Args:
        prompt: Text passed to the agent as ``inputText``.
        session_id: Session to continue. Any falsy value (``None``, ``''``)
            starts a new session identified by a random UUID4 string.

    Returns:
        A dict with exactly two keys copied from the service response:
        ``'sessionId'`` and ``'completion'``. The ``'completion'`` value is
        passed through untouched.
    """
    active_session = session_id or str(uuid.uuid4())

    request = {
        'agentId': bedrock_agent_id,
        'agentAliasId': bedrock_agent_alias_id,
        'inputText': prompt,
        'sessionId': active_session,
    }
    reply = bedrock_agent.invoke_agent(**request)

    # Expose only the two fields callers rely on, in the same key order.
    return {field: reply[field] for field in ('sessionId', 'completion')}
|
|
|
|
|
def print_response(response):
    """Decode one completion event, print it, and return the decoded text.

    Args:
        response: A mapping that must contain a ``'chunk'`` entry whose
            ``'bytes'`` value is a bytes object.

    Returns:
        The chunk's bytes decoded as UTF-8.

    Raises:
        AssertionError: If *response* has no ``'chunk'`` key.
        UnicodeDecodeError: If the chunk bytes are not valid UTF-8.
    """
    if 'chunk' not in response:
        # Raised explicitly instead of via `assert` so the check still runs
        # under `python -O`, where a stripped assert would leave only a bare
        # KeyError on the lookup below. Type and message are unchanged.
        raise AssertionError('no chunk in response')

    # Same codec as the bare .decode() default, spelled out for clarity.
    output = response['chunk']['bytes'].decode('utf-8')

    print(f'## Response:\n{output}\n')

    return output
|
|
|
|
|
def process_inbound_email(message):
    """Run *message* through the agent, echoing each completion event.

    Args:
        message: Text forwarded to ``invoke_agent`` as the prompt.

    Returns:
        A ``(session_id, events)`` tuple, where ``events`` is the list of
        raw completion events in the order they were received.
    """
    agent_reply = invoke_agent(message)

    events = []
    for event in agent_reply['completion']:
        # Print before collecting, so an event that print_response rejects
        # stops processing at that point.
        # NOTE(review): the decoded text returned by print_response is
        # discarded and the raw event is collected instead -- confirm that
        # callers want raw events rather than the decoded strings.
        print_response(event)
        events.append(event)

    session = agent_reply['sessionId']
    return session, events