fix: Handle thread locks in Flow state serialization

- Add state serialization in Flow events to avoid pickling RLock objects
- Update event emission to use serialized state
- Add test case for Flow with thread locks

Fixes #2120

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-02-13 12:07:26 +00:00
parent 96a7e8038f
commit 92e1877bf0
2 changed files with 55 additions and 3 deletions

View File

@@ -819,9 +819,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:
try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (
kwargs or {}
# Serialize state before event emission to avoid pickling issues
state_copy = (
type(self._state)(**self._state.model_dump())
if isinstance(self._state, BaseModel)
else dict(self._state)
)
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {})
crewai_event_bus.emit(
self,
MethodExecutionStartedEvent(
@@ -829,7 +834,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
method_name=method_name,
flow_name=self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
state=state_copy,
),
)
@@ -844,6 +849,24 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts.get(method_name, 0) + 1
)
# Serialize state after execution to avoid pickling issues
state_copy = (
type(self._state)(**self._state.model_dump())
if isinstance(self._state, BaseModel)
else dict(self._state)
)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
state=state_copy,
result=result,
),
)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(