diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 559ca8d4f..4e2ad1df9 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -31,6 +31,25 @@ if TYPE_CHECKING: from crewai.task import Task +# A custom BatchSpanProcessor that catches and suppresses all exceptions +class SafeBatchSpanProcessor(BatchSpanProcessor): + """A wrapper around BatchSpanProcessor that suppresses all exceptions.""" + + def force_flush(self, timeout_millis=None): + """Override force_flush to catch and suppress all exceptions.""" + try: + super().force_flush(timeout_millis) + except Exception: + pass + + def export(self, spans): + """Override export to catch and suppress all exceptions.""" + try: + return super().export(spans) + except Exception: + pass + + class Telemetry: """A class to handle anonymous telemetry for the crewai package. @@ -59,7 +78,7 @@ class Telemetry: with suppress_warnings(): self.provider = TracerProvider(resource=self.resource) - processor = BatchSpanProcessor( + processor = SafeBatchSpanProcessor( OTLPSpanExporter( endpoint=f"{telemetry_endpoint}/v1/traces", timeout=30, diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py new file mode 100644 index 000000000..90f27e5c3 --- /dev/null +++ b/tests/telemetry_test.py @@ -0,0 +1,43 @@ +import os +import pytest +from unittest.mock import patch, Mock +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from crewai.telemetry.telemetry import Telemetry, SafeBatchSpanProcessor + + +class TestTelemetry: + def test_safe_batch_span_processor(self): + """Test that SafeBatchSpanProcessor properly suppresses exceptions.""" + # Create a mock exporter that will be used by the processor + mock_exporter = Mock() + + # Create a SafeBatchSpanProcessor with the mock exporter + processor = SafeBatchSpanProcessor(mock_exporter) + + # Test force_flush with an exception + with patch.object(BatchSpanProcessor, 'force_flush', side_effect=ConnectionError("Test error")): + # This should not raise an exception + processor.force_flush() + + # Test that the processor's export method suppresses exceptions + with patch.object(mock_exporter, 'export', side_effect=ConnectionError("Test error")): + # This should not raise an exception + processor.export([]) + + def test_telemetry_with_connection_error(self): + """Test that telemetry connection errors are properly handled in real usage.""" + # Make sure telemetry is enabled for the test + os.environ["OTEL_SDK_DISABLED"] = "false" + + # Create a telemetry instance + telemetry = Telemetry() + + # Verify telemetry is initialized + assert telemetry.ready is True + + # Test a real telemetry operation + # This should not raise an exception even if there are connection issues + telemetry.flow_creation_span("test_flow") + + # Reset environment variables + os.environ["OTEL_SDK_DISABLED"] = "true"