add inputs to flows

This commit is contained in:
Brandon Hancock
2024-11-01 16:36:07 -04:00
parent 34954e6f74
commit cf90bd3105
2 changed files with 131 additions and 68 deletions

View File

@@ -1,8 +1,20 @@
import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
@@ -191,10 +203,66 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
def kickoff(self) -> Any:
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
"""
Initializes or updates the state with the provided inputs.
Args:
inputs: Dictionary of inputs to initialize or update the state.
Raises:
ValueError: If inputs do not match the structured state model.
TypeError: If state is neither a BaseModel instance nor a dictionary.
"""
if isinstance(self._state, BaseModel):
# Structured state management
try:
M = self._state.__class__
# Dynamically create a new model class with 'extra' set to 'forbid'
class ModelWithExtraForbid(M):
model_config = M.model_config.copy()
model_config["extra"] = "forbid"
# Create a new instance using the combined state and inputs
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
)
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
# Unstructured state management
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow synchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
async def kickoff_async(self) -> Any:
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow asynchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
if not self._start_methods:
raise ValueError("No start method defined")