Add regression tests for issue #3828: Flow with unpickleable objects in state

- Add test_flow_with_rlock_in_state to verify Flow works with threading.RLock in state
- Add test_flow_with_nested_unpickleable_objects to verify Flow works with unpickleable objects nested in containers
- These tests ensure the issue from version 1.3.0 (TypeError: cannot pickle '_thread.RLock' object) doesn't get reintroduced
- The issue was resolved in the current main branch by removing the _copy_state() method that used copy.deepcopy()
- Tests verify that flows with memory components or other resources containing locks work correctly

Fixes #3828

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-11-04 10:27:39 +00:00
parent 329567153b
commit 8c0da6c129

View File

@@ -1384,3 +1384,89 @@ def test_mixed_sync_async_execution_order():
]
assert execution_order == expected_order
def test_flow_with_rlock_in_state():
"""Test that Flow can handle unpickleable objects like RLock in state.
Regression test for issue #3828: Flow should not crash when state contains
objects that cannot be deep copied (like threading.RLock).
In version 1.3.0, Flow._copy_state() used copy.deepcopy() which would fail
with "TypeError: cannot pickle '_thread.RLock' object" when state contained
threading locks (e.g., from memory components or LLM instances).
The current implementation no longer deep copies state, so this test verifies
that flows with unpickleable objects in state work correctly.
"""
execution_order = []
class StateWithRLock(BaseModel):
model_config = {"arbitrary_types_allowed": True}
counter: int = 0
lock: threading.RLock = None
class FlowWithRLock(Flow[StateWithRLock]):
@start()
def step_1(self):
execution_order.append("step_1")
self.state.counter += 1
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
self.state.counter += 1
flow = FlowWithRLock()
flow._state.lock = threading.RLock()
flow.kickoff()
assert execution_order == ["step_1", "step_2"]
assert flow.state.counter == 2
def test_flow_with_nested_unpickleable_objects():
"""Test that Flow can handle unpickleable objects nested in containers.
Regression test for issue #3828: Verifies that unpickleable objects
nested inside dicts/lists in state don't cause crashes.
This simulates real-world scenarios where memory components or other
resources with locks might be stored in nested data structures.
"""
execution_order = []
class NestedState(BaseModel):
model_config = {"arbitrary_types_allowed": True}
data: dict = {}
items: list = []
class FlowWithNestedUnpickleable(Flow[NestedState]):
@start()
def step_1(self):
execution_order.append("step_1")
self.state.data["lock"] = threading.RLock()
self.state.data["value"] = 42
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
self.state.items.append(threading.Lock())
self.state.items.append("normal_value")
@listen(step_2)
def step_3(self):
execution_order.append("step_3")
assert self.state.data["value"] == 42
assert len(self.state.items) == 2
flow = FlowWithNestedUnpickleable()
flow.kickoff()
assert execution_order == ["step_1", "step_2", "step_3"]
assert flow.state.data["value"] == 42
assert len(flow.state.items) == 2