Compare commits

...

4 Commits

Author SHA1 Message Date
Heitor Sammuel Carvalho
2fd5c46b42 Emit FlowFailedEvent and finalise batch via added event listener 2025-12-18 11:04:46 -03:00
Heitor Sammuel Carvalho
97fed5229f Remove redundant parameter from emit call 2025-12-18 11:03:53 -03:00
Heitor Sammuel Carvalho
0f28d14e61 Move flow trace collection and batch finalisation to event listener 2025-12-18 11:03:10 -03:00
Heitor Sammuel Carvalho
b4b6434480 Add flow failed event 2025-12-18 11:00:16 -03:00
3 changed files with 39 additions and 10 deletions

View File

@@ -33,6 +33,7 @@ from crewai.events.types.crew_events import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -194,6 +195,22 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowFailedEvent)
def on_flow_failed(source: Any, event: FlowFailedEvent) -> None:
self._handle_trace_event("flow_failed", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:

View File

@@ -67,6 +67,16 @@ class FlowFinishedEvent(FlowEvent):
state: dict[str, Any] | BaseModel
class FlowFailedEvent(FlowEvent):
"""Event emitted when a flow fails execution"""
flow_name: str
error: Exception
type: str = "flow_failed"
model_config = ConfigDict(arbitrary_types_allowed=True)
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""

View File

@@ -40,6 +40,7 @@ from crewai.events.listeners.tracing.utils import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -977,7 +978,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
@@ -1005,7 +1005,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
state=self._copy_and_serialize_state(),
@@ -1020,15 +1019,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
except Exception as e:
future = crewai_event_bus.emit(
self,
FlowFailedEvent(
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e
finally:
detach(flow_token)