diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 6e7fba0ef..a4d4cbd31 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,7 +1,7 @@ """Trace collection listener for orchestrating trace collection.""" import os -from typing import Any, ClassVar, cast +from typing import Any, ClassVar import uuid from typing_extensions import Self @@ -18,6 +18,32 @@ from crewai.events.listeners.tracing.types import TraceEvent from crewai.events.listeners.tracing.utils import ( safe_serialize_to_dict, ) +from crewai.events.types.a2a_events import ( + A2AAgentCardFetchedEvent, + A2AArtifactReceivedEvent, + A2AAuthenticationFailedEvent, + A2AConnectionErrorEvent, + A2AConversationCompletedEvent, + A2AConversationStartedEvent, + A2ADelegationCompletedEvent, + A2ADelegationStartedEvent, + A2AMessageSentEvent, + A2AParallelDelegationCompletedEvent, + A2AParallelDelegationStartedEvent, + A2APollingStartedEvent, + A2APollingStatusEvent, + A2APushNotificationReceivedEvent, + A2APushNotificationRegisteredEvent, + A2APushNotificationSentEvent, + A2APushNotificationTimeoutEvent, + A2AResponseReceivedEvent, + A2AServerTaskCanceledEvent, + A2AServerTaskCompletedEvent, + A2AServerTaskFailedEvent, + A2AServerTaskStartedEvent, + A2AStreamingChunkEvent, + A2AStreamingStartedEvent, +) from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, AgentExecutionErrorEvent, @@ -105,7 +131,7 @@ class TraceCollectionListener(BaseEventListener): """Create or return singleton instance.""" if cls._instance is None: cls._instance = super().__new__(cls) - return cast(Self, cls._instance) + return cls._instance def __init__( self, @@ -160,6 +186,7 @@ class TraceCollectionListener(BaseEventListener): self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) self._register_action_event_handlers(crewai_event_bus) + self._register_a2a_event_handlers(crewai_event_bus) self._register_system_event_handlers(crewai_event_bus) self._listeners_setup = True @@ -439,6 +466,147 @@ class TraceCollectionListener(BaseEventListener): ) -> None: self._handle_action_event("knowledge_query_failed", source, event) + def _register_a2a_event_handlers(self, event_bus: CrewAIEventsBus) -> None: + """Register handlers for A2A (Agent-to-Agent) events.""" + + @event_bus.on(A2ADelegationStartedEvent) + def on_a2a_delegation_started( + source: Any, event: A2ADelegationStartedEvent + ) -> None: + self._handle_action_event("a2a_delegation_started", source, event) + + @event_bus.on(A2ADelegationCompletedEvent) + def on_a2a_delegation_completed( + source: Any, event: A2ADelegationCompletedEvent + ) -> None: + self._handle_action_event("a2a_delegation_completed", source, event) + + @event_bus.on(A2AConversationStartedEvent) + def on_a2a_conversation_started( + source: Any, event: A2AConversationStartedEvent + ) -> None: + self._handle_action_event("a2a_conversation_started", source, event) + + @event_bus.on(A2AMessageSentEvent) + def on_a2a_message_sent(source: Any, event: A2AMessageSentEvent) -> None: + self._handle_action_event("a2a_message_sent", source, event) + + @event_bus.on(A2AResponseReceivedEvent) + def on_a2a_response_received( + source: Any, event: A2AResponseReceivedEvent + ) -> None: + self._handle_action_event("a2a_response_received", source, event) + + @event_bus.on(A2AConversationCompletedEvent) + def on_a2a_conversation_completed( + source: Any, event: A2AConversationCompletedEvent + ) -> None: + self._handle_action_event("a2a_conversation_completed", source, event) + + @event_bus.on(A2APollingStartedEvent) + def on_a2a_polling_started(source: Any, event: A2APollingStartedEvent) -> None: + self._handle_action_event("a2a_polling_started", source, event) + + @event_bus.on(A2APollingStatusEvent) + def on_a2a_polling_status(source: Any, event: A2APollingStatusEvent) -> None: + self._handle_action_event("a2a_polling_status", source, event) + + @event_bus.on(A2APushNotificationRegisteredEvent) + def on_a2a_push_notification_registered( + source: Any, event: A2APushNotificationRegisteredEvent + ) -> None: + self._handle_action_event("a2a_push_notification_registered", source, event) + + @event_bus.on(A2APushNotificationReceivedEvent) + def on_a2a_push_notification_received( + source: Any, event: A2APushNotificationReceivedEvent + ) -> None: + self._handle_action_event("a2a_push_notification_received", source, event) + + @event_bus.on(A2APushNotificationSentEvent) + def on_a2a_push_notification_sent( + source: Any, event: A2APushNotificationSentEvent + ) -> None: + self._handle_action_event("a2a_push_notification_sent", source, event) + + @event_bus.on(A2APushNotificationTimeoutEvent) + def on_a2a_push_notification_timeout( + source: Any, event: A2APushNotificationTimeoutEvent + ) -> None: + self._handle_action_event("a2a_push_notification_timeout", source, event) + + @event_bus.on(A2AStreamingStartedEvent) + def on_a2a_streaming_started( + source: Any, event: A2AStreamingStartedEvent + ) -> None: + self._handle_action_event("a2a_streaming_started", source, event) + + @event_bus.on(A2AStreamingChunkEvent) + def on_a2a_streaming_chunk(source: Any, event: A2AStreamingChunkEvent) -> None: + self._handle_action_event("a2a_streaming_chunk", source, event) + + @event_bus.on(A2AAgentCardFetchedEvent) + def on_a2a_agent_card_fetched( + source: Any, event: A2AAgentCardFetchedEvent + ) -> None: + self._handle_action_event("a2a_agent_card_fetched", source, event) + + @event_bus.on(A2AAuthenticationFailedEvent) + def on_a2a_authentication_failed( + source: Any, event: A2AAuthenticationFailedEvent + ) -> None: + self._handle_action_event("a2a_authentication_failed", source, event) + + @event_bus.on(A2AArtifactReceivedEvent) + def on_a2a_artifact_received( + source: Any, event: A2AArtifactReceivedEvent + ) -> None: + self._handle_action_event("a2a_artifact_received", source, event) + + @event_bus.on(A2AConnectionErrorEvent) + def on_a2a_connection_error( + source: Any, event: A2AConnectionErrorEvent + ) -> None: + self._handle_action_event("a2a_connection_error", source, event) + + @event_bus.on(A2AServerTaskStartedEvent) + def on_a2a_server_task_started( + source: Any, event: A2AServerTaskStartedEvent + ) -> None: + self._handle_action_event("a2a_server_task_started", source, event) + + @event_bus.on(A2AServerTaskCompletedEvent) + def on_a2a_server_task_completed( + source: Any, event: A2AServerTaskCompletedEvent + ) -> None: + self._handle_action_event("a2a_server_task_completed", source, event) + + @event_bus.on(A2AServerTaskCanceledEvent) + def on_a2a_server_task_canceled( + source: Any, event: A2AServerTaskCanceledEvent + ) -> None: + self._handle_action_event("a2a_server_task_canceled", source, event) + + @event_bus.on(A2AServerTaskFailedEvent) + def on_a2a_server_task_failed( + source: Any, event: A2AServerTaskFailedEvent + ) -> None: + self._handle_action_event("a2a_server_task_failed", source, event) + + @event_bus.on(A2AParallelDelegationStartedEvent) + def on_a2a_parallel_delegation_started( + source: Any, event: A2AParallelDelegationStartedEvent + ) -> None: + self._handle_action_event("a2a_parallel_delegation_started", source, event) + + @event_bus.on(A2AParallelDelegationCompletedEvent) + def on_a2a_parallel_delegation_completed( + source: Any, event: A2AParallelDelegationCompletedEvent + ) -> None: + self._handle_action_event( + "a2a_parallel_delegation_completed", source, event + ) + def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None: """Register handlers for system signal events (SIGTERM, SIGINT, etc.).""" @@ -570,10 +738,15 @@ class TraceCollectionListener(BaseEventListener): if event_type not in self.complex_events: return safe_serialize_to_dict(event) if event_type == "task_started": + task_name = event.task.name or event.task.description + task_display_name = ( + task_name[:80] + "..." if len(task_name) > 80 else task_name + ) return { "task_description": event.task.description, "expected_output": event.task.expected_output, - "task_name": event.task.name or event.task.description, + "task_name": task_name, + "task_display_name": task_display_name, "context": event.context, "agent_role": source.agent.role, "task_id": str(event.task.id), diff --git a/lib/crewai/tests/a2a/utils/test_task.py b/lib/crewai/tests/a2a/utils/test_task.py index 7c26eec22..3c3f8865e 100644 --- a/lib/crewai/tests/a2a/utils/test_task.py +++ b/lib/crewai/tests/a2a/utils/test_task.py @@ -26,9 +26,13 @@ def mock_agent() -> MagicMock: @pytest.fixture -def mock_task() -> MagicMock: +def mock_task(mock_context: MagicMock) -> MagicMock: """Create a mock Task.""" - return MagicMock() + task = MagicMock() + task.id = mock_context.task_id + task.name = "Mock Task" + task.description = "Mock task description" + return task @pytest.fixture