Fix failing test

This commit is contained in:
Vinicius Brasil
2025-03-20 10:27:29 -03:00
committed by Vini Brasil
parent 032922a4b9
commit fae1cfdc89
2 changed files with 9 additions and 18 deletions

View File

@@ -9,6 +9,7 @@ from crewai.utilities.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=CrewEvent)
class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
@@ -74,10 +75,6 @@ class CrewAIEventsBus:
self._signal.send(source, event=event)
def clear_handlers(self) -> None:
"""Clear all registered event handlers - useful for testing"""
self._handlers.clear()
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:

View File

@@ -1,40 +1,34 @@
from unittest.mock import Mock
import pytest
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
class TestEvent(CrewEvent):
pass
@pytest.fixture
def event_bus():
bus = CrewAIEventsBus()
bus.clear_handlers()
return bus
def test_specific_event_handler(event_bus):
def test_specific_event_handler():
mock_handler = Mock()
@event_bus.on(TestEvent)
@crewai_event_bus.on(TestEvent)
def handler(source, event):
mock_handler(source, event)
event = TestEvent(type="test_event")
event_bus.emit("source_object", event)
crewai_event_bus.emit("source_object", event)
mock_handler.assert_called_once_with("source_object", event)
def test_wildcard_event_handler(event_bus):
def test_wildcard_event_handler():
mock_handler = Mock()
@event_bus.on(CrewEvent)
@crewai_event_bus.on(CrewEvent)
def handler(source, event):
mock_handler(source, event)
event = TestEvent(type="test_event")
event_bus.emit("source_object", event)
crewai_event_bus.emit("source_object", event)
mock_handler.assert_called_once_with("source_object", event)