From 341686247c4de9cc9052748175365ed103dbdee1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 22 Mar 2025 15:04:37 +0000 Subject: [PATCH] Implement PR feedback: Add logging, specific exception handling, type hints, and enhanced tests Co-Authored-By: Joe Moura --- src/crewai/telemetry/telemetry.py | 66 ++++++++++++++++++++++++------- tests/telemetry_test.py | 41 +++++++++++++++++++ 2 files changed, 93 insertions(+), 14 deletions(-) diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 4e2ad1df9..833adcd31 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -2,12 +2,13 @@ from __future__ import annotations import asyncio import json +import logging import os import platform import warnings from contextlib import contextmanager from importlib.metadata import version -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Sequence @contextmanager @@ -22,7 +23,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( OTLPSpanExporter, # noqa: E402 ) from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402 -from opentelemetry.sdk.trace import TracerProvider # noqa: E402 +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402 from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402 from opentelemetry.trace import Span, Status, StatusCode # noqa: E402 @@ -32,22 +33,59 @@ if TYPE_CHECKING: # A custom BatchSpanProcessor that catches and suppresses all exceptions +logger = logging.getLogger(__name__) + class SafeBatchSpanProcessor(BatchSpanProcessor): - """A wrapper around BatchSpanProcessor that suppresses all exceptions.""" + """A wrapper around BatchSpanProcessor that suppresses all exceptions. - def force_flush(self, timeout_millis=None): - """Override force_flush to catch and suppress all exceptions.""" - try: - super().force_flush(timeout_millis) - except Exception: - pass + This processor ensures that telemetry operations do not disrupt user code + by catching and suppressing connection and timeout errors that might occur + during span export operations. + + It logs suppressed errors at the debug level for diagnostic purposes without + propagating them to calling code. + """ + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + """Override force_flush to catch and suppress all exceptions. + + Args: + timeout_millis: The maximum amount of time to wait for spans to be exported. - def export(self, spans): - """Override export to catch and suppress all exceptions.""" + Returns: + bool: True if the flush was successful, False otherwise. + """ try: - return super().export(spans) - except Exception: - pass + return super().force_flush(timeout_millis) + except ConnectionError as e: + logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}") + return False + except TimeoutError as e: + logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}") + return False + except Exception as e: + logger.debug(f"Unexpected telemetry force_flush error: {str(e)}") + return False + + def export(self, spans: Sequence[ReadableSpan]) -> None: + """Override export to catch and suppress all exceptions. + + Args: + spans: The spans to export. + """ + try: + if hasattr(super(), 'export'): + super().export(spans) + else: + # Call the exporter directly if super().export doesn't exist + self._span_exporter.export(spans) + except ConnectionError as e: + logger.debug(f"Suppressed telemetry export connection error: {str(e)}") + except TimeoutError as e: + logger.debug(f"Suppressed telemetry export timeout: {str(e)}") + except Exception as e: + logger.debug(f"Unexpected telemetry export error: {str(e)}") + class Telemetry: diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index d4b0bb7a0..5ab12cc5d 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -8,6 +8,8 @@ from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry class TestTelemetry: + """Test suite for Telemetry functionality focusing on error handling and span processing.""" + def test_safe_batch_span_processor(self): """Test that SafeBatchSpanProcessor properly suppresses exceptions.""" # Create a mock exporter that will be used by the processor @@ -43,3 +45,42 @@ class TestTelemetry: # Reset environment variables os.environ["OTEL_SDK_DISABLED"] = "true" + + def test_safe_batch_span_processor_with_timeout(self): + """Test that SafeBatchSpanProcessor properly handles timeout errors.""" + # Create a mock exporter that will be used by the processor + mock_exporter = Mock() + + # Create a SafeBatchSpanProcessor with the mock exporter + processor = SafeBatchSpanProcessor(mock_exporter) + + # Test force_flush with a timeout error + with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")): + # This should not raise an exception + processor.force_flush() + + # Test that the processor's export method suppresses timeout exceptions + with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")): + # This should not raise an exception + processor.export([]) + + def test_safe_batch_span_processor_with_valid_data(self): + """Test SafeBatchSpanProcessor normal operation with valid data.""" + # Create a mock exporter that will be used by the processor + mock_exporter = Mock() + + # Create a SafeBatchSpanProcessor with the mock exporter + processor = SafeBatchSpanProcessor(mock_exporter) + + # Test force_flush with no exception + with patch.object(BatchSpanProcessor, 'force_flush', return_value=None): + # This should complete normally + processor.force_flush() + + # Mock some valid spans + mock_spans = [Mock() for _ in range(3)] + + # Test that the processor's export method works with valid data + with patch.object(mock_exporter, 'export', return_value=None): + # This should complete normally + processor.export(mock_spans)