Enhance typing and ensure timestamp is always included in event_data inside traces

This commit is contained in:
Heitor Sammuel Carvalho
2025-12-18 10:52:15 -03:00
parent 8d0effafec
commit d6b47bd61b

View File

@@ -1,7 +1,7 @@
"""Trace collection listener for orchestrating trace collection."""
import os
from typing import Any, ClassVar
from typing import Any, ClassVar, cast
import uuid
from typing_extensions import Self
@@ -9,6 +9,7 @@ from typing_extensions import Self
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
@@ -83,6 +84,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageStartedEvent,
)
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.task import Task
class TraceCollectionListener(BaseEventListener):
@@ -567,79 +569,101 @@ class TraceCollectionListener(BaseEventListener):
self.batch_manager.end_event_processing()
def _create_trace_event(
self, event_type: str, source: Any, event: Any
self, event_type: str, source: Any, event: type[BaseEvent]
) -> TraceEvent:
"""Create a trace event"""
if hasattr(event, "timestamp") and event.timestamp:
trace_event = TraceEvent(
type=event_type,
timestamp=event.timestamp.isoformat(),
)
else:
trace_event = TraceEvent(
type=event_type,
)
trace_event = TraceEvent(
type=event_type,
timestamp=event.timestamp.isoformat(),
)
trace_event.event_data = self._build_event_data(event_type, event, source)
return trace_event
def _build_event_data(
self, event_type: str, event: Any, source: Any
self, event_type: str, event: type[BaseEvent], source: Any
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return safe_serialize_to_dict(event)
if event_type == "task_started":
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": event.task.name or event.task.description,
"context": event.context,
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
if event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
if event.task
else None,
"task_id": str(event.task.id) if event.task else None,
"output_raw": event.output.raw if event.output else None,
"output_format": str(event.output.output_format)
if event.output
else None,
"agent_role": event.output.agent if event.output else None,
}
if event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
if event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
if event_type == "llm_call_started":
event_data = safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
if hasattr(event, "task_name") and event.task_name
else None
)
return event_data
if event_type == "llm_call_completed":
return safe_serialize_to_dict(event)
event_data = None
return {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
if event_type not in self.complex_events:
event_data = safe_serialize_to_dict(event)
elif event_type == "task_started":
task_started_event: TaskStartedEvent = cast(TaskStartedEvent, event)
event_data = {
"task_description": cast(Task, task_started_event.task).description,
"expected_output": cast(Task, task_started_event.task).expected_output,
"task_name": cast(Task, task_started_event.task).name
or cast(Task, task_started_event.task).description,
"context": task_started_event.context,
"agent_role": source.agent.role,
"task_id": str(cast(Task, task_started_event.task).id),
}
elif event_type == "task_completed":
task_completed_event: TaskCompletedEvent = cast(TaskCompletedEvent, event)
event_data = {
"task_description": cast(Task, task_completed_event.task).description
if cast(Task, task_completed_event.task)
else None,
"task_name": cast(Task, task_completed_event.task).name
or cast(Task, task_completed_event.task).description
if task_completed_event.task
else None,
"task_id": str(cast(Task, task_completed_event.task).id)
if cast(Task, task_completed_event.task)
else None,
"output_raw": task_completed_event.output.raw
if task_completed_event.output
else None,
"output_format": str(task_completed_event.output.output_format)
if task_completed_event.output
else None,
"agent_role": task_completed_event.output.agent
if task_completed_event.output
else None,
}
elif event_type == "agent_execution_started":
agent_execution_started_event: AgentExecutionStartedEvent = cast(
AgentExecutionStartedEvent, event
)
event_data = {
"agent_role": agent_execution_started_event.agent.role,
"agent_goal": agent_execution_started_event.agent.goal,
"agent_backstory": agent_execution_started_event.agent.backstory,
}
elif event_type == "agent_execution_completed":
agent_execution_completed_event: AgentExecutionCompletedEvent = cast(
AgentExecutionCompletedEvent, event
)
event_data = {
"agent_role": agent_execution_completed_event.agent.role,
"agent_goal": agent_execution_completed_event.agent.goal,
"agent_backstory": agent_execution_completed_event.agent.backstory,
}
elif event_type == "llm_call_started":
llm_call_started_event: LLMCallStartedEvent = cast(
LLMCallStartedEvent, event
)
event_data = safe_serialize_to_dict(event)
event_data["task_name"] = llm_call_started_event.task_name
elif event_type == "llm_call_completed":
llm_call_completed_event: LLMCallCompletedEvent = cast(
LLMCallCompletedEvent, event
)
event_data = safe_serialize_to_dict(event)
event_data["task_name"] = llm_call_completed_event.task_name
else:
event_data = {
"event_type": event_type,
"event": safe_serialize_to_dict(event),
"source": source,
}
if "timestamp" not in event_data.keys():
event_data["timestamp"] = event.timestamp.isoformat()
return event_data
def _show_tracing_disabled_message(self) -> None:
"""Show a message when tracing is disabled."""