diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 8a06e2da6..0bec95a96 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1348,9 +1348,26 @@ class Flow(Generic[T], metaclass=FlowMeta): self._initialize_state(inputs) try: + # Determine which start methods to execute at kickoff + # Conditional start methods (with __trigger_methods__) are only triggered by their conditions + # UNLESS there are no unconditional starts (then all starts run as entry points) + unconditional_starts = [ + start_method + for start_method in self._start_methods + if not getattr( + self._methods.get(start_method), "__trigger_methods__", None + ) + ] + # If there are unconditional starts, only run those at kickoff + # If there are NO unconditional starts, run all starts (including conditional ones) + starts_to_execute = ( + unconditional_starts + if unconditional_starts + else self._start_methods + ) tasks = [ self._execute_start_method(start_method) - for start_method in self._start_methods + for start_method in starts_to_execute ] await asyncio.gather(*tasks) except Exception as e: @@ -1753,14 +1770,16 @@ class Flow(Generic[T], metaclass=FlowMeta): should_trigger = current_trigger in all_methods if should_trigger: - # Only execute if this is a cycle (method was already completed) + # Execute conditional start method triggered by router result if method_name in self._completed_methods: - # For router-triggered start methods in cycles, temporarily clear resumption flag - # to allow cyclic execution + # For cyclic re-execution, temporarily clear resumption flag was_resuming = self._is_execution_resuming self._is_execution_resuming = False await self._execute_start_method(method_name) self._is_execution_resuming = was_resuming + else: + # First-time execution of conditional start + await self._execute_start_method(method_name) def _evaluate_condition( self, @@ -1904,6 +1923,17 @@ class Flow(Generic[T], metaclass=FlowMeta): if self._is_execution_resuming: # During resumption, skip execution but continue listeners await self._execute_listeners(listener_name, None) + + # For routers, also check if any conditional starts they triggered are completed + # If so, continue their chains + if listener_name in self._routers: + for start_method_name in self._start_methods: + if ( + start_method_name in self._listeners + and start_method_name in self._completed_methods + ): + # This conditional start was executed, continue its chain + await self._execute_start_method(start_method_name) return # For cyclic flows, clear from completed to allow re-execution self._completed_methods.discard(listener_name)