mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-25 08:08:14 +00:00
Compare commits
5 Commits
devin/1768
...
devin/1741
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e14bcd59f3 | ||
|
|
93bea7e9b7 | ||
|
|
0fb0d188d1 | ||
|
|
f412d15cb1 | ||
|
|
77ee6e1eee |
@@ -4,6 +4,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import threading
|
||||||
import warnings
|
import warnings
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from importlib.metadata import version
|
from importlib.metadata import version
|
||||||
@@ -42,39 +43,75 @@ class Telemetry:
|
|||||||
|
|
||||||
Users can opt-in to sharing more complete data using the `share_crew`
|
Users can opt-in to sharing more complete data using the `share_crew`
|
||||||
attribute in the Crew class.
|
attribute in the Crew class.
|
||||||
|
|
||||||
|
This class implements a singleton pattern to ensure that only one instance
|
||||||
|
of the telemetry system exists, preventing multiple OtelBatchSpanProcessor
|
||||||
|
threads from being created. This is particularly important in environments
|
||||||
|
like FastAPI endpoints where agents are created dynamically, as it prevents
|
||||||
|
resource leaks and excessive thread creation.
|
||||||
|
|
||||||
|
The implementation uses thread-safe double-checked locking to ensure
|
||||||
|
thread safety when multiple threads attempt to create a Telemetry instance
|
||||||
|
simultaneously.
|
||||||
"""
|
"""
|
||||||
|
_instance = None
|
||||||
|
_initialized = False
|
||||||
|
_instance_lock = threading.Lock()
|
||||||
|
|
||||||
|
def __new__(cls):
|
||||||
|
if cls._instance is None:
|
||||||
|
with cls._instance_lock:
|
||||||
|
if cls._instance is None:
|
||||||
|
cls._instance = super().__new__(cls)
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.ready = False
|
if not self._initialized:
|
||||||
self.trace_set = False
|
with self._instance_lock:
|
||||||
|
if not self._initialized:
|
||||||
|
self.ready = False
|
||||||
|
self.trace_set = False
|
||||||
|
|
||||||
if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
|
if os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true":
|
||||||
return
|
self._initialized = True
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
telemetry_endpoint = "https://telemetry.crewai.com:4319"
|
telemetry_endpoint = "https://telemetry.crewai.com:4319"
|
||||||
self.resource = Resource(
|
self.resource = Resource(
|
||||||
attributes={SERVICE_NAME: "crewAI-telemetry"},
|
attributes={SERVICE_NAME: "crewAI-telemetry"},
|
||||||
)
|
)
|
||||||
with suppress_warnings():
|
with suppress_warnings():
|
||||||
self.provider = TracerProvider(resource=self.resource)
|
self.provider = TracerProvider(resource=self.resource)
|
||||||
|
|
||||||
processor = BatchSpanProcessor(
|
processor = BatchSpanProcessor(
|
||||||
OTLPSpanExporter(
|
OTLPSpanExporter(
|
||||||
endpoint=f"{telemetry_endpoint}/v1/traces",
|
endpoint=f"{telemetry_endpoint}/v1/traces",
|
||||||
timeout=30,
|
timeout=30,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.provider.add_span_processor(processor)
|
self.provider.add_span_processor(processor)
|
||||||
self.ready = True
|
self.ready = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if isinstance(
|
if isinstance(
|
||||||
e,
|
e,
|
||||||
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
|
(SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError),
|
||||||
):
|
):
|
||||||
raise # Re-raise the exception to not interfere with system signals
|
raise # Re-raise the exception to not interfere with system signals
|
||||||
self.ready = False
|
self.ready = False
|
||||||
|
|
||||||
|
self._initialized = True
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
"""Clean up resources when the instance is destroyed."""
|
||||||
|
if hasattr(self, 'provider') and self.provider:
|
||||||
|
try:
|
||||||
|
self.provider.shutdown()
|
||||||
|
except Exception as e:
|
||||||
|
# Re-raise the exception to not interfere with system signals
|
||||||
|
if isinstance(e, (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError)):
|
||||||
|
raise
|
||||||
|
|
||||||
def set_tracer(self):
|
def set_tracer(self):
|
||||||
if self.ready and not self.trace_set:
|
if self.ready and not self.trace_set:
|
||||||
|
|||||||
43
tests/telemetry/test_telemetry_singleton.py
Normal file
43
tests/telemetry/test_telemetry_singleton.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import threading
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai.telemetry import Telemetry
|
||||||
|
|
||||||
|
|
||||||
|
def test_telemetry_singleton():
|
||||||
|
"""Test that Telemetry is a singleton and only one instance is created."""
|
||||||
|
# Create multiple instances of Telemetry
|
||||||
|
telemetry1 = Telemetry()
|
||||||
|
telemetry2 = Telemetry()
|
||||||
|
|
||||||
|
# Verify that they are the same instance
|
||||||
|
assert telemetry1 is telemetry2
|
||||||
|
|
||||||
|
# Verify that the BatchSpanProcessor is initialized only once
|
||||||
|
# by checking that the provider is the same
|
||||||
|
assert telemetry1.provider is telemetry2.provider
|
||||||
|
|
||||||
|
|
||||||
|
def test_telemetry_thread_safety():
|
||||||
|
"""Test that Telemetry singleton is thread-safe."""
|
||||||
|
# List to store Telemetry instances created in threads
|
||||||
|
instances = []
|
||||||
|
|
||||||
|
def create_telemetry():
|
||||||
|
instances.append(Telemetry())
|
||||||
|
|
||||||
|
# Create multiple threads that create Telemetry instances
|
||||||
|
threads = []
|
||||||
|
for _ in range(10):
|
||||||
|
thread = threading.Thread(target=create_telemetry)
|
||||||
|
threads.append(thread)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
# Wait for all threads to complete
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
# Verify that all instances are the same
|
||||||
|
for instance in instances[1:]:
|
||||||
|
assert instance is instances[0]
|
||||||
Reference in New Issue
Block a user