diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 490297b0c..f30c05b0a 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -4,6 +4,7 @@ import asyncio import json import os import platform +import threading import warnings from contextlib import contextmanager from importlib.metadata import version @@ -42,13 +43,26 @@ class Telemetry: Users can opt-in to sharing more complete data using the `share_crew` attribute in the Crew class. + + This class implements a singleton pattern to ensure that only one instance + of the telemetry system exists, preventing multiple OtelBatchSpanProcessor + threads from being created. This is particularly important in environments + like FastAPI endpoints where agents are created dynamically, as it prevents + resource leaks and excessive thread creation. + + The implementation uses thread-safe double-checked locking to ensure + thread safety when multiple threads attempt to create a Telemetry instance + simultaneously. """ _instance = None _initialized = False + _instance_lock = threading.Lock() def __new__(cls): if cls._instance is None: - cls._instance = super().__new__(cls) + with cls._instance_lock: + if cls._instance is None: + cls._instance = super().__new__(cls) return cls._instance def __init__(self): @@ -87,6 +101,16 @@ class Telemetry: self._initialized = True + def __del__(self): + """Clean up resources when the instance is destroyed.""" + if hasattr(self, 'provider') and self.provider: + try: + self.provider.shutdown() + except Exception as e: + # Re-raise the exception to not interfere with system signals + if isinstance(e, (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError)): + raise + def set_tracer(self): if self.ready and not self.trace_set: try: