feat: a2a trust remote completion status flag
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled

- add trust_remote_completion_status flag to A2AConfig, Adds configuration flag to control whether to trust A2A agent completion status. Resolves #3899
- update docs
This commit is contained in:
Greyson LaLonde
2025-11-13 13:43:09 -05:00
committed by GitHub
parent 528d812263
commit d7bdac12a2
4 changed files with 175 additions and 2 deletions

View File

@@ -83,6 +83,10 @@ The `A2AConfig` class accepts the following parameters:
Whether to raise an error immediately if agent connection fails. When `False`, the agent continues with available agents and informs the LLM about unavailable ones.
</ParamField>
<ParamField path="trust_remote_completion_status" type="bool" default="False">
When `True`, returns the A2A agent's result directly when it signals completion. When `False`, allows the server agent to review the result and potentially continue the conversation.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -38,6 +38,7 @@ class A2AConfig(BaseModel):
max_turns: Maximum conversation turns with A2A agent (default: 10).
response_model: Optional Pydantic model for structured A2A agent responses.
fail_fast: If True, raise error when agent unreachable; if False, skip and continue (default: True).
trust_remote_completion_status: If True, return A2A agent's result directly when status is "completed"; if False, always ask server agent to respond (default: False).
"""
endpoint: Url = Field(description="A2A agent endpoint URL")
@@ -57,3 +58,7 @@ class A2AConfig(BaseModel):
default=True,
description="If True, raise an error immediately when the A2A agent is unreachable. If False, skip the A2A agent and continue execution.",
)
trust_remote_completion_status: bool = Field(
default=False,
description='If True, return the A2A agent\'s result directly when status is "completed" without asking the server agent to respond. If False, always ask the server agent to respond, allowing it to potentially delegate again.',
)

View File

@@ -52,7 +52,7 @@ def wrap_agent_with_a2a_instance(agent: Agent) -> None:
Args:
agent: The agent instance to wrap
"""
original_execute_task = agent.execute_task.__func__
original_execute_task = agent.execute_task.__func__ # type: ignore[attr-defined]
@wraps(original_execute_task)
def execute_task_with_a2a(
@@ -73,7 +73,7 @@ def wrap_agent_with_a2a_instance(agent: Agent) -> None:
Task execution result
"""
if not self.a2a:
return original_execute_task(self, task, context, tools)
return original_execute_task(self, task, context, tools) # type: ignore[no-any-return]
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a)
@@ -498,6 +498,23 @@ def _delegate_to_a2a(
conversation_history = a2a_result.get("history", [])
if a2a_result["status"] in ["completed", "input_required"]:
if (
a2a_result["status"] == "completed"
and agent_config.trust_remote_completion_status
):
result_text = a2a_result.get("result", "")
final_turn_number = turn_num + 1
crewai_event_bus.emit(
None,
A2AConversationCompletedEvent(
status="completed",
final_result=result_text,
error=None,
total_turns=final_turn_number,
),
)
return result_text # type: ignore[no-any-return]
final_result, next_request = _handle_agent_response_and_continue(
self=self,
a2a_result=a2a_result,

View File

@@ -0,0 +1,147 @@
"""Test trust_remote_completion_status flag in A2A wrapper."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.a2a.config import A2AConfig
try:
from a2a.types import Message, Role
A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_true_returns_directly():
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai.a2a.types import AgentResponseProtocol
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
}
# This should return directly without checking LLM response
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
assert result == "Done by remote"
assert mock_execute.call_count == 1
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_false_continues_conversation():
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=False,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
call_count = 0
def mock_original_fn(self, task, context, tools):
nonlocal call_count
call_count += 1
if call_count == 1:
# Server decides to finish
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
return "unexpected"
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
}
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=mock_original_fn,
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Should call original_fn to get server response
assert call_count >= 1
assert result == "Server final answer"
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_default_trust_remote_completion_status_is_false():
"""Verify that default value of trust_remote_completion_status is False."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
assert a2a_config.trust_remote_completion_status is False