diff --git a/lib/crewai/src/crewai/a2a/templates.py b/lib/crewai/src/crewai/a2a/templates.py
index 16f0c479e..893d0dfd9 100644
--- a/lib/crewai/src/crewai/a2a/templates.py
+++ b/lib/crewai/src/crewai/a2a/templates.py
@@ -1,23 +1,20 @@
-"""String templates for A2A (Agent-to-Agent) protocol messaging and status."""
+"""String templates for A2A (Agent-to-Agent) delegation prompts."""
from string import Template
from typing import Final
AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template(
- "\n\n $available_a2a_agents\n\n"
-)
-PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template(
- "\n\n"
- " $previous_a2a_conversation"
- "\n\n"
-)
-CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template(
- "\n\n"
- ' turn="$turn_count"\n'
- ' max_turns="$max_turns"\n'
- " $warning"
- "\n\n"
+ "\n\n"
+ "You can delegate to remote agents using the delegate_to_* tools below. "
+ "Each tool's description lists the remote agent's capabilities — call the "
+ "tool whose capabilities best match the task. Pass the question or sub-task "
+ "to the remote agent via the tool's `message` argument; the tool returns "
+ "the remote agent's response, which you should incorporate into your final "
+ "answer. If the available agents are not a good fit, answer directly "
+ "without calling a delegation tool.\n\n"
+ " $available_a2a_agents"
+ "\n\n"
)
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
"\n\n"
@@ -27,29 +24,3 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
" $unavailable_agents"
"\n\n"
)
-REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
-
-STATUS: COMPLETED
-The remote agent has finished processing your request. Their response is in the conversation history above.
-You MUST now:
-1. Extract the answer from the conversation history
-2. Set is_a2a=false
-3. Return the answer as your final message
-DO NOT send another request - the task is already done.
-
-"""
-
-REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """
-
-STATUS: RESPONSE_RECEIVED
-The remote agent has responded. Their response is in the conversation history above.
-
-You MUST now:
-1. Set is_a2a=false (the remote task is complete and cannot receive more messages)
-2. Provide YOUR OWN response to the original task based on the information received
-
-IMPORTANT: Your response should be addressed to the USER who gave you the original task.
-Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that...").
-Do NOT address the remote agent directly or use "you" to refer to them.
-
-"""
diff --git a/lib/crewai/src/crewai/a2a/tools.py b/lib/crewai/src/crewai/a2a/tools.py
new file mode 100644
index 000000000..200eeb1e8
--- /dev/null
+++ b/lib/crewai/src/crewai/a2a/tools.py
@@ -0,0 +1,394 @@
+"""Tool-based A2A delegation.
+
+Each remote A2A agent is exposed to the local LLM as a BaseTool. The local
+agent's normal tool-call loop drives multi-turn delegation: each tool call is
+one turn against the remote agent. Per-endpoint conversation state lives in
+``A2ADelegationState`` and is shared across the tools built for a single task.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any
+
+from a2a.types import Role, TaskState
+from pydantic import BaseModel, Field, PrivateAttr
+
+from crewai.a2a.config import A2AClientConfig, A2AConfig
+from crewai.a2a.extensions.base import (
+ A2AExtension,
+ ConversationState,
+ ExtensionRegistry,
+)
+from crewai.a2a.task_helpers import TaskStateResult
+from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation
+from crewai.events.event_bus import crewai_event_bus
+from crewai.events.types.a2a_events import A2AConversationCompletedEvent
+from crewai.tools.base_tool import BaseTool
+from crewai.utilities.string_utils import sanitize_tool_name
+
+
+if TYPE_CHECKING:
+ from a2a.types import AgentCard, Message
+
+ from crewai.task import Task
+
+
+_DELEGATE_PREFIX = "delegate_to_"
+
+
+@dataclass
+class _EndpointState:
+ """Mutable per-endpoint conversation state across tool calls."""
+
+ conversation_history: list[Message] = field(default_factory=list)
+ context_id: str | None = None
+ task_id: str | None = None
+ reference_task_ids: list[str] = field(default_factory=list)
+ turn_count: int = 0
+
+
+@dataclass
+class A2ADelegationState:
+ """State shared across all A2A delegation tools for a single task execution."""
+
+ agent: Any
+ task: Task
+ extension_registry: ExtensionRegistry | None = None
+ _per_endpoint: dict[str, _EndpointState] = field(default_factory=dict)
+
+ def _state_for(self, endpoint: str) -> _EndpointState:
+ return self._per_endpoint.setdefault(endpoint, _EndpointState())
+
+ def _initial_ids_from_task(self, state: _EndpointState) -> None:
+ if state.turn_count > 0:
+ return
+ task_config = self.task.config or {}
+ if state.context_id is None:
+ state.context_id = task_config.get("context_id")
+ if state.task_id is None:
+ state.task_id = task_config.get("task_id")
+ if not state.reference_task_ids:
+ state.reference_task_ids = list(task_config.get("reference_task_ids", []))
+
+ def delegate(
+ self,
+ config: A2AConfig | A2AClientConfig,
+ agent_card: AgentCard | None,
+ message: str,
+ ) -> str:
+ """Run one delegation turn against ``config.endpoint``.
+
+ Returns the remote agent's response text, suitable for handing back to
+ the local LLM as a tool result.
+ """
+ return _run_delegation(self, config, agent_card, message, sync=True)
+
+ async def adelegate(
+ self,
+ config: A2AConfig | A2AClientConfig,
+ agent_card: AgentCard | None,
+ message: str,
+ ) -> str:
+ """Async variant of :meth:`delegate`."""
+ return await _run_delegation_async(self, config, agent_card, message)
+
+
+class _A2ADelegationArgs(BaseModel):
+ """Argument schema for A2A delegation tools."""
+
+ message: str = Field(
+ ...,
+ description=(
+ "The question or task to send to the remote agent. Be specific and "
+ "self-contained: the remote agent does not see your other tools or "
+ "your prior reasoning."
+ ),
+ )
+
+
+class A2ADelegationTool(BaseTool):
+ """BaseTool that delegates one turn of conversation to a remote A2A agent.
+
+ Each instance is bound to a specific A2A endpoint via ``_config``. Calling
+ ``_run`` or ``_arun`` advances that endpoint's conversation by one turn and
+ returns the remote agent's response text.
+ """
+
+ args_schema: type[BaseModel] = _A2ADelegationArgs
+
+ _config: A2AConfig | A2AClientConfig = PrivateAttr()
+ _agent_card: AgentCard | None = PrivateAttr(default=None)
+ _state: A2ADelegationState = PrivateAttr()
+
+ def _run(self, message: str) -> str:
+ return self._state.delegate(self._config, self._agent_card, message)
+
+ async def _arun(self, message: str) -> str:
+ return await self._state.adelegate(self._config, self._agent_card, message)
+
+
+def build_a2a_tools(
+ a2a_agents: list[A2AConfig | A2AClientConfig],
+ agent_cards: dict[str, AgentCard],
+ state: A2ADelegationState,
+) -> list[BaseTool]:
+ """Build one ``A2ADelegationTool`` per available A2A agent.
+
+ Tool names collide-disambiguate with a numeric suffix; agents whose cards
+ failed to fetch are skipped.
+ """
+ tools: list[BaseTool] = []
+ used_names: set[str] = set()
+ for config in a2a_agents:
+ card = agent_cards.get(config.endpoint)
+ if card is None:
+ continue
+ name = _build_tool_name(card.name or "remote_agent", used_names)
+ used_names.add(name)
+ tool = A2ADelegationTool(
+ name=name,
+ description=_build_tool_description(card),
+ max_usage_count=config.max_turns,
+ )
+ tool._config = config
+ tool._agent_card = card
+ tool._state = state
+ tools.append(tool)
+ return tools
+
+
+def _build_tool_name(card_name: str, used: set[str]) -> str:
+ base = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}")
+ if base not in used:
+ return base
+ for i in range(2, 1000):
+ candidate = sanitize_tool_name(f"{base}_{i}")
+ if candidate not in used:
+ return candidate
+ raise ValueError(f"Could not generate unique tool name for {card_name!r}")
+
+
+def _build_tool_description(card: AgentCard) -> str:
+ lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."]
+ if card.description:
+ lines.append(card.description.strip())
+ if card.skills:
+ skill_names = ", ".join(s.name for s in card.skills if s.name)
+ if skill_names:
+ lines.append(f"Capabilities: {skill_names}.")
+ lines.append(
+ "Use this tool only when the question matches the agent's capabilities. "
+ "After receiving a response, prefer answering directly unless you need "
+ "another round-trip."
+ )
+ return "\n".join(lines)
+
+
+def _run_delegation(
+ state: A2ADelegationState,
+ config: A2AConfig | A2AClientConfig,
+ agent_card: AgentCard | None,
+ message: str,
+ *,
+ sync: bool,
+) -> str:
+ endpoint_state = state._state_for(config.endpoint)
+ state._initial_ids_from_task(endpoint_state)
+
+ extension_states = _extract_extension_states(state, endpoint_state)
+ metadata = _merged_metadata(state, endpoint_state, extension_states)
+ agent_branch, accepted_output_modes = _turn_context(config)
+
+ a2a_result = execute_a2a_delegation(
+ endpoint=config.endpoint,
+ auth=config.auth,
+ timeout=config.timeout,
+ task_description=message,
+ context_id=endpoint_state.context_id,
+ task_id=endpoint_state.task_id,
+ reference_task_ids=endpoint_state.reference_task_ids,
+ metadata=metadata or None,
+ extensions=(state.task.config or {}).get("extensions"),
+ conversation_history=endpoint_state.conversation_history,
+ agent_id=config.endpoint,
+ agent_role=Role.user,
+ agent_branch=agent_branch,
+ response_model=config.response_model,
+ turn_number=endpoint_state.turn_count + 1,
+ updates=config.updates,
+ transport=config.transport,
+ from_task=state.task,
+ from_agent=state.agent,
+ client_extensions=getattr(config, "extensions", None),
+ accepted_output_modes=accepted_output_modes,
+ input_files=state.task.input_files,
+ )
+ return _finalize_turn(
+ state, endpoint_state, config, agent_card, a2a_result, extension_states
+ )
+
+
+async def _run_delegation_async(
+ state: A2ADelegationState,
+ config: A2AConfig | A2AClientConfig,
+ agent_card: AgentCard | None,
+ message: str,
+) -> str:
+ endpoint_state = state._state_for(config.endpoint)
+ state._initial_ids_from_task(endpoint_state)
+
+ extension_states = _extract_extension_states(state, endpoint_state)
+ metadata = _merged_metadata(state, endpoint_state, extension_states)
+ agent_branch, accepted_output_modes = _turn_context(config)
+
+ a2a_result = await aexecute_a2a_delegation(
+ endpoint=config.endpoint,
+ auth=config.auth,
+ timeout=config.timeout,
+ task_description=message,
+ context_id=endpoint_state.context_id,
+ task_id=endpoint_state.task_id,
+ reference_task_ids=endpoint_state.reference_task_ids,
+ metadata=metadata or None,
+ extensions=(state.task.config or {}).get("extensions"),
+ conversation_history=endpoint_state.conversation_history,
+ agent_id=config.endpoint,
+ agent_role=Role.user,
+ agent_branch=agent_branch,
+ response_model=config.response_model,
+ turn_number=endpoint_state.turn_count + 1,
+ updates=config.updates,
+ transport=config.transport,
+ from_task=state.task,
+ from_agent=state.agent,
+ client_extensions=getattr(config, "extensions", None),
+ accepted_output_modes=accepted_output_modes,
+ input_files=state.task.input_files,
+ )
+ return _finalize_turn(
+ state, endpoint_state, config, agent_card, a2a_result, extension_states
+ )
+
+
+def _extract_extension_states(
+ state: A2ADelegationState,
+ endpoint_state: _EndpointState,
+) -> dict[type[A2AExtension], ConversationState]:
+ if state.extension_registry and endpoint_state.conversation_history:
+ return state.extension_registry.extract_all_states(
+ endpoint_state.conversation_history
+ )
+ return {}
+
+
+def _merged_metadata(
+ state: A2ADelegationState,
+ endpoint_state: _EndpointState,
+ extension_states: dict[type[A2AExtension], ConversationState],
+) -> dict[str, Any]:
+ task_config = state.task.config or {}
+ metadata: dict[str, Any] = dict(task_config.get("metadata") or {})
+ if state.extension_registry and extension_states:
+ metadata.update(state.extension_registry.prepare_all_metadata(extension_states))
+ return metadata
+
+
+def _turn_context(
+ config: A2AConfig | A2AClientConfig,
+) -> tuple[Any | None, list[str] | None]:
+ console_formatter = getattr(crewai_event_bus, "_console", None)
+ agent_branch = None
+ if console_formatter:
+ agent_branch = getattr(
+ console_formatter, "current_agent_branch", None
+ ) or getattr(console_formatter, "current_task_branch", None)
+
+ accepted_output_modes = None
+ if isinstance(config, A2AClientConfig):
+ accepted_output_modes = config.accepted_output_modes
+ return agent_branch, accepted_output_modes
+
+
+def _finalize_turn(
+ state: A2ADelegationState,
+ endpoint_state: _EndpointState,
+ config: A2AConfig | A2AClientConfig,
+ agent_card: AgentCard | None,
+ a2a_result: TaskStateResult,
+ extension_states: dict[type[A2AExtension], ConversationState],
+) -> str:
+ endpoint_state.conversation_history = list(a2a_result.get("history", []))
+ if endpoint_state.conversation_history:
+ latest = endpoint_state.conversation_history[-1]
+ if latest.task_id is not None:
+ endpoint_state.task_id = latest.task_id
+ if latest.context_id is not None:
+ endpoint_state.context_id = latest.context_id
+
+ endpoint_state.turn_count += 1
+ status = a2a_result.get("status")
+
+ if status == TaskState.completed:
+ if (
+ endpoint_state.task_id is not None
+ and endpoint_state.task_id not in endpoint_state.reference_task_ids
+ ):
+ endpoint_state.reference_task_ids.append(endpoint_state.task_id)
+ if state.task.config is None:
+ state.task.config = {}
+ state.task.config["reference_task_ids"] = list(
+ endpoint_state.reference_task_ids
+ )
+ endpoint_state.task_id = None
+
+ result_text = str(a2a_result.get("result", ""))
+ crewai_event_bus.emit(
+ None,
+ A2AConversationCompletedEvent(
+ status="completed",
+ final_result=result_text,
+ error=None,
+ total_turns=endpoint_state.turn_count,
+ from_task=state.task,
+ from_agent=state.agent,
+ endpoint=config.endpoint,
+ a2a_agent_name=agent_card.name if agent_card else None,
+ agent_card=agent_card.model_dump() if agent_card else None,
+ ),
+ )
+ return _apply_response_extensions(state, result_text, extension_states)
+
+ if status == TaskState.input_required:
+ result_text = str(a2a_result.get("result", ""))
+ return _apply_response_extensions(state, result_text, extension_states)
+
+ error_msg = a2a_result.get("error", "Unknown error")
+ crewai_event_bus.emit(
+ None,
+ A2AConversationCompletedEvent(
+ status="failed",
+ final_result=None,
+ error=error_msg,
+ total_turns=endpoint_state.turn_count,
+ from_task=state.task,
+ from_agent=state.agent,
+ endpoint=config.endpoint,
+ a2a_agent_name=agent_card.name if agent_card else None,
+ agent_card=agent_card.model_dump() if agent_card else None,
+ ),
+ )
+ return f"Remote agent error: {error_msg}"
+
+
+def _apply_response_extensions(
+ state: A2ADelegationState,
+ response_text: str,
+ extension_states: dict[type[A2AExtension], ConversationState],
+) -> str:
+ if not state.extension_registry:
+ return response_text
+ processed = state.extension_registry.process_response_with_all(
+ response_text, extension_states
+ )
+ return processed if isinstance(processed, str) else str(processed)
diff --git a/lib/crewai/src/crewai/a2a/types.py b/lib/crewai/src/crewai/a2a/types.py
index 5b06f8b8b..a5bb63149 100644
--- a/lib/crewai/src/crewai/a2a/types.py
+++ b/lib/crewai/src/crewai/a2a/types.py
@@ -6,8 +6,6 @@ from typing import (
Annotated,
Any,
Literal,
- Protocol,
- runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
@@ -57,15 +55,6 @@ Url = Annotated[
]
-@runtime_checkable
-class AgentResponseProtocol(Protocol):
- """Protocol for the dynamically created AgentResponse model."""
-
- a2a_ids: tuple[str, ...]
- message: str
- is_a2a: bool
-
-
class PartsMetadataDict(TypedDict, total=False):
"""Metadata for A2A message parts.
diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py
index 1359e2f10..b95955143 100644
--- a/lib/crewai/src/crewai/a2a/utils/response_model.py
+++ b/lib/crewai/src/crewai/a2a/utils/response_model.py
@@ -1,75 +1,25 @@
-"""Response model utilities for A2A agent interactions."""
+"""Helpers for extracting A2A client configurations."""
from __future__ import annotations
from typing import TypeAlias
-from pydantic import BaseModel, Field, create_model
-
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
-from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
-def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
- """Create a dynamic AgentResponse model with Literal types for agent IDs.
-
- Args:
- agent_ids: List of available A2A agent IDs.
-
- Returns:
- Dynamically created Pydantic model with Literal-constrained a2a_ids field,
- or None if agent_ids is empty.
- """
- if not agent_ids:
- return None
-
- DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
-
- return create_model(
- "AgentResponse",
- a2a_ids=(
- tuple[DynamicLiteral, ...], # type: ignore[valid-type]
- Field(
- default_factory=tuple,
- max_length=len(agent_ids),
- description="A2A agent IDs to delegate to.",
- ),
- ),
- message=(
- str,
- Field(
- description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
- ),
- ),
- is_a2a=(
- bool,
- Field(
- description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
- ),
- ),
- __base__=BaseModel,
- )
-
-
-def extract_a2a_agent_ids_from_config(
+def extract_a2a_client_configs(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
-) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
- """Extract A2A agent IDs from A2A configuration.
+) -> list[A2AClientConfigTypes]:
+ """Return the client-side A2A configs from a possibly-mixed config list.
- Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
-
- Args:
- a2a_config: A2A configuration (any type).
-
- Returns:
- Tuple of client A2A configs list and agent endpoint IDs.
+ Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to.
"""
if a2a_config is None:
- return [], ()
+ return []
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
@@ -77,24 +27,6 @@ def extract_a2a_agent_ids_from_config(
else:
configs = a2a_config
- client_configs: list[A2AClientConfigTypes] = [
+ return [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
-
- return client_configs, tuple(config.endpoint for config in client_configs)
-
-
-def get_a2a_agents_and_response_model(
- a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
-) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
- """Get A2A agent configs and response model.
-
- Args:
- a2a_config: A2A configuration (any type).
-
- Returns:
- Tuple of client A2A configs and response model.
- """
- a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
-
- return a2a_agents, create_agent_response_model(agent_ids)
diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py
index 0ec7fc6ae..03f60993f 100644
--- a/lib/crewai/src/crewai/a2a/wrapper.py
+++ b/lib/crewai/src/crewai/a2a/wrapper.py
@@ -1,108 +1,51 @@
"""A2A agent wrapping logic for metaclass integration.
-Wraps agent classes with A2A delegation capabilities.
+Wraps agent classes with A2A delegation capabilities. Each remote A2A agent
+is exposed to the local LLM as a BaseTool; the local agent's tool-call loop
+drives multi-turn delegation.
"""
from __future__ import annotations
import asyncio
-from collections.abc import Callable, Coroutine, Mapping
+from collections.abc import Callable, Coroutine, Iterator, Mapping
from concurrent.futures import ThreadPoolExecutor, as_completed
+import contextlib
import contextvars
from functools import wraps
import json
from types import MethodType
-from typing import TYPE_CHECKING, Any, NamedTuple
-
-from a2a.types import Role, TaskState
-from pydantic import BaseModel, ValidationError
+from typing import TYPE_CHECKING, Any
from crewai.a2a.config import A2AClientConfig, A2AConfig
-from crewai.a2a.extensions.base import (
- A2AExtension,
- ConversationState,
- ExtensionRegistry,
-)
-from crewai.a2a.task_helpers import TaskStateResult
+from crewai.a2a.extensions.base import ExtensionRegistry
from crewai.a2a.templates import (
AVAILABLE_AGENTS_TEMPLATE,
- CONVERSATION_TURN_INFO_TEMPLATE,
- PREVIOUS_A2A_CONVERSATION_TEMPLATE,
- REMOTE_AGENT_RESPONSE_NOTICE,
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
)
-from crewai.a2a.types import AgentResponseProtocol
+from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
from crewai.a2a.utils.agent_card import (
afetch_agent_card,
fetch_agent_card,
inject_a2a_server_methods,
)
-from crewai.a2a.utils.delegation import (
- aexecute_a2a_delegation,
- execute_a2a_delegation,
-)
-from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
-from crewai.events.event_bus import crewai_event_bus
-from crewai.events.types.a2a_events import (
- A2AConversationCompletedEvent,
- A2AMessageSentEvent,
-)
+from crewai.a2a.utils.response_model import extract_a2a_client_configs
from crewai.lite_agent_output import LiteAgentOutput
from crewai.task import Task
if TYPE_CHECKING:
- from a2a.types import AgentCard, Message
+ from a2a.types import AgentCard
from crewai.agent.core import Agent
from crewai.tools.base_tool import BaseTool
-class DelegationContext(NamedTuple):
- """Context prepared for A2A delegation.
-
- Groups all the values needed to execute a delegation to a remote A2A agent.
- """
-
- a2a_agents: list[A2AConfig | A2AClientConfig]
- agent_response_model: type[BaseModel] | None
- current_request: str
- agent_id: str
- agent_config: A2AConfig | A2AClientConfig
- context_id: str | None
- task_id: str | None
- metadata: dict[str, Any] | None
- extensions: dict[str, Any] | None
- reference_task_ids: list[str]
- original_task_description: str
- max_turns: int
-
-
-class DelegationState(NamedTuple):
- """Mutable state for A2A delegation loop.
-
- Groups values that may change during delegation turns.
- """
-
- current_request: str
- context_id: str | None
- task_id: str | None
- reference_task_ids: list[str]
- conversation_history: list[Message]
- agent_card: AgentCard | None
- agent_card_dict: dict[str, Any] | None
- agent_name: str | None
-
-
def wrap_agent_with_a2a_instance(
agent: Agent, extension_registry: ExtensionRegistry | None = None
) -> None:
"""Wrap an agent instance's task execution and kickoff methods with A2A support.
- This function modifies the agent instance by wrapping its execute_task,
- aexecute_task, kickoff, and kickoff_async methods to add A2A delegation
- capabilities. Should only be called when the agent has a2a configuration set.
-
Args:
agent: The agent instance to wrap.
extension_registry: Optional registry of A2A extensions.
@@ -126,14 +69,13 @@ def wrap_agent_with_a2a_instance(
if not self.a2a:
return original_execute_task(self, task, context, tools) # type: ignore[no-any-return]
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
+ a2a_agents = extract_a2a_client_configs(self.a2a)
return _execute_task_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=original_execute_task,
task=task,
- agent_response_model=agent_response_model,
context=context,
tools=tools,
extension_registry=extension_registry,
@@ -150,14 +92,13 @@ def wrap_agent_with_a2a_instance(
if not self.a2a:
return await original_aexecute_task(self, task, context, tools) # type: ignore[no-any-return]
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
+ a2a_agents = extract_a2a_client_configs(self.a2a)
return await _aexecute_task_with_a2a(
self=self,
a2a_agents=a2a_agents,
original_fn=original_aexecute_task,
task=task,
- agent_response_model=agent_response_model,
context=context,
tools=tools,
extension_registry=extension_registry,
@@ -182,8 +123,7 @@ def wrap_agent_with_a2a_instance(
if not self.a2a:
return original_kickoff(self, messages, response_format, input_files)
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
-
+ a2a_agents = extract_a2a_client_configs(self.a2a)
if not a2a_agents:
return original_kickoff(self, messages, response_format, input_files)
@@ -194,7 +134,6 @@ def wrap_agent_with_a2a_instance(
messages=messages,
response_format=response_format,
input_files=input_files,
- agent_response_model=agent_response_model,
extension_registry=extension_registry,
)
@@ -211,8 +150,7 @@ def wrap_agent_with_a2a_instance(
self, messages, response_format, input_files
)
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
-
+ a2a_agents = extract_a2a_client_configs(self.a2a)
if not a2a_agents:
return await original_kickoff_async(
self, messages, response_format, input_files
@@ -225,7 +163,6 @@ def wrap_agent_with_a2a_instance(
messages=messages,
response_format=response_format,
input_files=input_files,
- agent_response_model=agent_response_model,
extension_registry=extension_registry,
)
@@ -240,14 +177,7 @@ def wrap_agent_with_a2a_instance(
def _fetch_card_from_config(
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
- """Fetch agent card from A2A config.
-
- Args:
- config: A2A configuration
-
- Returns:
- Tuple of (config, card or exception)
- """
+ """Fetch an agent card synchronously, capturing any exception."""
try:
card = fetch_agent_card(
endpoint=config.endpoint,
@@ -262,14 +192,7 @@ def _fetch_card_from_config(
def _fetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
- """Fetch agent cards concurrently for multiple A2A agents.
-
- Args:
- a2a_agents: List of A2A agent configurations
-
- Returns:
- Tuple of (agent_cards dict, failed_agents dict mapping endpoint to error message)
- """
+ """Fetch agent cards concurrently for multiple A2A agents."""
agent_cards: dict[str, AgentCard] = {}
failed_agents: dict[str, str] = {}
@@ -299,1129 +222,10 @@ def _fetch_agent_cards_concurrently(
return agent_cards, failed_agents
-def _execute_task_with_a2a(
- self: Agent,
- a2a_agents: list[A2AConfig | A2AClientConfig],
- original_fn: Callable[..., str],
- task: Task,
- agent_response_model: type[BaseModel] | None,
- context: str | None,
- tools: list[BaseTool] | None,
- extension_registry: ExtensionRegistry,
-) -> str:
- """Wrap execute_task with A2A delegation logic.
-
- Args:
- self: The agent instance
- a2a_agents: Dictionary of A2A agent configurations
- original_fn: The original execute_task method
- task: The task to execute
- context: Optional context for task execution
- tools: Optional tools available to the agent
- agent_response_model: Optional agent response model
- extension_registry: Registry of A2A extensions
-
- Returns:
- Task execution result (either from LLM or A2A agent)
- """
- original_description: str = task.description
- original_output_pydantic = task.output_pydantic
- original_response_model = task.response_model
-
- agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents)
-
- if not agent_cards and a2a_agents and failed_agents:
- unavailable_agents_text = ""
- for endpoint, error in failed_agents.items():
- unavailable_agents_text += f" - {endpoint}: {error}\n"
-
- notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute(
- unavailable_agents=unavailable_agents_text
- )
- task.description = f"{original_description}{notice}"
-
- try:
- return original_fn(self, task, context, tools)
- finally:
- task.description = original_description
-
- task.description, _, extension_states = _augment_prompt_with_a2a(
- a2a_agents=a2a_agents,
- task_description=original_description,
- agent_cards=agent_cards,
- failed_agents=failed_agents,
- extension_registry=extension_registry,
- )
- task.response_model = agent_response_model
-
- try:
- raw_result = original_fn(self, task, context, tools)
- agent_response = _parse_agent_response(
- raw_result=raw_result, agent_response_model=agent_response_model
- )
-
- if extension_registry and isinstance(agent_response, BaseModel):
- agent_response = extension_registry.process_response_with_all(
- agent_response, extension_states
- )
-
- if isinstance(agent_response, BaseModel) and isinstance(
- agent_response, AgentResponseProtocol
- ):
- if agent_response.is_a2a:
- return _delegate_to_a2a(
- self,
- agent_response=agent_response,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_cards=agent_cards,
- original_task_description=original_description,
- _extension_registry=extension_registry,
- )
- task.output_pydantic = None
- return agent_response.message
-
- return raw_result
- finally:
- task.description = original_description
- task.output_pydantic = original_output_pydantic
- task.response_model = original_response_model
-
-
-def _kickoff_with_a2a(
- self: Agent,
- a2a_agents: list[A2AConfig | A2AClientConfig],
- original_kickoff: Callable[..., LiteAgentOutput],
- messages: str | list[Any],
- response_format: type[Any] | None,
- input_files: dict[str, Any] | None,
- agent_response_model: type[BaseModel] | None,
- extension_registry: ExtensionRegistry,
-) -> LiteAgentOutput:
- """Execute kickoff with A2A delegation support (sync).
-
- Args:
- self: The agent instance.
- a2a_agents: List of A2A agent configurations.
- original_kickoff: The original kickoff method.
- messages: Messages to send to the agent.
- response_format: Optional response format.
- input_files: Optional input files.
- agent_response_model: Optional agent response model.
- extension_registry: Registry of A2A extensions.
-
- Returns:
- LiteAgentOutput from kickoff or A2A delegation.
- """
- if isinstance(messages, str):
- description = messages
- else:
- content = next(
- (m["content"] for m in reversed(messages) if m["role"] == "user"),
- None,
- )
- description = content if isinstance(content, str) else ""
-
- if not description:
- return original_kickoff(self, messages, response_format, input_files)
-
- fake_task = Task(
- description=description,
- agent=self,
- expected_output="Result from A2A delegation",
- input_files=input_files or {},
- )
-
- agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents)
-
- if not agent_cards and a2a_agents and failed_agents:
- return original_kickoff(self, messages, response_format, input_files)
-
- fake_task.description, _, extension_states = _augment_prompt_with_a2a(
- a2a_agents=a2a_agents,
- task_description=description,
- agent_cards=agent_cards,
- failed_agents=failed_agents,
- extension_registry=extension_registry,
- )
- fake_task.response_model = agent_response_model
-
- try:
- result: LiteAgentOutput = original_kickoff(
- self, messages, agent_response_model or response_format, input_files
- )
- agent_response = _parse_agent_response(
- raw_result=result.raw, agent_response_model=agent_response_model
- )
-
- if extension_registry and isinstance(agent_response, BaseModel):
- agent_response = extension_registry.process_response_with_all(
- agent_response, extension_states
- )
-
- if isinstance(agent_response, BaseModel) and isinstance(
- agent_response, AgentResponseProtocol
- ):
- if agent_response.is_a2a:
-
- def _kickoff_adapter(
- self_: Agent,
- _task: Task,
- _context: str | None,
- _tools: list[Any] | None,
- ) -> str:
- fmt = (
- _task.response_model or agent_response_model or response_format
- )
- output: LiteAgentOutput = original_kickoff(
- self_, messages, fmt, input_files
- )
- return output.raw
-
- result_str = _delegate_to_a2a(
- self,
- agent_response=agent_response,
- task=fake_task,
- original_fn=_kickoff_adapter,
- context=None,
- tools=None,
- agent_cards=agent_cards,
- original_task_description=description,
- _extension_registry=extension_registry,
- )
- return LiteAgentOutput(
- raw=result_str,
- pydantic=None,
- agent_role=self.role,
- usage_metrics=None,
- messages=[],
- )
- return LiteAgentOutput(
- raw=agent_response.message,
- pydantic=None,
- agent_role=self.role,
- usage_metrics=result.usage_metrics,
- messages=result.messages,
- )
-
- return result
- finally:
- fake_task.description = description
-
-
-async def _akickoff_with_a2a(
- self: Agent,
- a2a_agents: list[A2AConfig | A2AClientConfig],
- original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]],
- messages: str | list[Any],
- response_format: type[Any] | None,
- input_files: dict[str, Any] | None,
- agent_response_model: type[BaseModel] | None,
- extension_registry: ExtensionRegistry,
-) -> LiteAgentOutput:
- """Execute kickoff with A2A delegation support (async).
-
- Args:
- self: The agent instance.
- a2a_agents: List of A2A agent configurations.
- original_kickoff_async: The original kickoff_async method.
- messages: Messages to send to the agent.
- response_format: Optional response format.
- input_files: Optional input files.
- agent_response_model: Optional agent response model.
- extension_registry: Registry of A2A extensions.
-
- Returns:
- LiteAgentOutput from kickoff or A2A delegation.
- """
- if isinstance(messages, str):
- description = messages
- else:
- content = next(
- (m["content"] for m in reversed(messages) if m["role"] == "user"),
- None,
- )
- description = content if isinstance(content, str) else ""
-
- if not description:
- return await original_kickoff_async(
- self, messages, response_format, input_files
- )
-
- fake_task = Task(
- description=description,
- agent=self,
- expected_output="Result from A2A delegation",
- input_files=input_files or {},
- )
-
- agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents)
-
- if not agent_cards and a2a_agents and failed_agents:
- return await original_kickoff_async(
- self, messages, response_format, input_files
- )
-
- fake_task.description, _, extension_states = _augment_prompt_with_a2a(
- a2a_agents=a2a_agents,
- task_description=description,
- agent_cards=agent_cards,
- failed_agents=failed_agents,
- extension_registry=extension_registry,
- )
- fake_task.response_model = agent_response_model
-
- try:
- result: LiteAgentOutput = await original_kickoff_async(
- self, messages, agent_response_model or response_format, input_files
- )
- agent_response = _parse_agent_response(
- raw_result=result.raw, agent_response_model=agent_response_model
- )
-
- if extension_registry and isinstance(agent_response, BaseModel):
- agent_response = extension_registry.process_response_with_all(
- agent_response, extension_states
- )
-
- if isinstance(agent_response, BaseModel) and isinstance(
- agent_response, AgentResponseProtocol
- ):
- if agent_response.is_a2a:
-
- async def _kickoff_adapter(
- self_: Agent,
- _task: Task,
- _context: str | None,
- _tools: list[Any] | None,
- ) -> str:
- fmt = (
- _task.response_model or agent_response_model or response_format
- )
- output: LiteAgentOutput = await original_kickoff_async(
- self_, messages, fmt, input_files
- )
- return output.raw
-
- result_str = await _adelegate_to_a2a(
- self,
- agent_response=agent_response,
- task=fake_task,
- original_fn=_kickoff_adapter,
- context=None,
- tools=None,
- agent_cards=agent_cards,
- original_task_description=description,
- _extension_registry=extension_registry,
- )
- return LiteAgentOutput(
- raw=result_str,
- pydantic=None,
- agent_role=self.role,
- usage_metrics=None,
- messages=[],
- )
- return LiteAgentOutput(
- raw=agent_response.message,
- pydantic=None,
- agent_role=self.role,
- usage_metrics=result.usage_metrics,
- messages=result.messages,
- )
-
- return result
- finally:
- fake_task.description = description
-
-
-def _augment_prompt_with_a2a(
- a2a_agents: list[A2AConfig | A2AClientConfig],
- task_description: str,
- agent_cards: Mapping[str, AgentCard | dict[str, Any]],
- conversation_history: list[Message] | None = None,
- turn_num: int = 0,
- max_turns: int | None = None,
- failed_agents: dict[str, str] | None = None,
- extension_registry: ExtensionRegistry | None = None,
- remote_status_notice: str = "",
-) -> tuple[str, bool, dict[type[A2AExtension], ConversationState]]:
- """Add A2A delegation instructions to prompt.
-
- Args:
- a2a_agents: Dictionary of A2A agent configurations
- task_description: Original task description
- agent_cards: dictionary mapping agent IDs to AgentCards
- conversation_history: Previous A2A Messages from conversation
- turn_num: Current turn number (0-indexed)
- max_turns: Maximum allowed turns (from config)
- failed_agents: Dictionary mapping failed agent endpoints to error messages
- extension_registry: Optional registry of A2A extensions
- remote_status_notice: Optional notice about remote agent status to append
-
- Returns:
- Tuple of (augmented prompt, disable_structured_output flag, extension_states dict)
- """
-
- if not agent_cards:
- return task_description, False, {}
-
- agents_text = ""
-
- for config in a2a_agents:
- if config.endpoint in agent_cards:
- card = agent_cards[config.endpoint]
- if isinstance(card, dict):
- filtered = {
- k: v
- for k, v in card.items()
- if k in {"description", "url", "skills"} and v is not None
- }
- agents_text += f"\n{json.dumps(filtered, indent=2)}\n"
- else:
- agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n"
-
- failed_agents = failed_agents or {}
- if failed_agents:
- agents_text += "\n\n"
- for endpoint, error in failed_agents.items():
- agents_text += f"\n\n"
-
- agents_text = AVAILABLE_AGENTS_TEMPLATE.substitute(available_a2a_agents=agents_text)
-
- history_text = ""
-
- if conversation_history:
- for msg in conversation_history:
- history_text += f"\n{msg.model_dump_json(indent=2, exclude_none=True, exclude={'message_id'})}\n"
-
- history_text = PREVIOUS_A2A_CONVERSATION_TEMPLATE.substitute(
- previous_a2a_conversation=history_text
- )
-
- extension_states = {}
- disable_structured_output = False
- if extension_registry and conversation_history:
- extension_states = extension_registry.extract_all_states(conversation_history)
- for state in extension_states.values():
- if state.is_ready():
- disable_structured_output = True
- break
- turn_info = ""
-
- if max_turns is not None and conversation_history:
- turn_count = turn_num + 1
- warning = ""
- if turn_count >= max_turns:
- warning = (
- "CRITICAL: This is the FINAL turn. You MUST conclude the conversation now.\n"
- "Set is_a2a=false and provide your final response to complete the task."
- )
- elif turn_count == max_turns - 1:
- warning = "WARNING: Next turn will be the last. Consider wrapping up the conversation."
-
- turn_info = CONVERSATION_TURN_INFO_TEMPLATE.substitute(
- turn_count=turn_count,
- max_turns=max_turns,
- warning=warning,
- )
-
- augmented_prompt = f"""{task_description}
-
-IMPORTANT: You have the ability to delegate this task to remote A2A agents.
-{agents_text}
-{history_text}{turn_info}{remote_status_notice}
-
-"""
-
- if extension_registry:
- augmented_prompt = extension_registry.augment_prompt_with_all(
- augmented_prompt, extension_states
- )
-
- return augmented_prompt, disable_structured_output, extension_states
-
-
-def _parse_agent_response(
- raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] | None
-) -> BaseModel | str | dict[str, Any]:
- """Parse LLM output as AgentResponse or return raw agent response."""
- if agent_response_model:
- try:
- if isinstance(raw_result, str):
- return agent_response_model.model_validate_json(raw_result)
- if isinstance(raw_result, dict):
- return agent_response_model.model_validate(raw_result)
- except ValidationError:
- return raw_result
- return raw_result
-
-
-def _handle_max_turns_exceeded(
- conversation_history: list[Message],
- max_turns: int,
- from_task: Any | None = None,
- from_agent: Any | None = None,
- endpoint: str | None = None,
- a2a_agent_name: str | None = None,
- agent_card: dict[str, Any] | None = None,
-) -> str:
- """Handle the case when max turns is exceeded.
-
- Shared logic for both sync and async delegation.
-
- Returns:
- Final message if found in history.
-
- Raises:
- Exception: If no final message found and max turns exceeded.
- """
- if conversation_history:
- for msg in reversed(conversation_history):
- if msg.role == Role.agent:
- text_parts = [
- part.root.text for part in msg.parts if part.root.kind == "text"
- ]
- final_message = (
- " ".join(text_parts) if text_parts else "Conversation completed"
- )
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="completed",
- final_result=final_message,
- error=None,
- total_turns=max_turns,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- return final_message
-
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="failed",
- final_result=None,
- error=f"Conversation exceeded maximum turns ({max_turns})",
- total_turns=max_turns,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})")
-
-
-def _emit_delegation_failed(
- error_msg: str,
- turn_num: int,
- from_task: Any | None,
- from_agent: Any | None,
- endpoint: str | None,
- a2a_agent_name: str | None,
- agent_card: dict[str, Any] | None,
-) -> str:
- """Emit failure event and return formatted error message."""
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="failed",
- final_result=None,
- error=error_msg,
- total_turns=turn_num + 1,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- return f"A2A delegation failed: {error_msg}"
-
-
-def _process_response_result(
- raw_result: str,
- disable_structured_output: bool,
- turn_num: int,
- agent_role: str,
- agent_response_model: type[BaseModel] | None,
- extension_registry: ExtensionRegistry | None = None,
- extension_states: dict[type[A2AExtension], ConversationState] | None = None,
- from_task: Any | None = None,
- from_agent: Any | None = None,
- endpoint: str | None = None,
- a2a_agent_name: str | None = None,
- agent_card: dict[str, Any] | None = None,
-) -> tuple[str | None, str | None]:
- """Process LLM response and determine next action.
-
- Shared logic for both sync and async handlers.
-
- Returns:
- Tuple of (final_result, next_request).
- """
- if disable_structured_output:
- final_turn_number = turn_num + 1
- result_text = str(raw_result)
- crewai_event_bus.emit(
- None,
- A2AMessageSentEvent(
- message=result_text,
- turn_number=final_turn_number,
- is_multiturn=True,
- agent_role=agent_role,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- ),
- )
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="completed",
- final_result=result_text,
- error=None,
- total_turns=final_turn_number,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- return result_text, None
-
- llm_response = _parse_agent_response(
- raw_result=raw_result, agent_response_model=agent_response_model
- )
-
- if extension_registry and isinstance(llm_response, BaseModel):
- llm_response = extension_registry.process_response_with_all(
- llm_response, extension_states or {}
- )
-
- if isinstance(llm_response, BaseModel) and isinstance(
- llm_response, AgentResponseProtocol
- ):
- if not llm_response.is_a2a:
- final_turn_number = turn_num + 1
- crewai_event_bus.emit(
- None,
- A2AMessageSentEvent(
- message=str(llm_response.message),
- turn_number=final_turn_number,
- is_multiturn=True,
- agent_role=agent_role,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- ),
- )
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="completed",
- final_result=str(llm_response.message),
- error=None,
- total_turns=final_turn_number,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- return llm_response.message, None
- return None, llm_response.message
-
- return str(raw_result), None
-
-
-def _prepare_agent_cards_dict(
- a2a_result: TaskStateResult,
- agent_id: str,
- agent_cards: Mapping[str, AgentCard | dict[str, Any]] | None,
-) -> dict[str, AgentCard | dict[str, Any]]:
- """Prepare agent cards dictionary from result and existing cards.
-
- Shared logic for both sync and async response handlers.
- """
- agent_cards_dict: dict[str, AgentCard | dict[str, Any]] = (
- dict(agent_cards) if agent_cards else {}
- )
- if "agent_card" in a2a_result and agent_id not in agent_cards_dict:
- agent_cards_dict[agent_id] = a2a_result["agent_card"]
- return agent_cards_dict
-
-
-def _init_delegation_state(
- ctx: DelegationContext,
- agent_cards: dict[str, AgentCard] | None,
-) -> DelegationState:
- """Initialize delegation state from context and agent cards.
-
- Args:
- ctx: Delegation context with config and settings.
- agent_cards: Pre-fetched agent cards.
-
- Returns:
- Initial delegation state for the conversation loop.
- """
- current_agent_card = agent_cards.get(ctx.agent_id) if agent_cards else None
- return DelegationState(
- current_request=ctx.current_request,
- context_id=ctx.context_id,
- task_id=ctx.task_id,
- reference_task_ids=list(ctx.reference_task_ids),
- conversation_history=[],
- agent_card=current_agent_card,
- agent_card_dict=current_agent_card.model_dump() if current_agent_card else None,
- agent_name=current_agent_card.name if current_agent_card else None,
- )
-
-
-def _get_turn_context(
- agent_config: A2AConfig | A2AClientConfig,
-) -> tuple[Any | None, list[str] | None]:
- """Get context for a delegation turn.
-
- Returns:
- Tuple of (agent_branch, accepted_output_modes).
- """
- console_formatter = getattr(crewai_event_bus, "_console", None)
- agent_branch = None
- if console_formatter:
- agent_branch = getattr(
- console_formatter, "current_agent_branch", None
- ) or getattr(console_formatter, "current_task_branch", None)
-
- accepted_output_modes = None
- if isinstance(agent_config, A2AClientConfig):
- accepted_output_modes = agent_config.accepted_output_modes
-
- return agent_branch, accepted_output_modes
-
-
-def _prepare_delegation_context(
- self: Agent,
- agent_response: AgentResponseProtocol,
- task: Task,
- original_task_description: str | None,
-) -> DelegationContext:
- """Prepare delegation context from agent response and task.
-
- Shared logic for both sync and async delegation.
-
- Returns:
- DelegationContext with all values needed for delegation.
- """
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
- agent_ids = tuple(config.endpoint for config in a2a_agents)
- current_request = str(agent_response.message)
-
- if not a2a_agents:
- raise ValueError("No A2A agents configured for delegation")
-
- if isinstance(agent_response, AgentResponseProtocol) and agent_response.a2a_ids:
- agent_id = agent_response.a2a_ids[0]
- else:
- agent_id = agent_ids[0]
-
- if agent_id not in agent_ids:
- raise ValueError(f"Unknown A2A agent ID: {agent_id} not in {agent_ids}")
-
- agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents), None)
- if agent_config is None:
- raise ValueError(f"Agent configuration not found for endpoint: {agent_id}")
- task_config = task.config or {}
-
- if original_task_description is None:
- original_task_description = task.description
-
- return DelegationContext(
- a2a_agents=a2a_agents,
- agent_response_model=agent_response_model,
- current_request=current_request,
- agent_id=agent_id,
- agent_config=agent_config,
- context_id=task_config.get("context_id"),
- task_id=task_config.get("task_id"),
- metadata=task_config.get("metadata"),
- extensions=task_config.get("extensions"),
- reference_task_ids=task_config.get("reference_task_ids", []),
- original_task_description=original_task_description,
- max_turns=agent_config.max_turns,
- )
-
-
-def _handle_task_completion(
- a2a_result: TaskStateResult,
- task: Task,
- task_id_config: str | None,
- reference_task_ids: list[str],
- agent_config: A2AConfig | A2AClientConfig,
- turn_num: int,
- from_task: Any | None = None,
- from_agent: Any | None = None,
- endpoint: str | None = None,
- a2a_agent_name: str | None = None,
- agent_card: dict[str, Any] | None = None,
-) -> tuple[str | None, str | None, list[str], str]:
- """Handle task completion state including reference task updates.
-
- When a remote task completes, this function:
- 1. Adds the completed task_id to reference_task_ids (if not already present)
- 2. Clears task_id_config to signal that a new task ID should be generated for next turn
- 3. Updates task.config with the reference list for subsequent A2A calls
-
- The reference_task_ids list tracks all completed tasks in this conversation chain,
- allowing the remote agent to maintain context across multi-turn interactions.
-
- Shared logic for both sync and async delegation.
-
- Args:
- a2a_result: Result from A2A delegation containing task status.
- task: CrewAI Task object to update with reference IDs.
- task_id_config: Current task ID (will be added to references if task completed).
- reference_task_ids: Mutable list of completed task IDs (updated in place).
- agent_config: A2A configuration with trust settings.
- turn_num: Current turn number.
- from_task: Optional CrewAI Task for event metadata.
- from_agent: Optional CrewAI Agent for event metadata.
- endpoint: A2A endpoint URL.
- a2a_agent_name: Name of remote A2A agent.
- agent_card: Agent card dict for event metadata.
-
- Returns:
- Tuple of (result_if_trusted, updated_task_id, updated_reference_task_ids, remote_notice).
- - result_if_trusted: Final result if trust_remote_completion_status=True, else None
- - updated_task_id: None (cleared to generate new ID for next turn)
- - updated_reference_task_ids: The mutated list with completed task added
- - remote_notice: Template notice about remote agent response
- """
- remote_notice = ""
- if a2a_result["status"] == TaskState.completed:
- remote_notice = REMOTE_AGENT_RESPONSE_NOTICE
-
- if task_id_config is not None and task_id_config not in reference_task_ids:
- reference_task_ids.append(task_id_config)
-
- if task.config is None:
- task.config = {}
- task.config["reference_task_ids"] = list(reference_task_ids)
-
- task_id_config = None
-
- if agent_config.trust_remote_completion_status:
- result_text = a2a_result.get("result", "")
- final_turn_number = turn_num + 1
- crewai_event_bus.emit(
- None,
- A2AConversationCompletedEvent(
- status="completed",
- final_result=result_text,
- error=None,
- total_turns=final_turn_number,
- from_task=from_task,
- from_agent=from_agent,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- ),
- )
- return str(result_text), task_id_config, reference_task_ids, remote_notice
-
- return None, task_id_config, reference_task_ids, remote_notice
-
-
-def _handle_agent_response_and_continue(
- self: Agent,
- a2a_result: TaskStateResult,
- agent_id: str,
- agent_cards: dict[str, AgentCard] | None,
- a2a_agents: list[A2AConfig | A2AClientConfig],
- original_task_description: str,
- conversation_history: list[Message],
- turn_num: int,
- max_turns: int,
- task: Task,
- original_fn: Callable[..., str],
- context: str | None,
- tools: list[BaseTool] | None,
- agent_response_model: type[BaseModel] | None,
- extension_registry: ExtensionRegistry | None = None,
- remote_status_notice: str = "",
- endpoint: str | None = None,
- a2a_agent_name: str | None = None,
- agent_card: dict[str, Any] | None = None,
-) -> tuple[str | None, str | None]:
- """Handle A2A result and get CrewAI agent's response.
-
- Args:
- self: The agent instance
- a2a_result: Result from A2A delegation
- agent_id: ID of the A2A agent
- agent_cards: Pre-fetched agent cards
- a2a_agents: List of A2A configurations
- original_task_description: Original task description
- conversation_history: Conversation history
- turn_num: Current turn number
- max_turns: Maximum turns allowed
- task: The task being executed
- original_fn: Original execute_task method
- context: Optional context
- tools: Optional tools
- agent_response_model: Response model for parsing
-
- Returns:
- Tuple of (final_result, current_request) where:
- - final_result is not None if conversation should end
- - current_request is the next message to send if continuing
- """
- agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards)
-
- (
- task.description,
- disable_structured_output,
- extension_states,
- ) = _augment_prompt_with_a2a(
- a2a_agents=a2a_agents,
- task_description=original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=max_turns,
- agent_cards=agent_cards_dict,
- remote_status_notice=remote_status_notice,
- )
-
- original_response_model = task.response_model
- if disable_structured_output:
- task.response_model = None
-
- raw_result = original_fn(self, task, context, tools)
-
- if disable_structured_output:
- task.response_model = original_response_model
-
- return _process_response_result(
- raw_result=raw_result,
- disable_structured_output=disable_structured_output,
- turn_num=turn_num,
- agent_role=self.role,
- agent_response_model=agent_response_model,
- extension_registry=extension_registry,
- extension_states=extension_states,
- from_task=task,
- from_agent=self,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- )
-
-
-def _delegate_to_a2a(
- self: Agent,
- agent_response: AgentResponseProtocol,
- task: Task,
- original_fn: Callable[..., str],
- context: str | None,
- tools: list[BaseTool] | None,
- agent_cards: dict[str, AgentCard] | None = None,
- original_task_description: str | None = None,
- _extension_registry: ExtensionRegistry | None = None,
-) -> str:
- """Delegate to A2A agent with multi-turn conversation support.
-
- Args:
- self: The agent instance
- agent_response: The AgentResponse indicating delegation
- task: The task being executed (for extracting A2A fields)
- original_fn: The original execute_task method for follow-ups
- context: Optional context for task execution
- tools: Optional tools available to the agent
- agent_cards: Pre-fetched agent cards from _execute_task_with_a2a
- original_task_description: The original task description before A2A augmentation
- _extension_registry: Optional registry of A2A extensions (unused, reserved for future use)
-
- Returns:
- Result from A2A agent
-
- Raises:
- ImportError: If a2a-sdk is not installed
- """
- ctx = _prepare_delegation_context(
- self, agent_response, task, original_task_description
- )
- state = _init_delegation_state(ctx, agent_cards)
- current_request = state.current_request
- context_id = state.context_id
- task_id = state.task_id
- reference_task_ids = state.reference_task_ids
- conversation_history = state.conversation_history
-
- try:
- for turn_num in range(ctx.max_turns):
- agent_branch, accepted_output_modes = _get_turn_context(ctx.agent_config)
-
- merged_metadata = dict(ctx.metadata) if ctx.metadata else {}
- if _extension_registry and conversation_history:
- _ext_states = _extension_registry.extract_all_states(
- conversation_history
- )
- merged_metadata.update(
- _extension_registry.prepare_all_metadata(_ext_states)
- )
-
- a2a_result = execute_a2a_delegation(
- endpoint=ctx.agent_config.endpoint,
- auth=ctx.agent_config.auth,
- timeout=ctx.agent_config.timeout,
- task_description=current_request,
- context_id=context_id,
- task_id=task_id,
- reference_task_ids=reference_task_ids,
- metadata=merged_metadata or None,
- extensions=ctx.extensions,
- conversation_history=conversation_history,
- agent_id=ctx.agent_id,
- agent_role=Role.user,
- agent_branch=agent_branch,
- response_model=ctx.agent_config.response_model,
- turn_number=turn_num + 1,
- updates=ctx.agent_config.updates,
- transport=ctx.agent_config.transport,
- from_task=task,
- from_agent=self,
- client_extensions=getattr(ctx.agent_config, "extensions", None),
- accepted_output_modes=accepted_output_modes,
- input_files=task.input_files,
- )
-
- conversation_history = a2a_result.get("history", [])
-
- if conversation_history:
- latest_message = conversation_history[-1]
- if latest_message.task_id is not None:
- task_id = latest_message.task_id
- if latest_message.context_id is not None:
- context_id = latest_message.context_id
-
- if a2a_result["status"] in [TaskState.completed, TaskState.input_required]:
- trusted_result, task_id, reference_task_ids, remote_notice = (
- _handle_task_completion(
- a2a_result,
- task,
- task_id,
- reference_task_ids,
- ctx.agent_config,
- turn_num,
- from_task=task,
- from_agent=self,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
- )
- if trusted_result is not None:
- return trusted_result
-
- final_result, next_request = _handle_agent_response_and_continue(
- self=self,
- a2a_result=a2a_result,
- agent_id=ctx.agent_id,
- agent_cards=agent_cards,
- a2a_agents=ctx.a2a_agents,
- original_task_description=ctx.original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=ctx.max_turns,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_response_model=ctx.agent_response_model,
- extension_registry=_extension_registry,
- remote_status_notice=remote_notice,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
-
- if final_result is not None:
- return final_result
-
- if next_request is not None:
- current_request = next_request
-
- continue
-
- error_msg = a2a_result.get("error", "Unknown error")
-
- final_result, next_request = _handle_agent_response_and_continue(
- self=self,
- a2a_result=a2a_result,
- agent_id=ctx.agent_id,
- agent_cards=agent_cards,
- a2a_agents=ctx.a2a_agents,
- original_task_description=ctx.original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=ctx.max_turns,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_response_model=ctx.agent_response_model,
- extension_registry=_extension_registry,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
-
- if final_result is not None:
- return final_result
-
- if next_request is not None:
- current_request = next_request
- continue
-
- return _emit_delegation_failed(
- error_msg,
- turn_num,
- task,
- self,
- ctx.agent_config.endpoint,
- state.agent_name,
- state.agent_card_dict,
- )
-
- return _handle_max_turns_exceeded(
- conversation_history,
- ctx.max_turns,
- from_task=task,
- from_agent=self,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
-
- finally:
- task.description = ctx.original_task_description
-
-
async def _afetch_card_from_config(
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
- """Fetch agent card from A2A config asynchronously."""
+ """Async variant of :func:`_fetch_card_from_config`."""
try:
card = await afetch_agent_card(
endpoint=config.endpoint,
@@ -1436,15 +240,15 @@ async def _afetch_card_from_config(
async def _afetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
- """Fetch agent cards concurrently for multiple A2A agents using asyncio."""
+ """Async variant of :func:`_fetch_agent_cards_concurrently`."""
agent_cards: dict[str, AgentCard] = {}
failed_agents: dict[str, str] = {}
if not a2a_agents:
return agent_cards, failed_agents
- tasks = [_afetch_card_from_config(config) for config in a2a_agents]
- results = await asyncio.gather(*tasks)
+ fetch_tasks = [_afetch_card_from_config(config) for config in a2a_agents]
+ results = await asyncio.gather(*fetch_tasks)
for config, result in results:
if isinstance(result, Exception):
@@ -1460,313 +264,273 @@ async def _afetch_agent_cards_concurrently(
return agent_cards, failed_agents
+def _build_unavailable_notice(failed_agents: dict[str, str]) -> str:
+ text = ""
+ for endpoint, error in failed_agents.items():
+ text += f" - {endpoint}: {error}\n"
+ return UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute(unavailable_agents=text)
+
+
+def _augment_prompt_with_a2a(
+ a2a_agents: list[A2AConfig | A2AClientConfig],
+ task_description: str,
+ agent_cards: Mapping[str, AgentCard | dict[str, Any]],
+ failed_agents: dict[str, str] | None = None,
+) -> str:
+ """Add A2A delegation context (the available agent cards) to a prompt.
+
+ Tool-call mechanics are documented inside the template; this only renders
+ the cards themselves so the LLM can see each remote agent's capabilities.
+ """
+ if not agent_cards:
+ return task_description
+
+ agents_text = ""
+ for config in a2a_agents:
+ if config.endpoint in agent_cards:
+ card = agent_cards[config.endpoint]
+ if isinstance(card, dict):
+ filtered = {
+ k: v
+ for k, v in card.items()
+ if k in {"name", "description", "url", "skills"} and v is not None
+ }
+ agents_text += f"\n{json.dumps(filtered, indent=2)}\n"
+ else:
+ agents_text += (
+ "\n"
+ + card.model_dump_json(
+ indent=2,
+ exclude_none=True,
+ include={"name", "description", "url", "skills"},
+ )
+ + "\n"
+ )
+
+ failed_agents = failed_agents or {}
+ if failed_agents:
+ agents_text += "\n\n"
+ for endpoint, error in failed_agents.items():
+ agents_text += (
+ f"\n\n"
+ )
+
+ available = AVAILABLE_AGENTS_TEMPLATE.substitute(available_a2a_agents=agents_text)
+ return f"{task_description}\n{available}\n"
+
+
+def _execute_task_with_a2a(
+ self: Agent,
+ a2a_agents: list[A2AConfig | A2AClientConfig],
+ original_fn: Callable[..., str],
+ task: Task,
+ context: str | None,
+ tools: list[BaseTool] | None,
+ extension_registry: ExtensionRegistry,
+) -> str:
+ """Wrap execute_task with A2A delegation logic (sync)."""
+ original_description: str = task.description
+ agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents)
+
+ if not agent_cards and a2a_agents and failed_agents:
+ task.description = (
+ f"{original_description}{_build_unavailable_notice(failed_agents)}"
+ )
+ try:
+ return original_fn(self, task, context, tools)
+ finally:
+ task.description = original_description
+
+ state = A2ADelegationState(
+ agent=self, task=task, extension_registry=extension_registry
+ )
+ a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state)
+
+ augmented = _augment_prompt_with_a2a(
+ a2a_agents=a2a_agents,
+ task_description=original_description,
+ agent_cards=agent_cards,
+ failed_agents=failed_agents,
+ )
+ if extension_registry:
+ augmented = extension_registry.augment_prompt_with_all(augmented, {})
+
+ task.description = augmented
+ combined_tools: list[BaseTool] = [*(tools or []), *a2a_tools]
+ try:
+ return original_fn(self, task, context, combined_tools)
+ finally:
+ task.description = original_description
+
+
async def _aexecute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task,
- agent_response_model: type[BaseModel] | None,
context: str | None,
tools: list[BaseTool] | None,
extension_registry: ExtensionRegistry,
) -> str:
- """Async version of _execute_task_with_a2a."""
+ """Async variant of :func:`_execute_task_with_a2a`."""
original_description: str = task.description
- original_output_pydantic = task.output_pydantic
- original_response_model = task.response_model
-
agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents)
if not agent_cards and a2a_agents and failed_agents:
- unavailable_agents_text = ""
- for endpoint, error in failed_agents.items():
- unavailable_agents_text += f" - {endpoint}: {error}\n"
-
- notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute(
- unavailable_agents=unavailable_agents_text
+ task.description = (
+ f"{original_description}{_build_unavailable_notice(failed_agents)}"
)
- task.description = f"{original_description}{notice}"
-
try:
return await original_fn(self, task, context, tools)
finally:
task.description = original_description
- task.description, _, extension_states = _augment_prompt_with_a2a(
+ state = A2ADelegationState(
+ agent=self, task=task, extension_registry=extension_registry
+ )
+ a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state)
+
+ augmented = _augment_prompt_with_a2a(
a2a_agents=a2a_agents,
task_description=original_description,
agent_cards=agent_cards,
failed_agents=failed_agents,
- extension_registry=extension_registry,
)
- task.response_model = agent_response_model
+ if extension_registry:
+ augmented = extension_registry.augment_prompt_with_all(augmented, {})
+ task.description = augmented
+ combined_tools: list[BaseTool] = [*(tools or []), *a2a_tools]
try:
- raw_result = await original_fn(self, task, context, tools)
- agent_response = _parse_agent_response(
- raw_result=raw_result, agent_response_model=agent_response_model
- )
-
- if extension_registry and isinstance(agent_response, BaseModel):
- agent_response = extension_registry.process_response_with_all(
- agent_response, extension_states
- )
-
- if isinstance(agent_response, BaseModel) and isinstance(
- agent_response, AgentResponseProtocol
- ):
- if agent_response.is_a2a:
- return await _adelegate_to_a2a(
- self,
- agent_response=agent_response,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_cards=agent_cards,
- original_task_description=original_description,
- _extension_registry=extension_registry,
- )
- task.output_pydantic = None
- return agent_response.message
-
- return raw_result
+ return await original_fn(self, task, context, combined_tools)
finally:
task.description = original_description
- task.output_pydantic = original_output_pydantic
- task.response_model = original_response_model
-async def _ahandle_agent_response_and_continue(
- self: Agent,
- a2a_result: TaskStateResult,
- agent_id: str,
- agent_cards: dict[str, AgentCard] | None,
- a2a_agents: list[A2AConfig | A2AClientConfig],
- original_task_description: str,
- conversation_history: list[Message],
- turn_num: int,
- max_turns: int,
- task: Task,
- original_fn: Callable[..., Coroutine[Any, Any, str]],
- context: str | None,
- tools: list[BaseTool] | None,
- agent_response_model: type[BaseModel] | None,
- extension_registry: ExtensionRegistry | None = None,
- remote_status_notice: str = "",
- endpoint: str | None = None,
- a2a_agent_name: str | None = None,
- agent_card: dict[str, Any] | None = None,
-) -> tuple[str | None, str | None]:
- """Async version of _handle_agent_response_and_continue."""
- agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards)
-
- (
- task.description,
- disable_structured_output,
- extension_states,
- ) = _augment_prompt_with_a2a(
- a2a_agents=a2a_agents,
- task_description=original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=max_turns,
- agent_cards=agent_cards_dict,
- remote_status_notice=remote_status_notice,
- )
-
- original_response_model = task.response_model
- if disable_structured_output:
- task.response_model = None
-
- raw_result = await original_fn(self, task, context, tools)
-
- if disable_structured_output:
- task.response_model = original_response_model
-
- return _process_response_result(
- raw_result=raw_result,
- disable_structured_output=disable_structured_output,
- turn_num=turn_num,
- agent_role=self.role,
- agent_response_model=agent_response_model,
- extension_registry=extension_registry,
- extension_states=extension_states,
- from_task=task,
- from_agent=self,
- endpoint=endpoint,
- a2a_agent_name=a2a_agent_name,
- agent_card=agent_card,
- )
-
-
-async def _adelegate_to_a2a(
- self: Agent,
- agent_response: AgentResponseProtocol,
- task: Task,
- original_fn: Callable[..., Coroutine[Any, Any, str]],
- context: str | None,
- tools: list[BaseTool] | None,
- agent_cards: dict[str, AgentCard] | None = None,
- original_task_description: str | None = None,
- _extension_registry: ExtensionRegistry | None = None,
-) -> str:
- """Async version of _delegate_to_a2a."""
- ctx = _prepare_delegation_context(
- self, agent_response, task, original_task_description
- )
- state = _init_delegation_state(ctx, agent_cards)
- current_request = state.current_request
- context_id = state.context_id
- task_id = state.task_id
- reference_task_ids = state.reference_task_ids
- conversation_history = state.conversation_history
-
+@contextlib.contextmanager
+def _temporarily_extend_tools(agent: Agent, extra: list[BaseTool]) -> Iterator[None]:
+ """Append ``extra`` to ``agent.tools`` for the lifetime of the context."""
+ if not extra:
+ yield
+ return
+ original_tools = agent.tools
+ if original_tools is None:
+ agent.tools = list(extra)
+ else:
+ agent.tools = [*original_tools, *extra]
try:
- for turn_num in range(ctx.max_turns):
- agent_branch, accepted_output_modes = _get_turn_context(ctx.agent_config)
+ yield
+ finally:
+ agent.tools = original_tools
- merged_metadata = dict(ctx.metadata) if ctx.metadata else {}
- if _extension_registry and conversation_history:
- _ext_states = _extension_registry.extract_all_states(
- conversation_history
- )
- merged_metadata.update(
- _extension_registry.prepare_all_metadata(_ext_states)
- )
- a2a_result = await aexecute_a2a_delegation(
- endpoint=ctx.agent_config.endpoint,
- auth=ctx.agent_config.auth,
- timeout=ctx.agent_config.timeout,
- task_description=current_request,
- context_id=context_id,
- task_id=task_id,
- reference_task_ids=reference_task_ids,
- metadata=merged_metadata or None,
- extensions=ctx.extensions,
- conversation_history=conversation_history,
- agent_id=ctx.agent_id,
- agent_role=Role.user,
- agent_branch=agent_branch,
- response_model=ctx.agent_config.response_model,
- turn_number=turn_num + 1,
- transport=ctx.agent_config.transport,
- updates=ctx.agent_config.updates,
- from_task=task,
- from_agent=self,
- client_extensions=getattr(ctx.agent_config, "extensions", None),
- accepted_output_modes=accepted_output_modes,
- input_files=task.input_files,
- )
+def _kickoff_description(messages: str | list[Any]) -> str:
+ if isinstance(messages, str):
+ return messages
+ content = next(
+ (m["content"] for m in reversed(messages) if m["role"] == "user"),
+ None,
+ )
+ return content if isinstance(content, str) else ""
- conversation_history = a2a_result.get("history", [])
- if conversation_history:
- latest_message = conversation_history[-1]
- if latest_message.task_id is not None:
- task_id = latest_message.task_id
- if latest_message.context_id is not None:
- context_id = latest_message.context_id
+def _kickoff_with_a2a(
+ self: Agent,
+ a2a_agents: list[A2AConfig | A2AClientConfig],
+ original_kickoff: Callable[..., LiteAgentOutput],
+ messages: str | list[Any],
+ response_format: type[Any] | None,
+ input_files: dict[str, Any] | None,
+ extension_registry: ExtensionRegistry,
+) -> LiteAgentOutput:
+ """Execute kickoff with A2A delegation support (sync)."""
+ description = _kickoff_description(messages)
+ if not description:
+ return original_kickoff(self, messages, response_format, input_files)
- if a2a_result["status"] in [TaskState.completed, TaskState.input_required]:
- trusted_result, task_id, reference_task_ids, remote_notice = (
- _handle_task_completion(
- a2a_result,
- task,
- task_id,
- reference_task_ids,
- ctx.agent_config,
- turn_num,
- from_task=task,
- from_agent=self,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
- )
- if trusted_result is not None:
- return trusted_result
+ agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents)
+ if not agent_cards and a2a_agents and failed_agents:
+ return original_kickoff(self, messages, response_format, input_files)
- final_result, next_request = await _ahandle_agent_response_and_continue(
- self=self,
- a2a_result=a2a_result,
- agent_id=ctx.agent_id,
- agent_cards=agent_cards,
- a2a_agents=ctx.a2a_agents,
- original_task_description=ctx.original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=ctx.max_turns,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_response_model=ctx.agent_response_model,
- extension_registry=_extension_registry,
- remote_status_notice=remote_notice,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
+ fake_task = Task(
+ description=description,
+ agent=self,
+ expected_output="Result from A2A delegation",
+ input_files=input_files or {},
+ )
+ state = A2ADelegationState(
+ agent=self, task=fake_task, extension_registry=extension_registry
+ )
+ a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state)
- if final_result is not None:
- return final_result
+ augmented = _augment_prompt_with_a2a(
+ a2a_agents=a2a_agents,
+ task_description=description,
+ agent_cards=agent_cards,
+ failed_agents=failed_agents,
+ )
+ if extension_registry:
+ augmented = extension_registry.augment_prompt_with_all(augmented, {})
- if next_request is not None:
- current_request = next_request
+ if isinstance(messages, str):
+ wrapped_messages: str | list[Any] = augmented
+ else:
+ wrapped_messages = [*messages, {"role": "user", "content": augmented}]
- continue
+ with _temporarily_extend_tools(self, a2a_tools):
+ return original_kickoff(self, wrapped_messages, response_format, input_files)
- error_msg = a2a_result.get("error", "Unknown error")
- final_result, next_request = await _ahandle_agent_response_and_continue(
- self=self,
- a2a_result=a2a_result,
- agent_id=ctx.agent_id,
- agent_cards=agent_cards,
- a2a_agents=ctx.a2a_agents,
- original_task_description=ctx.original_task_description,
- conversation_history=conversation_history,
- turn_num=turn_num,
- max_turns=ctx.max_turns,
- task=task,
- original_fn=original_fn,
- context=context,
- tools=tools,
- agent_response_model=ctx.agent_response_model,
- extension_registry=_extension_registry,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
- )
-
- if final_result is not None:
- return final_result
-
- if next_request is not None:
- current_request = next_request
- continue
-
- return _emit_delegation_failed(
- error_msg,
- turn_num,
- task,
- self,
- ctx.agent_config.endpoint,
- state.agent_name,
- state.agent_card_dict,
- )
-
- return _handle_max_turns_exceeded(
- conversation_history,
- ctx.max_turns,
- from_task=task,
- from_agent=self,
- endpoint=ctx.agent_config.endpoint,
- a2a_agent_name=state.agent_name,
- agent_card=state.agent_card_dict,
+async def _akickoff_with_a2a(
+ self: Agent,
+ a2a_agents: list[A2AConfig | A2AClientConfig],
+ original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]],
+ messages: str | list[Any],
+ response_format: type[Any] | None,
+ input_files: dict[str, Any] | None,
+ extension_registry: ExtensionRegistry,
+) -> LiteAgentOutput:
+ """Execute kickoff with A2A delegation support (async)."""
+ description = _kickoff_description(messages)
+ if not description:
+ return await original_kickoff_async(
+ self, messages, response_format, input_files
)
- finally:
- task.description = ctx.original_task_description
+ agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents)
+ if not agent_cards and a2a_agents and failed_agents:
+ return await original_kickoff_async(
+ self, messages, response_format, input_files
+ )
+
+ fake_task = Task(
+ description=description,
+ agent=self,
+ expected_output="Result from A2A delegation",
+ input_files=input_files or {},
+ )
+ state = A2ADelegationState(
+ agent=self, task=fake_task, extension_registry=extension_registry
+ )
+ a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state)
+
+ augmented = _augment_prompt_with_a2a(
+ a2a_agents=a2a_agents,
+ task_description=description,
+ agent_cards=agent_cards,
+ failed_agents=failed_agents,
+ )
+ if extension_registry:
+ augmented = extension_registry.augment_prompt_with_all(augmented, {})
+
+ if isinstance(messages, str):
+ wrapped_messages: str | list[Any] = augmented
+ else:
+ wrapped_messages = [*messages, {"role": "user", "content": augmented}]
+
+ with _temporarily_extend_tools(self, a2a_tools):
+ return await original_kickoff_async(
+ self, wrapped_messages, response_format, input_files
+ )
diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py
index 561307680..8def29f27 100644
--- a/lib/crewai/src/crewai/agent/core.py
+++ b/lib/crewai/src/crewai/agent/core.py
@@ -112,12 +112,6 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
-try:
- from crewai.a2a.types import AgentResponseProtocol
-except ImportError:
- AgentResponseProtocol = None # type: ignore[assignment, misc]
-
-
if TYPE_CHECKING:
from crewai_files import FileInput
@@ -626,15 +620,7 @@ class Agent(BaseAgent):
result = process_tool_results(self, result)
- output_for_event = result
- if (
- AgentResponseProtocol is not None
- and isinstance(result, BaseModel)
- and isinstance(result, AgentResponseProtocol)
- ):
- output_for_event = str(result.message)
- elif not isinstance(result, str):
- output_for_event = str(result)
+ output_for_event = result if isinstance(result, str) else str(result)
crewai_event_bus.emit(
self,
diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py
index cd9823e15..e16530cbf 100644
--- a/lib/crewai/src/crewai/lite_agent.py
+++ b/lib/crewai/src/crewai/lite_agent.py
@@ -121,11 +121,11 @@ def _kickoff_with_a2a_support(
Returns:
LiteAgentOutput from either local execution or A2A delegation.
"""
- from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
+ from crewai.a2a.utils.response_model import extract_a2a_client_configs
from crewai.a2a.wrapper import _execute_task_with_a2a
from crewai.task import Task
- a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a)
+ a2a_agents = extract_a2a_client_configs(agent.a2a)
if not a2a_agents:
return original_kickoff(messages, response_format, input_files)
@@ -160,7 +160,6 @@ def _kickoff_with_a2a_support(
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
- agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,
diff --git a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py
index 6347f8e1c..f33a2351f 100644
--- a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py
+++ b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py
@@ -1,13 +1,14 @@
-"""Test trust_remote_completion_status flag in A2A wrapper."""
+"""Tests for A2A delegation tool behavior, including trust_remote_completion_status."""
from unittest.mock import MagicMock, patch
import pytest
-from crewai.a2a.config import A2AConfig
+from crewai.a2a.config import A2AClientConfig, A2AConfig
+
try:
- from a2a.types import Message, Role
+ from a2a.types import TaskState # noqa: F401
A2A_SDK_INSTALLED = True
except ImportError:
@@ -15,141 +16,126 @@ except ImportError:
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
- """Create a mock agent card with proper model_dump behavior."""
+ """Create a mock agent card with the attributes A2ADelegationTool reads."""
mock_card = MagicMock()
mock_card.name = name
mock_card.url = url
+ mock_card.description = "A test agent"
+ mock_card.skills = []
mock_card.model_dump.return_value = {"name": name, "url": url}
- mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
return mock_card
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
-def test_trust_remote_completion_status_true_returns_directly():
- """When trust_remote_completion_status=True and A2A returns completed, return result directly."""
- from crewai.a2a.wrapper import _delegate_to_a2a
- from crewai.a2a.types import AgentResponseProtocol
+def test_delegation_tool_returns_remote_result_on_completion():
+ """A successful remote completion is returned to the local LLM as the tool result."""
+ from a2a.types import TaskState
+
from crewai import Agent, Task
+ from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
- a2a_config = A2AConfig(
- endpoint="http://test-endpoint.com",
- trust_remote_completion_status=True,
- )
-
- agent = Agent(
- role="test manager",
- goal="coordinate",
- backstory="test",
- a2a=a2a_config,
- )
-
+ config = A2AClientConfig(endpoint="http://test-endpoint.com")
+ agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
- class MockResponse:
- is_a2a = True
- message = "Please help"
- a2a_ids = ["http://test-endpoint.com/"]
+ card = _create_mock_agent_card()
+ state = A2ADelegationState(agent=agent, task=task)
+ tools = build_a2a_tools([config], {config.endpoint: card}, state)
+ assert len(tools) == 1
+ tool = tools[0]
- with (
- patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
- patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
- ):
- mock_card = _create_mock_agent_card()
- mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
-
- # A2A returns completed
+ with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
- "status": "completed",
+ "status": TaskState.completed,
"result": "Done by remote",
"history": [],
}
+ result = tool._run(message="Please help")
- # This should return directly without checking LLM response
- result = _delegate_to_a2a(
- self=agent,
- agent_response=MockResponse(),
- task=task,
- original_fn=lambda *args, **kwargs: "fallback",
- context=None,
- tools=None,
- agent_cards={"http://test-endpoint.com/": mock_card},
- original_task_description="test",
- )
-
- assert result == "Done by remote"
- assert mock_execute.call_count == 1
+ assert result == "Done by remote"
+ assert mock_execute.call_count == 1
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
-def test_trust_remote_completion_status_false_continues_conversation():
- """When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
- from crewai.a2a.wrapper import _delegate_to_a2a
+def test_delegation_tool_records_completed_task_in_references():
+ """When a remote task completes with a task_id, it goes into reference_task_ids."""
+ from a2a.types import TaskState
+
from crewai import Agent, Task
+ from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
- a2a_config = A2AConfig(
- endpoint="http://test-endpoint.com",
- trust_remote_completion_status=False,
- )
-
- agent = Agent(
- role="test manager",
- goal="coordinate",
- backstory="test",
- a2a=a2a_config,
- )
-
+ config = A2AClientConfig(endpoint="http://test-endpoint.com")
+ agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
- class MockResponse:
- is_a2a = True
- message = "Please help"
- a2a_ids = ["http://test-endpoint.com/"]
+ card = _create_mock_agent_card()
+ state = A2ADelegationState(agent=agent, task=task)
+ [tool] = build_a2a_tools([config], {config.endpoint: card}, state)
- call_count = 0
+ history_msg = MagicMock()
+ history_msg.task_id = "remote-task-1"
+ history_msg.context_id = "ctx-1"
- def mock_original_fn(self, task, context, tools):
- nonlocal call_count
- call_count += 1
- if call_count == 1:
- # Server decides to finish
- return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
- return "unexpected"
-
- with (
- patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
- patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
- ):
- mock_card = _create_mock_agent_card()
- mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
-
- # A2A returns completed
+ with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
- "status": "completed",
- "result": "Done by remote",
- "history": [],
+ "status": TaskState.completed,
+ "result": "Done",
+ "history": [history_msg],
}
+ tool._run(message="Please help")
- result = _delegate_to_a2a(
- self=agent,
- agent_response=MockResponse(),
- task=task,
- original_fn=mock_original_fn,
- context=None,
- tools=None,
- agent_cards={"http://test-endpoint.com/": mock_card},
- original_task_description="test",
- )
-
- # Should call original_fn to get server response
- assert call_count >= 1
- assert result == "Server final answer"
+ endpoint_state = state._per_endpoint[config.endpoint]
+ assert "remote-task-1" in endpoint_state.reference_task_ids
+ assert endpoint_state.task_id is None
+ assert task.config is not None
+ assert task.config["reference_task_ids"] == ["remote-task-1"]
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
+def test_delegation_tool_returns_error_message_on_failure():
+ """A non-completed/non-input-required status surfaces as a readable error string."""
+ from a2a.types import TaskState
+
+ from crewai import Agent, Task
+ from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
+
+ config = A2AClientConfig(endpoint="http://test-endpoint.com")
+ agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
+ task = Task(description="test", expected_output="test", agent=agent)
+
+ card = _create_mock_agent_card()
+ state = A2ADelegationState(agent=agent, task=task)
+ [tool] = build_a2a_tools([config], {config.endpoint: card}, state)
+
+ with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
+ mock_execute.return_value = {
+ "status": TaskState.failed,
+ "error": "remote agent unreachable",
+ "history": [],
+ }
+ result = tool._run(message="Please help")
+
+ assert "remote agent unreachable" in result
+
+
+@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
+def test_delegation_tool_respects_max_turns_via_usage_count():
+ """A2AConfig.max_turns wires through to BaseTool.max_usage_count."""
+ from crewai import Agent, Task
+ from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
+
+ config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2)
+ agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
+ task = Task(description="test", expected_output="test", agent=agent)
+
+ card = _create_mock_agent_card()
+ state = A2ADelegationState(agent=agent, task=task)
+ [tool] = build_a2a_tools([config], {config.endpoint: card}, state)
+
+ assert tool.max_usage_count == 2
+
+
def test_default_trust_remote_completion_status_is_false():
"""Verify that default value of trust_remote_completion_status is False."""
- a2a_config = A2AConfig(
- endpoint="http://test-endpoint.com",
- )
-
+ a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
assert a2a_config.trust_remote_completion_status is False
\ No newline at end of file