Compare commits

...

8 Commits

Author SHA1 Message Date
Greyson LaLonde
3a8889ce61 Merge branch 'main' into lorenze/trace-improvements-3 2025-09-12 10:45:11 -04:00
Greyson LaLonde
d865a49f5a Merge branch 'main' into lorenze/trace-improvements-3 2025-09-11 17:41:47 -04:00
lorenzejay
677fe9032c Merge branch 'lorenze/trace-improvements-3' of github.com:crewAIInc/crewAI into lorenze/trace-improvements-3 2025-09-11 12:27:33 -07:00
lorenzejay
6e8c1f332f linted 2025-09-11 12:26:01 -07:00
Lorenze Jay
abe170cdc2 Merge branch 'main' into lorenze/trace-improvements-3 2025-09-11 12:20:49 -07:00
lorenzejay
51767f2e15 Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/trace-improvements-3 2025-09-10 10:43:38 -07:00
lorenzejay
dc41a0d13b fixed types 2025-09-10 10:32:59 -07:00
lorenzejay
6d02b64674 refactor: update type hints and enhance trace batch display logic
- Changed type hints from `Dict` and `List` to built-in `dict` and `list` for consistency with modern Python standards.
- Refactored the trace batch finalization display logic to improve user experience by introducing a new method `_display_traces_events_link`.
- Enhanced error handling in the display method to fallback to a simpler display in case of exceptions.
- Cleaned up unused imports and organized the import statements for better readability.
2025-09-10 10:28:48 -07:00
2 changed files with 143 additions and 123 deletions

View File

@@ -1,18 +1,19 @@
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, ClassVar
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
from rich.align import Align
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.plus_api import PlusAPI
from crewai.cli.version import get_crewai_version
from crewai.events.listeners.tracing.types import TraceEvent
from logging import getLogger
from crewai.utilities.constants import CREWAI_BASE_URL
logger = getLogger(__name__)
@@ -23,11 +24,11 @@ class TraceBatch:
version: str = field(default_factory=get_crewai_version)
batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_context: Dict[str, str] = field(default_factory=dict)
execution_metadata: Dict[str, Any] = field(default_factory=dict)
events: List[TraceEvent] = field(default_factory=list)
user_context: dict[str, str] = field(default_factory=dict)
execution_metadata: dict[str, Any] = field(default_factory=dict)
events: list[TraceEvent] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
return {
"version": self.version,
"batch_id": self.batch_id,
@@ -41,12 +42,12 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
trace_batch_id: str | None = None
current_batch: TraceBatch | None = None
event_buffer: ClassVar[list[TraceEvent]] = []
execution_start_times: ClassVar[dict[str, datetime]] = {}
batch_owner_type: str | None = None
batch_owner_id: str | None = None
def __init__(self):
try:
@@ -58,8 +59,8 @@ class TraceBatchManager:
def initialize_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
) -> TraceBatch:
"""Initialize a new trace batch"""
@@ -76,8 +77,8 @@ class TraceBatchManager:
def _initialize_backend_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
):
"""Send batch initialization to backend"""
@@ -143,7 +144,7 @@ class TraceBatchManager:
except Exception as e:
logger.warning(
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
f"Error initializing trace batch: {e!s}. Continuing without tracing."
)
def add_event(self, trace_event: TraceEvent):
@@ -178,19 +179,18 @@ class TraceBatchManager:
if response.status_code in [200, 201]:
self.event_buffer.clear()
return 200
else:
logger.warning(
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
except Exception as e:
logger.warning(
f"Error sending events to backend: {str(e)}. Events will be lost."
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
def finalize_batch(self) -> Optional[TraceBatch]:
except Exception as e:
logger.warning(
f"Error sending events to backend: {e!s}. Events will be lost."
)
return 500
def finalize_batch(self) -> TraceBatch | None:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
@@ -246,12 +246,7 @@ class TraceBatchManager:
if not self.is_current_batch_ephemeral and access_code is None
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(panel)
self._display_traces_events_link(console, return_link, access_code)
else:
logger.error(
@@ -259,9 +254,60 @@ class TraceBatchManager:
)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {str(e)}")
logger.error(f"❌ Error finalizing trace batch: {e!s}")
# TODO: send error to app
def _display_traces_events_link(
self, console: Console, return_link: str, access_code: str | None = None
):
"""Display trace batch finalization information"""
try:
final_text = Text()
final_text.append("🎊", style="bold bright_yellow")
final_text.append(" TRACES READY FOR VIEWING! ", style="bold bright_green")
final_text.append("🎊", style="bold bright_yellow")
final_text.append("\n\n")
final_text.append("Trace ID: ", style="bold bright_cyan")
final_text.append(
f"{self.trace_batch_id}",
style="bright_blue",
)
final_text.append("\n\n")
final_text.append("View Your Traces: ", style="bold bright_cyan")
final_text.append(f"{return_link}", style="bright_white on red")
if access_code:
final_text.append("\n\n")
final_text.append("Access Code: ", style="bold bright_cyan")
final_text.append(f"{access_code}", style="bright_blue")
final_text.append("\n\n")
final_text.append("💡 ", style="bright_yellow")
final_text.append(
"Click the link above to dive into your agentic automation traces!",
style="italic bright_white",
)
final_panel = Panel(
Align.center(final_text),
title="🎊 Your Traces Are Ready! 🎊",
style="bright_green",
expand=True,
padding=(2, 4),
)
console.print(final_panel)
except Exception as e:
logger.warning(f"Display failed, falling back to simple display: {e!s}")
fallback_panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)
console.print(fallback_panel)
def _cleanup_batch_data(self):
"""Clean up batch data after successful finalization to free memory"""
try:
@@ -277,7 +323,7 @@ class TraceBatchManager:
self.batch_sequence = 0
except Exception as e:
logger.error(f"Warning: Error during cleanup: {str(e)}")
logger.error(f"Warning: Error during cleanup: {e!s}")
def has_events(self) -> bool:
"""Check if there are events in the buffer"""
@@ -306,7 +352,7 @@ class TraceBatchManager:
return duration_ms
return 0
def get_trace_id(self) -> Optional[str]:
def get_trace_id(self) -> str | None:
"""Get current trace ID"""
if self.current_batch:
return self.current_batch.user_context.get("trace_id")

View File

@@ -1,28 +1,55 @@
import os
import uuid
from typing import Any, ClassVar
from typing import Dict, Any, Optional
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
)
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryQueryStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.events.types.reasoning_events import (
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
AgentReasoningStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -33,49 +60,17 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
class TraceCollectionListener(BaseEventListener):
"""
Trace collection listener that orchestrates trace collection
"""
complex_events = [
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
@@ -95,7 +90,7 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
batch_manager: TraceBatchManager | None = None,
):
if self._initialized:
return
@@ -107,12 +102,11 @@ class TraceCollectionListener(BaseEventListener):
def _check_authenticated(self) -> bool:
"""Check if tracing should be enabled"""
try:
res = bool(get_auth_token())
return res
return bool(get_auth_token())
except AuthError:
return False
def _get_user_context(self) -> Dict[str, str]:
def _get_user_context(self) -> dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
@@ -325,7 +319,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self, user_context: dict[str, str], execution_metadata: dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
@@ -371,11 +365,11 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
elif event_type == "task_started":
if event_type == "task_started":
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
@@ -384,7 +378,7 @@ class TraceCollectionListener(BaseEventListener):
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
elif event_type == "task_completed":
if event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
@@ -397,19 +391,19 @@ class TraceCollectionListener(BaseEventListener):
else None,
"agent_role": event.output.agent if event.output else None,
}
elif event_type == "agent_execution_started":
if event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "agent_execution_completed":
if event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "llm_call_started":
if event_type == "llm_call_started":
event_data = self._safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
@@ -417,43 +411,23 @@ class TraceCollectionListener(BaseEventListener):
else None
)
return event_data
elif event_type == "llm_call_completed":
if event_type == "llm_call_completed":
return self._safe_serialize_to_dict(event)
else:
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
# TODO: move to utils
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
return serialized
else:
return {"serialized_data": serialized}
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages
# Limit number of messages
limited_messages = messages[:max_messages]
# Truncate each message content
for msg in limited_messages:
if isinstance(msg, dict) and "content" in msg:
content = msg["content"]
if len(content) > max_content_length:
msg["content"] = content[:max_content_length] + "..."
return limited_messages