mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-17 12:58:31 +00:00
Compare commits
3 Commits
lg-llm-doc
...
bugfix/asy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
89f7435373 | ||
|
|
ad030d5eec | ||
|
|
00b6ce94dc |
@@ -713,16 +713,35 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
|
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
|
||||||
|
|
||||||
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||||
"""Start the flow execution.
|
"""
|
||||||
|
Start the flow execution in a synchronous context.
|
||||||
|
|
||||||
|
This method wraps kickoff_async so that all state initialization and event
|
||||||
|
emission is handled in the asynchronous method.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def run_flow():
|
||||||
|
return await self.kickoff_async(inputs)
|
||||||
|
|
||||||
|
return asyncio.run(run_flow())
|
||||||
|
|
||||||
|
@init_flow_main_trace
|
||||||
|
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||||
|
"""
|
||||||
|
Start the flow execution asynchronously.
|
||||||
|
|
||||||
|
This method performs state restoration (if an 'id' is provided and persistence is available)
|
||||||
|
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
|
||||||
|
logs the flow startup, and executes all start methods. Once completed, it emits the
|
||||||
|
FlowFinishedEvent and returns the final output.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
inputs: Optional dictionary containing input values and potentially a state ID to restore
|
inputs: Optional dictionary containing input values and/or a state ID for restoration.
|
||||||
"""
|
|
||||||
# Handle state restoration if ID is provided in inputs
|
|
||||||
if inputs and "id" in inputs and self._persistence is not None:
|
|
||||||
restore_uuid = inputs["id"]
|
|
||||||
stored_state = self._persistence.load_state(restore_uuid)
|
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The final output from the flow, which is the result of the last executed method.
|
||||||
|
"""
|
||||||
|
if inputs:
|
||||||
# Override the id in the state if it exists in inputs
|
# Override the id in the state if it exists in inputs
|
||||||
if "id" in inputs:
|
if "id" in inputs:
|
||||||
if isinstance(self._state, dict):
|
if isinstance(self._state, dict):
|
||||||
@@ -730,24 +749,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
elif isinstance(self._state, BaseModel):
|
elif isinstance(self._state, BaseModel):
|
||||||
setattr(self._state, "id", inputs["id"])
|
setattr(self._state, "id", inputs["id"])
|
||||||
|
|
||||||
if stored_state:
|
# If persistence is enabled, attempt to restore the stored state using the provided id.
|
||||||
self._log_flow_event(
|
if "id" in inputs and self._persistence is not None:
|
||||||
f"Loading flow state from memory for UUID: {restore_uuid}",
|
restore_uuid = inputs["id"]
|
||||||
color="yellow",
|
stored_state = self._persistence.load_state(restore_uuid)
|
||||||
)
|
if stored_state:
|
||||||
# Restore the state
|
self._log_flow_event(
|
||||||
self._restore_state(stored_state)
|
f"Loading flow state from memory for UUID: {restore_uuid}",
|
||||||
else:
|
color="yellow",
|
||||||
self._log_flow_event(
|
)
|
||||||
f"No flow state found for UUID: {restore_uuid}", color="red"
|
self._restore_state(stored_state)
|
||||||
)
|
else:
|
||||||
|
self._log_flow_event(
|
||||||
|
f"No flow state found for UUID: {restore_uuid}", color="red"
|
||||||
|
)
|
||||||
|
|
||||||
# Apply any additional inputs after restoration
|
# Update state with any additional inputs (ignoring the 'id' key)
|
||||||
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
|
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
|
||||||
if filtered_inputs:
|
if filtered_inputs:
|
||||||
self._initialize_state(filtered_inputs)
|
self._initialize_state(filtered_inputs)
|
||||||
|
|
||||||
# Start flow execution
|
# Emit FlowStartedEvent and log the start of the flow.
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
FlowStartedEvent(
|
FlowStartedEvent(
|
||||||
@@ -760,27 +782,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
|
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
|
||||||
)
|
)
|
||||||
|
|
||||||
if inputs is not None and "id" not in inputs:
|
|
||||||
self._initialize_state(inputs)
|
|
||||||
|
|
||||||
async def run_flow():
|
|
||||||
return await self.kickoff_async()
|
|
||||||
|
|
||||||
return asyncio.run(run_flow())
|
|
||||||
|
|
||||||
@init_flow_main_trace
|
|
||||||
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
|
||||||
if not self._start_methods:
|
if not self._start_methods:
|
||||||
raise ValueError("No start method defined")
|
raise ValueError("No start method defined")
|
||||||
|
|
||||||
|
# Execute all start methods concurrently.
|
||||||
tasks = [
|
tasks = [
|
||||||
self._execute_start_method(start_method)
|
self._execute_start_method(start_method)
|
||||||
for start_method in self._start_methods
|
for start_method in self._start_methods
|
||||||
]
|
]
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
final_output = self._method_outputs[-1] if self._method_outputs else None
|
final_output = self._method_outputs[-1] if self._method_outputs else None
|
||||||
|
|
||||||
|
# Emit FlowFinishedEvent after all processing is complete.
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
FlowFinishedEvent(
|
FlowFinishedEvent(
|
||||||
|
|||||||
Reference in New Issue
Block a user