diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 85bb077ee..fbdad3c06 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -459,6 +459,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._methods: dict[str, Callable] = {} self._method_execution_counts: dict[str, int] = {} self._pending_and_listeners: dict[str, set[str]] = {} + self._triggered_or_listeners: set[str] = set() # Track OR listeners that have already triggered self._method_outputs: list[Any] = [] # list to store all method outputs self._completed_methods: set[str] = set() # Track completed methods for reload self._persistence: FlowPersistence | None = persistence @@ -822,6 +823,7 @@ class Flow(Generic[T], metaclass=FlowMeta): # Clear completed methods and outputs for a fresh start self._completed_methods.clear() self._method_outputs.clear() + self._triggered_or_listeners.clear() else: # We're restoring from persistence, set the flag self._is_execution_resuming = True @@ -922,6 +924,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(start_method_name) + self._triggered_or_listeners.clear() method = self._methods[start_method_name] enhanced_method = self._inject_trigger_payload_for_start_method(method) @@ -1124,9 +1127,10 @@ class Flow(Generic[T], metaclass=FlowMeta): Notes ----- - Handles both OR and AND conditions: - * OR: Triggers if any condition is met + * OR: Triggers if any condition is met (but only once per listener) * AND: Triggers only when all conditions are met - Maintains state for AND conditions using _pending_and_listeners + - Tracks OR listeners to prevent multiple triggers from the same condition - Separates router and normal listener evaluation """ triggered = [] @@ -1140,9 +1144,15 @@ class Flow(Generic[T], metaclass=FlowMeta): continue if condition_type == "OR": + # Check if this OR listener has already been triggered + if listener_name in self._triggered_or_listeners: + # Skip this listener as it has already been triggered by another method in the OR condition + continue + # If the trigger_method matches any in methods, run this if trigger_method in methods: triggered.append(listener_name) + self._triggered_or_listeners.add(listener_name) elif condition_type == "AND": # Initialize pending methods for this listener if not already done if listener_name not in self._pending_and_listeners: @@ -1195,6 +1205,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name) + self._triggered_or_listeners.discard(listener_name) try: method = self._methods[listener_name] diff --git a/tests/test_nested_or_conditions.py b/tests/test_nested_or_conditions.py new file mode 100644 index 000000000..c90608b44 --- /dev/null +++ b/tests/test_nested_or_conditions.py @@ -0,0 +1,297 @@ +"""Test nested or_() conditions in Flow execution (Issue #3719).""" + +import pytest + +from crewai.flow.flow import Flow, listen, or_, router, start + + +def test_nested_or_condition_triggers_once(): + """Test that nested or_() conditions only trigger listeners once. + + This test addresses issue #3719 where nested or_() conditions would + cause listeners to execute multiple times instead of once. + + Setup: + method_5 listens to or_(method_1, or_(method_2, method_3)) + method_7 listens to or_(method_5, method_6) + + Expected behavior: + - method_5 should execute exactly once (triggered by first matching condition) + - method_7 should execute exactly once (triggered by first matching condition) + + Bug behavior (before fix): + - method_5 executed 3 times (once for method_1, method_2, and method_3) + - method_7 executed 4 times (once for each method_5 execution + method_6) + """ + execution_order = [] + + class NestedOrFlow(Flow): + @start() + def method_1(self): + execution_order.append("method_1") + return "method_1_done" + + @listen("method_1") + def method_2(self): + execution_order.append("method_2") + return "method_2_done" + + @listen("method_1") + def method_3(self): + execution_order.append("method_3") + return "method_3_done" + + @listen(or_("method_1", or_("method_2", "method_3"))) + def method_5(self): + execution_order.append("method_5") + return "method_5_done" + + @listen("method_1") + def method_6(self): + execution_order.append("method_6") + return "method_6_done" + + @listen(or_("method_5", "method_6")) + def method_7(self): + execution_order.append("method_7") + return "method_7_done" + + flow = NestedOrFlow() + flow.kickoff() + + assert execution_order.count("method_5") == 1, ( + f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times" + ) + assert execution_order.count("method_7") == 1, ( + f"method_7 should execute exactly once, but executed {execution_order.count('method_7')} times" + ) + + assert "method_1" in execution_order + assert "method_2" in execution_order + assert "method_3" in execution_order + assert "method_5" in execution_order + assert "method_6" in execution_order + assert "method_7" in execution_order + + +def test_simple_or_condition_triggers_once(): + """Test that simple or_() conditions only trigger once. + + Even without nesting, an OR condition should only trigger a listener once, + not multiple times for each method in the OR list. + """ + execution_order = [] + + class SimpleOrFlow(Flow): + @start() + def method_a(self): + execution_order.append("method_a") + + @listen("method_a") + def method_b(self): + execution_order.append("method_b") + + @listen("method_a") + def method_c(self): + execution_order.append("method_c") + + @listen(or_("method_b", "method_c")) + def method_d(self): + execution_order.append("method_d") + + flow = SimpleOrFlow() + flow.kickoff() + + assert execution_order.count("method_d") == 1, ( + f"method_d should execute exactly once, but executed {execution_order.count('method_d')} times" + ) + + +def test_or_condition_with_three_methods(): + """Test OR condition with three methods triggers only once.""" + execution_order = [] + + class ThreeMethodOrFlow(Flow): + @start() + def method_1(self): + execution_order.append("method_1") + + @listen("method_1") + def method_2(self): + execution_order.append("method_2") + + @listen("method_1") + def method_3(self): + execution_order.append("method_3") + + @listen("method_1") + def method_4(self): + execution_order.append("method_4") + + @listen(or_("method_2", "method_3", "method_4")) + def method_5(self): + execution_order.append("method_5") + + flow = ThreeMethodOrFlow() + flow.kickoff() + + assert execution_order.count("method_5") == 1, ( + f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times" + ) + + +def test_multiple_or_listeners_independent(): + """Test that multiple OR listeners are independent of each other.""" + execution_order = [] + + class MultipleOrFlow(Flow): + @start() + def method_1(self): + execution_order.append("method_1") + + @listen("method_1") + def method_2(self): + execution_order.append("method_2") + + @listen("method_1") + def method_3(self): + execution_order.append("method_3") + + @listen(or_("method_2", "method_3")) + def method_a(self): + execution_order.append("method_a") + + @listen(or_("method_2", "method_3")) + def method_b(self): + execution_order.append("method_b") + + flow = MultipleOrFlow() + flow.kickoff() + + assert execution_order.count("method_a") == 1 + assert execution_order.count("method_b") == 1 + + +def test_deeply_nested_or_conditions(): + """Test deeply nested or_() conditions.""" + execution_order = [] + + class DeeplyNestedOrFlow(Flow): + @start() + def start_method(self): + execution_order.append("start_method") + + @listen("start_method") + def method_a(self): + execution_order.append("method_a") + + @listen("start_method") + def method_b(self): + execution_order.append("method_b") + + @listen("start_method") + def method_c(self): + execution_order.append("method_c") + + @listen("start_method") + def method_d(self): + execution_order.append("method_d") + + @listen(or_(or_("method_a", "method_b"), or_("method_c", "method_d"))) + def final_method(self): + execution_order.append("final_method") + + flow = DeeplyNestedOrFlow() + flow.kickoff() + + assert execution_order.count("final_method") == 1, ( + f"final_method should execute exactly once, but executed {execution_order.count('final_method')} times" + ) + + +def test_or_condition_execution_order(): + """Test that OR listener executes after first matching condition. + + The listener should trigger as soon as any one of the OR conditions is met, + not wait for all of them. + """ + execution_order = [] + + class ExecutionOrderFlow(Flow): + @start() + def method_1(self): + execution_order.append("method_1") + + @listen("method_1") + def method_2(self): + execution_order.append("method_2") + + @listen("method_1") + def method_3(self): + execution_order.append("method_3") + + @listen(or_("method_2", "method_3")) + def method_4(self): + execution_order.append("method_4") + + flow = ExecutionOrderFlow() + flow.kickoff() + + method_4_index = execution_order.index("method_4") + + assert "method_2" in execution_order[:method_4_index] or "method_3" in execution_order[:method_4_index], ( + "method_4 should execute after at least one of method_2 or method_3" + ) + + +def test_or_condition_with_single_method(): + """Test OR condition with a single method (edge case).""" + execution_order = [] + + class SingleMethodOrFlow(Flow): + @start() + def method_1(self): + execution_order.append("method_1") + + @listen(or_("method_1")) + def method_2(self): + execution_order.append("method_2") + + flow = SingleMethodOrFlow() + flow.kickoff() + + assert execution_order == ["method_1", "method_2"] + assert execution_order.count("method_2") == 1 + + +def test_cyclic_flow_with_or_condition(): + """Test that OR conditions work correctly in cyclic flows. + + Within a single flow execution, an OR listener should only trigger once + even if multiple methods in the OR condition complete. + """ + execution_order = [] + + class CyclicOrFlow(Flow): + @start() + def step_1(self): + execution_order.append("step_1") + + @listen("step_1") + def step_2(self): + execution_order.append("step_2") + + @listen("step_1") + def step_3(self): + execution_order.append("step_3") + + @listen(or_("step_2", "step_3")) + def step_4(self): + execution_order.append("step_4") + + flow = CyclicOrFlow() + flow.kickoff() + + assert execution_order.count("step_4") == 1, ( + f"step_4 should execute once (not once for each OR condition), but executed {execution_order.count('step_4')} times" + )