Rename event_bus to crewai_event_bus for improved clarity and specificity

- Replace all references to `event_bus` with `crewai_event_bus`
- Update import statements across multiple files
- Remove the old `event_bus.py` file
- Maintain existing event handling functionality
This commit is contained in:
Lorenze Jay
2025-02-18 08:36:21 -08:00
parent 1603a1d9ac
commit 64569ce130
17 changed files with 182 additions and 296 deletions

View File

@@ -23,7 +23,7 @@ from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent, AgentExecutionCompletedEvent,
AgentExecutionErrorEvent, AgentExecutionErrorEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.llm_utils import create_llm from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler from crewai.utilities.training_handler import CrewTrainingHandler
@@ -240,7 +240,7 @@ class Agent(BaseAgent):
} }
)["output"] )["output"]
except Exception as e: except Exception as e:
event_bus.emit( crewai_event_bus.emit(
self, self,
event=AgentExecutionErrorEvent( event=AgentExecutionErrorEvent(
agent=self, agent=self,
@@ -265,7 +265,7 @@ class Agent(BaseAgent):
for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable)
if tool_result.get("result_as_answer", False): if tool_result.get("result_as_answer", False):
result = tool_result["result"] result = tool_result["result"]
event_bus.emit( crewai_event_bus.emit(
self, self,
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result), event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
) )

View File

@@ -22,7 +22,7 @@ from crewai.utilities.events import (
AgentExecutionErrorEvent, AgentExecutionErrorEvent,
AgentExecutionStartedEvent, AgentExecutionStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import ( from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException, LLMContextLengthExceededException,
) )
@@ -91,7 +91,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if self.agent and self.task: if self.agent and self.task:
event_bus.emit( crewai_event_bus.emit(
self, self,
event=AgentExecutionStartedEvent( event=AgentExecutionStartedEvent(
agent=self.agent, agent=self.agent,
@@ -193,7 +193,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_unknown_error(self, exception: Exception) -> None: def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user.""" """Handle unknown errors by informing the user."""
if self.agent: if self.agent:
event_bus.emit( crewai_event_bus.emit(
self, self,
event=AgentExecutionErrorEvent( event=AgentExecutionErrorEvent(
agent=self.agent, task=self.task, error=str(exception) agent=self.agent, task=self.task, error=str(exception)

View File

@@ -54,7 +54,7 @@ from crewai.utilities.events.crew_events import (
CrewTrainFailedEvent, CrewTrainFailedEvent,
CrewTrainStartedEvent, CrewTrainStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.formatter import ( from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks, aggregate_raw_outputs_from_tasks,
@@ -221,10 +221,6 @@ class Crew(BaseModel):
default=None, default=None,
description="Knowledge for the crew.", description="Knowledge for the crew.",
) )
listen_to_events: Optional[bool] = Field(
default=True,
description="Whether the crew should listen to events.",
)
@field_validator("id", mode="before") @field_validator("id", mode="before")
@classmethod @classmethod
@@ -518,7 +514,7 @@ class Crew(BaseModel):
) -> None: ) -> None:
"""Trains the crew for a given number of iterations.""" """Trains the crew for a given number of iterations."""
try: try:
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTrainStartedEvent( CrewTrainStartedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
@@ -545,7 +541,7 @@ class Crew(BaseModel):
agent_id=str(agent.role), trained_data=result.model_dump() agent_id=str(agent.role), trained_data=result.model_dump()
) )
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTrainCompletedEvent( CrewTrainCompletedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
@@ -554,11 +550,9 @@ class Crew(BaseModel):
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTrainFailedEvent( CrewTrainFailedEvent(error=str(e), crew_name=self.name or "crew"),
error=str(e), crew_name=self.name or "crew"
),
) )
self._logger.log("error", f"Training failed: {e}", color="red") self._logger.log("error", f"Training failed: {e}", color="red")
CrewTrainingHandler(TRAINING_DATA_FILE).clear() CrewTrainingHandler(TRAINING_DATA_FILE).clear()
@@ -575,7 +569,7 @@ class Crew(BaseModel):
inputs = {} inputs = {}
inputs = before_callback(inputs) inputs = before_callback(inputs)
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs), CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs),
) )
@@ -629,11 +623,9 @@ class Crew(BaseModel):
self.usage_metrics.add_usage_metrics(metric) self.usage_metrics.add_usage_metrics(metric)
return result return result
except Exception as e: except Exception as e:
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewKickoffFailedEvent( CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"),
error=str(e), crew_name=self.name or "crew"
),
) )
raise raise
@@ -983,7 +975,7 @@ class Crew(BaseModel):
final_string_output = final_task_output.raw final_string_output = final_task_output.raw
self._finish_execution(final_string_output) self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics() token_usage = self.calculate_usage_metrics()
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewKickoffCompletedEvent( CrewKickoffCompletedEvent(
crew_name=self.name or "crew", output=final_task_output crew_name=self.name or "crew", output=final_task_output
@@ -1196,7 +1188,7 @@ class Crew(BaseModel):
) -> None: ) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try: try:
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTestStartedEvent( CrewTestStartedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
@@ -1214,18 +1206,16 @@ class Crew(BaseModel):
evaluator.print_crew_evaluation_result() evaluator.print_crew_evaluation_result()
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTestCompletedEvent( CrewTestCompletedEvent(
crew_name=self.name or "crew", crew_name=self.name or "crew",
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit( crewai_event_bus.emit(
self, self,
CrewTestFailedEvent( CrewTestFailedEvent(error=str(e), crew_name=self.name or "crew"),
error=str(e), crew_name=self.name or "crew"
),
) )
raise raise

View File

@@ -30,7 +30,7 @@ from crewai.utilities.events import (
MethodExecutionFinishedEvent, MethodExecutionFinishedEvent,
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import MethodExecutionFailedEvent from crewai.utilities.events.flow_events import MethodExecutionFailedEvent
from crewai.utilities.printer import Printer from crewai.utilities.printer import Printer
@@ -469,7 +469,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if kwargs: if kwargs:
self._initialize_state(kwargs) self._initialize_state(kwargs)
event_bus.emit( crewai_event_bus.emit(
self, self,
FlowCreatedEvent( FlowCreatedEvent(
type="flow_created", type="flow_created",
@@ -744,7 +744,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(filtered_inputs) self._initialize_state(filtered_inputs)
# Start flow execution # Start flow execution
event_bus.emit( crewai_event_bus.emit(
self, self,
FlowStartedEvent( FlowStartedEvent(
type="flow_started", type="flow_started",
@@ -773,7 +773,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
final_output = self._method_outputs[-1] if self._method_outputs else None final_output = self._method_outputs[-1] if self._method_outputs else None
event_bus.emit( crewai_event_bus.emit(
self, self,
FlowFinishedEvent( FlowFinishedEvent(
type="flow_finished", type="flow_finished",
@@ -810,8 +810,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
self, method_name: str, method: Callable, *args: Any, **kwargs: Any self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any: ) -> Any:
try: try:
dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (kwargs or {}) dumped_params = {f"_{i}": arg for i, arg in enumerate(args)} | (
event_bus.emit( kwargs or {}
)
crewai_event_bus.emit(
self, self,
MethodExecutionStartedEvent( MethodExecutionStartedEvent(
type="method_execution_started", type="method_execution_started",
@@ -822,19 +824,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
), ),
) )
result = ( result = (
await method(*args, **kwargs) await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method) if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs) else method(*args, **kwargs)
) )
self._method_outputs.append(result) self._method_outputs.append(result)
self._method_execution_counts[method_name] = ( self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1 self._method_execution_counts.get(method_name, 0) + 1
) )
event_bus.emit( crewai_event_bus.emit(
self, self,
MethodExecutionFinishedEvent( MethodExecutionFinishedEvent(
type="method_execution_finished", type="method_execution_finished",
@@ -847,7 +848,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result return result
except Exception as e: except Exception as e:
event_bus.emit( crewai_event_bus.emit(
self, self,
MethodExecutionFailedEvent( MethodExecutionFailedEvent(
type="method_execution_failed", type="method_execution_failed",
@@ -1043,7 +1044,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif level == "warning": elif level == "warning":
logger.warning(message) logger.warning(message)
def plot(self, filename: str = "crewai_flow") -> None: def plot(self, filename: str = "crewai_flow") -> None:
Telemetry().flow_plotting_span( Telemetry().flow_plotting_span(
self.__class__.__name__, list(self._methods.keys()) self.__class__.__name__, list(self._methods.keys())
) )

View File

@@ -76,7 +76,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
"context": fetched["documents"][0][i], # type: ignore "context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore "score": fetched["distances"][0][i], # type: ignore
} }
if result["score"] >= score_threshold: # type: ignore if result["score"] >= score_threshold:
results.append(result) results.append(result)
return results return results
else: else:

View File

@@ -43,7 +43,7 @@ from crewai.utilities.events import (
TaskFailedEvent, TaskFailedEvent,
TaskStartedEvent, TaskStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer from crewai.utilities.printer import Printer
@@ -364,7 +364,7 @@ class Task(BaseModel):
tools = tools or self.tools or [] tools = tools or self.tools or []
self.processed_by_agents.add(agent.role) self.processed_by_agents.add(agent.role)
event_bus.emit(self, TaskStartedEvent(context=context)) crewai_event_bus.emit(self, TaskStartedEvent(context=context))
result = agent.execute_task( result = agent.execute_task(
task=self, task=self,
context=context, context=context,
@@ -440,11 +440,11 @@ class Task(BaseModel):
else result else result
) )
self._save_file(content) self._save_file(content)
event_bus.emit(self, TaskCompletedEvent(output=task_output)) crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output))
return task_output return task_output
except Exception as e: except Exception as e:
self.end_time = datetime.datetime.now() self.end_time = datetime.datetime.now()
event_bus.emit(self, TaskFailedEvent(error=str(e))) crewai_event_bus.emit(self, TaskFailedEvent(error=str(e)))
raise e # Re-raise the exception after emitting the event raise e # Re-raise the exception after emitting the event
def prompt(self) -> str: def prompt(self) -> str:

View File

@@ -22,7 +22,7 @@ from crewai.utilities.events import (
ToolUsageFinishedEvent, ToolUsageFinishedEvent,
ToolUsageStartedEvent, ToolUsageStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
OPENAI_BIGGER_MODELS = [ OPENAI_BIGGER_MODELS = [
"gpt-4", "gpt-4",
@@ -138,8 +138,8 @@ class ToolUsage:
tool: Any, tool: Any,
calling: Union[ToolCalling, InstructorToolCalling], calling: Union[ToolCalling, InstructorToolCalling],
) -> str: # TODO: Fix this return type ) -> str: # TODO: Fix this return type
event_data = self._prepare_event_data(tool, calling) # type: ignore event_data = self._prepare_event_data(tool, calling) # type: ignore
event_bus.emit(self, ToolUsageStartedEvent(**event_data)) crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data))
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None) if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try: try:
result = self._i18n.errors("task_repeated_usage").format( result = self._i18n.errors("task_repeated_usage").format(
@@ -458,7 +458,7 @@ class ToolUsage:
def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None: def on_tool_error(self, tool: Any, tool_calling: ToolCalling, e: Exception) -> None:
event_data = self._prepare_event_data(tool, tool_calling) event_data = self._prepare_event_data(tool, tool_calling)
event_bus.emit( crewai_event_bus.emit(
self, event=ToolUsageErrorEvent(**{**event_data, "error": e}) self, event=ToolUsageErrorEvent(**{**event_data, "error": e})
) )
@@ -474,7 +474,7 @@ class ToolUsage:
"from_cache": from_cache, "from_cache": from_cache,
} }
) )
event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data)) crewai_event_bus.emit(self, event=ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict: def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
return { return {

View File

@@ -3,7 +3,7 @@ from typing import List
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from crewai.utilities import Converter from crewai.utilities import Converter
from crewai.utilities.events import TaskEvaluationEvent, event_bus from crewai.utilities.events import TaskEvaluationEvent, crewai_event_bus
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -44,7 +44,9 @@ class TaskEvaluator:
self.original_agent = original_agent self.original_agent = original_agent
def evaluate(self, task, output) -> TaskEvaluation: def evaluate(self, task, output) -> TaskEvaluation:
event_bus.emit(self, TaskEvaluationEvent(evaluation_type="task_evaluation")) crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="task_evaluation")
)
evaluation_query = ( evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n" f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"
f"Task Description:\n{task.description}\n\n" f"Task Description:\n{task.description}\n\n"
@@ -81,7 +83,9 @@ class TaskEvaluator:
- training_data (dict): The training data to be evaluated. - training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent. - agent_id (str): The ID of the agent.
""" """
event_bus.emit(self, TaskEvaluationEvent(evaluation_type="training_data_evaluation")) crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="training_data_evaluation")
)
output_training_data = training_data[agent_id] output_training_data = training_data[agent_id]
final_aggregated_data = "" final_aggregated_data = ""

View File

@@ -23,7 +23,7 @@ from .flow_events import (
MethodExecutionFinishedEvent, MethodExecutionFinishedEvent,
MethodExecutionFailedEvent, MethodExecutionFailedEvent,
) )
from .event_bus import EventBus, event_bus from .crewai_event_bus import CrewAIEventsBus, crewai_event_bus
from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent from .tool_usage_events import ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent
# events # events

View File

@@ -1,14 +1,14 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from logging import Logger from logging import Logger
from crewai.utilities.events.event_bus import EventBus, event_bus from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus, crewai_event_bus
class BaseEventListener(ABC): class BaseEventListener(ABC):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.setup_listeners(event_bus) self.setup_listeners(crewai_event_bus)
@abstractmethod @abstractmethod
def setup_listeners(self, event_bus: EventBus): def setup_listeners(self, crewai_event_bus: CrewAIEventsBus):
pass pass

View File

@@ -1,114 +0,0 @@
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.utilities.events.crew_events import CrewEvent
EventT = TypeVar("EventT", bound=CrewEvent)
class EventBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None: # prevent race condition
cls._instance = super(EventBus, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("event_bus")
self._handlers: Dict[
Type[CrewEvent], List[Callable[[Any, CrewEvent], None]]
] = {}
def on(
self, event_type: Type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
Usage:
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
"""
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, CrewEvent], None], handler)
)
return handler
return decorator
def emit(self, source: Any, event: CrewEvent) -> None:
"""
Emit an event to all registered handlers
Args:
source: The object emitting the event
event: The event instance to emit
"""
event_type = type(event)
if event_type in self._handlers:
for handler in self._handlers[event_type]:
handler(source, event)
self._signal.send(source, event=event)
def clear_handlers(self) -> None:
"""Clear all registered event handlers - useful for testing"""
self._handlers.clear()
def register_handler(
self, event_type: Type[EventT], handler: Callable[[Any, EventT], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, CrewEvent], None], handler)
)
@contextmanager
def scoped_handlers(self):
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Usage:
with event_bus.scoped_handlers():
@event_bus.on(CrewKickoffStarted)
def temp_handler(source, event):
print("Temporary handler")
# Do stuff...
# Handlers are cleared after the context
"""
previous_handlers = self._handlers.copy()
self._handlers.clear()
try:
yield
finally:
self._handlers = previous_handlers
# Global instance
event_bus = EventBus()

View File

@@ -43,8 +43,8 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer() self._telemetry.set_tracer()
# Crew Events: kickoff, test, train # Crew Events: kickoff, test, train
def setup_listeners(self, event_bus): def setup_listeners(self, crewai_event_bus):
@event_bus.on(CrewKickoffStartedEvent) @crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent): def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log( self.logger.log(
f"🚀 Crew '{event.crew_name}' started", f"🚀 Crew '{event.crew_name}' started",
@@ -53,7 +53,7 @@ class EventListener(BaseEventListener):
) )
self._telemetry.crew_execution_span(source, event.inputs) self._telemetry.crew_execution_span(source, event.inputs)
@event_bus.on(CrewKickoffCompletedEvent) @crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent): def on_crew_completed(source, event: CrewKickoffCompletedEvent):
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
@@ -63,7 +63,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewKickoffFailedEvent) @crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent): def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log( self.logger.log(
f"❌ Crew '{event.crew_name}' failed", f"❌ Crew '{event.crew_name}' failed",
@@ -71,7 +71,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTestStartedEvent) @crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent): def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy() cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span( cloned_crew._telemetry.test_execution_span(
@@ -86,7 +86,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTestCompletedEvent) @crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent): def on_crew_test_completed(source, event: CrewTestCompletedEvent):
self.logger.log( self.logger.log(
f"✅ Crew '{event.crew_name}' completed test", f"✅ Crew '{event.crew_name}' completed test",
@@ -94,7 +94,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTestFailedEvent) @crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent): def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.logger.log( self.logger.log(
f"❌ Crew '{event.crew_name}' failed test", f"❌ Crew '{event.crew_name}' failed test",
@@ -102,7 +102,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTrainStartedEvent) @crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent): def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.logger.log( self.logger.log(
f"📋 Crew '{event.crew_name}' started train", f"📋 Crew '{event.crew_name}' started train",
@@ -110,7 +110,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTrainCompletedEvent) @crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent): def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.logger.log( self.logger.log(
f"✅ Crew '{event.crew_name}' completed train", f"✅ Crew '{event.crew_name}' completed train",
@@ -118,15 +118,15 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(CrewTrainFailedEvent) @crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent): def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.logger.log( self.logger.log(
f"❌ Crew '{event.crew_name}' failed train", f"❌ Crew '{event.crew_name}' failed train",
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
@event_bus.on(TaskStartedEvent) @crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent): def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started( source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source crew=source.agent.crew, task=source
@@ -136,25 +136,27 @@ class EventListener(BaseEventListener):
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
@crewai_event_bus.on(TaskCompletedEvent)
@event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event): def on_task_completed(source, event):
if source._execution_span: if source._execution_span:
self._telemetry.task_ended(source._execution_span, source, source.agent.crew) self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
self.logger.log( self.logger.log(
f"✅ Task completed: {source.description}", f"✅ Task completed: {source.description}",
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
source._execution_span = None source._execution_span = None
@event_bus.on(TaskFailedEvent) @crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent): def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span: if source._execution_span:
if source.agent and source.agent.crew: if source.agent and source.agent.crew:
self._telemetry.task_ended(source._execution_span, source, source.agent.crew) self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
source._execution_span = None source._execution_span = None
self.logger.log( self.logger.log(
f"❌ Task failed: {source.description}", f"❌ Task failed: {source.description}",
@@ -162,8 +164,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@crewai_event_bus.on(AgentExecutionStartedEvent)
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent): def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.logger.log( self.logger.log(
f"🤖 Agent '{event.agent.role}' started task", f"🤖 Agent '{event.agent.role}' started task",
@@ -171,7 +172,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(AgentExecutionCompletedEvent) @crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.logger.log( self.logger.log(
f"✅ Agent '{event.agent.role}' completed task", f"✅ Agent '{event.agent.role}' completed task",
@@ -180,8 +181,8 @@ class EventListener(BaseEventListener):
) )
# Flow Events # Flow Events
@event_bus.on(FlowCreatedEvent) @crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent): def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__) self._telemetry.flow_creation_span(self.__class__.__name__)
self.logger.log( self.logger.log(
@@ -189,8 +190,8 @@ class EventListener(BaseEventListener):
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
@event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent): def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span( self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys()) source.__class__.__name__, list(source._methods.keys())
@@ -201,7 +202,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent): def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log( self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'", f"👍 Flow Finished: '{event.flow_name}'",
@@ -209,7 +210,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent): def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.logger.log( self.logger.log(
f"🤖 Flow Method Started: '{event.method_name}'", f"🤖 Flow Method Started: '{event.method_name}'",
@@ -217,7 +218,7 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@event_bus.on(MethodExecutionFailedEvent) @crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent): def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.logger.log( self.logger.log(
f"❌ Flow Method Failed: '{event.method_name}'", f"❌ Flow Method Failed: '{event.method_name}'",
@@ -225,33 +226,32 @@ class EventListener(BaseEventListener):
color=self.color, color=self.color,
) )
@crewai_event_bus.on(MethodExecutionFinishedEvent)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.logger.log( self.logger.log(
f"👍 Flow Method Finished: '{event.method_name}'", f"👍 Flow Method Finished: '{event.method_name}'",
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
# Tool Usage Events # Tool Usage Events
@event_bus.on(ToolUsageStartedEvent) @crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent): def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.logger.log( self.logger.log(
f"🤖 Tool Usage Started: '{event.tool_name}'", f"🤖 Tool Usage Started: '{event.tool_name}'",
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
@event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent): def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.logger.log( self.logger.log(
f"✅ Tool Usage Finished: '{event.tool_name}'", f"✅ Tool Usage Finished: '{event.tool_name}'",
event.timestamp, event.timestamp,
color=self.color, color=self.color,
) )
@event_bus.on(ToolUsageErrorEvent) @crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent): def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.logger.log( self.logger.log(
f"❌ Tool Usage Error: '{event.tool_name}'", f"❌ Tool Usage Error: '{event.tool_name}'",

View File

@@ -11,54 +11,57 @@ from crewai.utilities.events.task_events import TaskEvaluationEvent
try: try:
import agentops import agentops
from agentops import Client
AGENTOPS_INSTALLED = True AGENTOPS_INSTALLED = True
except ImportError: except ImportError:
AGENTOPS_INSTALLED = False AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener): class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None session: Optional["agentops.Session"] = None
def __init__(self): def __init__(self):
super().__init__() super().__init__()
def setup_listeners(self, event_bus): def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED: if not AGENTOPS_INSTALLED:
return return
@event_bus.on(CrewKickoffStartedEvent) @crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent): def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init() self.session = agentops.init()
for agent in source.agents: for agent in source.agents:
if self.session: if self.session:
self.session.create_agent( self.session.create_agent(
name=agent.role, name=agent.role,
agent_id=str(agent.id), agent_id=str(agent.id),
) )
@event_bus.on(CrewKickoffCompletedEvent) @crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent): def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session: if self.session:
self.session.end_session( self.session.end_session(
end_state="Success", end_state="Success",
end_state_reason="Finished Execution", end_state_reason="Finished Execution",
) )
@event_bus.on(ToolUsageStartedEvent) @crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent): def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name) self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session: if self.session:
self.session.record(self.tool_event) self.session.record(self.tool_event)
@event_bus.on(ToolUsageErrorEvent) @crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent): def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event) agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
@event_bus.on(TaskEvaluationEvent) @crewai_event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent): def on_task_evaluation(source, event: TaskEvaluationEvent):
if self.session: if self.session:
self.session.create_agent(name="Task Evaluator", agent_id=str(source.original_agent.id)) self.session.create_agent(
name="Task Evaluator", agent_id=str(source.original_agent.id)
)
agentops_listener = AgentOpsListener() agentops_listener = AgentOpsListener()

View File

@@ -17,7 +17,7 @@ from crewai.tools import tool
from crewai.tools.tool_calling import InstructorToolCalling from crewai.tools.tool_calling import InstructorToolCalling
from crewai.tools.tool_usage import ToolUsage from crewai.tools.tool_usage import ToolUsage
from crewai.utilities import RPMController from crewai.utilities import RPMController
from crewai.utilities.events import event_bus from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent from crewai.utilities.events.tool_usage_events import ToolUsageFinishedEvent
@@ -156,11 +156,10 @@ def test_agent_execution_with_tools():
) )
received_events = [] received_events = []
@event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)
# with patch.object(EventBus, "emit") as emit:
output = agent.execute_task(task) output = agent.execute_task(task)
assert output == "The result of the multiplication is 12." assert output == "The result of the multiplication is 12."
@@ -256,7 +255,7 @@ def test_cache_hitting():
} }
received_events = [] received_events = []
@event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)

View File

@@ -25,7 +25,7 @@ from crewai.utilities import Logger
from crewai.utilities.events import ( from crewai.utilities.events import (
CrewTrainCompletedEvent, CrewTrainCompletedEvent,
CrewTrainStartedEvent, CrewTrainStartedEvent,
event_bus, crewai_event_bus,
) )
from crewai.utilities.events.crew_events import ( from crewai.utilities.events.crew_events import (
CrewTestCompletedEvent, CrewTestCompletedEvent,
@@ -860,7 +860,7 @@ def test_crew_verbose_output(capsys):
"[🤖 AGENT 'SENIOR WRITER' STARTED TASK]", "[🤖 AGENT 'SENIOR WRITER' STARTED TASK]",
"[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]", "[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]",
"[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]", "[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]",
"[✅ CREW 'CREW' COMPLETED]" "[✅ CREW 'CREW' COMPLETED]",
] ]
captured = capsys.readouterr() captured = capsys.readouterr()
for log in expected_listener_logs: for log in expected_listener_logs:
@@ -2590,11 +2590,11 @@ def test_crew_train_success(
received_events = [] received_events = []
@event_bus.on(CrewTrainStartedEvent) @crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent): def on_crew_train_started(source, event: CrewTrainStartedEvent):
received_events.append(event) received_events.append(event)
@event_bus.on(CrewTrainCompletedEvent) @crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent): def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
received_events.append(event) received_events.append(event)
@@ -3378,11 +3378,11 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
received_events = [] received_events = []
@event_bus.on(CrewTestStartedEvent) @crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent): def on_crew_test_started(source, event: CrewTestStartedEvent):
received_events.append(event) received_events.append(event)
@event_bus.on(CrewTestCompletedEvent) @crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent): def on_crew_test_completed(source, event: CrewTestCompletedEvent):
received_events.append(event) received_events.append(event)

View File

@@ -12,7 +12,7 @@ from crewai.utilities.events import (
FlowStartedEvent, FlowStartedEvent,
MethodExecutionFinishedEvent, MethodExecutionFinishedEvent,
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
event_bus, crewai_event_bus,
) )
@@ -440,15 +440,15 @@ def test_unstructured_flow_event_emission():
flow = PoemFlow() flow = PoemFlow()
received_events = [] received_events = []
@event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event): def handle_flow_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event): def handle_method_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_end(source, event): def handle_flow_end(source, event):
received_events.append(event) received_events.append(event)
@@ -519,19 +519,19 @@ def test_structured_flow_event_emission():
received_events = [] received_events = []
@event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event): def handle_flow_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event): def handle_method_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(MethodExecutionFinishedEvent) @crewai_event_bus.on(MethodExecutionFinishedEvent)
def handle_method_end(source, event): def handle_method_end(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_end(source, event): def handle_flow_end(source, event):
received_events.append(event) received_events.append(event)
@@ -585,19 +585,19 @@ def test_stateless_flow_event_emission():
flow = StatelessFlow() flow = StatelessFlow()
received_events = [] received_events = []
@event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event): def handle_flow_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event): def handle_method_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(MethodExecutionFinishedEvent) @crewai_event_bus.on(MethodExecutionFinishedEvent)
def handle_method_end(source, event): def handle_method_end(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_end(source, event): def handle_flow_end(source, event):
received_events.append(event) received_events.append(event)

View File

@@ -20,7 +20,7 @@ from crewai.utilities.events.crew_events import (
CrewKickoffFailedEvent, CrewKickoffFailedEvent,
CrewKickoffStartedEvent, CrewKickoffStartedEvent,
) )
from crewai.utilities.events.event_bus import event_bus from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_types import ToolUsageFinishedEvent from crewai.utilities.events.event_types import ToolUsageFinishedEvent
from crewai.utilities.events.flow_events import ( from crewai.utilities.events.flow_events import (
FlowCreatedEvent, FlowCreatedEvent,
@@ -54,9 +54,9 @@ base_task = Task(
def test_crew_emits_start_kickoff_event(): def test_crew_emits_start_kickoff_event():
received_events = [] received_events = []
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
@event_bus.on(CrewKickoffStartedEvent) @crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event): def handle_crew_start(source, event):
received_events.append(event) received_events.append(event)
@@ -74,7 +74,7 @@ def test_crew_emits_start_kickoff_event():
def test_crew_emits_end_kickoff_event(): def test_crew_emits_end_kickoff_event():
received_events = [] received_events = []
@event_bus.on(CrewKickoffCompletedEvent) @crewai_event_bus.on(CrewKickoffCompletedEvent)
def handle_crew_end(source, event): def handle_crew_end(source, event):
received_events.append(event) received_events.append(event)
@@ -92,9 +92,9 @@ def test_crew_emits_end_kickoff_event():
def test_crew_emits_kickoff_failed_event(): def test_crew_emits_kickoff_failed_event():
received_events = [] received_events = []
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
@event_bus.on(CrewKickoffFailedEvent) @crewai_event_bus.on(CrewKickoffFailedEvent)
def handle_crew_failed(source, event): def handle_crew_failed(source, event):
received_events.append(event) received_events.append(event)
@@ -117,7 +117,7 @@ def test_crew_emits_kickoff_failed_event():
def test_crew_emits_start_task_event(): def test_crew_emits_start_task_event():
received_events = [] received_events = []
@event_bus.on(TaskStartedEvent) @crewai_event_bus.on(TaskStartedEvent)
def handle_task_start(source, event): def handle_task_start(source, event):
received_events.append(event) received_events.append(event)
@@ -134,7 +134,7 @@ def test_crew_emits_start_task_event():
def test_crew_emits_end_task_event(): def test_crew_emits_end_task_event():
received_events = [] received_events = []
@event_bus.on(TaskCompletedEvent) @crewai_event_bus.on(TaskCompletedEvent)
def handle_task_end(source, event): def handle_task_end(source, event):
received_events.append(event) received_events.append(event)
@@ -152,25 +152,28 @@ def test_task_emits_failed_event_on_execution_error():
received_events = [] received_events = []
received_sources = [] received_sources = []
@event_bus.on(TaskFailedEvent) @crewai_event_bus.on(TaskFailedEvent)
def handle_task_failed(source, event): def handle_task_failed(source, event):
received_events.append(event) received_events.append(event)
received_sources.append(source) received_sources.append(source)
with patch.object(Task, "_execute_core",) as mock_execute: with patch.object(
Task,
"_execute_core",
) as mock_execute:
error_message = "Simulated task failure" error_message = "Simulated task failure"
mock_execute.side_effect = Exception(error_message) mock_execute.side_effect = Exception(error_message)
agent = Agent( agent = Agent(
role="base_agent", role="base_agent",
goal="Just say hi", goal="Just say hi",
backstory="You are a helpful assistant that just says hi", backstory="You are a helpful assistant that just says hi",
) )
task = Task( task = Task(
description="Just say hi", description="Just say hi",
expected_output="hi", expected_output="hi",
agent=agent, agent=agent,
) )
with pytest.raises(Exception): with pytest.raises(Exception):
agent.execute_task(task=task) agent.execute_task(task=task)
@@ -185,11 +188,11 @@ def test_task_emits_failed_event_on_execution_error():
def test_agent_emits_execution_started_and_completed_events(): def test_agent_emits_execution_started_and_completed_events():
received_events = [] received_events = []
@event_bus.on(AgentExecutionStartedEvent) @crewai_event_bus.on(AgentExecutionStartedEvent)
def handle_agent_start(source, event): def handle_agent_start(source, event):
received_events.append(event) received_events.append(event)
@event_bus.on(AgentExecutionCompletedEvent) @crewai_event_bus.on(AgentExecutionCompletedEvent)
def handle_agent_completed(source, event): def handle_agent_completed(source, event):
received_events.append(event) received_events.append(event)
@@ -219,7 +222,7 @@ def test_agent_emits_execution_started_and_completed_events():
def test_agent_emits_execution_error_event(): def test_agent_emits_execution_error_event():
received_events = [] received_events = []
@event_bus.on(AgentExecutionErrorEvent) @crewai_event_bus.on(AgentExecutionErrorEvent)
def handle_agent_start(source, event): def handle_agent_start(source, event):
received_events.append(event) received_events.append(event)
@@ -257,7 +260,7 @@ class SayHiTool(BaseTool):
def test_tools_emits_finished_events(): def test_tools_emits_finished_events():
received_events = [] received_events = []
@event_bus.on(ToolUsageFinishedEvent) @crewai_event_bus.on(ToolUsageFinishedEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)
@@ -288,7 +291,7 @@ def test_tools_emits_finished_events():
def test_tools_emits_error_events(): def test_tools_emits_error_events():
received_events = [] received_events = []
@event_bus.on(ToolUsageErrorEvent) @crewai_event_bus.on(ToolUsageErrorEvent)
def handle_tool_end(source, event): def handle_tool_end(source, event):
received_events.append(event) received_events.append(event)
@@ -333,9 +336,9 @@ def test_tools_emits_error_events():
def test_flow_emits_start_event(): def test_flow_emits_start_event():
received_events = [] received_events = []
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
@event_bus.on(FlowStartedEvent) @crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event): def handle_flow_start(source, event):
received_events.append(event) received_events.append(event)
@@ -355,9 +358,9 @@ def test_flow_emits_start_event():
def test_flow_emits_finish_event(): def test_flow_emits_finish_event():
received_events = [] received_events = []
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
@event_bus.on(FlowFinishedEvent) @crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_finish(source, event): def handle_flow_finish(source, event):
received_events.append(event) received_events.append(event)
@@ -379,9 +382,9 @@ def test_flow_emits_finish_event():
def test_flow_emits_method_execution_started_event(): def test_flow_emits_method_execution_started_event():
received_events = [] received_events = []
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
@event_bus.on(MethodExecutionStartedEvent) @crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source, event): def handle_method_start(source, event):
print("event in method name", event.method_name) print("event in method name", event.method_name)
received_events.append(event) received_events.append(event)
@@ -416,8 +419,8 @@ def test_register_handler_adds_new_handler():
def custom_handler(source, event): def custom_handler(source, event):
received_events.append(event) received_events.append(event)
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStartedEvent, custom_handler) crewai_event_bus.register_handler(CrewKickoffStartedEvent, custom_handler)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff() crew.kickoff()
@@ -438,9 +441,9 @@ def test_multiple_handlers_for_same_event():
def handler_2(source, event): def handler_2(source, event):
received_events_2.append(event) received_events_2.append(event)
with event_bus.scoped_handlers(): with crewai_event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStartedEvent, handler_1) crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_1)
event_bus.register_handler(CrewKickoffStartedEvent, handler_2) crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_2)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew") crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff() crew.kickoff()
@@ -454,7 +457,7 @@ def test_multiple_handlers_for_same_event():
def test_flow_emits_created_event(): def test_flow_emits_created_event():
received_events = [] received_events = []
@event_bus.on(FlowCreatedEvent) @crewai_event_bus.on(FlowCreatedEvent)
def handle_flow_created(source, event): def handle_flow_created(source, event):
received_events.append(event) received_events.append(event)
@@ -475,7 +478,7 @@ def test_flow_emits_method_execution_failed_event():
received_events = [] received_events = []
error = Exception("Simulated method failure") error = Exception("Simulated method failure")
@event_bus.on(MethodExecutionFailedEvent) @crewai_event_bus.on(MethodExecutionFailedEvent)
def handle_method_failed(source, event): def handle_method_failed(source, event):
received_events.append(event) received_events.append(event)