feat(a2a): add plus api token auth

* feat(a2a): add plus api token auth

* feat(a2a): use stub for plus api

* fix: use dynamic separator in slugify for dedup and strip

* fix: remove unused _DUPLICATE_SEPARATOR_PATTERN, cache compiled regex in slugify
This commit is contained in:
Greyson LaLonde
2026-03-15 23:30:29 -04:00
committed by GitHub
parent 32d7b4a8d4
commit 4d21c6e4ad
5 changed files with 69 additions and 2 deletions

View File

@@ -13,6 +13,7 @@ from crewai.a2a.auth.client_schemes import (
)
from crewai.a2a.auth.server_schemes import (
AuthenticatedUser,
EnterpriseTokenAuth,
OIDCAuth,
ServerAuthScheme,
SimpleTokenAuth,
@@ -25,6 +26,7 @@ __all__ = [
"AuthenticatedUser",
"BearerTokenAuth",
"ClientAuthScheme",
"EnterpriseTokenAuth",
"HTTPBasicAuth",
"HTTPDigestAuth",
"OAuth2AuthorizationCode",

View File

@@ -4,6 +4,7 @@ These schemes validate incoming requests to A2A server endpoints.
Supported authentication methods:
- Simple token validation with static bearer tokens
- Enterprise token validation (via PlusAPI)
- OpenID Connect with JWT validation using JWKS
- OAuth2 with JWT validation or token introspection
"""
@@ -16,6 +17,7 @@ import logging
import os
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal
import httpx
import jwt
from jwt import PyJWKClient
from pydantic import (
@@ -33,6 +35,7 @@ from typing_extensions import Self
if TYPE_CHECKING:
from a2a.types import OAuth2SecurityScheme
from jwt.types import Options
logger = logging.getLogger(__name__)
@@ -183,6 +186,24 @@ class SimpleTokenAuth(ServerAuthScheme):
)
class EnterpriseTokenAuth(ServerAuthScheme):
"""Enterprise token authentication.
Validates tokens via the PlusAPI enterprise verification endpoint.
"""
async def authenticate(self, token: str) -> AuthenticatedUser:
"""Authenticate using enterprise token verification.
Args:
token: The bearer token to authenticate.
Raises:
NotImplementedError
"""
raise NotImplementedError
class OIDCAuth(ServerAuthScheme):
"""OpenID Connect authentication.
@@ -475,7 +496,7 @@ class OAuth2ServerAuth(ServerAuthScheme):
try:
signing_key = self._jwk_client.get_signing_key_from_jwt(token)
decode_options: dict[str, Any] = {
decode_options: Options = {
"require": self.required_claims,
}
@@ -556,7 +577,6 @@ class OAuth2ServerAuth(ServerAuthScheme):
async def _authenticate_introspection(self, token: str) -> AuthenticatedUser:
"""Authenticate using OAuth2 token introspection (RFC 7662)."""
import httpx
if not self.introspection_url:
raise HTTPException(

View File

@@ -633,6 +633,10 @@ class A2AServerConfig(BaseModel):
default=False,
description="Whether agent provides extended card to authenticated users",
)
extended_skills: list[AgentSkill] = Field(
default_factory=list,
description="Additional skills visible only to authenticated users in the extended card",
)
url: Url | None = Field(
default=None,
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",

View File

@@ -63,6 +63,9 @@ class A2AErrorCode(IntEnum):
INVALID_AGENT_RESPONSE = -32006
"""The agent produced an invalid response."""
AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED = -32007
"""Authenticated extended card feature is not configured."""
# CrewAI Custom Extensions (-32768 to -32100)
UNSUPPORTED_VERSION = -32009
"""The requested A2A protocol version is not supported."""
@@ -108,6 +111,7 @@ ERROR_MESSAGES: dict[int, str] = {
A2AErrorCode.UNSUPPORTED_OPERATION: "This operation is not supported",
A2AErrorCode.CONTENT_TYPE_NOT_SUPPORTED: "Incompatible content types",
A2AErrorCode.INVALID_AGENT_RESPONSE: "Invalid agent response",
A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED: "Authenticated Extended Card is not configured",
A2AErrorCode.UNSUPPORTED_VERSION: "Unsupported A2A version",
A2AErrorCode.UNSUPPORTED_EXTENSION: "Client does not support required extensions",
A2AErrorCode.AUTHENTICATION_REQUIRED: "Authentication required",
@@ -284,6 +288,15 @@ class InvalidAgentResponseError(A2AError):
code: int = field(default=A2AErrorCode.INVALID_AGENT_RESPONSE, init=False)
@dataclass
class AuthenticatedExtendedCardNotConfiguredError(A2AError):
"""Authenticated extended card is not configured."""
code: int = field(
default=A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED, init=False
)
@dataclass
class UnsupportedVersionError(A2AError):
"""The requested A2A version is not supported."""

View File

@@ -2,6 +2,7 @@
# https://github.com/un33k/python-slugify
# MIT License
import functools
import hashlib
import re
from typing import Any, Final
@@ -17,6 +18,11 @@ _DUPLICATE_UNDERSCORE_PATTERN: Final[re.Pattern[str]] = re.compile(r"_+")
_MAX_TOOL_NAME_LENGTH: Final[int] = 64
@functools.lru_cache(maxsize=8)
def _duplicate_separator_pattern(separator: str) -> re.Pattern[str]:
return re.compile(f"(?:{re.escape(separator)}){{2,}}")
def sanitize_tool_name(name: str, max_length: int = _MAX_TOOL_NAME_LENGTH) -> str:
"""Sanitize tool name for LLM provider compatibility.
@@ -48,6 +54,28 @@ def sanitize_tool_name(name: str, max_length: int = _MAX_TOOL_NAME_LENGTH) -> st
return name
def slugify(text: str, separator: str = "_") -> str:
"""Convert text to a URL-safe slug.
Normalizes Unicode characters, removes special characters,
and replaces whitespace with the separator.
Args:
text: The text to slugify.
separator: The separator to use between words. Defaults to underscore.
Returns:
A URL-safe slug.
"""
text = unicodedata.normalize("NFKD", text)
text = text.encode("ascii", "ignore").decode("ascii")
text = text.lower()
text = _QUOTE_PATTERN.sub("", text)
text = _DISALLOWED_CHARS_PATTERN.sub(separator, text)
text = _duplicate_separator_pattern(separator).sub(separator, text)
return text.strip(separator)
def interpolate_only(
input_string: str | None,
inputs: dict[str, str | int | float | dict[str, Any] | list[Any]],