fix: respect suppress_flow_events for method-execution events (#6095)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* fix: respect suppress_flow_events for method-execution events

* test: align suppressed-flow test with new method-event behavior
This commit is contained in:
Lucas Gomide
2026-06-09 18:19:25 -03:00
committed by GitHub
parent ce42994ae3
commit da8fe8c715
4 changed files with 116 additions and 44 deletions

View File

@@ -780,10 +780,11 @@ class TraceCollectionListener(BaseEventListener):
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.
When ``suppress_flow_events=True``, console panels are hidden but
``FlowStartedEvent`` and method lifecycle events still emit; if no
batch exists yet, LLM/tool events must not fall back to implicit crew
batches.
When ``suppress_flow_events=True`` (infrastructure flows such as
``AgentExecutor`` and the memory flows), flow and method lifecycle
events are not emitted, so the batch is claimed from the flow context
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name

View File

@@ -1420,16 +1420,17 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if self.persistence is not None:
self.persistence.clear_pending_feedback(context.flow_id)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
if not self.suppress_flow_events:
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
@@ -2476,20 +2477,19 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
kwargs or {}
)
# MethodExecution events always fire — ``suppress_flow_events``
# only hides the Rich console panel, not observability events.
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
# Set method name in context so ask() can read it without
# stack inspection. Must happen before copy_context() so the
@@ -2531,19 +2531,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
finished_event_id: str | None = None
# MethodExecution events always fire even when console panels are
# suppressed; tracing depends on them.
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
return result, finished_event_id
except Exception as e:

View File

@@ -1281,7 +1281,11 @@ class TestFlowTracingWhenSuppressed:
assert started == ["QuietFlow"]
def test_method_execution_emitted_when_panel_events_suppressed(self) -> None:
def test_method_execution_suppressed_when_flow_events_suppressed(self) -> None:
"""``suppress_flow_events=True`` silences MethodExecution events so
infrastructure flows (AgentExecutor, memory) don't emit one trace span
per internal control-flow method."""
class QuietFlow(Flow[ChatState]):
suppress_flow_events = True
@@ -1303,8 +1307,8 @@ class TestFlowTracingWhenSuppressed:
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
QuietFlow().kickoff()
assert started == ["begin"]
assert finished == ["begin"]
assert started == []
assert finished == []
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
listener = TraceCollectionListener()

View File

@@ -838,6 +838,74 @@ def test_flow_method_execution_finished_includes_serialized_state():
assert final_output == "final_result"
def test_suppress_flow_events_silences_method_lifecycle_events():
"""``suppress_flow_events=True`` emits no MethodExecution* events on the
bus (used by infrastructure flows like AgentExecutor so their control-flow
methods don't pollute traces), while default flows still emit them."""
captured: list[tuple[str, str]] = []
class SuppressedFlow(Flow):
suppress_flow_events: bool = True
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
class ControlFlow(Flow):
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _on_started(source, event):
captured.append(("started", type(source).__name__))
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _on_finished(source, event):
captured.append(("finished", type(source).__name__))
SuppressedFlow().kickoff()
wait_for_event_handlers()
assert [e for e in captured if e[1] == "SuppressedFlow"] == [], (
"suppress_flow_events=True must emit no MethodExecution* events"
)
captured.clear()
ControlFlow().kickoff()
wait_for_event_handlers()
control = [e for e in captured if e[1] == "ControlFlow"]
assert ("started", "ControlFlow") in control
assert ("finished", "ControlFlow") in control
def test_infrastructure_flows_suppress_flow_events_by_default():
"""Pin the infra flows that must stay silent in traces.
The gating in ``_execute_method`` only helps if these flows actually set
``suppress_flow_events=True``; without this guard, removing the flag from
AgentExecutor would silently bring back the verbose per-method trace spans.
"""
from crewai.experimental.agent_executor import AgentExecutor
from crewai.memory.encoding_flow import EncodingFlow
from crewai.memory.recall_flow import RecallFlow
assert AgentExecutor.model_fields["suppress_flow_events"].default is True
for flow_cls in (EncodingFlow, RecallFlow):
flow = flow_cls(storage=None, llm=None, embedder=None)
assert flow.suppress_flow_events is True
@pytest.mark.vcr()
def test_llm_emits_call_started_event():
started_events: list[LLMCallStartedEvent] = []