mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-16 19:48:30 +00:00
chore: refactor telemetry module with utility functions and modern typing (#3485)
Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
This commit is contained in:
@@ -1,2 +1,9 @@
|
||||
CREWAI_TELEMETRY_BASE_URL: str = "https://telemetry.crewai.com:4319"
|
||||
CREWAI_TELEMETRY_SERVICE_NAME: str = "crewAI-telemetry"
|
||||
"""Telemetry configuration constants.
|
||||
|
||||
This module defines constants used for CrewAI telemetry configuration.
|
||||
"""
|
||||
|
||||
from typing import Final
|
||||
|
||||
CREWAI_TELEMETRY_BASE_URL: Final[str] = "https://telemetry.crewai.com:4319"
|
||||
CREWAI_TELEMETRY_SERVICE_NAME: Final[str] = "crewAI-telemetry"
|
||||
|
||||
@@ -1,3 +1,11 @@
|
||||
"""Telemetry module for CrewAI.
|
||||
|
||||
This module provides anonymous telemetry collection for development purposes.
|
||||
No prompts, task descriptions, agent backstories/goals, responses, or sensitive
|
||||
data is collected. Users can opt-in to share more complete data using the
|
||||
`share_crew` attribute.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
@@ -5,11 +13,10 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib.metadata import version
|
||||
from typing import TYPE_CHECKING, Any, Callable, Optional
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
from importlib.metadata import version
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
@@ -21,30 +28,43 @@ from opentelemetry.sdk.trace.export import (
|
||||
BatchSpanProcessor,
|
||||
SpanExportResult,
|
||||
)
|
||||
from opentelemetry.trace import Span, Status, StatusCode
|
||||
from opentelemetry.trace import Span
|
||||
|
||||
from crewai.telemetry.constants import (
|
||||
CREWAI_TELEMETRY_BASE_URL,
|
||||
CREWAI_TELEMETRY_SERVICE_NAME,
|
||||
)
|
||||
from crewai.telemetry.utils import (
|
||||
add_agent_fingerprint_to_span,
|
||||
add_crew_and_task_attributes,
|
||||
add_crew_attributes,
|
||||
close_span,
|
||||
)
|
||||
from crewai.utilities.logger_utils import suppress_warnings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def suppress_warnings():
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore")
|
||||
yield
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
class SafeOTLPSpanExporter(OTLPSpanExporter):
|
||||
def export(self, spans) -> SpanExportResult:
|
||||
"""Safe wrapper for OTLP span exporter that handles exceptions gracefully.
|
||||
|
||||
This exporter prevents telemetry failures from breaking the application
|
||||
by catching and logging exceptions during span export.
|
||||
"""
|
||||
|
||||
def export(self, spans: Any) -> SpanExportResult:
|
||||
"""Export spans to the telemetry backend safely.
|
||||
|
||||
Args:
|
||||
spans: Collection of spans to export.
|
||||
|
||||
Returns:
|
||||
Export result status, FAILURE if an exception occurs.
|
||||
"""
|
||||
try:
|
||||
return super().export(spans)
|
||||
except Exception as e:
|
||||
@@ -53,16 +73,13 @@ class SafeOTLPSpanExporter(OTLPSpanExporter):
|
||||
|
||||
|
||||
class Telemetry:
|
||||
"""A class to handle anonymous telemetry for the crewai package.
|
||||
"""Handle anonymous telemetry for the CrewAI package.
|
||||
|
||||
The data being collected is for development purpose, all data is anonymous.
|
||||
|
||||
There is NO data being collected on the prompts, tasks descriptions
|
||||
agents backstories or goals nor responses or any data that is being
|
||||
processed by the agents, nor any secrets and env vars.
|
||||
|
||||
Users can opt-in to sharing more complete data using the `share_crew`
|
||||
attribute in the Crew class.
|
||||
Attributes:
|
||||
ready: Whether telemetry is initialized and ready.
|
||||
trace_set: Whether the tracer provider has been set.
|
||||
resource: OpenTelemetry resource for the telemetry service.
|
||||
provider: OpenTelemetry tracer provider.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
@@ -72,14 +89,14 @@ class Telemetry:
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super(Telemetry, cls).__new__(cls)
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self) -> None:
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
if hasattr(self, "_initialized") and self._initialized:
|
||||
return
|
||||
|
||||
|
||||
self.ready: bool = False
|
||||
self.trace_set: bool = False
|
||||
self._initialized: bool = True
|
||||
@@ -124,29 +141,41 @@ class Telemetry:
|
||||
"""Check if telemetry operations should be executed."""
|
||||
return self.ready and not self._is_telemetry_disabled()
|
||||
|
||||
def set_tracer(self):
|
||||
def set_tracer(self) -> None:
|
||||
"""Set the tracer provider if ready and not already set."""
|
||||
if self.ready and not self.trace_set:
|
||||
try:
|
||||
with suppress_warnings():
|
||||
trace.set_tracer_provider(self.provider)
|
||||
self.trace_set = True
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to set tracer provider: {e}")
|
||||
self.ready = False
|
||||
self.trace_set = False
|
||||
|
||||
def _safe_telemetry_operation(self, operation: Callable[[], None]) -> None:
|
||||
"""Execute telemetry operation safely, checking both readiness and environment variables."""
|
||||
def _safe_telemetry_operation(self, operation: Callable[[], Any]) -> None:
|
||||
"""Execute telemetry operation safely, checking both readiness and environment variables.
|
||||
|
||||
Args:
|
||||
operation: A callable that performs telemetry operations. May return any value,
|
||||
but the return value is not used by this method.
|
||||
"""
|
||||
if not self._should_execute_telemetry():
|
||||
return
|
||||
try:
|
||||
operation()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug(f"Telemetry operation failed: {e}")
|
||||
|
||||
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
|
||||
"""Records the creation of a crew."""
|
||||
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None) -> None:
|
||||
"""Records the creation of a crew.
|
||||
|
||||
def operation():
|
||||
Args:
|
||||
crew: The crew being created.
|
||||
inputs: Optional input parameters for the crew.
|
||||
"""
|
||||
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Crew Created")
|
||||
self._add_attribute(
|
||||
@@ -155,16 +184,14 @@ class Telemetry:
|
||||
version("crewai"),
|
||||
)
|
||||
self._add_attribute(span, "python_version", platform.python_version())
|
||||
self._add_attribute(span, "crew_key", crew.key)
|
||||
self._add_attribute(span, "crew_id", str(crew.id))
|
||||
add_crew_attributes(span, crew, self._add_attribute)
|
||||
self._add_attribute(span, "crew_process", crew.process)
|
||||
self._add_attribute(span, "crew_memory", crew.memory)
|
||||
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
|
||||
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
|
||||
|
||||
# Add fingerprint data
|
||||
# Add additional fingerprint metadata if available
|
||||
if hasattr(crew, "fingerprint") and crew.fingerprint:
|
||||
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
|
||||
self._add_attribute(
|
||||
span,
|
||||
"crew_fingerprint_created_at",
|
||||
@@ -343,29 +370,27 @@ class Telemetry:
|
||||
]
|
||||
),
|
||||
)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def task_started(self, crew: Crew, task: Task) -> Span | None:
|
||||
"""Records task started in a crew."""
|
||||
"""Records task started in a crew.
|
||||
|
||||
def operation():
|
||||
Args:
|
||||
crew: The crew executing the task.
|
||||
task: The task being started.
|
||||
|
||||
Returns:
|
||||
The span tracking the task execution, or None if telemetry is disabled.
|
||||
"""
|
||||
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
|
||||
created_span = tracer.start_span("Task Created")
|
||||
|
||||
self._add_attribute(created_span, "crew_key", crew.key)
|
||||
self._add_attribute(created_span, "crew_id", str(crew.id))
|
||||
self._add_attribute(created_span, "task_key", task.key)
|
||||
self._add_attribute(created_span, "task_id", str(task.id))
|
||||
|
||||
# Add fingerprint data
|
||||
if hasattr(crew, "fingerprint") and crew.fingerprint:
|
||||
self._add_attribute(
|
||||
created_span, "crew_fingerprint", crew.fingerprint.uuid_str
|
||||
)
|
||||
add_crew_and_task_attributes(created_span, crew, task, self._add_attribute)
|
||||
|
||||
if hasattr(task, "fingerprint") and task.fingerprint:
|
||||
self._add_attribute(
|
||||
@@ -386,13 +411,9 @@ class Telemetry:
|
||||
|
||||
# Add agent fingerprint if task has an assigned agent
|
||||
if hasattr(task, "agent") and task.agent:
|
||||
agent_fingerprint = getattr(
|
||||
getattr(task.agent, "fingerprint", None), "uuid_str", None
|
||||
add_agent_fingerprint_to_span(
|
||||
created_span, task.agent, self._add_attribute
|
||||
)
|
||||
if agent_fingerprint:
|
||||
self._add_attribute(
|
||||
created_span, "agent_fingerprint", agent_fingerprint
|
||||
)
|
||||
|
||||
if crew.share_crew:
|
||||
self._add_attribute(
|
||||
@@ -402,30 +423,18 @@ class Telemetry:
|
||||
created_span, "formatted_expected_output", task.expected_output
|
||||
)
|
||||
|
||||
created_span.set_status(Status(StatusCode.OK))
|
||||
created_span.end()
|
||||
close_span(created_span)
|
||||
|
||||
span = tracer.start_span("Task Execution")
|
||||
|
||||
self._add_attribute(span, "crew_key", crew.key)
|
||||
self._add_attribute(span, "crew_id", str(crew.id))
|
||||
self._add_attribute(span, "task_key", task.key)
|
||||
self._add_attribute(span, "task_id", str(task.id))
|
||||
|
||||
# Add fingerprint data to execution span
|
||||
if hasattr(crew, "fingerprint") and crew.fingerprint:
|
||||
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
|
||||
add_crew_and_task_attributes(span, crew, task, self._add_attribute)
|
||||
|
||||
if hasattr(task, "fingerprint") and task.fingerprint:
|
||||
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
|
||||
|
||||
# Add agent fingerprint if task has an assigned agent
|
||||
if hasattr(task, "agent") and task.agent:
|
||||
agent_fingerprint = getattr(
|
||||
getattr(task.agent, "fingerprint", None), "uuid_str", None
|
||||
)
|
||||
if agent_fingerprint:
|
||||
self._add_attribute(span, "agent_fingerprint", agent_fingerprint)
|
||||
add_agent_fingerprint_to_span(span, task.agent, self._add_attribute)
|
||||
|
||||
if crew.share_crew:
|
||||
self._add_attribute(span, "formatted_description", task.description)
|
||||
@@ -435,22 +444,25 @@ class Telemetry:
|
||||
|
||||
return span
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
return None
|
||||
if not self._should_execute_telemetry():
|
||||
return None
|
||||
|
||||
def task_ended(self, span: Span, task: Task, crew: Crew):
|
||||
self._safe_telemetry_operation(_operation)
|
||||
return _operation()
|
||||
|
||||
def task_ended(self, span: Span, task: Task, crew: Crew) -> None:
|
||||
"""Records the completion of a task execution in a crew.
|
||||
|
||||
Args:
|
||||
span (Span): The OpenTelemetry span tracking the task execution
|
||||
task (Task): The task that was completed
|
||||
crew (Crew): The crew context in which the task was executed
|
||||
span: The OpenTelemetry span tracking the task execution.
|
||||
task: The task that was completed.
|
||||
crew: The crew context in which the task was executed.
|
||||
|
||||
Note:
|
||||
If share_crew is enabled, this will also record the task output
|
||||
If share_crew is enabled, this will also record the task output.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
# Ensure fingerprint data is present on completion span
|
||||
if hasattr(task, "fingerprint") and task.fingerprint:
|
||||
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
|
||||
@@ -462,21 +474,20 @@ class Telemetry:
|
||||
task.output.raw if task.output else "",
|
||||
)
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
|
||||
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int) -> None:
|
||||
"""Records when a tool is used repeatedly, which might indicate an issue.
|
||||
|
||||
Args:
|
||||
llm (Any): The language model being used
|
||||
tool_name (str): Name of the tool being repeatedly used
|
||||
attempts (int): Number of attempts made with this tool
|
||||
llm: The language model being used.
|
||||
tool_name: Name of the tool being repeatedly used.
|
||||
attempts: Number of attempts made with this tool.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Tool Repeated Usage")
|
||||
self._add_attribute(
|
||||
@@ -488,22 +499,23 @@ class Telemetry:
|
||||
self._add_attribute(span, "attempts", attempts)
|
||||
if llm:
|
||||
self._add_attribute(span, "llm", llm.model)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def tool_usage(self, llm: Any, tool_name: str, attempts: int, agent: Any = None):
|
||||
def tool_usage(
|
||||
self, llm: Any, tool_name: str, attempts: int, agent: Any = None
|
||||
) -> None:
|
||||
"""Records the usage of a tool by an agent.
|
||||
|
||||
Args:
|
||||
llm (Any): The language model being used
|
||||
tool_name (str): Name of the tool being used
|
||||
attempts (int): Number of attempts made with this tool
|
||||
agent (Any, optional): The agent using the tool
|
||||
llm: The language model being used.
|
||||
tool_name: Name of the tool being used.
|
||||
attempts: Number of attempts made with this tool.
|
||||
agent: The agent using the tool.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Tool Usage")
|
||||
self._add_attribute(
|
||||
@@ -517,30 +529,23 @@ class Telemetry:
|
||||
self._add_attribute(span, "llm", llm.model)
|
||||
|
||||
# Add agent fingerprint data if available
|
||||
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
|
||||
self._add_attribute(
|
||||
span, "agent_fingerprint", agent.fingerprint.uuid_str
|
||||
)
|
||||
if hasattr(agent, "role"):
|
||||
self._add_attribute(span, "agent_role", agent.role)
|
||||
add_agent_fingerprint_to_span(span, agent, self._add_attribute)
|
||||
close_span(span)
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def tool_usage_error(
|
||||
self, llm: Any, agent: Any = None, tool_name: Optional[str] = None
|
||||
):
|
||||
self, llm: Any, agent: Any = None, tool_name: str | None = None
|
||||
) -> None:
|
||||
"""Records when a tool usage results in an error.
|
||||
|
||||
Args:
|
||||
llm (Any): The language model being used when the error occurred
|
||||
agent (Any, optional): The agent using the tool
|
||||
tool_name (str, optional): Name of the tool that caused the error
|
||||
llm: The language model being used when the error occurred.
|
||||
agent: The agent using the tool.
|
||||
tool_name: Name of the tool that caused the error.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Tool Usage Error")
|
||||
self._add_attribute(
|
||||
@@ -555,31 +560,24 @@ class Telemetry:
|
||||
self._add_attribute(span, "tool_name", tool_name)
|
||||
|
||||
# Add agent fingerprint data if available
|
||||
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
|
||||
self._add_attribute(
|
||||
span, "agent_fingerprint", agent.fingerprint.uuid_str
|
||||
)
|
||||
if hasattr(agent, "role"):
|
||||
self._add_attribute(span, "agent_role", agent.role)
|
||||
add_agent_fingerprint_to_span(span, agent, self._add_attribute)
|
||||
close_span(span)
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def individual_test_result_span(
|
||||
self, crew: Crew, quality: float, exec_time: int, model_name: str
|
||||
):
|
||||
) -> None:
|
||||
"""Records individual test results for a crew execution.
|
||||
|
||||
Args:
|
||||
crew (Crew): The crew being tested
|
||||
quality (float): Quality score of the execution
|
||||
exec_time (int): Execution time in seconds
|
||||
model_name (str): Name of the model used
|
||||
crew: The crew being tested.
|
||||
quality: Quality score of the execution.
|
||||
exec_time: Execution time in seconds.
|
||||
model_name: Name of the model used.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Crew Individual Test Result")
|
||||
|
||||
@@ -588,15 +586,15 @@ class Telemetry:
|
||||
"crewai_version",
|
||||
version("crewai"),
|
||||
)
|
||||
self._add_attribute(span, "crew_key", crew.key)
|
||||
self._add_attribute(span, "crew_id", str(crew.id))
|
||||
add_crew_attributes(
|
||||
span, crew, self._add_attribute, include_fingerprint=False
|
||||
)
|
||||
self._add_attribute(span, "quality", str(quality))
|
||||
self._add_attribute(span, "exec_time", str(exec_time))
|
||||
self._add_attribute(span, "model_name", model_name)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def test_execution_span(
|
||||
self,
|
||||
@@ -604,17 +602,17 @@ class Telemetry:
|
||||
iterations: int,
|
||||
inputs: dict[str, Any] | None,
|
||||
model_name: str,
|
||||
):
|
||||
) -> None:
|
||||
"""Records the execution of a test suite for a crew.
|
||||
|
||||
Args:
|
||||
crew (Crew): The crew being tested
|
||||
iterations (int): Number of test iterations
|
||||
inputs (dict[str, Any] | None): Input parameters for the test
|
||||
model_name (str): Name of the model used in testing
|
||||
crew: The crew being tested.
|
||||
iterations: Number of test iterations.
|
||||
inputs: Input parameters for the test.
|
||||
model_name: Name of the model used in testing.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Crew Test Execution")
|
||||
|
||||
@@ -623,8 +621,9 @@ class Telemetry:
|
||||
"crewai_version",
|
||||
version("crewai"),
|
||||
)
|
||||
self._add_attribute(span, "crew_key", crew.key)
|
||||
self._add_attribute(span, "crew_id", str(crew.id))
|
||||
add_crew_attributes(
|
||||
span, crew, self._add_attribute, include_fingerprint=False
|
||||
)
|
||||
self._add_attribute(span, "iterations", str(iterations))
|
||||
self._add_attribute(span, "model_name", model_name)
|
||||
|
||||
@@ -633,93 +632,99 @@ class Telemetry:
|
||||
span, "inputs", json.dumps(inputs) if inputs else None
|
||||
)
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def deploy_signup_error_span(self):
|
||||
def deploy_signup_error_span(self) -> None:
|
||||
"""Records when an error occurs during the deployment signup process."""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Deploy Signup Error")
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def start_deployment_span(self, uuid: Optional[str] = None):
|
||||
def start_deployment_span(self, uuid: str | None = None) -> None:
|
||||
"""Records the start of a deployment process.
|
||||
|
||||
Args:
|
||||
uuid (Optional[str]): Unique identifier for the deployment
|
||||
uuid: Unique identifier for the deployment.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Start Deployment")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def create_crew_deployment_span(self):
|
||||
def create_crew_deployment_span(self) -> None:
|
||||
"""Records the creation of a new crew deployment."""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Create Crew Deployment")
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"):
|
||||
def get_crew_logs_span(
|
||||
self, uuid: str | None, log_type: str = "deployment"
|
||||
) -> None:
|
||||
"""Records the retrieval of crew logs.
|
||||
|
||||
Args:
|
||||
uuid (Optional[str]): Unique identifier for the crew
|
||||
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
|
||||
uuid: Unique identifier for the crew.
|
||||
log_type: Type of logs being retrieved. Defaults to "deployment".
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Get Crew Logs")
|
||||
self._add_attribute(span, "log_type", log_type)
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def remove_crew_span(self, uuid: Optional[str] = None):
|
||||
def remove_crew_span(self, uuid: str | None = None) -> None:
|
||||
"""Records the removal of a crew.
|
||||
|
||||
Args:
|
||||
uuid (Optional[str]): Unique identifier for the crew being removed
|
||||
uuid: Unique identifier for the crew being removed.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Remove Crew")
|
||||
if uuid:
|
||||
self._add_attribute(span, "uuid", uuid)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
|
||||
def crew_execution_span(
|
||||
self, crew: Crew, inputs: dict[str, Any] | None
|
||||
) -> Span | None:
|
||||
"""Records the complete execution of a crew.
|
||||
|
||||
This is only collected if the user has opted-in to share the crew.
|
||||
|
||||
Args:
|
||||
crew: The crew being executed.
|
||||
inputs: Optional input parameters for the crew.
|
||||
|
||||
Returns:
|
||||
The execution span if crew sharing is enabled, None otherwise.
|
||||
"""
|
||||
self.crew_creation(crew, inputs)
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Crew Execution")
|
||||
self._add_attribute(
|
||||
@@ -727,8 +732,9 @@ class Telemetry:
|
||||
"crewai_version",
|
||||
version("crewai"),
|
||||
)
|
||||
self._add_attribute(span, "crew_key", crew.key)
|
||||
self._add_attribute(span, "crew_id", str(crew.id))
|
||||
add_crew_attributes(
|
||||
span, crew, self._add_attribute, include_fingerprint=False
|
||||
)
|
||||
self._add_attribute(
|
||||
span, "crew_inputs", json.dumps(inputs) if inputs else None
|
||||
)
|
||||
@@ -786,12 +792,19 @@ class Telemetry:
|
||||
return span
|
||||
|
||||
if crew.share_crew:
|
||||
self._safe_telemetry_operation(operation)
|
||||
return operation()
|
||||
self._safe_telemetry_operation(_operation)
|
||||
return _operation()
|
||||
return None
|
||||
|
||||
def end_crew(self, crew, final_string_output):
|
||||
def operation():
|
||||
def end_crew(self, crew: Any, final_string_output: str) -> None:
|
||||
"""Records the end of crew execution.
|
||||
|
||||
Args:
|
||||
crew: The crew that finished execution.
|
||||
final_string_output: The final output from the crew.
|
||||
"""
|
||||
|
||||
def _operation():
|
||||
self._add_attribute(
|
||||
crew._execution_span,
|
||||
"crewai_version",
|
||||
@@ -814,68 +827,70 @@ class Telemetry:
|
||||
]
|
||||
),
|
||||
)
|
||||
crew._execution_span.set_status(Status(StatusCode.OK))
|
||||
crew._execution_span.end()
|
||||
close_span(crew._execution_span)
|
||||
|
||||
if crew.share_crew:
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def _add_attribute(self, span, key, value):
|
||||
"""Add an attribute to a span."""
|
||||
def _add_attribute(self, span: Span, key: str, value: Any) -> None:
|
||||
"""Add an attribute to a span.
|
||||
|
||||
def operation():
|
||||
Args:
|
||||
span: The span to add the attribute to.
|
||||
key: The attribute key.
|
||||
value: The attribute value.
|
||||
"""
|
||||
|
||||
def _operation():
|
||||
return span.set_attribute(key, value)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def flow_creation_span(self, flow_name: str):
|
||||
def flow_creation_span(self, flow_name: str) -> None:
|
||||
"""Records the creation of a new flow.
|
||||
|
||||
Args:
|
||||
flow_name (str): Name of the flow being created
|
||||
flow_name: Name of the flow being created.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Creation")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
|
||||
def flow_plotting_span(self, flow_name: str, node_names: list[str]) -> None:
|
||||
"""Records flow visualization/plotting activity.
|
||||
|
||||
Args:
|
||||
flow_name (str): Name of the flow being plotted
|
||||
node_names (list[str]): List of node names in the flow
|
||||
flow_name: Name of the flow being plotted.
|
||||
node_names: List of node names in the flow.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Plotting")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
self._add_attribute(span, "node_names", json.dumps(node_names))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
def flow_execution_span(self, flow_name: str, node_names: list[str]):
|
||||
def flow_execution_span(self, flow_name: str, node_names: list[str]) -> None:
|
||||
"""Records the execution of a flow.
|
||||
|
||||
Args:
|
||||
flow_name (str): Name of the flow being executed
|
||||
node_names (list[str]): List of nodes being executed in the flow
|
||||
flow_name: Name of the flow being executed.
|
||||
node_names: List of nodes being executed in the flow.
|
||||
"""
|
||||
|
||||
def operation():
|
||||
def _operation():
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Execution")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
self._add_attribute(span, "node_names", json.dumps(node_names))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
close_span(span)
|
||||
|
||||
self._safe_telemetry_operation(operation)
|
||||
self._safe_telemetry_operation(_operation)
|
||||
|
||||
112
src/crewai/telemetry/utils.py
Normal file
112
src/crewai/telemetry/utils.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""Telemetry utility functions.
|
||||
|
||||
This module provides utility functions for telemetry operations.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from opentelemetry.trace import Span, Status, StatusCode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.crew import Crew
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
def add_agent_fingerprint_to_span(
|
||||
span: Span, agent: Any, add_attribute_fn: Callable[[Span, str, Any], None]
|
||||
) -> None:
|
||||
"""Add agent fingerprint data to a span if available.
|
||||
|
||||
Args:
|
||||
span: The span to add the attributes to.
|
||||
agent: The agent whose fingerprint data should be added.
|
||||
add_attribute_fn: Function to add attributes to the span.
|
||||
"""
|
||||
if agent:
|
||||
# Try to get fingerprint directly
|
||||
if hasattr(agent, "fingerprint") and agent.fingerprint:
|
||||
add_attribute_fn(span, "agent_fingerprint", agent.fingerprint.uuid_str)
|
||||
if hasattr(agent, "role"):
|
||||
add_attribute_fn(span, "agent_role", agent.role)
|
||||
else:
|
||||
# Try to get fingerprint using getattr (for cases where it might not be directly accessible)
|
||||
agent_fingerprint = getattr(
|
||||
getattr(agent, "fingerprint", None), "uuid_str", None
|
||||
)
|
||||
if agent_fingerprint:
|
||||
add_attribute_fn(span, "agent_fingerprint", agent_fingerprint)
|
||||
if hasattr(agent, "role"):
|
||||
add_attribute_fn(span, "agent_role", agent.role)
|
||||
|
||||
|
||||
def add_crew_attributes(
|
||||
span: Span,
|
||||
crew: "Crew",
|
||||
add_attribute_fn: Callable[[Span, str, Any], None],
|
||||
include_fingerprint: bool = True,
|
||||
) -> None:
|
||||
"""Add crew attributes to a span.
|
||||
|
||||
Args:
|
||||
span: The span to add the attributes to.
|
||||
crew: The crew whose attributes should be added.
|
||||
add_attribute_fn: Function to add attributes to the span.
|
||||
include_fingerprint: Whether to include fingerprint data.
|
||||
"""
|
||||
add_attribute_fn(span, "crew_key", crew.key)
|
||||
add_attribute_fn(span, "crew_id", str(crew.id))
|
||||
|
||||
if include_fingerprint and hasattr(crew, "fingerprint") and crew.fingerprint:
|
||||
add_attribute_fn(span, "crew_fingerprint", crew.fingerprint.uuid_str)
|
||||
|
||||
|
||||
def add_task_attributes(
|
||||
span: Span,
|
||||
task: "Task",
|
||||
add_attribute_fn: Callable[[Span, str, Any], None],
|
||||
include_fingerprint: bool = True,
|
||||
) -> None:
|
||||
"""Add task attributes to a span.
|
||||
|
||||
Args:
|
||||
span: The span to add the attributes to.
|
||||
task: The task whose attributes should be added.
|
||||
add_attribute_fn: Function to add attributes to the span.
|
||||
include_fingerprint: Whether to include fingerprint data.
|
||||
"""
|
||||
add_attribute_fn(span, "task_key", task.key)
|
||||
add_attribute_fn(span, "task_id", str(task.id))
|
||||
|
||||
if include_fingerprint and hasattr(task, "fingerprint") and task.fingerprint:
|
||||
add_attribute_fn(span, "task_fingerprint", task.fingerprint.uuid_str)
|
||||
|
||||
|
||||
def add_crew_and_task_attributes(
|
||||
span: Span,
|
||||
crew: "Crew",
|
||||
task: "Task",
|
||||
add_attribute_fn: Callable[[Span, str, Any], None],
|
||||
include_fingerprints: bool = True,
|
||||
) -> None:
|
||||
"""Add both crew and task attributes to a span.
|
||||
|
||||
Args:
|
||||
span: The span to add the attributes to.
|
||||
crew: The crew whose attributes should be added.
|
||||
task: The task whose attributes should be added.
|
||||
add_attribute_fn: Function to add attributes to the span.
|
||||
include_fingerprints: Whether to include fingerprint data.
|
||||
"""
|
||||
add_crew_attributes(span, crew, add_attribute_fn, include_fingerprints)
|
||||
add_task_attributes(span, task, add_attribute_fn, include_fingerprints)
|
||||
|
||||
|
||||
def close_span(span: Span) -> None:
|
||||
"""Set span status to OK and end it.
|
||||
|
||||
Args:
|
||||
span: The span to close.
|
||||
"""
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Logging utility functions for CrewAI."""
|
||||
"""Logging and warning utility functions for CrewAI."""
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import logging
|
||||
import warnings
|
||||
from collections.abc import Generator
|
||||
|
||||
|
||||
@@ -36,3 +37,20 @@ def suppress_logging(
|
||||
):
|
||||
yield
|
||||
logger.setLevel(original_level)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def suppress_warnings() -> Generator[None, None, None]:
|
||||
"""Context manager to suppress all warnings.
|
||||
|
||||
Yields:
|
||||
None during the context execution.
|
||||
|
||||
Note:
|
||||
There is a similar implementation in src/crewai/llm.py that also
|
||||
suppresses a specific deprecation warning. That version may be
|
||||
consolidated here in the future.
|
||||
"""
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore")
|
||||
yield
|
||||
|
||||
Reference in New Issue
Block a user