import asyncio
from dataclasses import dataclass
from datetime import datetime
from os import getenv
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
    overload,
)
from uuid import uuid4

from fastapi import WebSocket
from pydantic import BaseModel

from agno.agent.agent import Agent
from agno.db.base import AsyncBaseDb, BaseDb, SessionType
from agno.exceptions import InputCheckError, OutputCheckError, RunCancelledException
from agno.media import Audio, File, Image, Video
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.run.agent import RunContentEvent, RunEvent, RunOutput
from agno.run.base import RunStatus
from agno.run.cancel import (
    cancel_run as cancel_run_global,
)
from agno.run.cancel import (
    cleanup_run,
    raise_if_cancelled,
    register_run,
)
from agno.run.team import RunCompletedEvent, RunContentEvent as TeamRunContentEvent, RunStartedEvent
from agno.run.team import TeamRunEvent
from agno.run.workflow import (
    StepOutputEvent,
    WorkflowCancelledEvent,
    WorkflowCompletedEvent,
    WorkflowRunEvent,
    WorkflowRunOutput,
    WorkflowRunOutputEvent,
    WorkflowStartedEvent,
)
from agno.session.workflow import WorkflowSession
from agno.team.team import Team
from agno.utils.common import is_typed_dict, validate_typed_dict
from agno.utils.log import (
    log_debug,
    log_error,
    log_warning,
    logger,
    set_log_level_to_debug,
    set_log_level_to_info,
    use_workflow_logger,
)
from agno.utils.print_response.workflow import (
    aprint_response,
    aprint_response_stream,
    print_response,
    print_response_stream,
)
from agno.workflow.agent import WorkflowAgent
from agno.workflow.condition import Condition
from agno.workflow.loop import Loop
from agno.workflow.parallel import Parallel
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.steps import Steps
from agno.workflow.types import (
    StepInput,
    StepMetrics,
    StepOutput,
    StepType,
    WebSocketHandler,
    WorkflowExecutionInput,
    WorkflowMetrics,
)

STEP_TYPE_MAPPING = {
    Step: StepType.STEP,
    Steps: StepType.STEPS,
    Loop: StepType.LOOP,
    Parallel: StepType.PARALLEL,
    Condition: StepType.CONDITION,
    Router: StepType.ROUTER,
}

WorkflowSteps = Union[
    Callable[
        ["Workflow", WorkflowExecutionInput],
        Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput], Any],
    ],
    Steps,
    List[
        Union[
            Callable[
                [StepInput], Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput]]
            ],
            Step,
            Steps,
            Loop,
            Parallel,
            Condition,
            Router,
        ]
    ],
]
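
# A minimal sketch of the shapes WorkflowSteps accepts (the step functions
# below are hypothetical, not part of this module): a single callable taking
# (workflow, execution_input), a Steps container, or a list mixing per-step
# callables with the Step/Loop/Parallel/Condition/Router primitives.
#
#     def run_pipeline(workflow: "Workflow", execution_input: WorkflowExecutionInput) -> StepOutput:
#         return StepOutput(content=f"handled: {execution_input.input}")
#
#     def summarize(step_input: StepInput) -> StepOutput:
#         return StepOutput(content=str(step_input.previous_step_content))
#
#     steps_as_callable: WorkflowSteps = run_pipeline
#     steps_as_list: WorkflowSteps = [summarize]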

@dataclass
class Workflow:
    """Pipeline-based workflow execution"""

    # Workflow identification - make name optional with default
    name: Optional[str] = None
    # Workflow ID (autogenerated if not set)
    id: Optional[str] = None
    # Workflow description
    description: Optional[str] = None
    # Workflow steps
    steps: Optional[WorkflowSteps] = None
    # Database to use for this workflow
    db: Optional[Union[BaseDb, AsyncBaseDb]] = None
    # Agentic Workflow - WorkflowAgent that decides when to run the workflow
    agent: Optional[WorkflowAgent] = None  # type: ignore
    # Default session_id to use for this workflow (autogenerated if not set)
    session_id: Optional[str] = None
    # Default user_id to use for this workflow
    user_id: Optional[str] = None
    # Default session state (stored in the database to persist across runs)
    session_state: Optional[Dict[str, Any]] = None
    # Set to True to overwrite the stored session_state with the session_state provided in the run
    overwrite_db_session_state: bool = False
    # If True, the workflow runs in debug mode
    debug_mode: Optional[bool] = False

    # --- Workflow Streaming ---
    # Stream the response from the Workflow
    stream: Optional[bool] = None
    # Stream the intermediate steps from the Workflow
    stream_events: bool = False
    # [Deprecated] Stream the intermediate steps from the Workflow
    stream_intermediate_steps: bool = False
    # Stream events from executors (agents/teams/functions) within steps
    stream_executor_events: bool = True
    # Persist the events on the run response
    store_events: bool = False
    # Events to skip when persisting the events on the run response
    events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None
    # Control whether to store executor responses (agent/team responses) in flattened runs
    store_executor_outputs: bool = True
    # Handler used to broadcast workflow events over a WebSocket connection
    websocket_handler: Optional[WebSocketHandler] = None
    # Input schema to validate the input to the workflow
    input_schema: Optional[Type[BaseModel]] = None
    # Metadata stored with this workflow
    metadata: Optional[Dict[str, Any]] = None
    # Cache the WorkflowSession on the workflow instance between runs
    cache_session: bool = False

    # --- Telemetry ---
    # telemetry=True logs minimal telemetry for analytics
    # This helps us improve the Workflow and provide better support
    telemetry: bool = True

    # Controls whether the workflow adds history to the steps
    add_workflow_history_to_steps: bool = False
    # Number of historical runs to include in the messages
    num_history_runs: int = 3

    def __init__(
        self,
        id: Optional[str] = None,
        name: Optional[str] = None,
        description: Optional[str] = None,
        db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
        steps: Optional[WorkflowSteps] = None,
        agent: Optional[WorkflowAgent] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        overwrite_db_session_state: bool = False,
        user_id: Optional[str] = None,
        debug_mode: Optional[bool] = False,
        stream: Optional[bool] = None,
        stream_events: bool = False,
        stream_intermediate_steps: bool = False,
        stream_executor_events: bool = True,
        store_events: bool = False,
        events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None,
        store_executor_outputs: bool = True,
        input_schema: Optional[Type[BaseModel]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        cache_session: bool = False,
        telemetry: bool = True,
        add_workflow_history_to_steps: bool = False,
        num_history_runs: int = 3,
    ):
        self.id = id
        self.name = name
        self.description = description
        self.steps = steps
        self.agent = agent
        self.session_id = session_id
        self.session_state = session_state
        self.overwrite_db_session_state = overwrite_db_session_state
        self.user_id = user_id
        self.debug_mode = debug_mode
        self.store_events = store_events
        self.events_to_skip = events_to_skip or []
        self.stream = stream
        self.stream_events = stream_events
        self.stream_intermediate_steps = stream_intermediate_steps
        self.stream_executor_events = stream_executor_events
        self.store_executor_outputs = store_executor_outputs
        self.input_schema = input_schema
        self.metadata = metadata
        self.cache_session = cache_session
        self.db = db
        self.telemetry = telemetry
        self.add_workflow_history_to_steps = add_workflow_history_to_steps
        self.num_history_runs = num_history_runs
        self._workflow_session: Optional[WorkflowSession] = None
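
    # A minimal construction sketch; the step function here is hypothetical:
    #
    #     def greet(step_input: StepInput) -> StepOutput:
    #         return StepOutput(content=f"hello {step_input.input}")
    #
    #     workflow = Workflow(name="Greeter", steps=[greet])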

    def set_id(self) -> None:
        if self.id is None:
            if self.name is not None:
                self.id = self.name.lower().replace(" ", "-")
            else:
                self.id = str(uuid4())
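
    # Example: name "Content Pipeline" yields id "content-pipeline"; with no
    # name set, a random UUID4 string is used instead.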

    def _has_async_db(self) -> bool:
        return self.db is not None and isinstance(self.db, AsyncBaseDb)

    def _validate_input(
        self, input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]]
    ) -> Optional[Union[str, List, Dict, Message, BaseModel]]:
        """Parse and validate input against input_schema if provided"""
        if self.input_schema is None:
            return input  # Return input unchanged if no schema is set

        if input is None:
            raise ValueError("Input required when input_schema is set")

        # Handle Message objects - extract content
        if isinstance(input, Message):
            input = input.content  # type: ignore

        # If input is a string, parse it as JSON into a dict
        if isinstance(input, str):
            import json

            try:
                input = json.loads(input)
            except Exception as e:
                raise ValueError(f"Failed to parse input. Is it a valid JSON string?: {e}")

        # Case 1: Input is already a BaseModel instance
        if isinstance(input, BaseModel):
            if isinstance(input, self.input_schema):
                return input
            # Different BaseModel types
            raise ValueError(f"Expected {self.input_schema.__name__} but got {type(input).__name__}")
        # Case 2: Input is a dict
        elif isinstance(input, dict):
            try:
                # Check if the schema is a TypedDict
                if is_typed_dict(self.input_schema):
                    validated_dict = validate_typed_dict(input, self.input_schema)
                    return validated_dict
                else:
                    validated_model = self.input_schema(**input)
                    return validated_model
            except Exception as e:
                raise ValueError(f"Failed to parse dict into {self.input_schema.__name__}: {str(e)}")
        # Case 3: Other types not supported for structured input
        else:
            raise ValueError(
                f"Cannot validate {type(input)} against input_schema. Expected dict or {self.input_schema.__name__} instance."
            )
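
    # A validation sketch; ResearchTopic is a hypothetical schema:
    #
    #     class ResearchTopic(BaseModel):
    #         topic: str
    #         depth: int = 1
    #
    #     workflow = Workflow(name="Research", input_schema=ResearchTopic)
    #     workflow._validate_input({"topic": "llms"})    # -> ResearchTopic(topic="llms", depth=1)
    #     workflow._validate_input('{"topic": "llms"}')  # JSON strings are parsed first
    #     workflow._validate_input("not json")           # -> ValueError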

    @property
    def run_parameters(self) -> Dict[str, Any]:
        """Get the run parameters for the workflow"""
        if self.steps is None:
            return {}

        parameters = {}
        if self.steps and callable(self.steps):
            from inspect import Parameter, signature

            sig = signature(self.steps)  # type: ignore
            for param_name, param in sig.parameters.items():
                if param_name not in ["workflow", "execution_input", "self"]:
                    parameters[param_name] = {
                        "name": param_name,
                        "default": param.default.default
                        if hasattr(param.default, "__class__") and param.default.__class__.__name__ == "FieldInfo"
                        else (param.default if param.default is not Parameter.empty else None),
                        "annotation": (
                            param.annotation.__name__
                            if hasattr(param.annotation, "__name__")
                            else (
                                str(param.annotation).replace("typing.Optional[", "").replace("]", "")
                                if "typing.Optional" in str(param.annotation)
                                else str(param.annotation)
                            )
                        )
                        if param.annotation is not Parameter.empty
                        else None,
                        "required": param.default is Parameter.empty,
                    }
        else:
            parameters = {
                "message": {
                    "name": "message",
                    "default": None,
                    "annotation": "str",
                    "required": True,
                },
            }
        return parameters
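
    # Shape sketch of run_parameters for a callable workflow; the function is
    # hypothetical:
    #
    #     def pipeline(workflow, execution_input, topic: str = "ai"): ...
    #
    #     Workflow(steps=pipeline).run_parameters
    #     # -> {"topic": {"name": "topic", "default": "ai", "annotation": "str", "required": False}}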

    def initialize_workflow(self):
        if self.id is None:
            self.set_id()
            log_debug(f"Generated new workflow_id: {self.id}")

    def _initialize_session(
        self,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> Tuple[str, Optional[str]]:
        """Initialize the session for the workflow."""
        if session_id is None:
            if self.session_id:
                session_id = self.session_id
            else:
                session_id = str(uuid4())
                # We make the session_id sticky to the workflow instance if no session_id is provided
                self.session_id = session_id
        log_debug(f"Session ID: {session_id}", center=True)

        # Use the default user_id when necessary
        if user_id is None or user_id == "":
            user_id = self.user_id
        return session_id, user_id

    def _initialize_session_state(
        self,
        session_state: Dict[str, Any],
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Initialize the session state for the workflow."""
        if user_id:
            session_state["current_user_id"] = user_id
        if session_id is not None:
            session_state["current_session_id"] = session_id
        if run_id is not None:
            session_state["current_run_id"] = run_id
        session_state.update(
            {
                "workflow_id": self.id,
                "run_id": run_id,
                "session_id": session_id,
            }
        )
        if self.name:
            session_state["workflow_name"] = self.name
        return session_state

    def _generate_workflow_session_name(self) -> str:
        """Generate a name for the workflow session"""
        if self.session_id is None:
            return f"Workflow Session - {datetime.now().strftime('%Y-%m-%d %H:%M')}"

        datetime_str = datetime.now().strftime("%Y-%m-%d %H:%M")
        new_session_name = f"Workflow Session-{datetime_str}"
        if self.description:
            # Truncate long descriptions and mark the cut with an ellipsis
            truncated_desc = self.description[:40] + "..." if len(self.description) > 40 else self.description
            new_session_name = f"{truncated_desc} - {datetime_str}"
        return new_session_name

    async def aset_session_name(
        self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
    ) -> WorkflowSession:
        """Set the session name and save to storage, using an async database"""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")

        # -*- Read from storage
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if autogenerate:
            # -*- Generate name for session
            session_name = self._generate_workflow_session_name()
            log_debug(f"Generated Workflow Session Name: {session_name}")
        elif session_name is None:
            raise Exception("Session name is not set")
        # -*- Rename session
        session.session_data["session_name"] = session_name  # type: ignore
        # -*- Save to storage
        await self.asave_session(session=session)  # type: ignore
        return session  # type: ignore

    def set_session_name(
        self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
    ) -> WorkflowSession:
        """Set the session name and save to storage"""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")

        # -*- Read from storage
        session = self.get_session(session_id=session_id)  # type: ignore
        if autogenerate:
            # -*- Generate name for session
            session_name = self._generate_workflow_session_name()
            log_debug(f"Generated Workflow Session Name: {session_name}")
        elif session_name is None:
            raise Exception("Session name is not set")
        # -*- Rename session
        session.session_data["session_name"] = session_name  # type: ignore
        # -*- Save to storage
        self.save_session(session=session)  # type: ignore
        return session  # type: ignore

    async def aget_session_name(self, session_id: Optional[str] = None) -> str:
        """Get the session name for the given session ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_name", "") if session.session_data else ""

    def get_session_name(self, session_id: Optional[str] = None) -> str:
        """Get the session name for the given session ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_name", "") if session.session_data else ""

    async def aget_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get the session state for the given session ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_state", {}) if session.session_data else {}

    def get_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Get the session state for the given session ID."""
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")
        return session.session_data.get("session_state", {}) if session.session_data else {}

    def update_session_state(
        self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update the session state for the given session ID.

        Args:
            session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
            session_id: The session ID to update. If not provided, the current cached session ID is used.

        Returns:
            dict: The updated session state.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = self.get_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")

        # Ensure the session_state container exists before applying updates
        if session.session_data is None:
            session.session_data = {}
        if "session_state" not in session.session_data:
            session.session_data["session_state"] = {}
        for key, value in session_state_updates.items():
            session.session_data["session_state"][key] = value
        self.save_session(session=session)
        return session.session_data["session_state"]

    async def aupdate_session_state(
        self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Update the session state for the given session ID (async).

        Args:
            session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
            session_id: The session ID to update. If not provided, the current cached session ID is used.

        Returns:
            dict: The updated session state.
        """
        session_id = session_id or self.session_id
        if session_id is None:
            raise Exception("Session ID is not set")
        session = await self.aget_session(session_id=session_id)  # type: ignore
        if session is None:
            raise Exception("Session not found")

        # Ensure the session_state container exists before applying updates
        if session.session_data is None:
            session.session_data = {}
        if "session_state" not in session.session_data:
            session.session_data["session_state"] = {}
        for key, value in session_state_updates.items():
            session.session_data["session_state"][key] = value
        await self.asave_session(session=session)
        return session.session_data["session_state"]
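
    # Usage sketch, assuming a db-backed workflow with an existing session:
    #
    #     workflow.update_session_state({"cart": ["apple"]}, session_id="sess_1")
    #     workflow.get_session_state(session_id="sess_1")  # -> {"cart": ["apple"]}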

    async def adelete_session(self, session_id: str):
        """Delete the given session from storage, using an async database"""
        if self.db is None:
            return
        # -*- Delete session
        await self.db.delete_session(session_id=session_id)  # type: ignore

    def delete_session(self, session_id: str):
        """Delete the given session from storage"""
        if self.db is None:
            return
        # -*- Delete session
        self.db.delete_session(session_id=session_id)

    async def aget_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get a RunOutput from the database."""
        if self._workflow_session is not None:
            run_response = self._workflow_session.get_run(run_id=run_id)
            if run_response is not None:
                return run_response
            else:
                log_warning(f"RunOutput {run_id} not found in WorkflowSession {self._workflow_session.session_id}")
                return None
        else:
            workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
            if workflow_session is not None:
                run_response = workflow_session.get_run(run_id=run_id)
                if run_response is not None:
                    return run_response
                else:
                    log_warning(f"RunOutput {run_id} not found in WorkflowSession {session_id}")
            return None

    def get_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get a RunOutput from the database."""
        if self._workflow_session is not None:
            run_response = self._workflow_session.get_run(run_id=run_id)
            if run_response is not None:
                return run_response
            else:
                log_warning(f"RunOutput {run_id} not found in WorkflowSession {self._workflow_session.session_id}")
                return None
        else:
            workflow_session = self.get_session(session_id=session_id)
            if workflow_session is not None:
                run_response = workflow_session.get_run(run_id=run_id)
                if run_response is not None:
                    return run_response
                else:
                    log_warning(f"RunOutput {run_id} not found in WorkflowSession {session_id}")
            return None

    async def aget_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the last run response from the database."""
        if (
            self._workflow_session is not None
            and self._workflow_session.runs is not None
            and len(self._workflow_session.runs) > 0
        ):
            run_response = self._workflow_session.runs[-1]
            if run_response is not None:
                return run_response
        else:
            workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
            if workflow_session is not None and workflow_session.runs is not None and len(workflow_session.runs) > 0:
                run_response = workflow_session.runs[-1]
                if run_response is not None:
                    return run_response
            else:
                log_warning(f"No run responses found in WorkflowSession {session_id}")
        return None

    def get_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
        """Get the last run response from the database."""
        if (
            self._workflow_session is not None
            and self._workflow_session.runs is not None
            and len(self._workflow_session.runs) > 0
        ):
            run_response = self._workflow_session.runs[-1]
            if run_response is not None:
                return run_response
        else:
            workflow_session = self.get_session(session_id=session_id)
            if workflow_session is not None and workflow_session.runs is not None and len(workflow_session.runs) > 0:
                run_response = workflow_session.runs[-1]
                if run_response is not None:
                    return run_response
            else:
                log_warning(f"No run responses found in WorkflowSession {session_id}")
        return None

    def read_or_create_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> WorkflowSession:
        from time import time

        # Return the cached session if we have one
        if self._workflow_session is not None and self._workflow_session.session_id == session_id:
            return self._workflow_session

        # Try to load from database
        workflow_session = None
        if self.db is not None:
            log_debug(f"Reading WorkflowSession: {session_id}")
            workflow_session = cast(WorkflowSession, self._read_session(session_id=session_id))

        if workflow_session is None:
            # Create a new session if none was found
            log_debug(f"Creating new WorkflowSession: {session_id}")
            session_data = {}
            if self.session_state is not None:
                from copy import deepcopy

                session_data["session_state"] = deepcopy(self.session_state)
            workflow_session = WorkflowSession(
                session_id=session_id,
                workflow_id=self.id,
                user_id=user_id,
                workflow_data=self._get_workflow_data(),
                session_data=session_data,
                metadata=self.metadata,
                created_at=int(time()),
            )

        # Cache the session if relevant
        if workflow_session is not None and self.cache_session:
            self._workflow_session = workflow_session
        return workflow_session

    async def aread_or_create_session(
        self,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> WorkflowSession:
        from time import time

        # Return the cached session if we have one
        if self._workflow_session is not None and self._workflow_session.session_id == session_id:
            return self._workflow_session

        # Try to load from database
        workflow_session = None
        if self.db is not None:
            log_debug(f"Reading WorkflowSession: {session_id}")
            workflow_session = cast(WorkflowSession, await self._aread_session(session_id=session_id))

        if workflow_session is None:
            # Create a new session if none was found, seeding the default
            # session_state the same way the sync variant does
            log_debug(f"Creating new WorkflowSession: {session_id}")
            session_data = {}
            if self.session_state is not None:
                from copy import deepcopy

                session_data["session_state"] = deepcopy(self.session_state)
            workflow_session = WorkflowSession(
                session_id=session_id,
                workflow_id=self.id,
                user_id=user_id,
                workflow_data=self._get_workflow_data(),
                session_data=session_data,
                metadata=self.metadata,
                created_at=int(time()),
            )

        # Cache the session if relevant
        if workflow_session is not None and self.cache_session:
            self._workflow_session = workflow_session
        return workflow_session

    async def aget_session(
        self,
        session_id: Optional[str] = None,
    ) -> Optional[WorkflowSession]:
        """Load a WorkflowSession from the database.

        Args:
            session_id: The session_id to load from storage.

        Returns:
            Optional[WorkflowSession]: The WorkflowSession loaded from the database, or None if it was not found.
        """
        if not session_id and not self.session_id:
            raise Exception("No session_id provided")
        session_id_to_load = session_id or self.session_id

        # Try to load from database
        if self.db is not None and session_id_to_load is not None:
            workflow_session = cast(WorkflowSession, await self._aread_session(session_id=session_id_to_load))
            return workflow_session

        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    def get_session(
        self,
        session_id: Optional[str] = None,
    ) -> Optional[WorkflowSession]:
        """Load a WorkflowSession from the database.

        Args:
            session_id: The session_id to load from storage.

        Returns:
            Optional[WorkflowSession]: The WorkflowSession loaded from the database, or None if it was not found.
        """
        if not session_id and not self.session_id:
            raise Exception("No session_id provided")
        session_id_to_load = session_id or self.session_id

        # Try to load from database
        if self.db is not None and session_id_to_load is not None:
            workflow_session = cast(WorkflowSession, self._read_session(session_id=session_id_to_load))
            return workflow_session

        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    async def asave_session(self, session: WorkflowSession) -> None:
        """Save the WorkflowSession to storage, using an async database.

        Transient run-scoped keys are stripped from the session_state before persisting.
        """
        if self.db is not None and session.session_data is not None:
            if session.session_data.get("session_state") is not None:
                session.session_data["session_state"].pop("current_session_id", None)
                session.session_data["session_state"].pop("current_user_id", None)
                session.session_data["session_state"].pop("current_run_id", None)
                session.session_data["session_state"].pop("workflow_id", None)
                session.session_data["session_state"].pop("run_id", None)
                session.session_data["session_state"].pop("session_id", None)
                session.session_data["session_state"].pop("workflow_name", None)
            await self._aupsert_session(session=session)  # type: ignore
            log_debug(f"Created or updated WorkflowSession record: {session.session_id}")

    def save_session(self, session: WorkflowSession) -> None:
        """Save the WorkflowSession to storage.

        Transient run-scoped keys are stripped from the session_state before persisting.
        """
        if self.db is not None and session.session_data is not None:
            if session.session_data.get("session_state") is not None:
                session.session_data["session_state"].pop("current_session_id", None)
                session.session_data["session_state"].pop("current_user_id", None)
                session.session_data["session_state"].pop("current_run_id", None)
                session.session_data["session_state"].pop("workflow_id", None)
                session.session_data["session_state"].pop("run_id", None)
                session.session_data["session_state"].pop("session_id", None)
                session.session_data["session_state"].pop("workflow_name", None)
            self._upsert_session(session=session)
            log_debug(f"Created or updated WorkflowSession record: {session.session_id}")

    # -*- Session Database Functions

    async def _aread_session(self, session_id: str) -> Optional[WorkflowSession]:
        """Get a Session from the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            session = await self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)  # type: ignore
            return session if isinstance(session, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error getting session from db: {e}")
            return None

    def _read_session(self, session_id: str) -> Optional[WorkflowSession]:
        """Get a Session from the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            session = self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)
            return session if isinstance(session, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error getting session from db: {e}")
            return None

    async def _aupsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
        """Upsert a Session into the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            result = await self.db.upsert_session(session=session)  # type: ignore
            return result if isinstance(result, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error upserting session into db: {e}")
            return None

    def _upsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
        """Upsert a Session into the database."""
        try:
            if not self.db:
                raise ValueError("Db not initialized")
            result = self.db.upsert_session(session=session)
            return result if isinstance(result, (WorkflowSession, type(None))) else None
        except Exception as e:
            log_warning(f"Error upserting session into db: {e}")
            return None

    def _update_metadata(self, session: WorkflowSession):
        """Update the metadata in the session"""
        from agno.utils.merge_dict import merge_dictionaries

        # Read metadata from the database
        if session.metadata is not None:
            # If metadata is set in the workflow, update the database metadata with the workflow's metadata
            if self.metadata is not None:
                # Updates the session metadata in place
                merge_dictionaries(session.metadata, self.metadata)
            # Update the current metadata with the metadata from the database, which was updated in place
            self.metadata = session.metadata

    def _load_session_state(self, session: WorkflowSession, session_state: Dict[str, Any]):
        """Load and return the stored session_state from the database, optionally merging it with the given one"""
        from agno.utils.merge_dict import merge_dictionaries

        # Get the session_state from the database and merge with proper precedence.
        # At this point session_state contains: workflow_defaults + run_params
        if session.session_data and "session_state" in session.session_data:
            session_state_from_db = session.session_data.get("session_state")
            if (
                session_state_from_db is not None
                and isinstance(session_state_from_db, dict)
                and len(session_state_from_db) > 0
                and not self.overwrite_db_session_state
            ):
                # This preserves precedence: run_params > db_state > workflow_defaults
                merged_state = session_state_from_db.copy()
                merge_dictionaries(merged_state, session_state)
                session_state.clear()
                session_state.update(merged_state)

        # Update the session_state in the session
        if session.session_data is None:
            session.session_data = {}
        session.session_data["session_state"] = session_state
        return session_state
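
    # Merge sketch: with db state {"lang": "fr", "seen": 1} and an incoming
    # run-level state {"seen": 2}, the merged result is {"lang": "fr", "seen": 2}.
    # With overwrite_db_session_state=True the db state is ignored and the
    # incoming state {"seen": 2} is kept as-is.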

    def _get_workflow_data(self) -> Dict[str, Any]:
        workflow_data: Dict[str, Any] = {
            "workflow_id": self.id,
            "name": self.name,
        }
        if self.steps and not callable(self.steps):
            steps_dict = []
            for step in self.steps:  # type: ignore
                if callable(step):
                    step_type = StepType.STEP
                elif isinstance(step, (Agent, Team)):
                    step_type = StepType.STEP
                else:
                    step_type = STEP_TYPE_MAPPING[type(step)]
                step_dict = {
                    "name": step.name if hasattr(step, "name") else step.__name__,  # type: ignore
                    "description": step.description if hasattr(step, "description") else "User-defined callable step",
                    "type": step_type.value,
                }
                steps_dict.append(step_dict)
            workflow_data["steps"] = steps_dict
        elif callable(self.steps):
            workflow_data["steps"] = [
                {
                    "name": "Custom Function",
                    "description": "User-defined callable workflow",
                    "type": "Callable",
                }
            ]
        return workflow_data
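
    # Shape sketch of the returned dict (the step name and the exact
    # StepType enum value shown here are illustrative):
    #
    #     {
    #         "workflow_id": "greeter",
    #         "name": "Greeter",
    #         "steps": [{"name": "greet", "description": "User-defined callable step", "type": "Step"}],
    #     }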

    def _handle_event(
        self,
        event: "WorkflowRunOutputEvent",
        workflow_run_response: WorkflowRunOutput,
        websocket_handler: Optional[WebSocketHandler] = None,
    ) -> "WorkflowRunOutputEvent":
        """Handle workflow events for storage - similar to Team._handle_event"""
        if self.store_events:
            # Check if this event type should be skipped
            if self.events_to_skip:
                event_type = event.event
                for skip_event in self.events_to_skip:
                    if isinstance(skip_event, str):
                        if event_type == skip_event:
                            return event
                    else:
                        # It's a WorkflowRunEvent enum
                        if event_type == skip_event.value:
                            return event
            # Store the event
            if workflow_run_response.events is None:
                workflow_run_response.events = []
            workflow_run_response.events.append(event)

        # Broadcast to WebSocket if available (async context only)
        if websocket_handler:
            try:
                asyncio.get_running_loop()
                asyncio.create_task(websocket_handler.handle_event(event))
            except RuntimeError:
                # No running event loop (sync context) - skip broadcasting
                pass
        return event

    def _enrich_event_with_workflow_context(
        self,
        event: Any,
        workflow_run_response: WorkflowRunOutput,
        step_index: Optional[Union[int, tuple]] = None,
        step: Optional[Any] = None,
    ) -> Any:
        """Enrich any event with workflow context information for frontend tracking"""
        step_id = getattr(step, "step_id", None) if step else None
        step_name = getattr(step, "name", None) if step else None

        if hasattr(event, "workflow_id"):
            event.workflow_id = workflow_run_response.workflow_id
        if hasattr(event, "workflow_run_id"):
            event.workflow_run_id = workflow_run_response.run_id
        if hasattr(event, "step_id") and step_id:
            event.step_id = step_id
        if hasattr(event, "step_name") and step_name is not None:
            if event.step_name is None:
                event.step_name = step_name
        # Only set step_index if it's not already set (preserve parallel.py's tuples)
        if hasattr(event, "step_index") and step_index is not None:
            if event.step_index is None:
                event.step_index = step_index
        return event

    def _transform_step_output_to_event(
        self, step_output: StepOutput, workflow_run_response: WorkflowRunOutput, step_index: Optional[int] = None
    ) -> StepOutputEvent:
        """Transform a StepOutput object into a StepOutputEvent for a consistent streaming interface"""
        return StepOutputEvent(
            step_output=step_output,
            run_id=workflow_run_response.run_id or "",
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_name=step_output.step_name,
            step_index=step_index,
        )

    def _set_debug(self) -> None:
        """Set debug mode and configure logging"""
        if self.debug_mode or getenv("AGNO_DEBUG", "false").lower() == "true":
            use_workflow_logger()
            self.debug_mode = True
            set_log_level_to_debug(source_type="workflow")
            # Propagate to steps - only if steps is iterable (not callable)
            if self.steps and not callable(self.steps):
                if isinstance(self.steps, Steps):
                    steps_to_iterate = self.steps.steps
                else:
                    steps_to_iterate = self.steps
                for step in steps_to_iterate:
                    self._propagate_debug_to_step(step)
        else:
            set_log_level_to_info(source_type="workflow")

    def _set_telemetry(self) -> None:
        """Override telemetry settings based on environment variables."""
        telemetry_env = getenv("AGNO_TELEMETRY")
        if telemetry_env is not None:
            self.telemetry = telemetry_env.lower() == "true"

    def _propagate_debug_to_step(self, step):
        """Recursively propagate debug mode to steps and nested primitives"""
        # Handle direct Step objects
        if hasattr(step, "active_executor") and step.active_executor:
            executor = step.active_executor
            if hasattr(executor, "debug_mode"):
                executor.debug_mode = True
            # If it's a team, propagate to all members
            if hasattr(executor, "members"):
                for member in executor.members:
                    if hasattr(member, "debug_mode"):
                        member.debug_mode = True
        # Handle nested primitives - check both 'steps' and 'choices' attributes
        for attr_name in ["steps", "choices"]:
            if hasattr(step, attr_name):
                attr_value = getattr(step, attr_name)
                if attr_value and isinstance(attr_value, list):
                    for nested_step in attr_value:
                        self._propagate_debug_to_step(nested_step)

    def _create_step_input(
        self,
        execution_input: WorkflowExecutionInput,
        previous_step_outputs: Optional[Dict[str, StepOutput]] = None,
        shared_images: Optional[List[Image]] = None,
        shared_videos: Optional[List[Video]] = None,
        shared_audio: Optional[List[Audio]] = None,
        shared_files: Optional[List[File]] = None,
    ) -> StepInput:
        """Helper method to create StepInput with enhanced data flow support"""
        previous_step_content = None
        if previous_step_outputs:
            last_output = list(previous_step_outputs.values())[-1]
            previous_step_content = last_output.content if last_output else None
            log_debug(f"Using previous step content from: {list(previous_step_outputs.keys())[-1]}")
        return StepInput(
            input=execution_input.input,
            previous_step_content=previous_step_content,
            previous_step_outputs=previous_step_outputs,
            additional_data=execution_input.additional_data,
            images=shared_images or [],
            videos=shared_videos or [],
            audio=shared_audio or [],
            files=shared_files or [],
        )

    def _get_step_count(self) -> int:
        """Get the number of steps in the workflow"""
        if self.steps is None:
            return 0
        elif callable(self.steps):
            return 1  # A callable function counts as a single step
        else:
            # Handle the Steps wrapper
            if isinstance(self.steps, Steps):
                return len(self.steps.steps)
            else:
                return len(self.steps)

    def _aggregate_workflow_metrics(self, step_results: List[Union[StepOutput, List[StepOutput]]]) -> WorkflowMetrics:
        """Aggregate metrics from all step responses into structured workflow metrics"""
        steps_dict = {}

        def process_step_output(step_output: StepOutput):
            """Process a single step output for metrics"""
            # If this step has nested steps, process them recursively
            if hasattr(step_output, "steps") and step_output.steps:
                for nested_step in step_output.steps:
                    process_step_output(nested_step)

            # Only collect metrics from steps that actually have metrics (actual agents/teams)
            if (
                step_output.step_name and step_output.metrics and step_output.executor_type in ["agent", "team"]
            ):  # Only include actual executors
                step_metrics = StepMetrics(
                    step_name=step_output.step_name,
                    executor_type=step_output.executor_type or "unknown",
                    executor_name=step_output.executor_name or "unknown",
                    metrics=step_output.metrics,
                )
                steps_dict[step_output.step_name] = step_metrics

        # Process all step results
        for step_result in step_results:
            process_step_output(cast(StepOutput, step_result))

        return WorkflowMetrics(
            steps=steps_dict,
        )
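
    # Shape sketch of the aggregated result (names hypothetical): only steps
    # executed by an agent or team contribute entries.
    #
    #     WorkflowMetrics(steps={
    #         "research": StepMetrics(step_name="research", executor_type="agent",
    #                                 executor_name="Researcher", metrics=...),
    #     })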

    def _call_custom_function(self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any) -> Any:
        """Call a custom function with only the parameters it expects"""
        from inspect import signature

        sig = signature(func)

        # Build arguments based on what the function actually accepts
        call_kwargs: Dict[str, Any] = {}

        # Only add workflow and execution_input if the function expects them
        if "workflow" in sig.parameters:  # type: ignore
            call_kwargs["workflow"] = self
        if "execution_input" in sig.parameters:
            call_kwargs["execution_input"] = execution_input  # type: ignore
        if "session_state" in sig.parameters:
            call_kwargs["session_state"] = self.session_state  # type: ignore

        # Add any other kwargs that the function expects
        for param_name in kwargs:
            if param_name in sig.parameters:  # type: ignore
                call_kwargs[param_name] = kwargs[param_name]

        # If the function has a **kwargs parameter, pass all remaining kwargs
        for param in sig.parameters.values():  # type: ignore
            if param.kind == param.VAR_KEYWORD:
                call_kwargs.update(kwargs)
                break

        try:
            return func(**call_kwargs)
        except TypeError as e:
            # If signature inspection fails, fall back to the original calling convention
            logger.error(f"Function signature inspection failed: {e}. Falling back to original calling convention.")
            return func(**kwargs)
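
    # Dispatch sketch: only the parameters a custom function declares are
    # passed; the functions below are hypothetical.
    #
    #     def f(execution_input): ...            # called as f(execution_input=...)
    #     def g(workflow, session_state): ...    # called as g(workflow=..., session_state=...)
    #     def h(execution_input, **kwargs): ...  # also receives all remaining kwargs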

    def _accumulate_partial_step_data(
        self, event: Union[RunContentEvent, TeamRunContentEvent], partial_step_content: str
    ) -> str:
        """Accumulate partial step data from streaming events"""
        if isinstance(event, (RunContentEvent, TeamRunContentEvent)) and event.content:
            if isinstance(event.content, str):
                partial_step_content += event.content
        return partial_step_content

    def _execute(
        self,
        session: WorkflowSession,
        execution_input: WorkflowExecutionInput,
        workflow_run_response: WorkflowRunOutput,
        session_state: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> WorkflowRunOutput:
        """Execute the workflow steps synchronously"""
        from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

        workflow_run_response.status = RunStatus.running

        # Register run for cancellation tracking
        if workflow_run_response.run_id:
            register_run(workflow_run_response.run_id)  # type: ignore

        if callable(self.steps):
            if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps):
                raise ValueError("Cannot use async function with synchronous execution")
            elif isgeneratorfunction(self.steps):
                content = ""
                for chunk in self.steps(self, execution_input, **kwargs):
                    # Check for cancellation while consuming the generator
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
                        content += chunk.content
                    else:
                        content += str(chunk)
                workflow_run_response.content = content
            else:
                # Execute the workflow with the custom executor
                raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)  # type: ignore[arg-type]
            workflow_run_response.status = RunStatus.completed
        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}
                shared_images: List[Image] = execution_input.images or []
                output_images: List[Image] = (execution_input.images or []).copy()  # Start with input images
                shared_videos: List[Video] = execution_input.videos or []
                output_videos: List[Video] = (execution_input.videos or []).copy()  # Start with input videos
                shared_audio: List[Audio] = execution_input.audio or []
                output_audio: List[Audio] = (execution_input.audio or []).copy()  # Start with input audio
                shared_files: List[File] = execution_input.files or []
                output_files: List[File] = (execution_input.files or []).copy()  # Start with input files

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Executing step {i + 1}/{self._get_step_count()}: {step_name}")

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Check for cancellation before executing the step
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    step_output = step.execute(  # type: ignore[union-attr]
                        step_input,
                        session_id=session.session_id,
                        user_id=self.user_id,
                        workflow_run_response=workflow_run_response,
                        session_state=session_state,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                    )

                    # Check for cancellation after step execution
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore

                    # Update the workflow-level previous_step_outputs dictionary
                    previous_step_outputs[step_name] = step_output
                    collected_step_outputs.append(step_output)

                    # Update shared media for the next step
                    shared_images.extend(step_output.images or [])
                    shared_videos.extend(step_output.videos or [])
                    shared_audio.extend(step_output.audio or [])
                    shared_files.extend(step_output.files or [])
                    output_images.extend(step_output.images or [])
                    output_videos.extend(step_output.videos or [])
                    output_audio.extend(step_output.audio or [])
                    output_files.extend(step_output.files or [])

                    if step_output.stop:
                        logger.info(f"Early termination requested by step {step_name}")
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
                    last_output = cast(StepOutput, collected_step_outputs[-1])
                    # Use the deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    if getattr(last_output, "steps", None):
                        _cur = last_output
                        while getattr(_cur, "steps", None):
                            _steps = _cur.steps or []
                            if not _steps:
                                break
                            _cur = _steps[-1]
                        workflow_run_response.content = _cur.content
                    else:
                        workflow_run_response.content = last_output.content
                else:
                    workflow_run_response.content = "No steps executed"
                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed
            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
                # Store error response
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}"
                raise e
            except RunCancelledException as e:
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)
            except Exception as e:
                import traceback

                traceback.print_exc()
                logger.error(f"Workflow execution failed: {e}")
                # Store error response
                workflow_run_response.status = RunStatus.error
                workflow_run_response.content = f"Workflow execution failed: {e}"
                raise e
            finally:
                self._update_session_metrics(session=session, workflow_run_response=workflow_run_response)
                session.upsert_run(run=workflow_run_response)
                self.save_session(session=session)
                # Always clean up the run tracking
                cleanup_run(workflow_run_response.run_id)  # type: ignore

        # Log Workflow Telemetry
        if self.telemetry:
            self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id)

        return workflow_run_response
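
    # Early-termination sketch: a step halts the pipeline by returning
    # StepOutput(..., stop=True); subsequent steps are skipped and the run
    # completes with the content produced so far. The guard function is
    # hypothetical:
    #
    #     def guard(step_input: StepInput) -> StepOutput:
    #         if "forbidden" in str(step_input.input):
    #             return StepOutput(content="blocked", stop=True)
    #         return StepOutput(content="ok")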
| def _execute_stream( | |
| self, | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| workflow_run_response: WorkflowRunOutput, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| stream_events: bool = False, | |
| **kwargs: Any, | |
| ) -> Iterator[WorkflowRunOutputEvent]: | |
| """Execute a specific pipeline by name with event streaming""" | |
| from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction | |
| workflow_run_response.status = RunStatus.running | |
| # Register run for cancellation tracking | |
| if workflow_run_response.run_id: | |
| register_run(workflow_run_response.run_id) | |
| workflow_started_event = WorkflowStartedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| ) | |
| yield self._handle_event(workflow_started_event, workflow_run_response) | |
| if callable(self.steps): | |
| if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps): | |
| raise ValueError("Cannot use async function with synchronous execution") | |
| elif isgeneratorfunction(self.steps): | |
| content = "" | |
| for chunk in self._call_custom_function(self.steps, execution_input, **kwargs): # type: ignore[arg-type] | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| # Update the run_response with the content from the result | |
| if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str): | |
| content += chunk.content | |
| yield chunk | |
| else: | |
| content += str(chunk) | |
| workflow_run_response.content = content | |
| else: | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs) | |
| workflow_run_response.status = RunStatus.completed | |
| else: | |
| try: | |
| # Track outputs from each step for enhanced data flow | |
| collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = [] | |
| previous_step_outputs: Dict[str, StepOutput] = {} | |
| shared_images: List[Image] = execution_input.images or [] | |
| output_images: List[Image] = (execution_input.images or []).copy() # Start with input images | |
| shared_videos: List[Video] = execution_input.videos or [] | |
| output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos | |
| shared_audio: List[Audio] = execution_input.audio or [] | |
| output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio | |
| shared_files: List[File] = execution_input.files or [] | |
| output_files: List[File] = (execution_input.files or []).copy() # Start with input files | |
| early_termination = False | |
| # Track partial step data in case of cancellation | |
| current_step_name = "" | |
| current_step = None | |
| partial_step_content = "" | |
| for i, step in enumerate(self.steps): # type: ignore[arg-type] | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| step_name = getattr(step, "name", f"step_{i + 1}") | |
| log_debug(f"Streaming step {i + 1}/{self._get_step_count()}: {step_name}") | |
| # Track current step for cancellation handler | |
| current_step_name = step_name | |
| current_step = step | |
| # Reset partial data for this step | |
| partial_step_content = "" | |
| # Create enhanced StepInput | |
| step_input = self._create_step_input( | |
| execution_input=execution_input, | |
| previous_step_outputs=previous_step_outputs, | |
| shared_images=shared_images, | |
| shared_videos=shared_videos, | |
| shared_audio=shared_audio, | |
| shared_files=shared_files, | |
| ) | |
| # Execute step with streaming and yield all events | |
| for event in step.execute_stream( # type: ignore[union-attr] | |
| step_input, | |
| session_id=session.session_id, | |
| user_id=self.user_id, | |
| stream_events=stream_events, | |
| stream_executor_events=self.stream_executor_events, | |
| workflow_run_response=workflow_run_response, | |
| session_state=session_state, | |
| step_index=i, | |
| store_executor_outputs=self.store_executor_outputs, | |
| workflow_session=session, | |
| add_workflow_history_to_steps=self.add_workflow_history_to_steps | |
| if self.add_workflow_history_to_steps | |
| else None, | |
| num_history_runs=self.num_history_runs, | |
| ): | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| # Accumulate partial data from streaming events | |
| partial_step_content = self._accumulate_partial_step_data(event, partial_step_content) # type: ignore | |
| # Handle events | |
| if isinstance(event, StepOutput): | |
| step_output = event | |
| collected_step_outputs.append(step_output) | |
| # Update the workflow-level previous_step_outputs dictionary | |
| previous_step_outputs[step_name] = step_output | |
| # Transform StepOutput to StepOutputEvent for consistent streaming interface | |
| step_output_event = self._transform_step_output_to_event( | |
| step_output, workflow_run_response, step_index=i | |
| ) | |
| if step_output.stop: | |
| logger.info(f"Early termination requested by step {step_name}") | |
| # Update shared media for next step | |
| shared_images.extend(step_output.images or []) | |
| shared_videos.extend(step_output.videos or []) | |
| shared_audio.extend(step_output.audio or []) | |
| shared_files.extend(step_output.files or []) | |
| output_images.extend(step_output.images or []) | |
| output_videos.extend(step_output.videos or []) | |
| output_audio.extend(step_output.audio or []) | |
| output_files.extend(step_output.files or []) | |
| # Only yield StepOutputEvent for function executors, not for agents/teams | |
| if getattr(step, "executor_type", None) == "function": | |
| yield step_output_event | |
| # Break out of the step loop | |
| early_termination = True | |
| break | |
| # Update shared media for next step | |
| shared_images.extend(step_output.images or []) | |
| shared_videos.extend(step_output.videos or []) | |
| shared_audio.extend(step_output.audio or []) | |
| shared_files.extend(step_output.files or []) | |
| output_images.extend(step_output.images or []) | |
| output_videos.extend(step_output.videos or []) | |
| output_audio.extend(step_output.audio or []) | |
| output_files.extend(step_output.files or []) | |
| # Only yield StepOutputEvent for function executors, not for agents/teams | |
| if getattr(step, "executor_type", None) == "function": | |
| yield step_output_event | |
| elif isinstance(event, WorkflowRunOutputEvent): # type: ignore | |
| # Enrich event with workflow context before yielding | |
| enriched_event = self._enrich_event_with_workflow_context( | |
| event, workflow_run_response, step_index=i, step=step | |
| ) | |
| yield self._handle_event(enriched_event, workflow_run_response) # type: ignore | |
| else: | |
| # Enrich other events with workflow context before yielding | |
| enriched_event = self._enrich_event_with_workflow_context( | |
| event, workflow_run_response, step_index=i, step=step | |
| ) | |
| if self.stream_executor_events: | |
| yield self._handle_event(enriched_event, workflow_run_response) # type: ignore | |
| # Break out of main step loop if early termination was requested | |
| if "early_termination" in locals() and early_termination: | |
| break | |
| # Update the workflow_run_response with completion data | |
| if collected_step_outputs: | |
| workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs) | |
| last_output = cast(StepOutput, collected_step_outputs[-1]) | |
| # Use deepest nested content if this is a container (Steps/Router/Loop/etc.) | |
| if getattr(last_output, "steps", None): | |
| _cur = last_output | |
| while getattr(_cur, "steps", None): | |
| _steps = _cur.steps or [] | |
| if not _steps: | |
| break | |
| _cur = _steps[-1] | |
| workflow_run_response.content = _cur.content | |
| else: | |
| workflow_run_response.content = last_output.content | |
| else: | |
| workflow_run_response.content = "No steps executed" | |
| workflow_run_response.step_results = collected_step_outputs | |
| workflow_run_response.images = output_images | |
| workflow_run_response.videos = output_videos | |
| workflow_run_response.audio = output_audio | |
| workflow_run_response.status = RunStatus.completed | |
| except (InputCheckError, OutputCheckError) as e: | |
| log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}") | |
| from agno.run.workflow import WorkflowErrorEvent | |
| error_event = WorkflowErrorEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session.session_id, | |
| error=str(e), | |
| ) | |
| yield error_event | |
| # Update workflow_run_response with error | |
| workflow_run_response.content = error_event.error | |
| workflow_run_response.status = RunStatus.error | |
| except RunCancelledException as e: | |
| # Handle run cancellation during streaming | |
| logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming") | |
| workflow_run_response.status = RunStatus.cancelled | |
| workflow_run_response.content = str(e) | |
| # Capture partial progress from the step that was cancelled mid-stream | |
| if partial_step_content: | |
| logger.info( | |
| f"Step with name '{current_step_name}' was cancelled. Setting its partial progress as step output." | |
| ) | |
| partial_step_output = StepOutput( | |
| step_name=current_step_name, | |
| step_id=getattr(current_step, "step_id", None) if current_step else None, | |
| step_type=StepType.STEP, | |
| executor_type=getattr(current_step, "executor_type", None) if current_step else None, | |
| executor_name=getattr(current_step, "executor_name", None) if current_step else None, | |
| content=partial_step_content, | |
| success=False, | |
| error="Cancelled during execution", | |
| ) | |
| collected_step_outputs.append(partial_step_output) | |
| # Preserve all progress (completed steps + partial step) before cancellation | |
| if collected_step_outputs: | |
| workflow_run_response.step_results = collected_step_outputs | |
| workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs) | |
| cancelled_event = WorkflowCancelledEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session.session_id, | |
| reason=str(e), | |
| ) | |
| yield self._handle_event(cancelled_event, workflow_run_response) | |
| except Exception as e: | |
| logger.error(f"Workflow execution failed: {e}") | |
| from agno.run.workflow import WorkflowErrorEvent | |
| error_event = WorkflowErrorEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session.session_id, | |
| error=str(e), | |
| ) | |
| yield error_event | |
| # Update workflow_run_response with error | |
| workflow_run_response.content = error_event.error | |
| workflow_run_response.status = RunStatus.error | |
| raise | |
| # Yield workflow completed event | |
| workflow_completed_event = WorkflowCompletedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| content=workflow_run_response.content, | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| step_results=workflow_run_response.step_results, # type: ignore | |
| metadata=workflow_run_response.metadata, | |
| ) | |
| yield self._handle_event(workflow_completed_event, workflow_run_response) | |
| # Store the completed workflow response | |
| self._update_session_metrics(session=session, workflow_run_response=workflow_run_response) | |
| session.upsert_run(run=workflow_run_response) | |
| self.save_session(session=session) | |
| # Always clean up the run tracking | |
| cleanup_run(workflow_run_response.run_id) # type: ignore | |
| # Log Workflow Telemetry | |
| if self.telemetry: | |
| self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id) | |
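| # Illustrative usage sketch (not part of the class): the synchronous streaming | |
| # path above is typically driven through the public entry point, assumed here | |
| # to be `workflow.run(..., stream=True)`: | |
| # | |
| #   for event in workflow.run(input="...", stream=True, stream_events=True): | |
| #       if isinstance(event, StepOutputEvent): | |
| #           print(event.step_name, event.content) | |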
| async def _acall_custom_function( | |
| self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any | |
| ) -> Any: | |
| """Call custom function with only the parameters it expects - handles both async functions and async generators""" | |
| from inspect import isasyncgenfunction, signature | |
| sig = signature(func) | |
| # Build arguments based on what the function actually accepts | |
| call_kwargs: Dict[str, Any] = {} | |
| # Only add workflow and execution_input if the function expects them | |
| if "workflow" in sig.parameters: # type: ignore | |
| call_kwargs["workflow"] = self | |
| if "execution_input" in sig.parameters: | |
| call_kwargs["execution_input"] = execution_input # type: ignore | |
| if "session_state" in sig.parameters: | |
| call_kwargs["session_state"] = self.session_state # type: ignore | |
| # Add any other kwargs that the function expects | |
| for param_name in kwargs: | |
| if param_name in sig.parameters: # type: ignore | |
| call_kwargs[param_name] = kwargs[param_name] | |
| # If function has **kwargs parameter, pass all remaining kwargs | |
| for param in sig.parameters.values(): # type: ignore | |
| if param.kind == param.VAR_KEYWORD: | |
| call_kwargs.update(kwargs) | |
| break | |
| try: | |
| # Check if it's an async generator function | |
| if isasyncgenfunction(func): | |
| # For async generators, call the function and return the async generator directly | |
| return func(**call_kwargs) # type: ignore | |
| else: | |
| # For regular async functions, await the result | |
| return await func(**call_kwargs) # type: ignore | |
| except TypeError as e: | |
| # If the inspected call fails, fall back to the original calling convention: | |
| # func(workflow, execution_input, **kwargs) | |
| logger.warning( | |
| f"Async function signature inspection failed: {e}. Falling back to original calling convention." | |
| ) | |
| if isasyncgenfunction(func): | |
| # For async generators, return the generator built with the original convention | |
| return func(self, execution_input, **kwargs)  # type: ignore | |
| else: | |
| # For regular async functions, await the result of the original convention | |
| return await func(self, execution_input, **kwargs)  # type: ignore | |
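| # Illustrative sketch: a custom async executor only declares the parameters it | |
| # needs, and _acall_custom_function injects the matching arguments | |
| # ("workflow", "execution_input", "session_state", plus any accepted kwargs). | |
| # | |
| #   async def my_executor(execution_input, session_state): | |
| #       # receives exactly the two parameters it declared | |
| #       return f"got: {execution_input.input}" | |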
| async def _aload_or_create_session( | |
| self, session_id: str, user_id: Optional[str], session_state: Optional[Dict[str, Any]] | |
| ) -> Tuple[WorkflowSession, Dict[str, Any]]: | |
| """Load or create session from database, update metadata, and prepare session state. | |
| Returns: | |
| Tuple of (workflow_session, prepared_session_state) | |
| """ | |
| # Read existing session from database | |
| if self._has_async_db(): | |
| workflow_session = await self.aread_or_create_session(session_id=session_id, user_id=user_id) | |
| else: | |
| workflow_session = self.read_or_create_session(session_id=session_id, user_id=user_id) | |
| self._update_metadata(session=workflow_session) | |
| # Update session state from DB | |
| _session_state = session_state or {} | |
| _session_state = self._load_session_state(session=workflow_session, session_state=_session_state) | |
| return workflow_session, _session_state | |
| async def _aexecute( | |
| self, | |
| session_id: str, | |
| user_id: Optional[str], | |
| execution_input: WorkflowExecutionInput, | |
| workflow_run_response: WorkflowRunOutput, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| **kwargs: Any, | |
| ) -> WorkflowRunOutput: | |
| """Execute a specific pipeline by name asynchronously""" | |
| from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction | |
| # Read existing session from database | |
| workflow_session, session_state = await self._aload_or_create_session( | |
| session_id=session_id, user_id=user_id, session_state=session_state | |
| ) | |
| workflow_run_response.status = RunStatus.running | |
| # Register run for cancellation tracking | |
| if workflow_run_response.run_id: | |
| register_run(workflow_run_response.run_id) # type: ignore | |
| if callable(self.steps): | |
| # Execute the workflow with the custom executor | |
| content = "" | |
| if iscoroutinefunction(self.steps): # type: ignore | |
| workflow_run_response.content = await self._acall_custom_function(self.steps, execution_input, **kwargs) | |
| elif isgeneratorfunction(self.steps): | |
| for chunk in self.steps(self, execution_input, **kwargs): # type: ignore[arg-type] | |
| if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str): | |
| content += chunk.content | |
| else: | |
| content += str(chunk) | |
| workflow_run_response.content = content | |
| elif isasyncgenfunction(self.steps): # type: ignore | |
| async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs) | |
| async for chunk in async_gen: | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str): | |
| content += chunk.content | |
| else: | |
| content += str(chunk) | |
| workflow_run_response.content = content | |
| else: | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs) | |
| workflow_run_response.status = RunStatus.completed | |
| else: | |
| try: | |
| # Track outputs from each step for enhanced data flow | |
| collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = [] | |
| previous_step_outputs: Dict[str, StepOutput] = {} | |
| shared_images: List[Image] = execution_input.images or [] | |
| output_images: List[Image] = (execution_input.images or []).copy() # Start with input images | |
| shared_videos: List[Video] = execution_input.videos or [] | |
| output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos | |
| shared_audio: List[Audio] = execution_input.audio or [] | |
| output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio | |
| shared_files: List[File] = execution_input.files or [] | |
| output_files: List[File] = (execution_input.files or []).copy() # Start with input files | |
| for i, step in enumerate(self.steps): # type: ignore[arg-type] | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| step_name = getattr(step, "name", f"step_{i + 1}") | |
| log_debug(f"Async Executing step {i + 1}/{self._get_step_count()}: {step_name}") | |
| # Create enhanced StepInput | |
| step_input = self._create_step_input( | |
| execution_input=execution_input, | |
| previous_step_outputs=previous_step_outputs, | |
| shared_images=shared_images, | |
| shared_videos=shared_videos, | |
| shared_audio=shared_audio, | |
| shared_files=shared_files, | |
| ) | |
| # Check for cancellation before executing step | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| step_output = await step.aexecute( # type: ignore[union-attr] | |
| step_input, | |
| session_id=session_id, | |
| user_id=self.user_id, | |
| workflow_run_response=workflow_run_response, | |
| session_state=session_state, | |
| store_executor_outputs=self.store_executor_outputs, | |
| workflow_session=workflow_session, | |
| add_workflow_history_to_steps=self.add_workflow_history_to_steps | |
| if self.add_workflow_history_to_steps | |
| else None, | |
| num_history_runs=self.num_history_runs, | |
| ) | |
| # Check for cancellation after step execution | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| # Update the workflow-level previous_step_outputs dictionary | |
| previous_step_outputs[step_name] = step_output | |
| collected_step_outputs.append(step_output) | |
| # Update shared media for next step | |
| shared_images.extend(step_output.images or []) | |
| shared_videos.extend(step_output.videos or []) | |
| shared_audio.extend(step_output.audio or []) | |
| shared_files.extend(step_output.files or []) | |
| output_images.extend(step_output.images or []) | |
| output_videos.extend(step_output.videos or []) | |
| output_audio.extend(step_output.audio or []) | |
| output_files.extend(step_output.files or []) | |
| if step_output.stop: | |
| logger.info(f"Early termination requested by step {step_name}") | |
| break | |
| # Update the workflow_run_response with completion data | |
| if collected_step_outputs: | |
| workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs) | |
| last_output = cast(StepOutput, collected_step_outputs[-1]) | |
| # Use deepest nested content if this is a container (Steps/Router/Loop/etc.) | |
| if getattr(last_output, "steps", None): | |
| _cur = last_output | |
| while getattr(_cur, "steps", None): | |
| _steps = _cur.steps or [] | |
| if not _steps: | |
| break | |
| _cur = _steps[-1] | |
| workflow_run_response.content = _cur.content | |
| else: | |
| workflow_run_response.content = last_output.content | |
| else: | |
| workflow_run_response.content = "No steps executed" | |
| workflow_run_response.step_results = collected_step_outputs | |
| workflow_run_response.images = output_images | |
| workflow_run_response.videos = output_videos | |
| workflow_run_response.audio = output_audio | |
| workflow_run_response.status = RunStatus.completed | |
| except (InputCheckError, OutputCheckError) as e: | |
| log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}") | |
| # Store error response | |
| workflow_run_response.status = RunStatus.error | |
| workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}" | |
| raise | |
| except RunCancelledException as e: | |
| logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled") | |
| workflow_run_response.status = RunStatus.cancelled | |
| workflow_run_response.content = str(e) | |
| except Exception as e: | |
| logger.error(f"Workflow execution failed: {e}") | |
| workflow_run_response.status = RunStatus.error | |
| workflow_run_response.content = f"Workflow execution failed: {e}" | |
| raise | |
| self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response) | |
| workflow_session.upsert_run(run=workflow_run_response) | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Always clean up the run tracking | |
| cleanup_run(workflow_run_response.run_id) # type: ignore | |
| # Log Workflow Telemetry | |
| if self.telemetry: | |
| await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id) | |
| return workflow_run_response | |
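| # Illustrative sketch: this non-streaming path is what a plain awaited run | |
| # ultimately drives, assuming the public `arun` entry point: | |
| # | |
| #   result = await workflow.arun(input="Summarize today's AI news") | |
| #   print(result.status, result.content) | |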
| async def _aexecute_stream( | |
| self, | |
| session_id: str, | |
| user_id: Optional[str], | |
| execution_input: WorkflowExecutionInput, | |
| workflow_run_response: WorkflowRunOutput, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| stream_events: bool = False, | |
| websocket_handler: Optional[WebSocketHandler] = None, | |
| **kwargs: Any, | |
| ) -> AsyncIterator[WorkflowRunOutputEvent]: | |
| """Execute a specific pipeline by name with event streaming""" | |
| from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction | |
| # Read existing session from database | |
| workflow_session, session_state = await self._aload_or_create_session( | |
| session_id=session_id, user_id=user_id, session_state=session_state | |
| ) | |
| workflow_run_response.status = RunStatus.running | |
| # Register run for cancellation tracking | |
| if workflow_run_response.run_id: | |
| register_run(workflow_run_response.run_id) | |
| workflow_started_event = WorkflowStartedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| ) | |
| yield self._handle_event(workflow_started_event, workflow_run_response, websocket_handler=websocket_handler) | |
| if callable(self.steps): | |
| if iscoroutinefunction(self.steps): # type: ignore | |
| workflow_run_response.content = await self._acall_custom_function(self.steps, execution_input, **kwargs) | |
| elif isgeneratorfunction(self.steps): | |
| content = "" | |
| for chunk in self.steps(self, execution_input, **kwargs): # type: ignore[arg-type] | |
| if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str): | |
| content += chunk.content | |
| yield chunk | |
| else: | |
| content += str(chunk) | |
| workflow_run_response.content = content | |
| elif isasyncgenfunction(self.steps): # type: ignore | |
| content = "" | |
| async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs) | |
| async for chunk in async_gen: | |
| raise_if_cancelled(workflow_run_response.run_id) # type: ignore | |
| if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str): | |
| content += chunk.content | |
| yield chunk | |
| else: | |
| content += str(chunk) | |
| workflow_run_response.content = content | |
| else: | |
| workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs) | |
| workflow_run_response.status = RunStatus.completed | |
| else: | |
| try: | |
| # Track outputs from each step for enhanced data flow | |
| collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = [] | |
| previous_step_outputs: Dict[str, StepOutput] = {} | |
| shared_images: List[Image] = execution_input.images or [] | |
| output_images: List[Image] = (execution_input.images or []).copy() # Start with input images | |
| shared_videos: List[Video] = execution_input.videos or [] | |
| output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos | |
| shared_audio: List[Audio] = execution_input.audio or [] | |
| output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio | |
| shared_files: List[File] = execution_input.files or [] | |
| output_files: List[File] = (execution_input.files or []).copy() # Start with input files | |
| early_termination = False | |
| # Track partial step data in case of cancellation | |
| current_step_name = "" | |
| current_step = None | |
| partial_step_content = "" | |
| for i, step in enumerate(self.steps): # type: ignore[arg-type] | |
| if workflow_run_response.run_id: | |
| raise_if_cancelled(workflow_run_response.run_id) | |
| step_name = getattr(step, "name", f"step_{i + 1}") | |
| log_debug(f"Async streaming step {i + 1}/{self._get_step_count()}: {step_name}") | |
| current_step_name = step_name | |
| current_step = step | |
| # Reset partial data for this step | |
| partial_step_content = "" | |
| # Create enhanced StepInput | |
| step_input = self._create_step_input( | |
| execution_input=execution_input, | |
| previous_step_outputs=previous_step_outputs, | |
| shared_images=shared_images, | |
| shared_videos=shared_videos, | |
| shared_audio=shared_audio, | |
| shared_files=shared_files, | |
| ) | |
| # Execute step with streaming and yield all events | |
| async for event in step.aexecute_stream( # type: ignore[union-attr] | |
| step_input, | |
| session_id=session_id, | |
| user_id=self.user_id, | |
| stream_events=stream_events, | |
| stream_executor_events=self.stream_executor_events, | |
| workflow_run_response=workflow_run_response, | |
| session_state=session_state, | |
| step_index=i, | |
| store_executor_outputs=self.store_executor_outputs, | |
| workflow_session=workflow_session, | |
| add_workflow_history_to_steps=self.add_workflow_history_to_steps | |
| if self.add_workflow_history_to_steps | |
| else None, | |
| num_history_runs=self.num_history_runs, | |
| ): | |
| if workflow_run_response.run_id: | |
| raise_if_cancelled(workflow_run_response.run_id) | |
| # Accumulate partial data from streaming events | |
| partial_step_content = self._accumulate_partial_step_data(event, partial_step_content) # type: ignore | |
| if isinstance(event, StepOutput): | |
| step_output = event | |
| collected_step_outputs.append(step_output) | |
| # Update the workflow-level previous_step_outputs dictionary | |
| previous_step_outputs[step_name] = step_output | |
| # Transform StepOutput to StepOutputEvent for consistent streaming interface | |
| step_output_event = self._transform_step_output_to_event( | |
| step_output, workflow_run_response, step_index=i | |
| ) | |
| if step_output.stop: | |
| logger.info(f"Early termination requested by step {step_name}") | |
| # Update shared media for next step | |
| shared_images.extend(step_output.images or []) | |
| shared_videos.extend(step_output.videos or []) | |
| shared_audio.extend(step_output.audio or []) | |
| shared_files.extend(step_output.files or []) | |
| output_images.extend(step_output.images or []) | |
| output_videos.extend(step_output.videos or []) | |
| output_audio.extend(step_output.audio or []) | |
| output_files.extend(step_output.files or []) | |
| if getattr(step, "executor_type", None) == "function": | |
| yield step_output_event | |
| # Break out of the step loop | |
| early_termination = True | |
| break | |
| # Update shared media for next step | |
| shared_images.extend(step_output.images or []) | |
| shared_videos.extend(step_output.videos or []) | |
| shared_audio.extend(step_output.audio or []) | |
| shared_files.extend(step_output.files or []) | |
| output_images.extend(step_output.images or []) | |
| output_videos.extend(step_output.videos or []) | |
| output_audio.extend(step_output.audio or []) | |
| output_files.extend(step_output.files or []) | |
| # Only yield StepOutputEvent for function executors, not for agents/teams | |
| if getattr(step, "executor_type", None) == "function": | |
| yield step_output_event | |
| elif isinstance(event, WorkflowRunOutputEvent): # type: ignore | |
| # Enrich event with workflow context before yielding | |
| enriched_event = self._enrich_event_with_workflow_context( | |
| event, workflow_run_response, step_index=i, step=step | |
| ) | |
| yield self._handle_event( | |
| enriched_event, workflow_run_response, websocket_handler=websocket_handler | |
| ) # type: ignore | |
| else: | |
| # Enrich other events with workflow context before yielding | |
| enriched_event = self._enrich_event_with_workflow_context( | |
| event, workflow_run_response, step_index=i, step=step | |
| ) | |
| if self.stream_executor_events: | |
| yield self._handle_event( | |
| enriched_event, workflow_run_response, websocket_handler=websocket_handler | |
| ) # type: ignore | |
| # Break out of main step loop if early termination was requested | |
| if "early_termination" in locals() and early_termination: | |
| break | |
| # Update the workflow_run_response with completion data | |
| if collected_step_outputs: | |
| workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs) | |
| last_output = cast(StepOutput, collected_step_outputs[-1]) | |
| # Use deepest nested content if this is a container (Steps/Router/Loop/etc.) | |
| if getattr(last_output, "steps", None): | |
| _cur = last_output | |
| while getattr(_cur, "steps", None): | |
| _steps = _cur.steps or [] | |
| if not _steps: | |
| break | |
| _cur = _steps[-1] | |
| workflow_run_response.content = _cur.content | |
| else: | |
| workflow_run_response.content = last_output.content | |
| else: | |
| workflow_run_response.content = "No steps executed" | |
| workflow_run_response.step_results = collected_step_outputs | |
| workflow_run_response.images = output_images | |
| workflow_run_response.videos = output_videos | |
| workflow_run_response.audio = output_audio | |
| workflow_run_response.status = RunStatus.completed | |
| except (InputCheckError, OutputCheckError) as e: | |
| log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}") | |
| from agno.run.workflow import WorkflowErrorEvent | |
| error_event = WorkflowErrorEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session_id, | |
| error=str(e), | |
| ) | |
| yield error_event | |
| # Update workflow_run_response with error | |
| workflow_run_response.content = error_event.error | |
| workflow_run_response.status = RunStatus.error | |
| except RunCancelledException as e: | |
| # Handle run cancellation during streaming | |
| logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming") | |
| workflow_run_response.status = RunStatus.cancelled | |
| workflow_run_response.content = str(e) | |
| # Capture partial progress from the step that was cancelled mid-stream | |
| if partial_step_content: | |
| logger.info( | |
| f"Step with name '{current_step_name}' was cancelled. Setting its partial progress as step output." | |
| ) | |
| partial_step_output = StepOutput( | |
| step_name=current_step_name, | |
| step_id=getattr(current_step, "step_id", None) if current_step else None, | |
| step_type=StepType.STEP, | |
| executor_type=getattr(current_step, "executor_type", None) if current_step else None, | |
| executor_name=getattr(current_step, "executor_name", None) if current_step else None, | |
| content=partial_step_content, | |
| success=False, | |
| error="Cancelled during execution", | |
| ) | |
| collected_step_outputs.append(partial_step_output) | |
| # Preserve all progress (completed steps + partial step) before cancellation | |
| if collected_step_outputs: | |
| workflow_run_response.step_results = collected_step_outputs | |
| workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs) | |
| cancelled_event = WorkflowCancelledEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session_id, | |
| reason=str(e), | |
| ) | |
| yield self._handle_event( | |
| cancelled_event, | |
| workflow_run_response, | |
| websocket_handler=websocket_handler, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Workflow execution failed: {e}") | |
| from agno.run.workflow import WorkflowErrorEvent | |
| error_event = WorkflowErrorEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| session_id=session_id, | |
| error=str(e), | |
| ) | |
| yield error_event | |
| # Update workflow_run_response with error | |
| workflow_run_response.content = error_event.error | |
| workflow_run_response.status = RunStatus.error | |
| raise | |
| # Yield workflow completed event | |
| workflow_completed_event = WorkflowCompletedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| content=workflow_run_response.content, | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| step_results=workflow_run_response.step_results, # type: ignore[arg-type] | |
| metadata=workflow_run_response.metadata, | |
| ) | |
| yield self._handle_event(workflow_completed_event, workflow_run_response, websocket_handler=websocket_handler) | |
| # Store the completed workflow response | |
| self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response) | |
| workflow_session.upsert_run(run=workflow_run_response) | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Log Workflow Telemetry | |
| if self.telemetry: | |
| await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id) | |
| # Always clean up the run tracking | |
| cleanup_run(workflow_run_response.run_id) # type: ignore | |
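| # Illustrative sketch: consuming the async event stream, assuming the public | |
| # `arun(..., stream=True)` entry point wraps _aexecute_stream: | |
| # | |
| #   async for event in workflow.arun(input="...", stream=True): | |
| #       if isinstance(event, WorkflowCompletedEvent): | |
| #           print("done:", event.content) | |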
| async def _arun_background( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| **kwargs: Any, | |
| ) -> WorkflowRunOutput: | |
| """Execute workflow in background using asyncio.create_task()""" | |
| run_id = str(uuid4()) | |
| self.initialize_workflow() | |
| session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id) | |
| # Read existing session from database | |
| workflow_session, session_state = await self._aload_or_create_session( | |
| session_id=session_id, user_id=user_id, session_state=session_state | |
| ) | |
| self._prepare_steps() | |
| # Create workflow run response with PENDING status | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=input, | |
| session_id=session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| status=RunStatus.pending, | |
| ) | |
| # Store PENDING response immediately | |
| workflow_session.upsert_run(run=workflow_run_response) | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Prepare execution input | |
| inputs = WorkflowExecutionInput( | |
| input=input, | |
| additional_data=additional_data, | |
| audio=audio, # type: ignore | |
| images=images, # type: ignore | |
| videos=videos, # type: ignore | |
| files=files, # type: ignore | |
| ) | |
| self.update_agents_and_teams_session_info() | |
| async def execute_workflow_background(): | |
| """Simple background execution""" | |
| try: | |
| # Update status to RUNNING and save | |
| workflow_run_response.status = RunStatus.running | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| if self.agent is not None: | |
| await self._aexecute_workflow_agent( | |
| user_input=input, # type: ignore | |
| session_id=session_id, | |
| user_id=user_id, | |
| execution_input=inputs, | |
| session_state=session_state, | |
| stream=False, | |
| **kwargs, | |
| ) | |
| else: | |
| await self._aexecute( | |
| session_id=session_id, | |
| user_id=user_id, | |
| execution_input=inputs, | |
| workflow_run_response=workflow_run_response, | |
| session_state=session_state, | |
| **kwargs, | |
| ) | |
| log_debug(f"Background execution completed with status: {workflow_run_response.status}") | |
| except Exception as e: | |
| logger.error(f"Background workflow execution failed: {e}") | |
| workflow_run_response.status = RunStatus.error | |
| workflow_run_response.content = f"Background execution failed: {str(e)}" | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Create and start asyncio task | |
| loop = asyncio.get_running_loop() | |
| loop.create_task(execute_workflow_background()) | |
| # Return SAME object that will be updated by background execution | |
| return workflow_run_response | |
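| # Note: the returned WorkflowRunOutput is the same object the background task | |
| # mutates, so callers can poll it in place (illustrative sketch, assuming a | |
| # `background=True` flag on the public `arun` entry point): | |
| # | |
| #   run = await workflow.arun(input="...", background=True) | |
| #   while run.status in (RunStatus.pending, RunStatus.running): | |
| #       await asyncio.sleep(1) | |
| #   print(run.status, run.content) | |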
| async def _arun_background_stream( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream_events: bool = False, | |
| websocket_handler: Optional[WebSocketHandler] = None, | |
| **kwargs: Any, | |
| ) -> WorkflowRunOutput: | |
| """Execute workflow in background with streaming and WebSocket broadcasting""" | |
| run_id = str(uuid4()) | |
| self.initialize_workflow() | |
| session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id) | |
| # Read existing session from database | |
| workflow_session, session_state = await self._aload_or_create_session( | |
| session_id=session_id, user_id=user_id, session_state=session_state | |
| ) | |
| self._prepare_steps() | |
| # Create workflow run response with PENDING status | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=input, | |
| session_id=session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| status=RunStatus.pending, | |
| ) | |
| # Prepare execution input | |
| inputs = WorkflowExecutionInput( | |
| input=input, | |
| additional_data=additional_data, | |
| audio=audio, # type: ignore | |
| images=images, # type: ignore | |
| videos=videos, # type: ignore | |
| files=files, # type: ignore | |
| ) | |
| self.update_agents_and_teams_session_info() | |
| async def execute_workflow_background_stream(): | |
| """Background execution with streaming and WebSocket broadcasting""" | |
| try: | |
| if self.agent is not None: | |
| result = self._aexecute_workflow_agent( | |
| user_input=input, # type: ignore | |
| session_id=session_id, | |
| user_id=user_id, | |
| execution_input=inputs, | |
| session_state=session_state, | |
| stream=True, | |
| websocket_handler=websocket_handler, | |
| **kwargs, | |
| ) | |
| # For streaming, result is an async iterator | |
| async for event in result: # type: ignore | |
| # Events are automatically broadcast by _handle_event in the agent execution | |
| # We just consume them here to drive the execution | |
| pass | |
| log_debug( | |
| f"Background streaming execution (workflow agent) completed with status: {workflow_run_response.status}" | |
| ) | |
| else: | |
| # Update status to RUNNING and save | |
| workflow_run_response.status = RunStatus.running | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Execute with streaming - consume all events (they're auto-broadcast via _handle_event) | |
| async for event in self._aexecute_stream( | |
| session_id=session_id, | |
| user_id=user_id, | |
| execution_input=inputs, | |
| workflow_run_response=workflow_run_response, | |
| stream_events=stream_events, | |
| session_state=session_state, | |
| websocket_handler=websocket_handler, | |
| **kwargs, | |
| ): | |
| # Events are automatically broadcast by _handle_event | |
| # We just consume them here to drive the execution | |
| pass | |
| log_debug(f"Background streaming execution completed with status: {workflow_run_response.status}") | |
| except Exception as e: | |
| logger.error(f"Background streaming workflow execution failed: {e}") | |
| workflow_run_response.status = RunStatus.error | |
| workflow_run_response.content = f"Background streaming execution failed: {str(e)}" | |
| if self._has_async_db(): | |
| await self.asave_session(session=workflow_session) | |
| else: | |
| self.save_session(session=workflow_session) | |
| # Create and start asyncio task for background streaming execution | |
| loop = asyncio.get_running_loop() | |
| loop.create_task(execute_workflow_background_stream()) | |
| # Return SAME object that will be updated by background execution | |
| return workflow_run_response | |
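| # Illustrative sketch: events from the background stream are broadcast through | |
| # the WebSocketHandler by _handle_event. A FastAPI endpoint might wire it up as | |
| # follows (the WebSocketHandler constructor and arun kwargs are assumptions): | |
| # | |
| #   @app.websocket("/workflow") | |
| #   async def ws_endpoint(websocket: WebSocket): | |
| #       await websocket.accept() | |
| #       handler = WebSocketHandler(websocket=websocket) | |
| #       await workflow.arun(input="...", background=True, stream=True, | |
| #                           websocket_handler=handler) | |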
| async def aget_run(self, run_id: str) -> Optional[WorkflowRunOutput]: | |
| """Get the status and details of a background workflow run - SIMPLIFIED""" | |
| if self.db is not None and self.session_id is not None: | |
| session = await self.db.aget_session(session_id=self.session_id, session_type=SessionType.WORKFLOW) # type: ignore | |
| if session and isinstance(session, WorkflowSession) and session.runs: | |
| # Find the run by ID | |
| for run in session.runs: | |
| if run.run_id == run_id: | |
| return run | |
| return None | |
| def get_run(self, run_id: str) -> Optional[WorkflowRunOutput]: | |
| """Get the status and details of a background workflow run - SIMPLIFIED""" | |
| if self.db is not None and self.session_id is not None: | |
| session = self.db.get_session(session_id=self.session_id, session_type=SessionType.WORKFLOW) | |
| if session and isinstance(session, WorkflowSession) and session.runs: | |
| # Find the run by ID | |
| for run in session.runs: | |
| if run.run_id == run_id: | |
| return run | |
| return None | |
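| # Illustrative sketch: re-fetching a background run by id, e.g. from another | |
| # process sharing the same database and session_id: | |
| # | |
| #   run = workflow.get_run(run_id)            # sync db | |
| #   # run = await workflow.aget_run(run_id)   # async db | |
| #   if run is not None and run.status == RunStatus.completed: | |
| #       print(run.content) | |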
| def _initialize_workflow_agent( | |
| self, | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| ) -> None: | |
| """Initialize the workflow agent with tools (but NOT context - that's passed per-run)""" | |
| from agno.tools.function import Function | |
| workflow_tool_func = self.agent.create_workflow_tool( # type: ignore | |
| workflow=self, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state, | |
| stream=stream, | |
| ) | |
| workflow_tool = Function.from_callable(workflow_tool_func) | |
| self.agent.tools = [workflow_tool] # type: ignore | |
| log_debug("Workflow agent initialized with run_workflow tool") | |
| def _get_workflow_agent_dependencies(self, session: WorkflowSession) -> Dict[str, Any]: | |
| """Build dependencies dict with workflow context to pass to agent.run()""" | |
| # Get configuration from the WorkflowAgent instance | |
| add_history = True | |
| num_runs = 5 | |
| if self.agent and isinstance(self.agent, WorkflowAgent): | |
| add_history = self.agent.add_workflow_history | |
| num_runs = self.agent.num_history_runs | |
| if add_history: | |
| history_context = ( | |
| session.get_workflow_history_context(num_runs=num_runs) or "No previous workflow runs in this session." | |
| ) | |
| else: | |
| history_context = "No workflow history available." | |
| # Build workflow context with description and history | |
| workflow_context = "" | |
| if self.description: | |
| workflow_context += f"Workflow Description: {self.description}\n\n" | |
| workflow_context += history_context | |
| return { | |
| "workflow_context": workflow_context, | |
| } | |
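| # Shape of the returned dependencies dict (illustrative values): | |
| # | |
| #   { | |
| #       "workflow_context": ( | |
| #           "Workflow Description: Research pipeline\n\n" | |
| #           "<up to num_history_runs previous runs, or a 'no history' note>" | |
| #       ) | |
| #   } | |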
| def _execute_workflow_agent( | |
| self, | |
| user_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| **kwargs: Any, | |
| ) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]: | |
| """ | |
| Execute the workflow agent in streaming or non-streaming mode. | |
| The agent decides whether to run the workflow or answer directly from history. | |
| Args: | |
| user_input: The user's input | |
| session: The workflow session | |
| execution_input: The execution input | |
| session_state: The session state | |
| stream: Whether to stream the response | |
| **kwargs: Additional keyword arguments forwarded to the streaming execution path | |
| Returns: | |
| WorkflowRunOutput if stream=False, Iterator[WorkflowRunOutputEvent] if stream=True | |
| """ | |
| if stream: | |
| return self._execute_workflow_agent_streaming( | |
| agent_input=user_input, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state, | |
| stream=stream, | |
| **kwargs, | |
| ) | |
| else: | |
| return self._execute_workflow_agent_non_streaming( | |
| agent_input=user_input, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state, | |
| stream=stream, | |
| ) | |
| def _execute_workflow_agent_streaming( | |
| self, | |
| agent_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| **kwargs: Any, | |
| ) -> Iterator[WorkflowRunOutputEvent]: | |
| """ | |
| Execute the workflow agent in streaming mode. | |
| The agent's tool (run_workflow) is a generator that yields workflow events directly. | |
| These events bubble up through the agent's streaming and are yielded here. | |
| We filter to only yield WorkflowRunOutputEvent to the CLI. | |
| Yields: | |
| WorkflowRunOutputEvent: Events from workflow execution (agent events are filtered) | |
| """ | |
| from typing import get_args | |
| from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent | |
| # Initialize agent with stream_intermediate_steps=True so tool yields events | |
| self._initialize_workflow_agent(session, execution_input, session_state, stream=stream) | |
| # Build dependencies with workflow context | |
| dependencies = self._get_workflow_agent_dependencies(session) | |
| # Run agent with streaming - workflow events will bubble up from the tool | |
| agent_response: Optional[RunOutput] = None | |
| workflow_executed = False | |
| from agno.run.agent import RunContentEvent | |
| from agno.run.team import RunContentEvent as TeamRunContentEvent | |
| log_debug(f"Executing workflow agent with streaming - input: {agent_input}...") | |
| # Run the agent in streaming mode and yield all events | |
| for event in self.agent.run( # type: ignore[union-attr] | |
| input=agent_input, | |
| stream=True, | |
| stream_intermediate_steps=True, | |
| yield_run_response=True, | |
| dependencies=dependencies, # Pass context dynamically per-run | |
| ): # type: ignore | |
| if isinstance(event, tuple(get_args(WorkflowRunOutputEvent))): | |
| yield event # type: ignore[misc] | |
| # Track if workflow was executed by checking for WorkflowCompletedEvent | |
| if isinstance(event, WorkflowCompletedEvent): | |
| workflow_executed = True | |
| elif isinstance(event, (RunContentEvent, TeamRunContentEvent)): | |
| if event.step_name is None: | |
| # This is from the workflow agent itself | |
| # Enrich with metadata to mark it as a workflow agent event | |
| event.is_workflow_agent = True # type: ignore | |
| yield event # type: ignore[misc] | |
| # Capture the final RunOutput (but don't yield it) | |
| if isinstance(event, RunOutput): | |
| agent_response = event | |
| # Handle direct answer case (no workflow execution) | |
| if not workflow_executed: | |
| # Create a new workflow run output for the direct answer | |
| run_id = str(uuid4()) | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content=agent_response.content if agent_response else "", | |
| status=RunStatus.completed, | |
| workflow_agent_run=agent_response, | |
| ) | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = workflow_run_response.run_id | |
| agent_response.workflow_id = workflow_run_response.workflow_id | |
| # Update the run in session | |
| session.upsert_run(run=workflow_run_response) | |
| # Save session | |
| self.save_session(session=session) | |
| log_debug(f"Agent decision: workflow_executed={workflow_executed}") | |
| # Yield a workflow completed event with the agent's direct response | |
| completed_event = WorkflowCompletedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| content=workflow_run_response.content, | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| step_results=[], | |
| metadata={"agent_direct_response": True}, | |
| ) | |
| yield completed_event | |
| else: | |
| # Workflow was executed by the tool | |
| reloaded_session = self.get_session(session_id=session.session_id) | |
| if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0: | |
| # Get the last run (which is the one just created by the tool) | |
| last_run = reloaded_session.runs[-1] | |
| # Update the last run with workflow_agent_run | |
| last_run.workflow_agent_run = agent_response | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = last_run.run_id | |
| agent_response.workflow_id = last_run.workflow_id | |
| # Save the reloaded session (which has the updated run) | |
| self.save_session(session=reloaded_session) | |
| else: | |
| log_warning("Could not reload session or no runs found after workflow execution") | |
| def _execute_workflow_agent_non_streaming( | |
| self, | |
| agent_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| ) -> WorkflowRunOutput: | |
| """ | |
| Execute the workflow agent in non-streaming mode. | |
| The agent decides whether to run the workflow or answer directly from history. | |
| Returns: | |
| WorkflowRunOutput: The workflow run output with agent response | |
| """ | |
| # Initialize the agent | |
| self._initialize_workflow_agent(session, execution_input, session_state, stream=stream) | |
| # Build dependencies with workflow context | |
| dependencies = self._get_workflow_agent_dependencies(session) | |
| # Run the agent | |
| agent_response: RunOutput = self.agent.run( # type: ignore[union-attr] | |
| input=agent_input, | |
| dependencies=dependencies, | |
| stream=stream, | |
| ) # type: ignore | |
| # Check if the agent called the workflow tool | |
| workflow_executed = False | |
| if agent_response.messages: | |
| for message in agent_response.messages: | |
| if message.role == "assistant" and message.tool_calls: | |
| # Check if the tool call is specifically for run_workflow | |
| for tool_call in message.tool_calls: | |
| # Handle both dict and object formats | |
| if isinstance(tool_call, dict): | |
| tool_name = tool_call.get("function", {}).get("name", "") | |
| else: | |
| tool_name = tool_call.function.name if hasattr(tool_call, "function") else "" | |
| if tool_name == "run_workflow": | |
| workflow_executed = True | |
| break | |
| if workflow_executed: | |
| break | |
| log_debug(f"Workflow agent execution complete. Workflow executed: {workflow_executed}") | |
| # Handle direct answer case (no workflow execution) | |
| if not workflow_executed: | |
| # Create a new workflow run output for the direct answer | |
| run_id = str(uuid4()) | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content=agent_response.content, | |
| status=RunStatus.completed, | |
| workflow_agent_run=agent_response, | |
| ) | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = workflow_run_response.run_id | |
| agent_response.workflow_id = workflow_run_response.workflow_id | |
| # Update the run in session | |
| session.upsert_run(run=workflow_run_response) | |
| self.save_session(session=session) | |
| log_debug(f"Agent decision: workflow_executed={workflow_executed}") | |
| return workflow_run_response | |
| else: | |
| # Workflow was executed by the tool | |
| reloaded_session = self.get_session(session_id=session.session_id) | |
| if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0: | |
| # Get the last run (which is the one just created by the tool) | |
| last_run = reloaded_session.runs[-1] | |
| # Update the last run directly with workflow_agent_run | |
| last_run.workflow_agent_run = agent_response | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = last_run.run_id | |
| agent_response.workflow_id = last_run.workflow_id | |
| # Save the reloaded session (which has the updated run) | |
| self.save_session(session=reloaded_session) | |
| # Return the last run directly (WRO2 from inner workflow) | |
| return last_run | |
| else: | |
| log_warning("Could not reload session or no runs found after workflow execution") | |
| # Return a placeholder error response | |
| return WorkflowRunOutput( | |
| run_id=str(uuid4()), | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content="Error: Workflow execution failed", | |
| status=RunStatus.error, | |
| ) | |
| def _async_initialize_workflow_agent( | |
| self, | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| websocket_handler: Optional[WebSocketHandler] = None, | |
| stream: bool = False, | |
| ) -> None: | |
| """Initialize the workflow agent with async tools (but NOT context - that's passed per-run)""" | |
| from agno.tools.function import Function | |
| workflow_tool_func = self.agent.async_create_workflow_tool( # type: ignore | |
| workflow=self, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state, | |
| stream=stream, | |
| websocket_handler=websocket_handler, | |
| ) | |
| workflow_tool = Function.from_callable(workflow_tool_func) | |
| self.agent.tools = [workflow_tool] # type: ignore | |
| log_debug("Workflow agent initialized with async run_workflow tool") | |
| async def _aload_session_for_workflow_agent( | |
| self, | |
| session_id: str, | |
| user_id: Optional[str], | |
| session_state: Optional[Dict[str, Any]], | |
| ) -> Tuple[WorkflowSession, Dict[str, Any]]: | |
| """Helper to load or create session for workflow agent execution""" | |
| return await self._aload_or_create_session( | |
| session_id=session_id, user_id=user_id, session_state=session_state | |
| ) | |
| def _aexecute_workflow_agent( | |
| self, | |
| user_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session_id: str, | |
| user_id: Optional[str], | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| websocket_handler: Optional[WebSocketHandler] = None, | |
| **kwargs: Any, | |
| ): | |
| """ | |
| Execute the workflow agent asynchronously in streaming or non-streaming mode. | |
| The agent decides whether to run the workflow or answer directly from history. | |
| Args: | |
| user_input: The user's input | |
| session_id: The workflow session ID | |
| user_id: The user ID | |
| execution_input: The execution input | |
| session_state: The session state | |
| stream: Whether to stream the response | |
| websocket_handler: The WebSocket handler | |
| Returns: | |
| Coroutine[WorkflowRunOutput] if stream=False, AsyncIterator[WorkflowRunOutputEvent] if stream=True | |
| """ | |
| if stream: | |
| async def _stream(): | |
| session, session_state_loaded = await self._aload_session_for_workflow_agent( | |
| session_id, user_id, session_state | |
| ) | |
| async for event in self._aexecute_workflow_agent_streaming( | |
| agent_input=user_input, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state_loaded, | |
| stream=stream, | |
| websocket_handler=websocket_handler, | |
| **kwargs, | |
| ): | |
| yield event | |
| return _stream() | |
| else: | |
| async def _execute(): | |
| session, session_state_loaded = await self._aload_session_for_workflow_agent( | |
| session_id, user_id, session_state | |
| ) | |
| return await self._aexecute_workflow_agent_non_streaming( | |
| agent_input=user_input, | |
| session=session, | |
| execution_input=execution_input, | |
| session_state=session_state_loaded, | |
| stream=stream, | |
| ) | |
| return _execute() | |
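| # Usage note: this dispatcher returns a coroutine when stream=False and an | |
| # async generator when stream=True, so callers consume it differently | |
| # (illustrative): | |
| # | |
| #   result = await workflow._aexecute_workflow_agent(..., stream=False) | |
| #   async for event in workflow._aexecute_workflow_agent(..., stream=True): | |
| #       ... | |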
| async def _aexecute_workflow_agent_streaming( | |
| self, | |
| agent_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| websocket_handler: Optional[WebSocketHandler] = None, | |
| **kwargs: Any, | |
| ) -> AsyncIterator[WorkflowRunOutputEvent]: | |
| """ | |
| Execute the workflow agent asynchronously in streaming mode. | |
| The agent's tool (run_workflow) is an async generator that yields workflow events directly. | |
| These events bubble up through the agent's streaming and are yielded here. | |
| We filter to only yield WorkflowRunOutputEvent to the CLI. | |
| Yields: | |
| WorkflowRunOutputEvent: Events from workflow execution (agent events are filtered) | |
| """ | |
| from typing import get_args | |
| from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent | |
| logger.info("Workflow agent enabled - async streaming mode") | |
| log_debug(f"User input: {agent_input}") | |
| self._async_initialize_workflow_agent( | |
| session, execution_input, session_state, stream=stream, websocket_handler=websocket_handler | |
| ) | |
| dependencies = self._get_workflow_agent_dependencies(session) | |
| agent_response: Optional[RunOutput] = None | |
| workflow_executed = False | |
| from agno.run.agent import RunContentEvent | |
| from agno.run.team import RunContentEvent as TeamRunContentEvent | |
| log_debug(f"Executing async workflow agent with streaming - input: {agent_input}...") | |
| # Run the agent in streaming mode and yield all events | |
| async for event in self.agent.arun( # type: ignore[union-attr] | |
| input=agent_input, | |
| stream=True, | |
| stream_intermediate_steps=True, | |
| yield_run_response=True, | |
| dependencies=dependencies, # Pass context dynamically per-run | |
| ): # type: ignore | |
| if isinstance(event, tuple(get_args(WorkflowRunOutputEvent))): | |
| yield event # type: ignore[misc] | |
| if isinstance(event, WorkflowCompletedEvent): | |
| workflow_executed = True | |
| log_debug("Workflow execution detected via WorkflowCompletedEvent") | |
| elif isinstance(event, (RunContentEvent, TeamRunContentEvent)): | |
| if event.step_name is None and isinstance(event, (RunStartedEvent, RunContentEvent, RunCompletedEvent)): | |
| # This is from the workflow agent itself | |
| # Enrich with metadata to mark it as a workflow agent event | |
| event.is_workflow_agent = True # type: ignore | |
| # Broadcast to WebSocket if available (async context only) | |
| if websocket_handler: | |
| try: | |
| # get_running_loop() raises RuntimeError outside a running event loop, | |
| # so reaching create_task() guarantees an active loop | |
| asyncio.get_running_loop() | |
| asyncio.create_task(websocket_handler.handle_event(event)) | |
| except RuntimeError: | |
| pass | |
| yield event # type: ignore[misc] | |
| # Capture the final RunOutput (but don't yield it) | |
| if isinstance(event, RunOutput): | |
| agent_response = event | |
| log_debug( | |
| f"Agent response: {str(agent_response.content)[:100] if agent_response.content else 'None'}..." | |
| ) | |
| # Handle direct answer case (no workflow execution) | |
| if not workflow_executed: | |
| # Create a new workflow run output for the direct answer | |
| run_id = str(uuid4()) | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content=agent_response.content if agent_response else "", | |
| status=RunStatus.completed, | |
| workflow_agent_run=agent_response, | |
| ) | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = workflow_run_response.run_id | |
| agent_response.workflow_id = workflow_run_response.workflow_id | |
| # Update the run in session | |
| session.upsert_run(run=workflow_run_response) | |
| # Save session | |
| self.save_session(session=session) | |
| # Yield a workflow completed event with the agent's direct response | |
| completed_event = WorkflowCompletedEvent( | |
| run_id=workflow_run_response.run_id or "", | |
| content=workflow_run_response.content, | |
| workflow_name=workflow_run_response.workflow_name, | |
| workflow_id=workflow_run_response.workflow_id, | |
| session_id=workflow_run_response.session_id, | |
| step_results=[], | |
| metadata={"agent_direct_response": True}, | |
| ) | |
| yield completed_event | |
| else: | |
| # Workflow was executed by the tool | |
| reloaded_session = self.get_session(session_id=session.session_id) | |
| if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0: | |
| # Get the last run (which is the one just created by the tool) | |
| last_run = reloaded_session.runs[-1] | |
| # Update the last run with workflow_agent_run | |
| last_run.workflow_agent_run = agent_response | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = last_run.run_id | |
| agent_response.workflow_id = last_run.workflow_id | |
| # Save the reloaded session (which has the updated run) | |
| self.save_session(session=reloaded_session) | |
| else: | |
| log_warning("Could not reload session or no runs found after workflow execution") | |
| async def _aexecute_workflow_agent_non_streaming( | |
| self, | |
| agent_input: Union[str, Dict[str, Any], List[Any], BaseModel], | |
| session: WorkflowSession, | |
| execution_input: WorkflowExecutionInput, | |
| session_state: Optional[Dict[str, Any]], | |
| stream: bool = False, | |
| ) -> WorkflowRunOutput: | |
| """ | |
| Execute the workflow agent asynchronously in non-streaming mode. | |
| The agent decides whether to run the workflow or answer directly from history. | |
| Returns: | |
| WorkflowRunOutput: The workflow run output with agent response | |
| """ | |
| # Initialize the agent | |
| self._async_initialize_workflow_agent(session, execution_input, session_state, stream=stream) | |
| # Build dependencies with workflow context | |
| dependencies = self._get_workflow_agent_dependencies(session) | |
| # Run the agent | |
| agent_response: RunOutput = await self.agent.arun( # type: ignore[union-attr] | |
| input=agent_input, | |
| dependencies=dependencies, | |
| stream=stream, | |
| ) # type: ignore | |
| # Check if the agent called the workflow tool | |
| workflow_executed = False | |
| if agent_response.messages: | |
| for message in agent_response.messages: | |
| if message.role == "assistant" and message.tool_calls: | |
| # Check if the tool call is specifically for run_workflow | |
| for tool_call in message.tool_calls: | |
| # Handle both dict and object formats | |
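| # dict form is OpenAI-style, e.g. {"function": {"name": "run_workflow", "arguments": "{...}"}} (shape assumed) | |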
| if isinstance(tool_call, dict): | |
| tool_name = tool_call.get("function", {}).get("name", "") | |
| else: | |
| tool_name = tool_call.function.name if hasattr(tool_call, "function") else "" | |
| if tool_name == "run_workflow": | |
| workflow_executed = True | |
| break | |
| if workflow_executed: | |
| break | |
| # Handle direct answer case (no workflow execution) | |
| if not workflow_executed: | |
| # Create a new workflow run output for the direct answer | |
| run_id = str(uuid4()) | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content=agent_response.content, | |
| status=RunStatus.completed, | |
| workflow_agent_run=agent_response, | |
| ) | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = workflow_run_response.run_id | |
| agent_response.workflow_id = workflow_run_response.workflow_id | |
| # Update the run in session | |
| session.upsert_run(run=workflow_run_response) | |
| self.save_session(session=session) | |
| log_debug(f"Agent decision: workflow_executed={workflow_executed}") | |
| return workflow_run_response | |
| else: | |
| # Workflow was executed by the tool | |
| logger.info("=" * 80) | |
| logger.info("WORKFLOW AGENT: Called run_workflow tool (async)") | |
| logger.info(" ➜ Workflow was executed, retrieving results...") | |
| logger.info("=" * 80) | |
| log_debug("Reloading session from database to get the latest workflow run...") | |
| reloaded_session = self.get_session(session_id=session.session_id) | |
| if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0: | |
| # Get the last run (which is the one just created by the tool) | |
| last_run = reloaded_session.runs[-1] | |
| log_debug(f"Retrieved latest workflow run: {last_run.run_id}") | |
| log_debug(f"Total workflow runs in session: {len(reloaded_session.runs)}") | |
| # Update the last run with workflow_agent_run | |
| last_run.workflow_agent_run = agent_response | |
| # Store the full agent RunOutput and establish parent-child relationship | |
| if agent_response: | |
| agent_response.parent_run_id = last_run.run_id | |
| agent_response.workflow_id = last_run.workflow_id | |
| # Save the reloaded session (which has the updated run) | |
| self.save_session(session=reloaded_session) | |
| log_debug(f"Agent decision: workflow_executed={workflow_executed}") | |
| # Return the last run directly (WRO2 from inner workflow) | |
| return last_run | |
| else: | |
| log_warning("Could not reload session or no runs found after workflow execution") | |
| # Return a placeholder error response | |
| return WorkflowRunOutput( | |
| run_id=str(uuid4()), | |
| input=execution_input.input, | |
| session_id=session.session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| content="Error: Workflow execution failed", | |
| status=RunStatus.error, | |
| ) | |
| def cancel_run(self, run_id: str) -> bool: | |
| """Cancel a running workflow execution. | |
| Args: | |
| run_id (str): The run_id to cancel. | |
| Returns: | |
| bool: True if the run was found and marked for cancellation, False otherwise. | |
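| Example (a sketch; assumes `run_id` was captured from a run started with background=True): | |
| >>> workflow.cancel_run(run_id) | |
| True | |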
| """ | |
| return cancel_run_global(run_id) | |
| @overload | |
| def run( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Literal[False] = False, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| ) -> WorkflowRunOutput: ... | |
| @overload | |
| def run( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Literal[True] = True, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| ) -> Iterator[WorkflowRunOutputEvent]: ... | |
| def run( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: bool = False, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| **kwargs: Any, | |
| ) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]: | |
| """Execute the workflow synchronously with optional streaming""" | |
| if self._has_async_db(): | |
| raise Exception("`run()` is not supported with an async DB. Please use `arun()`.") | |
| input = self._validate_input(input) | |
| if background: | |
| raise RuntimeError("Background execution is not supported for sync run()") | |
| self._set_debug() | |
| run_id = str(uuid4()) | |
| self.initialize_workflow() | |
| session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id) | |
| # Read existing session from database | |
| workflow_session = self.read_or_create_session(session_id=session_id, user_id=user_id) | |
| self._update_metadata(session=workflow_session) | |
| # Initialize session state | |
| session_state = self._initialize_session_state( | |
| session_state=session_state or {}, user_id=user_id, session_id=session_id, run_id=run_id | |
| ) | |
| # Update session state from DB | |
| session_state = self._load_session_state(session=workflow_session, session_state=session_state) | |
| log_debug(f"Workflow Run Start: {self.name}", center=True) | |
| # Use simple defaults | |
| stream = stream or self.stream or False | |
| stream_events = (stream_events or stream_intermediate_steps) or ( | |
| self.stream_events or self.stream_intermediate_steps | |
| ) | |
| # Can't stream events if streaming is disabled | |
| if stream is False: | |
| stream_events = False | |
| log_debug(f"Stream: {stream}") | |
| log_debug(f"Total steps: {self._get_step_count()}") | |
| # Prepare steps | |
| self._prepare_steps() | |
| inputs = WorkflowExecutionInput( | |
| input=input, | |
| additional_data=additional_data, | |
| audio=audio, # type: ignore | |
| images=images, # type: ignore | |
| videos=videos, # type: ignore | |
| files=files, # type: ignore | |
| ) | |
| log_debug( | |
| f"Created pipeline input with session state keys: {list(session_state.keys()) if session_state else 'None'}" | |
| ) | |
| self.update_agents_and_teams_session_info() | |
| # Execute workflow agent if configured | |
| if self.agent is not None: | |
| return self._execute_workflow_agent( | |
| user_input=input, # type: ignore | |
| session=workflow_session, | |
| execution_input=inputs, | |
| session_state=session_state, | |
| stream=stream, | |
| **kwargs, | |
| ) | |
| # Create workflow run response for regular workflow execution | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=input, | |
| session_id=session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| ) | |
| if stream: | |
| return self._execute_stream( | |
| session=workflow_session, | |
| execution_input=inputs, # type: ignore[arg-type] | |
| workflow_run_response=workflow_run_response, | |
| stream_events=stream_events, | |
| session_state=session_state, | |
| **kwargs, | |
| ) | |
| else: | |
| return self._execute( | |
| session=workflow_session, | |
| execution_input=inputs, # type: ignore[arg-type] | |
| workflow_run_response=workflow_run_response, | |
| session_state=session_state, | |
| **kwargs, | |
| ) | |
| @overload | |
| async def arun( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Literal[False] = False, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| websocket: Optional[WebSocket] = None, | |
| ) -> WorkflowRunOutput: ... | |
| @overload | |
| def arun( | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Literal[True] = True, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| websocket: Optional[WebSocket] = None, | |
| ) -> AsyncIterator[WorkflowRunOutputEvent]: ... | |
| def arun( # type: ignore | |
| self, | |
| input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None, | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| session_state: Optional[Dict[str, Any]] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: bool = False, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| background: Optional[bool] = False, | |
| websocket: Optional[WebSocket] = None, | |
| **kwargs: Any, | |
| ) -> Union[WorkflowRunOutput, AsyncIterator[WorkflowRunOutputEvent]]: | |
| """Execute the workflow synchronously with optional streaming""" | |
| input = self._validate_input(input) | |
| websocket_handler = None | |
| if websocket: | |
| from agno.workflow.types import WebSocketHandler | |
| websocket_handler = WebSocketHandler(websocket=websocket) | |
| if background: | |
| if stream and websocket: | |
| # Consider both stream_events and stream_intermediate_steps (deprecated) | |
| stream_events = stream_events or stream_intermediate_steps or False | |
| # Background + Streaming + WebSocket = Real-time events | |
| return self._arun_background_stream( # type: ignore | |
| input=input, | |
| additional_data=additional_data, | |
| user_id=user_id, | |
| session_id=session_id, | |
| session_state=session_state, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| stream_events=stream_events, | |
| websocket_handler=websocket_handler, | |
| **kwargs, | |
| ) | |
| elif stream and not websocket: | |
| # Background + Streaming but no WebSocket = Not supported | |
| raise ValueError("Background streaming execution requires a WebSocket for real-time events") | |
| else: | |
| # Background + Non-streaming = Polling (existing) | |
| return self._arun_background( # type: ignore | |
| input=input, | |
| additional_data=additional_data, | |
| user_id=user_id, | |
| session_id=session_id, | |
| session_state=session_state, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| **kwargs, | |
| ) | |
| self._set_debug() | |
| run_id = str(uuid4()) | |
| self.initialize_workflow() | |
| session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id) | |
| log_debug(f"Async Workflow Run Start: {self.name}", center=True) | |
| # Use simple defaults | |
| stream = stream or self.stream or False | |
| stream_events = (stream_events or stream_intermediate_steps) or ( | |
| self.stream_events or self.stream_intermediate_steps | |
| ) | |
| # Can't stream events if streaming is disabled | |
| if stream is False: | |
| stream_events = False | |
| log_debug(f"Stream: {stream}") | |
| # Prepare steps | |
| self._prepare_steps() | |
| inputs = WorkflowExecutionInput( | |
| input=input, | |
| additional_data=additional_data, | |
| audio=audio, # type: ignore | |
| images=images, # type: ignore | |
| videos=videos, # type: ignore | |
| files=files, | |
| ) | |
| log_debug( | |
| f"Created async pipeline input with session state keys: {list(session_state.keys()) if session_state else 'None'}" | |
| ) | |
| self.update_agents_and_teams_session_info() | |
| if self.agent is not None: | |
| return self._aexecute_workflow_agent( # type: ignore | |
| user_input=input, # type: ignore | |
| session_id=session_id, | |
| user_id=user_id, | |
| execution_input=inputs, | |
| session_state=session_state, | |
| stream=stream, | |
| **kwargs, | |
| ) | |
| # Create workflow run response for regular workflow execution | |
| workflow_run_response = WorkflowRunOutput( | |
| run_id=run_id, | |
| input=input, | |
| session_id=session_id, | |
| workflow_id=self.id, | |
| workflow_name=self.name, | |
| created_at=int(datetime.now().timestamp()), | |
| ) | |
| if stream: | |
| return self._aexecute_stream( # type: ignore | |
| execution_input=inputs, | |
| workflow_run_response=workflow_run_response, | |
| session_id=session_id, | |
| user_id=user_id, | |
| stream_events=stream_events, | |
| websocket=websocket, | |
| files=files, | |
| session_state=session_state, | |
| **kwargs, | |
| ) | |
| else: | |
| return self._aexecute( # type: ignore | |
| execution_input=inputs, | |
| workflow_run_response=workflow_run_response, | |
| session_id=session_id, | |
| user_id=user_id, | |
| websocket=websocket, | |
| files=files, | |
| session_state=session_state, | |
| **kwargs, | |
| ) | |
| def _prepare_steps(self): | |
| """Prepare the steps for execution""" | |
| if not callable(self.steps) and self.steps is not None: | |
| prepared_steps: List[Union[Step, Steps, Loop, Parallel, Condition, Router]] = [] | |
| for i, step in enumerate(self.steps): # type: ignore | |
| if callable(step) and hasattr(step, "__name__"): | |
| step_name = step.__name__ | |
| log_debug(f"Step {i + 1}: Wrapping callable function '{step_name}'") | |
| prepared_steps.append(Step(name=step_name, description="User-defined callable step", executor=step)) # type: ignore | |
| elif isinstance(step, Agent): | |
| step_name = step.name or f"step_{i + 1}" | |
| log_debug(f"Step {i + 1}: Agent '{step_name}'") | |
| prepared_steps.append(Step(name=step_name, description=step.description, agent=step)) | |
| elif isinstance(step, Team): | |
| step_name = step.name or f"step_{i + 1}" | |
| log_debug(f"Step {i + 1}: Team '{step_name}' with {len(step.members)} members") | |
| prepared_steps.append(Step(name=step_name, description=step.description, team=step)) | |
| elif isinstance(step, (Step, Steps, Loop, Parallel, Condition, Router)): | |
| step_type = type(step).__name__ | |
| step_name = getattr(step, "name", f"unnamed_{step_type.lower()}") | |
| log_debug(f"Step {i + 1}: {step_type} '{step_name}'") | |
| prepared_steps.append(step) | |
| else: | |
| raise ValueError(f"Invalid step type: {type(step).__name__}") | |
| self.steps = prepared_steps # type: ignore | |
| log_debug("Step preparation completed") | |
| def print_response( | |
| self, | |
| input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]], | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Optional[bool] = None, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| markdown: bool = True, | |
| show_time: bool = True, | |
| show_step_details: bool = True, | |
| console: Optional[Any] = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| """Print workflow execution with rich formatting and optional streaming | |
| Args: | |
| input: The main query/input for the workflow | |
| additional_data: Additional data attached to the input | |
| user_id: User ID | |
| session_id: Session ID | |
| audio: Audio input | |
| images: Image input | |
| videos: Video input | |
| files: File input | |
| stream: Whether to stream the response content | |
| stream_events: Whether to stream intermediate steps | |
| markdown: Whether to render content as markdown | |
| show_time: Whether to show execution time | |
| show_step_details: Whether to show individual step outputs | |
| console: Rich console instance (optional) | |
| (deprecated) stream_intermediate_steps: Whether to stream intermediate step outputs. If None, uses workflow default. | |
| """ | |
| if self._has_async_db(): | |
| raise Exception("`print_response()` is not supported with an async DB. Please use `aprint_response()`.") | |
| if stream is None: | |
| stream = self.stream or False | |
| # Considering both stream_events and stream_intermediate_steps (deprecated) | |
| stream_events = stream_events or stream_intermediate_steps | |
| # Can't stream events if streaming is disabled | |
| if stream is False: | |
| stream_events = False | |
| if stream_events is None: | |
| stream_events = ( | |
| False | |
| if (self.stream_events is None and self.stream_intermediate_steps is None) | |
| else (self.stream_intermediate_steps or self.stream_events) | |
| ) | |
| if stream: | |
| print_response_stream( | |
| workflow=self, | |
| input=input, | |
| user_id=user_id, | |
| session_id=session_id, | |
| additional_data=additional_data, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| console=console, | |
| **kwargs, | |
| ) | |
| else: | |
| print_response( | |
| workflow=self, | |
| input=input, | |
| user_id=user_id, | |
| session_id=session_id, | |
| additional_data=additional_data, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| console=console, | |
| **kwargs, | |
| ) | |
| async def aprint_response( | |
| self, | |
| input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]], | |
| additional_data: Optional[Dict[str, Any]] = None, | |
| user_id: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| audio: Optional[List[Audio]] = None, | |
| images: Optional[List[Image]] = None, | |
| videos: Optional[List[Video]] = None, | |
| files: Optional[List[File]] = None, | |
| stream: Optional[bool] = None, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| markdown: bool = True, | |
| show_time: bool = True, | |
| show_step_details: bool = True, | |
| console: Optional[Any] = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| """Print workflow execution with rich formatting and optional streaming | |
| Args: | |
| input: The main message/input for the workflow | |
| additional_data: Additional data attached to the input | |
| user_id: User ID | |
| session_id: Session ID | |
| audio: Audio input | |
| images: Image input | |
| videos: Video input | |
| files: File input | |
| stream: Whether to stream the response content | |
| stream_events: Whether to stream intermediate steps | |
| markdown: Whether to render content as markdown | |
| show_time: Whether to show execution time | |
| show_step_details: Whether to show individual step outputs | |
| console: Rich console instance (optional) | |
| (deprecated) stream_intermediate_steps: Whether to stream intermediate step outputs. If None, uses workflow default. | |
| """ | |
| if stream is None: | |
| stream = self.stream or False | |
| # Considering both stream_events and stream_intermediate_steps (deprecated) | |
| stream_events = stream_events or stream_intermediate_steps | |
| # Can't stream events if streaming is disabled | |
| if stream is False: | |
| stream_events = False | |
| if stream_events is None: | |
| stream_events = ( | |
| False | |
| if (self.stream_events is None and self.stream_intermediate_steps is None) | |
| else (self.stream_intermediate_steps or self.stream_events) | |
| ) | |
| if stream: | |
| await aprint_response_stream( | |
| workflow=self, | |
| input=input, | |
| additional_data=additional_data, | |
| user_id=user_id, | |
| session_id=session_id, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| console=console, | |
| **kwargs, | |
| ) | |
| else: | |
| await aprint_response( | |
| workflow=self, | |
| input=input, | |
| additional_data=additional_data, | |
| user_id=user_id, | |
| session_id=session_id, | |
| audio=audio, | |
| images=images, | |
| videos=videos, | |
| files=files, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| console=console, | |
| **kwargs, | |
| ) | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert workflow to dictionary representation""" | |
| def serialize_step(step): | |
| # Handle callable functions (not wrapped in Step objects) | |
| if callable(step) and hasattr(step, "__name__"): | |
| step_dict = { | |
| "name": step.__name__, | |
| "description": "User-defined callable step", | |
| "type": StepType.STEP.value, | |
| } | |
| return step_dict | |
| # Handle Agent and Team objects directly | |
| if isinstance(step, Agent): | |
| step_dict = { | |
| "name": step.name or "unnamed_agent", | |
| "description": step.description or "Agent step", | |
| "type": StepType.STEP.value, | |
| "agent": step, | |
| } | |
| return step_dict | |
| if isinstance(step, Team): | |
| step_dict = { | |
| "name": step.name or "unnamed_team", | |
| "description": step.description or "Team step", | |
| "type": StepType.STEP.value, | |
| "team": step, | |
| } | |
| return step_dict | |
| step_dict = { | |
| "name": step.name if hasattr(step, "name") else f"unnamed_{type(step).__name__.lower()}", | |
| "description": step.description if hasattr(step, "description") else "User-defined callable step", | |
| "type": STEP_TYPE_MAPPING[type(step)].value, # type: ignore | |
| } | |
| # Handle agent/team/tools | |
| if hasattr(step, "agent"): | |
| step_dict["agent"] = step.agent if hasattr(step, "agent") else None # type: ignore | |
| if hasattr(step, "team"): | |
| step_dict["team"] = step.team if hasattr(step, "team") else None # type: ignore | |
| # Handle nested steps for Router/Loop/Condition/Steps/Parallel | |
| if isinstance(step, Router): | |
| step_dict["steps"] = ( | |
| [serialize_step(choice) for choice in step.choices] if hasattr(step, "choices") else None | |
| ) | |
| elif isinstance(step, (Loop, Condition, Steps, Parallel)): | |
| step_dict["steps"] = ( | |
| [serialize_step(child) for child in step.steps] if hasattr(step, "steps") else None | |
| ) | |
| return step_dict | |
| if self.steps is None or callable(self.steps): | |
| steps_list = [] | |
| elif isinstance(self.steps, Steps): | |
| steps_list = self.steps.steps | |
| else: | |
| steps_list = self.steps | |
| return { | |
| "name": self.name, | |
| "workflow_id": self.id, | |
| "description": self.description, | |
| "steps": [serialize_step(s) for s in steps_list], | |
| "session_id": self.session_id, | |
| } | |
| def _calculate_session_metrics_from_workflow_metrics(self, workflow_metrics: WorkflowMetrics) -> Metrics: | |
| """Calculate session metrics by aggregating all step metrics from workflow metrics""" | |
| session_metrics = Metrics() | |
| # Aggregate metrics from all steps | |
| for step_name, step_metrics in workflow_metrics.steps.items(): | |
| if step_metrics.metrics: | |
| session_metrics += step_metrics.metrics | |
| session_metrics.time_to_first_token = None | |
| return session_metrics | |
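| # Aggregation sketch: if step A reports input_tokens=10 and step B reports input_tokens=25, | |
| # the session total becomes 35 (assumes Metrics.__add__ sums per-field counters) | |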
| def _get_session_metrics(self, session: WorkflowSession) -> Metrics: | |
| """Get existing session metrics from the database""" | |
| if session.session_data and "session_metrics" in session.session_data: | |
| session_metrics_from_db = session.session_data.get("session_metrics") | |
| if session_metrics_from_db is not None: | |
| if isinstance(session_metrics_from_db, dict): | |
| return Metrics(**session_metrics_from_db) | |
| elif isinstance(session_metrics_from_db, Metrics): | |
| return session_metrics_from_db | |
| return Metrics() | |
| def _update_session_metrics(self, session: WorkflowSession, workflow_run_response: WorkflowRunOutput): | |
| """Calculate and update session metrics""" | |
| # Get existing session metrics | |
| session_metrics = self._get_session_metrics(session=session) | |
| # If workflow has metrics, convert and add them to session metrics | |
| if workflow_run_response.metrics: | |
| run_session_metrics = self._calculate_session_metrics_from_workflow_metrics(workflow_run_response.metrics) | |
| session_metrics += run_session_metrics | |
| session_metrics.time_to_first_token = None | |
| # Store updated session metrics - CONVERT TO DICT FOR JSON SERIALIZATION | |
| if not session.session_data: | |
| session.session_data = {} | |
| session.session_data["session_metrics"] = session_metrics.to_dict() | |
| async def aget_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]: | |
| """Get the session metrics for the given session ID and user ID.""" | |
| session_id = session_id or self.session_id | |
| if session_id is None: | |
| raise Exception("Session ID is required") | |
| session = await self.aget_session(session_id=session_id) # type: ignore | |
| if session is None: | |
| raise Exception("Session not found") | |
| return self._get_session_metrics(session=session) | |
| def get_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]: | |
| """Get the session metrics for the given session ID and user ID.""" | |
| session_id = session_id or self.session_id | |
| if session_id is None: | |
| raise Exception("Session ID is required") | |
| session = self.get_session(session_id=session_id) | |
| if session is None: | |
| raise Exception("Session not found") | |
| return self._get_session_metrics(session=session) | |
| def update_agents_and_teams_session_info(self): | |
| """Update agents and teams with workflow session information""" | |
| log_debug("Updating agents and teams with session information") | |
| # Initialize steps - only if steps is iterable (not callable) | |
| if self.steps and not callable(self.steps): | |
| steps_list = self.steps.steps if isinstance(self.steps, Steps) else self.steps | |
| for step in steps_list: | |
| # TODO: Handle properly steps inside other primitives | |
| if isinstance(step, Step): | |
| active_executor = step.active_executor | |
| if hasattr(active_executor, "workflow_id"): | |
| active_executor.workflow_id = self.id | |
| # If it's a team, update all members | |
| if hasattr(active_executor, "members"): | |
| for member in active_executor.members: # type: ignore | |
| if hasattr(member, "workflow_id"): | |
| member.workflow_id = self.id | |
| ########################################################################### | |
| # Telemetry functions | |
| ########################################################################### | |
| def _get_telemetry_data(self) -> Dict[str, Any]: | |
| """Get the telemetry data for the workflow""" | |
| return { | |
| "workflow_id": self.id, | |
| "db_type": self.db.__class__.__name__ if self.db else None, | |
| "has_input_schema": self.input_schema is not None, | |
| } | |
| def _log_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None: | |
| """Send a telemetry event to the API for a created Workflow run""" | |
| self._set_telemetry() | |
| if not self.telemetry: | |
| return | |
| from agno.api.workflow import WorkflowRunCreate, create_workflow_run | |
| try: | |
| create_workflow_run( | |
| workflow=WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data()), | |
| ) | |
| except Exception as e: | |
| log_debug(f"Could not create Workflow run telemetry event: {e}") | |
| async def _alog_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None: | |
| """Send a telemetry event to the API for a created Workflow async run""" | |
| self._set_telemetry() | |
| if not self.telemetry: | |
| return | |
| from agno.api.workflow import WorkflowRunCreate, acreate_workflow_run | |
| try: | |
| await acreate_workflow_run( | |
| workflow=WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data()) | |
| ) | |
| except Exception as e: | |
| log_debug(f"Could not create Workflow run telemetry event: {e}") | |
| def cli_app( | |
| self, | |
| input: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| user_id: Optional[str] = None, | |
| user: str = "User", | |
| emoji: str = ":technologist:", | |
| stream: Optional[bool] = None, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| markdown: bool = True, | |
| show_time: bool = True, | |
| show_step_details: bool = True, | |
| exit_on: Optional[List[str]] = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| """ | |
| Run an interactive command-line interface to interact with the workflow. | |
| This method creates a CLI interface that allows users to interact with the workflow | |
| either by providing a single input or through continuous interactive prompts. | |
| Arguments: | |
| input: Optional initial input to process before starting interactive mode. | |
| session_id: Optional session identifier for maintaining conversation context. | |
| user_id: Optional user identifier for tracking user-specific data. | |
| user: Display name for the user in the CLI prompt. Defaults to "User". | |
| emoji: Emoji to display next to the user name in prompts. Defaults to ":technologist:". | |
| stream: Whether to stream the workflow response. If None, uses workflow default. | |
| stream_events: Whether to stream intermediate step outputs. If None, uses workflow default. | |
| markdown: Whether to render output as markdown. Defaults to True. | |
| show_time: Whether to display timestamps in the output. Defaults to True. | |
| show_step_details: Whether to show detailed step information. Defaults to True. | |
| exit_on: List of commands that will exit the CLI. Defaults to ["exit", "quit", "bye", "stop"]. | |
| (deprecated) stream_intermediate_steps: Whether to stream intermediate step outputs. If None, uses workflow default. | |
| **kwargs: Additional keyword arguments passed to the workflow's print_response method. | |
| Returns: | |
| None: This method runs interactively and does not return a value. | |
| """ | |
| from rich.prompt import Prompt | |
| # Considering both stream_events and stream_intermediate_steps (deprecated) | |
| stream_events = stream_events or stream_intermediate_steps or False | |
| if input: | |
| self.print_response( | |
| input=input, | |
| stream=stream, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| user_id=user_id, | |
| session_id=session_id, | |
| **kwargs, | |
| ) | |
| _exit_on = exit_on or ["exit", "quit", "bye", "stop"] | |
| while True: | |
| message = Prompt.ask(f"[bold] {emoji} {user} [/bold]") | |
| if message in _exit_on: | |
| break | |
| self.print_response( | |
| input=message, | |
| stream=stream, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| user_id=user_id, | |
| session_id=session_id, | |
| **kwargs, | |
| ) | |
| async def acli_app( | |
| self, | |
| input: Optional[str] = None, | |
| session_id: Optional[str] = None, | |
| user_id: Optional[str] = None, | |
| user: str = "User", | |
| emoji: str = ":technologist:", | |
| stream: Optional[bool] = None, | |
| stream_events: Optional[bool] = None, | |
| stream_intermediate_steps: Optional[bool] = None, | |
| markdown: bool = True, | |
| show_time: bool = True, | |
| show_step_details: bool = True, | |
| exit_on: Optional[List[str]] = None, | |
| **kwargs: Any, | |
| ) -> None: | |
| """ | |
| Run an interactive command-line interface to interact with the workflow. | |
| This method creates a CLI interface that allows users to interact with the workflow | |
| either by providing a single input or through continuous interactive prompts. | |
| Arguments: | |
| input: Optional initial input to process before starting interactive mode. | |
| session_id: Optional session identifier for maintaining conversation context. | |
| user_id: Optional user identifier for tracking user-specific data. | |
| user: Display name for the user in the CLI prompt. Defaults to "User". | |
| emoji: Emoji to display next to the user name in prompts. Defaults to ":technologist:". | |
| stream: Whether to stream the workflow response. If None, uses workflow default. | |
| stream_events: Whether to stream events from the workflow. If None, uses workflow default. | |
| markdown: Whether to render output as markdown. Defaults to True. | |
| show_time: Whether to display timestamps in the output. Defaults to True. | |
| show_step_details: Whether to show detailed step information. Defaults to True. | |
| exit_on: List of commands that will exit the CLI. Defaults to ["exit", "quit", "bye", "stop"]. | |
| (deprecated) stream_intermediate_steps: Whether to stream intermediate step outputs. If None, uses workflow default. | |
| **kwargs: Additional keyword arguments passed to the workflow's print_response method. | |
| Returns: | |
| None: This method runs interactively and does not return a value. | |
| """ | |
| from rich.prompt import Prompt | |
| # Considering both stream_events and stream_intermediate_steps (deprecated) | |
| stream_events = stream_events or stream_intermediate_steps or False | |
| if input: | |
| await self.aprint_response( | |
| input=input, | |
| stream=stream, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| user_id=user_id, | |
| session_id=session_id, | |
| **kwargs, | |
| ) | |
| _exit_on = exit_on or ["exit", "quit", "bye", "stop"] | |
| while True: | |
| message = Prompt.ask(f"[bold] {emoji} {user} [/bold]") | |
| if message in _exit_on: | |
| break | |
| await self.aprint_response( | |
| input=message, | |
| stream=stream, | |
| stream_events=stream_events, | |
| markdown=markdown, | |
| show_time=show_time, | |
| show_step_details=show_step_details, | |
| user_id=user_id, | |
| session_id=session_id, | |
| **kwargs, | |
| ) |