feat: Add A2A (Agent-to-Agent) protocol support for remote interoperability

- Implement CrewAgentExecutor class that wraps CrewAI crews as A2A-compatible agents
- Add server utilities for starting A2A servers with crews
- Include comprehensive test coverage for all A2A functionality
- Add optional dependency group 'a2a' in pyproject.toml
- Expose A2A classes in main CrewAI module with graceful import handling
- Add documentation and examples for A2A integration
- Support bidirectional agent communication via A2A protocol
- Enable crews to participate in remote agent networks

Fixes #2970

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-06-06 23:34:27 +00:00
parent 21d063a46c
commit 78b9c7dbeb
11 changed files with 1133 additions and 2 deletions

1
tests/a2a/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests for CrewAI A2A integration."""

View File

@@ -0,0 +1,197 @@
"""Tests for CrewAgentExecutor class."""
import asyncio
import pytest
from unittest.mock import Mock, AsyncMock, patch
from crewai import Agent, Crew, Task
from crewai.crews.crew_output import CrewOutput
try:
from crewai.a2a import CrewAgentExecutor
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import InvalidParamsError, UnsupportedOperationError
from a2a.utils.errors import ServerError
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestCrewAgentExecutor:
"""Test cases for CrewAgentExecutor."""
@pytest.fixture
def sample_crew(self):
"""Create a sample crew for testing."""
from unittest.mock import Mock
mock_crew = Mock()
mock_crew.agents = []
mock_crew.tasks = []
return mock_crew
@pytest.fixture
def crew_executor(self, sample_crew):
"""Create a CrewAgentExecutor for testing."""
return CrewAgentExecutor(sample_crew)
@pytest.fixture
def mock_context(self):
"""Create a mock RequestContext."""
from a2a.types import Message, Part, TextPart
context = Mock(spec=RequestContext)
context.task_id = "test-task-123"
context.context_id = "test-context-456"
context.message = Message(
messageId="msg-123",
taskId="test-task-123",
contextId="test-context-456",
role="user",
parts=[Part(root=TextPart(text="Test message"))]
)
context.get_user_input.return_value = "Test query"
return context
@pytest.fixture
def mock_event_queue(self):
"""Create a mock EventQueue."""
return Mock(spec=EventQueue)
def test_init(self, sample_crew):
"""Test CrewAgentExecutor initialization."""
executor = CrewAgentExecutor(sample_crew)
assert executor.crew == sample_crew
assert executor.supported_content_types == ['text', 'text/plain']
assert executor._running_tasks == {}
def test_init_with_custom_content_types(self, sample_crew):
"""Test CrewAgentExecutor initialization with custom content types."""
custom_types = ['text', 'application/json']
executor = CrewAgentExecutor(sample_crew, supported_content_types=custom_types)
assert executor.supported_content_types == custom_types
@pytest.mark.asyncio
async def test_execute_success(self, crew_executor, mock_context, mock_event_queue):
"""Test successful crew execution."""
mock_output = CrewOutput(raw="Test response", json_dict=None)
with patch.object(crew_executor, '_execute_crew_async', return_value=mock_output):
await crew_executor.execute(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
assert len(crew_executor._running_tasks) == 0
@pytest.mark.asyncio
async def test_execute_with_validation_error(self, crew_executor, mock_event_queue):
"""Test execution with validation error."""
bad_context = Mock(spec=RequestContext)
bad_context.get_user_input.return_value = ""
with pytest.raises(ServerError):
await crew_executor.execute(bad_context, mock_event_queue)
@pytest.mark.asyncio
async def test_execute_with_crew_error(self, crew_executor, mock_context, mock_event_queue):
"""Test execution when crew raises an error."""
with patch.object(crew_executor, '_execute_crew_async', side_effect=Exception("Crew error")):
with pytest.raises(ServerError):
await crew_executor.execute(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
@pytest.mark.asyncio
async def test_cancel_existing_task(self, crew_executor, mock_event_queue):
"""Test cancelling an existing task."""
cancel_context = Mock(spec=RequestContext)
cancel_context.task_id = "test-task-123"
async def dummy_task():
await asyncio.sleep(10)
mock_task = asyncio.create_task(dummy_task())
crew_executor._running_tasks["test-task-123"] = mock_task
result = await crew_executor.cancel(cancel_context, mock_event_queue)
assert result is None
assert "test-task-123" not in crew_executor._running_tasks
assert mock_task.cancelled()
@pytest.mark.asyncio
async def test_cancel_nonexistent_task(self, crew_executor, mock_event_queue):
"""Test cancelling a task that doesn't exist."""
cancel_context = Mock(spec=RequestContext)
cancel_context.task_id = "nonexistent-task"
with pytest.raises(ServerError):
await crew_executor.cancel(cancel_context, mock_event_queue)
def test_convert_output_to_parts_with_raw(self, crew_executor):
"""Test converting crew output with raw content to A2A parts."""
output = Mock()
output.raw = "Test response"
output.json_dict = None
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 1
assert parts[0].root.text == "Test response"
def test_convert_output_to_parts_with_json(self, crew_executor):
"""Test converting crew output with JSON data to A2A parts."""
output = Mock()
output.raw = "Test response"
output.json_dict = {"key": "value"}
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 2
assert parts[0].root.text == "Test response"
assert "Structured Output:" in parts[1].root.text
assert '"key": "value"' in parts[1].root.text
def test_convert_output_to_parts_empty(self, crew_executor):
"""Test converting empty crew output to A2A parts."""
output = ""
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 1
assert parts[0].root.text == "Crew execution completed successfully"
def test_validate_request_valid(self, crew_executor, mock_context):
"""Test request validation with valid input."""
error = crew_executor._validate_request(mock_context)
assert error is None
def test_validate_request_empty_input(self, crew_executor):
"""Test request validation with empty input."""
context = Mock(spec=RequestContext)
context.get_user_input.return_value = ""
error = crew_executor._validate_request(context)
assert error == "Empty or missing user input"
def test_validate_request_whitespace_input(self, crew_executor):
"""Test request validation with whitespace-only input."""
context = Mock(spec=RequestContext)
context.get_user_input.return_value = " \n\t "
error = crew_executor._validate_request(context)
assert error == "Empty or missing user input"
def test_validate_request_exception(self, crew_executor):
"""Test request validation when get_user_input raises exception."""
context = Mock(spec=RequestContext)
context.get_user_input.side_effect = Exception("Input error")
error = crew_executor._validate_request(context)
assert "Failed to extract user input" in error
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
def test_import_error_handling():
"""Test that import errors are handled gracefully when A2A is not available."""
with pytest.raises(ImportError, match="A2A integration requires"):
from crewai.a2a import CrewAgentExecutor

View File

@@ -0,0 +1,121 @@
"""Integration tests for CrewAI A2A functionality."""
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Crew, Task
try:
from crewai.a2a import CrewAgentExecutor, create_a2a_app
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestA2AIntegration:
"""Integration tests for A2A functionality."""
@pytest.fixture
def sample_crew(self):
"""Create a sample crew for integration testing."""
from unittest.mock import Mock
mock_crew = Mock()
mock_crew.agents = []
mock_crew.tasks = []
return mock_crew
def test_end_to_end_integration(self, sample_crew):
"""Test end-to-end A2A integration."""
executor = CrewAgentExecutor(sample_crew)
assert executor.crew == sample_crew
assert isinstance(executor.supported_content_types, list)
with patch('crewai.a2a.server.A2AStarletteApplication') as mock_app_class:
with patch('crewai.a2a.server.DefaultRequestHandler') as mock_handler_class:
with patch('crewai.a2a.server.InMemoryTaskStore') as mock_task_store_class:
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
app = create_a2a_app(executor)
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(executor, mock_task_store)
mock_app_class.assert_called_once()
assert app == mock_built_app
def test_crew_with_multiple_agents(self):
"""Test A2A integration with multi-agent crew."""
from unittest.mock import Mock
crew = Mock()
crew.agents = [Mock(), Mock()]
crew.tasks = [Mock(), Mock()]
executor = CrewAgentExecutor(crew)
assert executor.crew == crew
assert len(executor.crew.agents) == 2
assert len(executor.crew.tasks) == 2
def test_custom_content_types(self, sample_crew):
"""Test A2A integration with custom content types."""
custom_types = ['text', 'application/json', 'image/png']
executor = CrewAgentExecutor(
sample_crew,
supported_content_types=custom_types
)
assert executor.supported_content_types == custom_types
@patch('uvicorn.run')
def test_server_startup_integration(self, mock_uvicorn_run, sample_crew):
"""Test server startup integration."""
from crewai.a2a import start_a2a_server
executor = CrewAgentExecutor(sample_crew)
with patch('crewai.a2a.server.create_a2a_app') as mock_create_app:
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(
executor,
host="127.0.0.1",
port=9999,
transport="starlette"
)
mock_create_app.assert_called_once_with(
executor,
transport="starlette"
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="127.0.0.1",
port=9999
)
def test_optional_import_in_main_module():
"""Test that A2A classes are optionally imported in main module."""
import crewai
if A2A_AVAILABLE:
assert hasattr(crewai, 'CrewAgentExecutor')
assert hasattr(crewai, 'start_a2a_server')
assert hasattr(crewai, 'create_a2a_app')
assert 'CrewAgentExecutor' in crewai.__all__
assert 'start_a2a_server' in crewai.__all__
assert 'create_a2a_app' in crewai.__all__
else:
assert not hasattr(crewai, 'CrewAgentExecutor')
assert not hasattr(crewai, 'start_a2a_server')
assert not hasattr(crewai, 'create_a2a_app')

129
tests/a2a/test_server.py Normal file
View File

@@ -0,0 +1,129 @@
"""Tests for A2A server utilities."""
import pytest
from unittest.mock import Mock, patch
try:
from crewai.a2a import start_a2a_server, create_a2a_app
from a2a.server.agent_execution.agent_executor import AgentExecutor
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestA2AServer:
"""Test cases for A2A server utilities."""
@pytest.fixture
def mock_agent_executor(self):
"""Create a mock AgentExecutor."""
return Mock(spec=AgentExecutor)
@patch('uvicorn.run')
@patch('crewai.a2a.server.create_a2a_app')
def test_start_a2a_server_default(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
"""Test starting A2A server with default parameters."""
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(mock_agent_executor)
mock_create_app.assert_called_once_with(
mock_agent_executor,
transport="starlette"
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="localhost",
port=10001
)
@patch('uvicorn.run')
@patch('crewai.a2a.server.create_a2a_app')
def test_start_a2a_server_custom(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
"""Test starting A2A server with custom parameters."""
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(
mock_agent_executor,
host="0.0.0.0",
port=8080,
transport="fastapi"
)
mock_create_app.assert_called_once_with(
mock_agent_executor,
transport="fastapi"
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="0.0.0.0",
port=8080
)
@patch('crewai.a2a.server.A2AStarletteApplication')
@patch('crewai.a2a.server.DefaultRequestHandler')
@patch('crewai.a2a.server.InMemoryTaskStore')
def test_create_a2a_app_starlette(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
"""Test creating A2A app with Starlette transport."""
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
result = create_a2a_app(mock_agent_executor, transport="starlette")
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
mock_app_class.assert_called_once()
mock_app_instance.build.assert_called_once()
assert result == mock_built_app
def test_create_a2a_app_fastapi(self, mock_agent_executor):
"""Test creating A2A app with FastAPI transport raises error."""
with pytest.raises(ValueError, match="FastAPI transport is not available"):
create_a2a_app(
mock_agent_executor,
transport="fastapi",
agent_name="Custom Agent",
agent_description="Custom description"
)
@patch('crewai.a2a.server.A2AStarletteApplication')
@patch('crewai.a2a.server.DefaultRequestHandler')
@patch('crewai.a2a.server.InMemoryTaskStore')
def test_create_a2a_app_default_transport(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
"""Test creating A2A app with default transport."""
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
result = create_a2a_app(mock_agent_executor)
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
mock_app_class.assert_called_once()
assert result == mock_built_app
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
def test_server_import_error_handling():
"""Test that import errors are handled gracefully when A2A is not available."""
with pytest.raises(ImportError, match="A2A integration requires"):
from crewai.a2a.server import start_a2a_server