Compare commits

...

1 Commits

Author SHA1 Message Date
Alex
c57536e3d0 perf: reduce framework overhead for NVIDIA benchmarks
- Lazily initialize the event bus thread pool and event loop on first emit()
  instead of at import time (~200ms savings)
- Skip trace listener registration (50+ handlers) when tracing disabled
- Skip trace prompt in non-interactive contexts (isatty check) to avoid
  20s timeout in CI/Docker/API servers
- Skip flush() when no events were emitted (avoids 30s timeout waste)
- Add _has_pending_events flag to track if any events were emitted
- Add _executor_initialized flag for lazy init double-checked locking

All existing behavior preserved when tracing IS enabled. No public APIs
changed - only conditional guards added.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-31 07:41:05 -07:00
4 changed files with 75 additions and 13 deletions

View File

@@ -85,6 +85,8 @@ class CrewAIEventsBus:
_shutting_down: bool
_pending_futures: set[Future[Any]]
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -102,8 +104,9 @@ class CrewAIEventsBus:
def _initialize(self) -> None:
"""Initialize the event bus internal state.
Creates handler dictionaries and starts a dedicated background
event loop for async handler execution.
Creates handler dictionaries. The thread pool executor and event loop
are lazily initialized on first emit() to avoid overhead when events
are never emitted.
"""
self._shutting_down = False
self._rwlock = RWLock()
@@ -115,19 +118,37 @@ class CrewAIEventsBus:
type[BaseEvent], dict[Handler, list[Depends[Any]]]
] = {}
self._execution_plan_cache: dict[type[BaseEvent], ExecutionPlan] = {}
self._sync_executor = ThreadPoolExecutor(
max_workers=10,
thread_name_prefix="CrewAISyncHandler",
)
self._console = ConsoleFormatter()
# Lazy initialization flags - executor and loop created on first emit
self._executor_initialized = False
self._has_pending_events = False
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(
target=self._run_loop,
name="CrewAIEventsLoop",
daemon=True,
)
self._loop_thread.start()
def _ensure_executor_initialized(self) -> None:
    """Create the sync-handler thread pool and background event loop on demand.

    emit() calls this before dispatching, so the executor, the event loop
    and its thread are only built once an event is actually emitted.

    The flag is checked once without the lock (cheap path for every call
    after the first) and again while holding ``_instance_lock`` so that
    concurrent first callers build the resources only once.
    """
    if not self._executor_initialized:
        with self._instance_lock:
            # Re-check under the lock: another thread may have finished the
            # initialization while this one was waiting to acquire it.
            if not self._executor_initialized:
                self._sync_executor = ThreadPoolExecutor(
                    thread_name_prefix="CrewAISyncHandler",
                    max_workers=10,
                )

                self._loop = asyncio.new_event_loop()
                loop_thread = threading.Thread(
                    name="CrewAIEventsLoop",
                    target=self._run_loop,
                    daemon=True,
                )
                self._loop_thread = loop_thread
                loop_thread.start()

                # Set the flag last so the lock-free check above never sees
                # True before every attribute has been assigned.
                self._executor_initialized = True
def _track_future(self, future: Future[Any]) -> Future[Any]:
"""Track a future and set up automatic cleanup when it completes.
@@ -396,6 +417,11 @@ class CrewAIEventsBus:
... await asyncio.wrap_future(future) # In async test
... # or future.result(timeout=5.0) in sync code
"""
# Lazily initialize executor and event loop on first emit
self._ensure_executor_initialized()
# Track that we have pending events for flush optimization
self._has_pending_events = True
event.previous_event_id = get_last_event_id()
event.triggered_by_event_id = get_triggering_event_id()
event.emission_sequence = get_next_emission_sequence()
@@ -474,6 +500,10 @@ class CrewAIEventsBus:
Returns:
True if all handlers completed, False if timeout occurred.
"""
# Skip flush entirely if no events were ever emitted
if not self._has_pending_events:
return True
with self._futures_lock:
futures_to_wait = list(self._pending_futures)
@@ -629,6 +659,9 @@ class CrewAIEventsBus:
with self._rwlock.w_locked():
self._shutting_down = True
# Check if executor was ever initialized (lazy init optimization)
if not self._executor_initialized:
return
loop = getattr(self, "_loop", None)
if loop is None or loop.is_closed():

View File

@@ -17,7 +17,10 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
safe_serialize_to_dict,
should_auto_collect_first_time_traces,
should_enable_tracing,
)
from crewai.events.types.a2a_events import (
A2AAgentCardFetchedEvent,
@@ -198,6 +201,12 @@ class TraceCollectionListener(BaseEventListener):
if self._listeners_setup:
return
# Skip registration entirely if tracing is disabled and not first-time user
# This avoids overhead of 50+ handler registrations when tracing won't be used
if not should_enable_tracing() and not should_auto_collect_first_time_traces():
self._listeners_setup = True
return
self._register_env_event_handlers(crewai_event_bus)
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)

View File

@@ -481,6 +481,17 @@ def should_auto_collect_first_time_traces() -> bool:
return is_first_execution()
def _is_interactive_terminal() -> bool:
    """Check if stdin is an interactive terminal.

    Returns False in non-interactive contexts (CI, API servers, Docker, etc.)
    to avoid blocking on prompts that no one can respond to.

    Also returns False, instead of raising, when stdin cannot be queried:
    ``sys.stdin`` is ``None``, it has been closed, or it has been replaced
    by an object without a usable ``isatty()``.

    Returns:
        True only when stdin exists and reports itself as a TTY.
    """
    import sys

    stdin = sys.stdin
    # sys.stdin is None for processes started without a console
    # (e.g. pythonw); treat that as non-interactive.
    if stdin is None:
        return False

    try:
        return bool(stdin.isatty())
    except (AttributeError, ValueError):
        # ValueError: stdin was closed. AttributeError: stdin was replaced
        # by an object that has no isatty(). Neither can answer a prompt.
        return False
def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
"""
Prompt user if they want to see their traces with timeout.
@@ -492,6 +503,11 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
if should_suppress_tracing_messages():
return False
# Skip prompt in non-interactive contexts (CI, API servers, Docker, etc.)
# This avoids blocking for 20 seconds when no one can respond
if not _is_interactive_terminal():
return False
try:
import threading

View File

@@ -793,6 +793,10 @@ class TestTraceListenerSetup:
"crewai.events.listeners.tracing.utils._is_test_environment",
return_value=False,
),
patch(
"crewai.events.listeners.tracing.utils._is_interactive_terminal",
return_value=True,
),
patch("threading.Thread") as mock_thread,
):
from crewai.events.listeners.tracing.utils import (