feat: improve event bus thread safety and async support

Add thread-safe, async-compatible event bus with read–write locking and
handler dependency ordering. Remove blinker dependency and implement
direct dispatch. Improve type safety, error handling, and deterministic
event synchronization.

Refactor tests to auto-wait for async handlers, ensure clean teardown,
and add comprehensive concurrency coverage. Replace thread-local state
in AgentEvaluator with instance-based locking for correct cross-thread
access. Enhance tracing reliability and event finalization.
This commit is contained in:
Greyson LaLonde
2025-10-14 13:28:58 -04:00
committed by GitHub
parent cec4e4c2e9
commit 53b239c6df
34 changed files with 3360 additions and 876 deletions

View File

@@ -14,6 +14,7 @@ from crewai.events.listeners.tracing.trace_listener import (
)
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.flow.flow import Flow, start
from tests.utils import wait_for_event_handlers
class TestTraceListenerSetup:
@@ -39,38 +40,44 @@ class TestTraceListenerSetup:
):
yield
@pytest.fixture(autouse=True)
def clear_event_bus(self):
"""Clear event bus listeners before and after each test"""
from crewai.events.event_bus import crewai_event_bus
# Store original handlers
original_handlers = crewai_event_bus._handlers.copy()
# Clear for test
crewai_event_bus._handlers.clear()
yield
# Restore original state
crewai_event_bus._handlers.clear()
crewai_event_bus._handlers.update(original_handlers)
@pytest.fixture(autouse=True)
def reset_tracing_singletons(self):
"""Reset tracing singleton instances between tests"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
# Clear event bus handlers BEFORE creating any new singletons
with crewai_event_bus._rwlock.w_locked():
crewai_event_bus._sync_handlers = {}
crewai_event_bus._async_handlers = {}
crewai_event_bus._handler_dependencies = {}
crewai_event_bus._execution_plan_cache = {}
# Reset TraceCollectionListener singleton
if hasattr(TraceCollectionListener, "_instance"):
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
# Reset EventListener singleton
if hasattr(EventListener, "_instance"):
EventListener._instance = None
yield
# Clean up after test
with crewai_event_bus._rwlock.w_locked():
crewai_event_bus._sync_handlers = {}
crewai_event_bus._async_handlers = {}
crewai_event_bus._handler_dependencies = {}
crewai_event_bus._execution_plan_cache = {}
if hasattr(TraceCollectionListener, "_instance"):
TraceCollectionListener._instance = None
TraceCollectionListener._initialized = False
if hasattr(EventListener, "_instance"):
EventListener._instance = None
@pytest.fixture(autouse=True)
def mock_plus_api_calls(self):
"""Mock all PlusAPI HTTP calls to avoid network requests"""
@@ -167,15 +174,26 @@ class TestTraceListenerSetup:
from crewai.events.event_bus import crewai_event_bus
trace_listener = None
for handler_list in crewai_event_bus._handlers.values():
for handler in handler_list:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_listener = handler.__self__
with crewai_event_bus._rwlock.r_locked():
for handler_set in crewai_event_bus._sync_handlers.values():
for handler in handler_set:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_listener = handler.__self__
break
if trace_listener:
break
if trace_listener:
break
if not trace_listener:
for handler_set in crewai_event_bus._async_handlers.values():
for handler in handler_set:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_listener = handler.__self__
break
if trace_listener:
break
if not trace_listener:
pytest.skip(
@@ -221,6 +239,7 @@ class TestTraceListenerSetup:
wraps=trace_listener.batch_manager.add_event,
) as add_event_mock:
crew.kickoff()
wait_for_event_handlers()
assert add_event_mock.call_count >= 2
@@ -267,24 +286,22 @@ class TestTraceListenerSetup:
from crewai.events.event_bus import crewai_event_bus
trace_handlers = []
for handlers in crewai_event_bus._handlers.values():
for handler in handlers:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_handlers.append(handler)
elif hasattr(handler, "__name__") and any(
trace_name in handler.__name__
for trace_name in [
"on_crew_started",
"on_crew_completed",
"on_flow_started",
]
):
trace_handlers.append(handler)
with crewai_event_bus._rwlock.r_locked():
for handlers in crewai_event_bus._sync_handlers.values():
for handler in handlers:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_handlers.append(handler)
for handlers in crewai_event_bus._async_handlers.values():
for handler in handlers:
if hasattr(handler, "__self__") and isinstance(
handler.__self__, TraceCollectionListener
):
trace_handlers.append(handler)
assert len(trace_handlers) == 0, (
f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
f"Found {len(trace_handlers)} TraceCollectionListener handlers when tracing should be disabled"
)
def test_trace_listener_setup_correctly_for_crew(self):
@@ -385,6 +402,7 @@ class TestTraceListenerSetup:
):
crew = Crew(agents=[agent], tasks=[task], tracing=True)
crew.kickoff()
wait_for_event_handlers()
mock_plus_api_class.assert_called_with(api_key="mock_token_12345")
@@ -396,15 +414,33 @@ class TestTraceListenerSetup:
def teardown_method(self):
"""Cleanup after each test method"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
crewai_event_bus._handlers.clear()
with crewai_event_bus._rwlock.w_locked():
crewai_event_bus._sync_handlers = {}
crewai_event_bus._async_handlers = {}
crewai_event_bus._handler_dependencies = {}
crewai_event_bus._execution_plan_cache = {}
# Reset EventListener singleton
if hasattr(EventListener, "_instance"):
EventListener._instance = None
@classmethod
def teardown_class(cls):
"""Final cleanup after all tests in this class"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
crewai_event_bus._handlers.clear()
with crewai_event_bus._rwlock.w_locked():
crewai_event_bus._sync_handlers = {}
crewai_event_bus._async_handlers = {}
crewai_event_bus._handler_dependencies = {}
crewai_event_bus._execution_plan_cache = {}
# Reset EventListener singleton
if hasattr(EventListener, "_instance"):
EventListener._instance = None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls):
@@ -466,6 +502,7 @@ class TestTraceListenerSetup:
) as mock_add_event,
):
result = crew.kickoff()
wait_for_event_handlers()
assert result is not None
assert mock_handle_completion.call_count >= 1
@@ -543,6 +580,7 @@ class TestTraceListenerSetup:
)
crew.kickoff()
wait_for_event_handlers()
assert mock_handle_completion.call_count >= 1, (
"handle_execution_completion should be called"
@@ -561,7 +599,6 @@ class TestTraceListenerSetup:
@pytest.mark.vcr(filter_headers=["authorization"])
def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls):
"""Test the consolidation logic for first-time users vs regular tracing"""
with (
patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}),
patch(
@@ -579,7 +616,9 @@ class TestTraceListenerSetup:
):
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus._handlers.clear()
with crewai_event_bus._rwlock.w_locked():
crewai_event_bus._sync_handlers = {}
crewai_event_bus._async_handlers = {}
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
@@ -600,6 +639,9 @@ class TestTraceListenerSetup:
with patch.object(TraceBatchManager, "initialize_batch") as mock_initialize:
result = crew.kickoff()
assert trace_listener.batch_manager.wait_for_pending_events(timeout=5.0), (
"Timeout waiting for trace event handlers to complete"
)
assert mock_initialize.call_count >= 1
assert mock_initialize.call_args_list[0][1]["use_ephemeral"] is True
assert result is not None
@@ -700,6 +742,7 @@ class TestTraceListenerSetup:
) as mock_mark_failed,
):
crew.kickoff()
wait_for_event_handlers()
mock_mark_failed.assert_called_once()
call_args = mock_mark_failed.call_args_list[0]