This commit is contained in:
lorenzejay
2025-09-11 12:26:01 -07:00
parent 51767f2e15
commit 6e8c1f332f
2 changed files with 37 additions and 41 deletions

View File

@@ -2,7 +2,7 @@ import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Optional
from typing import Any, ClassVar
from rich.align import Align
from rich.console import Console
@@ -42,12 +42,12 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: list[TraceEvent] = []
execution_start_times: list[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
trace_batch_id: str | None = None
current_batch: TraceBatch | None = None
event_buffer: ClassVar[list[TraceEvent]] = []
execution_start_times: ClassVar[dict[str, datetime]] = {}
batch_owner_type: str | None = None
batch_owner_id: str | None = None
def __init__(self):
try:
@@ -144,7 +144,7 @@ class TraceBatchManager:
except Exception as e:
logger.warning(
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
f"Error initializing trace batch: {e!s}. Continuing without tracing."
)
def add_event(self, trace_event: TraceEvent):
@@ -179,19 +179,18 @@ class TraceBatchManager:
if response.status_code in [200, 201]:
self.event_buffer.clear()
return 200
else:
logger.warning(
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
except Exception as e:
logger.warning(
f"Error sending events to backend: {str(e)}. Events will be lost."
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
def finalize_batch(self) -> Optional[TraceBatch]:
except Exception as e:
logger.warning(
f"Error sending events to backend: {e!s}. Events will be lost."
)
return 500
def finalize_batch(self) -> TraceBatch | None:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
@@ -255,11 +254,11 @@ class TraceBatchManager:
)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {str(e)}")
logger.error(f"❌ Error finalizing trace batch: {e!s}")
# TODO: send error to app
def _display_traces_events_link(
self, console: Console, return_link: str, access_code: Optional[str] = None
self, console: Console, return_link: str, access_code: str | None = None
):
"""Display trace batch finalization information"""
try:
@@ -301,7 +300,7 @@ class TraceBatchManager:
console.print(final_panel)
except Exception as e:
logger.warning(f"Display failed, falling back to simple display: {str(e)}")
logger.warning(f"Display failed, falling back to simple display: {e!s}")
fallback_panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
@@ -324,7 +323,7 @@ class TraceBatchManager:
self.batch_sequence = 0
except Exception as e:
logger.error(f"Warning: Error during cleanup: {str(e)}")
logger.error(f"Warning: Error during cleanup: {e!s}")
def has_events(self) -> bool:
"""Check if there are events in the buffer"""
@@ -353,7 +352,7 @@ class TraceBatchManager:
return duration_ms
return 0
def get_trace_id(self) -> Optional[str]:
def get_trace_id(self) -> str | None:
"""Get current trace ID"""
if self.current_batch:
return self.current_batch.user_context.get("trace_id")

View File

@@ -1,6 +1,6 @@
import os
import uuid
from typing import Any, Optional
from typing import Any, ClassVar
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
@@ -70,7 +70,7 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
complex_events = [
complex_events: ClassVar[list[str]] = [
"task_started",
"task_completed",
"llm_call_started",
@@ -90,7 +90,7 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
batch_manager: TraceBatchManager | None = None,
):
if self._initialized:
return
@@ -102,8 +102,7 @@ class TraceCollectionListener(BaseEventListener):
def _check_authenticated(self) -> bool:
"""Check if tracing should be enabled"""
try:
res = bool(get_auth_token())
return res
return bool(get_auth_token())
except AuthError:
return False
@@ -370,7 +369,7 @@ class TraceCollectionListener(BaseEventListener):
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
elif event_type == "task_started":
if event_type == "task_started":
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
@@ -379,7 +378,7 @@ class TraceCollectionListener(BaseEventListener):
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
elif event_type == "task_completed":
if event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
@@ -392,19 +391,19 @@ class TraceCollectionListener(BaseEventListener):
else None,
"agent_role": event.output.agent if event.output else None,
}
elif event_type == "agent_execution_started":
if event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "agent_execution_completed":
if event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "llm_call_started":
if event_type == "llm_call_started":
event_data = self._safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
@@ -412,14 +411,13 @@ class TraceCollectionListener(BaseEventListener):
else None
)
return event_data
elif event_type == "llm_call_completed":
if event_type == "llm_call_completed":
return self._safe_serialize_to_dict(event)
else:
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
return {
"event_type": event_type,
"event": self._safe_serialize_to_dict(event),
"source": source,
}
# TODO: move to utils
def _safe_serialize_to_dict(
@@ -430,7 +428,6 @@ class TraceCollectionListener(BaseEventListener):
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
return serialized
else:
return {"serialized_data": serialized}
return {"serialized_data": serialized}
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}