add inputs to flows (#1553)

* add inputs to flows

* fix flows lint
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-11-01 17:37:02 -04:00
committed by GitHub
parent 3878daffd6
commit 9b142e580b
2 changed files with 139 additions and 68 deletions

View File

@@ -1,8 +1,20 @@
import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
@@ -191,10 +203,74 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
def kickoff(self) -> Any:
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
"""
Initializes or updates the state with the provided inputs.
Args:
inputs: Dictionary of inputs to initialize or update the state.
Raises:
ValueError: If inputs do not match the structured state model.
TypeError: If state is neither a BaseModel instance nor a dictionary.
"""
if isinstance(self._state, BaseModel):
# Structured state management
try:
# Define a function to create the dynamic class
def create_model_with_extra_forbid(
base_model: Type[BaseModel],
) -> Type[BaseModel]:
class ModelWithExtraForbid(base_model): # type: ignore
model_config = base_model.model_config.copy()
model_config["extra"] = "forbid"
return ModelWithExtraForbid
# Create the dynamic class
ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__
)
# Create a new instance using the combined state and inputs
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
)
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
# Unstructured state management
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow synchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
async def kickoff_async(self) -> Any:
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow asynchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
if not self._start_methods:
raise ValueError("No start method defined")