diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py
index 6821d5e22..142cafb2a 100644
--- a/src/crewai/telemetry/telemetry.py
+++ b/src/crewai/telemetry/telemetry.py
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 import asyncio
 import json
+import logging
 import os
 import platform
 import warnings
@@ -14,6 +15,8 @@ from crewai.telemetry.constants import (
     CREWAI_TELEMETRY_SERVICE_NAME,
 )
 
+logger = logging.getLogger(__name__)
+
 
 @contextmanager
 def suppress_warnings():
@@ -28,7 +31,10 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
 )
 from opentelemetry.sdk.resources import SERVICE_NAME, Resource  # noqa: E402
 from opentelemetry.sdk.trace import TracerProvider  # noqa: E402
-from opentelemetry.sdk.trace.export import BatchSpanProcessor  # noqa: E402
+from opentelemetry.sdk.trace.export import (  # noqa: E402
+    BatchSpanProcessor,
+    SpanExportResult,
+)
 from opentelemetry.trace import Span, Status, StatusCode  # noqa: E402
 
 if TYPE_CHECKING:
@@ -36,6 +42,32 @@ if TYPE_CHECKING:
     from crewai.task import Task
 
 
+class SafeOTLPSpanExporter(OTLPSpanExporter):
+    """OTLP span exporter that never lets an export error propagate.
+
+    Any exception raised by the parent exporter is logged and reported
+    as a failed export instead of being re-raised.
+    """
+
+    def export(self, spans) -> SpanExportResult:
+        """Export ``spans``, returning ``FAILURE`` if the export raises.
+
+        Args:
+            spans: The batch of spans to send to the telemetry endpoint.
+
+        Returns:
+            The result of the parent exporter, or
+            ``SpanExportResult.FAILURE`` when it raised an exception.
+        """
+        try:
+            return super().export(spans)
+        except Exception as e:
+            # Deliberately broad: log the failure and report it to the
+            # caller as FAILURE rather than raising from the export path.
+            logger.error(e)
+            return SpanExportResult.FAILURE
+
+
 class Telemetry:
     """A class to handle anonymous telemetry for the crewai package.
 
@@ -64,7 +96,7 @@ class Telemetry:
                 self.provider = TracerProvider(resource=self.resource)
 
             processor = BatchSpanProcessor(
-                OTLPSpanExporter(
+                SafeOTLPSpanExporter(
                     endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
                     timeout=30,
                 )
diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py
new file mode 100644
index 000000000..ce9682125
--- /dev/null
+++ b/tests/telemetry/test_telemetry.py
@@ -0,0 +1,69 @@
+import os
+from unittest.mock import patch
+
+import pytest
+from opentelemetry import trace
+
+from crewai import Agent, Crew, Task
+from crewai.telemetry import Telemetry
+
+
+@pytest.mark.parametrize(
+    "env_var,value,expected_ready",
+    [
+        ("OTEL_SDK_DISABLED", "true", False),
+        ("OTEL_SDK_DISABLED", "TRUE", False),
+        ("CREWAI_DISABLE_TELEMETRY", "true", False),
+        ("CREWAI_DISABLE_TELEMETRY", "TRUE", False),
+        ("OTEL_SDK_DISABLED", "false", True),
+        ("CREWAI_DISABLE_TELEMETRY", "false", True),
+    ],
+)
+def test_telemetry_environment_variables(env_var, value, expected_ready):
+    """Test telemetry state with different environment variable configurations."""
+    with patch.dict(os.environ, {env_var: value}):
+        with patch("crewai.telemetry.telemetry.TracerProvider"):
+            telemetry = Telemetry()
+            assert telemetry.ready is expected_ready
+
+
+def test_telemetry_enabled_by_default():
+    """Test that telemetry is enabled by default."""
+    with patch.dict(os.environ, {}, clear=True):
+        with patch("crewai.telemetry.telemetry.TracerProvider"):
+            telemetry = Telemetry()
+            assert telemetry.ready is True
+
+
+@patch("crewai.telemetry.telemetry.logger.error")
+@patch(
+    "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",
+    side_effect=Exception("Test exception"),
+)
+def test_telemetry_fails_due_connect_timeout(export_mock, logger_mock):
+    """Test that an exporter failure is logged instead of being raised."""
+    # Re-point side_effect at a known exception instance so the logging
+    # assertion at the end can compare against that exact object.
+    error = Exception("Test exception")
+    export_mock.side_effect = error
+
+    tracer = trace.get_tracer(__name__)
+    with tracer.start_as_current_span("test-span"):
+        base_agent = Agent(
+            role="base_agent",
+            llm="gpt-4o-mini",
+            goal="Just say hi",
+            backstory="You are a helpful assistant that just says hi",
+        )
+        base_task = Task(
+            description="Just say hi",
+            expected_output="hi",
+            agent=base_agent,
+        )
+        crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
+        crew.kickoff()
+
+    trace.get_tracer_provider().force_flush()
+
+    export_mock.assert_called_once()
+    logger_mock.assert_called_once_with(error)