mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-18 04:27:32 +00:00
feat: register traces
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Trace collection listener for orchestrating trace collection."""
|
||||
|
||||
import os
|
||||
from typing import Any, ClassVar, cast
|
||||
from typing import Any, ClassVar
|
||||
import uuid
|
||||
|
||||
from typing_extensions import Self
|
||||
@@ -18,6 +18,32 @@ from crewai.events.listeners.tracing.types import TraceEvent
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
safe_serialize_to_dict,
|
||||
)
|
||||
from crewai.events.types.a2a_events import (
|
||||
A2AAgentCardFetchedEvent,
|
||||
A2AArtifactReceivedEvent,
|
||||
A2AAuthenticationFailedEvent,
|
||||
A2AConnectionErrorEvent,
|
||||
A2AConversationCompletedEvent,
|
||||
A2AConversationStartedEvent,
|
||||
A2ADelegationCompletedEvent,
|
||||
A2ADelegationStartedEvent,
|
||||
A2AMessageSentEvent,
|
||||
A2AParallelDelegationCompletedEvent,
|
||||
A2AParallelDelegationStartedEvent,
|
||||
A2APollingStartedEvent,
|
||||
A2APollingStatusEvent,
|
||||
A2APushNotificationReceivedEvent,
|
||||
A2APushNotificationRegisteredEvent,
|
||||
A2APushNotificationSentEvent,
|
||||
A2APushNotificationTimeoutEvent,
|
||||
A2AResponseReceivedEvent,
|
||||
A2AServerTaskCanceledEvent,
|
||||
A2AServerTaskCompletedEvent,
|
||||
A2AServerTaskFailedEvent,
|
||||
A2AServerTaskStartedEvent,
|
||||
A2AStreamingChunkEvent,
|
||||
A2AStreamingStartedEvent,
|
||||
)
|
||||
from crewai.events.types.agent_events import (
|
||||
AgentExecutionCompletedEvent,
|
||||
AgentExecutionErrorEvent,
|
||||
@@ -105,7 +131,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
"""Create or return singleton instance."""
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cast(Self, cls._instance)
|
||||
return cls._instance
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -160,6 +186,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
self._register_flow_event_handlers(crewai_event_bus)
|
||||
self._register_context_event_handlers(crewai_event_bus)
|
||||
self._register_action_event_handlers(crewai_event_bus)
|
||||
self._register_a2a_event_handlers(crewai_event_bus)
|
||||
self._register_system_event_handlers(crewai_event_bus)
|
||||
|
||||
self._listeners_setup = True
|
||||
@@ -439,6 +466,147 @@ class TraceCollectionListener(BaseEventListener):
|
||||
) -> None:
|
||||
self._handle_action_event("knowledge_query_failed", source, event)
|
||||
|
||||
def _register_a2a_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
|
||||
"""Register handlers for A2A (Agent-to-Agent) events."""
|
||||
|
||||
@event_bus.on(A2ADelegationStartedEvent)
|
||||
def on_a2a_delegation_started(
|
||||
source: Any, event: A2ADelegationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_delegation_started", source, event)
|
||||
|
||||
@event_bus.on(A2ADelegationCompletedEvent)
|
||||
def on_a2a_delegation_completed(
|
||||
source: Any, event: A2ADelegationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_delegation_completed", source, event)
|
||||
|
||||
@event_bus.on(A2AConversationStartedEvent)
|
||||
def on_a2a_conversation_started(
|
||||
source: Any, event: A2AConversationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_conversation_started", source, event)
|
||||
|
||||
@event_bus.on(A2AMessageSentEvent)
|
||||
def on_a2a_message_sent(source: Any, event: A2AMessageSentEvent) -> None:
|
||||
self._handle_action_event("a2a_message_sent", source, event)
|
||||
|
||||
@event_bus.on(A2AResponseReceivedEvent)
|
||||
def on_a2a_response_received(
|
||||
source: Any, event: A2AResponseReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_response_received", source, event)
|
||||
|
||||
@event_bus.on(A2AConversationCompletedEvent)
|
||||
def on_a2a_conversation_completed(
|
||||
source: Any, event: A2AConversationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_conversation_completed", source, event)
|
||||
|
||||
@event_bus.on(A2APollingStartedEvent)
|
||||
def on_a2a_polling_started(source: Any, event: A2APollingStartedEvent) -> None:
|
||||
self._handle_action_event("a2a_polling_started", source, event)
|
||||
|
||||
@event_bus.on(A2APollingStatusEvent)
|
||||
def on_a2a_polling_status(source: Any, event: A2APollingStatusEvent) -> None:
|
||||
self._handle_action_event("a2a_polling_status", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationRegisteredEvent)
|
||||
def on_a2a_push_notification_registered(
|
||||
source: Any, event: A2APushNotificationRegisteredEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_registered", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationReceivedEvent)
|
||||
def on_a2a_push_notification_received(
|
||||
source: Any, event: A2APushNotificationReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_received", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationSentEvent)
|
||||
def on_a2a_push_notification_sent(
|
||||
source: Any, event: A2APushNotificationSentEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_sent", source, event)
|
||||
|
||||
@event_bus.on(A2APushNotificationTimeoutEvent)
|
||||
def on_a2a_push_notification_timeout(
|
||||
source: Any, event: A2APushNotificationTimeoutEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_push_notification_timeout", source, event)
|
||||
|
||||
@event_bus.on(A2AStreamingStartedEvent)
|
||||
def on_a2a_streaming_started(
|
||||
source: Any, event: A2AStreamingStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_streaming_started", source, event)
|
||||
|
||||
@event_bus.on(A2AStreamingChunkEvent)
|
||||
def on_a2a_streaming_chunk(source: Any, event: A2AStreamingChunkEvent) -> None:
|
||||
self._handle_action_event("a2a_streaming_chunk", source, event)
|
||||
|
||||
@event_bus.on(A2AAgentCardFetchedEvent)
|
||||
def on_a2a_agent_card_fetched(
|
||||
source: Any, event: A2AAgentCardFetchedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_agent_card_fetched", source, event)
|
||||
|
||||
@event_bus.on(A2AAuthenticationFailedEvent)
|
||||
def on_a2a_authentication_failed(
|
||||
source: Any, event: A2AAuthenticationFailedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_authentication_failed", source, event)
|
||||
|
||||
@event_bus.on(A2AArtifactReceivedEvent)
|
||||
def on_a2a_artifact_received(
|
||||
source: Any, event: A2AArtifactReceivedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_artifact_received", source, event)
|
||||
|
||||
@event_bus.on(A2AConnectionErrorEvent)
|
||||
def on_a2a_connection_error(
|
||||
source: Any, event: A2AConnectionErrorEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_connection_error", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskStartedEvent)
|
||||
def on_a2a_server_task_started(
|
||||
source: Any, event: A2AServerTaskStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_started", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskCompletedEvent)
|
||||
def on_a2a_server_task_completed(
|
||||
source: Any, event: A2AServerTaskCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_completed", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskCanceledEvent)
|
||||
def on_a2a_server_task_canceled(
|
||||
source: Any, event: A2AServerTaskCanceledEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_canceled", source, event)
|
||||
|
||||
@event_bus.on(A2AServerTaskFailedEvent)
|
||||
def on_a2a_server_task_failed(
|
||||
source: Any, event: A2AServerTaskFailedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_server_task_failed", source, event)
|
||||
|
||||
@event_bus.on(A2AParallelDelegationStartedEvent)
|
||||
def on_a2a_parallel_delegation_started(
|
||||
source: Any, event: A2AParallelDelegationStartedEvent
|
||||
) -> None:
|
||||
self._handle_action_event("a2a_parallel_delegation_started", source, event)
|
||||
|
||||
@event_bus.on(A2AParallelDelegationCompletedEvent)
|
||||
def on_a2a_parallel_delegation_completed(
|
||||
source: Any, event: A2AParallelDelegationCompletedEvent
|
||||
) -> None:
|
||||
self._handle_action_event(
|
||||
"a2a_parallel_delegation_completed", source, event
|
||||
)
|
||||
|
||||
def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
|
||||
"""Register handlers for system signal events (SIGTERM, SIGINT, etc.)."""
|
||||
|
||||
@@ -570,10 +738,15 @@ class TraceCollectionListener(BaseEventListener):
|
||||
if event_type not in self.complex_events:
|
||||
return safe_serialize_to_dict(event)
|
||||
if event_type == "task_started":
|
||||
task_name = event.task.name or event.task.description
|
||||
task_display_name = (
|
||||
task_name[:80] + "..." if len(task_name) > 80 else task_name
|
||||
)
|
||||
return {
|
||||
"task_description": event.task.description,
|
||||
"expected_output": event.task.expected_output,
|
||||
"task_name": event.task.name or event.task.description,
|
||||
"task_name": task_name,
|
||||
"task_display_name": task_display_name,
|
||||
"context": event.context,
|
||||
"agent_role": source.agent.role,
|
||||
"task_id": str(event.task.id),
|
||||
|
||||
@@ -26,9 +26,13 @@ def mock_agent() -> MagicMock:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_task() -> MagicMock:
|
||||
def mock_task(mock_context: MagicMock) -> MagicMock:
|
||||
"""Create a mock Task."""
|
||||
return MagicMock()
|
||||
task = MagicMock()
|
||||
task.id = mock_context.task_id
|
||||
task.name = "Mock Task"
|
||||
task.description = "Mock task description"
|
||||
return task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
Reference in New Issue
Block a user