Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
341686247c Implement PR feedback: Add logging, specific exception handling, type hints, and enhanced tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-22 15:04:37 +00:00
Devin AI
487da2af19 Fix import order in telemetry_test.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-22 14:58:49 +00:00
Devin AI
e1a085b106 Fix issue #2444: Add error handling to telemetry span processor to prevent connection errors from propagating to user code
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-22 14:57:14 +00:00
2 changed files with 146 additions and 3 deletions

View File

@@ -2,12 +2,13 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import platform
import warnings
from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Sequence
@contextmanager
@@ -22,7 +23,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
@@ -31,6 +32,62 @@ if TYPE_CHECKING:
from crewai.task import Task
# A custom BatchSpanProcessor that catches and suppresses all exceptions
logger = logging.getLogger(__name__)
class SafeBatchSpanProcessor(BatchSpanProcessor):
    """A BatchSpanProcessor that never lets telemetry errors reach user code.

    ``force_flush`` and ``export`` are wrapped so that connection errors,
    timeouts and any other exception raised while sending spans are logged at
    debug level and suppressed instead of propagating to the caller.

    NOTE(review): only the two methods overridden here are guarded. Whether
    the SDK's background export path goes through ``export`` is not visible
    from this class -- confirm against the pinned opentelemetry-sdk version.
    """

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Flush pending spans, suppressing every exception.

        Args:
            timeout_millis: The maximum amount of time to wait for spans to be
                exported. Passed through unchanged to the parent class.

        Returns:
            Whatever the parent ``force_flush`` returns when it succeeds,
            ``False`` when it raised.
        """
        try:
            return super().force_flush(timeout_millis)
        except ConnectionError as e:
            logger.debug("Suppressed telemetry force_flush connection error: %s", e)
            return False
        except TimeoutError as e:
            logger.debug("Suppressed telemetry force_flush timeout: %s", e)
            return False
        except Exception as e:
            # Telemetry is best-effort: nothing raised here may break the caller.
            logger.debug("Unexpected telemetry force_flush error: %s", e)
            return False

    def export(self, spans: Sequence[ReadableSpan]) -> None:
        """Export ``spans``, suppressing every exception.

        Delegates to the parent's ``export`` when the parent defines one;
        otherwise calls the attached span exporter directly.

        Args:
            spans: The spans to export.
        """
        try:
            parent_export = getattr(super(), "export", None)
            if parent_export is not None:
                parent_export(spans)
                return

            # The attribute holding the exporter is looked up under both
            # spellings so a missing name does not surface as an AttributeError
            # that the handler below would silently swallow.
            # TODO confirm the attribute name for the pinned SDK version.
            exporter = getattr(self, "span_exporter", None)
            if exporter is None:
                exporter = getattr(self, "_span_exporter", None)
            if exporter is None:
                logger.debug("Telemetry export skipped: no span exporter attached")
                return

            exporter.export(spans)
        except ConnectionError as e:
            logger.debug("Suppressed telemetry export connection error: %s", e)
        except TimeoutError as e:
            logger.debug("Suppressed telemetry export timeout: %s", e)
        except Exception as e:
            # Telemetry is best-effort: nothing raised here may break the caller.
            logger.debug("Unexpected telemetry export error: %s", e)
class Telemetry:
"""A class to handle anonymous telemetry for the crewai package.
@@ -59,7 +116,7 @@ class Telemetry:
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
processor = BatchSpanProcessor(
processor = SafeBatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"{telemetry_endpoint}/v1/traces",
timeout=30,

86
tests/telemetry_test.py Normal file
View File

@@ -0,0 +1,86 @@
import os
from unittest.mock import Mock, patch
import pytest
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
class TestTelemetry:
    """Test suite for Telemetry functionality focusing on error handling and span processing."""

    def test_safe_batch_span_processor(self):
        """SafeBatchSpanProcessor suppresses ConnectionError from flush and export."""
        mock_exporter = Mock()
        processor = SafeBatchSpanProcessor(mock_exporter)

        # A failing parent force_flush must be swallowed and reported as False.
        with patch.object(
            BatchSpanProcessor,
            "force_flush",
            side_effect=ConnectionError("Test error"),
        ):
            assert processor.force_flush() is False

        # A failing exporter must not raise out of export().
        with patch.object(
            mock_exporter, "export", side_effect=ConnectionError("Test error")
        ):
            assert processor.export([]) is None

    def test_telemetry_with_connection_error(self):
        """Telemetry operations do not raise even if there are connection issues."""
        # patch.dict restores the previous value of OTEL_SDK_DISABLED on exit,
        # including when an assertion below fails, so no state leaks into
        # other tests.
        with patch.dict(os.environ, {"OTEL_SDK_DISABLED": "false"}):
            telemetry = Telemetry()

            # Verify telemetry is initialized
            assert telemetry.ready is True

            # This should not raise an exception even if there are connection issues
            telemetry.flow_creation_span("test_flow")

    def test_safe_batch_span_processor_with_timeout(self):
        """SafeBatchSpanProcessor suppresses TimeoutError from flush and export."""
        mock_exporter = Mock()
        processor = SafeBatchSpanProcessor(mock_exporter)

        # A timed-out parent force_flush must be swallowed and reported as False.
        with patch.object(
            BatchSpanProcessor,
            "force_flush",
            side_effect=TimeoutError("Test timeout"),
        ):
            assert processor.force_flush() is False

        # A timed-out exporter must not raise out of export().
        with patch.object(
            mock_exporter, "export", side_effect=TimeoutError("Test timeout")
        ):
            assert processor.export([]) is None

    def test_safe_batch_span_processor_with_valid_data(self):
        """SafeBatchSpanProcessor passes results through when nothing fails."""
        mock_exporter = Mock()
        processor = SafeBatchSpanProcessor(mock_exporter)

        # On success the parent's return value is passed through unchanged.
        with patch.object(BatchSpanProcessor, "force_flush", return_value=True):
            assert processor.force_flush() is True

        # Mock some valid spans
        mock_spans = [Mock() for _ in range(3)]

        # export() returns None and must complete normally with valid data.
        with patch.object(mock_exporter, "export", return_value=None):
            assert processor.export(mock_spans) is None