diff --git a/lib/crewai/src/crewai/a2a/task_helpers.py b/lib/crewai/src/crewai/a2a/task_helpers.py index 9a84a1ffc..1b513612a 100644 --- a/lib/crewai/src/crewai/a2a/task_helpers.py +++ b/lib/crewai/src/crewai/a2a/task_helpers.py @@ -3,9 +3,10 @@ from __future__ import annotations from collections.abc import AsyncIterator -from typing import TYPE_CHECKING, TypedDict +from typing import TYPE_CHECKING, Any, TypedDict import uuid +from a2a.client.errors import A2AClientHTTPError from a2a.types import ( AgentCard, Message, @@ -20,7 +21,10 @@ from a2a.types import ( from typing_extensions import NotRequired from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.a2a_events import A2AResponseReceivedEvent +from crewai.events.types.a2a_events import ( + A2AConnectionErrorEvent, + A2AResponseReceivedEvent, +) if TYPE_CHECKING: @@ -55,7 +59,8 @@ class TaskStateResult(TypedDict): history: list[Message] result: NotRequired[str] error: NotRequired[str] - agent_card: NotRequired[AgentCard] + agent_card: NotRequired[dict[str, Any]] + a2a_agent_name: NotRequired[str | None] def extract_task_result_parts(a2a_task: A2ATask) -> list[str]: @@ -131,50 +136,69 @@ def process_task_state( is_multiturn: bool, agent_role: str | None, result_parts: list[str] | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + from_task: Any | None = None, + from_agent: Any | None = None, + is_final: bool = True, ) -> TaskStateResult | None: """Process A2A task state and return result dictionary. Shared logic for both polling and streaming handlers. Args: - a2a_task: The A2A task to process - new_messages: List to collect messages (modified in place) - agent_card: The agent card - turn_number: Current turn number - is_multiturn: Whether multi-turn conversation - agent_role: Agent role for logging + a2a_task: The A2A task to process. + new_messages: List to collect messages (modified in place). + agent_card: The agent card. + turn_number: Current turn number. + is_multiturn: Whether multi-turn conversation. + agent_role: Agent role for logging. result_parts: Accumulated result parts (streaming passes accumulated, - polling passes None to extract from task) + polling passes None to extract from task). + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + from_task: Optional CrewAI Task for event metadata. + from_agent: Optional CrewAI Agent for event metadata. + is_final: Whether this is the final response in the stream. Returns: - Result dictionary if terminal/actionable state, None otherwise + Result dictionary if terminal/actionable state, None otherwise. """ - should_extract = result_parts is None if result_parts is None: result_parts = [] if a2a_task.status.state == TaskState.completed: - if should_extract: + if not result_parts: extracted_parts = extract_task_result_parts(a2a_task) result_parts.extend(extracted_parts) if a2a_task.history: new_messages.extend(a2a_task.history) response_text = " ".join(result_parts) if result_parts else "" + message_id = None + if a2a_task.status and a2a_task.status.message: + message_id = a2a_task.status.message.message_id crewai_event_bus.emit( None, A2AResponseReceivedEvent( response=response_text, turn_number=turn_number, + context_id=a2a_task.context_id, + message_id=message_id, is_multiturn=is_multiturn, status="completed", + final=is_final, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) return TaskStateResult( status=TaskState.completed, - agent_card=agent_card, + agent_card=agent_card.model_dump(exclude_none=True), result=response_text, history=new_messages, ) @@ -194,14 +218,24 @@ def process_task_state( ) new_messages.append(agent_message) + input_message_id = None + if a2a_task.status and a2a_task.status.message: + input_message_id = a2a_task.status.message.message_id crewai_event_bus.emit( None, A2AResponseReceivedEvent( response=response_text, turn_number=turn_number, + context_id=a2a_task.context_id, + message_id=input_message_id, is_multiturn=is_multiturn, status="input_required", + final=is_final, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -209,7 +243,7 @@ def process_task_state( status=TaskState.input_required, error=response_text, history=new_messages, - agent_card=agent_card, + agent_card=agent_card.model_dump(exclude_none=True), ) if a2a_task.status.state in {TaskState.failed, TaskState.rejected}: @@ -248,6 +282,11 @@ async def send_message_and_get_task_id( turn_number: int, is_multiturn: bool, agent_role: str | None, + from_task: Any | None = None, + from_agent: Any | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + context_id: str | None = None, ) -> str | TaskStateResult: """Send message and process initial response. @@ -262,6 +301,11 @@ async def send_message_and_get_task_id( turn_number: Current turn number is_multiturn: Whether multi-turn conversation agent_role: Agent role for logging + from_task: Optional CrewAI Task object for event metadata. + from_agent: Optional CrewAI Agent object for event metadata. + endpoint: Optional A2A endpoint URL. + a2a_agent_name: Optional A2A agent name. + context_id: Optional A2A context ID for correlation. Returns: Task ID string if agent needs polling/waiting, or TaskStateResult if done. @@ -280,9 +324,16 @@ async def send_message_and_get_task_id( A2AResponseReceivedEvent( response=response_text, turn_number=turn_number, + context_id=event.context_id, + message_id=event.message_id, is_multiturn=is_multiturn, status="completed", + final=True, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -290,7 +341,7 @@ async def send_message_and_get_task_id( status=TaskState.completed, result=response_text, history=new_messages, - agent_card=agent_card, + agent_card=agent_card.model_dump(exclude_none=True), ) if isinstance(event, tuple): @@ -304,6 +355,10 @@ async def send_message_and_get_task_id( turn_number=turn_number, is_multiturn=is_multiturn, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ) if result: return result @@ -316,6 +371,99 @@ async def send_message_and_get_task_id( history=new_messages, ) + except A2AClientHTTPError as e: + error_msg = f"HTTP Error {e.status_code}: {e!s}" + + error_message = Message( + role=Role.agent, + message_id=str(uuid.uuid4()), + parts=[Part(root=TextPart(text=error_msg))], + context_id=context_id, + ) + new_messages.append(error_message) + + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="http_error", + status_code=e.status_code, + a2a_agent_name=a2a_agent_name, + operation="send_message", + context_id=context_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + None, + A2AResponseReceivedEvent( + response=error_msg, + turn_number=turn_number, + context_id=context_id, + is_multiturn=is_multiturn, + status="failed", + final=True, + agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + ), + ) + return TaskStateResult( + status=TaskState.failed, + error=error_msg, + history=new_messages, + ) + + except Exception as e: + error_msg = f"Unexpected error during send_message: {e!s}" + + error_message = Message( + role=Role.agent, + message_id=str(uuid.uuid4()), + parts=[Part(root=TextPart(text=error_msg))], + context_id=context_id, + ) + new_messages.append(error_message) + + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="unexpected_error", + a2a_agent_name=a2a_agent_name, + operation="send_message", + context_id=context_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + None, + A2AResponseReceivedEvent( + response=error_msg, + turn_number=turn_number, + context_id=context_id, + is_multiturn=is_multiturn, + status="failed", + final=True, + agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + ), + ) + return TaskStateResult( + status=TaskState.failed, + error=error_msg, + history=new_messages, + ) + finally: aclose = getattr(event_stream, "aclose", None) if aclose: diff --git a/lib/crewai/src/crewai/a2a/updates/base.py b/lib/crewai/src/crewai/a2a/updates/base.py index a1d859837..f81edf0bf 100644 --- a/lib/crewai/src/crewai/a2a/updates/base.py +++ b/lib/crewai/src/crewai/a2a/updates/base.py @@ -22,6 +22,13 @@ class BaseHandlerKwargs(TypedDict, total=False): turn_number: int is_multiturn: bool agent_role: str | None + context_id: str | None + task_id: str | None + endpoint: str | None + agent_branch: Any + a2a_agent_name: str | None + from_task: Any + from_agent: Any class PollingHandlerKwargs(BaseHandlerKwargs, total=False): @@ -29,8 +36,6 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False): polling_interval: float polling_timeout: float - endpoint: str - agent_branch: Any history_length: int max_polls: int | None @@ -38,9 +43,6 @@ class PollingHandlerKwargs(BaseHandlerKwargs, total=False): class StreamingHandlerKwargs(BaseHandlerKwargs, total=False): """Kwargs for streaming handler.""" - context_id: str | None - task_id: str | None - class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False): """Kwargs for push notification handler.""" @@ -49,7 +51,6 @@ class PushNotificationHandlerKwargs(BaseHandlerKwargs, total=False): result_store: PushNotificationResultStore polling_timeout: float polling_interval: float - agent_branch: Any class PushNotificationResultStore(Protocol): diff --git a/lib/crewai/src/crewai/a2a/updates/polling/handler.py b/lib/crewai/src/crewai/a2a/updates/polling/handler.py index e0518be9b..3981e554b 100644 --- a/lib/crewai/src/crewai/a2a/updates/polling/handler.py +++ b/lib/crewai/src/crewai/a2a/updates/polling/handler.py @@ -31,6 +31,7 @@ from crewai.a2a.task_helpers import ( from crewai.a2a.updates.base import PollingHandlerKwargs from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( + A2AConnectionErrorEvent, A2APollingStartedEvent, A2APollingStatusEvent, A2AResponseReceivedEvent, @@ -49,23 +50,33 @@ async def _poll_task_until_complete( agent_branch: Any | None = None, history_length: int = 100, max_polls: int | None = None, + from_task: Any | None = None, + from_agent: Any | None = None, + context_id: str | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, ) -> A2ATask: """Poll task status until terminal state reached. Args: - client: A2A client instance - task_id: Task ID to poll - polling_interval: Seconds between poll attempts - polling_timeout: Max seconds before timeout - agent_branch: Agent tree branch for logging - history_length: Number of messages to retrieve per poll - max_polls: Max number of poll attempts (None = unlimited) + client: A2A client instance. + task_id: Task ID to poll. + polling_interval: Seconds between poll attempts. + polling_timeout: Max seconds before timeout. + agent_branch: Agent tree branch for logging. + history_length: Number of messages to retrieve per poll. + max_polls: Max number of poll attempts (None = unlimited). + from_task: Optional CrewAI Task object for event metadata. + from_agent: Optional CrewAI Agent object for event metadata. + context_id: A2A context ID for correlation. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. Returns: - Final task object in terminal state + Final task object in terminal state. Raises: - A2APollingTimeoutError: If polling exceeds timeout or max_polls + A2APollingTimeoutError: If polling exceeds timeout or max_polls. """ start_time = time.monotonic() poll_count = 0 @@ -77,13 +88,19 @@ async def _poll_task_until_complete( ) elapsed = time.monotonic() - start_time + effective_context_id = task.context_id or context_id crewai_event_bus.emit( agent_branch, A2APollingStatusEvent( task_id=task_id, + context_id=effective_context_id, state=str(task.status.state.value) if task.status.state else "unknown", elapsed_seconds=elapsed, poll_count=poll_count, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -137,6 +154,9 @@ class PollingHandler: max_polls = kwargs.get("max_polls") context_id = kwargs.get("context_id") task_id = kwargs.get("task_id") + a2a_agent_name = kwargs.get("a2a_agent_name") + from_task = kwargs.get("from_task") + from_agent = kwargs.get("from_agent") try: result_or_task_id = await send_message_and_get_task_id( @@ -146,6 +166,11 @@ class PollingHandler: turn_number=turn_number, is_multiturn=is_multiturn, agent_role=agent_role, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + context_id=context_id, ) if not isinstance(result_or_task_id, str): @@ -157,8 +182,12 @@ class PollingHandler: agent_branch, A2APollingStartedEvent( task_id=task_id, + context_id=context_id, polling_interval=polling_interval, endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -170,6 +199,11 @@ class PollingHandler: agent_branch=agent_branch, history_length=history_length, max_polls=max_polls, + from_task=from_task, + from_agent=from_agent, + context_id=context_id, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, ) result = process_task_state( @@ -179,6 +213,10 @@ class PollingHandler: turn_number=turn_number, is_multiturn=is_multiturn, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ) if result: return result @@ -206,9 +244,15 @@ class PollingHandler: A2AResponseReceivedEvent( response=error_msg, turn_number=turn_number, + context_id=context_id, is_multiturn=is_multiturn, status="failed", + final=True, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) return TaskStateResult( @@ -229,14 +273,83 @@ class PollingHandler: ) new_messages.append(error_message) + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint, + error=str(e), + error_type="http_error", + status_code=e.status_code, + a2a_agent_name=a2a_agent_name, + operation="polling", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) crewai_event_bus.emit( agent_branch, A2AResponseReceivedEvent( response=error_msg, turn_number=turn_number, + context_id=context_id, is_multiturn=is_multiturn, status="failed", + final=True, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + ), + ) + return TaskStateResult( + status=TaskState.failed, + error=error_msg, + history=new_messages, + ) + + except Exception as e: + error_msg = f"Unexpected error during polling: {e!s}" + + error_message = Message( + role=Role.agent, + message_id=str(uuid.uuid4()), + parts=[Part(root=TextPart(text=error_msg))], + context_id=context_id, + task_id=task_id, + ) + new_messages.append(error_message) + + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="unexpected_error", + a2a_agent_name=a2a_agent_name, + operation="polling", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + agent_branch, + A2AResponseReceivedEvent( + response=error_msg, + turn_number=turn_number, + context_id=context_id, + is_multiturn=is_multiturn, + status="failed", + final=True, + agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) return TaskStateResult( diff --git a/lib/crewai/src/crewai/a2a/updates/push_notifications/handler.py b/lib/crewai/src/crewai/a2a/updates/push_notifications/handler.py index 04db239f2..b2bddf8f1 100644 --- a/lib/crewai/src/crewai/a2a/updates/push_notifications/handler.py +++ b/lib/crewai/src/crewai/a2a/updates/push_notifications/handler.py @@ -29,6 +29,7 @@ from crewai.a2a.updates.base import ( ) from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( + A2AConnectionErrorEvent, A2APushNotificationRegisteredEvent, A2APushNotificationTimeoutEvent, A2AResponseReceivedEvent, @@ -48,6 +49,11 @@ async def _wait_for_push_result( timeout: float, poll_interval: float, agent_branch: Any | None = None, + from_task: Any | None = None, + from_agent: Any | None = None, + context_id: str | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, ) -> A2ATask | None: """Wait for push notification result. @@ -57,6 +63,11 @@ async def _wait_for_push_result( timeout: Max seconds to wait. poll_interval: Seconds between polling attempts. agent_branch: Agent tree branch for logging. + from_task: Optional CrewAI Task object for event metadata. + from_agent: Optional CrewAI Agent object for event metadata. + context_id: A2A context ID for correlation. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent. Returns: Final task object, or None if timeout. @@ -72,7 +83,12 @@ async def _wait_for_push_result( agent_branch, A2APushNotificationTimeoutEvent( task_id=task_id, + context_id=context_id, timeout_seconds=timeout, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -115,18 +131,56 @@ class PushNotificationHandler: agent_role = kwargs.get("agent_role") context_id = kwargs.get("context_id") task_id = kwargs.get("task_id") + endpoint = kwargs.get("endpoint") + a2a_agent_name = kwargs.get("a2a_agent_name") + from_task = kwargs.get("from_task") + from_agent = kwargs.get("from_agent") if config is None: + error_msg = ( + "PushNotificationConfig is required for push notification handler" + ) + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=error_msg, + error_type="configuration_error", + a2a_agent_name=a2a_agent_name, + operation="push_notification", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) return TaskStateResult( status=TaskState.failed, - error="PushNotificationConfig is required for push notification handler", + error=error_msg, history=new_messages, ) if result_store is None: + error_msg = ( + "PushNotificationResultStore is required for push notification handler" + ) + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=error_msg, + error_type="configuration_error", + a2a_agent_name=a2a_agent_name, + operation="push_notification", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) return TaskStateResult( status=TaskState.failed, - error="PushNotificationResultStore is required for push notification handler", + error=error_msg, history=new_messages, ) @@ -138,6 +192,11 @@ class PushNotificationHandler: turn_number=turn_number, is_multiturn=is_multiturn, agent_role=agent_role, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + context_id=context_id, ) if not isinstance(result_or_task_id, str): @@ -149,7 +208,12 @@ class PushNotificationHandler: agent_branch, A2APushNotificationRegisteredEvent( task_id=task_id, + context_id=context_id, callback_url=str(config.url), + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) @@ -165,6 +229,11 @@ class PushNotificationHandler: timeout=polling_timeout, poll_interval=polling_interval, agent_branch=agent_branch, + from_task=from_task, + from_agent=from_agent, + context_id=context_id, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, ) if final_task is None: @@ -181,6 +250,10 @@ class PushNotificationHandler: turn_number=turn_number, is_multiturn=is_multiturn, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ) if result: return result @@ -203,14 +276,83 @@ class PushNotificationHandler: ) new_messages.append(error_message) + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="http_error", + status_code=e.status_code, + a2a_agent_name=a2a_agent_name, + operation="push_notification", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) crewai_event_bus.emit( agent_branch, A2AResponseReceivedEvent( response=error_msg, turn_number=turn_number, + context_id=context_id, is_multiturn=is_multiturn, status="failed", + final=True, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + ), + ) + return TaskStateResult( + status=TaskState.failed, + error=error_msg, + history=new_messages, + ) + + except Exception as e: + error_msg = f"Unexpected error during push notification: {e!s}" + + error_message = Message( + role=Role.agent, + message_id=str(uuid.uuid4()), + parts=[Part(root=TextPart(text=error_msg))], + context_id=context_id, + task_id=task_id, + ) + new_messages.append(error_message) + + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="unexpected_error", + a2a_agent_name=a2a_agent_name, + operation="push_notification", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + agent_branch, + A2AResponseReceivedEvent( + response=error_msg, + turn_number=turn_number, + context_id=context_id, + is_multiturn=is_multiturn, + status="failed", + final=True, + agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) return TaskStateResult( diff --git a/lib/crewai/src/crewai/a2a/updates/streaming/handler.py b/lib/crewai/src/crewai/a2a/updates/streaming/handler.py index 93d2c4500..2bfe4dbed 100644 --- a/lib/crewai/src/crewai/a2a/updates/streaming/handler.py +++ b/lib/crewai/src/crewai/a2a/updates/streaming/handler.py @@ -26,7 +26,13 @@ from crewai.a2a.task_helpers import ( ) from crewai.a2a.updates.base import StreamingHandlerKwargs from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.a2a_events import A2AResponseReceivedEvent +from crewai.events.types.a2a_events import ( + A2AArtifactReceivedEvent, + A2AConnectionErrorEvent, + A2AResponseReceivedEvent, + A2AStreamingChunkEvent, + A2AStreamingStartedEvent, +) class StreamingHandler: @@ -57,19 +63,57 @@ class StreamingHandler: turn_number = kwargs.get("turn_number", 0) is_multiturn = kwargs.get("is_multiturn", False) agent_role = kwargs.get("agent_role") + endpoint = kwargs.get("endpoint") + a2a_agent_name = kwargs.get("a2a_agent_name") + from_task = kwargs.get("from_task") + from_agent = kwargs.get("from_agent") + agent_branch = kwargs.get("agent_branch") result_parts: list[str] = [] final_result: TaskStateResult | None = None event_stream = client.send_message(message) + chunk_index = 0 + + crewai_event_bus.emit( + agent_branch, + A2AStreamingStartedEvent( + task_id=task_id, + context_id=context_id, + endpoint=endpoint or "", + a2a_agent_name=a2a_agent_name, + turn_number=turn_number, + is_multiturn=is_multiturn, + agent_role=agent_role, + from_task=from_task, + from_agent=from_agent, + ), + ) try: async for event in event_stream: if isinstance(event, Message): new_messages.append(event) + message_context_id = event.context_id or context_id for part in event.parts: if part.root.kind == "text": text = part.root.text result_parts.append(text) + crewai_event_bus.emit( + agent_branch, + A2AStreamingChunkEvent( + task_id=event.task_id or task_id, + context_id=message_context_id, + chunk=text, + chunk_index=chunk_index, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + turn_number=turn_number, + is_multiturn=is_multiturn, + from_task=from_task, + from_agent=from_agent, + ), + ) + chunk_index += 1 elif isinstance(event, tuple): a2a_task, update = event @@ -81,10 +125,51 @@ class StreamingHandler: for part in artifact.parts if part.root.kind == "text" ) + artifact_size = None + if artifact.parts: + artifact_size = sum( + len(p.root.text.encode("utf-8")) + if p.root.kind == "text" + else len(getattr(p.root, "data", b"")) + for p in artifact.parts + ) + effective_context_id = a2a_task.context_id or context_id + crewai_event_bus.emit( + agent_branch, + A2AArtifactReceivedEvent( + task_id=a2a_task.id, + artifact_id=artifact.artifact_id, + artifact_name=artifact.name, + artifact_description=artifact.description, + mime_type=artifact.parts[0].root.kind + if artifact.parts + else None, + size_bytes=artifact_size, + append=update.append or False, + last_chunk=update.last_chunk or False, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + context_id=effective_context_id, + turn_number=turn_number, + is_multiturn=is_multiturn, + from_task=from_task, + from_agent=from_agent, + ), + ) is_final_update = False if isinstance(update, TaskStatusUpdateEvent): is_final_update = update.final + if ( + update.status + and update.status.message + and update.status.message.parts + ): + result_parts.extend( + part.root.text + for part in update.status.message.parts + if part.root.kind == "text" and part.root.text + ) if ( not is_final_update @@ -101,6 +186,11 @@ class StreamingHandler: is_multiturn=is_multiturn, agent_role=agent_role, result_parts=result_parts, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + is_final=is_final_update, ) if final_result: break @@ -118,13 +208,82 @@ class StreamingHandler: new_messages.append(error_message) crewai_event_bus.emit( - None, + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="http_error", + status_code=e.status_code, + a2a_agent_name=a2a_agent_name, + operation="streaming", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + agent_branch, A2AResponseReceivedEvent( response=error_msg, turn_number=turn_number, + context_id=context_id, is_multiturn=is_multiturn, status="failed", + final=True, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, + ), + ) + return TaskStateResult( + status=TaskState.failed, + error=error_msg, + history=new_messages, + ) + + except Exception as e: + error_msg = f"Unexpected error during streaming: {e!s}" + + error_message = Message( + role=Role.agent, + message_id=str(uuid.uuid4()), + parts=[Part(root=TextPart(text=error_msg))], + context_id=context_id, + task_id=task_id, + ) + new_messages.append(error_message) + + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(e), + error_type="unexpected_error", + a2a_agent_name=a2a_agent_name, + operation="streaming", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) + crewai_event_bus.emit( + agent_branch, + A2AResponseReceivedEvent( + response=error_msg, + turn_number=turn_number, + context_id=context_id, + is_multiturn=is_multiturn, + status="failed", + final=True, + agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + from_task=from_task, + from_agent=from_agent, ), ) return TaskStateResult( @@ -136,7 +295,23 @@ class StreamingHandler: finally: aclose = getattr(event_stream, "aclose", None) if aclose: - await aclose() + try: + await aclose() + except Exception as close_error: + crewai_event_bus.emit( + agent_branch, + A2AConnectionErrorEvent( + endpoint=endpoint or "", + error=str(close_error), + error_type="stream_close_error", + a2a_agent_name=a2a_agent_name, + operation="stream_close", + context_id=context_id, + task_id=task_id, + from_task=from_task, + from_agent=from_agent, + ), + ) if final_result: return final_result @@ -145,5 +320,5 @@ class StreamingHandler: status=TaskState.completed, result=" ".join(result_parts) if result_parts else "", history=new_messages, - agent_card=agent_card, + agent_card=agent_card.model_dump(exclude_none=True), ) diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py index 7c798dc1a..a21bfefdb 100644 --- a/lib/crewai/src/crewai/a2a/utils/agent_card.py +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -23,6 +23,12 @@ from crewai.a2a.auth.utils import ( ) from crewai.a2a.config import A2AServerConfig from crewai.crew import Crew +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.a2a_events import ( + A2AAgentCardFetchedEvent, + A2AAuthenticationFailedEvent, + A2AConnectionErrorEvent, +) if TYPE_CHECKING: @@ -183,6 +189,8 @@ async def _afetch_agent_card_impl( timeout: int, ) -> AgentCard: """Internal async implementation of AgentCard fetching.""" + start_time = time.perf_counter() + if "/.well-known/agent-card.json" in endpoint: base_url = endpoint.replace("/.well-known/agent-card.json", "") agent_card_path = "/.well-known/agent-card.json" @@ -217,9 +225,29 @@ async def _afetch_agent_card_impl( ) response.raise_for_status() - return AgentCard.model_validate(response.json()) + agent_card = AgentCard.model_validate(response.json()) + fetch_time_ms = (time.perf_counter() - start_time) * 1000 + agent_card_dict = agent_card.model_dump(exclude_none=True) + + crewai_event_bus.emit( + None, + A2AAgentCardFetchedEvent( + endpoint=endpoint, + a2a_agent_name=agent_card.name, + agent_card=agent_card_dict, + protocol_version=agent_card.protocol_version, + provider=agent_card_dict.get("provider"), + cached=False, + fetch_time_ms=fetch_time_ms, + ), + ) + + return agent_card except httpx.HTTPStatusError as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + response_body = e.response.text[:1000] if e.response.text else None + if e.response.status_code == 401: error_details = ["Authentication failed"] www_auth = e.response.headers.get("WWW-Authenticate") @@ -228,7 +256,93 @@ async def _afetch_agent_card_impl( if not auth: error_details.append("No auth scheme provided") msg = " | ".join(error_details) + + auth_type = type(auth).__name__ if auth else None + crewai_event_bus.emit( + None, + A2AAuthenticationFailedEvent( + endpoint=endpoint, + auth_type=auth_type, + error=msg, + status_code=401, + metadata={ + "elapsed_ms": elapsed_ms, + "response_body": response_body, + "www_authenticate": www_auth, + "request_url": str(e.request.url), + }, + ), + ) + raise A2AClientHTTPError(401, msg) from e + + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint, + error=str(e), + error_type="http_error", + status_code=e.response.status_code, + operation="fetch_agent_card", + metadata={ + "elapsed_ms": elapsed_ms, + "response_body": response_body, + "request_url": str(e.request.url), + }, + ), + ) + raise + + except httpx.TimeoutException as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint, + error=str(e), + error_type="timeout", + operation="fetch_agent_card", + metadata={ + "elapsed_ms": elapsed_ms, + "timeout_config": timeout, + "request_url": str(e.request.url) if e.request else None, + }, + ), + ) + raise + + except httpx.ConnectError as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint, + error=str(e), + error_type="connection_error", + operation="fetch_agent_card", + metadata={ + "elapsed_ms": elapsed_ms, + "request_url": str(e.request.url) if e.request else None, + }, + ), + ) + raise + + except httpx.RequestError as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + crewai_event_bus.emit( + None, + A2AConnectionErrorEvent( + endpoint=endpoint, + error=str(e), + error_type="request_error", + operation="fetch_agent_card", + metadata={ + "elapsed_ms": elapsed_ms, + "request_url": str(e.request.url) if e.request else None, + }, + ), + ) raise diff --git a/lib/crewai/src/crewai/a2a/utils/delegation.py b/lib/crewai/src/crewai/a2a/utils/delegation.py index 740f914bc..0fc9eaec5 100644 --- a/lib/crewai/src/crewai/a2a/utils/delegation.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -88,6 +88,9 @@ def execute_a2a_delegation( response_model: type[BaseModel] | None = None, turn_number: int | None = None, updates: UpdateConfig | None = None, + from_task: Any | None = None, + from_agent: Any | None = None, + skill_id: str | None = None, ) -> TaskStateResult: """Execute a task delegation to a remote A2A agent synchronously. @@ -129,6 +132,9 @@ def execute_a2a_delegation( response_model: Optional Pydantic model for structured outputs. turn_number: Optional turn number for multi-turn conversations. updates: Update mechanism config from A2AConfig.updates. + from_task: Optional CrewAI Task object for event metadata. + from_agent: Optional CrewAI Agent object for event metadata. + skill_id: Optional skill ID to target a specific agent capability. Returns: TaskStateResult with status, result/error, history, and agent_card. @@ -156,10 +162,16 @@ def execute_a2a_delegation( transport_protocol=transport_protocol, turn_number=turn_number, updates=updates, + from_task=from_task, + from_agent=from_agent, + skill_id=skill_id, ) ) finally: - loop.close() + try: + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + loop.close() async def aexecute_a2a_delegation( @@ -181,6 +193,9 @@ async def aexecute_a2a_delegation( response_model: type[BaseModel] | None = None, turn_number: int | None = None, updates: UpdateConfig | None = None, + from_task: Any | None = None, + from_agent: Any | None = None, + skill_id: str | None = None, ) -> TaskStateResult: """Execute a task delegation to a remote A2A agent asynchronously. @@ -222,6 +237,9 @@ async def aexecute_a2a_delegation( response_model: Optional Pydantic model for structured outputs. turn_number: Optional turn number for multi-turn conversations. updates: Update mechanism config from A2AConfig.updates. + from_task: Optional CrewAI Task object for event metadata. + from_agent: Optional CrewAI Agent object for event metadata. + skill_id: Optional skill ID to target a specific agent capability. Returns: TaskStateResult with status, result/error, history, and agent_card. @@ -233,17 +251,6 @@ async def aexecute_a2a_delegation( if turn_number is None: turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1 - crewai_event_bus.emit( - agent_branch, - A2ADelegationStartedEvent( - endpoint=endpoint, - task_description=task_description, - agent_id=agent_id, - is_multiturn=is_multiturn, - turn_number=turn_number, - ), - ) - result = await _aexecute_a2a_delegation_impl( endpoint=endpoint, auth=auth, @@ -264,15 +271,28 @@ async def aexecute_a2a_delegation( response_model=response_model, updates=updates, transport_protocol=transport_protocol, + from_task=from_task, + from_agent=from_agent, + skill_id=skill_id, ) + agent_card_data: dict[str, Any] = result.get("agent_card") or {} crewai_event_bus.emit( agent_branch, A2ADelegationCompletedEvent( status=result["status"], result=result.get("result"), error=result.get("error"), + context_id=context_id, is_multiturn=is_multiturn, + endpoint=endpoint, + a2a_agent_name=result.get("a2a_agent_name"), + agent_card=agent_card_data, + provider=agent_card_data.get("provider"), + metadata=metadata, + extensions=list(extensions.keys()) if extensions else None, + from_task=from_task, + from_agent=from_agent, ), ) @@ -299,6 +319,9 @@ async def _aexecute_a2a_delegation_impl( agent_role: str | None, response_model: type[BaseModel] | None, updates: UpdateConfig | None, + from_task: Any | None = None, + from_agent: Any | None = None, + skill_id: str | None = None, ) -> TaskStateResult: """Internal async implementation of A2A delegation.""" if auth: @@ -331,6 +354,28 @@ async def _aexecute_a2a_delegation_impl( if agent_card.name: a2a_agent_name = agent_card.name + agent_card_dict = agent_card.model_dump(exclude_none=True) + crewai_event_bus.emit( + agent_branch, + A2ADelegationStartedEvent( + endpoint=endpoint, + task_description=task_description, + agent_id=agent_id or endpoint, + context_id=context_id, + is_multiturn=is_multiturn, + turn_number=turn_number, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card_dict, + protocol_version=agent_card.protocol_version, + provider=agent_card_dict.get("provider"), + skill_id=skill_id, + metadata=metadata, + extensions=list(extensions.keys()) if extensions else None, + from_task=from_task, + from_agent=from_agent, + ), + ) + if turn_number == 1: agent_id_for_event = agent_id or endpoint crewai_event_bus.emit( @@ -338,7 +383,17 @@ async def _aexecute_a2a_delegation_impl( A2AConversationStartedEvent( agent_id=agent_id_for_event, endpoint=endpoint, + context_id=context_id, a2a_agent_name=a2a_agent_name, + agent_card=agent_card_dict, + protocol_version=agent_card.protocol_version, + provider=agent_card_dict.get("provider"), + skill_id=skill_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=list(extensions.keys()) if extensions else None, + from_task=from_task, + from_agent=from_agent, ), ) @@ -364,6 +419,10 @@ async def _aexecute_a2a_delegation_impl( } ) + message_metadata = metadata.copy() if metadata else {} + if skill_id: + message_metadata["skill_id"] = skill_id + message = Message( role=Role.user, message_id=str(uuid.uuid4()), @@ -371,7 +430,7 @@ async def _aexecute_a2a_delegation_impl( context_id=context_id, task_id=task_id, reference_task_ids=reference_task_ids, - metadata=metadata, + metadata=message_metadata if message_metadata else None, extensions=extensions, ) @@ -381,8 +440,17 @@ async def _aexecute_a2a_delegation_impl( A2AMessageSentEvent( message=message_text, turn_number=turn_number, + context_id=context_id, + message_id=message.message_id, is_multiturn=is_multiturn, agent_role=agent_role, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + skill_id=skill_id, + metadata=message_metadata if message_metadata else None, + extensions=list(extensions.keys()) if extensions else None, + from_task=from_task, + from_agent=from_agent, ), ) @@ -397,6 +465,9 @@ async def _aexecute_a2a_delegation_impl( "task_id": task_id, "endpoint": endpoint, "agent_branch": agent_branch, + "a2a_agent_name": a2a_agent_name, + "from_task": from_task, + "from_agent": from_agent, } if isinstance(updates, PollingConfig): @@ -434,13 +505,16 @@ async def _aexecute_a2a_delegation_impl( use_polling=use_polling, push_notification_config=push_config_for_client, ) as client: - return await handler.execute( + result = await handler.execute( client=client, message=message, new_messages=new_messages, agent_card=agent_card, **handler_kwargs, ) + result["a2a_agent_name"] = a2a_agent_name + result["agent_card"] = agent_card.model_dump(exclude_none=True) + return result @asynccontextmanager diff --git a/lib/crewai/src/crewai/a2a/utils/task.py b/lib/crewai/src/crewai/a2a/utils/task.py index 5669e7e4b..479a3e1c9 100644 --- a/lib/crewai/src/crewai/a2a/utils/task.py +++ b/lib/crewai/src/crewai/a2a/utils/task.py @@ -3,11 +3,14 @@ from __future__ import annotations import asyncio +import base64 from collections.abc import Callable, Coroutine +from datetime import datetime from functools import wraps import logging import os from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast +from urllib.parse import urlparse from a2a.server.agent_execution import RequestContext from a2a.server.events import EventQueue @@ -45,7 +48,14 @@ T = TypeVar("T") def _parse_redis_url(url: str) -> dict[str, Any]: - from urllib.parse import urlparse + """Parse a Redis URL into aiocache configuration. + + Args: + url: Redis connection URL (e.g., redis://localhost:6379/0). + + Returns: + Configuration dict for aiocache.RedisCache. + """ parsed = urlparse(url) config: dict[str, Any] = { @@ -127,7 +137,7 @@ def cancellable( async for message in pubsub.listen(): if message["type"] == "message": return True - except Exception as e: + except (OSError, ConnectionError) as e: logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e) return await poll_for_cancel() return False @@ -183,7 +193,12 @@ async def execute( msg = "task_id and context_id are required" crewai_event_bus.emit( agent, - A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg), + A2AServerTaskFailedEvent( + task_id="", + context_id="", + error=msg, + from_agent=agent, + ), ) raise ServerError(InvalidParamsError(message=msg)) from None @@ -195,7 +210,12 @@ async def execute( crewai_event_bus.emit( agent, - A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id), + A2AServerTaskStartedEvent( + task_id=task_id, + context_id=context_id, + from_task=task, + from_agent=agent, + ), ) try: @@ -215,20 +235,33 @@ async def execute( crewai_event_bus.emit( agent, A2AServerTaskCompletedEvent( - a2a_task_id=task_id, a2a_context_id=context_id, result=str(result) + task_id=task_id, + context_id=context_id, + result=str(result), + from_task=task, + from_agent=agent, ), ) except asyncio.CancelledError: crewai_event_bus.emit( agent, - A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id), + A2AServerTaskCanceledEvent( + task_id=task_id, + context_id=context_id, + from_task=task, + from_agent=agent, + ), ) raise except Exception as e: crewai_event_bus.emit( agent, A2AServerTaskFailedEvent( - a2a_task_id=task_id, a2a_context_id=context_id, error=str(e) + task_id=task_id, + context_id=context_id, + error=str(e), + from_task=task, + from_agent=agent, ), ) raise ServerError( @@ -282,3 +315,85 @@ async def cancel( context.current_task.status = TaskStatus(state=TaskState.canceled) return context.current_task return None + + +def list_tasks( + tasks: list[A2ATask], + context_id: str | None = None, + status: TaskState | None = None, + status_timestamp_after: datetime | None = None, + page_size: int = 50, + page_token: str | None = None, + history_length: int | None = None, + include_artifacts: bool = False, +) -> tuple[list[A2ATask], str | None, int]: + """Filter and paginate A2A tasks. + + Provides filtering by context, status, and timestamp, along with + cursor-based pagination. This is a pure utility function that operates + on an in-memory list of tasks - storage retrieval is handled separately. + + Args: + tasks: All tasks to filter. + context_id: Filter by context ID to get tasks in a conversation. + status: Filter by task state (e.g., completed, working). + status_timestamp_after: Filter to tasks updated after this time. + page_size: Maximum tasks per page (default 50). + page_token: Base64-encoded cursor from previous response. + history_length: Limit history messages per task (None = full history). + include_artifacts: Whether to include task artifacts (default False). + + Returns: + Tuple of (filtered_tasks, next_page_token, total_count). + - filtered_tasks: Tasks matching filters, paginated and trimmed. + - next_page_token: Token for next page, or None if no more pages. + - total_count: Total number of tasks matching filters (before pagination). + """ + filtered: list[A2ATask] = [] + for task in tasks: + if context_id and task.context_id != context_id: + continue + if status and task.status.state != status: + continue + if status_timestamp_after and task.status.timestamp: + ts = datetime.fromisoformat(task.status.timestamp.replace("Z", "+00:00")) + if ts <= status_timestamp_after: + continue + filtered.append(task) + + def get_timestamp(t: A2ATask) -> datetime: + """Extract timestamp from task status for sorting.""" + if t.status.timestamp is None: + return datetime.min + return datetime.fromisoformat(t.status.timestamp.replace("Z", "+00:00")) + + filtered.sort(key=get_timestamp, reverse=True) + total = len(filtered) + + start = 0 + if page_token: + try: + cursor_id = base64.b64decode(page_token).decode() + for idx, task in enumerate(filtered): + if task.id == cursor_id: + start = idx + 1 + break + except (ValueError, UnicodeDecodeError): + pass + + page = filtered[start : start + page_size] + + result: list[A2ATask] = [] + for task in page: + task = task.model_copy(deep=True) + if history_length is not None and task.history: + task.history = task.history[-history_length:] + if not include_artifacts: + task.artifacts = None + result.append(task) + + next_token: str | None = None + if result and len(result) == page_size: + next_token = base64.b64encode(result[-1].id.encode()).decode() + + return result, next_token, total diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 37a6c665e..a149c46a0 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -6,9 +6,10 @@ Wraps agent classes with A2A delegation capabilities. from __future__ import annotations import asyncio -from collections.abc import Callable, Coroutine +from collections.abc import Callable, Coroutine, Mapping from concurrent.futures import ThreadPoolExecutor, as_completed from functools import wraps +import json from types import MethodType from typing import TYPE_CHECKING, Any @@ -189,7 +190,7 @@ def _execute_task_with_a2a( a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., str], task: Task, - agent_response_model: type[BaseModel], + agent_response_model: type[BaseModel] | None, context: str | None, tools: list[BaseTool] | None, extension_registry: ExtensionRegistry, @@ -277,7 +278,7 @@ def _execute_task_with_a2a( def _augment_prompt_with_a2a( a2a_agents: list[A2AConfig | A2AClientConfig], task_description: str, - agent_cards: dict[str, AgentCard], + agent_cards: Mapping[str, AgentCard | dict[str, Any]], conversation_history: list[Message] | None = None, turn_num: int = 0, max_turns: int | None = None, @@ -309,7 +310,15 @@ def _augment_prompt_with_a2a( for config in a2a_agents: if config.endpoint in agent_cards: card = agent_cards[config.endpoint] - agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n" + if isinstance(card, dict): + filtered = { + k: v + for k, v in card.items() + if k in {"description", "url", "skills"} and v is not None + } + agents_text += f"\n{json.dumps(filtered, indent=2)}\n" + else: + agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n" failed_agents = failed_agents or {} if failed_agents: @@ -377,7 +386,7 @@ IMPORTANT: You have the ability to delegate this task to remote A2A agents. def _parse_agent_response( - raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] + raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] | None ) -> BaseModel | str | dict[str, Any]: """Parse LLM output as AgentResponse or return raw agent response.""" if agent_response_model: @@ -394,6 +403,11 @@ def _parse_agent_response( def _handle_max_turns_exceeded( conversation_history: list[Message], max_turns: int, + from_task: Any | None = None, + from_agent: Any | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + agent_card: dict[str, Any] | None = None, ) -> str: """Handle the case when max turns is exceeded. @@ -421,6 +435,11 @@ def _handle_max_turns_exceeded( final_result=final_message, error=None, total_turns=max_turns, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ), ) return final_message @@ -432,6 +451,11 @@ def _handle_max_turns_exceeded( final_result=None, error=f"Conversation exceeded maximum turns ({max_turns})", total_turns=max_turns, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ), ) raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})") @@ -442,7 +466,12 @@ def _process_response_result( disable_structured_output: bool, turn_num: int, agent_role: str, - agent_response_model: type[BaseModel], + agent_response_model: type[BaseModel] | None, + from_task: Any | None = None, + from_agent: Any | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + agent_card: dict[str, Any] | None = None, ) -> tuple[str | None, str | None]: """Process LLM response and determine next action. @@ -461,6 +490,10 @@ def _process_response_result( turn_number=final_turn_number, is_multiturn=True, agent_role=agent_role, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, ), ) crewai_event_bus.emit( @@ -470,6 +503,11 @@ def _process_response_result( final_result=result_text, error=None, total_turns=final_turn_number, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ), ) return result_text, None @@ -490,6 +528,10 @@ def _process_response_result( turn_number=final_turn_number, is_multiturn=True, agent_role=agent_role, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, ), ) crewai_event_bus.emit( @@ -499,6 +541,11 @@ def _process_response_result( final_result=str(llm_response.message), error=None, total_turns=final_turn_number, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ), ) return str(llm_response.message), None @@ -510,13 +557,15 @@ def _process_response_result( def _prepare_agent_cards_dict( a2a_result: TaskStateResult, agent_id: str, - agent_cards: dict[str, AgentCard] | None, -) -> dict[str, AgentCard]: + agent_cards: Mapping[str, AgentCard | dict[str, Any]] | None, +) -> dict[str, AgentCard | dict[str, Any]]: """Prepare agent cards dictionary from result and existing cards. Shared logic for both sync and async response handlers. """ - agent_cards_dict = agent_cards or {} + agent_cards_dict: dict[str, AgentCard | dict[str, Any]] = ( + dict(agent_cards) if agent_cards else {} + ) if "agent_card" in a2a_result and agent_id not in agent_cards_dict: agent_cards_dict[agent_id] = a2a_result["agent_card"] return agent_cards_dict @@ -529,7 +578,7 @@ def _prepare_delegation_context( original_task_description: str | None, ) -> tuple[ list[A2AConfig | A2AClientConfig], - type[BaseModel], + type[BaseModel] | None, str, str, A2AConfig | A2AClientConfig, @@ -598,6 +647,11 @@ def _handle_task_completion( reference_task_ids: list[str], agent_config: A2AConfig | A2AClientConfig, turn_num: int, + from_task: Any | None = None, + from_agent: Any | None = None, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + agent_card: dict[str, Any] | None = None, ) -> tuple[str | None, str | None, list[str]]: """Handle task completion state including reference task updates. @@ -624,6 +678,11 @@ def _handle_task_completion( final_result=result_text, error=None, total_turns=final_turn_number, + from_task=from_task, + from_agent=from_agent, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ), ) return str(result_text), task_id_config, reference_task_ids @@ -645,8 +704,11 @@ def _handle_agent_response_and_continue( original_fn: Callable[..., str], context: str | None, tools: list[BaseTool] | None, - agent_response_model: type[BaseModel], + agent_response_model: type[BaseModel] | None, remote_task_completed: bool = False, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + agent_card: dict[str, Any] | None = None, ) -> tuple[str | None, str | None]: """Handle A2A result and get CrewAI agent's response. @@ -698,6 +760,11 @@ def _handle_agent_response_and_continue( turn_num=turn_num, agent_role=self.role, agent_response_model=agent_response_model, + from_task=task, + from_agent=self, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ) @@ -750,6 +817,12 @@ def _delegate_to_a2a( conversation_history: list[Message] = [] + current_agent_card = agent_cards.get(agent_id) if agent_cards else None + current_agent_card_dict = ( + current_agent_card.model_dump() if current_agent_card else None + ) + current_a2a_agent_name = current_agent_card.name if current_agent_card else None + try: for turn_num in range(max_turns): console_formatter = getattr(crewai_event_bus, "_console", None) @@ -777,6 +850,8 @@ def _delegate_to_a2a( turn_number=turn_num + 1, updates=agent_config.updates, transport_protocol=agent_config.transport_protocol, + from_task=task, + from_agent=self, ) conversation_history = a2a_result.get("history", []) @@ -797,6 +872,11 @@ def _delegate_to_a2a( reference_task_ids, agent_config, turn_num, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) ) if trusted_result is not None: @@ -818,6 +898,9 @@ def _delegate_to_a2a( tools=tools, agent_response_model=agent_response_model, remote_task_completed=(a2a_result["status"] == TaskState.completed), + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) if final_result is not None: @@ -846,6 +929,9 @@ def _delegate_to_a2a( tools=tools, agent_response_model=agent_response_model, remote_task_completed=False, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) if final_result is not None: @@ -862,11 +948,24 @@ def _delegate_to_a2a( final_result=None, error=error_msg, total_turns=turn_num + 1, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ), ) return f"A2A delegation failed: {error_msg}" - return _handle_max_turns_exceeded(conversation_history, max_turns) + return _handle_max_turns_exceeded( + conversation_history, + max_turns, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, + ) finally: task.description = original_task_description @@ -916,7 +1015,7 @@ async def _aexecute_task_with_a2a( a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., Coroutine[Any, Any, str]], task: Task, - agent_response_model: type[BaseModel], + agent_response_model: type[BaseModel] | None, context: str | None, tools: list[BaseTool] | None, extension_registry: ExtensionRegistry, @@ -1001,8 +1100,11 @@ async def _ahandle_agent_response_and_continue( original_fn: Callable[..., Coroutine[Any, Any, str]], context: str | None, tools: list[BaseTool] | None, - agent_response_model: type[BaseModel], + agent_response_model: type[BaseModel] | None, remote_task_completed: bool = False, + endpoint: str | None = None, + a2a_agent_name: str | None = None, + agent_card: dict[str, Any] | None = None, ) -> tuple[str | None, str | None]: """Async version of _handle_agent_response_and_continue.""" agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards) @@ -1032,6 +1134,11 @@ async def _ahandle_agent_response_and_continue( turn_num=turn_num, agent_role=self.role, agent_response_model=agent_response_model, + from_task=task, + from_agent=self, + endpoint=endpoint, + a2a_agent_name=a2a_agent_name, + agent_card=agent_card, ) @@ -1066,6 +1173,12 @@ async def _adelegate_to_a2a( conversation_history: list[Message] = [] + current_agent_card = agent_cards.get(agent_id) if agent_cards else None + current_agent_card_dict = ( + current_agent_card.model_dump() if current_agent_card else None + ) + current_a2a_agent_name = current_agent_card.name if current_agent_card else None + try: for turn_num in range(max_turns): console_formatter = getattr(crewai_event_bus, "_console", None) @@ -1093,6 +1206,8 @@ async def _adelegate_to_a2a( turn_number=turn_num + 1, transport_protocol=agent_config.transport_protocol, updates=agent_config.updates, + from_task=task, + from_agent=self, ) conversation_history = a2a_result.get("history", []) @@ -1113,6 +1228,11 @@ async def _adelegate_to_a2a( reference_task_ids, agent_config, turn_num, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) ) if trusted_result is not None: @@ -1134,6 +1254,9 @@ async def _adelegate_to_a2a( tools=tools, agent_response_model=agent_response_model, remote_task_completed=(a2a_result["status"] == TaskState.completed), + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) if final_result is not None: @@ -1161,6 +1284,9 @@ async def _adelegate_to_a2a( context=context, tools=tools, agent_response_model=agent_response_model, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ) if final_result is not None: @@ -1177,11 +1303,24 @@ async def _adelegate_to_a2a( final_result=None, error=error_msg, total_turns=turn_num + 1, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, ), ) return f"A2A delegation failed: {error_msg}" - return _handle_max_turns_exceeded(conversation_history, max_turns) + return _handle_max_turns_exceeded( + conversation_history, + max_turns, + from_task=task, + from_agent=self, + endpoint=agent_config.endpoint, + a2a_agent_name=current_a2a_agent_name, + agent_card=current_agent_card_dict, + ) finally: task.description = original_task_description diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index b4479021e..78aa11fe0 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -1,19 +1,28 @@ from crewai.events.types.a2a_events import ( + A2AAgentCardFetchedEvent, + A2AArtifactReceivedEvent, + A2AAuthenticationFailedEvent, + A2AConnectionErrorEvent, A2AConversationCompletedEvent, A2AConversationStartedEvent, A2ADelegationCompletedEvent, A2ADelegationStartedEvent, A2AMessageSentEvent, + A2AParallelDelegationCompletedEvent, + A2AParallelDelegationStartedEvent, A2APollingStartedEvent, A2APollingStatusEvent, A2APushNotificationReceivedEvent, A2APushNotificationRegisteredEvent, + A2APushNotificationSentEvent, A2APushNotificationTimeoutEvent, A2AResponseReceivedEvent, A2AServerTaskCanceledEvent, A2AServerTaskCompletedEvent, A2AServerTaskFailedEvent, A2AServerTaskStartedEvent, + A2AStreamingChunkEvent, + A2AStreamingStartedEvent, ) from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, @@ -93,7 +102,11 @@ from crewai.events.types.tool_usage_events import ( EventTypes = ( - A2AConversationCompletedEvent + A2AAgentCardFetchedEvent + | A2AArtifactReceivedEvent + | A2AAuthenticationFailedEvent + | A2AConnectionErrorEvent + | A2AConversationCompletedEvent | A2AConversationStartedEvent | A2ADelegationCompletedEvent | A2ADelegationStartedEvent @@ -102,12 +115,17 @@ EventTypes = ( | A2APollingStatusEvent | A2APushNotificationReceivedEvent | A2APushNotificationRegisteredEvent + | A2APushNotificationSentEvent | A2APushNotificationTimeoutEvent | A2AResponseReceivedEvent | A2AServerTaskCanceledEvent | A2AServerTaskCompletedEvent | A2AServerTaskFailedEvent | A2AServerTaskStartedEvent + | A2AStreamingChunkEvent + | A2AStreamingStartedEvent + | A2AParallelDelegationStartedEvent + | A2AParallelDelegationCompletedEvent | CrewKickoffStartedEvent | CrewKickoffCompletedEvent | CrewKickoffFailedEvent diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index 6e7fba0ef..a4d4cbd31 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -1,7 +1,7 @@ """Trace collection listener for orchestrating trace collection.""" import os -from typing import Any, ClassVar, cast +from typing import Any, ClassVar import uuid from typing_extensions import Self @@ -18,6 +18,32 @@ from crewai.events.listeners.tracing.types import TraceEvent from crewai.events.listeners.tracing.utils import ( safe_serialize_to_dict, ) +from crewai.events.types.a2a_events import ( + A2AAgentCardFetchedEvent, + A2AArtifactReceivedEvent, + A2AAuthenticationFailedEvent, + A2AConnectionErrorEvent, + A2AConversationCompletedEvent, + A2AConversationStartedEvent, + A2ADelegationCompletedEvent, + A2ADelegationStartedEvent, + A2AMessageSentEvent, + A2AParallelDelegationCompletedEvent, + A2AParallelDelegationStartedEvent, + A2APollingStartedEvent, + A2APollingStatusEvent, + A2APushNotificationReceivedEvent, + A2APushNotificationRegisteredEvent, + A2APushNotificationSentEvent, + A2APushNotificationTimeoutEvent, + A2AResponseReceivedEvent, + A2AServerTaskCanceledEvent, + A2AServerTaskCompletedEvent, + A2AServerTaskFailedEvent, + A2AServerTaskStartedEvent, + A2AStreamingChunkEvent, + A2AStreamingStartedEvent, +) from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, AgentExecutionErrorEvent, @@ -105,7 +131,7 @@ class TraceCollectionListener(BaseEventListener): """Create or return singleton instance.""" if cls._instance is None: cls._instance = super().__new__(cls) - return cast(Self, cls._instance) + return cls._instance def __init__( self, @@ -160,6 +186,7 @@ class TraceCollectionListener(BaseEventListener): self._register_flow_event_handlers(crewai_event_bus) self._register_context_event_handlers(crewai_event_bus) self._register_action_event_handlers(crewai_event_bus) + self._register_a2a_event_handlers(crewai_event_bus) self._register_system_event_handlers(crewai_event_bus) self._listeners_setup = True @@ -439,6 +466,147 @@ class TraceCollectionListener(BaseEventListener): ) -> None: self._handle_action_event("knowledge_query_failed", source, event) + def _register_a2a_event_handlers(self, event_bus: CrewAIEventsBus) -> None: + """Register handlers for A2A (Agent-to-Agent) events.""" + + @event_bus.on(A2ADelegationStartedEvent) + def on_a2a_delegation_started( + source: Any, event: A2ADelegationStartedEvent + ) -> None: + self._handle_action_event("a2a_delegation_started", source, event) + + @event_bus.on(A2ADelegationCompletedEvent) + def on_a2a_delegation_completed( + source: Any, event: A2ADelegationCompletedEvent + ) -> None: + self._handle_action_event("a2a_delegation_completed", source, event) + + @event_bus.on(A2AConversationStartedEvent) + def on_a2a_conversation_started( + source: Any, event: A2AConversationStartedEvent + ) -> None: + self._handle_action_event("a2a_conversation_started", source, event) + + @event_bus.on(A2AMessageSentEvent) + def on_a2a_message_sent(source: Any, event: A2AMessageSentEvent) -> None: + self._handle_action_event("a2a_message_sent", source, event) + + @event_bus.on(A2AResponseReceivedEvent) + def on_a2a_response_received( + source: Any, event: A2AResponseReceivedEvent + ) -> None: + self._handle_action_event("a2a_response_received", source, event) + + @event_bus.on(A2AConversationCompletedEvent) + def on_a2a_conversation_completed( + source: Any, event: A2AConversationCompletedEvent + ) -> None: + self._handle_action_event("a2a_conversation_completed", source, event) + + @event_bus.on(A2APollingStartedEvent) + def on_a2a_polling_started(source: Any, event: A2APollingStartedEvent) -> None: + self._handle_action_event("a2a_polling_started", source, event) + + @event_bus.on(A2APollingStatusEvent) + def on_a2a_polling_status(source: Any, event: A2APollingStatusEvent) -> None: + self._handle_action_event("a2a_polling_status", source, event) + + @event_bus.on(A2APushNotificationRegisteredEvent) + def on_a2a_push_notification_registered( + source: Any, event: A2APushNotificationRegisteredEvent + ) -> None: + self._handle_action_event("a2a_push_notification_registered", source, event) + + @event_bus.on(A2APushNotificationReceivedEvent) + def on_a2a_push_notification_received( + source: Any, event: A2APushNotificationReceivedEvent + ) -> None: + self._handle_action_event("a2a_push_notification_received", source, event) + + @event_bus.on(A2APushNotificationSentEvent) + def on_a2a_push_notification_sent( + source: Any, event: A2APushNotificationSentEvent + ) -> None: + self._handle_action_event("a2a_push_notification_sent", source, event) + + @event_bus.on(A2APushNotificationTimeoutEvent) + def on_a2a_push_notification_timeout( + source: Any, event: A2APushNotificationTimeoutEvent + ) -> None: + self._handle_action_event("a2a_push_notification_timeout", source, event) + + @event_bus.on(A2AStreamingStartedEvent) + def on_a2a_streaming_started( + source: Any, event: A2AStreamingStartedEvent + ) -> None: + self._handle_action_event("a2a_streaming_started", source, event) + + @event_bus.on(A2AStreamingChunkEvent) + def on_a2a_streaming_chunk(source: Any, event: A2AStreamingChunkEvent) -> None: + self._handle_action_event("a2a_streaming_chunk", source, event) + + @event_bus.on(A2AAgentCardFetchedEvent) + def on_a2a_agent_card_fetched( + source: Any, event: A2AAgentCardFetchedEvent + ) -> None: + self._handle_action_event("a2a_agent_card_fetched", source, event) + + @event_bus.on(A2AAuthenticationFailedEvent) + def on_a2a_authentication_failed( + source: Any, event: A2AAuthenticationFailedEvent + ) -> None: + self._handle_action_event("a2a_authentication_failed", source, event) + + @event_bus.on(A2AArtifactReceivedEvent) + def on_a2a_artifact_received( + source: Any, event: A2AArtifactReceivedEvent + ) -> None: + self._handle_action_event("a2a_artifact_received", source, event) + + @event_bus.on(A2AConnectionErrorEvent) + def on_a2a_connection_error( + source: Any, event: A2AConnectionErrorEvent + ) -> None: + self._handle_action_event("a2a_connection_error", source, event) + + @event_bus.on(A2AServerTaskStartedEvent) + def on_a2a_server_task_started( + source: Any, event: A2AServerTaskStartedEvent + ) -> None: + self._handle_action_event("a2a_server_task_started", source, event) + + @event_bus.on(A2AServerTaskCompletedEvent) + def on_a2a_server_task_completed( + source: Any, event: A2AServerTaskCompletedEvent + ) -> None: + self._handle_action_event("a2a_server_task_completed", source, event) + + @event_bus.on(A2AServerTaskCanceledEvent) + def on_a2a_server_task_canceled( + source: Any, event: A2AServerTaskCanceledEvent + ) -> None: + self._handle_action_event("a2a_server_task_canceled", source, event) + + @event_bus.on(A2AServerTaskFailedEvent) + def on_a2a_server_task_failed( + source: Any, event: A2AServerTaskFailedEvent + ) -> None: + self._handle_action_event("a2a_server_task_failed", source, event) + + @event_bus.on(A2AParallelDelegationStartedEvent) + def on_a2a_parallel_delegation_started( + source: Any, event: A2AParallelDelegationStartedEvent + ) -> None: + self._handle_action_event("a2a_parallel_delegation_started", source, event) + + @event_bus.on(A2AParallelDelegationCompletedEvent) + def on_a2a_parallel_delegation_completed( + source: Any, event: A2AParallelDelegationCompletedEvent + ) -> None: + self._handle_action_event( + "a2a_parallel_delegation_completed", source, event + ) + def _register_system_event_handlers(self, event_bus: CrewAIEventsBus) -> None: """Register handlers for system signal events (SIGTERM, SIGINT, etc.).""" @@ -570,10 +738,15 @@ class TraceCollectionListener(BaseEventListener): if event_type not in self.complex_events: return safe_serialize_to_dict(event) if event_type == "task_started": + task_name = event.task.name or event.task.description + task_display_name = ( + task_name[:80] + "..." if len(task_name) > 80 else task_name + ) return { "task_description": event.task.description, "expected_output": event.task.expected_output, - "task_name": event.task.name or event.task.description, + "task_name": task_name, + "task_display_name": task_display_name, "context": event.context, "agent_role": source.agent.role, "task_id": str(event.task.id), diff --git a/lib/crewai/src/crewai/events/types/a2a_events.py b/lib/crewai/src/crewai/events/types/a2a_events.py index 9f414b333..d69878aac 100644 --- a/lib/crewai/src/crewai/events/types/a2a_events.py +++ b/lib/crewai/src/crewai/events/types/a2a_events.py @@ -4,68 +4,120 @@ This module defines events emitted during A2A protocol delegation, including both single-turn and multiturn conversation flows. """ +from __future__ import annotations + from typing import Any, Literal +from pydantic import model_validator + from crewai.events.base_events import BaseEvent class A2AEventBase(BaseEvent): """Base class for A2A events with task/agent context.""" - from_task: Any | None = None - from_agent: Any | None = None + from_task: Any = None + from_agent: Any = None - def __init__(self, **data: Any) -> None: - """Initialize A2A event, extracting task and agent metadata.""" - if data.get("from_task"): - task = data["from_task"] + @model_validator(mode="before") + @classmethod + def extract_task_and_agent_metadata(cls, data: dict[str, Any]) -> dict[str, Any]: + """Extract task and agent metadata before validation.""" + if task := data.get("from_task"): data["task_id"] = str(task.id) data["task_name"] = task.name or task.description + data.setdefault("source_fingerprint", str(task.id)) + data.setdefault("source_type", "task") + data.setdefault( + "fingerprint_metadata", + { + "task_id": str(task.id), + "task_name": task.name or task.description, + }, + ) data["from_task"] = None - if data.get("from_agent"): - agent = data["from_agent"] + if agent := data.get("from_agent"): data["agent_id"] = str(agent.id) data["agent_role"] = agent.role + data.setdefault("source_fingerprint", str(agent.id)) + data.setdefault("source_type", "agent") + data.setdefault( + "fingerprint_metadata", + { + "agent_id": str(agent.id), + "agent_role": agent.role, + }, + ) data["from_agent"] = None - super().__init__(**data) + return data class A2ADelegationStartedEvent(A2AEventBase): """Event emitted when A2A delegation starts. Attributes: - endpoint: A2A agent endpoint URL (AgentCard URL) - task_description: Task being delegated to the A2A agent - agent_id: A2A agent identifier - is_multiturn: Whether this is part of a multiturn conversation - turn_number: Current turn number (1-indexed, 1 for single-turn) + endpoint: A2A agent endpoint URL (AgentCard URL). + task_description: Task being delegated to the A2A agent. + agent_id: A2A agent identifier. + context_id: A2A context ID grouping related tasks. + is_multiturn: Whether this is part of a multiturn conversation. + turn_number: Current turn number (1-indexed, 1 for single-turn). + a2a_agent_name: Name of the A2A agent from agent card. + agent_card: Full A2A agent card metadata. + protocol_version: A2A protocol version being used. + provider: Agent provider/organization info from agent card. + skill_id: ID of the specific skill being invoked. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_delegation_started" endpoint: str task_description: str agent_id: str + context_id: str | None = None is_multiturn: bool = False turn_number: int = 1 + a2a_agent_name: str | None = None + agent_card: dict[str, Any] | None = None + protocol_version: str | None = None + provider: dict[str, Any] | None = None + skill_id: str | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2ADelegationCompletedEvent(A2AEventBase): """Event emitted when A2A delegation completes. Attributes: - status: Completion status (completed, input_required, failed, etc.) - result: Result message if status is completed - error: Error/response message (error for failed, response for input_required) - is_multiturn: Whether this is part of a multiturn conversation + status: Completion status (completed, input_required, failed, etc.). + result: Result message if status is completed. + error: Error/response message (error for failed, response for input_required). + context_id: A2A context ID grouping related tasks. + is_multiturn: Whether this is part of a multiturn conversation. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + agent_card: Full A2A agent card metadata. + provider: Agent provider/organization info from agent card. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_delegation_completed" status: str result: str | None = None error: str | None = None + context_id: str | None = None is_multiturn: bool = False + endpoint: str | None = None + a2a_agent_name: str | None = None + agent_card: dict[str, Any] | None = None + provider: dict[str, Any] | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2AConversationStartedEvent(A2AEventBase): @@ -75,51 +127,95 @@ class A2AConversationStartedEvent(A2AEventBase): before the first message exchange. Attributes: - agent_id: A2A agent identifier - endpoint: A2A agent endpoint URL - a2a_agent_name: Name of the A2A agent from agent card + agent_id: A2A agent identifier. + endpoint: A2A agent endpoint URL. + context_id: A2A context ID grouping related tasks. + a2a_agent_name: Name of the A2A agent from agent card. + agent_card: Full A2A agent card metadata. + protocol_version: A2A protocol version being used. + provider: Agent provider/organization info from agent card. + skill_id: ID of the specific skill being invoked. + reference_task_ids: Related task IDs for context. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_conversation_started" agent_id: str endpoint: str + context_id: str | None = None a2a_agent_name: str | None = None + agent_card: dict[str, Any] | None = None + protocol_version: str | None = None + provider: dict[str, Any] | None = None + skill_id: str | None = None + reference_task_ids: list[str] | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2AMessageSentEvent(A2AEventBase): """Event emitted when a message is sent to the A2A agent. Attributes: - message: Message content sent to the A2A agent - turn_number: Current turn number (1-indexed) - is_multiturn: Whether this is part of a multiturn conversation - agent_role: Role of the CrewAI agent sending the message + message: Message content sent to the A2A agent. + turn_number: Current turn number (1-indexed). + context_id: A2A context ID grouping related tasks. + message_id: Unique A2A message identifier. + is_multiturn: Whether this is part of a multiturn conversation. + agent_role: Role of the CrewAI agent sending the message. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + skill_id: ID of the specific skill being invoked. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_message_sent" message: str turn_number: int + context_id: str | None = None + message_id: str | None = None is_multiturn: bool = False agent_role: str | None = None + endpoint: str | None = None + a2a_agent_name: str | None = None + skill_id: str | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2AResponseReceivedEvent(A2AEventBase): """Event emitted when a response is received from the A2A agent. Attributes: - response: Response content from the A2A agent - turn_number: Current turn number (1-indexed) - is_multiturn: Whether this is part of a multiturn conversation - status: Response status (input_required, completed, etc.) - agent_role: Role of the CrewAI agent (for display) + response: Response content from the A2A agent. + turn_number: Current turn number (1-indexed). + context_id: A2A context ID grouping related tasks. + message_id: Unique A2A message identifier. + is_multiturn: Whether this is part of a multiturn conversation. + status: Response status (input_required, completed, etc.). + final: Whether this is the final response in the stream. + agent_role: Role of the CrewAI agent (for display). + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_response_received" response: str turn_number: int + context_id: str | None = None + message_id: str | None = None is_multiturn: bool = False status: str + final: bool = False agent_role: str | None = None + endpoint: str | None = None + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2AConversationCompletedEvent(A2AEventBase): @@ -128,119 +224,433 @@ class A2AConversationCompletedEvent(A2AEventBase): This is emitted once at the end of a multiturn conversation. Attributes: - status: Final status (completed, failed, etc.) - final_result: Final result if completed successfully - error: Error message if failed - total_turns: Total number of turns in the conversation + status: Final status (completed, failed, etc.). + final_result: Final result if completed successfully. + error: Error message if failed. + context_id: A2A context ID grouping related tasks. + total_turns: Total number of turns in the conversation. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + agent_card: Full A2A agent card metadata. + reference_task_ids: Related task IDs for context. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. """ type: str = "a2a_conversation_completed" status: Literal["completed", "failed"] final_result: str | None = None error: str | None = None + context_id: str | None = None total_turns: int + endpoint: str | None = None + a2a_agent_name: str | None = None + agent_card: dict[str, Any] | None = None + reference_task_ids: list[str] | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None class A2APollingStartedEvent(A2AEventBase): """Event emitted when polling mode begins for A2A delegation. Attributes: - task_id: A2A task ID being polled - polling_interval: Seconds between poll attempts - endpoint: A2A agent endpoint URL + task_id: A2A task ID being polled. + context_id: A2A context ID grouping related tasks. + polling_interval: Seconds between poll attempts. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. """ type: str = "a2a_polling_started" task_id: str + context_id: str | None = None polling_interval: float endpoint: str + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None class A2APollingStatusEvent(A2AEventBase): """Event emitted on each polling iteration. Attributes: - task_id: A2A task ID being polled - state: Current task state from remote agent - elapsed_seconds: Time since polling started - poll_count: Number of polls completed + task_id: A2A task ID being polled. + context_id: A2A context ID grouping related tasks. + state: Current task state from remote agent. + elapsed_seconds: Time since polling started. + poll_count: Number of polls completed. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. """ type: str = "a2a_polling_status" task_id: str + context_id: str | None = None state: str elapsed_seconds: float poll_count: int + endpoint: str | None = None + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None class A2APushNotificationRegisteredEvent(A2AEventBase): """Event emitted when push notification callback is registered. Attributes: - task_id: A2A task ID for which callback is registered - callback_url: URL where agent will send push notifications + task_id: A2A task ID for which callback is registered. + context_id: A2A context ID grouping related tasks. + callback_url: URL where agent will send push notifications. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. """ type: str = "a2a_push_notification_registered" task_id: str + context_id: str | None = None callback_url: str + endpoint: str | None = None + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None class A2APushNotificationReceivedEvent(A2AEventBase): """Event emitted when a push notification is received. + This event should be emitted by the user's webhook handler when it receives + a push notification from the remote A2A agent, before calling + `result_store.store_result()`. + Attributes: - task_id: A2A task ID from the notification - state: Current task state from the notification + task_id: A2A task ID from the notification. + context_id: A2A context ID grouping related tasks. + state: Current task state from the notification. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. """ type: str = "a2a_push_notification_received" task_id: str + context_id: str | None = None state: str + endpoint: str | None = None + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None + + +class A2APushNotificationSentEvent(A2AEventBase): + """Event emitted when a push notification is sent to a callback URL. + + Emitted by the A2A server when it sends a task status update to the + client's registered push notification callback URL. + + Attributes: + task_id: A2A task ID being notified. + context_id: A2A context ID grouping related tasks. + callback_url: URL the notification was sent to. + state: Task state being reported. + success: Whether the notification was successfully delivered. + error: Error message if delivery failed. + metadata: Custom A2A metadata key-value pairs. + """ + + type: str = "a2a_push_notification_sent" + task_id: str + context_id: str | None = None + callback_url: str + state: str + success: bool = True + error: str | None = None + metadata: dict[str, Any] | None = None class A2APushNotificationTimeoutEvent(A2AEventBase): """Event emitted when push notification wait times out. Attributes: - task_id: A2A task ID that timed out - timeout_seconds: Timeout duration in seconds + task_id: A2A task ID that timed out. + context_id: A2A context ID grouping related tasks. + timeout_seconds: Timeout duration in seconds. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + metadata: Custom A2A metadata key-value pairs. """ type: str = "a2a_push_notification_timeout" task_id: str + context_id: str | None = None timeout_seconds: float + endpoint: str | None = None + a2a_agent_name: str | None = None + metadata: dict[str, Any] | None = None + + +class A2AStreamingStartedEvent(A2AEventBase): + """Event emitted when streaming mode begins for A2A delegation. + + Attributes: + task_id: A2A task ID for the streaming session. + context_id: A2A context ID grouping related tasks. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + turn_number: Current turn number (1-indexed). + is_multiturn: Whether this is part of a multiturn conversation. + agent_role: Role of the CrewAI agent. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. + """ + + type: str = "a2a_streaming_started" + task_id: str | None = None + context_id: str | None = None + endpoint: str + a2a_agent_name: str | None = None + turn_number: int = 1 + is_multiturn: bool = False + agent_role: str | None = None + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None + + +class A2AStreamingChunkEvent(A2AEventBase): + """Event emitted when a streaming chunk is received. + + Attributes: + task_id: A2A task ID for the streaming session. + context_id: A2A context ID grouping related tasks. + chunk: The text content of the chunk. + chunk_index: Index of this chunk in the stream (0-indexed). + final: Whether this is the final chunk in the stream. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + turn_number: Current turn number (1-indexed). + is_multiturn: Whether this is part of a multiturn conversation. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. + """ + + type: str = "a2a_streaming_chunk" + task_id: str | None = None + context_id: str | None = None + chunk: str + chunk_index: int + final: bool = False + endpoint: str | None = None + a2a_agent_name: str | None = None + turn_number: int = 1 + is_multiturn: bool = False + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None + + +class A2AAgentCardFetchedEvent(A2AEventBase): + """Event emitted when an agent card is successfully fetched. + + Attributes: + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + agent_card: Full A2A agent card metadata. + protocol_version: A2A protocol version from agent card. + provider: Agent provider/organization info from agent card. + cached: Whether the agent card was retrieved from cache. + fetch_time_ms: Time taken to fetch the agent card in milliseconds. + metadata: Custom A2A metadata key-value pairs. + """ + + type: str = "a2a_agent_card_fetched" + endpoint: str + a2a_agent_name: str | None = None + agent_card: dict[str, Any] | None = None + protocol_version: str | None = None + provider: dict[str, Any] | None = None + cached: bool = False + fetch_time_ms: float | None = None + metadata: dict[str, Any] | None = None + + +class A2AAuthenticationFailedEvent(A2AEventBase): + """Event emitted when authentication to an A2A agent fails. + + Attributes: + endpoint: A2A agent endpoint URL. + auth_type: Type of authentication attempted (e.g., bearer, oauth2, api_key). + error: Error message describing the failure. + status_code: HTTP status code if applicable. + a2a_agent_name: Name of the A2A agent if known. + protocol_version: A2A protocol version being used. + metadata: Custom A2A metadata key-value pairs. + """ + + type: str = "a2a_authentication_failed" + endpoint: str + auth_type: str | None = None + error: str + status_code: int | None = None + a2a_agent_name: str | None = None + protocol_version: str | None = None + metadata: dict[str, Any] | None = None + + +class A2AArtifactReceivedEvent(A2AEventBase): + """Event emitted when an artifact is received from a remote A2A agent. + + Attributes: + task_id: A2A task ID the artifact belongs to. + artifact_id: Unique identifier for the artifact. + artifact_name: Name of the artifact. + artifact_description: Purpose description of the artifact. + mime_type: MIME type of the artifact content. + size_bytes: Size of the artifact in bytes. + append: Whether content should be appended to existing artifact. + last_chunk: Whether this is the final chunk of the artifact. + endpoint: A2A agent endpoint URL. + a2a_agent_name: Name of the A2A agent from agent card. + context_id: Context ID for correlation. + turn_number: Current turn number (1-indexed). + is_multiturn: Whether this is part of a multiturn conversation. + metadata: Custom A2A metadata key-value pairs. + extensions: List of A2A extension URIs in use. + """ + + type: str = "a2a_artifact_received" + task_id: str + artifact_id: str + artifact_name: str | None = None + artifact_description: str | None = None + mime_type: str | None = None + size_bytes: int | None = None + append: bool = False + last_chunk: bool = False + endpoint: str | None = None + a2a_agent_name: str | None = None + context_id: str | None = None + turn_number: int = 1 + is_multiturn: bool = False + metadata: dict[str, Any] | None = None + extensions: list[str] | None = None + + +class A2AConnectionErrorEvent(A2AEventBase): + """Event emitted when a connection error occurs during A2A communication. + + Attributes: + endpoint: A2A agent endpoint URL. + error: Error message describing the connection failure. + error_type: Type of error (e.g., timeout, connection_refused, dns_error). + status_code: HTTP status code if applicable. + a2a_agent_name: Name of the A2A agent from agent card. + operation: The operation being attempted when error occurred. + context_id: A2A context ID grouping related tasks. + task_id: A2A task ID if applicable. + metadata: Custom A2A metadata key-value pairs. + """ + + type: str = "a2a_connection_error" + endpoint: str + error: str + error_type: str | None = None + status_code: int | None = None + a2a_agent_name: str | None = None + operation: str | None = None + context_id: str | None = None + task_id: str | None = None + metadata: dict[str, Any] | None = None class A2AServerTaskStartedEvent(A2AEventBase): - """Event emitted when an A2A server task execution starts.""" + """Event emitted when an A2A server task execution starts. + + Attributes: + task_id: A2A task ID for this execution. + context_id: A2A context ID grouping related tasks. + metadata: Custom A2A metadata key-value pairs. + """ type: str = "a2a_server_task_started" - a2a_task_id: str - a2a_context_id: str + task_id: str + context_id: str + metadata: dict[str, Any] | None = None class A2AServerTaskCompletedEvent(A2AEventBase): - """Event emitted when an A2A server task execution completes.""" + """Event emitted when an A2A server task execution completes. + + Attributes: + task_id: A2A task ID for this execution. + context_id: A2A context ID grouping related tasks. + result: The task result. + metadata: Custom A2A metadata key-value pairs. + """ type: str = "a2a_server_task_completed" - a2a_task_id: str - a2a_context_id: str + task_id: str + context_id: str result: str + metadata: dict[str, Any] | None = None class A2AServerTaskCanceledEvent(A2AEventBase): - """Event emitted when an A2A server task execution is canceled.""" + """Event emitted when an A2A server task execution is canceled. + + Attributes: + task_id: A2A task ID for this execution. + context_id: A2A context ID grouping related tasks. + metadata: Custom A2A metadata key-value pairs. + """ type: str = "a2a_server_task_canceled" - a2a_task_id: str - a2a_context_id: str + task_id: str + context_id: str + metadata: dict[str, Any] | None = None class A2AServerTaskFailedEvent(A2AEventBase): - """Event emitted when an A2A server task execution fails.""" + """Event emitted when an A2A server task execution fails. + + Attributes: + task_id: A2A task ID for this execution. + context_id: A2A context ID grouping related tasks. + error: Error message describing the failure. + metadata: Custom A2A metadata key-value pairs. + """ type: str = "a2a_server_task_failed" - a2a_task_id: str - a2a_context_id: str + task_id: str + context_id: str error: str + metadata: dict[str, Any] | None = None + + +class A2AParallelDelegationStartedEvent(A2AEventBase): + """Event emitted when parallel delegation to multiple A2A agents begins. + + Attributes: + endpoints: List of A2A agent endpoints being delegated to. + task_description: Description of the task being delegated. + """ + + type: str = "a2a_parallel_delegation_started" + endpoints: list[str] + task_description: str + + +class A2AParallelDelegationCompletedEvent(A2AEventBase): + """Event emitted when parallel delegation to multiple A2A agents completes. + + Attributes: + endpoints: List of A2A agent endpoints that were delegated to. + success_count: Number of successful delegations. + failure_count: Number of failed delegations. + results: Summary of results from each agent. + """ + + type: str = "a2a_parallel_delegation_completed" + endpoints: list[str] + success_count: int + failure_count: int + results: dict[str, str] | None = None diff --git a/lib/crewai/tests/a2a/utils/test_task.py b/lib/crewai/tests/a2a/utils/test_task.py index 0c01a0afc..3c3f8865e 100644 --- a/lib/crewai/tests/a2a/utils/test_task.py +++ b/lib/crewai/tests/a2a/utils/test_task.py @@ -26,9 +26,13 @@ def mock_agent() -> MagicMock: @pytest.fixture -def mock_task() -> MagicMock: +def mock_task(mock_context: MagicMock) -> MagicMock: """Create a mock Task.""" - return MagicMock() + task = MagicMock() + task.id = mock_context.task_id + task.name = "Mock Task" + task.description = "Mock task description" + return task @pytest.fixture @@ -179,8 +183,8 @@ class TestExecute: event = first_call[0][1] assert event.type == "a2a_server_task_started" - assert event.a2a_task_id == mock_context.task_id - assert event.a2a_context_id == mock_context.context_id + assert event.task_id == mock_context.task_id + assert event.context_id == mock_context.context_id @pytest.mark.asyncio async def test_emits_completed_event( @@ -201,7 +205,7 @@ class TestExecute: event = second_call[0][1] assert event.type == "a2a_server_task_completed" - assert event.a2a_task_id == mock_context.task_id + assert event.task_id == mock_context.task_id assert event.result == "Task completed successfully" @pytest.mark.asyncio @@ -250,7 +254,7 @@ class TestExecute: event = canceled_call[0][1] assert event.type == "a2a_server_task_canceled" - assert event.a2a_task_id == mock_context.task_id + assert event.task_id == mock_context.task_id class TestCancel: diff --git a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py index 7573ecb5d..6347f8e1c 100644 --- a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py +++ b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py @@ -14,6 +14,16 @@ except ImportError: A2A_SDK_INSTALLED = False +def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"): + """Create a mock agent card with proper model_dump behavior.""" + mock_card = MagicMock() + mock_card.name = name + mock_card.url = url + mock_card.model_dump.return_value = {"name": name, "url": url} + mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}' + return mock_card + + @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") def test_trust_remote_completion_status_true_returns_directly(): """When trust_remote_completion_status=True and A2A returns completed, return result directly.""" @@ -44,8 +54,7 @@ def test_trust_remote_completion_status_true_returns_directly(): patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, ): - mock_card = MagicMock() - mock_card.name = "Test" + mock_card = _create_mock_agent_card() mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) # A2A returns completed @@ -110,8 +119,7 @@ def test_trust_remote_completion_status_false_continues_conversation(): patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, ): - mock_card = MagicMock() - mock_card.name = "Test" + mock_card = _create_mock_agent_card() mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) # A2A returns completed