Created
July 18, 2025 17:57
-
-
Save madhurprash/8fe06cae9494ce33ade9c9b5408df915 to your computer and use it in GitHub Desktop.
Observability of a Strands agent using Langfuse
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Strands Agent with Langfuse Observability - Terminal Version | |
| This script demonstrates how to monitor and debug your Strands Agent using Langfuse. | |
| """ | |
| import os | |
| import sys | |
| import base64 | |
| import boto3 | |
| from dotenv import load_dotenv | |
| from openinference.instrumentation.bedrock import BedrockInstrumentor | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import (OTLPSpanExporter, Compression) | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry import trace | |
| from strands import Agent | |
| from strands.models.bedrock import BedrockModel | |
def check_env_file():
    """Load ``.env`` from the current directory and verify the required settings.

    The file is loaded with ``override=True`` so that its values replace
    anything already present in ``os.environ``. ``interactive_setup`` can call
    this function twice in one run (once for an existing file that turns out
    to be incomplete, once for the freshly written replacement); without
    overriding, values loaded from the rejected file would silently shadow
    the ones the user has just entered.

    Returns:
        bool: ``True`` when ``.env`` exists and every required variable is
        set to a non-empty value. ``False`` otherwise, after printing which
        precondition failed.
    """
    if not os.path.exists('.env'):
        print("β .env file not found!")
        return False

    # The .env file is this script's source of truth, so let it win over
    # whatever an earlier load (or the parent shell) left in the environment.
    load_dotenv(override=True)

    required_vars = [
        'AWS_REGION_NAME',
        'LANGFUSE_PUBLIC_KEY',
        'LANGFUSE_SECRET_KEY',
        'LANGFUSE_HOST',
    ]
    # An empty string counts as missing: os.getenv() returns '' which is falsy.
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        print(f"β Missing required environment variables: {', '.join(missing_vars)}")
        return False

    print("β Environment variables loaded successfully!")
    return True
def interactive_setup():
    """Interactively obtain the AWS/Langfuse configuration and persist it to ``.env``.

    If a ``.env`` file already exists the user is asked whether to reuse it.
    Answering ``y`` validates it with ``check_env_file()`` and returns
    immediately on success. In every other case the user is prompted for the
    agent id, agent alias, AWS region and Langfuse credentials, a new ``.env``
    is written to the current directory (replacing any existing one), and the
    result is validated with ``check_env_file()``.

    The file contains the Langfuse secret key, so it is created with
    owner-only permissions (``0o600``) and written as UTF-8.

    Returns:
        bool: ``True`` when a valid configuration has been loaded into the
        environment, ``False`` otherwise.
    """
    env_path = '.env'

    print("\nπ§ Setting up your Langfuse and AWS configuration...")
    print("=" * 60)

    # Offer to reuse an existing .env before asking any questions.
    if os.path.exists(env_path):
        print("π Found existing .env file")
        use_existing = input("Use existing .env file? (y/n): ").lower().strip()
        if use_existing == 'y':
            if check_env_file():
                return True
            print("β Invalid .env file. Let's create a new one.")
        else:
            print("π Creating new .env file...")
    else:
        print("π No .env file found. Let's create one...")

    # Collect user inputs; empty answers fall back to the advertised defaults.
    print("\nπ Please provide the following information:")
    agent_id = input("Agent ID (e.g., user123): ").strip()
    agent_alias = input("Agent Alias (e.g., useralias): ").strip()
    aws_region = input("AWS Region (default: us-west-2): ").strip() or "us-west-2"

    print("\nπ Langfuse Configuration:")
    print("You can find these keys in your Langfuse project settings.")
    langfuse_public_key = input("Langfuse Public Key (pk-lf-...): ").strip()
    langfuse_secret_key = input("Langfuse Secret Key (sk-lf-...): ").strip()
    langfuse_host = (
        input("Langfuse Host (default: https://us.cloud.langfuse.com): ").strip()
        or "https://us.cloud.langfuse.com"
    )

    # Same content as before: one KEY=value per line, Langfuse values quoted.
    env_content = "\n".join([
        f"AGENT_ID={agent_id}",
        f"AGENT_ALIAS={agent_alias}",
        f"AWS_REGION_NAME={aws_region}",
        f'LANGFUSE_PUBLIC_KEY="{langfuse_public_key}"',
        f'LANGFUSE_SECRET_KEY="{langfuse_secret_key}"',
        f'LANGFUSE_HOST="{langfuse_host}"',
    ]) + "\n"

    # Create the file readable/writable by the owner only, since it stores a secret.
    fd = os.open(env_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w', encoding='utf-8') as f:
        f.write(env_content)
    # os.open() applies the mode only when it creates the file; tighten a
    # pre-existing (overwritten) file as well.
    os.chmod(env_path, 0o600)

    print("\nβ .env file created successfully!")
    return check_env_file()
def setup_langfuse_tracing():
    """Install a global OpenTelemetry tracer provider that exports to Langfuse.

    Reads ``LANGFUSE_PUBLIC_KEY``, ``LANGFUSE_SECRET_KEY`` and
    ``LANGFUSE_HOST`` from ``os.environ``, builds an HTTP Basic
    ``Authorization`` header from the key pair, and registers a
    ``TracerProvider`` whose ``BatchSpanProcessor`` ships spans to
    ``<LANGFUSE_HOST>/api/public/otel/v1/traces`` with gzip compression.

    Returns:
        bool: ``True`` when the provider was registered. ``False`` when any
        step raised (for example a missing environment variable); the error
        is printed rather than propagated.
    """
    try:
        public_key = os.environ['LANGFUSE_PUBLIC_KEY']
        secret_key = os.environ['LANGFUSE_SECRET_KEY']
        # Tolerate "https://host/" so the endpoint never contains "//api".
        host = os.environ['LANGFUSE_HOST'].rstrip('/')

        # HTTP Basic credentials: base64("public:secret").
        auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()

        exporter = OTLPSpanExporter(
            endpoint=f"{host}/api/public/otel/v1/traces",
            headers={"Authorization": f"Basic {auth}"},
            timeout=30,
            compression=Compression.Gzip,
        )
        provider = TracerProvider()
        provider.add_span_processor(BatchSpanProcessor(exporter))
        trace.set_tracer_provider(provider)
        print("β Langfuse tracing configured successfully!")
        return True
    except Exception as e:
        # Startup boundary: report the problem and let the caller decide to exit.
        print(f"β Error setting up Langfuse tracing: {e}")
        return False
def initialize_agent():
    """Build the Strands ``Agent`` backed by a Bedrock model chosen by the user.

    The user is prompted for a Bedrock model id (an empty answer selects the
    default Claude 3.5 Sonnet id). The agent is created with a fixed
    AWS-focused system prompt and trace attributes taken from the
    ``AGENT_ALIAS`` (session id) and ``AGENT_ID`` (user id) environment
    variables, plus a fixed list of Langfuse tags.

    Returns:
        The constructed ``Agent``, or ``None`` if prompting or construction
        raised; the error is printed rather than propagated.
    """
    fallback_model_id = "us.anthropic.claude-3-5-sonnet-20241022-v2:0"

    prompt_lines = (
        "You are an independent fact-checker and grader for technical Q&A about AWS (and related cloud topics). You have up-to-date world knowledge and can reason step-by-step, but MUST reveal only a short justification to the user.",
        "You should only answer questions about AWS cloud related topics. For any non-AWS related questions, politely redirect the user to ask about AWS services, architecture, best practices, or cloud computing topics.",
        "Be accurate and provide a detailed answer to only cloud related questions",
    )
    system_prompt = "\n".join(prompt_lines)

    # Attributes attached to the agent's traces.
    tracing_metadata = {
        "session.id": os.getenv('AGENT_ALIAS', 'default-session'),
        "user.id": os.getenv('AGENT_ID', 'default-user'),
        "langfuse.tags": [
            "Agent-SDK-Example",
            "Strands-Project-Demo",
            "Observability-Tutorial",
            "Terminal-Version",
        ],
    }

    try:
        typed_model_id = input("Use the model on bedrock (default: us.anthropic.claude-3-5-sonnet-20241022-v2:0): ").strip()
        bedrock_model = BedrockModel(model_id=typed_model_id or fallback_model_id)
        new_agent = Agent(
            model=bedrock_model,
            system_prompt=system_prompt,
            trace_attributes=tracing_metadata,
        )
        print("β Agent initialized successfully!")
        return new_agent
    except Exception as e:
        print(f"β Error initializing agent: {e}")
        return None
def show_predefined_questions():
    """Print the numbered menu of sample AWS questions and return them.

    Returns:
        list[str]: The ten sample questions, in the order they are displayed
        (menu entry ``k`` is element ``k - 1``).
    """
    sample_questions = [
        "What are the key differences between EC2 and Lambda?",
        "How do you configure Auto Scaling in AWS?",
        "What are the best practices for S3 bucket security?",
        "Explain the difference between IAM roles and policies",
        "How does AWS CloudFormation work?",
        "What are the benefits of using Amazon RDS vs DynamoDB?",
        "How do you set up a VPC with public and private subnets?",
        "What is the difference between Application Load Balancer and Network Load Balancer?",
        "How do you implement disaster recovery in AWS?",
        "What are the cost optimization strategies for AWS?",
    ]
    rule = "=" * 60

    # One line per question, numbered from 1 and right-aligned to two digits.
    menu = "\n".join(
        f"{position:2d}. {text}"
        for position, text in enumerate(sample_questions, start=1)
    )

    print("\nπ½οΈ Here are some example questions you can ask:")
    print(rule)
    print(menu)
    print(rule)
    print("π‘ You can type the number (1-10) to use a predefined question, or type your own question.")
    print("π Type 'exit' to quit the program.")
    return sample_questions
def run_interactive_chat(agent):
    """Run the terminal question/answer loop until the user leaves.

    Each turn reads one line from stdin:

    * ``exit`` (any letter case) ends the session;
    * a number within the range of the predefined questions is replaced by
      that question;
    * a number outside that range is rejected and the user is asked again;
    * empty input is ignored;
    * anything else is passed to ``agent`` unchanged.

    Ctrl-C and end-of-input (Ctrl-D, or a piped stdin running dry) both end
    the session. End-of-input must be handled explicitly: ``input()`` raises
    ``EOFError`` on every subsequent call, so treating it as a recoverable
    error would loop forever.

    Args:
        agent: Callable invoked with the question text. Its return value is
            not used here; the reply is presumably written to stdout by the
            agent itself (TODO confirm against the Strands callback handler).
    """
    print("\nπ€ Restaurant Helper Agent is ready!")
    # Point at the configured Langfuse instance, falling back to the US cloud.
    traces_url = os.getenv('LANGFUSE_HOST', 'https://us.cloud.langfuse.com/')
    print(f"π Your traces will be available at: {traces_url}")

    predefined_questions = show_predefined_questions()

    while True:
        try:
            print("\n" + "β" * 60)
            user_input = input("π€ You: ").strip()

            if user_input.lower() == 'exit':
                print("π Thank you for using Restaurant Helper! Goodbye!")
                break

            # Check if user entered a number for predefined questions
            if user_input.isdigit():
                question_num = int(user_input)
                if 1 <= question_num <= len(predefined_questions):
                    user_input = predefined_questions[question_num - 1]
                    print(f"π Selected: {user_input}")
                else:
                    print("β Invalid question number. Please try again.")
                    continue

            if user_input:
                print("π€ Restaurant Helper:", end=" ")
                agent(user_input)
                print()  # Add a newline after response
        except (KeyboardInterrupt, EOFError):
            # No more input will ever arrive after EOF, so leave instead of retrying.
            print("\n\nπ Chat interrupted. Goodbye!")
            break
        except Exception as e:
            # A failed turn (e.g. a model/network error) should not end the session.
            print(f"β Error: {e}")
            print("Please try again or type 'exit' to quit.")
def main():
    """Run the whole demo: configure, enable tracing, build the agent, chat.

    The steps run in order and the process exits with status 1 as soon as
    one of them reports failure. After the chat session ends, the Langfuse
    URL (``LANGFUSE_HOST``, or the US cloud default) is printed.
    """
    def abort(reason):
        # Tell the user which step failed, then stop with a non-zero status.
        print(reason)
        sys.exit(1)

    print("π½οΈ Welcome to Restaurant Helper with Langfuse Observability!")
    print("=" * 70)

    # Environment first: the later steps read their settings from it.
    if not interactive_setup():
        abort("β Setup failed. Exiting...")

    # Tracing is installed before the agent exists.
    if not setup_langfuse_tracing():
        abort("β Tracing setup failed. Exiting...")

    chat_agent = initialize_agent()
    if not chat_agent:
        abort("β Agent initialization failed. Exiting...")

    run_interactive_chat(chat_agent)

    # Closing pointer to where the session's traces can be inspected.
    print("\nπ View your traces and analytics at:")
    dashboard_url = os.getenv('LANGFUSE_HOST', 'https://us.cloud.langfuse.com')
    print(f"π {dashboard_url}")
    print("\nThank you for using Restaurant Helper! π½οΈ")


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment