diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 984a4938d..490297b0c 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -43,38 +43,49 @@ class Telemetry: Users can opt-in to sharing more complete data using the `share_crew` attribute in the Crew class. """ + _instance = None + _initialized = False + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance def __init__(self): - self.ready = False - self.trace_set = False - - if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true": - return - - try: - telemetry_endpoint = "https://telemetry.crewai.com:4319" - self.resource = Resource( - attributes={SERVICE_NAME: "crewAI-telemetry"}, - ) - with suppress_warnings(): - self.provider = TracerProvider(resource=self.resource) - - processor = BatchSpanProcessor( - OTLPSpanExporter( - endpoint=f"{telemetry_endpoint}/v1/traces", - timeout=30, - ) - ) - - self.provider.add_span_processor(processor) - self.ready = True - except Exception as e: - if isinstance( - e, - (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError), - ): - raise # Re-raise the exception to not interfere with system signals + if not self._initialized: self.ready = False + self.trace_set = False + + if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true": + self._initialized = True + return + + try: + telemetry_endpoint = "https://telemetry.crewai.com:4319" + self.resource = Resource( + attributes={SERVICE_NAME: "crewAI-telemetry"}, + ) + with suppress_warnings(): + self.provider = TracerProvider(resource=self.resource) + + processor = BatchSpanProcessor( + OTLPSpanExporter( + endpoint=f"{telemetry_endpoint}/v1/traces", + timeout=30, + ) + ) + + self.provider.add_span_processor(processor) + self.ready = True + except Exception as e: + if isinstance( + e, + (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError), + ): + raise # Re-raise the exception to not interfere with system signals + self.ready = False + + self._initialized = True def set_tracer(self): if self.ready and not self.trace_set: diff --git a/tests/telemetry/test_telemetry_singleton.py b/tests/telemetry/test_telemetry_singleton.py new file mode 100644 index 000000000..4b0a8b809 --- /dev/null +++ b/tests/telemetry/test_telemetry_singleton.py @@ -0,0 +1,41 @@ +import threading +import pytest +from crewai.telemetry import Telemetry + + +def test_telemetry_singleton(): + """Test that Telemetry is a singleton and only one instance is created.""" + # Create multiple instances of Telemetry + telemetry1 = Telemetry() + telemetry2 = Telemetry() + + # Verify that they are the same instance + assert telemetry1 is telemetry2 + + # Verify that the BatchSpanProcessor is initialized only once + # by checking that the provider is the same + assert telemetry1.provider is telemetry2.provider + + +def test_telemetry_thread_safety(): + """Test that Telemetry singleton is thread-safe.""" + # List to store Telemetry instances created in threads + instances = [] + + def create_telemetry(): + instances.append(Telemetry()) + + # Create multiple threads that create Telemetry instances + threads = [] + for _ in range(10): + thread = threading.Thread(target=create_telemetry) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Verify that all instances are the same + for instance in instances[1:]: + assert instance is instances[0]