diff --git a/tests/flow_test.py b/tests/flow_test.py index d52c459ce..4ade6c5e3 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -1,8 +1,10 @@ """Test Flow creation and execution basic functionality.""" import asyncio +import threading import pytest +from pydantic import BaseModel from crewai.flow.flow import Flow, and_, listen, or_, router, start @@ -322,3 +324,91 @@ def test_router_with_multiple_conditions(): # final_step should run after router_and assert execution_order.index("log_final_step") > execution_order.index("router_and") + + +def test_flow_with_rlock_in_state(): + """Test that Flow can handle unpickleable objects like RLock in state. + + Regression test for issue #3828: Flow should not crash when state contains + objects that cannot be deep copied (like threading.RLock). + + In version 1.3.0, Flow._copy_state() used copy.deepcopy() which would fail + with "TypeError: cannot pickle '_thread.RLock' object" when state contained + threading locks (e.g., from memory components or LLM instances). + + The current implementation no longer deep copies state, so this test verifies + that flows with unpickleable objects in state work correctly. + """ + execution_order = [] + + class StateWithRLock(BaseModel): + class Config: + arbitrary_types_allowed = True + + counter: int = 0 + lock: threading.RLock = None + + class FlowWithRLock(Flow[StateWithRLock]): + @start() + def step_1(self): + execution_order.append("step_1") + self.state.counter += 1 + + @listen(step_1) + def step_2(self): + execution_order.append("step_2") + self.state.counter += 1 + + flow = FlowWithRLock() + flow._state.lock = threading.RLock() + + flow.kickoff() + + assert execution_order == ["step_1", "step_2"] + assert flow.state.counter == 2 + + +def test_flow_with_nested_unpickleable_objects(): + """Test that Flow can handle unpickleable objects nested in containers. + + Regression test for issue #3828: Verifies that unpickleable objects + nested inside dicts/lists in state don't cause crashes. + + This simulates real-world scenarios where memory components or other + resources with locks might be stored in nested data structures. + """ + execution_order = [] + + class NestedState(BaseModel): + class Config: + arbitrary_types_allowed = True + + data: dict = {} + items: list = [] + + class FlowWithNestedUnpickleable(Flow[NestedState]): + @start() + def step_1(self): + execution_order.append("step_1") + self.state.data["lock"] = threading.RLock() + self.state.data["value"] = 42 + + @listen(step_1) + def step_2(self): + execution_order.append("step_2") + self.state.items.append(threading.Lock()) + self.state.items.append("normal_value") + + @listen(step_2) + def step_3(self): + execution_order.append("step_3") + assert self.state.data["value"] == 42 + assert len(self.state.items) == 2 + + flow = FlowWithNestedUnpickleable() + + flow.kickoff() + + assert execution_order == ["step_1", "step_2", "step_3"] + assert flow.state.data["value"] == 42 + assert len(flow.state.items) == 2