diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 2babbe57c..5f17c4b84 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -713,16 +713,35 @@ class Flow(Generic[T], metaclass=FlowMeta): raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}") def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any: - """Start the flow execution. + """ + Start the flow execution in a synchronous context. + + This method wraps kickoff_async so that all state initialization and event + emission is handled in the asynchronous method. + """ + + async def run_flow(): + return await self.kickoff_async(inputs) + + return asyncio.run(run_flow()) + + @init_flow_main_trace + async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any: + """ + Start the flow execution asynchronously. + + This method performs state restoration (if an 'id' is provided and persistence is available) + and updates the flow state with any additional inputs. It then emits the FlowStartedEvent, + logs the flow startup, and executes all start methods. Once completed, it emits the + FlowFinishedEvent and returns the final output. Args: - inputs: Optional dictionary containing input values and potentially a state ID to restore - """ - # Handle state restoration if ID is provided in inputs - if inputs and "id" in inputs and self._persistence is not None: - restore_uuid = inputs["id"] - stored_state = self._persistence.load_state(restore_uuid) + inputs: Optional dictionary containing input values and/or a state ID for restoration. + Returns: + The final output from the flow, which is the result of the last executed method. + """ + if inputs: # Override the id in the state if it exists in inputs if "id" in inputs: if isinstance(self._state, dict): @@ -730,24 +749,27 @@ class Flow(Generic[T], metaclass=FlowMeta): elif isinstance(self._state, BaseModel): setattr(self._state, "id", inputs["id"]) - if stored_state: - self._log_flow_event( - f"Loading flow state from memory for UUID: {restore_uuid}", - color="yellow", - ) - # Restore the state - self._restore_state(stored_state) - else: - self._log_flow_event( - f"No flow state found for UUID: {restore_uuid}", color="red" - ) + # If persistence is enabled, attempt to restore the stored state using the provided id. + if "id" in inputs and self._persistence is not None: + restore_uuid = inputs["id"] + stored_state = self._persistence.load_state(restore_uuid) + if stored_state: + self._log_flow_event( + f"Loading flow state from memory for UUID: {restore_uuid}", + color="yellow", + ) + self._restore_state(stored_state) + else: + self._log_flow_event( + f"No flow state found for UUID: {restore_uuid}", color="red" + ) - # Apply any additional inputs after restoration + # Update state with any additional inputs (ignoring the 'id' key) filtered_inputs = {k: v for k, v in inputs.items() if k != "id"} if filtered_inputs: self._initialize_state(filtered_inputs) - # Start flow execution + # Emit FlowStartedEvent and log the start of the flow. crewai_event_bus.emit( self, FlowStartedEvent( @@ -760,27 +782,18 @@ class Flow(Generic[T], metaclass=FlowMeta): f"Flow started with ID: {self.flow_id}", color="bold_magenta" ) - if inputs is not None and "id" not in inputs: - self._initialize_state(inputs) - - async def run_flow(): - return await self.kickoff_async() - - return asyncio.run(run_flow()) - - @init_flow_main_trace - async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any: if not self._start_methods: raise ValueError("No start method defined") + # Execute all start methods concurrently. tasks = [ self._execute_start_method(start_method) for start_method in self._start_methods ] await asyncio.gather(*tasks) - final_output = self._method_outputs[-1] if self._method_outputs else None + # Emit FlowFinishedEvent after all processing is complete. crewai_event_bus.emit( self, FlowFinishedEvent(