feat: add unique ID to flow states (#1888)

* feat: add unique ID to flow states

- Add FlowState base model with UUID field
- Update type variable T to use FlowState
- Ensure all states (structured and unstructured) get UUID
- Fix type checking in _create_initial_state method

Co-Authored-By: Joe Moura <joao@crewai.com>

* docs: update documentation to reflect automatic UUID generation in flow states

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports in flow.py

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports according to PEP 8

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: auto-fix import sorting with ruff

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: add comprehensive tests for flow state UUID functionality

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
This commit is contained in:
devin-ai-integration[bot]
2025-01-13 22:57:53 -03:00
committed by Devin AI
parent 8617b46ac2
commit 200d4c030d
3 changed files with 165 additions and 17 deletions

View File

@@ -35,6 +35,8 @@ class ExampleFlow(Flow):
@start()
def generate_city(self):
print("Starting flow")
# Each flow state automatically gets a unique ID
print(f"Flow State ID: {self.state['id']}")
response = completion(
model=self.model,
@@ -47,6 +49,8 @@ class ExampleFlow(Flow):
)
random_city = response["choices"][0]["message"]["content"]
# Store the city in our state
self.state["city"] = random_city
print(f"Random City: {random_city}")
return random_city
@@ -64,6 +68,8 @@ class ExampleFlow(Flow):
)
fun_fact = response["choices"][0]["message"]["content"]
# Store the fun fact in our state
self.state["fun_fact"] = fun_fact
return fun_fact
@@ -76,7 +82,15 @@ print(f"Generated fun fact: {result}")
In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
When you run the Flow, it will generate a random city and then generate a fun fact about that city. The output will be printed to the console.
Each Flow instance automatically receives a unique identifier (UUID) in its state, which helps track and manage flow executions. The state can also store additional data (like the generated city and fun fact) that persists throughout the flow's execution.
When you run the Flow, it will:
1. Generate a unique ID for the flow state
2. Generate a random city and store it in the state
3. Generate a fun fact about that city and store it in the state
4. Print the results to the console
The state's unique ID and stored data can be useful for tracking flow executions and maintaining context between tasks.
**Note:** Ensure you have set up your `.env` file to store your `OPENAI_API_KEY`. This key is necessary for authenticating requests to the OpenAI API.
@@ -207,14 +221,17 @@ allowing developers to choose the approach that best fits their application's ne
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.
Even with unstructured states, CrewAI Flows automatically generates and maintains a unique identifier (UUID) for each state instance.
```python Code
from crewai.flow.flow import Flow, listen, start
class UntructuredExampleFlow(Flow):
class UnstructuredExampleFlow(Flow):
@start()
def first_method(self):
# The state automatically includes an 'id' field
print(f"State ID: {self.state['id']}")
self.state.message = "Hello from structured flow"
self.state.counter = 0
@@ -231,10 +248,12 @@ class UntructuredExampleFlow(Flow):
print(f"State after third_method: {self.state}")
flow = UntructuredExampleFlow()
flow = UnstructuredExampleFlow()
flow.kickoff()
```
**Note:** The `id` field is automatically generated and preserved throughout the flow's execution. You don't need to manage or set it manually, and it will be maintained even when updating the state with new data.
**Key Points:**
- **Flexibility:** You can dynamically add attributes to `self.state` without predefined constraints.
@@ -245,12 +264,15 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.
Each state in CrewAI Flows automatically receives a unique identifier (UUID) to help track and manage state instances. This ID is automatically generated and managed by the Flow system.
```python Code
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ExampleState(BaseModel):
# Note: 'id' field is automatically added to all states
counter: int = 0
message: str = ""
@@ -259,6 +281,8 @@ class StructuredExampleFlow(Flow[ExampleState]):
@start()
def first_method(self):
# Access the auto-generated ID if needed
print(f"State ID: {self.state.id}")
self.state.message = "Hello from structured flow"
@listen(first_method)
@@ -628,4 +652,4 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
></iframe>

View File

@@ -13,9 +13,10 @@ from typing import (
Union,
cast,
)
from uuid import uuid4
from blinker import Signal
from pydantic import BaseModel, ValidationError
from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_events import (
FlowFinishedEvent,
@@ -27,7 +28,12 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
class FlowState(BaseModel):
"""Base model for all flow states, ensuring each state has a unique ID."""
id: str = Field(default_factory=lambda: str(uuid4()), description="Unique identifier for the flow state")
T = TypeVar("T", bound=Union[FlowState, Dict[str, Any]])
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
@@ -377,14 +383,37 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods[method_name] = getattr(self, method_name)
def _create_initial_state(self) -> T:
# Handle case where initial_state is None but we have a type parameter
if self.initial_state is None and hasattr(self, "_initial_state_T"):
return self._initial_state_T() # type: ignore
state_type = getattr(self, "_initial_state_T")
if isinstance(state_type, type):
if issubclass(state_type, FlowState):
return state_type() # type: ignore
elif issubclass(state_type, BaseModel):
# Create a new type that includes the ID field
class StateWithId(state_type, FlowState): # type: ignore
pass
return StateWithId() # type: ignore
# Handle case where no initial state is provided
if self.initial_state is None:
return {} # type: ignore
elif isinstance(self.initial_state, type):
return self.initial_state()
else:
return self.initial_state
return {"id": str(uuid4())} # type: ignore
# Handle case where initial_state is a type (class)
if isinstance(self.initial_state, type):
if issubclass(self.initial_state, FlowState):
return self.initial_state() # type: ignore
elif issubclass(self.initial_state, BaseModel):
# Create a new type that includes the ID field
class StateWithId(self.initial_state, FlowState): # type: ignore
pass
return StateWithId() # type: ignore
# Handle dictionary case
if isinstance(self.initial_state, dict) and "id" not in self.initial_state:
self.initial_state["id"] = str(uuid4())
return self.initial_state # type: ignore
@property
def state(self) -> T:
@@ -396,10 +425,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
return self._method_outputs
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
if isinstance(self._state, BaseModel):
if isinstance(self._state, dict):
# Preserve the ID when updating unstructured state
current_id = self._state.get("id")
self._state.update(inputs)
if current_id:
self._state["id"] = current_id
elif "id" not in self._state:
self._state["id"] = str(uuid4())
elif isinstance(self._state, BaseModel):
# Structured state
try:
def create_model_with_extra_forbid(
base_model: Type[BaseModel],
) -> Type[BaseModel]:
@@ -409,16 +445,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
return ModelWithExtraForbid
# Get current state as dict, preserving the ID if it exists
state_model = cast(BaseModel, self._state)
current_state = (
state_model.model_dump()
if hasattr(state_model, "model_dump")
else state_model.dict()
if hasattr(state_model, "dict")
else {
k: v
for k, v in state_model.__dict__.items()
if not k.startswith("_")
}
)
ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__
)
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
T, ModelWithExtraForbid(**{**current_state, **inputs})
)
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")

View File

@@ -3,6 +3,7 @@
import asyncio
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
@@ -265,6 +266,81 @@ def test_flow_with_custom_state():
assert flow.counter == 2
def test_flow_uuid_unstructured():
"""Test that unstructured (dictionary) flow states automatically get a UUID that persists."""
initial_id = None
class UUIDUnstructuredFlow(Flow):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated
assert "id" in self.state
assert isinstance(self.state["id"], str)
# Store initial ID for comparison
initial_id = self.state["id"]
# Add some data to trigger state update
self.state["data"] = "example"
@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert "id" in self.state
assert self.state["id"] == initial_id
# Update state again to verify ID preservation
self.state["more_data"] = "test"
assert self.state["id"] == initial_id
flow = UUIDUnstructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state["id"] == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state["id"]) == 36
def test_flow_uuid_structured():
"""Test that structured (Pydantic) flow states automatically get a UUID that persists."""
initial_id = None
class MyStructuredState(BaseModel):
counter: int = 0
message: str = "initial"
class UUIDStructuredFlow(Flow[MyStructuredState]):
@start()
def first_method(self):
nonlocal initial_id
# Verify ID is automatically generated and accessible as attribute
assert hasattr(self.state, "id")
assert isinstance(self.state.id, str)
# Store initial ID for comparison
initial_id = self.state.id
# Update some fields to trigger state changes
self.state.counter += 1
self.state.message = "updated"
@listen(first_method)
def second_method(self):
# Ensure the ID persists after state updates
assert hasattr(self.state, "id")
assert self.state.id == initial_id
# Update state again to verify ID preservation
self.state.counter += 1
self.state.message = "final"
assert self.state.id == initial_id
flow = UUIDStructuredFlow()
flow.kickoff()
# Verify ID persists after flow completion
assert flow.state.id == initial_id
# Verify UUID format (36 characters, including hyphens)
assert len(flow.state.id) == 36
# Verify other state fields were properly updated
assert flow.state.counter == 2
assert flow.state.message == "final"
def test_router_with_multiple_conditions():
"""Test a router that triggers when any of multiple steps complete (OR condition),
and another router that triggers only after all specified steps complete (AND condition).