Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
d9dad68f5c Fix type-checker: use PrinterColor type for color parameter
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:23:42 +00:00
Devin AI
8abbae57af Fix lint: use underscore for unused condition_type variable
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:18:06 +00:00
Devin AI
425bfeaf9f Fix lint: remove trailing whitespace from docstrings
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:16:00 +00:00
Devin AI
b0f6c66c36 Fix nested or_() conditions triggering listeners multiple times (issue #3719)
- Add tracking of triggered OR listeners to prevent multiple executions
- OR listeners now execute only once when any condition in the OR is met
- Clear OR listener tracking when entering new cycles or restarting flows
- Add comprehensive test suite for nested OR conditions
- All existing tests pass with no regressions

Fixes #3719

Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:12:30 +00:00
2 changed files with 311 additions and 4 deletions

View File

@@ -31,7 +31,7 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData
from crewai.flow.utils import get_possible_return_constants
from crewai.utilities.printer import Printer
from crewai.utilities.printer import Printer, PrinterColor
logger = logging.getLogger(__name__)
@@ -459,6 +459,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods: dict[str, Callable] = {}
self._method_execution_counts: dict[str, int] = {}
self._pending_and_listeners: dict[str, set[str]] = {}
self._triggered_or_listeners: set[str] = set() # Track OR listeners that have already triggered
self._method_outputs: list[Any] = [] # list to store all method outputs
self._completed_methods: set[str] = set() # Track completed methods for reload
self._persistence: FlowPersistence | None = persistence
@@ -822,6 +823,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._triggered_or_listeners.clear()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -922,6 +924,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
self._triggered_or_listeners.clear()
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
@@ -1086,7 +1089,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
for method_name in self._start_methods:
# Check if this start method is triggered by the current trigger
if method_name in self._listeners:
condition_type, trigger_methods = self._listeners[
_, trigger_methods = self._listeners[
method_name
]
if current_trigger in trigger_methods:
@@ -1124,9 +1127,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Handles both OR and AND conditions:
* OR: Triggers if any condition is met
* OR: Triggers if any condition is met (but only once per listener)
* AND: Triggers only when all conditions are met
- Maintains state for AND conditions using _pending_and_listeners
- Tracks OR listeners to prevent multiple triggers from the same condition
- Separates router and normal listener evaluation
"""
triggered = []
@@ -1140,9 +1144,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
continue
if condition_type == "OR":
# Check if this OR listener has already been triggered
if listener_name in self._triggered_or_listeners:
# Skip this listener as it has already been triggered by another method in the OR condition
continue
# If the trigger_method matches any in methods, run this
if trigger_method in methods:
triggered.append(listener_name)
self._triggered_or_listeners.add(listener_name)
elif condition_type == "AND":
# Initialize pending methods for this listener if not already done
if listener_name not in self._pending_and_listeners:
@@ -1195,6 +1205,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
self._triggered_or_listeners.discard(listener_name)
try:
method = self._methods[listener_name]
@@ -1218,7 +1229,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise
def _log_flow_event(
self, message: str, color: str = "yellow", level: str = "info"
self, message: str, color: PrinterColor = "yellow", level: str = "info"
) -> None:
"""Centralized logging method for flow events.

View File

@@ -0,0 +1,296 @@
"""Test nested or_() conditions in Flow execution (Issue #3719)."""
from crewai.flow.flow import Flow, listen, or_, start
def test_nested_or_condition_triggers_once():
"""Test that nested or_() conditions only trigger listeners once.
This test addresses issue #3719 where nested or_() conditions would
cause listeners to execute multiple times instead of once.
Setup:
method_5 listens to or_(method_1, or_(method_2, method_3))
method_7 listens to or_(method_5, method_6)
Expected behavior:
- method_5 should execute exactly once (triggered by first matching condition)
- method_7 should execute exactly once (triggered by first matching condition)
Bug behavior (before fix):
- method_5 executed 3 times (once for method_1, method_2, and method_3)
- method_7 executed 4 times (once for each method_5 execution + method_6)
"""
execution_order = []
class NestedOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
return "method_1_done"
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
return "method_2_done"
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
return "method_3_done"
@listen(or_("method_1", or_("method_2", "method_3")))
def method_5(self):
execution_order.append("method_5")
return "method_5_done"
@listen("method_1")
def method_6(self):
execution_order.append("method_6")
return "method_6_done"
@listen(or_("method_5", "method_6"))
def method_7(self):
execution_order.append("method_7")
return "method_7_done"
flow = NestedOrFlow()
flow.kickoff()
assert execution_order.count("method_5") == 1, (
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
)
assert execution_order.count("method_7") == 1, (
f"method_7 should execute exactly once, but executed {execution_order.count('method_7')} times"
)
assert "method_1" in execution_order
assert "method_2" in execution_order
assert "method_3" in execution_order
assert "method_5" in execution_order
assert "method_6" in execution_order
assert "method_7" in execution_order
def test_simple_or_condition_triggers_once():
"""Test that simple or_() conditions only trigger once.
Even without nesting, an OR condition should only trigger a listener once,
not multiple times for each method in the OR list.
"""
execution_order = []
class SimpleOrFlow(Flow):
@start()
def method_a(self):
execution_order.append("method_a")
@listen("method_a")
def method_b(self):
execution_order.append("method_b")
@listen("method_a")
def method_c(self):
execution_order.append("method_c")
@listen(or_("method_b", "method_c"))
def method_d(self):
execution_order.append("method_d")
flow = SimpleOrFlow()
flow.kickoff()
assert execution_order.count("method_d") == 1, (
f"method_d should execute exactly once, but executed {execution_order.count('method_d')} times"
)
def test_or_condition_with_three_methods():
"""Test OR condition with three methods triggers only once."""
execution_order = []
class ThreeMethodOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen("method_1")
def method_4(self):
execution_order.append("method_4")
@listen(or_("method_2", "method_3", "method_4"))
def method_5(self):
execution_order.append("method_5")
flow = ThreeMethodOrFlow()
flow.kickoff()
assert execution_order.count("method_5") == 1, (
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
)
def test_multiple_or_listeners_independent():
"""Test that multiple OR listeners are independent of each other."""
execution_order = []
class MultipleOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen(or_("method_2", "method_3"))
def method_a(self):
execution_order.append("method_a")
@listen(or_("method_2", "method_3"))
def method_b(self):
execution_order.append("method_b")
flow = MultipleOrFlow()
flow.kickoff()
assert execution_order.count("method_a") == 1
assert execution_order.count("method_b") == 1
def test_deeply_nested_or_conditions():
"""Test deeply nested or_() conditions."""
execution_order = []
class DeeplyNestedOrFlow(Flow):
@start()
def start_method(self):
execution_order.append("start_method")
@listen("start_method")
def method_a(self):
execution_order.append("method_a")
@listen("start_method")
def method_b(self):
execution_order.append("method_b")
@listen("start_method")
def method_c(self):
execution_order.append("method_c")
@listen("start_method")
def method_d(self):
execution_order.append("method_d")
@listen(or_(or_("method_a", "method_b"), or_("method_c", "method_d")))
def final_method(self):
execution_order.append("final_method")
flow = DeeplyNestedOrFlow()
flow.kickoff()
assert execution_order.count("final_method") == 1, (
f"final_method should execute exactly once, but executed {execution_order.count('final_method')} times"
)
def test_or_condition_execution_order():
"""Test that OR listener executes after first matching condition.
The listener should trigger as soon as any one of the OR conditions is met,
not wait for all of them.
"""
execution_order = []
class ExecutionOrderFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen(or_("method_2", "method_3"))
def method_4(self):
execution_order.append("method_4")
flow = ExecutionOrderFlow()
flow.kickoff()
method_4_index = execution_order.index("method_4")
assert "method_2" in execution_order[:method_4_index] or "method_3" in execution_order[:method_4_index], (
"method_4 should execute after at least one of method_2 or method_3"
)
def test_or_condition_with_single_method():
"""Test OR condition with a single method (edge case)."""
execution_order = []
class SingleMethodOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen(or_("method_1"))
def method_2(self):
execution_order.append("method_2")
flow = SingleMethodOrFlow()
flow.kickoff()
assert execution_order == ["method_1", "method_2"]
assert execution_order.count("method_2") == 1
def test_cyclic_flow_with_or_condition():
"""Test that OR conditions work correctly in cyclic flows.
Within a single flow execution, an OR listener should only trigger once
even if multiple methods in the OR condition complete.
"""
execution_order = []
class CyclicOrFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@listen("step_1")
def step_2(self):
execution_order.append("step_2")
@listen("step_1")
def step_3(self):
execution_order.append("step_3")
@listen(or_("step_2", "step_3"))
def step_4(self):
execution_order.append("step_4")
flow = CyclicOrFlow()
flow.kickoff()
assert execution_order.count("step_4") == 1, (
f"step_4 should execute once (not once for each OR condition), but executed {execution_order.count('step_4')} times"
)