From 7bb873657ae6360e84f1bebf1a83aeb255f50148 Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Fri, 13 Feb 2026 21:50:26 -0800 Subject: [PATCH] Enhance cyclic flow handling for or_() listeners - Updated the Flow class to ensure that all fired or_() listeners are cleared between cycle iterations, allowing them to fire again in subsequent cycles. This change addresses a bug where listeners remained suppressed across iterations. - Added regression tests to verify that or_() listeners fire correctly on every iteration in cyclic flows, ensuring expected behavior in complex routing scenarios. --- lib/crewai/src/crewai/flow/flow.py | 8 +- lib/crewai/tests/test_flow.py | 125 +++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 9d060978f..c3ac1ad72 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2533,8 +2533,12 @@ class Flow(Generic[T], metaclass=FlowMeta): return (None, None) # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name) - # Also clear from fired OR listeners for cyclic flows - self._discard_or_listener(listener_name) + # Clear ALL fired OR listeners so they can fire again in the new cycle. + # This mirrors what _execute_start_method does for start-method cycles. + # Only discarding the individual listener is insufficient because + # downstream or_() listeners (e.g., method_a listening to + # or_(handler_a, handler_b)) would remain suppressed across iterations. + self._clear_or_listeners() try: method = self._methods[listener_name] diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 2040e9e5b..0ec4b3063 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1647,3 +1647,128 @@ class TestFlowAkickoff: assert execution_order == ["begin", "route", "path_a"] assert result == "path_a_result" + + +def test_cyclic_flow_or_listeners_fire_every_iteration(): + """Test that or_() listeners reset between cycle iterations through a router. + + Regression test for a bug where _fired_or_listeners was not cleared when + cycles loop through a router/listener instead of a @start method, causing + or_() listeners to permanently suppress after the first iteration. + + Pattern: router classifies → routes to ONE of several handlers → or_() + merge downstream → cycle back. Only one handler fires per iteration, but + the or_() merge must still fire every time. + """ + execution_order = [] + + class CyclicOrFlow(Flow): + iteration = 0 + max_iterations = 3 + + @start() + def begin(self): + execution_order.append("begin") + + @router(or_(begin, "loop_back")) + def route(self): + self.iteration += 1 + execution_order.append(f"route_{self.iteration}") + if self.iteration <= self.max_iterations: + # Alternate between handlers on each iteration + return "type_a" if self.iteration % 2 == 1 else "type_b" + return "done" + + @listen("type_a") + def handler_a(self): + execution_order.append(f"handler_a_{self.iteration}") + + @listen("type_b") + def handler_b(self): + execution_order.append(f"handler_b_{self.iteration}") + + # This or_() listener must fire on EVERY iteration, not just the first + @listen(or_(handler_a, handler_b)) + def merge(self): + execution_order.append(f"merge_{self.iteration}") + + @listen(merge) + def loop_back(self): + execution_order.append(f"loop_back_{self.iteration}") + + flow = CyclicOrFlow() + flow.kickoff() + + # merge must have fired once per iteration (3 times total) + merge_events = [e for e in execution_order if e.startswith("merge_")] + assert len(merge_events) == 3, ( + f"or_() listener 'merge' should fire every iteration, " + f"got {len(merge_events)} fires: {execution_order}" + ) + + # loop_back must have also fired every iteration + loop_back_events = [e for e in execution_order if e.startswith("loop_back_")] + assert len(loop_back_events) == 3, ( + f"'loop_back' should fire every iteration, " + f"got {len(loop_back_events)} fires: {execution_order}" + ) + + # Verify alternating handlers + handler_a_events = [e for e in execution_order if e.startswith("handler_a_")] + handler_b_events = [e for e in execution_order if e.startswith("handler_b_")] + assert len(handler_a_events) == 2 # iterations 1 and 3 + assert len(handler_b_events) == 1 # iteration 2 + + +def test_cyclic_flow_multiple_or_listeners_fire_every_iteration(): + """Test that multiple or_() listeners all reset between cycle iterations. + + Mirrors a real-world pattern: a router classifies messages, handlers process + them, then both a 'send' step (or_ on handlers) and a 'store' step (or_ on + router outputs) must fire on every loop iteration. + """ + execution_order = [] + + class MultiOrCyclicFlow(Flow): + iteration = 0 + max_iterations = 3 + + @start() + def begin(self): + execution_order.append("begin") + + @router(or_(begin, "capture")) + def classify(self): + self.iteration += 1 + execution_order.append(f"classify_{self.iteration}") + if self.iteration <= self.max_iterations: + return "type_a" + return "exit" + + @listen("type_a") + def handle_type_a(self): + execution_order.append(f"handle_a_{self.iteration}") + + # or_() listener on router output strings — must fire every iteration + @listen(or_("type_a", "type_b", "type_c")) + def store(self): + execution_order.append(f"store_{self.iteration}") + + # or_() listener on handler methods — must fire every iteration + @listen(or_(handle_type_a,)) + def send(self): + execution_order.append(f"send_{self.iteration}") + + @listen("send") + def capture(self): + execution_order.append(f"capture_{self.iteration}") + + flow = MultiOrCyclicFlow() + flow.kickoff() + + for method in ["store", "send", "capture"]: + events = [e for e in execution_order if e.startswith(f"{method}_")] + assert len(events) == 3, ( + f"'{method}' should fire every iteration, " + f"got {len(events)} fires: {execution_order}" + )