Compare commits

...

2 Commits

Author SHA1 Message Date
Joao Moura
d509dc74b0 fix: improve error handling for HumanFeedbackPending in flow execution
Refined the flow error management to emit a paused event for HumanFeedbackPending exceptions instead of treating them as failures. This enhancement allows the flow to better manage human feedback scenarios, ensuring that the execution state is preserved and appropriately handled without signaling an error. Regular failure events are still emitted for other exceptions, maintaining robust error reporting.
2026-01-07 22:23:14 -08:00
Joao Moura
4640c2c67c fix: handle HumanFeedbackPending in flow error management
Updated the flow error handling to treat HumanFeedbackPending as expected control flow rather than an error. This change ensures that the flow can appropriately manage human feedback scenarios without signaling an error, improving the robustness of the flow execution.
2026-01-07 19:39:31 -08:00

View File

@@ -1203,7 +1203,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = self.kickoff(inputs=inputs) result = self.kickoff(inputs=inputs)
result_holder.append(result) result_holder.append(result)
except Exception as e: except Exception as e:
signal_error(state, e) # HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e)
finally: finally:
self.stream = True self.stream = True
signal_end(state) signal_end(state)
@@ -1258,7 +1264,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = await self.kickoff_async(inputs=inputs) result = await self.kickoff_async(inputs=inputs)
result_holder.append(result) result_holder.append(result)
except Exception as e: except Exception as e:
signal_error(state, e, is_async=True) # HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e, is_async=True)
finally: finally:
self.stream = True self.stream = True
signal_end(state, is_async=True) signal_end(state, is_async=True)
@@ -1590,29 +1602,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result return result
except Exception as e: except Exception as e:
if not self.suppress_flow_events: # Check if this is a HumanFeedbackPending exception (paused, not failed)
# Check if this is a HumanFeedbackPending exception (paused, not failed) from crewai.flow.async_feedback.types import HumanFeedbackPending
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending): if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed) # Auto-save pending feedback (create default persistence if needed)
if self._persistence is None: if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence() self._persistence = SQLiteFlowPersistence()
# Regular failure # Emit paused event (not failed)
future = crewai_event_bus.emit( if not self.suppress_flow_events:
self, future = crewai_event_bus.emit(
MethodExecutionFailedEvent( self,
type="method_execution_failed", MethodExecutionPausedEvent(
method_name=method_name, type="method_execution_paused",
flow_name=self.name or self.__class__.__name__, method_name=method_name,
error=e, flow_name=self.name or self.__class__.__name__,
), state=self._copy_and_serialize_state(),
) flow_id=e.context.flow_id,
if future: message=e.context.message,
self._event_futures.append(future) emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
elif not self.suppress_flow_events:
# Regular failure - emit failed event
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e raise e
def _copy_and_serialize_state(self) -> dict[str, Any]: def _copy_and_serialize_state(self) -> dict[str, Any]: