@jamsea
Created August 26, 2025 06:22
Pipecat Cloud Twilio Bot with Enhanced Sentry Integration - Fixes for log consistency and heartbeat warning issues
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sentry_sdk
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.metrics.sentry import SentryMetrics
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)
from sentry_sdk.integrations.loguru import LoggingLevels, LoguruIntegration
load_dotenv(override=True)
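
# Environment variables read by this bot (typically supplied via a local .env file
# loaded by load_dotenv above, or by the deployment environment):
#   SENTRY_DSN, ENV, DEEPGRAM_API_KEY, OPENAI_API_KEY, CARTESIA_API_KEY,
#   TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN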

async def periodic_log_flush():
    """Periodically flush Sentry logs to prevent loss during system stress."""
    while True:
        try:
            await asyncio.sleep(10)  # Flush every 10 seconds
            if SENTRY_DSN:
                sentry_sdk.flush(timeout=1)
        except Exception as e:
            logger.debug(f"Periodic flush error (non-critical): {e}")
logger.info("Starting bot initialization process")
logger.debug("Environment variables loading...")
SENTRY_DSN = os.getenv("SENTRY_DSN")
logger.info(f"SENTRY_DSN found: {'Yes' if SENTRY_DSN else 'No'}")
if SENTRY_DSN:
    logger.info("Initializing Sentry SDK...")
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0,
        default_integrations=True,
        environment=os.getenv("ENV"),
        integrations=[
            LoguruIntegration(
                level=LoggingLevels.INFO.value,  # Capture INFO and above as breadcrumbs
                event_level=LoggingLevels.ERROR.value,  # Send ERROR logs as events
            )
        ],
        # Improve reliability in cloud environments
        send_default_pii=False,
        attach_stacktrace=True,
    )
    logger.info(f"Sentry initialized - ENV: {os.getenv('ENV')}")
    logger.debug("Sentry configuration completed successfully")

    # Test logs to verify Sentry integration
    logger.warning("This is a test warning log to verify Sentry integration")
    logger.error("This is a test error log to verify Sentry integration")

    # Initial flush to ensure test logs are sent
    sentry_sdk.flush(timeout=3)
else:
    logger.error("No SENTRY_DSN found, Sentry not initialized")
    logger.warning("Sentry integration will be disabled")

async def run_bot(transport: BaseTransport):
    """Run your bot with the provided transport.

    Args:
        transport (BaseTransport): The transport to use for communication.
    """
    logger.info("Starting run_bot function")
    logger.debug(f"Transport type: {type(transport).__name__}")

    # Configure your STT, LLM, and TTS services here
    # Swap out different processors or properties to customize your bot
    logger.info("Initializing STT service...")
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY", ""),
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("STT service initialized successfully")

    logger.info("Initializing LLM service...")
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY", ""),
        model="gpt-4o",
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("LLM service initialized successfully")

    logger.info("Initializing TTS service...")
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY", ""),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("TTS service initialized successfully")
    # Set up the initial context for the conversation
    # You can specify initial system and assistant messages here
    logger.info("Setting up conversation context...")
    messages = [
        {
            "role": "system",
            "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
        },
    ]
    logger.debug(f"Initial messages configured: {len(messages)} messages")

    # Define and register tools as required
    logger.info("Configuring tools...")
    logger.debug("Tools configuration completed")

    # This sets up the LLM context by providing messages and tools
    logger.info("Creating LLM context aggregator...")
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)
    logger.debug("LLM context aggregator created successfully")

    # A core voice AI pipeline
    # Add additional processors to customize the bot's behavior
    logger.info("Building pipeline...")
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )
    logger.debug("Pipeline built successfully")

    logger.info("Creating pipeline task...")
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )
    logger.debug("Pipeline task created successfully")
    # Add error handler for pipeline events, especially heartbeat warnings
    async def on_pipeline_error(task, error):
        logger.error(f"Pipeline error occurred: {error}")
        logger.exception("Pipeline error details")

        # Tag heartbeat-related errors for better tracking
        if "heartbeat" in str(error).lower():
            logger.warning("Heartbeat-related error detected - forcing log flush")
            if SENTRY_DSN:
                with sentry_sdk.configure_scope() as scope:
                    scope.set_tag("error_type", "heartbeat")
                    scope.set_context("error_details", {"error_message": str(error)})
                sentry_sdk.capture_exception(error)
                sentry_sdk.flush(timeout=3)  # Longer timeout for critical errors

    # Register the error handler
    task.add_event_handler("on_error", on_pipeline_error)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
logger.debug(f"Client connection details: {type(client)}")
# Force log flush to ensure delivery in cloud environment
if SENTRY_DSN:
sentry_sdk.flush(timeout=2)
# Kick off the conversation
logger.info("Starting conversation with context frame...")
await task.queue_frames([context_aggregator.user().get_context_frame()])
logger.debug("Context frame queued successfully")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected: {client}")
logger.debug(f"Client disconnection details: {type(client)}")
# Force log flush before cleanup
if SENTRY_DSN:
sentry_sdk.flush(timeout=2)
logger.info("Cancelling pipeline task...")
await task.cancel()
logger.debug("Pipeline task cancelled successfully")
# Final flush after cleanup
if SENTRY_DSN:
sentry_sdk.flush(timeout=2)
logger.info("Setting up pipeline runner...")
runner = PipelineRunner(handle_sigint=False, force_gc=True)
logger.debug("Pipeline runner configured")
# Start periodic log flushing to prevent loss during heartbeat warnings
flush_task = None
if SENTRY_DSN:
flush_task = asyncio.create_task(periodic_log_flush())
logger.debug("Started periodic log flush task")
logger.info("Starting pipeline execution...")
# Ensure all logs are sent before pipeline starts
if SENTRY_DSN:
sentry_sdk.flush(timeout=2)
try:
await runner.run(task)
logger.info("Pipeline execution completed")
finally:
# Clean up the flush task
if flush_task:
flush_task.cancel()
try:
await flush_task
except asyncio.CancelledError:
pass
logger.debug("Stopped periodic log flush task")
# Final flush after pipeline completion
if SENTRY_DSN:
sentry_sdk.flush(timeout=5)

async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    logger.info("Starting main bot function")
    logger.debug(f"Runner arguments: {runner_args}")

    transport = None

    logger.info("Parsing telephony websocket...")
    transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
    logger.info(f"Auto-detected transport: {transport_type}")
    logger.debug(f"Call data: {call_data}")

    # Create transport based on detected type
    if transport_type == "twilio":
        logger.info("Initializing Twilio transport...")
        from pipecat.serializers.twilio import TwilioFrameSerializer

        serializer = TwilioFrameSerializer(
            stream_sid=call_data["stream_id"],
            call_sid=call_data["call_id"],
            account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
            auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
        )
        logger.debug("Twilio serializer created successfully")
    else:
        logger.error(f"Unsupported telephony provider: {transport_type}")
        logger.warning(f"Transport type {transport_type} is not supported")
        return
    # Create the transport
    logger.info("Creating FastAPI websocket transport...")
    transport = FastAPIWebsocketTransport(
        websocket=runner_args.websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_in_filter=None,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
        ),
    )
    logger.debug("FastAPI websocket transport created successfully")
    if transport is None:
        logger.error("Failed to create transport")
        logger.critical("Transport creation failed - cannot proceed")
        return

    logger.info("Transport created successfully, starting bot...")
    try:
        # Add comprehensive error context for Sentry
        if SENTRY_DSN:
            with sentry_sdk.configure_scope() as scope:
                scope.set_tag("bot_phase", "execution")
                scope.set_context("transport", {"type": type(transport).__name__})
                scope.set_context("environment", {"platform": "pipecat_cloud"})

        await run_bot(transport)
        logger.info("Bot process completed successfully")
        logger.debug("Bot execution finished without errors")

        # Ensure final logs are sent
        if SENTRY_DSN:
            sentry_sdk.flush(timeout=5)
    except Exception as e:
        logger.exception(f"Error in bot process: {str(e)}")
        logger.error(f"Bot execution failed with exception: {type(e).__name__}")
        logger.critical("Bot process terminated due to error")

        # Ensure error logs are sent before re-raising
        if SENTRY_DSN:
            sentry_sdk.capture_exception(e)
            sentry_sdk.flush(timeout=5)
        raise

if __name__ == "__main__":
    logger.info("Starting bot as main module")
    logger.debug("Importing main function...")
    from pipecat.runner.run import main

    logger.info("Calling main function...")
    main()
    logger.info("Main function completed")