mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Improve Telemetry singleton with thread safety and resource cleanup
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -4,6 +4,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import threading
|
||||||
import warnings
|
import warnings
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from importlib.metadata import version
|
from importlib.metadata import version
|
||||||
@@ -42,13 +43,26 @@ class Telemetry:
|
|||||||
|
|
||||||
Users can opt-in to sharing more complete data using the `share_crew`
|
Users can opt-in to sharing more complete data using the `share_crew`
|
||||||
attribute in the Crew class.
|
attribute in the Crew class.
|
||||||
|
|
||||||
|
This class implements a singleton pattern to ensure that only one instance
|
||||||
|
of the telemetry system exists, preventing multiple OtelBatchSpanProcessor
|
||||||
|
threads from being created. This is particularly important in environments
|
||||||
|
like FastAPI endpoints where agents are created dynamically, as it prevents
|
||||||
|
resource leaks and excessive thread creation.
|
||||||
|
|
||||||
|
The implementation uses thread-safe double-checked locking to ensure
|
||||||
|
thread safety when multiple threads attempt to create a Telemetry instance
|
||||||
|
simultaneously.
|
||||||
"""
|
"""
|
||||||
_instance = None
|
_instance = None
|
||||||
_initialized = False
|
_initialized = False
|
||||||
|
_instance_lock = threading.Lock()
|
||||||
|
|
||||||
def __new__(cls):
|
def __new__(cls):
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
cls._instance = super().__new__(cls)
|
with cls._instance_lock:
|
||||||
|
if cls._instance is None:
|
||||||
|
cls._instance = super().__new__(cls)
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -87,6 +101,16 @@ class Telemetry:
|
|||||||
|
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
"""Clean up resources when the instance is destroyed."""
|
||||||
|
if hasattr(self, 'provider') and self.provider:
|
||||||
|
try:
|
||||||
|
self.provider.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
# Re-raise the exception to not interfere with system signals
|
||||||
|
if isinstance(e, (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError)):
|
||||||
|
raise
|
||||||
|
|
||||||
def set_tracer(self):
|
def set_tracer(self):
|
||||||
if self.ready and not self.trace_set:
|
if self.ready and not self.trace_set:
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user