From 6d02b6467441f3b2fe85f4eee10b7432bbb8299a Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 10 Sep 2025 10:28:48 -0700 Subject: [PATCH] refactor: update type hints and enhance trace batch display logic - Changed type hints from `Dict` and `List` to built-in `dict` and `list` for consistency with modern Python standards. - Refactored the trace batch finalization display logic to improve user experience by introducing a new method `_display_traces_events_link`. - Enhanced error handling in the display method to fallback to a simpler display in case of exceptions. - Cleaned up unused imports and organized the import statements for better readability. --- .../listeners/tracing/trace_batch_manager.py | 87 +++++++++++---- .../listeners/tracing/trace_listener.py | 105 +++++++----------- 2 files changed, 108 insertions(+), 84 deletions(-) diff --git a/src/crewai/events/listeners/tracing/trace_batch_manager.py b/src/crewai/events/listeners/tracing/trace_batch_manager.py index 414bbdfd1..50ab39f8c 100644 --- a/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -1,18 +1,19 @@ import uuid -from datetime import datetime, timezone -from typing import Dict, List, Any, Optional from dataclasses import dataclass, field +from datetime import datetime, timezone +from logging import getLogger +from typing import Any, Optional -from crewai.utilities.constants import CREWAI_BASE_URL -from crewai.cli.authentication.token import AuthError, get_auth_token - -from crewai.cli.version import get_crewai_version -from crewai.cli.plus_api import PlusAPI +from rich.align import Align from rich.console import Console from rich.panel import Panel +from rich.text import Text +from crewai.cli.authentication.token import AuthError, get_auth_token +from crewai.cli.plus_api import PlusAPI +from crewai.cli.version import get_crewai_version from crewai.events.listeners.tracing.types import TraceEvent -from logging import getLogger +from crewai.utilities.constants import CREWAI_BASE_URL logger = getLogger(__name__) @@ -23,11 +24,11 @@ class TraceBatch: version: str = field(default_factory=get_crewai_version) batch_id: str = field(default_factory=lambda: str(uuid.uuid4())) - user_context: Dict[str, str] = field(default_factory=dict) - execution_metadata: Dict[str, Any] = field(default_factory=dict) - events: List[TraceEvent] = field(default_factory=list) + user_context: dict[str, str] = field(default_factory=dict) + execution_metadata: dict[str, Any] = field(default_factory=dict) + events: list[TraceEvent] = field(default_factory=list) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: return { "version": self.version, "batch_id": self.batch_id, @@ -43,8 +44,8 @@ class TraceBatchManager: is_current_batch_ephemeral: bool = False trace_batch_id: Optional[str] = None current_batch: Optional[TraceBatch] = None - event_buffer: List[TraceEvent] = [] - execution_start_times: Dict[str, datetime] = {} + event_buffer: list[TraceEvent] = [] + execution_start_times: list[str, datetime] = {} batch_owner_type: Optional[str] = None batch_owner_id: Optional[str] = None @@ -246,12 +247,7 @@ class TraceBatchManager: if not self.is_current_batch_ephemeral and access_code is None else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}" ) - panel = Panel( - f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}", - title="Trace Batch Finalization", - border_style="green", - ) - console.print(panel) + self._display_traces_events_link(console, return_link, access_code) else: logger.error( @@ -262,6 +258,57 @@ class TraceBatchManager: logger.error(f"❌ Error finalizing trace batch: {str(e)}") # TODO: send error to app + def _display_traces_events_link( + self, console: Console, return_link: str, access_code: Optional[str] = None + ): + """Display trace batch finalization information""" + try: + final_text = Text() + final_text.append("🎊", style="bold bright_yellow") + final_text.append(" TRACES READY FOR VIEWING! ", style="bold bright_green") + final_text.append("🎊", style="bold bright_yellow") + final_text.append("\n\n") + + final_text.append("Trace ID: ", style="bold bright_cyan") + final_text.append( + f"{self.trace_batch_id}", + style="bright_blue", + ) + final_text.append("\n\n") + + final_text.append("View Your Traces: ", style="bold bright_cyan") + final_text.append(f"{return_link}", style="bright_white on red") + + if access_code: + final_text.append("\n\n") + final_text.append("Access Code: ", style="bold bright_cyan") + final_text.append(f"{access_code}", style="bright_blue") + + final_text.append("\n\n") + final_text.append("💡 ", style="bright_yellow") + final_text.append( + "Click the link above to dive into your agentic automation traces!", + style="italic bright_white", + ) + + final_panel = Panel( + Align.center(final_text), + title="🎊 Your Traces Are Ready! 🎊", + style="bright_green", + expand=True, + padding=(2, 4), + ) + console.print(final_panel) + + except Exception as e: + logger.warning(f"Display failed, falling back to simple display: {str(e)}") + fallback_panel = Panel( + f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}", + title="Trace Batch Finalization", + border_style="green", + ) + console.print(fallback_panel) + def _cleanup_batch_data(self): """Clean up batch data after successful finalization to free memory""" try: diff --git a/src/crewai/events/listeners/tracing/trace_listener.py b/src/crewai/events/listeners/tracing/trace_listener.py index bc9a27d7f..9b2e32712 100644 --- a/src/crewai/events/listeners/tracing/trace_listener.py +++ b/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,28 +1,55 @@ import os import uuid +from typing import Any, Optional -from typing import Dict, Any, Optional - +from crewai.cli.authentication.token import AuthError, get_auth_token +from crewai.cli.version import get_crewai_version from crewai.events.base_event_listener import BaseEventListener +from crewai.events.listeners.tracing.types import TraceEvent from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, AgentExecutionStartedEvent, - LiteAgentExecutionStartedEvent, LiteAgentExecutionCompletedEvent, LiteAgentExecutionErrorEvent, - AgentExecutionErrorEvent, -) -from crewai.events.listeners.tracing.types import TraceEvent -from crewai.events.types.reasoning_events import ( - AgentReasoningStartedEvent, - AgentReasoningCompletedEvent, - AgentReasoningFailedEvent, + LiteAgentExecutionStartedEvent, ) from crewai.events.types.crew_events import ( CrewKickoffCompletedEvent, CrewKickoffFailedEvent, CrewKickoffStartedEvent, ) +from crewai.events.types.flow_events import ( + FlowCreatedEvent, + FlowFinishedEvent, + FlowPlotEvent, + FlowStartedEvent, + MethodExecutionFailedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) +from crewai.events.types.llm_events import ( + LLMCallCompletedEvent, + LLMCallFailedEvent, + LLMCallStartedEvent, +) +from crewai.events.types.llm_guardrail_events import ( + LLMGuardrailCompletedEvent, + LLMGuardrailStartedEvent, +) +from crewai.events.types.memory_events import ( + MemoryQueryCompletedEvent, + MemoryQueryFailedEvent, + MemoryQueryStartedEvent, + MemorySaveCompletedEvent, + MemorySaveFailedEvent, + MemorySaveStartedEvent, +) +from crewai.events.types.reasoning_events import ( + AgentReasoningCompletedEvent, + AgentReasoningFailedEvent, + AgentReasoningStartedEvent, +) from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -33,42 +60,10 @@ from crewai.events.types.tool_usage_events import ( ToolUsageFinishedEvent, ToolUsageStartedEvent, ) -from crewai.events.types.llm_events import ( - LLMCallCompletedEvent, - LLMCallFailedEvent, - LLMCallStartedEvent, -) - -from crewai.events.types.flow_events import ( - FlowCreatedEvent, - FlowStartedEvent, - FlowFinishedEvent, - MethodExecutionStartedEvent, - MethodExecutionFinishedEvent, - MethodExecutionFailedEvent, - FlowPlotEvent, -) -from crewai.events.types.llm_guardrail_events import ( - LLMGuardrailStartedEvent, - LLMGuardrailCompletedEvent, -) from crewai.utilities.serialization import to_serializable - from .trace_batch_manager import TraceBatchManager -from crewai.events.types.memory_events import ( - MemoryQueryStartedEvent, - MemoryQueryCompletedEvent, - MemoryQueryFailedEvent, - MemorySaveStartedEvent, - MemorySaveCompletedEvent, - MemorySaveFailedEvent, -) - -from crewai.cli.authentication.token import AuthError, get_auth_token -from crewai.cli.version import get_crewai_version - class TraceCollectionListener(BaseEventListener): """ @@ -112,7 +107,7 @@ class TraceCollectionListener(BaseEventListener): except AuthError: return False - def _get_user_context(self) -> Dict[str, str]: + def _get_user_context(self) -> dict[str, str]: """Extract user context for tracing""" return { "user_id": os.getenv("CREWAI_USER_ID", "anonymous"), @@ -325,7 +320,7 @@ class TraceCollectionListener(BaseEventListener): self._initialize_batch(user_context, execution_metadata) def _initialize_batch( - self, user_context: Dict[str, str], execution_metadata: Dict[str, Any] + self, user_context: dict[str, str], execution_metadata: dict[str, Any] ): """Initialize trace batch if ephemeral""" if not self._check_authenticated(): @@ -371,7 +366,7 @@ class TraceCollectionListener(BaseEventListener): def _build_event_data( self, event_type: str, event: Any, source: Any - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Build event data""" if event_type not in self.complex_events: return self._safe_serialize_to_dict(event) @@ -429,7 +424,7 @@ class TraceCollectionListener(BaseEventListener): # TODO: move to utils def _safe_serialize_to_dict( self, obj, exclude: set[str] | None = None - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Safely serialize an object to a dictionary for event data.""" try: serialized = to_serializable(obj, exclude) @@ -439,21 +434,3 @@ class TraceCollectionListener(BaseEventListener): return {"serialized_data": serialized} except Exception as e: return {"serialization_error": str(e), "object_type": type(obj).__name__} - - # TODO: move to utils - def _truncate_messages(self, messages, max_content_length=500, max_messages=5): - """Truncate message content and limit number of messages""" - if not messages or not isinstance(messages, list): - return messages - - # Limit number of messages - limited_messages = messages[:max_messages] - - # Truncate each message content - for msg in limited_messages: - if isinstance(msg, dict) and "content" in msg: - content = msg["content"] - if len(content) > max_content_length: - msg["content"] = content[:max_content_length] + "..." - - return limited_messages