feat: add unique ID to flow states (#1888)

* feat: add unique ID to flow states

- Add FlowState base model with UUID field
- Update type variable T to use FlowState
- Ensure all states (structured and unstructured) get UUID
- Fix type checking in _create_initial_state method

Co-Authored-By: Joe Moura <joao@crewai.com>

* docs: update documentation to reflect automatic UUID generation in flow states

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports in flow.py

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports according to PEP 8

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: auto-fix import sorting with ruff

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: add comprehensive tests for flow state UUID functionality

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
This commit is contained in:
devin-ai-integration[bot]
2025-01-13 22:57:53 -03:00
committed by GitHub
parent 3de81cedd6
commit 75e68f6fc8
3 changed files with 165 additions and 17 deletions

View File

@@ -35,6 +35,8 @@ class ExampleFlow(Flow):
@start() @start()
def generate_city(self): def generate_city(self):
print("Starting flow") print("Starting flow")
# Each flow state automatically gets a unique ID
print(f"Flow State ID: {self.state['id']}")
response = completion( response = completion(
model=self.model, model=self.model,
@@ -47,6 +49,8 @@ class ExampleFlow(Flow):
) )
random_city = response["choices"][0]["message"]["content"] random_city = response["choices"][0]["message"]["content"]
# Store the city in our state
self.state["city"] = random_city
print(f"Random City: {random_city}") print(f"Random City: {random_city}")
return random_city return random_city
@@ -64,6 +68,8 @@ class ExampleFlow(Flow):
) )
fun_fact = response["choices"][0]["message"]["content"] fun_fact = response["choices"][0]["message"]["content"]
# Store the fun fact in our state
self.state["fun_fact"] = fun_fact
return fun_fact return fun_fact
@@ -76,7 +82,15 @@ print(f"Generated fun fact: {result}")
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task. In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console. Each Flow instance automatically receives a unique identifier (UUID) in its state, which helps track and manage flow executions. The state can also store additional data (like the generated city and fun fact) that persists throughout the flow's execution.
When you run the Flow, it will:
1. Generate a unique ID for the flow state
2. Generate a random city and store it in the state
3. Generate a fun fact about that city and store it in the state
4. Print the results to the console
The state's unique ID and stored data can be useful for tracking flow executions and maintaining context between tasks.
**Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API. **Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API.
@@ -207,14 +221,17 @@ allowing developers to choose the approach that best fits their application's ne
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class. In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema. This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance.
```python Code ```python Code
from crewai.flow.flow import Flow, listen, start from crewai.flow.flow import Flow, listen, start
class UntructuredExampleFlow(Flow): class UnstructuredExampleFlow(Flow):
@start() @start()
def first_method(self): def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state.message = "Hello from structured flow" self.state.message = "Hello from structured flow"
self.state.counter = 0 self.state.counter = 0
@@ -231,10 +248,12 @@ class UntructuredExampleFlow(Flow):
print(f"State after third_method: {self.state}") print(f"State after third_method: {self.state}")
flow = UntructuredExampleFlow() flow = UnstructuredExampleFlow()
flow.kickoff() flow.kickoff()
``` ```
**Note:** The `id` field is automatically generated and preserved throughout the flow's execution. You don't need to manage or set it manually, and it will be maintained even when updating the state with new data.
**Key Points:** **Key Points:**
- **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints. - **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints.
@@ -245,12 +264,15 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow. Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments. By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system.
```python Code ```python Code
from crewai.flow.flow import Flow, listen, start from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel from pydantic import BaseModel
class ExampleState(BaseModel): class ExampleState(BaseModel):
# Note: 'id' field is automatically added to all states
counter: int = 0 counter: int = 0
message: str = "" message: str = ""
@@ -259,6 +281,8 @@ class StructuredExampleFlow(Flow[ExampleState]):
@start() @start()
def first_method(self): def first_method(self):
# Access the auto-generated ID if needed
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow" self.state.message = "Hello from structured flow"
@listen(first_method) @listen(first_method)
@@ -628,4 +652,4 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin" referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen allowfullscreen
></iframe> ></iframe>

View File

@@ -13,9 +13,10 @@ from typing import (
Union, Union,
cast, cast,
) )
from uuid import uuid4
from blinker import Signal from blinker import Signal
from pydantic import BaseModel, ValidationError from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_events import ( from crewai.flow.flow_events import (
FlowFinishedEvent, FlowFinishedEvent,
@@ -27,7 +28,12 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
class FlowState(BaseModel):
"""Base model for all flow states, ensuring each state has a unique ID."""
id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the flow state")
T = TypeVar("T", bound=Union[FlowState, Dict[str, Any]])
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable: def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
@@ -377,14 +383,37 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods[method_name] = getattr(self, method_name) self._methods[method_name] = getattr(self, method_name)
def _create_initial_state(self) -> T: def _create_initial_state(self) -> T:
# Handle case where initial_state is None but we have a type parameter
if self.initial_state is None and hasattr(self, "_initial_state_T"): if self.initial_state is None and hasattr(self, "_initial_state_T"):
return self._initial_state_T() # type: ignore state_type = getattr(self, "_initial_state_T")
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
return state_type() # type: ignore
elif issubclass(state_type, BaseModel):
# Create a new type that includes the ID field
class StateWithId(state_type, FlowState): # type: ignore
pass
return StateWithId() # type: ignore
# Handle case where no initial state is provided
if self.initial_state is None: if self.initial_state is None:
return {} # type: ignore return {"id": str(uuid4())} # type: ignore
elif isinstance(self.initial_state, type):
return self.initial_state() # Handle case where initial_state is a type (class)
else: if isinstance(self.initial_state, type):
return self.initial_state if issubclass(self.initial_state, FlowState):
return self.initial_state() # type: ignore
elif issubclass(self.initial_state, BaseModel):
# Create a new type that includes the ID field
class StateWithId(self.initial_state, FlowState): # type: ignore
pass
return StateWithId() # type: ignore
# Handle dictionary case
if isinstance(self.initial_state, dict) and "id" not in self.initial_state:
self.initial_state["id"] = str(uuid4())
return self.initial_state # type: ignore
@property @property
def state(self) -> T: def state(self) -> T:
@@ -396,10 +425,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
return self._method_outputs return self._method_outputs
def _initialize_state(self, inputs: Dict[str, Any]) -> None: def _initialize_state(self, inputs: Dict[str, Any]) -> None:
if isinstance(self._state, BaseModel): if isinstance(self._state, dict):
# Preserve the ID when updating unstructured state
current_id = self._state.get("id")
self._state.update(inputs)
if current_id:
self._state["id"] = current_id
elif "id" not in self._state:
self._state["id"] = str(uuid4())
elif isinstance(self._state, BaseModel):
# Structured state # Structured state
try: try:
def create_model_with_extra_forbid( def create_model_with_extra_forbid(
base_model: Type[BaseModel], base_model: Type[BaseModel],
) -> Type[BaseModel]: ) -> Type[BaseModel]:
@@ -409,16 +445,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
return ModelWithExtraForbid return ModelWithExtraForbid
# Get current state as dict, preserving the ID if it exists
state_model = cast(BaseModel, self._state)
current_state = (
state_model.model_dump()
if hasattr(state_model, "model_dump")
else state_model.dict()
if hasattr(state_model, "dict")
else {
k: v
for k, v in state_model.__dict__.items()
if not k.startswith("_")
}
)
ModelWithExtraForbid = create_model_with_extra_forbid( ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__ self._state.__class__
) )
self._state = cast( self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs}) T, ModelWithExtraForbid(**{**current_state, **inputs})
) )
except ValidationError as e: except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
self._state.update(inputs)
else: else:
raise TypeError("State must be a BaseModel instance or a dictionary.") raise TypeError("State must be a BaseModel instance or a dictionary.")

View File

@@ -3,6 +3,7 @@
import asyncio import asyncio
import pytest import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow import Flow, and_, listen, or_, router, start
@@ -265,6 +266,81 @@ def test_flow_with_custom_state():
assert flow.counter == 2 assert flow.counter == 2
def test_flow_uuid_unstructured():
"""Test that unstructured (dictionary) flow states automatically get a UUID that persists."""
initial_id = None
class UUIDUnstructuredFlow(Flow):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated
assert "id" in self.state
assert isinstance(self.state["id"], str)
# Store initial ID for comparison
initial_id = self.state["id"]
# Add some data to trigger state update
self.state["data"] = "example"
@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert "id" in self.state
assert self.state["id"] == initial_id
# Update state again to verify ID preservation
self.state["more_data"] = "test"
assert self.state["id"] == initial_id
flow = UUIDUnstructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state["id"] == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state["id"]) == 36
def test_flow_uuid_structured():
"""Test that structured (Pydantic) flow states automatically get a UUID that persists."""
initial_id = None
class MyStructuredState(BaseModel):
counter: int = 0
message: str = "initial"
class UUIDStructuredFlow(Flow[MyStructuredState]):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated and accessible as attribute
assert hasattr(self.state, "id")
assert isinstance(self.state.id, str)
# Store initial ID for comparison
initial_id = self.state.id
# Update some fields to trigger state changes
self.state.counter += 1
self.state.message = "updated"
@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert hasattr(self.state, "id")
assert self.state.id == initial_id
# Update state again to verify ID preservation
self.state.counter += 1
self.state.message = "final"
assert self.state.id == initial_id
flow = UUIDStructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state.id == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state.id) == 36
# Verify other state fields were properly updated
assert flow.state.counter == 2
assert flow.state.message == "final"
def test_router_with_multiple_conditions(): def test_router_with_multiple_conditions():
"""Test a router that triggers when any of multiple steps complete (OR condition), """Test a router that triggers when any of multiple steps complete (OR condition),
and another router that triggers only after all specified steps complete (AND condition). and another router that triggers only after all specified steps complete (AND condition).