@yoyu777
Created December 5, 2025 09:42
Task 3.5: Call the agent upon user input
import json

# Utility module for ADK data structures (Content, Part)
from google.genai import types

# `validation_agent_runner`, `user_id`, and `validation_session` are assumed
# to be defined in earlier tasks.


# This function must be asynchronous (`async def`)
# because it awaits the agent's streamed responses.
async def validate_input(user_input: str):
    try:
        print(f'validating input: {user_input}')
        # `run_async` returns an asynchronous generator that may yield
        # several events (a stream), so it must be consumed with an
        # `async for ... in ...` loop.
        async for event in validation_agent_runner.run_async(
            user_id=user_id,
            # Unique ID of the current session
            session_id=validation_session.id,
            # Wrap the user's input in the structure the runner expects
            new_message=types.Content(role='user', parts=[types.Part(text=user_input)]),
        ):
            # Check whether this event carries the agent's final response
            if event.is_final_response():
                print(f'validation result: {event.content.parts[0].text}')
                # Parse the final text (expected to be a JSON string) into a Python dictionary
                return json.loads(event.content.parts[0].text)
        # The stream ended without a final response
        return None
    except Exception as e:
        print(f"Error during validation: {e}")
        return None
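
The `async for` pattern above can be tried without an ADK installation. The following is a minimal sketch that swaps in hypothetical stand-ins: `FakeEvent`, `FakeRunner`, and `validate_with` are illustrative names, not part of ADK, and the fake `run_async` takes a simplified argument list. It only shows how an asynchronous generator is consumed until the final event arrives.

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK event; not the real class."""
    text: str
    final: bool

    def is_final_response(self) -> bool:
        return self.final


class FakeRunner:
    """Hypothetical stand-in for the runner: one interim event, then a final one."""

    async def run_async(self, user_input: str):
        yield FakeEvent(text="thinking...", final=False)
        await asyncio.sleep(0)  # hand control back to the event loop between events
        payload = {"valid": bool(user_input.strip())}
        yield FakeEvent(text=json.dumps(payload), final=True)


async def validate_with(runner, user_input: str):
    # Same shape as validate_input: iterate the stream, return on the final event.
    async for event in runner.run_async(user_input):
        if event.is_final_response():
            return json.loads(event.text)
    return None  # the stream ended without a final response


result = asyncio.run(validate_with(FakeRunner(), "hello"))
print(result)
```

In a plain script the coroutine is driven with `asyncio.run(...)` as shown; in a notebook cell, where an event loop is typically already running, `await validate_input(...)` can usually be used directly instead.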