ensure flow logs are not passed if its on executor

This commit is contained in:
lorenzejay
2025-11-29 14:04:33 -08:00
parent 1e324adab8
commit d25ab2d887
3 changed files with 118 additions and 233 deletions

View File

@@ -21,7 +21,9 @@ from typing_extensions import Self
from crewai.a2a.config import A2AConfig from crewai.a2a.config import A2AConfig
from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache.cache_handler import CacheHandler from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow
# from crewai.agents.crew_agent_executor import CrewAgentExecutor
# from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.events.event_bus import crewai_event_bus from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.knowledge_events import ( from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent, KnowledgeQueryCompletedEvent,
@@ -213,6 +215,10 @@ class Agent(BaseAgent):
default=None, default=None,
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.", description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
) )
# agent_executor_class: CrewAgentExecutorFlow | CrewAgentExecutor = Field(
# default=CrewAgentExecutor,
# description="Class to use for the agent executor.",
# )
@model_validator(mode="before") @model_validator(mode="before")
def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: # noqa: N805 def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: # noqa: N805
@@ -449,8 +455,8 @@ class Agent(BaseAgent):
) )
tools = tools or self.tools or [] tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
self.create_agent_executor(tools=tools, task=task)
if self.crew and self.crew._train: if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt) task_prompt = self._training_handler(task_prompt=task_prompt)
else: else:
@@ -646,6 +652,8 @@ class Agent(BaseAgent):
rpm_limit_fn=rpm_limit_fn, rpm_limit_fn=rpm_limit_fn,
) )
else: else:
from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow
self.agent_executor = CrewAgentExecutorFlow( self.agent_executor = CrewAgentExecutorFlow(
llm=self.llm, llm=self.llm,
task=task, task=task,

View File

@@ -103,9 +103,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
callbacks: list[Any] | None = None, callbacks: list[Any] | None = None,
response_model: type[BaseModel] | None = None, response_model: type[BaseModel] | None = None,
) -> None: ) -> None:
"""Initialize with same signature as CrewAgentExecutor. """Initialize the flow-based agent executor.
Reference: crew_agent_executor.py lines 70-150
Args: Args:
llm: Language model instance. llm: Language model instance.
@@ -186,13 +184,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
""" """
if not self._flow_initialized: if not self._flow_initialized:
# Now call Flow's __init__ which will replace self._state # Now call Flow's __init__ which will replace self._state
# with Flow's managed state # with Flow's managed state. Suppress flow events since this is
super().__init__() # an agent executor, not a user-facing flow.
self._flow_initialized = True super().__init__(
self._printer.print( suppress_flow_events=True,
content=f"🌊 Flow initialized for instance: {self._instance_id}",
color="blue",
) )
self._flow_initialized = True
@property @property
def use_stop_words(self) -> bool: def use_stop_words(self) -> bool:
@@ -224,25 +221,13 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@start() @start()
def initialize_reasoning(self) -> str: def initialize_reasoning(self) -> str:
"""Initialize flow state and messages. """Initialize the reasoning flow and emit agent start logs."""
Maps to: Initial prompt formatting in current executor's invoke() method
Reference: crew_agent_executor.py lines 170-181
Flow Event: START -> triggers check_max_iterations_router
"""
self._show_start_logs() self._show_start_logs()
return "initialized" return "initialized"
@listen("force_final_answer") @listen("force_final_answer")
def force_final_answer(self) -> str: def force_final_answer(self) -> str:
"""Force agent to provide final answer when max iterations exceeded. """Force agent to provide final answer when max iterations exceeded."""
Maps to: handle_max_iterations_exceeded at lines 217-224
Reference: crew_agent_executor.py lines 217-224
Flow Event: "force_final_answer" -> "agent_finished"
"""
formatted_answer = handle_max_iterations_exceeded( formatted_answer = handle_max_iterations_exceeded(
formatted_answer=None, formatted_answer=None,
printer=self._printer, printer=self._printer,
@@ -259,17 +244,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("continue_reasoning") @listen("continue_reasoning")
def call_llm_and_parse(self) -> str: def call_llm_and_parse(self) -> str:
"""Execute LLM call with hooks and parse response. """Execute LLM call with hooks and parse the response.
Maps to: Lines 227-239 in _invoke_loop Returns routing decision based on parsing result.
Reference: crew_agent_executor.py lines 227-239
Flow Event: "continue_reasoning" -> "parsed" | "parser_error" | "context_error"
Steps:
1. enforce_rpm_limit
2. get_llm_response (with before/after hooks already integrated)
3. process_llm_response
""" """
self._printer.print( self._printer.print(
content=f"🤖 call_llm_and_parse: About to call LLM (iteration {self.state.iterations})", content=f"🤖 call_llm_and_parse: About to call LLM (iteration {self.state.iterations})",
@@ -278,10 +255,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
try: try:
enforce_rpm_limit(self.request_within_rpm_limit) enforce_rpm_limit(self.request_within_rpm_limit)
# Note: Hooks are already integrated in get_llm_response utility
answer = get_llm_response( answer = get_llm_response(
llm=self.llm, llm=self.llm,
messages=list(self.state.messages), # Pass copy of state messages messages=list(self.state.messages),
callbacks=self.callbacks, callbacks=self.callbacks,
printer=self._printer, printer=self._printer,
from_task=self.task, from_task=self.task,
@@ -290,7 +266,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
executor_context=self, executor_context=self,
) )
# Parse response (line 239) # Parse the LLM response
formatted_answer = process_llm_response(answer, self.use_stop_words) formatted_answer = process_llm_response(answer, self.use_stop_words)
self.state.current_answer = formatted_answer self.state.current_answer = formatted_answer
@@ -316,7 +292,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
if is_context_length_exceeded(e): if is_context_length_exceeded(e):
self._last_context_error = e self._last_context_error = e
return "context_error" return "context_error"
# Re-raise other exceptions (including litellm errors)
if e.__class__.__module__.startswith("litellm"): if e.__class__.__module__.startswith("litellm"):
raise e raise e
handle_unknown_error(self._printer, e) handle_unknown_error(self._printer, e)
@@ -324,13 +299,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(call_llm_and_parse) @router(call_llm_and_parse)
def route_by_answer_type(self) -> str: def route_by_answer_type(self) -> str:
"""Route based on whether answer is AgentAction or AgentFinish. """Route based on whether answer is AgentAction or AgentFinish."""
Maps to: isinstance check at line 241
Reference: crew_agent_executor.py line 241
Flow Event: call_llm_and_parse completes -> ROUTE -> "execute_tool" | "agent_finished"
"""
answer_type = type(self.state.current_answer).__name__ answer_type = type(self.state.current_answer).__name__
self._printer.print( self._printer.print(
content=f"🚦 route_by_answer_type: Got {answer_type}", content=f"🚦 route_by_answer_type: Got {answer_type}",
@@ -342,23 +311,11 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("execute_tool") @listen("execute_tool")
def execute_tool_action(self) -> str: def execute_tool_action(self) -> str:
"""Execute tool and handle result. """Execute the tool action and handle the result."""
Maps to: Lines 242-273 in _invoke_loop
Reference: crew_agent_executor.py lines 242-273
Flow Event: "execute_tool" -> completion triggers router
Steps:
1. Extract fingerprint context
2. Execute tool via execute_tool_and_check_finality
3. Handle agent action (append observation)
4. Invoke step callback
"""
try: try:
action = cast(AgentAction, self.state.current_answer) action = cast(AgentAction, self.state.current_answer)
# Extract fingerprint context (lines 243-253) # Extract fingerprint context for tool execution
fingerprint_context = {} fingerprint_context = {}
if ( if (
self.agent self.agent
@@ -369,7 +326,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"agent_fingerprint": str(self.agent.security_config.fingerprint) "agent_fingerprint": str(self.agent.security_config.fingerprint)
} }
# Execute tool (lines 255-267) # Execute the tool
tool_result = execute_tool_and_check_finality( tool_result = execute_tool_and_check_finality(
agent_action=action, agent_action=action,
fingerprint_context=fingerprint_context, fingerprint_context=fingerprint_context,
@@ -384,14 +341,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
crew=self.crew, crew=self.crew,
) )
# Handle agent action - appends observation to messages (lines 268-270) # Handle agent action and append observation to messages
result = self._handle_agent_action(action, tool_result) result = self._handle_agent_action(action, tool_result)
self.state.current_answer = result self.state.current_answer = result
# Invoke step callback (line 272) # Invoke step callback if configured
self._invoke_step_callback(result) self._invoke_step_callback(result)
# Append message to state (line 273) # Append result message to conversation state
if hasattr(result, "text"): if hasattr(result, "text"):
self._append_message_to_state(result.text) self._append_message_to_state(result.text)
@@ -408,26 +365,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("initialized") @listen("initialized")
def continue_iteration(self) -> str: def continue_iteration(self) -> str:
"""Bridge listener that catches 'initialized' event from increment_and_continue. """Bridge listener that connects iteration loop back to iteration check."""
This is needed because @router listens to METHODS, not EVENT STRINGS.
increment_and_continue returns 'initialized' STRING which triggers this listener.
"""
return "check_iteration" return "check_iteration"
@router(or_(initialize_reasoning, continue_iteration)) @router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(self) -> str: def check_max_iterations(self) -> str:
"""Check if max iterations reached before LLM call. """Check if max iterations reached before proceeding with reasoning."""
Maps to: has_reached_max_iterations check at line 216
Reference: crew_agent_executor.py lines 216-225
Triggered by:
- initialize_reasoning METHOD (first iteration)
- continue_iteration METHOD (subsequent iterations after tool execution)
Flow Event: ROUTE -> "force_final_answer" | "continue_reasoning"
"""
self._printer.print( self._printer.print(
content=f"🔄 check_max_iterations: iteration {self.state.iterations}/{self.max_iter}", content=f"🔄 check_max_iterations: iteration {self.state.iterations}/{self.max_iter}",
color="cyan", color="cyan",
@@ -438,35 +381,17 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(execute_tool_action) @router(execute_tool_action)
def increment_and_continue(self) -> str: def increment_and_continue(self) -> str:
"""Increment iteration counter and loop back. """Increment iteration counter and loop back for next iteration."""
Maps to: Loop continuation (iteration increment at line 301)
Reference: crew_agent_executor.py line 301 (finally block)
Flow Event: execute_tool_action completes -> ROUTER returns "loop_continue"
"""
# Increment iterations (line 301)
self.state.iterations += 1 self.state.iterations += 1
self._printer.print( self._printer.print(
content=f"+ increment_and_continue: Incremented to iteration {self.state.iterations}, looping back", content=f"+ increment_and_continue: Incremented to iteration {self.state.iterations}, looping back",
color="magenta", color="magenta",
) )
# Return "initialized" to trigger check_max_iterations router again (simple loop)
return "initialized" return "initialized"
@listen(or_("agent_finished", "tool_result_is_final")) @listen(or_("agent_finished", "tool_result_is_final"))
def finalize(self) -> str: def finalize(self) -> str:
"""Finalize execution and return result. """Finalize execution and emit completion logs."""
Maps to: Final steps after loop (lines 307-313)
Reference: crew_agent_executor.py lines 307-313
Triggered by:
- "agent_finished" (Router returns this when LLM gives Final Answer)
- "tool_result_is_final" (Tool execution returns this when tool has result_as_answer=True)
Flow Event: -> "completed" (END)
"""
if self.state.current_answer is None: if self.state.current_answer is None:
self._printer.print( self._printer.print(
content="⚠️ Finalize called but no answer in state - skipping", content="⚠️ Finalize called but no answer in state - skipping",
@@ -489,13 +414,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("parser_error") @listen("parser_error")
def recover_from_parser_error(self) -> str: def recover_from_parser_error(self) -> str:
"""Recover from output parser errors. """Recover from output parser errors and retry."""
Maps to: OutputParserError handling in _invoke_loop
Reference: crew_agent_executor.py lines 275-282
Flow Event: "parser_error" -> "initialized" (loops back via event)
"""
formatted_answer = handle_output_parser_exception( formatted_answer = handle_output_parser_exception(
e=self._last_parser_error, e=self._last_parser_error,
messages=list(self.state.messages), messages=list(self.state.messages),
@@ -513,13 +432,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("context_error") @listen("context_error")
def recover_from_context_length(self) -> str: def recover_from_context_length(self) -> str:
"""Recover from context length errors. """Recover from context length errors and retry."""
Maps to: Context length exception handling in _invoke_loop
Reference: crew_agent_executor.py lines 288-297
Flow Event: "context_error" -> "initialized" (loops back via event)
"""
handle_context_length( handle_context_length(
respect_context_window=self.respect_context_window, respect_context_window=self.respect_context_window,
printer=self._printer, printer=self._printer,
@@ -534,13 +447,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized" return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent with given inputs - maintains compatibility. """Execute agent with given inputs.
Maps to: invoke() method at lines 161-205
Reference: crew_agent_executor.py lines 161-205
This is the main entry point that maintains backward compatibility
with the current CrewAgentExecutor interface.
Args: Args:
inputs: Input dictionary containing prompt variables. inputs: Input dictionary containing prompt variables.
@@ -559,15 +466,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._is_executing = True self._is_executing = True
self._has_been_invoked = True self._has_been_invoked = True
self._printer.print(
content=f"🚀 FlowExecutor.invoke() called on instance: {self._instance_id}",
color="green",
)
try: try:
# Reset state for fresh execution # Reset state for fresh execution
# This is important because create_agent_executor may be called multiple times
# during agent initialization, and we need clean state for each actual task execution
self.state.messages.clear() self.state.messages.clear()
self.state.iterations = 0 self.state.iterations = 0
self.state.current_answer = None self.state.current_answer = None
@@ -625,9 +525,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _handle_agent_action( def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish: ) -> AgentAction | AgentFinish:
"""Process agent action and tool execution. """Process agent action and tool execution result.
Reference: crew_agent_executor.py lines 315-343
Args: Args:
formatted_answer: Agent's action to execute. formatted_answer: Agent's action to execute.
@@ -658,9 +556,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _invoke_step_callback( def _invoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish self, formatted_answer: AgentAction | AgentFinish
) -> None: ) -> None:
"""Invoke step callback. """Invoke step callback if configured.
Reference: crew_agent_executor.py lines 345-354
Args: Args:
formatted_answer: Current agent response. formatted_answer: Current agent response.
@@ -673,9 +569,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
) -> None: ) -> None:
"""Add message to state conversation history. """Add message to state conversation history.
Reference: crew_agent_executor.py lines 356-365
Adapted to work with Flow state instead of instance variable.
Args: Args:
text: Message content. text: Message content.
role: Message role (default: assistant). role: Message role (default: assistant).
@@ -683,10 +576,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.messages.append(format_message_for_llm(text, role=role)) self.state.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self) -> None: def _show_start_logs(self) -> None:
"""Emit agent start event. """Emit agent start event."""
Reference: crew_agent_executor.py lines 367-380
"""
if self.agent is None: if self.agent is None:
raise ValueError("Agent cannot be None") raise ValueError("Agent cannot be None")
@@ -703,8 +593,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None: def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
"""Emit agent execution event. """Emit agent execution event.
Reference: crew_agent_executor.py lines 382-399
Args: Args:
formatted_answer: Agent's response to log. formatted_answer: Agent's response to log.
""" """
@@ -724,9 +612,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _handle_crew_training_output( def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: str | None = None self, result: AgentFinish, human_feedback: str | None = None
) -> None: ) -> None:
"""Save training data. """Save training data for crew training mode.
Reference: crew_agent_executor.py lines 401-450
Args: Args:
result: Agent's final output. result: Agent's final output.
@@ -776,9 +662,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@staticmethod @staticmethod
def _format_prompt(prompt: str, inputs: dict[str, str]) -> str: def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
"""Format prompt with input values. """Format prompt template with input values.
Reference: crew_agent_executor.py lines 452-465
Args: Args:
prompt: Template string. prompt: Template string.
@@ -792,9 +676,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return prompt.replace("{tools}", inputs["tools"]) return prompt.replace("{tools}", inputs["tools"])
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Process human feedback. """Process human feedback and refine answer.
Reference: crew_agent_executor.py lines 467-481
Args: Args:
formatted_answer: Initial agent result. formatted_answer: Initial agent result.
@@ -812,8 +694,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _is_training_mode(self) -> bool: def _is_training_mode(self) -> bool:
"""Check if training mode is active. """Check if training mode is active.
Reference: crew_agent_executor.py lines 483-489
Returns: Returns:
True if in training mode. True if in training mode.
""" """
@@ -822,9 +702,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _handle_training_feedback( def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish: ) -> AgentFinish:
"""Process training feedback. """Process training feedback and generate improved answer.
Reference: crew_agent_executor.py lines 491-512
Args: Args:
initial_answer: Initial agent output. initial_answer: Initial agent output.
@@ -841,7 +719,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
) )
# Re-run flow for improved answer # Re-run flow for improved answer
# Need to reset state appropriately
self.state.iterations = 0 self.state.iterations = 0
self.state.is_finished = False self.state.is_finished = False
self.state.current_answer = None self.state.current_answer = None
@@ -862,9 +739,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def _handle_regular_feedback( def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish: ) -> AgentFinish:
"""Process regular feedback iteratively. """Process regular feedback iteratively until user is satisfied.
Reference: crew_agent_executor.py lines 514-537
Args: Args:
current_answer: Current agent output. current_answer: Current agent output.
@@ -886,9 +761,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return answer return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish: def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process single feedback iteration. """Process a single feedback iteration and generate updated response.
Reference: crew_agent_executor.py lines 539-553
Args: Args:
feedback: User feedback. feedback: User feedback.
@@ -922,9 +795,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
) -> CoreSchema: ) -> CoreSchema:
"""Generate Pydantic core schema for Protocol compatibility. """Generate Pydantic core schema for Protocol compatibility.
Reference: crew_agent_executor.py lines 555-564 Allows the executor to be used in Pydantic models without
This allows the Protocol to be used in Pydantic models without
requiring arbitrary_types_allowed=True. requiring arbitrary_types_allowed=True.
""" """
return core_schema.any_schema() return core_schema.any_schema()

View File

@@ -478,6 +478,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self, self,
persistence: FlowPersistence | None = None, persistence: FlowPersistence | None = None,
tracing: bool | None = None, tracing: bool | None = None,
suppress_flow_events: bool = False,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
"""Initialize a new Flow instance. """Initialize a new Flow instance.
@@ -485,6 +486,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Args: Args:
persistence: Optional persistence backend for storing flow states persistence: Optional persistence backend for storing flow states
tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings
suppress_flow_events: Whether to suppress flow event emissions (internal use)
**kwargs: Additional state values to initialize or override **kwargs: Additional state values to initialize or override
""" """
# Initialize basic instance attributes # Initialize basic instance attributes
@@ -498,6 +500,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._persistence: FlowPersistence | None = persistence self._persistence: FlowPersistence | None = persistence
self._is_execution_resuming: bool = False self._is_execution_resuming: bool = False
self._event_futures: list[Future[None]] = [] self._event_futures: list[Future[None]] = []
self.suppress_flow_events: bool = suppress_flow_events
# Initialize state with initial values # Initialize state with initial values
self._state = self._create_initial_state() self._state = self._create_initial_state()
@@ -511,13 +514,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
if kwargs: if kwargs:
self._initialize_state(kwargs) self._initialize_state(kwargs)
crewai_event_bus.emit( if not self.suppress_flow_events:
self, crewai_event_bus.emit(
FlowCreatedEvent( self,
type="flow_created", FlowCreatedEvent(
flow_name=self.name or self.__class__.__name__, type="flow_created",
), flow_name=self.name or self.__class__.__name__,
) ),
)
# Register all flow-related methods # Register all flow-related methods
for method_name in dir(self): for method_name in dir(self):
@@ -974,19 +978,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(filtered_inputs) self._initialize_state(filtered_inputs)
# Emit FlowStartedEvent and log the start of the flow. # Emit FlowStartedEvent and log the start of the flow.
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
FlowStartedEvent( self,
type="flow_started", FlowStartedEvent(
flow_name=self.name or self.__class__.__name__, type="flow_started",
inputs=inputs, flow_name=self.name or self.__class__.__name__,
), inputs=inputs,
) ),
if future: )
self._event_futures.append(future) if future:
self._log_flow_event( self._event_futures.append(future)
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)
if inputs is not None and "id" not in inputs: if inputs is not None and "id" not in inputs:
self._initialize_state(inputs) self._initialize_state(inputs)
@@ -1002,17 +1004,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
final_output = self._method_outputs[-1] if self._method_outputs else None final_output = self._method_outputs[-1] if self._method_outputs else None
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
FlowFinishedEvent( self,
type="flow_finished", FlowFinishedEvent(
flow_name=self.name or self.__class__.__name__, type="flow_finished",
result=final_output, flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(), result=final_output,
), state=self._copy_and_serialize_state(),
) ),
if future: )
self._event_futures.append(future) if future:
self._event_futures.append(future)
if self._event_futures: if self._event_futures:
await asyncio.gather( await asyncio.gather(
@@ -1111,18 +1114,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
kwargs or {} kwargs or {}
) )
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
MethodExecutionStartedEvent( self,
type="method_execution_started", MethodExecutionStartedEvent(
method_name=method_name, type="method_execution_started",
flow_name=self.name or self.__class__.__name__, method_name=method_name,
params=dumped_params, flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(), params=dumped_params,
), state=self._copy_and_serialize_state(),
) ),
if future: )
self._event_futures.append(future) if future:
self._event_futures.append(future)
result = ( result = (
await method(*args, **kwargs) await method(*args, **kwargs)
@@ -1137,32 +1141,34 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name) self._completed_methods.add(method_name)
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
MethodExecutionFinishedEvent( self,
type="method_execution_finished", MethodExecutionFinishedEvent(
method_name=method_name, type="method_execution_finished",
flow_name=self.name or self.__class__.__name__, method_name=method_name,
state=self._copy_and_serialize_state(), flow_name=self.name or self.__class__.__name__,
result=result, state=self._copy_and_serialize_state(),
), result=result,
) ),
if future: )
self._event_futures.append(future) if future:
self._event_futures.append(future)
return result return result
except Exception as e: except Exception as e:
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
MethodExecutionFailedEvent( self,
type="method_execution_failed", MethodExecutionFailedEvent(
method_name=method_name, type="method_execution_failed",
flow_name=self.name or self.__class__.__name__, method_name=method_name,
error=e, flow_name=self.name or self.__class__.__name__,
), error=e,
) ),
if future: )
self._event_futures.append(future) if future:
self._event_futures.append(future)
raise e raise e
def _copy_and_serialize_state(self) -> dict[str, Any]: def _copy_and_serialize_state(self) -> dict[str, Any]: