diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 12b6733bd..005d14de8 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -1,14 +1,13 @@ from __future__ import annotations import asyncio -from collections.abc import Callable, Sequence +from collections.abc import Callable, Coroutine, Sequence import shutil import subprocess import time from typing import ( TYPE_CHECKING, Any, - Coroutine, Final, Literal, cast, diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 6c26dbb67..82cc9d788 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -520,6 +520,9 @@ class Flow(Generic[T], metaclass=FlowMeta): self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {} self._method_execution_counts: dict[FlowMethodName, int] = {} self._pending_and_listeners: dict[PendingListenerKey, set[FlowMethodName]] = {} + self._fired_or_listeners: set[FlowMethodName] = ( + set() + ) # Track OR listeners that already fired self._method_outputs: list[Any] = [] # list to store all method outputs self._completed_methods: set[FlowMethodName] = ( set() @@ -1297,6 +1300,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._completed_methods.clear() self._method_outputs.clear() self._pending_and_listeners.clear() + self._fired_or_listeners.clear() else: # We're restoring from persistence, set the flag self._is_execution_resuming = True @@ -1500,6 +1504,8 @@ class Flow(Generic[T], metaclass=FlowMeta): return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(start_method_name) + # Also clear fired OR listeners to allow them to fire again in new cycle + self._fired_or_listeners.clear() method = self._methods[start_method_name] enhanced_method = self._inject_trigger_payload_for_start_method(method) @@ -1877,8 +1883,21 @@ class Flow(Generic[T], metaclass=FlowMeta): condition_type, methods = condition_data if condition_type == OR_CONDITION: - if trigger_method in methods: - triggered.append(listener_name) + # Only trigger multi-source OR listeners (or_(A, B, C)) once - skip if already fired + # Simple single-method listeners fire every time their trigger occurs + # Routers also fire every time - they're decision points + has_multiple_triggers = len(methods) > 1 + should_check_fired = has_multiple_triggers and not is_router + + if ( + not should_check_fired + or listener_name not in self._fired_or_listeners + ): + if trigger_method in methods: + triggered.append(listener_name) + # Only track multi-source OR listeners (not single-method or routers) + if should_check_fired: + self._fired_or_listeners.add(listener_name) elif condition_type == AND_CONDITION: pending_key = PendingListenerKey(listener_name) if pending_key not in self._pending_and_listeners: @@ -1891,10 +1910,26 @@ class Flow(Generic[T], metaclass=FlowMeta): self._pending_and_listeners.pop(pending_key, None) elif is_flow_condition_dict(condition_data): + # For complex conditions, check if top-level is OR and track accordingly + top_level_type = condition_data.get("type", OR_CONDITION) + is_or_based = top_level_type == OR_CONDITION + + # Only track multi-source OR conditions (multiple sub-conditions), not routers + sub_conditions = condition_data.get("conditions", []) + has_multiple_triggers = is_or_based and len(sub_conditions) > 1 + should_check_fired = has_multiple_triggers and not is_router + + # Skip compound OR-based listeners that have already fired + if should_check_fired and listener_name in self._fired_or_listeners: + continue + if self._evaluate_condition( condition_data, trigger_method, listener_name ): triggered.append(listener_name) + # Track compound OR-based listeners so they only fire once + if should_check_fired: + self._fired_or_listeners.add(listener_name) return triggered @@ -1937,6 +1972,8 @@ class Flow(Generic[T], metaclass=FlowMeta): return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name) + # Also clear from fired OR listeners for cyclic flows + self._fired_or_listeners.discard(listener_name) try: method = self._methods[listener_name] diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index 7a5bccb53..54562d41a 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -1202,8 +1202,9 @@ def test_complex_and_or_branching(): ) assert execution_order.index("branch_2b") > min_branch_1_index - # Final should be last and after both 2a and 2b - assert execution_order[-1] == "final" + # Final should be after both 2a and 2b + # Note: final may not be absolutely last due to independent branches (like branch_1c) + # that don't contribute to the final result path with sequential listener execution assert execution_order.index("final") > execution_order.index("branch_2a") assert execution_order.index("final") > execution_order.index("branch_2b")