mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
* build(dev): add pytest-randomly dependency By randomizing the test execution order, this helps identify tests that unintentionally depend on shared state or specific execution order, which can lead to flaky or unreliable test behavior. * build(dev): add pytest-timeout This will prevent a test from running indefinitely * test: block external requests in CI and set default 10s timeout per test * test: adding missing cassettes We notice that those cassettes are missing after enabling block-network on CI * test: increase tests timeout on CI * test: fix flaky test ValueError: Circular reference detected (id repeated) * fix: prevent crash when event handler raises exception Previously, if a registered event handler raised an exception during execution, it could crash the entire application or interrupt the event dispatch process. This change wraps handler execution in a try/except block within the `emit` method, ensuring that exceptions are caught and logged without affecting other handlers or flow. This improves the resilience of the event bus, especially when handling third-party or temporary listeners.
116 lines
3.8 KiB
Python
116 lines
3.8 KiB
Python
import threading
|
|
from contextlib import contextmanager
|
|
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
|
|
|
|
from blinker import Signal
|
|
|
|
from crewai.utilities.events.base_events import BaseEvent
|
|
from crewai.utilities.events.event_types import EventTypes
|
|
|
|
EventT = TypeVar("EventT", bound=BaseEvent)
|
|
|
|
|
|
class CrewAIEventsBus:
|
|
"""
|
|
A singleton event bus that uses blinker signals for event handling.
|
|
Allows both internal (Flow/Crew) and external event handling.
|
|
"""
|
|
|
|
_instance = None
|
|
_lock = threading.Lock()
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
with cls._lock:
|
|
if cls._instance is None: # prevent race condition
|
|
cls._instance = super(CrewAIEventsBus, cls).__new__(cls)
|
|
cls._instance._initialize()
|
|
return cls._instance
|
|
|
|
def _initialize(self) -> None:
|
|
"""Initialize the event bus internal state"""
|
|
self._signal = Signal("crewai_event_bus")
|
|
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
|
|
|
|
def on(
|
|
self, event_type: Type[EventT]
|
|
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
|
|
"""
|
|
Decorator to register an event handler for a specific event type.
|
|
|
|
Usage:
|
|
@crewai_event_bus.on(AgentExecutionCompletedEvent)
|
|
def on_agent_execution_completed(
|
|
source: Any, event: AgentExecutionCompletedEvent
|
|
):
|
|
print(f"👍 Agent '{event.agent}' completed task")
|
|
print(f" Output: {event.output}")
|
|
"""
|
|
|
|
def decorator(
|
|
handler: Callable[[Any, EventT], None],
|
|
) -> Callable[[Any, EventT], None]:
|
|
if event_type not in self._handlers:
|
|
self._handlers[event_type] = []
|
|
self._handlers[event_type].append(
|
|
cast(Callable[[Any, EventT], None], handler)
|
|
)
|
|
return handler
|
|
|
|
return decorator
|
|
|
|
def emit(self, source: Any, event: BaseEvent) -> None:
|
|
"""
|
|
Emit an event to all registered handlers
|
|
|
|
Args:
|
|
source: The object emitting the event
|
|
event: The event instance to emit
|
|
"""
|
|
for event_type, handlers in self._handlers.items():
|
|
if isinstance(event, event_type):
|
|
for handler in handlers:
|
|
try:
|
|
handler(source, event)
|
|
except Exception as e:
|
|
print(
|
|
f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
|
|
)
|
|
|
|
self._signal.send(source, event=event)
|
|
|
|
def register_handler(
|
|
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
|
|
) -> None:
|
|
"""Register an event handler for a specific event type"""
|
|
if event_type not in self._handlers:
|
|
self._handlers[event_type] = []
|
|
self._handlers[event_type].append(
|
|
cast(Callable[[Any, EventTypes], None], handler)
|
|
)
|
|
|
|
@contextmanager
|
|
def scoped_handlers(self):
|
|
"""
|
|
Context manager for temporary event handling scope.
|
|
Useful for testing or temporary event handling.
|
|
|
|
Usage:
|
|
with crewai_event_bus.scoped_handlers():
|
|
@crewai_event_bus.on(CrewKickoffStarted)
|
|
def temp_handler(source, event):
|
|
print("Temporary handler")
|
|
# Do stuff...
|
|
# Handlers are cleared after the context
|
|
"""
|
|
previous_handlers = self._handlers.copy()
|
|
self._handlers.clear()
|
|
try:
|
|
yield
|
|
finally:
|
|
self._handlers = previous_handlers
|
|
|
|
|
|
# Global instance
|
|
crewai_event_bus = CrewAIEventsBus()
|