mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-19 04:48:13 +00:00
Compare commits
4 Commits
devin/1768
...
heitor/add
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2fd5c46b42 | ||
|
|
97fed5229f | ||
|
|
0f28d14e61 | ||
|
|
b4b6434480 |
@@ -33,6 +33,7 @@ from crewai.events.types.crew_events import (
|
|||||||
)
|
)
|
||||||
from crewai.events.types.flow_events import (
|
from crewai.events.types.flow_events import (
|
||||||
FlowCreatedEvent,
|
FlowCreatedEvent,
|
||||||
|
FlowFailedEvent,
|
||||||
FlowFinishedEvent,
|
FlowFinishedEvent,
|
||||||
FlowPlotEvent,
|
FlowPlotEvent,
|
||||||
FlowStartedEvent,
|
FlowStartedEvent,
|
||||||
@@ -194,6 +195,22 @@ class TraceCollectionListener(BaseEventListener):
|
|||||||
@event_bus.on(FlowFinishedEvent)
|
@event_bus.on(FlowFinishedEvent)
|
||||||
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
|
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
|
||||||
self._handle_trace_event("flow_finished", source, event)
|
self._handle_trace_event("flow_finished", source, event)
|
||||||
|
if self.batch_manager.batch_owner_type == "flow":
|
||||||
|
if self.first_time_handler.is_first_time:
|
||||||
|
self.first_time_handler.mark_events_collected()
|
||||||
|
self.first_time_handler.handle_execution_completion()
|
||||||
|
else:
|
||||||
|
self.batch_manager.finalize_batch()
|
||||||
|
|
||||||
|
@event_bus.on(FlowFailedEvent)
|
||||||
|
def on_flow_failed(source: Any, event: FlowFailedEvent) -> None:
|
||||||
|
self._handle_trace_event("flow_failed", source, event)
|
||||||
|
if self.batch_manager.batch_owner_type == "flow":
|
||||||
|
if self.first_time_handler.is_first_time:
|
||||||
|
self.first_time_handler.mark_events_collected()
|
||||||
|
self.first_time_handler.handle_execution_completion()
|
||||||
|
else:
|
||||||
|
self.batch_manager.finalize_batch()
|
||||||
|
|
||||||
@event_bus.on(FlowPlotEvent)
|
@event_bus.on(FlowPlotEvent)
|
||||||
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:
|
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:
|
||||||
|
|||||||
@@ -67,6 +67,16 @@ class FlowFinishedEvent(FlowEvent):
|
|||||||
state: dict[str, Any] | BaseModel
|
state: dict[str, Any] | BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class FlowFailedEvent(FlowEvent):
|
||||||
|
"""Event emitted when a flow fails execution"""
|
||||||
|
|
||||||
|
flow_name: str
|
||||||
|
error: Exception
|
||||||
|
type: str = "flow_failed"
|
||||||
|
|
||||||
|
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||||
|
|
||||||
|
|
||||||
class FlowPlotEvent(FlowEvent):
|
class FlowPlotEvent(FlowEvent):
|
||||||
"""Event emitted when a flow plot is created"""
|
"""Event emitted when a flow plot is created"""
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ from crewai.events.listeners.tracing.utils import (
|
|||||||
)
|
)
|
||||||
from crewai.events.types.flow_events import (
|
from crewai.events.types.flow_events import (
|
||||||
FlowCreatedEvent,
|
FlowCreatedEvent,
|
||||||
|
FlowFailedEvent,
|
||||||
FlowFinishedEvent,
|
FlowFinishedEvent,
|
||||||
FlowPlotEvent,
|
FlowPlotEvent,
|
||||||
FlowStartedEvent,
|
FlowStartedEvent,
|
||||||
@@ -977,7 +978,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
future = crewai_event_bus.emit(
|
future = crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
FlowStartedEvent(
|
FlowStartedEvent(
|
||||||
type="flow_started",
|
|
||||||
flow_name=self.name or self.__class__.__name__,
|
flow_name=self.name or self.__class__.__name__,
|
||||||
inputs=inputs,
|
inputs=inputs,
|
||||||
),
|
),
|
||||||
@@ -1005,7 +1005,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
future = crewai_event_bus.emit(
|
future = crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
FlowFinishedEvent(
|
FlowFinishedEvent(
|
||||||
type="flow_finished",
|
|
||||||
flow_name=self.name or self.__class__.__name__,
|
flow_name=self.name or self.__class__.__name__,
|
||||||
result=final_output,
|
result=final_output,
|
||||||
state=self._copy_and_serialize_state(),
|
state=self._copy_and_serialize_state(),
|
||||||
@@ -1020,15 +1019,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
)
|
)
|
||||||
self._event_futures.clear()
|
self._event_futures.clear()
|
||||||
|
|
||||||
trace_listener = TraceCollectionListener()
|
|
||||||
if trace_listener.batch_manager.batch_owner_type == "flow":
|
|
||||||
if trace_listener.first_time_handler.is_first_time:
|
|
||||||
trace_listener.first_time_handler.mark_events_collected()
|
|
||||||
trace_listener.first_time_handler.handle_execution_completion()
|
|
||||||
else:
|
|
||||||
trace_listener.batch_manager.finalize_batch()
|
|
||||||
|
|
||||||
return final_output
|
return final_output
|
||||||
|
except Exception as e:
|
||||||
|
future = crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
FlowFailedEvent(
|
||||||
|
flow_name=self.name or self.__class__.__name__,
|
||||||
|
error=e,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if future:
|
||||||
|
self._event_futures.append(future)
|
||||||
|
raise e
|
||||||
finally:
|
finally:
|
||||||
detach(flow_token)
|
detach(flow_token)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user