mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-29 01:58:14 +00:00
Compare commits
5 Commits
devin/1768
...
devin/1739
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c2f85d9d4 | ||
|
|
ae82745ddd | ||
|
|
b98e720531 | ||
|
|
5467a70d97 | ||
|
|
99a6390158 |
@@ -1,7 +1,11 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import copy
|
import dataclasses
|
||||||
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from contextlib import contextmanager
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
@@ -15,6 +19,22 @@ from typing import (
|
|||||||
Union,
|
Union,
|
||||||
cast,
|
cast,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from typing_extensions import Protocol
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SerializationError(Exception):
    """Raised when a piece of flow state cannot be serialized."""
|
||||||
|
|
||||||
|
|
||||||
|
class LockProtocol(Protocol):
    """Structural type used to annotate lock-like synchronization primitives.

    NOTE(review): ``_is_owned`` is a private method; plain ``threading.Lock``
    and the asyncio primitives do not appear to provide it -- confirm that
    this protocol is only ever used as an annotation.
    """

    def acquire(self) -> bool:
        """Acquire the primitive."""

    def release(self) -> None:
        """Release the primitive."""

    def _is_owned(self) -> bool:
        """Ownership query declared by this protocol."""
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from blinker import Signal
|
from blinker import Signal
|
||||||
@@ -437,6 +457,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
initial_state: Union[Type[T], T, None] = None
|
initial_state: Union[Type[T], T, None] = None
|
||||||
event_emitter = Signal("event_emitter")
|
event_emitter = Signal("event_emitter")
|
||||||
|
|
||||||
|
@contextmanager
def _performance_monitor(self, operation: str):
    """Time the body of a ``with`` block and log how long it took.

    Args:
        operation: Label placed at the front of the emitted log message.

    Yields:
        None
    """
    started_at = time.perf_counter()
    try:
        yield
    finally:
        # Reported from ``finally`` so the timing is logged even when the
        # wrapped body raises.
        finished_at = time.perf_counter()
        duration = finished_at - started_at
        logger.debug(f"{operation} took {duration:.4f} seconds")
|
||||||
|
|
||||||
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
|
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
|
||||||
class _FlowGeneric(cls): # type: ignore
|
class _FlowGeneric(cls): # type: ignore
|
||||||
_initial_state_T = item # type: ignore
|
_initial_state_T = item # type: ignore
|
||||||
@@ -569,8 +606,171 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
|
f"Initial state must be dict or BaseModel, got {type(self.initial_state)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Maps the concrete runtime type of each supported synchronization primitive
# to a zero-argument factory that builds a fresh, unlocked replacement.
#
# ``threading.RLock`` (and, on some interpreter versions, ``threading.Lock``)
# is a factory function rather than a class, so the concrete type has to be
# read from a throwaway instance. Every other entry is already a class and is
# used directly as the key, which avoids constructing asyncio primitives while
# the module is being imported.
THREAD_SAFE_TYPES = {
    type(threading.RLock()): threading.RLock,
    type(threading.Lock()): threading.Lock,
    threading.Semaphore: threading.Semaphore,
    threading.Event: threading.Event,
    threading.Condition: threading.Condition,
    asyncio.Lock: asyncio.Lock,
    asyncio.Event: asyncio.Event,
    asyncio.Condition: asyncio.Condition,
    asyncio.Semaphore: asyncio.Semaphore,
}
|
||||||
|
|
||||||
|
def _get_thread_safe_primitive_type(self, value: Any) -> Optional[Type[LockProtocol]]:
|
||||||
|
"""Get the type of a thread-safe primitive for recreation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: Any Python value to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The type of the thread-safe primitive, or None if not a primitive
|
||||||
|
"""
|
||||||
|
return (self.THREAD_SAFE_TYPES.get(type(value))
|
||||||
|
if hasattr(value, '_is_owned') and hasattr(value, 'acquire')
|
||||||
|
else None)
|
||||||
|
|
||||||
|
@functools.lru_cache(maxsize=128)
|
||||||
|
def _get_dataclass_fields(self, cls):
|
||||||
|
"""Get cached dataclass fields.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cls: Dataclass type
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict mapping field names to Field objects
|
||||||
|
"""
|
||||||
|
return {field.name: field for field in dataclasses.fields(cls)}
|
||||||
|
|
||||||
|
def _serialize_dataclass(self, value: Any) -> Union[Dict[str, Any], Any]:
    """Build a fresh copy of a dataclass instance.

    Every field is read from ``value``. Fields holding a registered
    synchronization primitive get a newly created primitive; all other
    field values are passed through ``_serialize_value``. Fields declared
    with ``init=False`` are not accepted by the generated ``__init__``, so
    they are assigned on the new instance after construction.

    Args:
        value: A dataclass instance.

    Returns:
        A new instance of the same dataclass type.

    Raises:
        SerializationError: If reading, copying or re-creating the
            instance fails.
    """
    try:
        # NOTE(review): ``__pydantic_validate__`` is not a hook I can find in
        # pydantic's documented API, so this branch is probably never taken.
        # Kept unchanged -- TODO confirm and remove.
        if hasattr(value, '__pydantic_validate__'):
            return value.__pydantic_validate__()

        init_kwargs = {}
        post_init_values = {}
        for field_name, field in self._get_dataclass_fields(value.__class__).items():
            field_value = getattr(value, field_name)
            primitive_type = self._get_thread_safe_primitive_type(field_value)
            if primitive_type is not None:
                copied = primitive_type()
            else:
                copied = self._serialize_value(field_value)

            if field.init:
                init_kwargs[field_name] = copied
            else:
                post_init_values[field_name] = copied

        instance = value.__class__(**init_kwargs)
        for field_name, copied in post_init_values.items():
            # object.__setattr__ also works for frozen dataclasses.
            object.__setattr__(instance, field_name, copied)
        return instance
    except SerializationError:
        # Already logged and wrapped by a nested call; do not wrap it again.
        raise
    except Exception as e:
        logger.error(f"Dataclass serialization error for {type(value)}: {str(e)}")
        raise SerializationError(f"Failed to serialize dataclass {type(value)}") from e
|
||||||
|
|
||||||
|
def _serialize_value(self, value: Any) -> Any:
    """Recursively copy a value, recreating synchronization primitives.

    Handling by kind of value:

    * ``None`` is returned unchanged.
    * A registered synchronization primitive is replaced by a new one.
    * A pydantic model is rebuilt from ``model_dump``; excluded fields that
      hold a registered primitive get a new primitive on the copy.
    * A dataclass instance is delegated to ``_serialize_dataclass``.
    * ``dict``, ``list``, ``tuple`` and ``set`` are rebuilt with their items
      copied recursively (the result is a plain container of that kind).
    * Anything else is returned as-is, i.e. shared with the original.

    Args:
        value: Any Python value to serialize.

    Returns:
        The copied value as described above.

    Raises:
        SerializationError: If serialization fails.
    """
    with self._performance_monitor(f"serialize_{type(value).__name__}"):
        try:
            if value is None:
                return None

            # Synchronization primitives are recreated, never copied.
            primitive_type = self._get_thread_safe_primitive_type(value)
            if primitive_type is not None:
                return primitive_type()

            if isinstance(value, BaseModel):
                model_class = type(value)
                # NOTE(review): exclude_none=True drops fields whose current
                # value is None, so the copy falls back to the field default
                # for them -- confirm this is intended for a state copy.
                model_data = value.model_dump(exclude_none=True)
                instance = model_class(**model_data)

                # Excluded fields are absent from model_dump; give the copy
                # fresh primitives for those that held one.
                for field_name, field in model_class.model_fields.items():
                    if not field.exclude:
                        continue
                    field_value = getattr(value, field_name, None)
                    if field_value is None:
                        continue
                    primitive_type = self._get_thread_safe_primitive_type(field_value)
                    if primitive_type is not None:
                        setattr(instance, field_name, primitive_type())

                return instance

            # is_dataclass() is also true for dataclass *types*; only
            # instances can be copied field by field.
            if dataclasses.is_dataclass(value) and not isinstance(value, type):
                return self._serialize_dataclass(value)

            if isinstance(value, dict):
                return {k: self._serialize_value(v) for k, v in value.items()}

            if isinstance(value, (list, tuple, set)):
                serialized = [self._serialize_value(item) for item in value]
                if isinstance(value, list):
                    return serialized
                if isinstance(value, tuple):
                    return tuple(serialized)
                return set(serialized)

            # Any other type is returned unchanged.
            return value

        except SerializationError:
            # Already logged and wrapped by a nested call; do not wrap it again.
            raise
        except Exception as e:
            logger.error(f"Serialization error for {type(value)}: {str(e)}")
            raise SerializationError(f"Failed to serialize {type(value)}") from e
|
||||||
|
|
||||||
def _copy_state(self) -> T:
    """Return a copy of the current state built by ``_serialize_value``.

    Containers, pydantic models and dataclass instances are rebuilt and
    synchronization primitives are recreated. Values of any other type are
    returned by ``_serialize_value`` unchanged, so they are shared with the
    live state rather than deep-copied.
    """
    return self._serialize_value(self._state)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def state(self) -> T:
|
def state(self) -> T:
|
||||||
|
|||||||
182
tests/test_flow_thread_locks.py
Normal file
182
tests/test_flow_thread_locks.py
Normal file
@@ -0,0 +1,182 @@
|
|||||||
|
"""Tests for Flow with thread locks."""
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
from typing import Optional
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from pydantic import BaseModel, Field, field_validator
|
||||||
|
|
||||||
|
from crewai.flow.flow import Flow, listen, start
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadSafeState(BaseModel):
    """Test state model with thread locks.

    Carries a generated ``id``, a string ``value`` and a ``lock`` field that
    is declared with ``exclude=True`` and filled in on construction when the
    caller does not pass one.
    """
    # NOTE(review): "exclude" is not a model_config key I can confirm in
    # pydantic's documentation; the per-field ``exclude=True`` below is what
    # visibly marks ``lock`` as excluded -- TODO confirm this entry is needed.
    model_config = {
        "arbitrary_types_allowed": True,
        "exclude": {"lock"}
    }

    # Random UUID string generated per instance.
    id: str = Field(default_factory=lambda: str(uuid4()))
    lock: Optional[threading.RLock] = Field(default=None, exclude=True)
    value: str = ""

    def __init__(self, **data):
        super().__init__(**data)
        # Ensure every instance ends up with a lock, even when none was given.
        if self.lock is None:
            self.lock = threading.RLock()
|
||||||
|
|
||||||
|
|
||||||
|
class LockFlow(Flow[ThreadSafeState]):
    """Test flow with thread locks.

    Two chained steps, each updating ``state.value`` while holding
    ``state.lock``.
    """
    initial_state = ThreadSafeState

    @start()
    async def step_1(self):
        # Entry step: sets the state value under the lock.
        with self.state.lock:
            self.state.value = "step 1"
            return "step 1"

    @listen(step_1)
    async def step_2(self, result):
        # Registered as a listener of step_1; ``result`` is extended with this
        # step's suffix and the state value is appended to under the lock.
        with self.state.lock:
            self.state.value += " -> step 2"
            return result + " -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_with_thread_locks():
    """Run ``LockFlow`` once and check the returned value and final state."""
    lock_flow = LockFlow()
    outcome = asyncio.run(lock_flow.kickoff_async())

    assert outcome == "step 1 -> step 2"
    assert lock_flow.state.value == "step 1 -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_kickoff_async_with_lock_inputs():
    """Kick off ``LockFlow`` with a lock and a value passed as inputs."""
    lock_flow = LockFlow()
    kickoff_inputs = {"lock": threading.RLock(), "value": "test"}

    outcome = asyncio.run(lock_flow.kickoff_async(inputs=kickoff_inputs))

    assert outcome == "step 1 -> step 2"
    assert lock_flow.state.value == "step 1 -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
class ComplexState(BaseModel):
    """Test state model with nested thread locks.

    Holds its own ``outer_lock`` plus an ``inner`` ``ThreadSafeState``, which
    carries a lock of its own.
    """
    # NOTE(review): "exclude" is not a model_config key I can confirm in
    # pydantic's documentation; the per-field ``exclude=True`` below is what
    # visibly marks ``outer_lock`` as excluded -- TODO confirm.
    model_config = {
        "arbitrary_types_allowed": True,
        "exclude": {"outer_lock"}
    }

    # Random UUID string generated per instance.
    id: str = Field(default_factory=lambda: str(uuid4()))
    outer_lock: Optional[threading.RLock] = Field(default=None, exclude=True)
    # A new nested state is created for each instance by default.
    inner: Optional[ThreadSafeState] = Field(default_factory=ThreadSafeState)
    value: str = ""

    def __init__(self, **data):
        super().__init__(**data)
        # Ensure every instance ends up with an outer lock.
        if self.outer_lock is None:
            self.outer_lock = threading.RLock()
|
||||||
|
|
||||||
|
|
||||||
|
class NestedLockFlow(Flow[ComplexState]):
    """Test flow with nested thread locks.

    Both steps take ``outer_lock`` first and the nested state's lock second
    before updating the outer and inner values.
    """
    initial_state = ComplexState

    @start()
    async def step_1(self):
        # Entry step: sets both values while holding both locks.
        with self.state.outer_lock:
            with self.state.inner.lock:
                self.state.value = "outer"
                self.state.inner.value = "inner"
                return "step 1"

    @listen(step_1)
    async def step_2(self, result):
        # Registered as a listener of step_1; appends to both values using
        # the same lock order as step_1.
        with self.state.outer_lock:
            with self.state.inner.lock:
                self.state.value += " -> outer 2"
                self.state.inner.value += " -> inner 2"
                return result + " -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_with_nested_locks():
    """Run ``NestedLockFlow`` once and check the outer and inner state values."""
    nested_flow = NestedLockFlow()
    outcome = asyncio.run(nested_flow.kickoff_async())

    assert outcome == "step 1 -> step 2"
    final_state = nested_flow.state
    assert final_state.value == "outer -> outer 2"
    assert nested_flow.state.inner.value == "inner -> inner 2"
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncLockState(BaseModel):
    """Test state model with async locks.

    Carries an ``asyncio.Lock`` and an ``asyncio.Event``, both declared with
    ``exclude=True`` and created on construction when not supplied.
    """
    # NOTE(review): "exclude" is not a model_config key I can confirm in
    # pydantic's documentation; the per-field ``exclude=True`` below is what
    # visibly marks these fields as excluded -- TODO confirm.
    model_config = {
        "arbitrary_types_allowed": True,
        "exclude": {"lock", "event"}
    }

    # Random UUID string generated per instance.
    id: str = Field(default_factory=lambda: str(uuid4()))
    lock: Optional[asyncio.Lock] = Field(default=None, exclude=True)
    event: Optional[asyncio.Event] = Field(default=None, exclude=True)
    value: str = ""

    def __init__(self, **data):
        super().__init__(**data)
        # Fill in whichever asyncio primitive the caller did not provide.
        if self.lock is None:
            self.lock = asyncio.Lock()
        if self.event is None:
            self.event = asyncio.Event()
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncLockFlow(Flow[AsyncLockState]):
    """Test flow with async locks.

    ``step_1`` sets the state's event while holding the async lock and
    ``step_2`` waits on that event while holding the same lock.
    """
    initial_state = AsyncLockState

    @start()
    async def step_1(self):
        # Entry step: sets the value and signals the event under the lock.
        async with self.state.lock:
            self.state.value = "step 1"
            self.state.event.set()
            return "step 1"

    @listen(step_1)
    async def step_2(self, result):
        # Registered as a listener of step_1; waits for the event before
        # appending to the state value.
        async with self.state.lock:
            await self.state.event.wait()
            self.state.value += " -> step 2"
            return result + " -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_with_async_locks():
    """Run ``AsyncLockFlow`` once and check the returned value and final state."""
    async_flow = AsyncLockFlow()
    outcome = asyncio.run(async_flow.kickoff_async())

    assert outcome == "step 1 -> step 2"
    assert async_flow.state.value == "step 1 -> step 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_flow_concurrent_access():
    """Kick off one ``LockFlow`` instance ten times through ``asyncio.gather``.

    Each run either records its result or the exception it raised; the test
    expects ten results, no errors, and the same final value from every run.
    """
    shared_flow = LockFlow()
    results = []
    errors = []

    async def run_flow():
        # Collect failures instead of letting one abort the whole gather.
        try:
            outcome = await shared_flow.kickoff_async()
        except Exception as exc:
            errors.append(exc)
        else:
            results.append(outcome)

    async def run_all():
        await asyncio.gather(*(run_flow() for _ in range(10)))

    asyncio.run(run_all())

    assert len(results) == 10
    assert not errors
    assert all(outcome == "step 1 -> step 2" for outcome in results)
|
||||||
Reference in New Issue
Block a user