Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created August 12, 2025 06:14
Show Gist options
  • Select an option

  • Save kausmeows/33c9e662e6327cc7f0dfe6136b1c034b to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/33c9e662e6327cc7f0dfe6136b1c034b to your computer and use it in GitHub Desktop.
from typing import List, Dict, Any
import json
from agno.agent import Agent
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.v2.step import Step
from agno.workflow.v2.parallel import Parallel
from agno.workflow.v2.workflow import Workflow
from agno.workflow.v2.types import StepInput, StepOutput
# 1) Decomposer agent: returns a JSON array like: [{"query": "..."}, ...]
decomposer = Agent(
name="Query Decomposer",
instructions=[
"Break the input topic into 5-7 crisp, non-overlapping web search queries.",
'Return ONLY valid JSON array of objects: [{"query": "<string>"}]. No prose.'
],
)
# 2) Search agent used by each parallel sub-step
search_agent = Agent(
name="Searcher",
tools=[DuckDuckGoTools()],
instructions="Search the web and return a short, well-cited summary for the given query."
)
# 3) Writer agent (reduce)
writer = Agent(
name="Writer",
instructions=[
"You are a report writer.",
"Use the aggregated search summaries (from previous step) to write a coherent, well-structured report with headings and sources."
],
)
# -- Function step that does dynamic map (Parallel) + returns aggregated StepOutput --
def dynamic_map_parallel(step_input: StepInput) -> StepOutput:
# Read the decomposer output (previous step’s content)
raw = step_input.previous_step_content or step_input.message or "[]"
# Parse queries from JSON; fallback to lines
queries: List[str] = []
try:
data = json.loads(raw) if isinstance(raw, str) else raw
if isinstance(data, list):
for item in data:
if isinstance(item, dict) and "query" in item:
queries.append(str(item["query"]))
elif isinstance(item, str):
queries.append(item)
except Exception:
# fallback if the decomposer returned lines
if isinstance(raw, str):
queries = [ln.strip() for ln in raw.splitlines() if ln.strip()]
if not queries:
return StepOutput(step_name="Dynamic Map", content="No queries found to execute in parallel.")
# Build a Parallel of function steps, one per query
def make_search_step(q: str) -> Step:
def run_search(_: StepInput) -> StepOutput:
# Run the search agent as a one-off step to ensure proper StepOutput
s = Step(name=f"search_{q[:32]}", agent=search_agent)
# Create a fresh StepInput so the agent receives the query as its message
return s.execute(StepInput(message=f"Search this query and summarize findings:\n{q}"))
return Step(name=f"Search:{q[:24]}", executor=run_search)
parallel = Parallel(*[make_search_step(q) for q in queries], name="Map: Search Queries")
# Execute the dynamic Parallel with the same StepInput context
aggregated = parallel.execute(step_input)
# aggregated is a StepOutput with:
# - content: aggregated markdown summary
# - parallel_step_outputs: dict of sub-step StepOutput objects
return aggregated
# Assemble the workflow: Decompose -> Dynamic Parallel Map -> Writer (Reduce)
workflow = Workflow(
name="Dynamic Map-Reduce Workflow",
steps=[
Step(name="Decompose", agent=decomposer),
Step(name="Map (Dynamic Parallel)", executor=dynamic_map_parallel),
Step(name="Write Report", agent=writer),
],
)
if __name__ == "__main__":
workflow.print_response(
message="Produce a report on current AI safety debates and practical mitigations",
stream=True,
stream_intermediate_steps=True,
markdown=True,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment