Refactor event system and add third-party event listeners

- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports
This commit is contained in:
Lorenze Jay
2025-02-12 10:29:27 -08:00
parent 3a89b9feab
commit f70162c064
11 changed files with 65 additions and 22 deletions

View File

@@ -18,11 +18,11 @@ from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import event_bus
from crewai.utilities.events.agent_events import (
AgentExecutionError,
AgentExecutionStarted,
)
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)

View File

@@ -43,7 +43,6 @@ from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events import event_bus
from crewai.utilities.events.crew_events import (
CrewKickoffCompleted,
CrewKickoffFailed,
@@ -55,6 +54,7 @@ from crewai.utilities.events.crew_events import (
CrewTrainFailed,
CrewTrainStarted,
)
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -471,8 +471,6 @@ class Crew(BaseModel):
)
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -970,13 +968,13 @@ class Crew(BaseModel):
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# Filter out empty outputs and get the last valid one as the main output
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
final_task_output = valid_outputs[-1]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()

View File

@@ -28,8 +28,8 @@ from crewai.utilities.events import (
FlowStarted,
MethodExecutionFinished,
MethodExecutionStarted,
event_bus,
)
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)

View File

@@ -17,7 +17,7 @@ from crewai.tools import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.events import event_bus
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.event_types import (
ToolUsageError,
ToolUsageFinished,

View File

@@ -6,13 +6,14 @@ from .crew_events import (
from .agent_events import AgentExecutionStarted, AgentExecutionCompleted
from .task_events import TaskStarted, TaskCompleted, TaskFailed
from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished
from .event_bus import event_bus, EventTypes
from .event_bus import EventTypes, EventBus
from .events import emit, on
from .event_bus import EventBus
from .event_listener import EventListener
from .tool_usage_events import ToolUsageFinished, ToolUsageError
event_bus = EventBus()
event_listener = EventListener()
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
__all__ = [
AgentExecutionStarted,
@@ -29,10 +30,11 @@ __all__ = [
MethodExecutionFinished,
EventTypes,
emit,
on,
on,
event_bus,
ToolUsageFinished,
ToolUsageError,
EventBus
]

View File

@@ -0,0 +1,13 @@
from abc import ABC, abstractmethod
from crewai.utilities.events.event_bus import EventBus, event_bus
class BaseEventListener(ABC):
def __init__(self):
super().__init__()
self.setup_listeners(event_bus)
@abstractmethod
def setup_listeners(self, event_bus: EventBus):
pass

View File

@@ -29,6 +29,7 @@ class EventBus:
self._signal = Signal("event_bus")
self._handlers: Dict[Type[EventTypes], List[Callable]] = {}
# TODO: generic types
def on(self, event_type: Type[EventTypes]) -> Callable:
"""
Decorator to register an event handler for a specific event type.

View File

@@ -1,5 +1,8 @@
from pydantic import PrivateAttr
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.base_event_listener import BaseEventListener
from .agent_events import (
AgentExecutionCompleted,
@@ -11,7 +14,6 @@ from .crew_events import (
CrewTestCompleted,
CrewTestStarted,
)
from .event_bus import EventBus
from .flow_events import (
FlowFinished,
FlowStarted,
@@ -21,16 +23,15 @@ from .flow_events import (
from .task_events import TaskCompleted, TaskStarted
class EventListener:
_telemetry = Telemetry()
class EventListener(BaseEventListener):
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
def __init__(self):
self._setup_listeners()
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
def _setup_listeners(self):
event_bus = EventBus()
def setup_listeners(self, event_bus):
@event_bus.on(CrewKickoffStarted)
def on_crew_started(source, event):
print(f"🚀 Crew '{event.crew_name}' started", event.timestamp)
@@ -95,3 +96,6 @@ class EventListener:
@event_bus.on(MethodExecutionFinished)
def on_method_execution_finished(source, event):
print(f"👍 Flow Method Finished: '{event.method_name}'")
event_listener = EventListener()

View File

@@ -0,0 +1 @@
from .agentops_listener import agentops_listener

View File

@@ -0,0 +1,25 @@
from crewai.utilities.events.agent_events import AgentExecutionStarted
from crewai.utilities.events.base_event_listener import BaseEventListener
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
def __init__(self):
super().__init__()
print("AgentOpsListener init")
def setup_listeners(self, event_bus):
if AGENTOPS_INSTALLED:
@event_bus.on(AgentExecutionStarted)
def on_agent_started(source, event: AgentExecutionStarted):
print("AGENTOPS WORKS !!!", event.agent)
agentops_listener = AgentOpsListener()

View File

@@ -1,5 +1,4 @@
from datetime import datetime
from unittest import mock
from unittest.mock import patch
import pytest