From 252095a668acc77fc43e6cf5feb07236b34f22b4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 13 Feb 2025 12:07:26 +0000 Subject: [PATCH] fix: Handle thread locks in Flow state serialization - Add state serialization in Flow events to avoid pickling RLock objects - Update event emission to use serialized state - Add test case for Flow with thread locks Fixes #2120 Co-Authored-By: Joe Moura --- src/crewai/flow/flow.py | 18 ++++++++++++++++-- src/crewai/flow/flow_events.py | 12 ++++++++++++ tests/flow_test.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index f1242a2bf..4e9e43162 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -807,6 +807,13 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_method( self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: + # Serialize state before event emission to avoid pickling issues + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) + ) + dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) self.event_emitter.send( self, @@ -815,7 +822,7 @@ class Flow(Generic[T], metaclass=FlowMeta): method_name=method_name, flow_name=self.__class__.__name__, params=dumped_params, - state=self._copy_state(), + state=state_copy, ), ) @@ -829,13 +836,20 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_execution_counts.get(method_name, 0) + 1 ) + # Serialize state after execution + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) + ) + self.event_emitter.send( self, event=MethodExecutionFinishedEvent( type="method_execution_finished", method_name=method_name, flow_name=self.__class__.__name__, - state=self._copy_state(), + state=state_copy, result=result, ), ) diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py index c8f9e9694..27746e9c8 100644 --- a/src/crewai/flow/flow_events.py +++ b/src/crewai/flow/flow_events.py @@ -26,6 +26,12 @@ class MethodExecutionStartedEvent(Event): state: Union[Dict[str, Any], BaseModel] params: Optional[Dict[str, Any]] = None + def __post_init__(self): + super().__post_init__() + # Create a new instance of BaseModel state to avoid pickling issues + if isinstance(self.state, BaseModel): + self.state = type(self.state)(**self.state.model_dump()) + @dataclass class MethodExecutionFinishedEvent(Event): @@ -33,6 +39,12 @@ class MethodExecutionFinishedEvent(Event): state: Union[Dict[str, Any], BaseModel] result: Any = None + def __post_init__(self): + super().__post_init__() + # Create a new instance of BaseModel state to avoid pickling issues + if isinstance(self.state, BaseModel): + self.state = type(self.state)(**self.state.model_dump()) + @dataclass class FlowFinishedEvent(Event): diff --git a/tests/flow_test.py b/tests/flow_test.py index d036f7987..fa324fab7 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -348,6 +348,35 @@ def test_flow_uuid_structured(): assert flow.state.message == "final" +def test_flow_with_thread_lock(): + """Test that Flow properly handles thread locks in state.""" + import threading + + class LockFlow(Flow): + def __init__(self): + super().__init__() + self.lock = threading.RLock() + self.counter = 0 + + @start() + async def step_1(self): + with self.lock: + self.counter += 1 + return "step 1" + + @listen(step_1) + async def step_2(self, result): + with self.lock: + self.counter += 1 + return result + " -> step 2" + + flow = LockFlow() + result = flow.kickoff() + + assert result == "step 1 -> step 2" + assert flow.counter == 2 + + def test_router_with_multiple_conditions(): """Test a router that triggers when any of multiple steps complete (OR condition), and another router that triggers only after all specified steps complete (AND condition).