From d6b47bd61b55f3fde30ac1a7668a11bb6d1442e5 Mon Sep 17 00:00:00 2001 From: Heitor Sammuel Carvalho Date: Thu, 18 Dec 2025 10:52:15 -0300 Subject: [PATCH] Enhance typing and ensure timestamp is always included in event_data inside traces --- .../listeners/tracing/trace_listener.py | 150 ++++++++++-------- 1 file changed, 87 insertions(+), 63 deletions(-) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index c8f7000cd..e31b3f477 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,7 +1,7 @@ """Trace collection listener for orchestrating trace collection.""" import os -from typing import Any, ClassVar +from typing import Any, ClassVar, cast import uuid from typing_extensions import Self @@ -9,6 +9,7 @@ from typing_extensions import Self from crewai.cli.authentication.token import AuthError, get_auth_token from crewai.cli.version import get_crewai_version from crewai.events.base_event_listener import BaseEventListener +from crewai.events.base_events import BaseEvent from crewai.events.event_bus import CrewAIEventsBus from crewai.events.listeners.tracing.first_time_trace_handler import ( FirstTimeTraceHandler, @@ -83,6 +84,7 @@ from crewai.events.types.tool_usage_events import ( ToolUsageStartedEvent, ) from crewai.events.utils.console_formatter import ConsoleFormatter +from crewai.task import Task class TraceCollectionListener(BaseEventListener): @@ -567,79 +569,101 @@ class TraceCollectionListener(BaseEventListener): self.batch_manager.end_event_processing() def _create_trace_event( - self, event_type: str, source: Any, event: Any + self, event_type: str, source: Any, event: type[BaseEvent] ) -> TraceEvent: """Create a trace event""" - if hasattr(event, "timestamp") and event.timestamp: - trace_event = TraceEvent( - type=event_type, - timestamp=event.timestamp.isoformat(), - ) - else: - trace_event = TraceEvent( - type=event_type, - ) + trace_event = TraceEvent( + type=event_type, + timestamp=event.timestamp.isoformat(), + ) trace_event.event_data = self._build_event_data(event_type, event, source) return trace_event def _build_event_data( - self, event_type: str, event: Any, source: Any + self, event_type: str, event: type[BaseEvent], source: Any ) -> dict[str, Any]: """Build event data""" - if event_type not in self.complex_events: - return safe_serialize_to_dict(event) - if event_type == "task_started": - return { - "task_description": event.task.description, - "expected_output": event.task.expected_output, - "task_name": event.task.name or event.task.description, - "context": event.context, - "agent_role": source.agent.role, - "task_id": str(event.task.id), - } - if event_type == "task_completed": - return { - "task_description": event.task.description if event.task else None, - "task_name": event.task.name or event.task.description - if event.task - else None, - "task_id": str(event.task.id) if event.task else None, - "output_raw": event.output.raw if event.output else None, - "output_format": str(event.output.output_format) - if event.output - else None, - "agent_role": event.output.agent if event.output else None, - } - if event_type == "agent_execution_started": - return { - "agent_role": event.agent.role, - "agent_goal": event.agent.goal, - "agent_backstory": event.agent.backstory, - } - if event_type == "agent_execution_completed": - return { - "agent_role": event.agent.role, - "agent_goal": event.agent.goal, - "agent_backstory": event.agent.backstory, - } - if event_type == "llm_call_started": - event_data = safe_serialize_to_dict(event) - event_data["task_name"] = ( - event.task_name or event.task_description - if hasattr(event, "task_name") and event.task_name - else None - ) - return event_data - if event_type == "llm_call_completed": - return safe_serialize_to_dict(event) + event_data = None - return { - "event_type": event_type, - "event": safe_serialize_to_dict(event), - "source": source, - } + if event_type not in self.complex_events: + event_data = safe_serialize_to_dict(event) + elif event_type == "task_started": + task_started_event: TaskStartedEvent = cast(TaskStartedEvent, event) + event_data = { + "task_description": cast(Task, task_started_event.task).description, + "expected_output": cast(Task, task_started_event.task).expected_output, + "task_name": cast(Task, task_started_event.task).name + or cast(Task, task_started_event.task).description, + "context": task_started_event.context, + "agent_role": source.agent.role, + "task_id": str(cast(Task, task_started_event.task).id), + } + elif event_type == "task_completed": + task_completed_event: TaskCompletedEvent = cast(TaskCompletedEvent, event) + event_data = { + "task_description": cast(Task, task_completed_event.task).description + if cast(Task, task_completed_event.task) + else None, + "task_name": cast(Task, task_completed_event.task).name + or cast(Task, task_completed_event.task).description + if task_completed_event.task + else None, + "task_id": str(cast(Task, task_completed_event.task).id) + if cast(Task, task_completed_event.task) + else None, + "output_raw": task_completed_event.output.raw + if task_completed_event.output + else None, + "output_format": str(task_completed_event.output.output_format) + if task_completed_event.output + else None, + "agent_role": task_completed_event.output.agent + if task_completed_event.output + else None, + } + elif event_type == "agent_execution_started": + agent_execution_started_event: AgentExecutionStartedEvent = cast( + AgentExecutionStartedEvent, event + ) + event_data = { + "agent_role": agent_execution_started_event.agent.role, + "agent_goal": agent_execution_started_event.agent.goal, + "agent_backstory": agent_execution_started_event.agent.backstory, + } + elif event_type == "agent_execution_completed": + agent_execution_completed_event: AgentExecutionCompletedEvent = cast( + AgentExecutionCompletedEvent, event + ) + event_data = { + "agent_role": agent_execution_completed_event.agent.role, + "agent_goal": agent_execution_completed_event.agent.goal, + "agent_backstory": agent_execution_completed_event.agent.backstory, + } + elif event_type == "llm_call_started": + llm_call_started_event: LLMCallStartedEvent = cast( + LLMCallStartedEvent, event + ) + event_data = safe_serialize_to_dict(event) + event_data["task_name"] = llm_call_started_event.task_name + elif event_type == "llm_call_completed": + llm_call_completed_event: LLMCallCompletedEvent = cast( + LLMCallCompletedEvent, event + ) + event_data = safe_serialize_to_dict(event) + event_data["task_name"] = llm_call_completed_event.task_name + else: + event_data = { + "event_type": event_type, + "event": safe_serialize_to_dict(event), + "source": source, + } + + if "timestamp" not in event_data.keys(): + event_data["timestamp"] = event.timestamp.isoformat() + + return event_data def _show_tracing_disabled_message(self) -> None: """Show a message when tracing is disabled."""