Skip to content

Instantly share code, notes, and snippets.

@raghavddps2
Created October 19, 2024 07:47
Show Gist options
  • Select an option

  • Save raghavddps2/b842560c0cd7eb9cba0f80c21d7c3d3a to your computer and use it in GitHub Desktop.

Select an option

Save raghavddps2/b842560c0cd7eb9cba0f80c21d7c3d3a to your computer and use it in GitHub Desktop.
import os
import json
import base64
import asyncio
import websockets
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
from fastapi.websockets import WebSocketDisconnect
from twilio.twiml.voice_response import VoiceResponse, Connect, Say, Stream
import boto3
from dotenv import load_dotenv
import json
import base64
import asyncio
import boto3
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
load_dotenv()
# Configuration
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
PORT = int(os.getenv('PORT', 5050))
SYSTEM_MESSAGE = (
"You are a helpful and bubbly AI assistant who loves to chat about "
"anything the user is interested in and is prepared to offer them facts. "
"You have a penchant for dad jokes, owl jokes, and rickrolling – subtly. "
"Always stay positive, but work in a joke when appropriate."
)
VOICE = 'alloy'
app = FastAPI()
if not AWS_ACCESS_KEY or not AWS_SECRET_KEY:
raise ValueError('Missing the OpenAI API key. Please set it in the .env file.')
@app.get("/")
async def index_page():
return {"message": "Twilio Media Stream Server is running!"}
@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
response = VoiceResponse()
response.say("Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API")
response.pause(length=1)
response.say("O.K. you can start talking!")
host = request.url.hostname
connect = Connect()
connect.stream(url=f'wss://{host}/media-stream')
response.append(connect)
return HTMLResponse(content=str(response), media_type="application/xml")
# Initialize the Bedrock client
session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_KEY,
region_name='us-west-2'
)
bedrock_client = session.client('bedrock')
async def invoke_model_with_response_stream(model_id, input_data):
"""Invoke a model in AWS Bedrock with streaming response."""
response = bedrock_client.invoke_model_with_response_stream(
modelId=model_id,
body=json.dumps(input_data),
contentType='application/json'
)
return response['body'] # This will be a stream
@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
"""Handle WebSocket connections between Twilio and AWS Bedrock."""
print("Client connected")
await websocket.accept()
async def receive_from_twilio():
"""Receive audio data from Twilio and send it to Bedrock."""
try:
async for message in websocket.iter_text():
data = json.loads(message)
if data['event'] == 'media':
audio_payload = data['media']['payload']
input_data = {
"prompt": audio_payload, # Adjust as per your model's requirements
"max_tokens": 50 # Adjust as needed
}
response_stream = await invoke_model_with_response_stream('', input_data)
async for response in response_stream.iter_lines():
if response:
await handle_bedrock_response(response, websocket)
elif data['event'] == 'start':
print(f"Incoming stream has started: {data['start']['streamSid']}")
except WebSocketDisconnect:
print("Client disconnected.")
async def handle_bedrock_response(response, websocket):
"""Process the response from Bedrock and send audio back to Twilio."""
try:
response_data = json.loads(response)
if response_data.get('type') == 'response.audio.delta' and response_data.get('delta'):
audio_payload = base64.b64encode(base64.b64decode(response_data['delta'])).decode('utf-8')
audio_delta = {
"event": "media",
"streamSid": response_data.get('streamSid'), # Assuming this is part of the response
"media": {
"payload": audio_payload
}
}
await websocket.send_json(audio_delta)
except Exception as e:
print(f"Error processing audio data: {e}")
await receive_from_twilio()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=PORT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment