fix: fix double trace call; add types

* fix: fix double trace call; add types

* tests: skip long running uv install test, refactor in future

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
This commit is contained in:
Greyson LaLonde
2025-10-21 14:15:39 -04:00
committed by GitHub
parent 6469f224f6
commit dba27cf8b5
2 changed files with 34 additions and 32 deletions

View File

@@ -29,6 +29,7 @@ from opentelemetry.sdk.trace.export import (
SpanExportResult,
)
from opentelemetry.trace import Span
from typing_extensions import Self
from crewai.telemetry.constants import (
CREWAI_TELEMETRY_BASE_URL,
@@ -86,7 +87,7 @@ class Telemetry:
_instance = None
_lock = threading.Lock()
def __new__(cls):
def __new__(cls) -> Self:
if cls._instance is None:
with cls._lock:
if cls._instance is None:
@@ -154,19 +155,24 @@ class Telemetry:
self.ready = False
self.trace_set = False
def _safe_telemetry_operation(self, operation: Callable[[], Any]) -> None:
def _safe_telemetry_operation(
self, operation: Callable[[], Span | None]
) -> Span | None:
"""Execute telemetry operation safely, checking both readiness and environment variables.
Args:
operation: A callable that performs telemetry operations. May return any value,
but the return value is not used by this method.
operation: A callable that performs telemetry operations.
Returns:
The return value from the operation, or None if telemetry is disabled or fails.
"""
if not self._should_execute_telemetry():
return
return None
try:
operation()
return operation()
except Exception as e:
logger.debug(f"Telemetry operation failed: {e}")
return None
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None) -> None:
"""Records the creation of a crew.
@@ -176,7 +182,7 @@ class Telemetry:
inputs: Optional input parameters for the crew.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Created")
self._add_attribute(
@@ -386,7 +392,7 @@ class Telemetry:
The span tracking the task execution, or None if telemetry is disabled.
"""
def _operation():
def _operation() -> Span:
tracer = trace.get_tracer("crewai.telemetry")
created_span = tracer.start_span("Task Created")
@@ -445,11 +451,7 @@ class Telemetry:
return span
if not self._should_execute_telemetry():
return None
self._safe_telemetry_operation(_operation)
return _operation()
return self._safe_telemetry_operation(_operation)
def task_ended(self, span: Span, task: Task, crew: Crew) -> None:
"""Records the completion of a task execution in a crew.
@@ -463,7 +465,7 @@ class Telemetry:
If share_crew is enabled, this will also record the task output.
"""
def _operation():
def _operation() -> None:
# Ensure fingerprint data is present on completion span
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
@@ -488,7 +490,7 @@ class Telemetry:
attempts: Number of attempts made with this tool.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
self._add_attribute(
@@ -516,7 +518,7 @@ class Telemetry:
agent: The agent using the tool.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
self._add_attribute(
@@ -546,7 +548,7 @@ class Telemetry:
tool_name: Name of the tool that caused the error.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
self._add_attribute(
@@ -578,7 +580,7 @@ class Telemetry:
model_name: Name of the model used.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -613,7 +615,7 @@ class Telemetry:
model_name: Name of the model used in testing.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -640,7 +642,7 @@ class Telemetry:
def deploy_signup_error_span(self) -> None:
"""Records when an error occurs during the deployment signup process."""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
close_span(span)
@@ -654,7 +656,7 @@ class Telemetry:
uuid: Unique identifier for the deployment.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
if uuid:
@@ -666,7 +668,7 @@ class Telemetry:
def create_crew_deployment_span(self) -> None:
"""Records the creation of a new crew deployment."""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
close_span(span)
@@ -683,7 +685,7 @@ class Telemetry:
log_type: Type of logs being retrieved. Defaults to "deployment".
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
self._add_attribute(span, "log_type", log_type)
@@ -700,7 +702,7 @@ class Telemetry:
uuid: Unique identifier for the crew being removed.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
if uuid:
@@ -725,7 +727,7 @@ class Telemetry:
"""
self.crew_creation(crew, inputs)
def _operation():
def _operation() -> Span:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(
@@ -793,8 +795,7 @@ class Telemetry:
return span
if crew.share_crew:
self._safe_telemetry_operation(_operation)
return _operation()
return self._safe_telemetry_operation(_operation)
return None
def end_crew(self, crew: Any, final_string_output: str) -> None:
@@ -805,7 +806,7 @@ class Telemetry:
final_string_output: The final output from the crew.
"""
def _operation():
def _operation() -> None:
self._add_attribute(
crew._execution_span,
"crewai_version",
@@ -842,7 +843,7 @@ class Telemetry:
value: The attribute value.
"""
def _operation():
def _operation() -> None:
return span.set_attribute(key, value)
self._safe_telemetry_operation(_operation)
@@ -854,7 +855,7 @@ class Telemetry:
flow_name: Name of the flow being created.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
@@ -870,7 +871,7 @@ class Telemetry:
node_names: List of node names in the flow.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
self._add_attribute(span, "flow_name", flow_name)
@@ -887,7 +888,7 @@ class Telemetry:
node_names: List of nodes being executed in the flow.
"""
def _operation():
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")
self._add_attribute(span, "flow_name", flow_name)