mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-03 06:08:15 +00:00
fix: stop anonymous telemetry from globalizing the TracerProvider
`Telemetry.set_tracer()` installed crewAI's anonymous SDK
`TracerProvider` into OpenTelemetry's process-global slot, so the first
`Crew` constructed in a test or host application replaced the default
`ProxyTracerProvider` and exfiltrated every host span emitted via
`trace.get_tracer(...)` to crewAI's OTLP endpoint. Keep the provider
local to the `Telemetry` instance and route every anonymous span
through `self.provider.get_tracer("crewai.telemetry")` so the global
slot stays untouched. Mirrors the fix in `crewai_core.telemetry`,
drops the now-dead `set_tracer()` calls in `event_listener.py` and
`crewai_cli.command`, and adds regression coverage that asserts the
provider stays a `ProxyTracerProvider` after constructing a `Crew`.
This commit is contained in:
@@ -20,13 +20,11 @@ class AuthenticationRequiredError(SystemExit):
|
||||
class BaseCommand:
|
||||
def __init__(self) -> None:
|
||||
self._telemetry = Telemetry()
|
||||
self._telemetry.set_tracer()
|
||||
|
||||
|
||||
class PlusAPIMixin:
|
||||
def __init__(self, telemetry: Telemetry) -> None:
|
||||
try:
|
||||
telemetry.set_tracer()
|
||||
self.plus_api_client = PlusAPI(api_key=get_auth_token())
|
||||
except Exception:
|
||||
telemetry.deploy_signup_error_span()
|
||||
|
||||
@@ -19,7 +19,6 @@ import os
|
||||
import threading
|
||||
from typing import Any, ClassVar, Final
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
@@ -27,7 +26,7 @@ from opentelemetry.sdk.trace.export import (
|
||||
BatchSpanProcessor,
|
||||
SpanExportResult,
|
||||
)
|
||||
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
|
||||
from opentelemetry.trace import Span, Status, StatusCode, Tracer
|
||||
from typing_extensions import Self
|
||||
|
||||
|
||||
@@ -70,6 +69,12 @@ class Telemetry:
|
||||
|
||||
crewai's runtime extends this with crew/agent/task/tool/flow execution spans
|
||||
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
|
||||
|
||||
The anonymous-telemetry pipeline owns a private ``TracerProvider`` that is
|
||||
never installed as OpenTelemetry's global provider. Host applications keep
|
||||
full control of the process-wide provider slot, and any host spans emitted
|
||||
through ``crewai.telemetry.otel.operation`` stay on the host pipeline
|
||||
rather than getting exfiltrated to crewAI's OTLP endpoint.
|
||||
"""
|
||||
|
||||
_instance: ClassVar[Self | None] = None
|
||||
@@ -88,7 +93,6 @@ class Telemetry:
|
||||
return
|
||||
|
||||
self.ready: bool = False
|
||||
self.trace_set: bool = False
|
||||
self._initialized: bool = True
|
||||
|
||||
if self._is_telemetry_disabled():
|
||||
@@ -144,21 +148,9 @@ class Telemetry:
|
||||
except Exception as e:
|
||||
logger.debug("Telemetry shutdown failed: %s", e)
|
||||
|
||||
def set_tracer(self) -> None:
|
||||
"""Install our TracerProvider as the global one (idempotent)."""
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with suppress_warnings():
|
||||
existing_provider = trace.get_tracer_provider()
|
||||
if not isinstance(existing_provider, ProxyTracerProvider):
|
||||
self.trace_set = True
|
||||
return
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception as e:
|
||||
logger.debug("Failed to set tracer provider: %s", e)
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
def _tracer(self) -> Tracer:
|
||||
"""Return the anonymous-telemetry tracer from the private provider."""
|
||||
return self.provider.get_tracer("crewai.telemetry")
|
||||
|
||||
def _safe_telemetry_operation(
|
||||
self, operation: Callable[[], Span | None]
|
||||
@@ -194,7 +186,7 @@ class Telemetry:
|
||||
"""Records when an error occurs during the deployment signup process."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Deploy Signup Error")
|
||||
close_span(span)
|
||||
|
||||
@@ -204,7 +196,7 @@ class Telemetry:
|
||||
"""Records the start of a deployment process."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Start Deployment")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
@@ -216,7 +208,7 @@ class Telemetry:
|
||||
"""Records the creation of a new crew deployment."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Create Crew Deployment")
|
||||
close_span(span)
|
||||
|
||||
@@ -228,7 +220,7 @@ class Telemetry:
|
||||
"""Records the retrieval of crew logs."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Get Crew Logs")
|
||||
self._add_attribute(span, "log_type", log_type)
|
||||
if uuid:
|
||||
@@ -241,7 +233,7 @@ class Telemetry:
|
||||
"""Records the removal of a crew."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Remove Crew")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
@@ -253,7 +245,7 @@ class Telemetry:
|
||||
"""Records the creation of a new flow."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Flow Creation")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
close_span(span)
|
||||
@@ -265,7 +257,7 @@ class Telemetry:
|
||||
from crewai_core.version import get_crewai_version
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Template Installed")
|
||||
self._add_attribute(span, "crewai_version", get_crewai_version())
|
||||
self._add_attribute(span, "template_name", template_name)
|
||||
|
||||
@@ -13,7 +13,8 @@ from crewai_core import (
|
||||
user_data,
|
||||
version,
|
||||
)
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import ProxyTracerProvider
|
||||
import pytest
|
||||
|
||||
|
||||
@@ -97,7 +98,7 @@ def test_unused_var_warning_silenced() -> None:
|
||||
assert os.environ is not None
|
||||
|
||||
|
||||
def test_core_telemetry_skips_duplicate_tracer_provider(
|
||||
def test_core_telemetry_does_not_install_global_tracer_provider(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
from crewai_core.telemetry import Telemetry
|
||||
@@ -107,24 +108,7 @@ def test_core_telemetry_skips_duplicate_tracer_provider(
|
||||
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
|
||||
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"crewai_core.telemetry.trace.get_tracer_provider",
|
||||
lambda: TracerProvider(),
|
||||
)
|
||||
|
||||
called = False
|
||||
|
||||
def fail_if_called(provider: object) -> None:
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
monkeypatch.setattr(
|
||||
"crewai_core.telemetry.trace.set_tracer_provider",
|
||||
fail_if_called,
|
||||
)
|
||||
|
||||
telemetry = Telemetry()
|
||||
telemetry.set_tracer()
|
||||
|
||||
assert called is False
|
||||
assert telemetry.trace_set is True
|
||||
assert telemetry.ready is True
|
||||
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)
|
||||
|
||||
@@ -149,7 +149,6 @@ class EventListener(BaseEventListener):
|
||||
if not self._initialized:
|
||||
super().__init__()
|
||||
self._telemetry = Telemetry()
|
||||
self._telemetry.set_tracer()
|
||||
self.execution_spans = {}
|
||||
self._initialized = True
|
||||
self.formatter = ConsoleFormatter(verbose=True)
|
||||
|
||||
@@ -20,7 +20,6 @@ import signal
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter,
|
||||
)
|
||||
@@ -30,7 +29,7 @@ from opentelemetry.sdk.trace.export import (
|
||||
BatchSpanProcessor,
|
||||
SpanExportResult,
|
||||
)
|
||||
from opentelemetry.trace import ProxyTracerProvider, Span
|
||||
from opentelemetry.trace import Span, Tracer
|
||||
from typing_extensions import Self
|
||||
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
@@ -90,11 +89,17 @@ class SafeOTLPSpanExporter(OTLPSpanExporter):
|
||||
class Telemetry:
|
||||
"""Handle anonymous telemetry for the CrewAI package.
|
||||
|
||||
The anonymous-telemetry pipeline owns a private ``TracerProvider`` that
|
||||
is never installed as OpenTelemetry's global provider. Host applications
|
||||
keep full control of the process-wide provider slot, and any host spans
|
||||
emitted through ``crewai.telemetry.otel.operation`` stay on the host
|
||||
pipeline rather than getting exfiltrated to crewAI's OTLP endpoint.
|
||||
|
||||
Attributes:
|
||||
ready: Whether telemetry is initialized and ready.
|
||||
trace_set: Whether the tracer provider has been set.
|
||||
resource: OpenTelemetry resource for the telemetry service.
|
||||
provider: OpenTelemetry tracer provider.
|
||||
provider: Local OpenTelemetry tracer provider that is NOT registered
|
||||
globally; all anonymous spans are emitted through it directly.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
@@ -113,7 +118,6 @@ class Telemetry:
|
||||
return
|
||||
|
||||
self.ready: bool = False
|
||||
self.trace_set: bool = False
|
||||
self._initialized: bool = True
|
||||
|
||||
if self._is_telemetry_disabled():
|
||||
@@ -157,21 +161,9 @@ class Telemetry:
|
||||
"""Check if telemetry operations should be executed."""
|
||||
return self.ready and not self._is_telemetry_disabled()
|
||||
|
||||
def set_tracer(self) -> None:
|
||||
"""Set the tracer provider if ready and not already set."""
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with suppress_warnings():
|
||||
existing_provider = trace.get_tracer_provider()
|
||||
if not isinstance(existing_provider, ProxyTracerProvider):
|
||||
self.trace_set = True
|
||||
return
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to set tracer provider: {e}")
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
def _tracer(self) -> Tracer:
|
||||
"""Return the anonymous-telemetry tracer from the private provider."""
|
||||
return self.provider.get_tracer("crewai.telemetry")
|
||||
|
||||
def _register_shutdown_handlers(self) -> None:
|
||||
"""Register handlers for graceful shutdown on process exit and signals."""
|
||||
@@ -275,7 +267,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Crew Created")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -487,7 +479,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> Span:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
|
||||
created_span = tracer.start_span("Task Created")
|
||||
|
||||
@@ -581,7 +573,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Tool Repeated Usage")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -609,7 +601,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Tool Usage")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -638,7 +630,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Tool Usage Error")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -669,7 +661,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Crew Individual Test Result")
|
||||
|
||||
self._add_attribute(
|
||||
@@ -704,7 +696,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Crew Test Execution")
|
||||
|
||||
self._add_attribute(
|
||||
@@ -729,7 +721,7 @@ class Telemetry:
|
||||
"""Records when an error occurs during the deployment signup process."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Deploy Signup Error")
|
||||
close_span(span)
|
||||
|
||||
@@ -743,7 +735,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Start Deployment")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
@@ -755,7 +747,7 @@ class Telemetry:
|
||||
"""Records the creation of a new crew deployment."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Create Crew Deployment")
|
||||
close_span(span)
|
||||
|
||||
@@ -772,7 +764,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Get Crew Logs")
|
||||
self._add_attribute(span, "log_type", log_type)
|
||||
if uuid:
|
||||
@@ -789,7 +781,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Remove Crew")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
@@ -814,7 +806,7 @@ class Telemetry:
|
||||
self.crew_creation(crew, inputs)
|
||||
|
||||
def _operation() -> Span:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Crew Execution")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -947,7 +939,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Flow Creation")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
close_span(span)
|
||||
@@ -963,7 +955,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Flow Plotting")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
self._add_attribute(span, "node_names", json.dumps(node_names))
|
||||
@@ -980,7 +972,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Flow Execution")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -997,7 +989,7 @@ class Telemetry:
|
||||
"""Records the coding tool environment context."""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Environment Context")
|
||||
self._add_attribute(
|
||||
span,
|
||||
@@ -1028,7 +1020,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Human Feedback")
|
||||
self._add_attribute(span, "event_type", event_type)
|
||||
self._add_attribute(span, "has_routing", has_routing)
|
||||
@@ -1050,7 +1042,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Feature Usage")
|
||||
self._add_attribute(span, "crewai_version", version("crewai"))
|
||||
self._add_attribute(span, "feature", feature)
|
||||
@@ -1067,7 +1059,7 @@ class Telemetry:
|
||||
"""
|
||||
|
||||
def _operation() -> None:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
tracer = self._tracer()
|
||||
span = tracer.start_span("Template Installed")
|
||||
self._add_attribute(span, "crewai_version", version("crewai"))
|
||||
self._add_attribute(span, "template_name", template_name)
|
||||
|
||||
@@ -51,6 +51,21 @@ def test_operation_yields_non_recording_span_when_no_provider() -> None:
|
||||
assert isinstance(span, NonRecordingSpan)
|
||||
|
||||
|
||||
def test_constructing_crew_does_not_globalize_anonymous_telemetry_provider() -> None:
|
||||
agent = Agent(
|
||||
role="tester",
|
||||
goal="goal",
|
||||
backstory="backstory",
|
||||
llm=_FakeLLM(),
|
||||
allow_delegation=False,
|
||||
)
|
||||
Crew(
|
||||
agents=[agent],
|
||||
tasks=[Task(description="d", expected_output="o", agent=agent)],
|
||||
)
|
||||
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)
|
||||
|
||||
|
||||
def test_kickoff_runs_cleanly_without_provider() -> None:
|
||||
agent = Agent(
|
||||
role="tester",
|
||||
|
||||
@@ -5,7 +5,8 @@ from unittest.mock import Mock, patch
|
||||
import pytest
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.telemetry import Telemetry
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import ProxyTracerProvider
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -53,21 +54,21 @@ def test_telemetry_enabled_by_default():
|
||||
assert telemetry.ready is True
|
||||
|
||||
|
||||
def test_set_tracer_skips_when_provider_already_configured():
|
||||
"""A second telemetry instance must not re-install the global provider."""
|
||||
def test_telemetry_does_not_install_global_tracer_provider():
|
||||
with (
|
||||
patch.dict(os.environ, {}, clear=True),
|
||||
patch(
|
||||
"crewai.telemetry.telemetry.trace.get_tracer_provider",
|
||||
return_value=TracerProvider(),
|
||||
patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"CREWAI_DISABLE_TELEMETRY": "false",
|
||||
"CREWAI_DISABLE_TRACKING": "false",
|
||||
"OTEL_SDK_DISABLED": "false",
|
||||
},
|
||||
),
|
||||
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
|
||||
patch("crewai.telemetry.telemetry.TracerProvider"),
|
||||
):
|
||||
telemetry = Telemetry()
|
||||
telemetry.set_tracer()
|
||||
|
||||
mock_set.assert_not_called()
|
||||
assert telemetry.trace_set is True
|
||||
assert telemetry.ready is True
|
||||
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)
|
||||
|
||||
|
||||
def test_flow_execution_span_records_crewai_version():
|
||||
@@ -84,10 +85,10 @@ def test_flow_execution_span_records_crewai_version():
|
||||
"OTEL_SDK_DISABLED": "false",
|
||||
},
|
||||
),
|
||||
patch("crewai.telemetry.telemetry.TracerProvider"),
|
||||
patch("crewai.telemetry.telemetry.trace.get_tracer", return_value=tracer),
|
||||
patch("crewai.telemetry.telemetry.TracerProvider") as mock_provider_cls,
|
||||
patch("crewai.telemetry.telemetry.version", return_value="9.9.9"),
|
||||
):
|
||||
mock_provider_cls.return_value.get_tracer.return_value = tracer
|
||||
telemetry = Telemetry()
|
||||
telemetry.flow_execution_span("ResearchFlow", ["start", "finish"])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user