Compare commits

..

2 Commits

Author SHA1 Message Date
Vinicius Brasil
461fed1c5c Simplify flow condition evaluation to be stateless per event
Re-evaluate the whole `@listen`/`@router` condition tree against the set
of events seen so far, instead of tracking which AND sub-branches remain
pending.

Net effect:
* Fixes a regression where `or_()` short-circuited at the first
  satisfied branch, leaving a sibling `and_()` half-complete so a later
  trigger could spuriously re-fire the listener
* Removes the fragile per-branch pending state and `id()`-based keys
* Shrinks the evaluator to one readable predicate
2026-06-09 21:58:56 -07:00
Lucas Gomide
da8fe8c715 fix: respect suppress_flow_events for method-execution events (#6095)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* fix: respect suppress_flow_events for method-execution events

* test: align suppressed-flow test with new method-event behavior
2026-06-09 17:19:25 -04:00
8 changed files with 220 additions and 143 deletions

View File

@@ -780,10 +780,11 @@ class TraceCollectionListener(BaseEventListener):
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.
When ``suppress_flow_events=True``, console panels are hidden but
``FlowStartedEvent`` and method lifecycle events still emit; if no
batch exists yet, LLM/tool events must not fall back to implicit crew
batches.
When ``suppress_flow_events=True`` (infrastructure flows such as
``AgentExecutor`` and the memory flows), flow and method lifecycle
events are not emitted, so the batch is claimed from the flow context
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name

View File

@@ -84,7 +84,7 @@ class _ConversationalMixin:
name: str | None
_completed_methods: set[Any]
_method_outputs: list[Any]
_pending_and_listeners: dict[Any, Any]
_pending_events: dict[Any, Any]
_method_call_counts: dict[Any, int]
_is_execution_resuming: bool
_pending_user_message: str | dict[str, Any] | None
@@ -581,7 +581,7 @@ class _ConversationalMixin:
"""Clear per-execution tracking so the next turn re-runs the graph."""
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._pending_events.clear()
self._method_call_counts.clear()
self._clear_or_listeners()
self._is_execution_resuming = False

View File

@@ -154,14 +154,42 @@ ExecutionContext = Any # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
def _condition_branches(
condition: dict[str, Any],
) -> tuple[Literal["and", "or"], list[FlowDefinitionCondition]]:
if "and" in condition:
return "and", condition["and"]
return "or", condition["or"]
def _condition_satisfied(condition: FlowDefinitionCondition, events: set[str]) -> bool:
if isinstance(condition, str):
return condition in events
operator, branches = _condition_branches(condition)
combine = all if operator == "and" else any
return combine(_condition_satisfied(branch, events) for branch in branches)
def _iter_condition_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
sub_conditions = condition["and"] if "and" in condition else condition["or"]
for sub_condition in sub_conditions:
yield from _iter_condition_events(sub_condition)
_, branches = _condition_branches(condition)
for branch in branches:
yield from _iter_condition_events(branch)
def _or_alternative_events(condition: FlowDefinitionCondition) -> Iterator[str]:
if isinstance(condition, str):
yield condition
return
operator, branches = _condition_branches(condition)
if operator != "or":
return
for branch in branches:
yield from _or_alternative_events(branch)
def _is_multi_event_or(
@@ -170,7 +198,8 @@ def _is_multi_event_or(
if isinstance(condition, str):
return False
return "or" in condition and len(condition["or"]) > 1
operator, branches = _condition_branches(condition)
return operator == "or" and len(branches) > 1
def _resolve_persistence(value: Any) -> Any:
@@ -864,7 +893,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
_method_execution_counts: dict[FlowMethodName, int] = PrivateAttr(
default_factory=dict
)
_pending_and_listeners: dict[PendingListenerKey, set[int]] = PrivateAttr(
_pending_events: dict[PendingListenerKey, set[str]] = PrivateAttr(
default_factory=dict
)
_fired_or_listeners: set[FlowMethodName] = PrivateAttr(default_factory=set)
@@ -1027,11 +1056,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
condition = type(self)._start_condition(method_name)
if condition is None:
return False
return self._evaluate_condition(
condition,
trigger,
method_name,
pending_key_prefix=f"start:{method_name}",
return self._condition_met(
condition, trigger, PendingListenerKey(f"start:{method_name}")
)
def _rearm_or_listeners_for_trigger(
@@ -1071,6 +1097,9 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# Only events that EXCLUSIVELY feed one OR listener race; an event that
# also feeds another listener (e.g. an AND) is left alone when a sibling
# wins. e.g. @listen(or_(a, b)) on handler -> {frozenset({a, b}): handler}.
# Events nested under an and_() branch (e.g. or_(and_(a, b), c)) are not
# alternatives and never race -- cancelling one would make the AND
# unsatisfiable.
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
listener_conditions: dict[FlowMethodName, FlowDefinitionCondition] = {
listener_name: condition
@@ -1093,14 +1122,14 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
for listener_name, condition in listener_conditions.items():
if not isinstance(condition, dict):
continue
events = events_by_listener[listener_name]
if "or" not in condition or len(events) <= 1:
alternatives = set(_or_alternative_events(condition))
if len(alternatives) <= 1:
continue
exclusive_events = {
event
for event in events
if listeners_by_event.get(event, set()) == {listener_name}
for event in alternatives
if listeners_by_event[event] == {listener_name}
}
if len(exclusive_events) > 1:
# Racing only applies to method-completion events: each member is
@@ -1420,16 +1449,17 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if self.persistence is not None:
self.persistence.clear_pending_feedback(context.flow_id)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
if not self.suppress_flow_events:
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
flow_name=self.name or self.__class__.__name__,
method_name=context.method_name,
result=collapsed_outcome if emit else result,
state=self._state,
),
)
# Clear resumption flag before triggering listeners
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
@@ -2027,7 +2057,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._pending_events.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
@@ -2476,20 +2506,19 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
kwargs or {}
)
# MethodExecution events always fire — ``suppress_flow_events``
# only hides the Rich console panel, not observability events.
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_and_serialize_state(),
),
)
if future:
self._event_futures.append(future)
# Set method name in context so ask() can read it without
# stack inspection. Must happen before copy_context() so the
@@ -2531,19 +2560,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
self._completed_methods.add(method_name)
finished_event_id: str | None = None
# MethodExecution events always fire even when console panels are
# suppressed; tracing depends on them.
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
if not self.suppress_flow_events:
finished_event = MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
result=result,
)
finished_event_id = finished_event.event_id
future = crewai_event_bus.emit(self, finished_event)
if future:
self._event_futures.append(future)
return result, finished_event_id
except Exception as e:
@@ -2726,63 +2754,18 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
else:
await self._execute_start_method(method_name)
def _evaluate_condition(
def _condition_met(
self,
condition: FlowDefinitionCondition,
trigger_method: FlowMethodName,
listener_name: FlowMethodName,
pending_key_prefix: str | None = None,
subscription_key: PendingListenerKey,
) -> bool:
if isinstance(condition, str):
return condition == str(trigger_method)
def _sub_prefix(index: int) -> str | None:
if pending_key_prefix is None:
return None
return f"{pending_key_prefix}:{index}"
if "or" in condition:
# Evaluate every sub-condition (no short-circuit): a nested and_()
# branch needs the chance to clear its pending state in
# _pending_and_listeners even when an earlier branch already matched.
any_matched = False
for index, sub_condition in enumerate(condition["or"]):
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
any_matched = True
return any_matched
sub_conditions = condition["and"]
pending_key = PendingListenerKey(
pending_key_prefix
if pending_key_prefix is not None
else f"{listener_name}:{id(condition)}"
)
if pending_key not in self._pending_and_listeners:
self._pending_and_listeners[pending_key] = set(range(len(sub_conditions)))
pending_conditions = self._pending_and_listeners[pending_key]
for index, sub_condition in enumerate(sub_conditions):
if index not in pending_conditions:
continue
if self._evaluate_condition(
sub_condition,
trigger_method,
listener_name,
pending_key_prefix=_sub_prefix(index),
):
pending_conditions.discard(index)
if not pending_conditions:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
seen = self._pending_events.setdefault(subscription_key, set())
seen.add(str(trigger_method))
if not _condition_satisfied(condition, seen):
return False
del self._pending_events[subscription_key]
return True
def _find_triggered_methods(
self, trigger_method: FlowMethodName, router_only: bool
@@ -2800,10 +2783,8 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._evaluate_condition(
condition,
trigger_method,
listener_name,
if self._condition_met(
condition, trigger_method, PendingListenerKey(str(listener_name))
):
triggered.append(listener_name)
if should_check_fired:

View File

@@ -16,7 +16,7 @@ R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],
Annotated[str, "listener method name, or 'start:<method>' for conditional starts"],
)

View File

@@ -32,7 +32,7 @@ def _build_executor(**kwargs: Any) -> AgentExecutor:
executor._method_outputs = []
executor._completed_methods = set()
executor._fired_or_listeners = set()
executor._pending_and_listeners = {}
executor._pending_events = {}
executor._method_execution_counts = {}
executor._method_call_counts = {}
executor._event_futures = []

View File

@@ -1542,40 +1542,63 @@ def test_deeply_nested_conditions():
def test_or_branch_does_not_leave_stale_and_state():
"""or_() over nested and_() branches must not leave stale pending AND state.
Regression: evaluating an or_() condition stopped at the first branch that was
satisfied, so a later and_() branch that the *same* trigger would have completed
never cleared its pending state. On the next cycle that trigger alone then
spuriously re-satisfied the whole condition. Both branches share the final
event ``x`` here, so the shared trigger that completes branch ``(a AND x)`` also
completes branch ``(c AND x)`` and both must be cleared together.
"""
fired = []
class StaleStateFlow(Flow):
@start()
def begin(self):
pass
@listen(or_(and_("a", "x"), and_("c", "x")))
def joined(self):
@listen(begin)
def a(self):
pass
flow = StaleStateFlow()
condition = type(flow)._listen_condition("joined")
@listen(begin)
def c(self):
pass
def fires(trigger):
return flow._evaluate_condition(condition, trigger, "joined")
@listen(and_(a, c))
def x(self):
pass
# First cycle: "a" then "c" arrive, then the shared "x" completes (a AND x).
assert fires("a") is False
assert fires("c") is False
assert fires("x") is True
@listen(or_(and_("a", "x"), and_("c", "y")))
def joined(self):
fired.append("joined")
# Next cycle: "x" alone must NOT re-satisfy the condition. The "c" from the
# previous cycle was consumed when "joined" fired, so neither branch is half
# complete and "x" by itself is insufficient.
assert fires("x") is False
@router(joined)
def emit_y(self):
return "y"
StaleStateFlow().kickoff()
assert fired == ["joined"]
def test_and_branch_inside_or_does_not_race():
execution_order = []
class DiamondWithFallbackFlow(Flow):
@start()
def go(self):
execution_order.append("go")
@listen(go)
def a(self):
execution_order.append("a")
@listen(go)
def b(self):
execution_order.append("b")
@listen(or_(and_(a, b), "fallback"))
def done(self):
execution_order.append("done")
DiamondWithFallbackFlow().kickoff()
assert "done" in execution_order
assert execution_order.index("done") > execution_order.index("a")
assert execution_order.index("done") > execution_order.index("b")
def test_mixed_sync_async_execution_order():

View File

@@ -1281,7 +1281,11 @@ class TestFlowTracingWhenSuppressed:
assert started == ["QuietFlow"]
def test_method_execution_emitted_when_panel_events_suppressed(self) -> None:
def test_method_execution_suppressed_when_flow_events_suppressed(self) -> None:
"""``suppress_flow_events=True`` silences MethodExecution events so
infrastructure flows (AgentExecutor, memory) don't emit one trace span
per internal control-flow method."""
class QuietFlow(Flow[ChatState]):
suppress_flow_events = True
@@ -1303,8 +1307,8 @@ class TestFlowTracingWhenSuppressed:
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
QuietFlow().kickoff()
assert started == ["begin"]
assert finished == ["begin"]
assert started == []
assert finished == []
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
listener = TraceCollectionListener()

View File

@@ -838,6 +838,74 @@ def test_flow_method_execution_finished_includes_serialized_state():
assert final_output == "final_result"
def test_suppress_flow_events_silences_method_lifecycle_events():
"""``suppress_flow_events=True`` emits no MethodExecution* events on the
bus (used by infrastructure flows like AgentExecutor so their control-flow
methods don't pollute traces), while default flows still emit them."""
captured: list[tuple[str, str]] = []
class SuppressedFlow(Flow):
suppress_flow_events: bool = True
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
class ControlFlow(Flow):
@start()
def begin(self):
return "started"
@listen("begin")
def process(self):
return "done"
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _on_started(source, event):
captured.append(("started", type(source).__name__))
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _on_finished(source, event):
captured.append(("finished", type(source).__name__))
SuppressedFlow().kickoff()
wait_for_event_handlers()
assert [e for e in captured if e[1] == "SuppressedFlow"] == [], (
"suppress_flow_events=True must emit no MethodExecution* events"
)
captured.clear()
ControlFlow().kickoff()
wait_for_event_handlers()
control = [e for e in captured if e[1] == "ControlFlow"]
assert ("started", "ControlFlow") in control
assert ("finished", "ControlFlow") in control
def test_infrastructure_flows_suppress_flow_events_by_default():
"""Pin the infra flows that must stay silent in traces.
The gating in ``_execute_method`` only helps if these flows actually set
``suppress_flow_events=True``; without this guard, removing the flag from
AgentExecutor would silently bring back the verbose per-method trace spans.
"""
from crewai.experimental.agent_executor import AgentExecutor
from crewai.memory.encoding_flow import EncodingFlow
from crewai.memory.recall_flow import RecallFlow
assert AgentExecutor.model_fields["suppress_flow_events"].default is True
for flow_cls in (EncodingFlow, RecallFlow):
flow = flow_cls(storage=None, llm=None, embedder=None)
assert flow.suppress_flow_events is True
@pytest.mark.vcr()
def test_llm_emits_call_started_event():
started_events: list[LLMCallStartedEvent] = []