refactor: extract a2a into standalone crewai-a2a package

Move crewai.a2a into lib/crewai-a2a as its own workspace package, importable as crewai_a2a. The crewai[a2a] extra now pulls in crewai-a2a, which owns a2a-sdk, httpx-auth, httpx-sse, and aiocache.

crewai.a2a stays importable. Its __init__ is a compat shim that installs a meta-path finder routing crewai.a2a.* to crewai_a2a.*, so existing user code keeps working untouched.

a2a tests and cassettes moved alongside the package under lib/crewai-a2a/tests/. Added that path to the mypy and ruff per-file-ignores lists to match the other test dirs.
This commit is contained in:
Greyson LaLonde
2026-04-14 22:21:31 +08:00
parent 0dba95e166
commit 080c22678a
87 changed files with 415 additions and 321 deletions

View File

@@ -1,155 +0,0 @@
"""Test trust_remote_completion_status flag in A2A wrapper."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.a2a.config import A2AConfig
try:
from a2a.types import Message, Role
A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
"""Create a mock agent card with proper model_dump behavior."""
mock_card = MagicMock()
mock_card.name = name
mock_card.url = url
mock_card.model_dump.return_value = {"name": name, "url": url}
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
return mock_card
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_true_returns_directly():
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai.a2a.types import AgentResponseProtocol
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
}
# This should return directly without checking LLM response
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
assert result == "Done by remote"
assert mock_execute.call_count == 1
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_trust_remote_completion_status_false_continues_conversation():
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=False,
)
agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)
task = Task(description="test", expected_output="test", agent=agent)
class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]
call_count = 0
def mock_original_fn(self, task, context, tools):
nonlocal call_count
call_count += 1
if call_count == 1:
# Server decides to finish
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
return "unexpected"
with (
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = _create_mock_agent_card()
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
# A2A returns completed
mock_execute.return_value = {
"status": "completed",
"result": "Done by remote",
"history": [],
}
result = _delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=mock_original_fn,
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)
# Should call original_fn to get server response
assert call_count >= 1
assert result == "Server final answer"
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_default_trust_remote_completion_status_is_false():
"""Verify that default value of trust_remote_completion_status is False."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
assert a2a_config.trust_remote_completion_status is False

View File

@@ -1,217 +0,0 @@
"""Tests for Agent.kickoff() with A2A delegation using VCR cassettes."""
from __future__ import annotations
import os
import pytest
from crewai import Agent
from crewai.a2a.config import A2AClientConfig
A2A_TEST_ENDPOINT = os.getenv(
"A2A_TEST_ENDPOINT", "http://localhost:9999/.well-known/agent-card.json"
)
class TestAgentA2AKickoff:
"""Tests for Agent.kickoff() with A2A delegation."""
@pytest.fixture
def researcher_agent(self) -> Agent:
"""Create a research agent with A2A configuration."""
return Agent(
role="Research Analyst",
goal="Find and analyze information about AI developments",
backstory="Expert researcher with access to remote specialized agents",
verbose=True,
a2a=[
A2AClientConfig(
endpoint=A2A_TEST_ENDPOINT,
fail_fast=False,
max_turns=3, # Limit turns for testing
)
],
)
@pytest.mark.skip(reason="VCR cassette matching issue with agent card caching")
@pytest.mark.vcr()
def test_agent_kickoff_delegates_to_a2a(self, researcher_agent: Agent) -> None:
"""Test that agent.kickoff() delegates to A2A server."""
result = researcher_agent.kickoff(
"Use the remote A2A agent to find out what the current time is in New York."
)
assert result is not None
assert result.raw is not None
assert isinstance(result.raw, str)
assert len(result.raw) > 0
@pytest.mark.skip(reason="VCR cassette matching issue with agent card caching")
@pytest.mark.vcr()
def test_agent_kickoff_with_calculator_skill(
self, researcher_agent: Agent
) -> None:
"""Test that agent can delegate calculation to A2A server."""
result = researcher_agent.kickoff(
"Ask the remote A2A agent to calculate 25 times 17."
)
assert result is not None
assert result.raw is not None
assert "425" in result.raw or "425.0" in result.raw
@pytest.mark.skip(reason="VCR cassette matching issue with agent card caching")
@pytest.mark.vcr()
def test_agent_kickoff_with_conversation_skill(
self, researcher_agent: Agent
) -> None:
"""Test that agent can have a conversation with A2A server."""
result = researcher_agent.kickoff(
"Delegate to the remote A2A agent to explain quantum computing in simple terms."
)
assert result is not None
assert result.raw is not None
assert isinstance(result.raw, str)
assert len(result.raw) > 50 # Should have a meaningful response
@pytest.mark.vcr()
def test_agent_kickoff_returns_lite_agent_output(
self, researcher_agent: Agent
) -> None:
"""Test that kickoff returns LiteAgentOutput with correct structure."""
from crewai.lite_agent_output import LiteAgentOutput
result = researcher_agent.kickoff(
"Use the A2A agent to tell me what time it is."
)
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None
assert result.agent_role == "Research Analyst"
assert isinstance(result.messages, list)
@pytest.mark.skip(reason="VCR cassette matching issue with agent card caching")
@pytest.mark.vcr()
def test_agent_kickoff_handles_multi_turn_conversation(
self, researcher_agent: Agent
) -> None:
"""Test that agent handles multi-turn A2A conversations."""
# This should trigger multiple turns of conversation
result = researcher_agent.kickoff(
"Ask the remote A2A agent about recent developments in AI agent communication protocols."
)
assert result is not None
assert result.raw is not None
# The response should contain information about A2A or agent protocols
assert isinstance(result.raw, str)
@pytest.mark.vcr()
def test_agent_without_a2a_works_normally(self) -> None:
"""Test that agent without A2A config works normally."""
agent = Agent(
role="Simple Assistant",
goal="Help with basic tasks",
backstory="A helpful assistant",
verbose=False,
)
# This should work without A2A delegation
result = agent.kickoff("Say hello")
assert result is not None
assert result.raw is not None
@pytest.mark.vcr()
def test_agent_kickoff_with_failed_a2a_endpoint(self) -> None:
"""Test that agent handles failed A2A connection gracefully."""
agent = Agent(
role="Research Analyst",
goal="Find information",
backstory="Expert researcher",
verbose=False,
a2a=[
A2AClientConfig(
endpoint="http://nonexistent:9999/.well-known/agent-card.json",
fail_fast=False,
)
],
)
# Should fallback to local LLM when A2A fails
result = agent.kickoff("What is 2 + 2?")
assert result is not None
assert result.raw is not None
@pytest.mark.skip(reason="VCR cassette matching issue with agent card caching")
@pytest.mark.vcr()
def test_agent_kickoff_with_list_messages(
self, researcher_agent: Agent
) -> None:
"""Test that agent.kickoff() works with list of messages."""
messages = [
{
"role": "user",
"content": "Delegate to the A2A agent to find the current time in Tokyo.",
},
]
result = researcher_agent.kickoff(messages)
assert result is not None
assert result.raw is not None
assert isinstance(result.raw, str)
class TestAgentA2AKickoffAsync:
"""Tests for async Agent.kickoff_async() with A2A delegation."""
@pytest.fixture
def researcher_agent(self) -> Agent:
"""Create a research agent with A2A configuration."""
return Agent(
role="Research Analyst",
goal="Find and analyze information",
backstory="Expert researcher with access to remote agents",
verbose=True,
a2a=[
A2AClientConfig(
endpoint=A2A_TEST_ENDPOINT,
fail_fast=False,
max_turns=3,
)
],
)
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_agent_kickoff_async_delegates_to_a2a(
self, researcher_agent: Agent
) -> None:
"""Test that agent.kickoff_async() delegates to A2A server."""
result = await researcher_agent.kickoff_async(
"Use the remote A2A agent to calculate 10 plus 15."
)
assert result is not None
assert result.raw is not None
assert isinstance(result.raw, str)
@pytest.mark.skip(reason="Test assertion needs fixing - not capturing final answer")
@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_agent_kickoff_async_with_calculator(
self, researcher_agent: Agent
) -> None:
"""Test async delegation with calculator skill."""
result = await researcher_agent.kickoff_async(
"Ask the A2A agent to calculate 100 divided by 4."
)
assert result is not None
assert result.raw is not None
assert "25" in result.raw or "25.0" in result.raw

View File

@@ -1,111 +0,0 @@
"""Test A2A wrapper is only applied when a2a is passed to Agent."""
from unittest.mock import patch
import pytest
from crewai import Agent
from crewai.a2a.config import A2AConfig
try:
import a2a # noqa: F401
A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False
def test_agent_without_a2a_has_no_wrapper():
"""Verify that agents without a2a don't get the wrapper applied."""
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
)
assert agent.a2a is None
assert callable(agent.execute_task)
@pytest.mark.skipif(
True,
reason="Requires a2a-sdk to be installed. This test verifies wrapper is applied when a2a is set.",
)
def test_agent_with_a2a_has_wrapper():
"""Verify that agents with a2a get the wrapper applied."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
assert agent.a2a is not None
assert agent.a2a.endpoint == "http://test-endpoint.com"
assert callable(agent.execute_task)
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_agent_with_a2a_creates_successfully():
"""Verify that creating an agent with a2a succeeds and applies wrapper."""
a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
assert agent.a2a is not None
assert agent.a2a.endpoint == "http://test-endpoint.com/"
assert callable(agent.execute_task)
assert hasattr(agent.execute_task, "__wrapped__")
def test_multiple_agents_without_a2a():
"""Verify that multiple agents without a2a work correctly."""
agent1 = Agent(
role="agent 1",
goal="test goal",
backstory="test backstory",
)
agent2 = Agent(
role="agent 2",
goal="test goal",
backstory="test backstory",
)
assert agent1.a2a is None
assert agent2.a2a is None
assert callable(agent1.execute_task)
assert callable(agent2.execute_task)
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_wrapper_is_applied_differently_per_instance():
"""Verify that agents with and without a2a have different execute_task methods."""
agent_without_a2a = Agent(
role="agent without a2a",
goal="test goal",
backstory="test backstory",
)
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
agent_with_a2a = Agent(
role="agent with a2a",
goal="test goal",
backstory="test backstory",
a2a=a2a_config,
)
assert agent_without_a2a.execute_task.__func__ is not agent_with_a2a.execute_task.__func__
assert not hasattr(agent_without_a2a.execute_task, "__wrapped__")
assert hasattr(agent_with_a2a.execute_task, "__wrapped__")