Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Last active October 8, 2025 10:44
Show Gist options
  • Select an option

  • Save kausmeows/091a8923f74c4cf951b0e3b4b9e9999e to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/091a8923f74c4cf951b0e3b4b9e9999e to your computer and use it in GitHub Desktop.
from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.googlesearch import GoogleSearchTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from agno.workflow.parallel import Parallel
from typing import Dict, Any, AsyncIterator, Union, List
import json
from agno.run.workflow import WorkflowRunOutputEvent
from datetime import datetime
from pydantic import BaseModel, Field
# Define structured output models for each research phase
class CustomerProfileResearch(BaseModel):
"""Structured customer profile research findings"""
research_topic: str = Field(description="The customer research topic")
target_demographics: List[str] = Field(description="Key demographic segments identified", min_items=2)
customer_personas: List[str] = Field(description="Defined customer personas", min_items=2)
pain_points: List[str] = Field(description="Major customer pain points discovered", min_items=3)
motivations: List[str] = Field(description="Customer motivations and drivers", min_items=3)
customer_journey: List[str] = Field(description="Key touchpoints in customer journey", min_items=3)
behavioral_patterns: List[str] = Field(description="Customer behavioral insights", min_items=2)
segmentation_insights: str = Field(description="Customer segmentation summary")
confidence_score: float = Field(description="Confidence in findings (0.0-1.0)", ge=0.0, le=1.0)
class BusinessGoalsResearch(BaseModel):
"""Structured business goals research findings"""
research_topic: str = Field(description="The business goals research topic")
primary_objectives: List[str] = Field(description="Main business objectives identified", min_items=3)
key_metrics: List[str] = Field(description="Important KPIs and success metrics", min_items=3)
industry_trends: List[str] = Field(description="Relevant industry trends", min_items=3)
competitive_landscape: List[str] = Field(description="Competitive analysis insights", min_items=2)
growth_opportunities: List[str] = Field(description="Identified growth opportunities", min_items=2)
strategic_challenges: List[str] = Field(description="Key challenges to address", min_items=2)
market_positioning: str = Field(description="Market positioning analysis")
success_factors: List[str] = Field(description="Critical success factors", min_items=2)
confidence_score: float = Field(description="Confidence in findings (0.0-1.0)", ge=0.0, le=1.0)
class WebIntelligenceResearch(BaseModel):
"""Structured web intelligence research findings"""
research_topic: str = Field(description="The web intelligence research topic")
digital_presence: List[str] = Field(description="Digital presence insights", min_items=2)
social_media_patterns: List[str] = Field(description="Social media engagement patterns", min_items=2)
web_behavior: List[str] = Field(description="Web behavior analysis", min_items=2)
digital_touchpoints: List[str] = Field(description="Key digital touchpoints", min_items=3)
online_positioning: List[str] = Field(description="Online brand positioning insights", min_items=2)
digital_marketing: List[str] = Field(description="Digital marketing strategies observed", min_items=2)
engagement_metrics: str = Field(description="Engagement and interaction patterns summary")
technology_stack: List[str] = Field(description="Technology platforms and tools identified", min_items=2)
confidence_score: float = Field(description="Confidence in findings (0.0-1.0)", ge=0.0, le=1.0)
class ConsolidatedResearch(BaseModel):
"""Consolidated research findings from all phases"""
research_query: str = Field(description="Original research query")
key_insights: List[str] = Field(description="Top consolidated insights", min_items=5)
customer_profile_summary: str = Field(description="Executive summary of customer profile")
business_goals_summary: str = Field(description="Executive summary of business goals")
web_intelligence_summary: str = Field(description="Executive summary of web intelligence")
strategic_opportunities: List[str] = Field(description="Strategic opportunities identified", min_items=3)
critical_findings: List[str] = Field(description="Most critical findings across all research", min_items=4)
patterns_correlations: List[str] = Field(description="Patterns and correlations found", min_items=2)
recommendations: List[str] = Field(description="High-level recommendations", min_items=3)
research_confidence: float = Field(description="Overall research confidence (0.0-1.0)", ge=0.0, le=1.0)
# Define specialized research agents
customer_profile_agent = Agent(
name="Customer Profile Researcher",
model=OpenAIChat(id="gpt-4o"),
tools=[GoogleSearchTools(), DuckDuckGoTools()],
output_schema=CustomerProfileResearch,
instructions=[
"You are an expert customer profile researcher specializing in comprehensive customer analysis",
"Research customer demographics, psychographics, and behavioral patterns using available tools",
"Focus on understanding customer personas, pain points, and motivations",
"Analyze customer journey touchpoints and behavioral insights",
"Provide structured findings according to the CustomerProfileResearch model",
"Include confidence scores and detailed segmentation insights",
"Use tools extensively to gather data-driven insights"
],
db=InMemoryDb(),
)
business_goals_agent = Agent(
name="Business Goals Researcher",
model=OpenAIChat(id="gpt-4o"),
tools=[GoogleSearchTools(), HackerNewsTools()],
output_schema=BusinessGoalsResearch,
instructions=[
"You are a business strategy and goals research specialist with deep market expertise",
"Analyze customer business objectives, KPIs, and success metrics using available tools",
"Research industry trends, competitive landscape, and market opportunities",
"Identify growth opportunities and strategic challenges",
"Focus on understanding what customers want to achieve and critical success factors",
"Provide structured findings according to the BusinessGoalsResearch model",
"Include confidence scores and comprehensive market positioning analysis",
"Use tools to gather current industry data and competitive intelligence"
],
db=InMemoryDb(),
)
web_intelligence_agent = Agent(
name="Web Intelligence Researcher",
model=OpenAIChat(id="gpt-4o"),
tools=[GoogleSearchTools(), DuckDuckGoTools(), HackerNewsTools()],
output_schema=WebIntelligenceResearch,
instructions=[
"You are a web intelligence and market research specialist with expertise in digital analytics",
"Research customer's online presence, digital footprint, and web behavior using available tools",
"Analyze social media presence, website activity, and digital engagement patterns",
"Identify digital touchpoints, technology stacks, and online positioning strategies",
"Provide insights on customer's digital marketing approaches and engagement metrics",
"Structure your findings according to the WebIntelligenceResearch model",
"Include confidence scores and comprehensive engagement analysis",
"Use tools to gather current digital presence and online behavior data"
],
db=InMemoryDb(),
)
# Create research team for consolidation
research_consolidation_team = Team(
name="Research Consolidation Team",
members=[customer_profile_agent, business_goals_agent, web_intelligence_agent],
output_schema=ConsolidatedResearch,
instructions=[
"You are a research consolidation team specializing in synthesizing complex research data",
"Synthesize all research findings into comprehensive customer insights and patterns",
"Identify correlations, patterns, and key insights across customer profile, business goals, and web intelligence research",
"Create actionable recommendations and strategic opportunities based on consolidated research",
"Analyze critical findings and provide executive summaries for each research phase",
"Structure your consolidated findings according to the ConsolidatedResearch model",
"Include overall research confidence and strategic recommendations",
"Focus on creating cohesive insights that integrate all research phases"
],
db=InMemoryDb(),
)
# Task recommender agent
task_recommender_agent = Agent(
name="Task Recommender",
model=OpenAIChat(id="gpt-4o"),
instructions=[
"You are an expert task and strategy recommender with deep implementation expertise",
"Based on consolidated customer research, provide specific, actionable tasks and recommendations",
"Prioritize tasks based on impact, urgency, and feasibility assessment",
"Create detailed action plans with implementation timelines and success metrics",
"Identify quick wins, resource requirements, and risk mitigation strategies",
"Develop both high and medium priority task categories with long-term strategic initiatives",
"Structure your recommendations according to the TaskRecommendations model",
"Include feasibility assessment and comprehensive implementation guidance"
],
db=InMemoryDb(),
)
def set_session_state_step(step_input: StepInput, session_state: Dict[str, Any]) -> StepOutput:
"""
Initialize session state for customer research workflow
"""
customer_query = step_input.input
# Initialize comprehensive session state structure
if "customer_research" not in session_state:
session_state["customer_research"] = {
"workflow_id": f"research_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"customer_query": str(customer_query),
"research_phases": {
"profile": {"status": "pending", "findings": []},
"business_goals": {"status": "pending", "findings": []},
"web_intelligence": {"status": "pending", "findings": []}
},
"consolidated_insights": [],
"recommendations": [],
"research_metadata": {
"started_at": datetime.now().isoformat(),
"total_research_steps": 0,
"completed_steps": 0
}
}
# Set workflow configuration
session_state["workflow_config"] = {
"research_depth": "comprehensive",
"focus_areas": ["customer_profile", "business_goals", "web_intelligence"],
"output_format": "detailed_report",
"created_by": "customer_research_team"
}
# Set research preferences
session_state["research_preferences"] = {
"analysis_style": "data_driven",
"recommendation_type": "actionable_tasks",
"reporting_level": "executive_summary"
}
session_state["customer_research"]["research_metadata"]["total_research_steps"] += 1
return StepOutput(
content=f"""
## Customer Research Session Initialized
**Research Query:** {customer_query}
**Workflow ID:** {session_state['customer_research']['workflow_id']}
**Research Phases:** {len(session_state['customer_research']['research_phases'])} phases planned
**Session Configuration:**
- Research Depth: {session_state['workflow_config']['research_depth']}
- Focus Areas: {', '.join(session_state['workflow_config']['focus_areas'])}
- Analysis Style: {session_state['research_preferences']['analysis_style']}
Session state has been initialized and is ready for comprehensive customer research.
""".strip()
)
async def customer_profile_research_step(step_input: StepInput, session_state: Dict[str, Any]) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Conduct customer profile research with session state tracking
"""
customer_query = step_input.input
previous_content = step_input.previous_step_content
# Update session state
session_state["customer_research"]["research_phases"]["profile"]["status"] = "in_progress"
research_prompt = f"""
CUSTOMER PROFILE RESEARCH REQUEST:
Research Query: {customer_query}
Session Context: {session_state['customer_research']['workflow_id']}
Previous Context: {previous_content[:300] if previous_content else "Initial research"}
RESEARCH OBJECTIVES:
1. Identify target customer demographics and psychographics
2. Understand customer personas and segmentation
3. Analyze customer pain points and motivations
4. Research customer journey and touchpoints
5. Identify customer preferences and behaviors
Provide comprehensive customer profile insights with specific data points and actionable findings.
"""
try:
research_result_iterator = customer_profile_agent.arun(research_prompt, stream=True, stream_intermediate_steps=True)
async for event in research_result_iterator:
yield event
# Get the actual response after streaming
research_result = customer_profile_agent.get_last_run_output()
# Store findings in session state with structured data
findings = {
"research_type": "customer_profile",
"timestamp": datetime.now().isoformat(),
"structured_data": research_result.content,
"success": True
}
session_state["customer_research"]["research_phases"]["profile"]["findings"].append(findings)
session_state["customer_research"]["research_phases"]["profile"]["status"] = "completed"
session_state["customer_research"]["research_metadata"]["completed_steps"] += 1
# Return the structured Pydantic data directly
yield StepOutput(
content=research_result.content,
success=True
)
except Exception as e:
session_state["customer_research"]["research_phases"]["profile"]["status"] = "failed"
yield StepOutput(
content=f"Customer profile research failed: {str(e)}",
success=False
)
async def customer_biz_goals_research_step(step_input: StepInput, session_state: Dict[str, Any]) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Conduct business goals research with session state tracking
"""
customer_query = step_input.input
# Update session state
session_state["customer_research"]["research_phases"]["business_goals"]["status"] = "in_progress"
research_prompt = f"""
CUSTOMER BUSINESS GOALS RESEARCH REQUEST:
Research Query: {customer_query}
Session Context: {session_state['customer_research']['workflow_id']}
Research Depth: {session_state['workflow_config']['research_depth']}
RESEARCH OBJECTIVES:
1. Identify customer's primary business objectives and KPIs
2. Understand success metrics and performance indicators
3. Analyze industry trends affecting customer goals
4. Research competitive landscape and market positioning
5. Identify growth opportunities and challenges
Provide detailed business goals analysis with strategic insights and market context.
"""
try:
research_result_iterator = business_goals_agent.arun(research_prompt, stream=True, stream_intermediate_steps=True)
async for event in research_result_iterator:
yield event
# Get the actual response after streaming
research_result = business_goals_agent.get_last_run_output()
# Store findings in session state with structured data
findings = {
"research_type": "business_goals",
"timestamp": datetime.now().isoformat(),
"structured_data": research_result.content,
"success": True
}
session_state["customer_research"]["research_phases"]["business_goals"]["findings"].append(findings)
session_state["customer_research"]["research_phases"]["business_goals"]["status"] = "completed"
session_state["customer_research"]["research_metadata"]["completed_steps"] += 1
# Return the structured Pydantic data directly
yield StepOutput(
content=research_result.content,
success=True
)
except Exception as e:
session_state["customer_research"]["research_phases"]["business_goals"]["status"] = "failed"
yield StepOutput(
content=f"Business goals research failed: {str(e)}",
success=False
)
async def web_intelligence_research_step(step_input: StepInput, session_state: Dict[str, Any]) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Conduct web intelligence research with session state tracking
"""
customer_query = step_input.input
# Update session state
session_state["customer_research"]["research_phases"]["web_intelligence"]["status"] = "in_progress"
research_prompt = f"""
WEB INTELLIGENCE RESEARCH REQUEST:
Research Query: {customer_query}
Session Context: {session_state['customer_research']['workflow_id']}
Analysis Style: {session_state['research_preferences']['analysis_style']}
RESEARCH OBJECTIVES:
1. Analyze customer's digital presence and online footprint
2. Research social media activity and engagement patterns
3. Understand web behavior and digital touchpoints
4. Identify online brand positioning and messaging
5. Analyze digital marketing strategies and channels
Provide comprehensive web intelligence with digital insights and online behavior analysis.
"""
try:
research_result_iterator = web_intelligence_agent.arun(research_prompt, stream=True, stream_intermediate_steps=True)
async for event in research_result_iterator:
yield event
# Get the actual response after streaming
research_result = web_intelligence_agent.get_last_run_output()
# Store findings in session state with structured data
findings = {
"research_type": "web_intelligence",
"timestamp": datetime.now().isoformat(),
"structured_data": research_result.content,
"success": True
}
session_state["customer_research"]["research_phases"]["web_intelligence"]["findings"].append(findings)
session_state["customer_research"]["research_phases"]["web_intelligence"]["status"] = "completed"
session_state["customer_research"]["research_metadata"]["completed_steps"] += 1
# Return the structured Pydantic data directly
yield StepOutput(
content=research_result.content,
success=True
)
except Exception as e:
session_state["customer_research"]["research_phases"]["web_intelligence"]["status"] = "failed"
yield StepOutput(
content=f"Web intelligence research failed: {str(e)}",
success=False
)
async def customer_report_consolidation_step(step_input: StepInput, session_state: Dict[str, Any]) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Consolidate all research findings into comprehensive customer report
"""
customer_query = step_input.input
previous_content = step_input.previous_step_content
# Gather all research findings from session state
research_data = session_state["customer_research"]
# Compile research findings with structured data
all_findings = []
structured_summaries = {}
for phase_name, phase_data in research_data["research_phases"].items():
for finding in phase_data["findings"]:
if "structured_data" in finding:
# Include structured data summary
all_findings.append({
"phase": phase_name,
"structured_data": finding["structured_data"],
"research_topic": finding.get("research_topic", "N/A"),
"confidence_score": finding.get("confidence_score", 0.0),
"timestamp": finding["timestamp"]
})
structured_summaries[phase_name] = finding["structured_data"]
else:
# Fallback for non-structured data
all_findings.append({
"phase": phase_name,
"content": str(finding.get("content", ""))[:500],
"timestamp": finding["timestamp"]
})
consolidation_prompt = f"""
COMPREHENSIVE CUSTOMER RESEARCH CONSOLIDATION (STRUCTURED DATA):
Original Query: {customer_query}
Session ID: {research_data['workflow_id']}
Total Research Phases: {len(research_data['research_phases'])}
Completed Steps: {research_data['research_metadata']['completed_steps']}
STRUCTURED RESEARCH FINDINGS TO CONSOLIDATE:
Customer Profile Research:
{json.dumps(structured_summaries.get('profile', {}), indent=2) if 'profile' in structured_summaries else 'No structured data available'}
Business Goals Research:
{json.dumps(structured_summaries.get('business_goals', {}), indent=2) if 'business_goals' in structured_summaries else 'No structured data available'}
Web Intelligence Research:
{json.dumps(structured_summaries.get('web_intelligence', {}), indent=2) if 'web_intelligence' in structured_summaries else 'No structured data available'}
CONSOLIDATION OBJECTIVES:
1. Synthesize all structured research findings into cohesive customer insights
2. Identify patterns, correlations, and key themes across customer profile, business goals, and web intelligence
3. Create comprehensive consolidated view with strategic opportunities
4. Highlight critical findings and cross-research correlations
5. Provide executive summaries and high-level recommendations
6. Structure response according to ConsolidatedResearch model
Create a detailed, consolidated customer research report that integrates all structured findings.
"""
try:
consolidation_result_iterator = research_consolidation_team.arun(consolidation_prompt, stream=True, stream_intermediate_steps=True)
async for event in consolidation_result_iterator:
yield event
# Get the actual response after streaming
consolidation_result = research_consolidation_team.get_last_run_output()
# Store consolidated insights in session state
consolidated_insight = {
"consolidation_timestamp": datetime.now().isoformat(),
"structured_data": consolidation_result.content,
"research_phases_included": list(research_data["research_phases"].keys()),
"total_findings_consolidated": len(all_findings)
}
session_state["customer_research"]["consolidated_insights"].append(consolidated_insight)
# Return the structured Pydantic data directly
yield StepOutput(
content=consolidation_result.content,
success=True
)
except Exception as e:
yield StepOutput(
content=f"Research consolidation failed: {str(e)}",
success=False
)
async def task_recommender_step(step_input: StepInput, session_state: Dict[str, Any]) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Generate actionable task recommendations based on consolidated research
"""
customer_query = step_input.input
previous_content = step_input.previous_step_content
research_data = session_state["customer_research"]
workflow_config = session_state["workflow_config"]
research_prefs = session_state["research_preferences"]
# Get latest consolidated insights
latest_insights = research_data["consolidated_insights"][-1] if research_data["consolidated_insights"] else {}
consolidated_structured_data = latest_insights.get("structured_data", {}) if latest_insights else {}
recommendation_prompt = f"""
STRATEGIC TASK RECOMMENDATIONS REQUEST (BASED ON STRUCTURED DATA):
Research Query: {customer_query}
Session Context: {research_data['workflow_id']}
CONSOLIDATED RESEARCH INSIGHTS (STRUCTURED):
{json.dumps(consolidated_structured_data, indent=2) if consolidated_structured_data else "No structured consolidated research available"}
WORKFLOW CONFIGURATION:
- Research Depth: {workflow_config['research_depth']}
- Focus Areas: {', '.join(workflow_config['focus_areas'])}
- Recommendation Type: {research_prefs['recommendation_type']}
- Reporting Level: {research_prefs['reporting_level']}
SESSION RESEARCH SUMMARY:
- Total Research Phases: {len(research_data['research_phases'])}
- Completed Analysis Steps: {research_data['research_metadata']['completed_steps']}
- Research Duration: Session-based analysis
RECOMMENDATION OBJECTIVES:
1. Generate specific, actionable tasks based on research findings
2. Prioritize recommendations by impact, urgency, and feasibility
3. Create detailed action plans with timelines and success metrics
4. Align recommendations with identified customer goals and pain points
5. Provide implementation guidance and resource requirements
Create comprehensive task recommendations with clear action items and strategic priorities.
"""
try:
recommendation_result_iterator = task_recommender_agent.arun(recommendation_prompt, stream=True, stream_intermediate_steps=True)
async for event in recommendation_result_iterator:
yield event
# Get the actual response after streaming
recommendation_result = task_recommender_agent.get_last_run_output()
# Store recommendations in session state
recommendation_data = {
"recommendation_timestamp": datetime.now().isoformat(),
"structured_data": recommendation_result.content,
"based_on_insights": len(research_data["consolidated_insights"]),
"recommendation_type": research_prefs["recommendation_type"]
}
session_state["customer_research"]["recommendations"].append(recommendation_data)
# Final session state update
session_state["customer_research"]["research_metadata"]["completed_at"] = datetime.now().isoformat()
session_state["customer_research"]["research_metadata"]["final_status"] = "completed_successfully"
# Return the structured Pydantic data directly
yield StepOutput(
content=recommendation_result.content,
success=True
)
except Exception as e:
yield StepOutput(
content=f"Task recommendation generation failed: {str(e)}",
success=False
)
# Define workflow steps
set_session_state_step_obj = Step(
name="Set Session State",
executor=set_session_state_step,
)
customer_profile_research_step_obj = Step(
name="Customer Profile Research",
executor=customer_profile_research_step,
)
customer_biz_goals_research_step_obj = Step(
name="Customer Business Goals Research",
executor=customer_biz_goals_research_step,
)
web_intelligence_research_step_obj = Step(
name="Web Intelligence Research",
executor=web_intelligence_research_step,
)
customer_report_consolidation_step_obj = Step(
name="Customer Report Consolidation",
executor=customer_report_consolidation_step,
)
task_recommender_step_obj = Step(
name="Task Recommender",
executor=task_recommender_step,
)
# Create the comprehensive customer research workflow
customer_research_workflow = Workflow(
name="Customer Research Pipeline",
description="Comprehensive customer research with parallel execution and session state management",
db=SqliteDb(
session_table="customer_research_sessions",
db_file="tmp/customer_research_workflow.db",
),
steps=[
set_session_state_step_obj,
Parallel(
customer_profile_research_step_obj,
customer_biz_goals_research_step_obj,
web_intelligence_research_step_obj,
name="Parallel Research Phase"
),
customer_report_consolidation_step_obj,
task_recommender_step_obj,
],
)
agent_os = AgentOS(
description="Example OS setup",
workflows=[customer_research_workflow],
)
app = agent_os.get_app()
if __name__ == "__main__":
agent_os.serve(app="test:app", reload=True)
# # Example usage
# async def main():
# print("🔍 Starting Comprehensive Customer Research Workflow...")
# print("=" * 70)
# # Example customer research query
# research_query = "Analyze SaaS startup customers in the healthcare technology space, focusing on mid-market companies (50-500 employees) looking for patient management solutions"
# # Run the workflow
# result = await customer_research_workflow.aprint_response(
# input=research_query,
# markdown=True,
# stream=True,
# stream_intermediate_steps=True,
# )
# print("\n" + "=" * 70)
# print("✅ Customer Research Workflow Completed Successfully!")
# if __name__ == "__main__":
# import asyncio
# asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment