diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index b30d469b9..eefe1ad88 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -85,6 +85,8 @@ class CrewAIEventsBus: _shutting_down: bool _pending_futures: set[Future[Any]] _futures_lock: threading.Lock + _executor_initialized: bool + _has_pending_events: bool def __new__(cls) -> Self: """Create or return the singleton instance. @@ -102,8 +104,9 @@ class CrewAIEventsBus: def _initialize(self) -> None: """Initialize the event bus internal state. - Creates handler dictionaries and starts a dedicated background - event loop for async handler execution. + Creates handler dictionaries. The thread pool executor and event loop + are lazily initialized on first emit() to avoid overhead when events + are never emitted. """ self._shutting_down = False self._rwlock = RWLock() @@ -115,19 +118,37 @@ class CrewAIEventsBus: type[BaseEvent], dict[Handler, list[Depends[Any]]] ] = {} self._execution_plan_cache: dict[type[BaseEvent], ExecutionPlan] = {} - self._sync_executor = ThreadPoolExecutor( - max_workers=10, - thread_name_prefix="CrewAISyncHandler", - ) self._console = ConsoleFormatter() + # Lazy initialization flags - executor and loop created on first emit + self._executor_initialized = False + self._has_pending_events = False - self._loop = asyncio.new_event_loop() - self._loop_thread = threading.Thread( - target=self._run_loop, - name="CrewAIEventsLoop", - daemon=True, - ) - self._loop_thread.start() + def _ensure_executor_initialized(self) -> None: + """Lazily initialize the thread pool executor and event loop. + + Called on first emit() to avoid startup overhead when events are never used. + Thread-safe via double-checked locking. + """ + if self._executor_initialized: + return + + with self._instance_lock: + if self._executor_initialized: + return + + self._sync_executor = ThreadPoolExecutor( + max_workers=10, + thread_name_prefix="CrewAISyncHandler", + ) + + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread( + target=self._run_loop, + name="CrewAIEventsLoop", + daemon=True, + ) + self._loop_thread.start() + self._executor_initialized = True def _track_future(self, future: Future[Any]) -> Future[Any]: """Track a future and set up automatic cleanup when it completes. @@ -431,6 +452,15 @@ class CrewAIEventsBus: sync_handlers = self._sync_handlers.get(event_type, frozenset()) async_handlers = self._async_handlers.get(event_type, frozenset()) + # Skip executor initialization if no handlers exist for this event + if not sync_handlers and not async_handlers: + return None + + # Lazily initialize executor and event loop only when handlers exist + self._ensure_executor_initialized() + # Track that we have pending events for flush optimization + self._has_pending_events = True + if has_dependencies: return self._track_future( asyncio.run_coroutine_threadsafe( @@ -474,6 +504,10 @@ class CrewAIEventsBus: Returns: True if all handlers completed, False if timeout occurred. """ + # Skip flush entirely if no events were ever emitted + if not self._has_pending_events: + return True + with self._futures_lock: futures_to_wait = list(self._pending_futures) @@ -629,6 +663,9 @@ class CrewAIEventsBus: with self._rwlock.w_locked(): self._shutting_down = True + # Check if executor was ever initialized (lazy init optimization) + if not self._executor_initialized: + return loop = getattr(self, "_loop", None) if loop is None or loop.is_closed(): diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 9d81f1d55..0e3b284c0 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -17,7 +17,10 @@ from crewai.events.listeners.tracing.first_time_trace_handler import ( from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager from crewai.events.listeners.tracing.types import TraceEvent from crewai.events.listeners.tracing.utils import ( + is_tracing_enabled_in_context, safe_serialize_to_dict, + should_auto_collect_first_time_traces, + should_enable_tracing, ) from crewai.events.types.a2a_events import ( A2AAgentCardFetchedEvent, @@ -198,6 +201,17 @@ class TraceCollectionListener(BaseEventListener): if self._listeners_setup: return + # Skip registration entirely if tracing is disabled and not first-time user + # This avoids overhead of 50+ handler registrations when tracing won't be used + # Also check is_tracing_enabled_in_context() so per-run overrides (Crew(tracing=True)) still work + if ( + not should_enable_tracing() + and not is_tracing_enabled_in_context() + and not should_auto_collect_first_time_traces() + ): + self._listeners_setup = True + return + self._register_env_event_handlers(crewai_event_bus) self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index 7a6eff3f0..314922870 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -481,6 +481,26 @@ def should_auto_collect_first_time_traces() -> bool: return is_first_execution() +def _is_interactive_terminal() -> bool: + """Check if stdin is an interactive terminal. + + Returns False in non-interactive contexts (CI, API servers, Docker, etc.) + to avoid blocking on prompts that no one can respond to. + """ + import sys + + try: + stdin = getattr(sys, "stdin", None) + if stdin is None: + return False + isatty = getattr(stdin, "isatty", None) + if not callable(isatty): + return False + return bool(isatty()) + except Exception: + return False + + def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool: """ Prompt user if they want to see their traces with timeout. @@ -492,6 +512,11 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool: if should_suppress_tracing_messages(): return False + # Skip prompt in non-interactive contexts (CI, API servers, Docker, etc.) + # This avoids blocking for 20 seconds when no one can respond + if not _is_interactive_terminal(): + return False + try: import threading diff --git a/lib/crewai/tests/tracing/test_tracing.py b/lib/crewai/tests/tracing/test_tracing.py index 92f6e31c5..640aca832 100644 --- a/lib/crewai/tests/tracing/test_tracing.py +++ b/lib/crewai/tests/tracing/test_tracing.py @@ -793,6 +793,10 @@ class TestTraceListenerSetup: "crewai.events.listeners.tracing.utils._is_test_environment", return_value=False, ), + patch( + "crewai.events.listeners.tracing.utils._is_interactive_terminal", + return_value=True, + ), patch("threading.Thread") as mock_thread, ): from crewai.events.listeners.tracing.utils import (