From 92e1877bf0d1704990050b50de98b1def487d219 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Feb 2025 12:07:26 +0000 Subject: [PATCH] fix: Handle thread locks in Flow state serialization - Add state serialization in Flow events to avoid pickling RLock objects - Update event emission to use serialized state - Add test case for Flow with thread locks Fixes #2120 Co-Authored-By: Joe Moura --- src/crewai/flow/flow.py | 29 ++++++++++++++++++++++++++--- tests/flow_test.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 2babbe57c..99c865e2f 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -819,9 +819,14 @@ class Flow(Generic[T], metaclass=FlowMeta): self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: try: - dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | ( - kwargs or {} + # Serialize state before event emission to avoid pickling issues + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) ) + + dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) crewai_event_bus.emit( self, MethodExecutionStartedEvent( @@ -829,7 +834,7 @@ class Flow(Generic[T], metaclass=FlowMeta): method_name=method_name, flow_name=self.__class__.__name__, params=dumped_params, - state=self._copy_state(), + state=state_copy, ), ) @@ -844,6 +849,24 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_execution_counts.get(method_name, 0) + 1 ) + # Serialize state after execution to avoid pickling issues + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) + ) + + crewai_event_bus.emit( + self, + MethodExecutionFinishedEvent( + type="method_execution_finished", + method_name=method_name, + flow_name=self.__class__.__name__, + state=state_copy, + result=result, + ), + ) + crewai_event_bus.emit( self, MethodExecutionFinishedEvent( diff --git a/tests/flow_test.py b/tests/flow_test.py index b2edcfa5a..acc9ffa25 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -350,6 +350,35 @@ def test_flow_uuid_structured(): assert flow.state.message == "final" +def test_flow_with_thread_lock(): + """Test that Flow properly handles thread locks in state.""" + import threading + + class LockFlow(Flow): + def __init__(self): + super().__init__() + self.lock = threading.RLock() + self.counter = 0 + + @start() + async def step_1(self): + with self.lock: + self.counter += 1 + return "step 1" + + @listen(step_1) + async def step_2(self, result): + with self.lock: + self.counter += 1 + return result + " -> step 2" + + flow = LockFlow() + result = flow.kickoff() + + assert result == "step 1 -> step 2" + assert flow.counter == 2 + + def test_router_with_multiple_conditions(): """Test a router that triggers when any of multiple steps complete (OR condition), and another router that triggers only after all specified steps complete (AND condition).