From d7bdac12a295cb6dfc428aadb93f961c4f558d38 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 13 Nov 2025 13:43:09 -0500 Subject: [PATCH] feat: a2a trust remote completion status flag - add trust_remote_completion_status flag to A2AConfig, Adds configuration flag to control whether to trust A2A agent completion status. Resolves #3899 - update docs --- docs/en/learn/a2a-agent-delegation.mdx | 4 + lib/crewai/src/crewai/a2a/config.py | 5 + lib/crewai/src/crewai/a2a/wrapper.py | 21 ++- .../test_a2a_trust_completion_status.py | 147 ++++++++++++++++++ 4 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 lib/crewai/tests/agents/test_a2a_trust_completion_status.py diff --git a/docs/en/learn/a2a-agent-delegation.mdx b/docs/en/learn/a2a-agent-delegation.mdx index 78c28b1e0..ec2832751 100644 --- a/docs/en/learn/a2a-agent-delegation.mdx +++ b/docs/en/learn/a2a-agent-delegation.mdx @@ -83,6 +83,10 @@ The `A2AConfig` class accepts the following parameters: Whether to raise an error immediately if agent connection fails. When `False`, the agent continues with available agents and informs the LLM about unavailable ones. + + When `True`, returns the A2A agent's result directly when it signals completion. When `False`, allows the server agent to review the result and potentially continue the conversation. + + ## Authentication For A2A agents that require authentication, use one of the provided auth schemes: diff --git a/lib/crewai/src/crewai/a2a/config.py b/lib/crewai/src/crewai/a2a/config.py index 0d7470dbf..c53602882 100644 --- a/lib/crewai/src/crewai/a2a/config.py +++ b/lib/crewai/src/crewai/a2a/config.py @@ -38,6 +38,7 @@ class A2AConfig(BaseModel): max_turns: Maximum conversation turns with A2A agent (default: 10). response_model: Optional Pydantic model for structured A2A agent responses. fail_fast: If True, raise error when agent unreachable; if False, skip and continue (default: True). + trust_remote_completion_status: If True, return A2A agent's result directly when status is "completed"; if False, always ask server agent to respond (default: False). """ endpoint: Url = Field(description="A2A agent endpoint URL") @@ -57,3 +58,7 @@ class A2AConfig(BaseModel): default=True, description="If True, raise an error immediately when the A2A agent is unreachable. If False, skip the A2A agent and continue execution.", ) + trust_remote_completion_status: bool = Field( + default=False, + description='If True, return the A2A agent\'s result directly when status is "completed" without asking the server agent to respond. If False, always ask the server agent to respond, allowing it to potentially delegate again.', + ) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 3bbb0f8c7..82216233f 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -52,7 +52,7 @@ def wrap_agent_with_a2a_instance(agent: Agent) -> None: Args: agent: The agent instance to wrap """ - original_execute_task = agent.execute_task.__func__ + original_execute_task = agent.execute_task.__func__ # type: ignore[attr-defined] @wraps(original_execute_task) def execute_task_with_a2a( @@ -73,7 +73,7 @@ def wrap_agent_with_a2a_instance(agent: Agent) -> None: Task execution result """ if not self.a2a: - return original_execute_task(self, task, context, tools) + return original_execute_task(self, task, context, tools) # type: ignore[no-any-return] a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) @@ -498,6 +498,23 @@ def _delegate_to_a2a( conversation_history = a2a_result.get("history", []) if a2a_result["status"] in ["completed", "input_required"]: + if ( + a2a_result["status"] == "completed" + and agent_config.trust_remote_completion_status + ): + result_text = a2a_result.get("result", "") + final_turn_number = turn_num + 1 + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=final_turn_number, + ), + ) + return result_text # type: ignore[no-any-return] + final_result, next_request = _handle_agent_response_and_continue( self=self, a2a_result=a2a_result, diff --git a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py new file mode 100644 index 000000000..7573ecb5d --- /dev/null +++ b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py @@ -0,0 +1,147 @@ +"""Test trust_remote_completion_status flag in A2A wrapper.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.a2a.config import A2AConfig + +try: + from a2a.types import Message, Role + + A2A_SDK_INSTALLED = True +except ImportError: + A2A_SDK_INSTALLED = False + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_trust_remote_completion_status_true_returns_directly(): + """When trust_remote_completion_status=True and A2A returns completed, return result directly.""" + from crewai.a2a.wrapper import _delegate_to_a2a + from crewai.a2a.types import AgentResponseProtocol + from crewai import Agent, Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + trust_remote_completion_status=True, + ) + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_config, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + class MockResponse: + is_a2a = True + message = "Please help" + a2a_ids = ["http://test-endpoint.com/"] + + with ( + patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, + patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + # A2A returns completed + mock_execute.return_value = { + "status": "completed", + "result": "Done by remote", + "history": [], + } + + # This should return directly without checking LLM response + result = _delegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + assert result == "Done by remote" + assert mock_execute.call_count == 1 + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_trust_remote_completion_status_false_continues_conversation(): + """When trust_remote_completion_status=False and A2A returns completed, ask server agent.""" + from crewai.a2a.wrapper import _delegate_to_a2a + from crewai import Agent, Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + trust_remote_completion_status=False, + ) + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_config, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + class MockResponse: + is_a2a = True + message = "Please help" + a2a_ids = ["http://test-endpoint.com/"] + + call_count = 0 + + def mock_original_fn(self, task, context, tools): + nonlocal call_count + call_count += 1 + if call_count == 1: + # Server decides to finish + return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}' + return "unexpected" + + with ( + patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, + patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + # A2A returns completed + mock_execute.return_value = { + "status": "completed", + "result": "Done by remote", + "history": [], + } + + result = _delegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=mock_original_fn, + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + # Should call original_fn to get server response + assert call_count >= 1 + assert result == "Server final answer" + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_default_trust_remote_completion_status_is_false(): + """Verify that default value of trust_remote_completion_status is False.""" + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + ) + + assert a2a_config.trust_remote_completion_status is False \ No newline at end of file