Compare commits

...

4 Commits

Author SHA1 Message Date
Greyson LaLonde
f32fe819b4 Merge branch 'main' into gl/refactor/a2a-tool-based-delegation 2026-05-13 08:22:50 +08:00
Greyson LaLonde
189d769cb9 Merge branch 'main' into gl/refactor/a2a-tool-based-delegation 2026-05-09 00:18:21 +08:00
Greyson Lalonde
6fe34644ab refactor(a2a): use tool calling for delegation instead of structured output
Each remote A2A agent is now exposed to the local LLM as a BaseTool
(delegate_to_<card_name>); the local agent's tool-call loop drives
multi-turn delegation. The Literal-constrained AgentResponse model and
the explicit per-turn re-prompting loop are gone.

Closes #3897. The original failure mode — Pydantic literal_error when
skill.id != endpoint URL, and Gemini flash-lite hallucinating
out-of-enum values — is structurally impossible: provider-side tool-call
validation enforces the tool name, and there's no competing identifier.
2026-05-08 22:16:17 +08:00
Greyson Lalonde
27463ce8c4 chore(deps): bump mem0ai to >=2.0.0 to address GHSA-xqxw-r767-67m7 2026-05-08 22:07:40 +08:00
8 changed files with 752 additions and 1731 deletions

View File

@@ -1,23 +1,20 @@
"""String templates for A2A (Agent-to-Agent) protocol messaging and status."""
"""String templates for A2A (Agent-to-Agent) delegation prompts."""
from string import Template
from typing import Final
AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template(
"\n<AVAILABLE_A2A_AGENTS>\n $available_a2a_agents\n</AVAILABLE_A2A_AGENTS>\n"
)
PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template(
"\n<PREVIOUS_A2A_CONVERSATION>\n"
" $previous_a2a_conversation"
"\n</PREVIOUS_A2A_CONVERSATION>\n"
)
CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template(
"\n<CONVERSATION_PROGRESS>\n"
' turn="$turn_count"\n'
' max_turns="$max_turns"\n'
" $warning"
"\n</CONVERSATION_PROGRESS>\n"
"\n<AVAILABLE_A2A_AGENTS>\n"
"You can delegate to remote agents using the delegate_to_* tools below. "
"Each tool's description lists the remote agent's capabilities — call the "
"tool whose capabilities best match the task. Pass the question or sub-task "
"to the remote agent via the tool's `message` argument; the tool returns "
"the remote agent's response, which you should incorporate into your final "
"answer. If the available agents are not a good fit, answer directly "
"without calling a delegation tool.\n\n"
" $available_a2a_agents"
"\n</AVAILABLE_A2A_AGENTS>\n"
)
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
"\n<A2A_AGENTS_STATUS>\n"
@@ -27,29 +24,3 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
" $unavailable_agents"
"\n</A2A_AGENTS_STATUS>\n"
)
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: COMPLETED
The remote agent has finished processing your request. Their response is in the conversation history above.
You MUST now:
1. Extract the answer from the conversation history
2. Set is_a2a=false
3. Return the answer as your final message
DO NOT send another request - the task is already done.
</REMOTE_AGENT_STATUS>
"""
REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """
<REMOTE_AGENT_STATUS>
STATUS: RESPONSE_RECEIVED
The remote agent has responded. Their response is in the conversation history above.
You MUST now:
1. Set is_a2a=false (the remote task is complete and cannot receive more messages)
2. Provide YOUR OWN response to the original task based on the information received
IMPORTANT: Your response should be addressed to the USER who gave you the original task.
Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that...").
Do NOT address the remote agent directly or use "you" to refer to them.
</REMOTE_AGENT_STATUS>
"""

View File

@@ -0,0 +1,394 @@
"""Tool-based A2A delegation.
Each remote A2A agent is exposed to the local LLM as a BaseTool. The local
agent's normal tool-call loop drives multi-turn delegation: each tool call is
one turn against the remote agent. Per-endpoint conversation state lives in
``A2ADelegationState`` and is shared across the tools built for a single task.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from a2a.types import Role, TaskState
from pydantic import BaseModel, Field, PrivateAttr
from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.extensions.base import (
A2AExtension,
ConversationState,
ExtensionRegistry,
)
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import A2AConversationCompletedEvent
from crewai.tools.base_tool import BaseTool
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
from a2a.types import AgentCard, Message
from crewai.task import Task
# Prefix for generated delegation tool names: delegate_to_<sanitized card name>.
_DELEGATE_PREFIX = "delegate_to_"
@dataclass
class _EndpointState:
    """Mutable per-endpoint conversation state across tool calls."""

    # Full message history exchanged with this endpoint so far; replaced
    # each turn from the delegation result's "history" entry.
    conversation_history: list[Message] = field(default_factory=list)
    # A2A context id carried across turns (taken from the latest remote message).
    context_id: str | None = None
    # Currently open remote task id; cleared once the remote task completes.
    task_id: str | None = None
    # Ids of remote tasks already completed against this endpoint.
    reference_task_ids: list[str] = field(default_factory=list)
    # Number of delegation turns executed against this endpoint.
    turn_count: int = 0
@dataclass
class A2ADelegationState:
    """State shared across all A2A delegation tools for a single task execution.

    One instance is created per local task and handed to every
    ``A2ADelegationTool`` built for it, so tools targeting the same endpoint
    share one :class:`_EndpointState` (conversation history, ids, turn count).
    """

    # The local agent performing the delegation (passed through to events).
    agent: Any
    # The local task being executed; its ``config`` seeds and persists ids.
    task: Task
    # Optional registry of A2A extensions applied to metadata/responses.
    extension_registry: ExtensionRegistry | None = None
    # Lazily-populated map of endpoint URL -> per-endpoint conversation state.
    _per_endpoint: dict[str, _EndpointState] = field(default_factory=dict)

    def _state_for(self, endpoint: str) -> _EndpointState:
        """Return (creating on first use) the state for ``endpoint``."""
        return self._per_endpoint.setdefault(endpoint, _EndpointState())

    def _initial_ids_from_task(self, state: _EndpointState) -> None:
        """Seed context/task/reference ids from the task config, first turn only.

        After the first turn the ids are owned by the conversation itself, so
        this is a no-op once ``turn_count`` is non-zero.
        """
        if state.turn_count > 0:
            return
        task_config = self.task.config or {}
        if state.context_id is None:
            state.context_id = task_config.get("context_id")
        if state.task_id is None:
            state.task_id = task_config.get("task_id")
        if not state.reference_task_ids:
            state.reference_task_ids = list(task_config.get("reference_task_ids", []))

    def delegate(
        self,
        config: A2AConfig | A2AClientConfig,
        agent_card: AgentCard | None,
        message: str,
    ) -> str:
        """Run one delegation turn against ``config.endpoint``.

        Returns the remote agent's response text, suitable for handing back to
        the local LLM as a tool result.
        """
        return _run_delegation(self, config, agent_card, message, sync=True)

    async def adelegate(
        self,
        config: A2AConfig | A2AClientConfig,
        agent_card: AgentCard | None,
        message: str,
    ) -> str:
        """Async variant of :meth:`delegate`."""
        return await _run_delegation_async(self, config, agent_card, message)
class _A2ADelegationArgs(BaseModel):
    """Argument schema for A2A delegation tools.

    The single ``message`` field is the only input the local LLM supplies
    when calling a ``delegate_to_*`` tool.
    """

    message: str = Field(
        ...,
        description=(
            "The question or task to send to the remote agent. Be specific and "
            "self-contained: the remote agent does not see your other tools or "
            "your prior reasoning."
        ),
    )
class A2ADelegationTool(BaseTool):
    """BaseTool that delegates one turn of conversation to a remote A2A agent.

    Each instance is bound to a specific A2A endpoint via ``_config``. Calling
    ``_run`` or ``_arun`` advances that endpoint's conversation by one turn and
    returns the remote agent's response text.
    """

    # Exposes the single `message` argument to the LLM.
    args_schema: type[BaseModel] = _A2ADelegationArgs

    # Endpoint configuration this tool is bound to (assigned by build_a2a_tools).
    _config: A2AConfig | A2AClientConfig = PrivateAttr()
    # Agent card fetched for the endpoint, if any (assigned by build_a2a_tools).
    _agent_card: AgentCard | None = PrivateAttr(default=None)
    # Delegation state shared across all tools built for one task execution.
    _state: A2ADelegationState = PrivateAttr()

    def _run(self, message: str) -> str:
        """Run one synchronous delegation turn; returns the remote response text."""
        return self._state.delegate(self._config, self._agent_card, message)

    async def _arun(self, message: str) -> str:
        """Async counterpart of :meth:`_run`."""
        return await self._state.adelegate(self._config, self._agent_card, message)
def build_a2a_tools(
    a2a_agents: list[A2AConfig | A2AClientConfig],
    agent_cards: dict[str, AgentCard],
    state: A2ADelegationState,
) -> list[BaseTool]:
    """Build one ``A2ADelegationTool`` per available A2A agent.

    Tool names collide-disambiguate with a numeric suffix; agents whose cards
    failed to fetch are skipped.
    """
    built: list[BaseTool] = []
    taken_names: set[str] = set()
    for cfg in a2a_agents:
        agent_card = agent_cards.get(cfg.endpoint)
        if agent_card is None:
            # No card means the fetch failed; skip rather than expose a tool
            # with no capability description.
            continue
        tool_name = _build_tool_name(agent_card.name or "remote_agent", taken_names)
        taken_names.add(tool_name)
        delegation_tool = A2ADelegationTool(
            name=tool_name,
            description=_build_tool_description(agent_card),
            max_usage_count=cfg.max_turns,
        )
        # Private attributes are set post-construction (pydantic PrivateAttr).
        delegation_tool._config = cfg
        delegation_tool._agent_card = agent_card
        delegation_tool._state = state
        built.append(delegation_tool)
    return built
def _build_tool_name(card_name: str, used: set[str]) -> str:
    """Derive a unique sanitized tool name for ``card_name``.

    The base name is ``delegate_to_<card_name>``; on collision a numeric
    suffix (_2, _3, ...) is appended until an unused name is found.
    """
    candidate = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}")
    if candidate in used:
        for suffix in range(2, 1000):
            numbered = sanitize_tool_name(f"{candidate}_{suffix}")
            if numbered not in used:
                return numbered
        raise ValueError(f"Could not generate unique tool name for {card_name!r}")
    return candidate
def _build_tool_description(card: AgentCard) -> str:
lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."]
if card.description:
lines.append(card.description.strip())
if card.skills:
skill_names = ", ".join(s.name for s in card.skills if s.name)
if skill_names:
lines.append(f"Capabilities: {skill_names}.")
lines.append(
"Use this tool only when the question matches the agent's capabilities. "
"After receiving a response, prefer answering directly unless you need "
"another round-trip."
)
return "\n".join(lines)
def _delegation_kwargs(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    message: str,
    endpoint_state: _EndpointState,
    metadata: dict[str, Any],
    agent_branch: Any | None,
    accepted_output_modes: list[str] | None,
) -> dict[str, Any]:
    """Build the keyword arguments shared by the sync and async delegation calls.

    Keeping the argument set in one place guarantees the two code paths
    cannot drift apart.
    """
    return {
        "endpoint": config.endpoint,
        "auth": config.auth,
        "timeout": config.timeout,
        "task_description": message,
        "context_id": endpoint_state.context_id,
        "task_id": endpoint_state.task_id,
        "reference_task_ids": endpoint_state.reference_task_ids,
        # Empty metadata dicts are normalized to None.
        "metadata": metadata or None,
        "extensions": (state.task.config or {}).get("extensions"),
        "conversation_history": endpoint_state.conversation_history,
        "agent_id": config.endpoint,
        "agent_role": Role.user,
        "agent_branch": agent_branch,
        "response_model": config.response_model,
        "turn_number": endpoint_state.turn_count + 1,
        "updates": config.updates,
        "transport": config.transport,
        "from_task": state.task,
        "from_agent": state.agent,
        # Only A2AClientConfig carries client-side extensions, hence getattr.
        "client_extensions": getattr(config, "extensions", None),
        "accepted_output_modes": accepted_output_modes,
        "input_files": state.task.input_files,
    }


def _run_delegation(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    message: str,
    *,
    sync: bool,
) -> str:
    """Run one synchronous delegation turn against ``config.endpoint``.

    Args:
        state: Shared delegation state for the current task execution.
        config: Endpoint configuration to delegate to.
        agent_card: Fetched card for the endpoint, if available.
        message: Question or sub-task to send to the remote agent.
        sync: Accepted for interface stability but currently unused — this
            function is always the synchronous path.

    Returns:
        The remote agent's response text, suitable for handing back to the
        local LLM as a tool result.
    """
    endpoint_state = state._state_for(config.endpoint)
    state._initial_ids_from_task(endpoint_state)
    extension_states = _extract_extension_states(state, endpoint_state)
    metadata = _merged_metadata(state, endpoint_state, extension_states)
    agent_branch, accepted_output_modes = _turn_context(config)
    a2a_result = execute_a2a_delegation(
        **_delegation_kwargs(
            state,
            config,
            message,
            endpoint_state,
            metadata,
            agent_branch,
            accepted_output_modes,
        )
    )
    return _finalize_turn(
        state, endpoint_state, config, agent_card, a2a_result, extension_states
    )


async def _run_delegation_async(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    message: str,
) -> str:
    """Async variant of :func:`_run_delegation`; see it for parameter docs."""
    endpoint_state = state._state_for(config.endpoint)
    state._initial_ids_from_task(endpoint_state)
    extension_states = _extract_extension_states(state, endpoint_state)
    metadata = _merged_metadata(state, endpoint_state, extension_states)
    agent_branch, accepted_output_modes = _turn_context(config)
    a2a_result = await aexecute_a2a_delegation(
        **_delegation_kwargs(
            state,
            config,
            message,
            endpoint_state,
            metadata,
            agent_branch,
            accepted_output_modes,
        )
    )
    return _finalize_turn(
        state, endpoint_state, config, agent_card, a2a_result, extension_states
    )
def _extract_extension_states(
state: A2ADelegationState,
endpoint_state: _EndpointState,
) -> dict[type[A2AExtension], ConversationState]:
if state.extension_registry and endpoint_state.conversation_history:
return state.extension_registry.extract_all_states(
endpoint_state.conversation_history
)
return {}
def _merged_metadata(
state: A2ADelegationState,
endpoint_state: _EndpointState,
extension_states: dict[type[A2AExtension], ConversationState],
) -> dict[str, Any]:
task_config = state.task.config or {}
metadata: dict[str, Any] = dict(task_config.get("metadata") or {})
if state.extension_registry and extension_states:
metadata.update(state.extension_registry.prepare_all_metadata(extension_states))
return metadata
def _turn_context(
    config: A2AConfig | A2AClientConfig,
) -> tuple[Any | None, list[str] | None]:
    """Collect per-turn display and output-mode context.

    Returns the console formatter's current agent (or task) branch, if a
    console formatter is attached to the event bus, and the accepted output
    modes when the config is an :class:`A2AClientConfig`.
    """
    formatter = getattr(crewai_event_bus, "_console", None)
    branch = None
    if formatter:
        branch = (
            getattr(formatter, "current_agent_branch", None)
            or getattr(formatter, "current_task_branch", None)
        )
    modes = (
        config.accepted_output_modes if isinstance(config, A2AClientConfig) else None
    )
    return branch, modes
def _emit_conversation_event(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    *,
    status: str,
    final_result: str | None,
    error: str | None,
    total_turns: int,
) -> None:
    """Emit an ``A2AConversationCompletedEvent`` for a finished or failed turn."""
    crewai_event_bus.emit(
        None,
        A2AConversationCompletedEvent(
            status=status,
            final_result=final_result,
            error=error,
            total_turns=total_turns,
            from_task=state.task,
            from_agent=state.agent,
            endpoint=config.endpoint,
            a2a_agent_name=agent_card.name if agent_card else None,
            agent_card=agent_card.model_dump() if agent_card else None,
        ),
    )


def _finalize_turn(
    state: A2ADelegationState,
    endpoint_state: _EndpointState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    a2a_result: TaskStateResult,
    extension_states: dict[type[A2AExtension], ConversationState],
) -> str:
    """Absorb one delegation turn's result into the per-endpoint state.

    Updates conversation history, context/task ids, and the turn counter,
    then maps the remote task status onto the string handed back to the LLM:
    the response text on ``completed``/``input_required``, or a readable
    error string otherwise.
    """
    endpoint_state.conversation_history = list(a2a_result.get("history", []))
    if endpoint_state.conversation_history:
        # Adopt the ids the remote side assigned so follow-up turns resume
        # the same context/task.
        latest = endpoint_state.conversation_history[-1]
        if latest.task_id is not None:
            endpoint_state.task_id = latest.task_id
        if latest.context_id is not None:
            endpoint_state.context_id = latest.context_id
    endpoint_state.turn_count += 1

    status = a2a_result.get("status")
    if status == TaskState.completed:
        if (
            endpoint_state.task_id is not None
            and endpoint_state.task_id not in endpoint_state.reference_task_ids
        ):
            # Completed remote tasks become references for later turns;
            # mirror them onto the local task config so they persist.
            endpoint_state.reference_task_ids.append(endpoint_state.task_id)
            if state.task.config is None:
                state.task.config = {}
            state.task.config["reference_task_ids"] = list(
                endpoint_state.reference_task_ids
            )
        endpoint_state.task_id = None
        result_text = str(a2a_result.get("result", ""))
        _emit_conversation_event(
            state,
            config,
            agent_card,
            status="completed",
            final_result=result_text,
            error=None,
            total_turns=endpoint_state.turn_count,
        )
        return _apply_response_extensions(state, result_text, extension_states)

    if status == TaskState.input_required:
        # Remote agent asked a follow-up question; the conversation is still
        # open, so no completion event is emitted here.
        result_text = str(a2a_result.get("result", ""))
        return _apply_response_extensions(state, result_text, extension_states)

    # Any other status is treated as a failure. Using `or` (not a .get default)
    # also covers an explicit ``"error": None`` entry, which would otherwise
    # surface as the literal string "None".
    error_msg = a2a_result.get("error") or "Unknown error"
    _emit_conversation_event(
        state,
        config,
        agent_card,
        status="failed",
        final_result=None,
        error=error_msg,
        total_turns=endpoint_state.turn_count,
    )
    return f"Remote agent error: {error_msg}"
def _apply_response_extensions(
state: A2ADelegationState,
response_text: str,
extension_states: dict[type[A2AExtension], ConversationState],
) -> str:
if not state.extension_registry:
return response_text
processed = state.extension_registry.process_response_with_all(
response_text, extension_states
)
return processed if isinstance(processed, str) else str(processed)

View File

@@ -6,8 +6,6 @@ from typing import (
Annotated,
Any,
Literal,
Protocol,
runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
@@ -57,15 +55,6 @@ Url = Annotated[
]
@runtime_checkable
class AgentResponseProtocol(Protocol):
"""Protocol for the dynamically created AgentResponse model."""
a2a_ids: tuple[str, ...]
message: str
is_a2a: bool
class PartsMetadataDict(TypedDict, total=False):
"""Metadata for A2A message parts.

View File

@@ -1,75 +1,25 @@
"""Response model utilities for A2A agent interactions."""
"""Helpers for extracting A2A client configurations."""
from __future__ import annotations
from typing import TypeAlias
from pydantic import BaseModel, Field, create_model
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs.
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
or None if agent_ids is empty.
"""
if not agent_ids:
return None
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
def extract_a2a_client_configs(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
) -> list[A2AClientConfigTypes]:
"""Return the client-side A2A configs from a possibly-mixed config list.
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs list and agent endpoint IDs.
Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to.
"""
if a2a_config is None:
return [], ()
return []
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
@@ -77,24 +27,6 @@ def extract_a2a_agent_ids_from_config(
else:
configs = a2a_config
client_configs: list[A2AClientConfigTypes] = [
return [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
return client_configs, tuple(config.endpoint for config in client_configs)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
"""Get A2A agent configs and response model.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs and response model.
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

File diff suppressed because it is too large Load Diff

View File

@@ -111,12 +111,6 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.types import AgentResponseProtocol
except ImportError:
AgentResponseProtocol = None # type: ignore[assignment, misc]
if TYPE_CHECKING:
from crewai_files import FileInput
@@ -632,15 +626,7 @@ class Agent(BaseAgent):
result = process_tool_results(self, result)
output_for_event = result
if (
AgentResponseProtocol is not None
and isinstance(result, BaseModel)
and isinstance(result, AgentResponseProtocol)
):
output_for_event = str(result.message)
elif not isinstance(result, str):
output_for_event = str(result)
output_for_event = result if isinstance(result, str) else str(result)
crewai_event_bus.emit(
self,

View File

@@ -121,11 +121,11 @@ def _kickoff_with_a2a_support(
Returns:
LiteAgentOutput from either local execution or A2A delegation.
"""
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
from crewai.a2a.utils.response_model import extract_a2a_client_configs
from crewai.a2a.wrapper import _execute_task_with_a2a
from crewai.task import Task
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a)
a2a_agents = extract_a2a_client_configs(agent.a2a)
if not a2a_agents:
return original_kickoff(messages, response_format, input_files)
@@ -160,7 +160,6 @@ def _kickoff_with_a2a_support(
a2a_agents=a2a_agents,
original_fn=task_to_kickoff_adapter,
task=fake_task,
agent_response_model=agent_response_model,
context=None,
tools=None,
extension_registry=extension_registry,

View File

@@ -1,13 +1,14 @@
"""Test trust_remote_completion_status flag in A2A wrapper."""
"""Tests for A2A delegation tool behavior, including trust_remote_completion_status."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.a2a.config import A2AConfig
from crewai.a2a.config import A2AClientConfig, A2AConfig
try:
from a2a.types import Message, Role
from a2a.types import TaskState # noqa: F401
A2A_SDK_INSTALLED = True
except ImportError:
@@ -15,141 +16,126 @@ except ImportError:
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
"""Create a mock agent card with proper model_dump behavior."""
"""Create a mock agent card with the attributes A2ADelegationTool reads."""
mock_card = MagicMock()
mock_card.name = name
mock_card.url = url
mock_card.description = "A test agent"
mock_card.skills = []
mock_card.model_dump.return_value = {"name": name, "url": url}
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
return mock_card
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_true_returns_directly():
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai.a2a.types import AgentResponseProtocol
def test_delegation_tool_returns_remote_result_on_completion():
"""A successful remote completion is returned to the local LLM as the tool result."""
from a2a.types import TaskState
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
tools = build_a2a_tools([config], {config.endpoint: card}, state)
assert len(tools) == 1
tool = tools[0]
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
"status": "completed",
"status": TaskState.completed,
"result": "Done by remote",
"history": [],
}
result = tool._run(message="Please help")
# This should return directly without checking LLM response
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
assert result == "Done by remote"
assert mock_execute.call_count == 1
assert result == "Done by remote"
assert mock_execute.call_count == 1
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_false_continues_conversation():
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
from crewai.a2a.wrapper import _delegate_to_a2a
def test_delegation_tool_records_completed_task_in_references():
"""When a remote task completes with a task_id, it goes into reference_task_ids."""
from a2a.types import TaskState
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=False,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
call_count = 0
history_msg = MagicMock()
history_msg.task_id = "remote-task-1"
history_msg.context_id = "ctx-1"
def mock_original_fn(self, task, context, tools):
nonlocal call_count
call_count += 1
if call_count == 1:
# Server decides to finish
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
return "unexpected"
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
"status": TaskState.completed,
"result": "Done",
"history": [history_msg],
}
tool._run(message="Please help")
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=mock_original_fn,
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Should call original_fn to get server response
assert call_count >= 1
assert result == "Server final answer"
endpoint_state = state._per_endpoint[config.endpoint]
assert "remote-task-1" in endpoint_state.reference_task_ids
assert endpoint_state.task_id is None
assert task.config is not None
assert task.config["reference_task_ids"] == ["remote-task-1"]
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_returns_error_message_on_failure():
"""A non-completed/non-input-required status surfaces as a readable error string."""
from a2a.types import TaskState
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com")
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
mock_execute.return_value = {
"status": TaskState.failed,
"error": "remote agent unreachable",
"history": [],
}
result = tool._run(message="Please help")
assert "remote agent unreachable" in result
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_delegation_tool_respects_max_turns_via_usage_count():
"""A2AConfig.max_turns wires through to BaseTool.max_usage_count."""
from crewai import Agent, Task
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2)
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
task = Task(description="test", expected_output="test", agent=agent)
card = _create_mock_agent_card()
state = A2ADelegationState(agent=agent, task=task)
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
assert tool.max_usage_count == 2
def test_default_trust_remote_completion_status_is_false():
"""Verify that default value of trust_remote_completion_status is False."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
assert a2a_config.trust_remote_completion_status is False