Compare commits

...

5 Commits

Author SHA1 Message Date
Devin AI
e14bcd59f3 Fix syntax errors in Telemetry class by adding thread safety to __init__ method
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 20:13:47 +00:00
Devin AI
93bea7e9b7 Improve Telemetry singleton with thread safety and resource cleanup
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 20:07:28 +00:00
Devin AI
0fb0d188d1 Fix import order in test_telemetry_singleton.py according to Ruff rules
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 20:02:40 +00:00
Devin AI
f412d15cb1 Fix import order in test_telemetry_singleton.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 20:00:16 +00:00
Devin AI
77ee6e1eee Fix issue 2279: Implement singleton pattern for Telemetry class to prevent multiple OtelBatchSpanProcessor threads
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-04 19:58:08 +00:00
2 changed files with 106 additions and 26 deletions

View File

@@ -4,6 +4,7 @@ import asyncio
import json
import os
import platform
import threading
import warnings
from contextlib import contextmanager
from importlib.metadata import version
@@ -42,39 +43,75 @@ class Telemetry:
Users can opt-in to sharing more complete data using the `share_crew`
attribute in the Crew class.
This class implements a singleton pattern to ensure that only one instance
of the telemetry system exists, preventing multiple OtelBatchSpanProcessor
threads from being created. This is particularly important in environments
like FastAPI endpoints where agents are created dynamically, as it prevents
resource leaks and excessive thread creation.
The implementation uses thread-safe double-checked locking to ensure
thread safety when multiple threads attempt to create a Telemetry instance
simultaneously.
"""
_instance = None
_initialized = False
_instance_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.ready = False
self.trace_set = False
if not self._initialized:
with self._instance_lock:
if not self._initialized:
self.ready = False
self.trace_set = False
if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
return
if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
self._initialized = True
return
try:
telemetry_endpoint = "https://telemetry.crewai.com:4319"
self.resource = Resource(
attributes={SERVICE_NAME: "crewAI-telemetry"},
)
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
try:
telemetry_endpoint = "https://telemetry.crewai.com:4319"
self.resource = Resource(
attributes={SERVICE_NAME: "crewAI-telemetry"},
)
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"{telemetry_endpoint}/v1/traces",
timeout=30,
)
)
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"{telemetry_endpoint}/v1/traces",
timeout=30,
)
)
self.provider.add_span_processor(processor)
self.ready = True
except Exception as e:
if isinstance(
e,
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
):
raise # Re-raise the exception to not interfere with system signals
self.ready = False
self.provider.add_span_processor(processor)
self.ready = True
except Exception as e:
if isinstance(
e,
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
):
raise # Re-raise the exception to not interfere with system signals
self.ready = False
self._initialized = True
def __del__(self):
"""Clean up resources when the instance is destroyed."""
if hasattr(self, 'provider') and self.provider:
try:
self.provider.shutdown()
except Exception as e:
# Re-raise the exception to not interfere with system signals
if isinstance(e, (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError)):
raise
def set_tracer(self):
if self.ready and not self.trace_set:

View File

@@ -0,0 +1,43 @@
import threading
import pytest
from crewai.telemetry import Telemetry
def test_telemetry_singleton():
"""Test that Telemetry is a singleton and only one instance is created."""
# Create multiple instances of Telemetry
telemetry1 = Telemetry()
telemetry2 = Telemetry()
# Verify that they are the same instance
assert telemetry1 is telemetry2
# Verify that the BatchSpanProcessor is initialized only once
# by checking that the provider is the same
assert telemetry1.provider is telemetry2.provider
def test_telemetry_thread_safety():
"""Test that Telemetry singleton is thread-safe."""
# List to store Telemetry instances created in threads
instances = []
def create_telemetry():
instances.append(Telemetry())
# Create multiple threads that create Telemetry instances
threads = []
for _ in range(10):
thread = threading.Thread(target=create_telemetry)
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Verify that all instances are the same
for instance in instances[1:]:
assert instance is instances[0]