From b126ab22dd61dff16ca667303c92c4a340ec0a71 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 10 Sep 2025 09:18:21 -0400 Subject: [PATCH] chore: refactor telemetry module with utility functions and modern typing (#3485) Co-authored-by: Lucas Gomide --- src/crewai/telemetry/constants.py | 11 +- src/crewai/telemetry/telemetry.py | 439 ++++++++++++++------------- src/crewai/telemetry/utils.py | 112 +++++++ src/crewai/utilities/logger_utils.py | 20 +- 4 files changed, 367 insertions(+), 215 deletions(-) create mode 100644 src/crewai/telemetry/utils.py diff --git a/src/crewai/telemetry/constants.py b/src/crewai/telemetry/constants.py index 95820a774..3c735d868 100644 --- a/src/crewai/telemetry/constants.py +++ b/src/crewai/telemetry/constants.py @@ -1,2 +1,9 @@ -CREWAI_TELEMETRY_BASE_URL: str = "https://telemetry.crewai.com:4319" -CREWAI_TELEMETRY_SERVICE_NAME: str = "crewAI-telemetry" +"""Telemetry configuration constants. + +This module defines constants used for CrewAI telemetry configuration. +""" + +from typing import Final + +CREWAI_TELEMETRY_BASE_URL: Final[str] = "https://telemetry.crewai.com:4319" +CREWAI_TELEMETRY_SERVICE_NAME: Final[str] = "crewAI-telemetry" diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index d43bd276d..b7d479069 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -1,3 +1,11 @@ +"""Telemetry module for CrewAI. + +This module provides anonymous telemetry collection for development purposes. +No prompts, task descriptions, agent backstories/goals, responses, or sensitive +data is collected. Users can opt-in to share more complete data using the +`share_crew` attribute. +""" + from __future__ import annotations import asyncio @@ -5,11 +13,10 @@ import json import logging import os import platform -import warnings -from contextlib import contextmanager -from importlib.metadata import version -from typing import TYPE_CHECKING, Any, Callable, Optional import threading +from collections.abc import Callable +from importlib.metadata import version +from typing import TYPE_CHECKING, Any from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( @@ -21,30 +28,43 @@ from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, SpanExportResult, ) -from opentelemetry.trace import Span, Status, StatusCode +from opentelemetry.trace import Span from crewai.telemetry.constants import ( CREWAI_TELEMETRY_BASE_URL, CREWAI_TELEMETRY_SERVICE_NAME, ) +from crewai.telemetry.utils import ( + add_agent_fingerprint_to_span, + add_crew_and_task_attributes, + add_crew_attributes, + close_span, +) +from crewai.utilities.logger_utils import suppress_warnings logger = logging.getLogger(__name__) - -@contextmanager -def suppress_warnings(): - with warnings.catch_warnings(): - warnings.filterwarnings("ignore") - yield - - if TYPE_CHECKING: from crewai.crew import Crew from crewai.task import Task class SafeOTLPSpanExporter(OTLPSpanExporter): - def export(self, spans) -> SpanExportResult: + """Safe wrapper for OTLP span exporter that handles exceptions gracefully. + + This exporter prevents telemetry failures from breaking the application + by catching and logging exceptions during span export. + """ + + def export(self, spans: Any) -> SpanExportResult: + """Export spans to the telemetry backend safely. + + Args: + spans: Collection of spans to export. + + Returns: + Export result status, FAILURE if an exception occurs. + """ try: return super().export(spans) except Exception as e: @@ -53,16 +73,13 @@ class SafeOTLPSpanExporter(OTLPSpanExporter): class Telemetry: - """A class to handle anonymous telemetry for the crewai package. + """Handle anonymous telemetry for the CrewAI package. - The data being collected is for development purpose, all data is anonymous. - - There is NO data being collected on the prompts, tasks descriptions - agents backstories or goals nor responses or any data that is being - processed by the agents, nor any secrets and env vars. - - Users can opt-in to sharing more complete data using the `share_crew` - attribute in the Crew class. + Attributes: + ready: Whether telemetry is initialized and ready. + trace_set: Whether the tracer provider has been set. + resource: OpenTelemetry resource for the telemetry service. + provider: OpenTelemetry tracer provider. """ _instance = None @@ -72,14 +89,14 @@ class Telemetry: if cls._instance is None: with cls._lock: if cls._instance is None: - cls._instance = super(Telemetry, cls).__new__(cls) + cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self) -> None: - if hasattr(self, '_initialized') and self._initialized: + if hasattr(self, "_initialized") and self._initialized: return - + self.ready: bool = False self.trace_set: bool = False self._initialized: bool = True @@ -124,29 +141,41 @@ class Telemetry: """Check if telemetry operations should be executed.""" return self.ready and not self._is_telemetry_disabled() - def set_tracer(self): + def set_tracer(self) -> None: + """Set the tracer provider if ready and not already set.""" if self.ready and not self.trace_set: try: with suppress_warnings(): trace.set_tracer_provider(self.provider) self.trace_set = True - except Exception: + except Exception as e: + logger.debug(f"Failed to set tracer provider: {e}") self.ready = False self.trace_set = False - def _safe_telemetry_operation(self, operation: Callable[[], None]) -> None: - """Execute telemetry operation safely, checking both readiness and environment variables.""" + def _safe_telemetry_operation(self, operation: Callable[[], Any]) -> None: + """Execute telemetry operation safely, checking both readiness and environment variables. + + Args: + operation: A callable that performs telemetry operations. May return any value, + but the return value is not used by this method. + """ if not self._should_execute_telemetry(): return try: operation() - except Exception: - pass + except Exception as e: + logger.debug(f"Telemetry operation failed: {e}") - def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): - """Records the creation of a crew.""" + def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None) -> None: + """Records the creation of a crew. - def operation(): + Args: + crew: The crew being created. + inputs: Optional input parameters for the crew. + """ + + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Created") self._add_attribute( @@ -155,16 +184,14 @@ class Telemetry: version("crewai"), ) self._add_attribute(span, "python_version", platform.python_version()) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) + add_crew_attributes(span, crew, self._add_attribute) self._add_attribute(span, "crew_process", crew.process) self._add_attribute(span, "crew_memory", crew.memory) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) - # Add fingerprint data + # Add additional fingerprint metadata if available if hasattr(crew, "fingerprint") and crew.fingerprint: - self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str) self._add_attribute( span, "crew_fingerprint_created_at", @@ -343,29 +370,27 @@ class Telemetry: ] ), ) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) def task_started(self, crew: Crew, task: Task) -> Span | None: - """Records task started in a crew.""" + """Records task started in a crew. - def operation(): + Args: + crew: The crew executing the task. + task: The task being started. + + Returns: + The span tracking the task execution, or None if telemetry is disabled. + """ + + def _operation(): tracer = trace.get_tracer("crewai.telemetry") created_span = tracer.start_span("Task Created") - self._add_attribute(created_span, "crew_key", crew.key) - self._add_attribute(created_span, "crew_id", str(crew.id)) - self._add_attribute(created_span, "task_key", task.key) - self._add_attribute(created_span, "task_id", str(task.id)) - - # Add fingerprint data - if hasattr(crew, "fingerprint") and crew.fingerprint: - self._add_attribute( - created_span, "crew_fingerprint", crew.fingerprint.uuid_str - ) + add_crew_and_task_attributes(created_span, crew, task, self._add_attribute) if hasattr(task, "fingerprint") and task.fingerprint: self._add_attribute( @@ -386,13 +411,9 @@ class Telemetry: # Add agent fingerprint if task has an assigned agent if hasattr(task, "agent") and task.agent: - agent_fingerprint = getattr( - getattr(task.agent, "fingerprint", None), "uuid_str", None + add_agent_fingerprint_to_span( + created_span, task.agent, self._add_attribute ) - if agent_fingerprint: - self._add_attribute( - created_span, "agent_fingerprint", agent_fingerprint - ) if crew.share_crew: self._add_attribute( @@ -402,30 +423,18 @@ class Telemetry: created_span, "formatted_expected_output", task.expected_output ) - created_span.set_status(Status(StatusCode.OK)) - created_span.end() + close_span(created_span) span = tracer.start_span("Task Execution") - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "task_key", task.key) - self._add_attribute(span, "task_id", str(task.id)) - - # Add fingerprint data to execution span - if hasattr(crew, "fingerprint") and crew.fingerprint: - self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str) + add_crew_and_task_attributes(span, crew, task, self._add_attribute) if hasattr(task, "fingerprint") and task.fingerprint: self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str) # Add agent fingerprint if task has an assigned agent if hasattr(task, "agent") and task.agent: - agent_fingerprint = getattr( - getattr(task.agent, "fingerprint", None), "uuid_str", None - ) - if agent_fingerprint: - self._add_attribute(span, "agent_fingerprint", agent_fingerprint) + add_agent_fingerprint_to_span(span, task.agent, self._add_attribute) if crew.share_crew: self._add_attribute(span, "formatted_description", task.description) @@ -435,22 +444,25 @@ class Telemetry: return span - self._safe_telemetry_operation(operation) - return None + if not self._should_execute_telemetry(): + return None - def task_ended(self, span: Span, task: Task, crew: Crew): + self._safe_telemetry_operation(_operation) + return _operation() + + def task_ended(self, span: Span, task: Task, crew: Crew) -> None: """Records the completion of a task execution in a crew. Args: - span (Span): The OpenTelemetry span tracking the task execution - task (Task): The task that was completed - crew (Crew): The crew context in which the task was executed + span: The OpenTelemetry span tracking the task execution. + task: The task that was completed. + crew: The crew context in which the task was executed. Note: - If share_crew is enabled, this will also record the task output + If share_crew is enabled, this will also record the task output. """ - def operation(): + def _operation(): # Ensure fingerprint data is present on completion span if hasattr(task, "fingerprint") and task.fingerprint: self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str) @@ -462,21 +474,20 @@ class Telemetry: task.output.raw if task.output else "", ) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int): + def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int) -> None: """Records when a tool is used repeatedly, which might indicate an issue. Args: - llm (Any): The language model being used - tool_name (str): Name of the tool being repeatedly used - attempts (int): Number of attempts made with this tool + llm: The language model being used. + tool_name: Name of the tool being repeatedly used. + attempts: Number of attempts made with this tool. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Tool Repeated Usage") self._add_attribute( @@ -488,22 +499,23 @@ class Telemetry: self._add_attribute(span, "attempts", attempts) if llm: self._add_attribute(span, "llm", llm.model) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def tool_usage(self, llm: Any, tool_name: str, attempts: int, agent: Any = None): + def tool_usage( + self, llm: Any, tool_name: str, attempts: int, agent: Any = None + ) -> None: """Records the usage of a tool by an agent. Args: - llm (Any): The language model being used - tool_name (str): Name of the tool being used - attempts (int): Number of attempts made with this tool - agent (Any, optional): The agent using the tool + llm: The language model being used. + tool_name: Name of the tool being used. + attempts: Number of attempts made with this tool. + agent: The agent using the tool. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Tool Usage") self._add_attribute( @@ -517,30 +529,23 @@ class Telemetry: self._add_attribute(span, "llm", llm.model) # Add agent fingerprint data if available - if agent and hasattr(agent, "fingerprint") and agent.fingerprint: - self._add_attribute( - span, "agent_fingerprint", agent.fingerprint.uuid_str - ) - if hasattr(agent, "role"): - self._add_attribute(span, "agent_role", agent.role) + add_agent_fingerprint_to_span(span, agent, self._add_attribute) + close_span(span) - span.set_status(Status(StatusCode.OK)) - span.end() - - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) def tool_usage_error( - self, llm: Any, agent: Any = None, tool_name: Optional[str] = None - ): + self, llm: Any, agent: Any = None, tool_name: str | None = None + ) -> None: """Records when a tool usage results in an error. Args: - llm (Any): The language model being used when the error occurred - agent (Any, optional): The agent using the tool - tool_name (str, optional): Name of the tool that caused the error + llm: The language model being used when the error occurred. + agent: The agent using the tool. + tool_name: Name of the tool that caused the error. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Tool Usage Error") self._add_attribute( @@ -555,31 +560,24 @@ class Telemetry: self._add_attribute(span, "tool_name", tool_name) # Add agent fingerprint data if available - if agent and hasattr(agent, "fingerprint") and agent.fingerprint: - self._add_attribute( - span, "agent_fingerprint", agent.fingerprint.uuid_str - ) - if hasattr(agent, "role"): - self._add_attribute(span, "agent_role", agent.role) + add_agent_fingerprint_to_span(span, agent, self._add_attribute) + close_span(span) - span.set_status(Status(StatusCode.OK)) - span.end() - - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) def individual_test_result_span( self, crew: Crew, quality: float, exec_time: int, model_name: str - ): + ) -> None: """Records individual test results for a crew execution. Args: - crew (Crew): The crew being tested - quality (float): Quality score of the execution - exec_time (int): Execution time in seconds - model_name (str): Name of the model used + crew: The crew being tested. + quality: Quality score of the execution. + exec_time: Execution time in seconds. + model_name: Name of the model used. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Individual Test Result") @@ -588,15 +586,15 @@ class Telemetry: "crewai_version", version("crewai"), ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) + add_crew_attributes( + span, crew, self._add_attribute, include_fingerprint=False + ) self._add_attribute(span, "quality", str(quality)) self._add_attribute(span, "exec_time", str(exec_time)) self._add_attribute(span, "model_name", model_name) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) def test_execution_span( self, @@ -604,17 +602,17 @@ class Telemetry: iterations: int, inputs: dict[str, Any] | None, model_name: str, - ): + ) -> None: """Records the execution of a test suite for a crew. Args: - crew (Crew): The crew being tested - iterations (int): Number of test iterations - inputs (dict[str, Any] | None): Input parameters for the test - model_name (str): Name of the model used in testing + crew: The crew being tested. + iterations: Number of test iterations. + inputs: Input parameters for the test. + model_name: Name of the model used in testing. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Test Execution") @@ -623,8 +621,9 @@ class Telemetry: "crewai_version", version("crewai"), ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) + add_crew_attributes( + span, crew, self._add_attribute, include_fingerprint=False + ) self._add_attribute(span, "iterations", str(iterations)) self._add_attribute(span, "model_name", model_name) @@ -633,93 +632,99 @@ class Telemetry: span, "inputs", json.dumps(inputs) if inputs else None ) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def deploy_signup_error_span(self): + def deploy_signup_error_span(self) -> None: """Records when an error occurs during the deployment signup process.""" - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Deploy Signup Error") - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def start_deployment_span(self, uuid: Optional[str] = None): + def start_deployment_span(self, uuid: str | None = None) -> None: """Records the start of a deployment process. Args: - uuid (Optional[str]): Unique identifier for the deployment + uuid: Unique identifier for the deployment. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Start Deployment") if uuid: self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def create_crew_deployment_span(self): + def create_crew_deployment_span(self) -> None: """Records the creation of a new crew deployment.""" - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Create Crew Deployment") - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"): + def get_crew_logs_span( + self, uuid: str | None, log_type: str = "deployment" + ) -> None: """Records the retrieval of crew logs. Args: - uuid (Optional[str]): Unique identifier for the crew - log_type (str, optional): Type of logs being retrieved. Defaults to "deployment". + uuid: Unique identifier for the crew. + log_type: Type of logs being retrieved. Defaults to "deployment". """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Get Crew Logs") self._add_attribute(span, "log_type", log_type) if uuid: self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def remove_crew_span(self, uuid: Optional[str] = None): + def remove_crew_span(self, uuid: str | None = None) -> None: """Records the removal of a crew. Args: - uuid (Optional[str]): Unique identifier for the crew being removed + uuid: Unique identifier for the crew being removed. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Remove Crew") if uuid: self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): + def crew_execution_span( + self, crew: Crew, inputs: dict[str, Any] | None + ) -> Span | None: """Records the complete execution of a crew. + This is only collected if the user has opted-in to share the crew. + + Args: + crew: The crew being executed. + inputs: Optional input parameters for the crew. + + Returns: + The execution span if crew sharing is enabled, None otherwise. """ self.crew_creation(crew, inputs) - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Execution") self._add_attribute( @@ -727,8 +732,9 @@ class Telemetry: "crewai_version", version("crewai"), ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) + add_crew_attributes( + span, crew, self._add_attribute, include_fingerprint=False + ) self._add_attribute( span, "crew_inputs", json.dumps(inputs) if inputs else None ) @@ -786,12 +792,19 @@ class Telemetry: return span if crew.share_crew: - self._safe_telemetry_operation(operation) - return operation() + self._safe_telemetry_operation(_operation) + return _operation() return None - def end_crew(self, crew, final_string_output): - def operation(): + def end_crew(self, crew: Any, final_string_output: str) -> None: + """Records the end of crew execution. + + Args: + crew: The crew that finished execution. + final_string_output: The final output from the crew. + """ + + def _operation(): self._add_attribute( crew._execution_span, "crewai_version", @@ -814,68 +827,70 @@ class Telemetry: ] ), ) - crew._execution_span.set_status(Status(StatusCode.OK)) - crew._execution_span.end() + close_span(crew._execution_span) if crew.share_crew: - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def _add_attribute(self, span, key, value): - """Add an attribute to a span.""" + def _add_attribute(self, span: Span, key: str, value: Any) -> None: + """Add an attribute to a span. - def operation(): + Args: + span: The span to add the attribute to. + key: The attribute key. + value: The attribute value. + """ + + def _operation(): return span.set_attribute(key, value) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def flow_creation_span(self, flow_name: str): + def flow_creation_span(self, flow_name: str) -> None: """Records the creation of a new flow. Args: - flow_name (str): Name of the flow being created + flow_name: Name of the flow being created. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Flow Creation") self._add_attribute(span, "flow_name", flow_name) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def flow_plotting_span(self, flow_name: str, node_names: list[str]): + def flow_plotting_span(self, flow_name: str, node_names: list[str]) -> None: """Records flow visualization/plotting activity. Args: - flow_name (str): Name of the flow being plotted - node_names (list[str]): List of node names in the flow + flow_name: Name of the flow being plotted. + node_names: List of node names in the flow. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Flow Plotting") self._add_attribute(span, "flow_name", flow_name) self._add_attribute(span, "node_names", json.dumps(node_names)) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) - def flow_execution_span(self, flow_name: str, node_names: list[str]): + def flow_execution_span(self, flow_name: str, node_names: list[str]) -> None: """Records the execution of a flow. Args: - flow_name (str): Name of the flow being executed - node_names (list[str]): List of nodes being executed in the flow + flow_name: Name of the flow being executed. + node_names: List of nodes being executed in the flow. """ - def operation(): + def _operation(): tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Flow Execution") self._add_attribute(span, "flow_name", flow_name) self._add_attribute(span, "node_names", json.dumps(node_names)) - span.set_status(Status(StatusCode.OK)) - span.end() + close_span(span) - self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(_operation) diff --git a/src/crewai/telemetry/utils.py b/src/crewai/telemetry/utils.py new file mode 100644 index 000000000..b56b58e8d --- /dev/null +++ b/src/crewai/telemetry/utils.py @@ -0,0 +1,112 @@ +"""Telemetry utility functions. + +This module provides utility functions for telemetry operations. +""" + +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +from opentelemetry.trace import Span, Status, StatusCode + +if TYPE_CHECKING: + from crewai.crew import Crew + from crewai.task import Task + + +def add_agent_fingerprint_to_span( + span: Span, agent: Any, add_attribute_fn: Callable[[Span, str, Any], None] +) -> None: + """Add agent fingerprint data to a span if available. + + Args: + span: The span to add the attributes to. + agent: The agent whose fingerprint data should be added. + add_attribute_fn: Function to add attributes to the span. + """ + if agent: + # Try to get fingerprint directly + if hasattr(agent, "fingerprint") and agent.fingerprint: + add_attribute_fn(span, "agent_fingerprint", agent.fingerprint.uuid_str) + if hasattr(agent, "role"): + add_attribute_fn(span, "agent_role", agent.role) + else: + # Try to get fingerprint using getattr (for cases where it might not be directly accessible) + agent_fingerprint = getattr( + getattr(agent, "fingerprint", None), "uuid_str", None + ) + if agent_fingerprint: + add_attribute_fn(span, "agent_fingerprint", agent_fingerprint) + if hasattr(agent, "role"): + add_attribute_fn(span, "agent_role", agent.role) + + +def add_crew_attributes( + span: Span, + crew: "Crew", + add_attribute_fn: Callable[[Span, str, Any], None], + include_fingerprint: bool = True, +) -> None: + """Add crew attributes to a span. + + Args: + span: The span to add the attributes to. + crew: The crew whose attributes should be added. + add_attribute_fn: Function to add attributes to the span. + include_fingerprint: Whether to include fingerprint data. + """ + add_attribute_fn(span, "crew_key", crew.key) + add_attribute_fn(span, "crew_id", str(crew.id)) + + if include_fingerprint and hasattr(crew, "fingerprint") and crew.fingerprint: + add_attribute_fn(span, "crew_fingerprint", crew.fingerprint.uuid_str) + + +def add_task_attributes( + span: Span, + task: "Task", + add_attribute_fn: Callable[[Span, str, Any], None], + include_fingerprint: bool = True, +) -> None: + """Add task attributes to a span. + + Args: + span: The span to add the attributes to. + task: The task whose attributes should be added. + add_attribute_fn: Function to add attributes to the span. + include_fingerprint: Whether to include fingerprint data. + """ + add_attribute_fn(span, "task_key", task.key) + add_attribute_fn(span, "task_id", str(task.id)) + + if include_fingerprint and hasattr(task, "fingerprint") and task.fingerprint: + add_attribute_fn(span, "task_fingerprint", task.fingerprint.uuid_str) + + +def add_crew_and_task_attributes( + span: Span, + crew: "Crew", + task: "Task", + add_attribute_fn: Callable[[Span, str, Any], None], + include_fingerprints: bool = True, +) -> None: + """Add both crew and task attributes to a span. + + Args: + span: The span to add the attributes to. + crew: The crew whose attributes should be added. + task: The task whose attributes should be added. + add_attribute_fn: Function to add attributes to the span. + include_fingerprints: Whether to include fingerprint data. + """ + add_crew_attributes(span, crew, add_attribute_fn, include_fingerprints) + add_task_attributes(span, task, add_attribute_fn, include_fingerprints) + + +def close_span(span: Span) -> None: + """Set span status to OK and end it. + + Args: + span: The span to close. + """ + span.set_status(Status(StatusCode.OK)) + span.end() diff --git a/src/crewai/utilities/logger_utils.py b/src/crewai/utilities/logger_utils.py index b8af01289..7d0e806be 100644 --- a/src/crewai/utilities/logger_utils.py +++ b/src/crewai/utilities/logger_utils.py @@ -1,8 +1,9 @@ -"""Logging utility functions for CrewAI.""" +"""Logging and warning utility functions for CrewAI.""" import contextlib import io import logging +import warnings from collections.abc import Generator @@ -36,3 +37,20 @@ def suppress_logging( ): yield logger.setLevel(original_level) + + +@contextlib.contextmanager +def suppress_warnings() -> Generator[None, None, None]: + """Context manager to suppress all warnings. + + Yields: + None during the context execution. + + Note: + There is a similar implementation in src/crewai/llm.py that also + suppresses a specific deprecation warning. That version may be + consolidated here in the future. + """ + with warnings.catch_warnings(): + warnings.filterwarnings("ignore") + yield