From 60f405fe542cc5a54896395c7f340e253e3f63a4 Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Tue, 23 Jun 2026 13:46:55 -0300 Subject: [PATCH] fix: stop anonymous telemetry from globalizing the TracerProvider `Telemetry.set_tracer()` installed crewAI's anonymous SDK `TracerProvider` into OpenTelemetry's process-global slot, so the first `Crew` constructed in a test or host application replaced the default `ProxyTracerProvider` and exfiltrated every host span emitted via `trace.get_tracer(...)` to crewAI's OTLP endpoint. Keep the provider local to the `Telemetry` instance and route every anonymous span through `self.provider.get_tracer("crewai.telemetry")` so the global slot stays untouched. Mirrors the fix in `crewai_core.telemetry`, drops the now-dead `set_tracer()` calls in `event_listener.py` and `crewai_cli.command`, and adds regression coverage that asserts the provider stays a `ProxyTracerProvider` after constructing a `Crew`. --- lib/cli/src/crewai_cli/command.py | 2 - lib/crewai-core/src/crewai_core/telemetry.py | 42 +++++------ lib/crewai-core/tests/test_smoke.py | 26 ++----- .../src/crewai/events/event_listener.py | 1 - lib/crewai/src/crewai/telemetry/telemetry.py | 72 +++++++++---------- lib/crewai/tests/telemetry/test_otel_noop.py | 15 ++++ lib/crewai/tests/telemetry/test_telemetry.py | 29 ++++---- 7 files changed, 84 insertions(+), 103 deletions(-) diff --git a/lib/cli/src/crewai_cli/command.py b/lib/cli/src/crewai_cli/command.py index d5e62cf55..7571c0103 100644 --- a/lib/cli/src/crewai_cli/command.py +++ b/lib/cli/src/crewai_cli/command.py @@ -20,13 +20,11 @@ class AuthenticationRequiredError(SystemExit): class BaseCommand: def __init__(self) -> None: self._telemetry = Telemetry() - self._telemetry.set_tracer() class PlusAPIMixin: def __init__(self, telemetry: Telemetry) -> None: try: - telemetry.set_tracer() self.plus_api_client = PlusAPI(api_key=get_auth_token()) except Exception: telemetry.deploy_signup_error_span() diff --git a/lib/crewai-core/src/crewai_core/telemetry.py b/lib/crewai-core/src/crewai_core/telemetry.py index 08aef9b71..7ec5d58ec 100644 --- a/lib/crewai-core/src/crewai_core/telemetry.py +++ b/lib/crewai-core/src/crewai_core/telemetry.py @@ -19,7 +19,6 @@ import os import threading from typing import Any, ClassVar, Final -from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider @@ -27,7 +26,7 @@ from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, SpanExportResult, ) -from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode +from opentelemetry.trace import Span, Status, StatusCode, Tracer from typing_extensions import Self @@ -70,6 +69,12 @@ class Telemetry: crewai's runtime extends this with crew/agent/task/tool/flow execution spans and event-bus signal handlers (see ``crewai.telemetry.telemetry``). + + The anonymous-telemetry pipeline owns a private ``TracerProvider`` that is + never installed as OpenTelemetry's global provider. Host applications keep + full control of the process-wide provider slot, and any host spans emitted + through ``crewai.telemetry.otel.operation`` stay on the host pipeline + rather than getting exfiltrated to crewAI's OTLP endpoint. """ _instance: ClassVar[Self | None] = None @@ -88,7 +93,6 @@ class Telemetry: return self.ready: bool = False - self.trace_set: bool = False self._initialized: bool = True if self._is_telemetry_disabled(): @@ -144,21 +148,9 @@ class Telemetry: except Exception as e: logger.debug("Telemetry shutdown failed: %s", e) - def set_tracer(self) -> None: - """Install our TracerProvider as the global one (idempotent).""" - if self.ready and not self.trace_set: - try: - with suppress_warnings(): - existing_provider = trace.get_tracer_provider() - if not isinstance(existing_provider, ProxyTracerProvider): - self.trace_set = True - return - trace.set_tracer_provider(self.provider) - self.trace_set = True - except Exception as e: - logger.debug("Failed to set tracer provider: %s", e) - self.ready = False - self.trace_set = False + def _tracer(self) -> Tracer: + """Return the anonymous-telemetry tracer from the private provider.""" + return self.provider.get_tracer("crewai.telemetry") def _safe_telemetry_operation( self, operation: Callable[[], Span | None] @@ -194,7 +186,7 @@ class Telemetry: """Records when an error occurs during the deployment signup process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Deploy Signup Error") close_span(span) @@ -204,7 +196,7 @@ class Telemetry: """Records the start of a deployment process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Start Deployment") if uuid: self._add_attribute(span, "uuid", uuid) @@ -216,7 +208,7 @@ class Telemetry: """Records the creation of a new crew deployment.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Create Crew Deployment") close_span(span) @@ -228,7 +220,7 @@ class Telemetry: """Records the retrieval of crew logs.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Get Crew Logs") self._add_attribute(span, "log_type", log_type) if uuid: @@ -241,7 +233,7 @@ class Telemetry: """Records the removal of a crew.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Remove Crew") if uuid: self._add_attribute(span, "uuid", uuid) @@ -253,7 +245,7 @@ class Telemetry: """Records the creation of a new flow.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Creation") self._add_attribute(span, "flow_name", flow_name) close_span(span) @@ -265,7 +257,7 @@ class Telemetry: from crewai_core.version import get_crewai_version def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Template Installed") self._add_attribute(span, "crewai_version", get_crewai_version()) self._add_attribute(span, "template_name", template_name) diff --git a/lib/crewai-core/tests/test_smoke.py b/lib/crewai-core/tests/test_smoke.py index 7980cfd1e..3e65752b6 100644 --- a/lib/crewai-core/tests/test_smoke.py +++ b/lib/crewai-core/tests/test_smoke.py @@ -13,7 +13,8 @@ from crewai_core import ( user_data, version, ) -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace +from opentelemetry.trace import ProxyTracerProvider import pytest @@ -97,7 +98,7 @@ def test_unused_var_warning_silenced() -> None: assert os.environ is not None -def test_core_telemetry_skips_duplicate_tracer_provider( +def test_core_telemetry_does_not_install_global_tracer_provider( monkeypatch: pytest.MonkeyPatch, ) -> None: from crewai_core.telemetry import Telemetry @@ -107,24 +108,7 @@ def test_core_telemetry_skips_duplicate_tracer_provider( monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False) monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False) - monkeypatch.setattr( - "crewai_core.telemetry.trace.get_tracer_provider", - lambda: TracerProvider(), - ) - - called = False - - def fail_if_called(provider: object) -> None: - nonlocal called - called = True - - monkeypatch.setattr( - "crewai_core.telemetry.trace.set_tracer_provider", - fail_if_called, - ) - telemetry = Telemetry() - telemetry.set_tracer() - assert called is False - assert telemetry.trace_set is True + assert telemetry.ready is True + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 883147478..9d35c4814 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -149,7 +149,6 @@ class EventListener(BaseEventListener): if not self._initialized: super().__init__() self._telemetry = Telemetry() - self._telemetry.set_tracer() self.execution_spans = {} self._initialized = True self.formatter = ConsoleFormatter(verbose=True) diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index c6c0060d1..ad3a1bb0c 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -20,7 +20,6 @@ import signal import threading from typing import TYPE_CHECKING, Any -from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( OTLPSpanExporter, ) @@ -30,7 +29,7 @@ from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, SpanExportResult, ) -from opentelemetry.trace import ProxyTracerProvider, Span +from opentelemetry.trace import Span, Tracer from typing_extensions import Self from crewai.events.event_bus import crewai_event_bus @@ -90,11 +89,17 @@ class SafeOTLPSpanExporter(OTLPSpanExporter): class Telemetry: """Handle anonymous telemetry for the CrewAI package. + The anonymous-telemetry pipeline owns a private ``TracerProvider`` that + is never installed as OpenTelemetry's global provider. Host applications + keep full control of the process-wide provider slot, and any host spans + emitted through ``crewai.telemetry.otel.operation`` stay on the host + pipeline rather than getting exfiltrated to crewAI's OTLP endpoint. + Attributes: ready: Whether telemetry is initialized and ready. - trace_set: Whether the tracer provider has been set. resource: OpenTelemetry resource for the telemetry service. - provider: OpenTelemetry tracer provider. + provider: Local OpenTelemetry tracer provider that is NOT registered + globally; all anonymous spans are emitted through it directly. """ _instance = None @@ -113,7 +118,6 @@ class Telemetry: return self.ready: bool = False - self.trace_set: bool = False self._initialized: bool = True if self._is_telemetry_disabled(): @@ -157,21 +161,9 @@ class Telemetry: """Check if telemetry operations should be executed.""" return self.ready and not self._is_telemetry_disabled() - def set_tracer(self) -> None: - """Set the tracer provider if ready and not already set.""" - if self.ready and not self.trace_set: - try: - with suppress_warnings(): - existing_provider = trace.get_tracer_provider() - if not isinstance(existing_provider, ProxyTracerProvider): - self.trace_set = True - return - trace.set_tracer_provider(self.provider) - self.trace_set = True - except Exception as e: - logger.debug(f"Failed to set tracer provider: {e}") - self.ready = False - self.trace_set = False + def _tracer(self) -> Tracer: + """Return the anonymous-telemetry tracer from the private provider.""" + return self.provider.get_tracer("crewai.telemetry") def _register_shutdown_handlers(self) -> None: """Register handlers for graceful shutdown on process exit and signals.""" @@ -275,7 +267,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Created") self._add_attribute( span, @@ -487,7 +479,7 @@ class Telemetry: """ def _operation() -> Span: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() created_span = tracer.start_span("Task Created") @@ -581,7 +573,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Repeated Usage") self._add_attribute( span, @@ -609,7 +601,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Usage") self._add_attribute( span, @@ -638,7 +630,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Usage Error") self._add_attribute( span, @@ -669,7 +661,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Individual Test Result") self._add_attribute( @@ -704,7 +696,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Test Execution") self._add_attribute( @@ -729,7 +721,7 @@ class Telemetry: """Records when an error occurs during the deployment signup process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Deploy Signup Error") close_span(span) @@ -743,7 +735,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Start Deployment") if uuid: self._add_attribute(span, "uuid", uuid) @@ -755,7 +747,7 @@ class Telemetry: """Records the creation of a new crew deployment.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Create Crew Deployment") close_span(span) @@ -772,7 +764,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Get Crew Logs") self._add_attribute(span, "log_type", log_type) if uuid: @@ -789,7 +781,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Remove Crew") if uuid: self._add_attribute(span, "uuid", uuid) @@ -814,7 +806,7 @@ class Telemetry: self.crew_creation(crew, inputs) def _operation() -> Span: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Execution") self._add_attribute( span, @@ -947,7 +939,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Creation") self._add_attribute(span, "flow_name", flow_name) close_span(span) @@ -963,7 +955,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Plotting") self._add_attribute(span, "flow_name", flow_name) self._add_attribute(span, "node_names", json.dumps(node_names)) @@ -980,7 +972,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Execution") self._add_attribute( span, @@ -997,7 +989,7 @@ class Telemetry: """Records the coding tool environment context.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Environment Context") self._add_attribute( span, @@ -1028,7 +1020,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Human Feedback") self._add_attribute(span, "event_type", event_type) self._add_attribute(span, "has_routing", has_routing) @@ -1050,7 +1042,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Feature Usage") self._add_attribute(span, "crewai_version", version("crewai")) self._add_attribute(span, "feature", feature) @@ -1067,7 +1059,7 @@ class Telemetry: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Template Installed") self._add_attribute(span, "crewai_version", version("crewai")) self._add_attribute(span, "template_name", template_name) diff --git a/lib/crewai/tests/telemetry/test_otel_noop.py b/lib/crewai/tests/telemetry/test_otel_noop.py index 142bec44e..0b34e1b33 100644 --- a/lib/crewai/tests/telemetry/test_otel_noop.py +++ b/lib/crewai/tests/telemetry/test_otel_noop.py @@ -51,6 +51,21 @@ def test_operation_yields_non_recording_span_when_no_provider() -> None: assert isinstance(span, NonRecordingSpan) +def test_constructing_crew_does_not_globalize_anonymous_telemetry_provider() -> None: + agent = Agent( + role="tester", + goal="goal", + backstory="backstory", + llm=_FakeLLM(), + allow_delegation=False, + ) + Crew( + agents=[agent], + tasks=[Task(description="d", expected_output="o", agent=agent)], + ) + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) + + def test_kickoff_runs_cleanly_without_provider() -> None: agent = Agent( role="tester", diff --git a/lib/crewai/tests/telemetry/test_telemetry.py b/lib/crewai/tests/telemetry/test_telemetry.py index 3bb05b8bd..d41b85355 100644 --- a/lib/crewai/tests/telemetry/test_telemetry.py +++ b/lib/crewai/tests/telemetry/test_telemetry.py @@ -5,7 +5,8 @@ from unittest.mock import Mock, patch import pytest from crewai import Agent, Crew, Task from crewai.telemetry import Telemetry -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace +from opentelemetry.trace import ProxyTracerProvider @pytest.fixture(autouse=True) @@ -53,21 +54,21 @@ def test_telemetry_enabled_by_default(): assert telemetry.ready is True -def test_set_tracer_skips_when_provider_already_configured(): - """A second telemetry instance must not re-install the global provider.""" +def test_telemetry_does_not_install_global_tracer_provider(): with ( - patch.dict(os.environ, {}, clear=True), - patch( - "crewai.telemetry.telemetry.trace.get_tracer_provider", - return_value=TracerProvider(), + patch.dict( + os.environ, + { + "CREWAI_DISABLE_TELEMETRY": "false", + "CREWAI_DISABLE_TRACKING": "false", + "OTEL_SDK_DISABLED": "false", + }, ), - patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set, + patch("crewai.telemetry.telemetry.TracerProvider"), ): telemetry = Telemetry() - telemetry.set_tracer() - - mock_set.assert_not_called() - assert telemetry.trace_set is True + assert telemetry.ready is True + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) def test_flow_execution_span_records_crewai_version(): @@ -84,10 +85,10 @@ def test_flow_execution_span_records_crewai_version(): "OTEL_SDK_DISABLED": "false", }, ), - patch("crewai.telemetry.telemetry.TracerProvider"), - patch("crewai.telemetry.telemetry.trace.get_tracer", return_value=tracer), + patch("crewai.telemetry.telemetry.TracerProvider") as mock_provider_cls, patch("crewai.telemetry.telemetry.version", return_value="9.9.9"), ): + mock_provider_cls.return_value.get_tracer.return_value = tracer telemetry = Telemetry() telemetry.flow_execution_span("ResearchFlow", ["start", "finish"])