Stateful flows (#1931)

* fix: ensure persisted state overrides class defaults

- Remove early return in Flow.__init__ to allow proper state initialization
- Add test_flow_default_override.py to verify state override behavior
- Fix issue where default values weren't being overridden by persisted state

Fixes the issue where persisted state values weren't properly overriding
class defaults when restarting a flow with a previously saved state ID.

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: improve state restoration verification with has_set_count flag

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: add has_set_count field to PoemState

Co-Authored-By: Joe Moura <joao@crewai.com>

* refactoring test

* fix: ensure persisted state overrides class defaults

- Remove early return in Flow.__init__ to allow proper state initialization
- Add test_flow_default_override.py to verify state override behavior
- Fix issue where default values weren't being overridden by persisted state

Fixes the issue where persisted state values weren't properly overriding
class defaults when restarting a flow with a previously saved state ID.

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: improve state restoration verification with has_set_count flag

Co-Authored-By: Joe Moura <joao@crewai.com>

* test: add has_set_count field to PoemState

Co-Authored-By: Joe Moura <joao@crewai.com>

* refactoring test

* Fixing flow state

* fixing peristed stateful flows

* linter

* type fix

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
This commit is contained in:
João Moura
2025-01-20 13:30:09 -03:00
committed by GitHub
parent 3e4f112f39
commit ab2274caf0
9 changed files with 339 additions and 222 deletions

View File

@@ -54,57 +54,44 @@ LOG_MESSAGES = {
class PersistenceDecorator:
"""Class to handle flow state persistence with consistent logging."""
_printer = Printer() # Class-level printer instance
@classmethod
def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence) -> None:
"""Persist flow state with proper error handling and logging.
This method handles the persistence of flow state data, including proper
error handling and colored console output for status updates.
Args:
flow_instance: The flow instance whose state to persist
method_name: Name of the method that triggered persistence
persistence_instance: The persistence backend to use
Raises:
ValueError: If flow has no state or state lacks an ID
RuntimeError: If state persistence fails
AttributeError: If flow instance lacks required state attributes
Note:
Uses bold_yellow color for success messages and red for errors.
All operations are logged at appropriate levels (info/error).
Example:
```python
@persist
def my_flow_method(self):
# Method implementation
pass
# State will be automatically persisted after method execution
```
"""
try:
state = getattr(flow_instance, 'state', None)
if state is None:
raise ValueError("Flow instance has no state")
flow_uuid: Optional[str] = None
if isinstance(state, dict):
flow_uuid = state.get('id')
elif isinstance(state, BaseModel):
flow_uuid = getattr(state, 'id', None)
if not flow_uuid:
raise ValueError("Flow state must have an 'id' field for persistence")
# Log state saving with consistent message
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="bold_yellow")
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan")
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try:
persistence_instance.save_state(
flow_uuid=flow_uuid,
@@ -154,44 +141,79 @@ def persist(persistence: Optional[FlowPersistence] = None):
def begin(self):
pass
"""
def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]:
"""Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence()
if isinstance(target, type):
# Class decoration
class_methods = {}
for name, method in target.__dict__.items():
if callable(method) and hasattr(method, "__is_flow_method__"):
# Wrap each flow method with persistence
if asyncio.iscoroutinefunction(method):
@functools.wraps(method)
async def class_async_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
method_coro = method(self, *args, **kwargs)
if asyncio.iscoroutine(method_coro):
result = await method_coro
else:
result = method_coro
PersistenceDecorator.persist_state(self, method.__name__, actual_persistence)
return result
class_methods[name] = class_async_wrapper
else:
@functools.wraps(method)
def class_sync_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method.__name__, actual_persistence)
return result
class_methods[name] = class_sync_wrapper
original_init = getattr(target, "__init__")
# Preserve flow-specific attributes
@functools.wraps(original_init)
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
if 'persistence' not in kwargs:
kwargs['persistence'] = actual_persistence
original_init(self, *args, **kwargs)
setattr(target, "__init__", new_init)
# Store original methods to preserve their decorators
original_methods = {}
for name, method in target.__dict__.items():
if callable(method) and (
hasattr(method, "__is_start_method__") or
hasattr(method, "__trigger_methods__") or
hasattr(method, "__condition_type__") or
hasattr(method, "__is_flow_method__") or
hasattr(method, "__is_router__")
):
original_methods[name] = method
# Create wrapped versions of the methods that include persistence
for name, method in original_methods.items():
if asyncio.iscoroutinefunction(method):
# Create a closure to capture the current name and method
def create_async_wrapper(method_name: str, original_method: Callable):
@functools.wraps(original_method)
async def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = await original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence)
return result
return method_wrapper
wrapped = create_async_wrapper(name, method)
# Preserve all original decorators and attributes
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(class_methods[name], attr, getattr(method, attr))
setattr(class_methods[name], "__is_flow_method__", True)
setattr(wrapped, attr, getattr(method, attr))
setattr(wrapped, "__is_flow_method__", True)
# Update the class with the wrapped method
setattr(target, name, wrapped)
else:
# Create a closure to capture the current name and method
def create_sync_wrapper(method_name: str, original_method: Callable):
@functools.wraps(original_method)
def method_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = original_method(self, *args, **kwargs)
PersistenceDecorator.persist_state(self, method_name, actual_persistence)
return result
return method_wrapper
wrapped = create_sync_wrapper(name, method)
# Preserve all original decorators and attributes
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(wrapped, attr, getattr(method, attr))
setattr(wrapped, "__is_flow_method__", True)
# Update the class with the wrapped method
setattr(target, name, wrapped)
# Update class with wrapped methods
for name, method in class_methods.items():
setattr(target, name, method)
return target
else:
# Method decoration
@@ -208,6 +230,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = method_coro
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(method_async_wrapper, attr, getattr(method, attr))
@@ -219,6 +242,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = method(flow_instance, *args, **kwargs)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))

View File

@@ -3,10 +3,9 @@ SQLite-based implementation of flow state persistence.
"""
import json
import os
import sqlite3
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
@@ -16,34 +15,34 @@ from crewai.flow.persistence.base import FlowPersistence
class SQLiteFlowPersistence(FlowPersistence):
"""SQLite-based implementation of flow state persistence.
This class provides a simple, file-based persistence implementation using SQLite.
It's suitable for development and testing, or for production use cases with
moderate performance requirements.
"""
db_path: str # Type annotation for instance variable
def __init__(self, db_path: Optional[str] = None):
"""Initialize SQLite persistence.
Args:
db_path: Path to the SQLite database file. If not provided, uses
db_storage_path() from utilities.paths.
Raises:
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or db_storage_path()
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
if not path:
raise ValueError("Database path must be provided")
self.db_path = path # Now mypy knows this is str
self.init_db()
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
@@ -58,10 +57,10 @@ class SQLiteFlowPersistence(FlowPersistence):
""")
# Add index for faster UUID lookups
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
""")
def save_state(
self,
flow_uuid: str,
@@ -69,7 +68,7 @@ class SQLiteFlowPersistence(FlowPersistence):
state_data: Union[Dict[str, Any], BaseModel],
) -> None:
"""Save the current flow state to SQLite.
Args:
flow_uuid: Unique identifier for the flow instance
method_name: Name of the method that just completed
@@ -84,7 +83,7 @@ class SQLiteFlowPersistence(FlowPersistence):
raise ValueError(
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO flow_states (
@@ -99,13 +98,13 @@ class SQLiteFlowPersistence(FlowPersistence):
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:
flow_uuid: Unique identifier for the flow instance
Returns:
The most recent state as a dictionary, or None if no state exists
"""
@@ -118,7 +117,7 @@ class SQLiteFlowPersistence(FlowPersistence):
LIMIT 1
""", (flow_uuid,))
row = cursor.fetchone()
if row:
return json.loads(row[0])
return None