Created
November 18, 2025 04:42
-
-
Save janakiramm/79746d157b7b49fbd6f6d43bf6158cca to your computer and use it in GitHub Desktop.
Implementing a Multi-Agent System in Google ADK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ### Part 1 - Imports | |
| import os | |
| import sys | |
| import asyncio | |
| import logging | |
| import re | |
| from typing import Optional, Dict, Any | |
| from google.adk.agents import LlmAgent, SequentialAgent | |
| from google.adk.runners import Runner | |
| from google.adk.sessions import InMemorySessionService | |
| from google.genai import types | |
### Part 2 - Configuration
# Identifiers used when creating the session and when running the pipeline.
APP_NAME = "code-pipeline-demo"
USER_ID = "demo-user"
SESSION_ID = "code-session-1"
# Model name; overridable through the GEMINI_MODEL environment variable.
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")

# LOG_LEVEL comes from the environment, so normalise it before use.
# logging.getLevelName() returns the string "Level <x>" for names it does not
# know (including lower-case ones such as "debug"), and passing that string to
# basicConfig() raises ValueError. Fall back to ERROR in that case.
_requested_level = logging.getLevelName(os.getenv("LOG_LEVEL", "ERROR").strip().upper())
if not isinstance(_requested_level, int):
    _requested_level = logging.ERROR
logging.basicConfig(level=_requested_level)
log = logging.getLogger(__name__)
### Part 3 - Define agents
# Three LLM agents, one per pipeline stage. The ``output_key`` names given here
# are the same keys that print_summary() and main() later look up in the
# session state.

# Stage 1: turn the user's request into Python code.
code_writer = LlmAgent(
    name="CodeWriter",
    model=GEMINI_MODEL,
    instruction=(
        "Generate complete, functional Python code based on the user's request. "
        "Output only the code in ```python...``` format."
    ),
    output_key="generated_code",
)

# Stage 2: critique the code produced by stage 1.
code_reviewer = LlmAgent(
    name="CodeReviewer",
    model=GEMINI_MODEL,
    instruction=(
        "Review the code from the previous step (in conversation history). "
        "Provide concise, bulleted feedback on correctness, readability, efficiency, "
        "edge cases, and best practices. If excellent, say 'No major issues found.'"
    ),
    output_key="review_comments",
)

# Stage 3: apply the review feedback and emit the final code.
code_refactorer = LlmAgent(
    name="CodeRefactorer",
    model=GEMINI_MODEL,
    instruction=(
        "Apply review feedback to refactor the code (both in conversation history). "
        "Output only the final, improved code in ```python...``` format."
    ),
    output_key="refactored_code",
)
### Part 4 - Create pipeline
# Sub-agents are listed in the order writer, reviewer, refactorer.
_pipeline_stages = [code_writer, code_reviewer, code_refactorer]
pipeline = SequentialAgent(name="CodePipeline", sub_agents=_pipeline_stages)

### Part 5 - Setup runner
# The runner and run_pipeline() share this session service instance.
session_service = InMemorySessionService()
runner = Runner(
    app_name=APP_NAME,
    agent=pipeline,
    session_service=session_service,
)
def extract_code(text: str) -> str:
    """Extract the body of the first fenced Markdown code block in *text*.

    A fence tagged ``python`` or ``py`` (any letter case) is preferred. If
    there is none, the first fence with any single-word language tag, or no
    tag at all, is used. Both LF and CRLF line endings after the opening
    fence are accepted, as is trailing whitespace after the tag.

    Args:
        text: Text that may wrap code in triple-backtick fences.

    Returns:
        The stripped contents of the selected fenced block. If no fence is
        found, the stripped input is returned unchanged. Empty or ``None``
        input yields ``""``.
    """
    if not text:
        return ""

    fence_patterns = (
        # Python-tagged fence first, so it wins over any earlier generic fence.
        r"```(?:python|py)[ \t]*\r?\n(.*?)```",
        # Any other fence. The tag is limited to one word so that backticks
        # quoted inside a prose sentence are not mistaken for an opening fence.
        r"```[ \t]*[\w+#.-]*[ \t]*\r?\n(.*?)```",
    )
    for pattern in fence_patterns:
        if match := re.search(pattern, text, re.DOTALL | re.IGNORECASE):
            return match.group(1).strip()
    return text.strip()
def save_code(code: str, filename: str = "test.py") -> None:
    """Write *code* to *filename* as UTF-8 and report the path on stdout.

    Args:
        code: Source text to write; any existing file content is replaced.
        filename: Destination path. Defaults to ``"test.py"``.

    Raises:
        OSError: If the file cannot be opened or written.
    """
    # Explicit UTF-8: the code may contain non-ASCII characters, and the
    # platform default encoding is not guaranteed to be able to encode them.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(code)
    print(f"\n💾 Saved to {filename}")
async def run_pipeline(query: str, state: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Run the agent pipeline once for *query* and collect its outcome.

    Args:
        query: The user's request, sent to the runner as a user message.
        state: Optional initial session state; an empty dict is used when
            omitted or falsy.

    Returns:
        A dict with two keys: ``"response"`` holds the text of the first part
        of the last final-response event seen (``""`` if there was none), and
        ``"state"`` holds the session's state after the run (``{}`` if the
        session could not be fetched).
    """
    session_ref = {"app_name": APP_NAME, "user_id": USER_ID, "session_id": SESSION_ID}

    # Register the session before the runner is asked to use it.
    await session_service.create_session(state=state or {}, **session_ref)

    user_message = types.Content(role='user', parts=[types.Part(text=query)])
    final_text = ""
    events = runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=user_message,
    )
    async for event in events:
        if not event.is_final_response():
            continue
        # Later final responses overwrite earlier ones, so the last one wins.
        if event.content and event.content.parts:
            final_text = event.content.parts[0].text

    # Re-read the session to pick up the state left behind by the run.
    session = await session_service.get_session(**session_ref)
    final_state = session.state if session else {}
    return {"response": final_text, "state": final_state}
def print_summary(result: Dict[str, Any]) -> None:
    """Print a header and one "Completed" line per stage found in the state.

    Args:
        result: Pipeline result; its ``"state"`` entry (default ``{}``) is
            checked for the ``generated_code``, ``review_comments`` and
            ``refactored_code`` keys. Stages whose key is absent print nothing.
    """
    final_state = result.get("state", {})
    print("\n=== Pipeline Summary ===\n")

    done_mark = "✓"
    stage_labels = {
        "generated_code": "Code Generation",
        "review_comments": "Code Review",
        "refactored_code": "Refactoring",
    }
    for state_key, label in stage_labels.items():
        if state_key not in final_state:
            continue
        print(f"{done_mark} {label}: Completed")
async def main() -> None:
    """Run the demo: read the request, run the pipeline, save the final code.

    The request is the command-line arguments joined with spaces, or a
    built-in sample prompt when no arguments are given. After the run, a
    stage summary is printed and the first non-empty code candidate
    (refactored code, then the final response, then the generated code) is
    passed to save_code(). Any exception raised during the run is logged with
    its traceback and reported on stdout instead of being propagated.
    """
    print("=== Code Pipeline Demo ===")
    print("Three-stage pipeline: Writer → Reviewer → Refactorer\n")

    # Use the command-line request if there is one, else the sample prompt.
    cli_words = sys.argv[1:]
    if cli_words:
        query = " ".join(cli_words)
    else:
        query = (
            "Write a Python module that fetches JSON from a URL and prints a table "
            "of keys and values. Include __main__ to run as a script with URL argument."
        )
    print(f"Request: {query}\n")

    try:
        result = await run_pipeline(query)
        print_summary(result)

        # Candidate sources in order of preference; stop at the first that
        # yields non-empty code.
        state = result.get("state", {})
        sources = (
            (state, "refactored_code"),
            (result, "response"),
            (state, "generated_code"),
        )
        final_code = ""
        for mapping, key in sources:
            snippet = extract_code(mapping.get(key, ""))
            if snippet:
                final_code = snippet
                break

        if final_code:
            save_code(final_code)
    except Exception as e:
        log.error(f"Pipeline failed: {e}", exc_info=True)
        print(f"\n❌ Error: {e}")
# Run the async main() coroutine only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment