mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
4 Commits
1.2.1
...
devin/1760
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9dad68f5c | ||
|
|
8abbae57af | ||
|
|
425bfeaf9f | ||
|
|
b0f6c66c36 |
@@ -31,7 +31,7 @@ from crewai.flow.flow_visualizer import plot_flow
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.types import FlowExecutionData
|
||||
from crewai.flow.utils import get_possible_return_constants
|
||||
from crewai.utilities.printer import Printer
|
||||
from crewai.utilities.printer import Printer, PrinterColor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -459,6 +459,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._methods: dict[str, Callable] = {}
|
||||
self._method_execution_counts: dict[str, int] = {}
|
||||
self._pending_and_listeners: dict[str, set[str]] = {}
|
||||
self._triggered_or_listeners: set[str] = set() # Track OR listeners that have already triggered
|
||||
self._method_outputs: list[Any] = [] # list to store all method outputs
|
||||
self._completed_methods: set[str] = set() # Track completed methods for reload
|
||||
self._persistence: FlowPersistence | None = persistence
|
||||
@@ -822,6 +823,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
# Clear completed methods and outputs for a fresh start
|
||||
self._completed_methods.clear()
|
||||
self._method_outputs.clear()
|
||||
self._triggered_or_listeners.clear()
|
||||
else:
|
||||
# We're restoring from persistence, set the flag
|
||||
self._is_execution_resuming = True
|
||||
@@ -922,6 +924,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(start_method_name)
|
||||
self._triggered_or_listeners.clear()
|
||||
|
||||
method = self._methods[start_method_name]
|
||||
enhanced_method = self._inject_trigger_payload_for_start_method(method)
|
||||
@@ -1086,7 +1089,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
for method_name in self._start_methods:
|
||||
# Check if this start method is triggered by the current trigger
|
||||
if method_name in self._listeners:
|
||||
condition_type, trigger_methods = self._listeners[
|
||||
_, trigger_methods = self._listeners[
|
||||
method_name
|
||||
]
|
||||
if current_trigger in trigger_methods:
|
||||
@@ -1124,9 +1127,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
Notes
|
||||
-----
|
||||
- Handles both OR and AND conditions:
|
||||
* OR: Triggers if any condition is met
|
||||
* OR: Triggers if any condition is met (but only once per listener)
|
||||
* AND: Triggers only when all conditions are met
|
||||
- Maintains state for AND conditions using _pending_and_listeners
|
||||
- Tracks OR listeners to prevent multiple triggers from the same condition
|
||||
- Separates router and normal listener evaluation
|
||||
"""
|
||||
triggered = []
|
||||
@@ -1140,9 +1144,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
continue
|
||||
|
||||
if condition_type == "OR":
|
||||
# Check if this OR listener has already been triggered
|
||||
if listener_name in self._triggered_or_listeners:
|
||||
# Skip this listener as it has already been triggered by another method in the OR condition
|
||||
continue
|
||||
|
||||
# If the trigger_method matches any in methods, run this
|
||||
if trigger_method in methods:
|
||||
triggered.append(listener_name)
|
||||
self._triggered_or_listeners.add(listener_name)
|
||||
elif condition_type == "AND":
|
||||
# Initialize pending methods for this listener if not already done
|
||||
if listener_name not in self._pending_and_listeners:
|
||||
@@ -1195,6 +1205,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(listener_name)
|
||||
self._triggered_or_listeners.discard(listener_name)
|
||||
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
@@ -1218,7 +1229,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
raise
|
||||
|
||||
def _log_flow_event(
|
||||
self, message: str, color: str = "yellow", level: str = "info"
|
||||
self, message: str, color: PrinterColor = "yellow", level: str = "info"
|
||||
) -> None:
|
||||
"""Centralized logging method for flow events.
|
||||
|
||||
|
||||
296
tests/test_nested_or_conditions.py
Normal file
296
tests/test_nested_or_conditions.py
Normal file
@@ -0,0 +1,296 @@
|
||||
"""Test nested or_() conditions in Flow execution (Issue #3719)."""
|
||||
|
||||
|
||||
from crewai.flow.flow import Flow, listen, or_, start
|
||||
|
||||
|
||||
def test_nested_or_condition_triggers_once():
|
||||
"""Test that nested or_() conditions only trigger listeners once.
|
||||
|
||||
This test addresses issue #3719 where nested or_() conditions would
|
||||
cause listeners to execute multiple times instead of once.
|
||||
|
||||
Setup:
|
||||
method_5 listens to or_(method_1, or_(method_2, method_3))
|
||||
method_7 listens to or_(method_5, method_6)
|
||||
|
||||
Expected behavior:
|
||||
- method_5 should execute exactly once (triggered by first matching condition)
|
||||
- method_7 should execute exactly once (triggered by first matching condition)
|
||||
|
||||
Bug behavior (before fix):
|
||||
- method_5 executed 3 times (once for method_1, method_2, and method_3)
|
||||
- method_7 executed 4 times (once for each method_5 execution + method_6)
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class NestedOrFlow(Flow):
|
||||
@start()
|
||||
def method_1(self):
|
||||
execution_order.append("method_1")
|
||||
return "method_1_done"
|
||||
|
||||
@listen("method_1")
|
||||
def method_2(self):
|
||||
execution_order.append("method_2")
|
||||
return "method_2_done"
|
||||
|
||||
@listen("method_1")
|
||||
def method_3(self):
|
||||
execution_order.append("method_3")
|
||||
return "method_3_done"
|
||||
|
||||
@listen(or_("method_1", or_("method_2", "method_3")))
|
||||
def method_5(self):
|
||||
execution_order.append("method_5")
|
||||
return "method_5_done"
|
||||
|
||||
@listen("method_1")
|
||||
def method_6(self):
|
||||
execution_order.append("method_6")
|
||||
return "method_6_done"
|
||||
|
||||
@listen(or_("method_5", "method_6"))
|
||||
def method_7(self):
|
||||
execution_order.append("method_7")
|
||||
return "method_7_done"
|
||||
|
||||
flow = NestedOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("method_5") == 1, (
|
||||
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
|
||||
)
|
||||
assert execution_order.count("method_7") == 1, (
|
||||
f"method_7 should execute exactly once, but executed {execution_order.count('method_7')} times"
|
||||
)
|
||||
|
||||
assert "method_1" in execution_order
|
||||
assert "method_2" in execution_order
|
||||
assert "method_3" in execution_order
|
||||
assert "method_5" in execution_order
|
||||
assert "method_6" in execution_order
|
||||
assert "method_7" in execution_order
|
||||
|
||||
|
||||
def test_simple_or_condition_triggers_once():
|
||||
"""Test that simple or_() conditions only trigger once.
|
||||
|
||||
Even without nesting, an OR condition should only trigger a listener once,
|
||||
not multiple times for each method in the OR list.
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class SimpleOrFlow(Flow):
|
||||
@start()
|
||||
def method_a(self):
|
||||
execution_order.append("method_a")
|
||||
|
||||
@listen("method_a")
|
||||
def method_b(self):
|
||||
execution_order.append("method_b")
|
||||
|
||||
@listen("method_a")
|
||||
def method_c(self):
|
||||
execution_order.append("method_c")
|
||||
|
||||
@listen(or_("method_b", "method_c"))
|
||||
def method_d(self):
|
||||
execution_order.append("method_d")
|
||||
|
||||
flow = SimpleOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("method_d") == 1, (
|
||||
f"method_d should execute exactly once, but executed {execution_order.count('method_d')} times"
|
||||
)
|
||||
|
||||
|
||||
def test_or_condition_with_three_methods():
|
||||
"""Test OR condition with three methods triggers only once."""
|
||||
execution_order = []
|
||||
|
||||
class ThreeMethodOrFlow(Flow):
|
||||
@start()
|
||||
def method_1(self):
|
||||
execution_order.append("method_1")
|
||||
|
||||
@listen("method_1")
|
||||
def method_2(self):
|
||||
execution_order.append("method_2")
|
||||
|
||||
@listen("method_1")
|
||||
def method_3(self):
|
||||
execution_order.append("method_3")
|
||||
|
||||
@listen("method_1")
|
||||
def method_4(self):
|
||||
execution_order.append("method_4")
|
||||
|
||||
@listen(or_("method_2", "method_3", "method_4"))
|
||||
def method_5(self):
|
||||
execution_order.append("method_5")
|
||||
|
||||
flow = ThreeMethodOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("method_5") == 1, (
|
||||
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
|
||||
)
|
||||
|
||||
|
||||
def test_multiple_or_listeners_independent():
|
||||
"""Test that multiple OR listeners are independent of each other."""
|
||||
execution_order = []
|
||||
|
||||
class MultipleOrFlow(Flow):
|
||||
@start()
|
||||
def method_1(self):
|
||||
execution_order.append("method_1")
|
||||
|
||||
@listen("method_1")
|
||||
def method_2(self):
|
||||
execution_order.append("method_2")
|
||||
|
||||
@listen("method_1")
|
||||
def method_3(self):
|
||||
execution_order.append("method_3")
|
||||
|
||||
@listen(or_("method_2", "method_3"))
|
||||
def method_a(self):
|
||||
execution_order.append("method_a")
|
||||
|
||||
@listen(or_("method_2", "method_3"))
|
||||
def method_b(self):
|
||||
execution_order.append("method_b")
|
||||
|
||||
flow = MultipleOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("method_a") == 1
|
||||
assert execution_order.count("method_b") == 1
|
||||
|
||||
|
||||
def test_deeply_nested_or_conditions():
|
||||
"""Test deeply nested or_() conditions."""
|
||||
execution_order = []
|
||||
|
||||
class DeeplyNestedOrFlow(Flow):
|
||||
@start()
|
||||
def start_method(self):
|
||||
execution_order.append("start_method")
|
||||
|
||||
@listen("start_method")
|
||||
def method_a(self):
|
||||
execution_order.append("method_a")
|
||||
|
||||
@listen("start_method")
|
||||
def method_b(self):
|
||||
execution_order.append("method_b")
|
||||
|
||||
@listen("start_method")
|
||||
def method_c(self):
|
||||
execution_order.append("method_c")
|
||||
|
||||
@listen("start_method")
|
||||
def method_d(self):
|
||||
execution_order.append("method_d")
|
||||
|
||||
@listen(or_(or_("method_a", "method_b"), or_("method_c", "method_d")))
|
||||
def final_method(self):
|
||||
execution_order.append("final_method")
|
||||
|
||||
flow = DeeplyNestedOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("final_method") == 1, (
|
||||
f"final_method should execute exactly once, but executed {execution_order.count('final_method')} times"
|
||||
)
|
||||
|
||||
|
||||
def test_or_condition_execution_order():
|
||||
"""Test that OR listener executes after first matching condition.
|
||||
|
||||
The listener should trigger as soon as any one of the OR conditions is met,
|
||||
not wait for all of them.
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class ExecutionOrderFlow(Flow):
|
||||
@start()
|
||||
def method_1(self):
|
||||
execution_order.append("method_1")
|
||||
|
||||
@listen("method_1")
|
||||
def method_2(self):
|
||||
execution_order.append("method_2")
|
||||
|
||||
@listen("method_1")
|
||||
def method_3(self):
|
||||
execution_order.append("method_3")
|
||||
|
||||
@listen(or_("method_2", "method_3"))
|
||||
def method_4(self):
|
||||
execution_order.append("method_4")
|
||||
|
||||
flow = ExecutionOrderFlow()
|
||||
flow.kickoff()
|
||||
|
||||
method_4_index = execution_order.index("method_4")
|
||||
|
||||
assert "method_2" in execution_order[:method_4_index] or "method_3" in execution_order[:method_4_index], (
|
||||
"method_4 should execute after at least one of method_2 or method_3"
|
||||
)
|
||||
|
||||
|
||||
def test_or_condition_with_single_method():
|
||||
"""Test OR condition with a single method (edge case)."""
|
||||
execution_order = []
|
||||
|
||||
class SingleMethodOrFlow(Flow):
|
||||
@start()
|
||||
def method_1(self):
|
||||
execution_order.append("method_1")
|
||||
|
||||
@listen(or_("method_1"))
|
||||
def method_2(self):
|
||||
execution_order.append("method_2")
|
||||
|
||||
flow = SingleMethodOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order == ["method_1", "method_2"]
|
||||
assert execution_order.count("method_2") == 1
|
||||
|
||||
|
||||
def test_cyclic_flow_with_or_condition():
|
||||
"""Test that OR conditions work correctly in cyclic flows.
|
||||
|
||||
Within a single flow execution, an OR listener should only trigger once
|
||||
even if multiple methods in the OR condition complete.
|
||||
"""
|
||||
execution_order = []
|
||||
|
||||
class CyclicOrFlow(Flow):
|
||||
@start()
|
||||
def step_1(self):
|
||||
execution_order.append("step_1")
|
||||
|
||||
@listen("step_1")
|
||||
def step_2(self):
|
||||
execution_order.append("step_2")
|
||||
|
||||
@listen("step_1")
|
||||
def step_3(self):
|
||||
execution_order.append("step_3")
|
||||
|
||||
@listen(or_("step_2", "step_3"))
|
||||
def step_4(self):
|
||||
execution_order.append("step_4")
|
||||
|
||||
flow = CyclicOrFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert execution_order.count("step_4") == 1, (
|
||||
f"step_4 should execute once (not once for each OR condition), but executed {execution_order.count('step_4')} times"
|
||||
)
|
||||
Reference in New Issue
Block a user