From 6e8c1f332f574088793c44143948ec33bc02e143 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Thu, 11 Sep 2025 12:26:01 -0700 Subject: [PATCH] linted --- .../listeners/tracing/trace_batch_manager.py | 43 +++++++++---------- .../listeners/tracing/trace_listener.py | 35 +++++++-------- 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/src/crewai/events/listeners/tracing/trace_batch_manager.py b/src/crewai/events/listeners/tracing/trace_batch_manager.py index 8b931c18e..e5a0de0c8 100644 --- a/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -2,7 +2,7 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from logging import getLogger -from typing import Any, Optional +from typing import Any, ClassVar from rich.align import Align from rich.console import Console @@ -42,12 +42,12 @@ class TraceBatchManager: """Single responsibility: Manage batches and event buffering""" is_current_batch_ephemeral: bool = False - trace_batch_id: Optional[str] = None - current_batch: Optional[TraceBatch] = None - event_buffer: list[TraceEvent] = [] - execution_start_times: list[str, datetime] = {} - batch_owner_type: Optional[str] = None - batch_owner_id: Optional[str] = None + trace_batch_id: str | None = None + current_batch: TraceBatch | None = None + event_buffer: ClassVar[list[TraceEvent]] = [] + execution_start_times: ClassVar[dict[str, datetime]] = {} + batch_owner_type: str | None = None + batch_owner_id: str | None = None def __init__(self): try: @@ -144,7 +144,7 @@ class TraceBatchManager: except Exception as e: logger.warning( - f"Error initializing trace batch: {str(e)}. Continuing without tracing." + f"Error initializing trace batch: {e!s}. Continuing without tracing." ) def add_event(self, trace_event: TraceEvent): @@ -179,19 +179,18 @@ class TraceBatchManager: if response.status_code in [200, 201]: self.event_buffer.clear() return 200 - else: - logger.warning( - f"Failed to send events: {response.status_code}. Events will be lost." - ) - return 500 - - except Exception as e: logger.warning( - f"Error sending events to backend: {str(e)}. Events will be lost." + f"Failed to send events: {response.status_code}. Events will be lost." ) return 500 - def finalize_batch(self) -> Optional[TraceBatch]: + except Exception as e: + logger.warning( + f"Error sending events to backend: {e!s}. Events will be lost." + ) + return 500 + + def finalize_batch(self) -> TraceBatch | None: """Finalize batch and return it for sending""" if not self.current_batch: return None @@ -255,11 +254,11 @@ class TraceBatchManager: ) except Exception as e: - logger.error(f"❌ Error finalizing trace batch: {str(e)}") + logger.error(f"❌ Error finalizing trace batch: {e!s}") # TODO: send error to app def _display_traces_events_link( - self, console: Console, return_link: str, access_code: Optional[str] = None + self, console: Console, return_link: str, access_code: str | None = None ): """Display trace batch finalization information""" try: @@ -301,7 +300,7 @@ class TraceBatchManager: console.print(final_panel) except Exception as e: - logger.warning(f"Display failed, falling back to simple display: {str(e)}") + logger.warning(f"Display failed, falling back to simple display: {e!s}") fallback_panel = Panel( f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}", title="Trace Batch Finalization", @@ -324,7 +323,7 @@ class TraceBatchManager: self.batch_sequence = 0 except Exception as e: - logger.error(f"Warning: Error during cleanup: {str(e)}") + logger.error(f"Warning: Error during cleanup: {e!s}") def has_events(self) -> bool: """Check if there are events in the buffer""" @@ -353,7 +352,7 @@ class TraceBatchManager: return duration_ms return 0 - def get_trace_id(self) -> Optional[str]: + def get_trace_id(self) -> str | None: """Get current trace ID""" if self.current_batch: return self.current_batch.user_context.get("trace_id") diff --git a/src/crewai/events/listeners/tracing/trace_listener.py b/src/crewai/events/listeners/tracing/trace_listener.py index 9b2e32712..ba71254f4 100644 --- a/src/crewai/events/listeners/tracing/trace_listener.py +++ b/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,6 +1,6 @@ import os import uuid -from typing import Any, Optional +from typing import Any, ClassVar from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.version import get_crewai_version @@ -70,7 +70,7 @@ class TraceCollectionListener(BaseEventListener): Trace collection listener that orchestrates trace collection """ - complex_events = [ + complex_events: ClassVar[list[str]] = [ "task_started", "task_completed", "llm_call_started", @@ -90,7 +90,7 @@ class TraceCollectionListener(BaseEventListener): def __init__( self, - batch_manager: Optional[TraceBatchManager] = None, + batch_manager: TraceBatchManager | None = None, ): if self._initialized: return @@ -102,8 +102,7 @@ class TraceCollectionListener(BaseEventListener): def _check_authenticated(self) -> bool: """Check if tracing should be enabled""" try: - res = bool(get_auth_token()) - return res + return bool(get_auth_token()) except AuthError: return False @@ -370,7 +369,7 @@ class TraceCollectionListener(BaseEventListener): """Build event data""" if event_type not in self.complex_events: return self._safe_serialize_to_dict(event) - elif event_type == "task_started": + if event_type == "task_started": return { "task_description": event.task.description, "expected_output": event.task.expected_output, @@ -379,7 +378,7 @@ class TraceCollectionListener(BaseEventListener): "agent_role": source.agent.role, "task_id": str(event.task.id), } - elif event_type == "task_completed": + if event_type == "task_completed": return { "task_description": event.task.description if event.task else None, "task_name": event.task.name or event.task.description @@ -392,19 +391,19 @@ class TraceCollectionListener(BaseEventListener): else None, "agent_role": event.output.agent if event.output else None, } - elif event_type == "agent_execution_started": + if event_type == "agent_execution_started": return { "agent_role": event.agent.role, "agent_goal": event.agent.goal, "agent_backstory": event.agent.backstory, } - elif event_type == "agent_execution_completed": + if event_type == "agent_execution_completed": return { "agent_role": event.agent.role, "agent_goal": event.agent.goal, "agent_backstory": event.agent.backstory, } - elif event_type == "llm_call_started": + if event_type == "llm_call_started": event_data = self._safe_serialize_to_dict(event) event_data["task_name"] = ( event.task_name or event.task_description @@ -412,14 +411,13 @@ class TraceCollectionListener(BaseEventListener): else None ) return event_data - elif event_type == "llm_call_completed": + if event_type == "llm_call_completed": return self._safe_serialize_to_dict(event) - else: - return { - "event_type": event_type, - "event": self._safe_serialize_to_dict(event), - "source": source, - } + return { + "event_type": event_type, + "event": self._safe_serialize_to_dict(event), + "source": source, + } # TODO: move to utils def _safe_serialize_to_dict( @@ -430,7 +428,6 @@ class TraceCollectionListener(BaseEventListener): serialized = to_serializable(obj, exclude) if isinstance(serialized, dict): return serialized - else: - return {"serialized_data": serialized} + return {"serialized_data": serialized} except Exception as e: return {"serialization_error": str(e), "object_type": type(obj).__name__}