mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-05 15:09:22 +00:00
Fix #5972: Allow multi-source OR listeners to re-fire in cyclic flows
When a router emits a signal that should re-trigger an or_() listener that has already fired, the listener was permanently blocked by _fired_or_listeners. This created a catch-22: the clearing logic in _execute_single_listener (for cyclic re-execution) could never be reached because _find_triggered_methods blocked the listener first. Fix: Before processing router results as triggers, reset _fired_or_listeners entries for any OR listeners whose conditions match the router output signals. This preserves concurrent start-method protection (original trigger_method) while unblocking cyclic re-triggering through routers. Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -1271,6 +1271,34 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
with self._or_listeners_lock:
|
||||
self._fired_or_listeners.discard(listener_name)
|
||||
|
||||
def _reset_or_listeners_for_router_results(
|
||||
self, router_results: list[FlowMethodName]
|
||||
) -> None:
|
||||
"""Reset fired OR listeners that could be triggered by router results.
|
||||
|
||||
This enables cyclic flow re-triggering when a router emits a signal
|
||||
that should re-trigger an OR listener that has already fired in the
|
||||
current execution. Without this, _fired_or_listeners permanently blocks
|
||||
the listener and the clearing logic in _execute_single_listener (which
|
||||
handles cyclic re-execution) can never be reached.
|
||||
|
||||
Only clears listeners matching router results, preserving the concurrent
|
||||
start-method protection for the original trigger_method.
|
||||
"""
|
||||
with self._or_listeners_lock:
|
||||
for listener_name in list(self._fired_or_listeners):
|
||||
condition_data = self._listeners.get(listener_name)
|
||||
if condition_data is None:
|
||||
continue
|
||||
if is_simple_flow_condition(condition_data):
|
||||
_, methods = condition_data
|
||||
if any(r in methods for r in router_results):
|
||||
self._fired_or_listeners.discard(listener_name)
|
||||
elif is_flow_condition_dict(condition_data):
|
||||
all_methods = _extract_all_methods_recursive(condition_data)
|
||||
if any(r in all_methods for r in router_results):
|
||||
self._fired_or_listeners.discard(listener_name)
|
||||
|
||||
def _build_racing_groups(self) -> dict[frozenset[FlowMethodName], FlowMethodName]:
|
||||
"""Identify groups of methods that race for the same OR listener.
|
||||
|
||||
@@ -2847,6 +2875,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Now execute normal listeners for all router results and the original trigger
|
||||
all_triggers = [trigger_method, *router_results]
|
||||
|
||||
# For router results, clear _fired_or_listeners entries for any OR
|
||||
# listeners that could be triggered by those results. This unblocks
|
||||
# cyclic flows where a router emits a signal that should re-trigger an
|
||||
# OR listener that already fired earlier in the same execution.
|
||||
if router_results:
|
||||
self._reset_or_listeners_for_router_results(router_results)
|
||||
|
||||
for current_trigger in all_triggers:
|
||||
if current_trigger: # Skip None results
|
||||
listeners_triggered = self._find_triggered_methods(
|
||||
|
||||
@@ -1737,6 +1737,108 @@ def test_cyclic_flow_multiple_or_listeners_fire_every_iteration():
|
||||
)
|
||||
|
||||
|
||||
def test_or_listener_retriggers_via_different_router_signals():
|
||||
"""Test that an or_() listener re-fires when a router emits different signals.
|
||||
|
||||
Regression test for issue #5972: @listen(or_(A, B, C)) multi-source OR
|
||||
listener only fires once, blocking cyclic flow re-triggering.
|
||||
|
||||
Pattern: start → router emits "SignalA" → handler (or_ listener) fires →
|
||||
handler's router emits "SignalB" → handler should fire AGAIN (but was
|
||||
blocked by _fired_or_listeners).
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class OrRetriggerFlow(Flow):
|
||||
iteration = 0
|
||||
max_iterations = 3
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
execution_order.append("begin")
|
||||
|
||||
@router(begin)
|
||||
def init_router(self):
|
||||
execution_order.append("init_router")
|
||||
return "SignalA"
|
||||
|
||||
@listen(or_("SignalA", "SignalB"))
|
||||
def handler(self):
|
||||
self.iteration += 1
|
||||
execution_order.append(f"handler_{self.iteration}")
|
||||
|
||||
@router(handler)
|
||||
def handler_router(self):
|
||||
execution_order.append(f"handler_router_{self.iteration}")
|
||||
if self.iteration < self.max_iterations:
|
||||
return "SignalB"
|
||||
return "done"
|
||||
|
||||
@listen("done")
|
||||
def finish(self):
|
||||
execution_order.append("finish")
|
||||
|
||||
flow = OrRetriggerFlow()
|
||||
flow.kickoff()
|
||||
|
||||
# handler must fire max_iterations times (once for SignalA, rest for SignalB)
|
||||
handler_events = [e for e in execution_order if e.startswith("handler_") and not e.startswith("handler_router")]
|
||||
assert len(handler_events) == 3, (
|
||||
f"or_() listener 'handler' should fire every iteration via different "
|
||||
f"router signals, got {len(handler_events)} fires: {execution_order}"
|
||||
)
|
||||
|
||||
assert "finish" in execution_order, (
|
||||
f"Flow should have reached 'finish', got: {execution_order}"
|
||||
)
|
||||
|
||||
|
||||
def test_or_listener_retriggers_via_same_router_signal():
|
||||
"""Test that an or_() listener re-fires when a router emits the same signal repeatedly.
|
||||
|
||||
Variant of issue #5972: the router always emits the SAME signal that
|
||||
re-triggers the or_() listener.
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class SameSignalFlow(Flow):
|
||||
iteration = 0
|
||||
max_iterations = 3
|
||||
|
||||
@start()
|
||||
def begin(self):
|
||||
execution_order.append("begin")
|
||||
|
||||
@router(begin)
|
||||
def init_router(self):
|
||||
return "process"
|
||||
|
||||
@listen(or_("process", "other"))
|
||||
def handler(self):
|
||||
self.iteration += 1
|
||||
execution_order.append(f"handler_{self.iteration}")
|
||||
|
||||
@router(handler)
|
||||
def handler_router(self):
|
||||
if self.iteration < self.max_iterations:
|
||||
return "process"
|
||||
return "exit"
|
||||
|
||||
@listen("exit")
|
||||
def finish(self):
|
||||
execution_order.append("finish")
|
||||
|
||||
flow = SameSignalFlow()
|
||||
flow.kickoff()
|
||||
|
||||
handler_events = [e for e in execution_order if e.startswith("handler_") and not e.startswith("handler_router")]
|
||||
assert len(handler_events) == 3, (
|
||||
f"or_() listener 'handler' should re-fire via same router signal, "
|
||||
f"got {len(handler_events)} fires: {execution_order}"
|
||||
)
|
||||
assert "finish" in execution_order
|
||||
|
||||
|
||||
def test_cyclic_flow_works_with_persist_and_id_input():
|
||||
"""Cyclic router flows must complete all iterations when persistence is
|
||||
enabled and 'id' is passed in inputs.
|
||||
|
||||
Reference in New Issue
Block a user