From 578fa8c2e4d74faa2cccce5fcfbdaee2535a0800 Mon Sep 17 00:00:00 2001
From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
Date: Thu, 18 Sep 2025 10:17:34 -0700
Subject: [PATCH] Lorenze/ephemeral trace ask (#3530)

* feat(tracing): implement first-time trace handling and improve event management

- Added FirstTimeTraceHandler for managing first-time user trace collection and display.
- Enhanced TraceBatchManager to support ephemeral trace URLs and improved event buffering.
- Updated TraceCollectionListener to utilize the new FirstTimeTraceHandler.
- Refactored type annotations across multiple files for consistency and clarity.
- Improved error handling and logging for trace-related operations.
- Introduced utility functions for trace viewing prompts and first execution checks.

* brought back crew finalize batch events

* refactor(trace): move instance variables to __init__ in TraceBatchManager

- Refactored TraceBatchManager to initialize instance variables in the constructor instead of as class variables.
- Improved clarity and encapsulation of the class state.

* fix(tracing): improve error handling in user data loading and saving

- Enhanced error handling in _load_user_data and _save_user_data functions to log warnings for JSON decoding and file access issues.
- Updated documentation for trace usage to clarify the addition of tracing parameters in Crew and Flow initialization.
- Refined state management in Flow class to ensure proper handling of state IDs when persistence is enabled.

* add some tests

* fix test

* fix tests

* refactor(tracing): enhance user input handling for trace viewing

- Replaced signal-based timeout handling with threading for user input in the prompt_user_for_trace_viewing function.
- Improved user experience by allowing a configurable timeout for viewing execution traces.
- Updated tests to mock threading behavior and verify timeout handling correctly.

* fix(tracing): improve machine ID retrieval with error handling

- Added error handling to the _get_machine_id function to log warnings when retrieving the machine ID fails.
- Ensured that the function continues to provide a stable, privacy-preserving machine fingerprint even in case of errors.

* refactor(flow): streamline state ID assignment in Flow class

- Replaced direct attribute assignment with setattr for improved flexibility in handling state IDs.
- Enhanced code readability by simplifying the logic for setting the state ID when persistence is enabled.
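
Usage sketch for the tracing opt-in described above (illustrative only; the agent, task, and model values are placeholders mirroring the new tests, and the first-time ephemeral prompt only applies when tracing has not already been enabled explicitly):

    from crewai import Agent, Crew, Task
    from crewai.flow.flow import Flow, start

    # Explicitly opt a crew in to trace collection; setting
    # CREWAI_TRACING_ENABLED=true in the environment has the same effect.
    agent = Agent(
        role="Test Agent",
        goal="Test goal",
        backstory="Test backstory",
        llm="gpt-4o-mini",
    )
    task = Task(
        description="Say hello to the world",
        expected_output="hello world",
        agent=agent,
    )
    Crew(agents=[agent], tasks=[task], tracing=True).kickoff()

    # Flows accept the same flag in their constructor.
    class HelloFlow(Flow):
        @start()
        def say_hello(self):
            return "hello world"

    HelloFlow(tracing=True).kickoff()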
--- src/crewai/crew.py | 17 +- .../tracing/first_time_trace_handler.py | 174 +++++++++++ .../listeners/tracing/trace_batch_manager.py | 90 +++--- .../listeners/tracing/trace_listener.py | 205 ++++++------- src/crewai/events/listeners/tracing/utils.py | 142 +++++++-- src/crewai/flow/flow.py | 148 +++++----- ...me_user_trace_collection_user_accepts.yaml | 130 +++++++++ ...me_user_trace_collection_with_timeout.yaml | 128 +++++++++ ...t_time_user_trace_consolidation_logic.yaml | 127 ++++++++ tests/tracing/test_tracing.py | 272 +++++++++++++++++- 10 files changed, 1173 insertions(+), 260 deletions(-) create mode 100644 src/crewai/events/listeners/tracing/first_time_trace_handler.py create mode 100644 tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_user_accepts.yaml create mode 100644 tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_with_timeout.yaml create mode 100644 tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_consolidation_logic.yaml diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 124966116..95b3e3c30 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -37,6 +37,7 @@ from crewai.events.listeners.tracing.trace_listener import ( ) from crewai.events.listeners.tracing.utils import ( is_tracing_enabled, + should_auto_collect_first_time_traces, ) from crewai.events.types.crew_events import ( CrewKickoffCompletedEvent, @@ -89,8 +90,8 @@ class Crew(FlowTrackable, BaseModel): tasks they should perform. Attributes: - tasks: List of tasks assigned to the crew. - agents: List of agents part of this crew. + tasks: list of tasks assigned to the crew. + agents: list of agents part of this crew. manager_llm: The language model that will run manager agent. manager_agent: Custom agent that will be used as manager. 
memory: Whether the crew should use memory to store memories of it's @@ -238,11 +239,11 @@ class Crew(FlowTrackable, BaseModel): ) task_execution_output_json_files: list[str] | None = Field( default=None, - description="List of file paths for task execution JSON files.", + description="list of file paths for task execution JSON files.", ) execution_logs: list[dict[str, Any]] = Field( default=[], - description="List of execution logs for tasks", + description="list of execution logs for tasks", ) knowledge_sources: list[BaseKnowledgeSource] | None = Field( default=None, @@ -296,12 +297,16 @@ class Crew(FlowTrackable, BaseModel): @model_validator(mode="after") def set_private_attrs(self) -> "Crew": - """Set private attributes.""" + """set private attributes.""" self._cache_handler = CacheHandler() event_listener = EventListener() - if is_tracing_enabled() or self.tracing: + if ( + is_tracing_enabled() + or self.tracing + or should_auto_collect_first_time_traces() + ): trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) event_listener.verbose = self.verbose diff --git a/src/crewai/events/listeners/tracing/first_time_trace_handler.py b/src/crewai/events/listeners/tracing/first_time_trace_handler.py new file mode 100644 index 000000000..1e66ec555 --- /dev/null +++ b/src/crewai/events/listeners/tracing/first_time_trace_handler.py @@ -0,0 +1,174 @@ +import logging +import uuid + +from rich.console import Console +from rich.panel import Panel + +from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager +from crewai.events.listeners.tracing.utils import ( + mark_first_execution_completed, + prompt_user_for_trace_viewing, + should_auto_collect_first_time_traces, +) + +logger = logging.getLogger(__name__) + + +class FirstTimeTraceHandler: + """Handles the first-time user trace collection and display flow.""" + + def __init__(self): + self.is_first_time: bool = False + self.collected_events: bool = False + self.trace_batch_id: str | None = None + self.ephemeral_url: str | None = None + self.batch_manager: TraceBatchManager | None = None + + def initialize_for_first_time_user(self) -> bool: + """Check if this is first time and initialize collection.""" + self.is_first_time = should_auto_collect_first_time_traces() + return self.is_first_time + + def set_batch_manager(self, batch_manager: TraceBatchManager): + """Set reference to batch manager for sending events.""" + self.batch_manager = batch_manager + + def mark_events_collected(self): + """Mark that events have been collected during execution.""" + self.collected_events = True + + def handle_execution_completion(self): + """Handle the completion flow as shown in your diagram.""" + if not self.is_first_time or not self.collected_events: + return + + try: + user_wants_traces = prompt_user_for_trace_viewing(timeout_seconds=20) + + if user_wants_traces: + self._initialize_backend_and_send_events() + + if self.ephemeral_url: + self._display_ephemeral_trace_link() + + mark_first_execution_completed() + + except Exception as e: + self._gracefully_fail(f"Error in trace handling: {e}") + mark_first_execution_completed() + + def _initialize_backend_and_send_events(self): + """Initialize backend batch and send collected events.""" + if not self.batch_manager: + return + + try: + if not self.batch_manager.backend_initialized: + original_metadata = ( + self.batch_manager.current_batch.execution_metadata + if self.batch_manager.current_batch + else {} + ) + + user_context = { + "privacy_level": 
"standard", + "user_id": "first_time_user", + "session_id": str(uuid.uuid4()), + "trace_id": self.batch_manager.trace_batch_id, + } + + execution_metadata = { + "execution_type": original_metadata.get("execution_type", "crew"), + "crew_name": original_metadata.get( + "crew_name", "First Time Execution" + ), + "flow_name": original_metadata.get("flow_name"), + "agent_count": original_metadata.get("agent_count", 1), + "task_count": original_metadata.get("task_count", 1), + "crewai_version": original_metadata.get("crewai_version"), + } + + self.batch_manager._initialize_backend_batch( + user_context=user_context, + execution_metadata=execution_metadata, + use_ephemeral=True, + ) + self.batch_manager.backend_initialized = True + + if self.batch_manager.event_buffer: + self.batch_manager._send_events_to_backend() + + self.batch_manager.finalize_batch() + self.ephemeral_url = self.batch_manager.ephemeral_trace_url + + if not self.ephemeral_url: + self._show_local_trace_message() + + except Exception as e: + self._gracefully_fail(f"Backend initialization failed: {e}") + + def _display_ephemeral_trace_link(self): + """Display the ephemeral trace link to the user.""" + console = Console() + + panel_content = f""" +🎉 Your First CrewAI Execution Trace is Ready! + +View your execution details here: +{self.ephemeral_url} + +This trace shows: +• Agent decisions and interactions +• Task execution timeline +• Tool usage and results +• LLM calls and responses + +To use traces add tracing=True to your Crew(tracing=True) / Flow(tracing=True) + +📝 Note: This link will expire in 24 hours. + """.strip() + + panel = Panel( + panel_content, + title="🔍 Execution Trace Generated", + border_style="bright_green", + padding=(1, 2), + ) + + console.print("\n") + console.print(panel) + console.print() + + def _gracefully_fail(self, error_message: str): + """Handle errors gracefully without disrupting user experience.""" + console = Console() + console.print(f"[yellow]Note: {error_message}[/yellow]") + + logger.debug(f"First-time trace error: {error_message}") + + def _show_local_trace_message(self): + """Show message when traces were collected locally but couldn't be uploaded.""" + console = Console() + + panel_content = f""" +📊 Your execution traces were collected locally! + +Unfortunately, we couldn't upload them to the server right now, but here's what we captured: +• {len(self.batch_manager.event_buffer)} trace events +• Execution duration: {self.batch_manager.calculate_duration("execution")}ms +• Batch ID: {self.batch_manager.trace_batch_id} + +The traces include agent decisions, task execution, and tool usage. +Try running with CREWAI_TRACING_ENABLED=true next time for persistent traces. 
+ """.strip() + + panel = Panel( + panel_content, + title="🔍 Local Traces Collected", + border_style="yellow", + padding=(1, 2), + ) + + console.print("\n") + console.print(panel) + console.print() diff --git a/src/crewai/events/listeners/tracing/trace_batch_manager.py b/src/crewai/events/listeners/tracing/trace_batch_manager.py index 414bbdfd1..5bc898923 100644 --- a/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -1,18 +1,18 @@ import uuid -from datetime import datetime, timezone -from typing import Dict, List, Any, Optional from dataclasses import dataclass, field +from datetime import datetime, timezone +from logging import getLogger +from typing import Any -from crewai.utilities.constants import CREWAI_BASE_URL -from crewai.cli.authentication.token import AuthError, get_auth_token - -from crewai.cli.version import get_crewai_version -from crewai.cli.plus_api import PlusAPI from rich.console import Console from rich.panel import Panel +from crewai.cli.authentication.token import AuthError, get_auth_token +from crewai.cli.plus_api import PlusAPI +from crewai.cli.version import get_crewai_version from crewai.events.listeners.tracing.types import TraceEvent -from logging import getLogger +from crewai.events.listeners.tracing.utils import should_auto_collect_first_time_traces +from crewai.utilities.constants import CREWAI_BASE_URL logger = getLogger(__name__) @@ -23,11 +23,11 @@ class TraceBatch: version: str = field(default_factory=get_crewai_version) batch_id: str = field(default_factory=lambda: str(uuid.uuid4())) - user_context: Dict[str, str] = field(default_factory=dict) - execution_metadata: Dict[str, Any] = field(default_factory=dict) - events: List[TraceEvent] = field(default_factory=list) + user_context: dict[str, str] = field(default_factory=dict) + execution_metadata: dict[str, Any] = field(default_factory=dict) + events: list[TraceEvent] = field(default_factory=list) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: return { "version": self.version, "batch_id": self.batch_id, @@ -40,26 +40,28 @@ class TraceBatch: class TraceBatchManager: """Single responsibility: Manage batches and event buffering""" - is_current_batch_ephemeral: bool = False - trace_batch_id: Optional[str] = None - current_batch: Optional[TraceBatch] = None - event_buffer: List[TraceEvent] = [] - execution_start_times: Dict[str, datetime] = {} - batch_owner_type: Optional[str] = None - batch_owner_id: Optional[str] = None - def __init__(self): + self.is_current_batch_ephemeral: bool = False + self.trace_batch_id: str | None = None + self.current_batch: TraceBatch | None = None + self.event_buffer: list[TraceEvent] = [] + self.execution_start_times: dict[str, datetime] = {} + self.batch_owner_type: str | None = None + self.batch_owner_id: str | None = None + self.backend_initialized: bool = False + self.ephemeral_trace_url: str | None = None try: self.plus_api = PlusAPI( api_key=get_auth_token(), ) except AuthError: self.plus_api = PlusAPI(api_key="") + self.ephemeral_trace_url = None def initialize_batch( self, - user_context: Dict[str, str], - execution_metadata: Dict[str, Any], + user_context: dict[str, str], + execution_metadata: dict[str, Any], use_ephemeral: bool = False, ) -> TraceBatch: """Initialize a new trace batch""" @@ -70,14 +72,21 @@ class TraceBatchManager: self.is_current_batch_ephemeral = use_ephemeral self.record_start_time("execution") - self._initialize_backend_batch(user_context, 
execution_metadata, use_ephemeral) + + if should_auto_collect_first_time_traces(): + self.trace_batch_id = self.current_batch.batch_id + else: + self._initialize_backend_batch( + user_context, execution_metadata, use_ephemeral + ) + self.backend_initialized = True return self.current_batch def _initialize_backend_batch( self, - user_context: Dict[str, str], - execution_metadata: Dict[str, Any], + user_context: dict[str, str], + execution_metadata: dict[str, Any], use_ephemeral: bool = False, ): """Send batch initialization to backend""" @@ -143,7 +152,7 @@ class TraceBatchManager: except Exception as e: logger.warning( - f"Error initializing trace batch: {str(e)}. Continuing without tracing." + f"Error initializing trace batch: {e}. Continuing without tracing." ) def add_event(self, trace_event: TraceEvent): @@ -154,7 +163,6 @@ class TraceBatchManager: """Send buffered events to backend with graceful failure handling""" if not self.plus_api or not self.trace_batch_id or not self.event_buffer: return 500 - try: payload = { "events": [event.to_dict() for event in self.event_buffer], @@ -178,19 +186,19 @@ class TraceBatchManager: if response.status_code in [200, 201]: self.event_buffer.clear() return 200 - else: - logger.warning( - f"Failed to send events: {response.status_code}. Events will be lost." - ) - return 500 - except Exception as e: logger.warning( - f"Error sending events to backend: {str(e)}. Events will be lost." + f"Failed to send events: {response.status_code}. Events will be lost." ) return 500 - def finalize_batch(self) -> Optional[TraceBatch]: + except Exception as e: + logger.warning( + f"Error sending events to backend: {e}. Events will be lost." + ) + return 500 + + def finalize_batch(self) -> TraceBatch | None: """Finalize batch and return it for sending""" if not self.current_batch: return None @@ -246,6 +254,10 @@ class TraceBatchManager: if not self.is_current_batch_ephemeral and access_code is None else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}" ) + + if self.is_current_batch_ephemeral: + self.ephemeral_trace_url = return_link + panel = Panel( f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. 
View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}", title="Trace Batch Finalization", @@ -259,8 +271,8 @@ class TraceBatchManager: ) except Exception as e: - logger.error(f"❌ Error finalizing trace batch: {str(e)}") - # TODO: send error to app + logger.error(f"❌ Error finalizing trace batch: {e}") + # TODO: send error to app marking as failed def _cleanup_batch_data(self): """Clean up batch data after successful finalization to free memory""" @@ -277,7 +289,7 @@ class TraceBatchManager: self.batch_sequence = 0 except Exception as e: - logger.error(f"Warning: Error during cleanup: {str(e)}") + logger.error(f"Warning: Error during cleanup: {e}") def has_events(self) -> bool: """Check if there are events in the buffer""" @@ -306,7 +318,7 @@ class TraceBatchManager: return duration_ms return 0 - def get_trace_id(self) -> Optional[str]: + def get_trace_id(self) -> str | None: """Get current trace ID""" if self.current_batch: return self.current_batch.user_context.get("trace_id") diff --git a/src/crewai/events/listeners/tracing/trace_listener.py b/src/crewai/events/listeners/tracing/trace_listener.py index bc9a27d7f..a2a39bcac 100644 --- a/src/crewai/events/listeners/tracing/trace_listener.py +++ b/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,28 +1,59 @@ import os import uuid +from typing import Any, ClassVar -from typing import Dict, Any, Optional - +from crewai.cli.authentication.token import AuthError, get_auth_token +from crewai.cli.version import get_crewai_version from crewai.events.base_event_listener import BaseEventListener -from crewai.events.types.agent_events import ( - AgentExecutionCompletedEvent, - AgentExecutionStartedEvent, - LiteAgentExecutionStartedEvent, - LiteAgentExecutionCompletedEvent, - LiteAgentExecutionErrorEvent, - AgentExecutionErrorEvent, +from crewai.events.listeners.tracing.first_time_trace_handler import ( + FirstTimeTraceHandler, ) from crewai.events.listeners.tracing.types import TraceEvent -from crewai.events.types.reasoning_events import ( - AgentReasoningStartedEvent, - AgentReasoningCompletedEvent, - AgentReasoningFailedEvent, +from crewai.events.listeners.tracing.utils import safe_serialize_to_dict +from crewai.events.types.agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, + LiteAgentExecutionCompletedEvent, + LiteAgentExecutionErrorEvent, + LiteAgentExecutionStartedEvent, ) from crewai.events.types.crew_events import ( CrewKickoffCompletedEvent, CrewKickoffFailedEvent, CrewKickoffStartedEvent, ) +from crewai.events.types.flow_events import ( + FlowCreatedEvent, + FlowFinishedEvent, + FlowPlotEvent, + FlowStartedEvent, + MethodExecutionFailedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) +from crewai.events.types.llm_events import ( + LLMCallCompletedEvent, + LLMCallFailedEvent, + LLMCallStartedEvent, +) +from crewai.events.types.llm_guardrail_events import ( + LLMGuardrailCompletedEvent, + LLMGuardrailStartedEvent, +) +from crewai.events.types.memory_events import ( + MemoryQueryCompletedEvent, + MemoryQueryFailedEvent, + MemoryQueryStartedEvent, + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, +) +from crewai.events.types.reasoning_events import ( + AgentReasoningCompletedEvent, + AgentReasoningFailedEvent, + AgentReasoningStartedEvent, +) from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -33,49 +64,16 @@ from crewai.events.types.tool_usage_events import ( 
ToolUsageFinishedEvent, ToolUsageStartedEvent, ) -from crewai.events.types.llm_events import ( - LLMCallCompletedEvent, - LLMCallFailedEvent, - LLMCallStartedEvent, -) - -from crewai.events.types.flow_events import ( - FlowCreatedEvent, - FlowStartedEvent, - FlowFinishedEvent, - MethodExecutionStartedEvent, - MethodExecutionFinishedEvent, - MethodExecutionFailedEvent, - FlowPlotEvent, -) -from crewai.events.types.llm_guardrail_events import ( - LLMGuardrailStartedEvent, - LLMGuardrailCompletedEvent, -) -from crewai.utilities.serialization import to_serializable - from .trace_batch_manager import TraceBatchManager -from crewai.events.types.memory_events import ( - MemoryQueryStartedEvent, - MemoryQueryCompletedEvent, - MemoryQueryFailedEvent, - MemorySaveStartedEvent, - MemorySaveCompletedEvent, - MemorySaveFailedEvent, -) - -from crewai.cli.authentication.token import AuthError, get_auth_token -from crewai.cli.version import get_crewai_version - class TraceCollectionListener(BaseEventListener): """ Trace collection listener that orchestrates trace collection """ - complex_events = [ + complex_events: ClassVar[list[str]] = [ "task_started", "task_completed", "llm_call_started", @@ -88,14 +86,14 @@ class TraceCollectionListener(BaseEventListener): _initialized = False _listeners_setup = False - def __new__(cls, batch_manager=None): + def __new__(cls, batch_manager: TraceBatchManager | None = None): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__( self, - batch_manager: Optional[TraceBatchManager] = None, + batch_manager: TraceBatchManager | None = None, ): if self._initialized: return @@ -103,16 +101,19 @@ class TraceCollectionListener(BaseEventListener): super().__init__() self.batch_manager = batch_manager or TraceBatchManager() self._initialized = True + self.first_time_handler = FirstTimeTraceHandler() + + if self.first_time_handler.initialize_for_first_time_user(): + self.first_time_handler.set_batch_manager(self.batch_manager) def _check_authenticated(self) -> bool: """Check if tracing should be enabled""" try: - res = bool(get_auth_token()) - return res + return bool(get_auth_token()) except AuthError: return False - def _get_user_context(self) -> Dict[str, str]: + def _get_user_context(self) -> dict[str, str]: """Extract user context for tracing""" return { "user_id": os.getenv("CREWAI_USER_ID", "anonymous"), @@ -161,8 +162,14 @@ class TraceCollectionListener(BaseEventListener): @event_bus.on(FlowFinishedEvent) def on_flow_finished(source, event): self._handle_trace_event("flow_finished", source, event) + if self.batch_manager.batch_owner_type == "flow": - self.batch_manager.finalize_batch() + if self.first_time_handler.is_first_time: + self.first_time_handler.mark_events_collected() + self.first_time_handler.handle_execution_completion() + else: + # Normal flow finalization + self.batch_manager.finalize_batch() @event_bus.on(FlowPlotEvent) def on_flow_plot(source, event): @@ -181,12 +188,20 @@ class TraceCollectionListener(BaseEventListener): def on_crew_completed(source, event): self._handle_trace_event("crew_kickoff_completed", source, event) if self.batch_manager.batch_owner_type == "crew": - self.batch_manager.finalize_batch() + if self.first_time_handler.is_first_time: + self.first_time_handler.mark_events_collected() + self.first_time_handler.handle_execution_completion() + else: + self.batch_manager.finalize_batch() @event_bus.on(CrewKickoffFailedEvent) def on_crew_failed(source, event): 
self._handle_trace_event("crew_kickoff_failed", source, event) - self.batch_manager.finalize_batch() + if self.first_time_handler.is_first_time: + self.first_time_handler.mark_events_collected() + self.first_time_handler.handle_execution_completion() + else: + self.batch_manager.finalize_batch() @event_bus.on(TaskStartedEvent) def on_task_started(source, event): @@ -325,17 +340,19 @@ class TraceCollectionListener(BaseEventListener): self._initialize_batch(user_context, execution_metadata) def _initialize_batch( - self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] + self, user_context: dict[str, str], execution_metadata: dict[str, Any] ): - """Initialize trace batch if ephemeral""" - if not self._check_authenticated(): - self.batch_manager.initialize_batch( + """Initialize trace batch - auto-enable ephemeral for first-time users.""" + + if self.first_time_handler.is_first_time: + return self.batch_manager.initialize_batch( user_context, execution_metadata, use_ephemeral=True ) - else: - self.batch_manager.initialize_batch( - user_context, execution_metadata, use_ephemeral=False - ) + + use_ephemeral = not self._check_authenticated() + return self.batch_manager.initialize_batch( + user_context, execution_metadata, use_ephemeral=use_ephemeral + ) def _handle_trace_event(self, event_type: str, source: Any, event: Any): """Generic handler for context end events""" @@ -371,11 +388,11 @@ class TraceCollectionListener(BaseEventListener): def _build_event_data( self, event_type: str, event: Any, source: Any - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Build event data""" if event_type not in self.complex_events: - return self._safe_serialize_to_dict(event) - elif event_type == "task_started": + return safe_serialize_to_dict(event) + if event_type == "task_started": return { "task_description": event.task.description, "expected_output": event.task.expected_output, @@ -384,7 +401,7 @@ class TraceCollectionListener(BaseEventListener): "agent_role": source.agent.role, "task_id": str(event.task.id), } - elif event_type == "task_completed": + if event_type == "task_completed": return { "task_description": event.task.description if event.task else None, "task_name": event.task.name or event.task.description @@ -397,63 +414,31 @@ class TraceCollectionListener(BaseEventListener): else None, "agent_role": event.output.agent if event.output else None, } - elif event_type == "agent_execution_started": + if event_type == "agent_execution_started": return { "agent_role": event.agent.role, "agent_goal": event.agent.goal, "agent_backstory": event.agent.backstory, } - elif event_type == "agent_execution_completed": + if event_type == "agent_execution_completed": return { "agent_role": event.agent.role, "agent_goal": event.agent.goal, "agent_backstory": event.agent.backstory, } - elif event_type == "llm_call_started": - event_data = self._safe_serialize_to_dict(event) + if event_type == "llm_call_started": + event_data = safe_serialize_to_dict(event) event_data["task_name"] = ( event.task_name or event.task_description if hasattr(event, "task_name") and event.task_name else None ) return event_data - elif event_type == "llm_call_completed": - return self._safe_serialize_to_dict(event) - else: - return { - "event_type": event_type, - "event": self._safe_serialize_to_dict(event), - "source": source, - } + if event_type == "llm_call_completed": + return safe_serialize_to_dict(event) - # TODO: move to utils - def _safe_serialize_to_dict( - self, obj, exclude: set[str] | None = None - ) -> 
Dict[str, Any]: - """Safely serialize an object to a dictionary for event data.""" - try: - serialized = to_serializable(obj, exclude) - if isinstance(serialized, dict): - return serialized - else: - return {"serialized_data": serialized} - except Exception as e: - return {"serialization_error": str(e), "object_type": type(obj).__name__} - - # TODO: move to utils - def _truncate_messages(self, messages, max_content_length=500, max_messages=5): - """Truncate message content and limit number of messages""" - if not messages or not isinstance(messages, list): - return messages - - # Limit number of messages - limited_messages = messages[:max_messages] - - # Truncate each message content - for msg in limited_messages: - if isinstance(msg, dict) and "content" in msg: - content = msg["content"] - if len(content) > max_content_length: - msg["content"] = content[:max_content_length] + "..." - - return limited_messages + return { + "event_type": event_type, + "event": safe_serialize_to_dict(event), + "source": source, + } diff --git a/src/crewai/events/listeners/tracing/utils.py b/src/crewai/events/listeners/tracing/utils.py index 89a27a7d5..1096db02b 100644 --- a/src/crewai/events/listeners/tracing/utils.py +++ b/src/crewai/events/listeners/tracing/utils.py @@ -1,17 +1,25 @@ +import getpass +import hashlib +import json +import logging import os import platform -import uuid -import hashlib -import subprocess -import getpass -from pathlib import Path -from datetime import datetime import re -import json +import subprocess +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any import click +from rich.console import Console +from rich.panel import Panel +from rich.text import Text from crewai.utilities.paths import db_storage_path +from crewai.utilities.serialization import to_serializable + +logger = logging.getLogger(__name__) def is_tracing_enabled() -> bool: @@ -43,13 +51,11 @@ def _get_machine_id() -> str: try: mac = ":".join( - ["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][ - ::-1 - ] + [f"{(uuid.getnode() >> b) & 0xFF:02x}" for b in range(0, 12, 2)][::-1] ) parts.append(mac) except Exception: - pass + logger.warning("Error getting machine id for fingerprinting") sysname = platform.system() parts.append(sysname) @@ -57,7 +63,7 @@ def _get_machine_id() -> str: try: if sysname == "Darwin": res = subprocess.run( - ["system_profiler", "SPHardwareDataType"], + ["/usr/sbin/system_profiler", "SPHardwareDataType"], capture_output=True, text=True, timeout=2, @@ -72,7 +78,7 @@ def _get_machine_id() -> str: parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip()) elif sysname == "Windows": res = subprocess.run( - ["wmic", "csproduct", "get", "UUID"], + ["C:\\Windows\\System32\\wbem\\wmic.exe", "csproduct", "get", "UUID"], capture_output=True, text=True, timeout=2, @@ -81,7 +87,7 @@ def _get_machine_id() -> str: if len(lines) >= 2: parts.append(lines[1]) except Exception: - pass + logger.exception("Error getting machine ID") return hashlib.sha256("".join(parts).encode()).hexdigest() @@ -97,8 +103,8 @@ def _load_user_data() -> dict: if p.exists(): try: return json.loads(p.read_text()) - except Exception: - pass + except (json.JSONDecodeError, OSError, PermissionError) as e: + logger.warning(f"Failed to load user data: {e}") return {} @@ -106,8 +112,8 @@ def _save_user_data(data: dict) -> None: try: p = _user_data_file() p.write_text(json.dumps(data, indent=2)) - except Exception: - pass + except (OSError, PermissionError) as e: 
+ logger.warning(f"Failed to save user data: {e}") def get_user_id() -> str: @@ -151,3 +157,103 @@ def mark_first_execution_done() -> None: } ) _save_user_data(data) + + +def safe_serialize_to_dict(obj, exclude: set[str] | None = None) -> dict[str, Any]: + """Safely serialize an object to a dictionary for event data.""" + try: + serialized = to_serializable(obj, exclude) + if isinstance(serialized, dict): + return serialized + return {"serialized_data": serialized} + except Exception as e: + return {"serialization_error": str(e), "object_type": type(obj).__name__} + + +def truncate_messages(messages, max_content_length=500, max_messages=5): + """Truncate message content and limit number of messages""" + if not messages or not isinstance(messages, list): + return messages + + limited_messages = messages[:max_messages] + + for msg in limited_messages: + if isinstance(msg, dict) and "content" in msg: + content = msg["content"] + if len(content) > max_content_length: + msg["content"] = content[:max_content_length] + "..." + + return limited_messages + + +def should_auto_collect_first_time_traces() -> bool: + """True if we should auto-collect traces for first-time user.""" + if _is_test_environment(): + return False + return is_first_execution() + + +def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool: + """ + Prompt user if they want to see their traces with timeout. + Returns True if user wants to see traces, False otherwise. + """ + if _is_test_environment(): + return False + + try: + import threading + + console = Console() + + content = Text() + content.append("🔍 ", style="cyan bold") + content.append( + "Detailed execution traces are available!\n\n", style="cyan bold" + ) + content.append("View insights including:\n", style="white") + content.append(" • Agent decision-making process\n", style="bright_blue") + content.append(" • Task execution flow and timing\n", style="bright_blue") + content.append(" • Tool usage details", style="bright_blue") + + panel = Panel( + content, + title="[bold cyan]Execution Traces[/bold cyan]", + border_style="cyan", + padding=(1, 2), + ) + console.print("\n") + console.print(panel) + + prompt_text = click.style( + f"Would you like to view your execution traces? 
[y/N] ({timeout_seconds}s timeout): ", + fg="white", + bold=True, + ) + click.echo(prompt_text, nl=False) + + result = [False] + + def get_input(): + try: + response = input().strip().lower() + result[0] = response in ["y", "yes"] + except (EOFError, KeyboardInterrupt): + result[0] = False + + input_thread = threading.Thread(target=get_input, daemon=True) + input_thread.start() + input_thread.join(timeout=timeout_seconds) + + if input_thread.is_alive(): + return False + + return result[0] + + except Exception: + return False + + +def mark_first_execution_completed() -> None: + """Mark first execution as completed (called after trace prompt).""" + mark_first_execution_done() diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 735f57282..85bb077ee 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -2,30 +2,22 @@ import asyncio import copy import inspect import logging -from typing import ( - Any, - Callable, - Dict, - Generic, - List, - Optional, - Set, - Type, - TypeVar, - Union, - cast, -) +from collections.abc import Callable +from typing import Any, ClassVar, Generic, TypeVar, cast from uuid import uuid4 from opentelemetry import baggage from opentelemetry.context import attach, detach from pydantic import BaseModel, Field, ValidationError -from crewai.flow.flow_visualizer import plot_flow -from crewai.flow.persistence.base import FlowPersistence -from crewai.flow.types import FlowExecutionData -from crewai.flow.utils import get_possible_return_constants from crewai.events.event_bus import crewai_event_bus +from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, +) +from crewai.events.listeners.tracing.utils import ( + is_tracing_enabled, + should_auto_collect_first_time_traces, +) from crewai.events.types.flow_events import ( FlowCreatedEvent, FlowFinishedEvent, @@ -35,12 +27,10 @@ from crewai.events.types.flow_events import ( MethodExecutionFinishedEvent, MethodExecutionStartedEvent, ) -from crewai.events.listeners.tracing.trace_listener import ( - TraceCollectionListener, -) -from crewai.events.listeners.tracing.utils import ( - is_tracing_enabled, -) +from crewai.flow.flow_visualizer import plot_flow +from crewai.flow.persistence.base import FlowPersistence +from crewai.flow.types import FlowExecutionData +from crewai.flow.utils import get_possible_return_constants from crewai.utilities.printer import Printer logger = logging.getLogger(__name__) @@ -55,16 +45,14 @@ class FlowState(BaseModel): ) -# Type variables with explicit bounds -T = TypeVar( - "T", bound=Union[Dict[str, Any], BaseModel] -) # Generic flow state type parameter +# type variables with explicit bounds +T = TypeVar("T", bound=dict[str, Any] | BaseModel) # Generic flow state type parameter StateT = TypeVar( - "StateT", bound=Union[Dict[str, Any], BaseModel] + "StateT", bound=dict[str, Any] | BaseModel ) # State validation type parameter -def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT: +def ensure_state_type(state: Any, expected_type: type[StateT]) -> StateT: """Ensure state matches expected type with proper validation. Args: @@ -104,7 +92,7 @@ def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT: raise TypeError(f"Invalid expected_type: {expected_type}") -def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable: +def start(condition: str | dict | Callable | None = None) -> Callable: """ Marks a method as a flow's starting point. 
@@ -171,7 +159,7 @@ def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable: return decorator -def listen(condition: Union[str, dict, Callable]) -> Callable: +def listen(condition: str | dict | Callable) -> Callable: """ Creates a listener that executes when specified conditions are met. @@ -231,7 +219,7 @@ def listen(condition: Union[str, dict, Callable]) -> Callable: return decorator -def router(condition: Union[str, dict, Callable]) -> Callable: +def router(condition: str | dict | Callable) -> Callable: """ Creates a routing method that directs flow execution based on conditions. @@ -297,7 +285,7 @@ def router(condition: Union[str, dict, Callable]) -> Callable: return decorator -def or_(*conditions: Union[str, dict, Callable]) -> dict: +def or_(*conditions: str | dict | Callable) -> dict: """ Combines multiple conditions with OR logic for flow control. @@ -343,7 +331,7 @@ def or_(*conditions: Union[str, dict, Callable]) -> dict: return {"type": "OR", "methods": methods} -def and_(*conditions: Union[str, dict, Callable]) -> dict: +def and_(*conditions: str | dict | Callable) -> dict: """ Combines multiple conditions with AND logic for flow control. @@ -425,10 +413,10 @@ class FlowMeta(type): if possible_returns: router_paths[attr_name] = possible_returns - setattr(cls, "_start_methods", start_methods) - setattr(cls, "_listeners", listeners) - setattr(cls, "_routers", routers) - setattr(cls, "_router_paths", router_paths) + cls._start_methods = start_methods + cls._listeners = listeners + cls._routers = routers + cls._router_paths = router_paths return cls @@ -436,29 +424,29 @@ class FlowMeta(type): class Flow(Generic[T], metaclass=FlowMeta): """Base class for all flows. - Type parameter T must be either Dict[str, Any] or a subclass of BaseModel.""" + type parameter T must be either dict[str, Any] or a subclass of BaseModel.""" _printer = Printer() - _start_methods: List[str] = [] - _listeners: Dict[str, tuple[str, List[str]]] = {} - _routers: Set[str] = set() - _router_paths: Dict[str, List[str]] = {} - initial_state: Union[Type[T], T, None] = None - name: Optional[str] = None - tracing: Optional[bool] = False + _start_methods: ClassVar[list[str]] = [] + _listeners: ClassVar[dict[str, tuple[str, list[str]]]] = {} + _routers: ClassVar[set[str]] = set() + _router_paths: ClassVar[dict[str, list[str]]] = {} + initial_state: type[T] | T | None = None + name: str | None = None + tracing: bool | None = False - def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]: + def __class_getitem__(cls: type["Flow"], item: type[T]) -> type["Flow"]: class _FlowGeneric(cls): # type: ignore - _initial_state_T = item # type: ignore + _initial_state_t = item # type: ignore _FlowGeneric.__name__ = f"{cls.__name__}[{item.__name__}]" return _FlowGeneric def __init__( self, - persistence: Optional[FlowPersistence] = None, - tracing: Optional[bool] = False, + persistence: FlowPersistence | None = None, + tracing: bool | None = False, **kwargs: Any, ) -> None: """Initialize a new Flow instance. 
@@ -468,18 +456,22 @@ class Flow(Generic[T], metaclass=FlowMeta): **kwargs: Additional state values to initialize or override """ # Initialize basic instance attributes - self._methods: Dict[str, Callable] = {} - self._method_execution_counts: Dict[str, int] = {} - self._pending_and_listeners: Dict[str, Set[str]] = {} - self._method_outputs: List[Any] = [] # List to store all method outputs - self._completed_methods: Set[str] = set() # Track completed methods for reload - self._persistence: Optional[FlowPersistence] = persistence + self._methods: dict[str, Callable] = {} + self._method_execution_counts: dict[str, int] = {} + self._pending_and_listeners: dict[str, set[str]] = {} + self._method_outputs: list[Any] = [] # list to store all method outputs + self._completed_methods: set[str] = set() # Track completed methods for reload + self._persistence: FlowPersistence | None = persistence self._is_execution_resuming: bool = False # Initialize state with initial values self._state = self._create_initial_state() self.tracing = tracing - if is_tracing_enabled() or self.tracing: + if ( + is_tracing_enabled() + or self.tracing + or should_auto_collect_first_time_traces() + ): trace_listener = TraceCollectionListener() trace_listener.setup_listeners(crewai_event_bus) # Apply any additional kwargs @@ -521,25 +513,25 @@ class Flow(Generic[T], metaclass=FlowMeta): TypeError: If state is neither BaseModel nor dictionary """ # Handle case where initial_state is None but we have a type parameter - if self.initial_state is None and hasattr(self, "_initial_state_T"): - state_type = getattr(self, "_initial_state_T") + if self.initial_state is None and hasattr(self, "_initial_state_t"): + state_type = self._initial_state_t if isinstance(state_type, type): if issubclass(state_type, FlowState): # Create instance without id, then set it instance = state_type() if not hasattr(instance, "id"): - setattr(instance, "id", str(uuid4())) + instance.id = str(uuid4()) return cast(T, instance) - elif issubclass(state_type, BaseModel): + if issubclass(state_type, BaseModel): # Create a new type that includes the ID field class StateWithId(state_type, FlowState): # type: ignore pass instance = StateWithId() if not hasattr(instance, "id"): - setattr(instance, "id", str(uuid4())) + instance.id = str(uuid4()) return cast(T, instance) - elif state_type is dict: + if state_type is dict: return cast(T, {"id": str(uuid4())}) # Handle case where no initial state is provided @@ -550,13 +542,13 @@ class Flow(Generic[T], metaclass=FlowMeta): if isinstance(self.initial_state, type): if issubclass(self.initial_state, FlowState): return cast(T, self.initial_state()) # Uses model defaults - elif issubclass(self.initial_state, BaseModel): + if issubclass(self.initial_state, BaseModel): # Validate that the model has an id field model_fields = getattr(self.initial_state, "model_fields", None) if not model_fields or "id" not in model_fields: raise ValueError("Flow state model must have an 'id' field") return cast(T, self.initial_state()) # Uses model defaults - elif self.initial_state is dict: + if self.initial_state is dict: return cast(T, {"id": str(uuid4())}) # Handle dictionary instance case @@ -600,7 +592,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return self._state @property - def method_outputs(self) -> List[Any]: + def method_outputs(self) -> list[Any]: """Returns the list of all outputs from executed methods.""" return self._method_outputs @@ -631,13 +623,13 @@ class Flow(Generic[T], metaclass=FlowMeta): if 
isinstance(self._state, dict): return str(self._state.get("id", "")) - elif isinstance(self._state, BaseModel): + if isinstance(self._state, BaseModel): return str(getattr(self._state, "id", "")) return "" except (AttributeError, TypeError): return "" # Safely handle any unexpected attribute access issues - def _initialize_state(self, inputs: Dict[str, Any]) -> None: + def _initialize_state(self, inputs: dict[str, Any]) -> None: """Initialize or update flow state with new inputs. Args: @@ -691,7 +683,7 @@ class Flow(Generic[T], metaclass=FlowMeta): else: raise TypeError("State must be a BaseModel instance or a dictionary.") - def _restore_state(self, stored_state: Dict[str, Any]) -> None: + def _restore_state(self, stored_state: dict[str, Any]) -> None: """Restore flow state from persistence. Args: @@ -735,7 +727,7 @@ class Flow(Generic[T], metaclass=FlowMeta): execution_data: Flow execution data containing: - id: Flow execution ID - flow: Flow structure - - completed_methods: List of successfully completed methods + - completed_methods: list of successfully completed methods - execution_methods: All execution methods with their status """ flow_id = execution_data.get("id") @@ -771,7 +763,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if state_to_apply: self._apply_state_updates(state_to_apply) - for i, method in enumerate(sorted_methods[:-1]): + for method in sorted_methods[:-1]: method_name = method.get("flow_method", {}).get("name") if method_name: self._completed_methods.add(method_name) @@ -783,7 +775,7 @@ class Flow(Generic[T], metaclass=FlowMeta): elif hasattr(self._state, field_name): object.__setattr__(self._state, field_name, value) - def _apply_state_updates(self, updates: Dict[str, Any]) -> None: + def _apply_state_updates(self, updates: dict[str, Any]) -> None: """Apply multiple state updates efficiently.""" if isinstance(self._state, dict): self._state.update(updates) @@ -792,7 +784,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if hasattr(self._state, key): object.__setattr__(self._state, key, value) - def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any: + def kickoff(self, inputs: dict[str, Any] | None = None) -> Any: """ Start the flow execution in a synchronous context. @@ -805,7 +797,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return asyncio.run(run_flow()) - async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any: + async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> Any: """ Start the flow execution asynchronously. @@ -840,7 +832,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if isinstance(self._state, dict): self._state["id"] = inputs["id"] elif isinstance(self._state, BaseModel): - setattr(self._state, "id", inputs["id"]) + setattr(self._state, "id", inputs["id"]) # noqa: B010 # If persistence is enabled, attempt to restore the stored state using the provided id. if "id" in inputs and self._persistence is not None: @@ -1075,7 +1067,7 @@ class Flow(Generic[T], metaclass=FlowMeta): ) # Now execute normal listeners for all router results and the original trigger - all_triggers = [trigger_method] + router_results + all_triggers = [trigger_method, *router_results] for current_trigger in all_triggers: if current_trigger: # Skip None results @@ -1109,7 +1101,7 @@ class Flow(Generic[T], metaclass=FlowMeta): def _find_triggered_methods( self, trigger_method: str, router_only: bool - ) -> List[str]: + ) -> list[str]: """ Finds all methods that should be triggered based on conditions. 
@@ -1126,7 +1118,7 @@ class Flow(Generic[T], metaclass=FlowMeta): Returns ------- - List[str] + list[str] Names of methods that should be triggered. Notes diff --git a/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_user_accepts.yaml b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_user_accepts.yaml new file mode 100644 index 000000000..14c8c07d7 --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_user_accepts.yaml @@ -0,0 +1,130 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! + This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + cookie: + - _cfuvid=NaXWifUGChHp6Ap1mvfMrNzmO4HdzddrqXkSR9T.hYo-1754508545647-0.0.1.1-604800000 + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFLBbtswDL37Kzid4yFx46bxbVixtsfssB22wlAl2lEri5okJ+uK/Psg + OY3dtQV2MWA+vqf3SD5lAExJVgETWx5EZ3X++SrcY/Hrcec3l+SKP5frm/16Yx92m6/f9mwWGXR3 + jyI8sz4K6qzGoMgMsHDIA0bVxaq8WCzPyrN5AjqSqCOttSFfUt4po/JiXizz+SpfXBzZW1ICPavg + RwYA8JS+0aeR+JtVkLRSpUPveYusOjUBMEc6Vhj3XvnATWCzERRkAppk/QYM7UFwA63aIXBoo23g + xu/RAfw0X5ThGj6l/wquUWuawXdyWn6YSjpses9jLNNrPQG4MRR4HEsKc3tEDif7mlrr6M7/Q2WN + Mspva4fck4lWfSDLEnrIAG7TmPoXyZl11NlQB3rA9NyiXA16bNzOFD2CgQLXk/qqmL2hV0sMXGk/ + GTQTXGxRjtRxK7yXiiZANkn92s1b2kNyZdr/kR8BIdAGlLV1KJV4mXhscxiP972205STYebR7ZTA + Oih0cRMSG97r4aSYf/QBu7pRpkVnnRruqrF1eT7nzTmW5Zplh+wvAAAA//8DAGKunMhlAwAA + headers: + CF-RAY: + - 980b99a73c1c22c6-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Wed, 17 Sep 2025 21:12:11 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=Ahwkw3J9CDiluZudRgDmybz4FO07eXLz2MQDtkgfct4-1758143531-1.0.1.1-_3e8agfTZW.FPpRMLb1A2nET4OHQEGKNZeGeWT8LIiuSi8R2HWsGsJyueUyzYBYnfHqsfBUO16K1.TkEo2XiqVCaIi6pymeeQxwtXFF1wj8; + path=/; expires=Wed, 17-Sep-25 21:42:11 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=iHqLoc_2sNQLMyzfGCLtGol8vf1Y44xirzQJUuUF_TI-1758143531242-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - 
max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '419' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '609' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999827' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999830' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_ece5f999e09e4c189d38e5bc08b2fad9 + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_with_timeout.yaml b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_with_timeout.yaml new file mode 100644 index 000000000..ebf4af34a --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_collection_with_timeout.yaml @@ -0,0 +1,128 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to + the world\n\nThis is the expected criteria for your final answer: hello world\nyou + MUST return the actual complete content as the final answer, not a summary.\n\nBegin! 
+ This is VERY important to you, use the tools available and give your best Final + Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": + ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '825' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJNj9MwEL3nV4x8blBSmrabG0ICViqCCydYRbPOJDHreIztbIFV/zty + 0m1SPiQukTJv3vN7M/OUAAhVixKE7DDI3ur09dvgfa8P/FO+e/9wa/aHb4cPH9viEw5yK1aRwfdf + SYZn1gvJvdUUFJsJlo4wUFTNd8U+32zybD0CPdekI621Id1w2iuj0nW23qTZLs33Z3bHSpIXJXxO + AACexm/0aWr6LkrIVs+VnrzHlkR5aQIQjnWsCPRe+YAmiNUMSjaBzGj9FgwfQaKBVj0SILTRNqDx + R3IAX8wbZVDDq/G/hI60Zjiy0/VS0FEzeIyhzKD1AkBjOGAcyhjl7oycLuY1t9bxvf+NKhpllO8q + R+jZRKM+sBUjekoA7sYhDVe5hXXc21AFfqDxubzYTXpi3s0CfXkGAwfUi/ruPNprvaqmgEr7xZiF + RNlRPVPnneBQK14AySL1n27+pj0lV6b9H/kZkJJsoLqyjmolrxPPbY7i6f6r7TLl0bDw5B6VpCoo + cnETNTU46OmghP/hA/VVo0xLzjo1XVVjq2KbYbOlorgRySn5BQAA//8DALxsmCBjAwAA + headers: + CF-RAY: + - 980ba79a4ab5f555-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Wed, 17 Sep 2025 21:21:42 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=aMMf0fLckKHz0BLW_2lATxD.7R61uYo1ZVW8aeFbruA-1758144102-1.0.1.1-6EKM3UxpdczoiQ6VpPpqqVnY7ftnXndFRWE4vyTzVcy.CQ4N539D97Wh8Ye9EUAvpUuukhW.r5MznkXq4tPXgCCmEv44RvVz2GBAz_e31h8; + path=/; expires=Wed, 17-Sep-25 21:51:42 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=VqrtvU8.QdEHc4.1XXUVmccaCcoj_CiNfI2zhKJoGRs-1758144102566-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '308' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '620' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + x-ratelimit-remaining-project-tokens: + - '149999827' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999830' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_fa896433021140238115972280c05651 + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_consolidation_logic.yaml b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_consolidation_logic.yaml new file mode 100644 index 000000000..8a73e47fc --- /dev/null +++ b/tests/cassettes/TestTraceListenerSetup.test_first_time_user_trace_consolidation_logic.yaml @@ 
-0,0 +1,127 @@ +interactions: +- request: + body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour + personal goal is: Test goal\nTo give my best complete final answer to the task + respond using the exact following format:\n\nThought: I now can give a great + answer\nFinal Answer: Your final answer must be the great and the most complete + as possible, it must be outcome described.\n\nI MUST use these formats, my job + depends on it!"}, {"role": "user", "content": "\nCurrent Task: Test task\n\nThis + is the expected criteria for your final answer: test output\nyou MUST return + the actual complete content as the final answer, not a summary.\n\nBegin! This + is VERY important to you, use the tools available and give your best Final Answer, + your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate, zstd + connection: + - keep-alive + content-length: + - '812' + content-type: + - application/json + host: + - api.openai.com + user-agent: + - OpenAI/Python 1.93.0 + x-stainless-arch: + - arm64 + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - MacOS + x-stainless-package-version: + - 1.93.0 + x-stainless-raw-response: + - 'true' + x-stainless-read-timeout: + - '600.0' + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.9 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4WV+JHoVgR95NJD4UvaBgJDrSS2FJclV3bSwP9e + kHYsuU2BXghwZ2c4s8vnDEDoWpQgVCdZ9c7kNx+4v7vb7jafnrrPX25/vtObX48f1Q31m+uFmEUG + PXxHxS+sN4p6Z5A12QOsPErGqFqsl1fF4nJdzBPQU40m0lrH+YLyXludX8wvFvl8nRdXR3ZHWmEQ + JXzNAACe0xl92hofRQlJK1V6DEG2KMpTE4DwZGJFyBB0YGlZzEZQkWW0yfotWNqBkhZavUWQ0Ebb + IG3YoQf4Zt9rKw28TfcSNhgYaGA3nAl6bIYgYyg7GDMBpLXEMg4lRbk/IvuTeUOt8/QQ/qCKRlsd + usqjDGSj0cDkREL3GcB9GtJwlls4T73jiukHpueK5eKgJ8bdTNDLI8jE0kzqq/XsFb2qRpbahMmY + hZKqw3qkjjuRQ61pAmST1H+7eU37kFzb9n/kR0ApdIx15TzWWp0nHts8xq/7r7bTlJNhEdBvtcKK + Nfq4iRobOZjD/kV4Cox91WjbondeH35V46rlai6bFS6X1yLbZ78BAAD//wMAZdfoWWMDAAA= + headers: + CF-RAY: + - 980b9e0c5fa516a0-SJC + Connection: + - keep-alive + Content-Encoding: + - gzip + Content-Type: + - application/json + Date: + - Wed, 17 Sep 2025 21:15:11 GMT + Server: + - cloudflare + Set-Cookie: + - __cf_bm=w6UZxbAZgYg9EFkKPfrSbMK97MB4jfs7YyvcEmgkvak-1758143711-1.0.1.1-j7YC1nvoMKxYK0T.5G2XDF6TXUCPu_HUs4YO9v65r3NHQFIcOaHbQXX4vqabSgynL2tZy23pbZgD8Cdmxhdw9dp4zkAXhU.imP43_pw4dSE; + path=/; expires=Wed, 17-Sep-25 21:45:11 GMT; domain=.api.openai.com; HttpOnly; + Secure; SameSite=None + - _cfuvid=ij9Q8tB7sj2GczANlJ7gbXVjj6hMhz1iVb6oGHuRYu8-1758143711202-0.0.1.1-604800000; + path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + access-control-expose-headers: + - X-Request-ID + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - crewai-iuxna1 + openai-processing-ms: + - '462' + openai-project: + - proj_xitITlrFeen7zjNSzML82h9x + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '665' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-project-tokens: + - '150000000' + x-ratelimit-limit-requests: + - '30000' + x-ratelimit-limit-tokens: + - '150000000' + 
x-ratelimit-remaining-project-tokens: + - '149999830' + x-ratelimit-remaining-requests: + - '29999' + x-ratelimit-remaining-tokens: + - '149999830' + x-ratelimit-reset-project-tokens: + - 0s + x-ratelimit-reset-requests: + - 2ms + x-ratelimit-reset-tokens: + - 0s + x-request-id: + - req_04536db97c8c4768a200e38c1368c176 + status: + code: 200 + message: OK +version: 1 diff --git a/tests/tracing/test_tracing.py b/tests/tracing/test_tracing.py index 6cf5df2d8..2ac07eb6d 100644 --- a/tests/tracing/test_tracing.py +++ b/tests/tracing/test_tracing.py @@ -1,17 +1,20 @@ import os +from unittest.mock import MagicMock, Mock, patch + import pytest -from unittest.mock import patch, MagicMock - -from crewai import Agent, Task, Crew -from crewai.flow.flow import Flow, start -from crewai.events.listeners.tracing.trace_listener import ( - TraceCollectionListener, +from crewai import Agent, Crew, Task +from crewai.events.listeners.tracing.first_time_trace_handler import ( + FirstTimeTraceHandler, ) from crewai.events.listeners.tracing.trace_batch_manager import ( TraceBatchManager, ) +from crewai.events.listeners.tracing.trace_listener import ( + TraceCollectionListener, +) from crewai.events.listeners.tracing.types import TraceEvent +from crewai.flow.flow import Flow, start class TestTraceListenerSetup: @@ -281,9 +284,9 @@ class TestTraceListenerSetup: ): trace_handlers.append(handler) - assert ( - len(trace_handlers) == 0 - ), f"Found {len(trace_handlers)} trace handlers when tracing should be disabled" + assert len(trace_handlers) == 0, ( + f"Found {len(trace_handlers)} trace handlers when tracing should be disabled" + ) def test_trace_listener_setup_correctly_for_crew(self): """Test that trace listener is set up correctly when enabled""" @@ -403,3 +406,254 @@ class TestTraceListenerSetup: from crewai.events.event_bus import crewai_event_bus crewai_event_bus._handlers.clear() + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls): + """Test first-time user trace collection logic with timeout behavior""" + + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}), + patch( + "crewai.events.listeners.tracing.utils._is_test_environment", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.utils.is_first_execution", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", + return_value=False, + ) as mock_prompt, + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" + ) as mock_mark_completed, + ): + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=True) + + from crewai.events.event_bus import crewai_event_bus + + trace_listener = TraceCollectionListener() + trace_listener.setup_listeners(crewai_event_bus) + + assert trace_listener.first_time_handler.is_first_time is True + assert trace_listener.first_time_handler.collected_events is False + + with ( + patch.object( + trace_listener.first_time_handler, + "handle_execution_completion", + wraps=trace_listener.first_time_handler.handle_execution_completion, + ) as mock_handle_completion, + 
patch.object( + trace_listener.batch_manager, + "add_event", + wraps=trace_listener.batch_manager.add_event, + ) as mock_add_event, + ): + result = crew.kickoff() + assert result is not None + + assert mock_handle_completion.call_count >= 1 + assert mock_add_event.call_count >= 1 + + assert trace_listener.first_time_handler.collected_events is True + + mock_prompt.assert_called_once_with(timeout_seconds=20) + + mock_mark_completed.assert_called_once() + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_first_time_user_trace_collection_user_accepts(self, mock_plus_api_calls): + """Test first-time user trace collection when user accepts viewing traces""" + + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}), + patch( + "crewai.events.listeners.tracing.utils._is_test_environment", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.utils.is_first_execution", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" + ) as mock_mark_completed, + ): + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Say hello to the world", + expected_output="hello world", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=True) + + from crewai.events.event_bus import crewai_event_bus + + trace_listener = TraceCollectionListener() + trace_listener.setup_listeners(crewai_event_bus) + + assert trace_listener.first_time_handler.is_first_time is True + + with ( + patch.object( + trace_listener.first_time_handler, + "_initialize_backend_and_send_events", + wraps=trace_listener.first_time_handler._initialize_backend_and_send_events, + ) as mock_init_backend, + patch.object( + trace_listener.first_time_handler, "_display_ephemeral_trace_link" + ) as mock_display_link, + patch.object( + trace_listener.first_time_handler, + "handle_execution_completion", + wraps=trace_listener.first_time_handler.handle_execution_completion, + ) as mock_handle_completion, + ): + trace_listener.batch_manager.ephemeral_trace_url = ( + "https://crewai.com/trace/mock-id" + ) + + crew.kickoff() + + assert mock_handle_completion.call_count >= 1, ( + "handle_execution_completion should be called" + ) + + assert trace_listener.first_time_handler.collected_events is True, ( + "Events should be marked as collected" + ) + + mock_init_backend.assert_called_once() + + mock_display_link.assert_called_once() + + mock_mark_completed.assert_called_once() + + @pytest.mark.vcr(filter_headers=["authorization"]) + def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls): + """Test the consolidation logic for first-time users vs regular tracing""" + + with ( + patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}), + patch( + "crewai.events.listeners.tracing.utils._is_test_environment", + return_value=False, + ), + patch( + "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.utils.is_first_execution", + return_value=True, + ), + ): + from crewai.events.event_bus import crewai_event_bus + + crewai_event_bus._handlers.clear() + + trace_listener = TraceCollectionListener() + 
trace_listener.setup_listeners(crewai_event_bus) + + assert trace_listener.first_time_handler.is_first_time is True + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + llm="gpt-4o-mini", + ) + task = Task( + description="Test task", expected_output="test output", agent=agent + ) + crew = Crew(agents=[agent], tasks=[task]) + + with patch.object(TraceBatchManager, "initialize_batch") as mock_initialize: + result = crew.kickoff() + + assert mock_initialize.call_count >= 1 + assert mock_initialize.call_args_list[0][1]["use_ephemeral"] is True + assert result is not None + + def test_first_time_handler_timeout_behavior(self): + """Test the timeout behavior of the first-time trace prompt""" + + with ( + patch( + "crewai.events.listeners.tracing.utils._is_test_environment", + return_value=False, + ), + patch("threading.Thread") as mock_thread, + ): + from crewai.events.listeners.tracing.utils import ( + prompt_user_for_trace_viewing, + ) + + mock_thread_instance = Mock() + mock_thread_instance.is_alive.return_value = True + mock_thread.return_value = mock_thread_instance + + result = prompt_user_for_trace_viewing(timeout_seconds=5) + + assert result is False + mock_thread.assert_called_once() + call_args = mock_thread.call_args + assert call_args[1]["daemon"] is True + + mock_thread_instance.start.assert_called_once() + mock_thread_instance.join.assert_called_once_with(timeout=5) + mock_thread_instance.is_alive.assert_called_once() + + def test_first_time_handler_graceful_error_handling(self): + """Test graceful error handling in first-time trace logic""" + + with ( + patch( + "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces", + return_value=True, + ), + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing", + side_effect=Exception("Prompt failed"), + ), + patch( + "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed" + ) as mock_mark_completed, + ): + handler = FirstTimeTraceHandler() + handler.is_first_time = True + handler.collected_events = True + + handler.handle_execution_completion() + + mock_mark_completed.assert_called_once()
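
The timeout test above pins down the expected shape of prompt_user_for_trace_viewing in crewai.events.listeners.tracing.utils: a daemon thread reads the user's reply, the caller joins it with the configured timeout, and a thread that is still alive afterwards means "no answer", which defaults to False. The following is a minimal sketch consistent with those assertions only; the prompt wording and the result container are assumptions, not the shipped implementation.

    import threading


    def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
        """Ask whether to open the execution trace; give up after timeout_seconds."""
        answer: list[bool] = []  # assumed holder for the reader thread's result

        def _read_input() -> None:
            try:
                # Prompt wording is an assumption for illustration purposes.
                reply = input("View your execution trace? [y/N]: ")
                answer.append(reply.strip().lower() in ("y", "yes"))
            except EOFError:
                answer.append(False)

        # Daemon thread so a hung prompt never blocks interpreter shutdown.
        reader = threading.Thread(target=_read_input, daemon=True)
        reader.start()
        reader.join(timeout=timeout_seconds)

        if reader.is_alive():
            # No input arrived before the timeout: fall back to not viewing traces.
            return False
        return bool(answer and answer[0])

test_first_time_handler_timeout_behavior exercises exactly this flow: Thread(..., daemon=True), start(), join(timeout=5), and is_alive() returning True forcing the False result.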