mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-15 19:18:30 +00:00
Enhance Flow Execution Logic
- Introduced conditional execution for start methods in the Flow class. - Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present. - Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers. - Added checks to continue execution chains for completed conditional starts. These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions.
This commit is contained in:
@@ -1348,9 +1348,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._initialize_state(inputs)
|
||||
|
||||
try:
|
||||
# Determine which start methods to execute at kickoff
|
||||
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
|
||||
# UNLESS there are no unconditional starts (then all starts run as entry points)
|
||||
unconditional_starts = [
|
||||
start_method
|
||||
for start_method in self._start_methods
|
||||
if not getattr(
|
||||
self._methods.get(start_method), "__trigger_methods__", None
|
||||
)
|
||||
]
|
||||
# If there are unconditional starts, only run those at kickoff
|
||||
# If there are NO unconditional starts, run all starts (including conditional ones)
|
||||
starts_to_execute = (
|
||||
unconditional_starts
|
||||
if unconditional_starts
|
||||
else self._start_methods
|
||||
)
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
for start_method in self._start_methods
|
||||
for start_method in starts_to_execute
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
except Exception as e:
|
||||
@@ -1753,14 +1770,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
should_trigger = current_trigger in all_methods
|
||||
|
||||
if should_trigger:
|
||||
# Only execute if this is a cycle (method was already completed)
|
||||
# Execute conditional start method triggered by router result
|
||||
if method_name in self._completed_methods:
|
||||
# For router-triggered start methods in cycles, temporarily clear resumption flag
|
||||
# to allow cyclic execution
|
||||
# For cyclic re-execution, temporarily clear resumption flag
|
||||
was_resuming = self._is_execution_resuming
|
||||
self._is_execution_resuming = False
|
||||
await self._execute_start_method(method_name)
|
||||
self._is_execution_resuming = was_resuming
|
||||
else:
|
||||
# First-time execution of conditional start
|
||||
await self._execute_start_method(method_name)
|
||||
|
||||
def _evaluate_condition(
|
||||
self,
|
||||
@@ -1904,6 +1923,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if self._is_execution_resuming:
|
||||
# During resumption, skip execution but continue listeners
|
||||
await self._execute_listeners(listener_name, None)
|
||||
|
||||
# For routers, also check if any conditional starts they triggered are completed
|
||||
# If so, continue their chains
|
||||
if listener_name in self._routers:
|
||||
for start_method_name in self._start_methods:
|
||||
if (
|
||||
start_method_name in self._listeners
|
||||
and start_method_name in self._completed_methods
|
||||
):
|
||||
# This conditional start was executed, continue its chain
|
||||
await self._execute_start_method(start_method_name)
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
self._completed_methods.discard(listener_name)
|
||||
|
||||
Reference in New Issue
Block a user