Based on the research conducted using Octocode MCP, here's a real-world example of Deep Agents in action:
**Repository:** `guy-hartstein/company-research-agent` (1,445+ stars)
This is a production-ready implementation of Deep Agents that demonstrates all four core components working together:
```
┌─────────────────────┐     ┌─────────────────────┐     ┌─────────────────────┐
│    Planning Tool    │     │     Sub Agents      │     │     File System     │
│                     │     │                     │     │                     │
│ • Task Pipeline     │◄───►│ • CompanyAnalyzer   │◄───►│ • Document Store    │
│ • Node Routing      │     │ • IndustryAnalyzer  │     │ • Progress Cache    │
│ • Progress Tracking │     │ • FinancialAnalyst  │     │ • Result Archive    │
│ • Error Recovery    │     │ • NewsScanner       │     │ • Search Index      │
└─────────────────────┘     └─────────────────────┘     └─────────────────────┘
                                       │
                            ┌─────────────────────┐
                            │    System Prompt    │
                            │                     │
                            │ • Research Goals    │
                            │ • Quality Standards │
                            │ • Coordination Rules│
                            │ • Output Formats    │
                            └─────────────────────┘
```
The system uses a sequential pipeline architecture with specialized nodes:
```python
# Research Pipeline (Planning Tool)
class ResearchPipeline:
    def __init__(self):
        self.nodes = [
            CompanyAnalyzer(),   # Research core business info
            IndustryAnalyzer(),  # Analyze market position
            FinancialAnalyst(),  # Gather financial data
            NewsScanner(),       # Collect recent news
            Collector(),         # Aggregate all research
            Curator(),           # Filter and score content
            Briefing(),          # Generate summaries
            Editor()             # Compile final report
        ]

    async def execute(self, company_name: str):
        context = {"company": company_name, "data": {}}
        for node in self.nodes:
            context = await node.process(context)
            await self.send_progress_update(node.name, context)
        return context["final_report"]
```

Each agent has domain-specific expertise and tools, and each extends a shared `BaseAgent` interface.
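`BaseAgent` itself is not part of the excerpt; a minimal sketch of the assumed contract (illustrative, not the repository's actual code):

```python
from abc import ABC, abstractmethod
from typing import Any

class BaseAgent(ABC):
    """Hypothetical shared interface for the research agents (sketch)."""

    def __init__(self, tavily_client: Any = None):
        # Web-search client shared by all agents; injected for testability.
        self.tavily_client = tavily_client

    @abstractmethod
    async def process(self, context: dict) -> dict:
        """Read from and write back to the shared pipeline context."""
        raise NotImplementedError
```

The specialized agents implement `process` against this contract: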
```python
class CompanyAnalyzer(BaseAgent):
    """Researches core business information"""

    async def process(self, context):
        company = context["company"]
        # Generate targeted search queries
        queries = await self.generate_queries(company)
        # Use Tavily API for web research
        results = await self.tavily_client.search(queries)
        # Extract and structure business data
        business_info = await self.extract_business_info(results)
        context["data"]["business"] = business_info
        return context

class FinancialAnalyst(BaseAgent):
    """Gathers financial metrics and performance data"""

    async def process(self, context):
        company = context["company"]
        # Search for financial reports and metrics
        financial_queries = [
            f"{company} financial statements",
            f"{company} revenue earnings",
            f"{company} stock performance"
        ]
        results = await self.tavily_client.search(financial_queries)
        financial_data = await self.extract_financial_metrics(results)
        context["data"]["financial"] = financial_data
        return context
```

A sophisticated content curation and storage system sits downstream of the agents:
```python
class ContentCurator:
    """Implements content filtering and relevance scoring"""

    async def process(self, context):
        all_documents = context["data"]["raw_documents"]

        # Relevance scoring using Tavily's AI
        scored_docs = []
        for doc in all_documents:
            score = await self.tavily_client.get_relevance_score(doc)
            if score >= self.min_threshold:  # Default: 0.4
                scored_docs.append({
                    "content": doc,
                    "relevance_score": score,
                    "url": doc.url,
                    "timestamp": doc.timestamp
                })

        # Sort by relevance and deduplicate
        curated_docs = self.deduplicate_and_sort(scored_docs)

        # Store in persistent file system
        await self.file_system.store_curated_content(curated_docs)
        context["data"]["curated"] = curated_docs
        return context

class VirtualFileSystem:
    """Manages persistent storage and retrieval"""

    def __init__(self):
        self.storage = {}  # In production: MongoDB/PostgreSQL
        self.search_index = SearchIndex()

    async def store_curated_content(self, documents):
        for doc in documents:
            doc_id = self.generate_doc_id(doc)
            self.storage[doc_id] = doc
            await self.search_index.index_document(doc_id, doc)

    async def retrieve_by_query(self, query: str):
        return await self.search_index.search(query)
```
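The `SearchIndex` backing the virtual file system is referenced but not shown; a minimal in-memory sketch (illustrative only):

```python
from collections import defaultdict

class SearchIndex:
    """Hypothetical inverted index behind VirtualFileSystem (sketch)."""

    def __init__(self):
        self.index = defaultdict(set)  # token -> set of doc_ids
        self.documents = {}            # doc_id -> stored document

    async def index_document(self, doc_id: str, doc: dict) -> None:
        self.documents[doc_id] = doc
        for token in str(doc.get("content", "")).lower().split():
            self.index[token].add(doc_id)

    async def search(self, query: str) -> list:
        # Rank documents by how many query tokens they match
        hits = defaultdict(int)
        for token in query.lower().split():
            for doc_id in self.index.get(token, ()):
                hits[doc_id] += 1
        ranked = sorted(hits, key=hits.get, reverse=True)
        return [self.documents[doc_id] for doc_id in ranked]
```

Multi-model architecture with specialized prompts: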
```python
class SystemPromptManager:
    """Manages different prompts for different models and tasks"""

    RESEARCH_PROMPT = """
    You are a specialized research agent in a multi-agent system.
    Your role: {agent_role}

    Guidelines:
    1. Focus on factual, verifiable information
    2. Cite sources and provide relevance scores
    3. Structure output for downstream processing
    4. Communicate progress to the coordination system

    Current Task: {task_description}
    Context: {context_data}
    """

    SYNTHESIS_PROMPT = """
    You are using Gemini 2.0 Flash for high-context research synthesis.
    Task: Generate comprehensive briefing from research data
    Strengths: Large context windows, data summarization

    Input: {research_data}
    Requirements: {output_requirements}
    """

    EDITING_PROMPT = """
    You are using GPT-4.1-mini for precise formatting and editing.
    Task: Compile and format final report
    Strengths: Precise formatting, consistency, markdown structure

    Content: {briefing_content}
    Format Requirements: {format_specs}
    """
```
```python
from datetime import datetime

class WebSocketManager:
    """Manages real-time communication with frontend"""

    def __init__(self):
        self.active_connections = {}

    async def send_status_update(self, job_id: str, status: str, message: str, result: dict):
        """Send structured updates to connected clients"""
        update = {
            "job_id": job_id,
            "status": status,
            "message": message,
            "result": result,
            "timestamp": datetime.utcnow().isoformat()
        }
        if job_id in self.active_connections:
            await self.active_connections[job_id].send_json(update)

# Usage in agents (assumes a module-level websocket_manager instance)
async def process_with_progress(self, context):
    await websocket_manager.send_status_update(
        job_id=context["job_id"],
        status="processing",
        message=f"Analyzing {context['company']} financial data",
        result={
            "step": "FinancialAnalyst",
            "progress": "50%",
            "documents_processed": 15
        }
    )
```

Strategic use of different models yields optimal performance:
```python
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI

class ModelManager:
    """Manages different models for different tasks"""

    def __init__(self):
        self.gemini = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
        self.gpt4 = ChatOpenAI(model="gpt-4.1-mini")

    async def synthesize_research(self, research_data: dict) -> str:
        """Use Gemini for high-context synthesis"""
        prompt = self.build_synthesis_prompt(research_data)
        return await self.gemini.ainvoke(prompt)

    async def format_report(self, content: str) -> str:
        """Use GPT-4.1-mini for precise formatting"""
        prompt = self.build_formatting_prompt(content)
        return await self.gpt4.ainvoke(prompt)
```

For complex workflow management, the same pipeline can be expressed as a LangGraph workflow.
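The graph threads a shared `ResearchState` through every node; that schema isn't shown in the excerpt, so here is a minimal assumed sketch:

```python
from typing import Any, TypedDict

class ResearchState(TypedDict, total=False):
    """Assumed shared state flowing through the graph (illustrative)."""
    company: str           # Target company name
    data: dict[str, Any]   # Accumulated research keyed by agent
    curated: list          # Scored, deduplicated documents
    briefing: str          # Synthesized briefing text
    final_report: str      # Edited, formatted report
```

With the state defined, the graph wires the agents together: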
```python
from langgraph.graph import StateGraph, END

def create_research_graph():
    """Create LangGraph workflow for research pipeline"""
    workflow = StateGraph(ResearchState)

    # Add nodes for each agent
    workflow.add_node("company_analyzer", CompanyAnalyzer())
    workflow.add_node("industry_analyzer", IndustryAnalyzer())
    workflow.add_node("financial_analyst", FinancialAnalyst())
    workflow.add_node("news_scanner", NewsScanner())
    workflow.add_node("curator", ContentCurator())
    workflow.add_node("briefing", BriefingGenerator())
    workflow.add_node("editor", ReportEditor())

    # Define workflow edges
    workflow.add_edge("company_analyzer", "industry_analyzer")
    workflow.add_edge("industry_analyzer", "financial_analyst")
    workflow.add_edge("financial_analyst", "news_scanner")
    workflow.add_edge("news_scanner", "curator")
    workflow.add_edge("curator", "briefing")
    workflow.add_edge("briefing", "editor")
    workflow.add_edge("editor", END)

    # Set entry point
    workflow.set_entry_point("company_analyzer")

    return workflow.compile()
```

For dynamic tool discovery, DeepMCPAgent can attach MCP servers at runtime:
```python
from deepmcpagent import HTTPServerSpec, build_deep_agent

async def create_mcp_enhanced_agent():
    """Create agent with MCP tool discovery"""
    servers = {
        "research_tools": HTTPServerSpec(
            url="http://localhost:8000/mcp",
            transport="http"
        ),
        "financial_data": HTTPServerSpec(
            url="http://financial-api.com/mcp",
            transport="sse"
        )
    }

    model = ChatOpenAI(model="gpt-4.1")
    graph, loader = await build_deep_agent(
        servers=servers,
        model=model,
        instructions="You are a research agent with access to dynamic tools",
        trace_tools=True
    )
    return graph, loader
```
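`build_deep_agent` returns the compiled agent graph plus a tool loader; assuming the graph follows LangGraph's message-passing convention, invoking it looks roughly like this (hypothetical usage):

```python
# Hypothetical invocation, assuming the compiled graph accepts
# LangGraph-style message state.
async def run_mcp_agent():
    graph, loader = await create_mcp_enhanced_agent()
    result = await graph.ainvoke(
        {"messages": [("user", "Profile the company Tesla using available tools")]}
    )
    return result["messages"][-1]
```

The system exposes the pipeline through a FastAPI backend with background jobs and WebSocket updates: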
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Deep Agents Research System")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/research/start")
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks
):
    """Start a new research job"""
    job_id = generate_job_id()

    # Start research in background
    background_tasks.add_task(
        execute_research_pipeline,
        job_id,
        request.company_name,
        request.research_depth
    )
    return {"job_id": job_id, "status": "started"}

@app.websocket("/research/ws/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    """WebSocket endpoint for real-time updates"""
    await websocket.accept()
    websocket_manager.add_connection(job_id, websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        websocket_manager.remove_connection(job_id)
```
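A client drives this API by starting a job over HTTP and subscribing to its WebSocket; a minimal sketch, assuming the `httpx` and `websockets` client libraries:

```python
import asyncio
import json

import httpx
import websockets  # assumed client-side dependency

async def follow_research(company: str):
    # Kick off the research job
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:8000/research/start",
            json={"company_name": company},
        )
        job_id = resp.json()["job_id"]

    # Stream progress updates until the job finishes
    async with websockets.connect(f"ws://localhost:8000/research/ws/{job_id}") as ws:
        async for raw in ws:
            update = json.loads(raw)
            print(update["status"], update["message"])
            if update["status"] in ("completed", "failed"):
                break

asyncio.run(follow_research("Tesla"))
```

Execute independent agents concurrently: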
```python
import asyncio

async def parallel_research_phase(context):
    """Run independent research agents in parallel"""
    tasks = [
        CompanyAnalyzer().process(context.copy()),
        IndustryAnalyzer().process(context.copy()),
        FinancialAnalyst().process(context.copy()),
        NewsScanner().process(context.copy())
    ]
    results = await asyncio.gather(*tasks)

    # Merge results. Note: dict.copy() is shallow, so nested structures
    # like "data" are shared across copies; deep copies may be needed.
    merged_context = context.copy()
    for result in results:
        merged_context["data"].update(result["data"])
    return merged_context
```

Cache expensive operations:
```python
import json
from datetime import timedelta

import redis

class CacheManager:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

    async def get_cached_research(self, company: str, research_type: str):
        """Get cached research results"""
        cache_key = f"research:{company}:{research_type}"
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    async def cache_research(self, company: str, research_type: str, data: dict):
        """Cache research results with TTL"""
        cache_key = f"research:{company}:{research_type}"
        self.redis_client.setex(
            cache_key,
            timedelta(hours=24),  # 24-hour TTL
            json.dumps(data)
        )
```
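An agent would consult the cache before doing live research; a hypothetical integration of `CacheManager` into the pipeline:

```python
# Hypothetical wiring of CacheManager into an agent's process step.
cache = CacheManager()

async def cached_process(agent, context):
    company = context["company"]
    research_type = agent.__class__.__name__
    hit = await cache.get_cached_research(company, research_type)
    if hit is not None:
        context["data"].update(hit)  # Serve cached results
        return context
    context = await agent.process(context)
    await cache.cache_research(company, research_type, context["data"])
    return context
```

Manage API rate limits and costs: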
```python
class ResourceManager:
    def __init__(self):
        self.api_quotas = {
            "tavily": RateLimiter(max_calls=100, time_window=3600),
            "openai": RateLimiter(max_calls=1000, time_window=3600),
            "gemini": RateLimiter(max_calls=500, time_window=3600)
        }

    async def execute_with_quota(self, api_name: str, func, *args, **kwargs):
        """Execute function with rate limiting"""
        limiter = self.api_quotas[api_name]
        async with limiter:
            return await func(*args, **kwargs)
```
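`RateLimiter` is referenced above but never defined in the excerpt; a minimal sliding-window sketch that satisfies the `async with` usage:

```python
import asyncio
import time

class RateLimiter:
    """Hypothetical sliding-window limiter usable as an async context manager."""

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls: list[float] = []
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            # Drop timestamps that have fallen out of the window
            self.calls = [t for t in self.calls if now - t < self.time_window]
            if len(self.calls) >= self.max_calls:
                # Wait until the oldest call leaves the window
                await asyncio.sleep(self.time_window - (now - self.calls[0]))
            self.calls.append(time.monotonic())
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False
```

User input is validated and sanitized before it reaches the pipeline: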
```python
import re

from pydantic import BaseModel, validator

class ResearchRequest(BaseModel):
    company_name: str
    research_depth: str = "standard"

    @validator('company_name')
    def validate_company_name(cls, v):
        # Sanitize company name input
        if not re.match(r'^[a-zA-Z0-9\s\-\.&]+$', v):
            raise ValueError('Invalid company name format')
        if len(v) > 100:
            raise ValueError('Company name too long')
        return v.strip()
```

Transient API failures are absorbed with retries and exponential backoff:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class ResilientAgent:
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries

    async def execute_with_retry(self, func, *args, **kwargs):
        """Execute function with exponential backoff retry"""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                wait_time = 2 ** attempt  # Exponential backoff
                await asyncio.sleep(wait_time)
```

Observability comes from structured logging and Prometheus metrics:
```python
import time

import structlog
from prometheus_client import Counter, Histogram, start_http_server

# Metrics
AGENT_EXECUTIONS = Counter('agent_executions_total', 'Total agent executions', ['agent_type', 'status'])
EXECUTION_TIME = Histogram('agent_execution_seconds', 'Agent execution time', ['agent_type'])

logger = structlog.get_logger()

class MonitoredAgent:
    def __init__(self, agent_type: str):
        self.agent_type = agent_type

    async def execute(self, task):
        start_time = time.time()
        with EXECUTION_TIME.labels(agent_type=self.agent_type).time():
            try:
                result = await self.process_task(task)
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='success').inc()
                logger.info(
                    "Agent execution completed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    duration=time.time() - start_time
                )
                return result
            except Exception as e:
                AGENT_EXECUTIONS.labels(agent_type=self.agent_type, status='error').inc()
                logger.error(
                    "Agent execution failed",
                    agent_type=self.agent_type,
                    task_id=task.id,
                    error=str(e)
                )
                raise
```
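`start_http_server` is imported above but must be called once at application startup to expose the metrics endpoint; for example:

```python
# Expose /metrics for Prometheus scraping; call once at startup.
# Port 9090 is an illustrative choice.
start_http_server(9090)
```

The system ships as a container:

```dockerfile
# Dockerfile for Deep Agents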
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
# curl is needed by the HEALTHCHECK below (not included in python:3.11-slim)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["uvicorn", "application:app", "--host", "0.0.0.0", "--port", "8000"]
```

Kubernetes manifests handle replication, secrets, and health checks:

```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deep-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: deep-agents
  template:
    metadata:
      labels:
        app: deep-agents
    spec:
      containers:
        - name: deep-agents
          image: deep-agents:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: openai-key
            - name: TAVILY_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-keys
                  key: tavily-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: deep-agents-service
spec:
  selector:
    app: deep-agents
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
```

Scaling further comes down to standard practices:

- Stateless Agents: Design agents to be stateless for easy horizontal scaling
- Load Balancing: Distribute requests across multiple agent instances
- Queue Management: Use message queues for task distribution (see the sketch after this list)
- Memory Optimization: Efficient memory usage for large context processing
- CPU Optimization: Parallel processing for compute-intensive tasks
- GPU Acceleration: For model inference and heavy computations
- Read Replicas: Scale read operations across multiple database instances
- Sharding: Partition data across multiple databases
- Caching: Redis/Memcached for frequently accessed data
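
As a sketch of the queue-management point, jobs can be pushed onto a shared queue and drained by a pool of stateless workers (an illustrative `asyncio` version; production deployments would more likely use Redis or RabbitMQ):

```python
import asyncio

async def worker(queue: asyncio.Queue):
    # Each stateless worker drains jobs from the shared queue
    while True:
        job = await queue.get()
        try:
            await execute_research_pipeline(job["job_id"], job["company"], "standard")
        finally:
            queue.task_done()

async def run_workers(jobs: list, concurrency: int = 4):
    queue: asyncio.Queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    workers = [asyncio.create_task(worker(queue)) for _ in range(concurrency)]
    await queue.join()  # Wait until every job has been processed
    for w in workers:
        w.cancel()      # Workers loop forever; cancel once drained
    await asyncio.gather(*workers, return_exceptions=True)
```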
Unit tests mock external dependencies:

```python
import pytest
from unittest.mock import AsyncMock, patch

class TestCompanyAnalyzer:
    @pytest.fixture
    def analyzer(self):
        return CompanyAnalyzer()

    @pytest.mark.asyncio
    async def test_process_success(self, analyzer):
        # Mock external dependencies
        with patch.object(analyzer.tavily_client, 'search') as mock_search:
            mock_search.return_value = [{"title": "Test Company", "content": "Test content"}]

            context = {"company": "Test Corp", "data": {}}
            result = await analyzer.process(context)

            assert "business" in result["data"]
            assert result["data"]["business"] is not None

    @pytest.mark.asyncio
    async def test_process_api_failure(self, analyzer):
        # Test error handling
        with patch.object(analyzer.tavily_client, 'search') as mock_search:
            mock_search.side_effect = Exception("API Error")

            context = {"company": "Test Corp", "data": {}}
            with pytest.raises(Exception):
                await analyzer.process(context)
```

Integration tests exercise the full pipeline against real APIs:

```python
@pytest.mark.integration
class TestResearchPipeline:
    @pytest.mark.asyncio
    async def test_full_pipeline(self):
        pipeline = ResearchPipeline()

        # Test with real APIs (using test keys)
        result = await pipeline.execute("Apple Inc")

        assert result is not None
        assert "business" in result
        assert "financial" in result
        assert "industry" in result
        assert "news" in result
```

Load-test the research endpoint:

```python
import asyncio
import aiohttp

async def load_test_research_endpoint():
    """Load test the research endpoint"""

    async def make_request(session, company):
        async with session.post(
            "http://localhost:8000/research/start",
            json={"company_name": company}
        ) as response:
            return await response.json()

    async with aiohttp.ClientSession() as session:
        companies = ["Apple", "Google", "Microsoft", "Amazon", "Tesla"] * 20
        tasks = [make_request(session, company) for company in companies]
        results = await asyncio.gather(*tasks)

        success_count = sum(1 for r in results if r.get("status") == "started")
        print(f"Success rate: {success_count}/{len(results)}")
```

Key reference repositories:

- DeepMCPAgent (`cryxnet/DeepMCPAgent`): MCP-based agent framework
- Company Research Agent (`guy-hartstein/company-research-agent`): Production multi-agent system
- LangGraph Examples (`langchain-ai/langgraph`): Official workflow examples
This context document provides the technical foundation for implementing sophisticated Deep Agent systems. Use it alongside the Planning Document for comprehensive system development.