From 601eda9095da2452754329a159aacd3b993b09da Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 15 Jan 2026 09:29:25 -0800 Subject: [PATCH] Enhance Flow Execution Logic - Introduced conditional execution for start methods in the Flow class. - Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present. - Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers. - Added checks to continue execution chains for completed conditional starts. These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions. --- lib/crewai/src/crewai/flow/flow.py | 38 ++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 8a06e2da6..0bec95a96 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1348,9 +1348,26 @@ class Flow(Generic[T], metaclass=FlowMeta): self._initialize_state(inputs) try: + # Determine which start methods to execute at kickoff + # Conditional start methods (with __trigger_methods__) are only triggered by their conditions + # UNLESS there are no unconditional starts (then all starts run as entry points) + unconditional_starts = [ + start_method + for start_method in self._start_methods + if not getattr( + self._methods.get(start_method), "__trigger_methods__", None + ) + ] + # If there are unconditional starts, only run those at kickoff + # If there are NO unconditional starts, run all starts (including conditional ones) + starts_to_execute = ( + unconditional_starts + if unconditional_starts + else self._start_methods + ) tasks = [ self._execute_start_method(start_method) - for start_method in self._start_methods + for start_method in starts_to_execute ] await asyncio.gather(*tasks) except Exception as e: @@ -1753,14 +1770,16 @@ class Flow(Generic[T], metaclass=FlowMeta): should_trigger = current_trigger in all_methods if should_trigger: - # Only execute if this is a cycle (method was already completed) + # Execute conditional start method triggered by router result if method_name in self._completed_methods: - # For router-triggered start methods in cycles, temporarily clear resumption flag - # to allow cyclic execution + # For cyclic re-execution, temporarily clear resumption flag was_resuming = self._is_execution_resuming self._is_execution_resuming = False await self._execute_start_method(method_name) self._is_execution_resuming = was_resuming + else: + # First-time execution of conditional start + await self._execute_start_method(method_name) def _evaluate_condition( self, @@ -1904,6 +1923,17 @@ class Flow(Generic[T], metaclass=FlowMeta): if self._is_execution_resuming: # During resumption, skip execution but continue listeners await self._execute_listeners(listener_name, None) + + # For routers, also check if any conditional starts they triggered are completed + # If so, continue their chains + if listener_name in self._routers: + for start_method_name in self._start_methods: + if ( + start_method_name in self._listeners + and start_method_name in self._completed_methods + ): + # This conditional start was executed, continue its chain + await self._execute_start_method(start_method_name) return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name)