Refactor event handling and introduce new event types

- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information
This commit is contained in:
Lorenze Jay
2025-02-11 14:31:50 -08:00
parent 95bae8bba3
commit 676cabfdd6
19 changed files with 7369 additions and 441 deletions

View File

@@ -1,18 +1,35 @@
import pytest
from datetime import datetime
from crewai.utilities.events.events import on, emit
from unittest import mock
from unittest.mock import patch
import pytest
from pydantic import Field
from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.flow.flow import Flow, listen, start
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.utilities.events.agent_events import (
AgentExecutionStarted,
AgentExecutionCompleted,
AgentExecutionError,
AgentExecutionStarted,
)
from crewai.utilities.events.task_events import TaskStarted, TaskCompleted
from crewai.utilities.events.crew_events import CrewKickoffStarted, CrewKickoffCompleted
from crewai.crew import Crew
from crewai.agent import Agent
from crewai.task import Task
from unittest.mock import patch
from unittest import mock
from crewai.utilities.events.crew_events import (
CrewKickoffCompleted,
CrewKickoffFailed,
CrewKickoffStarted,
)
from crewai.utilities.events.event_bus import event_bus
from crewai.utilities.events.event_types import ToolUsageFinished
from crewai.utilities.events.flow_events import (
FlowFinished,
FlowStarted,
MethodExecutionStarted,
)
from crewai.utilities.events.task_events import TaskCompleted, TaskFailed, TaskStarted
from crewai.utilities.events.tool_usage_events import ToolUsageError
base_agent = Agent(
role="base_agent",
@@ -30,42 +47,36 @@ base_task = Task(
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_kickoff_event():
# Setup event listener
received_events = []
@on(CrewKickoffStarted)
def handle_crew_start(source, event):
received_events.append(event)
with event_bus.scoped_handlers():
# Create a simple crew
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
@event_bus.on(CrewKickoffStarted)
def handle_crew_start(source, event):
received_events.append(event)
# Run the crew
crew.kickoff()
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
# Verify the event was emitted
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
crew.kickoff()
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_end_kickoff_event():
# Setup event listener
received_events = []
@on(CrewKickoffCompleted)
@event_bus.on(CrewKickoffCompleted)
def handle_crew_end(source, event):
received_events.append(event)
# Create a simple crew
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
# Run the crew
crew.kickoff()
# Verify the event was emitted
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
@@ -73,21 +84,42 @@ def test_crew_emits_end_kickoff_event():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_task_event():
# Setup event listener
def test_crew_emits_kickoff_failed_event():
received_events = []
@on(TaskStarted)
with event_bus.scoped_handlers():
@event_bus.on(CrewKickoffFailed)
def handle_crew_failed(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
with patch.object(Crew, "_execute_tasks") as mock_execute:
error_message = "Simulated crew kickoff failure"
mock_execute.side_effect = Exception(error_message)
with pytest.raises(Exception):
crew.kickoff()
assert len(received_events) == 1
assert received_events[0].error == error_message
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_failed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_task_event():
received_events = []
@event_bus.on(TaskStarted)
def handle_task_start(source, event):
received_events.append(event)
# Create a simple crew
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
# Run the crew
crew.kickoff()
# Verify the event was emitted
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "task_started"
@@ -95,90 +127,308 @@ def test_crew_emits_start_task_event():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_end_task_event():
# Setup event listener
received_events = []
@on(TaskCompleted)
@event_bus.on(TaskCompleted)
def handle_task_end(source, event):
received_events.append(event)
# Create a simple crew
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
# Run the crew
crew.kickoff()
# Verify the event was emitted
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "task_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_emits_execution_error_event():
# Setup event listener
def test_task_emits_failed_event_on_execution_error():
received_events = []
@on(AgentExecutionError)
def handle_agent_error(source, event):
@event_bus.on(TaskFailed)
def handle_task_failed(source, event):
received_events.append(event)
# Create an agent that will fail
failing_agent = Agent(
role="failing_agent",
goal="Fail execution",
backstory="You are an agent that will fail",
max_retry_limit=1, # Set low retry limit for testing
task = Task(
description="Just say hi",
expected_output="hi",
agent=None,
)
# Create a task that will trigger an error
failing_task = Task(
description="This will fail", agent=failing_agent, expected_output="hi"
)
with pytest.raises(Exception) as exc_info:
task._execute_core(agent=None, context=None, tools=None)
error_message = "Forced error for testing"
# Mock the agent executor to raise an exception
with patch.object(failing_agent.agent_executor, "invoke") as mock_invoke:
mock_invoke.side_effect = Exception(error_message)
assert failing_agent._times_executed == 0
assert failing_agent.max_retry_limit == 1
assert "has no agent assigned" in str(exc_info.value)
# Execute task which should fail and emit error
with pytest.raises(Exception) as e:
failing_agent.execute_task(failing_task)
print("error message: ", e.value.args[0])
# assert e.value.args[0] == error_message
# assert failing_agent._times_executed == 2 # Initial attempt + 1 retry
# Verify the invoke was called twice (initial + retry)
mock_invoke.assert_has_calls(
[
mock.call(
{
"input": "This will fail\n\nThis is the expect criteria for your final answer: hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": False,
}
),
mock.call(
{
"input": "This will fail\n\nThis is the expect criteria for your final answer: hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": False,
}
),
]
)
print("made it here")
# Verify the error event was emitted
assert len(received_events) == 1
assert received_events[0].task == task
assert "has no agent assigned" in received_events[0].error
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "agent_execution_error"
assert received_events[0].agent == failing_agent
assert received_events[0].task == failing_task
assert error_message in received_events[0].error
assert received_events[0].type == "task_failed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_emits_execution_started_and_completed_events():
received_events = []
@event_bus.on(AgentExecutionStarted)
def handle_agent_start(source, event):
received_events.append(event)
@event_bus.on(AgentExecutionCompleted)
def handle_agent_completed(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff()
assert len(received_events) == 2
assert received_events[0].agent == base_agent
assert received_events[0].task == base_task
assert received_events[0].tools == []
assert received_events[0].inputs == {
"ask_for_human_input": False,
"input": "Just say hi\n"
"\n"
"This is the expect criteria for your final answer: hi\n"
"you MUST return the actual complete content as the final answer, not a "
"summary.",
"tool_names": "",
"tools": "",
}
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "agent_execution_started"
assert isinstance(received_events[1].timestamp, datetime)
assert received_events[1].type == "agent_execution_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_emits_execution_error_event():
received_events = []
@event_bus.on(AgentExecutionError)
def handle_agent_start(source, event):
received_events.append(event)
error_message = "Error happening while sending prompt to model."
base_agent.max_retry_limit = 0
with patch.object(
CrewAgentExecutor, "invoke", wraps=base_agent.agent_executor.invoke
) as invoke_mock:
invoke_mock.side_effect = Exception(error_message)
with pytest.raises(Exception) as e:
base_agent.execute_task(
task=base_task,
)
assert len(received_events) == 1
assert received_events[0].agent == base_agent
assert received_events[0].task == base_task
assert received_events[0].error == error_message
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "agent_execution_error"
class SayHiTool(BaseTool):
name: str = Field(default="say_hi", description="The name of the tool")
description: str = Field(
default="Say hi", description="The description of the tool"
)
def _run(self) -> str:
return "hi"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_tools_emits_finished_events():
received_events = []
@event_bus.on(ToolUsageFinished)
def handle_tool_end(source, event):
received_events.append(event)
agent = Agent(
role="base_agent",
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
tools=[SayHiTool()],
)
task = Task(
description="Just say hi",
expected_output="hi",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], name="TestCrew")
crew.kickoff()
assert len(received_events) == 1
assert received_events[0].agent_key == agent.key
assert received_events[0].agent_role == agent.role
assert received_events[0].tool_name == SayHiTool().name
assert received_events[0].tool_args == {}
assert received_events[0].type == "tool_usage_finished"
assert isinstance(received_events[0].timestamp, datetime)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_tools_emits_error_events():
received_events = []
@event_bus.on(ToolUsageError)
def handle_tool_end(source, event):
received_events.append(event)
class ErrorTool(BaseTool):
name: str = Field(
default="error_tool", description="A tool that raises an error"
)
description: str = Field(
default="This tool always raises an error",
description="The description of the tool",
)
def _run(self) -> str:
raise Exception("Simulated tool error")
agent = Agent(
role="base_agent",
goal="Try to use the error tool",
backstory="You are an assistant that tests error handling",
tools=[ErrorTool()],
)
task = Task(
description="Use the error tool",
expected_output="This should error",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], name="TestCrew")
crew.kickoff()
assert len(received_events) == 60
assert received_events[0].agent_key == agent.key
assert received_events[0].agent_role == agent.role
assert received_events[0].tool_name == "error_tool"
assert received_events[0].tool_args == {}
assert received_events[0].error == "Simulated tool error"
assert received_events[0].type == "tool_usage_error"
assert isinstance(received_events[0].timestamp, datetime)
def test_flow_emits_start_event():
received_events = []
with event_bus.scoped_handlers():
@event_bus.on(FlowStarted)
def handle_flow_start(source, event):
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
flow = TestFlow()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_started"
def test_flow_emits_finish_event():
received_events = []
with event_bus.scoped_handlers():
@event_bus.on(FlowFinished)
def handle_flow_finish(source, event):
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "completed"
flow = TestFlow()
result = flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_finished"
assert received_events[0].result == "completed"
assert result == "completed"
def test_flow_emits_method_execution_started_event():
received_events = []
with event_bus.scoped_handlers():
@event_bus.on(MethodExecutionStarted)
def handle_method_start(source, event):
print("event in method name", event.method_name)
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
@listen("begin")
def second_method(self):
return "executed"
flow = TestFlow()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].method_name == "second_method"
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "method_execution_started"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_register_handler_adds_new_handler():
received_events = []
def custom_handler(source, event):
received_events.append(event)
with event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStarted, custom_handler)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff()
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_handlers_for_same_event():
received_events_1 = []
received_events_2 = []
def handler_1(source, event):
received_events_1.append(event)
def handler_2(source, event):
received_events_2.append(event)
with event_bus.scoped_handlers():
event_bus.register_handler(CrewKickoffStarted, handler_1)
event_bus.register_handler(CrewKickoffStarted, handler_2)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff()
assert len(received_events_1) == 1
assert len(received_events_2) == 1
assert received_events_1[0].type == "crew_kickoff_started"
assert received_events_2[0].type == "crew_kickoff_started"