From d25ab2d8879156b7558ae8ec68b301c5899dfea3 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Sat, 29 Nov 2025 14:04:33 -0800 Subject: [PATCH] ensure flow logs are not passed if its on executor --- lib/crewai/src/crewai/agent/core.py | 12 +- .../crewai/agents/crew_agent_executor_flow.py | 201 ++++-------------- lib/crewai/src/crewai/flow/flow.py | 138 ++++++------ 3 files changed, 118 insertions(+), 233 deletions(-) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index f8bc85d46..051c7baa0 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -21,7 +21,9 @@ from typing_extensions import Self from crewai.a2a.config import A2AConfig from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.cache.cache_handler import CacheHandler -from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow + +# from crewai.agents.crew_agent_executor import CrewAgentExecutor +# from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow from crewai.events.event_bus import crewai_event_bus from crewai.events.types.knowledge_events import ( KnowledgeQueryCompletedEvent, @@ -213,6 +215,10 @@ class Agent(BaseAgent): default=None, description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.", ) + # agent_executor_class: CrewAgentExecutorFlow | CrewAgentExecutor = Field( + # default=CrewAgentExecutor, + # description="Class to use for the agent executor.", + # ) @model_validator(mode="before") def validate_from_repository(cls, v: Any) -> dict[str, Any] | None | Any: # noqa: N805 @@ -449,8 +455,8 @@ class Agent(BaseAgent): ) tools = tools or self.tools or [] - self.create_agent_executor(tools=tools, task=task) + self.create_agent_executor(tools=tools, task=task) if self.crew and self.crew._train: task_prompt = self._training_handler(task_prompt=task_prompt) else: @@ -646,6 +652,8 @@ class Agent(BaseAgent): rpm_limit_fn=rpm_limit_fn, ) else: + from crewai.agents.crew_agent_executor_flow import CrewAgentExecutorFlow + self.agent_executor = CrewAgentExecutorFlow( llm=self.llm, task=task, diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py b/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py index f721e6fb1..08497ab17 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor_flow.py @@ -103,9 +103,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): callbacks: list[Any] | None = None, response_model: type[BaseModel] | None = None, ) -> None: - """Initialize with same signature as CrewAgentExecutor. - - Reference: crew_agent_executor.py lines 70-150 + """Initialize the flow-based agent executor. Args: llm: Language model instance. @@ -186,13 +184,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): """ if not self._flow_initialized: # Now call Flow's __init__ which will replace self._state - # with Flow's managed state - super().__init__() - self._flow_initialized = True - self._printer.print( - content=f"🌊 Flow initialized for instance: {self._instance_id}", - color="blue", + # with Flow's managed state. Suppress flow events since this is + # an agent executor, not a user-facing flow. + super().__init__( + suppress_flow_events=True, ) + self._flow_initialized = True @property def use_stop_words(self) -> bool: @@ -224,25 +221,13 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @start() def initialize_reasoning(self) -> str: - """Initialize flow state and messages. - - Maps to: Initial prompt formatting in current executor's invoke() method - Reference: crew_agent_executor.py lines 170-181 - - Flow Event: START -> triggers check_max_iterations_router - """ + """Initialize the reasoning flow and emit agent start logs.""" self._show_start_logs() return "initialized" @listen("force_final_answer") def force_final_answer(self) -> str: - """Force agent to provide final answer when max iterations exceeded. - - Maps to: handle_max_iterations_exceeded at lines 217-224 - Reference: crew_agent_executor.py lines 217-224 - - Flow Event: "force_final_answer" -> "agent_finished" - """ + """Force agent to provide final answer when max iterations exceeded.""" formatted_answer = handle_max_iterations_exceeded( formatted_answer=None, printer=self._printer, @@ -259,17 +244,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("continue_reasoning") def call_llm_and_parse(self) -> str: - """Execute LLM call with hooks and parse response. + """Execute LLM call with hooks and parse the response. - Maps to: Lines 227-239 in _invoke_loop - Reference: crew_agent_executor.py lines 227-239 - - Flow Event: "continue_reasoning" -> "parsed" | "parser_error" | "context_error" - - Steps: - 1. enforce_rpm_limit - 2. get_llm_response (with before/after hooks already integrated) - 3. process_llm_response + Returns routing decision based on parsing result. """ self._printer.print( content=f"🤖 call_llm_and_parse: About to call LLM (iteration {self.state.iterations})", @@ -278,10 +255,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): try: enforce_rpm_limit(self.request_within_rpm_limit) - # Note: Hooks are already integrated in get_llm_response utility answer = get_llm_response( llm=self.llm, - messages=list(self.state.messages), # Pass copy of state messages + messages=list(self.state.messages), callbacks=self.callbacks, printer=self._printer, from_task=self.task, @@ -290,7 +266,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): executor_context=self, ) - # Parse response (line 239) + # Parse the LLM response formatted_answer = process_llm_response(answer, self.use_stop_words) self.state.current_answer = formatted_answer @@ -316,7 +292,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): if is_context_length_exceeded(e): self._last_context_error = e return "context_error" - # Re-raise other exceptions (including litellm errors) if e.__class__.__module__.startswith("litellm"): raise e handle_unknown_error(self._printer, e) @@ -324,13 +299,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @router(call_llm_and_parse) def route_by_answer_type(self) -> str: - """Route based on whether answer is AgentAction or AgentFinish. - - Maps to: isinstance check at line 241 - Reference: crew_agent_executor.py line 241 - - Flow Event: call_llm_and_parse completes -> ROUTE -> "execute_tool" | "agent_finished" - """ + """Route based on whether answer is AgentAction or AgentFinish.""" answer_type = type(self.state.current_answer).__name__ self._printer.print( content=f"🚦 route_by_answer_type: Got {answer_type}", @@ -342,23 +311,11 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("execute_tool") def execute_tool_action(self) -> str: - """Execute tool and handle result. - - Maps to: Lines 242-273 in _invoke_loop - Reference: crew_agent_executor.py lines 242-273 - - Flow Event: "execute_tool" -> completion triggers router - - Steps: - 1. Extract fingerprint context - 2. Execute tool via execute_tool_and_check_finality - 3. Handle agent action (append observation) - 4. Invoke step callback - """ + """Execute the tool action and handle the result.""" try: action = cast(AgentAction, self.state.current_answer) - # Extract fingerprint context (lines 243-253) + # Extract fingerprint context for tool execution fingerprint_context = {} if ( self.agent @@ -369,7 +326,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): "agent_fingerprint": str(self.agent.security_config.fingerprint) } - # Execute tool (lines 255-267) + # Execute the tool tool_result = execute_tool_and_check_finality( agent_action=action, fingerprint_context=fingerprint_context, @@ -384,14 +341,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): crew=self.crew, ) - # Handle agent action - appends observation to messages (lines 268-270) + # Handle agent action and append observation to messages result = self._handle_agent_action(action, tool_result) self.state.current_answer = result - # Invoke step callback (line 272) + # Invoke step callback if configured self._invoke_step_callback(result) - # Append message to state (line 273) + # Append result message to conversation state if hasattr(result, "text"): self._append_message_to_state(result.text) @@ -408,26 +365,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("initialized") def continue_iteration(self) -> str: - """Bridge listener that catches 'initialized' event from increment_and_continue. - - This is needed because @router listens to METHODS, not EVENT STRINGS. - increment_and_continue returns 'initialized' STRING which triggers this listener. - """ + """Bridge listener that connects iteration loop back to iteration check.""" return "check_iteration" @router(or_(initialize_reasoning, continue_iteration)) def check_max_iterations(self) -> str: - """Check if max iterations reached before LLM call. - - Maps to: has_reached_max_iterations check at line 216 - Reference: crew_agent_executor.py lines 216-225 - - Triggered by: - - initialize_reasoning METHOD (first iteration) - - continue_iteration METHOD (subsequent iterations after tool execution) - - Flow Event: ROUTE -> "force_final_answer" | "continue_reasoning" - """ + """Check if max iterations reached before proceeding with reasoning.""" self._printer.print( content=f"🔄 check_max_iterations: iteration {self.state.iterations}/{self.max_iter}", color="cyan", @@ -438,35 +381,17 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @router(execute_tool_action) def increment_and_continue(self) -> str: - """Increment iteration counter and loop back. - - Maps to: Loop continuation (iteration increment at line 301) - Reference: crew_agent_executor.py line 301 (finally block) - - Flow Event: execute_tool_action completes -> ROUTER returns "loop_continue" - """ - # Increment iterations (line 301) + """Increment iteration counter and loop back for next iteration.""" self.state.iterations += 1 self._printer.print( content=f"+ increment_and_continue: Incremented to iteration {self.state.iterations}, looping back", color="magenta", ) - # Return "initialized" to trigger check_max_iterations router again (simple loop) return "initialized" @listen(or_("agent_finished", "tool_result_is_final")) def finalize(self) -> str: - """Finalize execution and return result. - - Maps to: Final steps after loop (lines 307-313) - Reference: crew_agent_executor.py lines 307-313 - - Triggered by: - - "agent_finished" (Router returns this when LLM gives Final Answer) - - "tool_result_is_final" (Tool execution returns this when tool has result_as_answer=True) - - Flow Event: -> "completed" (END) - """ + """Finalize execution and emit completion logs.""" if self.state.current_answer is None: self._printer.print( content="⚠️ Finalize called but no answer in state - skipping", @@ -489,13 +414,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("parser_error") def recover_from_parser_error(self) -> str: - """Recover from output parser errors. - - Maps to: OutputParserError handling in _invoke_loop - Reference: crew_agent_executor.py lines 275-282 - - Flow Event: "parser_error" -> "initialized" (loops back via event) - """ + """Recover from output parser errors and retry.""" formatted_answer = handle_output_parser_exception( e=self._last_parser_error, messages=list(self.state.messages), @@ -513,13 +432,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("context_error") def recover_from_context_length(self) -> str: - """Recover from context length errors. - - Maps to: Context length exception handling in _invoke_loop - Reference: crew_agent_executor.py lines 288-297 - - Flow Event: "context_error" -> "initialized" (loops back via event) - """ + """Recover from context length errors and retry.""" handle_context_length( respect_context_window=self.respect_context_window, printer=self._printer, @@ -534,13 +447,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return "initialized" def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: - """Execute agent with given inputs - maintains compatibility. - - Maps to: invoke() method at lines 161-205 - Reference: crew_agent_executor.py lines 161-205 - - This is the main entry point that maintains backward compatibility - with the current CrewAgentExecutor interface. + """Execute agent with given inputs. Args: inputs: Input dictionary containing prompt variables. @@ -559,15 +466,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self._is_executing = True self._has_been_invoked = True - self._printer.print( - content=f"🚀 FlowExecutor.invoke() called on instance: {self._instance_id}", - color="green", - ) - try: # Reset state for fresh execution - # This is important because create_agent_executor may be called multiple times - # during agent initialization, and we need clean state for each actual task execution self.state.messages.clear() self.state.iterations = 0 self.state.current_answer = None @@ -625,9 +525,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _handle_agent_action( self, formatted_answer: AgentAction, tool_result: ToolResult ) -> AgentAction | AgentFinish: - """Process agent action and tool execution. - - Reference: crew_agent_executor.py lines 315-343 + """Process agent action and tool execution result. Args: formatted_answer: Agent's action to execute. @@ -658,9 +556,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _invoke_step_callback( self, formatted_answer: AgentAction | AgentFinish ) -> None: - """Invoke step callback. - - Reference: crew_agent_executor.py lines 345-354 + """Invoke step callback if configured. Args: formatted_answer: Current agent response. @@ -673,9 +569,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) -> None: """Add message to state conversation history. - Reference: crew_agent_executor.py lines 356-365 - Adapted to work with Flow state instead of instance variable. - Args: text: Message content. role: Message role (default: assistant). @@ -683,10 +576,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self.state.messages.append(format_message_for_llm(text, role=role)) def _show_start_logs(self) -> None: - """Emit agent start event. - - Reference: crew_agent_executor.py lines 367-380 - """ + """Emit agent start event.""" if self.agent is None: raise ValueError("Agent cannot be None") @@ -703,8 +593,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None: """Emit agent execution event. - Reference: crew_agent_executor.py lines 382-399 - Args: formatted_answer: Agent's response to log. """ @@ -724,9 +612,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _handle_crew_training_output( self, result: AgentFinish, human_feedback: str | None = None ) -> None: - """Save training data. - - Reference: crew_agent_executor.py lines 401-450 + """Save training data for crew training mode. Args: result: Agent's final output. @@ -776,9 +662,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @staticmethod def _format_prompt(prompt: str, inputs: dict[str, str]) -> str: - """Format prompt with input values. - - Reference: crew_agent_executor.py lines 452-465 + """Format prompt template with input values. Args: prompt: Template string. @@ -792,9 +676,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return prompt.replace("{tools}", inputs["tools"]) def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: - """Process human feedback. - - Reference: crew_agent_executor.py lines 467-481 + """Process human feedback and refine answer. Args: formatted_answer: Initial agent result. @@ -812,8 +694,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _is_training_mode(self) -> bool: """Check if training mode is active. - Reference: crew_agent_executor.py lines 483-489 - Returns: True if in training mode. """ @@ -822,9 +702,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _handle_training_feedback( self, initial_answer: AgentFinish, feedback: str ) -> AgentFinish: - """Process training feedback. - - Reference: crew_agent_executor.py lines 491-512 + """Process training feedback and generate improved answer. Args: initial_answer: Initial agent output. @@ -841,7 +719,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) # Re-run flow for improved answer - # Need to reset state appropriately self.state.iterations = 0 self.state.is_finished = False self.state.current_answer = None @@ -862,9 +739,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def _handle_regular_feedback( self, current_answer: AgentFinish, initial_feedback: str ) -> AgentFinish: - """Process regular feedback iteratively. - - Reference: crew_agent_executor.py lines 514-537 + """Process regular feedback iteratively until user is satisfied. Args: current_answer: Current agent output. @@ -886,9 +761,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return answer def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process single feedback iteration. - - Reference: crew_agent_executor.py lines 539-553 + """Process a single feedback iteration and generate updated response. Args: feedback: User feedback. @@ -922,9 +795,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) -> CoreSchema: """Generate Pydantic core schema for Protocol compatibility. - Reference: crew_agent_executor.py lines 555-564 - - This allows the Protocol to be used in Pydantic models without + Allows the executor to be used in Pydantic models without requiring arbitrary_types_allowed=True. """ return core_schema.any_schema() diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 6123994e3..c0bcda000 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -478,6 +478,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self, persistence: FlowPersistence | None = None, tracing: bool | None = None, + suppress_flow_events: bool = False, **kwargs: Any, ) -> None: """Initialize a new Flow instance. @@ -485,6 +486,7 @@ class Flow(Generic[T], metaclass=FlowMeta): Args: persistence: Optional persistence backend for storing flow states tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings + suppress_flow_events: Whether to suppress flow event emissions (internal use) **kwargs: Additional state values to initialize or override """ # Initialize basic instance attributes @@ -498,6 +500,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._persistence: FlowPersistence | None = persistence self._is_execution_resuming: bool = False self._event_futures: list[Future[None]] = [] + self.suppress_flow_events: bool = suppress_flow_events # Initialize state with initial values self._state = self._create_initial_state() @@ -511,13 +514,14 @@ class Flow(Generic[T], metaclass=FlowMeta): if kwargs: self._initialize_state(kwargs) - crewai_event_bus.emit( - self, - FlowCreatedEvent( - type="flow_created", - flow_name=self.name or self.__class__.__name__, - ), - ) + if not self.suppress_flow_events: + crewai_event_bus.emit( + self, + FlowCreatedEvent( + type="flow_created", + flow_name=self.name or self.__class__.__name__, + ), + ) # Register all flow-related methods for method_name in dir(self): @@ -974,19 +978,17 @@ class Flow(Generic[T], metaclass=FlowMeta): self._initialize_state(filtered_inputs) # Emit FlowStartedEvent and log the start of the flow. - future = crewai_event_bus.emit( - self, - FlowStartedEvent( - type="flow_started", - flow_name=self.name or self.__class__.__name__, - inputs=inputs, - ), - ) - if future: - self._event_futures.append(future) - self._log_flow_event( - f"Flow started with ID: {self.flow_id}", color="bold_magenta" - ) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + FlowStartedEvent( + type="flow_started", + flow_name=self.name or self.__class__.__name__, + inputs=inputs, + ), + ) + if future: + self._event_futures.append(future) if inputs is not None and "id" not in inputs: self._initialize_state(inputs) @@ -1002,17 +1004,18 @@ class Flow(Generic[T], metaclass=FlowMeta): final_output = self._method_outputs[-1] if self._method_outputs else None - future = crewai_event_bus.emit( - self, - FlowFinishedEvent( - type="flow_finished", - flow_name=self.name or self.__class__.__name__, - result=final_output, - state=self._copy_and_serialize_state(), - ), - ) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + FlowFinishedEvent( + type="flow_finished", + flow_name=self.name or self.__class__.__name__, + result=final_output, + state=self._copy_and_serialize_state(), + ), + ) + if future: + self._event_futures.append(future) if self._event_futures: await asyncio.gather( @@ -1111,18 +1114,19 @@ class Flow(Generic[T], metaclass=FlowMeta): kwargs or {} ) - future = crewai_event_bus.emit( - self, - MethodExecutionStartedEvent( - type="method_execution_started", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - params=dumped_params, - state=self._copy_and_serialize_state(), - ), - ) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + MethodExecutionStartedEvent( + type="method_execution_started", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + params=dumped_params, + state=self._copy_and_serialize_state(), + ), + ) + if future: + self._event_futures.append(future) result = ( await method(*args, **kwargs) @@ -1137,32 +1141,34 @@ class Flow(Generic[T], metaclass=FlowMeta): self._completed_methods.add(method_name) - future = crewai_event_bus.emit( - self, - MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - state=self._copy_and_serialize_state(), - result=result, - ), - ) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + result=result, + ), + ) + if future: + self._event_futures.append(future) return result except Exception as e: - future = crewai_event_bus.emit( - self, - MethodExecutionFailedEvent( - type="method_execution_failed", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - error=e, - ), - ) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + MethodExecutionFailedEvent( + type="method_execution_failed", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + error=e, + ), + ) + if future: + self._event_futures.append(future) raise e def _copy_and_serialize_state(self) -> dict[str, Any]: