Created
July 24, 2025 18:43
-
-
Save KennyVaneetvelde/7e67da1b25015420db33544528577d45 to your computer and use it in GitHub Desktop.
Atomic Agents Dynamic Orchestration Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from typing import Union, List, Type | |
| import instructor | |
| import openai | |
| from pydantic import Field | |
| from rich.console import Console | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| from atomic_agents.agents.base_agent import BaseAgent, BaseAgentConfig | |
| from atomic_agents.lib.base.base_io_schema import BaseIOSchema | |
| from atomic_agents.lib.components.system_prompt_generator import SystemPromptGenerator | |
| from atomic_agents.lib.base.base_tool import BaseTool | |
| # Import tools from local directory | |
| from tools import ( | |
| SearxNGSearchTool, | |
| SearxNGSearchToolConfig, | |
| CalculatorTool, | |
| CalculatorToolConfig, | |
| ) | |
| console = Console() | |
| # Input schema for the orchestrator | |
| class OrchestratorInputSchema(BaseIOSchema): | |
| """Schema for the input to the Orchestrator.""" | |
| user_input: str = Field(..., description="The user's input or question") | |
| # Final answer schema | |
| class FinalAnswerSchema(BaseIOSchema): | |
| """Schema for the final answer.""" | |
| response: str = Field(..., description="The final response to the user") | |
| def create_dynamic_output_schema(tools: List[BaseTool]) -> Type[BaseIOSchema]: | |
| """Create a dynamic output schema based on the tools list.""" | |
| input_schemas = [tool.input_schema for tool in tools] | |
| # Create Union type from all tool input schemas | |
| if len(input_schemas) == 0: | |
| raise ValueError("No tools provided") | |
| elif len(input_schemas) == 1: | |
| union_type = input_schemas[0] | |
| else: | |
| union_type = Union[tuple(input_schemas)] | |
| # Create new schema class dynamically | |
| class DynamicOrchestratorOutputSchema(BaseIOSchema): | |
| """Schema for the output from the Orchestrator using type-based selection.""" | |
| tool_parameters: union_type = Field( | |
| ..., | |
| description="The parameters for the selected tool, type determines which tool to use", | |
| ) | |
| return DynamicOrchestratorOutputSchema | |
| def execute_tool_by_type( | |
| tools: List[BaseTool], orchestrator_output: BaseIOSchema | |
| ) -> BaseIOSchema: | |
| """ | |
| Execute the appropriate tool based on the type of tool_parameters. | |
| Finds the matching tool by checking isinstance against each tool's input schema. | |
| """ | |
| for tool in tools: | |
| if isinstance(orchestrator_output.tool_parameters, tool.input_schema): | |
| console.print( | |
| f"[cyan]Using {tool.__class__.__name__} (detected by type)[/cyan]" | |
| ) | |
| return tool.run(orchestrator_output.tool_parameters) | |
| raise ValueError( | |
| f"No tool found for parameter type: {type(orchestrator_output.tool_parameters)}" | |
| ) | |
| def main(): | |
| # Initialize the client | |
| client = instructor.from_openai(openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))) | |
| # Create tools list | |
| tools = [ | |
| SearxNGSearchTool( | |
| config=SearxNGSearchToolConfig(base_url="http://localhost:8888") | |
| ), | |
| CalculatorTool(config=CalculatorToolConfig()), | |
| ] | |
| # Create dynamic output schema based on tools | |
| DynamicOutputSchema = create_dynamic_output_schema(tools) | |
| # Build tool descriptions for the system prompt | |
| tool_descriptions = [] | |
| for i, tool in enumerate(tools, 1): | |
| tool_descriptions.append( | |
| f"{i}. {tool.__class__.__name__} - {tool.input_schema.__doc__.strip()}" | |
| ) | |
| # Create the orchestrator agent with type-based selection | |
| system_prompt = SystemPromptGenerator( | |
| background=[ | |
| "You are an intelligent orchestrator that can route user requests to appropriate tools.", | |
| "You determine which tool to use based on the user's input.", | |
| "You have access to these tools:", | |
| ] | |
| + tool_descriptions, | |
| steps=[ | |
| "Analyze the user's input to determine the intent", | |
| "Select the appropriate tool by returning the correct schema type:", | |
| ] | |
| + [ | |
| f" - For {tool.__class__.__name__}: return {tool.input_schema.__name__}" | |
| for tool in tools | |
| ] | |
| + [ | |
| "Important: The system will automatically detect which tool to use based on the schema type you return" | |
| ], | |
| output_instructions=[ | |
| "Return ONLY the appropriate tool parameters", | |
| "Do NOT include a 'tool' field - the tool is determined by the parameter type", | |
| "Make sure to fill in all required fields for the chosen tool's input schema", | |
| ], | |
| ) | |
| orchestrator_agent = BaseAgent( | |
| config=BaseAgentConfig( | |
| client=client, | |
| model="gpt-4o-mini", | |
| system_prompt_generator=system_prompt, | |
| input_schema=OrchestratorInputSchema, | |
| output_schema=DynamicOutputSchema, | |
| ) | |
| ) | |
| # Create the final response agent | |
| response_prompt = SystemPromptGenerator( | |
| background=[ | |
| "You are responsible for providing clear, concise answers based on tool outputs.", | |
| "You receive results from various tools and need to format them appropriately.", | |
| ], | |
| steps=[ | |
| "Review the tool output", | |
| "Formulate a clear, helpful response for the user", | |
| "Include relevant details from the tool results", | |
| ], | |
| output_instructions=[ | |
| "Provide a complete answer to the user's question", | |
| "Be concise but thorough", | |
| "Format numbers and data clearly", | |
| ], | |
| ) | |
| response_agent = BaseAgent( | |
| config=BaseAgentConfig( | |
| client=client, | |
| model="gpt-4o-mini", | |
| system_prompt_generator=response_prompt, | |
| input_schema=BaseIOSchema, | |
| output_schema=FinalAnswerSchema, | |
| ) | |
| ) | |
| # Main interaction loop | |
| console.print("[bold green]Type-Based Orchestrator Agent[/bold green]") | |
| console.print( | |
| "Ask me anything! I'll automatically route to the appropriate tool based on your query." | |
| ) | |
| console.print("Type 'exit' to quit.\n") | |
| while True: | |
| user_input = console.input("[bold blue]You:[/bold blue] ") | |
| if user_input.lower() in ["exit", "quit"]: | |
| console.print("Goodbye!") | |
| break | |
| try: | |
| # Step 1: Orchestrator determines which tool to use (by type) | |
| orchestrator_input = OrchestratorInputSchema(user_input=user_input) | |
| orchestrator_output = orchestrator_agent.run(orchestrator_input) | |
| # Step 2: Execute the tool based on parameter type | |
| tool_result = execute_tool_by_type(tools, orchestrator_output) | |
| # Step 3: Generate final response | |
| # Create a generic schema with the tool result and original question | |
| class ToolResultSchema(BaseIOSchema): | |
| """Schema for passing tool results to response agent.""" | |
| original_question: str = Field(..., description="The original user question") | |
| result: str = Field(..., description="The tool execution result") | |
| # Convert tool result to string for response agent | |
| result_text = str(tool_result) | |
| tool_result_input = ToolResultSchema( | |
| original_question=user_input, | |
| result=result_text | |
| ) | |
| final_response = response_agent.run(tool_result_input) | |
| console.print( | |
| f"\n[bold green]Assistant:[/bold green] {final_response.response}\n" | |
| ) | |
| except Exception as e: | |
| console.print(f"[bold red]Error:[/bold red] {str(e)}\n") | |
| # To add more tools, just add them to the tools list: | |
| # from tools.another_tool import AnotherTool, AnotherToolConfig | |
| # tools.append(AnotherTool(config=AnotherToolConfig())) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment