Implement PR feedback: Add logging, specific exception handling, type hints, and enhanced tests

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-03-22 15:04:37 +00:00
parent 487da2af19
commit 341686247c
2 changed files with 93 additions and 14 deletions

View File

@@ -2,12 +2,13 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import platform
import warnings
from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Sequence
@contextmanager
@@ -22,7 +23,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
@@ -32,22 +33,59 @@ if TYPE_CHECKING:
# A custom BatchSpanProcessor that catches and suppresses all exceptions
logger = logging.getLogger(__name__)
class SafeBatchSpanProcessor(BatchSpanProcessor):
"""A wrapper around BatchSpanProcessor that suppresses all exceptions."""
"""A wrapper around BatchSpanProcessor that suppresses all exceptions.
def force_flush(self, timeout_millis=None):
"""Override force_flush to catch and suppress all exceptions."""
try:
super().force_flush(timeout_millis)
except Exception:
pass
This processor ensures that telemetry operations do not disrupt user code
by catching and suppressing connection and timeout errors that might occur
during span export operations.
It logs suppressed errors at the debug level for diagnostic purposes without
propagating them to calling code.
"""
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
"""Override force_flush to catch and suppress all exceptions.
Args:
timeout_millis: The maximum amount of time to wait for spans to be exported.
def export(self, spans):
"""Override export to catch and suppress all exceptions."""
Returns:
bool: True if the flush was successful, False otherwise.
"""
try:
return super().export(spans)
except Exception:
pass
return super().force_flush(timeout_millis)
except ConnectionError as e:
logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}")
return False
except TimeoutError as e:
logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}")
return False
except Exception as e:
logger.debug(f"Unexpected telemetry force_flush error: {str(e)}")
return False
def export(self, spans: Sequence[ReadableSpan]) -> None:
"""Override export to catch and suppress all exceptions.
Args:
spans: The spans to export.
"""
try:
if hasattr(super(), 'export'):
super().export(spans)
else:
# Call the exporter directly if super().export doesn't exist
self._span_exporter.export(spans)
except ConnectionError as e:
logger.debug(f"Suppressed telemetry export connection error: {str(e)}")
except TimeoutError as e:
logger.debug(f"Suppressed telemetry export timeout: {str(e)}")
except Exception as e:
logger.debug(f"Unexpected telemetry export error: {str(e)}")
class Telemetry:

View File

@@ -8,6 +8,8 @@ from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
class TestTelemetry:
"""Test suite for Telemetry functionality focusing on error handling and span processing."""
def test_safe_batch_span_processor(self):
"""Test that SafeBatchSpanProcessor properly suppresses exceptions."""
# Create a mock exporter that will be used by the processor
@@ -43,3 +45,42 @@ class TestTelemetry:
# Reset environment variables
os.environ["OTEL_SDK_DISABLED"] = "true"
def test_safe_batch_span_processor_with_timeout(self):
"""Test that SafeBatchSpanProcessor properly handles timeout errors."""
# Create a mock exporter that will be used by the processor
mock_exporter = Mock()
# Create a SafeBatchSpanProcessor with the mock exporter
processor = SafeBatchSpanProcessor(mock_exporter)
# Test force_flush with a timeout error
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")):
# This should not raise an exception
processor.force_flush()
# Test that the processor's export method suppresses timeout exceptions
with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")):
# This should not raise an exception
processor.export([])
def test_safe_batch_span_processor_with_valid_data(self):
"""Test SafeBatchSpanProcessor normal operation with valid data."""
# Create a mock exporter that will be used by the processor
mock_exporter = Mock()
# Create a SafeBatchSpanProcessor with the mock exporter
processor = SafeBatchSpanProcessor(mock_exporter)
# Test force_flush with no exception
with patch.object(BatchSpanProcessor, 'force_flush', return_value=None):
# This should complete normally
processor.force_flush()
# Mock some valid spans
mock_spans = [Mock() for _ in range(3)]
# Test that the processor's export method works with valid data
with patch.object(mock_exporter, 'export', return_value=None):
# This should complete normally
processor.export(mock_spans)