From d509dc74b0c6cde0a43035c90d35dbd23a244300 Mon Sep 17 00:00:00 2001 From: Joao Moura Date: Wed, 7 Jan 2026 22:23:14 -0800 Subject: [PATCH] fix: improve error handling for HumanFeedbackPending in flow execution Refined the flow error management to emit a paused event for HumanFeedbackPending exceptions instead of treating them as failures. This enhancement allows the flow to better manage human feedback scenarios, ensuring that the execution state is preserved and appropriately handled without signaling an error. Regular failure events are still emitted for other exceptions, maintaining robust error reporting. --- lib/crewai/src/crewai/flow/flow.py | 56 +++++++++++++++++++----------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index ba5f2291f..dcc0c78bc 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1602,29 +1602,45 @@ class Flow(Generic[T], metaclass=FlowMeta): return result except Exception as e: - if not self.suppress_flow_events: - # Check if this is a HumanFeedbackPending exception (paused, not failed) - from crewai.flow.async_feedback.types import HumanFeedbackPending + # Check if this is a HumanFeedbackPending exception (paused, not failed) + from crewai.flow.async_feedback.types import HumanFeedbackPending - if isinstance(e, HumanFeedbackPending): - # Auto-save pending feedback (create default persistence if needed) - if self._persistence is None: - from crewai.flow.persistence import SQLiteFlowPersistence + if isinstance(e, HumanFeedbackPending): + # Auto-save pending feedback (create default persistence if needed) + if self._persistence is None: + from crewai.flow.persistence import SQLiteFlowPersistence - self._persistence = SQLiteFlowPersistence() + self._persistence = SQLiteFlowPersistence() - # Regular failure - future = crewai_event_bus.emit( - self, - MethodExecutionFailedEvent( - type="method_execution_failed", - method_name=method_name, - flow_name=self.name or self.__class__.__name__, - error=e, - ), - ) - if future: - self._event_futures.append(future) + # Emit paused event (not failed) + if not self.suppress_flow_events: + future = crewai_event_bus.emit( + self, + MethodExecutionPausedEvent( + type="method_execution_paused", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + state=self._copy_and_serialize_state(), + flow_id=e.context.flow_id, + message=e.context.message, + emit=e.context.emit, + ), + ) + if future: + self._event_futures.append(future) + elif not self.suppress_flow_events: + # Regular failure - emit failed event + future = crewai_event_bus.emit( + self, + MethodExecutionFailedEvent( + type="method_execution_failed", + method_name=method_name, + flow_name=self.name or self.__class__.__name__, + error=e, + ), + ) + if future: + self._event_futures.append(future) raise e def _copy_and_serialize_state(self) -> dict[str, Any]: