Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Last active October 27, 2025 19:22
Show Gist options
  • Select an option

  • Save kausmeows/1306b9ee75f289a3c7e85368ffbfea16 to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/1306b9ee75f289a3c7e85368ffbfea16 to your computer and use it in GitHub Desktop.
import asyncio
from dataclasses import dataclass
from datetime import datetime
from os import getenv
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
overload,
)
from uuid import uuid4
from fastapi import WebSocket
from pydantic import BaseModel
from agno.agent.agent import Agent
from agno.db.base import AsyncBaseDb, BaseDb, SessionType
from agno.exceptions import InputCheckError, OutputCheckError, RunCancelledException
from agno.media import Audio, File, Image, Video
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.run.agent import RunContentEvent, RunEvent, RunOutput
from agno.run.base import RunStatus
from agno.run.cancel import (
cancel_run as cancel_run_global,
)
from agno.run.cancel import (
cleanup_run,
raise_if_cancelled,
register_run,
)
from agno.run.team import RunCompletedEvent, RunContentEvent as TeamRunContentEvent, RunStartedEvent
from agno.run.team import TeamRunEvent
from agno.run.workflow import (
StepOutputEvent,
WorkflowCancelledEvent,
WorkflowCompletedEvent,
WorkflowRunEvent,
WorkflowRunOutput,
WorkflowRunOutputEvent,
WorkflowStartedEvent,
)
from agno.session.workflow import WorkflowSession
from agno.team.team import Team
from agno.utils.common import is_typed_dict, validate_typed_dict
from agno.utils.log import (
log_debug,
log_error,
log_warning,
logger,
set_log_level_to_debug,
set_log_level_to_info,
use_workflow_logger,
)
from agno.utils.print_response.workflow import (
aprint_response,
aprint_response_stream,
print_response,
print_response_stream,
)
from agno.workflow.agent import WorkflowAgent
from agno.workflow.condition import Condition
from agno.workflow.loop import Loop
from agno.workflow.parallel import Parallel
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.steps import Steps
from agno.workflow.types import (
StepInput,
StepMetrics,
StepOutput,
StepType,
WebSocketHandler,
WorkflowExecutionInput,
WorkflowMetrics,
)
# Maps each workflow primitive class to the StepType recorded for it.
# Workflow._get_workflow_data looks up type(step) here when describing the
# configured steps, so only these exact classes have an entry.
STEP_TYPE_MAPPING = {
Step: StepType.STEP,
Steps: StepType.STEPS,
Loop: StepType.LOOP,
Parallel: StepType.PARALLEL,
Condition: StepType.CONDITION,
Router: StepType.ROUTER,
}
# Accepted shapes for Workflow.steps:
#   1. a single callable taking (Workflow, WorkflowExecutionInput),
#   2. a Steps object, or
#   3. a list whose items are either callables taking a StepInput or one of
#      the primitives Step / Steps / Loop / Parallel / Condition / Router.
# Callables may return a StepOutput directly, an awaitable of one, or a
# sync/async iterator of them (the whole-workflow callable may also return Any).
WorkflowSteps = Union[
Callable[
["Workflow", WorkflowExecutionInput],
Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput], Any],
],
Steps,
List[
Union[
Callable[
[StepInput], Union[StepOutput, Awaitable[StepOutput], Iterator[StepOutput], AsyncIterator[StepOutput]]
],
Step,
Steps,
Loop,
Parallel,
Condition,
Router,
]
],
]
@dataclass
class Workflow:
"""Pipeline-based workflow execution"""
# Workflow identification - make name optional with default
name: Optional[str] = None
# Workflow ID (autogenerated if not set)
id: Optional[str] = None
# Workflow description
description: Optional[str] = None
# Workflow steps
steps: Optional[WorkflowSteps] = None
# Database to use for this workflow
db: Optional[Union[BaseDb, AsyncBaseDb]] = None
# Agentic Workflow - WorkflowAgent that decides when to run the workflow
agent: Optional[WorkflowAgent] = None # type: ignore
# Default session_id to use for this workflow (autogenerated if not set)
session_id: Optional[str] = None
# Default user_id to use for this workflow
user_id: Optional[str] = None
# Default session state (stored in the database to persist across runs)
session_state: Optional[Dict[str, Any]] = None
# Set to True to overwrite the stored session_state with the session_state provided in the run
overwrite_db_session_state: bool = False
# If True, the workflow runs in debug mode
debug_mode: Optional[bool] = False
# --- Workflow Streaming ---
# Stream the response from the Workflow
stream: Optional[bool] = None
# Stream the intermediate steps from the Workflow
stream_events: bool = False
# [Deprecated] Stream the intermediate steps from the Workflow
stream_intermediate_steps: bool = False
# Stream events from executors (agents/teams/functions) within steps
stream_executor_events: bool = True
# Persist the events on the run response
store_events: bool = False
# Events to skip when persisting the events on the run response
events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None
# Control whether to store executor responses (agent/team responses) in flattened runs
store_executor_outputs: bool = True
websocket_handler: Optional[WebSocketHandler] = None
# Input schema to validate the input to the workflow
input_schema: Optional[Type[BaseModel]] = None
# Metadata stored with this workflow
metadata: Optional[Dict[str, Any]] = None
# --- Telemetry ---
# telemetry=True logs minimal telemetry for analytics
# This helps us improve the Agent and provide better support
telemetry: bool = True
# Add this flag to control if the workflow should add history to the steps
add_workflow_history_to_steps: bool = False
# Number of historical runs to include in the messages
num_history_runs: int = 3
def __init__(
    self,
    id: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    db: Optional[Union[BaseDb, AsyncBaseDb]] = None,
    steps: Optional[WorkflowSteps] = None,
    agent: Optional[WorkflowAgent] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    overwrite_db_session_state: bool = False,
    user_id: Optional[str] = None,
    debug_mode: Optional[bool] = False,
    stream: Optional[bool] = None,
    stream_events: bool = False,
    stream_intermediate_steps: bool = False,
    stream_executor_events: bool = True,
    store_events: bool = False,
    events_to_skip: Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] = None,
    store_executor_outputs: bool = True,
    input_schema: Optional[Type[BaseModel]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    cache_session: bool = False,
    telemetry: bool = True,
    add_workflow_history_to_steps: bool = False,
    num_history_runs: int = 3,
):
    """Store the workflow configuration on the instance.

    Every argument is kept on the attribute of the same name. A falsy
    ``events_to_skip`` is replaced by an empty list, and the in-memory
    session cache (``_workflow_session``) starts out empty.
    """
    # Identity
    self.id = id
    self.name = name
    self.description = description

    # What to run and how to validate its input
    self.steps = steps
    self.agent = agent
    self.input_schema = input_schema

    # Session, user and persistence
    self.db = db
    self.session_id = session_id
    self.user_id = user_id
    self.session_state = session_state
    self.overwrite_db_session_state = overwrite_db_session_state
    self.cache_session = cache_session
    self.metadata = metadata

    # Streaming and event storage
    self.stream = stream
    self.stream_events = stream_events
    self.stream_intermediate_steps = stream_intermediate_steps
    self.stream_executor_events = stream_executor_events
    self.store_events = store_events
    self.events_to_skip = events_to_skip or []
    self.store_executor_outputs = store_executor_outputs

    # History, diagnostics and telemetry
    self.add_workflow_history_to_steps = add_workflow_history_to_steps
    self.num_history_runs = num_history_runs
    self.debug_mode = debug_mode
    self.telemetry = telemetry

    # Populated by read_or_create_session when cache_session is enabled
    self._workflow_session: Optional[WorkflowSession] = None
def set_id(self) -> None:
    """Fill in ``self.id`` when it is unset.

    The id is the lower-cased name with spaces replaced by dashes, or a
    random UUID4 string when the workflow has no name. An existing id is
    left untouched.
    """
    if self.id is not None:
        return
    if self.name is None:
        self.id = str(uuid4())
        return
    self.id = self.name.lower().replace(" ", "-")
def _has_async_db(self) -> bool:
    """Return True when a database is configured and it is an AsyncBaseDb."""
    if self.db is None:
        return False
    return isinstance(self.db, AsyncBaseDb)
def _validate_input(
    self, input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]]
) -> Optional[Union[str, List, Dict, Message, BaseModel]]:
    """Parse and validate ``input`` against ``self.input_schema``.

    Args:
        input: The raw workflow input. A Message is replaced by its content and
            a string is parsed as JSON before validation.
    Returns:
        The input unchanged when no schema is configured; otherwise the input
        itself (already an instance of the schema), the validated dict (TypedDict
        schema) or a new schema instance built from the dict.
    Raises:
        ValueError: If a schema is set and the input is missing, is not valid
            JSON, is a different BaseModel type, fails schema validation, or is
            of a type that cannot be validated.
    """
    if self.input_schema is None:
        return input  # Nothing to validate against
    if input is None:
        raise ValueError("Input required when input_schema is set")

    # Handle Message objects - extract content
    if isinstance(input, Message):
        input = input.content  # type: ignore

    # A string is expected to hold a JSON document
    if isinstance(input, str):
        import json

        try:
            input = json.loads(input)
        except Exception as e:
            raise ValueError(f"Failed to parse input. Is it a valid JSON string?: {e}") from e

    # Case 1: already a BaseModel instance - it must be of the schema type
    if isinstance(input, BaseModel):
        if isinstance(input, self.input_schema):
            return input
        raise ValueError(f"Expected {self.input_schema.__name__} but got {type(input).__name__}")

    # Case 2: a dict - validate as TypedDict or build the schema model from it
    if isinstance(input, dict):
        try:
            if is_typed_dict(self.input_schema):
                return validate_typed_dict(input, self.input_schema)
            return self.input_schema(**input)
        except Exception as e:
            raise ValueError(f"Failed to parse dict into {self.input_schema.__name__}: {str(e)}") from e

    # Case 3: other types are not supported for structured input
    raise ValueError(
        f"Cannot validate {type(input)} against input_schema. Expected dict or {self.input_schema.__name__} instance."
    )
@property
def run_parameters(self) -> Dict[str, Any]:
    """Describe the parameters accepted when running this workflow.

    Returns:
        An empty dict when no steps are configured. For a callable ``steps``,
        one entry per signature parameter (excluding ``workflow``,
        ``execution_input`` and ``self``) with its name, default, annotation
        text and whether it is required. Otherwise a single required ``message``
        string parameter.
    """
    if self.steps is None:
        return {}

    if not (self.steps and callable(self.steps)):
        return {
            "message": {
                "name": "message",
                "default": None,
                "annotation": "str",
                "required": True,
            },
        }

    from inspect import Parameter, signature

    reserved_names = ["workflow", "execution_input", "self"]
    parameters = {}
    for param_name, param in signature(self.steps).parameters.items():  # type: ignore
        if param_name in reserved_names:
            continue

        # Defaults declared through a FieldInfo object carry the real default inside
        declared_default = param.default
        if hasattr(declared_default, "__class__") and declared_default.__class__.__name__ == "FieldInfo":
            default_value = declared_default.default
        elif declared_default is Parameter.empty:
            default_value = None
        else:
            default_value = declared_default

        # Render the annotation as text; Optional[...] wrappers are stripped textually
        declared_annotation = param.annotation
        if declared_annotation is Parameter.empty:
            annotation_text = None
        elif hasattr(declared_annotation, "__name__"):
            annotation_text = declared_annotation.__name__
        else:
            annotation_text = str(declared_annotation)
            if "typing.Optional" in annotation_text:
                annotation_text = annotation_text.replace("typing.Optional[", "").replace("]", "")

        parameters[param_name] = {
            "name": param_name,
            "default": default_value,
            "annotation": annotation_text,
            "required": param.default is Parameter.empty,
        }
    return parameters
def initialize_workflow(self):
    """Make sure the workflow has an id, generating and logging one if needed."""
    if self.id is not None:
        return
    self.set_id()
    log_debug(f"Generated new workflow_id: {self.id}")
def _initialize_session(
    self,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> Tuple[str, Optional[str]]:
    """Resolve the session id and user id to use for a run.

    Args:
        session_id: Explicit session id; falls back to the workflow default,
            then to a freshly generated UUID4.
        user_id: Explicit user id; None or "" falls back to the workflow default.
    Returns:
        The resolved ``(session_id, user_id)`` pair.
    """
    if session_id is None:
        if not self.session_id:
            # Nothing provided and no default: generate one and make it
            # sticky on this workflow instance.
            self.session_id = str(uuid4())
        session_id = self.session_id
    log_debug(f"Session ID: {session_id}", center=True)

    # Use the default user_id when necessary
    resolved_user_id = self.user_id if (user_id is None or user_id == "") else user_id
    return session_id, resolved_user_id
def _initialize_session_state(
    self,
    session_state: Dict[str, Any],
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    run_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Write the run context into ``session_state`` and return the same dict.

    ``current_user_id`` is written only for a truthy user id, and
    ``current_session_id`` / ``current_run_id`` only when not None.
    ``workflow_id``, ``run_id`` and ``session_id`` are always written (possibly
    as None), and ``workflow_name`` only when the workflow has a name.
    """
    if user_id:
        session_state["current_user_id"] = user_id
    for key, value in (("current_session_id", session_id), ("current_run_id", run_id)):
        if value is not None:
            session_state[key] = value

    session_state["workflow_id"] = self.id
    session_state["run_id"] = run_id
    session_state["session_id"] = session_id

    if self.name:
        session_state["workflow_name"] = self.name
    return session_state
def _generate_workflow_session_name(self) -> str:
    """Build a session name from the current local time.

    Without a session id the name is ``Workflow Session - <time>``. With one,
    it is ``<description> - <time>`` (descriptions over 40 characters are cut
    to 40 and suffixed with "-"), or ``Workflow Session-<time>`` when there is
    no description.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    if self.session_id is None:
        return f"Workflow Session - {timestamp}"
    if not self.description:
        return f"Workflow Session-{timestamp}"

    label = self.description
    if len(label) > 40:
        label = label[:40] + "-"
    return f"{label} - {timestamp}"
async def aset_session_name(
    self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
) -> WorkflowSession:
    """Set the session name and save to storage, using an async database.

    Args:
        session_id: The session to rename. Defaults to the workflow's session_id.
        autogenerate: When True, a generated name replaces ``session_name``.
        session_name: The name to store when ``autogenerate`` is False.
    Returns:
        The renamed WorkflowSession.
    Raises:
        Exception: If no session id is available, the session cannot be loaded,
            or no name is given while ``autogenerate`` is False.
    """
    session_id = session_id or self.session_id
    if session_id is None:
        raise Exception("Session ID is not set")

    # -*- Read from storage
    session = await self.aget_session(session_id=session_id)  # type: ignore
    if session is None:
        # Same error the session getters raise, instead of an AttributeError below
        raise Exception("Session not found")

    if autogenerate:
        # -*- Generate name for session
        session_name = self._generate_workflow_session_name()
        log_debug(f"Generated Workflow Session Name: {session_name}")
    elif session_name is None:
        raise Exception("Session name is not set")

    # -*- Rename session (a stored session may carry no session_data yet)
    if session.session_data is None:
        session.session_data = {}
    session.session_data["session_name"] = session_name

    # -*- Save to storage
    await self.asave_session(session=session)
    return session
def set_session_name(
    self, session_id: Optional[str] = None, autogenerate: bool = False, session_name: Optional[str] = None
) -> WorkflowSession:
    """Set the session name and save to storage.

    Args:
        session_id: The session to rename. Defaults to the workflow's session_id.
        autogenerate: When True, a generated name replaces ``session_name``.
        session_name: The name to store when ``autogenerate`` is False.
    Returns:
        The renamed WorkflowSession.
    Raises:
        Exception: If no session id is available, the session cannot be loaded,
            or no name is given while ``autogenerate`` is False.
    """
    session_id = session_id or self.session_id
    if session_id is None:
        raise Exception("Session ID is not set")

    # -*- Read from storage
    session = self.get_session(session_id=session_id)  # type: ignore
    if session is None:
        # Same error the session getters raise, instead of an AttributeError below
        raise Exception("Session not found")

    if autogenerate:
        # -*- Generate name for session
        session_name = self._generate_workflow_session_name()
        log_debug(f"Generated Workflow Session Name: {session_name}")
    elif session_name is None:
        raise Exception("Session name is not set")

    # -*- Rename session (a stored session may carry no session_data yet)
    if session.session_data is None:
        session.session_data = {}
    session.session_data["session_name"] = session_name

    # -*- Save to storage
    self.save_session(session=session)
    return session
async def aget_session_name(self, session_id: Optional[str] = None) -> str:
    """Return the stored name of a session, loading it through the async path.

    Args:
        session_id: The session to look up. Defaults to the workflow's session_id.
    Returns:
        The session name, or "" when the session has no data or no name.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is not set")

    session = await self.aget_session(session_id=resolved_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    if not session.session_data:
        return ""
    return session.session_data.get("session_name", "")
def get_session_name(self, session_id: Optional[str] = None) -> str:
    """Return the stored name of a session.

    Args:
        session_id: The session to look up. Defaults to the workflow's session_id.
    Returns:
        The session name, or "" when the session has no data or no name.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is not set")

    session = self.get_session(session_id=resolved_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    if not session.session_data:
        return ""
    return session.session_data.get("session_name", "")
async def aget_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
    """Return the stored session state of a session, loading it through the async path.

    Args:
        session_id: The session to look up. Defaults to the workflow's session_id.
    Returns:
        The stored ``session_state``, or an empty dict when the session has no
        data or no state.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is not set")

    session = await self.aget_session(session_id=resolved_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    if not session.session_data:
        return {}
    return session.session_data.get("session_state", {})
def get_session_state(self, session_id: Optional[str] = None) -> Dict[str, Any]:
    """Return the stored session state of a session.

    Args:
        session_id: The session to look up. Defaults to the workflow's session_id.
    Returns:
        The stored ``session_state``, or an empty dict when the session has no
        data or no state.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is not set")

    session = self.get_session(session_id=resolved_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    if not session.session_data:
        return {}
    return session.session_data.get("session_state", {})
def update_session_state(
    self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Update the session state for the given session ID.
    Args:
        session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
        session_id: The session ID to update. If not provided, the current cached session ID is used.
    Returns:
        dict: The updated session state.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    session_id = session_id or self.session_id
    if session_id is None:
        raise Exception("Session ID is not set")

    session = self.get_session(session_id=session_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    # A stored session may have no session_data, or a null session_state;
    # start from an empty state in both cases instead of failing on indexing.
    if session.session_data is None:
        session.session_data = {}
    state = session.session_data.get("session_state")
    if state is None:
        state = {}
        session.session_data["session_state"] = state

    state.update(session_state_updates)
    self.save_session(session=session)
    return state
async def aupdate_session_state(
    self, session_state_updates: Dict[str, Any], session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Update the session state for the given session ID (async).
    Args:
        session_state_updates: The updates to apply to the session state. Should be a dictionary of key-value pairs.
        session_id: The session ID to update. If not provided, the current cached session ID is used.
    Returns:
        dict: The updated session state.
    Raises:
        Exception: If no session id is available or the session is not found.
    """
    session_id = session_id or self.session_id
    if session_id is None:
        raise Exception("Session ID is not set")

    session = await self.aget_session(session_id=session_id)  # type: ignore
    if session is None:
        raise Exception("Session not found")

    # A stored session may have no session_data, or a null session_state;
    # start from an empty state in both cases instead of failing on indexing.
    if session.session_data is None:
        session.session_data = {}
    state = session.session_data.get("session_state")
    if state is None:
        state = {}
        session.session_data["session_state"] = state

    state.update(session_state_updates)
    await self.asave_session(session=session)
    return state
async def adelete_session(self, session_id: str):
    """Delete the given session from the async database; a no-op without a database."""
    if self.db is not None:
        # -*- Delete session
        await self.db.delete_session(session_id=session_id)  # type: ignore
def delete_session(self, session_id: str):
    """Delete the given session from the database; a no-op without a database."""
    if self.db is not None:
        # -*- Delete session
        self.db.delete_session(session_id=session_id)
async def aget_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
    """Get a run output by id, from the cached session or the async database.

    Args:
        run_id: The run to look up.
        session_id: The session that owns the run. When omitted, the cached
            session (if any) or the workflow's default session is used.
    Returns:
        The matching run output, or None when the session or run is not found.
    """
    cached = self._workflow_session
    # Only trust the cache when it holds the session the caller asked for
    if cached is not None and (session_id is None or cached.session_id == session_id):
        run_response = cached.get_run(run_id=run_id)
        if run_response is None:
            log_warning(f"RunOutput {run_id} not found in WorkflowSession {cached.session_id}")
        return run_response

    workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
    if workflow_session is None:
        return None
    run_response = workflow_session.get_run(run_id=run_id)
    if run_response is None:
        log_warning(f"RunOutput {run_id} not found in WorkflowSession {session_id}")
    return run_response
def get_run_output(self, run_id: str, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
    """Get a run output by id, from the cached session or the database.

    Args:
        run_id: The run to look up.
        session_id: The session that owns the run. When omitted, the cached
            session (if any) or the workflow's default session is used.
    Returns:
        The matching run output, or None when the session or run is not found.
    """
    cached = self._workflow_session
    # Only trust the cache when it holds the session the caller asked for
    if cached is not None and (session_id is None or cached.session_id == session_id):
        run_response = cached.get_run(run_id=run_id)
        if run_response is None:
            log_warning(f"RunOutput {run_id} not found in WorkflowSession {cached.session_id}")
        return run_response

    workflow_session = self.get_session(session_id=session_id)
    if workflow_session is None:
        return None
    run_response = workflow_session.get_run(run_id=run_id)
    if run_response is None:
        log_warning(f"RunOutput {run_id} not found in WorkflowSession {session_id}")
    return run_response
async def aget_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
    """Get the most recent run output, from the cached session or the async database.

    Args:
        session_id: The session to inspect. When omitted, the cached session
            (if any) or the workflow's default session is used.
    Returns:
        The last run of the session, or None when there are no runs.
    """
    cached = self._workflow_session
    # Only trust the cache when it holds the session the caller asked for
    cache_matches = cached is not None and (session_id is None or cached.session_id == session_id)
    if cache_matches and cached.runs:
        return cached.runs[-1]

    workflow_session = await self.aget_session(session_id=session_id)  # type: ignore
    if workflow_session is not None and workflow_session.runs:
        return workflow_session.runs[-1]

    log_warning(f"No run responses found in WorkflowSession {session_id or self.session_id}")
    return None
def get_last_run_output(self, session_id: Optional[str] = None) -> Optional[WorkflowRunOutput]:
    """Get the most recent run output, from the cached session or the database.

    Args:
        session_id: The session to inspect. When omitted, the cached session
            (if any) or the workflow's default session is used.
    Returns:
        The last run of the session, or None when there are no runs.
    """
    cached = self._workflow_session
    # Only trust the cache when it holds the session the caller asked for
    cache_matches = cached is not None and (session_id is None or cached.session_id == session_id)
    if cache_matches and cached.runs:
        return cached.runs[-1]

    workflow_session = self.get_session(session_id=session_id)
    if workflow_session is not None and workflow_session.runs:
        return workflow_session.runs[-1]

    log_warning(f"No run responses found in WorkflowSession {session_id or self.session_id}")
    return None
def read_or_create_session(
    self,
    session_id: str,
    user_id: Optional[str] = None,
) -> WorkflowSession:
    """Return the session for ``session_id``: cached, loaded from the database, or new.

    A newly created session is seeded with a deep copy of the workflow's
    default ``session_state`` (when set) and is not written to the database
    here. The result is cached on the workflow when ``cache_session`` is True.
    """
    from time import time

    # Returning cached session if we have one
    cached = self._workflow_session
    if cached is not None and cached.session_id == session_id:
        return cached

    # Try to load from database
    workflow_session = None
    if self.db is not None:
        log_debug(f"Reading WorkflowSession: {session_id}")
        workflow_session = cast(WorkflowSession, self._read_session(session_id=session_id))

    # Creating new session if none found
    if workflow_session is None:
        log_debug(f"Creating new WorkflowSession: {session_id}")
        initial_data = {}
        if self.session_state is not None:
            from copy import deepcopy

            initial_data["session_state"] = deepcopy(self.session_state)
        workflow_session = WorkflowSession(
            session_id=session_id,
            workflow_id=self.id,
            user_id=user_id,
            workflow_data=self._get_workflow_data(),
            session_data=initial_data,
            metadata=self.metadata,
            created_at=int(time()),
        )

    # Cache the session if relevant
    if self.cache_session and workflow_session is not None:
        self._workflow_session = workflow_session
    return workflow_session
async def aread_or_create_session(
    self,
    session_id: str,
    user_id: Optional[str] = None,
) -> WorkflowSession:
    """Return the session for ``session_id``: cached, loaded from the async database, or new.

    A newly created session is seeded with a deep copy of the workflow's
    default ``session_state`` (when set), matching read_or_create_session, and
    is not written to the database here. The result is cached on the workflow
    when ``cache_session`` is True.
    """
    from time import time

    # Returning cached session if we have one
    if self._workflow_session is not None and self._workflow_session.session_id == session_id:
        return self._workflow_session

    # Try to load from database
    workflow_session = None
    if self.db is not None:
        log_debug(f"Reading WorkflowSession: {session_id}")
        workflow_session = cast(WorkflowSession, await self._aread_session(session_id=session_id))

    if workflow_session is None:
        # Creating new session if none found
        log_debug(f"Creating new WorkflowSession: {session_id}")
        session_data = {}
        if self.session_state is not None:
            from copy import deepcopy

            # Copy so later state changes never mutate the workflow defaults
            session_data["session_state"] = deepcopy(self.session_state)
        workflow_session = WorkflowSession(
            session_id=session_id,
            workflow_id=self.id,
            user_id=user_id,
            workflow_data=self._get_workflow_data(),
            session_data=session_data,
            metadata=self.metadata,
            created_at=int(time()),
        )

    # Cache the session if relevant
    if workflow_session is not None and self.cache_session:
        self._workflow_session = workflow_session
    return workflow_session
async def aget_session(
    self,
    session_id: Optional[str] = None,
) -> Optional[WorkflowSession]:
    """Load a WorkflowSession from the async database.

    Args:
        session_id: The session_id to load from storage. Defaults to the
            workflow's session_id.
    Returns:
        The stored WorkflowSession, or None when it cannot be read or no
        database is configured.
    Raises:
        Exception: If neither ``session_id`` nor the workflow default is set.
    """
    session_id_to_load = session_id or self.session_id
    if not session_id_to_load:
        raise Exception("No session_id provided")

    if self.db is None:
        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    # Try to load from database
    return cast(WorkflowSession, await self._aread_session(session_id=session_id_to_load))
def get_session(
    self,
    session_id: Optional[str] = None,
) -> Optional[WorkflowSession]:
    """Load a WorkflowSession from the database.

    Args:
        session_id: The session_id to load from storage. Defaults to the
            workflow's session_id.
    Returns:
        The stored WorkflowSession, or None when it cannot be read or no
        database is configured.
    Raises:
        Exception: If neither ``session_id`` nor the workflow default is set.
    """
    session_id_to_load = session_id or self.session_id
    if not session_id_to_load:
        raise Exception("No session_id provided")

    if self.db is None:
        log_warning(f"WorkflowSession {session_id_to_load} not found in db")
        return None

    # Try to load from database
    return cast(WorkflowSession, self._read_session(session_id=session_id_to_load))
async def asave_session(self, session: WorkflowSession) -> None:
    """Save the WorkflowSession to storage, using an async database.

    Does nothing when no database is configured or the session has no
    session_data. Per-run context keys are removed from the stored
    session_state (in place) before the upsert.
    """
    if self.db is None or session.session_data is None:
        return

    stored_state = session.session_data.get("session_state")
    if stored_state is not None:
        # Run-scoped context that must not be persisted with the session
        for transient_key in (
            "current_session_id",
            "current_user_id",
            "current_run_id",
            "workflow_id",
            "run_id",
            "session_id",
            "workflow_name",
        ):
            stored_state.pop(transient_key, None)

    await self._aupsert_session(session=session)  # type: ignore
    log_debug(f"Created or updated WorkflowSession record: {session.session_id}")
def save_session(self, session: WorkflowSession) -> None:
    """Save the WorkflowSession to storage.

    Does nothing when no database is configured or the session has no
    session_data. Per-run context keys are removed from the stored
    session_state (in place) before the upsert.
    """
    if self.db is None or session.session_data is None:
        return

    stored_state = session.session_data.get("session_state")
    if stored_state is not None:
        # Run-scoped context that must not be persisted with the session
        for transient_key in (
            "current_session_id",
            "current_user_id",
            "current_run_id",
            "workflow_id",
            "run_id",
            "session_id",
            "workflow_name",
        ):
            stored_state.pop(transient_key, None)

    self._upsert_session(session=session)
    log_debug(f"Created or updated WorkflowSession record: {session.session_id}")
# -*- Session Database Functions
async def _aread_session(self, session_id: str) -> Optional[WorkflowSession]:
    """Fetch a workflow session from the async database.

    Returns None, after logging a warning, when no database is configured or
    the read raises; also returns None when the database hands back anything
    other than a WorkflowSession.
    """
    try:
        if not self.db:
            raise ValueError("Db not initialized")
        found = await self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)  # type: ignore
        if found is None:
            return None
        return found if isinstance(found, WorkflowSession) else None
    except Exception as e:
        log_warning(f"Error getting session from db: {e}")
        return None
def _read_session(self, session_id: str) -> Optional[WorkflowSession]:
    """Fetch a workflow session from the database.

    Returns None, after logging a warning, when no database is configured or
    the read raises; also returns None when the database hands back anything
    other than a WorkflowSession.
    """
    try:
        if not self.db:
            raise ValueError("Db not initialized")
        found = self.db.get_session(session_id=session_id, session_type=SessionType.WORKFLOW)
        if found is None:
            return None
        return found if isinstance(found, WorkflowSession) else None
    except Exception as e:
        log_warning(f"Error getting session from db: {e}")
        return None
async def _aupsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
    """Insert or update a workflow session in the async database.

    Returns the WorkflowSession reported by the database. Returns None, after
    logging a warning, when no database is configured or the write raises;
    also returns None when the database reports anything else.
    """
    try:
        if not self.db:
            raise ValueError("Db not initialized")
        stored = await self.db.upsert_session(session=session)  # type: ignore
        if stored is None:
            return None
        return stored if isinstance(stored, WorkflowSession) else None
    except Exception as e:
        log_warning(f"Error upserting session into db: {e}")
        return None
def _upsert_session(self, session: WorkflowSession) -> Optional[WorkflowSession]:
    """Insert or update a workflow session in the database.

    Returns the WorkflowSession reported by the database. Returns None, after
    logging a warning, when no database is configured or the write raises;
    also returns None when the database reports anything else.
    """
    try:
        if not self.db:
            raise ValueError("Db not initialized")
        stored = self.db.upsert_session(session=session)
        if stored is None:
            return None
        return stored if isinstance(stored, WorkflowSession) else None
    except Exception as e:
        log_warning(f"Error upserting session into db: {e}")
        return None
def _update_metadata(self, session: WorkflowSession):
    """Reconcile the workflow's metadata with the metadata stored on the session.

    Nothing happens when the session has no metadata. Otherwise the workflow's
    metadata (if any) is merged into the session's metadata in place, and the
    workflow then adopts the session's metadata dict as its own.
    """
    from agno.utils.merge_dict import merge_dictionaries

    stored_metadata = session.metadata
    if stored_metadata is None:
        return

    if self.metadata is not None:
        # Updates the session metadata in place with the workflow's values
        merge_dictionaries(stored_metadata, self.metadata)
    self.metadata = stored_metadata
def _load_session_state(self, session: WorkflowSession, session_state: Dict[str, Any]):
    """Combine the stored session_state with the given one and attach it to the session.

    When the session holds a non-empty dict under ``session_state`` and
    ``overwrite_db_session_state`` is False, the given state is merged on top
    of a copy of the stored one and ``session_state`` is rewritten in place
    with the result. In every case the session ends up pointing at
    ``session_state``, which is also returned.
    """
    from agno.utils.merge_dict import merge_dictionaries

    session_data = session.session_data
    if session_data and "session_state" in session_data:
        db_state = session_data.get("session_state")
        should_merge = (
            isinstance(db_state, dict) and len(db_state) > 0 and not self.overwrite_db_session_state
        )
        if should_merge:
            # Stored values form the base; values already in session_state win
            combined = db_state.copy()
            merge_dictionaries(combined, session_state)
            session_state.clear()
            session_state.update(combined)

    # Update the session_state in the session
    if session.session_data is None:
        session.session_data = {}
    session.session_data["session_state"] = session_state
    return session_state
def _get_workflow_data(self) -> Dict[str, Any]:
    """Describe this workflow (id, name and steps) as a plain dict.

    For a non-callable ``steps`` each entry gets its name, description and
    step type; a callable ``steps`` is described by a single fixed
    "Custom Function" entry. No ``steps`` key is added when steps is empty.
    """

    def describe(step: Any) -> Dict[str, Any]:
        # Callables, agents and teams are reported as plain steps; everything
        # else must be one of the classes registered in STEP_TYPE_MAPPING.
        if callable(step) or isinstance(step, (Agent, Team)):
            kind = StepType.STEP
        else:
            kind = STEP_TYPE_MAPPING[type(step)]
        return {
            "name": step.name if hasattr(step, "name") else step.__name__,  # type: ignore
            "description": step.description if hasattr(step, "description") else "User-defined callable step",
            "type": kind.value,
        }

    workflow_data: Dict[str, Any] = {"workflow_id": self.id, "name": self.name}

    if self.steps and not callable(self.steps):
        workflow_data["steps"] = [describe(step) for step in self.steps]  # type: ignore
    elif callable(self.steps):
        workflow_data["steps"] = [
            {
                "name": "Custom Function",
                "description": "User-defined callable workflow",
                "type": "Callable",
            }
        ]
    return workflow_data
def _handle_event(
    self,
    event: "WorkflowRunOutputEvent",
    workflow_run_response: WorkflowRunOutput,
    websocket_handler: Optional[WebSocketHandler] = None,
) -> "WorkflowRunOutputEvent":
    """Store a workflow event on the run response and broadcast it over WebSocket.

    Args:
        event: The event to handle.
        workflow_run_response: The run output whose ``events`` list collects stored events.
        websocket_handler: Optional handler whose ``handle_event`` coroutine
            receives the event; only used when an event loop is running.
    Returns:
        The same event, unchanged. Events listed in ``events_to_skip`` are
        neither stored nor broadcast while ``store_events`` is enabled.
    """
    if self.store_events:
        # Check if this event type should be skipped
        event_type = event.event
        for skip_event in self.events_to_skip or ():
            # Entries may be plain strings or event enums compared by value
            skip_value = skip_event if isinstance(skip_event, str) else skip_event.value
            if event_type == skip_value:
                return event

        # Store the event
        if workflow_run_response.events is None:
            workflow_run_response.events = []
        workflow_run_response.events.append(event)

    # Broadcast to WebSocket if available (async context only)
    if websocket_handler:
        try:
            asyncio.get_running_loop()  # raises RuntimeError outside an event loop
            task = asyncio.create_task(websocket_handler.handle_event(event))
            # The event loop keeps only weak references to tasks, so hold one
            # here until the broadcast finishes to stop it being garbage-collected.
            pending = getattr(self, "_websocket_tasks", None)
            if pending is None:
                pending = set()
                self._websocket_tasks = pending
            pending.add(task)
            task.add_done_callback(pending.discard)
        except RuntimeError:
            pass
    return event
def _enrich_event_with_workflow_context(
    self,
    event: Any,
    workflow_run_response: WorkflowRunOutput,
    step_index: Optional[Union[int, tuple]] = None,
    step: Optional[Any] = None,
) -> Any:
    """Stamp an event with workflow and step context for frontend tracking.

    Only attributes the event already has are written. ``workflow_id`` and
    ``workflow_run_id`` are always overwritten, ``step_id`` is overwritten when
    the step provides a truthy one, and ``step_name`` / ``step_index`` are only
    filled in when the event has none yet. Returns the same event.
    """
    step_id = None
    step_name = None
    if step:
        step_id = getattr(step, "step_id", None)
        step_name = getattr(step, "name", None)

    if hasattr(event, "workflow_id"):
        event.workflow_id = workflow_run_response.workflow_id
    if hasattr(event, "workflow_run_id"):
        event.workflow_run_id = workflow_run_response.run_id
    if hasattr(event, "step_id") and step_id:
        event.step_id = step_id
    if hasattr(event, "step_name") and step_name is not None and event.step_name is None:
        event.step_name = step_name
    # Keep a step_index that is already set (e.g. tuples assigned for parallel steps)
    if hasattr(event, "step_index") and step_index is not None and event.step_index is None:
        event.step_index = step_index
    return event
def _transform_step_output_to_event(
    self, step_output: StepOutput, workflow_run_response: WorkflowRunOutput, step_index: Optional[int] = None
) -> StepOutputEvent:
    """Wrap a ``StepOutput`` in a ``StepOutputEvent`` carrying the run's identifiers.

    Args:
        step_output: The step result to wrap; its ``step_name`` is copied to the event.
        workflow_run_response: Supplies run id, workflow name/id and session id.
        step_index: Optional position of the step, passed through unchanged.

    Returns:
        A new ``StepOutputEvent``. A missing ``run_id`` is replaced by ``""``.
    """
    event_run_id = workflow_run_response.run_id or ""
    event_fields = {
        "step_output": step_output,
        "run_id": event_run_id,
        "workflow_name": workflow_run_response.workflow_name,
        "workflow_id": workflow_run_response.workflow_id,
        "session_id": workflow_run_response.session_id,
        "step_name": step_output.step_name,
        "step_index": step_index,
    }
    return StepOutputEvent(**event_fields)
def _set_debug(self) -> None:
    """Configure workflow logging from ``self.debug_mode`` / the ``AGNO_DEBUG`` variable.

    Debug is active when ``self.debug_mode`` is truthy or ``AGNO_DEBUG`` equals
    ``"true"`` (case-insensitive). In that case the workflow logger is selected,
    ``self.debug_mode`` is forced to ``True``, the log level is set to debug and
    every step is passed to ``_propagate_debug_to_step``. Otherwise the log
    level is set to info.
    """
    debug_requested = self.debug_mode or getenv("AGNO_DEBUG", "false").lower() == "true"
    if not debug_requested:
        set_log_level_to_info(source_type="workflow")
        return

    use_workflow_logger()
    self.debug_mode = True
    set_log_level_to_debug(source_type="workflow")

    # A callable workflow has no step objects to visit
    if not self.steps or callable(self.steps):
        return

    children = self.steps.steps if isinstance(self.steps, Steps) else self.steps
    for child in children:
        self._propagate_debug_to_step(child)
def _set_telemetry(self) -> None:
"""Override telemetry settings based on environment variables."""
telemetry_env = getenv("AGNO_TELEMETRY")
if telemetry_env is not None:
self.telemetry = telemetry_env.lower() == "true"
def _propagate_debug_to_step(self, step):
"""Recursively propagate debug mode to steps and nested primitives.

The argument is duck-typed: only the attributes probed with ``hasattr`` below are
read or written, and nothing is returned.

Args:
    step: Object to visit. Its ``active_executor`` (if present and truthy) is flagged,
        and list-valued ``steps`` / ``choices`` attributes are visited recursively.
"""
# Handle direct Step objects
if hasattr(step, "active_executor") and step.active_executor:
executor = step.active_executor
# Only executors that already expose a debug_mode attribute are flagged
if hasattr(executor, "debug_mode"):
executor.debug_mode = True
# If it's a team, propagate to all members
if hasattr(executor, "members"):
for member in executor.members:
if hasattr(member, "debug_mode"):
member.debug_mode = True
# Handle nested primitives - check both 'steps' and 'choices' attributes
for attr_name in ["steps", "choices"]:
if hasattr(step, attr_name):
attr_value = getattr(step, attr_name)
# Non-list values (including None or other containers) are ignored
if attr_value and isinstance(attr_value, list):
for nested_step in attr_value:
self._propagate_debug_to_step(nested_step)
def _create_step_input(
    self,
    execution_input: WorkflowExecutionInput,
    previous_step_outputs: Optional[Dict[str, StepOutput]] = None,
    shared_images: Optional[List[Image]] = None,
    shared_videos: Optional[List[Video]] = None,
    shared_audio: Optional[List[Audio]] = None,
    shared_files: Optional[List[File]] = None,
) -> StepInput:
    """Build the ``StepInput`` handed to the next step.

    Args:
        execution_input: Supplies ``input`` and ``additional_data``.
        previous_step_outputs: Outputs keyed by step name. The content of the
            last inserted entry becomes ``previous_step_content``.
        shared_images: Images to expose to the step (``[]`` when falsy).
        shared_videos: Videos to expose to the step (``[]`` when falsy).
        shared_audio: Audio to expose to the step (``[]`` when falsy).
        shared_files: Files to expose to the step (``[]`` when falsy).

    Returns:
        A new ``StepInput``. Non-empty media lists are passed by reference, not copied.
    """
    latest_content = None
    if previous_step_outputs:
        latest_name, latest_output = list(previous_step_outputs.items())[-1]
        if latest_output:
            latest_content = latest_output.content
        log_debug(f"Using previous step content from: {latest_name}")

    step_input = StepInput(
        input=execution_input.input,
        previous_step_content=latest_content,
        previous_step_outputs=previous_step_outputs,
        additional_data=execution_input.additional_data,
        images=shared_images or [],
        videos=shared_videos or [],
        audio=shared_audio or [],
        files=shared_files or [],
    )
    return step_input
def _get_step_count(self) -> int:
"""Get the number of steps in the workflow"""
if self.steps is None:
return 0
elif callable(self.steps):
return 1 # Callable function counts as 1 step
else:
# Handle Steps wrapper
if isinstance(self.steps, Steps):
return len(self.steps.steps)
else:
return len(self.steps)
def _aggregate_workflow_metrics(self, step_results: List[Union[StepOutput, List[StepOutput]]]) -> WorkflowMetrics:
    """Aggregate metrics from all step responses into structured workflow metrics.

    Every step output is visited depth-first through its ``steps`` attribute.
    An output contributes a ``StepMetrics`` entry only when it has a
    ``step_name``, truthy ``metrics`` and an ``executor_type`` of ``"agent"``
    or ``"team"``. Entries are keyed by step name, so a later output with the
    same name replaces an earlier one.

    Args:
        step_results: Collected step outputs. An element may itself be a list
            of outputs, as the declared type allows; such lists are walked
            element by element.

    Returns:
        A ``WorkflowMetrics`` whose ``steps`` maps step name to ``StepMetrics``.
    """
    steps_dict = {}

    def process_step_output(step_output) -> None:
        """Record metrics for one output, a list of outputs, or a nested container."""
        # A list has no step_name/metrics attributes; visit its members instead
        if isinstance(step_output, list):
            for grouped_output in step_output:
                process_step_output(grouped_output)
            return

        # Containers expose their children through ``steps``; children are visited first
        if getattr(step_output, "steps", None):
            for nested_step in step_output.steps:
                process_step_output(nested_step)

        # Only actual executors (agents/teams) that reported metrics are included
        if step_output.step_name and step_output.metrics and step_output.executor_type in ("agent", "team"):
            steps_dict[step_output.step_name] = StepMetrics(
                step_name=step_output.step_name,
                executor_type=step_output.executor_type,
                executor_name=step_output.executor_name or "unknown",
                metrics=step_output.metrics,
            )

    for step_result in step_results:
        process_step_output(step_result)

    return WorkflowMetrics(
        steps=steps_dict,
    )
def _call_custom_function(self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any) -> Any:
"""Call custom function with only the parameters it expects"""
from inspect import signature
sig = signature(func)
# Build arguments based on what the function actually accepts
call_kwargs: Dict[str, Any] = {}
# Only add workflow and execution_input if the function expects them
if "workflow" in sig.parameters: # type: ignore
call_kwargs["workflow"] = self
if "execution_input" in sig.parameters:
call_kwargs["execution_input"] = execution_input # type: ignore
if "session_state" in sig.parameters:
call_kwargs["session_state"] = self.session_state # type: ignore
# Add any other kwargs that the function expects
for param_name in kwargs:
if param_name in sig.parameters: # type: ignore
call_kwargs[param_name] = kwargs[param_name]
# If function has **kwargs parameter, pass all remaining kwargs
for param in sig.parameters.values(): # type: ignore
if param.kind == param.VAR_KEYWORD:
call_kwargs.update(kwargs)
break
try:
return func(**call_kwargs)
except TypeError as e:
# If signature inspection fails, fall back to original method
logger.error(f"Function signature inspection failed: {e}. Falling back to original calling convention.")
return func(**kwargs)
def _accumulate_partial_step_data(
    self, event: Union[RunContentEvent, TeamRunContentEvent], partial_step_content: str
) -> str:
    """Append a streaming event's text to the partial step content.

    Args:
        event: Streaming event; anything other than ``RunContentEvent`` /
            ``TeamRunContentEvent`` is ignored.
        partial_step_content: Text accumulated so far.

    Returns:
        ``partial_step_content`` extended by ``event.content`` when that content
        is a non-empty string, otherwise ``partial_step_content`` unchanged.
    """
    if not isinstance(event, (RunContentEvent, TeamRunContentEvent)):
        return partial_step_content

    chunk = event.content
    if chunk and isinstance(chunk, str):
        return partial_step_content + chunk
    return partial_step_content
def _execute(
self,
session: WorkflowSession,
execution_input: WorkflowExecutionInput,
workflow_run_response: WorkflowRunOutput,
session_state: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> WorkflowRunOutput:
"""Execute the workflow synchronously and return the populated run response.

``self.steps`` is either a callable, which is invoked directly, or an iterable of
steps whose ``execute`` method is called in order.

Args:
    session: Workflow session that receives the run and is saved afterwards.
    execution_input: Input (and optional media) for the run.
    workflow_run_response: Run output object that is mutated in place
        (status, content, step results, metrics, media).
    session_state: Session state forwarded to each step.
    **kwargs: Extra keyword arguments offered to a callable ``self.steps``.

Returns:
    The same ``workflow_run_response`` object.

Raises:
    ValueError: If ``self.steps`` is a coroutine function or async generator function.
    InputCheckError, OutputCheckError, Exception: Re-raised after the error status
        and message were stored on ``workflow_run_response``.
"""
from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction
workflow_run_response.status = RunStatus.running
# Register the run so that raise_if_cancelled() can observe cancellation requests
if workflow_run_response.run_id:
register_run(workflow_run_response.run_id) # type: ignore
if callable(self.steps):
if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps):
raise ValueError("Cannot use async function with synchronous execution")
elif isgeneratorfunction(self.steps):
content = ""
# NOTE(review): the generator is called positionally here, not through
# _call_custom_function as _execute_stream does - confirm this is intended
for chunk in self.steps(self, execution_input, **kwargs):
# Check for cancellation while consuming generator
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# String content is concatenated as-is; any other chunk is stringified
if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
content += chunk.content
else:
content += str(chunk)
workflow_run_response.content = content
else:
# Execute the workflow with the custom executor
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs) # type: ignore[arg-type]
workflow_run_response.status = RunStatus.completed
else:
try:
# Track outputs from each step for enhanced data flow
collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
previous_step_outputs: Dict[str, StepOutput] = {}
# NOTE(review): `x or []` yields the input's own list when it is non-empty, so the
# shared_* lists alias execution_input's lists and the extend() calls below mutate them;
# only the output_* lists are copies
shared_images: List[Image] = execution_input.images or []
output_images: List[Image] = (execution_input.images or []).copy() # Start with input images
shared_videos: List[Video] = execution_input.videos or []
output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos
shared_audio: List[Audio] = execution_input.audio or []
output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio
shared_files: List[File] = execution_input.files or []
output_files: List[File] = (execution_input.files or []).copy() # Start with input files
for i, step in enumerate(self.steps): # type: ignore[arg-type]
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Steps without a name attribute get a positional fallback name
step_name = getattr(step, "name", f"step_{i + 1}")
log_debug(f"Executing step {i + 1}/{self._get_step_count()}: {step_name}")
# Create enhanced StepInput
step_input = self._create_step_input(
execution_input=execution_input,
previous_step_outputs=previous_step_outputs,
shared_images=shared_images,
shared_videos=shared_videos,
shared_audio=shared_audio,
shared_files=shared_files,
)
# Check for cancellation before executing step
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
step_output = step.execute( # type: ignore[union-attr]
step_input,
session_id=session.session_id,
user_id=self.user_id,
workflow_run_response=workflow_run_response,
session_state=session_state,
store_executor_outputs=self.store_executor_outputs,
workflow_session=session,
add_workflow_history_to_steps=self.add_workflow_history_to_steps
if self.add_workflow_history_to_steps
else None,
num_history_runs=self.num_history_runs,
)
# Check for cancellation after step execution
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Update the workflow-level previous_step_outputs dictionary
previous_step_outputs[step_name] = step_output
collected_step_outputs.append(step_output)
# Update shared media for next step
shared_images.extend(step_output.images or [])
shared_videos.extend(step_output.videos or [])
shared_audio.extend(step_output.audio or [])
shared_files.extend(step_output.files or [])
output_images.extend(step_output.images or [])
output_videos.extend(step_output.videos or [])
output_audio.extend(step_output.audio or [])
output_files.extend(step_output.files or [])
if step_output.stop:
logger.info(f"Early termination requested by step {step_name}")
break
# Update the workflow_run_response with completion data
if collected_step_outputs:
workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
last_output = cast(StepOutput, collected_step_outputs[-1])
# Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
if getattr(last_output, "steps", None):
_cur = last_output
# Follow the last child at every level until an output without children is reached
while getattr(_cur, "steps", None):
_steps = _cur.steps or []
if not _steps:
break
_cur = _steps[-1]
workflow_run_response.content = _cur.content
else:
workflow_run_response.content = last_output.content
else:
workflow_run_response.content = "No steps executed"
workflow_run_response.step_results = collected_step_outputs
workflow_run_response.images = output_images
workflow_run_response.videos = output_videos
workflow_run_response.audio = output_audio
# NOTE(review): output_files is accumulated above but not assigned to the response here - confirm
workflow_run_response.status = RunStatus.completed
except (InputCheckError, OutputCheckError) as e:
log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
# Store error response
workflow_run_response.status = RunStatus.error
workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}"
raise e
except RunCancelledException as e:
# Cancellation is recorded on the response and not re-raised
logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled")
workflow_run_response.status = RunStatus.cancelled
workflow_run_response.content = str(e)
except Exception as e:
import traceback
traceback.print_exc()
logger.error(f"Workflow execution failed: {e}")
# Store error response
workflow_run_response.status = RunStatus.error
workflow_run_response.content = f"Workflow execution failed: {e}"
raise e
finally:
# Persist the run (metrics, run record, session) regardless of how the try block ended
self._update_session_metrics(session=session, workflow_run_response=workflow_run_response)
session.upsert_run(run=workflow_run_response)
self.save_session(session=session)
# Always clean up the run tracking
cleanup_run(workflow_run_response.run_id) # type: ignore
# Log Workflow Telemetry
if self.telemetry:
self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id)
return workflow_run_response
def _execute_stream(
self,
session: WorkflowSession,
execution_input: WorkflowExecutionInput,
workflow_run_response: WorkflowRunOutput,
session_state: Optional[Dict[str, Any]] = None,
stream_events: bool = False,
**kwargs: Any,
) -> Iterator[WorkflowRunOutputEvent]:
"""Execute the workflow synchronously as a generator of events.

A ``WorkflowStartedEvent`` is yielded first. ``self.steps`` is then either invoked as a
callable or iterated as steps via ``execute_stream``; a ``WorkflowCompletedEvent`` is
yielded at the end and the run is stored on ``session``.

Args:
    session: Workflow session that receives the run and is saved afterwards.
    execution_input: Input (and optional media) for the run.
    workflow_run_response: Run output object that is mutated in place.
    session_state: Session state forwarded to each step.
    stream_events: Forwarded to each step's ``execute_stream``.
    **kwargs: Extra keyword arguments offered to a callable ``self.steps``.

Yields:
    Workflow events, plus chunks produced by a generator-function ``self.steps``.

Raises:
    ValueError: If ``self.steps`` is a coroutine function or async generator function.
    Exception: Unexpected errors are re-raised after an error event was yielded.
"""
from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction
workflow_run_response.status = RunStatus.running
# Register run for cancellation tracking
if workflow_run_response.run_id:
register_run(workflow_run_response.run_id)
workflow_started_event = WorkflowStartedEvent(
run_id=workflow_run_response.run_id or "",
workflow_name=workflow_run_response.workflow_name,
workflow_id=workflow_run_response.workflow_id,
session_id=workflow_run_response.session_id,
)
yield self._handle_event(workflow_started_event, workflow_run_response)
if callable(self.steps):
if iscoroutinefunction(self.steps) or isasyncgenfunction(self.steps):
raise ValueError("Cannot use async function with synchronous execution")
elif isgeneratorfunction(self.steps):
content = ""
for chunk in self._call_custom_function(self.steps, execution_input, **kwargs): # type: ignore[arg-type]
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Update the run_response with the content from the result
if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
content += chunk.content
yield chunk
else:
# Chunks without string content are folded into the text only (stringified)
content += str(chunk)
workflow_run_response.content = content
else:
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)
workflow_run_response.status = RunStatus.completed
else:
try:
# Track outputs from each step for enhanced data flow
collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
previous_step_outputs: Dict[str, StepOutput] = {}
# NOTE(review): `x or []` yields the input's own list when it is non-empty, so the
# shared_* lists alias execution_input's lists and the extend() calls below mutate them;
# only the output_* lists are copies
shared_images: List[Image] = execution_input.images or []
output_images: List[Image] = (execution_input.images or []).copy() # Start with input images
shared_videos: List[Video] = execution_input.videos or []
output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos
shared_audio: List[Audio] = execution_input.audio or []
output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio
shared_files: List[File] = execution_input.files or []
output_files: List[File] = (execution_input.files or []).copy() # Start with input files
early_termination = False
# Track partial step data in case of cancellation
current_step_name = ""
current_step = None
partial_step_content = ""
for i, step in enumerate(self.steps): # type: ignore[arg-type]
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Steps without a name attribute get a positional fallback name
step_name = getattr(step, "name", f"step_{i + 1}")
log_debug(f"Streaming step {i + 1}/{self._get_step_count()}: {step_name}")
# Track current step for cancellation handler
current_step_name = step_name
current_step = step
# Reset partial data for this step
partial_step_content = ""
# Create enhanced StepInput
step_input = self._create_step_input(
execution_input=execution_input,
previous_step_outputs=previous_step_outputs,
shared_images=shared_images,
shared_videos=shared_videos,
shared_audio=shared_audio,
shared_files=shared_files,
)
# Execute step with streaming and yield all events
for event in step.execute_stream( # type: ignore[union-attr]
step_input,
session_id=session.session_id,
user_id=self.user_id,
stream_events=stream_events,
stream_executor_events=self.stream_executor_events,
workflow_run_response=workflow_run_response,
session_state=session_state,
step_index=i,
store_executor_outputs=self.store_executor_outputs,
workflow_session=session,
add_workflow_history_to_steps=self.add_workflow_history_to_steps
if self.add_workflow_history_to_steps
else None,
num_history_runs=self.num_history_runs,
):
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Accumulate partial data from streaming events
partial_step_content = self._accumulate_partial_step_data(event, partial_step_content) # type: ignore
# Handle events
if isinstance(event, StepOutput):
step_output = event
collected_step_outputs.append(step_output)
# Update the workflow-level previous_step_outputs dictionary
previous_step_outputs[step_name] = step_output
# Transform StepOutput to StepOutputEvent for consistent streaming interface
step_output_event = self._transform_step_output_to_event(
step_output, workflow_run_response, step_index=i
)
if step_output.stop:
logger.info(f"Early termination requested by step {step_name}")
# Update shared media for next step
shared_images.extend(step_output.images or [])
shared_videos.extend(step_output.videos or [])
shared_audio.extend(step_output.audio or [])
shared_files.extend(step_output.files or [])
output_images.extend(step_output.images or [])
output_videos.extend(step_output.videos or [])
output_audio.extend(step_output.audio or [])
output_files.extend(step_output.files or [])
# Only yield StepOutputEvent for function executors, not for agents/teams
if getattr(step, "executor_type", None) == "function":
yield step_output_event
# Break out of the step loop
early_termination = True
break
# Update shared media for next step
shared_images.extend(step_output.images or [])
shared_videos.extend(step_output.videos or [])
shared_audio.extend(step_output.audio or [])
shared_files.extend(step_output.files or [])
output_images.extend(step_output.images or [])
output_videos.extend(step_output.videos or [])
output_audio.extend(step_output.audio or [])
output_files.extend(step_output.files or [])
# Only yield StepOutputEvent for generator functions, not for agents/teams
if getattr(step, "executor_type", None) == "function":
yield step_output_event
elif isinstance(event, WorkflowRunOutputEvent): # type: ignore
# Enrich event with workflow context before yielding
enriched_event = self._enrich_event_with_workflow_context(
event, workflow_run_response, step_index=i, step=step
)
yield self._handle_event(enriched_event, workflow_run_response) # type: ignore
else:
# Enrich other events with workflow context before yielding
enriched_event = self._enrich_event_with_workflow_context(
event, workflow_run_response, step_index=i, step=step
)
# Executor-level events are only surfaced (and stored) when explicitly enabled
if self.stream_executor_events:
yield self._handle_event(enriched_event, workflow_run_response) # type: ignore
# Break out of main step loop if early termination was requested
# NOTE(review): early_termination is assigned before the loop starts, so the locals() test looks redundant
if "early_termination" in locals() and early_termination:
break
# Update the workflow_run_response with completion data
if collected_step_outputs:
workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
last_output = cast(StepOutput, collected_step_outputs[-1])
# Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
if getattr(last_output, "steps", None):
_cur = last_output
# Follow the last child at every level until an output without children is reached
while getattr(_cur, "steps", None):
_steps = _cur.steps or []
if not _steps:
break
_cur = _steps[-1]
workflow_run_response.content = _cur.content
else:
workflow_run_response.content = last_output.content
else:
workflow_run_response.content = "No steps executed"
workflow_run_response.step_results = collected_step_outputs
workflow_run_response.images = output_images
workflow_run_response.videos = output_videos
workflow_run_response.audio = output_audio
# NOTE(review): output_files is accumulated above but not assigned to the response here - confirm
workflow_run_response.status = RunStatus.completed
except (InputCheckError, OutputCheckError) as e:
log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
from agno.run.workflow import WorkflowErrorEvent
error_event = WorkflowErrorEvent(
run_id=workflow_run_response.run_id or "",
workflow_id=self.id,
workflow_name=self.name,
session_id=session.session_id,
error=str(e),
)
# NOTE(review): the error event is yielded directly, not via _handle_event, and this
# handler does not re-raise (the generic Exception handler below does) - confirm intent
yield error_event
# Update workflow_run_response with error
workflow_run_response.content = error_event.error
workflow_run_response.status = RunStatus.error
except RunCancelledException as e:
# Handle run cancellation during streaming
logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming")
workflow_run_response.status = RunStatus.cancelled
workflow_run_response.content = str(e)
# Capture partial progress from the step that was cancelled mid-stream
if partial_step_content:
logger.info(
f"Step with name '{current_step_name}' was cancelled. Setting its partial progress as step output."
)
partial_step_output = StepOutput(
step_name=current_step_name,
step_id=getattr(current_step, "step_id", None) if current_step else None,
step_type=StepType.STEP,
executor_type=getattr(current_step, "executor_type", None) if current_step else None,
executor_name=getattr(current_step, "executor_name", None) if current_step else None,
content=partial_step_content,
success=False,
error="Cancelled during execution",
)
collected_step_outputs.append(partial_step_output)
# Preserve all progress (completed steps + partial step) before cancellation
if collected_step_outputs:
workflow_run_response.step_results = collected_step_outputs
workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
cancelled_event = WorkflowCancelledEvent(
run_id=workflow_run_response.run_id or "",
workflow_id=self.id,
workflow_name=self.name,
session_id=session.session_id,
reason=str(e),
)
yield self._handle_event(cancelled_event, workflow_run_response)
except Exception as e:
logger.error(f"Workflow execution failed: {e}")
from agno.run.workflow import WorkflowErrorEvent
error_event = WorkflowErrorEvent(
run_id=workflow_run_response.run_id or "",
workflow_id=self.id,
workflow_name=self.name,
session_id=session.session_id,
error=str(e),
)
# Yielded directly, i.e. without passing through _handle_event
yield error_event
# Update workflow_run_response with error
workflow_run_response.content = error_event.error
workflow_run_response.status = RunStatus.error
raise e
# Yield workflow completed event
workflow_completed_event = WorkflowCompletedEvent(
run_id=workflow_run_response.run_id or "",
content=workflow_run_response.content,
workflow_name=workflow_run_response.workflow_name,
workflow_id=workflow_run_response.workflow_id,
session_id=workflow_run_response.session_id,
step_results=workflow_run_response.step_results, # type: ignore
metadata=workflow_run_response.metadata,
)
yield self._handle_event(workflow_completed_event, workflow_run_response)
# Store the completed workflow response
self._update_session_metrics(session=session, workflow_run_response=workflow_run_response)
session.upsert_run(run=workflow_run_response)
self.save_session(session=session)
# Always clean up the run tracking
# NOTE(review): unlike _execute there is no `finally` here, so this is not reached when an
# exception propagates out of the generator - confirm
cleanup_run(workflow_run_response.run_id) # type: ignore
# Log Workflow Telemetry
if self.telemetry:
self._log_workflow_telemetry(session_id=session.session_id, run_id=workflow_run_response.run_id)
async def _acall_custom_function(
self, func: Callable, execution_input: WorkflowExecutionInput, **kwargs: Any
) -> Any:
"""Call custom function with only the parameters it expects - handles both async functions and async generators"""
from inspect import isasyncgenfunction, signature
sig = signature(func)
# Build arguments based on what the function actually accepts
call_kwargs: Dict[str, Any] = {}
# Only add workflow and execution_input if the function expects them
if "workflow" in sig.parameters: # type: ignore
call_kwargs["workflow"] = self
if "execution_input" in sig.parameters:
call_kwargs["execution_input"] = execution_input # type: ignore
if "session_state" in sig.parameters:
call_kwargs["session_state"] = self.session_state # type: ignore
# Add any other kwargs that the function expects
for param_name in kwargs:
if param_name in sig.parameters: # type: ignore
call_kwargs[param_name] = kwargs[param_name]
# If function has **kwargs parameter, pass all remaining kwargs
for param in sig.parameters.values(): # type: ignore
if param.kind == param.VAR_KEYWORD:
call_kwargs.update(kwargs)
break
try:
# Check if it's an async generator function
if isasyncgenfunction(func):
# For async generators, call the function and return the async generator directly
return func(**call_kwargs) # type: ignore
else:
# For regular async functions, await the result
return await func(**call_kwargs) # type: ignore
except TypeError as e:
# If signature inspection fails, fall back to original method
logger.warning(
f"Async function signature inspection failed: {e}. Falling back to original calling convention."
)
if isasyncgenfunction(func):
# For async generators, use the same signature inspection logic in fallback
return func(**call_kwargs) # type: ignore
else:
# For regular async functions, use the same signature inspection logic in fallback
return await func(**call_kwargs) # type: ignore
async def _aload_or_create_session(
self, session_id: str, user_id: Optional[str], session_state: Optional[Dict[str, Any]]
) -> Tuple[WorkflowSession, Dict[str, Any]]:
"""Load or create session from database, update metadata, and prepare session state.
Returns:
Tuple of (workflow_session, prepared_session_state)
"""
# Read existing session from database
if self._has_async_db():
workflow_session = await self.aread_or_create_session(session_id=session_id, user_id=user_id)
else:
workflow_session = self.read_or_create_session(session_id=session_id, user_id=user_id)
self._update_metadata(session=workflow_session)
# Update session state from DB
_session_state = session_state or {}
_session_state = self._load_session_state(session=workflow_session, session_state=_session_state)
return workflow_session, _session_state
async def _aexecute(
self,
session_id: str,
user_id: Optional[str],
execution_input: WorkflowExecutionInput,
workflow_run_response: WorkflowRunOutput,
session_state: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> WorkflowRunOutput:
"""Execute the workflow asynchronously and return the populated run response.

The session is loaded (or created) first. ``self.steps`` is then either invoked as a
callable or iterated as steps whose ``aexecute`` coroutine is awaited in order.

Args:
    session_id: Identifier of the session to load or create.
    user_id: User identifier used when loading the session.
    execution_input: Input (and optional media) for the run.
    workflow_run_response: Run output object that is mutated in place.
    session_state: Initial session state; replaced by the state prepared from the session.
    **kwargs: Extra keyword arguments offered to a callable ``self.steps``.

Returns:
    The same ``workflow_run_response`` object.

Raises:
    InputCheckError, OutputCheckError, Exception: Re-raised after the error status
        and message were stored on ``workflow_run_response``.
"""
from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction
# Read existing session from database
workflow_session, session_state = await self._aload_or_create_session(
session_id=session_id, user_id=user_id, session_state=session_state
)
workflow_run_response.status = RunStatus.running
# Register run for cancellation tracking
if workflow_run_response.run_id:
register_run(workflow_run_response.run_id) # type: ignore
if callable(self.steps):
# Execute the workflow with the custom executor
content = ""
if iscoroutinefunction(self.steps): # type: ignore
workflow_run_response.content = await self._acall_custom_function(self.steps, execution_input, **kwargs)
elif isgeneratorfunction(self.steps):
# NOTE(review): a synchronous generator is consumed here without raise_if_cancelled()
# checks and is called positionally rather than through _call_custom_function - confirm
for chunk in self.steps(self, execution_input, **kwargs): # type: ignore[arg-type]
if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
content += chunk.content
else:
content += str(chunk)
workflow_run_response.content = content
elif isasyncgenfunction(self.steps): # type: ignore
# _acall_custom_function returns the async generator object, which is iterated here
async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs)
async for chunk in async_gen:
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
if hasattr(chunk, "content") and chunk.content is not None and isinstance(chunk.content, str):
content += chunk.content
else:
content += str(chunk)
workflow_run_response.content = content
else:
# Plain synchronous callable: invoked through the synchronous helper
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
workflow_run_response.content = self._call_custom_function(self.steps, execution_input, **kwargs)
workflow_run_response.status = RunStatus.completed
else:
try:
# Track outputs from each step for enhanced data flow
collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
previous_step_outputs: Dict[str, StepOutput] = {}
# NOTE(review): `x or []` yields the input's own list when it is non-empty, so the
# shared_* lists alias execution_input's lists and the extend() calls below mutate them;
# only the output_* lists are copies
shared_images: List[Image] = execution_input.images or []
output_images: List[Image] = (execution_input.images or []).copy() # Start with input images
shared_videos: List[Video] = execution_input.videos or []
output_videos: List[Video] = (execution_input.videos or []).copy() # Start with input videos
shared_audio: List[Audio] = execution_input.audio or []
output_audio: List[Audio] = (execution_input.audio or []).copy() # Start with input audio
shared_files: List[File] = execution_input.files or []
output_files: List[File] = (execution_input.files or []).copy() # Start with input files
for i, step in enumerate(self.steps): # type: ignore[arg-type]
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Steps without a name attribute get a positional fallback name
step_name = getattr(step, "name", f"step_{i + 1}")
log_debug(f"Async Executing step {i + 1}/{self._get_step_count()}: {step_name}")
# Create enhanced StepInput
step_input = self._create_step_input(
execution_input=execution_input,
previous_step_outputs=previous_step_outputs,
shared_images=shared_images,
shared_videos=shared_videos,
shared_audio=shared_audio,
shared_files=shared_files,
)
# Check for cancellation before executing step
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
step_output = await step.aexecute( # type: ignore[union-attr]
step_input,
session_id=session_id,
user_id=self.user_id,
workflow_run_response=workflow_run_response,
session_state=session_state,
store_executor_outputs=self.store_executor_outputs,
workflow_session=workflow_session,
add_workflow_history_to_steps=self.add_workflow_history_to_steps
if self.add_workflow_history_to_steps
else None,
num_history_runs=self.num_history_runs,
)
# Check for cancellation after step execution
raise_if_cancelled(workflow_run_response.run_id) # type: ignore
# Update the workflow-level previous_step_outputs dictionary
previous_step_outputs[step_name] = step_output
collected_step_outputs.append(step_output)
# Update shared media for next step
shared_images.extend(step_output.images or [])
shared_videos.extend(step_output.videos or [])
shared_audio.extend(step_output.audio or [])
shared_files.extend(step_output.files or [])
output_images.extend(step_output.images or [])
output_videos.extend(step_output.videos or [])
output_audio.extend(step_output.audio or [])
output_files.extend(step_output.files or [])
if step_output.stop:
logger.info(f"Early termination requested by step {step_name}")
break
# Update the workflow_run_response with completion data
if collected_step_outputs:
workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
last_output = cast(StepOutput, collected_step_outputs[-1])
# Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
if getattr(last_output, "steps", None):
_cur = last_output
# Follow the last child at every level until an output without children is reached
while getattr(_cur, "steps", None):
_steps = _cur.steps or []
if not _steps:
break
_cur = _steps[-1]
workflow_run_response.content = _cur.content
else:
workflow_run_response.content = last_output.content
else:
workflow_run_response.content = "No steps executed"
workflow_run_response.step_results = collected_step_outputs
workflow_run_response.images = output_images
workflow_run_response.videos = output_videos
workflow_run_response.audio = output_audio
# NOTE(review): output_files is accumulated above but not assigned to the response here - confirm
workflow_run_response.status = RunStatus.completed
except (InputCheckError, OutputCheckError) as e:
log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")
# Store error response
workflow_run_response.status = RunStatus.error
workflow_run_response.content = f"Validation failed: {str(e)} | Check: {e.check_trigger}"
raise e
except RunCancelledException as e:
# Cancellation is recorded on the response and not re-raised
logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled")
workflow_run_response.status = RunStatus.cancelled
workflow_run_response.content = str(e)
except Exception as e:
logger.error(f"Workflow execution failed: {e}")
workflow_run_response.status = RunStatus.error
workflow_run_response.content = f"Workflow execution failed: {e}"
raise e
# NOTE(review): unlike _execute there is no `finally` here; confirm whether persistence and
# cleanup_run() are expected to run when the handlers above re-raise
self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response)
workflow_session.upsert_run(run=workflow_run_response)
# Save with the async or sync API depending on the configured database
if self._has_async_db():
await self.asave_session(session=workflow_session)
else:
self.save_session(session=workflow_session)
# Always clean up the run tracking
cleanup_run(workflow_run_response.run_id) # type: ignore
# Log Workflow Telemetry
if self.telemetry:
await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id)
return workflow_run_response
async def _aexecute_stream(
    self,
    session_id: str,
    user_id: Optional[str],
    execution_input: WorkflowExecutionInput,
    workflow_run_response: WorkflowRunOutput,
    session_state: Optional[Dict[str, Any]] = None,
    stream_events: bool = False,
    websocket_handler: Optional[WebSocketHandler] = None,
    **kwargs: Any,
) -> AsyncIterator[WorkflowRunOutputEvent]:
    """Execute the workflow asynchronously, yielding events as they are produced.

    When ``self.steps`` is a callable it is invoked directly (coroutine,
    generator, async generator or plain function). Otherwise each step is run
    through ``step.aexecute_stream`` and its events are enriched with workflow
    context before being yielded. A ``WorkflowCompletedEvent`` is yielded last
    and the run is upserted into the session and saved.

    Args:
        session_id: Session the run belongs to.
        user_id: User used when loading or creating the session.
        execution_input: Input and media handed to the steps.
        workflow_run_response: Run output object updated in place with status,
            content, metrics, step results and media.
        session_state: Optional session state merged when the session is loaded.
        stream_events: Forwarded to each step's ``aexecute_stream``.
        websocket_handler: Optional handler forwarded to ``_handle_event``.
        **kwargs: Forwarded to a callable ``self.steps``.

    Yields:
        Workflow events, plus raw chunks produced by a callable ``self.steps``.

    Raises:
        Exception: Unexpected errors raised while running steps are re-raised
            after a ``WorkflowErrorEvent`` has been yielded. Validation errors
            and cancellations are reported as events and not re-raised.
            Exceptions from a callable ``self.steps`` propagate unchanged.
    """
    from inspect import isasyncgenfunction, iscoroutinefunction, isgeneratorfunction

    from agno.run.workflow import WorkflowErrorEvent

    def _error_event(exc: Exception) -> WorkflowErrorEvent:
        """Build the error event emitted for a failed run."""
        return WorkflowErrorEvent(
            run_id=workflow_run_response.run_id or "",
            workflow_id=self.id,
            workflow_name=self.name,
            session_id=session_id,
            error=str(exc),
        )

    # Read existing session from database
    workflow_session, session_state = await self._aload_or_create_session(
        session_id=session_id, user_id=user_id, session_state=session_state
    )
    workflow_run_response.status = RunStatus.running

    # Register run for cancellation tracking
    if workflow_run_response.run_id:
        register_run(workflow_run_response.run_id)

    try:
        workflow_started_event = WorkflowStartedEvent(
            run_id=workflow_run_response.run_id or "",
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
        )
        yield self._handle_event(workflow_started_event, workflow_run_response, websocket_handler=websocket_handler)

        if callable(self.steps):
            if iscoroutinefunction(self.steps):  # type: ignore
                workflow_run_response.content = await self._acall_custom_function(
                    self.steps, execution_input, **kwargs
                )
            elif isgeneratorfunction(self.steps):
                content_parts: List[str] = []
                for chunk in self.steps(self, execution_input, **kwargs):  # type: ignore[arg-type]
                    # Only chunks carrying string content are forwarded to the consumer;
                    # anything else is stringified into the final content.
                    if isinstance(getattr(chunk, "content", None), str):
                        content_parts.append(chunk.content)
                        yield chunk
                    else:
                        content_parts.append(str(chunk))
                workflow_run_response.content = "".join(content_parts)
            elif isasyncgenfunction(self.steps):  # type: ignore
                content_parts = []
                async_gen = await self._acall_custom_function(self.steps, execution_input, **kwargs)
                async for chunk in async_gen:
                    raise_if_cancelled(workflow_run_response.run_id)  # type: ignore
                    if isinstance(getattr(chunk, "content", None), str):
                        content_parts.append(chunk.content)
                        yield chunk
                    else:
                        content_parts.append(str(chunk))
                workflow_run_response.content = "".join(content_parts)
            else:
                workflow_run_response.content = self.steps(self, execution_input, **kwargs)
            workflow_run_response.status = RunStatus.completed
        else:
            try:
                # Track outputs from each step for enhanced data flow
                collected_step_outputs: List[Union[StepOutput, List[StepOutput]]] = []
                previous_step_outputs: Dict[str, StepOutput] = {}

                # Copy the input media: these lists are extended with step outputs below
                # and must not grow the caller's execution_input lists in place.
                shared_images: List[Image] = list(execution_input.images or [])
                output_images: List[Image] = list(execution_input.images or [])  # Start with input images
                shared_videos: List[Video] = list(execution_input.videos or [])
                output_videos: List[Video] = list(execution_input.videos or [])  # Start with input videos
                shared_audio: List[Audio] = list(execution_input.audio or [])
                output_audio: List[Audio] = list(execution_input.audio or [])  # Start with input audio
                shared_files: List[File] = list(execution_input.files or [])
                # NOTE(review): output_files is accumulated but never stored on the run output.
                output_files: List[File] = list(execution_input.files or [])  # Start with input files

                early_termination = False

                # Track partial step data in case of cancellation
                current_step_name = ""
                current_step = None
                partial_step_content = ""

                for i, step in enumerate(self.steps):  # type: ignore[arg-type]
                    if workflow_run_response.run_id:
                        raise_if_cancelled(workflow_run_response.run_id)

                    step_name = getattr(step, "name", f"step_{i + 1}")
                    log_debug(f"Async streaming step {i + 1}/{self._get_step_count()}: {step_name}")

                    current_step_name = step_name
                    current_step = step
                    # Reset partial data for this step
                    partial_step_content = ""

                    # Create enhanced StepInput
                    step_input = self._create_step_input(
                        execution_input=execution_input,
                        previous_step_outputs=previous_step_outputs,
                        shared_images=shared_images,
                        shared_videos=shared_videos,
                        shared_audio=shared_audio,
                        shared_files=shared_files,
                    )

                    # Execute step with streaming and yield all events
                    async for event in step.aexecute_stream(  # type: ignore[union-attr]
                        step_input,
                        session_id=session_id,
                        user_id=self.user_id,
                        stream_events=stream_events,
                        stream_executor_events=self.stream_executor_events,
                        workflow_run_response=workflow_run_response,
                        session_state=session_state,
                        step_index=i,
                        store_executor_outputs=self.store_executor_outputs,
                        workflow_session=workflow_session,
                        add_workflow_history_to_steps=self.add_workflow_history_to_steps
                        if self.add_workflow_history_to_steps
                        else None,
                        num_history_runs=self.num_history_runs,
                    ):
                        if workflow_run_response.run_id:
                            raise_if_cancelled(workflow_run_response.run_id)

                        # Accumulate partial data from streaming events
                        partial_step_content = self._accumulate_partial_step_data(event, partial_step_content)  # type: ignore

                        if isinstance(event, StepOutput):
                            step_output = event
                            collected_step_outputs.append(step_output)
                            # Update the workflow-level previous_step_outputs dictionary
                            previous_step_outputs[step_name] = step_output

                            # Transform StepOutput to StepOutputEvent for consistent streaming interface
                            step_output_event = self._transform_step_output_to_event(
                                step_output, workflow_run_response, step_index=i
                            )

                            if step_output.stop:
                                logger.info(f"Early termination requested by step {step_name}")

                            # Update shared media for next step
                            shared_images.extend(step_output.images or [])
                            shared_videos.extend(step_output.videos or [])
                            shared_audio.extend(step_output.audio or [])
                            shared_files.extend(step_output.files or [])
                            output_images.extend(step_output.images or [])
                            output_videos.extend(step_output.videos or [])
                            output_audio.extend(step_output.audio or [])
                            output_files.extend(step_output.files or [])

                            # Only yield StepOutputEvent for generator functions, not for agents/teams
                            if getattr(step, "executor_type", None) == "function":
                                yield step_output_event

                            if step_output.stop:
                                # Stop consuming this step; the outer loop exits below
                                early_termination = True
                                break
                        elif isinstance(event, WorkflowRunOutputEvent):  # type: ignore
                            # Enrich event with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            yield self._handle_event(
                                enriched_event, workflow_run_response, websocket_handler=websocket_handler
                            )  # type: ignore
                        else:
                            # Enrich other events with workflow context before yielding
                            enriched_event = self._enrich_event_with_workflow_context(
                                event, workflow_run_response, step_index=i, step=step
                            )
                            if self.stream_executor_events:
                                yield self._handle_event(
                                    enriched_event, workflow_run_response, websocket_handler=websocket_handler
                                )  # type: ignore

                    # Break out of main step loop if early termination was requested
                    if early_termination:
                        break

                # Update the workflow_run_response with completion data
                if collected_step_outputs:
                    workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)
                    # Use deepest nested content if this is a container (Steps/Router/Loop/etc.)
                    final_output = cast(StepOutput, collected_step_outputs[-1])
                    while getattr(final_output, "steps", None):
                        final_output = final_output.steps[-1]  # type: ignore[index]
                    workflow_run_response.content = final_output.content
                else:
                    workflow_run_response.content = "No steps executed"

                workflow_run_response.step_results = collected_step_outputs
                workflow_run_response.images = output_images
                workflow_run_response.videos = output_videos
                workflow_run_response.audio = output_audio
                workflow_run_response.status = RunStatus.completed

            except (InputCheckError, OutputCheckError) as e:
                log_error(f"Validation failed: {str(e)} | Check: {e.check_trigger}")

                error_event = _error_event(e)
                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error

            except RunCancelledException as e:
                # Handle run cancellation during streaming
                logger.info(f"Workflow run {workflow_run_response.run_id} was cancelled during streaming")
                workflow_run_response.status = RunStatus.cancelled
                workflow_run_response.content = str(e)

                # Capture partial progress from the step that was cancelled mid-stream
                if partial_step_content:
                    logger.info(
                        f"Step with name '{current_step_name}' was cancelled. Setting its partial progress as step output."
                    )
                    partial_step_output = StepOutput(
                        step_name=current_step_name,
                        step_id=getattr(current_step, "step_id", None) if current_step else None,
                        step_type=StepType.STEP,
                        executor_type=getattr(current_step, "executor_type", None) if current_step else None,
                        executor_name=getattr(current_step, "executor_name", None) if current_step else None,
                        content=partial_step_content,
                        success=False,
                        error="Cancelled during execution",
                    )
                    collected_step_outputs.append(partial_step_output)

                # Preserve all progress (completed steps + partial step) before cancellation
                if collected_step_outputs:
                    workflow_run_response.step_results = collected_step_outputs
                    workflow_run_response.metrics = self._aggregate_workflow_metrics(collected_step_outputs)

                cancelled_event = WorkflowCancelledEvent(
                    run_id=workflow_run_response.run_id or "",
                    workflow_id=self.id,
                    workflow_name=self.name,
                    session_id=session_id,
                    reason=str(e),
                )
                yield self._handle_event(
                    cancelled_event,
                    workflow_run_response,
                    websocket_handler=websocket_handler,
                )

            except Exception as e:
                logger.error(f"Workflow execution failed: {e}")

                error_event = _error_event(e)
                yield error_event

                # Update workflow_run_response with error
                workflow_run_response.content = error_event.error
                workflow_run_response.status = RunStatus.error
                raise

        # Yield workflow completed event
        workflow_completed_event = WorkflowCompletedEvent(
            run_id=workflow_run_response.run_id or "",
            content=workflow_run_response.content,
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_results=workflow_run_response.step_results,  # type: ignore[arg-type]
            metadata=workflow_run_response.metadata,
        )
        yield self._handle_event(workflow_completed_event, workflow_run_response, websocket_handler=websocket_handler)

        # Store the completed workflow response
        self._update_session_metrics(session=workflow_session, workflow_run_response=workflow_run_response)
        workflow_session.upsert_run(run=workflow_run_response)
        if self._has_async_db():
            await self.asave_session(session=workflow_session)
        else:
            self.save_session(session=workflow_session)

        # Log Workflow Telemetry
        if self.telemetry:
            await self._alog_workflow_telemetry(session_id=session_id, run_id=workflow_run_response.run_id)
    finally:
        # Always clean up the run tracking, including when an error is re-raised
        # or the consumer stops iterating before the stream is exhausted.
        cleanup_run(workflow_run_response.run_id)  # type: ignore
async def _arun_background(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    **kwargs: Any,
) -> WorkflowRunOutput:
    """Start the workflow as a background asyncio task and return immediately.

    A run output with PENDING status is created, stored in the session and
    returned. The scheduled task later updates that same object in place.

    Args:
        input: Workflow input.
        additional_data: Extra data attached to the execution input.
        user_id: Optional user id, resolved through ``_initialize_session``.
        session_id: Optional session id, resolved through ``_initialize_session``.
        session_state: Optional session state used when loading the session.
        audio: Input audio.
        images: Input images.
        videos: Input videos.
        files: Input files.
        **kwargs: Forwarded to the execution method.

    Returns:
        The PENDING ``WorkflowRunOutput`` that the background task mutates.
    """
    run_id = str(uuid4())

    self.initialize_workflow()
    session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

    # Read existing session from database
    workflow_session, session_state = await self._aload_or_create_session(
        session_id=session_id, user_id=user_id, session_state=session_state
    )

    self._prepare_steps()

    # Create workflow run response with PENDING status
    workflow_run_response = WorkflowRunOutput(
        run_id=run_id,
        input=input,
        session_id=session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
        status=RunStatus.pending,
    )

    # Store PENDING response immediately
    workflow_session.upsert_run(run=workflow_run_response)
    if self._has_async_db():
        await self.asave_session(session=workflow_session)
    else:
        self.save_session(session=workflow_session)

    # Prepare execution input
    inputs = WorkflowExecutionInput(
        input=input,
        additional_data=additional_data,
        audio=audio,  # type: ignore
        images=images,  # type: ignore
        videos=videos,  # type: ignore
        files=files,  # type: ignore
    )

    self.update_agents_and_teams_session_info()

    async def execute_workflow_background():
        """Simple background execution"""
        try:
            # Update status to RUNNING and save
            workflow_run_response.status = RunStatus.running
            if self._has_async_db():
                await self.asave_session(session=workflow_session)
            else:
                self.save_session(session=workflow_session)

            if self.agent is not None:
                # With stream=False this returns a coroutine; it must be awaited,
                # otherwise the workflow agent never actually runs.
                agent_run = await self._aexecute_workflow_agent(
                    user_input=input,  # type: ignore
                    session_id=session_id,
                    user_id=user_id,
                    execution_input=inputs,
                    session_state=session_state,
                    stream=False,
                    **kwargs,
                )
                # The agent path produces its own run output, so mirror the outcome
                # onto the placeholder that was returned to the caller.
                # NOTE(review): the placeholder persisted in the session is not
                # updated here; confirm how pollers should discover the agent's run.
                if agent_run is not None:
                    workflow_run_response.status = agent_run.status
                    workflow_run_response.content = agent_run.content
            else:
                await self._aexecute(
                    session_id=session_id,
                    user_id=user_id,
                    execution_input=inputs,
                    workflow_run_response=workflow_run_response,
                    session_state=session_state,
                    **kwargs,
                )

            log_debug(f"Background execution completed with status: {workflow_run_response.status}")

        except Exception as e:
            logger.error(f"Background workflow execution failed: {e}")
            workflow_run_response.status = RunStatus.error
            workflow_run_response.content = f"Background execution failed: {str(e)}"
            if self._has_async_db():
                await self.asave_session(session=workflow_session)
            else:
                self.save_session(session=workflow_session)

    # Create and start asyncio task
    # NOTE(review): no reference to the task is kept; the asyncio docs recommend
    # holding one because the event loop only keeps weak references to tasks.
    loop = asyncio.get_running_loop()
    loop.create_task(execute_workflow_background())

    # Return SAME object that will be updated by background execution
    return workflow_run_response
async def _arun_background_stream(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream_events: bool = False,
    websocket_handler: Optional[WebSocketHandler] = None,
    **kwargs: Any,
) -> WorkflowRunOutput:
    """Schedule a streaming workflow run as a background asyncio task.

    The streamed events are consumed inside the task purely to drive execution;
    the PENDING run output created here is returned right away.

    Args:
        input: Workflow input.
        additional_data: Extra data attached to the execution input.
        user_id: Optional user id, resolved through ``_initialize_session``.
        session_id: Optional session id, resolved through ``_initialize_session``.
        session_state: Optional session state used when loading the session.
        audio: Input audio.
        images: Input images.
        videos: Input videos.
        files: Input files.
        stream_events: Forwarded to ``_aexecute_stream``.
        websocket_handler: Optional handler forwarded to the execution method.
        **kwargs: Forwarded to the execution method.

    Returns:
        The ``WorkflowRunOutput`` created with PENDING status.
    """
    background_run_id = str(uuid4())

    self.initialize_workflow()
    session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

    # Load (or create) the session this run belongs to
    workflow_session, session_state = await self._aload_or_create_session(
        session_id=session_id, user_id=user_id, session_state=session_state
    )

    self._prepare_steps()

    # Run output starts out as PENDING
    workflow_run_response = WorkflowRunOutput(
        run_id=background_run_id,
        input=input,
        session_id=session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
        status=RunStatus.pending,
    )

    # Bundle the input and media for the execution methods
    inputs = WorkflowExecutionInput(
        input=input,
        additional_data=additional_data,
        audio=audio,  # type: ignore
        images=images,  # type: ignore
        videos=videos,  # type: ignore
        files=files,  # type: ignore
    )

    self.update_agents_and_teams_session_info()

    async def _persist_session() -> None:
        """Save the session through the async or sync DB path."""
        if self._has_async_db():
            await self.asave_session(session=workflow_session)
        else:
            self.save_session(session=workflow_session)

    async def _drain(event_stream: Any) -> None:
        """Exhaust an event stream; its events are only consumed, not used."""
        async for _ in event_stream:  # type: ignore
            pass

    async def execute_workflow_background_stream():
        """Background execution with streaming and WebSocket broadcasting"""
        try:
            if self.agent is None:
                # Mark the run as RUNNING and save before executing the steps
                workflow_run_response.status = RunStatus.running
                await _persist_session()

                await _drain(
                    self._aexecute_stream(
                        session_id=session_id,
                        user_id=user_id,
                        execution_input=inputs,
                        workflow_run_response=workflow_run_response,
                        stream_events=stream_events,
                        session_state=session_state,
                        websocket_handler=websocket_handler,
                        **kwargs,
                    )
                )
                log_debug(f"Background streaming execution completed with status: {workflow_run_response.status}")
            else:
                # With stream=True the workflow agent returns an async iterator
                await _drain(
                    self._aexecute_workflow_agent(
                        user_input=input,  # type: ignore
                        session_id=session_id,
                        user_id=user_id,
                        execution_input=inputs,
                        session_state=session_state,
                        stream=True,
                        websocket_handler=websocket_handler,
                        **kwargs,
                    )
                )
                log_debug(
                    f"Background streaming execution (workflow agent) completed with status: {workflow_run_response.status}"
                )
        except Exception as exc:
            logger.error(f"Background streaming workflow execution failed: {exc}")
            workflow_run_response.status = RunStatus.error
            workflow_run_response.content = f"Background streaming execution failed: {str(exc)}"
            await _persist_session()

    # Schedule the coroutine on the running loop
    asyncio.get_running_loop().create_task(execute_workflow_background_stream())

    # The same object is mutated by the background task
    return workflow_run_response
async def aget_run(self, run_id: str) -> Optional[WorkflowRunOutput]:
    """Look up a run by id in this workflow's stored session (async DB access).

    Returns:
        The matching run, or None when there is no DB or session id, the stored
        session is not a ``WorkflowSession`` with runs, or no run matches.
    """
    if self.db is None or self.session_id is None:
        return None

    stored = await self.db.aget_session(session_id=self.session_id, session_type=SessionType.WORKFLOW)  # type: ignore
    if not stored or not isinstance(stored, WorkflowSession) or not stored.runs:
        return None

    # First run whose id matches, if any
    return next((candidate for candidate in stored.runs if candidate.run_id == run_id), None)
def get_run(self, run_id: str) -> Optional[WorkflowRunOutput]:
    """Look up a run by id in this workflow's stored session.

    Returns:
        The matching run, or None when there is no DB or session id, the stored
        session is not a ``WorkflowSession`` with runs, or no run matches.
    """
    if self.db is None or self.session_id is None:
        return None

    stored = self.db.get_session(session_id=self.session_id, session_type=SessionType.WORKFLOW)
    if not stored or not isinstance(stored, WorkflowSession) or not stored.runs:
        return None

    # First run whose id matches, if any
    return next((candidate for candidate in stored.runs if candidate.run_id == run_id), None)
def _initialize_workflow_agent(
    self,
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
) -> None:
    """Give the workflow agent its single workflow tool.

    The tool callable is created by the agent's ``create_workflow_tool`` and
    wrapped in a ``Function``; it replaces any tools previously set on the
    agent. Workflow context is not set here; it is passed per run.
    """
    from agno.tools.function import Function

    tool_callable = self.agent.create_workflow_tool(  # type: ignore
        workflow=self,
        session=session,
        execution_input=execution_input,
        session_state=session_state,
        stream=stream,
    )
    self.agent.tools = [Function.from_callable(tool_callable)]  # type: ignore

    log_debug("Workflow agent initialized with run_workflow tool")
def _get_workflow_agent_dependencies(self, session: WorkflowSession) -> Dict[str, Any]:
    """Assemble the ``workflow_context`` dependency passed to the agent run.

    The context is the workflow description (when set) followed by the
    session's workflow history, or a placeholder sentence when history is
    disabled or empty.
    """
    # History settings come from the WorkflowAgent; otherwise history is on with 5 runs
    agent = self.agent
    if agent and isinstance(agent, WorkflowAgent):
        include_history, history_depth = agent.add_workflow_history, agent.num_history_runs
    else:
        include_history, history_depth = True, 5

    if not include_history:
        history_text = "No workflow history available."
    else:
        history_text = (
            session.get_workflow_history_context(num_runs=history_depth)
            or "No previous workflow runs in this session."
        )

    # Description first (if any), then the history section
    sections = []
    if self.description:
        sections.append(f"Workflow Description: {self.description}\n\n")
    sections.append(history_text)

    return {"workflow_context": "".join(sections)}
def _execute_workflow_agent(
    self,
    user_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
    **kwargs: Any,
) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]:
    """
    Dispatch a workflow-agent run to the streaming or non-streaming implementation.

    Args:
        user_input: The user's input, passed on as the agent input
        session: The workflow session
        execution_input: The execution input
        session_state: The session state
        stream: Whether to stream the response
        **kwargs: Forwarded to the streaming implementation only

    Returns:
        WorkflowRunOutput if stream=False, Iterator[WorkflowRunOutputEvent] if stream=True
    """
    shared_args = dict(
        agent_input=user_input,
        session=session,
        execution_input=execution_input,
        session_state=session_state,
        stream=stream,
    )

    if not stream:
        # The non-streaming implementation takes no extra keyword arguments
        return self._execute_workflow_agent_non_streaming(**shared_args)

    return self._execute_workflow_agent_streaming(**shared_args, **kwargs)
def _execute_workflow_agent_streaming(
    self,
    agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
    **kwargs: Any,
) -> Iterator[WorkflowRunOutputEvent]:
    """
    Run the workflow agent in streaming mode and relay its events.

    Workflow events produced while the agent runs are yielded as they arrive.
    Content events without a step name come from the workflow agent itself;
    they are tagged with ``is_workflow_agent = True`` and yielded too. The
    final ``RunOutput`` is captured but not yielded.

    If no ``WorkflowCompletedEvent`` was seen, the agent answered directly: a
    new completed run is stored in the session and a completed event is
    yielded for it. Otherwise the session is reloaded and the agent's run
    output is attached to its last run.

    Yields:
        WorkflowRunOutputEvent: Workflow events and tagged workflow-agent content events
    """
    from typing import get_args

    from agno.run.agent import RunContentEvent
    from agno.run.team import RunContentEvent as TeamRunContentEvent
    from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent

    # Attach the workflow tool, then build the per-run workflow context
    self._initialize_workflow_agent(session, execution_input, session_state, stream=stream)
    run_dependencies = self._get_workflow_agent_dependencies(session)

    final_agent_output: Optional[RunOutput] = None
    ran_workflow = False

    # Event classes that count as workflow events, resolved once up front
    workflow_event_types = tuple(get_args(WorkflowRunOutputEvent))
    content_event_types = (RunContentEvent, TeamRunContentEvent)

    log_debug(f"Executing workflow agent with streaming - input: {agent_input}...")

    agent_events = self.agent.run(  # type: ignore[union-attr]
        input=agent_input,
        stream=True,
        stream_intermediate_steps=True,
        yield_run_response=True,
        dependencies=run_dependencies,  # Context is passed per run
    )
    for item in agent_events:  # type: ignore
        if isinstance(item, workflow_event_types):
            yield item  # type: ignore[misc]
            # A completed event means the workflow tool actually ran
            if isinstance(item, WorkflowCompletedEvent):
                ran_workflow = True
        elif isinstance(item, content_event_types):
            if item.step_name is None:
                # No step name: emitted by the workflow agent itself, so tag it
                item.is_workflow_agent = True  # type: ignore
                yield item  # type: ignore[misc]

        # Keep the final RunOutput without yielding it
        if isinstance(item, RunOutput):
            final_agent_output = item

    if ran_workflow:
        # The tool executed the workflow: attach the agent run to the last stored run
        refreshed = self.get_session(session_id=session.session_id)
        if not (refreshed and refreshed.runs):
            log_warning("Could not reload session or no runs found after workflow execution")
            return

        latest_run = refreshed.runs[-1]
        latest_run.workflow_agent_run = final_agent_output

        # Link the agent run to its parent workflow run
        if final_agent_output:
            final_agent_output.parent_run_id = latest_run.run_id
            final_agent_output.workflow_id = latest_run.workflow_id

        self.save_session(session=refreshed)
        return

    # Direct answer: record a new completed run carrying the agent's reply
    direct_run = WorkflowRunOutput(
        run_id=str(uuid4()),
        input=execution_input.input,
        session_id=session.session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
        content=final_agent_output.content if final_agent_output else "",
        status=RunStatus.completed,
        workflow_agent_run=final_agent_output,
    )

    # Link the agent run to its parent workflow run
    if final_agent_output:
        final_agent_output.parent_run_id = direct_run.run_id
        final_agent_output.workflow_id = direct_run.workflow_id

    session.upsert_run(run=direct_run)
    self.save_session(session=session)

    log_debug(f"Agent decision: workflow_executed={ran_workflow}")

    # Report the direct answer as a completed workflow event
    yield WorkflowCompletedEvent(
        run_id=direct_run.run_id or "",
        content=direct_run.content,
        workflow_name=direct_run.workflow_name,
        workflow_id=direct_run.workflow_id,
        session_id=direct_run.session_id,
        step_results=[],
        metadata={"agent_direct_response": True},
    )
def _execute_workflow_agent_non_streaming(
    self,
    agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
) -> WorkflowRunOutput:
    """
    Run the workflow agent once, without streaming.

    The agent's messages are inspected for an assistant tool call named
    ``run_workflow``. If one is found, the session is reloaded and its last run
    is returned with the agent's run output attached. If none is found, the
    agent answered directly and a new completed run holding that answer is
    stored and returned.

    Returns:
        WorkflowRunOutput: The workflow run output with agent response, or an
        error placeholder when the session could not be reloaded
    """
    # Attach the workflow tool, then build the per-run workflow context
    self._initialize_workflow_agent(session, execution_input, session_state, stream=stream)
    run_dependencies = self._get_workflow_agent_dependencies(session)

    agent_response: RunOutput = self.agent.run(  # type: ignore[union-attr]
        input=agent_input,
        dependencies=run_dependencies,
        stream=stream,
    )  # type: ignore

    def _called_tool_name(call: Any) -> str:
        """Extract the function name from a tool call in dict or object form."""
        if isinstance(call, dict):
            return call.get("function", {}).get("name", "")
        return call.function.name if hasattr(call, "function") else ""

    # Did any assistant message call the run_workflow tool?
    ran_workflow = any(
        _called_tool_name(call) == "run_workflow"
        for msg in (agent_response.messages or [])
        if msg.role == "assistant" and msg.tool_calls
        for call in msg.tool_calls
    )

    log_debug(f"Workflow agent execution complete. Workflow executed: {ran_workflow}")

    if ran_workflow:
        # The tool executed the workflow: attach the agent run to the last stored run
        refreshed = self.get_session(session_id=session.session_id)
        if refreshed and refreshed.runs:
            latest_run = refreshed.runs[-1]
            latest_run.workflow_agent_run = agent_response

            # Link the agent run to its parent workflow run
            if agent_response:
                agent_response.parent_run_id = latest_run.run_id
                agent_response.workflow_id = latest_run.workflow_id

            self.save_session(session=refreshed)
            return latest_run

        log_warning("Could not reload session or no runs found after workflow execution")
        # Placeholder describing the failure
        return WorkflowRunOutput(
            run_id=str(uuid4()),
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            content="Error: Workflow execution failed",
            status=RunStatus.error,
        )

    # Direct answer: record a new completed run carrying the agent's reply
    direct_run = WorkflowRunOutput(
        run_id=str(uuid4()),
        input=execution_input.input,
        session_id=session.session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
        content=agent_response.content,
        status=RunStatus.completed,
        workflow_agent_run=agent_response,
    )

    # Link the agent run to its parent workflow run
    if agent_response:
        agent_response.parent_run_id = direct_run.run_id
        agent_response.workflow_id = direct_run.workflow_id

    session.upsert_run(run=direct_run)
    self.save_session(session=session)

    log_debug(f"Agent decision: workflow_executed={ran_workflow}")
    return direct_run
def _async_initialize_workflow_agent(
    self,
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    websocket_handler: Optional[WebSocketHandler] = None,
    stream: bool = False,
) -> None:
    """Give the workflow agent its single async workflow tool.

    The tool callable is created by the agent's ``async_create_workflow_tool``
    and wrapped in a ``Function``; it replaces any tools previously set on the
    agent. Workflow context is not set here; it is passed per run.
    """
    from agno.tools.function import Function

    tool_callable = self.agent.async_create_workflow_tool(  # type: ignore
        workflow=self,
        session=session,
        execution_input=execution_input,
        session_state=session_state,
        stream=stream,
        websocket_handler=websocket_handler,
    )
    self.agent.tools = [Function.from_callable(tool_callable)]  # type: ignore

    log_debug("Workflow agent initialized with async run_workflow tool")
async def _aload_session_for_workflow_agent(
    self,
    session_id: str,
    user_id: Optional[str],
    session_state: Optional[Dict[str, Any]],
) -> Tuple[WorkflowSession, Dict[str, Any]]:
    """Return the result of ``_aload_or_create_session`` for a workflow-agent run."""
    loaded = await self._aload_or_create_session(
        session_id=session_id,
        user_id=user_id,
        session_state=session_state,
    )
    return loaded
def _aexecute_workflow_agent(
    self,
    user_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session_id: str,
    user_id: Optional[str],
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
    websocket_handler: Optional[WebSocketHandler] = None,
    **kwargs: Any,
):
    """
    Build the awaitable or async iterator that runs the workflow agent.

    This method is synchronous: nothing runs until the returned object is
    awaited (stream=False) or iterated (stream=True). In both cases the session
    is loaded first and then handed to the matching implementation.

    Args:
        user_input: The user's input, passed on as the agent input
        session_id: The workflow session ID
        user_id: The user ID
        execution_input: The execution input
        session_state: The session state
        stream: Whether to stream the response
        websocket_handler: The WebSocket handler (streaming only)
        **kwargs: Forwarded to the streaming implementation only

    Returns:
        Coroutine[WorkflowRunOutput] if stream=False, AsyncIterator[WorkflowRunOutputEvent] if stream=True
    """

    async def _load():
        """Load or create the session and its state for this run."""
        return await self._aload_session_for_workflow_agent(session_id, user_id, session_state)

    if not stream:

        async def _run_once():
            loaded_session, loaded_state = await _load()
            return await self._aexecute_workflow_agent_non_streaming(
                agent_input=user_input,
                session=loaded_session,
                execution_input=execution_input,
                session_state=loaded_state,
                stream=stream,
            )

        return _run_once()

    async def _event_stream():
        loaded_session, loaded_state = await _load()
        agent_events = self._aexecute_workflow_agent_streaming(
            agent_input=user_input,
            session=loaded_session,
            execution_input=execution_input,
            session_state=loaded_state,
            stream=stream,
            websocket_handler=websocket_handler,
            **kwargs,
        )
        async for item in agent_events:
            yield item

    return _event_stream()
async def _aexecute_workflow_agent_streaming(
    self,
    agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
    websocket_handler: Optional[WebSocketHandler] = None,
    **kwargs: Any,
) -> AsyncIterator[WorkflowRunOutputEvent]:
    """
    Run the workflow agent in streaming mode and relay its events.

    The agent is (re)initialized with the workflow tool, then run with
    ``stream=True``. Events coming out of the agent run are handled as follows:

    - Workflow events (members of ``WorkflowRunOutputEvent``) are yielded as-is;
      a ``WorkflowCompletedEvent`` marks the workflow as having been executed.
      These are presumably produced by the workflow tool registered in
      ``_async_initialize_workflow_agent`` - confirm against that helper.
    - Agent/team content events are yielded too; those without a ``step_name``
      are additionally tagged ``is_workflow_agent = True`` and scheduled for
      broadcast on ``websocket_handler`` when one is given.
    - The final ``RunOutput`` is captured but never yielded.

    After the agent run finishes, the result is persisted: for a direct answer
    (no workflow execution) a new completed ``WorkflowRunOutput`` is stored and
    a ``WorkflowCompletedEvent`` is yielded; otherwise the session is reloaded
    and the agent's ``RunOutput`` is attached to its last run.

    Args:
        agent_input: Input passed to the agent.
        session: Workflow session the run belongs to.
        execution_input: Workflow execution input; its ``input`` is recorded on
            the direct-answer run.
        session_state: Session state handed to the agent initializer.
        stream: Forwarded to the agent initializer.
        websocket_handler: Optional handler that receives workflow-agent events.
        **kwargs: Accepted for signature compatibility; not used in this method.

    Yields:
        Workflow events, agent/team content events, and - for direct answers -
        a final WorkflowCompletedEvent.
    """
    from typing import get_args

    from agno.run.workflow import WorkflowCompletedEvent, WorkflowRunOutputEvent

    logger.info("Workflow agent enabled - async streaming mode")
    log_debug(f"User input: {agent_input}")

    # Register the run_workflow tool on the agent for this session/input.
    self._async_initialize_workflow_agent(
        session, execution_input, session_state, stream=stream, websocket_handler=websocket_handler
    )

    dependencies = self._get_workflow_agent_dependencies(session)

    # Filled in while consuming the agent stream below.
    agent_response: Optional[RunOutput] = None
    workflow_executed = False

    from agno.run.agent import RunContentEvent
    from agno.run.team import RunContentEvent as TeamRunContentEvent

    log_debug(f"Executing async workflow agent with streaming - input: {agent_input}...")

    # Run the agent in streaming mode and yield all events
    async for event in self.agent.arun(  # type: ignore[union-attr]
        input=agent_input,
        stream=True,
        stream_intermediate_steps=True,
        yield_run_response=True,
        dependencies=dependencies,  # Pass context dynamically per-run
    ):  # type: ignore
        # WorkflowRunOutputEvent is a Union; get_args() expands it to its member classes.
        if isinstance(event, tuple(get_args(WorkflowRunOutputEvent))):
            yield event  # type: ignore[misc]

            if isinstance(event, WorkflowCompletedEvent):
                workflow_executed = True
                log_debug("Workflow execution detected via WorkflowCompletedEvent")

        elif isinstance(event, (RunContentEvent, TeamRunContentEvent)):
            # NOTE(review): the enclosing elif only admits content events, so the
            # RunStartedEvent / RunCompletedEvent members of the tuple below look
            # unreachable here - confirm whether the elif was meant to be wider.
            if event.step_name is None and isinstance(event, (RunStartedEvent, RunContentEvent, RunCompletedEvent)):
                # This is from the workflow agent itself
                # Enrich with metadata to mark it as a workflow agent event
                event.is_workflow_agent = True  # type: ignore

                # Broadcast to WebSocket if available (async context only)
                if websocket_handler:
                    try:
                        loop = asyncio.get_running_loop()
                        if loop:
                            # Fire-and-forget: the task is not awaited and no
                            # reference to it is kept.
                            asyncio.create_task(websocket_handler.handle_event(event))
                    except RuntimeError:
                        # Raised by get_running_loop() when no loop is running;
                        # the broadcast is skipped in that case.
                        pass

            yield event  # type: ignore[misc]

        # Capture the final RunOutput (but don't yield it)
        if isinstance(event, RunOutput):
            agent_response = event
            log_debug(
                f"Agent response: {str(agent_response.content)[:100] if agent_response.content else 'None'}..."
            )

    # Handle direct answer case (no workflow execution)
    if not workflow_executed:
        # Create a new workflow run output for the direct answer
        run_id = str(uuid4())
        workflow_run_response = WorkflowRunOutput(
            run_id=run_id,
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            content=agent_response.content if agent_response else "",
            status=RunStatus.completed,
            workflow_agent_run=agent_response,
        )

        # Store the full agent RunOutput and establish parent-child relationship
        if agent_response:
            agent_response.parent_run_id = workflow_run_response.run_id
            agent_response.workflow_id = workflow_run_response.workflow_id

        # Update the run in session
        session.upsert_run(run=workflow_run_response)
        # Save session
        self.save_session(session=session)

        # Yield a workflow completed event with the agent's direct response
        completed_event = WorkflowCompletedEvent(
            run_id=workflow_run_response.run_id or "",
            content=workflow_run_response.content,
            workflow_name=workflow_run_response.workflow_name,
            workflow_id=workflow_run_response.workflow_id,
            session_id=workflow_run_response.session_id,
            step_results=[],
            metadata={"agent_direct_response": True},
        )
        yield completed_event
    else:
        # Workflow was executed by the tool
        # The session is re-read rather than reusing `session`; presumably the
        # tool persisted its run separately - verify against the tool factory.
        reloaded_session = self.get_session(session_id=session.session_id)

        if reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0:
            # Get the last run (which is the one just created by the tool)
            last_run = reloaded_session.runs[-1]

            # Update the last run with workflow_agent_run
            last_run.workflow_agent_run = agent_response

            # Store the full agent RunOutput and establish parent-child relationship
            if agent_response:
                agent_response.parent_run_id = last_run.run_id
                agent_response.workflow_id = last_run.workflow_id

            # Save the reloaded session (which has the updated run)
            self.save_session(session=reloaded_session)
        else:
            log_warning("Could not reload session or no runs found after workflow execution")
async def _aexecute_workflow_agent_non_streaming(
    self,
    agent_input: Union[str, Dict[str, Any], List[Any], BaseModel],
    session: WorkflowSession,
    execution_input: WorkflowExecutionInput,
    session_state: Optional[Dict[str, Any]],
    stream: bool = False,
) -> WorkflowRunOutput:
    """
    Run the workflow agent once (no streaming) and return the resulting run.

    The agent is initialized with the workflow tool and awaited. Its messages
    are then scanned for an assistant tool call named ``run_workflow``:

    - Not found: the agent answered directly. A new completed run carrying the
      agent's content is added to ``session``, the session is saved, and that
      run is returned.
    - Found: the session is reloaded, the agent's RunOutput is attached to the
      last run of the reloaded session, the session is saved, and that last run
      is returned. If no run can be found, an error-status placeholder run is
      returned instead.

    Args:
        agent_input: Input passed to the agent.
        session: Workflow session the run belongs to.
        execution_input: Workflow execution input; its ``input`` is recorded on
            runs created here.
        session_state: Session state handed to the agent initializer.
        stream: Forwarded to the agent initializer and to ``agent.arun``.

    Returns:
        WorkflowRunOutput: The workflow run output described above.
    """

    def _called_tool_name(call: Any) -> str:
        # Tool calls may be plain dicts or objects exposing `.function.name`.
        if isinstance(call, dict):
            return call.get("function", {}).get("name", "")
        return call.function.name if hasattr(call, "function") else ""

    # Prepare the agent and its per-run context, then run it.
    self._async_initialize_workflow_agent(session, execution_input, session_state, stream=stream)
    dependencies = self._get_workflow_agent_dependencies(session)

    agent_response: RunOutput = await self.agent.arun(  # type: ignore[union-attr]
        input=agent_input,
        dependencies=dependencies,
        stream=stream,
    )  # type: ignore

    # True as soon as any assistant message contains a run_workflow tool call.
    workflow_executed = any(
        _called_tool_name(call) == "run_workflow"
        for message in (agent_response.messages or [])
        if message.role == "assistant" and message.tool_calls
        for call in message.tool_calls
    )

    if not workflow_executed:
        # Direct answer: record it as its own completed workflow run.
        direct_run_id = str(uuid4())
        direct_run = WorkflowRunOutput(
            run_id=direct_run_id,
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            content=agent_response.content,
            status=RunStatus.completed,
            workflow_agent_run=agent_response,
        )

        # Link the agent run to the workflow run that wraps it.
        if agent_response:
            agent_response.parent_run_id = direct_run.run_id
            agent_response.workflow_id = direct_run.workflow_id

        session.upsert_run(run=direct_run)
        self.save_session(session=session)

        log_debug(f"Agent decision: workflow_executed={workflow_executed}")
        return direct_run

    # The tool ran the workflow: pick its run up from the stored session.
    banner = "=" * 80
    logger.info(banner)
    logger.info("WORKFLOW AGENT: Called run_workflow tool (async)")
    logger.info("   ➜ Workflow was executed, retrieving results...")
    logger.info(banner)

    log_debug("Reloading session from database to get the latest workflow run...")
    reloaded_session = self.get_session(session_id=session.session_id)

    has_runs = bool(reloaded_session and reloaded_session.runs and len(reloaded_session.runs) > 0)
    if not has_runs:
        log_warning("Could not reload session or no runs found after workflow execution")
        # Placeholder so callers still receive a WorkflowRunOutput.
        return WorkflowRunOutput(
            run_id=str(uuid4()),
            input=execution_input.input,
            session_id=session.session_id,
            workflow_id=self.id,
            workflow_name=self.name,
            created_at=int(datetime.now().timestamp()),
            content="Error: Workflow execution failed",
            status=RunStatus.error,
        )

    # The most recent run is taken to be the one the tool just created.
    latest_run = reloaded_session.runs[-1]
    log_debug(f"Retrieved latest workflow run: {latest_run.run_id}")
    log_debug(f"Total workflow runs in session: {len(reloaded_session.runs)}")

    latest_run.workflow_agent_run = agent_response
    if agent_response:
        agent_response.parent_run_id = latest_run.run_id
        agent_response.workflow_id = latest_run.workflow_id

    # Persist the reloaded session, which now holds the updated run.
    self.save_session(session=reloaded_session)

    log_debug(f"Agent decision: workflow_executed={workflow_executed}")
    return latest_run
def cancel_run(self, run_id: str) -> bool:
    """Request cancellation of a workflow run.

    Delegates to the module-level ``cancel_run_global`` helper.

    Args:
        run_id (str): ID of the run to cancel.

    Returns:
        bool: The helper's result - True if the run was found and marked for
        cancellation, False otherwise.
    """
    marked_for_cancellation = cancel_run_global(run_id)
    return marked_for_cancellation
@overload
def run(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Literal[False] = False,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    background: Optional[bool] = False,
) -> WorkflowRunOutput: ...

@overload
def run(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Literal[True] = True,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    background: Optional[bool] = False,
) -> Iterator[WorkflowRunOutputEvent]: ...

def run(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: bool = False,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    background: Optional[bool] = False,
    **kwargs: Any,
) -> Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]:
    """Run the workflow synchronously, optionally streaming its events.

    Args:
        input: Main input for the workflow; validated via ``_validate_input``.
        additional_data: Extra data attached to the execution input.
        user_id: User ID.
        session_id: Session ID.
        session_state: Initial session state; combined with the stored state
            through ``_initialize_session_state`` / ``_load_session_state``.
        audio: Audio input.
        images: Image input.
        videos: Video input.
        files: File input.
        stream: Stream the run. Enabled if either this or ``self.stream`` is truthy.
        stream_events: Stream intermediate events; forced to False when not streaming.
        stream_intermediate_steps: (deprecated) Alias considered together with
            ``stream_events``.
        background: Not supported here; any truthy value raises.
        **kwargs: Forwarded to the selected executor.

    Returns:
        The result of ``_execute_workflow_agent`` when ``self.agent`` is set;
        otherwise the result of ``_execute_stream`` (streaming) or ``_execute``.

    Raises:
        Exception: If the workflow is configured with an async DB.
        RuntimeError: If ``background`` is truthy.
    """
    if self._has_async_db():
        raise Exception("`run()` is not supported with an async DB. Please use `arun()`.")

    input = self._validate_input(input)
    if background:
        raise RuntimeError("Background execution is not supported for sync run()")

    self._set_debug()

    run_id = str(uuid4())

    self.initialize_workflow()
    session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

    # Fetch the stored session (or start a new one) and refresh its metadata.
    session = self.read_or_create_session(session_id=session_id, user_id=user_id)
    self._update_metadata(session=session)

    # Build the run's session state, then layer in what the DB already holds.
    session_state = self._initialize_session_state(
        session_state=session_state or {}, user_id=user_id, session_id=session_id, run_id=run_id
    )
    session_state = self._load_session_state(session=session, session_state=session_state)

    log_debug(f"Workflow Run Start: {self.name}", center=True)

    # Per-call flags win when truthy; otherwise fall back to the workflow's settings.
    stream = stream or self.stream or False
    requested_events = stream_events or stream_intermediate_steps
    stream_events = requested_events or self.stream_events or self.stream_intermediate_steps
    if stream is False:
        # Events cannot be streamed when streaming itself is off.
        stream_events = False

    log_debug(f"Stream: {stream}")
    log_debug(f"Total steps: {self._get_step_count()}")

    self._prepare_steps()

    execution_input = WorkflowExecutionInput(
        input=input,
        additional_data=additional_data,
        audio=audio,  # type: ignore
        images=images,  # type: ignore
        videos=videos,  # type: ignore
        files=files,  # type: ignore
    )
    state_keys = list(session_state.keys()) if session_state else "None"
    log_debug(f"Created pipeline input with session state keys: {state_keys}")

    self.update_agents_and_teams_session_info()

    # A configured workflow agent takes over the whole run.
    if self.agent is not None:
        return self._execute_workflow_agent(
            user_input=input,  # type: ignore
            session=session,
            execution_input=execution_input,
            session_state=session_state,
            stream=stream,
            **kwargs,
        )

    # Regular execution: start from an empty run record.
    workflow_run_response = WorkflowRunOutput(
        run_id=run_id,
        input=input,
        session_id=session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
    )

    if stream:
        return self._execute_stream(
            session=session,
            execution_input=execution_input,  # type: ignore[arg-type]
            workflow_run_response=workflow_run_response,
            stream_events=stream_events,
            session_state=session_state,
            **kwargs,
        )

    return self._execute(
        session=session,
        execution_input=execution_input,  # type: ignore[arg-type]
        workflow_run_response=workflow_run_response,
        session_state=session_state,
        **kwargs,
    )
@overload
async def arun(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Literal[False] = False,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    background: Optional[bool] = False,
    websocket: Optional[WebSocket] = None,
) -> WorkflowRunOutput: ...

@overload
def arun(
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Literal[True] = True,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    background: Optional[bool] = False,
    websocket: Optional[WebSocket] = None,
) -> AsyncIterator[WorkflowRunOutputEvent]: ...

def arun(  # type: ignore
    self,
    input: Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]] = None,
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    session_state: Optional[Dict[str, Any]] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: bool = False,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = False,
    background: Optional[bool] = False,
    websocket: Optional[WebSocket] = None,
    **kwargs: Any,
) -> Union[WorkflowRunOutput, AsyncIterator[WorkflowRunOutputEvent]]:
    """Run the workflow asynchronously, optionally streaming its events.

    This is a regular function that hands back whatever the selected async
    executor returns, so the caller awaits (non-streaming) or iterates
    (streaming) the result.

    Args:
        input: Main input for the workflow; validated via ``_validate_input``.
        additional_data: Extra data attached to the execution input.
        user_id: User ID.
        session_id: Session ID.
        session_state: Session state passed on to the executor.
        audio: Audio input.
        images: Image input.
        videos: Video input.
        files: File input.
        stream: Stream the run. Outside background mode it is enabled if either
            this or ``self.stream`` is truthy.
        stream_events: Stream intermediate events; forced to False when not streaming.
        stream_intermediate_steps: (deprecated) Alias considered together with
            ``stream_events``.
        background: Run in the background. With ``stream`` this requires
            ``websocket``; without ``stream`` the run is started via
            ``_arun_background``.
        websocket: Optional WebSocket; wrapped in a ``WebSocketHandler`` for
            background streaming and passed through to the regular executors.
        **kwargs: Forwarded to the selected executor.

    Returns:
        The object returned by the selected executor (``_arun_background_stream``,
        ``_arun_background``, ``_aexecute_workflow_agent``, ``_aexecute_stream``
        or ``_aexecute``).

    Raises:
        ValueError: If ``background`` and ``stream`` are set without a ``websocket``.
    """
    input = self._validate_input(input)

    websocket_handler = None
    if websocket:
        from agno.workflow.types import WebSocketHandler

        websocket_handler = WebSocketHandler(websocket=websocket)

    if background:
        if not stream:
            # Background + non-streaming: the run is started and polled for later.
            return self._arun_background(  # type: ignore
                input=input,
                additional_data=additional_data,
                user_id=user_id,
                session_id=session_id,
                session_state=session_state,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                **kwargs,
            )

        if not websocket:
            # Background + streaming has nowhere to deliver events without a socket.
            raise ValueError("Background streaming execution requires a WebSocket for real-time events")

        # Background + streaming + WebSocket: real-time events over the socket.
        # Both the current flag and its deprecated alias are honoured.
        stream_events = stream_events or stream_intermediate_steps or False
        return self._arun_background_stream(  # type: ignore
            input=input,
            additional_data=additional_data,
            user_id=user_id,
            session_id=session_id,
            session_state=session_state,
            audio=audio,
            images=images,
            videos=videos,
            files=files,
            stream_events=stream_events,
            websocket_handler=websocket_handler,
            **kwargs,
        )

    self._set_debug()

    run_id = str(uuid4())

    self.initialize_workflow()
    session_id, user_id = self._initialize_session(session_id=session_id, user_id=user_id)

    log_debug(f"Async Workflow Run Start: {self.name}", center=True)

    # Per-call flags win when truthy; otherwise fall back to the workflow's settings.
    stream = stream or self.stream or False
    requested_events = stream_events or stream_intermediate_steps
    stream_events = requested_events or self.stream_events or self.stream_intermediate_steps
    if stream is False:
        # Events cannot be streamed when streaming itself is off.
        stream_events = False

    log_debug(f"Stream: {stream}")

    self._prepare_steps()

    execution_input = WorkflowExecutionInput(
        input=input,
        additional_data=additional_data,
        audio=audio,  # type: ignore
        images=images,  # type: ignore
        videos=videos,  # type: ignore
        files=files,
    )
    state_keys = list(session_state.keys()) if session_state else "None"
    log_debug(f"Created async pipeline input with session state keys: {state_keys}")

    self.update_agents_and_teams_session_info()

    # A configured workflow agent takes over the whole run.
    # NOTE(review): websocket_handler is not forwarded on this path although
    # _aexecute_workflow_agent accepts one - confirm whether that is intended.
    if self.agent is not None:
        return self._aexecute_workflow_agent(  # type: ignore
            user_input=input,  # type: ignore
            session_id=session_id,
            user_id=user_id,
            execution_input=execution_input,
            session_state=session_state,
            stream=stream,
            **kwargs,
        )

    # Regular execution: start from an empty run record.
    workflow_run_response = WorkflowRunOutput(
        run_id=run_id,
        input=input,
        session_id=session_id,
        workflow_id=self.id,
        workflow_name=self.name,
        created_at=int(datetime.now().timestamp()),
    )

    if stream:
        return self._aexecute_stream(  # type: ignore
            execution_input=execution_input,
            workflow_run_response=workflow_run_response,
            session_id=session_id,
            user_id=user_id,
            stream_events=stream_events,
            websocket=websocket,
            files=files,
            session_state=session_state,
            **kwargs,
        )

    return self._aexecute(  # type: ignore
        execution_input=execution_input,
        workflow_run_response=workflow_run_response,
        session_id=session_id,
        user_id=user_id,
        websocket=websocket,
        files=files,
        session_state=session_state,
        **kwargs,
    )
def _prepare_steps(self):
    """Normalize ``self.steps`` into a list of step primitives.

    Does nothing when ``self.steps`` is ``None`` or is itself callable.
    Otherwise each entry is converted in place:

    - a named callable becomes a ``Step`` with that callable as executor,
    - an ``Agent`` or ``Team`` becomes a ``Step`` wrapping it,
    - ``Step``/``Steps``/``Loop``/``Parallel``/``Condition``/``Router``
      instances are kept as they are.

    Raises:
        ValueError: If an entry is none of the supported kinds.
    """
    if callable(self.steps) or self.steps is None:
        return

    normalized: List[Union[Step, Steps, Loop, Parallel, Condition, Router]] = []

    # `position` is 1-based; it is used in log lines and fallback step names.
    for position, entry in enumerate(self.steps, start=1):  # type: ignore
        if callable(entry) and hasattr(entry, "__name__"):
            func_name = entry.__name__
            log_debug(f"Step {position}: Wrapping callable function '{func_name}'")
            normalized.append(Step(name=func_name, description="User-defined callable step", executor=entry))  # type: ignore
            continue

        if isinstance(entry, Agent):
            agent_name = entry.name or f"step_{position}"
            log_debug(f"Step {position}: Agent '{agent_name}'")
            normalized.append(Step(name=agent_name, description=entry.description, agent=entry))
            continue

        if isinstance(entry, Team):
            team_name = entry.name or f"step_{position}"
            log_debug(f"Step {position}: Team '{team_name}' with {len(entry.members)} members")
            normalized.append(Step(name=team_name, description=entry.description, team=entry))
            continue

        if isinstance(entry, (Step, Steps, Loop, Parallel, Condition, Router)):
            kind = type(entry).__name__
            label = getattr(entry, "name", f"unnamed_{kind.lower()}")
            log_debug(f"Step {position}: {kind} '{label}'")
            normalized.append(entry)
            continue

        raise ValueError(f"Invalid step type: {type(entry).__name__}")

    self.steps = normalized  # type: ignore
    log_debug("Step preparation completed")
def print_response(
    self,
    input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]],
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Optional[bool] = None,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    markdown: bool = True,
    show_time: bool = True,
    show_step_details: bool = True,
    console: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """Run the workflow and print the result with rich formatting.

    Dispatches to the module-level ``print_response_stream`` helper when
    streaming is enabled and to ``print_response`` otherwise.

    Args:
        input: The main query/input for the workflow
        additional_data: Attached message data to the input
        user_id: User ID
        session_id: Session ID
        audio: Audio input
        images: Image input
        videos: Video input
        files: File input
        stream: Whether to stream the response content. If None, uses ``self.stream``.
        stream_events: Whether to stream intermediate steps
        stream_intermediate_steps: (deprecated) Whether to stream intermediate
            step outputs. If None, uses workflow default.
        markdown: Whether to render content as markdown
        show_time: Whether to show execution time
        show_step_details: Whether to show individual step outputs
        console: Rich console instance (optional)
        **kwargs: Forwarded to the selected print helper.

    Raises:
        Exception: If the workflow is configured with an async DB.
    """
    if self._has_async_db():
        raise Exception("`print_response()` is not supported with an async DB. Please use `aprint_response()`.")

    if stream is None:
        stream = self.stream or False

    # Honour both the current flag and its deprecated alias.
    stream_events = stream_events or stream_intermediate_steps

    # Events cannot be streamed when streaming itself is off.
    if stream is False:
        stream_events = False

    # Nothing requested per call: fall back to the workflow-level settings.
    if stream_events is None:
        if self.stream_events is None and self.stream_intermediate_steps is None:
            stream_events = False
        else:
            stream_events = self.stream_intermediate_steps or self.stream_events

    # Arguments common to both print helpers.
    render_args = dict(
        workflow=self,
        input=input,
        user_id=user_id,
        session_id=session_id,
        additional_data=additional_data,
        audio=audio,
        images=images,
        videos=videos,
        files=files,
        markdown=markdown,
        show_time=show_time,
        show_step_details=show_step_details,
        console=console,
    )

    if stream:
        print_response_stream(stream_events=stream_events, **render_args, **kwargs)
    else:
        print_response(**render_args, **kwargs)
async def aprint_response(
    self,
    input: Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]],
    additional_data: Optional[Dict[str, Any]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    audio: Optional[List[Audio]] = None,
    images: Optional[List[Image]] = None,
    videos: Optional[List[Video]] = None,
    files: Optional[List[File]] = None,
    stream: Optional[bool] = None,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    markdown: bool = True,
    show_time: bool = True,
    show_step_details: bool = True,
    console: Optional[Any] = None,
    **kwargs: Any,
) -> None:
    """Run the workflow asynchronously and print the result with rich formatting.

    Awaits the module-level ``aprint_response_stream`` helper when streaming is
    enabled and ``aprint_response`` otherwise.

    Args:
        input: The main message/input for the workflow
        additional_data: Attached message data to the input
        user_id: User ID
        session_id: Session ID
        audio: Audio input
        images: Image input
        videos: Video input
        files: File input
        stream: Whether to stream the response content. If None, uses ``self.stream``.
        stream_events: Whether to stream intermediate steps
        stream_intermediate_steps: (deprecated) Whether to stream intermediate
            step outputs. If None, uses workflow default.
        markdown: Whether to render content as markdown
        show_time: Whether to show execution time
        show_step_details: Whether to show individual step outputs
        console: Rich console instance (optional)
        **kwargs: Forwarded to the selected print helper.
    """
    if stream is None:
        stream = self.stream or False

    # Honour both the current flag and its deprecated alias.
    stream_events = stream_events or stream_intermediate_steps

    # Events cannot be streamed when streaming itself is off.
    if stream is False:
        stream_events = False

    # Nothing requested per call: fall back to the workflow-level settings.
    if stream_events is None:
        if self.stream_events is None and self.stream_intermediate_steps is None:
            stream_events = False
        else:
            stream_events = self.stream_intermediate_steps or self.stream_events

    # Arguments common to both print helpers.
    render_args = dict(
        workflow=self,
        input=input,
        additional_data=additional_data,
        user_id=user_id,
        session_id=session_id,
        audio=audio,
        images=images,
        videos=videos,
        files=files,
        markdown=markdown,
        show_time=show_time,
        show_step_details=show_step_details,
        console=console,
    )

    if stream:
        await aprint_response_stream(stream_events=stream_events, **render_args, **kwargs)
    else:
        await aprint_response(**render_args, **kwargs)
def to_dict(self) -> Dict[str, Any]:
    """Build a dictionary description of the workflow and its steps.

    Returns:
        A dict with the keys ``name``, ``workflow_id``, ``description``,
        ``steps`` and ``session_id``. ``steps`` is empty when ``self.steps``
        is ``None`` or a callable; otherwise it holds one entry per step
        (nested for Router/Loop/Condition/Steps/Parallel). Agent and team
        entries reference the live objects; they are not converted further.
    """

    def serialize_step(step):
        # Bare callables (not wrapped in a Step) are described by their function name.
        if callable(step) and hasattr(step, "__name__"):
            return {
                "name": step.__name__,
                "description": "User-defined callable step",
                "type": StepType.STEP.value,
            }

        # Agents and teams given directly are reported as plain steps.
        if isinstance(step, Agent):
            return {
                "name": step.name or "unnamed_agent",
                "description": step.description or "Agent step",
                "type": StepType.STEP.value,
                "agent": step,
            }

        if isinstance(step, Team):
            return {
                "name": step.name or "unnamed_team",
                "description": step.description or "Team step",
                "type": StepType.STEP.value,
                "team": step,
            }

        step_dict = {
            "name": step.name if hasattr(step, "name") else f"unnamed_{type(step).__name__.lower()}",
            "description": step.description if hasattr(step, "description") else "User-defined callable step",
            "type": STEP_TYPE_MAPPING[type(step)].value,  # type: ignore
        }

        # Only primitives that carry an executor expose these attributes.
        if hasattr(step, "agent"):
            step_dict["agent"] = step.agent  # type: ignore
        if hasattr(step, "team"):
            step_dict["team"] = step.team  # type: ignore

        # Recurse into containers: a Router nests its `choices`, the others their `steps`.
        if isinstance(step, Router):
            step_dict["steps"] = (
                [serialize_step(choice) for choice in step.choices] if hasattr(step, "choices") else None
            )
        elif isinstance(step, (Loop, Condition, Steps, Parallel)):
            step_dict["steps"] = [serialize_step(child) for child in step.steps] if hasattr(step, "steps") else None

        return step_dict

    if self.steps is None or callable(self.steps):
        steps_list = []
    elif isinstance(self.steps, Steps):
        steps_list = self.steps.steps
    else:
        steps_list = self.steps

    return {
        "name": self.name,
        "workflow_id": self.id,
        "description": self.description,
        "steps": [serialize_step(s) for s in steps_list],
        "session_id": self.session_id,
    }
def _calculate_session_metrics_from_workflow_metrics(self, workflow_metrics: WorkflowMetrics) -> Metrics:
    """Sum the per-step metrics of a workflow run into a single ``Metrics``.

    Steps whose ``metrics`` attribute is falsy are skipped. The aggregate's
    ``time_to_first_token`` is cleared before it is returned.
    """
    total = Metrics()

    for per_step in workflow_metrics.steps.values():
        if not per_step.metrics:
            continue
        total += per_step.metrics

    total.time_to_first_token = None
    return total
def _get_session_metrics(self, session: WorkflowSession) -> Metrics:
    """Read the metrics stored under ``session_data["session_metrics"]``.

    Returns:
        The stored value as a ``Metrics`` (built from it when it is a dict,
        returned as-is when it already is a ``Metrics``); a fresh ``Metrics()``
        when nothing usable is stored.
    """
    stored = (session.session_data or {}).get("session_metrics")

    if isinstance(stored, dict):
        return Metrics(**stored)
    if isinstance(stored, Metrics):
        return stored

    # Missing, None, or an unexpected type: start from empty metrics.
    return Metrics()
def _update_session_metrics(self, session: WorkflowSession, workflow_run_response: WorkflowRunOutput):
    """Fold a run's metrics into the session metrics and store the result.

    The updated metrics are written to ``session.session_data["session_metrics"]``
    as a dict (via ``Metrics.to_dict``) so they can be JSON-serialized.
    """
    accumulated = self._get_session_metrics(session=session)

    # Only runs that actually carry metrics contribute.
    run_metrics = workflow_run_response.metrics
    if run_metrics:
        accumulated += self._calculate_session_metrics_from_workflow_metrics(run_metrics)

    accumulated.time_to_first_token = None

    if not session.session_data:
        session.session_data = {}
    session.session_data["session_metrics"] = accumulated.to_dict()
async def aget_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]:
    """Return the stored metrics of a session (async).

    Args:
        session_id: Session to look up; falls back to ``self.session_id``.

    Raises:
        Exception: If no session ID is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is required")

    found = await self.aget_session(session_id=resolved_id)  # type: ignore
    if found is None:
        raise Exception("Session not found")

    return self._get_session_metrics(session=found)
def get_session_metrics(self, session_id: Optional[str] = None) -> Optional[Metrics]:
    """Return the stored metrics of a session.

    Args:
        session_id: Session to look up; falls back to ``self.session_id``.

    Raises:
        Exception: If no session ID is available or the session is not found.
    """
    resolved_id = session_id or self.session_id
    if resolved_id is None:
        raise Exception("Session ID is required")

    found = self.get_session(session_id=resolved_id)
    if found is None:
        raise Exception("Session not found")

    return self._get_session_metrics(session=found)
def update_agents_and_teams_session_info(self):
    """Stamp this workflow's ID onto the executors of its top-level steps.

    For every top-level ``Step``, the active executor's ``workflow_id`` is set
    to ``self.id`` when it has such an attribute; when the executor has
    ``members``, each member that has ``workflow_id`` is updated too. Nothing
    is done when ``self.steps`` is empty or callable.
    """
    log_debug("Updating agents and teams with session information")

    # Only an iterable collection of steps can be walked.
    if not self.steps or callable(self.steps):
        return

    top_level = self.steps.steps if isinstance(self.steps, Steps) else self.steps
    for candidate in top_level:
        # TODO: Handle properly steps inside other primitives
        if not isinstance(candidate, Step):
            continue

        executor = candidate.active_executor
        if hasattr(executor, "workflow_id"):
            executor.workflow_id = self.id

        # Executors exposing `members` (teams) get every member updated as well.
        if not hasattr(executor, "members"):
            continue
        for member in executor.members:  # type: ignore
            if hasattr(member, "workflow_id"):
                member.workflow_id = self.id
###########################################################################
# Telemetry functions
###########################################################################
def _get_telemetry_data(self) -> Dict[str, Any]:
    """Collect the telemetry payload for this workflow.

    Returns:
        A dict with:
        - ``workflow_id``: ``self.id``
        - ``db_type``: class name of ``self.db``, or None when no DB is set
        - ``has_input_schema``: whether ``self.input_schema`` is set
    """
    db = self.db
    return {
        "workflow_id": self.id,
        # Identity check, not truthiness: a configured DB object that happens
        # to evaluate falsy must still be reported by its class name.
        "db_type": db.__class__.__name__ if db is not None else None,
        "has_input_schema": self.input_schema is not None,
    }
def _log_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None:
    """Report a created workflow run to the telemetry API (best effort).

    Does nothing when telemetry is disabled after ``_set_telemetry()``. Any
    exception raised while building or sending the event is logged at debug
    level and suppressed.

    Args:
        session_id: Session the run belongs to.
        run_id: ID of the run, if known.
    """
    self._set_telemetry()
    if not self.telemetry:
        return

    from agno.api.workflow import WorkflowRunCreate, create_workflow_run

    try:
        payload = WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data())
        create_workflow_run(workflow=payload)
    except Exception as e:
        # Telemetry must never break a workflow run.
        log_debug(f"Could not create Workflow run telemetry event: {e}")
async def _alog_workflow_telemetry(self, session_id: str, run_id: Optional[str] = None) -> None:
    """Report a created workflow run to the telemetry API (async, best effort).

    Does nothing when telemetry is disabled after ``_set_telemetry()``. Any
    exception raised while building or sending the event is logged at debug
    level and suppressed.

    Args:
        session_id: Session the run belongs to.
        run_id: ID of the run, if known.
    """
    self._set_telemetry()
    if not self.telemetry:
        return

    from agno.api.workflow import WorkflowRunCreate, acreate_workflow_run

    try:
        payload = WorkflowRunCreate(session_id=session_id, run_id=run_id, data=self._get_telemetry_data())
        await acreate_workflow_run(workflow=payload)
    except Exception as e:
        # Telemetry must never break a workflow run.
        log_debug(f"Could not create Workflow run telemetry event: {e}")
def cli_app(
    self,
    input: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
    user: str = "User",
    emoji: str = ":technologist:",
    stream: Optional[bool] = None,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    markdown: bool = True,
    show_time: bool = True,
    show_step_details: bool = True,
    exit_on: Optional[List[str]] = None,
    **kwargs: Any,
) -> None:
    """
    Chat with the workflow from the terminal through a prompt loop.

    When ``input`` is truthy it is first run once through ``print_response``. After that the
    method keeps asking for a message with ``rich.prompt.Prompt`` and runs every message
    through ``print_response`` until the entered text is one of the exit commands.

    Arguments:
        input: Optional first message, processed before the interactive loop starts.
        session_id: Optional session identifier, forwarded to every print_response call.
        user_id: Optional user identifier, forwarded to every print_response call.
        user: Display name for the user in the CLI prompt. Defaults to "User".
        emoji: Emoji displayed next to the user name in the prompt. Defaults to ":technologist:".
        stream: Whether to stream the workflow response. Forwarded unchanged to print_response.
        stream_events: Whether to stream events from the workflow. Combined with
            stream_intermediate_steps using ``or``; if neither is truthy, False is forwarded.
        markdown: Whether to render output as markdown. Defaults to True.
        show_time: Whether to display timestamps in the output. Defaults to True.
        show_step_details: Whether to show detailed step information. Defaults to True.
        exit_on: Commands that end the loop (exact match against the entered text).
            Defaults to ["exit", "quit", "bye", "stop"] when None or empty.
        (deprecated) stream_intermediate_steps: Older name for stream_events; see stream_events.
        **kwargs: Additional keyword arguments passed to the workflow's print_response method.

    Returns:
        None: This method runs interactively and does not return a value.
    """
    from rich.prompt import Prompt

    # Considering both stream_events and stream_intermediate_steps (deprecated)
    stream_events = stream_events or stream_intermediate_steps or False

    # Options that are identical for the initial input and for every prompted message
    shared_options = {
        "stream": stream,
        "stream_events": stream_events,
        "markdown": markdown,
        "show_time": show_time,
        "show_step_details": show_step_details,
        "user_id": user_id,
        "session_id": session_id,
    }

    def _respond(text: str) -> None:
        """Run one message through the workflow and print the response."""
        self.print_response(input=text, **shared_options, **kwargs)

    if input:
        _respond(input)

    exit_commands = exit_on or ["exit", "quit", "bye", "stop"]
    while True:
        message = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
        if message in exit_commands:
            return
        _respond(message)
async def acli_app(
    self,
    input: Optional[str] = None,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
    user: str = "User",
    emoji: str = ":technologist:",
    stream: Optional[bool] = None,
    stream_events: Optional[bool] = None,
    stream_intermediate_steps: Optional[bool] = None,
    markdown: bool = True,
    show_time: bool = True,
    show_step_details: bool = True,
    exit_on: Optional[List[str]] = None,
    **kwargs: Any,
) -> None:
    """
    Chat with the workflow from the terminal through a prompt loop (async variant).

    When ``input`` is truthy it is first awaited once through ``aprint_response``. After that
    the method keeps asking for a message with ``rich.prompt.Prompt`` and awaits
    ``aprint_response`` for every message until the entered text is one of the exit commands.
    The prompt itself is a regular (non-awaited) call.

    Arguments:
        input: Optional first message, processed before the interactive loop starts.
        session_id: Optional session identifier, forwarded to every aprint_response call.
        user_id: Optional user identifier, forwarded to every aprint_response call.
        user: Display name for the user in the CLI prompt. Defaults to "User".
        emoji: Emoji displayed next to the user name in the prompt. Defaults to ":technologist:".
        stream: Whether to stream the workflow response. Forwarded unchanged to aprint_response.
        stream_events: Whether to stream events from the workflow. Combined with
            stream_intermediate_steps using ``or``; if neither is truthy, False is forwarded.
        markdown: Whether to render output as markdown. Defaults to True.
        show_time: Whether to display timestamps in the output. Defaults to True.
        show_step_details: Whether to show detailed step information. Defaults to True.
        exit_on: Commands that end the loop (exact match against the entered text).
            Defaults to ["exit", "quit", "bye", "stop"] when None or empty.
        (deprecated) stream_intermediate_steps: Older name for stream_events; see stream_events.
        **kwargs: Additional keyword arguments passed to the workflow's aprint_response method.

    Returns:
        None: This method runs interactively and does not return a value.
    """
    from rich.prompt import Prompt

    # Considering both stream_events and stream_intermediate_steps (deprecated)
    stream_events = stream_events or stream_intermediate_steps or False

    # Options that are identical for the initial input and for every prompted message
    shared_options = {
        "stream": stream,
        "stream_events": stream_events,
        "markdown": markdown,
        "show_time": show_time,
        "show_step_details": show_step_details,
        "user_id": user_id,
        "session_id": session_id,
    }

    async def _respond(text: str) -> None:
        """Run one message through the workflow and print the response."""
        await self.aprint_response(input=text, **shared_options, **kwargs)

    if input:
        await _respond(input)

    exit_commands = exit_on or ["exit", "quit", "bye", "stop"]
    while True:
        message = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
        if message in exit_commands:
            return
        await _respond(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment