Lorenze/emphemeral tracing (#3323)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* for ephemeral traces

* default false

* simpler and consolidated

* keep raising exception but catch it and continue if its for trace batches

* cleanup

* more cleanup

* not using logger

* refactor: rename TEMP_TRACING_RESOURCE to EPHEMERAL_TRACING_RESOURCE for clarity and consistency in PlusAPI; update related method calls accordingly

* default true

* drop print
This commit is contained in:
Lorenze Jay
2025-08-15 13:37:16 -07:00
committed by GitHub
parent 992e093610
commit 04a03d332f
9 changed files with 306 additions and 62 deletions

View File

@@ -1,9 +1,13 @@
from .utils import TokenManager
class AuthError(Exception):
pass
def get_auth_token() -> str:
"""Get the authentication token."""
access_token = TokenManager().get_token()
if not access_token:
raise Exception("No token found, make sure you are logged in")
raise AuthError("No token found, make sure you are logged in")
return access_token

View File

@@ -18,6 +18,7 @@ class PlusAPI:
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -124,6 +125,11 @@ class PlusAPI:
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"POST",
@@ -131,9 +137,27 @@ class PlusAPI:
json=payload,
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)

View File

@@ -77,7 +77,10 @@ from crewai.utilities.events.listeners.tracing.trace_listener import (
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -283,8 +286,11 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if on_first_execution_tracing_confirmation():
self.tracing = True
if is_tracing_enabled() or self.tracing:
trace_listener = TraceCollectionListener(tracing=self.tracing)
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose

View File

@@ -38,7 +38,10 @@ from crewai.utilities.events.flow_events import (
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -476,8 +479,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Initialize state with initial values
self._state = self._create_initial_state()
self.tracing = tracing
if is_tracing_enabled() or tracing:
trace_listener = TraceCollectionListener(tracing=tracing)
if (
on_first_execution_tracing_confirmation()
or is_tracing_enabled()
or self.tracing
):
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:

View File

@@ -4,7 +4,7 @@ from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
@@ -41,14 +41,21 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
def __init__(self):
self.plus_api = PlusAPI(api_key=get_auth_token())
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
) -> TraceBatch:
"""Initialize a new trace batch"""
self.current_batch = TraceBatch(
@@ -57,13 +64,15 @@ class TraceBatchManager:
self.event_buffer.clear()
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata)
self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
return self.current_batch
def _initialize_backend_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
):
"""Send batch initialization to backend"""
@@ -74,6 +83,7 @@ class TraceBatchManager:
payload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"user_identifier": execution_metadata.get("user_context", None),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
@@ -91,12 +101,22 @@ class TraceBatchManager:
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
}
if use_ephemeral:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
response = self.plus_api.initialize_trace_batch(payload)
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response.status_code == 201 or response.status_code == 200:
response_data = response.json()
self.trace_batch_id = response_data["trace_id"]
self.trace_batch_id = (
response_data["trace_id"]
if not use_ephemeral
else response_data["ephemeral_trace_id"]
)
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
@@ -116,7 +136,7 @@ class TraceBatchManager:
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self):
def _send_events_to_backend(self, ephemeral: bool = True):
"""Send buffered events to backend"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
@@ -134,7 +154,11 @@ class TraceBatchManager:
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = self.plus_api.send_trace_events(self.trace_batch_id, payload)
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
if response.status_code == 200 or response.status_code == 201:
self.event_buffer.clear()
@@ -146,15 +170,15 @@ class TraceBatchManager:
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
def finalize_batch(self) -> Optional[TraceBatch]:
def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
if self.event_buffer:
self._send_events_to_backend()
self._send_events_to_backend(ephemeral)
self._finalize_backend_batch()
self._finalize_backend_batch(ephemeral)
self.current_batch.events = self.event_buffer.copy()
@@ -168,7 +192,7 @@ class TraceBatchManager:
return finalized_batch
def _finalize_backend_batch(self):
def _finalize_backend_batch(self, ephemeral: bool = True):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
@@ -182,12 +206,24 @@ class TraceBatchManager:
"final_event_count": total_events,
}
response = self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
response = (
self.plus_api.finalize_ephemeral_trace_batch(
self.trace_batch_id, payload
)
if ephemeral
else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
)
if response.status_code == 200:
access_code = response.json().get("access_code", None)
console = Console()
return_link = (
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not ephemeral and access_code
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}",
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)

View File

@@ -13,7 +13,6 @@ from crewai.utilities.events.agent_events import (
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
@@ -67,7 +66,7 @@ from crewai.utilities.events.memory_events import (
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
@@ -76,13 +75,12 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
trace_enabled: Optional[bool] = False
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
_instance = None
_initialized = False
def __new__(cls, batch_manager=None, tracing: Optional[bool] = False):
def __new__(cls, batch_manager=None):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@@ -90,25 +88,22 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
tracing: Optional[bool] = False,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self.tracing = tracing or False
self.trace_enabled = self._check_trace_enabled()
self._initialized = True
def _check_trace_enabled(self) -> bool:
def _check_authenticated(self) -> bool:
"""Check if tracing should be enabled"""
auth_token = get_auth_token()
if not auth_token:
try:
res = bool(get_auth_token())
return res
except AuthError:
return False
return is_tracing_enabled() or self.tracing
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
return {
@@ -120,8 +115,6 @@ class TraceCollectionListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if not self.trace_enabled:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
@@ -167,13 +160,13 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_batch(source, event)
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch()
self.batch_manager.finalize_batch(ephemeral=True)
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
@@ -287,7 +280,7 @@ class TraceCollectionListener(BaseEventListener):
def on_agent_reasoning_failed(source, event):
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_batch(self, source: Any, event: Any):
def _initialize_crew_batch(self, source: Any, event: Any):
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
@@ -296,7 +289,7 @@ class TraceCollectionListener(BaseEventListener):
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
"""Initialize trace batch for Flow execution"""
@@ -308,7 +301,20 @@ class TraceCollectionListener(BaseEventListener):
"execution_type": "flow",
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=True
)
else:
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=False
)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for context end events"""

View File

@@ -1,5 +1,153 @@
import os
import platform
import uuid
import hashlib
import subprocess
import getpass
from pathlib import Path
from datetime import datetime
import re
import json
import click
from crewai.utilities.paths import db_storage_path
def is_tracing_enabled() -> bool:
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
def on_first_execution_tracing_confirmation() -> bool:
if _is_test_environment():
return False
if is_first_execution():
mark_first_execution_done()
return click.confirm(
"This is the first execution of CrewAI. Do you want to enable tracing?",
default=True,
show_default=True,
)
return False
def _is_test_environment() -> bool:
"""Detect if we're running in a test environment."""
return os.environ.get("CREWAI_TESTING", "").lower() == "true"
def _get_machine_id() -> str:
"""Stable, privacy-preserving machine fingerprint (cross-platform)."""
parts = []
try:
mac = ":".join(
["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][
::-1
]
)
parts.append(mac)
except Exception:
pass
sysname = platform.system()
parts.append(sysname)
try:
if sysname == "Darwin":
res = subprocess.run(
["system_profiler", "SPHardwareDataType"],
capture_output=True,
text=True,
timeout=2,
)
m = re.search(r"Hardware UUID:\s*([A-Fa-f0-9\-]+)", res.stdout)
if m:
parts.append(m.group(1))
elif sysname == "Linux":
try:
parts.append(Path("/etc/machine-id").read_text().strip())
except Exception:
parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip())
elif sysname == "Windows":
res = subprocess.run(
["wmic", "csproduct", "get", "UUID"],
capture_output=True,
text=True,
timeout=2,
)
lines = [line.strip() for line in res.stdout.splitlines() if line.strip()]
if len(lines) >= 2:
parts.append(lines[1])
except Exception:
pass
return hashlib.sha256("".join(parts).encode()).hexdigest()
def _user_data_file() -> Path:
base = Path(db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _load_user_data() -> dict:
p = _user_data_file()
if p.exists():
try:
return json.loads(p.read_text())
except Exception:
pass
return {}
def _save_user_data(data: dict) -> None:
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except Exception:
pass
def get_user_id() -> str:
"""Stable, anonymized user identifier with caching."""
data = _load_user_data()
if "user_id" in data:
return data["user_id"]
try:
username = getpass.getuser()
except Exception:
username = "unknown"
seed = f"{username}|{_get_machine_id()}"
uid = hashlib.sha256(seed.encode()).hexdigest()
data["user_id"] = uid
_save_user_data(data)
return uid
def is_first_execution() -> bool:
"""True if this is the first execution for this user."""
data = _load_user_data()
return not data.get("first_execution_done", False)
def mark_first_execution_done() -> None:
"""Mark that the first execution has been completed."""
data = _load_user_data()
if data.get("first_execution_done", False):
return
data.update(
{
"first_execution_done": True,
"first_execution_at": datetime.now().timestamp(),
"user_id": get_user_id(),
"machine_id": _get_machine_id(),
}
)
_save_user_data(data)

View File

@@ -34,11 +34,11 @@ def setup_test_environment():
f"Test storage directory {storage_dir} is not writable: {e}"
)
# Set environment variable to point to the test storage directory
os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir)
os.environ["CREWAI_TESTING"] = "true"
yield
os.environ.pop("CREWAI_TESTING", None)
# Cleanup is handled automatically when tempfile context exits

View File

@@ -2,8 +2,8 @@ import os
import pytest
from unittest.mock import patch, MagicMock
# Remove the module-level patch
from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, start
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -284,29 +284,42 @@ class TestTraceListenerSetup:
f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
)
def test_trace_listener_setup_correctly(self):
def test_trace_listener_setup_correctly_for_crew(self):
"""Test that trace listener is set up correctly when enabled"""
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
trace_listener = TraceCollectionListener()
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm="gpt-4o-mini",
)
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
Crew(agents=[agent], tasks=[task], verbose=True)
assert mock_listener_setup.call_count >= 1
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_trace_listener_setup_correctly_with_tracing_flag(self):
def test_trace_listener_setup_correctly_for_flow(self):
"""Test that trace listener is set up correctly when enabled"""
agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True)
crew.kickoff()
trace_listener = TraceCollectionListener(tracing=True)
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
class FlowExample(Flow):
@start()
def start(self):
pass
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
FlowExample()
assert mock_listener_setup.call_count >= 1
# Helper method to ensure cleanup
def teardown_method(self):