diff --git a/src/crewai/utilities/events/crewai_event_bus.py b/src/crewai/utilities/events/crewai_event_bus.py index d8d7d7d4b..d11ee0c2a 100644 --- a/src/crewai/utilities/events/crewai_event_bus.py +++ b/src/crewai/utilities/events/crewai_event_bus.py @@ -1,3 +1,4 @@ +import logging import threading from contextlib import contextmanager from typing import Any, Callable, Dict, List, Type, TypeVar, cast @@ -12,12 +13,34 @@ EventT = TypeVar("EventT", bound=BaseEvent) class CrewAIEventsBus: """ - A singleton event bus that uses blinker signals for event handling. - Allows both internal (Flow/Crew) and external event handling. + Thread-safe singleton event bus for CrewAI events. + + This class provides a centralized event handling system that allows components + to emit and listen for events throughout the CrewAI framework. + + Thread Safety: + - All public methods are thread-safe + - Uses a class-level lock to ensure synchronized access to shared resources + - Safe for concurrent event emission and handler registration/deregistration + - Prevents race conditions that could cause event mixing between sessions + + Usage: + @crewai_event_bus.on(SomeEvent) + def handle_event(source, event): + # Handle the event + pass + + # Emit an event + event = SomeEvent(type="example") + crewai_event_bus.emit(source_object, event) + + # Deregister a handler + crewai_event_bus.deregister_handler(SomeEvent, handle_event) """ _instance = None _lock = threading.Lock() + _logger = logging.getLogger(__name__) def __new__(cls): if cls._instance is None: @@ -74,8 +97,15 @@ class CrewAIEventsBus: try: handler(source, event) except Exception as e: - print( - f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}" + CrewAIEventsBus._logger.error( + "Handler execution failed", + extra={ + "handler": handler.__name__, + "event_type": event_type.__name__, + "error": str(e), + "source": str(source) + }, + exc_info=True ) self._signal.send(source, event=event) @@ -91,6 +121,31 @@ class CrewAIEventsBus: cast(Callable[[Any, EventTypes], None], handler) ) + def deregister_handler( + self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None] + ) -> bool: + """ + Deregister an event handler for a specific event type. + + Args: + event_type: The event type to deregister the handler from + handler: The handler function to remove + + Returns: + bool: True if the handler was found and removed, False otherwise + """ + with CrewAIEventsBus._lock: + if event_type not in self._handlers: + return False + + try: + self._handlers[event_type].remove(handler) + if not self._handlers[event_type]: + del self._handlers[event_type] + return True + except ValueError: + return False + @contextmanager def scoped_handlers(self): """ diff --git a/tests/utilities/events/test_crewai_event_bus.py b/tests/utilities/events/test_crewai_event_bus.py index 4cf24f64e..be7d02a09 100644 --- a/tests/utilities/events/test_crewai_event_bus.py +++ b/tests/utilities/events/test_crewai_event_bus.py @@ -160,3 +160,45 @@ def test_thread_safety_with_mixed_operations(): future.result() assert len(received_events) >= 0 + + +def test_handler_deregistration_thread_safety(): + """Test that concurrent handler deregistration is thread-safe""" + + handlers_to_remove = [] + + for i in range(10): + def handler(source, event): + pass + handler.__name__ = f"handler_{i}" + crewai_event_bus.register_handler(TestEvent, handler) + handlers_to_remove.append(handler) + + initial_count = len(crewai_event_bus._handlers.get(TestEvent, [])) + + def deregister_handler(handler): + """Deregister a handler from a specific thread""" + return crewai_event_bus.deregister_handler(TestEvent, handler) + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [] + for handler in handlers_to_remove: + future = executor.submit(deregister_handler, handler) + futures.append(future) + + results = [future.result() for future in futures] + + assert all(results), "All handlers should be successfully deregistered" + + remaining_count = len(crewai_event_bus._handlers.get(TestEvent, [])) + assert remaining_count == 0, f"Expected 0 handlers remaining, got {remaining_count}" + + +def test_deregister_nonexistent_handler(): + """Test deregistering a handler that doesn't exist""" + + def dummy_handler(source, event): + pass + + result = crewai_event_bus.deregister_handler(TestEvent, dummy_handler) + assert result is False, "Deregistering non-existent handler should return False"