mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
3 Commits
devin/1750
...
devin/1742
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
341686247c | ||
|
|
487da2af19 | ||
|
|
e1a085b106 |
@@ -2,12 +2,13 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib.metadata import version
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
from typing import TYPE_CHECKING, Any, Optional, Sequence
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -22,7 +23,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter, # noqa: E402
|
||||
)
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
||||
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
||||
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
||||
|
||||
@@ -31,6 +32,62 @@ if TYPE_CHECKING:
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
# A custom BatchSpanProcessor that catches and suppresses all exceptions
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SafeBatchSpanProcessor(BatchSpanProcessor):
|
||||
"""A wrapper around BatchSpanProcessor that suppresses all exceptions.
|
||||
|
||||
This processor ensures that telemetry operations do not disrupt user code
|
||||
by catching and suppressing connection and timeout errors that might occur
|
||||
during span export operations.
|
||||
|
||||
It logs suppressed errors at the debug level for diagnostic purposes without
|
||||
propagating them to calling code.
|
||||
"""
|
||||
|
||||
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
|
||||
"""Override force_flush to catch and suppress all exceptions.
|
||||
|
||||
Args:
|
||||
timeout_millis: The maximum amount of time to wait for spans to be exported.
|
||||
|
||||
Returns:
|
||||
bool: True if the flush was successful, False otherwise.
|
||||
"""
|
||||
try:
|
||||
return super().force_flush(timeout_millis)
|
||||
except ConnectionError as e:
|
||||
logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}")
|
||||
return False
|
||||
except TimeoutError as e:
|
||||
logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.debug(f"Unexpected telemetry force_flush error: {str(e)}")
|
||||
return False
|
||||
|
||||
def export(self, spans: Sequence[ReadableSpan]) -> None:
|
||||
"""Override export to catch and suppress all exceptions.
|
||||
|
||||
Args:
|
||||
spans: The spans to export.
|
||||
"""
|
||||
try:
|
||||
if hasattr(super(), 'export'):
|
||||
super().export(spans)
|
||||
else:
|
||||
# Call the exporter directly if super().export doesn't exist
|
||||
self._span_exporter.export(spans)
|
||||
except ConnectionError as e:
|
||||
logger.debug(f"Suppressed telemetry export connection error: {str(e)}")
|
||||
except TimeoutError as e:
|
||||
logger.debug(f"Suppressed telemetry export timeout: {str(e)}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Unexpected telemetry export error: {str(e)}")
|
||||
|
||||
|
||||
|
||||
class Telemetry:
|
||||
"""A class to handle anonymous telemetry for the crewai package.
|
||||
|
||||
@@ -59,7 +116,7 @@ class Telemetry:
|
||||
with suppress_warnings():
|
||||
self.provider = TracerProvider(resource=self.resource)
|
||||
|
||||
processor = BatchSpanProcessor(
|
||||
processor = SafeBatchSpanProcessor(
|
||||
OTLPSpanExporter(
|
||||
endpoint=f"{telemetry_endpoint}/v1/traces",
|
||||
timeout=30,
|
||||
|
||||
86
tests/telemetry_test.py
Normal file
86
tests/telemetry_test.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
|
||||
from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
|
||||
|
||||
|
||||
class TestTelemetry:
|
||||
"""Test suite for Telemetry functionality focusing on error handling and span processing."""
|
||||
|
||||
def test_safe_batch_span_processor(self):
|
||||
"""Test that SafeBatchSpanProcessor properly suppresses exceptions."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with an exception
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=ConnectionError("Test error")):
|
||||
# This should not raise an exception
|
||||
processor.force_flush()
|
||||
|
||||
# Test that the processor's export method suppresses exceptions
|
||||
with patch.object(mock_exporter, 'export', side_effect=ConnectionError("Test error")):
|
||||
# This should not raise an exception
|
||||
processor.export([])
|
||||
|
||||
def test_telemetry_with_connection_error(self):
|
||||
"""Test that telemetry connection errors are properly handled in real usage."""
|
||||
# Make sure telemetry is enabled for the test
|
||||
os.environ["OTEL_SDK_DISABLED"] = "false"
|
||||
|
||||
# Create a telemetry instance
|
||||
telemetry = Telemetry()
|
||||
|
||||
# Verify telemetry is initialized
|
||||
assert telemetry.ready is True
|
||||
|
||||
# Test a real telemetry operation
|
||||
# This should not raise an exception even if there are connection issues
|
||||
telemetry.flow_creation_span("test_flow")
|
||||
|
||||
# Reset environment variables
|
||||
os.environ["OTEL_SDK_DISABLED"] = "true"
|
||||
|
||||
def test_safe_batch_span_processor_with_timeout(self):
|
||||
"""Test that SafeBatchSpanProcessor properly handles timeout errors."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with a timeout error
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")):
|
||||
# This should not raise an exception
|
||||
processor.force_flush()
|
||||
|
||||
# Test that the processor's export method suppresses timeout exceptions
|
||||
with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")):
|
||||
# This should not raise an exception
|
||||
processor.export([])
|
||||
|
||||
def test_safe_batch_span_processor_with_valid_data(self):
|
||||
"""Test SafeBatchSpanProcessor normal operation with valid data."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with no exception
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', return_value=None):
|
||||
# This should complete normally
|
||||
processor.force_flush()
|
||||
|
||||
# Mock some valid spans
|
||||
mock_spans = [Mock() for _ in range(3)]
|
||||
|
||||
# Test that the processor's export method works with valid data
|
||||
with patch.object(mock_exporter, 'export', return_value=None):
|
||||
# This should complete normally
|
||||
processor.export(mock_spans)
|
||||
Reference in New Issue
Block a user