Created
August 26, 2025 06:22
-
-
Save jamsea/f040a3b08247ffc071aa5e96cbee7eed to your computer and use it in GitHub Desktop.
Pipecat Cloud Twilio Bot with Enhanced Sentry Integration - Fixes for log consistency and heartbeat warning issues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # | |
| # Copyright (c) 2025, Daily | |
| # | |
| # SPDX-License-Identifier: BSD 2-Clause License | |
| # | |
| import asyncio | |
| import os | |
| import sentry_sdk | |
| from dotenv import load_dotenv | |
| from loguru import logger | |
| from pipecat.audio.vad.silero import SileroVADAnalyzer | |
| from pipecat.pipeline.pipeline import Pipeline | |
| from pipecat.pipeline.runner import PipelineRunner | |
| from pipecat.pipeline.task import PipelineParams, PipelineTask | |
| from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext | |
| from pipecat.processors.metrics.sentry import SentryMetrics | |
| from pipecat.runner.types import RunnerArguments | |
| from pipecat.runner.utils import parse_telephony_websocket | |
| from pipecat.services.cartesia.tts import CartesiaTTSService | |
| from pipecat.services.deepgram.stt import DeepgramSTTService | |
| from pipecat.services.openai.llm import OpenAILLMService | |
| from pipecat.transports.base_transport import BaseTransport | |
| from pipecat.transports.network.fastapi_websocket import ( | |
| FastAPIWebsocketParams, | |
| FastAPIWebsocketTransport, | |
| ) | |
| from sentry_sdk.integrations.loguru import LoggingLevels, LoguruIntegration | |
# Populate os.environ from a local .env file before any os.getenv() call in
# this module (SENTRY_DSN, ENV, API keys). With override=True, python-dotenv
# lets .env values replace variables already set in the process environment.
load_dotenv(override=True)
async def periodic_log_flush(interval: float = 10.0, timeout: float = 1.0):
    """Periodically flush buffered Sentry events until cancelled.

    ``sentry_sdk.flush`` blocks the calling thread for up to ``timeout``
    seconds. It is therefore run in the event loop's default executor rather
    than directly inside this coroutine. Calling it inline would freeze every
    other task on the loop, including audio I/O, once per cycle.

    Args:
        interval: Seconds to wait between flushes (default 10, as before).
        timeout: Maximum seconds a single flush may wait for pending events.

    The loop ends only through cancellation. On Python 3.8+,
    ``asyncio.CancelledError`` derives from ``BaseException``, so the
    ``except Exception`` below does not swallow it.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            await asyncio.sleep(interval)
            if SENTRY_DSN:
                # Run the blocking flush on a worker thread, not the loop thread.
                await loop.run_in_executor(
                    None, lambda: sentry_sdk.flush(timeout=timeout)
                )
        except Exception as e:
            # Best effort: a failed flush must never take the bot down.
            logger.debug(f"Periodic flush error (non-critical): {e}")
def _configure_sentry(dsn):
    """Initialise the Sentry SDK and push verification logs through it.

    Loguru records at INFO and above are attached as breadcrumbs, and records
    at ERROR and above are sent as Sentry events. A test WARNING and a test
    ERROR are then logged. A blocking flush (up to 3 seconds) follows so they
    are delivered right away.

    Args:
        dsn: Sentry DSN, taken from the ``SENTRY_DSN`` environment variable.
    """
    logger.info("Initializing Sentry SDK...")
    sentry_env = os.getenv("ENV")  # None when ENV is unset
    loguru_bridge = LoguruIntegration(
        level=LoggingLevels.INFO.value,  # INFO and above become breadcrumbs
        event_level=LoggingLevels.ERROR.value,  # ERROR and above become events
    )
    sentry_sdk.init(
        dsn=dsn,
        environment=sentry_env,
        integrations=[loguru_bridge],
        default_integrations=True,
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0,
        # Improve reliability in cloud environments
        send_default_pii=False,
        attach_stacktrace=True,
    )
    logger.info(f"Sentry initialized - ENV: {sentry_env}")
    logger.debug("Sentry configuration completed successfully")

    # Verification logs: the warning lands as a breadcrumb, the error as an event.
    logger.warning("This is a test warning log to verify Sentry integration")
    logger.error("This is a test error log to verify Sentry integration")
    sentry_sdk.flush(timeout=3)


logger.info("Starting bot initialization process")
logger.debug("Environment variables loading...")

# Optional: every Sentry-specific code path in this file is gated on this value.
SENTRY_DSN = os.getenv("SENTRY_DSN")
logger.info(f"SENTRY_DSN found: {'Yes' if SENTRY_DSN else 'No'}")

if not SENTRY_DSN:
    logger.error("No SENTRY_DSN found, Sentry not initialized")
    logger.warning("Sentry integration will be disabled")
else:
    _configure_sentry(SENTRY_DSN)
async def run_bot(transport: BaseTransport):
    """Run the STT -> LLM -> TTS voice pipeline over the provided transport.

    Builds Deepgram STT, OpenAI LLM and Cartesia TTS services and wires them
    into a pipeline between ``transport.input()`` and ``transport.output()``.
    It then registers connection and error handlers and runs the pipeline
    until it finishes.

    When ``SENTRY_DSN`` is set:
    - each service gets a ``SentryMetrics`` instance;
    - a background ``periodic_log_flush`` task runs for the pipeline's lifetime;
    - Sentry is flushed at connect, disconnect and shutdown points.

    ``sentry_sdk.flush`` blocks its calling thread, so every flush here goes
    through ``_flush_sentry``. That helper runs it in the loop's default
    executor instead of stalling audio processing on the event loop.

    Args:
        transport (BaseTransport): The transport to use for communication.
    """

    async def _flush_sentry(timeout: float) -> None:
        # Flush pending Sentry events off the event-loop thread (no-op without a DSN).
        if not SENTRY_DSN:
            return
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, lambda: sentry_sdk.flush(timeout=timeout))

    logger.info("Starting run_bot function")
    logger.debug(f"Transport type: {type(transport).__name__}")

    # Configure your STT, LLM, and TTS services here.
    # Swap out different processors or properties to customize your bot.
    logger.info("Initializing STT service...")
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY", ""),
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("STT service initialized successfully")

    logger.info("Initializing LLM service...")
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY", ""),
        model="gpt-4o",
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("LLM service initialized successfully")

    logger.info("Initializing TTS service...")
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY", ""),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        metrics=SentryMetrics() if SENTRY_DSN else None,
    )
    logger.debug("TTS service initialized successfully")

    # Set up the initial context for the conversation.
    # You can specify initial system and assistant messages here.
    logger.info("Setting up conversation context...")
    messages = [
        {
            "role": "system",
            "content": "You are Chatbot, a friendly, helpful robot. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by introducing yourself.",
        },
    ]
    logger.debug(f"Initial messages configured: {len(messages)} messages")

    # Define and register tools as required (none are registered yet).
    logger.info("Configuring tools...")
    logger.debug("Tools configuration completed")

    # This sets up the LLM context by providing messages and tools.
    logger.info("Creating LLM context aggregator...")
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)
    logger.debug("LLM context aggregator created successfully")

    # A core voice AI pipeline.
    # Add additional processors to customize the bot's behavior.
    logger.info("Building pipeline...")
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )
    logger.debug("Pipeline built successfully")

    logger.info("Creating pipeline task...")
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            allow_interruptions=True,
            # 8 kHz in/out -- presumably to match Twilio telephony audio.
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )
    logger.debug("Pipeline task created successfully")

    # Error handler for pipeline events, especially heartbeat warnings.
    # NOTE(review): confirm the installed pipecat PipelineTask actually emits an
    # "on_error" event; if it does not, this handler is never invoked. The
    # `error` payload is presumably an exception or an error frame -- confirm.
    async def on_pipeline_error(task, error):
        logger.error(f"Pipeline error occurred: {error}")
        if isinstance(error, BaseException):
            # logger.exception() outside an `except` block has no active
            # exception to attach; pass the error explicitly for a real traceback.
            logger.opt(exception=error).error("Pipeline error details")
        else:
            logger.error(f"Pipeline error details: {error!r}")

        # Tag heartbeat-related errors for better tracking.
        if "heartbeat" in str(error).lower():
            logger.warning("Heartbeat-related error detected - forcing log flush")
            if SENTRY_DSN:
                # Per-event tags/contexts instead of mutating the shared scope,
                # so "error_type=heartbeat" does not leak onto every later event.
                capture_kwargs = {
                    "tags": {"error_type": "heartbeat"},
                    "contexts": {"error_details": {"error_message": str(error)}},
                }
                if isinstance(error, BaseException):
                    sentry_sdk.capture_exception(error, **capture_kwargs)
                else:
                    # capture_exception() raises ValueError for non-exception objects.
                    sentry_sdk.capture_message(
                        str(error), level="error", **capture_kwargs
                    )
                await _flush_sentry(3)  # Longer timeout for critical errors

    # Register the error handler.
    task.add_event_handler("on_error", on_pipeline_error)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info(f"Client connected: {client}")
        logger.debug(f"Client connection details: {type(client)}")
        # Flush so connection logs are delivered, without blocking the loop.
        await _flush_sentry(2)
        # Kick off the conversation.
        logger.info("Starting conversation with context frame...")
        await task.queue_frames([context_aggregator.user().get_context_frame()])
        logger.debug("Context frame queued successfully")

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info(f"Client disconnected: {client}")
        logger.debug(f"Client disconnection details: {type(client)}")
        # Flush before cleanup.
        await _flush_sentry(2)
        logger.info("Cancelling pipeline task...")
        await task.cancel()
        logger.debug("Pipeline task cancelled successfully")
        # Final flush after cleanup.
        await _flush_sentry(2)

    logger.info("Setting up pipeline runner...")
    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    logger.debug("Pipeline runner configured")

    # Periodic flushing so buffered events are not lost during heartbeat warnings.
    flush_task = None
    if SENTRY_DSN:
        flush_task = asyncio.create_task(periodic_log_flush())
        logger.debug("Started periodic log flush task")

    logger.info("Starting pipeline execution...")
    try:
        # Ensure all logs are sent before the pipeline starts. This flush is
        # inside the try so flush_task is always cleaned up in `finally`.
        await _flush_sentry(2)
        await runner.run(task)
        logger.info("Pipeline execution completed")
    finally:
        # Clean up the flush task.
        if flush_task:
            flush_task.cancel()
            try:
                await flush_task
            except asyncio.CancelledError:
                pass
            logger.debug("Stopped periodic log flush task")
        # Final flush after pipeline completion.
        await _flush_sentry(5)
async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud.

    Detects the telephony provider from the incoming websocket. For Twilio it
    builds a FastAPI websocket transport with a Twilio frame serializer and
    runs ``run_bot`` on it. Any other provider is logged and the function
    returns without running a pipeline.

    Any exception raised while parsing the websocket, building the transport
    or running the bot is logged and captured to Sentry (when configured),
    then re-raised.

    Args:
        runner_args (RunnerArguments): Arguments supplied by the Pipecat
            runner; only ``runner_args.websocket`` is used here.
    """

    async def _flush_sentry(timeout: float) -> None:
        # sentry_sdk.flush() blocks; run it on a worker thread so other tasks on
        # this event loop are not stalled (no-op without a DSN).
        if not SENTRY_DSN:
            return
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, lambda: sentry_sdk.flush(timeout=timeout))

    logger.info("Starting main bot function")
    logger.debug(f"Runner arguments: {runner_args}")

    try:
        logger.info("Parsing telephony websocket...")
        transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)
        logger.info(f"Auto-detected transport: {transport_type}")
        logger.debug(f"Call data: {call_data}")

        # Only Twilio is supported; bail out early for any other provider.
        if transport_type != "twilio":
            logger.error(f"Unsupported telephony provider: {transport_type}")
            logger.warning(f"Transport type {transport_type} is not supported")
            return

        logger.info("Initializing Twilio transport...")
        from pipecat.serializers.twilio import TwilioFrameSerializer

        serializer = TwilioFrameSerializer(
            stream_sid=call_data["stream_id"],
            call_sid=call_data["call_id"],
            account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
            auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
        )
        logger.debug("Twilio serializer created successfully")

        logger.info("Creating FastAPI websocket transport...")
        transport = FastAPIWebsocketTransport(
            websocket=runner_args.websocket,
            params=FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_in_filter=None,
                audio_out_enabled=True,
                add_wav_header=False,
                vad_analyzer=SileroVADAnalyzer(),
                serializer=serializer,
            ),
        )
        logger.debug("FastAPI websocket transport created successfully")
        # A constructor either returns an instance or raises (handled below),
        # so no `transport is None` check is needed here.
        logger.info("Transport created successfully, starting bot...")

        if SENTRY_DSN:
            # Same scope that configure_scope() used to modify, but without the
            # API deprecated in sentry-sdk 2.x. Applies to subsequent events.
            sentry_sdk.set_tag("bot_phase", "execution")
            sentry_sdk.set_context("transport", {"type": type(transport).__name__})
            sentry_sdk.set_context("environment", {"platform": "pipecat_cloud"})

        await run_bot(transport)
        logger.info("Bot process completed successfully")
        logger.debug("Bot execution finished without errors")
        # Ensure final logs are sent.
        await _flush_sentry(5)
    except Exception as e:
        logger.exception(f"Error in bot process: {str(e)}")
        logger.error(f"Bot execution failed with exception: {type(e).__name__}")
        logger.critical("Bot process terminated due to error")
        # Ensure error logs are sent before re-raising.
        if SENTRY_DSN:
            sentry_sdk.capture_exception(e)
        await _flush_sentry(5)
        raise
if __name__ == "__main__":
    # Script entry point: hand control to Pipecat's runner. The import lives
    # inside this guard, so pipecat.runner.run is only imported when the file
    # is executed directly, not when it is imported as a module.
    logger.info("Starting bot as main module")
    logger.debug("Importing main function...")
    import pipecat.runner.run as pipecat_runner

    logger.info("Calling main function...")
    pipecat_runner.main()
    logger.info("Main function completed")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment