diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c85e6202a..01ea13dba 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -780,10 +780,11 @@ class TraceCollectionListener(BaseEventListener): def _try_initialize_flow_batch_from_context(self, event: Any) -> bool: """Claim a flow trace batch when an action event fires inside kickoff. - When ``suppress_flow_events=True``, console panels are hidden but - ``FlowStartedEvent`` and method lifecycle events still emit; if no - batch exists yet, LLM/tool events must not fall back to implicit crew - batches. + When ``suppress_flow_events=True`` (infrastructure flows such as + ``AgentExecutor`` and the memory flows), flow and method lifecycle + events are not emitted, so the batch is claimed from the flow context + (``current_flow_id``) to keep LLM/tool events from falling back to an + implicit crew batch. """ from crewai.flow.flow_context import current_flow_id, current_flow_name diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index ccf5d55ce..ee3fc1e18 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -1420,16 +1420,17 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): if self.persistence is not None: self.persistence.clear_pending_feedback(context.flow_id) - crewai_event_bus.emit( - self, - MethodExecutionFinishedEvent( - type="method_execution_finished", - flow_name=self.name or self.__class__.__name__, - method_name=context.method_name, - result=collapsed_outcome if emit else result, - state=self._state, - ), - ) + if not self.suppress_flow_events: + crewai_event_bus.emit( + self, + MethodExecutionFinishedEvent( + type="method_execution_finished", + flow_name=self.name or self.__class__.__name__, + method_name=context.method_name, + result=collapsed_outcome if emit else result, + state=self._state, + ), + ) # Clear resumption flag before triggering listeners # This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes) @@ -2476,20 +2477,19 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): kwargs or {} ) - # MethodExecution events always fire — ``suppress_flow_events`` - # only hides the Rich console panel, not observability events. - future = crewai_event_bus.emit( - self, - MethodExecutionStartedEvent( - type="method_execution_started", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - params=dumped_params, - state=self._copy_and_serialize_state(), - ), - ) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + MethodExecutionStartedEvent( + type="method_execution_started", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + params=dumped_params, + state=self._copy_and_serialize_state(), + ), + ) + if future: + self._event_futures.append(future) # Set method name in context so ask() can read it without # stack inspection. Must happen before copy_context() so the @@ -2531,19 +2531,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta): self._completed_methods.add(method_name) finished_event_id: str | None = None - # MethodExecution events always fire even when console panels are - # suppressed; tracing depends on them. - finished_event = MethodExecutionFinishedEvent( - type="method_execution_finished", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - state=self._copy_and_serialize_state(), - result=result, - ) - finished_event_id = finished_event.event_id - future = crewai_event_bus.emit(self, finished_event) - if future: - self._event_futures.append(future) + if not self.suppress_flow_events: + finished_event = MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + result=result, + ) + finished_event_id = finished_event.event_id + future = crewai_event_bus.emit(self, finished_event) + if future: + self._event_futures.append(future) return result, finished_event_id except Exception as e: diff --git a/lib/crewai/tests/test_flow_conversation.py b/lib/crewai/tests/test_flow_conversation.py index 87272c23d..03f95080f 100644 --- a/lib/crewai/tests/test_flow_conversation.py +++ b/lib/crewai/tests/test_flow_conversation.py @@ -1281,7 +1281,11 @@ class TestFlowTracingWhenSuppressed: assert started == ["QuietFlow"] - def test_method_execution_emitted_when_panel_events_suppressed(self) -> None: + def test_method_execution_suppressed_when_flow_events_suppressed(self) -> None: + """``suppress_flow_events=True`` silences MethodExecution events so + infrastructure flows (AgentExecutor, memory) don't emit one trace span + per internal control-flow method.""" + class QuietFlow(Flow[ChatState]): suppress_flow_events = True @@ -1303,8 +1307,8 @@ class TestFlowTracingWhenSuppressed: with patch.object(crewai_event_bus, "emit", side_effect=track_emit): QuietFlow().kickoff() - assert started == ["begin"] - assert finished == ["begin"] + assert started == [] + assert finished == [] def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None: listener = TraceCollectionListener() diff --git a/lib/crewai/tests/utilities/test_events.py b/lib/crewai/tests/utilities/test_events.py index 42c6b9f9e..8b71747b0 100644 --- a/lib/crewai/tests/utilities/test_events.py +++ b/lib/crewai/tests/utilities/test_events.py @@ -838,6 +838,74 @@ def test_flow_method_execution_finished_includes_serialized_state(): assert final_output == "final_result" +def test_suppress_flow_events_silences_method_lifecycle_events(): + """``suppress_flow_events=True`` emits no MethodExecution* events on the + bus (used by infrastructure flows like AgentExecutor so their control-flow + methods don't pollute traces), while default flows still emit them.""" + captured: list[tuple[str, str]] = [] + + class SuppressedFlow(Flow): + suppress_flow_events: bool = True + + @start() + def begin(self): + return "started" + + @listen("begin") + def process(self): + return "done" + + class ControlFlow(Flow): + @start() + def begin(self): + return "started" + + @listen("begin") + def process(self): + return "done" + + with crewai_event_bus.scoped_handlers(): + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def _on_started(source, event): + captured.append(("started", type(source).__name__)) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def _on_finished(source, event): + captured.append(("finished", type(source).__name__)) + + SuppressedFlow().kickoff() + wait_for_event_handlers() + assert [e for e in captured if e[1] == "SuppressedFlow"] == [], ( + "suppress_flow_events=True must emit no MethodExecution* events" + ) + + captured.clear() + ControlFlow().kickoff() + wait_for_event_handlers() + control = [e for e in captured if e[1] == "ControlFlow"] + assert ("started", "ControlFlow") in control + assert ("finished", "ControlFlow") in control + + +def test_infrastructure_flows_suppress_flow_events_by_default(): + """Pin the infra flows that must stay silent in traces. + + The gating in ``_execute_method`` only helps if these flows actually set + ``suppress_flow_events=True``; without this guard, removing the flag from + AgentExecutor would silently bring back the verbose per-method trace spans. + """ + from crewai.experimental.agent_executor import AgentExecutor + from crewai.memory.encoding_flow import EncodingFlow + from crewai.memory.recall_flow import RecallFlow + + assert AgentExecutor.model_fields["suppress_flow_events"].default is True + + for flow_cls in (EncodingFlow, RecallFlow): + flow = flow_cls(storage=None, llm=None, embedder=None) + assert flow.suppress_flow_events is True + + @pytest.mark.vcr() def test_llm_emits_call_started_event(): started_events: list[LLMCallStartedEvent] = []