Created
August 12, 2025 06:14
-
-
Save kausmeows/33c9e662e6327cc7f0dfe6136b1c034b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import List, Dict, Any | |
| import json | |
| from agno.agent import Agent | |
| from agno.tools.duckduckgo import DuckDuckGoTools | |
| from agno.workflow.v2.step import Step | |
| from agno.workflow.v2.parallel import Parallel | |
| from agno.workflow.v2.workflow import Workflow | |
| from agno.workflow.v2.types import StepInput, StepOutput | |
# 1) Decomposer agent: returns a JSON array like: [{"query": "..."}, ...]
# Its output is consumed by dynamic_map_parallel, which parses this JSON
# to decide how many parallel search steps to spawn.
decomposer = Agent(
    name="Query Decomposer",
    instructions=[
        "Break the input topic into 5-7 crisp, non-overlapping web search queries.",
        # Strict output contract: JSON only, so the next step can json.loads it.
        'Return ONLY valid JSON array of objects: [{"query": "<string>"}]. No prose.'
    ],
)
# 2) Search agent used by each parallel sub-step.
# A single shared instance: dynamic_map_parallel wraps it in a fresh Step
# per query, so each parallel branch sends it one query as its message.
search_agent = Agent(
    name="Searcher",
    tools=[DuckDuckGoTools()],  # gives the agent live web search capability
    instructions="Search the web and return a short, well-cited summary for the given query."
)
# 3) Writer agent (the "reduce" stage).
# Receives the aggregated content produced by the parallel search step and
# turns it into a single structured report.
writer = Agent(
    name="Writer",
    instructions=[
        "You are a report writer.",
        "Use the aggregated search summaries (from previous step) to write a coherent, well-structured report with headings and sources."
    ],
)
def _extract_queries(raw: Any) -> List[str]:
    """Parse the decomposer's output into a flat list of query strings.

    Accepts, in decreasing order of preference:
      * a JSON array of ``{"query": "..."}`` objects (the decomposer's contract),
      * a JSON array of plain strings,
      * a JSON object shaped ``{"queries": [...]}`` or a single ``{"query": "..."}``,
      * plain text, one query per line (fallback when the output is not JSON).

    A surrounding markdown code fence (```` ```json ... ``` ````) is stripped
    first, since LLMs frequently wrap JSON that way and it would otherwise
    break ``json.loads`` and pollute the line-based fallback with fence lines.
    Blank entries are dropped and duplicates removed in first-seen order.
    Returns an empty list when nothing usable is found.
    """
    if isinstance(raw, str):
        text = raw.strip()
        # Strip a surrounding markdown code fence, e.g. ```json ... ```
        if text.startswith("```"):
            fenced = text.splitlines()
            if fenced and fenced[0].startswith("```"):
                fenced = fenced[1:]
            if fenced and fenced[-1].strip() == "```":
                fenced = fenced[:-1]
            text = "\n".join(fenced).strip()
        try:
            data = json.loads(text)
        except Exception:
            # Not JSON at all: treat each non-empty line as a query.
            data = [ln.strip() for ln in text.splitlines() if ln.strip()]
    else:
        # Already-structured content (e.g. the framework passed a parsed object).
        data = raw

    # Tolerate object-shaped JSON instead of silently yielding zero queries.
    if isinstance(data, dict):
        if "queries" in data:
            data = data["queries"]
        elif "query" in data:
            data = [data]

    queries: List[str] = []
    if isinstance(data, list):
        for item in data:
            if isinstance(item, dict) and "query" in item:
                queries.append(str(item["query"]).strip())
            elif isinstance(item, str):
                queries.append(item.strip())

    # De-duplicate while preserving order; drop empties so we never spawn
    # a parallel step for a blank query.
    seen: set = set()
    unique: List[str] = []
    for q in queries:
        if q and q not in seen:
            seen.add(q)
            unique.append(q)
    return unique


# -- Function step that does dynamic map (Parallel) + returns aggregated StepOutput --
def dynamic_map_parallel(step_input: StepInput) -> StepOutput:
    """Fan the decomposed query list out into parallel search steps (the "map").

    Reads the previous step's content (the decomposer's JSON), extracts the
    queries, builds one search sub-step per query, runs them all via a
    dynamically-constructed ``Parallel``, and returns the aggregated
    ``StepOutput`` for the downstream writer ("reduce") step to consume.
    """
    # Read the decomposer output (previous step's content); fall back to the
    # original workflow message, then to an empty JSON array.
    raw = step_input.previous_step_content or step_input.message or "[]"
    queries = _extract_queries(raw)
    if not queries:
        return StepOutput(
            step_name="Dynamic Map",
            content="No queries found to execute in parallel.",
        )

    # Build a Parallel of function steps, one per query.
    def make_search_step(q: str) -> Step:
        def run_search(_: StepInput) -> StepOutput:
            # Run the search agent as a one-off step to ensure proper StepOutput.
            s = Step(name=f"search_{q[:32]}", agent=search_agent)
            # Create a fresh StepInput so the agent receives the query as its message.
            return s.execute(
                StepInput(message=f"Search this query and summarize findings:\n{q}")
            )

        return Step(name=f"Search:{q[:24]}", executor=run_search)

    parallel = Parallel(*[make_search_step(q) for q in queries], name="Map: Search Queries")
    # Execute the dynamic Parallel with the same StepInput context.
    # The result is a StepOutput whose content aggregates the sub-step
    # summaries; individual sub-step outputs are also carried on it.
    aggregated = parallel.execute(step_input)
    return aggregated
# Assemble the workflow: Decompose -> Dynamic Parallel Map -> Writer (Reduce)
workflow = Workflow(
    name="Dynamic Map-Reduce Workflow",
    steps=[
        # Step 1: decomposer agent emits a JSON array of search queries.
        Step(name="Decompose", agent=decomposer),
        # Step 2: function step fans out one parallel search per query ("map").
        Step(name="Map (Dynamic Parallel)", executor=dynamic_map_parallel),
        # Step 3: writer agent merges the aggregated summaries ("reduce").
        Step(name="Write Report", agent=writer),
    ],
)
def _main() -> None:
    """Demo entry point: stream a full map-reduce run to the console."""
    workflow.print_response(
        message="Produce a report on current AI safety debates and practical mitigations",
        stream=True,
        stream_intermediate_steps=True,
        markdown=True,
    )


if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment