From 8f99fa76edbcfcaa7d459838ab96a6b18a844945 Mon Sep 17 00:00:00 2001 From: Koushiv Date: Mon, 12 Jan 2026 17:03:06 +0000 Subject: [PATCH] feat: additional a2a transports Co-authored-by: Koushiv Sadhukhan Co-authored-by: Greyson LaLonde --- docs/en/learn/a2a-agent-delegation.mdx | 4 +++ lib/crewai/src/crewai/a2a/config.py | 7 +++- lib/crewai/src/crewai/a2a/utils.py | 47 +++++++++++++++++++++++--- lib/crewai/src/crewai/a2a/wrapper.py | 2 ++ 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/docs/en/learn/a2a-agent-delegation.mdx b/docs/en/learn/a2a-agent-delegation.mdx index e4c9f8228..174939e3d 100644 --- a/docs/en/learn/a2a-agent-delegation.mdx +++ b/docs/en/learn/a2a-agent-delegation.mdx @@ -91,6 +91,10 @@ The `A2AConfig` class accepts the following parameters: Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`. + + Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`. + + ## Authentication For A2A agents that require authentication, use one of the provided auth schemes: diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index 26d3ce121..535db971f 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports. from __future__ import annotations -from typing import Annotated, Any, ClassVar +from typing import Annotated, Any, ClassVar, Literal from pydantic import ( BaseModel, @@ -53,6 +53,7 @@ class A2AConfig(BaseModel): fail_fast: If True, raise error when agent unreachable; if False, skip and continue. trust_remote_completion_status: If True, return A2A agent's result directly when completed. updates: Update mechanism config. + transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json). """ model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -82,3 +83,7 @@ class A2AConfig(BaseModel): default_factory=_get_default_update_config, description="Update mechanism config", ) + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field( + default="JSONRPC", + description="Specified mode of A2A transport protocol", + ) diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils.py index 4b3ba23e9..93aee5215 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils.py @@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping from contextlib import asynccontextmanager from functools import lru_cache import time -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import uuid from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory @@ -18,7 +18,6 @@ from a2a.types import ( PushNotificationConfig as A2APushNotificationConfig, Role, TextPart, - TransportProtocol, ) from aiocache import cached # type: ignore[import-untyped] from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] @@ -259,6 +258,7 @@ async def _afetch_agent_card_impl( def execute_a2a_delegation( endpoint: str, + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], auth: AuthScheme | None, timeout: int, task_description: str, @@ -282,6 +282,23 @@ def execute_a2a_delegation( use aexecute_a2a_delegation directly. Args: + endpoint: A2A agent endpoint URL (AgentCard URL) + transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json) + auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest) + timeout: Request timeout in seconds + task_description: The task to delegate + context: Optional context information + context_id: Context ID for correlating messages/tasks + task_id: Specific task identifier + reference_task_ids: List of related task IDs + metadata: Additional metadata (external_id, request_id, etc.) + extensions: Protocol extensions for custom fields + conversation_history: Previous Message objects from conversation + agent_id: Agent identifier for logging + agent_role: Role of the CrewAI agent delegating the task + agent_branch: Optional agent tree branch for logging + response_model: Optional Pydantic model for structured outputs + turn_number: Optional turn number for multi-turn conversations endpoint: A2A agent endpoint URL. auth: Optional AuthScheme for authentication. timeout: Request timeout in seconds. @@ -323,6 +340,7 @@ def execute_a2a_delegation( agent_role=agent_role, agent_branch=agent_branch, response_model=response_model, + transport_protocol=transport_protocol, turn_number=turn_number, updates=updates, ) @@ -333,6 +351,7 @@ def execute_a2a_delegation( async def aexecute_a2a_delegation( endpoint: str, + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], auth: AuthScheme | None, timeout: int, task_description: str, @@ -356,6 +375,23 @@ async def aexecute_a2a_delegation( in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()). Args: + endpoint: A2A agent endpoint URL + transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json) + auth: Optional AuthScheme for authentication + timeout: Request timeout in seconds + task_description: Task to delegate + context: Optional context + context_id: Context ID for correlation + task_id: Specific task identifier + reference_task_ids: Related task IDs + metadata: Additional metadata + extensions: Protocol extensions + conversation_history: Previous Message objects + turn_number: Current turn number + agent_branch: Agent tree branch for logging + agent_id: Agent identifier for logging + agent_role: Agent role for logging + response_model: Optional Pydantic model for structured outputs endpoint: A2A agent endpoint URL. auth: Optional AuthScheme for authentication. timeout: Request timeout in seconds. @@ -414,6 +450,7 @@ async def aexecute_a2a_delegation( agent_role=agent_role, response_model=response_model, updates=updates, + transport_protocol=transport_protocol, ) crewai_event_bus.emit( @@ -431,6 +468,7 @@ async def aexecute_a2a_delegation( async def _aexecute_a2a_delegation_impl( endpoint: str, + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], auth: AuthScheme | None, timeout: int, task_description: str, @@ -524,7 +562,6 @@ async def _aexecute_a2a_delegation_impl( extensions=extensions, ) - transport_protocol = TransportProtocol("JSONRPC") new_messages: list[Message] = [*conversation_history, message] crewai_event_bus.emit( None, @@ -596,7 +633,7 @@ async def _aexecute_a2a_delegation_impl( @asynccontextmanager async def _create_a2a_client( agent_card: AgentCard, - transport_protocol: TransportProtocol, + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], timeout: int, headers: MutableMapping[str, str], streaming: bool, @@ -640,7 +677,7 @@ async def _create_a2a_client( config = ClientConfig( httpx_client=httpx_client, - supported_transports=[str(transport_protocol.value)], + supported_transports=[transport_protocol], streaming=streaming and not use_polling, polling=use_polling, accepted_output_modes=["application/json"], diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 358d0fc79..7d52e85b8 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -771,6 +771,7 @@ def _delegate_to_a2a( response_model=agent_config.response_model, turn_number=turn_num + 1, updates=agent_config.updates, + transport_protocol=agent_config.transport_protocol, ) conversation_history = a2a_result.get("history", []) @@ -1085,6 +1086,7 @@ async def _adelegate_to_a2a( agent_branch=agent_branch, response_model=agent_config.response_model, turn_number=turn_num + 1, + transport_protocol=agent_config.transport_protocol, updates=agent_config.updates, )