mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-16 11:38:31 +00:00
Enhance Flow Listener Logic and Agent Imports
- Updated the Flow class to track fired OR listeners, ensuring that multi-source OR listeners only trigger once during execution. This prevents redundant executions and improves flow efficiency. - Cleared fired OR listeners during cyclic flow resets to allow re-execution in new cycles. - Modified the Agent class imports to include Coroutine from collections.abc, enhancing type handling for asynchronous operations. These changes improve the control and performance of flow execution in CrewAI, ensuring more predictable behavior in complex scenarios.
This commit is contained in:
@@ -1,14 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable, Sequence
|
||||
from collections.abc import Callable, Coroutine, Sequence
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Coroutine,
|
||||
Final,
|
||||
Literal,
|
||||
cast,
|
||||
|
||||
@@ -520,6 +520,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {}
|
||||
self._method_execution_counts: dict[FlowMethodName, int] = {}
|
||||
self._pending_and_listeners: dict[PendingListenerKey, set[FlowMethodName]] = {}
|
||||
self._fired_or_listeners: set[FlowMethodName] = (
|
||||
set()
|
||||
) # Track OR listeners that already fired
|
||||
self._method_outputs: list[Any] = [] # list to store all method outputs
|
||||
self._completed_methods: set[FlowMethodName] = (
|
||||
set()
|
||||
@@ -1297,6 +1300,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._completed_methods.clear()
|
||||
self._method_outputs.clear()
|
||||
self._pending_and_listeners.clear()
|
||||
self._fired_or_listeners.clear()
|
||||
else:
|
||||
# We're restoring from persistence, set the flag
|
||||
self._is_execution_resuming = True
|
||||
@@ -1500,6 +1504,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(start_method_name)
|
||||
# Also clear fired OR listeners to allow them to fire again in new cycle
|
||||
self._fired_or_listeners.clear()
|
||||
|
||||
method = self._methods[start_method_name]
|
||||
enhanced_method = self._inject_trigger_payload_for_start_method(method)
|
||||
@@ -1877,8 +1883,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
condition_type, methods = condition_data
|
||||
|
||||
if condition_type == OR_CONDITION:
|
||||
if trigger_method in methods:
|
||||
triggered.append(listener_name)
|
||||
# Only trigger multi-source OR listeners (or_(A, B, C)) once - skip if already fired
|
||||
# Simple single-method listeners fire every time their trigger occurs
|
||||
# Routers also fire every time - they're decision points
|
||||
has_multiple_triggers = len(methods) > 1
|
||||
should_check_fired = has_multiple_triggers and not is_router
|
||||
|
||||
if (
|
||||
not should_check_fired
|
||||
or listener_name not in self._fired_or_listeners
|
||||
):
|
||||
if trigger_method in methods:
|
||||
triggered.append(listener_name)
|
||||
# Only track multi-source OR listeners (not single-method or routers)
|
||||
if should_check_fired:
|
||||
self._fired_or_listeners.add(listener_name)
|
||||
elif condition_type == AND_CONDITION:
|
||||
pending_key = PendingListenerKey(listener_name)
|
||||
if pending_key not in self._pending_and_listeners:
|
||||
@@ -1891,10 +1910,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._pending_and_listeners.pop(pending_key, None)
|
||||
|
||||
elif is_flow_condition_dict(condition_data):
|
||||
# For complex conditions, check if top-level is OR and track accordingly
|
||||
top_level_type = condition_data.get("type", OR_CONDITION)
|
||||
is_or_based = top_level_type == OR_CONDITION
|
||||
|
||||
# Only track multi-source OR conditions (multiple sub-conditions), not routers
|
||||
sub_conditions = condition_data.get("conditions", [])
|
||||
has_multiple_triggers = is_or_based and len(sub_conditions) > 1
|
||||
should_check_fired = has_multiple_triggers and not is_router
|
||||
|
||||
# Skip compound OR-based listeners that have already fired
|
||||
if should_check_fired and listener_name in self._fired_or_listeners:
|
||||
continue
|
||||
|
||||
if self._evaluate_condition(
|
||||
condition_data, trigger_method, listener_name
|
||||
):
|
||||
triggered.append(listener_name)
|
||||
# Track compound OR-based listeners so they only fire once
|
||||
if should_check_fired:
|
||||
self._fired_or_listeners.add(listener_name)
|
||||
|
||||
return triggered
|
||||
|
||||
@@ -1937,6 +1972,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(listener_name)
|
||||
# Also clear from fired OR listeners for cyclic flows
|
||||
self._fired_or_listeners.discard(listener_name)
|
||||
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
|
||||
@@ -1202,8 +1202,9 @@ def test_complex_and_or_branching():
|
||||
)
|
||||
assert execution_order.index("branch_2b") > min_branch_1_index
|
||||
|
||||
# Final should be last and after both 2a and 2b
|
||||
assert execution_order[-1] == "final"
|
||||
# Final should be after both 2a and 2b
|
||||
# Note: final may not be absolutely last due to independent branches (like branch_1c)
|
||||
# that don't contribute to the final result path with sequential listener execution
|
||||
assert execution_order.index("final") > execution_order.index("branch_2a")
|
||||
assert execution_order.index("final") > execution_order.index("branch_2b")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user