Enhance event handling for Crew, Task, and Event classes

- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases
This commit is contained in:
Lorenze Jay
2025-02-13 12:01:18 -08:00
parent 62a20426a5
commit 00a98cd5c9
9 changed files with 184 additions and 32 deletions

View File

@@ -560,7 +560,12 @@ class Crew(BaseModel):
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit(self, CrewTrainFailedEvent(error=str(e))) event_bus.emit(
self,
CrewTrainFailedEvent(
error=str(e), crew_name=self.name or "crew"
),
)
self._logger.log("error", f"Training failed: {e}", color="red") self._logger.log("error", f"Training failed: {e}", color="red")
CrewTrainingHandler(TRAINING_DATA_FILE).clear() CrewTrainingHandler(TRAINING_DATA_FILE).clear()
CrewTrainingHandler(filename).clear() CrewTrainingHandler(filename).clear()
@@ -630,7 +635,12 @@ class Crew(BaseModel):
self.usage_metrics.add_usage_metrics(metric) self.usage_metrics.add_usage_metrics(metric)
return result return result
except Exception as e: except Exception as e:
event_bus.emit(self, CrewKickoffFailedEvent(error=str(e))) event_bus.emit(
self,
CrewKickoffFailedEvent(
error=str(e), crew_name=self.name or "crew"
),
)
raise raise
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
@@ -1224,7 +1234,12 @@ class Crew(BaseModel):
), ),
) )
except Exception as e: except Exception as e:
event_bus.emit(self, CrewTestFailedEvent(error=str(e))) event_bus.emit(
self,
CrewTestFailedEvent(
error=str(e), crew_name=self.name or "crew"
),
)
raise raise
def __repr__(self): def __repr__(self):

View File

@@ -371,7 +371,7 @@ class Task(BaseModel):
tools = tools or self.tools or [] tools = tools or self.tools or []
self.processed_by_agents.add(agent.role) self.processed_by_agents.add(agent.role)
event_bus.emit(self, TaskStartedEvent(task=self)) event_bus.emit(self, TaskStartedEvent(context=context))
result = agent.execute_task( result = agent.execute_task(
task=self, task=self,
context=context, context=context,
@@ -451,7 +451,7 @@ class Task(BaseModel):
else result else result
) )
self._save_file(content) self._save_file(content)
event_bus.emit(self, TaskCompletedEvent(task=self, output=task_output)) event_bus.emit(self, TaskCompletedEvent(output=task_output))
return task_output return task_output
except Exception as e: except Exception as e:
self.end_time = datetime.datetime.now() self.end_time = datetime.datetime.now()

View File

@@ -2,6 +2,12 @@ from .crew_events import (
CrewKickoffStartedEvent, CrewKickoffStartedEvent,
CrewKickoffCompletedEvent, CrewKickoffCompletedEvent,
CrewKickoffFailedEvent, CrewKickoffFailedEvent,
CrewTrainStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTestStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
) )
from .agent_events import ( from .agent_events import (
AgentExecutionStartedEvent, AgentExecutionStartedEvent,

View File

@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from logging import Logger
from crewai.utilities.events.event_bus import EventBus, event_bus from crewai.utilities.events.event_bus import EventBus, event_bus

View File

@@ -38,6 +38,7 @@ class CrewKickoffFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete execution""" """Event emitted when a crew fails to complete execution"""
error: str error: str
crew_name: Optional[str]
type: str = "crew_kickoff_failed" type: str = "crew_kickoff_failed"
@@ -64,6 +65,7 @@ class CrewTrainFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete training""" """Event emitted when a crew fails to complete training"""
error: str error: str
crew_name: Optional[str]
type: str = "crew_train_failed" type: str = "crew_train_failed"
@@ -88,4 +90,6 @@ class CrewTestFailedEvent(CrewEvent):
"""Event emitted when a crew fails to complete testing""" """Event emitted when a crew fails to complete testing"""
error: str error: str
crew_name: Optional[str]
type: str = "crew_test_failed" type: str = "crew_test_failed"

View File

@@ -1,15 +1,22 @@
from re import S
from pydantic import PrivateAttr from pydantic import PrivateAttr
from crewai.telemetry.telemetry import Telemetry from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.base_event_listener import BaseEventListener from crewai.utilities.events.base_event_listener import BaseEventListener
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import ( from .crew_events import (
CrewKickoffCompletedEvent, CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent, CrewKickoffStartedEvent,
CrewTestCompletedEvent, CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent, CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
) )
from .flow_events import ( from .flow_events import (
FlowFinishedEvent, FlowFinishedEvent,
@@ -22,23 +29,42 @@ from .task_events import TaskCompletedEvent, TaskStartedEvent
class EventListener(BaseEventListener): class EventListener(BaseEventListener):
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry()) _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True)
color = "bold_blue"
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._telemetry = Telemetry() self._telemetry = Telemetry()
self._telemetry.set_tracer() self._telemetry.set_tracer()
# Crew Events: kickoff, test, train
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
@event_bus.on(CrewKickoffStartedEvent) @event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent): def on_crew_started(source, event: CrewKickoffStartedEvent):
print(f"🚀 Crew '{event.crew_name}' started", event.timestamp) self.logger.log(
print("event.inputs", event.inputs) f"🚀 Crew '{event.crew_name}' started",
event.timestamp,
color=self.color,
)
self._telemetry.crew_execution_span(source, event.inputs) self._telemetry.crew_execution_span(source, event.inputs)
@event_bus.on(CrewKickoffCompletedEvent) @event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent): def on_crew_completed(source, event: CrewKickoffCompletedEvent):
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewTestStartedEvent) @event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent): def on_crew_test_started(source, event: CrewTestStartedEvent):
@@ -49,50 +75,109 @@ class EventListener(BaseEventListener):
event.inputs, event.inputs,
event.openai_model_name, event.openai_model_name,
) )
print(f"🚀 Crew '{event.crew_name}' started test") self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewTestCompletedEvent) @event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent): def on_crew_test_completed(source, event: CrewTestCompletedEvent):
print(f"👍 Crew '{event.crew_name}' completed test") self.logger.log(
f"✅ Crew '{event.crew_name}' completed test",
event.timestamp,
color=self.color,
)
@event_bus.on(TaskStartedEvent) @event_bus.on(CrewTestFailedEvent)
def on_task_started(source, event: TaskStartedEvent): def on_crew_test_failed(source, event: CrewTestFailedEvent):
print(f"📋 Task started: {event.task.description}") self.logger.log(
f"❌ Crew '{event.crew_name}' failed test",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.logger.log(
f"📋 Crew '{event.crew_name}' started train",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed train",
event.timestamp,
color=self.color,
)
@event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed train",
event.timestamp,
color=self.color,
)
@event_bus.on(TaskCompletedEvent) @event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent): def on_task_completed(source, event: TaskCompletedEvent):
print(f" Output: {event.output}") self._telemetry.task_ended(source._execution_span, source, source)
result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output) self.logger.log(
print(f" Evaluation: {result.quality}") f"📋 Task completed: {source.description}",
if result.quality > 5: event.timestamp,
print(f" ✅ Passed: {result.suggestions}") color=self.color,
else: )
print(f" ❌ Failed: {result.suggestions}")
@event_bus.on(AgentExecutionStartedEvent) @event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent): def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
print(f"🤖 Agent '{event.agent.role}' started task") self.logger.log(
f"🤖 Agent '{event.agent.role}' started task",
event.timestamp,
color=self.color,
)
@event_bus.on(AgentExecutionCompletedEvent) @event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent): def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
print(f"👍 Agent '{event.agent.role}' completed task") self.logger.log(
print(f" Output: {event.output}") f"👍 Agent '{event.agent.role}' completed task",
event.timestamp,
color=self.color,
)
# Flow Events
@event_bus.on(FlowStartedEvent) @event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent): def on_flow_started(source, event: FlowStartedEvent):
print(f"🤖 Flow Started: '{event.flow_name}'") self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
event.timestamp,
color=self.color,
)
@event_bus.on(FlowFinishedEvent) @event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent): def on_flow_finished(source, event: FlowFinishedEvent):
print(f"👍 Flow Finished: '{event.flow_name}'") self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
event.timestamp,
color=self.color,
)
@event_bus.on(MethodExecutionStartedEvent) @event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent): def on_method_execution_started(source, event: MethodExecutionStartedEvent):
print(f"🤖 Flow Method Started: '{event.method_name}'") self.logger.log(
f"🤖 Flow Method Started: '{event.method_name}'",
event.timestamp,
color=self.color,
)
@event_bus.on(MethodExecutionFinishedEvent) @event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent): def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
print(f"👍 Flow Method Finished: '{event.method_name}'") self.logger.log(
f"👍 Flow Method Finished: '{event.method_name}'",
event.timestamp,
color=self.color,
)
event_listener = EventListener() event_listener = EventListener()

View File

@@ -1,4 +1,4 @@
from typing import Any from typing import Any, Optional
from crewai.utilities.events.crew_events import CrewEvent from crewai.utilities.events.crew_events import CrewEvent
@@ -6,16 +6,15 @@ from crewai.utilities.events.crew_events import CrewEvent
class TaskStartedEvent(CrewEvent): class TaskStartedEvent(CrewEvent):
"""Event emitted when a task starts""" """Event emitted when a task starts"""
task: Any
type: str = "task_started" type: str = "task_started"
context: Optional[str]
model_config = {"arbitrary_types_allowed": True} model_config = {"arbitrary_types_allowed": True}
class TaskCompletedEvent(CrewEvent): class TaskCompletedEvent(CrewEvent):
"""Event emitted when a task completes""" """Event emitted when a task completes"""
task: Any
output: Any output: Any
type: str = "task_completed" type: str = "task_completed"

View File

@@ -24,8 +24,17 @@ from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger from crewai.utilities import Logger
from crewai.utilities.events.crew_events import (
CrewTestCompletedEvent,
CrewTestStartedEvent,
)
from crewai.utilities.rpm_controller import RPMController from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.events import (
CrewTrainStartedEvent,
CrewTrainCompletedEvent,
event_bus,
)
ceo = Agent( ceo = Agent(
role="CEO", role="CEO",
@@ -2568,6 +2577,16 @@ def test_crew_train_success(
# Create a mock for the copied crew # Create a mock for the copied crew
copy_mock.return_value = crew copy_mock.return_value = crew
received_events = []
@event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
received_events.append(event)
@event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
received_events.append(event)
crew.train( crew.train(
n_iterations=2, inputs={"topic": "AI"}, filename="trained_agents_data.pkl" n_iterations=2, inputs={"topic": "AI"}, filename="trained_agents_data.pkl"
) )
@@ -2613,6 +2632,10 @@ def test_crew_train_success(
] ]
) )
assert len(received_events) == 2
assert isinstance(received_events[0], CrewTrainStartedEvent)
assert isinstance(received_events[1], CrewTrainCompletedEvent)
def test_crew_train_error(): def test_crew_train_error():
task = Task( task = Task(
@@ -3341,6 +3364,17 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
copy_mock.return_value = crew copy_mock.return_value = crew
n_iterations = 2 n_iterations = 2
received_events = []
@event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
received_events.append(event)
@event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
received_events.append(event)
crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"}) crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
# Ensure kickoff is called on the copied crew # Ensure kickoff is called on the copied crew
@@ -3357,6 +3391,10 @@ def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
] ]
) )
assert len(received_events) == 2
assert isinstance(received_events[0], CrewTestStartedEvent)
assert isinstance(received_events[1], CrewTestCompletedEvent)
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_verbose_manager_agent(): def test_hierarchical_verbose_manager_agent():

View File

@@ -388,12 +388,16 @@ def test_flow_emits_method_execution_started_event():
flow = TestFlow() flow = TestFlow()
flow.kickoff() flow.kickoff()
assert len(received_events) == 1 assert len(received_events) == 2
assert received_events[0].method_name == "second_method" assert received_events[0].method_name == "begin"
assert received_events[0].flow_name == "TestFlow" assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "method_execution_started" assert received_events[0].type == "method_execution_started"
assert received_events[1].method_name == "second_method"
assert received_events[1].flow_name == "TestFlow"
assert received_events[1].type == "method_execution_started"
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])
def test_register_handler_adds_new_handler(): def test_register_handler_adds_new_handler():