mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-25 08:08:14 +00:00
Compare commits
3 Commits
devin/1768
...
devin/1742
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
341686247c | ||
|
|
487da2af19 | ||
|
|
e1a085b106 |
@@ -2,12 +2,13 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import warnings
|
import warnings
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from importlib.metadata import version
|
from importlib.metadata import version
|
||||||
from typing import TYPE_CHECKING, Any, Optional
|
from typing import TYPE_CHECKING, Any, Optional, Sequence
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
@@ -22,7 +23,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
|||||||
OTLPSpanExporter, # noqa: E402
|
OTLPSpanExporter, # noqa: E402
|
||||||
)
|
)
|
||||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
||||||
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
||||||
|
|
||||||
@@ -31,6 +32,62 @@ if TYPE_CHECKING:
|
|||||||
from crewai.task import Task
|
from crewai.task import Task
|
||||||
|
|
||||||
|
|
||||||
|
# A custom BatchSpanProcessor that catches and suppresses all exceptions
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class SafeBatchSpanProcessor(BatchSpanProcessor):
|
||||||
|
"""A wrapper around BatchSpanProcessor that suppresses all exceptions.
|
||||||
|
|
||||||
|
This processor ensures that telemetry operations do not disrupt user code
|
||||||
|
by catching and suppressing connection and timeout errors that might occur
|
||||||
|
during span export operations.
|
||||||
|
|
||||||
|
It logs suppressed errors at the debug level for diagnostic purposes without
|
||||||
|
propagating them to calling code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
|
||||||
|
"""Override force_flush to catch and suppress all exceptions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout_millis: The maximum amount of time to wait for spans to be exported.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the flush was successful, False otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return super().force_flush(timeout_millis)
|
||||||
|
except ConnectionError as e:
|
||||||
|
logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}")
|
||||||
|
return False
|
||||||
|
except TimeoutError as e:
|
||||||
|
logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}")
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Unexpected telemetry force_flush error: {str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def export(self, spans: Sequence[ReadableSpan]) -> None:
|
||||||
|
"""Override export to catch and suppress all exceptions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spans: The spans to export.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if hasattr(super(), 'export'):
|
||||||
|
super().export(spans)
|
||||||
|
else:
|
||||||
|
# Call the exporter directly if super().export doesn't exist
|
||||||
|
self._span_exporter.export(spans)
|
||||||
|
except ConnectionError as e:
|
||||||
|
logger.debug(f"Suppressed telemetry export connection error: {str(e)}")
|
||||||
|
except TimeoutError as e:
|
||||||
|
logger.debug(f"Suppressed telemetry export timeout: {str(e)}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Unexpected telemetry export error: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Telemetry:
|
class Telemetry:
|
||||||
"""A class to handle anonymous telemetry for the crewai package.
|
"""A class to handle anonymous telemetry for the crewai package.
|
||||||
|
|
||||||
@@ -59,7 +116,7 @@ class Telemetry:
|
|||||||
with suppress_warnings():
|
with suppress_warnings():
|
||||||
self.provider = TracerProvider(resource=self.resource)
|
self.provider = TracerProvider(resource=self.resource)
|
||||||
|
|
||||||
processor = BatchSpanProcessor(
|
processor = SafeBatchSpanProcessor(
|
||||||
OTLPSpanExporter(
|
OTLPSpanExporter(
|
||||||
endpoint=f"{telemetry_endpoint}/v1/traces",
|
endpoint=f"{telemetry_endpoint}/v1/traces",
|
||||||
timeout=30,
|
timeout=30,
|
||||||
|
|||||||
86
tests/telemetry_test.py
Normal file
86
tests/telemetry_test.py
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
import os
|
||||||
|
from unittest.mock import Mock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||||
|
|
||||||
|
from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
|
||||||
|
|
||||||
|
|
||||||
|
class TestTelemetry:
|
||||||
|
"""Test suite for Telemetry functionality focusing on error handling and span processing."""
|
||||||
|
|
||||||
|
def test_safe_batch_span_processor(self):
|
||||||
|
"""Test that SafeBatchSpanProcessor properly suppresses exceptions."""
|
||||||
|
# Create a mock exporter that will be used by the processor
|
||||||
|
mock_exporter = Mock()
|
||||||
|
|
||||||
|
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||||
|
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||||
|
|
||||||
|
# Test force_flush with an exception
|
||||||
|
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=ConnectionError("Test error")):
|
||||||
|
# This should not raise an exception
|
||||||
|
processor.force_flush()
|
||||||
|
|
||||||
|
# Test that the processor's export method suppresses exceptions
|
||||||
|
with patch.object(mock_exporter, 'export', side_effect=ConnectionError("Test error")):
|
||||||
|
# This should not raise an exception
|
||||||
|
processor.export([])
|
||||||
|
|
||||||
|
def test_telemetry_with_connection_error(self):
|
||||||
|
"""Test that telemetry connection errors are properly handled in real usage."""
|
||||||
|
# Make sure telemetry is enabled for the test
|
||||||
|
os.environ["OTEL_SDK_DISABLED"] = "false"
|
||||||
|
|
||||||
|
# Create a telemetry instance
|
||||||
|
telemetry = Telemetry()
|
||||||
|
|
||||||
|
# Verify telemetry is initialized
|
||||||
|
assert telemetry.ready is True
|
||||||
|
|
||||||
|
# Test a real telemetry operation
|
||||||
|
# This should not raise an exception even if there are connection issues
|
||||||
|
telemetry.flow_creation_span("test_flow")
|
||||||
|
|
||||||
|
# Reset environment variables
|
||||||
|
os.environ["OTEL_SDK_DISABLED"] = "true"
|
||||||
|
|
||||||
|
def test_safe_batch_span_processor_with_timeout(self):
|
||||||
|
"""Test that SafeBatchSpanProcessor properly handles timeout errors."""
|
||||||
|
# Create a mock exporter that will be used by the processor
|
||||||
|
mock_exporter = Mock()
|
||||||
|
|
||||||
|
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||||
|
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||||
|
|
||||||
|
# Test force_flush with a timeout error
|
||||||
|
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")):
|
||||||
|
# This should not raise an exception
|
||||||
|
processor.force_flush()
|
||||||
|
|
||||||
|
# Test that the processor's export method suppresses timeout exceptions
|
||||||
|
with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")):
|
||||||
|
# This should not raise an exception
|
||||||
|
processor.export([])
|
||||||
|
|
||||||
|
def test_safe_batch_span_processor_with_valid_data(self):
|
||||||
|
"""Test SafeBatchSpanProcessor normal operation with valid data."""
|
||||||
|
# Create a mock exporter that will be used by the processor
|
||||||
|
mock_exporter = Mock()
|
||||||
|
|
||||||
|
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||||
|
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||||
|
|
||||||
|
# Test force_flush with no exception
|
||||||
|
with patch.object(BatchSpanProcessor, 'force_flush', return_value=None):
|
||||||
|
# This should complete normally
|
||||||
|
processor.force_flush()
|
||||||
|
|
||||||
|
# Mock some valid spans
|
||||||
|
mock_spans = [Mock() for _ in range(3)]
|
||||||
|
|
||||||
|
# Test that the processor's export method works with valid data
|
||||||
|
with patch.object(mock_exporter, 'export', return_value=None):
|
||||||
|
# This should complete normally
|
||||||
|
processor.export(mock_spans)
|
||||||
Reference in New Issue
Block a user