From 64569ce130790f8a9a6a309e3a02cf088b5cd80f Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Tue, 18 Feb 2025 08:36:21 -0800 Subject: [PATCH] Rename event_bus to crewai_event_bus for improved clarity and specificity - Replace all references to `event_bus` with `crewai_event_bus` - Update import statements across multiple files - Remove the old `event_bus.py` file - Maintain existing event handling functionality --- src/crewai/agent.py | 6 +- src/crewai/agents/crew_agent_executor.py | 6 +- src/crewai/crew.py | 36 ++---- src/crewai/flow/flow.py | 23 ++-- .../knowledge/storage/knowledge_storage.py | 2 +- src/crewai/task.py | 8 +- src/crewai/tools/tool_usage.py | 10 +- .../utilities/evaluators/task_evaluator.py | 10 +- src/crewai/utilities/events/__init__.py | 2 +- .../utilities/events/base_event_listener.py | 6 +- src/crewai/utilities/events/event_bus.py | 114 ------------------ src/crewai/utilities/events/event_listener.py | 76 ++++++------ .../events/third_party/agentops_listener.py | 71 +++++------ tests/agent_test.py | 7 +- tests/crew_test.py | 12 +- tests/flow_test.py | 24 ++-- tests/utilities/test_events.py | 65 +++++----- 17 files changed, 182 insertions(+), 296 deletions(-) delete mode 100644 src/crewai/utilities/events/event_bus.py diff --git a/src/crewai/agent.py b/src/crewai/agent.py index 9daef6517..20f477aaf 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -23,7 +23,7 @@ from crewai.utilities.events.agent_events import ( AgentExecutionCompletedEvent, AgentExecutionErrorEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.llm_utils import create_llm from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler @@ -240,7 +240,7 @@ class Agent(BaseAgent): } )["output"] except Exception as e: - event_bus.emit( + crewai_event_bus.emit( self, event=AgentExecutionErrorEvent( agent=self, @@ -265,7 +265,7 @@ class Agent(BaseAgent): for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) if tool_result.get("result_as_answer", False): result = tool_result["result"] - event_bus.emit( + crewai_event_bus.emit( self, event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), ) diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index fee45b5ac..275a7aabf 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -22,7 +22,7 @@ from crewai.utilities.events import ( AgentExecutionErrorEvent, AgentExecutionStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededException, ) @@ -91,7 +91,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: if self.agent and self.task: - event_bus.emit( + crewai_event_bus.emit( self, event=AgentExecutionStartedEvent( agent=self.agent, @@ -193,7 +193,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): def _handle_unknown_error(self, exception: Exception) -> None: """Handle unknown errors by informing the user.""" if self.agent: - event_bus.emit( + crewai_event_bus.emit( self, event=AgentExecutionErrorEvent( agent=self.agent, task=self.task, error=str(exception) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 4aaedc7f4..cae8cbfa5 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -54,7 +54,7 @@ from crewai.utilities.events.crew_events import ( CrewTrainFailedEvent, CrewTrainStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -221,10 +221,6 @@ class Crew(BaseModel): default=None, description="Knowledge for the crew.", ) - listen_to_events: Optional[bool] = Field( - default=True, - description="Whether the crew should listen to events.", - ) @field_validator("id", mode="before") @classmethod @@ -518,7 +514,7 @@ class Crew(BaseModel): ) -> None: """Trains the crew for a given number of iterations.""" try: - event_bus.emit( + crewai_event_bus.emit( self, CrewTrainStartedEvent( crew_name=self.name or "crew", @@ -545,7 +541,7 @@ class Crew(BaseModel): agent_id=str(agent.role), trained_data=result.model_dump() ) - event_bus.emit( + crewai_event_bus.emit( self, CrewTrainCompletedEvent( crew_name=self.name or "crew", @@ -554,11 +550,9 @@ class Crew(BaseModel): ), ) except Exception as e: - event_bus.emit( + crewai_event_bus.emit( self, - CrewTrainFailedEvent( - error=str(e), crew_name=self.name or "crew" - ), + CrewTrainFailedEvent(error=str(e), crew_name=self.name or "crew"), ) self._logger.log("error", f"Training failed: {e}", color="red") CrewTrainingHandler(TRAINING_DATA_FILE).clear() @@ -575,7 +569,7 @@ class Crew(BaseModel): inputs = {} inputs = before_callback(inputs) - event_bus.emit( + crewai_event_bus.emit( self, CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs), ) @@ -629,11 +623,9 @@ class Crew(BaseModel): self.usage_metrics.add_usage_metrics(metric) return result except Exception as e: - event_bus.emit( + crewai_event_bus.emit( self, - CrewKickoffFailedEvent( - error=str(e), crew_name=self.name or "crew" - ), + CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"), ) raise @@ -983,7 +975,7 @@ class Crew(BaseModel): final_string_output = final_task_output.raw self._finish_execution(final_string_output) token_usage = self.calculate_usage_metrics() - event_bus.emit( + crewai_event_bus.emit( self, CrewKickoffCompletedEvent( crew_name=self.name or "crew", output=final_task_output @@ -1196,7 +1188,7 @@ class Crew(BaseModel): ) -> None: """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" try: - event_bus.emit( + crewai_event_bus.emit( self, CrewTestStartedEvent( crew_name=self.name or "crew", @@ -1214,18 +1206,16 @@ class Crew(BaseModel): evaluator.print_crew_evaluation_result() - event_bus.emit( + crewai_event_bus.emit( self, CrewTestCompletedEvent( crew_name=self.name or "crew", ), ) except Exception as e: - event_bus.emit( + crewai_event_bus.emit( self, - CrewTestFailedEvent( - error=str(e), crew_name=self.name or "crew" - ), + CrewTestFailedEvent(error=str(e), crew_name=self.name or "crew"), ) raise diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 873987c07..d4c574d76 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -30,7 +30,7 @@ from crewai.utilities.events import ( MethodExecutionFinishedEvent, MethodExecutionStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.events.flow_events import MethodExecutionFailedEvent from crewai.utilities.printer import Printer @@ -469,7 +469,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if kwargs: self._initialize_state(kwargs) - event_bus.emit( + crewai_event_bus.emit( self, FlowCreatedEvent( type="flow_created", @@ -744,7 +744,7 @@ class Flow(Generic[T], metaclass=FlowMeta): self._initialize_state(filtered_inputs) # Start flow execution - event_bus.emit( + crewai_event_bus.emit( self, FlowStartedEvent( type="flow_started", @@ -773,7 +773,7 @@ class Flow(Generic[T], metaclass=FlowMeta): final_output = self._method_outputs[-1] if self._method_outputs else None - event_bus.emit( + crewai_event_bus.emit( self, FlowFinishedEvent( type="flow_finished", @@ -810,8 +810,10 @@ class Flow(Generic[T], metaclass=FlowMeta): self, method_name: str, method: Callable, *args: Any, **kwargs: Any ) -> Any: try: - dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) - event_bus.emit( + dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | ( + kwargs or {} + ) + crewai_event_bus.emit( self, MethodExecutionStartedEvent( type="method_execution_started", @@ -822,19 +824,18 @@ class Flow(Generic[T], metaclass=FlowMeta): ), ) - result = ( await method(*args, **kwargs) if asyncio.iscoroutinefunction(method) else method(*args, **kwargs) ) - + self._method_outputs.append(result) self._method_execution_counts[method_name] = ( self._method_execution_counts.get(method_name, 0) + 1 ) - event_bus.emit( + crewai_event_bus.emit( self, MethodExecutionFinishedEvent( type="method_execution_finished", @@ -847,7 +848,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return result except Exception as e: - event_bus.emit( + crewai_event_bus.emit( self, MethodExecutionFailedEvent( type="method_execution_failed", @@ -1043,7 +1044,7 @@ class Flow(Generic[T], metaclass=FlowMeta): elif level == "warning": logger.warning(message) - def plot(self, filename: str = "crewai_flow") -> None: + def plot(self, filename: str = "crewai_flow") -> None: Telemetry().flow_plotting_span( self.__class__.__name__, list(self._methods.keys()) ) diff --git a/src/crewai/knowledge/storage/knowledge_storage.py b/src/crewai/knowledge/storage/knowledge_storage.py index 9e6ab8041..72240e2b6 100644 --- a/src/crewai/knowledge/storage/knowledge_storage.py +++ b/src/crewai/knowledge/storage/knowledge_storage.py @@ -76,7 +76,7 @@ class KnowledgeStorage(BaseKnowledgeStorage): "context": fetched["documents"][0][i], # type: ignore "score": fetched["distances"][0][i], # type: ignore } - if result["score"] >= score_threshold: # type: ignore + if result["score"] >= score_threshold: results.append(result) return results else: diff --git a/src/crewai/task.py b/src/crewai/task.py index 32fbdd1d3..b9e341e33 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -43,7 +43,7 @@ from crewai.utilities.events import ( TaskFailedEvent, TaskStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.i18n import I18N from crewai.utilities.printer import Printer @@ -364,7 +364,7 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - event_bus.emit(self, TaskStartedEvent(context=context)) + crewai_event_bus.emit(self, TaskStartedEvent(context=context)) result = agent.execute_task( task=self, context=context, @@ -440,11 +440,11 @@ class Task(BaseModel): else result ) self._save_file(content) - event_bus.emit(self, TaskCompletedEvent(output=task_output)) + crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output)) return task_output except Exception as e: self.end_time = datetime.datetime.now() - event_bus.emit(self, TaskFailedEvent(error=str(e))) + crewai_event_bus.emit(self, TaskFailedEvent(error=str(e))) raise e # Re-raise the exception after emitting the event def prompt(self) -> str: diff --git a/src/crewai/tools/tool_usage.py b/src/crewai/tools/tool_usage.py index 7c893feb6..95519e16a 100644 --- a/src/crewai/tools/tool_usage.py +++ b/src/crewai/tools/tool_usage.py @@ -22,7 +22,7 @@ from crewai.utilities.events import ( ToolUsageFinishedEvent, ToolUsageStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus OPENAI_BIGGER_MODELS = [ "gpt-4", @@ -138,8 +138,8 @@ class ToolUsage: tool: Any, calling: Union[ToolCalling, InstructorToolCalling], ) -> str: # TODO: Fix this return type - event_data = self._prepare_event_data(tool, calling) # type: ignore - event_bus.emit(self, ToolUsageStartedEvent(**event_data)) + event_data = self._prepare_event_data(tool, calling) # type: ignore + crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data)) if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) try: result = self._i18n.errors("task_repeated_usage").format( @@ -458,7 +458,7 @@ class ToolUsage: def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None: event_data = self._prepare_event_data(tool, tool_calling) - event_bus.emit( + crewai_event_bus.emit( self, event=ToolUsageErrorEvent(**{**event_data, "error": e}) ) @@ -474,7 +474,7 @@ class ToolUsage: "from_cache": from_cache, } ) - event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data)) + crewai_event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data)) def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict: return { diff --git a/src/crewai/utilities/evaluators/task_evaluator.py b/src/crewai/utilities/evaluators/task_evaluator.py index 4b8975187..2e9907bd7 100644 --- a/src/crewai/utilities/evaluators/task_evaluator.py +++ b/src/crewai/utilities/evaluators/task_evaluator.py @@ -3,7 +3,7 @@ from typing import List from pydantic import BaseModel, Field from crewai.utilities import Converter -from crewai.utilities.events import TaskEvaluationEvent, event_bus +from crewai.utilities.events import TaskEvaluationEvent, crewai_event_bus from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser @@ -44,7 +44,9 @@ class TaskEvaluator: self.original_agent = original_agent def evaluate(self, task, output) -> TaskEvaluation: - event_bus.emit(self, TaskEvaluationEvent(evaluation_type="task_evaluation")) + crewai_event_bus.emit( + self, TaskEvaluationEvent(evaluation_type="task_evaluation") + ) evaluation_query = ( f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n" f"Task Description:\n{task.description}\n\n" @@ -81,7 +83,9 @@ class TaskEvaluator: - training_data (dict): The training data to be evaluated. - agent_id (str): The ID of the agent. """ - event_bus.emit(self, TaskEvaluationEvent(evaluation_type="training_data_evaluation")) + crewai_event_bus.emit( + self, TaskEvaluationEvent(evaluation_type="training_data_evaluation") + ) output_training_data = training_data[agent_id] final_aggregated_data = "" diff --git a/src/crewai/utilities/events/__init__.py b/src/crewai/utilities/events/__init__.py index f7d1dd2f5..24ad3121c 100644 --- a/src/crewai/utilities/events/__init__.py +++ b/src/crewai/utilities/events/__init__.py @@ -23,7 +23,7 @@ from .flow_events import ( MethodExecutionFinishedEvent, MethodExecutionFailedEvent, ) -from .event_bus import EventBus, event_bus +from .crewai_event_bus import CrewAIEventsBus, crewai_event_bus from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent # events diff --git a/src/crewai/utilities/events/base_event_listener.py b/src/crewai/utilities/events/base_event_listener.py index 621570a74..37763dcc1 100644 --- a/src/crewai/utilities/events/base_event_listener.py +++ b/src/crewai/utilities/events/base_event_listener.py @@ -1,14 +1,14 @@ from abc import ABC, abstractmethod from logging import Logger -from crewai.utilities.events.event_bus import EventBus, event_bus +from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus, crewai_event_bus class BaseEventListener(ABC): def __init__(self): super().__init__() - self.setup_listeners(event_bus) + self.setup_listeners(crewai_event_bus) @abstractmethod - def setup_listeners(self, event_bus: EventBus): + def setup_listeners(self, crewai_event_bus: CrewAIEventsBus): pass diff --git a/src/crewai/utilities/events/event_bus.py b/src/crewai/utilities/events/event_bus.py deleted file mode 100644 index 791c28ba4..000000000 --- a/src/crewai/utilities/events/event_bus.py +++ /dev/null @@ -1,114 +0,0 @@ -import threading -from contextlib import contextmanager -from typing import Any, Callable, Dict, List, Type, TypeVar, cast - -from blinker import Signal - -from crewai.utilities.events.crew_events import CrewEvent - -EventT = TypeVar("EventT", bound=CrewEvent) - - -class EventBus: - """ - A singleton event bus that uses blinker signals for event handling. - Allows both internal (Flow/Crew) and external event handling. - """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: # prevent race condition - cls._instance = super(EventBus, cls).__new__(cls) - cls._instance._initialize() - return cls._instance - - def _initialize(self) -> None: - """Initialize the event bus internal state""" - self._signal = Signal("event_bus") - self._handlers: Dict[ - Type[CrewEvent], List[Callable[[Any, CrewEvent], None]] - ] = {} - - def on( - self, event_type: Type[EventT] - ) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]: - """ - Decorator to register an event handler for a specific event type. - - Usage: - @event_bus.on(AgentExecutionCompletedEvent) - def on_agent_execution_completed( - source: Any, event: AgentExecutionCompletedEvent - ): - print(f"👍 Agent '{event.agent}' completed task") - print(f" Output: {event.output}") - """ - - def decorator( - handler: Callable[[Any, EventT], None], - ) -> Callable[[Any, EventT], None]: - if event_type not in self._handlers: - self._handlers[event_type] = [] - self._handlers[event_type].append( - cast(Callable[[Any, CrewEvent], None], handler) - ) - return handler - - return decorator - - def emit(self, source: Any, event: CrewEvent) -> None: - """ - Emit an event to all registered handlers - - Args: - source: The object emitting the event - event: The event instance to emit - """ - event_type = type(event) - if event_type in self._handlers: - for handler in self._handlers[event_type]: - handler(source, event) - self._signal.send(source, event=event) - - def clear_handlers(self) -> None: - """Clear all registered event handlers - useful for testing""" - self._handlers.clear() - - def register_handler( - self, event_type: Type[EventT], handler: Callable[[Any, EventT], None] - ) -> None: - """Register an event handler for a specific event type""" - if event_type not in self._handlers: - self._handlers[event_type] = [] - self._handlers[event_type].append( - cast(Callable[[Any, CrewEvent], None], handler) - ) - - @contextmanager - def scoped_handlers(self): - """ - Context manager for temporary event handling scope. - Useful for testing or temporary event handling. - - Usage: - with event_bus.scoped_handlers(): - @event_bus.on(CrewKickoffStarted) - def temp_handler(source, event): - print("Temporary handler") - # Do stuff... - # Handlers are cleared after the context - """ - previous_handlers = self._handlers.copy() - self._handlers.clear() - try: - yield - finally: - self._handlers = previous_handlers - - -# Global instance -event_bus = EventBus() diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index 3906823b0..b87729c86 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -43,8 +43,8 @@ class EventListener(BaseEventListener): self._telemetry.set_tracer() # Crew Events: kickoff, test, train - def setup_listeners(self, event_bus): - @event_bus.on(CrewKickoffStartedEvent) + def setup_listeners(self, crewai_event_bus): + @crewai_event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source, event: CrewKickoffStartedEvent): self.logger.log( f"🚀 Crew '{event.crew_name}' started", @@ -53,7 +53,7 @@ class EventListener(BaseEventListener): ) self._telemetry.crew_execution_span(source, event.inputs) - @event_bus.on(CrewKickoffCompletedEvent) + @crewai_event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event: CrewKickoffCompletedEvent): final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) @@ -63,7 +63,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewKickoffFailedEvent) + @crewai_event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event: CrewKickoffFailedEvent): self.logger.log( f"❌ Crew '{event.crew_name}' failed", @@ -71,7 +71,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTestStartedEvent) + @crewai_event_bus.on(CrewTestStartedEvent) def on_crew_test_started(source, event: CrewTestStartedEvent): cloned_crew = source.copy() cloned_crew._telemetry.test_execution_span( @@ -86,7 +86,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTestCompletedEvent) + @crewai_event_bus.on(CrewTestCompletedEvent) def on_crew_test_completed(source, event: CrewTestCompletedEvent): self.logger.log( f"✅ Crew '{event.crew_name}' completed test", @@ -94,7 +94,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTestFailedEvent) + @crewai_event_bus.on(CrewTestFailedEvent) def on_crew_test_failed(source, event: CrewTestFailedEvent): self.logger.log( f"❌ Crew '{event.crew_name}' failed test", @@ -102,7 +102,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTrainStartedEvent) + @crewai_event_bus.on(CrewTrainStartedEvent) def on_crew_train_started(source, event: CrewTrainStartedEvent): self.logger.log( f"📋 Crew '{event.crew_name}' started train", @@ -110,7 +110,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTrainCompletedEvent) + @crewai_event_bus.on(CrewTrainCompletedEvent) def on_crew_train_completed(source, event: CrewTrainCompletedEvent): self.logger.log( f"✅ Crew '{event.crew_name}' completed train", @@ -118,15 +118,15 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(CrewTrainFailedEvent) + @crewai_event_bus.on(CrewTrainFailedEvent) def on_crew_train_failed(source, event: CrewTrainFailedEvent): self.logger.log( f"❌ Crew '{event.crew_name}' failed train", event.timestamp, color=self.color, ) - - @event_bus.on(TaskStartedEvent) + + @crewai_event_bus.on(TaskStartedEvent) def on_task_started(source, event: TaskStartedEvent): source._execution_span = self._telemetry.task_started( crew=source.agent.crew, task=source @@ -136,25 +136,27 @@ class EventListener(BaseEventListener): event.timestamp, color=self.color, ) - - - - @event_bus.on(TaskCompletedEvent) + + @crewai_event_bus.on(TaskCompletedEvent) def on_task_completed(source, event): if source._execution_span: - self._telemetry.task_ended(source._execution_span, source, source.agent.crew) + self._telemetry.task_ended( + source._execution_span, source, source.agent.crew + ) self.logger.log( f"✅ Task completed: {source.description}", event.timestamp, color=self.color, ) source._execution_span = None - - @event_bus.on(TaskFailedEvent) + + @crewai_event_bus.on(TaskFailedEvent) def on_task_failed(source, event: TaskFailedEvent): if source._execution_span: if source.agent and source.agent.crew: - self._telemetry.task_ended(source._execution_span, source, source.agent.crew) + self._telemetry.task_ended( + source._execution_span, source, source.agent.crew + ) source._execution_span = None self.logger.log( f"❌ Task failed: {source.description}", @@ -162,8 +164,7 @@ class EventListener(BaseEventListener): color=self.color, ) - - @event_bus.on(AgentExecutionStartedEvent) + @crewai_event_bus.on(AgentExecutionStartedEvent) def on_agent_execution_started(source, event: AgentExecutionStartedEvent): self.logger.log( f"🤖 Agent '{event.agent.role}' started task", @@ -171,7 +172,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(AgentExecutionCompletedEvent) + @crewai_event_bus.on(AgentExecutionCompletedEvent) def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): self.logger.log( f"✅ Agent '{event.agent.role}' completed task", @@ -180,8 +181,8 @@ class EventListener(BaseEventListener): ) # Flow Events - - @event_bus.on(FlowCreatedEvent) + + @crewai_event_bus.on(FlowCreatedEvent) def on_flow_created(source, event: FlowCreatedEvent): self._telemetry.flow_creation_span(self.__class__.__name__) self.logger.log( @@ -189,8 +190,8 @@ class EventListener(BaseEventListener): event.timestamp, color=self.color, ) - - @event_bus.on(FlowStartedEvent) + + @crewai_event_bus.on(FlowStartedEvent) def on_flow_started(source, event: FlowStartedEvent): self._telemetry.flow_execution_span( source.__class__.__name__, list(source._methods.keys()) @@ -201,7 +202,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(FlowFinishedEvent) + @crewai_event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event: FlowFinishedEvent): self.logger.log( f"👍 Flow Finished: '{event.flow_name}'", @@ -209,7 +210,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(MethodExecutionStartedEvent) + @crewai_event_bus.on(MethodExecutionStartedEvent) def on_method_execution_started(source, event: MethodExecutionStartedEvent): self.logger.log( f"🤖 Flow Method Started: '{event.method_name}'", @@ -217,7 +218,7 @@ class EventListener(BaseEventListener): color=self.color, ) - @event_bus.on(MethodExecutionFailedEvent) + @crewai_event_bus.on(MethodExecutionFailedEvent) def on_method_execution_failed(source, event: MethodExecutionFailedEvent): self.logger.log( f"❌ Flow Method Failed: '{event.method_name}'", @@ -225,33 +226,32 @@ class EventListener(BaseEventListener): color=self.color, ) - - @event_bus.on(MethodExecutionFinishedEvent) + @crewai_event_bus.on(MethodExecutionFinishedEvent) def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): self.logger.log( f"👍 Flow Method Finished: '{event.method_name}'", event.timestamp, color=self.color, ) - + # Tool Usage Events - @event_bus.on(ToolUsageStartedEvent) + @crewai_event_bus.on(ToolUsageStartedEvent) def on_tool_usage_started(source, event: ToolUsageStartedEvent): self.logger.log( f"🤖 Tool Usage Started: '{event.tool_name}'", event.timestamp, color=self.color, ) - - @event_bus.on(ToolUsageFinishedEvent) + + @crewai_event_bus.on(ToolUsageFinishedEvent) def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): self.logger.log( f"✅ Tool Usage Finished: '{event.tool_name}'", event.timestamp, color=self.color, ) - - @event_bus.on(ToolUsageErrorEvent) + + @crewai_event_bus.on(ToolUsageErrorEvent) def on_tool_usage_error(source, event: ToolUsageErrorEvent): self.logger.log( f"❌ Tool Usage Error: '{event.tool_name}'", diff --git a/src/crewai/utilities/events/third_party/agentops_listener.py b/src/crewai/utilities/events/third_party/agentops_listener.py index b4fcf841f..294a820ee 100644 --- a/src/crewai/utilities/events/third_party/agentops_listener.py +++ b/src/crewai/utilities/events/third_party/agentops_listener.py @@ -11,54 +11,57 @@ from crewai.utilities.events.task_events import TaskEvaluationEvent try: import agentops - from agentops import Client + AGENTOPS_INSTALLED = True except ImportError: AGENTOPS_INSTALLED = False - + class AgentOpsListener(BaseEventListener): tool_event: Optional["agentops.ToolEvent"] = None session: Optional["agentops.Session"] = None - + def __init__(self): super().__init__() - def setup_listeners(self, event_bus): - if not AGENTOPS_INSTALLED: - return - - @event_bus.on(CrewKickoffStartedEvent) - def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent): - self.session = agentops.init() - for agent in source.agents: - if self.session: - self.session.create_agent( - name=agent.role, - agent_id=str(agent.id), - ) - - @event_bus.on(CrewKickoffCompletedEvent) - def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent): + def setup_listeners(self, crewai_event_bus): + if not AGENTOPS_INSTALLED: + return + + @crewai_event_bus.on(CrewKickoffStartedEvent) + def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent): + self.session = agentops.init() + for agent in source.agents: + if self.session: + self.session.create_agent( + name=agent.role, + agent_id=str(agent.id), + ) + + @crewai_event_bus.on(CrewKickoffCompletedEvent) + def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent): if self.session: self.session.end_session( end_state="Success", end_state_reason="Finished Execution", ) - - @event_bus.on(ToolUsageStartedEvent) - def on_tool_usage_started(source, event: ToolUsageStartedEvent): - self.tool_event = agentops.ToolEvent(name=event.tool_name) - if self.session: - self.session.record(self.tool_event) - - @event_bus.on(ToolUsageErrorEvent) - def on_tool_usage_error(source, event: ToolUsageErrorEvent): - agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event) - - @event_bus.on(TaskEvaluationEvent) - def on_task_evaluation(source, event: TaskEvaluationEvent): - if self.session: - self.session.create_agent(name="Task Evaluator", agent_id=str(source.original_agent.id)) + + @crewai_event_bus.on(ToolUsageStartedEvent) + def on_tool_usage_started(source, event: ToolUsageStartedEvent): + self.tool_event = agentops.ToolEvent(name=event.tool_name) + if self.session: + self.session.record(self.tool_event) + + @crewai_event_bus.on(ToolUsageErrorEvent) + def on_tool_usage_error(source, event: ToolUsageErrorEvent): + agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event) + + @crewai_event_bus.on(TaskEvaluationEvent) + def on_task_evaluation(source, event: TaskEvaluationEvent): + if self.session: + self.session.create_agent( + name="Task Evaluator", agent_id=str(source.original_agent.id) + ) + agentops_listener = AgentOpsListener() diff --git a/tests/agent_test.py b/tests/agent_test.py index 2301ad452..873010eb4 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -17,7 +17,7 @@ from crewai.tools import tool from crewai.tools.tool_calling import InstructorToolCalling from crewai.tools.tool_usage import ToolUsage from crewai.utilities import RPMController -from crewai.utilities.events import event_bus +from crewai.utilities.events import crewai_event_bus from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent @@ -156,11 +156,10 @@ def test_agent_execution_with_tools(): ) received_events = [] - @event_bus.on(ToolUsageFinishedEvent) + @crewai_event_bus.on(ToolUsageFinishedEvent) def handle_tool_end(source, event): received_events.append(event) - # with patch.object(EventBus, "emit") as emit: output = agent.execute_task(task) assert output == "The result of the multiplication is 12." @@ -256,7 +255,7 @@ def test_cache_hitting(): } received_events = [] - @event_bus.on(ToolUsageFinishedEvent) + @crewai_event_bus.on(ToolUsageFinishedEvent) def handle_tool_end(source, event): received_events.append(event) diff --git a/tests/crew_test.py b/tests/crew_test.py index 7e8c8e582..f15f5ac47 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -25,7 +25,7 @@ from crewai.utilities import Logger from crewai.utilities.events import ( CrewTrainCompletedEvent, CrewTrainStartedEvent, - event_bus, + crewai_event_bus, ) from crewai.utilities.events.crew_events import ( CrewTestCompletedEvent, @@ -860,7 +860,7 @@ def test_crew_verbose_output(capsys): "[🤖 AGENT 'SENIOR WRITER' STARTED TASK]", "[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]", "[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]", - "[✅ CREW 'CREW' COMPLETED]" + "[✅ CREW 'CREW' COMPLETED]", ] captured = capsys.readouterr() for log in expected_listener_logs: @@ -2590,11 +2590,11 @@ def test_crew_train_success( received_events = [] - @event_bus.on(CrewTrainStartedEvent) + @crewai_event_bus.on(CrewTrainStartedEvent) def on_crew_train_started(source, event: CrewTrainStartedEvent): received_events.append(event) - @event_bus.on(CrewTrainCompletedEvent) + @crewai_event_bus.on(CrewTrainCompletedEvent) def on_crew_train_completed(source, event: CrewTrainCompletedEvent): received_events.append(event) @@ -3378,11 +3378,11 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator): received_events = [] - @event_bus.on(CrewTestStartedEvent) + @crewai_event_bus.on(CrewTestStartedEvent) def on_crew_test_started(source, event: CrewTestStartedEvent): received_events.append(event) - @event_bus.on(CrewTestCompletedEvent) + @crewai_event_bus.on(CrewTestCompletedEvent) def on_crew_test_completed(source, event: CrewTestCompletedEvent): received_events.append(event) diff --git a/tests/flow_test.py b/tests/flow_test.py index 7114d55ee..7e32a9c4c 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -12,7 +12,7 @@ from crewai.utilities.events import ( FlowStartedEvent, MethodExecutionFinishedEvent, MethodExecutionStartedEvent, - event_bus, + crewai_event_bus, ) @@ -440,15 +440,15 @@ def test_unstructured_flow_event_emission(): flow = PoemFlow() received_events = [] - @event_bus.on(FlowStartedEvent) + @crewai_event_bus.on(FlowStartedEvent) def handle_flow_start(source, event): received_events.append(event) - @event_bus.on(MethodExecutionStartedEvent) + @crewai_event_bus.on(MethodExecutionStartedEvent) def handle_method_start(source, event): received_events.append(event) - @event_bus.on(FlowFinishedEvent) + @crewai_event_bus.on(FlowFinishedEvent) def handle_flow_end(source, event): received_events.append(event) @@ -519,19 +519,19 @@ def test_structured_flow_event_emission(): received_events = [] - @event_bus.on(FlowStartedEvent) + @crewai_event_bus.on(FlowStartedEvent) def handle_flow_start(source, event): received_events.append(event) - @event_bus.on(MethodExecutionStartedEvent) + @crewai_event_bus.on(MethodExecutionStartedEvent) def handle_method_start(source, event): received_events.append(event) - @event_bus.on(MethodExecutionFinishedEvent) + @crewai_event_bus.on(MethodExecutionFinishedEvent) def handle_method_end(source, event): received_events.append(event) - @event_bus.on(FlowFinishedEvent) + @crewai_event_bus.on(FlowFinishedEvent) def handle_flow_end(source, event): received_events.append(event) @@ -585,19 +585,19 @@ def test_stateless_flow_event_emission(): flow = StatelessFlow() received_events = [] - @event_bus.on(FlowStartedEvent) + @crewai_event_bus.on(FlowStartedEvent) def handle_flow_start(source, event): received_events.append(event) - @event_bus.on(MethodExecutionStartedEvent) + @crewai_event_bus.on(MethodExecutionStartedEvent) def handle_method_start(source, event): received_events.append(event) - @event_bus.on(MethodExecutionFinishedEvent) + @crewai_event_bus.on(MethodExecutionFinishedEvent) def handle_method_end(source, event): received_events.append(event) - @event_bus.on(FlowFinishedEvent) + @crewai_event_bus.on(FlowFinishedEvent) def handle_flow_end(source, event): received_events.append(event) diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index 7fc9c264b..954edeefa 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -20,7 +20,7 @@ from crewai.utilities.events.crew_events import ( CrewKickoffFailedEvent, CrewKickoffStartedEvent, ) -from crewai.utilities.events.event_bus import event_bus +from crewai.utilities.events.crewai_event_bus import crewai_event_bus from crewai.utilities.events.event_types import ToolUsageFinishedEvent from crewai.utilities.events.flow_events import ( FlowCreatedEvent, @@ -54,9 +54,9 @@ base_task = Task( def test_crew_emits_start_kickoff_event(): received_events = [] - with event_bus.scoped_handlers(): + with crewai_event_bus.scoped_handlers(): - @event_bus.on(CrewKickoffStartedEvent) + @crewai_event_bus.on(CrewKickoffStartedEvent) def handle_crew_start(source, event): received_events.append(event) @@ -74,7 +74,7 @@ def test_crew_emits_start_kickoff_event(): def test_crew_emits_end_kickoff_event(): received_events = [] - @event_bus.on(CrewKickoffCompletedEvent) + @crewai_event_bus.on(CrewKickoffCompletedEvent) def handle_crew_end(source, event): received_events.append(event) @@ -92,9 +92,9 @@ def test_crew_emits_end_kickoff_event(): def test_crew_emits_kickoff_failed_event(): received_events = [] - with event_bus.scoped_handlers(): + with crewai_event_bus.scoped_handlers(): - @event_bus.on(CrewKickoffFailedEvent) + @crewai_event_bus.on(CrewKickoffFailedEvent) def handle_crew_failed(source, event): received_events.append(event) @@ -117,7 +117,7 @@ def test_crew_emits_kickoff_failed_event(): def test_crew_emits_start_task_event(): received_events = [] - @event_bus.on(TaskStartedEvent) + @crewai_event_bus.on(TaskStartedEvent) def handle_task_start(source, event): received_events.append(event) @@ -134,7 +134,7 @@ def test_crew_emits_start_task_event(): def test_crew_emits_end_task_event(): received_events = [] - @event_bus.on(TaskCompletedEvent) + @crewai_event_bus.on(TaskCompletedEvent) def handle_task_end(source, event): received_events.append(event) @@ -152,25 +152,28 @@ def test_task_emits_failed_event_on_execution_error(): received_events = [] received_sources = [] - @event_bus.on(TaskFailedEvent) + @crewai_event_bus.on(TaskFailedEvent) def handle_task_failed(source, event): received_events.append(event) received_sources.append(source) - - with patch.object(Task, "_execute_core",) as mock_execute: + + with patch.object( + Task, + "_execute_core", + ) as mock_execute: error_message = "Simulated task failure" mock_execute.side_effect = Exception(error_message) agent = Agent( role="base_agent", goal="Just say hi", backstory="You are a helpful assistant that just says hi", - ) + ) task = Task( description="Just say hi", expected_output="hi", agent=agent, ) - + with pytest.raises(Exception): agent.execute_task(task=task) @@ -185,11 +188,11 @@ def test_task_emits_failed_event_on_execution_error(): def test_agent_emits_execution_started_and_completed_events(): received_events = [] - @event_bus.on(AgentExecutionStartedEvent) + @crewai_event_bus.on(AgentExecutionStartedEvent) def handle_agent_start(source, event): received_events.append(event) - @event_bus.on(AgentExecutionCompletedEvent) + @crewai_event_bus.on(AgentExecutionCompletedEvent) def handle_agent_completed(source, event): received_events.append(event) @@ -219,7 +222,7 @@ def test_agent_emits_execution_started_and_completed_events(): def test_agent_emits_execution_error_event(): received_events = [] - @event_bus.on(AgentExecutionErrorEvent) + @crewai_event_bus.on(AgentExecutionErrorEvent) def handle_agent_start(source, event): received_events.append(event) @@ -257,7 +260,7 @@ class SayHiTool(BaseTool): def test_tools_emits_finished_events(): received_events = [] - @event_bus.on(ToolUsageFinishedEvent) + @crewai_event_bus.on(ToolUsageFinishedEvent) def handle_tool_end(source, event): received_events.append(event) @@ -288,7 +291,7 @@ def test_tools_emits_finished_events(): def test_tools_emits_error_events(): received_events = [] - @event_bus.on(ToolUsageErrorEvent) + @crewai_event_bus.on(ToolUsageErrorEvent) def handle_tool_end(source, event): received_events.append(event) @@ -333,9 +336,9 @@ def test_tools_emits_error_events(): def test_flow_emits_start_event(): received_events = [] - with event_bus.scoped_handlers(): + with crewai_event_bus.scoped_handlers(): - @event_bus.on(FlowStartedEvent) + @crewai_event_bus.on(FlowStartedEvent) def handle_flow_start(source, event): received_events.append(event) @@ -355,9 +358,9 @@ def test_flow_emits_start_event(): def test_flow_emits_finish_event(): received_events = [] - with event_bus.scoped_handlers(): + with crewai_event_bus.scoped_handlers(): - @event_bus.on(FlowFinishedEvent) + @crewai_event_bus.on(FlowFinishedEvent) def handle_flow_finish(source, event): received_events.append(event) @@ -379,9 +382,9 @@ def test_flow_emits_finish_event(): def test_flow_emits_method_execution_started_event(): received_events = [] - with event_bus.scoped_handlers(): + with crewai_event_bus.scoped_handlers(): - @event_bus.on(MethodExecutionStartedEvent) + @crewai_event_bus.on(MethodExecutionStartedEvent) def handle_method_start(source, event): print("event in method name", event.method_name) received_events.append(event) @@ -416,8 +419,8 @@ def test_register_handler_adds_new_handler(): def custom_handler(source, event): received_events.append(event) - with event_bus.scoped_handlers(): - event_bus.register_handler(CrewKickoffStartedEvent, custom_handler) + with crewai_event_bus.scoped_handlers(): + crewai_event_bus.register_handler(CrewKickoffStartedEvent, custom_handler) crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew.kickoff() @@ -438,9 +441,9 @@ def test_multiple_handlers_for_same_event(): def handler_2(source, event): received_events_2.append(event) - with event_bus.scoped_handlers(): - event_bus.register_handler(CrewKickoffStartedEvent, handler_1) - event_bus.register_handler(CrewKickoffStartedEvent, handler_2) + with crewai_event_bus.scoped_handlers(): + crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_1) + crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_2) crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew.kickoff() @@ -454,7 +457,7 @@ def test_multiple_handlers_for_same_event(): def test_flow_emits_created_event(): received_events = [] - @event_bus.on(FlowCreatedEvent) + @crewai_event_bus.on(FlowCreatedEvent) def handle_flow_created(source, event): received_events.append(event) @@ -475,7 +478,7 @@ def test_flow_emits_method_execution_failed_event(): received_events = [] error = Exception("Simulated method failure") - @event_bus.on(MethodExecutionFailedEvent) + @crewai_event_bus.on(MethodExecutionFailedEvent) def handle_method_failed(source, event): received_events.append(event)