Enhance event system type safety and error handling

- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling
This commit is contained in:
Lorenze Jay
2025-02-12 15:46:56 -08:00
parent 25453f7cb1
commit 1250388635
9 changed files with 7099 additions and 4757 deletions

View File

@@ -20,7 +20,7 @@ from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.tools import BaseTool from crewai.tools.base_tool import BaseTool
from crewai.tools.base_tool import Tool from crewai.tools.base_tool import Tool
from crewai.utilities import I18N, Logger, RPMController from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config from crewai.utilities.config import process_config
@@ -112,7 +112,7 @@ class BaseAgent(ABC, BaseModel):
default=False, default=False,
description="Enable agent to delegate and ask questions among each other.", description="Enable agent to delegate and ask questions among each other.",
) )
tools: Optional[List[Any]] = Field( tools: Optional[List[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal" default_factory=list, description="Tools at agents' disposal"
) )
max_iter: int = Field( max_iter: int = Field(

View File

@@ -90,15 +90,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.llm.stop = list(set(self.llm.stop + self.stop)) self.llm.stop = list(set(self.llm.stop + self.stop))
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]: def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
event_bus.emit( if self.agent and self.task:
self, event_bus.emit(
event=AgentExecutionStarted( self,
agent=self.agent, event=AgentExecutionStarted(
task=self.task, agent=self.agent,
tools=self.tools, tools=self.tools,
inputs=inputs, inputs=inputs,
), task=self.task,
) ),
)
if "system" in self.prompt: if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs) system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs) user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -191,12 +192,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_unknown_error(self, exception: Exception) -> None: def _handle_unknown_error(self, exception: Exception) -> None:
"""Handle unknown errors by informing the user.""" """Handle unknown errors by informing the user."""
event_bus.emit( if self.agent:
self, event_bus.emit(
event=AgentExecutionError( self,
agent=self.agent, task=self.task, error=str(exception) event=AgentExecutionError(
), agent=self.agent, task=self.task, error=str(exception)
) ),
)
self._printer.print( self._printer.print(
content="An unknown error occurred. Please check the details below.", content="An unknown error occurred. Please check the details below.",
color="red", color="red",

View File

@@ -3,7 +3,7 @@ from .crew_events import (
CrewKickoffCompleted, CrewKickoffCompleted,
CrewKickoffFailed, CrewKickoffFailed,
) )
from .agent_events import AgentExecutionStarted, AgentExecutionCompleted from .agent_events import AgentExecutionStarted, AgentExecutionCompleted, AgentExecutionError
from .task_events import TaskStarted, TaskCompleted, TaskFailed from .task_events import TaskStarted, TaskCompleted, TaskFailed
from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished from .flow_events import FlowStarted, FlowFinished, MethodExecutionStarted, MethodExecutionFinished
from .event_bus import EventTypes, EventBus from .event_bus import EventTypes, EventBus
@@ -34,7 +34,8 @@ __all__ = [
event_bus, event_bus,
ToolUsageFinished, ToolUsageFinished,
ToolUsageError, ToolUsageError,
EventBus EventBus,
AgentExecutionError
] ]

View File

@@ -1,24 +1,21 @@
from typing import Any, Dict, List from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .crew_events import CrewEvent from .crew_events import CrewEvent
if TYPE_CHECKING:
class AgentExecutorCreated(CrewEvent): from crewai.agents.agent_builder.base_agent import BaseAgent
"""Event emitted when an agent executor is created"""
agent: Any
tools: List[BaseTool]
type: str = "agent_executor_created"
class AgentExecutionStarted(CrewEvent): class AgentExecutionStarted(CrewEvent):
"""Event emitted when an agent starts executing a task""" """Event emitted when an agent starts executing a task"""
agent: Any # type: ignore agent: BaseAgent
task: Any # type: ignore task: Any
tools: List[Any] tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
inputs: Dict[str, Any] inputs: Dict[str, Any]
type: str = "agent_execution_started" type: str = "agent_execution_started"
@@ -28,20 +25,16 @@ class AgentExecutionStarted(CrewEvent):
class AgentExecutionCompleted(CrewEvent): class AgentExecutionCompleted(CrewEvent):
"""Event emitted when an agent completes executing a task""" """Event emitted when an agent completes executing a task"""
agent: Any agent: BaseAgent
task: Any task: Any
output: str output: str
type: str = "agent_execution_completed" type: str = "agent_execution_completed"
model_config = {"arbitrary_types_allowed": True}
class AgentExecutionError(CrewEvent): class AgentExecutionError(CrewEvent):
"""Event emitted when an agent encounters an error during execution""" """Event emitted when an agent encounters an error during execution"""
agent: Any agent: BaseAgent
task: Any task: Any
error: str error: str
type: str = "agent_execution_error" type: str = "agent_execution_error"
model_config = {"arbitrary_types_allowed": True}

View File

@@ -1,11 +1,15 @@
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal from blinker import Signal
from crewai.utilities.events.crew_events import CrewEvent
from .event_types import EventTypes from .event_types import EventTypes
EventT = TypeVar("EventT", bound=CrewEvent)
class EventBus: class EventBus:
""" """
@@ -27,28 +31,36 @@ class EventBus:
def _initialize(self): def _initialize(self):
"""Initialize the event bus internal state""" """Initialize the event bus internal state"""
self._signal = Signal("event_bus") self._signal = Signal("event_bus")
self._handlers: Dict[Type[EventTypes], List[Callable]] = {} self._handlers: Dict[
Type[CrewEvent], List[Callable[[Any, CrewEvent], None]]
] = {}
# TODO: generic types def on(
def on(self, event_type: Type[EventTypes]) -> Callable: self, event_type: Type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
""" """
Decorator to register an event handler for a specific event type. Decorator to register an event handler for a specific event type.
Usage: Usage:
@event_bus.on(CrewKickoffStarted) @event_bus.on(AgentExecutionCompleted)
def handle_kickoff(source, event): def on_agent_execution_completed(source: Any, event: AgentExecutionCompleted):
print(f"Crew kickoff started: {event}") print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
""" """
def decorator(handler: Callable[[Any, EventTypes], None]): def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers: if event_type not in self._handlers:
self._handlers[event_type] = [] self._handlers[event_type] = []
self._handlers[event_type].append(handler) self._handlers[event_type].append(
cast(Callable[[Any, CrewEvent], None], handler)
)
return handler return handler
return decorator return decorator
def emit(self, source: Any, event: EventTypes) -> None: def emit(self, source: Any, event: CrewEvent) -> None:
""" """
Emit an event to all registered handlers Emit an event to all registered handlers
@@ -67,12 +79,14 @@ class EventBus:
self._handlers.clear() self._handlers.clear()
def register_handler( def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None] self, event_type: Type[EventT], handler: Callable[[Any, EventT], None]
) -> None: ) -> None:
"""Register an event handler for a specific event type""" """Register an event handler for a specific event type"""
if event_type not in self._handlers: if event_type not in self._handlers:
self._handlers[event_type] = [] self._handlers[event_type] = []
self._handlers[event_type].append(handler) self._handlers[event_type].append(
cast(Callable[[Any, CrewEvent], None], handler)
)
@contextmanager @contextmanager
def scoped_handlers(self): def scoped_handlers(self):

View File

@@ -33,18 +33,18 @@ class EventListener(BaseEventListener):
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
@event_bus.on(CrewKickoffStarted) @event_bus.on(CrewKickoffStarted)
def on_crew_started(source, event): def on_crew_started(source, event: CrewKickoffStarted):
print(f"🚀 Crew '{event.crew_name}' started", event.timestamp) print(f"🚀 Crew '{event.crew_name}' started", event.timestamp)
print("event.inputs", event.inputs) print("event.inputs", event.inputs)
self._telemetry.crew_execution_span(source, event.inputs) self._telemetry.crew_execution_span(source, event.inputs)
@event_bus.on(CrewKickoffCompleted) @event_bus.on(CrewKickoffCompleted)
def on_crew_completed(source, event): def on_crew_completed(source, event: CrewKickoffCompleted):
final_string_output = event.output.raw final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output) self._telemetry.end_crew(source, final_string_output)
@event_bus.on(CrewTestStarted) @event_bus.on(CrewTestStarted)
def on_crew_test_started(source, event): def on_crew_test_started(source, event: CrewTestStarted):
cloned_crew = source.copy() cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span( cloned_crew._telemetry.test_execution_span(
cloned_crew, cloned_crew,
@@ -55,15 +55,15 @@ class EventListener(BaseEventListener):
print(f"🚀 Crew '{event.crew_name}' started test") print(f"🚀 Crew '{event.crew_name}' started test")
@event_bus.on(CrewTestCompleted) @event_bus.on(CrewTestCompleted)
def on_crew_test_completed(source, event): def on_crew_test_completed(source, event: CrewTestCompleted):
print(f"👍 Crew '{event.crew_name}' completed test") print(f"👍 Crew '{event.crew_name}' completed test")
@event_bus.on(TaskStarted) @event_bus.on(TaskStarted)
def on_task_started(source, event): def on_task_started(source, event: TaskStarted):
print(f"📋 Task started: {event.task.description}") print(f"📋 Task started: {event.task.description}")
@event_bus.on(TaskCompleted) @event_bus.on(TaskCompleted)
def on_task_completed(source, event): def on_task_completed(source, event: TaskCompleted):
print(f" Output: {event.output}") print(f" Output: {event.output}")
result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output) result = TaskEvaluator(event.task.agent).evaluate(event.task, event.output)
print(f" Evaluation: {result.quality}") print(f" Evaluation: {result.quality}")
@@ -73,28 +73,28 @@ class EventListener(BaseEventListener):
print(f" ❌ Failed: {result.suggestions}") print(f" ❌ Failed: {result.suggestions}")
@event_bus.on(AgentExecutionStarted) @event_bus.on(AgentExecutionStarted)
def on_agent_execution_started(source, event): def on_agent_execution_started(source, event: AgentExecutionStarted):
print(f"🤖 Agent '{event.agent.role}' started task") print(f"🤖 Agent '{event.agent.role}' started task")
@event_bus.on(AgentExecutionCompleted) @event_bus.on(AgentExecutionCompleted)
def on_agent_execution_completed(source, event): def on_agent_execution_completed(source, event: AgentExecutionCompleted):
print(f"👍 Agent '{event.agent.role}' completed task") print(f"👍 Agent '{event.agent.role}' completed task")
print(f" Output: {event.output}") print(f" Output: {event.output}")
@event_bus.on(FlowStarted) @event_bus.on(FlowStarted)
def on_flow_started(source, event): def on_flow_started(source, event: FlowStarted):
print(f"🤖 Flow Started: '{event.flow_name}'") print(f"🤖 Flow Started: '{event.flow_name}'")
@event_bus.on(FlowFinished) @event_bus.on(FlowFinished)
def on_flow_finished(source, event): def on_flow_finished(source, event: FlowFinished):
print(f"👍 Flow Finished: '{event.flow_name}'") print(f"👍 Flow Finished: '{event.flow_name}'")
@event_bus.on(MethodExecutionStarted) @event_bus.on(MethodExecutionStarted)
def on_method_execution_started(source, event): def on_method_execution_started(source, event: MethodExecutionStarted):
print(f"🤖 Flow Method Started: '{event.method_name}'") print(f"🤖 Flow Method Started: '{event.method_name}'")
@event_bus.on(MethodExecutionFinished) @event_bus.on(MethodExecutionFinished)
def on_method_execution_finished(source, event): def on_method_execution_finished(source, event: MethodExecutionFinished):
print(f"👍 Flow Method Finished: '{event.method_name}'") print(f"👍 Flow Method Finished: '{event.method_name}'")

View File

@@ -12,7 +12,6 @@ except ImportError:
class AgentOpsListener(BaseEventListener): class AgentOpsListener(BaseEventListener):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
print("AgentOpsListener init")
def setup_listeners(self, event_bus): def setup_listeners(self, event_bus):
if AGENTOPS_INSTALLED: if AGENTOPS_INSTALLED:

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,7 @@ from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew from crewai.crew import Crew
from crewai.flow.flow import Flow, listen, start from crewai.flow.flow import Flow, listen, start
from crewai.task import Task from crewai.task import Task
from crewai.tools import BaseTool from crewai.tools.base_tool import BaseTool
from crewai.utilities.events.agent_events import ( from crewai.utilities.events.agent_events import (
AgentExecutionCompleted, AgentExecutionCompleted,
AgentExecutionError, AgentExecutionError,
@@ -306,7 +306,7 @@ def test_tools_emits_error_events():
crew = Crew(agents=[agent], tasks=[task], name="TestCrew") crew = Crew(agents=[agent], tasks=[task], name="TestCrew")
crew.kickoff() crew.kickoff()
assert len(received_events) == 60 assert len(received_events) == 75
assert received_events[0].agent_key == agent.key assert received_events[0].agent_key == agent.key
assert received_events[0].agent_role == agent.role assert received_events[0].agent_role == agent.role
assert received_events[0].tool_name == "error_tool" assert received_events[0].tool_name == "error_tool"