Compare commits

...

5 Commits

Author SHA1 Message Date
Devin AI
3c2f85d9d4 fix: Remove duplicate Protocol import
- Remove Protocol import from typing to fix type checker error
- Keep Protocol from typing_extensions

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 06:30:10 +00:00
Devin AI
ae82745ddd fix: Improve error handling in _serialize_value
- Move try-except block to cover all code paths
- Ensure proper error handling for all value types

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 06:28:38 +00:00
Devin AI
b98e720531 feat: Add performance monitoring and type safety improvements
- Add performance monitoring for serialization
- Add type safety protocols
- Add concurrent access test
- Improve error handling
- Optimize thread-safe primitive detection

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 06:27:04 +00:00
Devin AI
5467a70d97 fix: Fix import sorting in test file
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 06:21:20 +00:00
Devin AI
99a6390158 fix: Handle thread locks in Flow state serialization
- Add custom serialization for thread locks in Flow state
- Add test coverage for thread locks and async primitives
- Maintain backward compatibility
- Fix issue #2120

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-14 06:19:45 +00:00
2 changed files with 384 additions and 2 deletions

View File

@@ -1,7 +1,11 @@
import asyncio
import copy
import dataclasses
import functools
import inspect
import logging
import threading
import time
from contextlib import contextmanager
from typing import (
Any,
Callable,
@@ -15,6 +19,22 @@ from typing import (
Union,
cast,
)
from typing_extensions import Protocol
logger = logging.getLogger(__name__)
class SerializationError(Exception):
"""Error during state serialization."""
pass
class LockProtocol(Protocol):
"""Protocol for thread-safe primitives."""
def acquire(self) -> bool: ...
def release(self) -> None: ...
def _is_owned(self) -> bool: ...
from uuid import uuid4
from blinker import Signal
@@ -437,6 +457,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
initial_state: Union[Type[T], T, None] = None
event_emitter = Signal("event_emitter")
@contextmanager
def _performance_monitor(self, operation: str):
"""Monitor performance of an operation.
Args:
operation: Name of the operation being monitored
Yields:
None
"""
start = time.perf_counter()
try:
yield
finally:
duration = time.perf_counter() - start
logger.debug(f"{operation} took {duration:.4f} seconds")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
_initial_state_T = item # type: ignore
@@ -569,8 +606,171 @@ class Flow(Generic[T], metaclass=FlowMeta):
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
)
# Cache thread-safe primitive types
THREAD_SAFE_TYPES = {
type(threading.RLock()): threading.RLock,
type(threading.Lock()): threading.Lock,
type(threading.Semaphore()): threading.Semaphore,
type(threading.Event()): threading.Event,
type(threading.Condition()): threading.Condition,
type(asyncio.Lock()): asyncio.Lock,
type(asyncio.Event()): asyncio.Event,
type(asyncio.Condition()): asyncio.Condition,
type(asyncio.Semaphore()): asyncio.Semaphore,
}
def _get_thread_safe_primitive_type(self, value: Any) -> Optional[Type[LockProtocol]]:
"""Get the type of a thread-safe primitive for recreation.
Args:
value: Any Python value to check
Returns:
The type of the thread-safe primitive, or None if not a primitive
"""
return (self.THREAD_SAFE_TYPES.get(type(value))
if hasattr(value, '_is_owned') and hasattr(value, 'acquire')
else None)
@functools.lru_cache(maxsize=128)
def _get_dataclass_fields(self, cls):
"""Get cached dataclass fields.
Args:
cls: Dataclass type
Returns:
Dict mapping field names to Field objects
"""
return {field.name: field for field in dataclasses.fields(cls)}
def _serialize_dataclass(self, value: Any) -> Union[Dict[str, Any], Any]:
"""Serialize a dataclass instance.
Args:
value: A dataclass instance
Returns:
A new instance of the dataclass with thread-safe primitives recreated
"""
try:
if not hasattr(value, '__class__'):
return value
if hasattr(value, '__pydantic_validate__'):
return value.__pydantic_validate__()
# Get field values, handling thread-safe primitives
field_values = {}
for field_name, field in self._get_dataclass_fields(value.__class__).items():
field_value = getattr(value, field_name)
primitive_type = self._get_thread_safe_primitive_type(field_value)
if primitive_type is not None:
field_values[field_name] = primitive_type()
else:
field_values[field_name] = self._serialize_value(field_value)
# Create new instance
return value.__class__(**field_values)
except Exception as e:
logger.error(f"Dataclass serialization error for {type(value)}: {str(e)}")
raise SerializationError(f"Failed to serialize dataclass {type(value)}") from e
def _serialize_value(self, value: Any) -> Any:
"""Recursively serialize a value, handling thread locks.
Args:
value: Any Python value to serialize
Returns:
Serialized version of the value with thread-safe primitives handled
Raises:
SerializationError: If serialization fails
"""
with self._performance_monitor(f"serialize_{type(value).__name__}"):
try:
# Handle None
if value is None:
return None
# Handle thread-safe primitives
primitive_type = self._get_thread_safe_primitive_type(value)
if primitive_type is not None:
return primitive_type()
# Handle Pydantic models
if isinstance(value, BaseModel):
model_class = type(value)
model_data = value.model_dump(exclude_none=True)
# Create new instance
instance = model_class(**model_data)
# Copy excluded fields that are thread-safe primitives
for field_name, field in value.__class__.model_fields.items():
if field.exclude:
field_value = getattr(value, field_name, None)
if field_value is not None:
primitive_type = self._get_thread_safe_primitive_type(field_value)
if primitive_type is not None:
setattr(instance, field_name, primitive_type())
return instance
# Handle dataclasses
if dataclasses.is_dataclass(value):
return self._serialize_dataclass(value)
# Handle dictionaries
if isinstance(value, dict):
return {
k: self._serialize_value(v)
for k, v in value.items()
}
# Handle lists, tuples, and sets
if isinstance(value, (list, tuple, set)):
serialized = [self._serialize_value(item) for item in value]
return (
serialized if isinstance(value, list)
else tuple(serialized) if isinstance(value, tuple)
else set(serialized)
)
# Handle other types
return value
except Exception as e:
logger.error(f"Serialization error for {type(value)}: {str(e)}")
raise SerializationError(f"Failed to serialize {type(value)}") from e
# Handle dataclasses
if dataclasses.is_dataclass(value):
return self._serialize_dataclass(value)
# Handle dictionaries
if isinstance(value, dict):
return {
k: self._serialize_value(v)
for k, v in value.items()
}
# Handle lists, tuples, and sets
if isinstance(value, (list, tuple, set)):
serialized = [self._serialize_value(item) for item in value]
return (
serialized if isinstance(value, list)
else tuple(serialized) if isinstance(value, tuple)
else set(serialized)
)
# Handle other types
return value
def _copy_state(self) -> T:
return copy.deepcopy(self._state)
"""Create a deep copy of the current state."""
return self._serialize_value(self._state)
@property
def state(self) -> T:

View File

@@ -0,0 +1,182 @@
"""Tests for Flow with thread locks."""
import asyncio
import threading
from typing import Optional
from uuid import uuid4
import pytest
from pydantic import BaseModel, Field, field_validator
from crewai.flow.flow import Flow, listen, start
class ThreadSafeState(BaseModel):
"""Test state model with thread locks."""
model_config = {
"arbitrary_types_allowed": True,
"exclude": {"lock"}
}
id: str = Field(default_factory=lambda: str(uuid4()))
lock: Optional[threading.RLock] = Field(default=None, exclude=True)
value: str = ""
def __init__(self, **data):
super().__init__(**data)
if self.lock is None:
self.lock = threading.RLock()
class LockFlow(Flow[ThreadSafeState]):
"""Test flow with thread locks."""
initial_state = ThreadSafeState
@start()
async def step_1(self):
with self.state.lock:
self.state.value = "step 1"
return "step 1"
@listen(step_1)
async def step_2(self, result):
with self.state.lock:
self.state.value += " -> step 2"
return result + " -> step 2"
def test_flow_with_thread_locks():
"""Test Flow with thread locks in state."""
flow = LockFlow()
result = asyncio.run(flow.kickoff_async())
assert result == "step 1 -> step 2"
assert flow.state.value == "step 1 -> step 2"
def test_kickoff_async_with_lock_inputs():
"""Test kickoff_async with thread lock inputs."""
flow = LockFlow()
inputs = {
"lock": threading.RLock(),
"value": "test"
}
result = asyncio.run(flow.kickoff_async(inputs=inputs))
assert result == "step 1 -> step 2"
assert flow.state.value == "step 1 -> step 2"
class ComplexState(BaseModel):
"""Test state model with nested thread locks."""
model_config = {
"arbitrary_types_allowed": True,
"exclude": {"outer_lock"}
}
id: str = Field(default_factory=lambda: str(uuid4()))
outer_lock: Optional[threading.RLock] = Field(default=None, exclude=True)
inner: Optional[ThreadSafeState] = Field(default_factory=ThreadSafeState)
value: str = ""
def __init__(self, **data):
super().__init__(**data)
if self.outer_lock is None:
self.outer_lock = threading.RLock()
class NestedLockFlow(Flow[ComplexState]):
"""Test flow with nested thread locks."""
initial_state = ComplexState
@start()
async def step_1(self):
with self.state.outer_lock:
with self.state.inner.lock:
self.state.value = "outer"
self.state.inner.value = "inner"
return "step 1"
@listen(step_1)
async def step_2(self, result):
with self.state.outer_lock:
with self.state.inner.lock:
self.state.value += " -> outer 2"
self.state.inner.value += " -> inner 2"
return result + " -> step 2"
def test_flow_with_nested_locks():
"""Test Flow with nested thread locks in state."""
flow = NestedLockFlow()
result = asyncio.run(flow.kickoff_async())
assert result == "step 1 -> step 2"
assert flow.state.value == "outer -> outer 2"
assert flow.state.inner.value == "inner -> inner 2"
class AsyncLockState(BaseModel):
"""Test state model with async locks."""
model_config = {
"arbitrary_types_allowed": True,
"exclude": {"lock", "event"}
}
id: str = Field(default_factory=lambda: str(uuid4()))
lock: Optional[asyncio.Lock] = Field(default=None, exclude=True)
event: Optional[asyncio.Event] = Field(default=None, exclude=True)
value: str = ""
def __init__(self, **data):
super().__init__(**data)
if self.lock is None:
self.lock = asyncio.Lock()
if self.event is None:
self.event = asyncio.Event()
class AsyncLockFlow(Flow[AsyncLockState]):
"""Test flow with async locks."""
initial_state = AsyncLockState
@start()
async def step_1(self):
async with self.state.lock:
self.state.value = "step 1"
self.state.event.set()
return "step 1"
@listen(step_1)
async def step_2(self, result):
async with self.state.lock:
await self.state.event.wait()
self.state.value += " -> step 2"
return result + " -> step 2"
def test_flow_with_async_locks():
"""Test Flow with async locks in state."""
flow = AsyncLockFlow()
result = asyncio.run(flow.kickoff_async())
assert result == "step 1 -> step 2"
assert flow.state.value == "step 1 -> step 2"
def test_flow_concurrent_access():
"""Test Flow with concurrent access."""
flow = LockFlow()
results = []
errors = []
async def run_flow():
try:
result = await flow.kickoff_async()
results.append(result)
except Exception as e:
errors.append(e)
async def test():
tasks = [run_flow() for _ in range(10)]
await asyncio.gather(*tasks)
asyncio.run(test())
assert len(results) == 10
assert not errors
assert all(result == "step 1 -> step 2" for result in results)