feat(a2a): add plus api token auth

This commit is contained in:
Greyson LaLonde
2026-03-14 21:37:05 -04:00
parent e1d7de0dba
commit e3f1a16451
5 changed files with 112 additions and 1 deletions

View File

@@ -13,6 +13,7 @@ from crewai.a2a.auth.client_schemes import (
)
from crewai.a2a.auth.server_schemes import (
AuthenticatedUser,
EnterpriseTokenAuth,
OIDCAuth,
ServerAuthScheme,
SimpleTokenAuth,
@@ -25,6 +26,7 @@ __all__ = [
"AuthenticatedUser",
"BearerTokenAuth",
"ClientAuthScheme",
"EnterpriseTokenAuth",
"HTTPBasicAuth",
"HTTPDigestAuth",
"OAuth2AuthorizationCode",

View File

@@ -4,6 +4,7 @@ These schemes validate incoming requests to A2A server endpoints.
Supported authentication methods:
- Simple token validation with static bearer tokens
- Enterprise token validation (via PlusAPI)
- OpenID Connect with JWT validation using JWKS
- OAuth2 with JWT validation or token introspection
"""
@@ -33,6 +34,7 @@ from typing_extensions import Self
if TYPE_CHECKING:
from a2a.types import OAuth2SecurityScheme
from jwt.types import Options
logger = logging.getLogger(__name__)
@@ -183,6 +185,73 @@ class SimpleTokenAuth(ServerAuthScheme):
)
class EnterpriseTokenAuth(ServerAuthScheme):
"""Enterprise token authentication.
Validates tokens via the PlusAPI enterprise verification endpoint.
"""
async def authenticate(self, token: str) -> AuthenticatedUser:
"""Authenticate using enterprise token verification.
Args:
token: The bearer token to authenticate.
Returns:
AuthenticatedUser on successful authentication.
Raises:
HTTPException: If authentication fails.
"""
integration_token = self._verify_enterprise_token(token)
if integration_token is None:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Invalid or missing authentication credentials",
)
return AuthenticatedUser(
token=token,
scheme="enterprise",
claims={"integration_token": integration_token},
)
@staticmethod
def _verify_enterprise_token(auth_token: str) -> str | None:
"""Verify enterprise token via PlusAPI.
Args:
auth_token: The token to verify.
Returns:
The integration token if valid, None otherwise.
"""
try:
from crewai.cli.plus_api import PlusAPI
plus_api = PlusAPI(auth_token)
response = plus_api.verify_enterprise_token(auth_token) # type: ignore[attr-defined]
if response.status_code == 200:
verified_response_json: dict[str, Any] = response.json()
integration_token: str | None = verified_response_json.get(
"integration_token"
)
return integration_token
logger.error(
"Enterprise token verification failed",
extra={"status_code": response.status_code, "response": response.text},
)
except Exception as e:
logger.error(
"Error verifying enterprise token",
extra={"error": str(e)},
)
return None
class OIDCAuth(ServerAuthScheme):
"""OpenID Connect authentication.
@@ -475,7 +544,7 @@ class OAuth2ServerAuth(ServerAuthScheme):
try:
signing_key = self._jwk_client.get_signing_key_from_jwt(token)
decode_options: dict[str, Any] = {
decode_options: Options = {
"require": self.required_claims,
}

View File

@@ -633,6 +633,10 @@ class A2AServerConfig(BaseModel):
default=False,
description="Whether agent provides extended card to authenticated users",
)
extended_skills: list[AgentSkill] = Field(
default_factory=list,
description="Additional skills visible only to authenticated users in the extended card",
)
url: Url | None = Field(
default=None,
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",

View File

@@ -63,6 +63,9 @@ class A2AErrorCode(IntEnum):
INVALID_AGENT_RESPONSE = -32006
"""The agent produced an invalid response."""
AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED = -32007
"""Authenticated extended card feature is not configured."""
# CrewAI Custom Extensions (-32768 to -32100)
UNSUPPORTED_VERSION = -32009
"""The requested A2A protocol version is not supported."""
@@ -108,6 +111,7 @@ ERROR_MESSAGES: dict[int, str] = {
A2AErrorCode.UNSUPPORTED_OPERATION: "This operation is not supported",
A2AErrorCode.CONTENT_TYPE_NOT_SUPPORTED: "Incompatible content types",
A2AErrorCode.INVALID_AGENT_RESPONSE: "Invalid agent response",
A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED: "Authenticated Extended Card is not configured",
A2AErrorCode.UNSUPPORTED_VERSION: "Unsupported A2A version",
A2AErrorCode.UNSUPPORTED_EXTENSION: "Client does not support required extensions",
A2AErrorCode.AUTHENTICATION_REQUIRED: "Authentication required",
@@ -284,6 +288,15 @@ class InvalidAgentResponseError(A2AError):
code: int = field(default=A2AErrorCode.INVALID_AGENT_RESPONSE, init=False)
@dataclass
class AuthenticatedExtendedCardNotConfiguredError(A2AError):
"""Authenticated extended card is not configured."""
code: int = field(
default=A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED, init=False
)
@dataclass
class UnsupportedVersionError(A2AError):
"""The requested A2A version is not supported."""

View File

@@ -13,6 +13,7 @@ _QUOTE_PATTERN: Final[re.Pattern[str]] = re.compile(r"[\'\"]+")
_CAMEL_LOWER_UPPER: Final[re.Pattern[str]] = re.compile(r"([a-z])([A-Z])")
_CAMEL_UPPER_LOWER: Final[re.Pattern[str]] = re.compile(r"([A-Z]+)([A-Z][a-z])")
_DISALLOWED_CHARS_PATTERN: Final[re.Pattern[str]] = re.compile(r"[^a-zA-Z0-9]+")
_DUPLICATE_SEPARATOR_PATTERN: Final[re.Pattern[str]] = re.compile(r"[-_]{2,}")
_DUPLICATE_UNDERSCORE_PATTERN: Final[re.Pattern[str]] = re.compile(r"_+")
_MAX_TOOL_NAME_LENGTH: Final[int] = 64
@@ -48,6 +49,28 @@ def sanitize_tool_name(name: str, max_length: int = _MAX_TOOL_NAME_LENGTH) -> st
return name
def slugify(text: str, separator: str = "_") -> str:
"""Convert text to a URL-safe slug.
Normalizes Unicode characters, removes special characters,
and replaces whitespace with the separator.
Args:
text: The text to slugify.
separator: The separator to use between words. Defaults to underscore.
Returns:
A URL-safe slug.
"""
text = unicodedata.normalize("NFKD", text)
text = text.encode("ascii", "ignore").decode("ascii")
text = text.lower()
text = _QUOTE_PATTERN.sub("", text)
text = _DISALLOWED_CHARS_PATTERN.sub(separator, text)
text = _DUPLICATE_SEPARATOR_PATTERN.sub(separator, text)
return text.strip("-_")
def interpolate_only(
input_string: str | None,
inputs: dict[str, str | int | float | dict[str, Any] | list[Any]],