From 04a03d332f00db35a2bb683b2002ecb95b9d5a24 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Fri, 15 Aug 2025 13:37:16 -0700 Subject: [PATCH] Lorenze/emphemeral tracing (#3323) * for ephemeral traces * default false * simpler and consolidated * keep raising exception but catch it and continue if its for trace batches * cleanup * more cleanup * not using logger * refactor: rename TEMP_TRACING_RESOURCE to EPHEMERAL_TRACING_RESOURCE for clarity and consistency in PlusAPI; update related method calls accordingly * default true * drop print --- src/crewai/cli/authentication/token.py | 6 +- src/crewai/cli/plus_api.py | 24 +++ src/crewai/crew.py | 10 +- src/crewai/flow/flow.py | 13 +- .../listeners/tracing/trace_batch_manager.py | 68 ++++++-- .../listeners/tracing/trace_listener.py | 44 +++--- .../events/listeners/tracing/utils.py | 148 ++++++++++++++++++ tests/conftest.py | 4 +- tests/tracing/test_tracing.py | 51 +++--- 9 files changed, 306 insertions(+), 62 deletions(-) diff --git a/src/crewai/cli/authentication/token.py b/src/crewai/cli/authentication/token.py index 46ff75e89..e59a13b38 100644 --- a/src/crewai/cli/authentication/token.py +++ b/src/crewai/cli/authentication/token.py @@ -1,9 +1,13 @@ from .utils import TokenManager +class AuthError(Exception): + pass + + def get_auth_token() -> str: """Get the authentication token.""" access_token = TokenManager().get_token() if not access_token: - raise Exception("No token found, make sure you are logged in") + raise AuthError("No token found, make sure you are logged in") return access_token diff --git a/src/crewai/cli/plus_api.py b/src/crewai/cli/plus_api.py index 26f0953f0..d86858c4a 100644 --- a/src/crewai/cli/plus_api.py +++ b/src/crewai/cli/plus_api.py @@ -18,6 +18,7 @@ class PlusAPI: CREWS_RESOURCE = "/crewai_plus/api/v1/crews" AGENTS_RESOURCE = "/crewai_plus/api/v1/agents" TRACING_RESOURCE = "/crewai_plus/api/v1/tracing" + EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral" def __init__(self, api_key: str) -> None: self.api_key = api_key @@ -124,6 +125,11 @@ class PlusAPI: "POST", f"{self.TRACING_RESOURCE}/batches", json=payload ) + def initialize_ephemeral_trace_batch(self, payload) -> requests.Response: + return self._make_request( + "POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload + ) + def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response: return self._make_request( "POST", @@ -131,9 +137,27 @@ class PlusAPI: json=payload, ) + def send_ephemeral_trace_events( + self, trace_batch_id: str, payload + ) -> requests.Response: + return self._make_request( + "POST", + f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events", + json=payload, + ) + def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response: return self._make_request( "PATCH", f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize", json=payload, ) + + def finalize_ephemeral_trace_batch( + self, trace_batch_id: str, payload + ) -> requests.Response: + return self._make_request( + "PATCH", + f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize", + json=payload, + ) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 5e654463c..a14c8f278 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -77,7 +77,10 @@ from crewai.utilities.events.listeners.tracing.trace_listener import ( ) -from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled +from crewai.utilities.events.listeners.tracing.utils import ( + is_tracing_enabled, + on_first_execution_tracing_confirmation, +) from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -283,8 +286,11 @@ class Crew(FlowTrackable, BaseModel): self._cache_handler = CacheHandler() event_listener = EventListener() + if on_first_execution_tracing_confirmation(): + self.tracing = True + if is_tracing_enabled() or self.tracing: - trace_listener = TraceCollectionListener(tracing=self.tracing) + trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose event_listener.formatter.verbose = self.verbose diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 111cae7d6..35a83ea1f 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -38,7 +38,10 @@ from crewai.utilities.events.flow_events import ( from crewai.utilities.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) -from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled +from crewai.utilities.events.listeners.tracing.utils import ( + is_tracing_enabled, + on_first_execution_tracing_confirmation, +) from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -476,8 +479,12 @@ class Flow(Generic[T], metaclass=FlowMeta): # Initialize state with initial values self._state = self._create_initial_state() self.tracing = tracing - if is_tracing_enabled() or tracing: - trace_listener = TraceCollectionListener(tracing=tracing) + if ( + on_first_execution_tracing_confirmation() + or is_tracing_enabled() + or self.tracing + ): + trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) # Apply any additional kwargs if kwargs: diff --git a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py index 4a1f06621..c06d51af7 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_batch_manager.py @@ -4,7 +4,7 @@ from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from crewai.utilities.constants import CREWAI_BASE_URL -from crewai.cli.authentication.token import get_auth_token +from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.version import get_crewai_version from crewai.cli.plus_api import PlusAPI @@ -41,14 +41,21 @@ class TraceBatchManager: """Single responsibility: Manage batches and event buffering""" def __init__(self): - self.plus_api = PlusAPI(api_key=get_auth_token()) + try: + self.plus_api = PlusAPI(api_key=get_auth_token()) + except AuthError: + self.plus_api = PlusAPI(api_key="") + self.trace_batch_id: Optional[str] = None # Backend ID self.current_batch: Optional[TraceBatch] = None self.event_buffer: List[TraceEvent] = [] self.execution_start_times: Dict[str, datetime] = {} def initialize_batch( - self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] + self, + user_context: Dict[str, str], + execution_metadata: Dict[str, Any], + use_ephemeral: bool = False, ) -> TraceBatch: """Initialize a new trace batch""" self.current_batch = TraceBatch( @@ -57,13 +64,15 @@ class TraceBatchManager: self.event_buffer.clear() self.record_start_time("execution") - - self._initialize_backend_batch(user_context, execution_metadata) + self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral) return self.current_batch def _initialize_backend_batch( - self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] + self, + user_context: Dict[str, str], + execution_metadata: Dict[str, Any], + use_ephemeral: bool = False, ): """Send batch initialization to backend""" @@ -74,6 +83,7 @@ class TraceBatchManager: payload = { "trace_id": self.current_batch.batch_id, "execution_type": execution_metadata.get("execution_type", "crew"), + "user_identifier": execution_metadata.get("user_context", None), "execution_context": { "crew_fingerprint": execution_metadata.get("crew_fingerprint"), "crew_name": execution_metadata.get("crew_name", None), @@ -91,12 +101,22 @@ class TraceBatchManager: "execution_started_at": datetime.now(timezone.utc).isoformat(), }, } + if use_ephemeral: + payload["ephemeral_trace_id"] = self.current_batch.batch_id - response = self.plus_api.initialize_trace_batch(payload) + response = ( + self.plus_api.initialize_ephemeral_trace_batch(payload) + if use_ephemeral + else self.plus_api.initialize_trace_batch(payload) + ) if response.status_code == 201 or response.status_code == 200: response_data = response.json() - self.trace_batch_id = response_data["trace_id"] + self.trace_batch_id = ( + response_data["trace_id"] + if not use_ephemeral + else response_data["ephemeral_trace_id"] + ) console = Console() panel = Panel( f"✅ Trace batch initialized with session ID: {self.trace_batch_id}", @@ -116,7 +136,7 @@ class TraceBatchManager: """Add event to buffer""" self.event_buffer.append(trace_event) - def _send_events_to_backend(self): + def _send_events_to_backend(self, ephemeral: bool = True): """Send buffered events to backend""" if not self.plus_api or not self.trace_batch_id or not self.event_buffer: return @@ -134,7 +154,11 @@ class TraceBatchManager: if not self.trace_batch_id: raise Exception("❌ Trace batch ID not found") - response = self.plus_api.send_trace_events(self.trace_batch_id, payload) + response = ( + self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload) + if ephemeral + else self.plus_api.send_trace_events(self.trace_batch_id, payload) + ) if response.status_code == 200 or response.status_code == 201: self.event_buffer.clear() @@ -146,15 +170,15 @@ class TraceBatchManager: except Exception as e: logger.error(f"❌ Error sending events to backend: {str(e)}") - def finalize_batch(self) -> Optional[TraceBatch]: + def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]: """Finalize batch and return it for sending""" if not self.current_batch: return None if self.event_buffer: - self._send_events_to_backend() + self._send_events_to_backend(ephemeral) - self._finalize_backend_batch() + self._finalize_backend_batch(ephemeral) self.current_batch.events = self.event_buffer.copy() @@ -168,7 +192,7 @@ class TraceBatchManager: return finalized_batch - def _finalize_backend_batch(self): + def _finalize_backend_batch(self, ephemeral: bool = True): """Send batch finalization to backend""" if not self.plus_api or not self.trace_batch_id: return @@ -182,12 +206,24 @@ class TraceBatchManager: "final_event_count": total_events, } - response = self.plus_api.finalize_trace_batch(self.trace_batch_id, payload) + response = ( + self.plus_api.finalize_ephemeral_trace_batch( + self.trace_batch_id, payload + ) + if ephemeral + else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload) + ) if response.status_code == 200: + access_code = response.json().get("access_code", None) console = Console() + return_link = ( + f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}" + if not ephemeral and access_code + else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}" + ) panel = Panel( - f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}", + f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}", title="Trace Batch Finalization", border_style="green", ) diff --git a/src/crewai/utilities/events/listeners/tracing/trace_listener.py b/src/crewai/utilities/events/listeners/tracing/trace_listener.py index e8d68ccae..eaa993c8f 100644 --- a/src/crewai/utilities/events/listeners/tracing/trace_listener.py +++ b/src/crewai/utilities/events/listeners/tracing/trace_listener.py @@ -13,7 +13,6 @@ from crewai.utilities.events.agent_events import ( AgentExecutionErrorEvent, ) from crewai.utilities.events.listeners.tracing.types import TraceEvent -from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled from crewai.utilities.events.reasoning_events import ( AgentReasoningStartedEvent, AgentReasoningCompletedEvent, @@ -67,7 +66,7 @@ from crewai.utilities.events.memory_events import ( MemorySaveFailedEvent, ) -from crewai.cli.authentication.token import get_auth_token +from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.version import get_crewai_version @@ -76,13 +75,12 @@ class TraceCollectionListener(BaseEventListener): Trace collection listener that orchestrates trace collection """ - trace_enabled: Optional[bool] = False complex_events = ["task_started", "llm_call_started", "llm_call_completed"] _instance = None _initialized = False - def __new__(cls, batch_manager=None, tracing: Optional[bool] = False): + def __new__(cls, batch_manager=None): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @@ -90,25 +88,22 @@ class TraceCollectionListener(BaseEventListener): def __init__( self, batch_manager: Optional[TraceBatchManager] = None, - tracing: Optional[bool] = False, ): if self._initialized: return super().__init__() self.batch_manager = batch_manager or TraceBatchManager() - self.tracing = tracing or False - self.trace_enabled = self._check_trace_enabled() self._initialized = True - def _check_trace_enabled(self) -> bool: + def _check_authenticated(self) -> bool: """Check if tracing should be enabled""" - auth_token = get_auth_token() - if not auth_token: + try: + res = bool(get_auth_token()) + return res + except AuthError: return False - return is_tracing_enabled() or self.tracing - def _get_user_context(self) -> Dict[str, str]: """Extract user context for tracing""" return { @@ -120,8 +115,6 @@ class TraceCollectionListener(BaseEventListener): def setup_listeners(self, crewai_event_bus): """Setup event listeners - delegates to specific handlers""" - if not self.trace_enabled: - return self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) @@ -167,13 +160,13 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(CrewKickoffStartedEvent) def on_crew_started(source, event): if not self.batch_manager.is_batch_initialized(): - self._initialize_batch(source, event) + self._initialize_crew_batch(source, event) self._handle_trace_event("crew_kickoff_started", source, event) @event_bus.on(CrewKickoffCompletedEvent) def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) - self.batch_manager.finalize_batch() + self.batch_manager.finalize_batch(ephemeral=True) @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): @@ -287,7 +280,7 @@ class TraceCollectionListener(BaseEventListener): def on_agent_reasoning_failed(source, event): self._handle_action_event("agent_reasoning_failed", source, event) - def _initialize_batch(self, source: Any, event: Any): + def _initialize_crew_batch(self, source: Any, event: Any): """Initialize trace batch""" user_context = self._get_user_context() execution_metadata = { @@ -296,7 +289,7 @@ class TraceCollectionListener(BaseEventListener): "crewai_version": get_crewai_version(), } - self.batch_manager.initialize_batch(user_context, execution_metadata) + self._initialize_batch(user_context, execution_metadata) def _initialize_flow_batch(self, source: Any, event: Any): """Initialize trace batch for Flow execution""" @@ -308,7 +301,20 @@ class TraceCollectionListener(BaseEventListener): "execution_type": "flow", } - self.batch_manager.initialize_batch(user_context, execution_metadata) + self._initialize_batch(user_context, execution_metadata) + + def _initialize_batch( + self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] + ): + """Initialize trace batch if ephemeral""" + if not self._check_authenticated(): + self.batch_manager.initialize_batch( + user_context, execution_metadata, use_ephemeral=True + ) + else: + self.batch_manager.initialize_batch( + user_context, execution_metadata, use_ephemeral=False + ) def _handle_trace_event(self, event_type: str, source: Any, event: Any): """Generic handler for context end events""" diff --git a/src/crewai/utilities/events/listeners/tracing/utils.py b/src/crewai/utilities/events/listeners/tracing/utils.py index e498bc4f9..89a27a7d5 100644 --- a/src/crewai/utilities/events/listeners/tracing/utils.py +++ b/src/crewai/utilities/events/listeners/tracing/utils.py @@ -1,5 +1,153 @@ import os +import platform +import uuid +import hashlib +import subprocess +import getpass +from pathlib import Path +from datetime import datetime +import re +import json + +import click + +from crewai.utilities.paths import db_storage_path def is_tracing_enabled() -> bool: return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true" + + +def on_first_execution_tracing_confirmation() -> bool: + if _is_test_environment(): + return False + + if is_first_execution(): + mark_first_execution_done() + return click.confirm( + "This is the first execution of CrewAI. Do you want to enable tracing?", + default=True, + show_default=True, + ) + return False + + +def _is_test_environment() -> bool: + """Detect if we're running in a test environment.""" + return os.environ.get("CREWAI_TESTING", "").lower() == "true" + + +def _get_machine_id() -> str: + """Stable, privacy-preserving machine fingerprint (cross-platform).""" + parts = [] + + try: + mac = ":".join( + ["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][ + ::-1 + ] + ) + parts.append(mac) + except Exception: + pass + + sysname = platform.system() + parts.append(sysname) + + try: + if sysname == "Darwin": + res = subprocess.run( + ["system_profiler", "SPHardwareDataType"], + capture_output=True, + text=True, + timeout=2, + ) + m = re.search(r"Hardware UUID:\s*([A-Fa-f0-9\-]+)", res.stdout) + if m: + parts.append(m.group(1)) + elif sysname == "Linux": + try: + parts.append(Path("/etc/machine-id").read_text().strip()) + except Exception: + parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip()) + elif sysname == "Windows": + res = subprocess.run( + ["wmic", "csproduct", "get", "UUID"], + capture_output=True, + text=True, + timeout=2, + ) + lines = [line.strip() for line in res.stdout.splitlines() if line.strip()] + if len(lines) >= 2: + parts.append(lines[1]) + except Exception: + pass + + return hashlib.sha256("".join(parts).encode()).hexdigest() + + +def _user_data_file() -> Path: + base = Path(db_storage_path()) + base.mkdir(parents=True, exist_ok=True) + return base / ".crewai_user.json" + + +def _load_user_data() -> dict: + p = _user_data_file() + if p.exists(): + try: + return json.loads(p.read_text()) + except Exception: + pass + return {} + + +def _save_user_data(data: dict) -> None: + try: + p = _user_data_file() + p.write_text(json.dumps(data, indent=2)) + except Exception: + pass + + +def get_user_id() -> str: + """Stable, anonymized user identifier with caching.""" + data = _load_user_data() + + if "user_id" in data: + return data["user_id"] + + try: + username = getpass.getuser() + except Exception: + username = "unknown" + + seed = f"{username}|{_get_machine_id()}" + uid = hashlib.sha256(seed.encode()).hexdigest() + + data["user_id"] = uid + _save_user_data(data) + return uid + + +def is_first_execution() -> bool: + """True if this is the first execution for this user.""" + data = _load_user_data() + return not data.get("first_execution_done", False) + + +def mark_first_execution_done() -> None: + """Mark that the first execution has been completed.""" + data = _load_user_data() + if data.get("first_execution_done", False): + return + + data.update( + { + "first_execution_done": True, + "first_execution_at": datetime.now().timestamp(), + "user_id": get_user_id(), + "machine_id": _get_machine_id(), + } + ) + _save_user_data(data) diff --git a/tests/conftest.py b/tests/conftest.py index 2a6f3b481..f67ad7222 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,11 +34,11 @@ def setup_test_environment(): f"Test storage directory {storage_dir} is not writable: {e}" ) - # Set environment variable to point to the test storage directory os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir) - + os.environ["CREWAI_TESTING"] = "true" yield + os.environ.pop("CREWAI_TESTING", None) # Cleanup is handled automatically when tempfile context exits diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 806eab3fb..62ef56c8b 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -2,8 +2,8 @@ import os import pytest from unittest.mock import patch, MagicMock -# Remove the module-level patch from crewai import Agent, Task, Crew +from crewai.flow.flow import Flow, start from crewai.utilities.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) @@ -284,29 +284,42 @@ class TestTraceListenerSetup: f"Found {len(trace_handlers)} trace handlers when tracing should be disabled" ) - def test_trace_listener_setup_correctly(self): + def test_trace_listener_setup_correctly_for_crew(self): """Test that trace listener is set up correctly when enabled""" with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}): - trace_listener = TraceCollectionListener() + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + with patch.object( + TraceCollectionListener, "setup_listeners" + ) as mock_listener_setup: + Crew(agents=[agent], tasks=[task], verbose=True) + assert mock_listener_setup.call_count >= 1 - assert trace_listener.trace_enabled is True - assert trace_listener.batch_manager is not None - - @pytest.mark.vcr(filter_headers=["authorization"]) - def test_trace_listener_setup_correctly_with_tracing_flag(self): + def test_trace_listener_setup_correctly_for_flow(self): """Test that trace listener is set up correctly when enabled""" - agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory") - task = Task( - description="Say hello to the world", - expected_output="hello world", - agent=agent, - ) - crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True) - crew.kickoff() - trace_listener = TraceCollectionListener(tracing=True) - assert trace_listener.trace_enabled is True - assert trace_listener.batch_manager is not None + + with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}): + + class FlowExample(Flow): + @start() + def start(self): + pass + + with patch.object( + TraceCollectionListener, "setup_listeners" + ) as mock_listener_setup: + FlowExample() + assert mock_listener_setup.call_count >= 1 # Helper method to ensure cleanup def teardown_method(self):