From f70162c064d7d2f38bf6b75e92d87ed2791bb1d9 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Wed, 12 Feb 2025 10:29:27 -0800 Subject: [PATCH] Refactor event system and add third-party event listeners - Move event_bus import to correct module paths - Introduce BaseEventListener abstract base class - Add AgentOpsListener for third-party event tracking - Update event listener initialization and setup - Clean up event-related imports and exports --- src/crewai/agents/crew_agent_executor.py | 2 +- src/crewai/crew.py | 8 +++--- src/crewai/flow/flow.py | 2 +- src/crewai/tools/tool_usage.py | 2 +- src/crewai/utilities/events/__init__.py | 14 ++++++----- .../utilities/events/base_event_listener.py | 13 ++++++++++ src/crewai/utilities/events/event_bus.py | 1 + src/crewai/utilities/events/event_listener.py | 18 +++++++------ .../utilities/events/third_party/__init__.py | 1 + .../events/third_party/agentops_listener.py | 25 +++++++++++++++++++ tests/utilities/test_events.py | 1 - 11 files changed, 65 insertions(+), 22 deletions(-) create mode 100644 src/crewai/utilities/events/base_event_listener.py create mode 100644 src/crewai/utilities/events/third_party/__init__.py create mode 100644 src/crewai/utilities/events/third_party/agentops_listener.py diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 1b267457c..d237444b3 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -18,11 +18,11 @@ from crewai.tools.base_tool import BaseTool from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.utilities import I18N, Printer from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE -from crewai.utilities.events import event_bus from crewai.utilities.events.agent_events import ( AgentExecutionError, AgentExecutionStarted, ) +from crewai.utilities.events.event_bus import event_bus from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededException, ) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index ef26b1d39..e6186825c 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -43,7 +43,6 @@ from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator -from crewai.utilities.events import event_bus from crewai.utilities.events.crew_events import ( CrewKickoffCompleted, CrewKickoffFailed, @@ -55,6 +54,7 @@ from crewai.utilities.events.crew_events import ( CrewTrainFailed, CrewTrainStarted, ) +from crewai.utilities.events.event_bus import event_bus from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -471,8 +471,6 @@ class Crew(BaseModel): ) return self - - @property def key(self) -> str: source = [agent.key for agent in self.agents] + [ @@ -970,13 +968,13 @@ class Crew(BaseModel): def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: if not task_outputs: raise ValueError("No task outputs available to create crew output.") - + # Filter out empty outputs and get the last valid one as the main output valid_outputs = [t for t in task_outputs if t.raw] if not valid_outputs: raise ValueError("No valid task outputs available to create crew output.") final_task_output = valid_outputs[-1] - + final_string_output = final_task_output.raw self._finish_execution(final_string_output) token_usage = self.calculate_usage_metrics() diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 0ea3e87e4..c2abc0a89 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -28,8 +28,8 @@ from crewai.utilities.events import ( FlowStarted, MethodExecutionFinished, MethodExecutionStarted, - event_bus, ) +from crewai.utilities.events.event_bus import event_bus from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) diff --git a/src/crewai/tools/tool_usage.py b/src/crewai/tools/tool_usage.py index 9a09375bd..c58fbeb74 100644 --- a/src/crewai/tools/tool_usage.py +++ b/src/crewai/tools/tool_usage.py @@ -17,7 +17,7 @@ from crewai.tools import BaseTool from crewai.tools.structured_tool import CrewStructuredTool from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling from crewai.utilities import I18N, Converter, ConverterError, Printer -from crewai.utilities.events import event_bus +from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_types import ( ToolUsageError, ToolUsageFinished, diff --git a/src/crewai/utilities/events/__init__.py b/src/crewai/utilities/events/__init__.py index de1d74da2..64392e13e 100644 --- a/src/crewai/utilities/events/__init__.py +++ b/src/crewai/utilities/events/__init__.py @@ -6,13 +6,14 @@ from .crew_events import ( from .agent_events import AgentExecutionStarted, AgentExecutionCompleted from .task_events import TaskStarted, TaskCompleted, TaskFailed from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished -from .event_bus import event_bus, EventTypes +from .event_bus import EventTypes, EventBus from .events import emit, on -from .event_bus import EventBus -from .event_listener import EventListener from .tool_usage_events import ToolUsageFinished, ToolUsageError -event_bus = EventBus() -event_listener = EventListener() + +# events +from .event_listener import EventListener +from .third_party.agentops_listener import agentops_listener + __all__ = [ AgentExecutionStarted, @@ -29,10 +30,11 @@ __all__ = [ MethodExecutionFinished, EventTypes, emit, - on, + on, event_bus, ToolUsageFinished, ToolUsageError, + EventBus ] diff --git a/src/crewai/utilities/events/base_event_listener.py b/src/crewai/utilities/events/base_event_listener.py new file mode 100644 index 000000000..77567bfef --- /dev/null +++ b/src/crewai/utilities/events/base_event_listener.py @@ -0,0 +1,13 @@ +from abc import ABC, abstractmethod + +from crewai.utilities.events.event_bus import EventBus, event_bus + + +class BaseEventListener(ABC): + def __init__(self): + super().__init__() + self.setup_listeners(event_bus) + + @abstractmethod + def setup_listeners(self, event_bus: EventBus): + pass diff --git a/src/crewai/utilities/events/event_bus.py b/src/crewai/utilities/events/event_bus.py index 6651a09af..18bec8373 100644 --- a/src/crewai/utilities/events/event_bus.py +++ b/src/crewai/utilities/events/event_bus.py @@ -29,6 +29,7 @@ class EventBus: self._signal = Signal("event_bus") self._handlers: Dict[Type[EventTypes], List[Callable]] = {} + # TODO: generic types def on(self, event_type: Type[EventTypes]) -> Callable: """ Decorator to register an event handler for a specific event type. diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index b3121ef50..6d771bc49 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -1,5 +1,8 @@ +from pydantic import PrivateAttr + from crewai.telemetry.telemetry import Telemetry from crewai.utilities.evaluators.task_evaluator import TaskEvaluator +from crewai.utilities.events.base_event_listener import BaseEventListener from .agent_events import ( AgentExecutionCompleted, @@ -11,7 +14,6 @@ from .crew_events import ( CrewTestCompleted, CrewTestStarted, ) -from .event_bus import EventBus from .flow_events import ( FlowFinished, FlowStarted, @@ -21,16 +23,15 @@ from .flow_events import ( from .task_events import TaskCompleted, TaskStarted -class EventListener: - _telemetry = Telemetry() +class EventListener(BaseEventListener): + _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry()) def __init__(self): - self._setup_listeners() + super().__init__() + self._telemetry = Telemetry() self._telemetry.set_tracer() - def _setup_listeners(self): - event_bus = EventBus() - + def setup_listeners(self, event_bus): @event_bus.on(CrewKickoffStarted) def on_crew_started(source, event): print(f"🚀 Crew '{event.crew_name}' started", event.timestamp) @@ -95,3 +96,6 @@ class EventListener: @event_bus.on(MethodExecutionFinished) def on_method_execution_finished(source, event): print(f"👍 Flow Method Finished: '{event.method_name}'") + + +event_listener = EventListener() diff --git a/src/crewai/utilities/events/third_party/__init__.py b/src/crewai/utilities/events/third_party/__init__.py new file mode 100644 index 000000000..e9de52477 --- /dev/null +++ b/src/crewai/utilities/events/third_party/__init__.py @@ -0,0 +1 @@ +from .agentops_listener import agentops_listener diff --git a/src/crewai/utilities/events/third_party/agentops_listener.py b/src/crewai/utilities/events/third_party/agentops_listener.py new file mode 100644 index 000000000..7502ccde0 --- /dev/null +++ b/src/crewai/utilities/events/third_party/agentops_listener.py @@ -0,0 +1,25 @@ +from crewai.utilities.events.agent_events import AgentExecutionStarted +from crewai.utilities.events.base_event_listener import BaseEventListener + +try: + import agentops + + AGENTOPS_INSTALLED = True +except ImportError: + AGENTOPS_INSTALLED = False + + +class AgentOpsListener(BaseEventListener): + def __init__(self): + super().__init__() + print("AgentOpsListener init") + + def setup_listeners(self, event_bus): + if AGENTOPS_INSTALLED: + + @event_bus.on(AgentExecutionStarted) + def on_agent_started(source, event: AgentExecutionStarted): + print("AGENTOPS WORKS !!!", event.agent) + + +agentops_listener = AgentOpsListener() diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index f7f7a8d09..3d3682d38 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -1,5 +1,4 @@ from datetime import datetime -from unittest import mock from unittest.mock import patch import pytest