mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-01 07:13:00 +00:00
perf: reduce framework overhead — lazy event bus, skip tracing when disabled (#5187)
* perf: reduce framework overhead for NVIDIA benchmarks - Lazy initialize event bus thread pool and event loop on first emit() instead of at import time (~200ms savings) - Skip trace listener registration (50+ handlers) when tracing disabled - Skip trace prompt in non-interactive contexts (isatty check) to avoid 20s timeout in CI/Docker/API servers - Skip flush() when no events were emitted (avoids 30s timeout waste) - Add _has_pending_events flag to track if any events were emitted - Add _executor_initialized flag for lazy init double-checked locking All existing behavior preserved when tracing IS enabled. No public APIs changed - only conditional guards added. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: address PR review comments — tracing override, executor init order, stdin guard, unused import Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * style: fix ruff formatting in trace_listener.py and utils.py --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> Co-authored-by: Iris Clawd <iris@crewai.com> Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
This commit is contained in:
@@ -85,6 +85,8 @@ class CrewAIEventsBus:
|
|||||||
_shutting_down: bool
|
_shutting_down: bool
|
||||||
_pending_futures: set[Future[Any]]
|
_pending_futures: set[Future[Any]]
|
||||||
_futures_lock: threading.Lock
|
_futures_lock: threading.Lock
|
||||||
|
_executor_initialized: bool
|
||||||
|
_has_pending_events: bool
|
||||||
|
|
||||||
def __new__(cls) -> Self:
|
def __new__(cls) -> Self:
|
||||||
"""Create or return the singleton instance.
|
"""Create or return the singleton instance.
|
||||||
@@ -102,8 +104,9 @@ class CrewAIEventsBus:
|
|||||||
def _initialize(self) -> None:
|
def _initialize(self) -> None:
|
||||||
"""Initialize the event bus internal state.
|
"""Initialize the event bus internal state.
|
||||||
|
|
||||||
Creates handler dictionaries and starts a dedicated background
|
Creates handler dictionaries. The thread pool executor and event loop
|
||||||
event loop for async handler execution.
|
are lazily initialized on first emit() to avoid overhead when events
|
||||||
|
are never emitted.
|
||||||
"""
|
"""
|
||||||
self._shutting_down = False
|
self._shutting_down = False
|
||||||
self._rwlock = RWLock()
|
self._rwlock = RWLock()
|
||||||
@@ -115,19 +118,37 @@ class CrewAIEventsBus:
|
|||||||
type[BaseEvent], dict[Handler, list[Depends[Any]]]
|
type[BaseEvent], dict[Handler, list[Depends[Any]]]
|
||||||
] = {}
|
] = {}
|
||||||
self._execution_plan_cache: dict[type[BaseEvent], ExecutionPlan] = {}
|
self._execution_plan_cache: dict[type[BaseEvent], ExecutionPlan] = {}
|
||||||
self._sync_executor = ThreadPoolExecutor(
|
|
||||||
max_workers=10,
|
|
||||||
thread_name_prefix="CrewAISyncHandler",
|
|
||||||
)
|
|
||||||
self._console = ConsoleFormatter()
|
self._console = ConsoleFormatter()
|
||||||
|
# Lazy initialization flags - executor and loop created on first emit
|
||||||
|
self._executor_initialized = False
|
||||||
|
self._has_pending_events = False
|
||||||
|
|
||||||
self._loop = asyncio.new_event_loop()
|
def _ensure_executor_initialized(self) -> None:
|
||||||
self._loop_thread = threading.Thread(
|
"""Lazily initialize the thread pool executor and event loop.
|
||||||
target=self._run_loop,
|
|
||||||
name="CrewAIEventsLoop",
|
Called on first emit() to avoid startup overhead when events are never used.
|
||||||
daemon=True,
|
Thread-safe via double-checked locking.
|
||||||
)
|
"""
|
||||||
self._loop_thread.start()
|
if self._executor_initialized:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._instance_lock:
|
||||||
|
if self._executor_initialized:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._sync_executor = ThreadPoolExecutor(
|
||||||
|
max_workers=10,
|
||||||
|
thread_name_prefix="CrewAISyncHandler",
|
||||||
|
)
|
||||||
|
|
||||||
|
self._loop = asyncio.new_event_loop()
|
||||||
|
self._loop_thread = threading.Thread(
|
||||||
|
target=self._run_loop,
|
||||||
|
name="CrewAIEventsLoop",
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
self._loop_thread.start()
|
||||||
|
self._executor_initialized = True
|
||||||
|
|
||||||
def _track_future(self, future: Future[Any]) -> Future[Any]:
|
def _track_future(self, future: Future[Any]) -> Future[Any]:
|
||||||
"""Track a future and set up automatic cleanup when it completes.
|
"""Track a future and set up automatic cleanup when it completes.
|
||||||
@@ -431,6 +452,15 @@ class CrewAIEventsBus:
|
|||||||
sync_handlers = self._sync_handlers.get(event_type, frozenset())
|
sync_handlers = self._sync_handlers.get(event_type, frozenset())
|
||||||
async_handlers = self._async_handlers.get(event_type, frozenset())
|
async_handlers = self._async_handlers.get(event_type, frozenset())
|
||||||
|
|
||||||
|
# Skip executor initialization if no handlers exist for this event
|
||||||
|
if not sync_handlers and not async_handlers:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Lazily initialize executor and event loop only when handlers exist
|
||||||
|
self._ensure_executor_initialized()
|
||||||
|
# Track that we have pending events for flush optimization
|
||||||
|
self._has_pending_events = True
|
||||||
|
|
||||||
if has_dependencies:
|
if has_dependencies:
|
||||||
return self._track_future(
|
return self._track_future(
|
||||||
asyncio.run_coroutine_threadsafe(
|
asyncio.run_coroutine_threadsafe(
|
||||||
@@ -474,6 +504,10 @@ class CrewAIEventsBus:
|
|||||||
Returns:
|
Returns:
|
||||||
True if all handlers completed, False if timeout occurred.
|
True if all handlers completed, False if timeout occurred.
|
||||||
"""
|
"""
|
||||||
|
# Skip flush entirely if no events were ever emitted
|
||||||
|
if not self._has_pending_events:
|
||||||
|
return True
|
||||||
|
|
||||||
with self._futures_lock:
|
with self._futures_lock:
|
||||||
futures_to_wait = list(self._pending_futures)
|
futures_to_wait = list(self._pending_futures)
|
||||||
|
|
||||||
@@ -629,6 +663,9 @@ class CrewAIEventsBus:
|
|||||||
|
|
||||||
with self._rwlock.w_locked():
|
with self._rwlock.w_locked():
|
||||||
self._shutting_down = True
|
self._shutting_down = True
|
||||||
|
# Check if executor was ever initialized (lazy init optimization)
|
||||||
|
if not self._executor_initialized:
|
||||||
|
return
|
||||||
loop = getattr(self, "_loop", None)
|
loop = getattr(self, "_loop", None)
|
||||||
|
|
||||||
if loop is None or loop.is_closed():
|
if loop is None or loop.is_closed():
|
||||||
|
|||||||
@@ -17,7 +17,10 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
|
|||||||
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
|
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
|
||||||
from crewai.events.listeners.tracing.types import TraceEvent
|
from crewai.events.listeners.tracing.types import TraceEvent
|
||||||
from crewai.events.listeners.tracing.utils import (
|
from crewai.events.listeners.tracing.utils import (
|
||||||
|
is_tracing_enabled_in_context,
|
||||||
safe_serialize_to_dict,
|
safe_serialize_to_dict,
|
||||||
|
should_auto_collect_first_time_traces,
|
||||||
|
should_enable_tracing,
|
||||||
)
|
)
|
||||||
from crewai.events.types.a2a_events import (
|
from crewai.events.types.a2a_events import (
|
||||||
A2AAgentCardFetchedEvent,
|
A2AAgentCardFetchedEvent,
|
||||||
@@ -198,6 +201,17 @@ class TraceCollectionListener(BaseEventListener):
|
|||||||
if self._listeners_setup:
|
if self._listeners_setup:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Skip registration entirely if tracing is disabled and not first-time user
|
||||||
|
# This avoids overhead of 50+ handler registrations when tracing won't be used
|
||||||
|
# Also check is_tracing_enabled_in_context() so per-run overrides (Crew(tracing=True)) still work
|
||||||
|
if (
|
||||||
|
not should_enable_tracing()
|
||||||
|
and not is_tracing_enabled_in_context()
|
||||||
|
and not should_auto_collect_first_time_traces()
|
||||||
|
):
|
||||||
|
self._listeners_setup = True
|
||||||
|
return
|
||||||
|
|
||||||
self._register_env_event_handlers(crewai_event_bus)
|
self._register_env_event_handlers(crewai_event_bus)
|
||||||
self._register_flow_event_handlers(crewai_event_bus)
|
self._register_flow_event_handlers(crewai_event_bus)
|
||||||
self._register_context_event_handlers(crewai_event_bus)
|
self._register_context_event_handlers(crewai_event_bus)
|
||||||
|
|||||||
@@ -481,6 +481,26 @@ def should_auto_collect_first_time_traces() -> bool:
|
|||||||
return is_first_execution()
|
return is_first_execution()
|
||||||
|
|
||||||
|
|
||||||
|
def _is_interactive_terminal() -> bool:
|
||||||
|
"""Check if stdin is an interactive terminal.
|
||||||
|
|
||||||
|
Returns False in non-interactive contexts (CI, API servers, Docker, etc.)
|
||||||
|
to avoid blocking on prompts that no one can respond to.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
|
||||||
|
try:
|
||||||
|
stdin = getattr(sys, "stdin", None)
|
||||||
|
if stdin is None:
|
||||||
|
return False
|
||||||
|
isatty = getattr(stdin, "isatty", None)
|
||||||
|
if not callable(isatty):
|
||||||
|
return False
|
||||||
|
return bool(isatty())
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
|
def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
|
||||||
"""
|
"""
|
||||||
Prompt user if they want to see their traces with timeout.
|
Prompt user if they want to see their traces with timeout.
|
||||||
@@ -492,6 +512,11 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
|
|||||||
if should_suppress_tracing_messages():
|
if should_suppress_tracing_messages():
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# Skip prompt in non-interactive contexts (CI, API servers, Docker, etc.)
|
||||||
|
# This avoids blocking for 20 seconds when no one can respond
|
||||||
|
if not _is_interactive_terminal():
|
||||||
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
|||||||
@@ -793,6 +793,10 @@ class TestTraceListenerSetup:
|
|||||||
"crewai.events.listeners.tracing.utils._is_test_environment",
|
"crewai.events.listeners.tracing.utils._is_test_environment",
|
||||||
return_value=False,
|
return_value=False,
|
||||||
),
|
),
|
||||||
|
patch(
|
||||||
|
"crewai.events.listeners.tracing.utils._is_interactive_terminal",
|
||||||
|
return_value=True,
|
||||||
|
),
|
||||||
patch("threading.Thread") as mock_thread,
|
patch("threading.Thread") as mock_thread,
|
||||||
):
|
):
|
||||||
from crewai.events.listeners.tracing.utils import (
|
from crewai.events.listeners.tracing.utils import (
|
||||||
|
|||||||
Reference in New Issue
Block a user