From 779db3c3dd1ffa20ff6e7acbb55fca573d1e453b Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Wed, 12 Feb 2025 16:17:52 -0800 Subject: [PATCH] Refactor event classes to improve type safety and naming consistency - Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent) - Update import statements and references across multiple files - Remove deprecated events.py module - Enhance event type hints and configurations - Clean up unnecessary event-related code --- src/crewai/agent.py | 9 +- src/crewai/agents/agent_builder/base_agent.py | 3 +- src/crewai/agents/crew_agent_executor.py | 10 +-- src/crewai/crew.py | 37 +++++---- src/crewai/flow/flow.py | 19 +++-- src/crewai/flow/flow_events.py | 39 --------- src/crewai/task.py | 12 ++- src/crewai/tools/tool_usage.py | 10 ++- src/crewai/utilities/events/__init__.py | 57 +++++++------ src/crewai/utilities/events/agent_events.py | 6 +- src/crewai/utilities/events/crew_events.py | 18 ++-- src/crewai/utilities/events/event_bus.py | 6 +- src/crewai/utilities/events/event_listener.py | 71 ++++++++-------- src/crewai/utilities/events/event_types.py | 82 +++++++++---------- src/crewai/utilities/events/events.py | 53 ------------ src/crewai/utilities/events/flow_events.py | 24 ++++-- src/crewai/utilities/events/task_events.py | 6 +- .../events/third_party/agentops_listener.py | 6 +- .../utilities/events/tool_usage_events.py | 4 +- tests/utilities/test_events.py | 62 +++++++------- 20 files changed, 238 insertions(+), 296 deletions(-) delete mode 100644 src/crewai/flow/flow_events.py delete mode 100644 src/crewai/utilities/events/events.py diff --git a/src/crewai/agent.py b/src/crewai/agent.py index ad99ebdcc..83fa1ffc3 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -20,8 +20,8 @@ from crewai.utilities import Converter, Prompts from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE from crewai.utilities.converter import generate_model_description from crewai.utilities.events.agent_events import ( - AgentExecutionCompleted, - AgentExecutionError, + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, ) from crewai.utilities.events.event_bus import event_bus from crewai.utilities.llm_utils import create_llm @@ -256,7 +256,7 @@ class Agent(BaseAgent): except Exception as e: event_bus.emit( self, - event=AgentExecutionError( + event=AgentExecutionErrorEvent( agent=self, task=task, error=str(e), @@ -280,7 +280,8 @@ class Agent(BaseAgent): if tool_result.get("result_as_answer", False): result = tool_result["result"] event_bus.emit( - self, event=AgentExecutionCompleted(agent=self, task=task, output=result) + self, + event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), ) return result diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index 14f4a32e0..64110c2ae 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -20,8 +20,7 @@ from crewai.agents.cache.cache_handler import CacheHandler from crewai.agents.tools_handler import ToolsHandler from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource -from crewai.tools.base_tool import BaseTool -from crewai.tools.base_tool import Tool +from crewai.tools.base_tool import BaseTool, Tool from crewai.utilities import I18N, Logger, RPMController from crewai.utilities.config import process_config from crewai.utilities.converter import Converter diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 3dc17001b..fee45b5ac 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -18,9 +18,9 @@ from crewai.tools.base_tool import BaseTool from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.utilities import I18N, Printer from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE -from crewai.utilities.events.agent_events import ( - AgentExecutionError, - AgentExecutionStarted, +from crewai.utilities.events import ( + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, ) from crewai.utilities.events.event_bus import event_bus from crewai.utilities.exceptions.context_window_exceeding_exception import ( @@ -93,7 +93,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if self.agent and self.task: event_bus.emit( self, - event=AgentExecutionStarted( + event=AgentExecutionStartedEvent( agent=self.agent, tools=self.tools, inputs=inputs, @@ -195,7 +195,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): if self.agent: event_bus.emit( self, - event=AgentExecutionError( + event=AgentExecutionErrorEvent( agent=self.agent, task=self.task, error=str(exception) ), ) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index e6186825c..20b2b76df 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -44,15 +44,15 @@ from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.events.crew_events import ( - CrewKickoffCompleted, - CrewKickoffFailed, - CrewKickoffStarted, - CrewTestCompleted, - CrewTestFailed, - CrewTestStarted, - CrewTrainCompleted, - CrewTrainFailed, - CrewTrainStarted, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewKickoffStartedEvent, + CrewTestCompletedEvent, + CrewTestFailedEvent, + CrewTestStartedEvent, + CrewTrainCompletedEvent, + CrewTrainFailedEvent, + CrewTrainStartedEvent, ) from crewai.utilities.events.event_bus import event_bus from crewai.utilities.formatter import ( @@ -526,7 +526,7 @@ class Crew(BaseModel): try: event_bus.emit( self, - CrewTrainStarted( + CrewTrainStartedEvent( crew_name=self.name or "crew", n_iterations=n_iterations, filename=filename, @@ -553,14 +553,14 @@ class Crew(BaseModel): event_bus.emit( self, - CrewTrainCompleted( + CrewTrainCompletedEvent( crew_name=self.name or "crew", n_iterations=n_iterations, filename=filename, ), ) except Exception as e: - event_bus.emit(self, CrewTrainFailed(error=str(e))) + event_bus.emit(self, CrewTrainFailedEvent(error=str(e))) self._logger.log("error", f"Training failed: {e}", color="red") CrewTrainingHandler(TRAINING_DATA_FILE).clear() CrewTrainingHandler(filename).clear() @@ -577,7 +577,8 @@ class Crew(BaseModel): inputs = before_callback(inputs) event_bus.emit( - self, CrewKickoffStarted(crew_name=self.name or "crew", inputs=inputs) + self, + CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs), ) """Starts the crew to work on its assigned tasks.""" @@ -629,7 +630,7 @@ class Crew(BaseModel): self.usage_metrics.add_usage_metrics(metric) return result except Exception as e: - event_bus.emit(self, CrewKickoffFailed(error=str(e))) + event_bus.emit(self, CrewKickoffFailedEvent(error=str(e))) raise def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: @@ -980,7 +981,7 @@ class Crew(BaseModel): token_usage = self.calculate_usage_metrics() event_bus.emit( self, - CrewKickoffCompleted( + CrewKickoffCompletedEvent( crew_name=self.name or "crew", output=final_task_output ), ) @@ -1200,7 +1201,7 @@ class Crew(BaseModel): try: event_bus.emit( self, - CrewTestStarted( + CrewTestStartedEvent( crew_name=self.name or "crew", n_iterations=n_iterations, openai_model_name=openai_model_name, @@ -1218,12 +1219,12 @@ class Crew(BaseModel): event_bus.emit( self, - CrewTestCompleted( + CrewTestCompletedEvent( crew_name=self.name or "crew", ), ) except Exception as e: - event_bus.emit(self, CrewTestFailed(error=str(e))) + event_bus.emit(self, CrewTestFailedEvent(error=str(e))) raise def __repr__(self): diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 548c24e8f..9b2a07540 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -25,10 +25,10 @@ from crewai.flow.persistence.base import FlowPersistence from crewai.flow.utils import get_possible_return_constants from crewai.telemetry import Telemetry from crewai.utilities.events import ( - FlowFinished, - FlowStarted, - MethodExecutionFinished, - MethodExecutionStarted, + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, ) from crewai.utilities.events.event_bus import event_bus from crewai.utilities.printer import Printer @@ -741,7 +741,7 @@ class Flow(Generic[T], metaclass=FlowMeta): # Start flow execution event_bus.emit( self, - FlowStarted( + FlowStartedEvent( type="flow_started", flow_name=self.__class__.__name__, inputs=inputs, @@ -774,7 +774,7 @@ class Flow(Generic[T], metaclass=FlowMeta): event_bus.emit( self, - FlowFinished( + FlowFinishedEvent( type="flow_finished", flow_name=self.__class__.__name__, result=final_output, @@ -982,10 +982,11 @@ class Flow(Generic[T], metaclass=FlowMeta): event_bus.emit( self, - MethodExecutionStarted( + MethodExecutionStartedEvent( type="method_execution_started", method_name=listener_name, flow_name=self.__class__.__name__, + state=self._copy_state(), ), ) @@ -1002,10 +1003,12 @@ class Flow(Generic[T], metaclass=FlowMeta): event_bus.emit( self, - MethodExecutionFinished( + MethodExecutionFinishedEvent( type="method_execution_finished", method_name=listener_name, flow_name=self.__class__.__name__, + state=self._copy_state(), + result=listener_result, ), ) diff --git a/src/crewai/flow/flow_events.py b/src/crewai/flow/flow_events.py deleted file mode 100644 index c8f9e9694..000000000 --- a/src/crewai/flow/flow_events.py +++ /dev/null @@ -1,39 +0,0 @@ -from dataclasses import dataclass, field -from datetime import datetime -from typing import Any, Dict, Optional, Union - -from pydantic import BaseModel - - -@dataclass -class Event: - type: str - flow_name: str - timestamp: datetime = field(init=False) - - def __post_init__(self): - self.timestamp = datetime.now() - - -@dataclass -class FlowStartedEvent(Event): - inputs: Optional[Dict[str, Any]] = None - - -@dataclass -class MethodExecutionStartedEvent(Event): - method_name: str - state: Union[Dict[str, Any], BaseModel] - params: Optional[Dict[str, Any]] = None - - -@dataclass -class MethodExecutionFinishedEvent(Event): - method_name: str - state: Union[Dict[str, Any], BaseModel] - result: Any = None - - -@dataclass -class FlowFinishedEvent(Event): - result: Optional[Any] = None diff --git a/src/crewai/task.py b/src/crewai/task.py index e13f62773..bbc7b2ace 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -41,7 +41,11 @@ from crewai.tools.base_tool import BaseTool from crewai.utilities.config import process_config from crewai.utilities.converter import Converter, convert_to_model from crewai.utilities.events.event_bus import event_bus -from crewai.utilities.events.task_events import TaskCompleted, TaskFailed, TaskStarted +from crewai.utilities.events import ( + TaskCompletedEvent, + TaskFailedEvent, + TaskStartedEvent, +) from crewai.utilities.i18n import I18N from crewai.utilities.printer import Printer @@ -367,7 +371,7 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - event_bus.emit(self, TaskStarted(task=self)) + event_bus.emit(self, TaskStartedEvent(task=self)) result = agent.execute_task( task=self, context=context, @@ -447,7 +451,7 @@ class Task(BaseModel): else result ) self._save_file(content) - event_bus.emit(self, TaskCompleted(task=self, output=task_output)) + event_bus.emit(self, TaskCompletedEvent(task=self, output=task_output)) return task_output except Exception as e: self.end_time = datetime.datetime.now() @@ -456,7 +460,7 @@ class Task(BaseModel): self._telemetry.task_ended(self._execution_span, self, agent.crew) self._execution_span = None - event_bus.emit(self, TaskFailed(task=self, error=str(e))) + event_bus.emit(self, TaskFailedEvent(task=self, error=str(e))) raise e # Re-raise the exception after emitting the event def prompt(self) -> str: diff --git a/src/crewai/tools/tool_usage.py b/src/crewai/tools/tool_usage.py index c58fbeb74..854e1309d 100644 --- a/src/crewai/tools/tool_usage.py +++ b/src/crewai/tools/tool_usage.py @@ -19,8 +19,8 @@ from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling from crewai.utilities import I18N, Converter, ConverterError, Printer from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_types import ( - ToolUsageError, - ToolUsageFinished, + ToolUsageErrorEvent, + ToolUsageFinishedEvent, ) try: @@ -468,7 +468,9 @@ class ToolUsage: def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None: event_data = self._prepare_event_data(tool, tool_calling) - event_bus.emit(self, event=ToolUsageError(**{**event_data, "error": str(e)})) + event_bus.emit( + self, event=ToolUsageErrorEvent(**{**event_data, "error": str(e)}) + ) def on_tool_use_finished( self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float @@ -482,7 +484,7 @@ class ToolUsage: "from_cache": from_cache, } ) - event_bus.emit(self, event=ToolUsageFinished(**event_data)) + event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data)) def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict: return { diff --git a/src/crewai/utilities/events/__init__.py b/src/crewai/utilities/events/__init__.py index f0e1edafb..5055930db 100644 --- a/src/crewai/utilities/events/__init__.py +++ b/src/crewai/utilities/events/__init__.py @@ -1,14 +1,22 @@ from .crew_events import ( - CrewKickoffStarted, - CrewKickoffCompleted, - CrewKickoffFailed, + CrewKickoffStartedEvent, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, +) +from .agent_events import ( + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, +) +from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent +from .flow_events import ( + FlowStartedEvent, + FlowFinishedEvent, + MethodExecutionStartedEvent, + MethodExecutionFinishedEvent, ) -from .agent_events import AgentExecutionStarted, AgentExecutionCompleted, AgentExecutionError -from .task_events import TaskStarted, TaskCompleted, TaskFailed -from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished from .event_bus import EventTypes, EventBus -from .events import emit, on -from .tool_usage_events import ToolUsageFinished, ToolUsageError +from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent # events from .event_listener import EventListener @@ -16,26 +24,25 @@ from .third_party.agentops_listener import agentops_listener __all__ = [ - AgentExecutionStarted, - AgentExecutionCompleted, - CrewKickoffStarted, - CrewKickoffCompleted, - CrewKickoffFailed, - TaskStarted, - TaskCompleted, - TaskFailed, - FlowStarted, - FlowFinished, - MethodExecutionStarted, - MethodExecutionFinished, + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + CrewKickoffStartedEvent, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + TaskStartedEvent, + TaskCompletedEvent, + TaskFailedEvent, + FlowStartedEvent, + FlowFinishedEvent, + MethodExecutionStartedEvent, + MethodExecutionFinishedEvent, EventTypes, - emit, - on, event_bus, - ToolUsageFinished, - ToolUsageError, + ToolUsageFinishedEvent, + ToolUsageErrorEvent, EventBus, - AgentExecutionError + AgentExecutionErrorEvent, ] diff --git a/src/crewai/utilities/events/agent_events.py b/src/crewai/utilities/events/agent_events.py index 17e442b86..2f6fde522 100644 --- a/src/crewai/utilities/events/agent_events.py +++ b/src/crewai/utilities/events/agent_events.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from crewai.agents.agent_builder.base_agent import BaseAgent -class AgentExecutionStarted(CrewEvent): +class AgentExecutionStartedEvent(CrewEvent): """Event emitted when an agent starts executing a task""" agent: BaseAgent @@ -22,7 +22,7 @@ class AgentExecutionStarted(CrewEvent): model_config = {"arbitrary_types_allowed": True} -class AgentExecutionCompleted(CrewEvent): +class AgentExecutionCompletedEvent(CrewEvent): """Event emitted when an agent completes executing a task""" agent: BaseAgent @@ -31,7 +31,7 @@ class AgentExecutionCompleted(CrewEvent): type: str = "agent_execution_completed" -class AgentExecutionError(CrewEvent): +class AgentExecutionErrorEvent(CrewEvent): """Event emitted when an agent encounters an error during execution""" agent: BaseAgent diff --git a/src/crewai/utilities/events/crew_events.py b/src/crewai/utilities/events/crew_events.py index e34d3b240..eb9a80f96 100644 --- a/src/crewai/utilities/events/crew_events.py +++ b/src/crewai/utilities/events/crew_events.py @@ -18,7 +18,7 @@ class CrewEvent(BaseModel): type: str -class CrewKickoffStarted(CrewEvent): +class CrewKickoffStartedEvent(CrewEvent): """Event emitted when a crew starts execution""" crew_name: Optional[str] @@ -26,7 +26,7 @@ class CrewKickoffStarted(CrewEvent): type: str = "crew_kickoff_started" -class CrewKickoffCompleted(CrewEvent): +class CrewKickoffCompletedEvent(CrewEvent): """Event emitted when a crew completes execution""" crew_name: Optional[str] @@ -34,14 +34,14 @@ class CrewKickoffCompleted(CrewEvent): type: str = "crew_kickoff_completed" -class CrewKickoffFailed(CrewEvent): +class CrewKickoffFailedEvent(CrewEvent): """Event emitted when a crew fails to complete execution""" error: str type: str = "crew_kickoff_failed" -class CrewTrainStarted(CrewEvent): +class CrewTrainStartedEvent(CrewEvent): """Event emitted when a crew starts training""" crew_name: Optional[str] @@ -51,7 +51,7 @@ class CrewTrainStarted(CrewEvent): type: str = "crew_train_started" -class CrewTrainCompleted(CrewEvent): +class CrewTrainCompletedEvent(CrewEvent): """Event emitted when a crew completes training""" crew_name: Optional[str] @@ -60,14 +60,14 @@ class CrewTrainCompleted(CrewEvent): type: str = "crew_train_completed" -class CrewTrainFailed(CrewEvent): +class CrewTrainFailedEvent(CrewEvent): """Event emitted when a crew fails to complete training""" error: str type: str = "crew_train_failed" -class CrewTestStarted(CrewEvent): +class CrewTestStartedEvent(CrewEvent): """Event emitted when a crew starts testing""" crew_name: Optional[str] @@ -77,14 +77,14 @@ class CrewTestStarted(CrewEvent): type: str = "crew_test_started" -class CrewTestCompleted(CrewEvent): +class CrewTestCompletedEvent(CrewEvent): """Event emitted when a crew completes testing""" crew_name: Optional[str] type: str = "crew_test_completed" -class CrewTestFailed(CrewEvent): +class CrewTestFailedEvent(CrewEvent): """Event emitted when a crew fails to complete testing""" error: str diff --git a/src/crewai/utilities/events/event_bus.py b/src/crewai/utilities/events/event_bus.py index 7a2b3a301..29387a268 100644 --- a/src/crewai/utilities/events/event_bus.py +++ b/src/crewai/utilities/events/event_bus.py @@ -42,8 +42,10 @@ class EventBus: Decorator to register an event handler for a specific event type. Usage: - @event_bus.on(AgentExecutionCompleted) - def on_agent_execution_completed(source: Any, event: AgentExecutionCompleted): + @event_bus.on(AgentExecutionCompletedEvent) + def on_agent_execution_completed( + source: Any, event: AgentExecutionCompletedEvent + ): print(f"👍 Agent '{event.agent}' completed task") print(f" Output: {event.output}") """ diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index f26e2058e..8409e2380 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -4,23 +4,20 @@ from crewai.telemetry.telemetry import Telemetry from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.events.base_event_listener import BaseEventListener -from .agent_events import ( - AgentExecutionCompleted, - AgentExecutionStarted, -) +from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent from .crew_events import ( - CrewKickoffCompleted, - CrewKickoffStarted, - CrewTestCompleted, - CrewTestStarted, + CrewKickoffCompletedEvent, + CrewKickoffStartedEvent, + CrewTestCompletedEvent, + CrewTestStartedEvent, ) from .flow_events import ( - FlowFinished, - FlowStarted, - MethodExecutionFinished, - MethodExecutionStarted, + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, ) -from .task_events import TaskCompleted, TaskStarted +from .task_events import TaskCompletedEvent, TaskStartedEvent class EventListener(BaseEventListener): @@ -32,19 +29,19 @@ class EventListener(BaseEventListener): self._telemetry.set_tracer() def setup_listeners(self, event_bus): - @event_bus.on(CrewKickoffStarted) - def on_crew_started(source, event: CrewKickoffStarted): + @event_bus.on(CrewKickoffStartedEvent) + def on_crew_started(source, event: CrewKickoffStartedEvent): print(f"🚀 Crew '{event.crew_name}' started", event.timestamp) print("event.inputs", event.inputs) self._telemetry.crew_execution_span(source, event.inputs) - @event_bus.on(CrewKickoffCompleted) - def on_crew_completed(source, event: CrewKickoffCompleted): + @event_bus.on(CrewKickoffCompletedEvent) + def on_crew_completed(source, event: CrewKickoffCompletedEvent): final_string_output = event.output.raw self._telemetry.end_crew(source, final_string_output) - @event_bus.on(CrewTestStarted) - def on_crew_test_started(source, event: CrewTestStarted): + @event_bus.on(CrewTestStartedEvent) + def on_crew_test_started(source, event: CrewTestStartedEvent): cloned_crew = source.copy() cloned_crew._telemetry.test_execution_span( cloned_crew, @@ -54,16 +51,16 @@ class EventListener(BaseEventListener): ) print(f"🚀 Crew '{event.crew_name}' started test") - @event_bus.on(CrewTestCompleted) - def on_crew_test_completed(source, event: CrewTestCompleted): + @event_bus.on(CrewTestCompletedEvent) + def on_crew_test_completed(source, event: CrewTestCompletedEvent): print(f"👍 Crew '{event.crew_name}' completed test") - @event_bus.on(TaskStarted) - def on_task_started(source, event: TaskStarted): + @event_bus.on(TaskStartedEvent) + def on_task_started(source, event: TaskStartedEvent): print(f"📋 Task started: {event.task.description}") - @event_bus.on(TaskCompleted) - def on_task_completed(source, event: TaskCompleted): + @event_bus.on(TaskCompletedEvent) + def on_task_completed(source, event: TaskCompletedEvent): print(f" Output: {event.output}") result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output) print(f" Evaluation: {result.quality}") @@ -72,29 +69,29 @@ class EventListener(BaseEventListener): else: print(f" ❌ Failed: {result.suggestions}") - @event_bus.on(AgentExecutionStarted) - def on_agent_execution_started(source, event: AgentExecutionStarted): + @event_bus.on(AgentExecutionStartedEvent) + def on_agent_execution_started(source, event: AgentExecutionStartedEvent): print(f"🤖 Agent '{event.agent.role}' started task") - @event_bus.on(AgentExecutionCompleted) - def on_agent_execution_completed(source, event: AgentExecutionCompleted): + @event_bus.on(AgentExecutionCompletedEvent) + def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): print(f"👍 Agent '{event.agent.role}' completed task") print(f" Output: {event.output}") - @event_bus.on(FlowStarted) - def on_flow_started(source, event: FlowStarted): + @event_bus.on(FlowStartedEvent) + def on_flow_started(source, event: FlowStartedEvent): print(f"🤖 Flow Started: '{event.flow_name}'") - @event_bus.on(FlowFinished) - def on_flow_finished(source, event: FlowFinished): + @event_bus.on(FlowFinishedEvent) + def on_flow_finished(source, event: FlowFinishedEvent): print(f"👍 Flow Finished: '{event.flow_name}'") - @event_bus.on(MethodExecutionStarted) - def on_method_execution_started(source, event: MethodExecutionStarted): + @event_bus.on(MethodExecutionStartedEvent) + def on_method_execution_started(source, event: MethodExecutionStartedEvent): print(f"🤖 Flow Method Started: '{event.method_name}'") - @event_bus.on(MethodExecutionFinished) - def on_method_execution_finished(source, event: MethodExecutionFinished): + @event_bus.on(MethodExecutionFinishedEvent) + def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): print(f"👍 Flow Method Finished: '{event.method_name}'") diff --git a/src/crewai/utilities/events/event_types.py b/src/crewai/utilities/events/event_types.py index 0fb6e35c3..b4fa0f929 100644 --- a/src/crewai/utilities/events/event_types.py +++ b/src/crewai/utilities/events/event_types.py @@ -1,54 +1,54 @@ from typing import Union from .agent_events import ( - AgentExecutionCompleted, - AgentExecutionError, - AgentExecutionStarted, + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, ) from .crew_events import ( - CrewKickoffCompleted, - CrewKickoffFailed, - CrewKickoffStarted, - CrewTestCompleted, - CrewTestFailed, - CrewTestStarted, - CrewTrainCompleted, - CrewTrainFailed, - CrewTrainStarted, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewKickoffStartedEvent, + CrewTestCompletedEvent, + CrewTestFailedEvent, + CrewTestStartedEvent, + CrewTrainCompletedEvent, + CrewTrainFailedEvent, + CrewTrainStartedEvent, ) from .flow_events import ( - FlowFinished, - FlowStarted, - MethodExecutionFinished, - MethodExecutionStarted, + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, ) from .task_events import ( - TaskCompleted, - TaskFailed, - TaskStarted, + TaskCompletedEvent, + TaskFailedEvent, + TaskStartedEvent, ) -from .tool_usage_events import ToolUsageError, ToolUsageFinished +from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent EventTypes = Union[ - CrewKickoffStarted, - CrewKickoffCompleted, - CrewKickoffFailed, - CrewTestStarted, - CrewTestCompleted, - CrewTestFailed, - CrewTrainStarted, - CrewTrainCompleted, - CrewTrainFailed, - AgentExecutionStarted, - AgentExecutionCompleted, - TaskStarted, - TaskCompleted, - TaskFailed, - FlowStarted, - FlowFinished, - MethodExecutionStarted, - MethodExecutionFinished, - AgentExecutionError, - ToolUsageFinished, - ToolUsageError, + CrewKickoffStartedEvent, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewTestStartedEvent, + CrewTestCompletedEvent, + CrewTestFailedEvent, + CrewTrainStartedEvent, + CrewTrainCompletedEvent, + CrewTrainFailedEvent, + AgentExecutionStartedEvent, + AgentExecutionCompletedEvent, + TaskStartedEvent, + TaskCompletedEvent, + TaskFailedEvent, + FlowStartedEvent, + FlowFinishedEvent, + MethodExecutionStartedEvent, + MethodExecutionFinishedEvent, + AgentExecutionErrorEvent, + ToolUsageFinishedEvent, + ToolUsageErrorEvent, ] diff --git a/src/crewai/utilities/events/events.py b/src/crewai/utilities/events/events.py deleted file mode 100644 index 203148bb0..000000000 --- a/src/crewai/utilities/events/events.py +++ /dev/null @@ -1,53 +0,0 @@ -from functools import wraps -from typing import ( - Any, - Callable, - Dict, - Generic, - List, - Type, - TypeVar, - TYPE_CHECKING, -) -from datetime import datetime -from typing import Optional - -from pydantic import BaseModel, Field -from .event_types import EventTypes - -T = TypeVar("T") -EVT = TypeVar("EVT", bound=BaseModel) - - -class Emitter(Generic[T, EVT]): - _listeners: Dict[Type[EVT], List[Callable]] = {} - - def on(self, event_type: Type[EVT]): - def decorator(func: Callable): - @wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - self._listeners.setdefault(event_type, []).append(wrapper) - return wrapper - - return decorator - - def emit(self, source: T, event: EVT) -> None: - event_type = type(event) - for func in self._listeners.get(event_type, []): - func(source, event) - - -# Global event emitter instance -default_emitter = Emitter[Any, BaseModel]() - - -def emit(source: Any, event: BaseModel) -> None: - """Emit an event to all registered listeners""" - default_emitter.emit(source, event) - - -def on(event_type: Type[EventTypes]) -> Callable: - """Register a listener for a specific event type""" - return default_emitter.on(event_type) diff --git a/src/crewai/utilities/events/flow_events.py b/src/crewai/utilities/events/flow_events.py index 27ef6446a..1eb988413 100644 --- a/src/crewai/utilities/events/flow_events.py +++ b/src/crewai/utilities/events/flow_events.py @@ -1,32 +1,46 @@ -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union + +from pydantic import BaseModel from .crew_events import CrewEvent -class FlowStarted(CrewEvent): +class FlowEvent(CrewEvent): + """Base class for all flow events""" + + type: str + flow_name: str + + +class FlowStartedEvent(FlowEvent): """Event emitted when a flow starts execution""" flow_name: str + inputs: Optional[Dict[str, Any]] = None type: str = "flow_started" -class MethodExecutionStarted(CrewEvent): +class MethodExecutionStartedEvent(FlowEvent): """Event emitted when a flow method starts execution""" flow_name: str method_name: str + state: Union[Dict[str, Any], BaseModel] + params: Optional[Dict[str, Any]] = None type: str = "method_execution_started" -class MethodExecutionFinished(CrewEvent): +class MethodExecutionFinishedEvent(FlowEvent): """Event emitted when a flow method completes execution""" flow_name: str method_name: str + result: Any = None + state: Union[Dict[str, Any], BaseModel] type: str = "method_execution_finished" -class FlowFinished(CrewEvent): +class FlowFinishedEvent(FlowEvent): """Event emitted when a flow completes execution""" flow_name: str diff --git a/src/crewai/utilities/events/task_events.py b/src/crewai/utilities/events/task_events.py index e057d406a..4cef75638 100644 --- a/src/crewai/utilities/events/task_events.py +++ b/src/crewai/utilities/events/task_events.py @@ -3,7 +3,7 @@ from typing import Any from crewai.utilities.events.crew_events import CrewEvent -class TaskStarted(CrewEvent): +class TaskStartedEvent(CrewEvent): """Event emitted when a task starts""" task: Any @@ -12,7 +12,7 @@ class TaskStarted(CrewEvent): model_config = {"arbitrary_types_allowed": True} -class TaskCompleted(CrewEvent): +class TaskCompletedEvent(CrewEvent): """Event emitted when a task completes""" task: Any @@ -22,7 +22,7 @@ class TaskCompleted(CrewEvent): model_config = {"arbitrary_types_allowed": True} -class TaskFailed(CrewEvent): +class TaskFailedEvent(CrewEvent): """Event emitted when a task fails""" task: Any diff --git a/src/crewai/utilities/events/third_party/agentops_listener.py b/src/crewai/utilities/events/third_party/agentops_listener.py index 2ac7c94e7..f1eb1f36f 100644 --- a/src/crewai/utilities/events/third_party/agentops_listener.py +++ b/src/crewai/utilities/events/third_party/agentops_listener.py @@ -1,4 +1,4 @@ -from crewai.utilities.events.agent_events import AgentExecutionStarted +from crewai.utilities.events.agent_events import AgentExecutionStartedEvent from crewai.utilities.events.base_event_listener import BaseEventListener try: @@ -16,8 +16,8 @@ class AgentOpsListener(BaseEventListener): def setup_listeners(self, event_bus): if AGENTOPS_INSTALLED: - @event_bus.on(AgentExecutionStarted) - def on_agent_started(source, event: AgentExecutionStarted): + @event_bus.on(AgentExecutionStartedEvent) + def on_agent_started(source, event: AgentExecutionStartedEvent): print("AGENTOPS WORKS !!!", event.agent) diff --git a/src/crewai/utilities/events/tool_usage_events.py b/src/crewai/utilities/events/tool_usage_events.py index 455fa51a0..95a969820 100644 --- a/src/crewai/utilities/events/tool_usage_events.py +++ b/src/crewai/utilities/events/tool_usage_events.py @@ -18,7 +18,7 @@ class ToolUsageEvent(CrewEvent): model_config = {"arbitrary_types_allowed": True} -class ToolUsageFinished(ToolUsageEvent): +class ToolUsageFinishedEvent(ToolUsageEvent): """Event emitted when a tool execution is completed""" started_at: datetime @@ -27,7 +27,7 @@ class ToolUsageFinished(ToolUsageEvent): type: str = "tool_usage_finished" -class ToolUsageError(ToolUsageEvent): +class ToolUsageErrorEvent(ToolUsageEvent): """Event emitted when a tool execution encounters an error""" error: str diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index cb37881e3..565d614c3 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -11,24 +11,28 @@ from crewai.flow.flow import Flow, listen, start from crewai.task import Task from crewai.tools.base_tool import BaseTool from crewai.utilities.events.agent_events import ( - AgentExecutionCompleted, - AgentExecutionError, - AgentExecutionStarted, + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, ) from crewai.utilities.events.crew_events import ( - CrewKickoffCompleted, - CrewKickoffFailed, - CrewKickoffStarted, + CrewKickoffCompletedEvent, + CrewKickoffFailedEvent, + CrewKickoffStartedEvent, ) from crewai.utilities.events.event_bus import event_bus -from crewai.utilities.events.event_types import ToolUsageFinished +from crewai.utilities.events.event_types import ToolUsageFinishedEvent from crewai.utilities.events.flow_events import ( - FlowFinished, - FlowStarted, - MethodExecutionStarted, + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionStartedEvent, ) -from crewai.utilities.events.task_events import TaskCompleted, TaskFailed, TaskStarted -from crewai.utilities.events.tool_usage_events import ToolUsageError +from crewai.utilities.events.task_events import ( + TaskCompletedEvent, + TaskFailedEvent, + TaskStartedEvent, +) +from crewai.utilities.events.tool_usage_events import ToolUsageErrorEvent base_agent = Agent( role="base_agent", @@ -50,7 +54,7 @@ def test_crew_emits_start_kickoff_event(): with event_bus.scoped_handlers(): - @event_bus.on(CrewKickoffStarted) + @event_bus.on(CrewKickoffStartedEvent) def handle_crew_start(source, event): received_events.append(event) @@ -68,7 +72,7 @@ def test_crew_emits_start_kickoff_event(): def test_crew_emits_end_kickoff_event(): received_events = [] - @event_bus.on(CrewKickoffCompleted) + @event_bus.on(CrewKickoffCompletedEvent) def handle_crew_end(source, event): received_events.append(event) @@ -88,7 +92,7 @@ def test_crew_emits_kickoff_failed_event(): with event_bus.scoped_handlers(): - @event_bus.on(CrewKickoffFailed) + @event_bus.on(CrewKickoffFailedEvent) def handle_crew_failed(source, event): received_events.append(event) @@ -111,7 +115,7 @@ def test_crew_emits_kickoff_failed_event(): def test_crew_emits_start_task_event(): received_events = [] - @event_bus.on(TaskStarted) + @event_bus.on(TaskStartedEvent) def handle_task_start(source, event): received_events.append(event) @@ -128,7 +132,7 @@ def test_crew_emits_start_task_event(): def test_crew_emits_end_task_event(): received_events = [] - @event_bus.on(TaskCompleted) + @event_bus.on(TaskCompletedEvent) def handle_task_end(source, event): received_events.append(event) @@ -145,7 +149,7 @@ def test_crew_emits_end_task_event(): def test_task_emits_failed_event_on_execution_error(): received_events = [] - @event_bus.on(TaskFailed) + @event_bus.on(TaskFailedEvent) def handle_task_failed(source, event): received_events.append(event) @@ -171,11 +175,11 @@ def test_task_emits_failed_event_on_execution_error(): def test_agent_emits_execution_started_and_completed_events(): received_events = [] - @event_bus.on(AgentExecutionStarted) + @event_bus.on(AgentExecutionStartedEvent) def handle_agent_start(source, event): received_events.append(event) - @event_bus.on(AgentExecutionCompleted) + @event_bus.on(AgentExecutionCompletedEvent) def handle_agent_completed(source, event): received_events.append(event) @@ -205,7 +209,7 @@ def test_agent_emits_execution_started_and_completed_events(): def test_agent_emits_execution_error_event(): received_events = [] - @event_bus.on(AgentExecutionError) + @event_bus.on(AgentExecutionErrorEvent) def handle_agent_start(source, event): received_events.append(event) @@ -243,7 +247,7 @@ class SayHiTool(BaseTool): def test_tools_emits_finished_events(): received_events = [] - @event_bus.on(ToolUsageFinished) + @event_bus.on(ToolUsageFinishedEvent) def handle_tool_end(source, event): received_events.append(event) @@ -274,7 +278,7 @@ def test_tools_emits_finished_events(): def test_tools_emits_error_events(): received_events = [] - @event_bus.on(ToolUsageError) + @event_bus.on(ToolUsageErrorEvent) def handle_tool_end(source, event): received_events.append(event) @@ -321,7 +325,7 @@ def test_flow_emits_start_event(): with event_bus.scoped_handlers(): - @event_bus.on(FlowStarted) + @event_bus.on(FlowStartedEvent) def handle_flow_start(source, event): received_events.append(event) @@ -343,7 +347,7 @@ def test_flow_emits_finish_event(): with event_bus.scoped_handlers(): - @event_bus.on(FlowFinished) + @event_bus.on(FlowFinishedEvent) def handle_flow_finish(source, event): received_events.append(event) @@ -367,7 +371,7 @@ def test_flow_emits_method_execution_started_event(): with event_bus.scoped_handlers(): - @event_bus.on(MethodExecutionStarted) + @event_bus.on(MethodExecutionStartedEvent) def handle_method_start(source, event): print("event in method name", event.method_name) received_events.append(event) @@ -399,7 +403,7 @@ def test_register_handler_adds_new_handler(): received_events.append(event) with event_bus.scoped_handlers(): - event_bus.register_handler(CrewKickoffStarted, custom_handler) + event_bus.register_handler(CrewKickoffStartedEvent, custom_handler) crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew.kickoff() @@ -421,8 +425,8 @@ def test_multiple_handlers_for_same_event(): received_events_2.append(event) with event_bus.scoped_handlers(): - event_bus.register_handler(CrewKickoffStarted, handler_1) - event_bus.register_handler(CrewKickoffStarted, handler_2) + event_bus.register_handler(CrewKickoffStartedEvent, handler_1) + event_bus.register_handler(CrewKickoffStartedEvent, handler_2) crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew.kickoff()