Created
October 19, 2024 07:47
-
-
Save raghavddps2/b842560c0cd7eb9cba0f80c21d7c3d3a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import json | |
| import base64 | |
| import asyncio | |
| import websockets | |
| from fastapi import FastAPI, WebSocket, Request | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.websockets import WebSocketDisconnect | |
| from twilio.twiml.voice_response import VoiceResponse, Connect, Say, Stream | |
| import boto3 | |
| from dotenv import load_dotenv | |
| import json | |
| import base64 | |
| import asyncio | |
| import boto3 | |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect | |
| load_dotenv() | |
| # Configuration | |
| AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY') | |
| AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY') | |
| PORT = int(os.getenv('PORT', 5050)) | |
| SYSTEM_MESSAGE = ( | |
| "You are a helpful and bubbly AI assistant who loves to chat about " | |
| "anything the user is interested in and is prepared to offer them facts. " | |
| "You have a penchant for dad jokes, owl jokes, and rickrolling – subtly. " | |
| "Always stay positive, but work in a joke when appropriate." | |
| ) | |
| VOICE = 'alloy' | |
| app = FastAPI() | |
| if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: | |
| raise ValueError('Missing the OpenAI API key. Please set it in the .env file.') | |
| @app.get("/") | |
| async def index_page(): | |
| return {"message": "Twilio Media Stream Server is running!"} | |
| @app.api_route("/incoming-call", methods=["GET", "POST"]) | |
| async def handle_incoming_call(request: Request): | |
| response = VoiceResponse() | |
| response.say("Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API") | |
| response.pause(length=1) | |
| response.say("O.K. you can start talking!") | |
| host = request.url.hostname | |
| connect = Connect() | |
| connect.stream(url=f'wss://{host}/media-stream') | |
| response.append(connect) | |
| return HTMLResponse(content=str(response), media_type="application/xml") | |
| # Initialize the Bedrock client | |
| session = boto3.Session( | |
| aws_access_key_id=AWS_ACCESS_KEY, | |
| aws_secret_access_key=AWS_SECRET_KEY, | |
| region_name='us-west-2' | |
| ) | |
| bedrock_client = session.client('bedrock') | |
| async def invoke_model_with_response_stream(model_id, input_data): | |
| """Invoke a model in AWS Bedrock with streaming response.""" | |
| response = bedrock_client.invoke_model_with_response_stream( | |
| modelId=model_id, | |
| body=json.dumps(input_data), | |
| contentType='application/json' | |
| ) | |
| return response['body'] # This will be a stream | |
| @app.websocket("/media-stream") | |
| async def handle_media_stream(websocket: WebSocket): | |
| """Handle WebSocket connections between Twilio and AWS Bedrock.""" | |
| print("Client connected") | |
| await websocket.accept() | |
| async def receive_from_twilio(): | |
| """Receive audio data from Twilio and send it to Bedrock.""" | |
| try: | |
| async for message in websocket.iter_text(): | |
| data = json.loads(message) | |
| if data['event'] == 'media': | |
| audio_payload = data['media']['payload'] | |
| input_data = { | |
| "prompt": audio_payload, # Adjust as per your model's requirements | |
| "max_tokens": 50 # Adjust as needed | |
| } | |
| response_stream = await invoke_model_with_response_stream('', input_data) | |
| async for response in response_stream.iter_lines(): | |
| if response: | |
| await handle_bedrock_response(response, websocket) | |
| elif data['event'] == 'start': | |
| print(f"Incoming stream has started: {data['start']['streamSid']}") | |
| except WebSocketDisconnect: | |
| print("Client disconnected.") | |
| async def handle_bedrock_response(response, websocket): | |
| """Process the response from Bedrock and send audio back to Twilio.""" | |
| try: | |
| response_data = json.loads(response) | |
| if response_data.get('type') == 'response.audio.delta' and response_data.get('delta'): | |
| audio_payload = base64.b64encode(base64.b64decode(response_data['delta'])).decode('utf-8') | |
| audio_delta = { | |
| "event": "media", | |
| "streamSid": response_data.get('streamSid'), # Assuming this is part of the response | |
| "media": { | |
| "payload": audio_payload | |
| } | |
| } | |
| await websocket.send_json(audio_delta) | |
| except Exception as e: | |
| print(f"Error processing audio data: {e}") | |
| await receive_from_twilio() | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=PORT) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment