From 22f1812824c841ec24a5ade81b98122fcf669b17 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 14 Jan 2026 22:09:11 -0500 Subject: [PATCH 1/4] feat: add a2a server config; agent card generation --- lib/crewai/src/crewai/a2a/config.py | 213 +++++++++- lib/crewai/src/crewai/a2a/types.py | 24 +- lib/crewai/src/crewai/a2a/utils/__init__.py | 1 + lib/crewai/src/crewai/a2a/utils/agent_card.py | 399 ++++++++++++++++++ .../a2a/{utils.py => utils/delegation.py} | 289 +------------ .../src/crewai/a2a/utils/response_model.py | 97 +++++ lib/crewai/src/crewai/a2a/wrapper.py | 43 +- lib/crewai/src/crewai/agent/core.py | 27 +- lib/crewai/tests/a2a/utils/test_agent_card.py | 325 ++++++++++++++ pyproject.toml | 2 +- 10 files changed, 1096 insertions(+), 324 deletions(-) create mode 100644 lib/crewai/src/crewai/a2a/utils/__init__.py create mode 100644 lib/crewai/src/crewai/a2a/utils/agent_card.py rename lib/crewai/src/crewai/a2a/{utils.py => utils/delegation.py} (64%) create mode 100644 lib/crewai/src/crewai/a2a/utils/response_model.py create mode 100644 lib/crewai/tests/a2a/utils/test_agent_card.py diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index 535db971f..1597ae821 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -5,18 +5,22 @@ This module is separate from experimental.a2a to avoid circular imports. from __future__ import annotations -from typing import Annotated, Any, ClassVar, Literal +from importlib.metadata import version +from typing import Any, ClassVar, Literal -from pydantic import ( - BaseModel, - BeforeValidator, - ConfigDict, - Field, - HttpUrl, - TypeAdapter, +from a2a.types import ( + AgentCapabilities, + AgentCardSignature, + AgentInterface, + AgentProvider, + AgentSkill, + SecurityScheme, ) +from pydantic import BaseModel, ConfigDict, Field +from typing_extensions import deprecated from crewai.a2a.auth.schemas import AuthScheme +from crewai.a2a.types import TransportType, Url try: @@ -25,25 +29,25 @@ except ImportError: UpdateConfig = Any # type: ignore[misc,assignment] -http_url_adapter = TypeAdapter(HttpUrl) - -Url = Annotated[ - str, - BeforeValidator( - lambda value: str(http_url_adapter.validate_python(value, strict=True)) - ), -] - - def _get_default_update_config() -> UpdateConfig: from crewai.a2a.updates import StreamingConfig return StreamingConfig() +@deprecated( + """ + `crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0, + use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead. + """, + category=FutureWarning, +) class A2AConfig(BaseModel): """Configuration for A2A protocol integration. + Deprecated: + Use A2AClientConfig instead. This class will be removed in a future version. + Attributes: endpoint: A2A agent endpoint URL. auth: Authentication scheme. @@ -87,3 +91,176 @@ class A2AConfig(BaseModel): default="JSONRPC", description="Specified mode of A2A transport protocol", ) + + +class A2AClientConfig(BaseModel): + """Configuration for connecting to remote A2A agents. + + Attributes: + endpoint: A2A agent endpoint URL. + auth: Authentication scheme. + timeout: Request timeout in seconds. + max_turns: Maximum conversation turns with A2A agent. + response_model: Optional Pydantic model for structured A2A agent responses. + fail_fast: If True, raise error when agent unreachable; if False, skip and continue. + trust_remote_completion_status: If True, return A2A agent's result directly when completed. + updates: Update mechanism config. + accepted_output_modes: Media types the client can accept in responses. + supported_transports: Ordered list of transport protocols the client supports. + use_client_preference: Whether to prioritize client transport preferences over server. + extensions: Extension URIs the client supports. + """ + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + endpoint: Url = Field(description="A2A agent endpoint URL") + auth: AuthScheme | None = Field( + default=None, + description="Authentication scheme", + ) + timeout: int = Field(default=120, description="Request timeout in seconds") + max_turns: int = Field( + default=10, description="Maximum conversation turns with A2A agent" + ) + response_model: type[BaseModel] | None = Field( + default=None, + description="Optional Pydantic model for structured A2A agent responses", + ) + fail_fast: bool = Field( + default=True, + description="If True, raise error when agent unreachable; if False, skip", + ) + trust_remote_completion_status: bool = Field( + default=False, + description="If True, return A2A result directly when completed", + ) + updates: UpdateConfig = Field( + default_factory=_get_default_update_config, + description="Update mechanism config", + ) + accepted_output_modes: list[str] = Field( + default_factory=lambda: ["application/json"], + description="Media types the client can accept in responses", + ) + supported_transports: list[str] = Field( + default_factory=lambda: ["JSONRPC"], + description="Ordered list of transport protocols the client supports", + ) + use_client_preference: bool = Field( + default=False, + description="Whether to prioritize client transport preferences over server", + ) + extensions: list[str] = Field( + default_factory=list, + description="Extension URIs the client supports", + ) + transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field( + default="JSONRPC", + description="Specified mode of A2A transport protocol", + ) + + +class A2AServerConfig(BaseModel): + """Configuration for exposing a Crew or Agent as an A2A server. + + All fields correspond to A2A AgentCard fields. Fields like name, description, + and skills can be auto-derived from the Crew/Agent if not provided. + + Attributes: + name: Human-readable name for the agent. + description: Human-readable description of the agent. + version: Version string for the agent card. + skills: List of agent skills/capabilities. + default_input_modes: Default supported input MIME types. + default_output_modes: Default supported output MIME types. + capabilities: Declaration of optional capabilities. + preferred_transport: Transport protocol for the preferred endpoint. + protocol_version: A2A protocol version this agent supports. + provider: Information about the agent's service provider. + documentation_url: URL to the agent's documentation. + icon_url: URL to an icon for the agent. + additional_interfaces: Additional supported interfaces. + security: Security requirement objects for all interactions. + security_schemes: Security schemes available to authorize requests. + supports_authenticated_extended_card: Whether agent provides extended card to authenticated users. + url: Preferred endpoint URL for the agent. + signatures: JSON Web Signatures for the AgentCard. + """ + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + name: str | None = Field( + default=None, + description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.", + ) + description: str | None = Field( + default=None, + description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.", + ) + version: str = Field( + default="1.0.0", + description="Version string for the agent card", + ) + skills: list[AgentSkill] = Field( + default_factory=list, + description="List of agent skills. Auto-derived from tasks/tools if not provided.", + ) + default_input_modes: list[str] = Field( + default_factory=lambda: ["text/plain", "application/json"], + description="Default supported input MIME types", + ) + default_output_modes: list[str] = Field( + default_factory=lambda: ["text/plain", "application/json"], + description="Default supported output MIME types", + ) + capabilities: AgentCapabilities = Field( + default_factory=lambda: AgentCapabilities( + streaming=True, + push_notifications=False, + ), + description="Declaration of optional capabilities supported by the agent", + ) + preferred_transport: TransportType = Field( + default="JSONRPC", + description="Transport protocol for the preferred endpoint", + ) + protocol_version: str = Field( + default_factory=lambda: version("a2a-sdk"), + description="A2A protocol version this agent supports", + ) + provider: AgentProvider | None = Field( + default=None, + description="Information about the agent's service provider", + ) + documentation_url: Url | None = Field( + default=None, + description="URL to the agent's documentation", + ) + icon_url: Url | None = Field( + default=None, + description="URL to an icon for the agent", + ) + additional_interfaces: list[AgentInterface] = Field( + default_factory=list, + description="Additional supported interfaces (transport and URL combinations)", + ) + security: list[dict[str, list[str]]] = Field( + default_factory=list, + description="Security requirement objects for all agent interactions", + ) + security_schemes: dict[str, SecurityScheme] = Field( + default_factory=dict, + description="Security schemes available to authorize requests", + ) + supports_authenticated_extended_card: bool = Field( + default=False, + description="Whether agent provides extended card to authenticated users", + ) + url: Url | None = Field( + default=None, + description="Preferred endpoint URL for the agent. Set at runtime if not provided.", + ) + signatures: list[AgentCardSignature] = Field( + default_factory=list, + description="JSON Web Signatures for the AgentCard", + ) diff --git a/lib/crewai/src/crewai/a2a/types.py b/lib/crewai/src/crewai/a2a/types.py index 217b59467..90473b669 100644 --- a/lib/crewai/src/crewai/a2a/types.py +++ b/lib/crewai/src/crewai/a2a/types.py @@ -1,7 +1,17 @@ """Type definitions for A2A protocol message parts.""" -from typing import Any, Literal, Protocol, TypedDict, runtime_checkable +from __future__ import annotations +from typing import ( + Annotated, + Any, + Literal, + Protocol, + TypedDict, + runtime_checkable, +) + +from pydantic import BeforeValidator, HttpUrl, TypeAdapter from typing_extensions import NotRequired from crewai.a2a.updates import ( @@ -15,6 +25,18 @@ from crewai.a2a.updates import ( ) +TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"] + +http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl) + +Url = Annotated[ + str, + BeforeValidator( + lambda value: str(http_url_adapter.validate_python(value, strict=True)) + ), +] + + @runtime_checkable class AgentResponseProtocol(Protocol): """Protocol for the dynamically created AgentResponse model.""" diff --git a/lib/crewai/src/crewai/a2a/utils/__init__.py b/lib/crewai/src/crewai/a2a/utils/__init__.py new file mode 100644 index 000000000..bdb7bed62 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/__init__.py @@ -0,0 +1 @@ +"""A2A utility modules for client operations.""" diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py new file mode 100644 index 000000000..7c798dc1a --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -0,0 +1,399 @@ +"""AgentCard utilities for A2A client and server operations.""" + +from __future__ import annotations + +import asyncio +from collections.abc import MutableMapping +from functools import lru_cache +import time +from types import MethodType +from typing import TYPE_CHECKING + +from a2a.client.errors import A2AClientHTTPError +from a2a.types import AgentCapabilities, AgentCard, AgentSkill +from aiocache import cached # type: ignore[import-untyped] +from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] +import httpx + +from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth +from crewai.a2a.auth.utils import ( + _auth_store, + configure_auth_client, + retry_on_401, +) +from crewai.a2a.config import A2AServerConfig +from crewai.crew import Crew + + +if TYPE_CHECKING: + from crewai.a2a.auth.schemas import AuthScheme + from crewai.agent import Agent + from crewai.task import Task + + +def _get_server_config(agent: Agent) -> A2AServerConfig | None: + """Get A2AServerConfig from an agent's a2a configuration. + + Args: + agent: The Agent instance to check. + + Returns: + A2AServerConfig if present, None otherwise. + """ + if agent.a2a is None: + return None + if isinstance(agent.a2a, A2AServerConfig): + return agent.a2a + if isinstance(agent.a2a, list): + for config in agent.a2a: + if isinstance(config, A2AServerConfig): + return config + return None + + +def fetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, + cache_ttl: int = 300, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint with optional caching. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + cache_ttl: Cache TTL in seconds (default 300 = 5 minutes). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + ttl_hash = int(time.time() // cache_ttl) + return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +async def afetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint asynchronously. + + Native async implementation. Use this when running in an async context. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + agent_card: AgentCard = await _afetch_agent_card_cached( + endpoint, auth_hash, timeout + ) + return agent_card + + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +@lru_cache() +def _fetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, + _ttl_hash: int, +) -> AgentCard: + """Cached sync version of fetch_agent_card.""" + auth = _auth_store.get(auth_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] +async def _afetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, +) -> AgentCard: + """Cached async implementation of AgentCard fetching.""" + auth = _auth_store.get(auth_hash) + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +async def _afetch_agent_card_impl( + endpoint: str, + auth: AuthScheme | None, + timeout: int, +) -> AgentCard: + """Internal async implementation of AgentCard fetching.""" + if "/.well-known/agent-card.json" in endpoint: + base_url = endpoint.replace("/.well-known/agent-card.json", "") + agent_card_path = "/.well-known/agent-card.json" + else: + url_parts = endpoint.split("/", 3) + base_url = f"{url_parts[0]}//{url_parts[2]}" + agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" + + headers: MutableMapping[str, str] = {} + if auth: + async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: + if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_auth_client) + headers = await auth.apply_auth(temp_auth_client, {}) + + async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: + if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_client) + + agent_card_url = f"{base_url}{agent_card_path}" + + async def _fetch_agent_card_request() -> httpx.Response: + return await temp_client.get(agent_card_url) + + try: + response = await retry_on_401( + request_func=_fetch_agent_card_request, + auth_scheme=auth, + client=temp_client, + headers=temp_client.headers, + max_retries=2, + ) + response.raise_for_status() + + return AgentCard.model_validate(response.json()) + + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + error_details = ["Authentication failed"] + www_auth = e.response.headers.get("WWW-Authenticate") + if www_auth: + error_details.append(f"WWW-Authenticate: {www_auth}") + if not auth: + error_details.append("No auth scheme provided") + msg = " | ".join(error_details) + raise A2AClientHTTPError(401, msg) from e + raise + + +def _task_to_skill(task: Task) -> AgentSkill: + """Convert a CrewAI Task to an A2A AgentSkill. + + Args: + task: The CrewAI Task to convert. + + Returns: + AgentSkill representing the task's capability. + """ + task_name = task.name or task.description[:50] + task_id = task_name.lower().replace(" ", "_") + + tags: list[str] = [] + if task.agent: + tags.append(task.agent.role.lower().replace(" ", "-")) + + return AgentSkill( + id=task_id, + name=task_name, + description=task.description, + tags=tags, + examples=[task.expected_output] if task.expected_output else None, + ) + + +def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill: + """Convert an Agent's tool to an A2A AgentSkill. + + Args: + tool_name: Name of the tool. + tool_description: Description of what the tool does. + + Returns: + AgentSkill representing the tool's capability. + """ + tool_id = tool_name.lower().replace(" ", "_") + + return AgentSkill( + id=tool_id, + name=tool_name, + description=tool_description, + tags=[tool_name.lower().replace(" ", "-")], + ) + + +def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard: + """Generate an A2A AgentCard from a Crew instance. + + Args: + crew: The Crew instance to generate a card for. + url: The base URL where this crew will be exposed. + + Returns: + AgentCard describing the crew's capabilities. + """ + crew_name = getattr(crew, "name", None) or crew.__class__.__name__ + + description_parts: list[str] = [] + crew_description = getattr(crew, "description", None) + if crew_description: + description_parts.append(crew_description) + else: + agent_roles = [agent.role for agent in crew.agents] + description_parts.append( + f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}" + ) + + skills = [_task_to_skill(task) for task in crew.tasks] + + return AgentCard( + name=crew_name, + description=" ".join(description_parts), + url=url, + version="1.0.0", + capabilities=AgentCapabilities( + streaming=True, + push_notifications=True, + ), + default_input_modes=["text/plain", "application/json"], + default_output_modes=["text/plain", "application/json"], + skills=skills, + ) + + +def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard: + """Generate an A2A AgentCard from an Agent instance. + + Uses A2AServerConfig values when available, falling back to agent properties. + + Args: + agent: The Agent instance to generate a card for. + url: The base URL where this agent will be exposed. + + Returns: + AgentCard describing the agent's capabilities. + """ + server_config = _get_server_config(agent) or A2AServerConfig() + + name = server_config.name or agent.role + + description_parts = [agent.goal] + if agent.backstory: + description_parts.append(agent.backstory) + description = server_config.description or " ".join(description_parts) + + skills: list[AgentSkill] = ( + server_config.skills.copy() if server_config.skills else [] + ) + + if not skills: + if agent.tools: + for tool in agent.tools: + tool_name = getattr(tool, "name", None) or tool.__class__.__name__ + tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}" + skills.append(_tool_to_skill(tool_name, tool_desc)) + + if not skills: + skills.append( + AgentSkill( + id=agent.role.lower().replace(" ", "_"), + name=agent.role, + description=agent.goal, + tags=[agent.role.lower().replace(" ", "-")], + ) + ) + + return AgentCard( + name=name, + description=description, + url=server_config.url or url, + version=server_config.version, + capabilities=server_config.capabilities, + default_input_modes=server_config.default_input_modes, + default_output_modes=server_config.default_output_modes, + skills=skills, + protocol_version=server_config.protocol_version, + provider=server_config.provider, + documentation_url=server_config.documentation_url, + icon_url=server_config.icon_url, + additional_interfaces=server_config.additional_interfaces, + security=server_config.security, + security_schemes=server_config.security_schemes, + supports_authenticated_extended_card=server_config.supports_authenticated_extended_card, + signatures=server_config.signatures, + ) + + +def inject_a2a_server_methods(agent: Agent) -> None: + """Inject A2A server methods onto an Agent instance. + + Adds a `to_agent_card(url: str) -> AgentCard` method to the agent + that generates an A2A-compliant AgentCard. + + Only injects if the agent has an A2AServerConfig. + + Args: + agent: The Agent instance to inject methods onto. + """ + if _get_server_config(agent) is None: + return + + def _to_agent_card(self: Agent, url: str) -> AgentCard: + return _agent_to_agent_card(self, url) + + object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent)) diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils/delegation.py similarity index 64% rename from lib/crewai/src/crewai/a2a/utils.py rename to lib/crewai/src/crewai/a2a/utils/delegation.py index 93aee5215..740f914bc 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -1,16 +1,14 @@ -"""Utility functions for A2A (Agent-to-Agent) protocol delegation.""" +"""A2A delegation utilities for executing tasks on remote agents.""" from __future__ import annotations import asyncio from collections.abc import AsyncIterator, MutableMapping from contextlib import asynccontextmanager -from functools import lru_cache -import time from typing import TYPE_CHECKING, Any, Literal import uuid -from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory +from a2a.client import Client, ClientConfig, ClientFactory from a2a.types import ( AgentCard, Message, @@ -19,19 +17,15 @@ from a2a.types import ( Role, TextPart, ) -from aiocache import cached # type: ignore[import-untyped] -from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] import httpx -from pydantic import BaseModel, Field, create_model +from pydantic import BaseModel from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth from crewai.a2a.auth.utils import ( _auth_store, configure_auth_client, - retry_on_401, validate_auth_against_agent_card, ) -from crewai.a2a.config import A2AConfig from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.types import ( HANDLER_REGISTRY, @@ -45,6 +39,7 @@ from crewai.a2a.updates import ( StreamingHandler, UpdateConfig, ) +from crewai.a2a.utils.agent_card import _afetch_agent_card_cached from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationStartedEvent, @@ -52,7 +47,6 @@ from crewai.events.types.a2a_events import ( A2ADelegationStartedEvent, A2AMessageSentEvent, ) -from crewai.types.utils import create_literals_from_strings if TYPE_CHECKING: @@ -75,187 +69,6 @@ def get_handler(config: UpdateConfig | None) -> HandlerType: return HANDLER_REGISTRY.get(type(config), StreamingHandler) -@lru_cache() -def _fetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, - _ttl_hash: int, -) -> AgentCard: - """Cached sync version of fetch_agent_card.""" - auth = _auth_store.get(auth_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -def fetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, - cache_ttl: int = 300, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint with optional caching. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL) - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - use_cache: Whether to use caching (default True) - cache_ttl: Cache TTL in seconds (default 300 = 5 minutes) - - Returns: - AgentCard object with agent capabilities and skills - - Raises: - httpx.HTTPStatusError: If the request fails - A2AClientHTTPError: If authentication fails - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - ttl_hash = int(time.time() // cache_ttl) - return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -async def afetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint asynchronously. - - Native async implementation. Use this when running in an async context. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL). - auth: Optional AuthScheme for authentication. - timeout: Request timeout in seconds. - use_cache: Whether to use caching (default True). - - Returns: - AgentCard object with agent capabilities and skills. - - Raises: - httpx.HTTPStatusError: If the request fails. - A2AClientHTTPError: If authentication fails. - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - agent_card: AgentCard = await _afetch_agent_card_cached( - endpoint, auth_hash, timeout - ) - return agent_card - - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] -async def _afetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, -) -> AgentCard: - """Cached async implementation of AgentCard fetching.""" - auth = _auth_store.get(auth_hash) - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -async def _afetch_agent_card_impl( - endpoint: str, - auth: AuthScheme | None, - timeout: int, -) -> AgentCard: - """Internal async implementation of AgentCard fetching.""" - if "/.well-known/agent-card.json" in endpoint: - base_url = endpoint.replace("/.well-known/agent-card.json", "") - agent_card_path = "/.well-known/agent-card.json" - else: - url_parts = endpoint.split("/", 3) - base_url = f"{url_parts[0]}//{url_parts[2]}" - agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" - - headers: MutableMapping[str, str] = {} - if auth: - async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: - if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_auth_client) - headers = await auth.apply_auth(temp_auth_client, {}) - - async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: - if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_client) - - agent_card_url = f"{base_url}{agent_card_path}" - - async def _fetch_agent_card_request() -> httpx.Response: - return await temp_client.get(agent_card_url) - - try: - response = await retry_on_401( - request_func=_fetch_agent_card_request, - auth_scheme=auth, - client=temp_client, - headers=temp_client.headers, - max_retries=2, - ) - response.raise_for_status() - - return AgentCard.model_validate(response.json()) - - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: - error_details = ["Authentication failed"] - www_auth = e.response.headers.get("WWW-Authenticate") - if www_auth: - error_details.append(f"WWW-Authenticate: {www_auth}") - if not auth: - error_details.append("No auth scheme provided") - msg = " | ".join(error_details) - raise A2AClientHTTPError(401, msg) from e - raise - - def execute_a2a_delegation( endpoint: str, transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"], @@ -644,19 +457,18 @@ async def _create_a2a_client( """Create and configure an A2A client. Args: - agent_card: The A2A agent card - transport_protocol: Transport protocol to use - timeout: Request timeout in seconds - headers: HTTP headers (already with auth applied) - streaming: Enable streaming responses - auth: Optional AuthScheme for client configuration - use_polling: Enable polling mode - push_notification_config: Optional push notification config to include in requests + agent_card: The A2A agent card. + transport_protocol: Transport protocol to use. + timeout: Request timeout in seconds. + headers: HTTP headers (already with auth applied). + streaming: Enable streaming responses. + auth: Optional AuthScheme for client configuration. + use_polling: Enable polling mode. + push_notification_config: Optional push notification config. Yields: - Configured A2A client instance + Configured A2A client instance. """ - async with httpx.AsyncClient( timeout=timeout, headers=headers, @@ -687,78 +499,3 @@ async def _create_a2a_client( factory = ClientFactory(config) client = factory.create(agent_card) yield client - - -def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: - """Create a dynamic AgentResponse model with Literal types for agent IDs. - - Args: - agent_ids: List of available A2A agent IDs - - Returns: - Dynamically created Pydantic model with Literal-constrained a2a_ids field - """ - - DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 - - return create_model( - "AgentResponse", - a2a_ids=( - tuple[DynamicLiteral, ...], # type: ignore[valid-type] - Field( - default_factory=tuple, - max_length=len(agent_ids), - description="A2A agent IDs to delegate to.", - ), - ), - message=( - str, - Field( - description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." - ), - ), - is_a2a=( - bool, - Field( - description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." - ), - ), - __base__=BaseModel, - ) - - -def extract_a2a_agent_ids_from_config( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], tuple[str, ...]]: - """Extract A2A agent IDs from A2A configuration. - - Args: - a2a_config: A2A configuration - - Returns: - List of A2A agent IDs - """ - if a2a_config is None: - return [], () - - if isinstance(a2a_config, A2AConfig): - a2a_agents = [a2a_config] - else: - a2a_agents = a2a_config - return a2a_agents, tuple(config.endpoint for config in a2a_agents) - - -def get_a2a_agents_and_response_model( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], type[BaseModel]]: - """Get A2A agent IDs and response model. - - Args: - a2a_config: A2A configuration - - Returns: - Tuple of A2A agent IDs and response model - """ - a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) - - return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py new file mode 100644 index 000000000..44d8a5ba6 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/response_model.py @@ -0,0 +1,97 @@ +"""Response model utilities for A2A agent interactions.""" + +from __future__ import annotations + +from typing import TypeAlias + +from pydantic import BaseModel, Field, create_model + +from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig +from crewai.types.utils import create_literals_from_strings + + +A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig +A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig + + +def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: + """Create a dynamic AgentResponse model with Literal types for agent IDs. + + Args: + agent_ids: List of available A2A agent IDs. + + Returns: + Dynamically created Pydantic model with Literal-constrained a2a_ids field. + """ + DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 + + return create_model( + "AgentResponse", + a2a_ids=( + tuple[DynamicLiteral, ...], # type: ignore[valid-type] + Field( + default_factory=tuple, + max_length=len(agent_ids), + description="A2A agent IDs to delegate to.", + ), + ), + message=( + str, + Field( + description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." + ), + ), + is_a2a=( + bool, + Field( + description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." + ), + ), + __base__=BaseModel, + ) + + +def extract_a2a_agent_ids_from_config( + a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, +) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]: + """Extract A2A agent IDs from A2A configuration. + + Filters out A2AServerConfig since it doesn't have an endpoint for delegation. + + Args: + a2a_config: A2A configuration (any type). + + Returns: + Tuple of client A2A configs list and agent endpoint IDs. + """ + if a2a_config is None: + return [], () + + configs: list[A2AConfigTypes] + if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)): + configs = [a2a_config] + else: + configs = a2a_config + + # Filter to only client configs (those with endpoint) + client_configs: list[A2AClientConfigTypes] = [ + config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig)) + ] + + return client_configs, tuple(config.endpoint for config in client_configs) + + +def get_a2a_agents_and_response_model( + a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, +) -> tuple[list[A2AClientConfigTypes], type[BaseModel]]: + """Get A2A agent configs and response model. + + Args: + a2a_config: A2A configuration (any type). + + Returns: + Tuple of client A2A configs and response model. + """ + a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) + + return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 7d52e85b8..37a6c665e 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any from a2a.types import Role, TaskState from pydantic import BaseModel, ValidationError -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig from crewai.a2a.extensions.base import ExtensionRegistry from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.templates import ( @@ -26,13 +26,16 @@ from crewai.a2a.templates import ( UNAVAILABLE_AGENTS_NOTICE_TEMPLATE, ) from crewai.a2a.types import AgentResponseProtocol -from crewai.a2a.utils import ( - aexecute_a2a_delegation, +from crewai.a2a.utils.agent_card import ( afetch_agent_card, - execute_a2a_delegation, fetch_agent_card, - get_a2a_agents_and_response_model, + inject_a2a_server_methods, ) +from crewai.a2a.utils.delegation import ( + aexecute_a2a_delegation, + execute_a2a_delegation, +) +from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationCompletedEvent, @@ -122,10 +125,12 @@ def wrap_agent_with_a2a_instance( agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) ) + inject_a2a_server_methods(agent) + def _fetch_card_from_config( - config: A2AConfig, -) -> tuple[A2AConfig, AgentCard | Exception]: + config: A2AConfig | A2AClientConfig, +) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: """Fetch agent card from A2A config. Args: @@ -146,7 +151,7 @@ def _fetch_card_from_config( def _fetch_agent_cards_concurrently( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: """Fetch agent cards concurrently for multiple A2A agents. @@ -181,7 +186,7 @@ def _fetch_agent_cards_concurrently( def _execute_task_with_a2a( self: Agent, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., str], task: Task, agent_response_model: type[BaseModel], @@ -270,7 +275,7 @@ def _execute_task_with_a2a( def _augment_prompt_with_a2a( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], task_description: str, agent_cards: dict[str, AgentCard], conversation_history: list[Message] | None = None, @@ -523,11 +528,11 @@ def _prepare_delegation_context( task: Task, original_task_description: str | None, ) -> tuple[ - list[A2AConfig], + list[A2AConfig | A2AClientConfig], type[BaseModel], str, str, - A2AConfig, + A2AConfig | A2AClientConfig, str | None, str | None, dict[str, Any] | None, @@ -591,7 +596,7 @@ def _handle_task_completion( task: Task, task_id_config: str | None, reference_task_ids: list[str], - agent_config: A2AConfig, + agent_config: A2AConfig | A2AClientConfig, turn_num: int, ) -> tuple[str | None, str | None, list[str]]: """Handle task completion state including reference task updates. @@ -631,7 +636,7 @@ def _handle_agent_response_and_continue( a2a_result: TaskStateResult, agent_id: str, agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_task_description: str, conversation_history: list[Message], turn_num: int, @@ -868,8 +873,8 @@ def _delegate_to_a2a( async def _afetch_card_from_config( - config: A2AConfig, -) -> tuple[A2AConfig, AgentCard | Exception]: + config: A2AConfig | A2AClientConfig, +) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: """Fetch agent card from A2A config asynchronously.""" try: card = await afetch_agent_card( @@ -883,7 +888,7 @@ async def _afetch_card_from_config( async def _afetch_agent_cards_concurrently( - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: """Fetch agent cards concurrently for multiple A2A agents using asyncio.""" agent_cards: dict[str, AgentCard] = {} @@ -908,7 +913,7 @@ async def _afetch_agent_cards_concurrently( async def _aexecute_task_with_a2a( self: Agent, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., Coroutine[Any, Any, str]], task: Task, agent_response_model: type[BaseModel], @@ -987,7 +992,7 @@ async def _ahandle_agent_response_and_continue( a2a_result: TaskStateResult, agent_id: str, agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig], + a2a_agents: list[A2AConfig | A2AClientConfig], original_task_description: str, conversation_history: list[Message], turn_num: int, diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index d06b3b6f7..1c7a653ec 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -17,7 +17,7 @@ from urllib.parse import urlparse from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator from typing_extensions import Self -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig from crewai.agent.utils import ( ahandle_knowledge_retrieval, apply_training_data, @@ -73,7 +73,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F from crewai.utilities.converter import Converter from crewai.utilities.guardrail_types import GuardrailType from crewai.utilities.llm_utils import create_llm -from crewai.utilities.prompts import Prompts +from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler @@ -218,9 +218,18 @@ class Agent(BaseAgent): guardrail_max_retries: int = Field( default=3, description="Maximum number of retries when guardrail fails" ) - a2a: list[A2AConfig] | A2AConfig | None = Field( + a2a: ( + list[A2AConfig | A2AServerConfig | A2AClientConfig] + | A2AConfig + | A2AServerConfig + | A2AClientConfig + | None + ) = Field( default=None, - description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.", + description=""" + A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. + Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig. + """, ) executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field( default=CrewAgentExecutor, @@ -733,7 +742,7 @@ class Agent(BaseAgent): if self.agent_executor is not None: self._update_executor_parameters( task=task, - tools=parsed_tools, + tools=parsed_tools, # type: ignore[arg-type] raw_tools=raw_tools, prompt=prompt, stop_words=stop_words, @@ -742,7 +751,7 @@ class Agent(BaseAgent): else: self.agent_executor = self.executor_class( llm=cast(BaseLLM, self.llm), - task=task, + task=task, # type: ignore[arg-type] i18n=self.i18n, agent=self, crew=self.crew, @@ -765,11 +774,11 @@ class Agent(BaseAgent): def _update_executor_parameters( self, task: Task | None, - tools: list, + tools: list[BaseTool], raw_tools: list[BaseTool], - prompt: dict, + prompt: SystemPromptResult | StandardPromptResult, stop_words: list[str], - rpm_limit_fn: Callable | None, + rpm_limit_fn: Callable | None, # type: ignore[type-arg] ) -> None: """Update executor parameters without recreating instance. diff --git a/lib/crewai/tests/a2a/utils/test_agent_card.py b/lib/crewai/tests/a2a/utils/test_agent_card.py new file mode 100644 index 000000000..fb96710a7 --- /dev/null +++ b/lib/crewai/tests/a2a/utils/test_agent_card.py @@ -0,0 +1,325 @@ +"""Tests for A2A agent card utilities.""" + +from __future__ import annotations + +from a2a.types import AgentCard, AgentSkill + +from crewai import Agent +from crewai.a2a.config import A2AClientConfig, A2AServerConfig +from crewai.a2a.utils.agent_card import inject_a2a_server_methods + + +class TestInjectA2AServerMethods: + """Tests for inject_a2a_server_methods function.""" + + def test_agent_with_server_config_gets_to_agent_card_method(self) -> None: + """Agent with A2AServerConfig should have to_agent_card method injected.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + def test_agent_without_server_config_no_injection(self) -> None: + """Agent without A2AServerConfig should not get to_agent_card method.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AClientConfig(endpoint="http://example.com"), + ) + + assert not hasattr(agent, "to_agent_card") + + def test_agent_without_a2a_no_injection(self) -> None: + """Agent without any a2a config should not get to_agent_card method.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + ) + + assert not hasattr(agent, "to_agent_card") + + def test_agent_with_mixed_configs_gets_injection(self) -> None: + """Agent with list containing A2AServerConfig should get to_agent_card.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=[ + A2AClientConfig(endpoint="http://example.com"), + A2AServerConfig(name="My Agent"), + ], + ) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + def test_manual_injection_on_plain_agent(self) -> None: + """inject_a2a_server_methods should work when called manually.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + ) + # Manually set server config and inject + object.__setattr__(agent, "a2a", A2AServerConfig()) + inject_a2a_server_methods(agent) + + assert hasattr(agent, "to_agent_card") + assert callable(agent.to_agent_card) + + +class TestToAgentCard: + """Tests for the injected to_agent_card method.""" + + def test_returns_agent_card(self) -> None: + """to_agent_card should return an AgentCard instance.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert isinstance(card, AgentCard) + + def test_uses_agent_role_as_name(self) -> None: + """AgentCard name should default to agent role.""" + agent = Agent( + role="Data Analyst", + goal="Analyze data", + backstory="Expert analyst", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.name == "Data Analyst" + + def test_uses_server_config_name(self) -> None: + """AgentCard name should prefer A2AServerConfig.name over role.""" + agent = Agent( + role="Data Analyst", + goal="Analyze data", + backstory="Expert analyst", + a2a=A2AServerConfig(name="Custom Agent Name"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.name == "Custom Agent Name" + + def test_uses_goal_as_description(self) -> None: + """AgentCard description should include agent goal.""" + agent = Agent( + role="Test Agent", + goal="Accomplish important tasks", + backstory="Has extensive experience", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert "Accomplish important tasks" in card.description + + def test_uses_server_config_description(self) -> None: + """AgentCard description should prefer A2AServerConfig.description.""" + agent = Agent( + role="Test Agent", + goal="Accomplish important tasks", + backstory="Has extensive experience", + a2a=A2AServerConfig(description="Custom description"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.description == "Custom description" + + def test_uses_provided_url(self) -> None: + """AgentCard url should use the provided URL.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://my-server.com:9000") + + assert card.url == "http://my-server.com:9000" + + def test_uses_server_config_url(self) -> None: + """AgentCard url should prefer A2AServerConfig.url over provided URL.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(url="http://configured-url.com"), + ) + + card = agent.to_agent_card("http://fallback-url.com") + + assert card.url == "http://configured-url.com/" + + def test_generates_default_skill(self) -> None: + """AgentCard should have at least one skill based on agent role.""" + agent = Agent( + role="Research Assistant", + goal="Help with research", + backstory="Skilled researcher", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert len(card.skills) >= 1 + skill = card.skills[0] + assert skill.name == "Research Assistant" + assert skill.description == "Help with research" + + def test_uses_server_config_skills(self) -> None: + """AgentCard skills should prefer A2AServerConfig.skills.""" + custom_skill = AgentSkill( + id="custom-skill", + name="Custom Skill", + description="A custom skill", + tags=["custom"], + ) + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(skills=[custom_skill]), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert len(card.skills) == 1 + assert card.skills[0].id == "custom-skill" + assert card.skills[0].name == "Custom Skill" + + def test_includes_custom_version(self) -> None: + """AgentCard should include version from A2AServerConfig.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(version="2.0.0"), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.version == "2.0.0" + + def test_default_version(self) -> None: + """AgentCard should have default version 1.0.0.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + + assert card.version == "1.0.0" + + +class TestAgentCardJsonStructure: + """Tests for the JSON structure of AgentCard.""" + + def test_json_has_required_fields(self) -> None: + """AgentCard JSON should contain all required A2A protocol fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + assert "name" in json_data + assert "description" in json_data + assert "url" in json_data + assert "version" in json_data + assert "skills" in json_data + assert "capabilities" in json_data + assert "defaultInputModes" in json_data + assert "defaultOutputModes" in json_data + + def test_json_skills_structure(self) -> None: + """Each skill in JSON should have required fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + assert len(json_data["skills"]) >= 1 + skill = json_data["skills"][0] + assert "id" in skill + assert "name" in skill + assert "description" in skill + assert "tags" in skill + + def test_json_capabilities_structure(self) -> None: + """Capabilities in JSON should have expected fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump() + + capabilities = json_data["capabilities"] + assert "streaming" in capabilities + assert "pushNotifications" in capabilities + + def test_json_serializable(self) -> None: + """AgentCard should be JSON serializable.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_str = card.model_dump_json() + + assert isinstance(json_str, str) + assert "Test Agent" in json_str + assert "http://localhost:8000" in json_str + + def test_json_excludes_none_values(self) -> None: + """AgentCard JSON with exclude_none should omit None fields.""" + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + a2a=A2AServerConfig(), + ) + + card = agent.to_agent_card("http://localhost:8000") + json_data = card.model_dump(exclude_none=True) + + assert "provider" not in json_data + assert "documentationUrl" not in json_data + assert "iconUrl" not in json_data diff --git a/pyproject.toml b/pyproject.toml index 594bdf9aa..de3f03ecb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,7 +117,7 @@ show_error_codes = true warn_unused_ignores = true python_version = "3.12" exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)" -plugins = ["pydantic.mypy", "crewai.mypy"] +plugins = ["pydantic.mypy"] [tool.bandit] From 641c336b2c590baa1f269570e463f76d2282be68 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 14 Jan 2026 22:46:53 -0500 Subject: [PATCH 2/4] chore: a2a agent card docs, refine existing a2a docs --- docs/en/learn/a2a-agent-delegation.mdx | 233 ++++++++++++++++++++----- lib/crewai/src/crewai/a2a/__init__.py | 4 +- 2 files changed, 189 insertions(+), 48 deletions(-) diff --git a/docs/en/learn/a2a-agent-delegation.mdx b/docs/en/learn/a2a-agent-delegation.mdx index 174939e3d..942ca8bd0 100644 --- a/docs/en/learn/a2a-agent-delegation.mdx +++ b/docs/en/learn/a2a-agent-delegation.mdx @@ -1,43 +1,48 @@ --- title: Agent-to-Agent (A2A) Protocol -description: Enable CrewAI agents to delegate tasks to remote A2A-compliant agents for specialized handling +description: Agents delegate tasks to remote A2A agents and/or operate as A2A-compliant server agents. icon: network-wired mode: "wide" --- ## A2A Agent Delegation -CrewAI supports the Agent-to-Agent (A2A) protocol, allowing agents to delegate tasks to remote specialized agents. The agent's LLM automatically decides whether to handle a task directly or delegate to an A2A agent based on the task requirements. - - - A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'` - +CrewAI treats [A2A protocol](https://a2a-protocol.org/latest/) as a first-class delegation primitive, enabling agents to delegate tasks, request information, and collaborate with remote agents, as well as act as A2A-compliant server agents. +In client mode, agents autonomously choose between local execution and remote delegation based on task requirements. ## How It Works When an agent is configured with A2A capabilities: -1. The LLM analyzes each task +1. The Agent analyzes each task 2. It decides to either: - Handle the task directly using its own capabilities - Delegate to a remote A2A agent for specialized handling 3. If delegating, the agent communicates with the remote A2A agent through the protocol 4. Results are returned to the CrewAI workflow + + A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'` + + ## Basic Configuration + + `crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0. Use `A2AClientConfig` for connecting to remote agents and/or `A2AServerConfig` for exposing agents as servers. + + Configure an agent for A2A delegation by setting the `a2a` parameter: ```python Code from crewai import Agent, Crew, Task -from crewai.a2a import A2AConfig +from crewai.a2a import A2AClientConfig agent = Agent( role="Research Coordinator", goal="Coordinate research tasks efficiently", backstory="Expert at delegating to specialized research agents", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://example.com/.well-known/agent-card.json", timeout=120, max_turns=10 @@ -54,9 +59,9 @@ crew = Crew(agents=[agent], tasks=[task], verbose=True) result = crew.kickoff() ``` -## Configuration Options +## Client Configuration Options -The `A2AConfig` class accepts the following parameters: +The `A2AClientConfig` class accepts the following parameters: The A2A agent endpoint URL (typically points to `.well-known/agent-card.json`) @@ -95,14 +100,30 @@ The `A2AConfig` class accepts the following parameters: Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`. + + Media types the client can accept in responses. + + + + Ordered list of transport protocols the client supports. + + + + Whether to prioritize client transport preferences over server. + + + + Extension URIs the client supports. + + ## Authentication For A2A agents that require authentication, use one of the provided auth schemes: - ```python Code -from crewai.a2a import A2AConfig +```python bearer_token_auth.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.auth import BearerTokenAuth agent = Agent( @@ -110,18 +131,18 @@ agent = Agent( goal="Coordinate tasks with secured agents", backstory="Manages secure agent communications", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://secure-agent.example.com/.well-known/agent-card.json", auth=BearerTokenAuth(token="your-bearer-token"), timeout=120 ) ) - ``` +``` - ```python Code -from crewai.a2a import A2AConfig +```python api_key_auth.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.auth import APIKeyAuth agent = Agent( @@ -129,7 +150,7 @@ agent = Agent( goal="Coordinate with API-based agents", backstory="Manages API-authenticated communications", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://api-agent.example.com/.well-known/agent-card.json", auth=APIKeyAuth( api_key="your-api-key", @@ -139,12 +160,12 @@ agent = Agent( timeout=120 ) ) - ``` +``` - ```python Code -from crewai.a2a import A2AConfig +```python oauth2_auth.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.auth import OAuth2ClientCredentials agent = Agent( @@ -152,7 +173,7 @@ agent = Agent( goal="Coordinate with OAuth-secured agents", backstory="Manages OAuth-authenticated communications", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://oauth-agent.example.com/.well-known/agent-card.json", auth=OAuth2ClientCredentials( token_url="https://auth.example.com/oauth/token", @@ -163,12 +184,12 @@ agent = Agent( timeout=120 ) ) - ``` +``` - ```python Code -from crewai.a2a import A2AConfig +```python http_basic_auth.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.auth import HTTPBasicAuth agent = Agent( @@ -176,7 +197,7 @@ agent = Agent( goal="Coordinate with basic auth agents", backstory="Manages basic authentication communications", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://basic-agent.example.com/.well-known/agent-card.json", auth=HTTPBasicAuth( username="your-username", @@ -185,7 +206,7 @@ agent = Agent( timeout=120 ) ) - ``` +``` @@ -194,7 +215,7 @@ agent = Agent( Configure multiple A2A agents for delegation by passing a list: ```python Code -from crewai.a2a import A2AConfig +from crewai.a2a import A2AClientConfig from crewai.a2a.auth import BearerTokenAuth agent = Agent( @@ -203,11 +224,11 @@ agent = Agent( backstory="Expert at delegating to the right specialist", llm="gpt-4o", a2a=[ - A2AConfig( + A2AClientConfig( endpoint="https://research.example.com/.well-known/agent-card.json", timeout=120 ), - A2AConfig( + A2AClientConfig( endpoint="https://data.example.com/.well-known/agent-card.json", auth=BearerTokenAuth(token="data-token"), timeout=90 @@ -223,7 +244,7 @@ The LLM will automatically choose which A2A agent to delegate to based on the ta Control how agent connection failures are handled using the `fail_fast` parameter: ```python Code -from crewai.a2a import A2AConfig +from crewai.a2a import A2AClientConfig # Fail immediately on connection errors (default) agent = Agent( @@ -231,7 +252,7 @@ agent = Agent( goal="Coordinate research tasks", backstory="Expert at delegation", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://research.example.com/.well-known/agent-card.json", fail_fast=True ) @@ -244,11 +265,11 @@ agent = Agent( backstory="Expert at working with available resources", llm="gpt-4o", a2a=[ - A2AConfig( + A2AClientConfig( endpoint="https://primary.example.com/.well-known/agent-card.json", fail_fast=False ), - A2AConfig( + A2AClientConfig( endpoint="https://backup.example.com/.well-known/agent-card.json", fail_fast=False ) @@ -267,8 +288,8 @@ Control how your agent receives task status updates from remote A2A agents: - ```python Code -from crewai.a2a import A2AConfig +```python streaming_config.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.updates import StreamingConfig agent = Agent( @@ -276,17 +297,17 @@ agent = Agent( goal="Coordinate research tasks", backstory="Expert at delegation", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://research.example.com/.well-known/agent-card.json", updates=StreamingConfig() ) ) - ``` +``` - ```python Code -from crewai.a2a import A2AConfig +```python polling_config.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.updates import PollingConfig agent = Agent( @@ -294,7 +315,7 @@ agent = Agent( goal="Coordinate research tasks", backstory="Expert at delegation", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://research.example.com/.well-known/agent-card.json", updates=PollingConfig( interval=2.0, @@ -303,12 +324,12 @@ agent = Agent( ) ) ) - ``` +``` - ```python Code -from crewai.a2a import A2AConfig +```python push_notifications_config.py lines +from crewai.a2a import A2AClientConfig from crewai.a2a.updates import PushNotificationConfig agent = Agent( @@ -316,19 +337,137 @@ agent = Agent( goal="Coordinate research tasks", backstory="Expert at delegation", llm="gpt-4o", - a2a=A2AConfig( + a2a=A2AClientConfig( endpoint="https://research.example.com/.well-known/agent-card.json", updates=PushNotificationConfig( - url={base_url}/a2a/callback", + url="{base_url}/a2a/callback", token="your-validation-token", timeout=300.0 ) ) ) - ``` +``` +## Exposing Agents as A2A Servers + +You can expose your CrewAI agents as A2A-compliant servers, allowing other A2A clients to delegate tasks to them. + +### Server Configuration + +Add an `A2AServerConfig` to your agent to enable server capabilities: + +```python a2a_server_agent.py lines +from crewai import Agent +from crewai.a2a import A2AServerConfig + +agent = Agent( + role="Data Analyst", + goal="Analyze datasets and provide insights", + backstory="Expert data scientist with statistical analysis skills", + llm="gpt-4o", + a2a=A2AServerConfig(url="https://your-server.com") +) +``` + +### Server Configuration Options + + + Human-readable name for the agent. Defaults to the agent's role if not provided. + + + + Human-readable description. Defaults to the agent's goal and backstory if not provided. + + + + Version string for the agent card. + + + + List of agent skills. Auto-generated from agent tools if not provided. + + + + Declaration of optional capabilities supported by the agent. + + + + Supported input MIME types. + + + + Supported output MIME types. + + + + Preferred endpoint URL. If set, overrides the URL passed to `to_agent_card()`. + + + + Transport protocol for the preferred endpoint. + + + + A2A protocol version this agent supports. + + + + Information about the agent's service provider. + + + + URL to the agent's documentation. + + + + URL to an icon for the agent. + + + + Additional supported interfaces (transport and URL combinations). + + + + Security requirement objects for all agent interactions. + + + + Security schemes available to authorize requests. + + + + Whether agent provides extended card to authenticated users. + + + + JSON Web Signatures for the AgentCard. + + +### Combined Client and Server + +An agent can act as both client and server by providing both configurations: + +```python Code +from crewai import Agent +from crewai.a2a import A2AClientConfig, A2AServerConfig + +agent = Agent( + role="Research Coordinator", + goal="Coordinate research and serve analysis requests", + backstory="Expert at delegation and analysis", + llm="gpt-4o", + a2a=[ + A2AClientConfig( + endpoint="https://specialist.example.com/.well-known/agent-card.json", + timeout=120 + ), + A2AServerConfig(url="https://your-server.com") + ] +) +``` + ## Best Practices diff --git a/lib/crewai/src/crewai/a2a/__init__.py b/lib/crewai/src/crewai/a2a/__init__.py index 288e805f6..634f77708 100644 --- a/lib/crewai/src/crewai/a2a/__init__.py +++ b/lib/crewai/src/crewai/a2a/__init__.py @@ -1,8 +1,10 @@ """Agent-to-Agent (A2A) protocol communication module for CrewAI.""" -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig __all__ = [ + "A2AClientConfig", "A2AConfig", + "A2AServerConfig", ] From 6a19b0a279a9eb6414affd9be79e52520067b3e0 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 14 Jan 2026 22:56:17 -0500 Subject: [PATCH 3/4] feat: a2a task execution utilities --- lib/crewai/src/crewai/a2a/config.py | 24 +- .../src/crewai/a2a/utils/response_model.py | 10 +- lib/crewai/src/crewai/a2a/utils/task.py | 284 ++++++++++++++ lib/crewai/src/crewai/agent/core.py | 9 +- lib/crewai/src/crewai/events/event_types.py | 34 +- .../src/crewai/events/types/a2a_events.py | 34 ++ lib/crewai/src/crewai/types/utils.py | 11 +- lib/crewai/tests/a2a/utils/test_task.py | 370 ++++++++++++++++++ 8 files changed, 759 insertions(+), 17 deletions(-) create mode 100644 lib/crewai/src/crewai/a2a/utils/task.py create mode 100644 lib/crewai/tests/a2a/utils/test_task.py diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index 1597ae821..1b8cd7d81 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -8,14 +8,6 @@ from __future__ import annotations from importlib.metadata import version from typing import Any, ClassVar, Literal -from a2a.types import ( - AgentCapabilities, - AgentCardSignature, - AgentInterface, - AgentProvider, - AgentSkill, - SecurityScheme, -) from pydantic import BaseModel, ConfigDict, Field from typing_extensions import deprecated @@ -24,8 +16,24 @@ from crewai.a2a.types import TransportType, Url try: + from a2a.types import ( + AgentCapabilities, + AgentCardSignature, + AgentInterface, + AgentProvider, + AgentSkill, + SecurityScheme, + ) + from crewai.a2a.updates import UpdateConfig except ImportError: + UpdateConfig = Any + AgentCapabilities = Any + AgentCardSignature = Any + AgentInterface = Any + AgentProvider = Any + SecurityScheme = Any + AgentSkill = Any UpdateConfig = Any # type: ignore[misc,assignment] diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py index 44d8a5ba6..4e65ef2b7 100644 --- a/lib/crewai/src/crewai/a2a/utils/response_model.py +++ b/lib/crewai/src/crewai/a2a/utils/response_model.py @@ -14,15 +14,19 @@ A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig -def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: +def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None: """Create a dynamic AgentResponse model with Literal types for agent IDs. Args: agent_ids: List of available A2A agent IDs. Returns: - Dynamically created Pydantic model with Literal-constrained a2a_ids field. + Dynamically created Pydantic model with Literal-constrained a2a_ids field, + or None if agent_ids is empty. """ + if not agent_ids: + return None + DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 return create_model( @@ -83,7 +87,7 @@ def extract_a2a_agent_ids_from_config( def get_a2a_agents_and_response_model( a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, -) -> tuple[list[A2AClientConfigTypes], type[BaseModel]]: +) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]: """Get A2A agent configs and response model. Args: diff --git a/lib/crewai/src/crewai/a2a/utils/task.py b/lib/crewai/src/crewai/a2a/utils/task.py new file mode 100644 index 000000000..5669e7e4b --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/task.py @@ -0,0 +1,284 @@ +"""A2A task utilities for server-side task management.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Coroutine +from functools import wraps +import logging +import os +from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast + +from a2a.server.agent_execution import RequestContext +from a2a.server.events import EventQueue +from a2a.types import ( + InternalError, + InvalidParamsError, + Message, + Task as A2ATask, + TaskState, + TaskStatus, + TaskStatusUpdateEvent, +) +from a2a.utils import new_agent_text_message, new_text_artifact +from a2a.utils.errors import ServerError +from aiocache import SimpleMemoryCache, caches # type: ignore[import-untyped] + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.a2a_events import ( + A2AServerTaskCanceledEvent, + A2AServerTaskCompletedEvent, + A2AServerTaskFailedEvent, + A2AServerTaskStartedEvent, +) +from crewai.task import Task + + +if TYPE_CHECKING: + from crewai.agent import Agent + + +logger = logging.getLogger(__name__) + +P = ParamSpec("P") +T = TypeVar("T") + + +def _parse_redis_url(url: str) -> dict[str, Any]: + from urllib.parse import urlparse + + parsed = urlparse(url) + config: dict[str, Any] = { + "cache": "aiocache.RedisCache", + "endpoint": parsed.hostname or "localhost", + "port": parsed.port or 6379, + } + if parsed.path and parsed.path != "/": + try: + config["db"] = int(parsed.path.lstrip("/")) + except ValueError: + pass + if parsed.password: + config["password"] = parsed.password + return config + + +_redis_url = os.environ.get("REDIS_URL") + +caches.set_config( + { + "default": _parse_redis_url(_redis_url) + if _redis_url + else { + "cache": "aiocache.SimpleMemoryCache", + } + } +) + + +def cancellable( + fn: Callable[P, Coroutine[Any, Any, T]], +) -> Callable[P, Coroutine[Any, Any, T]]: + """Decorator that enables cancellation for A2A task execution. + + Runs a cancellation watcher concurrently with the wrapped function. + When a cancel event is published, the execution is cancelled. + + Args: + fn: The async function to wrap. + + Returns: + Wrapped function with cancellation support. + """ + + @wraps(fn) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + """Wrap function with cancellation monitoring.""" + context: RequestContext | None = None + for arg in args: + if isinstance(arg, RequestContext): + context = arg + break + if context is None: + context = cast(RequestContext | None, kwargs.get("context")) + + if context is None: + return await fn(*args, **kwargs) + + task_id = context.task_id + cache = caches.get("default") + + async def poll_for_cancel() -> bool: + """Poll cache for cancellation flag.""" + while True: + if await cache.get(f"cancel:{task_id}"): + return True + await asyncio.sleep(0.1) + + async def watch_for_cancel() -> bool: + """Watch for cancellation events via pub/sub or polling.""" + if isinstance(cache, SimpleMemoryCache): + return await poll_for_cancel() + + try: + client = cache.client + pubsub = client.pubsub() + await pubsub.subscribe(f"cancel:{task_id}") + async for message in pubsub.listen(): + if message["type"] == "message": + return True + except Exception as e: + logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e) + return await poll_for_cancel() + return False + + execute_task = asyncio.create_task(fn(*args, **kwargs)) + cancel_watch = asyncio.create_task(watch_for_cancel()) + + try: + done, _ = await asyncio.wait( + [execute_task, cancel_watch], + return_when=asyncio.FIRST_COMPLETED, + ) + + if cancel_watch in done: + execute_task.cancel() + try: + await execute_task + except asyncio.CancelledError: + pass + raise asyncio.CancelledError(f"Task {task_id} was cancelled") + cancel_watch.cancel() + return execute_task.result() + finally: + await cache.delete(f"cancel:{task_id}") + + return wrapper + + +@cancellable +async def execute( + agent: Agent, + context: RequestContext, + event_queue: EventQueue, +) -> None: + """Execute an A2A task using a CrewAI agent. + + Args: + agent: The CrewAI agent to execute the task. + context: The A2A request context containing the user's message. + event_queue: The event queue for sending responses back. + + TODOs: + * need to impl both of structured output and file inputs, depends on `file_inputs` for + `crewai.task.Task`, pass the below two to Task. both utils in `a2a.utils.parts` + * structured outputs ingestion, `structured_inputs = get_data_parts(parts=context.message.parts)` + * file inputs ingestion, `file_inputs = get_file_parts(parts=context.message.parts)` + """ + + user_message = context.get_user_input() + task_id = context.task_id + context_id = context.context_id + if task_id is None or context_id is None: + msg = "task_id and context_id are required" + crewai_event_bus.emit( + agent, + A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg), + ) + raise ServerError(InvalidParamsError(message=msg)) from None + + task = Task( + description=user_message, + expected_output="Response to the user's request", + agent=agent, + ) + + crewai_event_bus.emit( + agent, + A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id), + ) + + try: + result = await agent.aexecute_task(task=task, tools=agent.tools) + result_str = str(result) + history: list[Message] = [context.message] if context.message else [] + history.append(new_agent_text_message(result_str, context_id, task_id)) + await event_queue.enqueue_event( + A2ATask( + id=task_id, + context_id=context_id, + status=TaskStatus(state=TaskState.input_required), + artifacts=[new_text_artifact(result_str, f"result_{task_id}")], + history=history, + ) + ) + crewai_event_bus.emit( + agent, + A2AServerTaskCompletedEvent( + a2a_task_id=task_id, a2a_context_id=context_id, result=str(result) + ), + ) + except asyncio.CancelledError: + crewai_event_bus.emit( + agent, + A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id), + ) + raise + except Exception as e: + crewai_event_bus.emit( + agent, + A2AServerTaskFailedEvent( + a2a_task_id=task_id, a2a_context_id=context_id, error=str(e) + ), + ) + raise ServerError( + error=InternalError(message=f"Task execution failed: {e}") + ) from e + + +async def cancel( + context: RequestContext, + event_queue: EventQueue, +) -> A2ATask | None: + """Cancel an A2A task. + + Publishes a cancel event that the cancellable decorator listens for. + + Args: + context: The A2A request context containing task information. + event_queue: The event queue for sending the cancellation status. + + Returns: + The canceled task with updated status. + """ + task_id = context.task_id + context_id = context.context_id + if task_id is None or context_id is None: + raise ServerError(InvalidParamsError(message="task_id and context_id required")) + + if context.current_task and context.current_task.status.state in ( + TaskState.completed, + TaskState.failed, + TaskState.canceled, + ): + return context.current_task + + cache = caches.get("default") + + await cache.set(f"cancel:{task_id}", True, ttl=3600) + if not isinstance(cache, SimpleMemoryCache): + await cache.client.publish(f"cancel:{task_id}", "cancel") + + await event_queue.enqueue_event( + TaskStatusUpdateEvent( + task_id=task_id, + context_id=context_id, + status=TaskStatus(state=TaskState.canceled), + final=True, + ) + ) + + if context.current_task: + context.current_task.status = TaskStatus(state=TaskState.canceled) + return context.current_task + return None diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 1c7a653ec..bc964754c 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -17,7 +17,6 @@ from urllib.parse import urlparse from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator from typing_extensions import Self -from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig from crewai.agent.utils import ( ahandle_knowledge_retrieval, apply_training_data, @@ -78,6 +77,14 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler +try: + from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig +except ImportError: + A2AClientConfig = Any + A2AConfig = Any + A2AServerConfig = Any + + if TYPE_CHECKING: from crewai_tools import CodeInterpreterTool diff --git a/lib/crewai/src/crewai/events/event_types.py b/lib/crewai/src/crewai/events/event_types.py index ea00aa9ae..b4479021e 100644 --- a/lib/crewai/src/crewai/events/event_types.py +++ b/lib/crewai/src/crewai/events/event_types.py @@ -1,3 +1,20 @@ +from crewai.events.types.a2a_events import ( + A2AConversationCompletedEvent, + A2AConversationStartedEvent, + A2ADelegationCompletedEvent, + A2ADelegationStartedEvent, + A2AMessageSentEvent, + A2APollingStartedEvent, + A2APollingStatusEvent, + A2APushNotificationReceivedEvent, + A2APushNotificationRegisteredEvent, + A2APushNotificationTimeoutEvent, + A2AResponseReceivedEvent, + A2AServerTaskCanceledEvent, + A2AServerTaskCompletedEvent, + A2AServerTaskFailedEvent, + A2AServerTaskStartedEvent, +) from crewai.events.types.agent_events import ( AgentExecutionCompletedEvent, AgentExecutionErrorEvent, @@ -76,7 +93,22 @@ from crewai.events.types.tool_usage_events import ( EventTypes = ( - CrewKickoffStartedEvent + A2AConversationCompletedEvent + | A2AConversationStartedEvent + | A2ADelegationCompletedEvent + | A2ADelegationStartedEvent + | A2AMessageSentEvent + | A2APollingStartedEvent + | A2APollingStatusEvent + | A2APushNotificationReceivedEvent + | A2APushNotificationRegisteredEvent + | A2APushNotificationTimeoutEvent + | A2AResponseReceivedEvent + | A2AServerTaskCanceledEvent + | A2AServerTaskCompletedEvent + | A2AServerTaskFailedEvent + | A2AServerTaskStartedEvent + | CrewKickoffStartedEvent | CrewKickoffCompletedEvent | CrewKickoffFailedEvent | CrewTestStartedEvent diff --git a/lib/crewai/src/crewai/events/types/a2a_events.py b/lib/crewai/src/crewai/events/types/a2a_events.py index 87eb6040b..9f414b333 100644 --- a/lib/crewai/src/crewai/events/types/a2a_events.py +++ b/lib/crewai/src/crewai/events/types/a2a_events.py @@ -210,3 +210,37 @@ class A2APushNotificationTimeoutEvent(A2AEventBase): type: str = "a2a_push_notification_timeout" task_id: str timeout_seconds: float + + +class A2AServerTaskStartedEvent(A2AEventBase): + """Event emitted when an A2A server task execution starts.""" + + type: str = "a2a_server_task_started" + a2a_task_id: str + a2a_context_id: str + + +class A2AServerTaskCompletedEvent(A2AEventBase): + """Event emitted when an A2A server task execution completes.""" + + type: str = "a2a_server_task_completed" + a2a_task_id: str + a2a_context_id: str + result: str + + +class A2AServerTaskCanceledEvent(A2AEventBase): + """Event emitted when an A2A server task execution is canceled.""" + + type: str = "a2a_server_task_canceled" + a2a_task_id: str + a2a_context_id: str + + +class A2AServerTaskFailedEvent(A2AEventBase): + """Event emitted when an A2A server task execution fails.""" + + type: str = "a2a_server_task_failed" + a2a_task_id: str + a2a_context_id: str + error: str diff --git a/lib/crewai/src/crewai/types/utils.py b/lib/crewai/src/crewai/types/utils.py index f46f9795c..afc9f5329 100644 --- a/lib/crewai/src/crewai/types/utils.py +++ b/lib/crewai/src/crewai/types/utils.py @@ -1,8 +1,6 @@ """Utilities for creating and manipulating types.""" -from typing import Annotated, Final, Literal - -from typing_extensions import TypeAliasType +from typing import Annotated, Final, Literal, cast _DYNAMIC_LITERAL_ALIAS: Final[Literal["DynamicLiteral"]] = "DynamicLiteral" @@ -20,6 +18,11 @@ def create_literals_from_strings( Returns: Literal type for each A2A agent ID + + Raises: + ValueError: If values is empty (Literal requires at least one value) """ unique_values: tuple[str, ...] = tuple(dict.fromkeys(values)) - return Literal.__getitem__(unique_values) + if not unique_values: + raise ValueError("Cannot create Literal type from empty values") + return cast(type, Literal.__getitem__(unique_values)) diff --git a/lib/crewai/tests/a2a/utils/test_task.py b/lib/crewai/tests/a2a/utils/test_task.py new file mode 100644 index 000000000..0c01a0afc --- /dev/null +++ b/lib/crewai/tests/a2a/utils/test_task.py @@ -0,0 +1,370 @@ +"""Tests for A2A task utilities.""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio +from a2a.server.agent_execution import RequestContext +from a2a.server.events import EventQueue +from a2a.types import Message, Task as A2ATask, TaskState, TaskStatus + +from crewai.a2a.utils.task import cancel, cancellable, execute + + +@pytest.fixture +def mock_agent() -> MagicMock: + """Create a mock CrewAI agent.""" + agent = MagicMock() + agent.role = "Test Agent" + agent.tools = [] + agent.aexecute_task = AsyncMock(return_value="Task completed successfully") + return agent + + +@pytest.fixture +def mock_task() -> MagicMock: + """Create a mock Task.""" + return MagicMock() + + +@pytest.fixture +def mock_context() -> MagicMock: + """Create a mock RequestContext.""" + context = MagicMock(spec=RequestContext) + context.task_id = "test-task-123" + context.context_id = "test-context-456" + context.get_user_input.return_value = "Test user message" + context.message = MagicMock(spec=Message) + context.current_task = None + return context + + +@pytest.fixture +def mock_event_queue() -> AsyncMock: + """Create a mock EventQueue.""" + queue = AsyncMock(spec=EventQueue) + queue.enqueue_event = AsyncMock() + return queue + + +@pytest_asyncio.fixture(autouse=True) +async def clear_cache(mock_context: MagicMock) -> None: + """Clear cancel flag from cache before each test.""" + from aiocache import caches + + cache = caches.get("default") + await cache.delete(f"cancel:{mock_context.task_id}") + + +class TestCancellableDecorator: + """Tests for the cancellable decorator.""" + + @pytest.mark.asyncio + async def test_executes_function_without_context(self) -> None: + """Function executes normally when no RequestContext is provided.""" + call_count = 0 + + @cancellable + async def my_func(value: int) -> int: + nonlocal call_count + call_count += 1 + return value * 2 + + result = await my_func(5) + + assert result == 10 + assert call_count == 1 + + @pytest.mark.asyncio + async def test_executes_function_with_context(self, mock_context: MagicMock) -> None: + """Function executes normally with RequestContext when not cancelled.""" + @cancellable + async def my_func(context: RequestContext) -> str: + await asyncio.sleep(0.01) + return "completed" + + result = await my_func(mock_context) + + assert result == "completed" + + @pytest.mark.asyncio + async def test_cancellation_raises_cancelled_error( + self, mock_context: MagicMock + ) -> None: + """Function raises CancelledError when cancel flag is set.""" + from aiocache import caches + + cache = caches.get("default") + + @cancellable + async def slow_func(context: RequestContext) -> str: + await asyncio.sleep(1.0) + return "should not reach" + + await cache.set(f"cancel:{mock_context.task_id}", True) + + with pytest.raises(asyncio.CancelledError): + await slow_func(mock_context) + + @pytest.mark.asyncio + async def test_cleanup_removes_cancel_flag(self, mock_context: MagicMock) -> None: + """Cancel flag is cleaned up after execution.""" + from aiocache import caches + + cache = caches.get("default") + + @cancellable + async def quick_func(context: RequestContext) -> str: + return "done" + + await quick_func(mock_context) + + flag = await cache.get(f"cancel:{mock_context.task_id}") + assert flag is None + + @pytest.mark.asyncio + async def test_extracts_context_from_kwargs(self, mock_context: MagicMock) -> None: + """Context can be passed as keyword argument.""" + @cancellable + async def my_func(value: int, context: RequestContext | None = None) -> int: + return value + 1 + + result = await my_func(10, context=mock_context) + + assert result == 11 + + +class TestExecute: + """Tests for the execute function.""" + + @pytest.mark.asyncio + async def test_successful_execution( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Execute completes successfully and enqueues completed task.""" + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus, + ): + await execute(mock_agent, mock_context, mock_event_queue) + + mock_agent.aexecute_task.assert_called_once() + mock_event_queue.enqueue_event.assert_called_once() + assert mock_bus.emit.call_count == 2 + + @pytest.mark.asyncio + async def test_emits_started_event( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Execute emits A2AServerTaskStartedEvent.""" + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus, + ): + await execute(mock_agent, mock_context, mock_event_queue) + + first_call = mock_bus.emit.call_args_list[0] + event = first_call[0][1] + + assert event.type == "a2a_server_task_started" + assert event.a2a_task_id == mock_context.task_id + assert event.a2a_context_id == mock_context.context_id + + @pytest.mark.asyncio + async def test_emits_completed_event( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Execute emits A2AServerTaskCompletedEvent on success.""" + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus, + ): + await execute(mock_agent, mock_context, mock_event_queue) + + second_call = mock_bus.emit.call_args_list[1] + event = second_call[0][1] + + assert event.type == "a2a_server_task_completed" + assert event.a2a_task_id == mock_context.task_id + assert event.result == "Task completed successfully" + + @pytest.mark.asyncio + async def test_emits_failed_event_on_exception( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Execute emits A2AServerTaskFailedEvent on exception.""" + mock_agent.aexecute_task = AsyncMock(side_effect=ValueError("Test error")) + + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus, + ): + with pytest.raises(Exception): + await execute(mock_agent, mock_context, mock_event_queue) + + failed_call = mock_bus.emit.call_args_list[1] + event = failed_call[0][1] + + assert event.type == "a2a_server_task_failed" + assert "Test error" in event.error + + @pytest.mark.asyncio + async def test_emits_canceled_event_on_cancellation( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Execute emits A2AServerTaskCanceledEvent on CancelledError.""" + mock_agent.aexecute_task = AsyncMock(side_effect=asyncio.CancelledError()) + + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus, + ): + with pytest.raises(asyncio.CancelledError): + await execute(mock_agent, mock_context, mock_event_queue) + + canceled_call = mock_bus.emit.call_args_list[1] + event = canceled_call[0][1] + + assert event.type == "a2a_server_task_canceled" + assert event.a2a_task_id == mock_context.task_id + + +class TestCancel: + """Tests for the cancel function.""" + + @pytest.mark.asyncio + async def test_sets_cancel_flag_in_cache( + self, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + ) -> None: + """Cancel sets the cancel flag in cache.""" + from aiocache import caches + + cache = caches.get("default") + + await cancel(mock_context, mock_event_queue) + + flag = await cache.get(f"cancel:{mock_context.task_id}") + assert flag is True + + @pytest.mark.asyncio + async def test_enqueues_task_status_update_event( + self, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + ) -> None: + """Cancel enqueues TaskStatusUpdateEvent with canceled state.""" + await cancel(mock_context, mock_event_queue) + + mock_event_queue.enqueue_event.assert_called_once() + event = mock_event_queue.enqueue_event.call_args[0][0] + + assert event.task_id == mock_context.task_id + assert event.context_id == mock_context.context_id + assert event.status.state == TaskState.canceled + assert event.final is True + + @pytest.mark.asyncio + async def test_returns_none_when_no_current_task( + self, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + ) -> None: + """Cancel returns None when context has no current_task.""" + mock_context.current_task = None + + result = await cancel(mock_context, mock_event_queue) + + assert result is None + + @pytest.mark.asyncio + async def test_returns_updated_task_when_current_task_exists( + self, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + ) -> None: + """Cancel returns updated task when context has current_task.""" + current_task = MagicMock(spec=A2ATask) + current_task.status = TaskStatus(state=TaskState.working) + mock_context.current_task = current_task + + result = await cancel(mock_context, mock_event_queue) + + assert result is current_task + assert result.status.state == TaskState.canceled + + @pytest.mark.asyncio + async def test_cleanup_after_cancel( + self, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + ) -> None: + """Cancel flag persists for cancellable decorator to detect.""" + from aiocache import caches + + cache = caches.get("default") + + await cancel(mock_context, mock_event_queue) + + flag = await cache.get(f"cancel:{mock_context.task_id}") + assert flag is True + + await cache.delete(f"cancel:{mock_context.task_id}") + + +class TestExecuteAndCancelIntegration: + """Integration tests for execute and cancel working together.""" + + @pytest.mark.asyncio + async def test_cancel_stops_running_execute( + self, + mock_agent: MagicMock, + mock_context: MagicMock, + mock_event_queue: AsyncMock, + mock_task: MagicMock, + ) -> None: + """Calling cancel stops a running execute.""" + async def slow_task(**kwargs: Any) -> str: + await asyncio.sleep(2.0) + return "should not complete" + + mock_agent.aexecute_task = slow_task + + with ( + patch("crewai.a2a.utils.task.Task", return_value=mock_task), + patch("crewai.a2a.utils.task.crewai_event_bus"), + ): + execute_task = asyncio.create_task( + execute(mock_agent, mock_context, mock_event_queue) + ) + + await asyncio.sleep(0.1) + await cancel(mock_context, mock_event_queue) + + with pytest.raises(asyncio.CancelledError): + await execute_task \ No newline at end of file From 8f022be1063688cb6cad79127f394250fc7f80e7 Mon Sep 17 00:00:00 2001 From: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> Date: Wed, 14 Jan 2026 20:49:14 -0800 Subject: [PATCH 4/4] feat: bump versions to 1.8.1 (#4242) * feat: bump versions to 1.8.1 * bump bump --- lib/crewai-tools/pyproject.toml | 2 +- lib/crewai-tools/src/crewai_tools/__init__.py | 2 +- lib/crewai/pyproject.toml | 2 +- lib/crewai/src/crewai/__init__.py | 2 +- lib/crewai/src/crewai/cli/templates/crew/pyproject.toml | 2 +- lib/crewai/src/crewai/cli/templates/flow/pyproject.toml | 2 +- lib/devtools/src/crewai_devtools/__init__.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/crewai-tools/pyproject.toml b/lib/crewai-tools/pyproject.toml index 682f7c48f..87bfa6ddc 100644 --- a/lib/crewai-tools/pyproject.toml +++ b/lib/crewai-tools/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "pytube~=15.0.0", "requests~=2.32.5", "docker~=7.1.0", - "crewai==1.8.0", + "crewai==1.8.1", "lancedb~=0.5.4", "tiktoken~=0.8.0", "beautifulsoup4~=4.13.4", diff --git a/lib/crewai-tools/src/crewai_tools/__init__.py b/lib/crewai-tools/src/crewai_tools/__init__.py index a88e2eb52..d70fba254 100644 --- a/lib/crewai-tools/src/crewai_tools/__init__.py +++ b/lib/crewai-tools/src/crewai_tools/__init__.py @@ -291,4 +291,4 @@ __all__ = [ "ZapierActionTools", ] -__version__ = "1.8.0" +__version__ = "1.8.1" diff --git a/lib/crewai/pyproject.toml b/lib/crewai/pyproject.toml index 2c22c84f3..d00340e9f 100644 --- a/lib/crewai/pyproject.toml +++ b/lib/crewai/pyproject.toml @@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI" [project.optional-dependencies] tools = [ - "crewai-tools==1.8.0", + "crewai-tools==1.8.1", ] embeddings = [ "tiktoken~=0.8.0" diff --git a/lib/crewai/src/crewai/__init__.py b/lib/crewai/src/crewai/__init__.py index 2ba698a2a..f11433cb9 100644 --- a/lib/crewai/src/crewai/__init__.py +++ b/lib/crewai/src/crewai/__init__.py @@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None: _suppress_pydantic_deprecation_warnings() -__version__ = "1.8.0" +__version__ = "1.8.1" _telemetry_submitted = False diff --git a/lib/crewai/src/crewai/cli/templates/crew/pyproject.toml b/lib/crewai/src/crewai/cli/templates/crew/pyproject.toml index df21ce3e4..18e6829fc 100644 --- a/lib/crewai/src/crewai/cli/templates/crew/pyproject.toml +++ b/lib/crewai/src/crewai/cli/templates/crew/pyproject.toml @@ -5,7 +5,7 @@ description = "{{name}} using crewAI" authors = [{ name = "Your Name", email = "you@example.com" }] requires-python = ">=3.10,<3.14" dependencies = [ - "crewai[tools]==1.8.0" + "crewai[tools]==1.8.1" ] [project.scripts] diff --git a/lib/crewai/src/crewai/cli/templates/flow/pyproject.toml b/lib/crewai/src/crewai/cli/templates/flow/pyproject.toml index 928d7048e..1addffcfd 100644 --- a/lib/crewai/src/crewai/cli/templates/flow/pyproject.toml +++ b/lib/crewai/src/crewai/cli/templates/flow/pyproject.toml @@ -5,7 +5,7 @@ description = "{{name}} using crewAI" authors = [{ name = "Your Name", email = "you@example.com" }] requires-python = ">=3.10,<3.14" dependencies = [ - "crewai[tools]==1.8.0" + "crewai[tools]==1.8.1" ] [project.scripts] diff --git a/lib/devtools/src/crewai_devtools/__init__.py b/lib/devtools/src/crewai_devtools/__init__.py index 202b40db8..c83abc077 100644 --- a/lib/devtools/src/crewai_devtools/__init__.py +++ b/lib/devtools/src/crewai_devtools/__init__.py @@ -1,3 +1,3 @@ """CrewAI development tools.""" -__version__ = "1.8.0" +__version__ = "1.8.1"