mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
feat: prevent crash once Telemetry is not available
This commit is contained in:
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import warnings
|
import warnings
|
||||||
@@ -14,6 +15,8 @@ from crewai.telemetry.constants import (
|
|||||||
CREWAI_TELEMETRY_SERVICE_NAME,
|
CREWAI_TELEMETRY_SERVICE_NAME,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def suppress_warnings():
|
def suppress_warnings():
|
||||||
@@ -28,7 +31,10 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
|||||||
)
|
)
|
||||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
from opentelemetry.sdk.trace.export import ( # noqa: E402
|
||||||
|
BatchSpanProcessor,
|
||||||
|
SpanExportResult,
|
||||||
|
)
|
||||||
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -36,6 +42,15 @@ if TYPE_CHECKING:
|
|||||||
from crewai.task import Task
|
from crewai.task import Task
|
||||||
|
|
||||||
|
|
||||||
|
class SafeOTLPSpanExporter(OTLPSpanExporter):
|
||||||
|
def export(self, spans) -> SpanExportResult:
|
||||||
|
try:
|
||||||
|
return super().export(spans)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(e)
|
||||||
|
return SpanExportResult.FAILURE
|
||||||
|
|
||||||
|
|
||||||
class Telemetry:
|
class Telemetry:
|
||||||
"""A class to handle anonymous telemetry for the crewai package.
|
"""A class to handle anonymous telemetry for the crewai package.
|
||||||
|
|
||||||
@@ -64,7 +79,7 @@ class Telemetry:
|
|||||||
self.provider = TracerProvider(resource=self.resource)
|
self.provider = TracerProvider(resource=self.resource)
|
||||||
|
|
||||||
processor = BatchSpanProcessor(
|
processor = BatchSpanProcessor(
|
||||||
OTLPSpanExporter(
|
SafeOTLPSpanExporter(
|
||||||
endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
|
endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
|
||||||
timeout=30,
|
timeout=30,
|
||||||
)
|
)
|
||||||
|
|||||||
68
tests/telemetry/test_telemetry.py
Normal file
68
tests/telemetry/test_telemetry.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import os
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai import Agent, Crew, Task
|
||||||
|
from crewai.telemetry import Telemetry
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"env_var,value,expected_ready",
|
||||||
|
[
|
||||||
|
("OTEL_SDK_DISABLED", "true", False),
|
||||||
|
("OTEL_SDK_DISABLED", "TRUE", False),
|
||||||
|
("CREWAI_DISABLE_TELEMETRY", "true", False),
|
||||||
|
("CREWAI_DISABLE_TELEMETRY", "TRUE", False),
|
||||||
|
("OTEL_SDK_DISABLED", "false", True),
|
||||||
|
("CREWAI_DISABLE_TELEMETRY", "false", True),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_telemetry_environment_variables(env_var, value, expected_ready):
|
||||||
|
"""Test telemetry state with different environment variable configurations."""
|
||||||
|
with patch.dict(os.environ, {env_var: value}):
|
||||||
|
with patch("crewai.telemetry.telemetry.TracerProvider"):
|
||||||
|
telemetry = Telemetry()
|
||||||
|
assert telemetry.ready is expected_ready
|
||||||
|
|
||||||
|
|
||||||
|
def test_telemetry_enabled_by_default():
|
||||||
|
"""Test that telemetry is enabled by default."""
|
||||||
|
with patch.dict(os.environ, {}, clear=True):
|
||||||
|
with patch("crewai.telemetry.telemetry.TracerProvider"):
|
||||||
|
telemetry = Telemetry()
|
||||||
|
assert telemetry.ready is True
|
||||||
|
|
||||||
|
|
||||||
|
from opentelemetry import trace
|
||||||
|
|
||||||
|
|
||||||
|
@patch("crewai.telemetry.telemetry.logger.error")
|
||||||
|
@patch(
|
||||||
|
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",
|
||||||
|
side_effect=Exception("Test exception"),
|
||||||
|
)
|
||||||
|
def test_telemetry_fails_due_connect_timeout(export_mock, logger_mock):
|
||||||
|
error = Exception("Test exception")
|
||||||
|
export_mock.side_effect = error
|
||||||
|
|
||||||
|
tracer = trace.get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("test-span"):
|
||||||
|
base_agent = Agent(
|
||||||
|
role="base_agent",
|
||||||
|
llm="gpt-4o-mini",
|
||||||
|
goal="Just say hi",
|
||||||
|
backstory="You are a helpful assistant that just says hi",
|
||||||
|
)
|
||||||
|
base_task = Task(
|
||||||
|
description="Just say hi",
|
||||||
|
expected_output="hi",
|
||||||
|
agent=base_agent,
|
||||||
|
)
|
||||||
|
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
|
||||||
|
crew.kickoff()
|
||||||
|
|
||||||
|
trace.get_tracer_provider().force_flush()
|
||||||
|
|
||||||
|
export_mock.assert_called_once()
|
||||||
|
logger_mock.assert_called_once_with(error)
|
||||||
Reference in New Issue
Block a user