Skip to content

Instantly share code, notes, and snippets.

@kkrishnan90
Created September 19, 2025 22:42
Show Gist options
  • Select an option

  • Save kkrishnan90/0595e7a9bb2813c2b014b0c3d51e9d70 to your computer and use it in GitHub Desktop.

Select an option

Save kkrishnan90/0595e7a9bb2813c2b014b0c3d51e9d70 to your computer and use it in GitHub Desktop.
import asyncio
import os
import traceback
import uuid # Added for generating unique IDs
import sys # Added for stdout redirection
import io # Added for stdout redirection
import json # Added for parsing log strings
from quart import Quart, websocket, jsonify
from quart_cors import cors
from websockets.exceptions import ConnectionClosedOK
import google.genai as genai
from google.genai import types # Crucial for Content, Part, Blob
from dotenv import load_dotenv
from datetime import datetime, timezone # For timestamping raw stdout logs
from google.genai.types import Tool, GoogleSearch
from gemini_tools import (
travel_tool,
NameCorrectionAgent,
SpecialClaimAgent,
Enquiry_Tool,
Eticket_Sender_Agent,
ObservabilityAgent,
DateChangeAgent,
Connect_To_Human_Tool,
Booking_Cancellation_Agent,
Flight_Booking_Details_Agent,
Webcheckin_And_Boarding_Pass_Agent
)
from travel_mock_data import GLOBAL_LOG_STORE # Import the global log store
load_dotenv()
# --- Log Capturing Setup ---
# In-memory sink for everything printed to stdout after the tee below is
# installed; exposed to the frontend via the /api/logs endpoint.
CAPTURED_STDOUT_LOGS = []
# NOTE(review): "googel" is a typo, but the name is referenced later in this
# file (tools list of the live config), so it is kept as-is here.
googel_search_tool = Tool(google_search=GoogleSearch())
# Keep a handle on the real stdout so the tee can still reach the console.
_original_stdout = sys.stdout
class StdoutTee(io.TextIOBase):
    """Tee for ``sys.stdout``: echo every write to the real console stream
    while mirroring each non-blank line into a shared log list.

    Lines that parse as JSON objects tagged ``log_type == "TOOL_EVENT"``
    (emitted by gemini_tools) are stored verbatim; all other output is
    wrapped in a ``RAW_STDOUT`` record stamped with the current UTC time.
    """

    def __init__(self, original_stdout, log_list):
        self._original_stdout = original_stdout
        self._log_list = log_list

    def write(self, s):
        # Always pass the text through to the real console first.
        self._original_stdout.write(s)
        text = s.strip()
        if not text:
            # Blank writes (bare newlines etc.) are not worth logging.
            return len(s)
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            # Plain, non-JSON output: keep it as a raw string entry.
            self._log_list.append({
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "log_type": "RAW_STDOUT",
                "message": text
            })
        else:
            if isinstance(parsed, dict) and parsed.get("log_type") == "TOOL_EVENT":
                # Structured tool event with the shape the frontend expects:
                # store it untouched.
                self._log_list.append(parsed)
            else:
                # Valid JSON but not a TOOL_EVENT: store raw, attaching the
                # parsed object when it is a dict for easier inspection.
                self._log_list.append({
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "log_type": "RAW_STDOUT",
                    "message": text,
                    "parsed_json": parsed if isinstance(parsed, dict) else None
                })
        return len(s)

    def flush(self):
        self._original_stdout.flush()
# Install the tee: from here on every print() in this process is mirrored
# into CAPTURED_STDOUT_LOGS as well as written to the real console.
sys.stdout = StdoutTee(_original_stdout, CAPTURED_STDOUT_LOGS)
# --- End Log Capturing Setup ---
# --- Gemini client initialization ---
# Two auth modes, selected by GOOGLE_GENAI_USE_VERTEXAI: Vertex AI
# (project + location) or a plain Gemini API key.
try:
    use_vertex_ai = os.getenv(
        "GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true"
    if use_vertex_ai:
        project_id = os.getenv("GOOGLE_CLOUD_PROJECT_ID")
        location = os.getenv("GOOGLE_CLOUD_LOCATION")
        if not project_id or not location:
            raise ValueError(
                "GOOGLE_CLOUD_PROJECT_ID and LOCATION must be set in .env when using Vertex AI"
            )
        gemini_client = genai.Client(
            vertexai=True, project=project_id, location=location
        )
        print(
            f"✅ Gemini client initialized successfully using Vertex AI (Project: {project_id}, Location: {location})"
        )
    else:
        # NOTE(review): GEMINI_API_KEY is not validated here; a missing key
        # surfaces as a client/connect error rather than a clear message.
        gemini_client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        print("✅ Gemini client initialized successfully using API Key")
except Exception as e:
    print(f"❌ Failed to initialize Gemini client: {e}")
    raise  # fail fast: the app is useless without a working client

GEMINI_MODEL_NAME = os.getenv(
    "GEMINI_MODEL_NAME", "gemini-2.5-flash-live-preview")  # Load from environment
INPUT_SAMPLE_RATE = 16000  # Hz; PCM sample rate the browser client is expected to send
print(f"🤖 Using Gemini model: {GEMINI_MODEL_NAME}")

app = Quart(__name__)
# NOTE(review): wildcard CORS is convenient for development but should be
# restricted before production use.
app = cors(app, allow_origin="*")
@app.websocket("/listen")
async def websocket_endpoint():
    """Bridge a browser WebSocket to a Gemini Live API session.

    For the lifetime of the connection two tasks run concurrently:
    one forwards client text/audio to Gemini, the other streams Gemini
    audio, transcriptions, and tool-call results back to the client.
    Audio from Gemini is buffered until the client sends
    "CLIENT_AUDIO_READY" (or a 3-second timeout elapses).
    Both tasks share the ``active_processing`` flag for shutdown.
    """
    print("🌐 WebSocket: Connection accepted from client")
    current_session_handle = None  # Gemini session-resumption handle (updated from server messages)
    client_ready_for_audio = False  # Becomes True on CLIENT_AUDIO_READY or after the readiness timeout
    initial_audio_buffer = []  # Holds Gemini audio chunks until the client is ready
    connection_start_time = asyncio.get_event_loop().time()  # For the 3s readiness timeout
    # Force US-English transcription for every session.
    # (A previous comment here said "Hindi", which did not match the code.)
    language_code_to_use = "en-US"
    print(f"🗣️ Forcing US English transcription: en-US")
    # Check if VAD should be disabled to prevent audio feedback
    disable_vad = os.getenv("DISABLE_VAD", "false").lower() == "true"
    print(
        f"🎙️ Voice Activity Detection: {'DISABLED' if disable_vad else 'ENABLED'}")
    gemini_live_config = types.LiveConnectConfig(
        response_modalities=["AUDIO"],  # Matched to reference
        system_instruction="""***Role and Persona***
You are an AI Assistant that will **ONLY** speak in English and help answer generic queries unmistakeably using google_search_tool. If user asks for any explicit query
related to travel and bookings, use the **Explicit Tool Triggers** unmistakeably to call the appropriate tool.
***Tool Triggers***
**Explicit Tool Triggers:**
* If the user explicitly asks to **cancel**, call `Booking_Cancellation_Agent`.
* If the user explicitly asks for **web check-in**, call `Webcheckin_And_Boarding_Pass_Agent`.
* If the user explicitly asks for an **e-ticket**, call `Eticket_Sender_Agent`.
* If the user explicitly asks to **correct a name**, call `NameCorrectionAgent`.
* If the user explicitly mentions a **special claim**, call `SpecialClaimAgent`.
* If the user explicitly asks to **check a refund status**, call `ObservabilityAgent`.
* If the user explicitly asks to **change a date**, call `DateChangeAgent`.
* If the user is **frustrated**, call `Connect_To_Human_Tool`.
""",
        speech_config=types.SpeechConfig(
            language_code=language_code_to_use,
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name="Zephyr"
                )
            )
        ),
        # Empty dicts enable input/output transcription with defaults.
        input_audio_transcription={},
        output_audio_transcription={},
        session_resumption=types.SessionResumptionConfig(
            handle=current_session_handle),  # Added from reference
        context_window_compression=types.ContextWindowCompressionConfig(  # Added from reference
            sliding_window=types.SlidingWindow(),
        ),
        realtime_input_config=types.RealtimeInputConfig(
            automatic_activity_detection=types.AutomaticActivityDetection(
                disabled=disable_vad,
                start_of_speech_sensitivity=types.StartSensitivity.START_SENSITIVITY_LOW,
                end_of_speech_sensitivity=types.EndSensitivity.END_SENSITIVITY_LOW,
                prefix_padding_ms=50,  # Reduced from 200ms to minimize false triggers
                silence_duration_ms=2000,  # Increased from 800ms to 2000ms for better stability
            )
        ),
        # NOTE(review): camelCase kwargs (generationConfig/thinkingConfig/
        # includeThoughts) look inconsistent with the snake_case kwargs above;
        # verify the SDK actually accepts these spellings.
        generationConfig=types.GenerationConfig(
            thinkingConfig=types.GenerationConfigThinkingConfig(
                includeThoughts=False
            )
        ),
        tools=[travel_tool, googel_search_tool]  # Added travel_tool here
    )
    print(
        f"🧳 Travel tool configured with {len(travel_tool.function_declarations)} functions:")
    for func in travel_tool.function_declarations:
        print(f" - {func.name}")
    print(
        f"🤖 Attempting to connect to Gemini Live API (model: {GEMINI_MODEL_NAME})...")
    try:
        async with gemini_client.aio.live.connect(
            model=GEMINI_MODEL_NAME,
            config=gemini_live_config
        ) as session:
            print("✅ Successfully connected to Gemini Live API")
            # Shared flag: either task setting this to False stops both.
            active_processing = True

            async def handle_client_input_and_forward():
                """Pump client WebSocket messages into the Gemini session.

                Text messages become client-content turns (with special
                handling for the CLIENT_AUDIO_READY and
                SEND_TEST_AUDIO_PLEASE control strings); binary messages
                are forwarded as realtime PCM audio.
                """
                nonlocal active_processing, client_ready_for_audio, initial_audio_buffer
                try:
                    while active_processing:
                        try:
                            # Short timeout so the loop can notice shutdown
                            # even when the client is silent.
                            client_data = await asyncio.wait_for(websocket.receive(), timeout=0.2)
                            if isinstance(client_data, str):
                                message_text = client_data
                                # Handle client readiness signal
                                if message_text == "CLIENT_AUDIO_READY":
                                    client_ready_for_audio = True
                                    print(
                                        "🔊 Client audio ready - flushing buffered audio")
                                    # Flush any buffered audio chunks
                                    for buffered_chunk in initial_audio_buffer:
                                        try:
                                            await websocket.send(buffered_chunk)
                                        except Exception as send_exc:
                                            print(
                                                f"Error sending buffered audio: {send_exc}")
                                    initial_audio_buffer.clear()
                                    continue
                                prompt_for_gemini = message_text
                                if message_text == "SEND_TEST_AUDIO_PLEASE":
                                    prompt_for_gemini = "Hello Gemini, please say 'testing one two three'."
                                user_content_for_text = types.Content(
                                    role="user",
                                    parts=[types.Part(
                                        text=prompt_for_gemini)]
                                )
                                await session.send_client_content(turns=user_content_for_text)
                            elif isinstance(client_data, bytes):
                                audio_chunk = client_data
                                if audio_chunk:
                                    # Forward microphone PCM straight to Gemini.
                                    await session.send_realtime_input(
                                        media=types.Blob(
                                            mime_type=f"audio/pcm;rate={INPUT_SAMPLE_RATE}",
                                            data=audio_chunk
                                        )
                                    )
                            else:
                                print(
                                    f"Quart Backend: Received unexpected data type from client: {type(client_data)}, content: {client_data[:100] if isinstance(client_data, bytes) else client_data}")
                        except asyncio.TimeoutError:
                            if not active_processing:
                                break
                            continue  # Normal timeout, continue listening
                        except ConnectionClosedOK:
                            print("INFO: Client closed the connection.")
                            active_processing = False
                            break
                except Exception as e_fwd_outer:
                    print(
                        f"Quart Backend: Outer error in handle_client_input_and_forward: {type(e_fwd_outer).__name__}: {e_fwd_outer}")
                    traceback.print_exc()
                    active_processing = False  # Ensure outer errors also stop processing
                finally:
                    active_processing = False  # Ensure graceful shutdown of the other task

            async def receive_from_gemini_and_forward_to_client():
                """Pump Gemini session events back to the client WebSocket.

                Handles audio data (with the readiness buffer), user/model
                transcription accumulation, interruption and completion
                signals, tool-call dispatch to local functions, and error
                passthrough.
                """
                nonlocal active_processing, current_session_handle, client_ready_for_audio, initial_audio_buffer, connection_start_time
                # Dispatch table for Gemini tool calls -> local async functions.
                available_functions = {
                    "NameCorrectionAgent": NameCorrectionAgent,
                    "SpecialClaimAgent": SpecialClaimAgent,
                    "Enquiry_Tool": Enquiry_Tool,
                    "Eticket_Sender_Agent": Eticket_Sender_Agent,
                    "ObservabilityAgent": ObservabilityAgent,
                    "DateChangeAgent": DateChangeAgent,
                    "Connect_To_Human_Tool": Connect_To_Human_Tool,
                    "Booking_Cancellation_Agent": Booking_Cancellation_Agent,
                    "Flight_Booking_Details_Agent": Flight_Booking_Details_Agent,
                    "Webcheckin_And_Boarding_Pass_Agent": Webcheckin_And_Boarding_Pass_Agent
                }
                # Per-utterance state: an id plus an accumulator for the
                # partial transcription text, for user and model separately.
                current_user_utterance_id = None
                accumulated_user_speech_text = ""
                current_model_utterance_id = None
                accumulated_model_speech_text = ""
                try:
                    while active_processing:
                        had_gemini_activity_in_this_iteration = False
                        async for response in session.receive():
                            had_gemini_activity_in_this_iteration = True
                            if not active_processing:
                                break
                            # --- Session resumption bookkeeping ---
                            if response.session_resumption_update:
                                update = response.session_resumption_update
                                if update.resumable and update.new_handle:
                                    current_session_handle = update.new_handle
                            if hasattr(response, 'session_handle') and response.session_handle:
                                new_handle = response.session_handle
                                if new_handle != current_session_handle:
                                    current_session_handle = new_handle
                            # --- Audio data from Gemini ---
                            if response.data is not None:
                                try:
                                    current_time = asyncio.get_event_loop().time()
                                    time_since_connection = current_time - connection_start_time
                                    # Auto-flush buffer after 3 seconds if client hasn't signaled readiness
                                    if not client_ready_for_audio and time_since_connection > 3.0:
                                        print(
                                            "⏰ Client readiness timeout - auto-flushing buffer and marking ready")
                                        client_ready_for_audio = True
                                        # Flush buffered audio
                                        for buffered_chunk in initial_audio_buffer:
                                            try:
                                                await websocket.send(buffered_chunk)
                                            except Exception as send_exc:
                                                print(
                                                    f"Error sending timeout-flushed audio: {send_exc}")
                                        initial_audio_buffer.clear()
                                    if client_ready_for_audio:
                                        # Client is ready, send audio directly
                                        await websocket.send(response.data)
                                        print(
                                            f"🔊 Sent audio chunk ({len(response.data)} bytes) to ready client")
                                    else:
                                        # Client not ready, buffer the audio chunk
                                        initial_audio_buffer.append(
                                            response.data)
                                        print(
                                            f"📦 Buffered audio chunk ({len(response.data)} bytes) - client not ready (t+{time_since_connection:.1f}s)")
                                        # Limit buffer size to prevent memory issues (keep last 10 seconds worth)
                                        # Roughly 10 seconds at ~20 chunks/sec
                                        if len(initial_audio_buffer) > 200:
                                            initial_audio_buffer.pop(0)
                                            print(
                                                "🗑️ Removed oldest buffered chunk to prevent memory overflow")
                                except Exception as send_exc:
                                    print(
                                        f"Quart Backend: Error sending audio data to client WebSocket: {type(send_exc).__name__}: {send_exc}")
                                    active_processing = False
                                    break
                            elif response.server_content:
                                # --- Barge-in: model was interrupted by the user ---
                                if response.server_content.interrupted:
                                    print(
                                        "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                                    print(
                                        "Quart Backend: Gemini server sent INTERRUPTED signal.")
                                    print(
                                        "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
                                    try:
                                        await websocket.send_json({"type": "interrupt_playback"})
                                    except Exception as send_exc:
                                        print(
                                            f"Quart Backend: Error sending interrupt_playback signal to client: {type(send_exc).__name__}: {send_exc}")
                                        active_processing = False
                                        break
                                # User Input Processing: accumulate partial
                                # transcription and push updates to the client.
                                if response.server_content and hasattr(response.server_content, 'input_transcription') and \
                                        response.server_content.input_transcription and \
                                        hasattr(response.server_content.input_transcription, 'text') and \
                                        response.server_content.input_transcription.text:  # Ensure text is not empty
                                    user_speech_chunk = response.server_content.input_transcription.text
                                    if current_user_utterance_id is None:  # Start of a new user utterance
                                        current_user_utterance_id = str(
                                            uuid.uuid4())
                                        accumulated_user_speech_text = ""  # Reset accumulator for new utterance
                                    accumulated_user_speech_text += user_speech_chunk
                                    if accumulated_user_speech_text:  # Only send if there's actual accumulated text
                                        payload = {
                                            'id': current_user_utterance_id,
                                            'text': accumulated_user_speech_text,  # Send accumulated text
                                            'sender': 'user',
                                            'type': 'user_transcription_update',
                                            'is_final': False
                                        }
                                        try:
                                            await websocket.send_json(payload)
                                        except Exception as send_exc:
                                            print(
                                                f"Quart Backend: Error sending user transcription update to client: {type(send_exc).__name__}: {send_exc}")
                                            active_processing = False
                                            break
                                # Model Output Processing: same accumulation
                                # scheme for the model's spoken text.
                                if response.server_content and hasattr(response.server_content, 'output_transcription') and \
                                        response.server_content.output_transcription and \
                                        hasattr(response.server_content.output_transcription, 'text') and \
                                        response.server_content.output_transcription.text:
                                    if current_model_utterance_id is None:
                                        current_model_utterance_id = str(
                                            uuid.uuid4())
                                        accumulated_model_speech_text = ""  # Ensure accumulator is clear
                                    chunk = response.server_content.output_transcription.text
                                    if chunk:  # Only process if chunk has content
                                        accumulated_model_speech_text += chunk
                                        payload = {
                                            'id': current_model_utterance_id,
                                            'text': accumulated_model_speech_text,  # Send accumulated text
                                            'sender': 'model',
                                            'type': 'model_response_update',
                                            'is_final': False
                                        }
                                        try:
                                            await websocket.send_json(payload)
                                        except Exception as send_exc:
                                            print(
                                                f"Quart Backend: Error sending model response update to client: {type(send_exc).__name__}: {send_exc}")
                                            active_processing = False
                                            break
                                # Handling Model Generation Completion: finalize
                                # the model utterance and reset its state.
                                if response.server_content and hasattr(response.server_content, 'generation_complete') and \
                                        response.server_content.generation_complete == True:
                                    if current_model_utterance_id and accumulated_model_speech_text:  # Ensure there was a model utterance
                                        payload = {
                                            'id': current_model_utterance_id,
                                            'text': accumulated_model_speech_text,
                                            'sender': 'model',
                                            'type': 'model_response_update',
                                            'is_final': True
                                        }
                                        try:
                                            await websocket.send_json(payload)
                                        except Exception as send_exc:
                                            print(
                                                f"Quart Backend: Error sending final model response to client: {type(send_exc).__name__}: {send_exc}")
                                            active_processing = False
                                            break
                                    current_model_utterance_id = None  # Reset for next model utterance
                                    accumulated_model_speech_text = ""
                                # Handling Turn Completion (Finalizing User Speech)
                                if response.server_content and hasattr(response.server_content, 'turn_complete') and \
                                        response.server_content.turn_complete == True:
                                    if current_user_utterance_id and accumulated_user_speech_text:  # Ensure there was a user utterance
                                        payload = {
                                            'id': current_user_utterance_id,
                                            'text': accumulated_user_speech_text,  # Send final accumulated text
                                            'sender': 'user',
                                            'type': 'user_transcription_update',
                                            'is_final': True
                                        }
                                        try:
                                            await websocket.send_json(payload)
                                            print(
                                                f"🎤 User said: {accumulated_user_speech_text}")
                                        except Exception as send_exc:
                                            print(
                                                f"Quart Backend: Error sending final user transcription to client: {type(send_exc).__name__}: {send_exc}")
                                            active_processing = False
                                            break
                                    current_user_utterance_id = None  # Reset for next user utterance
                                    accumulated_user_speech_text = ""  # Reset accumulator
                                    # Also reset model states
                                    current_model_utterance_id = None
                                    accumulated_model_speech_text = ""
                                # Fallback for other potential text or error structures (simplified)
                                is_transcription_related = (hasattr(response.server_content, 'input_transcription') and response.server_content.input_transcription) or \
                                    (hasattr(response.server_content, 'output_transcription')
                                     and response.server_content.output_transcription)
                                is_control_signal = (hasattr(response.server_content, 'generation_complete') and response.server_content.generation_complete) or \
                                    (hasattr(response.server_content, 'turn_complete') and response.server_content.turn_complete) or\
                                    (hasattr(
                                        response.server_content, 'interrupted') and response.server_content.interrupted)
                                if not response.data and not is_transcription_related and not is_control_signal:
                                    unhandled_text = None
                                    if response.text:
                                        unhandled_text = response.text
                                    elif hasattr(response.server_content, 'model_turn') and response.server_content.model_turn and \
                                            hasattr(response.server_content.model_turn, 'parts'):
                                        for part in response.server_content.model_turn.parts:
                                            if part.text:
                                                unhandled_text = (
                                                    unhandled_text + " " if unhandled_text else "") + part.text
                                    elif hasattr(response.server_content, 'output_text') and response.server_content.output_text:
                                        unhandled_text = response.server_content.output_text
                                    if unhandled_text:
                                        print(
                                            f"Quart Backend: Received unhandled server_content text: {unhandled_text}")
                                    elif not response.tool_call:
                                        print(
                                            f"Quart Backend: Received server_content without primary data or known text parts: {response.server_content}")
                            elif response.tool_call:
                                # --- Tool-call dispatch: run each requested
                                # local function and return the results. ---
                                print(
                                    f"\033[92mQuart Backend: Received tool_call from Gemini: {response.tool_call}\033[0m")
                                function_responses = []
                                for fc in response.tool_call.function_calls:
                                    print(
                                        f"\033[92mQuart Backend: Gemini requests function call: {fc.name} with args: {dict(fc.args)}\033[0m")
                                    function_to_call = available_functions.get(
                                        fc.name)
                                    function_response_content = None
                                    if function_to_call:
                                        try:
                                            # Execute the actual local function
                                            function_args = dict(fc.args)
                                            print(
                                                f"\033[92mQuart Backend: Calling function {fc.name} with args: {function_args}\033[0m")
                                            # Await the async function call
                                            result = await function_to_call(**function_args)
                                            if isinstance(result, str):
                                                function_response_content = {
                                                    "content": result}
                                            else:
                                                # Assumes result is already a dict if not a string
                                                function_response_content = result
                                            print(
                                                f"\033[92mQuart Backend: Function {fc.name} executed. Result: {result}\033[0m")
                                        except Exception as e:
                                            print(
                                                f"Quart Backend: Error executing function {fc.name}: {e}")
                                            traceback.print_exc()
                                            function_response_content = {
                                                "status": "error", "message": str(e)}
                                    else:
                                        print(
                                            f"Quart Backend: Function {fc.name} not found.")
                                        function_response_content = {
                                            "status": "error", "message": f"Function {fc.name} not implemented or available."}
                                    function_response = types.FunctionResponse(
                                        id=fc.id,
                                        name=fc.name,
                                        response=function_response_content
                                    )
                                    function_responses.append(
                                        function_response)
                                if function_responses:
                                    print(
                                        f"\033[92mQuart Backend: Sending {len(function_responses)} function response(s) to Gemini.\033[0m")
                                    await session.send_tool_response(function_responses=function_responses)
                                else:
                                    print(
                                        "Quart Backend: No function responses generated for tool_call.")
                            elif hasattr(response, 'error') and response.error:
                                # --- Gemini-side error: relay to client, stop. ---
                                error_details = response.error
                                if hasattr(response.error, 'message'):
                                    error_details = response.error.message
                                print(
                                    f"Quart Backend: Gemini Error in response: {error_details}")
                                try:
                                    await websocket.send(f"[ERROR_FROM_GEMINI]: {str(error_details)}")
                                except Exception as send_exc:
                                    print(
                                        f"Quart Backend: Error sending Gemini error to client WebSocket: {type(send_exc).__name__}: {send_exc}")
                                active_processing = False
                                break
                        if not active_processing:
                            break
                        # Idle backoff when session.receive() yielded nothing.
                        if not had_gemini_activity_in_this_iteration and active_processing:
                            await asyncio.sleep(0.1)
                        elif had_gemini_activity_in_this_iteration and active_processing:
                            pass
                except ConnectionClosedOK:
                    print("INFO: Connection to client closed.")
                    active_processing = False
                finally:
                    active_processing = False  # Ensure graceful shutdown of the other task

            # Run both pumps concurrently until one of them stops.
            forward_task = asyncio.create_task(
                handle_client_input_and_forward(), name="ClientInputForwarder")
            receive_task = asyncio.create_task(
                receive_from_gemini_and_forward_to_client(), name="GeminiReceiver")
            try:
                await asyncio.gather(forward_task, receive_task)
            except Exception as e_gather:
                print(
                    f"Quart Backend: Exception during asyncio.gather: {type(e_gather).__name__}: {e_gather}")
                traceback.print_exc()
            finally:
                # Cooperative teardown: cancel whichever task is still
                # running, then await both so cancellation completes.
                active_processing = False
                if not forward_task.done():
                    forward_task.cancel()
                if not receive_task.done():
                    receive_task.cancel()
                try:
                    await forward_task
                except asyncio.CancelledError:
                    pass  # Task cancellation is an expected part of shutdown
                except Exception as e_fwd_cleanup:
                    print(
                        f"Quart Backend: Error during forward_task cleanup: {e_fwd_cleanup}")
                    traceback.print_exc()
                try:
                    await receive_task
                except asyncio.CancelledError:
                    pass  # Task cancellation is an expected part of shutdown
                except Exception as e_rcv_cleanup:
                    print(
                        f"Quart Backend: Error during receive_task cleanup: {e_rcv_cleanup}")
                    traceback.print_exc()
    except asyncio.CancelledError:
        print("⚠️ WebSocket connection cancelled (client disconnected)")
        pass  # Expected on client disconnect
    except TimeoutError as e_timeout:
        print(f"⏰ Timeout connecting to Gemini Live API: {e_timeout}")
        print("🔍 This could be due to:")
        print("   - Network connectivity issues")
        print("   - API key problems")
        print("   - Google service unavailability")
        print("   - Firewall blocking WebSocket connections")
        traceback.print_exc()
    except Exception as e_ws_main:
        print(
            f"❌ UNHANDLED error in WebSocket connection: {type(e_ws_main).__name__}: {e_ws_main}")
        traceback.print_exc()
    finally:
        print("🔚 WebSocket endpoint processing finished")
@app.route("/api/logs", methods=["GET"])
async def get_logs():
    """Serve every captured log entry as one JSON array.

    Concatenates the tool-side GLOBAL_LOG_STORE with the stdout entries
    captured by the StdoutTee. A snapshot copy is taken so concurrent
    appends during serialization cannot corrupt the response.
    """
    snapshot = list(GLOBAL_LOG_STORE)
    snapshot.extend(CAPTURED_STDOUT_LOGS)
    # Entries are returned in capture order (tool store first). Sort by the
    # "timestamp" field here if a strict chronological view is ever needed,
    # e.g. snapshot.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
    return jsonify(snapshot)
# To run this Quart application:
# 1. Install dependencies: pip install quart quart-cors google-generativeai python-dotenv hypercorn
# 2. Set your GEMINI_API_KEY environment variable in a .env file or your system environment.
# 3. Run using Hypercorn:
# hypercorn main:app --bind 0.0.0.0:8000
# Or, for development with auto-reload:
# quart run --host 0.0.0.0 --port 8000 --reload
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment