Compare commits

...

3 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
89f7435373 Merge branch 'main' into bugfix/async-flows 2025-02-24 10:22:54 -05:00
Brandon Hancock
ad030d5eec Drop coroutine 2025-02-21 11:42:29 -05:00
Brandon Hancock
00b6ce94dc Better support async 2025-02-21 11:25:12 -05:00

View File

@@ -713,16 +713,35 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""Start the flow execution.
"""
Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
"""
async def run_flow():
return await self.kickoff_async(inputs)
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
logs the flow startup, and executes all start methods. Once completed, it emits the
FlowFinishedEvent and returns the final output.
Args:
inputs: Optional dictionary containing input values and potentially a state ID to restore
"""
# Handle state restoration if ID is provided in inputs
if inputs and "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
inputs: Optional dictionary containing input values and/or a state ID for restoration.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
@@ -730,24 +749,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
# Restore the state
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# Apply any additional inputs after restoration
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
# Start flow execution
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
@@ -760,27 +782,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
async def run_flow():
return await self.kickoff_async()
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
# Execute all start methods concurrently.
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
final_output = self._method_outputs[-1] if self._method_outputs else None
# Emit FlowFinishedEvent after all processing is complete.
crewai_event_bus.emit(
self,
FlowFinishedEvent(