fix: Handle thread locks in Flow state serialization

- Add state serialization in Flow events to avoid pickling RLock objects
- Update event emission to use serialized state
- Add test case for Flow with thread locks

Fixes #2120

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-02-13 12:07:26 +00:00
parent d3b398ed52
commit 252095a668
3 changed files with 57 additions and 2 deletions

View File

@@ -807,6 +807,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def _execute_method( async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any: ) -> Any:
# Serialize state before event emission to avoid pickling issues
state_copy = (
type(self._state)(**self._state.model_dump())
if isinstance(self._state, BaseModel)
else dict(self._state)
)
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
self.event_emitter.send( self.event_emitter.send(
self, self,
@@ -815,7 +822,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
method_name=method_name, method_name=method_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
params=dumped_params, params=dumped_params,
state=self._copy_state(), state=state_copy,
), ),
) )
@@ -829,13 +836,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts.get(method_name, 0) + 1 self._method_execution_counts.get(method_name, 0) + 1
) )
# Serialize state after execution
state_copy = (
type(self._state)(**self._state.model_dump())
if isinstance(self._state, BaseModel)
else dict(self._state)
)
self.event_emitter.send( self.event_emitter.send(
self, self,
event=MethodExecutionFinishedEvent( event=MethodExecutionFinishedEvent(
type="method_execution_finished", type="method_execution_finished",
method_name=method_name, method_name=method_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
state=self._copy_state(), state=state_copy,
result=result, result=result,
), ),
) )

View File

@@ -26,6 +26,12 @@ class MethodExecutionStartedEvent(Event):
state: Union[Dict[str, Any], BaseModel] state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None params: Optional[Dict[str, Any]] = None
def __post_init__(self):
super().__post_init__()
# Create a new instance of BaseModel state to avoid pickling issues
if isinstance(self.state, BaseModel):
self.state = type(self.state)(**self.state.model_dump())
@dataclass @dataclass
class MethodExecutionFinishedEvent(Event): class MethodExecutionFinishedEvent(Event):
@@ -33,6 +39,12 @@ class MethodExecutionFinishedEvent(Event):
state: Union[Dict[str, Any], BaseModel] state: Union[Dict[str, Any], BaseModel]
result: Any = None result: Any = None
def __post_init__(self):
super().__post_init__()
# Create a new instance of BaseModel state to avoid pickling issues
if isinstance(self.state, BaseModel):
self.state = type(self.state)(**self.state.model_dump())
@dataclass @dataclass
class FlowFinishedEvent(Event): class FlowFinishedEvent(Event):

View File

@@ -348,6 +348,35 @@ def test_flow_uuid_structured():
assert flow.state.message == "final" assert flow.state.message == "final"
def test_flow_with_thread_lock():
"""Test that Flow properly handles thread locks in state."""
import threading
class LockFlow(Flow):
def __init__(self):
super().__init__()
self.lock = threading.RLock()
self.counter = 0
@start()
async def step_1(self):
with self.lock:
self.counter += 1
return "step 1"
@listen(step_1)
async def step_2(self, result):
with self.lock:
self.counter += 1
return result + " -> step 2"
flow = LockFlow()
result = flow.kickoff()
assert result == "step 1 -> step 2"
assert flow.counter == 2
def test_router_with_multiple_conditions(): def test_router_with_multiple_conditions():
"""Test a router that triggers when any of multiple steps complete (OR condition), """Test a router that triggers when any of multiple steps complete (OR condition),
and another router that triggers only after all specified steps complete (AND condition). and another router that triggers only after all specified steps complete (AND condition).