fix: emit flow_finished event after HITL resume

resume_async() was missing trace infrastructure that kickoff_async()
sets up, causing flow_finished to never reach the platform after HITL
feedback. Add FlowStartedEvent emission to initialize the trace batch,
await event futures, finalize the trace batch, and guard with
suppress_flow_events.
This commit is contained in:
Greyson LaLonde
2026-04-09 05:31:31 +08:00
committed by GitHub
parent 8cdde16ac8
commit 9ab67552a7

View File

@@ -1455,6 +1455,25 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"No pending feedback context. Use from_pending() to restore a paused flow."
)
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=None,
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
@@ -1594,16 +1613,39 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
final_result = self._method_outputs[-1] if self._method_outputs else result
# Emit flow finished
crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._state,
),
)
if self._event_futures:
await asyncio.gather(
*[
asyncio.wrap_future(f)
for f in self._event_futures
if isinstance(f, Future)
]
)
self._event_futures.clear()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._copy_and_serialize_state(),
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_result