Lorenze/ephemeral trace ask (#3530)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled

* feat(tracing): implement first-time trace handling and improve event management

- Added FirstTimeTraceHandler for managing first-time user trace collection and display.
- Enhanced TraceBatchManager to support ephemeral trace URLs and improved event buffering.
- Updated TraceCollectionListener to utilize the new FirstTimeTraceHandler.
- Refactored type annotations across multiple files for consistency and clarity.
- Improved error handling and logging for trace-related operations.
- Introduced utility functions for trace viewing prompts and first execution checks.
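The first-execution check described above can be sketched with a marker file. This is a minimal sketch under stated assumptions: the helper names (`is_first_execution`, `mark_first_execution_completed`) and the `~/.crewai` marker location are illustrative, not necessarily how crewai actually persists this flag.

```python
import json
from pathlib import Path

# Hypothetical marker-file location; the real crewai helpers may store this elsewhere.
MARKER = Path.home() / ".crewai" / "first_execution.json"

def is_first_execution(marker: Path = MARKER) -> bool:
    """Treat any unreadable or corrupt marker as a first execution."""
    try:
        return not json.loads(marker.read_text()).get("completed", False)
    except (OSError, json.JSONDecodeError):
        return True

def mark_first_execution_completed(marker: Path = MARKER) -> None:
    """Best-effort write; bookkeeping must never break a run."""
    try:
        marker.parent.mkdir(parents=True, exist_ok=True)
        marker.write_text(json.dumps({"completed": True}))
    except OSError:
        pass
```

Failing open (returning `True` on any error) matches the intent of auto-collecting traces for users we cannot prove have run before.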

* brought back crew finalize batch events

* refactor(trace): move instance variables to __init__ in TraceBatchManager

- Refactored TraceBatchManager to initialize instance variables in the constructor instead of as class variables.
- Improved clarity and encapsulation of the class state.
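The pitfall this refactor fixes is easy to demonstrate: class-level mutable defaults are shared by every instance, so one manager's buffered events leak into another's. The class names below are illustrative stand-ins, not the real `TraceBatchManager`.

```python
class BadManager:
    # Class attribute: ONE list object shared by all instances.
    event_buffer = []

class GoodManager:
    def __init__(self):
        # Instance attribute: a fresh list per instance.
        self.event_buffer = []

a, b = BadManager(), BadManager()
a.event_buffer.append("event")
print(len(b.event_buffer))  # 1 -- b sees a's event

c, d = GoodManager(), GoodManager()
c.event_buffer.append("event")
print(len(d.event_buffer))  # 0 -- state is isolated
```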

* fix(tracing): improve error handling in user data loading and saving

- Enhanced error handling in _load_user_data and _save_user_data functions to log warnings for JSON decoding and file access issues.
- Updated documentation for trace usage to clarify the addition of tracing parameters in Crew and Flow initialization.
- Refined state management in Flow class to ensure proper handling of state IDs when persistence is enabled.
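The load/save hardening can be sketched as follows. Function names and the exact exception handling are illustrative assumptions; the real `_load_user_data`/`_save_user_data` live in crewai's tracing utils and may differ in detail.

```python
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def load_user_data(path: Path) -> dict:
    """Return persisted user data, or an empty dict on any failure."""
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError as e:
        logger.warning("Corrupt user data file %s: %s", path, e)
    except OSError as e:
        logger.warning("Could not read user data file %s: %s", path, e)
    return {}

def save_user_data(path: Path, data: dict) -> None:
    """Best-effort save that only logs on failure."""
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data))
    except OSError as e:
        logger.warning("Could not write user data file %s: %s", path, e)
```

Catching `json.JSONDecodeError` separately from `OSError` lets the log distinguish a corrupt file from a permissions or missing-file problem.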

* add some tests

* fix test

* fix tests

* refactor(tracing): enhance user input handling for trace viewing

- Replaced signal-based timeout handling with threading for user input in prompt_user_for_trace_viewing function.
- Improved user experience by allowing a configurable timeout for viewing execution traces.
- Updated tests to mock threading behavior and verify timeout handling correctly.

* fix(tracing): improve machine ID retrieval with error handling

- Added error handling to the _get_machine_id function to log warnings when retrieving the machine ID fails.
- Ensured that the function continues to provide a stable, privacy-preserving machine fingerprint even in case of errors.

* refactor(flow): streamline state ID assignment in Flow class

- Replaced direct attribute assignment with setattr for improved flexibility in handling state IDs.
- Enhanced code readability by simplifying the logic for setting the state ID when persistence is enabled.
This commit is contained in:
Lorenze Jay
2025-09-18 10:17:34 -07:00
committed by GitHub
parent 6f5af2b27c
commit 578fa8c2e4
10 changed files with 1173 additions and 260 deletions

View File

@@ -37,6 +37,7 @@ from crewai.events.listeners.tracing.trace_listener import (
 )
 from crewai.events.listeners.tracing.utils import (
     is_tracing_enabled,
+    should_auto_collect_first_time_traces,
 )
 from crewai.events.types.crew_events import (
     CrewKickoffCompletedEvent,
@@ -89,8 +90,8 @@ class Crew(FlowTrackable, BaseModel):
     tasks they should perform.

     Attributes:
-        tasks: List of tasks assigned to the crew.
-        agents: List of agents part of this crew.
+        tasks: list of tasks assigned to the crew.
+        agents: list of agents part of this crew.
         manager_llm: The language model that will run manager agent.
         manager_agent: Custom agent that will be used as manager.
         memory: Whether the crew should use memory to store memories of it's
@@ -238,11 +239,11 @@ class Crew(FlowTrackable, BaseModel):
     )
     task_execution_output_json_files: list[str] | None = Field(
         default=None,
-        description="List of file paths for task execution JSON files.",
+        description="list of file paths for task execution JSON files.",
     )
     execution_logs: list[dict[str, Any]] = Field(
         default=[],
-        description="List of execution logs for tasks",
+        description="list of execution logs for tasks",
     )
     knowledge_sources: list[BaseKnowledgeSource] | None = Field(
         default=None,
@@ -296,12 +297,16 @@ class Crew(FlowTrackable, BaseModel):
     @model_validator(mode="after")
     def set_private_attrs(self) -> "Crew":
-        """Set private attributes."""
+        """set private attributes."""
         self._cache_handler = CacheHandler()
         event_listener = EventListener()
-        if is_tracing_enabled() or self.tracing:
+        if (
+            is_tracing_enabled()
+            or self.tracing
+            or should_auto_collect_first_time_traces()
+        ):
             trace_listener = TraceCollectionListener()
             trace_listener.setup_listeners(crewai_event_bus)
         event_listener.verbose = self.verbose

View File

@@ -0,0 +1,174 @@ (new file)

import logging
import uuid

from rich.console import Console
from rich.panel import Panel

from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
from crewai.events.listeners.tracing.utils import (
    mark_first_execution_completed,
    prompt_user_for_trace_viewing,
    should_auto_collect_first_time_traces,
)

logger = logging.getLogger(__name__)


class FirstTimeTraceHandler:
    """Handles the first-time user trace collection and display flow."""

    def __init__(self):
        self.is_first_time: bool = False
        self.collected_events: bool = False
        self.trace_batch_id: str | None = None
        self.ephemeral_url: str | None = None
        self.batch_manager: TraceBatchManager | None = None

    def initialize_for_first_time_user(self) -> bool:
        """Check if this is first time and initialize collection."""
        self.is_first_time = should_auto_collect_first_time_traces()
        return self.is_first_time

    def set_batch_manager(self, batch_manager: TraceBatchManager):
        """Set reference to batch manager for sending events."""
        self.batch_manager = batch_manager

    def mark_events_collected(self):
        """Mark that events have been collected during execution."""
        self.collected_events = True

    def handle_execution_completion(self):
        """Handle the completion flow as shown in your diagram."""
        if not self.is_first_time or not self.collected_events:
            return

        try:
            user_wants_traces = prompt_user_for_trace_viewing(timeout_seconds=20)

            if user_wants_traces:
                self._initialize_backend_and_send_events()

                if self.ephemeral_url:
                    self._display_ephemeral_trace_link()

            mark_first_execution_completed()
        except Exception as e:
            self._gracefully_fail(f"Error in trace handling: {e}")
            mark_first_execution_completed()

    def _initialize_backend_and_send_events(self):
        """Initialize backend batch and send collected events."""
        if not self.batch_manager:
            return

        try:
            if not self.batch_manager.backend_initialized:
                original_metadata = (
                    self.batch_manager.current_batch.execution_metadata
                    if self.batch_manager.current_batch
                    else {}
                )

                user_context = {
                    "privacy_level": "standard",
                    "user_id": "first_time_user",
                    "session_id": str(uuid.uuid4()),
                    "trace_id": self.batch_manager.trace_batch_id,
                }
                execution_metadata = {
                    "execution_type": original_metadata.get("execution_type", "crew"),
                    "crew_name": original_metadata.get(
                        "crew_name", "First Time Execution"
                    ),
                    "flow_name": original_metadata.get("flow_name"),
                    "agent_count": original_metadata.get("agent_count", 1),
                    "task_count": original_metadata.get("task_count", 1),
                    "crewai_version": original_metadata.get("crewai_version"),
                }

                self.batch_manager._initialize_backend_batch(
                    user_context=user_context,
                    execution_metadata=execution_metadata,
                    use_ephemeral=True,
                )
                self.batch_manager.backend_initialized = True

            if self.batch_manager.event_buffer:
                self.batch_manager._send_events_to_backend()

            self.batch_manager.finalize_batch()
            self.ephemeral_url = self.batch_manager.ephemeral_trace_url

            if not self.ephemeral_url:
                self._show_local_trace_message()
        except Exception as e:
            self._gracefully_fail(f"Backend initialization failed: {e}")

    def _display_ephemeral_trace_link(self):
        """Display the ephemeral trace link to the user."""
        console = Console()

        panel_content = f"""
🎉 Your First CrewAI Execution Trace is Ready!

View your execution details here:
{self.ephemeral_url}

This trace shows:
• Agent decisions and interactions
• Task execution timeline
• Tool usage and results
• LLM calls and responses

To use traces add tracing=True to your Crew(tracing=True) / Flow(tracing=True)

📝 Note: This link will expire in 24 hours.
""".strip()

        panel = Panel(
            panel_content,
            title="🔍 Execution Trace Generated",
            border_style="bright_green",
            padding=(1, 2),
        )
        console.print("\n")
        console.print(panel)
        console.print()

    def _gracefully_fail(self, error_message: str):
        """Handle errors gracefully without disrupting user experience."""
        console = Console()
        console.print(f"[yellow]Note: {error_message}[/yellow]")
        logger.debug(f"First-time trace error: {error_message}")

    def _show_local_trace_message(self):
        """Show message when traces were collected locally but couldn't be uploaded."""
        console = Console()

        panel_content = f"""
📊 Your execution traces were collected locally!

Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
• {len(self.batch_manager.event_buffer)} trace events
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}

The traces include agent decisions, task execution, and tool usage.

Try running with CREWAI_TRACING_ENABLED=true next time for persistent traces.
""".strip()

        panel = Panel(
            panel_content,
            title="🔍 Local Traces Collected",
            border_style="yellow",
            padding=(1, 2),
        )
        console.print("\n")
        console.print(panel)
        console.print()

View File

@@ -1,18 +1,18 @@
 import uuid
-from datetime import datetime, timezone
-from typing import Dict, List, Any, Optional
 from dataclasses import dataclass, field
-from crewai.utilities.constants import CREWAI_BASE_URL
-from crewai.cli.authentication.token import AuthError, get_auth_token
-from crewai.cli.version import get_crewai_version
-from crewai.cli.plus_api import PlusAPI
+from datetime import datetime, timezone
+from logging import getLogger
+from typing import Any
 from rich.console import Console
 from rich.panel import Panel
+from crewai.cli.authentication.token import AuthError, get_auth_token
+from crewai.cli.plus_api import PlusAPI
+from crewai.cli.version import get_crewai_version
 from crewai.events.listeners.tracing.types import TraceEvent
-from logging import getLogger
-from crewai.utilities.constants import CREWAI_BASE_URL
+from crewai.events.listeners.tracing.utils import should_auto_collect_first_time_traces
+from crewai.utilities.constants import CREWAI_BASE_URL

 logger = getLogger(__name__)
@@ -23,11 +23,11 @@ class TraceBatch:
     version: str = field(default_factory=get_crewai_version)
     batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
-    user_context: Dict[str, str] = field(default_factory=dict)
-    execution_metadata: Dict[str, Any] = field(default_factory=dict)
-    events: List[TraceEvent] = field(default_factory=list)
+    user_context: dict[str, str] = field(default_factory=dict)
+    execution_metadata: dict[str, Any] = field(default_factory=dict)
+    events: list[TraceEvent] = field(default_factory=list)

-    def to_dict(self) -> Dict[str, Any]:
+    def to_dict(self) -> dict[str, Any]:
         return {
             "version": self.version,
             "batch_id": self.batch_id,
@@ -40,26 +40,28 @@
 class TraceBatchManager:
     """Single responsibility: Manage batches and event buffering"""

-    is_current_batch_ephemeral: bool = False
-    trace_batch_id: Optional[str] = None
-    current_batch: Optional[TraceBatch] = None
-    event_buffer: List[TraceEvent] = []
-    execution_start_times: Dict[str, datetime] = {}
-    batch_owner_type: Optional[str] = None
-    batch_owner_id: Optional[str] = None
-
     def __init__(self):
+        self.is_current_batch_ephemeral: bool = False
+        self.trace_batch_id: str | None = None
+        self.current_batch: TraceBatch | None = None
+        self.event_buffer: list[TraceEvent] = []
+        self.execution_start_times: dict[str, datetime] = {}
+        self.batch_owner_type: str | None = None
+        self.batch_owner_id: str | None = None
+        self.backend_initialized: bool = False
+        self.ephemeral_trace_url: str | None = None
         try:
             self.plus_api = PlusAPI(
                 api_key=get_auth_token(),
             )
         except AuthError:
             self.plus_api = PlusAPI(api_key="")
-        self.ephemeral_trace_url = None

     def initialize_batch(
         self,
-        user_context: Dict[str, str],
-        execution_metadata: Dict[str, Any],
+        user_context: dict[str, str],
+        execution_metadata: dict[str, Any],
         use_ephemeral: bool = False,
     ) -> TraceBatch:
         """Initialize a new trace batch"""
@@ -70,14 +72,21 @@
         self.is_current_batch_ephemeral = use_ephemeral
         self.record_start_time("execution")

-        self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
+        if should_auto_collect_first_time_traces():
+            self.trace_batch_id = self.current_batch.batch_id
+        else:
+            self._initialize_backend_batch(
+                user_context, execution_metadata, use_ephemeral
+            )
+            self.backend_initialized = True
+
         return self.current_batch

     def _initialize_backend_batch(
         self,
-        user_context: Dict[str, str],
-        execution_metadata: Dict[str, Any],
+        user_context: dict[str, str],
+        execution_metadata: dict[str, Any],
         use_ephemeral: bool = False,
     ):
         """Send batch initialization to backend"""
@@ -143,7 +152,7 @@
         except Exception as e:
             logger.warning(
-                f"Error initializing trace batch: {str(e)}. Continuing without tracing."
+                f"Error initializing trace batch: {e}. Continuing without tracing."
             )

     def add_event(self, trace_event: TraceEvent):
@@ -154,7 +163,6 @@
         """Send buffered events to backend with graceful failure handling"""
         if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
             return 500
-
         try:
             payload = {
                 "events": [event.to_dict() for event in self.event_buffer],
@@ -178,19 +186,19 @@
             if response.status_code in [200, 201]:
                 self.event_buffer.clear()
                 return 200
-            else:
-                logger.warning(
-                    f"Failed to send events: {response.status_code}. Events will be lost."
-                )
-                return 500
-        except Exception as e:
+
             logger.warning(
-                f"Error sending events to backend: {str(e)}. Events will be lost."
+                f"Failed to send events: {response.status_code}. Events will be lost."
             )
             return 500
+        except Exception as e:
+            logger.warning(
+                f"Error sending events to backend: {e}. Events will be lost."
+            )
+            return 500

-    def finalize_batch(self) -> Optional[TraceBatch]:
+    def finalize_batch(self) -> TraceBatch | None:
         """Finalize batch and return it for sending"""
         if not self.current_batch:
             return None
@@ -246,6 +254,10 @@
             if not self.is_current_batch_ephemeral and access_code is None
             else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
         )
+
+        if self.is_current_batch_ephemeral:
+            self.ephemeral_trace_url = return_link
+
         panel = Panel(
             f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
             title="Trace Batch Finalization",
@@ -259,8 +271,8 @@
             )
         except Exception as e:
-            logger.error(f"❌ Error finalizing trace batch: {str(e)}")
-            # TODO: send error to app
+            logger.error(f"❌ Error finalizing trace batch: {e}")
+            # TODO: send error to app marking as failed

     def _cleanup_batch_data(self):
         """Clean up batch data after successful finalization to free memory"""
@@ -277,7 +289,7 @@
             self.batch_sequence = 0
         except Exception as e:
-            logger.error(f"Warning: Error during cleanup: {str(e)}")
+            logger.error(f"Warning: Error during cleanup: {e}")

     def has_events(self) -> bool:
         """Check if there are events in the buffer"""
@@ -306,7 +318,7 @@
             return duration_ms
         return 0

-    def get_trace_id(self) -> Optional[str]:
+    def get_trace_id(self) -> str | None:
         """Get current trace ID"""
         if self.current_batch:
             return self.current_batch.user_context.get("trace_id")

View File

@@ -1,28 +1,59 @@
 import os
 import uuid
-
-from typing import Dict, Any, Optional
+from typing import Any, ClassVar

+from crewai.cli.authentication.token import AuthError, get_auth_token
+from crewai.cli.version import get_crewai_version
 from crewai.events.base_event_listener import BaseEventListener
-from crewai.events.types.agent_events import (
-    AgentExecutionCompletedEvent,
-    AgentExecutionStartedEvent,
-    LiteAgentExecutionStartedEvent,
-    LiteAgentExecutionCompletedEvent,
-    LiteAgentExecutionErrorEvent,
-    AgentExecutionErrorEvent,
+from crewai.events.listeners.tracing.first_time_trace_handler import (
+    FirstTimeTraceHandler,
 )
 from crewai.events.listeners.tracing.types import TraceEvent
-from crewai.events.types.reasoning_events import (
-    AgentReasoningStartedEvent,
-    AgentReasoningCompletedEvent,
-    AgentReasoningFailedEvent,
+from crewai.events.listeners.tracing.utils import safe_serialize_to_dict
+from crewai.events.types.agent_events import (
+    AgentExecutionCompletedEvent,
+    AgentExecutionErrorEvent,
+    AgentExecutionStartedEvent,
+    LiteAgentExecutionCompletedEvent,
+    LiteAgentExecutionErrorEvent,
+    LiteAgentExecutionStartedEvent,
 )
 from crewai.events.types.crew_events import (
     CrewKickoffCompletedEvent,
     CrewKickoffFailedEvent,
     CrewKickoffStartedEvent,
 )
+from crewai.events.types.flow_events import (
+    FlowCreatedEvent,
+    FlowFinishedEvent,
+    FlowPlotEvent,
+    FlowStartedEvent,
+    MethodExecutionFailedEvent,
+    MethodExecutionFinishedEvent,
+    MethodExecutionStartedEvent,
+)
+from crewai.events.types.llm_events import (
+    LLMCallCompletedEvent,
+    LLMCallFailedEvent,
+    LLMCallStartedEvent,
+)
+from crewai.events.types.llm_guardrail_events import (
+    LLMGuardrailCompletedEvent,
+    LLMGuardrailStartedEvent,
+)
+from crewai.events.types.memory_events import (
+    MemoryQueryCompletedEvent,
+    MemoryQueryFailedEvent,
+    MemoryQueryStartedEvent,
+    MemorySaveCompletedEvent,
+    MemorySaveFailedEvent,
+    MemorySaveStartedEvent,
+)
+from crewai.events.types.reasoning_events import (
+    AgentReasoningCompletedEvent,
+    AgentReasoningFailedEvent,
+    AgentReasoningStartedEvent,
+)
 from crewai.events.types.task_events import (
     TaskCompletedEvent,
     TaskFailedEvent,
@@ -33,49 +64,16 @@ from crewai.events.types.tool_usage_events import (
     ToolUsageFinishedEvent,
     ToolUsageStartedEvent,
 )
-from crewai.events.types.llm_events import (
-    LLMCallCompletedEvent,
-    LLMCallFailedEvent,
-    LLMCallStartedEvent,
-)
-from crewai.events.types.flow_events import (
-    FlowCreatedEvent,
-    FlowStartedEvent,
-    FlowFinishedEvent,
-    MethodExecutionStartedEvent,
-    MethodExecutionFinishedEvent,
-    MethodExecutionFailedEvent,
-    FlowPlotEvent,
-)
-from crewai.events.types.llm_guardrail_events import (
-    LLMGuardrailStartedEvent,
-    LLMGuardrailCompletedEvent,
-)
-from crewai.utilities.serialization import to_serializable

 from .trace_batch_manager import TraceBatchManager
-from crewai.events.types.memory_events import (
-    MemoryQueryStartedEvent,
-    MemoryQueryCompletedEvent,
-    MemoryQueryFailedEvent,
-    MemorySaveStartedEvent,
-    MemorySaveCompletedEvent,
-    MemorySaveFailedEvent,
-)
-from crewai.cli.authentication.token import AuthError, get_auth_token
-from crewai.cli.version import get_crewai_version


 class TraceCollectionListener(BaseEventListener):
     """
     Trace collection listener that orchestrates trace collection
     """

-    complex_events = [
+    complex_events: ClassVar[list[str]] = [
         "task_started",
         "task_completed",
         "llm_call_started",
@@ -88,14 +86,14 @@ class TraceCollectionListener(BaseEventListener):
     _initialized = False
     _listeners_setup = False

-    def __new__(cls, batch_manager=None):
+    def __new__(cls, batch_manager: TraceBatchManager | None = None):
         if cls._instance is None:
             cls._instance = super().__new__(cls)
         return cls._instance

     def __init__(
         self,
-        batch_manager: Optional[TraceBatchManager] = None,
+        batch_manager: TraceBatchManager | None = None,
     ):
         if self._initialized:
             return
@@ -103,16 +101,19 @@ class TraceCollectionListener(BaseEventListener):
         super().__init__()
         self.batch_manager = batch_manager or TraceBatchManager()
         self._initialized = True
+        self.first_time_handler = FirstTimeTraceHandler()
+        if self.first_time_handler.initialize_for_first_time_user():
+            self.first_time_handler.set_batch_manager(self.batch_manager)

     def _check_authenticated(self) -> bool:
         """Check if tracing should be enabled"""
         try:
-            res = bool(get_auth_token())
-            return res
+            return bool(get_auth_token())
         except AuthError:
             return False

-    def _get_user_context(self) -> Dict[str, str]:
+    def _get_user_context(self) -> dict[str, str]:
         """Extract user context for tracing"""
         return {
             "user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
@@ -161,8 +162,14 @@ class TraceCollectionListener(BaseEventListener):
         @event_bus.on(FlowFinishedEvent)
         def on_flow_finished(source, event):
             self._handle_trace_event("flow_finished", source, event)
+
             if self.batch_manager.batch_owner_type == "flow":
-                self.batch_manager.finalize_batch()
+                if self.first_time_handler.is_first_time:
+                    self.first_time_handler.mark_events_collected()
+                    self.first_time_handler.handle_execution_completion()
+                else:
+                    # Normal flow finalization
+                    self.batch_manager.finalize_batch()

         @event_bus.on(FlowPlotEvent)
         def on_flow_plot(source, event):
@@ -181,12 +188,20 @@ class TraceCollectionListener(BaseEventListener):
         def on_crew_completed(source, event):
             self._handle_trace_event("crew_kickoff_completed", source, event)
             if self.batch_manager.batch_owner_type == "crew":
-                self.batch_manager.finalize_batch()
+                if self.first_time_handler.is_first_time:
+                    self.first_time_handler.mark_events_collected()
+                    self.first_time_handler.handle_execution_completion()
+                else:
+                    self.batch_manager.finalize_batch()

         @event_bus.on(CrewKickoffFailedEvent)
         def on_crew_failed(source, event):
             self._handle_trace_event("crew_kickoff_failed", source, event)
-            self.batch_manager.finalize_batch()
+            if self.first_time_handler.is_first_time:
+                self.first_time_handler.mark_events_collected()
+                self.first_time_handler.handle_execution_completion()
+            else:
+                self.batch_manager.finalize_batch()

         @event_bus.on(TaskStartedEvent)
         def on_task_started(source, event):
@@ -325,17 +340,19 @@ class TraceCollectionListener(BaseEventListener):
             self._initialize_batch(user_context, execution_metadata)

     def _initialize_batch(
-        self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
+        self, user_context: dict[str, str], execution_metadata: dict[str, Any]
     ):
-        """Initialize trace batch if ephemeral"""
-        if not self._check_authenticated():
-            self.batch_manager.initialize_batch(
+        """Initialize trace batch - auto-enable ephemeral for first-time users."""
+
+        if self.first_time_handler.is_first_time:
+            return self.batch_manager.initialize_batch(
                 user_context, execution_metadata, use_ephemeral=True
             )
-        else:
-            self.batch_manager.initialize_batch(
-                user_context, execution_metadata, use_ephemeral=False
-            )
+
+        use_ephemeral = not self._check_authenticated()
+        return self.batch_manager.initialize_batch(
+            user_context, execution_metadata, use_ephemeral=use_ephemeral
+        )

     def _handle_trace_event(self, event_type: str, source: Any, event: Any):
         """Generic handler for context end events"""
@@ -371,11 +388,11 @@ class TraceCollectionListener(BaseEventListener):
     def _build_event_data(
         self, event_type: str, event: Any, source: Any
-    ) -> Dict[str, Any]:
+    ) -> dict[str, Any]:
         """Build event data"""
         if event_type not in self.complex_events:
-            return self._safe_serialize_to_dict(event)
-        elif event_type == "task_started":
+            return safe_serialize_to_dict(event)
+        if event_type == "task_started":
             return {
                 "task_description": event.task.description,
                 "expected_output": event.task.expected_output,
@@ -384,7 +401,7 @@ class TraceCollectionListener(BaseEventListener):
                 "agent_role": source.agent.role,
                 "task_id": str(event.task.id),
             }
-        elif event_type == "task_completed":
+        if event_type == "task_completed":
             return {
                 "task_description": event.task.description if event.task else None,
                 "task_name": event.task.name or event.task.description
@@ -397,63 +414,31 @@ class TraceCollectionListener(BaseEventListener):
                 else None,
                 "agent_role": event.output.agent if event.output else None,
             }
-        elif event_type == "agent_execution_started":
+        if event_type == "agent_execution_started":
             return {
                 "agent_role": event.agent.role,
                 "agent_goal": event.agent.goal,
                 "agent_backstory": event.agent.backstory,
             }
-        elif event_type == "agent_execution_completed":
+        if event_type == "agent_execution_completed":
             return {
                 "agent_role": event.agent.role,
                 "agent_goal": event.agent.goal,
                 "agent_backstory": event.agent.backstory,
             }
-        elif event_type == "llm_call_started":
-            event_data = self._safe_serialize_to_dict(event)
+        if event_type == "llm_call_started":
+            event_data = safe_serialize_to_dict(event)
             event_data["task_name"] = (
                 event.task_name or event.task_description
                 if hasattr(event, "task_name") and event.task_name
                 else None
             )
             return event_data
-        elif event_type == "llm_call_completed":
-            return self._safe_serialize_to_dict(event)
-        else:
-            return {
-                "event_type": event_type,
-                "event": self._safe_serialize_to_dict(event),
-                "source": source,
-            }
-
-    # TODO: move to utils
-    def _safe_serialize_to_dict(
-        self, obj, exclude: set[str] | None = None
-    ) -> Dict[str, Any]:
-        """Safely serialize an object to a dictionary for event data."""
-        try:
-            serialized = to_serializable(obj, exclude)
-            if isinstance(serialized, dict):
-                return serialized
-            else:
-                return {"serialized_data": serialized}
-        except Exception as e:
-            return {"serialization_error": str(e), "object_type": type(obj).__name__}
-
-    # TODO: move to utils
-    def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
-        """Truncate message content and limit number of messages"""
-        if not messages or not isinstance(messages, list):
-            return messages
-
-        # Limit number of messages
-        limited_messages = messages[:max_messages]
-
-        # Truncate each message content
-        for msg in limited_messages:
-            if isinstance(msg, dict) and "content" in msg:
-                content = msg["content"]
-                if len(content) > max_content_length:
-                    msg["content"] = content[:max_content_length] + "..."
-
-        return limited_messages
+        if event_type == "llm_call_completed":
+            return safe_serialize_to_dict(event)
+        return {
+            "event_type": event_type,
+            "event": safe_serialize_to_dict(event),
+            "source": source,
+        }

View File

@@ -1,17 +1,25 @@
+import getpass
+import hashlib
+import json
+import logging
 import os
 import platform
-import uuid
-import hashlib
-import subprocess
-import getpass
-from pathlib import Path
-from datetime import datetime
 import re
-import json
+import subprocess
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import Any

 import click
+from rich.console import Console
+from rich.panel import Panel
+from rich.text import Text

 from crewai.utilities.paths import db_storage_path
+from crewai.utilities.serialization import to_serializable
+
+logger = logging.getLogger(__name__)

 def is_tracing_enabled() -> bool:
@@ -43,13 +51,11 @@ def _get_machine_id() -> str:
     try:
         mac = ":".join(
-            ["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][
-                ::-1
-            ]
+            [f"{(uuid.getnode() >> b) & 0xFF:02x}" for b in range(0, 12, 2)][::-1]
         )
         parts.append(mac)
     except Exception:
-        pass
+        logger.warning("Error getting machine id for fingerprinting")

     sysname = platform.system()
     parts.append(sysname)
@@ -57,7 +63,7 @@ def _get_machine_id() -> str:
     try:
         if sysname == "Darwin":
             res = subprocess.run(
-                ["system_profiler", "SPHardwareDataType"],
+                ["/usr/sbin/system_profiler", "SPHardwareDataType"],
                 capture_output=True,
                 text=True,
                 timeout=2,
@@ -72,7 +78,7 @@ def _get_machine_id() -> str:
             parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip())
         elif sysname == "Windows":
             res = subprocess.run(
-                ["wmic", "csproduct", "get", "UUID"],
+                ["C:\\Windows\\System32\\wbem\\wmic.exe", "csproduct", "get", "UUID"],
                 capture_output=True,
                 text=True,
                 timeout=2,
@@ -81,7 +87,7 @@ def _get_machine_id() -> str:
         if len(lines) >= 2:
             parts.append(lines[1])
     except Exception:
-        pass
+        logger.exception("Error getting machine ID")

     return hashlib.sha256("".join(parts).encode()).hexdigest()
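The final line of `_get_machine_id` fixes the shape of the fingerprint: whichever probes succeed, the collected identifiers are concatenated and hashed, so the result is always a stable 64-character SHA-256 hex digest. A sketch with made-up identifier values (the MAC, platform name, and hardware serial below are placeholders, not real probe output):

```python
import hashlib

# Hypothetical values standing in for the MAC address, OS name, and hardware serial.
parts = ["aa:bb:cc:dd:ee:ff", "Darwin", "C02PLACEHOLDER"]

fingerprint = hashlib.sha256("".join(parts).encode()).hexdigest()
print(len(fingerprint))  # 64 hex characters, identical for identical inputs
```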
@@ -97,8 +103,8 @@ def _load_user_data() -> dict:
     if p.exists():
         try:
             return json.loads(p.read_text())
-        except Exception:
-            pass
+        except (json.JSONDecodeError, OSError, PermissionError) as e:
+            logger.warning(f"Failed to load user data: {e}")
     return {}
@@ -106,8 +112,8 @@ def _save_user_data(data: dict) -> None:
     try:
         p = _user_data_file()
         p.write_text(json.dumps(data, indent=2))
-    except Exception:
-        pass
+    except (OSError, PermissionError) as e:
+        logger.warning(f"Failed to save user data: {e}")

 def get_user_id() -> str:
@@ -151,3 +157,103 @@ def mark_first_execution_done() -> None:
         }
     )
     _save_user_data(data)
def safe_serialize_to_dict(obj, exclude: set[str] | None = None) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
return serialized
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
def truncate_messages(messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
limited_messages = messages[:max_messages]
for msg in limited_messages:
if isinstance(msg, dict) and "content" in msg:
content = msg["content"]
if len(content) > max_content_length:
msg["content"] = content[:max_content_length] + "..."
return limited_messages
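Since `truncate_messages` is shipped verbatim above, its behavior is easy to check directly. One subtlety worth noting: the `[:max_messages]` slice copies the list but not the message dicts, so truncation mutates the caller's message objects in place:

```python
def truncate_messages(messages, max_content_length=500, max_messages=5):
    """Truncate message content and limit number of messages (as in the PR)."""
    if not messages or not isinstance(messages, list):
        return messages
    limited_messages = messages[:max_messages]
    for msg in limited_messages:
        if isinstance(msg, dict) and "content" in msg:
            content = msg["content"]
            if len(content) > max_content_length:
                msg["content"] = content[:max_content_length] + "..."
    return limited_messages


msgs = [{"role": "user", "content": "x" * 600} for _ in range(7)]
out = truncate_messages(msgs)
print(len(out))                 # 5 — only the first max_messages survive
print(len(out[0]["content"]))   # 503 — 500 chars plus the "..." marker
```

For trace payloads the in-place mutation is acceptable (the messages are about to be serialized anyway), but callers that still need the full content should pass a deep copy.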
def should_auto_collect_first_time_traces() -> bool:
"""True if we should auto-collect traces for first-time user."""
if _is_test_environment():
return False
return is_first_execution()
def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
"""
Prompt user if they want to see their traces with timeout.
Returns True if user wants to see traces, False otherwise.
"""
if _is_test_environment():
return False
try:
import threading
console = Console()
content = Text()
content.append("🔍 ", style="cyan bold")
content.append(
"Detailed execution traces are available!\n\n", style="cyan bold"
)
content.append("View insights including:\n", style="white")
content.append(" • Agent decision-making process\n", style="bright_blue")
content.append(" • Task execution flow and timing\n", style="bright_blue")
content.append(" • Tool usage details", style="bright_blue")
panel = Panel(
content,
title="[bold cyan]Execution Traces[/bold cyan]",
border_style="cyan",
padding=(1, 2),
)
console.print("\n")
console.print(panel)
prompt_text = click.style(
f"Would you like to view your execution traces? [y/N] ({timeout_seconds}s timeout): ",
fg="white",
bold=True,
)
click.echo(prompt_text, nl=False)
result = [False]
def get_input():
try:
response = input().strip().lower()
result[0] = response in ["y", "yes"]
except (EOFError, KeyboardInterrupt):
result[0] = False
input_thread = threading.Thread(target=get_input, daemon=True)
input_thread.start()
input_thread.join(timeout=timeout_seconds)
if input_thread.is_alive():
return False
return result[0]
except Exception:
return False
def mark_first_execution_completed() -> None:
"""Mark first execution as completed (called after trace prompt)."""
mark_first_execution_done()

View File

@@ -2,30 +2,22 @@ import asyncio
 import copy
 import inspect
 import logging
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Generic,
-    List,
-    Optional,
-    Set,
-    Type,
-    TypeVar,
-    Union,
-    cast,
-)
+from collections.abc import Callable
+from typing import Any, ClassVar, Generic, TypeVar, cast
 from uuid import uuid4

 from opentelemetry import baggage
 from opentelemetry.context import attach, detach
 from pydantic import BaseModel, Field, ValidationError

-from crewai.flow.flow_visualizer import plot_flow
-from crewai.flow.persistence.base import FlowPersistence
-from crewai.flow.types import FlowExecutionData
-from crewai.flow.utils import get_possible_return_constants
 from crewai.events.event_bus import crewai_event_bus
+from crewai.events.listeners.tracing.trace_listener import (
+    TraceCollectionListener,
+)
+from crewai.events.listeners.tracing.utils import (
+    is_tracing_enabled,
+    should_auto_collect_first_time_traces,
+)
 from crewai.events.types.flow_events import (
     FlowCreatedEvent,
     FlowFinishedEvent,
@@ -35,12 +27,10 @@ from crewai.events.types.flow_events import (
     MethodExecutionFinishedEvent,
     MethodExecutionStartedEvent,
 )
-from crewai.events.listeners.tracing.trace_listener import (
-    TraceCollectionListener,
-)
-from crewai.events.listeners.tracing.utils import (
-    is_tracing_enabled,
-)
+from crewai.flow.flow_visualizer import plot_flow
+from crewai.flow.persistence.base import FlowPersistence
+from crewai.flow.types import FlowExecutionData
+from crewai.flow.utils import get_possible_return_constants
 from crewai.utilities.printer import Printer

 logger = logging.getLogger(__name__)
@@ -55,16 +45,14 @@ class FlowState(BaseModel):
     )

 # Type variables with explicit bounds
-T = TypeVar(
-    "T", bound=Union[Dict[str, Any], BaseModel]
-)  # Generic flow state type parameter
+T = TypeVar("T", bound=dict[str, Any] | BaseModel)  # Generic flow state type parameter
 StateT = TypeVar(
-    "StateT", bound=Union[Dict[str, Any], BaseModel]
+    "StateT", bound=dict[str, Any] | BaseModel
 )  # State validation type parameter

-def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT:
+def ensure_state_type(state: Any, expected_type: type[StateT]) -> StateT:
     """Ensure state matches expected type with proper validation.

     Args:
@@ -104,7 +92,7 @@ def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT:
         raise TypeError(f"Invalid expected_type: {expected_type}")

-def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
+def start(condition: str | dict | Callable | None = None) -> Callable:
     """
     Marks a method as a flow's starting point.
@@ -171,7 +159,7 @@ def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
     return decorator

-def listen(condition: Union[str, dict, Callable]) -> Callable:
+def listen(condition: str | dict | Callable) -> Callable:
     """
     Creates a listener that executes when specified conditions are met.
@@ -231,7 +219,7 @@ def listen(condition: Union[str, dict, Callable]) -> Callable:
     return decorator

-def router(condition: Union[str, dict, Callable]) -> Callable:
+def router(condition: str | dict | Callable) -> Callable:
     """
     Creates a routing method that directs flow execution based on conditions.
@@ -297,7 +285,7 @@ def router(condition: Union[str, dict, Callable]) -> Callable:
     return decorator

-def or_(*conditions: Union[str, dict, Callable]) -> dict:
+def or_(*conditions: str | dict | Callable) -> dict:
     """
     Combines multiple conditions with OR logic for flow control.
@@ -343,7 +331,7 @@ def or_(*conditions: Union[str, dict, Callable]) -> dict:
     return {"type": "OR", "methods": methods}

-def and_(*conditions: Union[str, dict, Callable]) -> dict:
+def and_(*conditions: str | dict | Callable) -> dict:
     """
     Combines multiple conditions with AND logic for flow control.
@@ -425,10 +413,10 @@ class FlowMeta(type):
         if possible_returns:
             router_paths[attr_name] = possible_returns

-        setattr(cls, "_start_methods", start_methods)
-        setattr(cls, "_listeners", listeners)
-        setattr(cls, "_routers", routers)
-        setattr(cls, "_router_paths", router_paths)
+        cls._start_methods = start_methods
+        cls._listeners = listeners
+        cls._routers = routers
+        cls._router_paths = router_paths

         return cls
@@ -436,29 +424,29 @@ class FlowMeta(type):
 class Flow(Generic[T], metaclass=FlowMeta):
     """Base class for all flows.

-    Type parameter T must be either Dict[str, Any] or a subclass of BaseModel."""
+    Type parameter T must be either dict[str, Any] or a subclass of BaseModel."""

     _printer = Printer()

-    _start_methods: List[str] = []
-    _listeners: Dict[str, tuple[str, List[str]]] = {}
-    _routers: Set[str] = set()
-    _router_paths: Dict[str, List[str]] = {}
+    _start_methods: ClassVar[list[str]] = []
+    _listeners: ClassVar[dict[str, tuple[str, list[str]]]] = {}
+    _routers: ClassVar[set[str]] = set()
+    _router_paths: ClassVar[dict[str, list[str]]] = {}

-    initial_state: Union[Type[T], T, None] = None
-    name: Optional[str] = None
-    tracing: Optional[bool] = False
+    initial_state: type[T] | T | None = None
+    name: str | None = None
+    tracing: bool | None = False

-    def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
+    def __class_getitem__(cls: type["Flow"], item: type[T]) -> type["Flow"]:
         class _FlowGeneric(cls):  # type: ignore
-            _initial_state_T = item  # type: ignore
+            _initial_state_t = item  # type: ignore

         _FlowGeneric.__name__ = f"{cls.__name__}[{item.__name__}]"
         return _FlowGeneric

     def __init__(
         self,
-        persistence: Optional[FlowPersistence] = None,
-        tracing: Optional[bool] = False,
+        persistence: FlowPersistence | None = None,
+        tracing: bool | None = False,
         **kwargs: Any,
     ) -> None:
         """Initialize a new Flow instance.
@@ -468,18 +456,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
             **kwargs: Additional state values to initialize or override
         """
         # Initialize basic instance attributes
-        self._methods: Dict[str, Callable] = {}
-        self._method_execution_counts: Dict[str, int] = {}
-        self._pending_and_listeners: Dict[str, Set[str]] = {}
-        self._method_outputs: List[Any] = []  # List to store all method outputs
-        self._completed_methods: Set[str] = set()  # Track completed methods for reload
-        self._persistence: Optional[FlowPersistence] = persistence
+        self._methods: dict[str, Callable] = {}
+        self._method_execution_counts: dict[str, int] = {}
+        self._pending_and_listeners: dict[str, set[str]] = {}
+        self._method_outputs: list[Any] = []  # list to store all method outputs
+        self._completed_methods: set[str] = set()  # Track completed methods for reload
+        self._persistence: FlowPersistence | None = persistence
         self._is_execution_resuming: bool = False

         # Initialize state with initial values
         self._state = self._create_initial_state()
         self.tracing = tracing

-        if is_tracing_enabled() or self.tracing:
+        if (
+            is_tracing_enabled()
+            or self.tracing
+            or should_auto_collect_first_time_traces()
+        ):
             trace_listener = TraceCollectionListener()
             trace_listener.setup_listeners(crewai_event_bus)

         # Apply any additional kwargs
@@ -521,25 +513,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
             TypeError: If state is neither BaseModel nor dictionary
         """
         # Handle case where initial_state is None but we have a type parameter
-        if self.initial_state is None and hasattr(self, "_initial_state_T"):
-            state_type = getattr(self, "_initial_state_T")
+        if self.initial_state is None and hasattr(self, "_initial_state_t"):
+            state_type = self._initial_state_t
             if isinstance(state_type, type):
                 if issubclass(state_type, FlowState):
                     # Create instance without id, then set it
                     instance = state_type()
                     if not hasattr(instance, "id"):
-                        setattr(instance, "id", str(uuid4()))
+                        instance.id = str(uuid4())
                     return cast(T, instance)
-                elif issubclass(state_type, BaseModel):
+                if issubclass(state_type, BaseModel):
                     # Create a new type that includes the ID field
                     class StateWithId(state_type, FlowState):  # type: ignore
                         pass

                     instance = StateWithId()
                     if not hasattr(instance, "id"):
-                        setattr(instance, "id", str(uuid4()))
+                        instance.id = str(uuid4())
                     return cast(T, instance)
-                elif state_type is dict:
+                if state_type is dict:
                     return cast(T, {"id": str(uuid4())})

         # Handle case where no initial state is provided
@@ -550,13 +542,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
         if isinstance(self.initial_state, type):
             if issubclass(self.initial_state, FlowState):
                 return cast(T, self.initial_state())  # Uses model defaults
-            elif issubclass(self.initial_state, BaseModel):
+            if issubclass(self.initial_state, BaseModel):
                 # Validate that the model has an id field
                 model_fields = getattr(self.initial_state, "model_fields", None)
                 if not model_fields or "id" not in model_fields:
                     raise ValueError("Flow state model must have an 'id' field")
                 return cast(T, self.initial_state())  # Uses model defaults
-            elif self.initial_state is dict:
+            if self.initial_state is dict:
                 return cast(T, {"id": str(uuid4())})

         # Handle dictionary instance case
@@ -600,7 +592,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         return self._state

     @property
-    def method_outputs(self) -> List[Any]:
+    def method_outputs(self) -> list[Any]:
         """Returns the list of all outputs from executed methods."""
         return self._method_outputs
@@ -631,13 +623,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
             if isinstance(self._state, dict):
                 return str(self._state.get("id", ""))
-            elif isinstance(self._state, BaseModel):
+            if isinstance(self._state, BaseModel):
                 return str(getattr(self._state, "id", ""))
             return ""
         except (AttributeError, TypeError):
             return ""  # Safely handle any unexpected attribute access issues

-    def _initialize_state(self, inputs: Dict[str, Any]) -> None:
+    def _initialize_state(self, inputs: dict[str, Any]) -> None:
         """Initialize or update flow state with new inputs.

         Args:
@@ -691,7 +683,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         else:
             raise TypeError("State must be a BaseModel instance or a dictionary.")

-    def _restore_state(self, stored_state: Dict[str, Any]) -> None:
+    def _restore_state(self, stored_state: dict[str, Any]) -> None:
         """Restore flow state from persistence.

         Args:
@@ -735,7 +727,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
             execution_data: Flow execution data containing:
                 - id: Flow execution ID
                 - flow: Flow structure
-                - completed_methods: List of successfully completed methods
+                - completed_methods: list of successfully completed methods
                 - execution_methods: All execution methods with their status
         """
         flow_id = execution_data.get("id")
@@ -771,7 +763,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         if state_to_apply:
             self._apply_state_updates(state_to_apply)

-        for i, method in enumerate(sorted_methods[:-1]):
+        for method in sorted_methods[:-1]:
             method_name = method.get("flow_method", {}).get("name")
             if method_name:
                 self._completed_methods.add(method_name)
@@ -783,7 +775,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
             elif hasattr(self._state, field_name):
                 object.__setattr__(self._state, field_name, value)

-    def _apply_state_updates(self, updates: Dict[str, Any]) -> None:
+    def _apply_state_updates(self, updates: dict[str, Any]) -> None:
         """Apply multiple state updates efficiently."""
         if isinstance(self._state, dict):
             self._state.update(updates)
@@ -792,7 +784,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
             if hasattr(self._state, key):
                 object.__setattr__(self._state, key, value)

-    def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
+    def kickoff(self, inputs: dict[str, Any] | None = None) -> Any:
         """
         Start the flow execution in a synchronous context.
@@ -805,7 +797,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         return asyncio.run(run_flow())

-    async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
+    async def kickoff_async(self, inputs: dict[str, Any] | None = None) -> Any:
         """
         Start the flow execution asynchronously.
@@ -840,7 +832,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
             if isinstance(self._state, dict):
                 self._state["id"] = inputs["id"]
             elif isinstance(self._state, BaseModel):
-                setattr(self._state, "id", inputs["id"])
+                setattr(self._state, "id", inputs["id"])  # noqa: B010

         # If persistence is enabled, attempt to restore the stored state using the provided id.
         if "id" in inputs and self._persistence is not None:
@@ -1075,7 +1067,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         )

         # Now execute normal listeners for all router results and the original trigger
-        all_triggers = [trigger_method] + router_results
+        all_triggers = [trigger_method, *router_results]

         for current_trigger in all_triggers:
             if current_trigger:  # Skip None results
@@ -1109,7 +1101,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
     def _find_triggered_methods(
         self, trigger_method: str, router_only: bool
-    ) -> List[str]:
+    ) -> list[str]:
         """
         Finds all methods that should be triggered based on conditions.
@@ -1126,7 +1118,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
         Returns
         -------
-        List[str]
+        list[str]
             Names of methods that should be triggered.

         Notes

View File

@@ -0,0 +1,130 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour
personal goal is: Test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to
the world\n\nThis is the expected criteria for your final answer: hello world\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '825'
content-type:
- application/json
cookie:
- _cfuvid=NaXWifUGChHp6Ap1mvfMrNzmO4HdzddrqXkSR9T.hYo-1754508545647-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFLBbtswDL37Kzid4yFx46bxbVixtsfssB22wlAl2lEri5okJ+uK/Psg
OY3dtQV2MWA+vqf3SD5lAExJVgETWx5EZ3X++SrcY/Hrcec3l+SKP5frm/16Yx92m6/f9mwWGXR3
jyI8sz4K6qzGoMgMsHDIA0bVxaq8WCzPyrN5AjqSqCOttSFfUt4po/JiXizz+SpfXBzZW1ICPavg
RwYA8JS+0aeR+JtVkLRSpUPveYusOjUBMEc6Vhj3XvnATWCzERRkAppk/QYM7UFwA63aIXBoo23g
xu/RAfw0X5ThGj6l/wquUWuawXdyWn6YSjpses9jLNNrPQG4MRR4HEsKc3tEDif7mlrr6M7/Q2WN
Mspva4fck4lWfSDLEnrIAG7TmPoXyZl11NlQB3rA9NyiXA16bNzOFD2CgQLXk/qqmL2hV0sMXGk/
GTQTXGxRjtRxK7yXiiZANkn92s1b2kNyZdr/kR8BIdAGlLV1KJV4mXhscxiP972205STYebR7ZTA
Oih0cRMSG97r4aSYf/QBu7pRpkVnnRruqrF1eT7nzTmW5Zplh+wvAAAA//8DAGKunMhlAwAA
headers:
CF-RAY:
- 980b99a73c1c22c6-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 17 Sep 2025 21:12:11 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=Ahwkw3J9CDiluZudRgDmybz4FO07eXLz2MQDtkgfct4-1758143531-1.0.1.1-_3e8agfTZW.FPpRMLb1A2nET4OHQEGKNZeGeWT8LIiuSi8R2HWsGsJyueUyzYBYnfHqsfBUO16K1.TkEo2XiqVCaIi6pymeeQxwtXFF1wj8;
path=/; expires=Wed, 17-Sep-25 21:42:11 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=iHqLoc_2sNQLMyzfGCLtGol8vf1Y44xirzQJUuUF_TI-1758143531242-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '419'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '609'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-project-tokens:
- '150000000'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-project-tokens:
- '149999827'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999830'
x-ratelimit-reset-project-tokens:
- 0s
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ece5f999e09e4c189d38e5bc08b2fad9
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,128 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour
personal goal is: Test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"}, {"role": "user", "content": "\nCurrent Task: Say hello to
the world\n\nThis is the expected criteria for your final answer: hello world\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '825'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.93.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.93.0
x-stainless-raw-response:
- 'true'
x-stainless-read-timeout:
- '600.0'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJNj9MwEL3nV4x8blBSmrabG0ICViqCCydYRbPOJDHreIztbIFV/zty
0m1SPiQukTJv3vN7M/OUAAhVixKE7DDI3ur09dvgfa8P/FO+e/9wa/aHb4cPH9viEw5yK1aRwfdf
SYZn1gvJvdUUFJsJlo4wUFTNd8U+32zybD0CPdekI621Id1w2iuj0nW23qTZLs33Z3bHSpIXJXxO
AACexm/0aWr6LkrIVs+VnrzHlkR5aQIQjnWsCPRe+YAmiNUMSjaBzGj9FgwfQaKBVj0SILTRNqDx
R3IAX8wbZVDDq/G/hI60Zjiy0/VS0FEzeIyhzKD1AkBjOGAcyhjl7oycLuY1t9bxvf+NKhpllO8q
R+jZRKM+sBUjekoA7sYhDVe5hXXc21AFfqDxubzYTXpi3s0CfXkGAwfUi/ruPNprvaqmgEr7xZiF
RNlRPVPnneBQK14AySL1n27+pj0lV6b9H/kZkJJsoLqyjmolrxPPbY7i6f6r7TLl0bDw5B6VpCoo
cnETNTU46OmghP/hA/VVo0xLzjo1XVVjq2KbYbOlorgRySn5BQAA//8DALxsmCBjAwAA
headers:
CF-RAY:
- 980ba79a4ab5f555-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Wed, 17 Sep 2025 21:21:42 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=aMMf0fLckKHz0BLW_2lATxD.7R61uYo1ZVW8aeFbruA-1758144102-1.0.1.1-6EKM3UxpdczoiQ6VpPpqqVnY7ftnXndFRWE4vyTzVcy.CQ4N539D97Wh8Ye9EUAvpUuukhW.r5MznkXq4tPXgCCmEv44RvVz2GBAz_e31h8;
path=/; expires=Wed, 17-Sep-25 21:51:42 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=VqrtvU8.QdEHc4.1XXUVmccaCcoj_CiNfI2zhKJoGRs-1758144102566-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '308'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '620'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-project-tokens:
- '150000000'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-project-tokens:
- '149999827'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999830'
x-ratelimit-reset-project-tokens:
- 0s
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_fa896433021140238115972280c05651
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,127 @@
interactions:
- request:
    body: '{"messages": [{"role": "system", "content": "You are Test Agent. Test backstory\nYour
      personal goal is: Test goal\nTo give my best complete final answer to the task
      respond using the exact following format:\n\nThought: I now can give a great
      answer\nFinal Answer: Your final answer must be the great and the most complete
      as possible, it must be outcome described.\n\nI MUST use these formats, my job
      depends on it!"}, {"role": "user", "content": "\nCurrent Task: Test task\n\nThis
      is the expected criteria for your final answer: test output\nyou MUST return
      the actual complete content as the final answer, not a summary.\n\nBegin! This
      is VERY important to you, use the tools available and give your best Final Answer,
      your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
    headers:
      accept:
      - application/json
      accept-encoding:
      - gzip, deflate, zstd
      connection:
      - keep-alive
      content-length:
      - '812'
      content-type:
      - application/json
      host:
      - api.openai.com
      user-agent:
      - OpenAI/Python 1.93.0
      x-stainless-arch:
      - arm64
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - MacOS
      x-stainless-package-version:
      - 1.93.0
      x-stainless-raw-response:
      - 'true'
      x-stainless-read-timeout:
      - '600.0'
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.12.9
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: !!binary |
        H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4WV+JHoVgR95NJD4UvaBgJDrSS2FJclV3bSwP9e
        kHYsuU2BXghwZ2c4s8vnDEDoWpQgVCdZ9c7kNx+4v7vb7jafnrrPX25/vtObX48f1Q31m+uFmEUG
        PXxHxS+sN4p6Z5A12QOsPErGqFqsl1fF4nJdzBPQU40m0lrH+YLyXludX8wvFvl8nRdXR3ZHWmEQ
        JXzNAACe0xl92hofRQlJK1V6DEG2KMpTE4DwZGJFyBB0YGlZzEZQkWW0yfotWNqBkhZavUWQ0Ebb
        IG3YoQf4Zt9rKw28TfcSNhgYaGA3nAl6bIYgYyg7GDMBpLXEMg4lRbk/IvuTeUOt8/QQ/qCKRlsd
        usqjDGSj0cDkREL3GcB9GtJwlls4T73jiukHpueK5eKgJ8bdTNDLI8jE0kzqq/XsFb2qRpbahMmY
        hZKqw3qkjjuRQ61pAmST1H+7eU37kFzb9n/kR0ApdIx15TzWWp0nHts8xq/7r7bTlJNhEdBvtcKK
        Nfq4iRobOZjD/kV4Cox91WjbondeH35V46rlai6bFS6X1yLbZ78BAAD//wMAZdfoWWMDAAA=
    headers:
      CF-RAY:
      - 980b9e0c5fa516a0-SJC
      Connection:
      - keep-alive
      Content-Encoding:
      - gzip
      Content-Type:
      - application/json
      Date:
      - Wed, 17 Sep 2025 21:15:11 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - __cf_bm=w6UZxbAZgYg9EFkKPfrSbMK97MB4jfs7YyvcEmgkvak-1758143711-1.0.1.1-j7YC1nvoMKxYK0T.5G2XDF6TXUCPu_HUs4YO9v65r3NHQFIcOaHbQXX4vqabSgynL2tZy23pbZgD8Cdmxhdw9dp4zkAXhU.imP43_pw4dSE;
        path=/; expires=Wed, 17-Sep-25 21:45:11 GMT; domain=.api.openai.com; HttpOnly;
        Secure; SameSite=None
      - _cfuvid=ij9Q8tB7sj2GczANlJ7gbXVjj6hMhz1iVb6oGHuRYu8-1758143711202-0.0.1.1-604800000;
        path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
      Strict-Transport-Security:
      - max-age=31536000; includeSubDomains; preload
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - nosniff
      access-control-expose-headers:
      - X-Request-ID
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - crewai-iuxna1
      openai-processing-ms:
      - '462'
      openai-project:
      - proj_xitITlrFeen7zjNSzML82h9x
      openai-version:
      - '2020-10-01'
      x-envoy-upstream-service-time:
      - '665'
      x-openai-proxy-wasm:
      - v0.1
      x-ratelimit-limit-project-tokens:
      - '150000000'
      x-ratelimit-limit-requests:
      - '30000'
      x-ratelimit-limit-tokens:
      - '150000000'
      x-ratelimit-remaining-project-tokens:
      - '149999830'
      x-ratelimit-remaining-requests:
      - '29999'
      x-ratelimit-remaining-tokens:
      - '149999830'
      x-ratelimit-reset-project-tokens:
      - 0s
      x-ratelimit-reset-requests:
      - 2ms
      x-ratelimit-reset-tokens:
      - 0s
      x-request-id:
      - req_04536db97c8c4768a200e38c1368c176
    status:
      code: 200
      message: OK
version: 1


@@ -1,17 +1,20 @@
 import os
+from unittest.mock import MagicMock, Mock, patch
+
 import pytest
-from unittest.mock import patch, MagicMock
-from crewai import Agent, Task, Crew
-from crewai.flow.flow import Flow, start
-from crewai.events.listeners.tracing.trace_listener import (
-    TraceCollectionListener,
-)
+
+from crewai import Agent, Crew, Task
+from crewai.events.listeners.tracing.first_time_trace_handler import (
+    FirstTimeTraceHandler,
+)
 from crewai.events.listeners.tracing.trace_batch_manager import (
     TraceBatchManager,
 )
+from crewai.events.listeners.tracing.trace_listener import (
+    TraceCollectionListener,
+)
 from crewai.events.listeners.tracing.types import TraceEvent
+from crewai.flow.flow import Flow, start


 class TestTraceListenerSetup:
@@ -281,9 +284,9 @@ class TestTraceListenerSetup:
         ):
             trace_handlers.append(handler)

-        assert (
-            len(trace_handlers) == 0
-        ), f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
+        assert len(trace_handlers) == 0, (
+            f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
+        )

     def test_trace_listener_setup_correctly_for_crew(self):
         """Test that trace listener is set up correctly when enabled"""
@@ -403,3 +406,254 @@ class TestTraceListenerSetup:
        from crewai.events.event_bus import crewai_event_bus

        crewai_event_bus._handlers.clear()
    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_first_time_user_trace_collection_with_timeout(self, mock_plus_api_calls):
        """Test first-time user trace collection logic with timeout behavior"""
        with (
            patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                return_value=False,
            ) as mock_prompt,
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            assert trace_listener.first_time_handler.is_first_time is True
            assert trace_listener.first_time_handler.collected_events is False

            with (
                patch.object(
                    trace_listener.first_time_handler,
                    "handle_execution_completion",
                    wraps=trace_listener.first_time_handler.handle_execution_completion,
                ) as mock_handle_completion,
                patch.object(
                    trace_listener.batch_manager,
                    "add_event",
                    wraps=trace_listener.batch_manager.add_event,
                ) as mock_add_event,
            ):
                result = crew.kickoff()

                assert result is not None
                assert mock_handle_completion.call_count >= 1
                assert mock_add_event.call_count >= 1
                assert trace_listener.first_time_handler.collected_events is True
                mock_prompt.assert_called_once_with(timeout_seconds=20)
                mock_mark_completed.assert_called_once()

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_first_time_user_trace_collection_user_accepts(self, mock_plus_api_calls):
        """Test first-time user trace collection when user accepts viewing traces"""
        with (
            patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Say hello to the world",
                expected_output="hello world",
                agent=agent,
            )
            crew = Crew(agents=[agent], tasks=[task], verbose=True)

            from crewai.events.event_bus import crewai_event_bus

            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            assert trace_listener.first_time_handler.is_first_time is True

            with (
                patch.object(
                    trace_listener.first_time_handler,
                    "_initialize_backend_and_send_events",
                    wraps=trace_listener.first_time_handler._initialize_backend_and_send_events,
                ) as mock_init_backend,
                patch.object(
                    trace_listener.first_time_handler, "_display_ephemeral_trace_link"
                ) as mock_display_link,
                patch.object(
                    trace_listener.first_time_handler,
                    "handle_execution_completion",
                    wraps=trace_listener.first_time_handler.handle_execution_completion,
                ) as mock_handle_completion,
            ):
                trace_listener.batch_manager.ephemeral_trace_url = (
                    "https://crewai.com/trace/mock-id"
                )

                crew.kickoff()

                assert mock_handle_completion.call_count >= 1, (
                    "handle_execution_completion should be called"
                )
                assert trace_listener.first_time_handler.collected_events is True, (
                    "Events should be marked as collected"
                )
                mock_init_backend.assert_called_once()
                mock_display_link.assert_called_once()
                mock_mark_completed.assert_called_once()

    @pytest.mark.vcr(filter_headers=["authorization"])
    def test_first_time_user_trace_consolidation_logic(self, mock_plus_api_calls):
        """Test the consolidation logic for first-time users vs regular tracing"""
        with (
            patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "false"}),
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.utils.is_first_execution",
                return_value=True,
            ),
        ):
            from crewai.events.event_bus import crewai_event_bus

            crewai_event_bus._handlers.clear()

            trace_listener = TraceCollectionListener()
            trace_listener.setup_listeners(crewai_event_bus)

            assert trace_listener.first_time_handler.is_first_time is True

            agent = Agent(
                role="Test Agent",
                goal="Test goal",
                backstory="Test backstory",
                llm="gpt-4o-mini",
            )
            task = Task(
                description="Test task", expected_output="test output", agent=agent
            )
            crew = Crew(agents=[agent], tasks=[task])

            with patch.object(TraceBatchManager, "initialize_batch") as mock_initialize:
                result = crew.kickoff()

                assert mock_initialize.call_count >= 1
                assert mock_initialize.call_args_list[0][1]["use_ephemeral"] is True
                assert result is not None

    def test_first_time_handler_timeout_behavior(self):
        """Test the timeout behavior of the first-time trace prompt"""
        with (
            patch(
                "crewai.events.listeners.tracing.utils._is_test_environment",
                return_value=False,
            ),
            patch("threading.Thread") as mock_thread,
        ):
            from crewai.events.listeners.tracing.utils import (
                prompt_user_for_trace_viewing,
            )

            mock_thread_instance = Mock()
            mock_thread_instance.is_alive.return_value = True
            mock_thread.return_value = mock_thread_instance

            result = prompt_user_for_trace_viewing(timeout_seconds=5)

            assert result is False
            mock_thread.assert_called_once()
            call_args = mock_thread.call_args
            assert call_args[1]["daemon"] is True
            mock_thread_instance.start.assert_called_once()
            mock_thread_instance.join.assert_called_once_with(timeout=5)
            mock_thread_instance.is_alive.assert_called_once()

    def test_first_time_handler_graceful_error_handling(self):
        """Test graceful error handling in first-time trace logic"""
        with (
            patch(
                "crewai.events.listeners.tracing.utils.should_auto_collect_first_time_traces",
                return_value=True,
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.prompt_user_for_trace_viewing",
                side_effect=Exception("Prompt failed"),
            ),
            patch(
                "crewai.events.listeners.tracing.first_time_trace_handler.mark_first_execution_completed"
            ) as mock_mark_completed,
        ):
            handler = FirstTimeTraceHandler()
            handler.is_first_time = True
            handler.collected_events = True

            handler.handle_execution_completion()

            mock_mark_completed.assert_called_once()
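The timeout test above asserts that `prompt_user_for_trace_viewing` starts a daemon thread, joins it with a timeout, and falls back to `False` when the thread is still alive. A minimal sketch of that threading-based prompt pattern follows; the prompt text and answer parsing are assumptions for illustration, not the actual implementation in `crewai.events.listeners.tracing.utils`:

```python
import threading


def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
    """Ask whether to view the trace; default to False if no answer arrives in time."""
    answer: list[bool] = []

    def read_input() -> None:
        try:
            reply = input("View your execution trace? [y/N]: ")
            answer.append(reply.strip().lower() in ("y", "yes"))
        except EOFError:  # stdin closed, e.g. a non-interactive run
            answer.append(False)

    # Daemon thread: a still-blocked input() cannot prevent interpreter shutdown.
    thread = threading.Thread(target=read_input, daemon=True)
    thread.start()
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():  # the user never answered within the window
        return False
    return bool(answer and answer[0])
```

Unlike the previous `signal`-based approach, `join(timeout=...)` works off the main thread and on Windows, which is presumably why the PR swapped signals for threading.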