Compare commits

...

14 Commits

Author SHA1 Message Date
Devin AI
66382f9928 Merge remote-tracking branch 'origin/devin/1737272386-flow-override-fix' into devin/1737272386-flow-override-fix 2025-01-19 08:54:35 +00:00
João Moura
968e0f504b refactoring test 2025-01-19 08:54:24 +00:00
Devin AI
ce6747b967 test: add has_set_count field to PoemState
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 08:54:24 +00:00
Devin AI
89b5edf6b2 test: improve state restoration verification with has_set_count flag
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 08:54:24 +00:00
Devin AI
b5c26aeadd fix: ensure persisted state overrides class defaults
- Remove early return in Flow.__init__ to allow proper state initialization
- Add test_flow_default_override.py to verify state override behavior
- Fix issue where default values weren't being overridden by persisted state

Fixes the issue where persisted state values weren't properly overriding
class defaults when restarting a flow with a previously saved state ID.

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 08:54:24 +00:00
João Moura
b75ef482bf refactoring test 2025-01-19 00:51:53 -08:00
João Moura
48b0f3b1b3 Merge branch 'main' into devin/1737272386-flow-override-fix 2025-01-19 05:51:31 -03:00
devin-ai-integration[bot]
3e4f112f39 feat: add colored logging for flow operations (#1923)
* feat: add colored logging for flow operations

- Add flow_id property for easy ID access
- Add yellow colored logging for flow start
- Add bold_yellow colored logging for state operations
- Implement consistent logging across flow lifecycle

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports to fix lint error

Co-Authored-By: Joe Moura <joao@crewai.com>

* feat: improve flow logging and error handling

- Add centralized logging method for flow events
- Add robust error handling in persistence decorator
- Add consistent log messages and levels
- Add color-coded error messages

Co-Authored-By: Joe Moura <joao@crewai.com>

* fix: sort imports and improve error handling

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-01-19 05:50:30 -03:00
João Moura
d3b67345d5 Merge branch 'main' into devin/1737272386-flow-override-fix 2025-01-19 05:37:15 -03:00
João Moura
cc018bf128 updating tools version 2025-01-19 00:36:19 -08:00
Devin AI
db626ec99d test: add has_set_count field to PoemState
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 08:30:38 +00:00
Devin AI
4f4f229a0b test: improve state restoration verification with has_set_count flag
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 08:28:58 +00:00
Devin AI
44b92ef12f fix: ensure persisted state overrides class defaults
- Remove early return in Flow.__init__ to allow proper state initialization
- Add test_flow_default_override.py to verify state override behavior
- Fix issue where default values weren't being overridden by persisted state

Fixes the issue where persisted state values weren't properly overriding
class defaults when restarting a flow with a previously saved state ID.

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-01-19 07:42:38 +00:00
devin-ai-integration[bot]
46d3e4d4d9 docs: add flow persistence section (#1922)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
2025-01-19 04:34:58 -03:00
9 changed files with 505 additions and 109 deletions

View File

@@ -323,6 +323,91 @@ flow.kickoff()
By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.
## Flow Persistence
The @persist decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions. This decorator can be applied at either the class level or method level, providing flexibility in how you manage state persistence.
### Class-Level Persistence
When applied at the class level, the @persist decorator automatically persists all flow method states:
```python
@persist # Using SQLiteFlowPersistence by default
class MyFlow(Flow[MyState]):
@start()
def initialize_flow(self):
# This method will automatically have its state persisted
self.state.counter = 1
print("Initialized flow. State ID:", self.state.id)
@listen(initialize_flow)
def next_step(self):
# The state (including self.state.id) is automatically reloaded
self.state.counter += 1
print("Flow state is persisted. Counter:", self.state.counter)
```
### Method-Level Persistence
For more granular control, you can apply @persist to specific methods:
```python
class AnotherFlow(Flow[dict]):
@persist # Persists only this method's state
@start()
def begin(self):
if "runs" not in self.state:
self.state["runs"] = 0
self.state["runs"] += 1
print("Method-level persisted runs:", self.state["runs"])
```
### How It Works
1. **Unique State Identification**
- Each flow state automatically receives a unique UUID
- The ID is preserved across state updates and method calls
- Supports both structured (Pydantic BaseModel) and unstructured (dictionary) states
2. **Default SQLite Backend**
- SQLiteFlowPersistence is the default storage backend
- States are automatically saved to a local SQLite database
- Robust error handling ensures clear messages if database operations fail
3. **Error Handling**
- Comprehensive error messages for database operations
- Automatic state validation during save and load
- Clear feedback when persistence operations encounter issues
### Important Considerations
- **State Types**: Both structured (Pydantic BaseModel) and unstructured (dictionary) states are supported
- **Automatic ID**: The `id` field is automatically added if not present
- **State Recovery**: Failed or restarted flows can automatically reload their previous state
- **Custom Implementation**: You can provide your own FlowPersistence implementation for specialized storage needs
### Technical Advantages
1. **Precise Control Through Low-Level Access**
- Direct access to persistence operations for advanced use cases
- Fine-grained control via method-level persistence decorators
- Built-in state inspection and debugging capabilities
- Full visibility into state changes and persistence operations
2. **Enhanced Reliability**
- Automatic state recovery after system failures or restarts
- Transaction-based state updates for data integrity
- Comprehensive error handling with clear error messages
- Robust validation during state save and load operations
3. **Extensible Architecture**
- Customizable persistence backend through FlowPersistence interface
- Support for specialized storage solutions beyond SQLite
- Compatible with both structured (Pydantic) and unstructured (dict) states
- Seamless integration with existing CrewAI flow patterns
The persistence system's architecture emphasizes technical precision and customization options, allowing developers to maintain full control over state management while benefiting from built-in reliability features.
## Flow Control
### Conditional Logic: `or`

View File

@@ -13,25 +13,20 @@ dependencies = [
"openai>=1.13.3",
"litellm==1.57.4",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",
# Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
# Data Handling
"chromadb>=0.5.23",
"openpyxl>=3.1.5",
"pyvis>=0.3.2",
# Authentication and Security
"auth0-python>=4.7.1",
"python-dotenv>=1.0.0",
# Configuration and Utils
"click>=8.1.7",
"appdirs>=1.4.4",
@@ -40,7 +35,7 @@ dependencies = [
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
"blinker>=1.9.0"
"blinker>=1.9.0",
]
[project.urls]
@@ -49,7 +44,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools>=0.25.5"]
tools = ["crewai-tools>=0.32.1"]
embeddings = [
"tiktoken~=0.7.0"
]

View File

@@ -3,7 +3,7 @@ from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from crewai.flow import Flow, listen, start
from {{folder_name}}.crews.poem_crew.poem_crew import PoemCrew

View File

@@ -1,3 +1,5 @@
from crewai.flow.flow import Flow
from crewai.flow.flow import Flow, start, listen, or_, and_, router
from crewai.flow.persistence import persist
__all__ = ["Flow", "start", "listen", "or_", "and_", "router", "persist"]
__all__ = ["Flow"]

View File

@@ -1,5 +1,6 @@
import asyncio
import inspect
import logging
from typing import (
Any,
Callable,
@@ -28,6 +29,9 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
class FlowState(BaseModel):
@@ -63,14 +67,14 @@ def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT:
ValueError: If state validation fails
"""
"""Ensure state matches expected type with proper validation.
Args:
state: State instance to validate
expected_type: Expected type for the state
Returns:
Validated state instance
Raises:
TypeError: If state doesn't match expected type
ValueError: If state validation fails
@@ -424,6 +428,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Type parameter T must be either Dict[str, Any] or a subclass of BaseModel."""
_telemetry = Telemetry()
_printer = Printer()
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
@@ -485,19 +490,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Attempt to load state, prioritizing restore_uuid
if restore_uuid:
self._log_flow_event(f"Loading flow state from memory for UUID: {restore_uuid}", color="bold_yellow")
stored_state = self._persistence.load_state(restore_uuid)
if not stored_state:
raise ValueError(
f"No state found for restore_uuid='{restore_uuid}'"
)
elif kwargs and "id" in kwargs:
self._log_flow_event(f"Loading flow state from memory for ID: {kwargs['id']}", color="bold_yellow")
stored_state = self._persistence.load_state(kwargs["id"])
if not stored_state:
# For kwargs["id"], we allow creating new state if not found
self._state = self._create_initial_state()
if kwargs:
self._initialize_state(kwargs)
return
# Don't return early if state not found - let the normal initialization flow handle it
# This ensures proper state initialization and override behavior
# Initialize state based on persistence and kwargs
if stored_state:
@@ -612,7 +615,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Create new instance of the same class
model_class = type(model)
return cast(T, model_class(**state_dict))
raise TypeError(
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
)
@@ -626,6 +628,39 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
@property
def flow_id(self) -> str:
"""Returns the unique identifier of this flow instance.
This property provides a consistent way to access the flow's unique identifier
regardless of the underlying state implementation (dict or BaseModel).
Returns:
str: The flow's unique identifier, or an empty string if not found
Note:
This property safely handles both dictionary and BaseModel state types,
returning an empty string if the ID cannot be retrieved rather than raising
an exception.
Example:
```python
flow = MyFlow()
print(f"Current flow ID: {flow.flow_id}") # Safely get flow ID
```
"""
try:
if not hasattr(self, '_state'):
return ""
if isinstance(self._state, dict):
return str(self._state.get("id", ""))
elif isinstance(self._state, BaseModel):
return str(getattr(self._state, "id", ""))
return ""
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
@@ -692,6 +727,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""
# When restoring from persistence, use the stored ID
stored_id = stored_state.get("id")
self._log_flow_event(f"Restoring flow state from memory for ID: {stored_id}", color="bold_yellow")
if not stored_id:
raise ValueError("Stored state must have an 'id' field")
@@ -722,6 +758,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_name=self.__class__.__name__,
),
)
self._log_flow_event(f"Flow started with ID: {self.flow_id}", color="yellow")
if inputs is not None:
self._initialize_state(inputs)
@@ -967,6 +1004,30 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def _log_flow_event(self, message: str, color: str = "yellow", level: str = "info") -> None:
"""Centralized logging method for flow events.
This method provides a consistent interface for logging flow-related events,
combining both console output with colors and proper logging levels.
Args:
message: The message to log
color: Color to use for console output (default: yellow)
Available colors: purple, red, bold_green, bold_purple,
bold_blue, yellow, bold_yellow
level: Log level to use (default: info)
Supported levels: info, warning
Note:
This method uses the Printer utility for colored console output
and the standard logging module for log level support.
"""
self._printer.print(message, color=color)
if level == "info":
logger.info(message)
elif level == "warning":
logger.warning(message)
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())

View File

@@ -5,14 +5,14 @@ Example:
```python
from crewai.flow.flow import Flow, start
from crewai.flow.persistence import persist, SQLiteFlowPersistence
class MyFlow(Flow):
@start()
@persist(SQLiteFlowPersistence())
def sync_method(self):
# Synchronous method implementation
pass
@start()
@persist(SQLiteFlowPersistence())
async def async_method(self):
@@ -23,59 +23,71 @@ Example:
import asyncio
import functools
import inspect
import logging
from typing import (
Any,
Callable,
Dict,
Optional,
Type,
TypeVar,
Union,
cast,
get_type_hints,
)
from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
T = TypeVar("T")
# Constants for log messages
LOG_MESSAGES = {
"save_state": "Saving flow state to memory for ID: {}",
"save_error": "Failed to persist state for method {}: {}",
"state_missing": "Flow instance has no state",
"id_missing": "Flow state must have an 'id' field for persistence"
}
def persist(persistence: Optional[FlowPersistence] = None):
"""Decorator to persist flow state.
class PersistenceDecorator:
"""Class to handle flow state persistence with consistent logging."""
This decorator can be applied at either the class level or method level.
When applied at the class level, it automatically persists all flow method
states. When applied at the method level, it persists only that method's
state.
_printer = Printer() # Class-level printer instance
Args:
persistence: Optional FlowPersistence implementation to use.
If not provided, uses SQLiteFlowPersistence.
Returns:
A decorator that can be applied to either a class or method
Raises:
ValueError: If the flow state doesn't have an 'id' field
RuntimeError: If state persistence fails
@classmethod
def persist_state(cls, flow_instance: Any, method_name: str, persistence_instance: FlowPersistence) -> None:
"""Persist flow state with proper error handling and logging.
Example:
@persist # Class-level persistence with default SQLite
class MyFlow(Flow[MyState]):
@start()
def begin(self):
This method handles the persistence of flow state data, including proper
error handling and colored console output for status updates.
Args:
flow_instance: The flow instance whose state to persist
method_name: Name of the method that triggered persistence
persistence_instance: The persistence backend to use
Raises:
ValueError: If flow has no state or state lacks an ID
RuntimeError: If state persistence fails
AttributeError: If flow instance lacks required state attributes
Note:
Uses bold_yellow color for success messages and red for errors.
All operations are logged at appropriate levels (info/error).
Example:
```python
@persist
def my_flow_method(self):
# Method implementation
pass
"""
def _persist_state(flow_instance: Any, method_name: str, persistence_instance: FlowPersistence) -> None:
"""Helper to persist state with error handling."""
# State will be automatically persisted after method execution
```
"""
try:
# Get flow UUID from state
state = getattr(flow_instance, 'state', None)
if state is None:
raise ValueError("Flow instance has no state")
@@ -87,26 +99,65 @@ def persist(persistence: Optional[FlowPersistence] = None):
flow_uuid = getattr(state, 'id', None)
if not flow_uuid:
raise ValueError(
"Flow state must have an 'id' field for persistence"
)
raise ValueError("Flow state must have an 'id' field for persistence")
# Persist the state
persistence_instance.save_state(
flow_uuid=flow_uuid,
method_name=method_name,
state_data=state,
)
except Exception as e:
logger.error(
f"Failed to persist state for method {method_name}: {str(e)}"
)
raise RuntimeError(f"State persistence failed: {str(e)}") from e
# Log state saving with consistent message
cls._printer.print(LOG_MESSAGES["save_state"].format(flow_uuid), color="bold_yellow")
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))
try:
persistence_instance.save_state(
flow_uuid=flow_uuid,
method_name=method_name,
state_data=state,
)
except Exception as e:
error_msg = LOG_MESSAGES["save_error"].format(method_name, str(e))
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise RuntimeError(f"State persistence failed: {str(e)}") from e
except AttributeError:
error_msg = LOG_MESSAGES["state_missing"]
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise ValueError(error_msg)
except (TypeError, ValueError) as e:
error_msg = LOG_MESSAGES["id_missing"]
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise ValueError(error_msg) from e
def persist(persistence: Optional[FlowPersistence] = None):
"""Decorator to persist flow state.
This decorator can be applied at either the class level or method level.
When applied at the class level, it automatically persists all flow method
states. When applied at the method level, it persists only that method's
state.
Args:
persistence: Optional FlowPersistence implementation to use.
If not provided, uses SQLiteFlowPersistence.
Returns:
A decorator that can be applied to either a class or method
Raises:
ValueError: If the flow state doesn't have an 'id' field
RuntimeError: If state persistence fails
Example:
@persist # Class-level persistence with default SQLite
class MyFlow(Flow[MyState]):
@start()
def begin(self):
pass
"""
def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]:
"""Decorator that handles both class and method decoration."""
actual_persistence = persistence or SQLiteFlowPersistence()
if isinstance(target, type):
# Class decoration
class_methods = {}
@@ -121,23 +172,23 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = await method_coro
else:
result = method_coro
_persist_state(self, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(self, method.__name__, actual_persistence)
return result
class_methods[name] = class_async_wrapper
else:
@functools.wraps(method)
def class_sync_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
result = method(self, *args, **kwargs)
_persist_state(self, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(self, method.__name__, actual_persistence)
return result
class_methods[name] = class_sync_wrapper
# Preserve flow-specific attributes
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(class_methods[name], attr, getattr(method, attr))
setattr(class_methods[name], "__is_flow_method__", True)
# Update class with wrapped methods
for name, method in class_methods.items():
setattr(target, name, method)
@@ -146,7 +197,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
# Method decoration
method = target
setattr(method, "__is_flow_method__", True)
if asyncio.iscoroutinefunction(method):
@functools.wraps(method)
async def method_async_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
@@ -155,7 +206,7 @@ def persist(persistence: Optional[FlowPersistence] = None):
result = await method_coro
else:
result = method_coro
_persist_state(flow_instance, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
@@ -166,12 +217,12 @@ def persist(persistence: Optional[FlowPersistence] = None):
@functools.wraps(method)
def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
result = method(flow_instance, *args, **kwargs)
_persist_state(flow_instance, method.__name__, actual_persistence)
PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence)
return result
for attr in ["__is_start_method__", "__trigger_methods__", "__condition_type__", "__is_router__"]:
if hasattr(method, attr):
setattr(method_sync_wrapper, attr, getattr(method, attr))
setattr(method_sync_wrapper, "__is_flow_method__", True)
return cast(Callable[..., T], method_sync_wrapper)
return decorator

View File

@@ -0,0 +1,63 @@
"""Test that persisted state properly overrides default values."""
import os
from typing import Optional
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, FlowState, start
from crewai.flow.persistence import persist
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
class PoemState(FlowState):
"""Test state model with default values that should be overridden."""
sentence_count: int = 1000 # Default that should be overridden
has_set_count: bool = False # Track whether we've set the count
def test_default_value_override():
"""Test that persisted state values override class defaults."""
@persist()
class PoemFlow(Flow[PoemState]):
initial_state = PoemState
@start()
def set_sentence_count(self):
print("Setting sentence count")
print(self.state)
# Only set sentence_count on first run, not when loading from persistence
if self.state.has_set_count and self.state.sentence_count == 2:
self.state.sentence_count = 3
elif self.state.has_set_count and self.state.sentence_count == 1000:
self.state.sentence_count = 1000
elif self.state.has_set_count and self.state.sentence_count == 5:
self.state.sentence_count = 5
else:
self.state.sentence_count = 2
self.state.has_set_count = True
# First run - should set sentence_count to 2
flow1 = PoemFlow()
flow1.kickoff()
original_uuid = flow1.state.id
assert flow1.state.sentence_count == 2
# Second run - should load sentence_count=2 instead of default 1000
flow2 = PoemFlow()
flow2.kickoff(inputs={"id": original_uuid})
assert flow2.state.sentence_count == 3 # Should load 2, not default 1000
# Third run - should not load sentence_count=2 instead of default 1000
flow2 = PoemFlow()
flow2.kickoff(inputs={"has_set_count": True})
assert flow2.state.sentence_count == 1000 # Should load 1000, not 2
# Third run - explicit override should work
flow3 = PoemFlow(
id=original_uuid,
sentence_count=5, # Override persisted value
)
assert flow3.state.sentence_count == 5 # Should use override value

View File

@@ -7,7 +7,7 @@ import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, FlowState, start
from crewai.flow.persistence import FlowPersistence, persist
from crewai.flow.persistence import persist
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -21,20 +21,20 @@ def test_persist_decorator_saves_state(tmp_path):
"""Test that @persist decorator saves state in SQLite."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow[Dict[str, str]]):
initial_state = dict() # Use dict instance as initial state
@start()
@persist(persistence)
def init_step(self):
self.state["message"] = "Hello, World!"
self.state["id"] = "test-uuid" # Ensure we have an ID for persistence
# Run flow and verify state is saved
flow = TestFlow(persistence=persistence)
flow.kickoff()
# Load state from DB and verify
saved_state = persistence.load_state(flow.state["id"])
assert saved_state is not None
@@ -45,20 +45,20 @@ def test_structured_state_persistence(tmp_path):
"""Test persistence with Pydantic model state."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class StructuredFlow(Flow[TestState]):
initial_state = TestState
@start()
@persist(persistence)
def count_up(self):
self.state.counter += 1
self.state.message = f"Count is {self.state.counter}"
# Run flow and verify state changes are saved
flow = StructuredFlow(persistence=persistence)
flow.kickoff()
# Load and verify state
saved_state = persistence.load_state(flow.state.id)
assert saved_state is not None
@@ -70,46 +70,46 @@ def test_flow_state_restoration(tmp_path):
"""Test restoring flow state from persistence with various restoration methods."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
# First flow execution to create initial state
class RestorableFlow(Flow[TestState]):
initial_state = TestState
@start()
@persist(persistence)
def set_message(self):
self.state.message = "Original message"
self.state.counter = 42
# Create and persist initial state
flow1 = RestorableFlow(persistence=persistence)
flow1.kickoff()
original_uuid = flow1.state.id
# Test case 1: Restore using restore_uuid with field override
flow2 = RestorableFlow(
persistence=persistence,
restore_uuid=original_uuid,
counter=43, # Override counter
)
# Verify state restoration and selective field override
assert flow2.state.id == original_uuid
assert flow2.state.message == "Original message" # Preserved
assert flow2.state.counter == 43 # Overridden
# Test case 2: Restore using kwargs['id']
flow3 = RestorableFlow(
persistence=persistence,
id=original_uuid,
message="Updated message", # Override message
)
# Verify state restoration and selective field override
assert flow3.state.id == original_uuid
assert flow3.state.counter == 42 # Preserved
assert flow3.state.message == "Updated message" # Overridden
# Test case 3: Verify error on conflicting IDs
with pytest.raises(ValueError) as exc_info:
RestorableFlow(
@@ -118,7 +118,7 @@ def test_flow_state_restoration(tmp_path):
id="different-id", # Conflict with restore_uuid
)
assert "Conflicting IDs provided" in str(exc_info.value)
# Test case 4: Verify error on non-existent restore_uuid
with pytest.raises(ValueError) as exc_info:
RestorableFlow(
@@ -126,7 +126,7 @@ def test_flow_state_restoration(tmp_path):
restore_uuid="non-existent-uuid",
)
assert "No state found" in str(exc_info.value)
# Test case 5: Allow new state creation with kwargs['id']
new_uuid = "new-flow-id"
flow4 = RestorableFlow(
@@ -135,7 +135,7 @@ def test_flow_state_restoration(tmp_path):
message="New message",
counter=100,
)
# Verify new state creation with provided ID
assert flow4.state.id == new_uuid
assert flow4.state.message == "New message"
@@ -146,25 +146,25 @@ def test_multiple_method_persistence(tmp_path):
"""Test state persistence across multiple method executions."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class MultiStepFlow(Flow[TestState]):
initial_state = TestState
@start()
@persist(persistence)
def step_1(self):
self.state.counter = 1
self.state.message = "Step 1"
@start()
@persist(persistence)
def step_2(self):
self.state.counter = 2
self.state.message = "Step 2"
flow = MultiStepFlow(persistence=persistence)
flow.kickoff()
# Load final state
final_state = persistence.load_state(flow.state.id)
assert final_state is not None
@@ -176,20 +176,20 @@ def test_persistence_error_handling(tmp_path):
"""Test error handling in persistence operations."""
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class InvalidFlow(Flow[TestState]):
# Missing id field in initial state
class InvalidState(BaseModel):
value: str = ""
initial_state = InvalidState
@start()
@persist(persistence)
def will_fail(self):
self.state.value = "test"
with pytest.raises(ValueError) as exc_info:
flow = InvalidFlow(persistence=persistence)
assert "must have an 'id' field" in str(exc_info.value)

149
uv.lock generated
View File

@@ -198,6 +198,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/39/e3/893e8757be2612e6c266d9bb58ad2e3651524b5b40cf56761e985a28b13e/asgiref-3.8.1-py3-none-any.whl", hash = "sha256:3e1e3ecc849832fe52ccf2cb6686b7a55f82bb1d6aee72a58826471390335e47", size = 23828 },
]
[[package]]
name = "asn1crypto"
version = "1.5.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/de/cf/d547feed25b5244fcb9392e288ff9fdc3280b10260362fc45d37a798a6ee/asn1crypto-1.5.1.tar.gz", hash = "sha256:13ae38502be632115abf8a24cbe5f4da52e3b5231990aff31123c805306ccb9c", size = 121080 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c9/7f/09065fd9e27da0eda08b4d6897f1c13535066174cc023af248fc2a8d5e5a/asn1crypto-1.5.1-py2.py3-none-any.whl", hash = "sha256:db4e40728b728508912cbb3d44f19ce188f218e9eba635821bb4b68564f8fd67", size = 105045 },
]
[[package]]
name = "asttokens"
version = "2.4.1"
@@ -219,6 +228,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a7/fa/e01228c2938de91d47b307831c62ab9e4001e747789d0b05baf779a6488c/async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028", size = 5721 },
]
[[package]]
name = "atpublic"
version = "5.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5d/18/b1d247792440378abeeb0853f9daa2a127284b68776af6815990be7fcdb0/atpublic-5.0.tar.gz", hash = "sha256:d5cb6cbabf00ec1d34e282e8ce7cbc9b74ba4cb732e766c24e2d78d1ad7f723f", size = 14646 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6b/03/2cb0e5326e19b7d877bc9c3a7ef436a30a06835b638580d1f5e21a0409ed/atpublic-5.0-py3-none-any.whl", hash = "sha256:b651dcd886666b1042d1e38158a22a4f2c267748f4e97fde94bc492a4a28a3f3", size = 5207 },
]
[[package]]
name = "attrs"
version = "24.2.0"
@@ -714,7 +732,7 @@ requires-dist = [
{ name = "blinker", specifier = ">=1.9.0" },
{ name = "chromadb", specifier = ">=0.5.23" },
{ name = "click", specifier = ">=8.1.7" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.25.5" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.32.1" },
{ name = "docling", marker = "extra == 'docling'", specifier = ">=2.12.0" },
{ name = "fastembed", marker = "extra == 'fastembed'", specifier = ">=0.4.1" },
{ name = "instructor", specifier = ">=1.3.3" },
@@ -762,7 +780,7 @@ dev = [
[[package]]
name = "crewai-tools"
version = "0.25.6"
version = "0.32.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "beautifulsoup4" },
@@ -774,20 +792,21 @@ dependencies = [
{ name = "lancedb" },
{ name = "linkup-sdk" },
{ name = "openai" },
{ name = "patronus" },
{ name = "pydantic" },
{ name = "pyright" },
{ name = "pytest" },
{ name = "pytube" },
{ name = "requests" },
{ name = "scrapegraph-py" },
{ name = "selenium" },
{ name = "serpapi" },
{ name = "snowflake" },
{ name = "spider-client" },
{ name = "weaviate-client" },
]
sdist = { url = "https://files.pythonhosted.org/packages/23/2f/fbfd0dc8912d375a2d1272c503f79c83c25f3d2b4b72c230b0672278a1bd/crewai_tools-0.25.6.tar.gz", hash = "sha256:442a7e7e579cb3c671a53c5b7afce645cd31d2db913ecc6d1e22a4c5e1baa840", size = 883175 }
sdist = { url = "https://files.pythonhosted.org/packages/e9/e7/fb07f0089028f7c9003770641d21f5844d4fa22bf5cc4c4b3676bfa0e1fe/crewai_tools-0.32.1.tar.gz", hash = "sha256:41acea9243b17a463f355d48dfe7d73bd59738c8862a8da780eae008e0136414", size = 887378 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/21/561a81b4f8cfcc2ac6a0c3db3ec86b70a7db6dabb0dd7d13c96be981b2fc/crewai_tools-0.25.6-py3-none-any.whl", hash = "sha256:463e0ee8d780ab7a801992e3960471fb8e64d038866429f70995ddd0a83e0679", size = 514758 },
{ url = "https://files.pythonhosted.org/packages/36/f0/8f98f1a2b90b9b989bd01cf48b5e3bb2d842be2062bfd3177a77561e7b61/crewai_tools-0.32.1-py3-none-any.whl", hash = "sha256:6cb436dc66e19e35285a4fce501158a13bce99b244370574f568ec33c5513351", size = 537264 },
]
[[package]]
@@ -3495,6 +3514,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/20/ff623b09d963f88bfde16306a54e12ee5ea43e9b597108672ff3a408aad6/pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08", size = 31191 },
]
[[package]]
name = "patronus"
version = "0.0.17"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
{ name = "pandas" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyyaml" },
{ name = "tqdm" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c5/a0/d5218ff6f2eab18c5a90266d21cdac673c85070e82e3f8aba538b3200f54/patronus-0.0.17.tar.gz", hash = "sha256:7298f770d4f6774b955806fb319c2c872fda3551bd7fa63d975bbeedc14b28de", size = 27377 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/9e/717c4508d675549ff081a7fecf25af7d70f9d7ad87ea0d4825e02de3b801/patronus-0.0.17-py3-none-any.whl", hash = "sha256:1f322eeee838974515fdb7cbf8530ad25c6c59686abbcb28c1fdbf23d34eb10d", size = 31516 },
]
[[package]]
name = "pdfminer-six"
version = "20231228"
@@ -4055,6 +4092,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c2/35/c0edf199257ef0a7d407d29cd51c4e70d1dad4370a5f44deb65a7a5475e2/pymdown_extensions-10.11.2-py3-none-any.whl", hash = "sha256:41cdde0a77290e480cf53892f5c5e50921a7ee3e5cd60ba91bf19837b33badcf", size = 259044 },
]
[[package]]
name = "pyopenssl"
version = "24.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cryptography" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c1/d4/1067b82c4fc674d6f6e9e8d26b3dff978da46d351ca3bac171544693e085/pyopenssl-24.3.0.tar.gz", hash = "sha256:49f7a019577d834746bc55c5fce6ecbcec0f2b4ec5ce1cf43a9a173b8138bb36", size = 178944 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/42/22/40f9162e943f86f0fc927ebc648078be87def360d9d8db346619fb97df2b/pyOpenSSL-24.3.0-py3-none-any.whl", hash = "sha256:e474f5a473cd7f92221cc04976e48f4d11502804657a08a989fb3be5514c904a", size = 56111 },
]
[[package]]
name = "pypdf"
version = "5.0.1"
@@ -4923,6 +4972,87 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235 },
]
[[package]]
name = "snowflake"
version = "1.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "snowflake-core" },
{ name = "snowflake-legacy" },
]
sdist = { url = "https://files.pythonhosted.org/packages/80/d1/830929fb7b54586f4ee601f409e80343e16f32b9b579246cd6fa9984bcff/snowflake-1.0.2.tar.gz", hash = "sha256:4009e59af24e444de4a9e9d340fff0979cca8a02a4feee4665da97eb9c76d958", size = 6033 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b6/25/4cbba4da3f9b333d132680a66221d1a101309cce330fa8be38b674ceafd0/snowflake-1.0.2-py3-none-any.whl", hash = "sha256:6bb0fc70aa10234769202861ccb4b091f5e9fb1bbc61a1e708db93baa3f221f4", size = 5623 },
]
[[package]]
name = "snowflake-connector-python"
version = "3.12.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "asn1crypto" },
{ name = "certifi" },
{ name = "cffi" },
{ name = "charset-normalizer" },
{ name = "cryptography" },
{ name = "filelock" },
{ name = "idna" },
{ name = "packaging" },
{ name = "platformdirs" },
{ name = "pyjwt" },
{ name = "pyopenssl" },
{ name = "pytz" },
{ name = "requests" },
{ name = "sortedcontainers" },
{ name = "tomlkit" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6b/de/f43d9c827ccc1974696ffd3c0495e2d4e98b0414b2353b7de932621f23dd/snowflake_connector_python-3.12.4.tar.gz", hash = "sha256:289e0691dfbf8ec8b7a8f58bcbb95a819890fe5e5b278fdbfc885059a63a946f", size = 743445 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/53/6c/edc8909e424654a7a3c18cbf804d8a35c17a65a2131f866a87ed8e762bd0/snowflake_connector_python-3.12.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6f141c159e3244bd660279f87f32e39351b2845fcb75f8138f31d2219f983b05", size = 958038 },
{ url = "https://files.pythonhosted.org/packages/93/a3/34c5082dfb9b555c914f4233224b8bc1f2c4d5668bc71bb587680b8dcd73/snowflake_connector_python-3.12.4-cp310-cp310-macosx_11_0_x86_64.whl", hash = "sha256:091458ba777c24adff659c5c28f0f5bb0bcca8a9b6ecc5641ae25b7c20a8f43d", size = 970665 },
{ url = "https://files.pythonhosted.org/packages/f8/87/9eceaaba58b2ec4f9094fc3a04d953bbabbfdcc05a6b14ef12610c1039f9/snowflake_connector_python-3.12.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:23049d341da681ec7131cead71cdf7b1761ae5bcc08bcbdb931dcef6c25e8a5f", size = 2496731 },
{ url = "https://files.pythonhosted.org/packages/66/0a/e35e9e0a142f3779007b0246166a245305858b198ed0dd3a41a3d2405512/snowflake_connector_python-3.12.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cc88a09d77a8ce7e445094b2409b606ddb208b5fc9f7c7a379d0255a8d566e9d", size = 2520041 },
{ url = "https://files.pythonhosted.org/packages/79/77/9a238c153600adff8fbd1136d9f4be1e42cb827cbe1865924bfe84653e85/snowflake_connector_python-3.12.4-cp310-cp310-win_amd64.whl", hash = "sha256:3c33fbba036805c1767ea48eb40ffc3fb79d61f2a4bb4e77b571ea6f6a998be8", size = 918272 },
{ url = "https://files.pythonhosted.org/packages/0d/95/e8aac28d6913e4b59f96e6d361f31b9576b5f0abe4d2c4f7decf9f075932/snowflake_connector_python-3.12.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ec5cfaa1526084cf4d0e7849d5ace601245cb4ad9675ab3cd7d799b3abea481", size = 958125 },
{ url = "https://files.pythonhosted.org/packages/67/b6/a847a94e03bdf39010048feacd57f250a91a655eed333d7d32b165f65201/snowflake_connector_python-3.12.4-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:ff225824b3a0fa5e822442de72172f97028f04ae183877f1305d538d8d6c5d11", size = 970770 },
{ url = "https://files.pythonhosted.org/packages/0e/91/f97812ae9946944bcd9bfe1965af1cb9b1844919da879d90b90dfd3e5086/snowflake_connector_python-3.12.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9beced2789dc75e8f1e749aa637e7ec9b03302b4ed4b793ae0f1ff32823370e", size = 2519875 },
{ url = "https://files.pythonhosted.org/packages/37/52/500d72079bfb322ebdf3892180ecf3dc73c117b3a966ee8d4bb1378882b2/snowflake_connector_python-3.12.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5ea47450a04ff713f3adf28053e34103bd990291e62daee9721c76597af4b2b5", size = 2542320 },
{ url = "https://files.pythonhosted.org/packages/59/92/74ead6bee8dd29fe372002ce59477221e04b9da96ad7aafe584afce02937/snowflake_connector_python-3.12.4-cp311-cp311-win_amd64.whl", hash = "sha256:748f9125854dca07ea471bb2bb3c5bb932a53f9b8a77ba348b50b738c77203ce", size = 918363 },
{ url = "https://files.pythonhosted.org/packages/a5/a3/1cbe0b52b810f069bdc96c372b2d91ac51aeac32986c2832aa3fe0b0b0e5/snowflake_connector_python-3.12.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4bcd0371b20d199f15e6a3c0b489bf18e27f2a88c84cf3194b2569ca039fa7d1", size = 957561 },
{ url = "https://files.pythonhosted.org/packages/f4/05/8a5e16bd908a89f36d59686d356890c4bd6a976a487f86274181010f4b49/snowflake_connector_python-3.12.4-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:7900d82a450b206fa2ed6c42cd65d9b3b9fd4547eca1696937175fac2a03ba37", size = 969045 },
{ url = "https://files.pythonhosted.org/packages/79/1b/8f5ab15d224d7bf76533c55cfd8ce73b185ce94d84241f0e900739ce3f37/snowflake_connector_python-3.12.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:300f0562aeea55e40ee03b45205dbef7b78f5ba2f1787a278c7b807e7d8db22c", size = 2533969 },
{ url = "https://files.pythonhosted.org/packages/6e/d9/2e2fd72e0251691b5c54a219256c455141a2d3c104e411b82de598c62553/snowflake_connector_python-3.12.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a6762a00948f003be55d7dc5de9de690315d01951a94371ec3db069d9303daba", size = 2558052 },
{ url = "https://files.pythonhosted.org/packages/e8/cb/e0ab230ad5adc9932e595bdbec693b2499d446666daf6cb9cae306a41dd2/snowflake_connector_python-3.12.4-cp312-cp312-win_amd64.whl", hash = "sha256:83ca896790a7463b6c8cd42e1a29b8ea197cc920839ae6ee96a467475eab4ec2", size = 916627 },
]
[[package]]
name = "snowflake-core"
version = "1.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "atpublic" },
{ name = "pydantic" },
{ name = "python-dateutil" },
{ name = "pyyaml" },
{ name = "requests" },
{ name = "snowflake-connector-python" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1d/cf/6f91e5b2daaf3df9ae666a65f5ba3938f11a40784e4ada5218ecf154b29a/snowflake_core-1.0.2.tar.gz", hash = "sha256:8bf267ff1efcd17f157432c6e24f6d2eb6c2aeed66f43ab34b215aa76d8edf02", size = 1092618 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/75/3c/ec228b7325b32781081c72254dd0ef793943e853d82616e862e231909c6c/snowflake_core-1.0.2-py3-none-any.whl", hash = "sha256:55c37cf526a0d78dd3359ad96b9ecd7130bbbbc2f5a2fec77bb3da0dac2dc688", size = 1555690 },
]
[[package]]
name = "snowflake-legacy"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/41/a6211bd2109913eee1506d37865ab13cf9a8cc2faa41833da3d1ffec654b/snowflake_legacy-1.0.0.tar.gz", hash = "sha256:2044661c79ba01841ab279c5e74b994532244c9d103224eba16eb159c8ed6033", size = 4043 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/aa/8c/64f9b5ee0c3f376a733584c480b31addbf2baff7bb41f655e5e3f3719d3b/snowflake_legacy-1.0.0-py3-none-any.whl", hash = "sha256:25f9678f180d7d5f5b60d17f8112f0ee8a7a77b82c67fd599ed6e27bd502be5a", size = 3059 },
]
[[package]]
name = "sortedcontainers"
version = "2.4.0"
@@ -5184,6 +5314,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c4/ac/ce90573ba446a9bbe65838ded066a805234d159b4446ae9f8ec5bbd36cbd/tomli_w-1.1.0-py3-none-any.whl", hash = "sha256:1403179c78193e3184bfaade390ddbd071cba48a32a2e62ba11aae47490c63f7", size = 6440 },
]
[[package]]
name = "tomlkit"
version = "0.13.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b1/09/a439bec5888f00a54b8b9f05fa94d7f901d6735ef4e55dcec9bc37b5d8fa/tomlkit-0.13.2.tar.gz", hash = "sha256:fff5fe59a87295b278abd31bec92c15d9bc4a06885ab12bcea52c71119392e79", size = 192885 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/b6/a447b5e4ec71e13871be01ba81f5dfc9d0af7e473da256ff46bc0e24026f/tomlkit-0.13.2-py3-none-any.whl", hash = "sha256:7a974427f6e119197f670fbbbeae7bef749a6c14e793db934baefc1b5f03efde", size = 37955 },
]
[[package]]
name = "torch"
version = "2.4.1"