refactor: update type hints and enhance trace batch display logic

- Changed type hints from `Dict` and `List` to built-in `dict` and `list` for consistency with modern Python standards.
- Refactored the trace batch finalization display logic to improve user experience by introducing a new method `_display_traces_events_link`.
- Enhanced error handling in the display method to fallback to a simpler display in case of exceptions.
- Cleaned up unused imports and organized the import statements for better readability.
This commit is contained in:
lorenzejay
2025-09-10 10:28:48 -07:00
parent f0def350a4
commit 6d02b64674
2 changed files with 108 additions and 84 deletions

View File

@@ -1,18 +1,19 @@
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Optional
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
from rich.align import Align
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.plus_api import PlusAPI
from crewai.cli.version import get_crewai_version
from crewai.events.listeners.tracing.types import TraceEvent
from logging import getLogger
from crewai.utilities.constants import CREWAI_BASE_URL
logger = getLogger(__name__)
@@ -23,11 +24,11 @@ class TraceBatch:
version: str = field(default_factory=get_crewai_version)
batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_context: Dict[str, str] = field(default_factory=dict)
execution_metadata: Dict[str, Any] = field(default_factory=dict)
events: List[TraceEvent] = field(default_factory=list)
user_context: dict[str, str] = field(default_factory=dict)
execution_metadata: dict[str, Any] = field(default_factory=dict)
events: list[TraceEvent] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
return {
"version": self.version,
"batch_id": self.batch_id,
@@ -43,8 +44,8 @@ class TraceBatchManager:
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
event_buffer: list[TraceEvent] = []
execution_start_times: list[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
@@ -246,12 +247,7 @@ class TraceBatchManager:
if not self.is_current_batch_ephemeral and access_code is None
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(panel)
self._display_traces_events_link(console, return_link, access_code)
else:
logger.error(
@@ -262,6 +258,57 @@ class TraceBatchManager:
logger.error(f"❌ Error finalizing trace batch: {str(e)}")
# TODO: send error to app
def _display_traces_events_link(
self, console: Console, return_link: str, access_code: Optional[str] = None
):
"""Display trace batch finalization information"""
try:
final_text = Text()
final_text.append("🎊", style="bold bright_yellow")
final_text.append(" TRACES READY FOR VIEWING! ", style="bold bright_green")
final_text.append("🎊", style="bold bright_yellow")
final_text.append("\n\n")
final_text.append("Trace ID: ", style="bold bright_cyan")
final_text.append(
f"{self.trace_batch_id}",
style="bright_blue",
)
final_text.append("\n\n")
final_text.append("View Your Traces: ", style="bold bright_cyan")
final_text.append(f"{return_link}", style="bright_white on red")
if access_code:
final_text.append("\n\n")
final_text.append("Access Code: ", style="bold bright_cyan")
final_text.append(f"{access_code}", style="bright_blue")
final_text.append("\n\n")
final_text.append("💡 ", style="bright_yellow")
final_text.append(
"Click the link above to dive into your agentic automation traces!",
style="italic bright_white",
)
final_panel = Panel(
Align.center(final_text),
title="🎊 Your Traces Are Ready! 🎊",
style="bright_green",
expand=True,
padding=(2, 4),
)
console.print(final_panel)
except Exception as e:
logger.warning(f"Display failed, falling back to simple display: {str(e)}")
fallback_panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(fallback_panel)
def _cleanup_batch_data(self):
"""Clean up batch data after successful finalization to free memory"""
try:

View File

@@ -1,28 +1,55 @@
import os
import uuid
from typing import Any, Optional
from typing import Dict, Any, Optional
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
)
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryQueryStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.events.types.reasoning_events import (
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
AgentReasoningStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -33,42 +60,10 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
class TraceCollectionListener(BaseEventListener):
"""
@@ -112,7 +107,7 @@ class TraceCollectionListener(BaseEventListener):
except AuthError:
return False
def _get_user_context(self) -> Dict[str, str]:
def _get_user_context(self) -> dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
@@ -325,7 +320,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self, user_context: dict[str, str], execution_metadata: dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
@@ -371,7 +366,7 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
@@ -429,7 +424,7 @@ class TraceCollectionListener(BaseEventListener):
# TODO: move to utils
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)
@@ -439,21 +434,3 @@ class TraceCollectionListener(BaseEventListener):
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
# Limit number of messages
limited_messages = messages[:max_messages]
# Truncate each message content
for msg in limited_messages:
if isinstance(msg, dict) and "content" in msg:
content = msg["content"]
if len(content) > max_content_length:
msg["content"] = content[:max_content_length] + "..."
return limited_messages