From 78b9c7dbebbf86905cbba2b470e307fae94f5a2c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 6 Jun 2025 23:34:27 +0000 Subject: [PATCH] feat: Add A2A (Agent-to-Agent) protocol support for remote interoperability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement CrewAgentExecutor class that wraps CrewAI crews as A2A-compatible agents - Add server utilities for starting A2A servers with crews - Include comprehensive test coverage for all A2A functionality - Add optional dependency group 'a2a' in pyproject.toml - Expose A2A classes in main CrewAI module with graceful import handling - Add documentation and examples for A2A integration - Support bidirectional agent communication via A2A protocol - Enable crews to participate in remote agent networks Fixes #2970 Co-Authored-By: João --- docs/concepts/a2a-integration.md | 184 +++++++++++++++++++ examples/a2a_integration_example.py | 64 +++++++ pyproject.toml | 4 +- src/crewai/__init__.py | 6 + src/crewai/a2a/__init__.py | 49 ++++++ src/crewai/a2a/crew_agent_executor.py | 244 ++++++++++++++++++++++++++ src/crewai/a2a/server.py | 136 ++++++++++++++ tests/a2a/__init__.py | 1 + tests/a2a/test_crew_agent_executor.py | 197 +++++++++++++++++++++ tests/a2a/test_integration.py | 121 +++++++++++++ tests/a2a/test_server.py | 129 ++++++++++++++ 11 files changed, 1133 insertions(+), 2 deletions(-) create mode 100644 docs/concepts/a2a-integration.md create mode 100644 examples/a2a_integration_example.py create mode 100644 src/crewai/a2a/__init__.py create mode 100644 src/crewai/a2a/crew_agent_executor.py create mode 100644 src/crewai/a2a/server.py create mode 100644 tests/a2a/__init__.py create mode 100644 tests/a2a/test_crew_agent_executor.py create mode 100644 tests/a2a/test_integration.py create mode 100644 tests/a2a/test_server.py diff --git a/docs/concepts/a2a-integration.md b/docs/concepts/a2a-integration.md new file mode 100644 index 000000000..b4428b01b --- /dev/null +++ b/docs/concepts/a2a-integration.md @@ -0,0 +1,184 @@ +# A2A Protocol Integration + +CrewAI supports the A2A (Agent-to-Agent) protocol, enabling your crews to participate in remote agent interoperability. This allows CrewAI crews to be exposed as remotely accessible agents that can communicate with other A2A-compatible systems. + +## Overview + +The A2A protocol is Google's standard for agent interoperability that enables bidirectional communication between agents. CrewAI's A2A integration provides: + +- **Remote Interoperability**: Expose crews as A2A-compatible agents +- **Bidirectional Communication**: Enable full-duplex agent interactions +- **Protocol Compliance**: Full support for A2A specifications +- **Transport Flexibility**: Support for multiple transport protocols + +## Installation + +A2A support is available as an optional dependency: + +```bash +pip install crewai[a2a] +``` + +## Basic Usage + +### Creating an A2A Server + +```python +from crewai import Agent, Crew, Task +from crewai.a2a import CrewAgentExecutor, start_a2a_server + +# Create your crew +agent = Agent( + role="Assistant", + goal="Help users with their queries", + backstory="A helpful AI assistant" +) + +task = Task( + description="Help with: {query}", + agent=agent +) + +crew = Crew(agents=[agent], tasks=[task]) + +# Create A2A executor +executor = CrewAgentExecutor(crew) + +# Start A2A server +start_a2a_server(executor, host="0.0.0.0", port=10001) +``` + +### Custom Configuration + +```python +from crewai.a2a import CrewAgentExecutor, create_a2a_app + +# Create executor with custom content types +executor = CrewAgentExecutor( + crew=crew, + supported_content_types=['text', 'application/json', 'image/png'] +) + +# Create custom A2A app +app = create_a2a_app( + executor, + agent_name="My Research Crew", + agent_description="A specialized research and analysis crew", + transport="starlette" +) + +# Run with custom ASGI server +import uvicorn +uvicorn.run(app, host="0.0.0.0", port=8080) +``` + +## Key Features + +### CrewAgentExecutor + +The `CrewAgentExecutor` class wraps CrewAI crews to implement the A2A `AgentExecutor` interface: + +- **Asynchronous Execution**: Crews run asynchronously within the A2A protocol +- **Task Management**: Automatic handling of task lifecycle and cancellation +- **Error Handling**: Robust error handling with A2A-compliant responses +- **Output Conversion**: Automatic conversion of crew outputs to A2A artifacts + +### Server Utilities + +Convenience functions for starting A2A servers: + +- `start_a2a_server()`: Quick server startup with default configuration +- `create_a2a_app()`: Create custom A2A applications for advanced use cases + +## Protocol Compliance + +CrewAI's A2A integration provides full protocol compliance: + +- **Agent Cards**: Automatic generation of agent capability descriptions +- **Task Execution**: Asynchronous task processing with event queues +- **Artifact Management**: Conversion of crew outputs to A2A artifacts +- **Error Handling**: A2A-compliant error responses and status codes + +## Use Cases + +### Remote Agent Networks + +Expose CrewAI crews as part of larger agent networks: + +```python +# Multi-agent system with specialized crews +research_crew = create_research_crew() +analysis_crew = create_analysis_crew() +writing_crew = create_writing_crew() + +# Expose each as A2A agents on different ports +start_a2a_server(CrewAgentExecutor(research_crew), port=10001) +start_a2a_server(CrewAgentExecutor(analysis_crew), port=10002) +start_a2a_server(CrewAgentExecutor(writing_crew), port=10003) +``` + +### Cross-Platform Integration + +Enable CrewAI crews to work with other agent frameworks: + +```python +# CrewAI crew accessible to other A2A-compatible systems +executor = CrewAgentExecutor(crew) +start_a2a_server(executor, host="0.0.0.0", port=10001) + +# Other systems can now invoke this crew remotely +``` + +## Advanced Configuration + +### Custom Agent Cards + +```python +from a2a.types import AgentCard, AgentCapabilities, AgentSkill + +# Custom agent card for specialized capabilities +agent_card = AgentCard( + name="Specialized Research Crew", + description="Advanced research and analysis capabilities", + version="2.0.0", + capabilities=AgentCapabilities( + streaming=True, + pushNotifications=False + ), + skills=[ + AgentSkill( + id="research", + name="Research Analysis", + description="Comprehensive research and analysis", + tags=["research", "analysis", "data"] + ) + ] +) +``` + +### Error Handling + +The A2A integration includes comprehensive error handling: + +- **Validation Errors**: Input validation with clear error messages +- **Execution Errors**: Crew execution errors converted to A2A artifacts +- **Cancellation**: Proper task cancellation support +- **Timeouts**: Configurable timeout handling + +## Best Practices + +1. **Resource Management**: Monitor crew resource usage in server environments +2. **Error Handling**: Implement proper error handling in crew tasks +3. **Security**: Use appropriate authentication and authorization +4. **Monitoring**: Monitor A2A server performance and health +5. **Scaling**: Consider load balancing for high-traffic scenarios + +## Limitations + +- **Optional Dependency**: A2A support requires additional dependencies +- **Transport Support**: Currently supports Starlette transport only +- **Synchronous Crews**: Crews execute synchronously within async A2A context + +## Examples + +See the `examples/a2a_integration_example.py` file for a complete working example of A2A integration with CrewAI. diff --git a/examples/a2a_integration_example.py b/examples/a2a_integration_example.py new file mode 100644 index 000000000..44d2b2c19 --- /dev/null +++ b/examples/a2a_integration_example.py @@ -0,0 +1,64 @@ +"""Example: CrewAI A2A Integration + +This example demonstrates how to expose a CrewAI crew as an A2A (Agent-to-Agent) +protocol server for remote interoperability. + +Requirements: + pip install crewai[a2a] +""" + +from crewai import Agent, Crew, Task +from crewai.a2a import CrewAgentExecutor, start_a2a_server + + +def main(): + """Create and start an A2A server with a CrewAI crew.""" + + researcher = Agent( + role="Research Analyst", + goal="Provide comprehensive research and analysis on any topic", + backstory=( + "You are an experienced research analyst with expertise in " + "gathering, analyzing, and synthesizing information from various sources." + ), + verbose=True + ) + + research_task = Task( + description=( + "Research and analyze the topic: {query}\n" + "Provide a comprehensive overview including:\n" + "- Key concepts and definitions\n" + "- Current trends and developments\n" + "- Important considerations\n" + "- Relevant examples or case studies" + ), + agent=researcher, + expected_output="A detailed research report with analysis and insights" + ) + + research_crew = Crew( + agents=[researcher], + tasks=[research_task], + verbose=True + ) + + executor = CrewAgentExecutor( + crew=research_crew, + supported_content_types=['text', 'text/plain', 'application/json'] + ) + + print("Starting A2A server with CrewAI research crew...") + print("Server will be available at http://localhost:10001") + print("Use the A2A CLI or SDK to interact with the crew remotely") + + start_a2a_server( + executor, + host="0.0.0.0", + port=10001, + transport="starlette" + ) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 1c4e751c1..08ecf556b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,8 +65,8 @@ mem0 = ["mem0ai>=0.1.94"] docling = [ "docling>=2.12.0", ] -aisuite = [ - "aisuite>=0.1.10", +a2a = [ + "a2a-sdk>=0.0.1", ] [tool.uv] diff --git a/src/crewai/__init__.py b/src/crewai/__init__.py index 1d5279288..bd3792832 100644 --- a/src/crewai/__init__.py +++ b/src/crewai/__init__.py @@ -32,3 +32,9 @@ __all__ = [ "TaskOutput", "LLMGuardrail", ] + +try: + from crewai.a2a import CrewAgentExecutor, start_a2a_server, create_a2a_app + __all__.extend(["CrewAgentExecutor", "start_a2a_server", "create_a2a_app"]) +except ImportError: + pass diff --git a/src/crewai/a2a/__init__.py b/src/crewai/a2a/__init__.py new file mode 100644 index 000000000..30e78deef --- /dev/null +++ b/src/crewai/a2a/__init__.py @@ -0,0 +1,49 @@ +"""A2A (Agent-to-Agent) protocol integration for CrewAI. + +This module provides integration with the A2A protocol to enable remote agent +interoperability. It allows CrewAI crews to be exposed as A2A-compatible agents +that can communicate with other agents following the A2A protocol standard. + +The integration is optional and requires the 'a2a' extra dependency: + pip install crewai[a2a] + +Example: + from crewai import Agent, Crew, Task + from crewai.a2a import CrewAgentExecutor, start_a2a_server + + agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI") + task = Task(description="Help with {query}", agent=agent) + crew = Crew(agents=[agent], tasks=[task]) + + executor = CrewAgentExecutor(crew) + start_a2a_server(executor, host="localhost", port=8080) +""" + +try: + from .crew_agent_executor import CrewAgentExecutor + from .server import start_a2a_server, create_a2a_app + + __all__ = [ + "CrewAgentExecutor", + "start_a2a_server", + "create_a2a_app" + ] +except ImportError as e: + import warnings + warnings.warn( + "A2A integration requires the 'a2a' extra dependency. " + "Install with: pip install crewai[a2a]", + ImportWarning + ) + + def _missing_dependency(*args, **kwargs): + raise ImportError( + "A2A integration requires the 'a2a' extra dependency. " + "Install with: pip install crewai[a2a]" + ) + + CrewAgentExecutor = _missing_dependency + start_a2a_server = _missing_dependency + create_a2a_app = _missing_dependency + + __all__ = [] diff --git a/src/crewai/a2a/crew_agent_executor.py b/src/crewai/a2a/crew_agent_executor.py new file mode 100644 index 000000000..2a9709057 --- /dev/null +++ b/src/crewai/a2a/crew_agent_executor.py @@ -0,0 +1,244 @@ +"""CrewAI Agent Executor for A2A Protocol Integration. + +This module implements the A2A AgentExecutor interface to enable CrewAI crews +to participate in the Agent-to-Agent protocol for remote interoperability. +""" + +import asyncio +import logging +from typing import Any, Dict, Optional + +from crewai import Crew +from crewai.crew import CrewOutput + +try: + from a2a.server.agent_execution.agent_executor import AgentExecutor + from a2a.server.agent_execution.context import RequestContext + from a2a.server.events.event_queue import EventQueue + from a2a.types import ( + FilePart, + FileWithBytes, + InvalidParamsError, + Part, + Task, + TextPart, + UnsupportedOperationError, + ) + from a2a.utils import completed_task, new_artifact + from a2a.utils.errors import ServerError +except ImportError: + raise ImportError( + "A2A integration requires the 'a2a' extra dependency. " + "Install with: pip install crewai[a2a]" + ) + +logger = logging.getLogger(__name__) + + +class CrewAgentExecutor(AgentExecutor): + """A2A Agent Executor that wraps CrewAI crews for remote interoperability. + + This class implements the A2A AgentExecutor interface to enable CrewAI crews + to be exposed as remotely interoperable agents following the A2A protocol. + + Args: + crew: The CrewAI crew to expose as an A2A agent + supported_content_types: List of supported content types for input + + Example: + from crewai import Agent, Crew, Task + from crewai.a2a import CrewAgentExecutor + + agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI") + task = Task(description="Help with {query}", agent=agent) + crew = Crew(agents=[agent], tasks=[task]) + + executor = CrewAgentExecutor(crew) + """ + + def __init__( + self, + crew: Crew, + supported_content_types: Optional[list[str]] = None + ): + """Initialize the CrewAgentExecutor. + + Args: + crew: The CrewAI crew to wrap + supported_content_types: List of supported content types + """ + self.crew = crew + self.supported_content_types = supported_content_types or [ + 'text', 'text/plain' + ] + self._running_tasks: Dict[str, asyncio.Task] = {} + + async def execute( + self, + context: RequestContext, + event_queue: EventQueue, + ) -> None: + """Execute the crew with the given context and publish results to event queue. + + This method extracts the user input from the request context, executes + the CrewAI crew, and publishes the results as A2A artifacts. + + Args: + context: The A2A request context containing task details + event_queue: Queue for publishing execution events and results + + Raises: + ServerError: If validation fails or execution encounters an error + """ + error = self._validate_request(context) + if error: + logger.error(f"Request validation failed: {error}") + raise ServerError(error=InvalidParamsError()) + + query = context.get_user_input() + task_id = context.task_id + context_id = context.context_id + + logger.info(f"Executing crew for task {task_id} with query: {query}") + + try: + inputs = {"query": query} + + execution_task = asyncio.create_task( + self._execute_crew_async(inputs) + ) + self._running_tasks[task_id] = execution_task + + result = await execution_task + + self._running_tasks.pop(task_id, None) + + logger.info(f"Crew execution completed for task {task_id}") + + parts = self._convert_output_to_parts(result) + + event_queue.enqueue_event( + completed_task( + task_id, + context_id, + [new_artifact(parts, f"crew_output_{task_id}")], + [context.message], + ) + ) + + except asyncio.CancelledError: + logger.info(f"Task {task_id} was cancelled") + self._running_tasks.pop(task_id, None) + raise + except Exception as e: + logger.error(f"Error executing crew for task {task_id}: {e}") + self._running_tasks.pop(task_id, None) + + error_parts = [ + Part(root=TextPart(text=f"Error executing crew: {str(e)}")) + ] + + event_queue.enqueue_event( + completed_task( + task_id, + context_id, + [new_artifact(error_parts, f"error_{task_id}")], + [context.message], + ) + ) + + raise ServerError( + error=ValueError(f"Error executing crew: {e}") + ) from e + + async def cancel( + self, + request: RequestContext, + event_queue: EventQueue + ) -> Task | None: + """Cancel a running crew execution. + + Args: + request: The A2A request context for the task to cancel + event_queue: Event queue for publishing cancellation events + + Returns: + None (cancellation is handled internally) + + Raises: + ServerError: If the task cannot be cancelled + """ + task_id = request.task_id + + if task_id in self._running_tasks: + execution_task = self._running_tasks[task_id] + execution_task.cancel() + + try: + await execution_task + except asyncio.CancelledError: + logger.info(f"Successfully cancelled task {task_id}") + pass + + self._running_tasks.pop(task_id, None) + return None + else: + logger.warning(f"Task {task_id} not found for cancellation") + raise ServerError(error=UnsupportedOperationError()) + + async def _execute_crew_async(self, inputs: Dict[str, Any]) -> CrewOutput: + """Execute the crew asynchronously. + + Args: + inputs: Input parameters for the crew + + Returns: + The crew execution output + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.crew.kickoff, inputs) + + def _convert_output_to_parts(self, result: CrewOutput) -> list[Part]: + """Convert CrewAI output to A2A Parts. + + Args: + result: The crew execution result + + Returns: + List of A2A Parts representing the output + """ + parts = [] + + if hasattr(result, 'raw') and result.raw: + parts.append(Part(root=TextPart(text=str(result.raw)))) + elif result: + parts.append(Part(root=TextPart(text=str(result)))) + + if hasattr(result, 'json_dict') and result.json_dict: + import json + json_output = json.dumps(result.json_dict, indent=2) + parts.append(Part(root=TextPart(text=f"Structured Output:\n{json_output}"))) + + if not parts: + parts.append(Part(root=TextPart(text="Crew execution completed successfully"))) + + return parts + + def _validate_request(self, context: RequestContext) -> Optional[str]: + """Validate the incoming request context. + + Args: + context: The A2A request context to validate + + Returns: + Error message if validation fails, None if valid + """ + try: + user_input = context.get_user_input() + if not user_input or not user_input.strip(): + return "Empty or missing user input" + + return None + + except Exception as e: + return f"Failed to extract user input: {e}" diff --git a/src/crewai/a2a/server.py b/src/crewai/a2a/server.py new file mode 100644 index 000000000..0d5bcd85d --- /dev/null +++ b/src/crewai/a2a/server.py @@ -0,0 +1,136 @@ +"""A2A Server utilities for CrewAI integration. + +This module provides convenience functions for starting A2A servers with CrewAI +crews, supporting multiple transport protocols and configurations. +""" + +import logging +from typing import Optional + +try: + from a2a.server.agent_execution.agent_executor import AgentExecutor + from a2a.server.apps import A2AStarletteApplication + from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler + from a2a.server.tasks import InMemoryTaskStore + from a2a.types import AgentCard, AgentCapabilities, AgentSkill +except ImportError: + raise ImportError( + "A2A integration requires the 'a2a' extra dependency. " + "Install with: pip install crewai[a2a]" + ) + +logger = logging.getLogger(__name__) + + +def start_a2a_server( + agent_executor: AgentExecutor, + host: str = "localhost", + port: int = 10001, + transport: str = "starlette", + **kwargs +) -> None: + """Start an A2A server with the given agent executor. + + This is a convenience function that creates and starts an A2A server + with the specified configuration. + + Args: + agent_executor: The A2A agent executor to serve + host: Host address to bind the server to + port: Port number to bind the server to + transport: Transport protocol to use ("starlette" or "fastapi") + **kwargs: Additional arguments passed to the server + + Example: + from crewai import Agent, Crew, Task + from crewai.a2a import CrewAgentExecutor, start_a2a_server + + agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI") + task = Task(description="Help with {query}", agent=agent) + crew = Crew(agents=[agent], tasks=[task]) + + executor = CrewAgentExecutor(crew) + start_a2a_server(executor, host="0.0.0.0", port=8080) + """ + app = create_a2a_app(agent_executor, transport=transport, **kwargs) + + logger.info(f"Starting A2A server on {host}:{port} using {transport} transport") + + try: + import uvicorn + uvicorn.run(app, host=host, port=port) + except ImportError: + raise ImportError("uvicorn is required to run the A2A server. Install with: pip install uvicorn") + + +def create_a2a_app( + agent_executor: AgentExecutor, + transport: str = "starlette", + agent_name: Optional[str] = None, + agent_description: Optional[str] = None, + **kwargs +): + """Create an A2A application with the given agent executor. + + This function creates an A2A server application that can be run + with any ASGI server. + + Args: + agent_executor: The A2A agent executor to serve + transport: Transport protocol to use ("starlette" or "fastapi") + agent_name: Optional name for the agent + agent_description: Optional description for the agent + **kwargs: Additional arguments passed to the transport + + Returns: + ASGI application ready to be served + + Example: + from crewai.a2a import CrewAgentExecutor, create_a2a_app + + executor = CrewAgentExecutor(crew) + app = create_a2a_app( + executor, + agent_name="My Crew Agent", + agent_description="A helpful CrewAI agent" + ) + + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8080) + """ + agent_card = AgentCard( + name=agent_name or "CrewAI Agent", + description=agent_description or "A CrewAI agent exposed via A2A protocol", + version="1.0.0", + supportedContentTypes=getattr(agent_executor, 'supported_content_types', ['text', 'text/plain']), + capabilities=AgentCapabilities( + streaming=True, + pushNotifications=False + ), + defaultInputModes=["text"], + defaultOutputModes=["text"], + skills=[ + AgentSkill( + id="crew_execution", + name="Crew Execution", + description="Execute CrewAI crew tasks with multiple agents", + examples=["Process user queries", "Coordinate multi-agent workflows"], + tags=["crewai", "multi-agent", "workflow"] + ) + ], + url="https://github.com/crewAIInc/crewAI" + ) + + task_store = InMemoryTaskStore() + request_handler = DefaultRequestHandler(agent_executor, task_store) + + if transport.lower() == "fastapi": + raise ValueError("FastAPI transport is not available in the current A2A SDK version") + else: + app_instance = A2AStarletteApplication( + agent_card=agent_card, + http_handler=request_handler, + **kwargs + ) + + return app_instance.build() diff --git a/tests/a2a/__init__.py b/tests/a2a/__init__.py new file mode 100644 index 000000000..6b4eff286 --- /dev/null +++ b/tests/a2a/__init__.py @@ -0,0 +1 @@ +"""Tests for CrewAI A2A integration.""" diff --git a/tests/a2a/test_crew_agent_executor.py b/tests/a2a/test_crew_agent_executor.py new file mode 100644 index 000000000..8d5895652 --- /dev/null +++ b/tests/a2a/test_crew_agent_executor.py @@ -0,0 +1,197 @@ +"""Tests for CrewAgentExecutor class.""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, patch + +from crewai import Agent, Crew, Task +from crewai.crews.crew_output import CrewOutput + +try: + from crewai.a2a import CrewAgentExecutor + from a2a.server.agent_execution import RequestContext + from a2a.server.events import EventQueue + from a2a.types import InvalidParamsError, UnsupportedOperationError + from a2a.utils.errors import ServerError + A2A_AVAILABLE = True +except ImportError: + A2A_AVAILABLE = False + + +@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available") +class TestCrewAgentExecutor: + """Test cases for CrewAgentExecutor.""" + + @pytest.fixture + def sample_crew(self): + """Create a sample crew for testing.""" + from unittest.mock import Mock + mock_crew = Mock() + mock_crew.agents = [] + mock_crew.tasks = [] + return mock_crew + + @pytest.fixture + def crew_executor(self, sample_crew): + """Create a CrewAgentExecutor for testing.""" + return CrewAgentExecutor(sample_crew) + + @pytest.fixture + def mock_context(self): + """Create a mock RequestContext.""" + from a2a.types import Message, Part, TextPart + context = Mock(spec=RequestContext) + context.task_id = "test-task-123" + context.context_id = "test-context-456" + context.message = Message( + messageId="msg-123", + taskId="test-task-123", + contextId="test-context-456", + role="user", + parts=[Part(root=TextPart(text="Test message"))] + ) + context.get_user_input.return_value = "Test query" + return context + + @pytest.fixture + def mock_event_queue(self): + """Create a mock EventQueue.""" + return Mock(spec=EventQueue) + + def test_init(self, sample_crew): + """Test CrewAgentExecutor initialization.""" + executor = CrewAgentExecutor(sample_crew) + + assert executor.crew == sample_crew + assert executor.supported_content_types == ['text', 'text/plain'] + assert executor._running_tasks == {} + + def test_init_with_custom_content_types(self, sample_crew): + """Test CrewAgentExecutor initialization with custom content types.""" + custom_types = ['text', 'application/json'] + executor = CrewAgentExecutor(sample_crew, supported_content_types=custom_types) + + assert executor.supported_content_types == custom_types + + @pytest.mark.asyncio + async def test_execute_success(self, crew_executor, mock_context, mock_event_queue): + """Test successful crew execution.""" + mock_output = CrewOutput(raw="Test response", json_dict=None) + + with patch.object(crew_executor, '_execute_crew_async', return_value=mock_output): + await crew_executor.execute(mock_context, mock_event_queue) + + mock_event_queue.enqueue_event.assert_called_once() + + assert len(crew_executor._running_tasks) == 0 + + @pytest.mark.asyncio + async def test_execute_with_validation_error(self, crew_executor, mock_event_queue): + """Test execution with validation error.""" + bad_context = Mock(spec=RequestContext) + bad_context.get_user_input.return_value = "" + + with pytest.raises(ServerError): + await crew_executor.execute(bad_context, mock_event_queue) + + @pytest.mark.asyncio + async def test_execute_with_crew_error(self, crew_executor, mock_context, mock_event_queue): + """Test execution when crew raises an error.""" + with patch.object(crew_executor, '_execute_crew_async', side_effect=Exception("Crew error")): + with pytest.raises(ServerError): + await crew_executor.execute(mock_context, mock_event_queue) + + mock_event_queue.enqueue_event.assert_called_once() + + @pytest.mark.asyncio + async def test_cancel_existing_task(self, crew_executor, mock_event_queue): + """Test cancelling an existing task.""" + cancel_context = Mock(spec=RequestContext) + cancel_context.task_id = "test-task-123" + + async def dummy_task(): + await asyncio.sleep(10) + + mock_task = asyncio.create_task(dummy_task()) + crew_executor._running_tasks["test-task-123"] = mock_task + + result = await crew_executor.cancel(cancel_context, mock_event_queue) + + assert result is None + assert "test-task-123" not in crew_executor._running_tasks + assert mock_task.cancelled() + + @pytest.mark.asyncio + async def test_cancel_nonexistent_task(self, crew_executor, mock_event_queue): + """Test cancelling a task that doesn't exist.""" + cancel_context = Mock(spec=RequestContext) + cancel_context.task_id = "nonexistent-task" + + with pytest.raises(ServerError): + await crew_executor.cancel(cancel_context, mock_event_queue) + + def test_convert_output_to_parts_with_raw(self, crew_executor): + """Test converting crew output with raw content to A2A parts.""" + output = Mock() + output.raw = "Test response" + output.json_dict = None + parts = crew_executor._convert_output_to_parts(output) + + assert len(parts) == 1 + assert parts[0].root.text == "Test response" + + def test_convert_output_to_parts_with_json(self, crew_executor): + """Test converting crew output with JSON data to A2A parts.""" + output = Mock() + output.raw = "Test response" + output.json_dict = {"key": "value"} + parts = crew_executor._convert_output_to_parts(output) + + assert len(parts) == 2 + assert parts[0].root.text == "Test response" + assert "Structured Output:" in parts[1].root.text + assert '"key": "value"' in parts[1].root.text + + def test_convert_output_to_parts_empty(self, crew_executor): + """Test converting empty crew output to A2A parts.""" + output = "" + parts = crew_executor._convert_output_to_parts(output) + + assert len(parts) == 1 + assert parts[0].root.text == "Crew execution completed successfully" + + def test_validate_request_valid(self, crew_executor, mock_context): + """Test request validation with valid input.""" + error = crew_executor._validate_request(mock_context) + assert error is None + + def test_validate_request_empty_input(self, crew_executor): + """Test request validation with empty input.""" + context = Mock(spec=RequestContext) + context.get_user_input.return_value = "" + + error = crew_executor._validate_request(context) + assert error == "Empty or missing user input" + + def test_validate_request_whitespace_input(self, crew_executor): + """Test request validation with whitespace-only input.""" + context = Mock(spec=RequestContext) + context.get_user_input.return_value = " \n\t " + + error = crew_executor._validate_request(context) + assert error == "Empty or missing user input" + + def test_validate_request_exception(self, crew_executor): + """Test request validation when get_user_input raises exception.""" + context = Mock(spec=RequestContext) + context.get_user_input.side_effect = Exception("Input error") + + error = crew_executor._validate_request(context) + assert "Failed to extract user input" in error + + +@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling") +def test_import_error_handling(): + """Test that import errors are handled gracefully when A2A is not available.""" + with pytest.raises(ImportError, match="A2A integration requires"): + from crewai.a2a import CrewAgentExecutor diff --git a/tests/a2a/test_integration.py b/tests/a2a/test_integration.py new file mode 100644 index 000000000..3b2e1b418 --- /dev/null +++ b/tests/a2a/test_integration.py @@ -0,0 +1,121 @@ +"""Integration tests for CrewAI A2A functionality.""" + +import pytest +from unittest.mock import Mock, patch + +from crewai import Agent, Crew, Task + +try: + from crewai.a2a import CrewAgentExecutor, create_a2a_app + A2A_AVAILABLE = True +except ImportError: + A2A_AVAILABLE = False + + +@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available") +class TestA2AIntegration: + """Integration tests for A2A functionality.""" + + @pytest.fixture + def sample_crew(self): + """Create a sample crew for integration testing.""" + from unittest.mock import Mock + mock_crew = Mock() + mock_crew.agents = [] + mock_crew.tasks = [] + return mock_crew + + def test_end_to_end_integration(self, sample_crew): + """Test end-to-end A2A integration.""" + executor = CrewAgentExecutor(sample_crew) + + assert executor.crew == sample_crew + assert isinstance(executor.supported_content_types, list) + + with patch('crewai.a2a.server.A2AStarletteApplication') as mock_app_class: + with patch('crewai.a2a.server.DefaultRequestHandler') as mock_handler_class: + with patch('crewai.a2a.server.InMemoryTaskStore') as mock_task_store_class: + mock_handler = Mock() + mock_app_instance = Mock() + mock_built_app = Mock() + mock_task_store = Mock() + + mock_task_store_class.return_value = mock_task_store + mock_handler_class.return_value = mock_handler + mock_app_class.return_value = mock_app_instance + mock_app_instance.build.return_value = mock_built_app + + app = create_a2a_app(executor) + + mock_task_store_class.assert_called_once() + mock_handler_class.assert_called_once_with(executor, mock_task_store) + mock_app_class.assert_called_once() + assert app == mock_built_app + + def test_crew_with_multiple_agents(self): + """Test A2A integration with multi-agent crew.""" + from unittest.mock import Mock + crew = Mock() + crew.agents = [Mock(), Mock()] + crew.tasks = [Mock(), Mock()] + + executor = CrewAgentExecutor(crew) + assert executor.crew == crew + assert len(executor.crew.agents) == 2 + assert len(executor.crew.tasks) == 2 + + def test_custom_content_types(self, sample_crew): + """Test A2A integration with custom content types.""" + custom_types = ['text', 'application/json', 'image/png'] + executor = CrewAgentExecutor( + sample_crew, + supported_content_types=custom_types + ) + + assert executor.supported_content_types == custom_types + + @patch('uvicorn.run') + def test_server_startup_integration(self, mock_uvicorn_run, sample_crew): + """Test server startup integration.""" + from crewai.a2a import start_a2a_server + + executor = CrewAgentExecutor(sample_crew) + + with patch('crewai.a2a.server.create_a2a_app') as mock_create_app: + mock_app = Mock() + mock_create_app.return_value = mock_app + + start_a2a_server( + executor, + host="127.0.0.1", + port=9999, + transport="starlette" + ) + + mock_create_app.assert_called_once_with( + executor, + transport="starlette" + ) + mock_uvicorn_run.assert_called_once_with( + mock_app, + host="127.0.0.1", + port=9999 + ) + + +def test_optional_import_in_main_module(): + """Test that A2A classes are optionally imported in main module.""" + import crewai + + if A2A_AVAILABLE: + assert hasattr(crewai, 'CrewAgentExecutor') + assert hasattr(crewai, 'start_a2a_server') + assert hasattr(crewai, 'create_a2a_app') + + assert 'CrewAgentExecutor' in crewai.__all__ + assert 'start_a2a_server' in crewai.__all__ + assert 'create_a2a_app' in crewai.__all__ + else: + assert not hasattr(crewai, 'CrewAgentExecutor') + assert not hasattr(crewai, 'start_a2a_server') + assert not hasattr(crewai, 'create_a2a_app') diff --git a/tests/a2a/test_server.py b/tests/a2a/test_server.py new file mode 100644 index 000000000..59aa0d80d --- /dev/null +++ b/tests/a2a/test_server.py @@ -0,0 +1,129 @@ +"""Tests for A2A server utilities.""" + +import pytest +from unittest.mock import Mock, patch + +try: + from crewai.a2a import start_a2a_server, create_a2a_app + from a2a.server.agent_execution.agent_executor import AgentExecutor + A2A_AVAILABLE = True +except ImportError: + A2A_AVAILABLE = False + + +@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available") +class TestA2AServer: + """Test cases for A2A server utilities.""" + + @pytest.fixture + def mock_agent_executor(self): + """Create a mock AgentExecutor.""" + return Mock(spec=AgentExecutor) + + @patch('uvicorn.run') + @patch('crewai.a2a.server.create_a2a_app') + def test_start_a2a_server_default(self, mock_create_app, mock_uvicorn_run, mock_agent_executor): + """Test starting A2A server with default parameters.""" + mock_app = Mock() + mock_create_app.return_value = mock_app + + start_a2a_server(mock_agent_executor) + + mock_create_app.assert_called_once_with( + mock_agent_executor, + transport="starlette" + ) + + mock_uvicorn_run.assert_called_once_with( + mock_app, + host="localhost", + port=10001 + ) + + @patch('uvicorn.run') + @patch('crewai.a2a.server.create_a2a_app') + def test_start_a2a_server_custom(self, mock_create_app, mock_uvicorn_run, mock_agent_executor): + """Test starting A2A server with custom parameters.""" + mock_app = Mock() + mock_create_app.return_value = mock_app + + start_a2a_server( + mock_agent_executor, + host="0.0.0.0", + port=8080, + transport="fastapi" + ) + + mock_create_app.assert_called_once_with( + mock_agent_executor, + transport="fastapi" + ) + + mock_uvicorn_run.assert_called_once_with( + mock_app, + host="0.0.0.0", + port=8080 + ) + + @patch('crewai.a2a.server.A2AStarletteApplication') + @patch('crewai.a2a.server.DefaultRequestHandler') + @patch('crewai.a2a.server.InMemoryTaskStore') + def test_create_a2a_app_starlette(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor): + """Test creating A2A app with Starlette transport.""" + mock_handler = Mock() + mock_app_instance = Mock() + mock_built_app = Mock() + mock_task_store = Mock() + + mock_task_store_class.return_value = mock_task_store + mock_handler_class.return_value = mock_handler + mock_app_class.return_value = mock_app_instance + mock_app_instance.build.return_value = mock_built_app + + result = create_a2a_app(mock_agent_executor, transport="starlette") + + mock_task_store_class.assert_called_once() + mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store) + mock_app_class.assert_called_once() + mock_app_instance.build.assert_called_once() + + assert result == mock_built_app + + def test_create_a2a_app_fastapi(self, mock_agent_executor): + """Test creating A2A app with FastAPI transport raises error.""" + with pytest.raises(ValueError, match="FastAPI transport is not available"): + create_a2a_app( + mock_agent_executor, + transport="fastapi", + agent_name="Custom Agent", + agent_description="Custom description" + ) + + @patch('crewai.a2a.server.A2AStarletteApplication') + @patch('crewai.a2a.server.DefaultRequestHandler') + @patch('crewai.a2a.server.InMemoryTaskStore') + def test_create_a2a_app_default_transport(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor): + """Test creating A2A app with default transport.""" + mock_handler = Mock() + mock_app_instance = Mock() + mock_built_app = Mock() + mock_task_store = Mock() + + mock_task_store_class.return_value = mock_task_store + mock_handler_class.return_value = mock_handler + mock_app_class.return_value = mock_app_instance + mock_app_instance.build.return_value = mock_built_app + + result = create_a2a_app(mock_agent_executor) + + mock_task_store_class.assert_called_once() + mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store) + mock_app_class.assert_called_once() + assert result == mock_built_app + + +@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling") +def test_server_import_error_handling(): + """Test that import errors are handled gracefully when A2A is not available.""" + with pytest.raises(ImportError, match="A2A integration requires"): + from crewai.a2a.server import start_a2a_server