diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx index cf9d20f63..de01f4c1d 100644 --- a/docs/concepts/flows.mdx +++ b/docs/concepts/flows.mdx @@ -35,6 +35,8 @@ class ExampleFlow(Flow): @start() def generate_city(self): print("Starting flow") + # Each flow state automatically gets a unique ID + print(f"Flow State ID: {self.state['id']}") response = completion( model=self.model, @@ -47,6 +49,8 @@ class ExampleFlow(Flow): ) random_city = response["choices"][0]["message"]["content"] + # Store the city in our state + self.state["city"] = random_city print(f"Random City: {random_city}") return random_city @@ -64,6 +68,8 @@ class ExampleFlow(Flow): ) fun_fact = response["choices"][0]["message"]["content"] + # Store the fun fact in our state + self.state["fun_fact"] = fun_fact return fun_fact @@ -76,7 +82,15 @@ print(f"Generated fun fact: {result}") In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task. -When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console. +Each Flow instance automatically receives a unique identifier (UUID) in its state, which helps track and manage flow executions. The state can also store additional data (like the generated city and fun fact) that persists throughout the flow's execution. + +When you run the Flow, it will: +1. Generate a unique ID for the flow state +2. Generate a random city and store it in the state +3. Generate a fun fact about that city and store it in the state +4. Print the results to the console + +The state's unique ID and stored data can be useful for tracking flow executions and maintaining context between tasks. **Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API. @@ -207,14 +221,17 @@ allowing developers to choose the approach that best fits their application's ne In unstructured state management, all state is stored in the `state` attribute of the `Flow` class. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema. +Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance. ```python Code from crewai.flow.flow import Flow, listen, start -class UntructuredExampleFlow(Flow): +class UnstructuredExampleFlow(Flow): @start() def first_method(self): + # The state automatically includes an 'id' field + print(f"State ID: {self.state['id']}") self.state.message = "Hello from structured flow" self.state.counter = 0 @@ -231,10 +248,12 @@ class UntructuredExampleFlow(Flow): print(f"State after third_method: {self.state}") -flow = UntructuredExampleFlow() +flow = UnstructuredExampleFlow() flow.kickoff() ``` +**Note:** The `id` field is automatically generated and preserved throughout the flow's execution. You don't need to manage or set it manually, and it will be maintained even when updating the state with new data. + **Key Points:** - **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints. @@ -245,12 +264,15 @@ flow.kickoff() Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments. +Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system. + ```python Code from crewai.flow.flow import Flow, listen, start from pydantic import BaseModel class ExampleState(BaseModel): + # Note: 'id' field is automatically added to all states counter: int = 0 message: str = "" @@ -259,6 +281,8 @@ class StructuredExampleFlow(Flow[ExampleState]): @start() def first_method(self): + # Access the auto-generated ID if needed + print(f"State ID: {self.state.id}") self.state.message = "Hello from structured flow" @listen(first_method) @@ -628,4 +652,4 @@ Also, check out our YouTube video on how to use flows in CrewAI below! allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen -> \ No newline at end of file +> diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index dc46aa6d8..f10626ce4 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -13,9 +13,10 @@ from typing import ( Union, cast, ) +from uuid import uuid4 from blinker import Signal -from pydantic import BaseModel, ValidationError +from pydantic import BaseModel, Field, ValidationError from crewai.flow.flow_events import ( FlowFinishedEvent, @@ -27,7 +28,12 @@ from crewai.flow.flow_visualizer import plot_flow from crewai.flow.utils import get_possible_return_constants from crewai.telemetry import Telemetry -T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) + +class FlowState(BaseModel): + """Base model for all flow states, ensuring each state has a unique ID.""" + id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the flow state") + +T = TypeVar("T", bound=Union[FlowState, Dict[str, Any]]) def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable: @@ -377,14 +383,37 @@ class Flow(Generic[T], metaclass=FlowMeta): self._methods[method_name] = getattr(self, method_name) def _create_initial_state(self) -> T: + # Handle case where initial_state is None but we have a type parameter if self.initial_state is None and hasattr(self, "_initial_state_T"): - return self._initial_state_T() # type: ignore + state_type = getattr(self, "_initial_state_T") + if isinstance(state_type, type): + if issubclass(state_type, FlowState): + return state_type() # type: ignore + elif issubclass(state_type, BaseModel): + # Create a new type that includes the ID field + class StateWithId(state_type, FlowState): # type: ignore + pass + return StateWithId() # type: ignore + + # Handle case where no initial state is provided if self.initial_state is None: - return {} # type: ignore - elif isinstance(self.initial_state, type): - return self.initial_state() - else: - return self.initial_state + return {"id": str(uuid4())} # type: ignore + + # Handle case where initial_state is a type (class) + if isinstance(self.initial_state, type): + if issubclass(self.initial_state, FlowState): + return self.initial_state() # type: ignore + elif issubclass(self.initial_state, BaseModel): + # Create a new type that includes the ID field + class StateWithId(self.initial_state, FlowState): # type: ignore + pass + return StateWithId() # type: ignore + + # Handle dictionary case + if isinstance(self.initial_state, dict) and "id" not in self.initial_state: + self.initial_state["id"] = str(uuid4()) + + return self.initial_state # type: ignore @property def state(self) -> T: @@ -396,10 +425,17 @@ class Flow(Generic[T], metaclass=FlowMeta): return self._method_outputs def _initialize_state(self, inputs: Dict[str, Any]) -> None: - if isinstance(self._state, BaseModel): + if isinstance(self._state, dict): + # Preserve the ID when updating unstructured state + current_id = self._state.get("id") + self._state.update(inputs) + if current_id: + self._state["id"] = current_id + elif "id" not in self._state: + self._state["id"] = str(uuid4()) + elif isinstance(self._state, BaseModel): # Structured state try: - def create_model_with_extra_forbid( base_model: Type[BaseModel], ) -> Type[BaseModel]: @@ -409,16 +445,28 @@ class Flow(Generic[T], metaclass=FlowMeta): return ModelWithExtraForbid + # Get current state as dict, preserving the ID if it exists + state_model = cast(BaseModel, self._state) + current_state = ( + state_model.model_dump() + if hasattr(state_model, "model_dump") + else state_model.dict() + if hasattr(state_model, "dict") + else { + k: v + for k, v in state_model.__dict__.items() + if not k.startswith("_") + } + ) + ModelWithExtraForbid = create_model_with_extra_forbid( self._state.__class__ ) self._state = cast( - T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs}) + T, ModelWithExtraForbid(**{**current_state, **inputs}) ) except ValidationError as e: raise ValueError(f"Invalid inputs for structured state: {e}") from e - elif isinstance(self._state, dict): - self._state.update(inputs) else: raise TypeError("State must be a BaseModel instance or a dictionary.") diff --git a/tests/flow_test.py b/tests/flow_test.py index d52c459ce..44ea1d15d 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -3,6 +3,7 @@ import asyncio import pytest +from pydantic import BaseModel from crewai.flow.flow import Flow, and_, listen, or_, router, start @@ -265,6 +266,81 @@ def test_flow_with_custom_state(): assert flow.counter == 2 +def test_flow_uuid_unstructured(): + """Test that unstructured (dictionary) flow states automatically get a UUID that persists.""" + initial_id = None + + class UUIDUnstructuredFlow(Flow): + @start() + def first_method(self): + nonlocal initial_id + # Verify ID is automatically generated + assert "id" in self.state + assert isinstance(self.state["id"], str) + # Store initial ID for comparison + initial_id = self.state["id"] + # Add some data to trigger state update + self.state["data"] = "example" + + @listen(first_method) + def second_method(self): + # Ensure the ID persists after state updates + assert "id" in self.state + assert self.state["id"] == initial_id + # Update state again to verify ID preservation + self.state["more_data"] = "test" + assert self.state["id"] == initial_id + + flow = UUIDUnstructuredFlow() + flow.kickoff() + # Verify ID persists after flow completion + assert flow.state["id"] == initial_id + # Verify UUID format (36 characters, including hyphens) + assert len(flow.state["id"]) == 36 + + +def test_flow_uuid_structured(): + """Test that structured (Pydantic) flow states automatically get a UUID that persists.""" + initial_id = None + + class MyStructuredState(BaseModel): + counter: int = 0 + message: str = "initial" + + class UUIDStructuredFlow(Flow[MyStructuredState]): + @start() + def first_method(self): + nonlocal initial_id + # Verify ID is automatically generated and accessible as attribute + assert hasattr(self.state, "id") + assert isinstance(self.state.id, str) + # Store initial ID for comparison + initial_id = self.state.id + # Update some fields to trigger state changes + self.state.counter += 1 + self.state.message = "updated" + + @listen(first_method) + def second_method(self): + # Ensure the ID persists after state updates + assert hasattr(self.state, "id") + assert self.state.id == initial_id + # Update state again to verify ID preservation + self.state.counter += 1 + self.state.message = "final" + assert self.state.id == initial_id + + flow = UUIDStructuredFlow() + flow.kickoff() + # Verify ID persists after flow completion + assert flow.state.id == initial_id + # Verify UUID format (36 characters, including hyphens) + assert len(flow.state.id) == 36 + # Verify other state fields were properly updated + assert flow.state.counter == 2 + assert flow.state.message == "final" + + def test_router_with_multiple_conditions(): """Test a router that triggers when any of multiple steps complete (OR condition), and another router that triggers only after all specified steps complete (AND condition).