From 22f1812824c841ec24a5ade81b98122fcf669b17 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 14 Jan 2026 22:09:11 -0500 Subject: [PATCH] feat: add a2a server config; agent card generation --- lib/crewai/src/crewai/a2a/config.py | 213 +++++++++- lib/crewai/src/crewai/a2a/types.py | 24 +- lib/crewai/src/crewai/a2a/utils/__init__.py | 1 + lib/crewai/src/crewai/a2a/utils/agent_card.py | 399 ++++++++++++++++++ .../a2a/{utils.py => utils/delegation.py} | 289 +------------ .../src/crewai/a2a/utils/response_model.py | 97 +++++ lib/crewai/src/crewai/a2a/wrapper.py | 43 +- lib/crewai/src/crewai/agent/core.py | 27 +- lib/crewai/tests/a2a/utils/test_agent_card.py | 325 ++++++++++++++ pyproject.toml | 2 +- 10 files changed, 1096 insertions(+), 324 deletions(-) create mode 100644 lib/crewai/src/crewai/a2a/utils/__init__.py create mode 100644 lib/crewai/src/crewai/a2a/utils/agent_card.py rename lib/crewai/src/crewai/a2a/{utils.py => utils/delegation.py} (64%) create mode 100644 lib/crewai/src/crewai/a2a/utils/response_model.py create mode 100644 lib/crewai/tests/a2a/utils/test_agent_card.py diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index 535db971f..1597ae821 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -5,18 +5,22 @@ This module is separate from experimental.a2a to avoid circular imports. from __future__ import annotations -from typing import Annotated, Any, ClassVar, Literal +from importlib.metadata import version +from typing import Any, ClassVar, Literal -from pydantic import ( - BaseModel, - BeforeValidator, - ConfigDict, - Field, - HttpUrl, - TypeAdapter, +from a2a.types import ( + AgentCapabilities, + AgentCardSignature, + AgentInterface, + AgentProvider, + AgentSkill, + SecurityScheme, ) +from pydantic import BaseModel, ConfigDict, Field +from typing_extensions import deprecated from crewai.a2a.auth.schemas import AuthScheme +from crewai.a2a.types import TransportType, Url try: @@ -25,25 +29,25 @@ except ImportError: UpdateConfig = Any # type: ignore[misc,assignment] -http_url_adapter = TypeAdapter(HttpUrl) - -Url = Annotated[ - str, - BeforeValidator( - lambda value: str(http_url_adapter.validate_python(value, strict=True)) - ), -] - - def _get_default_update_config() -> UpdateConfig: from crewai.a2a.updates import StreamingConfig return StreamingConfig() +@deprecated( + """ + `crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0, + use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead. + """, + category=FutureWarning, +) class A2AConfig(BaseModel): """Configuration for A2A protocol integration. + Deprecated: + Use A2AClientConfig instead. This class will be removed in a future version. + Attributes: endpoint: A2A agent endpoint URL. auth: Authentication scheme. @@ -87,3 +91,176 @@ class A2AConfig(BaseModel): default="JSONRPC", description="Specified mode of A2A transport protocol", ) + + +class A2AClientConfig(BaseModel): + """Configuration for connecting to remote A2A agents. + + Attributes: + endpoint: A2A agent endpoint URL. + auth: Authentication scheme. + timeout: Request timeout in seconds. + max_turns: Maximum conversation turns with A2A agent. + response_model: Optional Pydantic model for structured A2A agent responses. + fail_fast: If True, raise error when agent unreachable; if False, skip and continue. + trust_remote_completion_status: If True, return A2A agent's result directly when completed. + updates: Update mechanism config. + accepted_output_modes: Media types the client can accept in responses. + supported_transports: Ordered list of transport protocols the client supports. + use_client_preference: Whether to prioritize client transport preferences over server. + extensions: Extension URIs the client supports. + """ + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + endpoint: Url = Field(description="A2A agent endpoint URL") + auth: AuthScheme | None = Field( + default=None, + description="Authentication scheme", + ) + timeout: int = Field(default=120, description="Request timeout in seconds") + max_turns: int = Field( + default=10, description="Maximum conversation turns with A2A agent" + ) + response_model: type[BaseModel] | None = Field( + default=None, + description="Optional Pydantic model for structured A2A agent responses", + ) + fail_fast: bool = Field( + default=True, + description="If True, raise error when agent unreachable; if False, skip", + ) + trust_remote_completion_status: bool = Field( + default=False, + description="If True, return A2A result directly when completed", + ) + updates: UpdateConfig = Field( + default_factory=_get_default_update_config, + description="Update mechanism config", + ) + accepted_output_modes: list[str] = Field( + default_factory=lambda: ["application/json"], + description="Media types the client can accept in responses", + ) + supported_transports: list[str] = Field( + default_factory=lambda: ["JSONRPC"], + description="Ordered list of transport protocols the client supports", + ) + use_client_preference: bool = Field( + default=False, + description="Whether to prioritize client transport preferences over server", + ) + extensions: list[str] = Field( + default_factory=list, + description="Extension URIs the client supports", + ) + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field( + default="JSONRPC", + description="Specified mode of A2A transport protocol", + ) + + +class A2AServerConfig(BaseModel): + """Configuration for exposing a Crew or Agent as an A2A server. + + All fields correspond to A2A AgentCard fields. Fields like name, description, + and skills can be auto-derived from the Crew/Agent if not provided. + + Attributes: + name: Human-readable name for the agent. + description: Human-readable description of the agent. + version: Version string for the agent card. + skills: List of agent skills/capabilities. + default_input_modes: Default supported input MIME types. + default_output_modes: Default supported output MIME types. + capabilities: Declaration of optional capabilities. + preferred_transport: Transport protocol for the preferred endpoint. + protocol_version: A2A protocol version this agent supports. + provider: Information about the agent's service provider. + documentation_url: URL to the agent's documentation. + icon_url: URL to an icon for the agent. + additional_interfaces: Additional supported interfaces. + security: Security requirement objects for all interactions. + security_schemes: Security schemes available to authorize requests. + supports_authenticated_extended_card: Whether agent provides extended card to authenticated users. + url: Preferred endpoint URL for the agent. + signatures: JSON Web Signatures for the AgentCard. + """ + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + name: str | None = Field( + default=None, + description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.", + ) + description: str | None = Field( + default=None, + description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.", + ) + version: str = Field( + default="1.0.0", + description="Version string for the agent card", + ) + skills: list[AgentSkill] = Field( + default_factory=list, + description="List of agent skills. Auto-derived from tasks/tools if not provided.", + ) + default_input_modes: list[str] = Field( + default_factory=lambda: ["text/plain", "application/json"], + description="Default supported input MIME types", + ) + default_output_modes: list[str] = Field( + default_factory=lambda: ["text/plain", "application/json"], + description="Default supported output MIME types", + ) + capabilities: AgentCapabilities = Field( + default_factory=lambda: AgentCapabilities( + streaming=True, + push_notifications=False, + ), + description="Declaration of optional capabilities supported by the agent", + ) + preferred_transport: TransportType = Field( + default="JSONRPC", + description="Transport protocol for the preferred endpoint", + ) + protocol_version: str = Field( + default_factory=lambda: version("a2a-sdk"), + description="A2A protocol version this agent supports", + ) + provider: AgentProvider | None = Field( + default=None, + description="Information about the agent's service provider", + ) + documentation_url: Url | None = Field( + default=None, + description="URL to the agent's documentation", + ) + icon_url: Url | None = Field( + default=None, + description="URL to an icon for the agent", + ) + additional_interfaces: list[AgentInterface] = Field( + default_factory=list, + description="Additional supported interfaces (transport and URL combinations)", + ) + security: list[dict[str, list[str]]] = Field( + default_factory=list, + description="Security requirement objects for all agent interactions", + ) + security_schemes: dict[str, SecurityScheme] = Field( + default_factory=dict, + description="Security schemes available to authorize requests", + ) + supports_authenticated_extended_card: bool = Field( + default=False, + description="Whether agent provides extended card to authenticated users", + ) + url: Url | None = Field( + default=None, + description="Preferred endpoint URL for the agent. Set at runtime if not provided.", + ) + signatures: list[AgentCardSignature] = Field( + default_factory=list, + description="JSON Web Signatures for the AgentCard", + ) diff --git a/lib/crewai/src/crewai/a2a/types.py b/lib/crewai/src/crewai/a2a/types.py index 217b59467..90473b669 100644 --- a/lib/crewai/src/crewai/a2a/types.py +++ b/lib/crewai/src/crewai/a2a/types.py @@ -1,7 +1,17 @@ """Type definitions for A2A protocol message parts.""" -from typing import Any, Literal, Protocol, TypedDict, runtime_checkable +from __future__ import annotations +from typing import ( + Annotated, + Any, + Literal, + Protocol, + TypedDict, + runtime_checkable, +) + +from pydantic import BeforeValidator, HttpUrl, TypeAdapter from typing_extensions import NotRequired from crewai.a2a.updates import ( @@ -15,6 +25,18 @@ from crewai.a2a.updates import ( ) +TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"] + +http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl) + +Url = Annotated[ + str, + BeforeValidator( + lambda value: str(http_url_adapter.validate_python(value, strict=True)) + ), +] + + @runtime_checkable class AgentResponseProtocol(Protocol): """Protocol for the dynamically created AgentResponse model.""" diff --git a/lib/crewai/src/crewai/a2a/utils/__init__.py b/lib/crewai/src/crewai/a2a/utils/__init__.py new file mode 100644 index 000000000..bdb7bed62 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/__init__.py @@ -0,0 +1 @@ +"""A2A utility modules for client operations.""" diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py new file mode 100644 index 000000000..7c798dc1a --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -0,0 +1,399 @@ +"""AgentCard utilities for A2A client and server operations.""" + +from __future__ import annotations + +import asyncio +from collections.abc import MutableMapping +from functools import lru_cache +import time +from types import MethodType +from typing import TYPE_CHECKING + +from a2a.client.errors import A2AClientHTTPError +from a2a.types import AgentCapabilities, AgentCard, AgentSkill +from aiocache import cached # type: ignore[import-untyped] +from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] +import httpx + +from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth +from crewai.a2a.auth.utils import ( + _auth_store, + configure_auth_client, + retry_on_401, +) +from crewai.a2a.config import A2AServerConfig +from crewai.crew import Crew + + +if TYPE_CHECKING: + from crewai.a2a.auth.schemas import AuthScheme + from crewai.agent import Agent + from crewai.task import Task + + +def _get_server_config(agent: Agent) -> A2AServerConfig | None: + """Get A2AServerConfig from an agent's a2a configuration. + + Args: + agent: The Agent instance to check. + + Returns: + A2AServerConfig if present, None otherwise. + """ + if agent.a2a is None: + return None + if isinstance(agent.a2a, A2AServerConfig): + return agent.a2a + if isinstance(agent.a2a, list): + for config in agent.a2a: + if isinstance(config, A2AServerConfig): + return config + return None + + +def fetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, + cache_ttl: int = 300, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint with optional caching. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + cache_ttl: Cache TTL in seconds (default 300 = 5 minutes). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + ttl_hash = int(time.time() // cache_ttl) + return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +async def afetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint asynchronously. + + Native async implementation. Use this when running in an async context. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + agent_card: AgentCard = await _afetch_agent_card_cached( + endpoint, auth_hash, timeout + ) + return agent_card + + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +@lru_cache() +def _fetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, + _ttl_hash: int, +) -> AgentCard: + """Cached sync version of fetch_agent_card.""" + auth = _auth_store.get(auth_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] +async def _afetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, +) -> AgentCard: + """Cached async implementation of AgentCard fetching.""" + auth = _auth_store.get(auth_hash) + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +async def _afetch_agent_card_impl( + endpoint: str, + auth: AuthScheme | None, + timeout: int, +) -> AgentCard: + """Internal async implementation of AgentCard fetching.""" + if "/.well-known/agent-card.json" in endpoint: + base_url = endpoint.replace("/.well-known/agent-card.json", "") + agent_card_path = "/.well-known/agent-card.json" + else: + url_parts = endpoint.split("/", 3) + base_url = f"{url_parts[0]}//{url_parts[2]}" + agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" + + headers: MutableMapping[str, str] = {} + if auth: + async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: + if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_auth_client) + headers = await auth.apply_auth(temp_auth_client, {}) + + async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: + if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_client) + + agent_card_url = f"{base_url}{agent_card_path}" + + async def _fetch_agent_card_request() -> httpx.Response: + return await temp_client.get(agent_card_url) + + try: + response = await retry_on_401( + request_func=_fetch_agent_card_request, + auth_scheme=auth, + client=temp_client, + headers=temp_client.headers, + max_retries=2, + ) + response.raise_for_status() + + return AgentCard.model_validate(response.json()) + + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + error_details = ["Authentication failed"] + www_auth = e.response.headers.get("WWW-Authenticate") + if www_auth: + error_details.append(f"WWW-Authenticate: {www_auth}") + if not auth: + error_details.append("No auth scheme provided") + msg = " | ".join(error_details) + raise A2AClientHTTPError(401, msg) from e + raise + + +def _task_to_skill(task: Task) -> AgentSkill: + """Convert a CrewAI Task to an A2A AgentSkill. + + Args: + task: The CrewAI Task to convert. + + Returns: + AgentSkill representing the task's capability. + """ + task_name = task.name or task.description[:50] + task_id = task_name.lower().replace(" ", "_") + + tags: list[str] = [] + if task.agent: + tags.append(task.agent.role.lower().replace(" ", "-")) + + return AgentSkill( + id=task_id, + name=task_name, + description=task.description, + tags=tags, + examples=[task.expected_output] if task.expected_output else None, + ) + + +def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill: + """Convert an Agent's tool to an A2A AgentSkill. + + Args: + tool_name: Name of the tool. + tool_description: Description of what the tool does. + + Returns: + AgentSkill representing the tool's capability. + """ + tool_id = tool_name.lower().replace(" ", "_") + + return AgentSkill( + id=tool_id, + name=tool_name, + description=tool_description, + tags=[tool_name.lower().replace(" ", "-")], + ) + + +def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard: + """Generate an A2A AgentCard from a Crew instance. + + Args: + crew: The Crew instance to generate a card for. + url: The base URL where this crew will be exposed. + + Returns: + AgentCard describing the crew's capabilities. + """ + crew_name = getattr(crew, "name", None) or crew.__class__.__name__ + + description_parts: list[str] = [] + crew_description = getattr(crew, "description", None) + if crew_description: + description_parts.append(crew_description) + else: + agent_roles = [agent.role for agent in crew.agents] + description_parts.append( + f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}" + ) + + skills = [_task_to_skill(task) for task in crew.tasks] + + return AgentCard( + name=crew_name, + description=" ".join(description_parts), + url=url, + version="1.0.0", + capabilities=AgentCapabilities( + streaming=True, + push_notifications=True, + ), + default_input_modes=["text/plain", "application/json"], + default_output_modes=["text/plain", "application/json"], + skills=skills, + ) + + +def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard: + """Generate an A2A AgentCard from an Agent instance. + + Uses A2AServerConfig values when available, falling back to agent properties. + + Args: + agent: The Agent instance to generate a card for. + url: The base URL where this agent will be exposed. + + Returns: + AgentCard describing the agent's capabilities. + """ + server_config = _get_server_config(agent) or A2AServerConfig() + + name = server_config.name or agent.role + + description_parts = [agent.goal] + if agent.backstory: + description_parts.append(agent.backstory) + description = server_config.description or " ".join(description_parts) + + skills: list[AgentSkill] = ( + server_config.skills.copy() if server_config.skills else [] + ) + + if not skills: + if agent.tools: + for tool in agent.tools: + tool_name = getattr(tool, "name", None) or tool.__class__.__name__ + tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}" + skills.append(_tool_to_skill(tool_name, tool_desc)) + + if not skills: + skills.append( + AgentSkill( + id=agent.role.lower().replace(" ", "_"), + name=agent.role, + description=agent.goal, + tags=[agent.role.lower().replace(" ", "-")], + ) + ) + + return AgentCard( + name=name, + description=description, + url=server_config.url or url, + version=server_config.version, + capabilities=server_config.capabilities, + default_input_modes=server_config.default_input_modes, + default_output_modes=server_config.default_output_modes, + skills=skills, + protocol_version=server_config.protocol_version, + provider=server_config.provider, + documentation_url=server_config.documentation_url, + icon_url=server_config.icon_url, + additional_interfaces=server_config.additional_interfaces, + security=server_config.security, + security_schemes=server_config.security_schemes, + supports_authenticated_extended_card=server_config.supports_authenticated_extended_card, + signatures=server_config.signatures, + ) + + +def inject_a2a_server_methods(agent: Agent) -> None: + """Inject A2A server methods onto an Agent instance. + + Adds a `to_agent_card(url: str) -> AgentCard` method to the agent + that generates an A2A-compliant AgentCard. + + Only injects if the agent has an A2AServerConfig. + + Args: + agent: The Agent instance to inject methods onto. + """ + if _get_server_config(agent) is None: + return + + def _to_agent_card(self: Agent, url: str) -> AgentCard: + return _agent_to_agent_card(self, url) + + object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent)) diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils/delegation.py similarity index 64% rename from lib/crewai/src/crewai/a2a/utils.py rename to lib/crewai/src/crewai/a2a/utils/delegation.py index 93aee5215..740f914bc 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -1,16 +1,14 @@ -"""Utility functions for A2A (Agent-to-Agent) protocol delegation.""" +"""A2A delegation utilities for executing tasks on remote agents.""" from __future__ import annotations import asyncio from collections.abc import AsyncIterator, MutableMapping from contextlib import asynccontextmanager -from functools import lru_cache -import time from typing import TYPE_CHECKING, Any, Literal import uuid -from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory +from a2a.client import Client, ClientConfig, ClientFactory from a2a.types import ( AgentCard, Message, @@ -19,19 +17,15 @@ from a2a.types import ( Role, TextPart, ) -from aiocache import cached # type: ignore[import-untyped] -from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] import httpx -from pydantic import BaseModel, Field, create_model +from pydantic import BaseModel from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth from crewai.a2a.auth.utils import ( _auth_store, configure_auth_client, - retry_on_401, validate_auth_against_agent_card, ) -from crewai.a2a.config import A2AConfig from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.types import ( HANDLER_REGISTRY, @@ -45,6 +39,7 @@ from crewai.a2a.updates import ( StreamingHandler, UpdateConfig, ) +from crewai.a2a.utils.agent_card import _afetch_agent_card_cached from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationStartedEvent, @@ -52,7 +47,6 @@ from crewai.events.types.a2a_events import ( A2ADelegationStartedEvent, A2AMessageSentEvent, ) -from crewai.types.utils import create_literals_from_strings if TYPE_CHECKING: @@ -75,187 +69,6 @@ def get_handler(config: UpdateConfig | None) -> HandlerType: return HANDLER_REGISTRY.get(type(config), StreamingHandler) -@lru_cache() -def _fetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, - _ttl_hash: int, -) -> AgentCard: - """Cached sync version of fetch_agent_card.""" - auth = _auth_store.get(auth_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -def fetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, - cache_ttl: int = 300, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint with optional caching. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL) - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - use_cache: Whether to use caching (default True) - cache_ttl: Cache TTL in seconds (default 300 = 5 minutes) - - Returns: - AgentCard object with agent capabilities and skills - - Raises: - httpx.HTTPStatusError: If the request fails - A2AClientHTTPError: If authentication fails - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - ttl_hash = int(time.time() // cache_ttl) - return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -async def afetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint asynchronously. - - Native async implementation. Use this when running in an async context. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL). - auth: Optional AuthScheme for authentication. - timeout: Request timeout in seconds. - use_cache: Whether to use caching (default True). - - Returns: - AgentCard object with agent capabilities and skills. - - Raises: - httpx.HTTPStatusError: If the request fails. - A2AClientHTTPError: If authentication fails. - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - agent_card: AgentCard = await _afetch_agent_card_cached( - endpoint, auth_hash, timeout - ) - return agent_card - - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] -async def _afetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, -) -> AgentCard: - """Cached async implementation of AgentCard fetching.""" - auth = _auth_store.get(auth_hash) - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -async def _afetch_agent_card_impl( - endpoint: str, - auth: AuthScheme | None, - timeout: int, -) -> AgentCard: - """Internal async implementation of AgentCard fetching.""" - if "/.well-known/agent-card.json" in endpoint: - base_url = endpoint.replace("/.well-known/agent-card.json", "") - agent_card_path = "/.well-known/agent-card.json" - else: - url_parts = endpoint.split("/", 3) - base_url = f"{url_parts[0]}//{url_parts[2]}" - agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" - - headers: MutableMapping[str, str] = {} - if auth: - async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: - if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_auth_client) - headers = await auth.apply_auth(temp_auth_client, {}) - - async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: - if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_client) - - agent_card_url = f"{base_url}{agent_card_path}" - - async def _fetch_agent_card_request() -> httpx.Response: - return await temp_client.get(agent_card_url) - - try: - response = await retry_on_401( - request_func=_fetch_agent_card_request, - auth_scheme=auth, - client=temp_client, - headers=temp_client.headers, - max_retries=2, - ) - response.raise_for_status() - - return AgentCard.model_validate(response.json()) - - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: - error_details = ["Authentication failed"] - www_auth = e.response.headers.get("WWW-Authenticate") - if www_auth: - error_details.append(f"WWW-Authenticate: {www_auth}") - if not auth: - error_details.append("No auth scheme provided") - msg = " | ".join(error_details) - raise A2AClientHTTPError(401, msg) from e - raise - - def execute_a2a_delegation( endpoint: str, transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], @@ -644,19 +457,18 @@ async def _create_a2a_client( """Create and configure an A2A client. Args: - agent_card: The A2A agent card - transport_protocol: Transport protocol to use - timeout: Request timeout in seconds - headers: HTTP headers (already with auth applied) - streaming: Enable streaming responses - auth: Optional AuthScheme for client configuration - use_polling: Enable polling mode - push_notification_config: Optional push notification config to include in requests + agent_card: The A2A agent card. + transport_protocol: Transport protocol to use. + timeout: Request timeout in seconds. + headers: HTTP headers (already with auth applied). + streaming: Enable streaming responses. + auth: Optional AuthScheme for client configuration. + use_polling: Enable polling mode. + push_notification_config: Optional push notification config. Yields: - Configured A2A client instance + Configured A2A client instance. """ - async with httpx.AsyncClient( timeout=timeout, headers=headers, @@ -687,78 +499,3 @@ async def _create_a2a_client( factory = ClientFactory(config) client = factory.create(agent_card) yield client - - -def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: - """Create a dynamic AgentResponse model with Literal types for agent IDs. - - Args: - agent_ids: List of available A2A agent IDs - - Returns: - Dynamically created Pydantic model with Literal-constrained a2a_ids field - """ - - DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 - - return create_model( - "AgentResponse", - a2a_ids=( - tuple[DynamicLiteral, ...], # type: ignore[valid-type] - Field( - default_factory=tuple, - max_length=len(agent_ids), - description="A2A agent IDs to delegate to.", - ), - ), - message=( - str, - Field( - description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." - ), - ), - is_a2a=( - bool, - Field( - description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." - ), - ), - __base__=BaseModel, - ) - - -def extract_a2a_agent_ids_from_config( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], tuple[str, ...]]: - """Extract A2A agent IDs from A2A configuration. - - Args: - a2a_config: A2A configuration - - Returns: - List of A2A agent IDs - """ - if a2a_config is None: - return [], () - - if isinstance(a2a_config, A2AConfig): - a2a_agents = [a2a_config] - else: - a2a_agents = a2a_config - return a2a_agents, tuple(config.endpoint for config in a2a_agents) - - -def get_a2a_agents_and_response_model( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], type[BaseModel]]: - """Get A2A agent IDs and response model. - - Args: - a2a_config: A2A configuration - - Returns: - Tuple of A2A agent IDs and response model - """ - a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) - - return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py new file mode 100644 index 000000000..44d8a5ba6 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/response_model.py @@ -0,0 +1,97 @@ +"""Response model utilities for A2A agent interactions.""" + +from __future__ import annotations + +from typing import TypeAlias + +from pydantic import BaseModel, Field, create_model + +from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig +from crewai.types.utils import create_literals_from_strings + + +A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig +A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig + + +def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: + """Create a dynamic AgentResponse model with Literal types for agent IDs. + + Args: + agent_ids: List of available A2A agent IDs. + + Returns: + Dynamically created Pydantic model with Literal-constrained a2a_ids field. + """ + DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 + + return create_model( + "AgentResponse", + a2a_ids=( + tuple[DynamicLiteral, ...], # type: ignore[valid-type] + Field( + default_factory=tuple, + max_length=len(agent_ids), + description="A2A agent IDs to delegate to.", + ), + ), + message=( + str, + Field( + description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." + ), + ), + is_a2a=( + bool, + Field( + description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." + ), + ), + __base__=BaseModel, + ) + + +def extract_a2a_agent_ids_from_config( + a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, +) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]: + """Extract A2A agent IDs from A2A configuration. + + Filters out A2AServerConfig since it doesn't have an endpoint for delegation. + + Args: + a2a_config: A2A configuration (any type). + + Returns: + Tuple of client A2A configs list and agent endpoint IDs. + """ + if a2a_config is None: + return [], () + + configs: list[A2AConfigTypes] + if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)): + configs = [a2a_config] + else: + configs = a2a_config + + # Filter to only client configs (those with endpoint) + client_configs: list[A2AClientConfigTypes] = [ + config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig)) + ] + + return client_configs, tuple(config.endpoint for config in client_configs) + + +def get_a2a_agents_and_response_model( + a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, +) -> tuple[list[A2AClientConfigTypes], type[BaseModel]]: + """Get A2A agent configs and response model. + + Args: + a2a_config: A2A configuration (any type). + + Returns: + Tuple of client A2A configs and response model. + """ + a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) + + return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 7d52e85b8..37a6c665e 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any from a2a.types import Role, TaskState from pydantic import BaseModel, ValidationError -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig from crewai.a2a.extensions.base import ExtensionRegistry from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.templates import ( @@ -26,13 +26,16 @@ from crewai.a2a.templates import ( UNAVAILABLE_AGENTS_NOTICE_TEMPLATE, ) from crewai.a2a.types import AgentResponseProtocol -from crewai.a2a.utils import ( - aexecute_a2a_delegation, +from crewai.a2a.utils.agent_card import ( afetch_agent_card, - execute_a2a_delegation, fetch_agent_card, - get_a2a_agents_and_response_model, + inject_a2a_server_methods, ) +from crewai.a2a.utils.delegation import ( + aexecute_a2a_delegation, + execute_a2a_delegation, +) +from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationCompletedEvent, @@ -122,10 +125,12 @@ def wrap_agent_with_a2a_instance( agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) ) + inject_a2a_server_methods(agent) + def _fetch_card_from_config( - config: A2AConfig, -) -> tuple[A2AConfig, AgentCard | Exception]: + config: A2AConfig | A2AClientConfig, +) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: """Fetch agent card from A2A config. Args: @@ -146,7 +151,7 @@ def _fetch_card_from_config( def _fetch_agent_cards_concurrently( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: """Fetch agent cards concurrently for multiple A2A agents. @@ -181,7 +186,7 @@ def _fetch_agent_cards_concurrently( def _execute_task_with_a2a( self: Agent, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., str], task: Task, agent_response_model: type[BaseModel], @@ -270,7 +275,7 @@ def _execute_task_with_a2a( def _augment_prompt_with_a2a( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], task_description: str, agent_cards: dict[str, AgentCard], conversation_history: list[Message] | None = None, @@ -523,11 +528,11 @@ def _prepare_delegation_context( task: Task, original_task_description: str | None, ) -> tuple[ - list[A2AConfig], + list[A2AConfig | A2AClientConfig], type[BaseModel], str, str, - A2AConfig, + A2AConfig | A2AClientConfig, str | None, str | None, dict[str, Any] | None, @@ -591,7 +596,7 @@ def _handle_task_completion( task: Task, task_id_config: str | None, reference_task_ids: list[str], - agent_config: A2AConfig, + agent_config: A2AConfig | A2AClientConfig, turn_num: int, ) -> tuple[str | None, str | None, list[str]]: """Handle task completion state including reference task updates. @@ -631,7 +636,7 @@ def _handle_agent_response_and_continue( a2a_result: TaskStateResult, agent_id: str, agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_task_description: str, conversation_history: list[Message], turn_num: int, @@ -868,8 +873,8 @@ def _delegate_to_a2a( async def _afetch_card_from_config( - config: A2AConfig, -) -> tuple[A2AConfig, AgentCard | Exception]: + config: A2AConfig | A2AClientConfig, +) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: """Fetch agent card from A2A config asynchronously.""" try: card = await afetch_agent_card( @@ -883,7 +888,7 @@ async def _afetch_card_from_config( async def _afetch_agent_cards_concurrently( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: """Fetch agent cards concurrently for multiple A2A agents using asyncio.""" agent_cards: dict[str, AgentCard] = {} @@ -908,7 +913,7 @@ async def _afetch_agent_cards_concurrently( async def _aexecute_task_with_a2a( self: Agent, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., Coroutine[Any, Any, str]], task: Task, agent_response_model: type[BaseModel], @@ -987,7 +992,7 @@ async def _ahandle_agent_response_and_continue( a2a_result: TaskStateResult, agent_id: str, agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_task_description: str, conversation_history: list[Message], turn_num: int, diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index d06b3b6f7..1c7a653ec 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -17,7 +17,7 @@ from urllib.parse import urlparse from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator from typing_extensions import Self -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig from crewai.agent.utils import ( ahandle_knowledge_retrieval, apply_training_data, @@ -73,7 +73,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F from crewai.utilities.converter import Converter from crewai.utilities.guardrail_types import GuardrailType from crewai.utilities.llm_utils import create_llm -from crewai.utilities.prompts import Prompts +from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler @@ -218,9 +218,18 @@ class Agent(BaseAgent): guardrail_max_retries: int = Field( default=3, description="Maximum number of retries when guardrail fails" ) - a2a: list[A2AConfig] | A2AConfig | None = Field( + a2a: ( + list[A2AConfig | A2AServerConfig | A2AClientConfig] + | A2AConfig + | A2AServerConfig + | A2AClientConfig + | None + ) = Field( default=None, - description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.", + description=""" + A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. + Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig. + """, ) executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field( default=CrewAgentExecutor, @@ -733,7 +742,7 @@ class Agent(BaseAgent): if self.agent_executor is not None: self._update_executor_parameters( task=task, - tools=parsed_tools, + tools=parsed_tools, # type: ignore[arg-type] raw_tools=raw_tools, prompt=prompt, stop_words=stop_words, @@ -742,7 +751,7 @@ class Agent(BaseAgent): else: self.agent_executor = self.executor_class( llm=cast(BaseLLM, self.llm), - task=task, + task=task, # type: ignore[arg-type] i18n=self.i18n, agent=self, crew=self.crew, @@ -765,11 +774,11 @@ class Agent(BaseAgent): def _update_executor_parameters( self, task: Task | None, - tools: list, + tools: list[BaseTool], raw_tools: list[BaseTool], - prompt: dict, + prompt: SystemPromptResult | StandardPromptResult, stop_words: list[str], - rpm_limit_fn: Callable | None, + rpm_limit_fn: Callable | None, # type: ignore[type-arg] ) -> None: """Update executor parameters without recreating instance. diff --git a/lib/crewai/tests/a2a/utils/test_agent_card.py b/lib/crewai/tests/a2a/utils/test_agent_card.py new file mode 100644 index 000000000..fb96710a7 --- /dev/null +++ b/lib/crewai/tests/a2a/utils/test_agent_card.py @@ -0,0 +1,325 @@ +"""Tests for A2A agent card utilities.""" + +from __future__ import annotations + +from a2a.types import AgentCard, AgentSkill + +from crewai import Agent +from crewai.a2a.config import A2AClientConfig, A2AServerConfig +from crewai.a2a.utils.agent_card import inject_a2a_server_methods + + +class TestInjectA2AServerMethods: + """Tests for inject_a2a_server_methods function.""" + + def test_agent_with_server_config_gets_to_agent_card_method(self) -> None: + """Agent with A2AServerConfig should have to_agent_card method injected.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + def test_agent_without_server_config_no_injection(self) -> None: + """Agent without A2AServerConfig should not get to_agent_card method.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AClientConfig(endpoint="http://example.com"), + ) + + assert not hasattr(agent, "to_agent_card") + + def test_agent_without_a2a_no_injection(self) -> None: + """Agent without any a2a config should not get to_agent_card method.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + ) + + assert not hasattr(agent, "to_agent_card") + + def test_agent_with_mixed_configs_gets_injection(self) -> None: + """Agent with list containing A2AServerConfig should get to_agent_card.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=[ + A2AClientConfig(endpoint="http://example.com"), + A2AServerConfig(name="My Agent"), + ], + ) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + def test_manual_injection_on_plain_agent(self) -> None: + """inject_a2a_server_methods should work when called manually.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + ) + # Manually set server config and inject + object.__setattr__(agent, "a2a", A2AServerConfig()) + inject_a2a_server_methods(agent) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + +class TestToAgentCard: + """Tests for the injected to_agent_card method.""" + + def test_returns_agent_card(self) -> None: + """to_agent_card should return an AgentCard instance.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert isinstance(card, AgentCard) + + def test_uses_agent_role_as_name(self) -> None: + """AgentCard name should default to agent role.""" + agent = Agent( + role="Data Analyst", + goal="Analyze data", + backstory="Expert analyst", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.name == "Data Analyst" + + def test_uses_server_config_name(self) -> None: + """AgentCard name should prefer A2AServerConfig.name over role.""" + agent = Agent( + role="Data Analyst", + goal="Analyze data", + backstory="Expert analyst", + a2a=A2AServerConfig(name="Custom Agent Name"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.name == "Custom Agent Name" + + def test_uses_goal_as_description(self) -> None: + """AgentCard description should include agent goal.""" + agent = Agent( + role="Test Agent", + goal="Accomplish important tasks", + backstory="Has extensive experience", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert "Accomplish important tasks" in card.description + + def test_uses_server_config_description(self) -> None: + """AgentCard description should prefer A2AServerConfig.description.""" + agent = Agent( + role="Test Agent", + goal="Accomplish important tasks", + backstory="Has extensive experience", + a2a=A2AServerConfig(description="Custom description"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.description == "Custom description" + + def test_uses_provided_url(self) -> None: + """AgentCard url should use the provided URL.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://my-server.com:9000") + + assert card.url == "http://my-server.com:9000" + + def test_uses_server_config_url(self) -> None: + """AgentCard url should prefer A2AServerConfig.url over provided URL.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(url="http://configured-url.com"), + ) + + card = agent.to_agent_card("http://fallback-url.com") + + assert card.url == "http://configured-url.com/" + + def test_generates_default_skill(self) -> None: + """AgentCard should have at least one skill based on agent role.""" + agent = Agent( + role="Research Assistant", + goal="Help with research", + backstory="Skilled researcher", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert len(card.skills) >= 1 + skill = card.skills[0] + assert skill.name == "Research Assistant" + assert skill.description == "Help with research" + + def test_uses_server_config_skills(self) -> None: + """AgentCard skills should prefer A2AServerConfig.skills.""" + custom_skill = AgentSkill( + id="custom-skill", + name="Custom Skill", + description="A custom skill", + tags=["custom"], + ) + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(skills=[custom_skill]), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert len(card.skills) == 1 + assert card.skills[0].id == "custom-skill" + assert card.skills[0].name == "Custom Skill" + + def test_includes_custom_version(self) -> None: + """AgentCard should include version from A2AServerConfig.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(version="2.0.0"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.version == "2.0.0" + + def test_default_version(self) -> None: + """AgentCard should have default version 1.0.0.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.version == "1.0.0" + + +class TestAgentCardJsonStructure: + """Tests for the JSON structure of AgentCard.""" + + def test_json_has_required_fields(self) -> None: + """AgentCard JSON should contain all required A2A protocol fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + assert "name" in json_data + assert "description" in json_data + assert "url" in json_data + assert "version" in json_data + assert "skills" in json_data + assert "capabilities" in json_data + assert "defaultInputModes" in json_data + assert "defaultOutputModes" in json_data + + def test_json_skills_structure(self) -> None: + """Each skill in JSON should have required fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + assert len(json_data["skills"]) >= 1 + skill = json_data["skills"][0] + assert "id" in skill + assert "name" in skill + assert "description" in skill + assert "tags" in skill + + def test_json_capabilities_structure(self) -> None: + """Capabilities in JSON should have expected fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + capabilities = json_data["capabilities"] + assert "streaming" in capabilities + assert "pushNotifications" in capabilities + + def test_json_serializable(self) -> None: + """AgentCard should be JSON serializable.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_str = card.model_dump_json() + + assert isinstance(json_str, str) + assert "Test Agent" in json_str + assert "http://localhost:8000" in json_str + + def test_json_excludes_none_values(self) -> None: + """AgentCard JSON with exclude_none should omit None fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump(exclude_none=True) + + assert "provider" not in json_data + assert "documentationUrl" not in json_data + assert "iconUrl" not in json_data diff --git a/pyproject.toml b/pyproject.toml index 594bdf9aa..de3f03ecb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,7 +117,7 @@ show_error_codes = true warn_unused_ignores = true python_version = "3.12" exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)" -plugins = ["pydantic.mypy", "crewai.mypy"] +plugins = ["pydantic.mypy"] [tool.bandit]