feat: add a2a server config; agent card generation

This commit is contained in:
Greyson LaLonde
2026-01-14 22:09:11 -05:00
committed by GitHub
parent 9edbf89b68
commit 22f1812824
10 changed files with 1096 additions and 324 deletions

View File

@@ -5,18 +5,22 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations from __future__ import annotations
from typing import Annotated, Any, ClassVar, Literal from importlib.metadata import version
from typing import Any, ClassVar, Literal
from pydantic import ( from a2a.types import (
BaseModel, AgentCapabilities,
BeforeValidator, AgentCardSignature,
ConfigDict, AgentInterface,
Field, AgentProvider,
HttpUrl, AgentSkill,
TypeAdapter, SecurityScheme,
) )
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import deprecated
from crewai.a2a.auth.schemas import AuthScheme from crewai.a2a.auth.schemas import AuthScheme
from crewai.a2a.types import TransportType, Url
try: try:
@@ -25,25 +29,25 @@ except ImportError:
UpdateConfig = Any # type: ignore[misc,assignment] UpdateConfig = Any # type: ignore[misc,assignment]
http_url_adapter = TypeAdapter(HttpUrl)
Url = Annotated[
str,
BeforeValidator(
lambda value: str(http_url_adapter.validate_python(value, strict=True))
),
]
def _get_default_update_config() -> UpdateConfig: def _get_default_update_config() -> UpdateConfig:
from crewai.a2a.updates import StreamingConfig from crewai.a2a.updates import StreamingConfig
return StreamingConfig() return StreamingConfig()
@deprecated(
"""
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0,
use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead.
""",
category=FutureWarning,
)
class A2AConfig(BaseModel): class A2AConfig(BaseModel):
"""Configuration for A2A protocol integration. """Configuration for A2A protocol integration.
Deprecated:
Use A2AClientConfig instead. This class will be removed in a future version.
Attributes: Attributes:
endpoint: A2A agent endpoint URL. endpoint: A2A agent endpoint URL.
auth: Authentication scheme. auth: Authentication scheme.
@@ -87,3 +91,176 @@ class A2AConfig(BaseModel):
default="JSONRPC", default="JSONRPC",
description="Specified mode of A2A transport protocol", description="Specified mode of A2A transport protocol",
) )
class A2AClientConfig(BaseModel):
"""Configuration for connecting to remote A2A agents.
Attributes:
endpoint: A2A agent endpoint URL.
auth: Authentication scheme.
timeout: Request timeout in seconds.
max_turns: Maximum conversation turns with A2A agent.
response_model: Optional Pydantic model for structured A2A agent responses.
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
accepted_output_modes: Media types the client can accept in responses.
supported_transports: Ordered list of transport protocols the client supports.
use_client_preference: Whether to prioritize client transport preferences over server.
extensions: Extension URIs the client supports.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
endpoint: Url = Field(description="A2A agent endpoint URL")
auth: AuthScheme | None = Field(
default=None,
description="Authentication scheme",
)
timeout: int = Field(default=120, description="Request timeout in seconds")
max_turns: int = Field(
default=10, description="Maximum conversation turns with A2A agent"
)
response_model: type[BaseModel] | None = Field(
default=None,
description="Optional Pydantic model for structured A2A agent responses",
)
fail_fast: bool = Field(
default=True,
description="If True, raise error when agent unreachable; if False, skip",
)
trust_remote_completion_status: bool = Field(
default=False,
description="If True, return A2A result directly when completed",
)
updates: UpdateConfig = Field(
default_factory=_get_default_update_config,
description="Update mechanism config",
)
accepted_output_modes: list[str] = Field(
default_factory=lambda: ["application/json"],
description="Media types the client can accept in responses",
)
supported_transports: list[str] = Field(
default_factory=lambda: ["JSONRPC"],
description="Ordered list of transport protocols the client supports",
)
use_client_preference: bool = Field(
default=False,
description="Whether to prioritize client transport preferences over server",
)
extensions: list[str] = Field(
default_factory=list,
description="Extension URIs the client supports",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)
class A2AServerConfig(BaseModel):
"""Configuration for exposing a Crew or Agent as an A2A server.
All fields correspond to A2A AgentCard fields. Fields like name, description,
and skills can be auto-derived from the Crew/Agent if not provided.
Attributes:
name: Human-readable name for the agent.
description: Human-readable description of the agent.
version: Version string for the agent card.
skills: List of agent skills/capabilities.
default_input_modes: Default supported input MIME types.
default_output_modes: Default supported output MIME types.
capabilities: Declaration of optional capabilities.
preferred_transport: Transport protocol for the preferred endpoint.
protocol_version: A2A protocol version this agent supports.
provider: Information about the agent's service provider.
documentation_url: URL to the agent's documentation.
icon_url: URL to an icon for the agent.
additional_interfaces: Additional supported interfaces.
security: Security requirement objects for all interactions.
security_schemes: Security schemes available to authorize requests.
supports_authenticated_extended_card: Whether agent provides extended card to authenticated users.
url: Preferred endpoint URL for the agent.
signatures: JSON Web Signatures for the AgentCard.
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
name: str | None = Field(
default=None,
description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.",
)
description: str | None = Field(
default=None,
description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.",
)
version: str = Field(
default="1.0.0",
description="Version string for the agent card",
)
skills: list[AgentSkill] = Field(
default_factory=list,
description="List of agent skills. Auto-derived from tasks/tools if not provided.",
)
default_input_modes: list[str] = Field(
default_factory=lambda: ["text/plain", "application/json"],
description="Default supported input MIME types",
)
default_output_modes: list[str] = Field(
default_factory=lambda: ["text/plain", "application/json"],
description="Default supported output MIME types",
)
capabilities: AgentCapabilities = Field(
default_factory=lambda: AgentCapabilities(
streaming=True,
push_notifications=False,
),
description="Declaration of optional capabilities supported by the agent",
)
preferred_transport: TransportType = Field(
default="JSONRPC",
description="Transport protocol for the preferred endpoint",
)
protocol_version: str = Field(
default_factory=lambda: version("a2a-sdk"),
description="A2A protocol version this agent supports",
)
provider: AgentProvider | None = Field(
default=None,
description="Information about the agent's service provider",
)
documentation_url: Url | None = Field(
default=None,
description="URL to the agent's documentation",
)
icon_url: Url | None = Field(
default=None,
description="URL to an icon for the agent",
)
additional_interfaces: list[AgentInterface] = Field(
default_factory=list,
description="Additional supported interfaces (transport and URL combinations)",
)
security: list[dict[str, list[str]]] = Field(
default_factory=list,
description="Security requirement objects for all agent interactions",
)
security_schemes: dict[str, SecurityScheme] = Field(
default_factory=dict,
description="Security schemes available to authorize requests",
)
supports_authenticated_extended_card: bool = Field(
default=False,
description="Whether agent provides extended card to authenticated users",
)
url: Url | None = Field(
default=None,
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",
)
signatures: list[AgentCardSignature] = Field(
default_factory=list,
description="JSON Web Signatures for the AgentCard",
)

View File

@@ -1,7 +1,17 @@
"""Type definitions for A2A protocol message parts.""" """Type definitions for A2A protocol message parts."""
from typing import Any, Literal, Protocol, TypedDict, runtime_checkable from __future__ import annotations
from typing import (
Annotated,
Any,
Literal,
Protocol,
TypedDict,
runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
from typing_extensions import NotRequired from typing_extensions import NotRequired
from crewai.a2a.updates import ( from crewai.a2a.updates import (
@@ -15,6 +25,18 @@ from crewai.a2a.updates import (
) )
TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"]
http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
Url = Annotated[
str,
BeforeValidator(
lambda value: str(http_url_adapter.validate_python(value, strict=True))
),
]
@runtime_checkable @runtime_checkable
class AgentResponseProtocol(Protocol): class AgentResponseProtocol(Protocol):
"""Protocol for the dynamically created AgentResponse model.""" """Protocol for the dynamically created AgentResponse model."""

View File

@@ -0,0 +1 @@
"""A2A utility modules for client operations."""

View File

@@ -0,0 +1,399 @@
"""AgentCard utilities for A2A client and server operations."""
from __future__ import annotations
import asyncio
from collections.abc import MutableMapping
from functools import lru_cache
import time
from types import MethodType
from typing import TYPE_CHECKING
from a2a.client.errors import A2AClientHTTPError
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import (
_auth_store,
configure_auth_client,
retry_on_401,
)
from crewai.a2a.config import A2AServerConfig
from crewai.crew import Crew
if TYPE_CHECKING:
from crewai.a2a.auth.schemas import AuthScheme
from crewai.agent import Agent
from crewai.task import Task
def _get_server_config(agent: Agent) -> A2AServerConfig | None:
"""Get A2AServerConfig from an agent's a2a configuration.
Args:
agent: The Agent instance to check.
Returns:
A2AServerConfig if present, None otherwise.
"""
if agent.a2a is None:
return None
if isinstance(agent.a2a, A2AServerConfig):
return agent.a2a
if isinstance(agent.a2a, list):
for config in agent.a2a:
if isinstance(config, A2AServerConfig):
return config
return None
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
def _task_to_skill(task: Task) -> AgentSkill:
"""Convert a CrewAI Task to an A2A AgentSkill.
Args:
task: The CrewAI Task to convert.
Returns:
AgentSkill representing the task's capability.
"""
task_name = task.name or task.description[:50]
task_id = task_name.lower().replace(" ", "_")
tags: list[str] = []
if task.agent:
tags.append(task.agent.role.lower().replace(" ", "-"))
return AgentSkill(
id=task_id,
name=task_name,
description=task.description,
tags=tags,
examples=[task.expected_output] if task.expected_output else None,
)
def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill:
"""Convert an Agent's tool to an A2A AgentSkill.
Args:
tool_name: Name of the tool.
tool_description: Description of what the tool does.
Returns:
AgentSkill representing the tool's capability.
"""
tool_id = tool_name.lower().replace(" ", "_")
return AgentSkill(
id=tool_id,
name=tool_name,
description=tool_description,
tags=[tool_name.lower().replace(" ", "-")],
)
def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard:
"""Generate an A2A AgentCard from a Crew instance.
Args:
crew: The Crew instance to generate a card for.
url: The base URL where this crew will be exposed.
Returns:
AgentCard describing the crew's capabilities.
"""
crew_name = getattr(crew, "name", None) or crew.__class__.__name__
description_parts: list[str] = []
crew_description = getattr(crew, "description", None)
if crew_description:
description_parts.append(crew_description)
else:
agent_roles = [agent.role for agent in crew.agents]
description_parts.append(
f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}"
)
skills = [_task_to_skill(task) for task in crew.tasks]
return AgentCard(
name=crew_name,
description=" ".join(description_parts),
url=url,
version="1.0.0",
capabilities=AgentCapabilities(
streaming=True,
push_notifications=True,
),
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
skills=skills,
)
def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard:
"""Generate an A2A AgentCard from an Agent instance.
Uses A2AServerConfig values when available, falling back to agent properties.
Args:
agent: The Agent instance to generate a card for.
url: The base URL where this agent will be exposed.
Returns:
AgentCard describing the agent's capabilities.
"""
server_config = _get_server_config(agent) or A2AServerConfig()
name = server_config.name or agent.role
description_parts = [agent.goal]
if agent.backstory:
description_parts.append(agent.backstory)
description = server_config.description or " ".join(description_parts)
skills: list[AgentSkill] = (
server_config.skills.copy() if server_config.skills else []
)
if not skills:
if agent.tools:
for tool in agent.tools:
tool_name = getattr(tool, "name", None) or tool.__class__.__name__
tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}"
skills.append(_tool_to_skill(tool_name, tool_desc))
if not skills:
skills.append(
AgentSkill(
id=agent.role.lower().replace(" ", "_"),
name=agent.role,
description=agent.goal,
tags=[agent.role.lower().replace(" ", "-")],
)
)
return AgentCard(
name=name,
description=description,
url=server_config.url or url,
version=server_config.version,
capabilities=server_config.capabilities,
default_input_modes=server_config.default_input_modes,
default_output_modes=server_config.default_output_modes,
skills=skills,
protocol_version=server_config.protocol_version,
provider=server_config.provider,
documentation_url=server_config.documentation_url,
icon_url=server_config.icon_url,
additional_interfaces=server_config.additional_interfaces,
security=server_config.security,
security_schemes=server_config.security_schemes,
supports_authenticated_extended_card=server_config.supports_authenticated_extended_card,
signatures=server_config.signatures,
)
def inject_a2a_server_methods(agent: Agent) -> None:
"""Inject A2A server methods onto an Agent instance.
Adds a `to_agent_card(url: str) -> AgentCard` method to the agent
that generates an A2A-compliant AgentCard.
Only injects if the agent has an A2AServerConfig.
Args:
agent: The Agent instance to inject methods onto.
"""
if _get_server_config(agent) is None:
return
def _to_agent_card(self: Agent, url: str) -> AgentCard:
return _agent_to_agent_card(self, url)
object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent))

View File

@@ -1,16 +1,14 @@
"""Utility functions for A2A (Agent-to-Agent) protocol delegation.""" """A2A delegation utilities for executing tasks on remote agents."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import AsyncIterator, MutableMapping from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
import uuid import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory from a2a.client import Client, ClientConfig, ClientFactory
from a2a.types import ( from a2a.types import (
AgentCard, AgentCard,
Message, Message,
@@ -19,19 +17,15 @@ from a2a.types import (
Role, Role,
TextPart, TextPart,
) )
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx import httpx
from pydantic import BaseModel, Field, create_model from pydantic import BaseModel
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import ( from crewai.a2a.auth.utils import (
_auth_store, _auth_store,
configure_auth_client, configure_auth_client,
retry_on_401,
validate_auth_against_agent_card, validate_auth_against_agent_card,
) )
from crewai.a2a.config import A2AConfig
from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.types import ( from crewai.a2a.types import (
HANDLER_REGISTRY, HANDLER_REGISTRY,
@@ -45,6 +39,7 @@ from crewai.a2a.updates import (
StreamingHandler, StreamingHandler,
UpdateConfig, UpdateConfig,
) )
from crewai.a2a.utils.agent_card import _afetch_agent_card_cached
from crewai.events.event_bus import crewai_event_bus from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import ( from crewai.events.types.a2a_events import (
A2AConversationStartedEvent, A2AConversationStartedEvent,
@@ -52,7 +47,6 @@ from crewai.events.types.a2a_events import (
A2ADelegationStartedEvent, A2ADelegationStartedEvent,
A2AMessageSentEvent, A2AMessageSentEvent,
) )
from crewai.types.utils import create_literals_from_strings
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -75,187 +69,6 @@ def get_handler(config: UpdateConfig | None) -> HandlerType:
return HANDLER_REGISTRY.get(type(config), StreamingHandler) return HANDLER_REGISTRY.get(type(config), StreamingHandler)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
use_cache: Whether to use caching (default True)
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes)
Returns:
AgentCard object with agent capabilities and skills
Raises:
httpx.HTTPStatusError: If the request fails
A2AClientHTTPError: If authentication fails
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
def execute_a2a_delegation( def execute_a2a_delegation(
endpoint: str, endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
@@ -644,19 +457,18 @@ async def _create_a2a_client(
"""Create and configure an A2A client. """Create and configure an A2A client.
Args: Args:
agent_card: The A2A agent card agent_card: The A2A agent card.
transport_protocol: Transport protocol to use transport_protocol: Transport protocol to use.
timeout: Request timeout in seconds timeout: Request timeout in seconds.
headers: HTTP headers (already with auth applied) headers: HTTP headers (already with auth applied).
streaming: Enable streaming responses streaming: Enable streaming responses.
auth: Optional AuthScheme for client configuration auth: Optional AuthScheme for client configuration.
use_polling: Enable polling mode use_polling: Enable polling mode.
push_notification_config: Optional push notification config to include in requests push_notification_config: Optional push notification config.
Yields: Yields:
Configured A2A client instance Configured A2A client instance.
""" """
async with httpx.AsyncClient( async with httpx.AsyncClient(
timeout=timeout, timeout=timeout,
headers=headers, headers=headers,
@@ -687,78 +499,3 @@ async def _create_a2a_client(
factory = ClientFactory(config) factory = ClientFactory(config)
client = factory.create(agent_card) client = factory.create(agent_card)
yield client yield client
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field
"""
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Args:
a2a_config: A2A configuration
Returns:
List of A2A agent IDs
"""
if a2a_config is None:
return [], ()
if isinstance(a2a_config, A2AConfig):
a2a_agents = [a2a_config]
else:
a2a_agents = a2a_config
return a2a_agents, tuple(config.endpoint for config in a2a_agents)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], type[BaseModel]]:
"""Get A2A agent IDs and response model.
Args:
a2a_config: A2A configuration
Returns:
Tuple of A2A agent IDs and response model
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

View File

@@ -0,0 +1,97 @@
"""Response model utilities for A2A agent interactions."""
from __future__ import annotations
from typing import TypeAlias
from pydantic import BaseModel, Field, create_model
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs.
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field.
"""
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs list and agent endpoint IDs.
"""
if a2a_config is None:
return [], ()
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
configs = [a2a_config]
else:
configs = a2a_config
# Filter to only client configs (those with endpoint)
client_configs: list[A2AClientConfigTypes] = [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
return client_configs, tuple(config.endpoint for config in client_configs)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], type[BaseModel]]:
"""Get A2A agent configs and response model.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs and response model.
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

View File

@@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any
from a2a.types import Role, TaskState from a2a.types import Role, TaskState
from pydantic import BaseModel, ValidationError from pydantic import BaseModel, ValidationError
from crewai.a2a.config import A2AConfig from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.extensions.base import ExtensionRegistry from crewai.a2a.extensions.base import ExtensionRegistry
from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.templates import ( from crewai.a2a.templates import (
@@ -26,13 +26,16 @@ from crewai.a2a.templates import (
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE, UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
) )
from crewai.a2a.types import AgentResponseProtocol from crewai.a2a.types import AgentResponseProtocol
from crewai.a2a.utils import ( from crewai.a2a.utils.agent_card import (
aexecute_a2a_delegation,
afetch_agent_card, afetch_agent_card,
execute_a2a_delegation,
fetch_agent_card, fetch_agent_card,
get_a2a_agents_and_response_model, inject_a2a_server_methods,
) )
from crewai.a2a.utils.delegation import (
aexecute_a2a_delegation,
execute_a2a_delegation,
)
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
from crewai.events.event_bus import crewai_event_bus from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import ( from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent, A2AConversationCompletedEvent,
@@ -122,10 +125,12 @@ def wrap_agent_with_a2a_instance(
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
) )
inject_a2a_server_methods(agent)
def _fetch_card_from_config( def _fetch_card_from_config(
config: A2AConfig, config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig, AgentCard | Exception]: ) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config. """Fetch agent card from A2A config.
Args: Args:
@@ -146,7 +151,7 @@ def _fetch_card_from_config(
def _fetch_agent_cards_concurrently( def _fetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]: ) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents. """Fetch agent cards concurrently for multiple A2A agents.
@@ -181,7 +186,7 @@ def _fetch_agent_cards_concurrently(
def _execute_task_with_a2a( def _execute_task_with_a2a(
self: Agent, self: Agent,
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., str], original_fn: Callable[..., str],
task: Task, task: Task,
agent_response_model: type[BaseModel], agent_response_model: type[BaseModel],
@@ -270,7 +275,7 @@ def _execute_task_with_a2a(
def _augment_prompt_with_a2a( def _augment_prompt_with_a2a(
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
task_description: str, task_description: str,
agent_cards: dict[str, AgentCard], agent_cards: dict[str, AgentCard],
conversation_history: list[Message] | None = None, conversation_history: list[Message] | None = None,
@@ -523,11 +528,11 @@ def _prepare_delegation_context(
task: Task, task: Task,
original_task_description: str | None, original_task_description: str | None,
) -> tuple[ ) -> tuple[
list[A2AConfig], list[A2AConfig | A2AClientConfig],
type[BaseModel], type[BaseModel],
str, str,
str, str,
A2AConfig, A2AConfig | A2AClientConfig,
str | None, str | None,
str | None, str | None,
dict[str, Any] | None, dict[str, Any] | None,
@@ -591,7 +596,7 @@ def _handle_task_completion(
task: Task, task: Task,
task_id_config: str | None, task_id_config: str | None,
reference_task_ids: list[str], reference_task_ids: list[str],
agent_config: A2AConfig, agent_config: A2AConfig | A2AClientConfig,
turn_num: int, turn_num: int,
) -> tuple[str | None, str | None, list[str]]: ) -> tuple[str | None, str | None, list[str]]:
"""Handle task completion state including reference task updates. """Handle task completion state including reference task updates.
@@ -631,7 +636,7 @@ def _handle_agent_response_and_continue(
a2a_result: TaskStateResult, a2a_result: TaskStateResult,
agent_id: str, agent_id: str,
agent_cards: dict[str, AgentCard] | None, agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
original_task_description: str, original_task_description: str,
conversation_history: list[Message], conversation_history: list[Message],
turn_num: int, turn_num: int,
@@ -868,8 +873,8 @@ def _delegate_to_a2a(
async def _afetch_card_from_config( async def _afetch_card_from_config(
config: A2AConfig, config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig, AgentCard | Exception]: ) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config asynchronously.""" """Fetch agent card from A2A config asynchronously."""
try: try:
card = await afetch_agent_card( card = await afetch_agent_card(
@@ -883,7 +888,7 @@ async def _afetch_card_from_config(
async def _afetch_agent_cards_concurrently( async def _afetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]: ) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents using asyncio.""" """Fetch agent cards concurrently for multiple A2A agents using asyncio."""
agent_cards: dict[str, AgentCard] = {} agent_cards: dict[str, AgentCard] = {}
@@ -908,7 +913,7 @@ async def _afetch_agent_cards_concurrently(
async def _aexecute_task_with_a2a( async def _aexecute_task_with_a2a(
self: Agent, self: Agent,
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]], original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task, task: Task,
agent_response_model: type[BaseModel], agent_response_model: type[BaseModel],
@@ -987,7 +992,7 @@ async def _ahandle_agent_response_and_continue(
a2a_result: TaskStateResult, a2a_result: TaskStateResult,
agent_id: str, agent_id: str,
agent_cards: dict[str, AgentCard] | None, agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig], a2a_agents: list[A2AConfig | A2AClientConfig],
original_task_description: str, original_task_description: str,
conversation_history: list[Message], conversation_history: list[Message],
turn_num: int, turn_num: int,

View File

@@ -17,7 +17,7 @@ from urllib.parse import urlparse
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
from typing_extensions import Self from typing_extensions import Self
from crewai.a2a.config import A2AConfig from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.agent.utils import ( from crewai.agent.utils import (
ahandle_knowledge_retrieval, ahandle_knowledge_retrieval,
apply_training_data, apply_training_data,
@@ -73,7 +73,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
from crewai.utilities.converter import Converter from crewai.utilities.converter import Converter
from crewai.utilities.guardrail_types import GuardrailType from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler from crewai.utilities.training_handler import CrewTrainingHandler
@@ -218,9 +218,18 @@ class Agent(BaseAgent):
guardrail_max_retries: int = Field( guardrail_max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails" default=3, description="Maximum number of retries when guardrail fails"
) )
a2a: list[A2AConfig] | A2AConfig | None = Field( a2a: (
list[A2AConfig | A2AServerConfig | A2AClientConfig]
| A2AConfig
| A2AServerConfig
| A2AClientConfig
| None
) = Field(
default=None, default=None,
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.", description="""
A2A (Agent-to-Agent) configuration for delegating tasks to remote agents.
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""",
) )
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field( executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
default=CrewAgentExecutor, default=CrewAgentExecutor,
@@ -733,7 +742,7 @@ class Agent(BaseAgent):
if self.agent_executor is not None: if self.agent_executor is not None:
self._update_executor_parameters( self._update_executor_parameters(
task=task, task=task,
tools=parsed_tools, tools=parsed_tools, # type: ignore[arg-type]
raw_tools=raw_tools, raw_tools=raw_tools,
prompt=prompt, prompt=prompt,
stop_words=stop_words, stop_words=stop_words,
@@ -742,7 +751,7 @@ class Agent(BaseAgent):
else: else:
self.agent_executor = self.executor_class( self.agent_executor = self.executor_class(
llm=cast(BaseLLM, self.llm), llm=cast(BaseLLM, self.llm),
task=task, task=task, # type: ignore[arg-type]
i18n=self.i18n, i18n=self.i18n,
agent=self, agent=self,
crew=self.crew, crew=self.crew,
@@ -765,11 +774,11 @@ class Agent(BaseAgent):
def _update_executor_parameters( def _update_executor_parameters(
self, self,
task: Task | None, task: Task | None,
tools: list, tools: list[BaseTool],
raw_tools: list[BaseTool], raw_tools: list[BaseTool],
prompt: dict, prompt: SystemPromptResult | StandardPromptResult,
stop_words: list[str], stop_words: list[str],
rpm_limit_fn: Callable | None, rpm_limit_fn: Callable | None, # type: ignore[type-arg]
) -> None: ) -> None:
"""Update executor parameters without recreating instance. """Update executor parameters without recreating instance.

View File

@@ -0,0 +1,325 @@
"""Tests for A2A agent card utilities."""
from __future__ import annotations
from a2a.types import AgentCard, AgentSkill
from crewai import Agent
from crewai.a2a.config import A2AClientConfig, A2AServerConfig
from crewai.a2a.utils.agent_card import inject_a2a_server_methods
class TestInjectA2AServerMethods:
"""Tests for inject_a2a_server_methods function."""
def test_agent_with_server_config_gets_to_agent_card_method(self) -> None:
"""Agent with A2AServerConfig should have to_agent_card method injected."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_agent_without_server_config_no_injection(self) -> None:
"""Agent without A2AServerConfig should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AClientConfig(endpoint="http://example.com"),
)
assert not hasattr(agent, "to_agent_card")
def test_agent_without_a2a_no_injection(self) -> None:
"""Agent without any a2a config should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
assert not hasattr(agent, "to_agent_card")
def test_agent_with_mixed_configs_gets_injection(self) -> None:
"""Agent with list containing A2AServerConfig should get to_agent_card."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=[
A2AClientConfig(endpoint="http://example.com"),
A2AServerConfig(name="My Agent"),
],
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_manual_injection_on_plain_agent(self) -> None:
"""inject_a2a_server_methods should work when called manually."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
# Manually set server config and inject
object.__setattr__(agent, "a2a", A2AServerConfig())
inject_a2a_server_methods(agent)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
class TestToAgentCard:
"""Tests for the injected to_agent_card method."""
def test_returns_agent_card(self) -> None:
"""to_agent_card should return an AgentCard instance."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert isinstance(card, AgentCard)
def test_uses_agent_role_as_name(self) -> None:
"""AgentCard name should default to agent role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Data Analyst"
def test_uses_server_config_name(self) -> None:
"""AgentCard name should prefer A2AServerConfig.name over role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(name="Custom Agent Name"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Custom Agent Name"
def test_uses_goal_as_description(self) -> None:
"""AgentCard description should include agent goal."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert "Accomplish important tasks" in card.description
def test_uses_server_config_description(self) -> None:
"""AgentCard description should prefer A2AServerConfig.description."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(description="Custom description"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.description == "Custom description"
def test_uses_provided_url(self) -> None:
"""AgentCard url should use the provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://my-server.com:9000")
assert card.url == "http://my-server.com:9000"
def test_uses_server_config_url(self) -> None:
"""AgentCard url should prefer A2AServerConfig.url over provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(url="http://configured-url.com"),
)
card = agent.to_agent_card("http://fallback-url.com")
assert card.url == "http://configured-url.com/"
def test_generates_default_skill(self) -> None:
"""AgentCard should have at least one skill based on agent role."""
agent = Agent(
role="Research Assistant",
goal="Help with research",
backstory="Skilled researcher",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) >= 1
skill = card.skills[0]
assert skill.name == "Research Assistant"
assert skill.description == "Help with research"
def test_uses_server_config_skills(self) -> None:
"""AgentCard skills should prefer A2AServerConfig.skills."""
custom_skill = AgentSkill(
id="custom-skill",
name="Custom Skill",
description="A custom skill",
tags=["custom"],
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(skills=[custom_skill]),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) == 1
assert card.skills[0].id == "custom-skill"
assert card.skills[0].name == "Custom Skill"
def test_includes_custom_version(self) -> None:
"""AgentCard should include version from A2AServerConfig."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(version="2.0.0"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "2.0.0"
def test_default_version(self) -> None:
"""AgentCard should have default version 1.0.0."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "1.0.0"
class TestAgentCardJsonStructure:
"""Tests for the JSON structure of AgentCard."""
def test_json_has_required_fields(self) -> None:
"""AgentCard JSON should contain all required A2A protocol fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert "name" in json_data
assert "description" in json_data
assert "url" in json_data
assert "version" in json_data
assert "skills" in json_data
assert "capabilities" in json_data
assert "defaultInputModes" in json_data
assert "defaultOutputModes" in json_data
def test_json_skills_structure(self) -> None:
"""Each skill in JSON should have required fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert len(json_data["skills"]) >= 1
skill = json_data["skills"][0]
assert "id" in skill
assert "name" in skill
assert "description" in skill
assert "tags" in skill
def test_json_capabilities_structure(self) -> None:
"""Capabilities in JSON should have expected fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
capabilities = json_data["capabilities"]
assert "streaming" in capabilities
assert "pushNotifications" in capabilities
def test_json_serializable(self) -> None:
"""AgentCard should be JSON serializable."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_str = card.model_dump_json()
assert isinstance(json_str, str)
assert "Test Agent" in json_str
assert "http://localhost:8000" in json_str
def test_json_excludes_none_values(self) -> None:
"""AgentCard JSON with exclude_none should omit None fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump(exclude_none=True)
assert "provider" not in json_data
assert "documentationUrl" not in json_data
assert "iconUrl" not in json_data

View File

@@ -117,7 +117,7 @@ show_error_codes = true
warn_unused_ignores = true warn_unused_ignores = true
python_version = "3.12" python_version = "3.12"
exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)" exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)"
plugins = ["pydantic.mypy", "crewai.mypy"] plugins = ["pydantic.mypy"]
[tool.bandit] [tool.bandit]