Refactor event classes to improve type safety and naming consistency

- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code
This commit is contained in:
Lorenze Jay
2025-02-12 16:17:52 -08:00
parent 9debd3a6da
commit 779db3c3dd
20 changed files with 238 additions and 296 deletions

View File

@@ -20,8 +20,8 @@ from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import ( from crewai.utilities.events.agent_events import (
AgentExecutionCompleted, AgentExecutionCompletedEvent,
AgentExecutionError, AgentExecutionErrorEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.llm_utils import create_llm from crewai.utilities.llm_utils import create_llm
@@ -256,7 +256,7 @@ class Agent(BaseAgent):
except Exception as e: except Exception as e:
event_bus.emit( event_bus.emit(
self, self,
event=AgentExecutionError( event=AgentExecutionErrorEvent(
agent=self, agent=self,
task=task, task=task,
error=str(e), error=str(e),
@@ -280,7 +280,8 @@ class Agent(BaseAgent):
if tool_result.get("result_as_answer", False): if tool_result.get("result_as_answer", False):
result = tool_result["result"] result = tool_result["result"]
event_bus.emit( event_bus.emit(
self, event=AgentExecutionCompleted(agent=self, task=task, output=result) self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
) )
return result return result

View File

@@ -20,8 +20,7 @@ from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.tools.base_tool import BaseTool from crewai.tools.base_tool import BaseTool, Tool
from crewai.tools.base_tool import Tool
from crewai.utilities import I18N, Logger, RPMController from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter from crewai.utilities.converter import Converter

View File

@@ -18,9 +18,9 @@ from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events.agent_events import ( from crewai.utilities.events import (
AgentExecutionError, AgentExecutionErrorEvent,
AgentExecutionStarted, AgentExecutionStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
@@ -93,7 +93,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if self.agent and self.task: if self.agent and self.task:
event_bus.emit( event_bus.emit(
self, self,
event=AgentExecutionStarted( event=AgentExecutionStartedEvent(
agent=self.agent, agent=self.agent,
tools=self.tools, tools=self.tools,
inputs=inputs, inputs=inputs,
@@ -195,7 +195,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if self.agent: if self.agent:
event_bus.emit( event_bus.emit(
self, self,
event=AgentExecutionError( event=AgentExecutionErrorEvent(
agent=self.agent, task=self.task, error=str(exception) agent=self.agent, task=self.task, error=str(exception)
), ),
) )

View File

@@ -44,15 +44,15 @@ from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.crew_events import ( from crewai.utilities.events.crew_events import (
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffFailed, CrewKickoffFailedEvent,
CrewKickoffStarted, CrewKickoffStartedEvent,
CrewTestCompleted, CrewTestCompletedEvent,
CrewTestFailed, CrewTestFailedEvent,
CrewTestStarted, CrewTestStartedEvent,
CrewTrainCompleted, CrewTrainCompletedEvent,
CrewTrainFailed, CrewTrainFailedEvent,
CrewTrainStarted, CrewTrainStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.formatter import ( from crewai.utilities.formatter import (
@@ -526,7 +526,7 @@ class Crew(BaseModel):
try: try:
event_bus.emit( event_bus.emit(
self, self,
CrewTrainStarted( CrewTrainStartedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
n_iterations=n_iterations, n_iterations=n_iterations,
filename=filename, filename=filename,
@@ -553,14 +553,14 @@ class Crew(BaseModel):
event_bus.emit( event_bus.emit(
self, self,
CrewTrainCompleted( CrewTrainCompletedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
n_iterations=n_iterations, n_iterations=n_iterations,
filename=filename, filename=filename,
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit(self, CrewTrainFailed(error=str(e))) event_bus.emit(self, CrewTrainFailedEvent(error=str(e)))
self._logger.log("error", f"Training failed: {e}", color="red") self._logger.log("error", f"Training failed: {e}", color="red")
CrewTrainingHandler(TRAINING_DATA_FILE).clear() CrewTrainingHandler(TRAINING_DATA_FILE).clear()
CrewTrainingHandler(filename).clear() CrewTrainingHandler(filename).clear()
@@ -577,7 +577,8 @@ class Crew(BaseModel):
inputs = before_callback(inputs) inputs = before_callback(inputs)
event_bus.emit( event_bus.emit(
self, CrewKickoffStarted(crew_name=self.name or "crew", inputs=inputs) self,
CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs),
) )
"""Starts the crew to work on its assigned tasks.""" """Starts the crew to work on its assigned tasks."""
@@ -629,7 +630,7 @@ class Crew(BaseModel):
self.usage_metrics.add_usage_metrics(metric) self.usage_metrics.add_usage_metrics(metric)
return result return result
except Exception as e: except Exception as e:
event_bus.emit(self, CrewKickoffFailed(error=str(e))) event_bus.emit(self, CrewKickoffFailedEvent(error=str(e)))
raise raise
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
@@ -980,7 +981,7 @@ class Crew(BaseModel):
token_usage = self.calculate_usage_metrics() token_usage = self.calculate_usage_metrics()
event_bus.emit( event_bus.emit(
self, self,
CrewKickoffCompleted( CrewKickoffCompletedEvent(
crew_name=self.name or "crew", output=final_task_output crew_name=self.name or "crew", output=final_task_output
), ),
) )
@@ -1200,7 +1201,7 @@ class Crew(BaseModel):
try: try:
event_bus.emit( event_bus.emit(
self, self,
CrewTestStarted( CrewTestStartedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
n_iterations=n_iterations, n_iterations=n_iterations,
openai_model_name=openai_model_name, openai_model_name=openai_model_name,
@@ -1218,12 +1219,12 @@ class Crew(BaseModel):
event_bus.emit( event_bus.emit(
self, self,
CrewTestCompleted( CrewTestCompletedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit(self, CrewTestFailed(error=str(e))) event_bus.emit(self, CrewTestFailedEvent(error=str(e)))
raise raise
def __repr__(self): def __repr__(self):

View File

@@ -25,10 +25,10 @@ from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.utils import get_possible_return_constants from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry from crewai.telemetry import Telemetry
from crewai.utilities.events import ( from crewai.utilities.events import (
FlowFinished, FlowFinishedEvent,
FlowStarted, FlowStartedEvent,
MethodExecutionFinished, MethodExecutionFinishedEvent,
MethodExecutionStarted, MethodExecutionStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.printer import Printer from crewai.utilities.printer import Printer
@@ -741,7 +741,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Start flow execution # Start flow execution
event_bus.emit( event_bus.emit(
self, self,
FlowStarted( FlowStartedEvent(
type="flow_started", type="flow_started",
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
inputs=inputs, inputs=inputs,
@@ -774,7 +774,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
event_bus.emit( event_bus.emit(
self, self,
FlowFinished( FlowFinishedEvent(
type="flow_finished", type="flow_finished",
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
result=final_output, result=final_output,
@@ -982,10 +982,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
event_bus.emit( event_bus.emit(
self, self,
MethodExecutionStarted( MethodExecutionStartedEvent(
type="method_execution_started", type="method_execution_started",
method_name=listener_name, method_name=listener_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
state=self._copy_state(),
), ),
) )
@@ -1002,10 +1003,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
event_bus.emit( event_bus.emit(
self, self,
MethodExecutionFinished( MethodExecutionFinishedEvent(
type="method_execution_finished", type="method_execution_finished",
method_name=listener_name, method_name=listener_name,
flow_name=self.__class__.__name__, flow_name=self.__class__.__name__,
state=self._copy_state(),
result=listener_result,
), ),
) )

View File

@@ -1,39 +0,0 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
@dataclass
class Event:
type: str
flow_name: str
timestamp: datetime = field(init=False)
def __post_init__(self):
self.timestamp = datetime.now()
@dataclass
class FlowStartedEvent(Event):
inputs: Optional[Dict[str, Any]] = None
@dataclass
class MethodExecutionStartedEvent(Event):
method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
@dataclass
class MethodExecutionFinishedEvent(Event):
method_name: str
state: Union[Dict[str, Any], BaseModel]
result: Any = None
@dataclass
class FlowFinishedEvent(Event):
result: Optional[Any] = None

View File

@@ -41,7 +41,11 @@ from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter, convert_to_model from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.task_events import TaskCompleted, TaskFailed, TaskStarted from crewai.utilities.events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.i18n import I18N from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer from crewai.utilities.printer import Printer
@@ -367,7 +371,7 @@ class Task(BaseModel):
tools = tools or self.tools or [] tools = tools or self.tools or []
self.processed_by_agents.add(agent.role) self.processed_by_agents.add(agent.role)
event_bus.emit(self, TaskStarted(task=self)) event_bus.emit(self, TaskStartedEvent(task=self))
result = agent.execute_task( result = agent.execute_task(
task=self, task=self,
context=context, context=context,
@@ -447,7 +451,7 @@ class Task(BaseModel):
else result else result
) )
self._save_file(content) self._save_file(content)
event_bus.emit(self, TaskCompleted(task=self, output=task_output)) event_bus.emit(self, TaskCompletedEvent(task=self, output=task_output))
return task_output return task_output
except Exception as e: except Exception as e:
self.end_time = datetime.datetime.now() self.end_time = datetime.datetime.now()
@@ -456,7 +460,7 @@ class Task(BaseModel):
self._telemetry.task_ended(self._execution_span, self, agent.crew) self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None self._execution_span = None
event_bus.emit(self, TaskFailed(task=self, error=str(e))) event_bus.emit(self, TaskFailedEvent(task=self, error=str(e)))
raise e # Re-raise the exception after emitting the event raise e # Re-raise the exception after emitting the event
def prompt(self) -> str: def prompt(self) -> str:

View File

@@ -19,8 +19,8 @@ from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.event_types import ( from crewai.utilities.events.event_types import (
ToolUsageError, ToolUsageErrorEvent,
ToolUsageFinished, ToolUsageFinishedEvent,
) )
try: try:
@@ -468,7 +468,9 @@ class ToolUsage:
def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None: def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None:
event_data = self._prepare_event_data(tool, tool_calling) event_data = self._prepare_event_data(tool, tool_calling)
event_bus.emit(self, event=ToolUsageError(**{**event_data, "error": str(e)})) event_bus.emit(
self, event=ToolUsageErrorEvent(**{**event_data, "error": str(e)})
)
def on_tool_use_finished( def on_tool_use_finished(
self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float
@@ -482,7 +484,7 @@ class ToolUsage:
"from_cache": from_cache, "from_cache": from_cache,
} }
) )
event_bus.emit(self, event=ToolUsageFinished(**event_data)) event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict: def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
return { return {

View File

@@ -1,14 +1,22 @@
from .crew_events import ( from .crew_events import (
CrewKickoffStarted, CrewKickoffStartedEvent,
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffFailed, CrewKickoffFailedEvent,
)
from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
)
from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent
from .flow_events import (
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
) )
from .agent_events import AgentExecutionStarted, AgentExecutionCompleted, AgentExecutionError
from .task_events import TaskStarted, TaskCompleted, TaskFailed
from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished
from .event_bus import EventTypes, EventBus from .event_bus import EventTypes, EventBus
from .events import emit, on from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent
from .tool_usage_events import ToolUsageFinished, ToolUsageError
# events # events
from .event_listener import EventListener from .event_listener import EventListener
@@ -16,26 +24,25 @@ from .third_party.agentops_listener import agentops_listener
__all__ = [ __all__ = [
AgentExecutionStarted, AgentExecutionStartedEvent,
AgentExecutionCompleted, AgentExecutionCompletedEvent,
CrewKickoffStarted, AgentExecutionErrorEvent,
CrewKickoffCompleted, CrewKickoffStartedEvent,
CrewKickoffFailed, CrewKickoffCompletedEvent,
TaskStarted, CrewKickoffFailedEvent,
TaskCompleted, TaskStartedEvent,
TaskFailed, TaskCompletedEvent,
FlowStarted, TaskFailedEvent,
FlowFinished, FlowStartedEvent,
MethodExecutionStarted, FlowFinishedEvent,
MethodExecutionFinished, MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
EventTypes, EventTypes,
emit,
on,
event_bus, event_bus,
ToolUsageFinished, ToolUsageFinishedEvent,
ToolUsageError, ToolUsageErrorEvent,
EventBus, EventBus,
AgentExecutionError AgentExecutionErrorEvent,
] ]

View File

@@ -10,7 +10,7 @@ if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.base_agent import BaseAgent
class AgentExecutionStarted(CrewEvent): class AgentExecutionStartedEvent(CrewEvent):
"""Event emitted when an agent starts executing a task""" """Event emitted when an agent starts executing a task"""
agent: BaseAgent agent: BaseAgent
@@ -22,7 +22,7 @@ class AgentExecutionStarted(CrewEvent):
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class AgentExecutionCompleted(CrewEvent): class AgentExecutionCompletedEvent(CrewEvent):
"""Event emitted when an agent completes executing a task""" """Event emitted when an agent completes executing a task"""
agent: BaseAgent agent: BaseAgent
@@ -31,7 +31,7 @@ class AgentExecutionCompleted(CrewEvent):
type: str = "agent_execution_completed" type: str = "agent_execution_completed"
class AgentExecutionError(CrewEvent): class AgentExecutionErrorEvent(CrewEvent):
"""Event emitted when an agent encounters an error during execution""" """Event emitted when an agent encounters an error during execution"""
agent: BaseAgent agent: BaseAgent

View File

@@ -18,7 +18,7 @@ class CrewEvent(BaseModel):
type: str type: str
class CrewKickoffStarted(CrewEvent): class CrewKickoffStartedEvent(CrewEvent):
"""Event emitted when a crew starts execution""" """Event emitted when a crew starts execution"""
crew_name: Optional[str] crew_name: Optional[str]
@@ -26,7 +26,7 @@ class CrewKickoffStarted(CrewEvent):
type: str = "crew_kickoff_started" type: str = "crew_kickoff_started"
class CrewKickoffCompleted(CrewEvent): class CrewKickoffCompletedEvent(CrewEvent):
"""Event emitted when a crew completes execution""" """Event emitted when a crew completes execution"""
crew_name: Optional[str] crew_name: Optional[str]
@@ -34,14 +34,14 @@ class CrewKickoffCompleted(CrewEvent):
type: str = "crew_kickoff_completed" type: str = "crew_kickoff_completed"
class CrewKickoffFailed(CrewEvent): class CrewKickoffFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete execution""" """Event emitted when a crew fails to complete execution"""
error: str error: str
type: str = "crew_kickoff_failed" type: str = "crew_kickoff_failed"
class CrewTrainStarted(CrewEvent): class CrewTrainStartedEvent(CrewEvent):
"""Event emitted when a crew starts training""" """Event emitted when a crew starts training"""
crew_name: Optional[str] crew_name: Optional[str]
@@ -51,7 +51,7 @@ class CrewTrainStarted(CrewEvent):
type: str = "crew_train_started" type: str = "crew_train_started"
class CrewTrainCompleted(CrewEvent): class CrewTrainCompletedEvent(CrewEvent):
"""Event emitted when a crew completes training""" """Event emitted when a crew completes training"""
crew_name: Optional[str] crew_name: Optional[str]
@@ -60,14 +60,14 @@ class CrewTrainCompleted(CrewEvent):
type: str = "crew_train_completed" type: str = "crew_train_completed"
class CrewTrainFailed(CrewEvent): class CrewTrainFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete training""" """Event emitted when a crew fails to complete training"""
error: str error: str
type: str = "crew_train_failed" type: str = "crew_train_failed"
class CrewTestStarted(CrewEvent): class CrewTestStartedEvent(CrewEvent):
"""Event emitted when a crew starts testing""" """Event emitted when a crew starts testing"""
crew_name: Optional[str] crew_name: Optional[str]
@@ -77,14 +77,14 @@ class CrewTestStarted(CrewEvent):
type: str = "crew_test_started" type: str = "crew_test_started"
class CrewTestCompleted(CrewEvent): class CrewTestCompletedEvent(CrewEvent):
"""Event emitted when a crew completes testing""" """Event emitted when a crew completes testing"""
crew_name: Optional[str] crew_name: Optional[str]
type: str = "crew_test_completed" type: str = "crew_test_completed"
class CrewTestFailed(CrewEvent): class CrewTestFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete testing""" """Event emitted when a crew fails to complete testing"""
error: str error: str

View File

@@ -42,8 +42,10 @@ class EventBus:
Decorator to register an event handler for a specific event type. Decorator to register an event handler for a specific event type.
Usage: Usage:
@event_bus.on(AgentExecutionCompleted) @event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source: Any, event: AgentExecutionCompleted): def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task") print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}") print(f" Output: {event.output}")
""" """

View File

@@ -4,23 +4,20 @@ from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.base_event_listener import BaseEventListener from crewai.utilities.events.base_event_listener import BaseEventListener
from .agent_events import ( from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
AgentExecutionCompleted,
AgentExecutionStarted,
)
from .crew_events import ( from .crew_events import (
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffStarted, CrewKickoffStartedEvent,
CrewTestCompleted, CrewTestCompletedEvent,
CrewTestStarted, CrewTestStartedEvent,
) )
from .flow_events import ( from .flow_events import (
FlowFinished, FlowFinishedEvent,
FlowStarted, FlowStartedEvent,
MethodExecutionFinished, MethodExecutionFinishedEvent,
MethodExecutionStarted, MethodExecutionStartedEvent,
) )
from .task_events import TaskCompleted, TaskStarted from .task_events import TaskCompletedEvent, TaskStartedEvent
class EventListener(BaseEventListener): class EventListener(BaseEventListener):
@@ -32,19 +29,19 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer() self._telemetry.set_tracer()
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
@event_bus.on(CrewKickoffStarted) @event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStarted): def on_crew_started(source, event: CrewKickoffStartedEvent):
print(f"🚀 Crew '{event.crew_name}' started", event.timestamp) print(f"🚀 Crew '{event.crew_name}' started", event.timestamp)
print("event.inputs", event.inputs) print("event.inputs", event.inputs)
self._telemetry.crew_execution_span(source, event.inputs) self._telemetry.crew_execution_span(source, event.inputs)
@event_bus.on(CrewKickoffCompleted) @event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompleted): def on_crew_completed(source, event: CrewKickoffCompletedEvent):
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
@event_bus.on(CrewTestStarted) @event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStarted): def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy() cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span( cloned_crew._telemetry.test_execution_span(
cloned_crew, cloned_crew,
@@ -54,16 +51,16 @@ class EventListener(BaseEventListener):
) )
print(f"🚀 Crew '{event.crew_name}' started test") print(f"🚀 Crew '{event.crew_name}' started test")
@event_bus.on(CrewTestCompleted) @event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompleted): def on_crew_test_completed(source, event: CrewTestCompletedEvent):
print(f"👍 Crew '{event.crew_name}' completed test") print(f"👍 Crew '{event.crew_name}' completed test")
@event_bus.on(TaskStarted) @event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStarted): def on_task_started(source, event: TaskStartedEvent):
print(f"📋 Task started: {event.task.description}") print(f"📋 Task started: {event.task.description}")
@event_bus.on(TaskCompleted) @event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompleted): def on_task_completed(source, event: TaskCompletedEvent):
print(f" Output: {event.output}") print(f" Output: {event.output}")
result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output) result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output)
print(f" Evaluation: {result.quality}") print(f" Evaluation: {result.quality}")
@@ -72,29 +69,29 @@ class EventListener(BaseEventListener):
else: else:
print(f" ❌ Failed: {result.suggestions}") print(f" ❌ Failed: {result.suggestions}")
@event_bus.on(AgentExecutionStarted) @event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStarted): def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
print(f"🤖 Agent '{event.agent.role}' started task") print(f"🤖 Agent '{event.agent.role}' started task")
@event_bus.on(AgentExecutionCompleted) @event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompleted): def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
print(f"👍 Agent '{event.agent.role}' completed task") print(f"👍 Agent '{event.agent.role}' completed task")
print(f" Output: {event.output}") print(f" Output: {event.output}")
@event_bus.on(FlowStarted) @event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStarted): def on_flow_started(source, event: FlowStartedEvent):
print(f"🤖 Flow Started: '{event.flow_name}'") print(f"🤖 Flow Started: '{event.flow_name}'")
@event_bus.on(FlowFinished) @event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinished): def on_flow_finished(source, event: FlowFinishedEvent):
print(f"👍 Flow Finished: '{event.flow_name}'") print(f"👍 Flow Finished: '{event.flow_name}'")
@event_bus.on(MethodExecutionStarted) @event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStarted): def on_method_execution_started(source, event: MethodExecutionStartedEvent):
print(f"🤖 Flow Method Started: '{event.method_name}'") print(f"🤖 Flow Method Started: '{event.method_name}'")
@event_bus.on(MethodExecutionFinished) @event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinished): def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
print(f"👍 Flow Method Finished: '{event.method_name}'") print(f"👍 Flow Method Finished: '{event.method_name}'")

View File

@@ -1,54 +1,54 @@
from typing import Union from typing import Union
from .agent_events import ( from .agent_events import (
AgentExecutionCompleted, AgentExecutionCompletedEvent,
AgentExecutionError, AgentExecutionErrorEvent,
AgentExecutionStarted, AgentExecutionStartedEvent,
) )
from .crew_events import ( from .crew_events import (
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffFailed, CrewKickoffFailedEvent,
CrewKickoffStarted, CrewKickoffStartedEvent,
CrewTestCompleted, CrewTestCompletedEvent,
CrewTestFailed, CrewTestFailedEvent,
CrewTestStarted, CrewTestStartedEvent,
CrewTrainCompleted, CrewTrainCompletedEvent,
CrewTrainFailed, CrewTrainFailedEvent,
CrewTrainStarted, CrewTrainStartedEvent,
) )
from .flow_events import ( from .flow_events import (
FlowFinished, FlowFinishedEvent,
FlowStarted, FlowStartedEvent,
MethodExecutionFinished, MethodExecutionFinishedEvent,
MethodExecutionStarted, MethodExecutionStartedEvent,
) )
from .task_events import ( from .task_events import (
TaskCompleted, TaskCompletedEvent,
TaskFailed, TaskFailedEvent,
TaskStarted, TaskStartedEvent,
) )
from .tool_usage_events import ToolUsageError, ToolUsageFinished from .tool_usage_events import ToolUsageErrorEvent, ToolUsageFinishedEvent
EventTypes = Union[ EventTypes = Union[
CrewKickoffStarted, CrewKickoffStartedEvent,
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffFailed, CrewKickoffFailedEvent,
CrewTestStarted, CrewTestStartedEvent,
CrewTestCompleted, CrewTestCompletedEvent,
CrewTestFailed, CrewTestFailedEvent,
CrewTrainStarted, CrewTrainStartedEvent,
CrewTrainCompleted, CrewTrainCompletedEvent,
CrewTrainFailed, CrewTrainFailedEvent,
AgentExecutionStarted, AgentExecutionStartedEvent,
AgentExecutionCompleted, AgentExecutionCompletedEvent,
TaskStarted, TaskStartedEvent,
TaskCompleted, TaskCompletedEvent,
TaskFailed, TaskFailedEvent,
FlowStarted, FlowStartedEvent,
FlowFinished, FlowFinishedEvent,
MethodExecutionStarted, MethodExecutionStartedEvent,
MethodExecutionFinished, MethodExecutionFinishedEvent,
AgentExecutionError, AgentExecutionErrorEvent,
ToolUsageFinished, ToolUsageFinishedEvent,
ToolUsageError, ToolUsageErrorEvent,
] ]

View File

@@ -1,53 +0,0 @@
from functools import wraps
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Type,
TypeVar,
TYPE_CHECKING,
)
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from .event_types import EventTypes
T = TypeVar("T")
EVT = TypeVar("EVT", bound=BaseModel)
class Emitter(Generic[T, EVT]):
_listeners: Dict[Type[EVT], List[Callable]] = {}
def on(self, event_type: Type[EVT]):
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
self._listeners.setdefault(event_type, []).append(wrapper)
return wrapper
return decorator
def emit(self, source: T, event: EVT) -> None:
event_type = type(event)
for func in self._listeners.get(event_type, []):
func(source, event)
# Global event emitter instance
default_emitter = Emitter[Any, BaseModel]()
def emit(source: Any, event: BaseModel) -> None:
"""Emit an event to all registered listeners"""
default_emitter.emit(source, event)
def on(event_type: Type[EventTypes]) -> Callable:
"""Register a listener for a specific event type"""
return default_emitter.on(event_type)

View File

@@ -1,32 +1,46 @@
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
from .crew_events import CrewEvent from .crew_events import CrewEvent
class FlowStarted(CrewEvent): class FlowEvent(CrewEvent):
"""Base class for all flow events"""
type: str
flow_name: str
class FlowStartedEvent(FlowEvent):
"""Event emitted when a flow starts execution""" """Event emitted when a flow starts execution"""
flow_name: str flow_name: str
inputs: Optional[Dict[str, Any]] = None
type: str = "flow_started" type: str = "flow_started"
class MethodExecutionStarted(CrewEvent): class MethodExecutionStartedEvent(FlowEvent):
"""Event emitted when a flow method starts execution""" """Event emitted when a flow method starts execution"""
flow_name: str flow_name: str
method_name: str method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
type: str = "method_execution_started" type: str = "method_execution_started"
class MethodExecutionFinished(CrewEvent): class MethodExecutionFinishedEvent(FlowEvent):
"""Event emitted when a flow method completes execution""" """Event emitted when a flow method completes execution"""
flow_name: str flow_name: str
method_name: str method_name: str
result: Any = None
state: Union[Dict[str, Any], BaseModel]
type: str = "method_execution_finished" type: str = "method_execution_finished"
class FlowFinished(CrewEvent): class FlowFinishedEvent(FlowEvent):
"""Event emitted when a flow completes execution""" """Event emitted when a flow completes execution"""
flow_name: str flow_name: str

View File

@@ -3,7 +3,7 @@ from typing import Any
from crewai.utilities.events.crew_events import CrewEvent from crewai.utilities.events.crew_events import CrewEvent
class TaskStarted(CrewEvent): class TaskStartedEvent(CrewEvent):
"""Event emitted when a task starts""" """Event emitted when a task starts"""
task: Any task: Any
@@ -12,7 +12,7 @@ class TaskStarted(CrewEvent):
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class TaskCompleted(CrewEvent): class TaskCompletedEvent(CrewEvent):
"""Event emitted when a task completes""" """Event emitted when a task completes"""
task: Any task: Any
@@ -22,7 +22,7 @@ class TaskCompleted(CrewEvent):
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class TaskFailed(CrewEvent): class TaskFailedEvent(CrewEvent):
"""Event emitted when a task fails""" """Event emitted when a task fails"""
task: Any task: Any

View File

@@ -1,4 +1,4 @@
from crewai.utilities.events.agent_events import AgentExecutionStarted from crewai.utilities.events.agent_events import AgentExecutionStartedEvent
from crewai.utilities.events.base_event_listener import BaseEventListener from crewai.utilities.events.base_event_listener import BaseEventListener
try: try:
@@ -16,8 +16,8 @@ class AgentOpsListener(BaseEventListener):
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
if AGENTOPS_INSTALLED: if AGENTOPS_INSTALLED:
@event_bus.on(AgentExecutionStarted) @event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event: AgentExecutionStarted): def on_agent_started(source, event: AgentExecutionStartedEvent):
print("AGENTOPS WORKS !!!", event.agent) print("AGENTOPS WORKS !!!", event.agent)

View File

@@ -18,7 +18,7 @@ class ToolUsageEvent(CrewEvent):
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class ToolUsageFinished(ToolUsageEvent): class ToolUsageFinishedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is completed""" """Event emitted when a tool execution is completed"""
started_at: datetime started_at: datetime
@@ -27,7 +27,7 @@ class ToolUsageFinished(ToolUsageEvent):
type: str = "tool_usage_finished" type: str = "tool_usage_finished"
class ToolUsageError(ToolUsageEvent): class ToolUsageErrorEvent(ToolUsageEvent):
"""Event emitted when a tool execution encounters an error""" """Event emitted when a tool execution encounters an error"""
error: str error: str

View File

@@ -11,24 +11,28 @@ from crewai.flow.flow import Flow, listen, start
from crewai.task import Task from crewai.task import Task
from crewai.tools.base_tool import BaseTool from crewai.tools.base_tool import BaseTool
from crewai.utilities.events.agent_events import ( from crewai.utilities.events.agent_events import (
AgentExecutionCompleted, AgentExecutionCompletedEvent,
AgentExecutionError, AgentExecutionErrorEvent,
AgentExecutionStarted, AgentExecutionStartedEvent,
) )
from crewai.utilities.events.crew_events import ( from crewai.utilities.events.crew_events import (
CrewKickoffCompleted, CrewKickoffCompletedEvent,
CrewKickoffFailed, CrewKickoffFailedEvent,
CrewKickoffStarted, CrewKickoffStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.event_types import ToolUsageFinished from crewai.utilities.events.event_types import ToolUsageFinishedEvent
from crewai.utilities.events.flow_events import ( from crewai.utilities.events.flow_events import (
FlowFinished, FlowFinishedEvent,
FlowStarted, FlowStartedEvent,
MethodExecutionStarted, MethodExecutionStartedEvent,
) )
from crewai.utilities.events.task_events import TaskCompleted, TaskFailed, TaskStarted from crewai.utilities.events.task_events import (
from crewai.utilities.events.tool_usage_events import ToolUsageError TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.events.tool_usage_events import ToolUsageErrorEvent
base_agent = Agent( base_agent = Agent(
role="base_agent", role="base_agent",
@@ -50,7 +54,7 @@ def test_crew_emits_start_kickoff_event():
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
@event_bus.on(CrewKickoffStarted) @event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event): def handle_crew_start(source, event):
received_events.append(event) received_events.append(event)
@@ -68,7 +72,7 @@ def test_crew_emits_start_kickoff_event():
def test_crew_emits_end_kickoff_event(): def test_crew_emits_end_kickoff_event():
received_events = [] received_events = []
@event_bus.on(CrewKickoffCompleted) @event_bus.on(CrewKickoffCompletedEvent)
def handle_crew_end(source, event): def handle_crew_end(source, event):
received_events.append(event) received_events.append(event)
@@ -88,7 +92,7 @@ def test_crew_emits_kickoff_failed_event():
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
@event_bus.on(CrewKickoffFailed) @event_bus.on(CrewKickoffFailedEvent)
def handle_crew_failed(source, event): def handle_crew_failed(source, event):
received_events.append(event) received_events.append(event)
@@ -111,7 +115,7 @@ def test_crew_emits_kickoff_failed_event():
def test_crew_emits_start_task_event(): def test_crew_emits_start_task_event():
received_events = [] received_events = []
@event_bus.on(TaskStarted) @event_bus.on(TaskStartedEvent)
def handle_task_start(source, event): def handle_task_start(source, event):
received_events.append(event) received_events.append(event)
@@ -128,7 +132,7 @@ def test_crew_emits_start_task_event():
def test_crew_emits_end_task_event(): def test_crew_emits_end_task_event():
received_events = [] received_events = []
@event_bus.on(TaskCompleted) @event_bus.on(TaskCompletedEvent)
def handle_task_end(source, event): def handle_task_end(source, event):
received_events.append(event) received_events.append(event)
@@ -145,7 +149,7 @@ def test_crew_emits_end_task_event():
def test_task_emits_failed_event_on_execution_error(): def test_task_emits_failed_event_on_execution_error():
received_events = [] received_events = []
@event_bus.on(TaskFailed) @event_bus.on(TaskFailedEvent)
def handle_task_failed(source, event): def handle_task_failed(source, event):
received_events.append(event) received_events.append(event)
@@ -171,11 +175,11 @@ def test_task_emits_failed_event_on_execution_error():
def test_agent_emits_execution_started_and_completed_events(): def test_agent_emits_execution_started_and_completed_events():
received_events = [] received_events = []
@event_bus.on(AgentExecutionStarted) @event_bus.on(AgentExecutionStartedEvent)
def handle_agent_start(source, event): def handle_agent_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(AgentExecutionCompleted) @event_bus.on(AgentExecutionCompletedEvent)
def handle_agent_completed(source, event): def handle_agent_completed(source, event):
received_events.append(event) received_events.append(event)
@@ -205,7 +209,7 @@ def test_agent_emits_execution_started_and_completed_events():
def test_agent_emits_execution_error_event(): def test_agent_emits_execution_error_event():
received_events = [] received_events = []
@event_bus.on(AgentExecutionError) @event_bus.on(AgentExecutionErrorEvent)
def handle_agent_start(source, event): def handle_agent_start(source, event):
received_events.append(event) received_events.append(event)
@@ -243,7 +247,7 @@ class SayHiTool(BaseTool):
def test_tools_emits_finished_events(): def test_tools_emits_finished_events():
received_events = [] received_events = []
@event_bus.on(ToolUsageFinished) @event_bus.on(ToolUsageFinishedEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)
@@ -274,7 +278,7 @@ def test_tools_emits_finished_events():
def test_tools_emits_error_events(): def test_tools_emits_error_events():
received_events = [] received_events = []
@event_bus.on(ToolUsageError) @event_bus.on(ToolUsageErrorEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)
@@ -321,7 +325,7 @@ def test_flow_emits_start_event():
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
@event_bus.on(FlowStarted) @event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event): def handle_flow_start(source, event):
received_events.append(event) received_events.append(event)
@@ -343,7 +347,7 @@ def test_flow_emits_finish_event():
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
@event_bus.on(FlowFinished) @event_bus.on(FlowFinishedEvent)
def handle_flow_finish(source, event): def handle_flow_finish(source, event):
received_events.append(event) received_events.append(event)
@@ -367,7 +371,7 @@ def test_flow_emits_method_execution_started_event():
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
@event_bus.on(MethodExecutionStarted) @event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event): def handle_method_start(source, event):
print("event in method name", event.method_name) print("event in method name", event.method_name)
received_events.append(event) received_events.append(event)
@@ -399,7 +403,7 @@ def test_register_handler_adds_new_handler():
received_events.append(event) received_events.append(event)
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStarted, custom_handler) event_bus.register_handler(CrewKickoffStartedEvent, custom_handler)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff() crew.kickoff()
@@ -421,8 +425,8 @@ def test_multiple_handlers_for_same_event():
received_events_2.append(event) received_events_2.append(event)
with event_bus.scoped_handlers(): with event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStarted, handler_1) event_bus.register_handler(CrewKickoffStartedEvent, handler_1)
event_bus.register_handler(CrewKickoffStarted, handler_2) event_bus.register_handler(CrewKickoffStartedEvent, handler_2)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff() crew.kickoff()