diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index f1242a2bf..4e9e43162 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -807,6 +807,13 @@ class Flow(Generic[T], metaclass=FlowMeta): async def _execute_method( self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: + # Serialize state before event emission to avoid pickling issues + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) + ) + dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) self.event_emitter.send( self, @@ -815,7 +822,7 @@ class Flow(Generic[T], metaclass=FlowMeta): method_name=method_name, flow_name=self.__class__.__name__, params=dumped_params, - state=self._copy_state(), + state=state_copy, ), ) @@ -829,13 +836,20 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_execution_counts.get(method_name, 0) + 1 ) + # Serialize state after execution + state_copy = ( + type(self._state)(**self._state.model_dump()) + if isinstance(self._state, BaseModel) + else dict(self._state) + ) + self.event_emitter.send( self, event=MethodExecutionFinishedEvent( type="method_execution_finished", method_name=method_name, flow_name=self.__class__.__name__, - state=self._copy_state(), + state=state_copy, result=result, ), ) diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py index c8f9e9694..27746e9c8 100644 --- a/src/crewai/flow/flow_events.py +++ b/src/crewai/flow/flow_events.py @@ -26,6 +26,12 @@ class MethodExecutionStartedEvent(Event): state: Union[Dict[str, Any], BaseModel] params: Optional[Dict[str, Any]] = None + def __post_init__(self): + super().__post_init__() + # Create a new instance of BaseModel state to avoid pickling issues + if isinstance(self.state, BaseModel): + self.state = type(self.state)(**self.state.model_dump()) + @dataclass class MethodExecutionFinishedEvent(Event): @@ -33,6 +39,12 @@ class MethodExecutionFinishedEvent(Event): state: Union[Dict[str, Any], BaseModel] result: Any = None + def __post_init__(self): + super().__post_init__() + # Create a new instance of BaseModel state to avoid pickling issues + if isinstance(self.state, BaseModel): + self.state = type(self.state)(**self.state.model_dump()) + @dataclass class FlowFinishedEvent(Event): diff --git a/tests/flow_test.py b/tests/flow_test.py index d036f7987..fa324fab7 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -348,6 +348,35 @@ def test_flow_uuid_structured(): assert flow.state.message == "final" +def test_flow_with_thread_lock(): + """Test that Flow properly handles thread locks in state.""" + import threading + + class LockFlow(Flow): + def __init__(self): + super().__init__() + self.lock = threading.RLock() + self.counter = 0 + + @start() + async def step_1(self): + with self.lock: + self.counter += 1 + return "step 1" + + @listen(step_1) + async def step_2(self, result): + with self.lock: + self.counter += 1 + return result + " -> step 2" + + flow = LockFlow() + result = flow.kickoff() + + assert result == "step 1 -> step 2" + assert flow.counter == 2 + + def test_router_with_multiple_conditions(): """Test a router that triggers when any of multiple steps complete (OR condition), and another router that triggers only after all specified steps complete (AND condition).