mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-04 05:38:33 +00:00
Compare commits
4 Commits
fix/unsafe
...
devin/1749
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c91500c887 | ||
|
|
6806803bf8 | ||
|
|
4b9426fbcc | ||
|
|
78b9c7dbeb |
184
docs/concepts/a2a-integration.md
Normal file
184
docs/concepts/a2a-integration.md
Normal file
@@ -0,0 +1,184 @@
|
||||
# A2A Protocol Integration
|
||||
|
||||
CrewAI supports the A2A (Agent-to-Agent) protocol, enabling your crews to participate in remote agent interoperability. This allows CrewAI crews to be exposed as remotely accessible agents that can communicate with other A2A-compatible systems.
|
||||
|
||||
## Overview
|
||||
|
||||
The A2A protocol is Google's standard for agent interoperability that enables bidirectional communication between agents. CrewAI's A2A integration provides:
|
||||
|
||||
- **Remote Interoperability**: Expose crews as A2A-compatible agents
|
||||
- **Bidirectional Communication**: Enable full-duplex agent interactions
|
||||
- **Protocol Compliance**: Full support for A2A specifications
|
||||
- **Transport Flexibility**: Support for multiple transport protocols
|
||||
|
||||
## Installation
|
||||
|
||||
A2A support is available as an optional dependency:
|
||||
|
||||
```bash
|
||||
pip install crewai[a2a]
|
||||
```
|
||||
|
||||
## Basic Usage
|
||||
|
||||
### Creating an A2A Server
|
||||
|
||||
```python
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.a2a import CrewAgentExecutor, start_a2a_server
|
||||
|
||||
# Create your crew
|
||||
agent = Agent(
|
||||
role="Assistant",
|
||||
goal="Help users with their queries",
|
||||
backstory="A helpful AI assistant"
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Help with: {query}",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
# Create A2A executor
|
||||
executor = CrewAgentExecutor(crew)
|
||||
|
||||
# Start A2A server
|
||||
start_a2a_server(executor, host="0.0.0.0", port=10001)
|
||||
```
|
||||
|
||||
### Custom Configuration
|
||||
|
||||
```python
|
||||
from crewai.a2a import CrewAgentExecutor, create_a2a_app
|
||||
|
||||
# Create executor with custom content types
|
||||
executor = CrewAgentExecutor(
|
||||
crew=crew,
|
||||
supported_content_types=['text', 'application/json', 'image/png']
|
||||
)
|
||||
|
||||
# Create custom A2A app
|
||||
app = create_a2a_app(
|
||||
executor,
|
||||
agent_name="My Research Crew",
|
||||
agent_description="A specialized research and analysis crew",
|
||||
transport="starlette"
|
||||
)
|
||||
|
||||
# Run with custom ASGI server
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8080)
|
||||
```
|
||||
|
||||
## Key Features
|
||||
|
||||
### CrewAgentExecutor
|
||||
|
||||
The `CrewAgentExecutor` class wraps CrewAI crews to implement the A2A `AgentExecutor` interface:
|
||||
|
||||
- **Asynchronous Execution**: Crews run asynchronously within the A2A protocol
|
||||
- **Task Management**: Automatic handling of task lifecycle and cancellation
|
||||
- **Error Handling**: Robust error handling with A2A-compliant responses
|
||||
- **Output Conversion**: Automatic conversion of crew outputs to A2A artifacts
|
||||
|
||||
### Server Utilities
|
||||
|
||||
Convenience functions for starting A2A servers:
|
||||
|
||||
- `start_a2a_server()`: Quick server startup with default configuration
|
||||
- `create_a2a_app()`: Create custom A2A applications for advanced use cases
|
||||
|
||||
## Protocol Compliance
|
||||
|
||||
CrewAI's A2A integration provides full protocol compliance:
|
||||
|
||||
- **Agent Cards**: Automatic generation of agent capability descriptions
|
||||
- **Task Execution**: Asynchronous task processing with event queues
|
||||
- **Artifact Management**: Conversion of crew outputs to A2A artifacts
|
||||
- **Error Handling**: A2A-compliant error responses and status codes
|
||||
|
||||
## Use Cases
|
||||
|
||||
### Remote Agent Networks
|
||||
|
||||
Expose CrewAI crews as part of larger agent networks:
|
||||
|
||||
```python
|
||||
# Multi-agent system with specialized crews
|
||||
research_crew = create_research_crew()
|
||||
analysis_crew = create_analysis_crew()
|
||||
writing_crew = create_writing_crew()
|
||||
|
||||
# Expose each as A2A agents on different ports
|
||||
start_a2a_server(CrewAgentExecutor(research_crew), port=10001)
|
||||
start_a2a_server(CrewAgentExecutor(analysis_crew), port=10002)
|
||||
start_a2a_server(CrewAgentExecutor(writing_crew), port=10003)
|
||||
```
|
||||
|
||||
### Cross-Platform Integration
|
||||
|
||||
Enable CrewAI crews to work with other agent frameworks:
|
||||
|
||||
```python
|
||||
# CrewAI crew accessible to other A2A-compatible systems
|
||||
executor = CrewAgentExecutor(crew)
|
||||
start_a2a_server(executor, host="0.0.0.0", port=10001)
|
||||
|
||||
# Other systems can now invoke this crew remotely
|
||||
```
|
||||
|
||||
## Advanced Configuration
|
||||
|
||||
### Custom Agent Cards
|
||||
|
||||
```python
|
||||
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
|
||||
|
||||
# Custom agent card for specialized capabilities
|
||||
agent_card = AgentCard(
|
||||
name="Specialized Research Crew",
|
||||
description="Advanced research and analysis capabilities",
|
||||
version="2.0.0",
|
||||
capabilities=AgentCapabilities(
|
||||
streaming=True,
|
||||
pushNotifications=False
|
||||
),
|
||||
skills=[
|
||||
AgentSkill(
|
||||
id="research",
|
||||
name="Research Analysis",
|
||||
description="Comprehensive research and analysis",
|
||||
tags=["research", "analysis", "data"]
|
||||
)
|
||||
]
|
||||
)
|
||||
```
|
||||
|
||||
### Error Handling
|
||||
|
||||
The A2A integration includes comprehensive error handling:
|
||||
|
||||
- **Validation Errors**: Input validation with clear error messages
|
||||
- **Execution Errors**: Crew execution errors converted to A2A artifacts
|
||||
- **Cancellation**: Proper task cancellation support
|
||||
- **Timeouts**: Configurable timeout handling
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Resource Management**: Monitor crew resource usage in server environments
|
||||
2. **Error Handling**: Implement proper error handling in crew tasks
|
||||
3. **Security**: Use appropriate authentication and authorization
|
||||
4. **Monitoring**: Monitor A2A server performance and health
|
||||
5. **Scaling**: Consider load balancing for high-traffic scenarios
|
||||
|
||||
## Limitations
|
||||
|
||||
- **Optional Dependency**: A2A support requires additional dependencies
|
||||
- **Transport Support**: Currently supports Starlette transport only
|
||||
- **Synchronous Crews**: Crews execute synchronously within async A2A context
|
||||
|
||||
## Examples
|
||||
|
||||
See the `examples/a2a_integration_example.py` file for a complete working example of A2A integration with CrewAI.
|
||||
64
examples/a2a_integration_example.py
Normal file
64
examples/a2a_integration_example.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Example: CrewAI A2A Integration
|
||||
|
||||
This example demonstrates how to expose a CrewAI crew as an A2A (Agent-to-Agent)
|
||||
protocol server for remote interoperability.
|
||||
|
||||
Requirements:
|
||||
pip install crewai[a2a]
|
||||
"""
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.a2a import CrewAgentExecutor, start_a2a_server
|
||||
|
||||
|
||||
def main():
|
||||
"""Create and start an A2A server with a CrewAI crew."""
|
||||
|
||||
researcher = Agent(
|
||||
role="Research Analyst",
|
||||
goal="Provide comprehensive research and analysis on any topic",
|
||||
backstory=(
|
||||
"You are an experienced research analyst with expertise in "
|
||||
"gathering, analyzing, and synthesizing information from various sources."
|
||||
),
|
||||
verbose=True
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description=(
|
||||
"Research and analyze the topic: {query}\n"
|
||||
"Provide a comprehensive overview including:\n"
|
||||
"- Key concepts and definitions\n"
|
||||
"- Current trends and developments\n"
|
||||
"- Important considerations\n"
|
||||
"- Relevant examples or case studies"
|
||||
),
|
||||
agent=researcher,
|
||||
expected_output="A detailed research report with analysis and insights"
|
||||
)
|
||||
|
||||
research_crew = Crew(
|
||||
agents=[researcher],
|
||||
tasks=[research_task],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
executor = CrewAgentExecutor(
|
||||
crew=research_crew,
|
||||
supported_content_types=['text', 'text/plain', 'application/json']
|
||||
)
|
||||
|
||||
print("Starting A2A server with CrewAI research crew...")
|
||||
print("Server will be available at http://localhost:10001")
|
||||
print("Use the A2A CLI or SDK to interact with the crew remotely")
|
||||
|
||||
start_a2a_server(
|
||||
executor,
|
||||
host="0.0.0.0",
|
||||
port=10001,
|
||||
transport="starlette"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -65,8 +65,8 @@ mem0 = ["mem0ai>=0.1.94"]
|
||||
docling = [
|
||||
"docling>=2.12.0",
|
||||
]
|
||||
aisuite = [
|
||||
"aisuite>=0.1.10",
|
||||
a2a = [
|
||||
"a2a-sdk>=0.0.1",
|
||||
]
|
||||
|
||||
[tool.uv]
|
||||
|
||||
@@ -32,3 +32,13 @@ __all__ = [
|
||||
"TaskOutput",
|
||||
"LLMGuardrail",
|
||||
]
|
||||
|
||||
try:
|
||||
from crewai.a2a import ( # noqa: F401
|
||||
CrewAgentExecutor,
|
||||
start_a2a_server,
|
||||
create_a2a_app
|
||||
)
|
||||
__all__.extend(["CrewAgentExecutor", "start_a2a_server", "create_a2a_app"])
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
62
src/crewai/a2a/__init__.py
Normal file
62
src/crewai/a2a/__init__.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""A2A (Agent-to-Agent) protocol integration for CrewAI.
|
||||
|
||||
This module provides integration with the A2A protocol to enable remote agent
|
||||
interoperability. It allows CrewAI crews to be exposed as A2A-compatible agents
|
||||
that can communicate with other agents following the A2A protocol standard.
|
||||
|
||||
The integration is optional and requires the 'a2a' extra dependency:
|
||||
pip install crewai[a2a]
|
||||
|
||||
Example:
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.a2a import CrewAgentExecutor, start_a2a_server
|
||||
|
||||
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
|
||||
task = Task(description="Help with {query}", agent=agent)
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
executor = CrewAgentExecutor(crew)
|
||||
start_a2a_server(executor, host="localhost", port=8080)
|
||||
"""
|
||||
|
||||
try:
|
||||
from .crew_agent_executor import CrewAgentExecutor
|
||||
from .server import start_a2a_server, create_a2a_app
|
||||
from .server_config import ServerConfig
|
||||
from .task_info import TaskInfo
|
||||
from .exceptions import A2AServerError, TransportError, ExecutionError
|
||||
|
||||
__all__ = [
|
||||
"CrewAgentExecutor",
|
||||
"start_a2a_server",
|
||||
"create_a2a_app",
|
||||
"ServerConfig",
|
||||
"TaskInfo",
|
||||
"A2AServerError",
|
||||
"TransportError",
|
||||
"ExecutionError"
|
||||
]
|
||||
except ImportError:
|
||||
import warnings
|
||||
warnings.warn(
|
||||
"A2A integration requires the 'a2a' extra dependency. "
|
||||
"Install with: pip install crewai[a2a]",
|
||||
ImportWarning
|
||||
)
|
||||
|
||||
def _missing_dependency(*args, **kwargs):
|
||||
raise ImportError(
|
||||
"A2A integration requires the 'a2a' extra dependency. "
|
||||
"Install with: pip install crewai[a2a]"
|
||||
)
|
||||
|
||||
CrewAgentExecutor = _missing_dependency # type: ignore
|
||||
start_a2a_server = _missing_dependency # type: ignore
|
||||
create_a2a_app = _missing_dependency # type: ignore
|
||||
ServerConfig = _missing_dependency # type: ignore
|
||||
TaskInfo = _missing_dependency # type: ignore
|
||||
A2AServerError = _missing_dependency # type: ignore
|
||||
TransportError = _missing_dependency # type: ignore
|
||||
ExecutionError = _missing_dependency # type: ignore
|
||||
|
||||
__all__ = []
|
||||
255
src/crewai/a2a/crew_agent_executor.py
Normal file
255
src/crewai/a2a/crew_agent_executor.py
Normal file
@@ -0,0 +1,255 @@
|
||||
"""CrewAI Agent Executor for A2A Protocol Integration.
|
||||
|
||||
This module implements the A2A AgentExecutor interface to enable CrewAI crews
|
||||
to participate in the Agent-to-Agent protocol for remote interoperability.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from crewai import Crew
|
||||
from crewai.crew import CrewOutput
|
||||
|
||||
from .task_info import TaskInfo
|
||||
|
||||
try:
|
||||
from a2a.server.agent_execution.agent_executor import AgentExecutor
|
||||
from a2a.server.agent_execution.context import RequestContext
|
||||
from a2a.server.events.event_queue import EventQueue
|
||||
from a2a.types import (
|
||||
InvalidParamsError,
|
||||
Part,
|
||||
Task,
|
||||
TextPart,
|
||||
UnsupportedOperationError,
|
||||
)
|
||||
from a2a.utils import completed_task, new_artifact
|
||||
from a2a.utils.errors import ServerError
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"A2A integration requires the 'a2a' extra dependency. "
|
||||
"Install with: pip install crewai[a2a]"
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CrewAgentExecutor(AgentExecutor):
|
||||
"""A2A Agent Executor that wraps CrewAI crews for remote interoperability.
|
||||
|
||||
This class implements the A2A AgentExecutor interface to enable CrewAI crews
|
||||
to be exposed as remotely interoperable agents following the A2A protocol.
|
||||
|
||||
Args:
|
||||
crew: The CrewAI crew to expose as an A2A agent
|
||||
supported_content_types: List of supported content types for input
|
||||
|
||||
Example:
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.a2a import CrewAgentExecutor
|
||||
|
||||
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
|
||||
task = Task(description="Help with {query}", agent=agent)
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
executor = CrewAgentExecutor(crew)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
crew: Crew,
|
||||
supported_content_types: Optional[list[str]] = None
|
||||
):
|
||||
"""Initialize the CrewAgentExecutor.
|
||||
|
||||
Args:
|
||||
crew: The CrewAI crew to wrap
|
||||
supported_content_types: List of supported content types
|
||||
"""
|
||||
self.crew = crew
|
||||
self.supported_content_types = supported_content_types or [
|
||||
'text', 'text/plain'
|
||||
]
|
||||
self._running_tasks: Dict[str, TaskInfo] = {}
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
context: RequestContext,
|
||||
event_queue: EventQueue,
|
||||
) -> None:
|
||||
"""Execute the crew with the given context and publish results to event queue.
|
||||
|
||||
This method extracts the user input from the request context, executes
|
||||
the CrewAI crew, and publishes the results as A2A artifacts.
|
||||
|
||||
Args:
|
||||
context: The A2A request context containing task details
|
||||
event_queue: Queue for publishing execution events and results
|
||||
|
||||
Raises:
|
||||
ServerError: If validation fails or execution encounters an error
|
||||
"""
|
||||
error = self._validate_request(context)
|
||||
if error:
|
||||
logger.error(f"Request validation failed: {error}")
|
||||
raise ServerError(error=InvalidParamsError())
|
||||
|
||||
query = context.get_user_input()
|
||||
task_id = context.task_id
|
||||
context_id = context.context_id
|
||||
|
||||
if not task_id or not context_id:
|
||||
raise ServerError(error=InvalidParamsError())
|
||||
|
||||
logger.info(f"Executing crew for task {task_id} with query: {query}")
|
||||
|
||||
try:
|
||||
inputs = {"query": query}
|
||||
|
||||
execution_task = asyncio.create_task(
|
||||
self._execute_crew_async(inputs)
|
||||
)
|
||||
from datetime import datetime
|
||||
self._running_tasks[task_id] = TaskInfo(
|
||||
task=execution_task,
|
||||
started_at=datetime.now(),
|
||||
status="running"
|
||||
)
|
||||
|
||||
result = await execution_task
|
||||
|
||||
self._running_tasks.pop(task_id, None)
|
||||
|
||||
logger.info(f"Crew execution completed for task {task_id}")
|
||||
|
||||
parts = self._convert_output_to_parts(result)
|
||||
|
||||
messages = [context.message] if context.message else []
|
||||
event_queue.enqueue_event(
|
||||
completed_task(
|
||||
task_id,
|
||||
context_id,
|
||||
[new_artifact(parts, f"crew_output_{task_id}")],
|
||||
messages,
|
||||
)
|
||||
)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"Task {task_id} was cancelled")
|
||||
self._running_tasks.pop(task_id, None)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing crew for task {task_id}: {e}")
|
||||
self._running_tasks.pop(task_id, None)
|
||||
|
||||
error_parts = [
|
||||
Part(root=TextPart(text=f"Error executing crew: {str(e)}"))
|
||||
]
|
||||
|
||||
messages = [context.message] if context.message else []
|
||||
event_queue.enqueue_event(
|
||||
completed_task(
|
||||
task_id,
|
||||
context_id,
|
||||
[new_artifact(error_parts, f"error_{task_id}")],
|
||||
messages,
|
||||
)
|
||||
)
|
||||
|
||||
raise ServerError(
|
||||
error=InvalidParamsError()
|
||||
) from e
|
||||
|
||||
async def cancel(
|
||||
self,
|
||||
request: RequestContext,
|
||||
event_queue: EventQueue
|
||||
) -> Task | None:
|
||||
"""Cancel a running crew execution.
|
||||
|
||||
Args:
|
||||
request: The A2A request context for the task to cancel
|
||||
event_queue: Event queue for publishing cancellation events
|
||||
|
||||
Returns:
|
||||
None (cancellation is handled internally)
|
||||
|
||||
Raises:
|
||||
ServerError: If the task cannot be cancelled
|
||||
"""
|
||||
task_id = request.task_id
|
||||
|
||||
if task_id in self._running_tasks:
|
||||
task_info = self._running_tasks[task_id]
|
||||
task_info.task.cancel()
|
||||
task_info.update_status("cancelled")
|
||||
|
||||
try:
|
||||
await task_info.task
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"Successfully cancelled task {task_id}")
|
||||
pass
|
||||
|
||||
self._running_tasks.pop(task_id, None)
|
||||
return None
|
||||
else:
|
||||
logger.warning(f"Task {task_id} not found for cancellation")
|
||||
raise ServerError(error=UnsupportedOperationError())
|
||||
|
||||
async def _execute_crew_async(self, inputs: Dict[str, Any]) -> CrewOutput:
|
||||
"""Execute the crew asynchronously.
|
||||
|
||||
Args:
|
||||
inputs: Input parameters for the crew
|
||||
|
||||
Returns:
|
||||
The crew execution output
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(None, self.crew.kickoff, inputs)
|
||||
|
||||
def _convert_output_to_parts(self, result: CrewOutput) -> list[Part]:
|
||||
"""Convert CrewAI output to A2A Parts.
|
||||
|
||||
Args:
|
||||
result: The crew execution result
|
||||
|
||||
Returns:
|
||||
List of A2A Parts representing the output
|
||||
"""
|
||||
parts = []
|
||||
|
||||
if hasattr(result, 'raw') and result.raw:
|
||||
parts.append(Part(root=TextPart(text=str(result.raw))))
|
||||
elif result:
|
||||
parts.append(Part(root=TextPart(text=str(result))))
|
||||
|
||||
if hasattr(result, 'json_dict') and result.json_dict:
|
||||
json_output = json.dumps(result.json_dict, indent=2)
|
||||
parts.append(Part(root=TextPart(text=json_output)))
|
||||
|
||||
if not parts:
|
||||
parts.append(Part(root=TextPart(text="Crew execution completed successfully")))
|
||||
|
||||
return parts
|
||||
|
||||
def _validate_request(self, context: RequestContext) -> Optional[str]:
|
||||
"""Validate the incoming request context.
|
||||
|
||||
Args:
|
||||
context: The A2A request context to validate
|
||||
|
||||
Returns:
|
||||
Error message if validation fails, None if valid
|
||||
"""
|
||||
try:
|
||||
user_input = context.get_user_input()
|
||||
if not user_input or not user_input.strip():
|
||||
return "Empty or missing user input"
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
return f"Failed to extract user input: {e}"
|
||||
16
src/crewai/a2a/exceptions.py
Normal file
16
src/crewai/a2a/exceptions.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Custom exceptions for A2A integration."""
|
||||
|
||||
|
||||
class A2AServerError(Exception):
|
||||
"""Base exception for A2A server errors."""
|
||||
pass
|
||||
|
||||
|
||||
class TransportError(A2AServerError):
|
||||
"""Error related to transport configuration."""
|
||||
pass
|
||||
|
||||
|
||||
class ExecutionError(A2AServerError):
|
||||
"""Error during crew execution."""
|
||||
pass
|
||||
151
src/crewai/a2a/server.py
Normal file
151
src/crewai/a2a/server.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""A2A Server utilities for CrewAI integration.
|
||||
|
||||
This module provides convenience functions for starting A2A servers with CrewAI
|
||||
crews, supporting multiple transport protocols and configurations.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .exceptions import TransportError
|
||||
from .server_config import ServerConfig
|
||||
|
||||
try:
|
||||
from a2a.server.agent_execution.agent_executor import AgentExecutor
|
||||
from a2a.server.apps import A2AStarletteApplication
|
||||
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
|
||||
from a2a.server.tasks import InMemoryTaskStore
|
||||
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"A2A integration requires the 'a2a' extra dependency. "
|
||||
"Install with: pip install crewai[a2a]"
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def start_a2a_server(
|
||||
agent_executor: AgentExecutor,
|
||||
host: str = "localhost",
|
||||
port: int = 10001,
|
||||
transport: str = "starlette",
|
||||
config: Optional[ServerConfig] = None,
|
||||
**kwargs
|
||||
) -> None:
|
||||
"""Start an A2A server with the given agent executor.
|
||||
|
||||
This is a convenience function that creates and starts an A2A server
|
||||
with the specified configuration.
|
||||
|
||||
Args:
|
||||
agent_executor: The A2A agent executor to serve
|
||||
host: Host address to bind the server to
|
||||
port: Port number to bind the server to
|
||||
transport: Transport protocol to use ("starlette" or "fastapi")
|
||||
config: Optional ServerConfig object to override individual parameters
|
||||
**kwargs: Additional arguments passed to the server
|
||||
|
||||
Example:
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.a2a import CrewAgentExecutor, start_a2a_server
|
||||
|
||||
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
|
||||
task = Task(description="Help with {query}", agent=agent)
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
executor = CrewAgentExecutor(crew)
|
||||
start_a2a_server(executor, host="0.0.0.0", port=8080)
|
||||
"""
|
||||
if config:
|
||||
host = config.host
|
||||
port = config.port
|
||||
transport = config.transport
|
||||
|
||||
app = create_a2a_app(
|
||||
agent_executor,
|
||||
transport=transport,
|
||||
agent_name=config.agent_name if config else None,
|
||||
agent_description=config.agent_description if config else None,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
logger.info(f"Starting A2A server on {host}:{port} using {transport} transport")
|
||||
|
||||
try:
|
||||
import uvicorn
|
||||
uvicorn.run(app, host=host, port=port)
|
||||
except ImportError:
|
||||
raise ImportError("uvicorn is required to run the A2A server. Install with: pip install uvicorn")
|
||||
|
||||
|
||||
def create_a2a_app(
|
||||
agent_executor: AgentExecutor,
|
||||
transport: str = "starlette",
|
||||
agent_name: Optional[str] = None,
|
||||
agent_description: Optional[str] = None,
|
||||
**kwargs
|
||||
):
|
||||
"""Create an A2A application with the given agent executor.
|
||||
|
||||
This function creates an A2A server application that can be run
|
||||
with any ASGI server.
|
||||
|
||||
Args:
|
||||
agent_executor: The A2A agent executor to serve
|
||||
transport: Transport protocol to use ("starlette" or "fastapi")
|
||||
agent_name: Optional name for the agent
|
||||
agent_description: Optional description for the agent
|
||||
**kwargs: Additional arguments passed to the transport
|
||||
|
||||
Returns:
|
||||
ASGI application ready to be served
|
||||
|
||||
Example:
|
||||
from crewai.a2a import CrewAgentExecutor, create_a2a_app
|
||||
|
||||
executor = CrewAgentExecutor(crew)
|
||||
app = create_a2a_app(
|
||||
executor,
|
||||
agent_name="My Crew Agent",
|
||||
agent_description="A helpful CrewAI agent"
|
||||
)
|
||||
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="0.0.0.0", port=8080)
|
||||
"""
|
||||
agent_card = AgentCard(
|
||||
name=agent_name or "CrewAI Agent",
|
||||
description=agent_description or "A CrewAI agent exposed via A2A protocol",
|
||||
version="1.0.0",
|
||||
capabilities=AgentCapabilities(
|
||||
streaming=True,
|
||||
pushNotifications=False
|
||||
),
|
||||
defaultInputModes=["text"],
|
||||
defaultOutputModes=["text"],
|
||||
skills=[
|
||||
AgentSkill(
|
||||
id="crew_execution",
|
||||
name="Crew Execution",
|
||||
description="Execute CrewAI crew tasks with multiple agents",
|
||||
examples=["Process user queries", "Coordinate multi-agent workflows"],
|
||||
tags=["crewai", "multi-agent", "workflow"]
|
||||
)
|
||||
],
|
||||
url="https://github.com/crewAIInc/crewAI"
|
||||
)
|
||||
|
||||
task_store = InMemoryTaskStore()
|
||||
request_handler = DefaultRequestHandler(agent_executor, task_store)
|
||||
|
||||
if transport.lower() == "fastapi":
|
||||
raise TransportError("FastAPI transport is not available in the current A2A SDK version")
|
||||
else:
|
||||
app_instance = A2AStarletteApplication(
|
||||
agent_card=agent_card,
|
||||
http_handler=request_handler,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
return app_instance.build()
|
||||
25
src/crewai/a2a/server_config.py
Normal file
25
src/crewai/a2a/server_config.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""Server configuration for A2A integration."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServerConfig:
|
||||
"""Configuration for A2A server.
|
||||
|
||||
This class encapsulates server settings to improve readability
|
||||
and flexibility for server setups.
|
||||
|
||||
Attributes:
|
||||
host: Host address to bind the server to
|
||||
port: Port number to bind the server to
|
||||
transport: Transport protocol to use ("starlette" or "fastapi")
|
||||
agent_name: Optional name for the agent
|
||||
agent_description: Optional description for the agent
|
||||
"""
|
||||
host: str = "localhost"
|
||||
port: int = 10001
|
||||
transport: str = "starlette"
|
||||
agent_name: Optional[str] = None
|
||||
agent_description: Optional[str] = None
|
||||
47
src/crewai/a2a/task_info.py
Normal file
47
src/crewai/a2a/task_info.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Task information tracking for A2A integration."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
import asyncio
|
||||
|
||||
|
||||
@dataclass
|
||||
class TaskInfo:
|
||||
"""Information about a running task in the A2A executor.
|
||||
|
||||
This class tracks the lifecycle and status of tasks being executed
|
||||
by the CrewAgentExecutor, providing better task management capabilities.
|
||||
|
||||
Attributes:
|
||||
task: The asyncio task being executed
|
||||
started_at: When the task was started
|
||||
status: Current status of the task ("running", "completed", "cancelled", "failed")
|
||||
"""
|
||||
task: asyncio.Task
|
||||
started_at: datetime
|
||||
status: str = "running"
|
||||
|
||||
def update_status(self, new_status: str) -> None:
|
||||
"""Update the task status.
|
||||
|
||||
Args:
|
||||
new_status: The new status to set
|
||||
"""
|
||||
self.status = new_status
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Check if the task is currently running."""
|
||||
return self.status == "running" and not self.task.done()
|
||||
|
||||
@property
|
||||
def duration(self) -> Optional[float]:
|
||||
"""Get the duration of the task in seconds.
|
||||
|
||||
Returns:
|
||||
Duration in seconds if task is completed, None if still running
|
||||
"""
|
||||
if self.task.done():
|
||||
return (datetime.now() - self.started_at).total_seconds()
|
||||
return None
|
||||
1
tests/a2a/__init__.py
Normal file
1
tests/a2a/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Tests for CrewAI A2A integration."""
|
||||
198
tests/a2a/test_crew_agent_executor.py
Normal file
198
tests/a2a/test_crew_agent_executor.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""Tests for CrewAgentExecutor class."""
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
|
||||
try:
|
||||
from crewai.a2a import CrewAgentExecutor
|
||||
from a2a.server.agent_execution import RequestContext
|
||||
from a2a.server.events import EventQueue
|
||||
pass # Imports handled in test methods as needed
|
||||
from a2a.utils.errors import ServerError
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestCrewAgentExecutor:
|
||||
"""Test cases for CrewAgentExecutor."""
|
||||
|
||||
@pytest.fixture
|
||||
def sample_crew(self):
|
||||
"""Create a sample crew for testing."""
|
||||
from unittest.mock import Mock
|
||||
mock_crew = Mock()
|
||||
mock_crew.agents = []
|
||||
mock_crew.tasks = []
|
||||
return mock_crew
|
||||
|
||||
@pytest.fixture
|
||||
def crew_executor(self, sample_crew):
|
||||
"""Create a CrewAgentExecutor for testing."""
|
||||
return CrewAgentExecutor(sample_crew)
|
||||
|
||||
@pytest.fixture
|
||||
def mock_context(self):
|
||||
"""Create a mock RequestContext."""
|
||||
from a2a.types import Message, Part, TextPart
|
||||
context = Mock(spec=RequestContext)
|
||||
context.task_id = "test-task-123"
|
||||
context.context_id = "test-context-456"
|
||||
context.message = Message(
|
||||
messageId="msg-123",
|
||||
taskId="test-task-123",
|
||||
contextId="test-context-456",
|
||||
role="user",
|
||||
parts=[Part(root=TextPart(text="Test message"))]
|
||||
)
|
||||
context.get_user_input.return_value = "Test query"
|
||||
return context
|
||||
|
||||
@pytest.fixture
|
||||
def mock_event_queue(self):
|
||||
"""Create a mock EventQueue."""
|
||||
return Mock(spec=EventQueue)
|
||||
|
||||
def test_init(self, sample_crew):
|
||||
"""Test CrewAgentExecutor initialization."""
|
||||
executor = CrewAgentExecutor(sample_crew)
|
||||
|
||||
assert executor.crew == sample_crew
|
||||
assert executor.supported_content_types == ['text', 'text/plain']
|
||||
assert executor._running_tasks == {}
|
||||
|
||||
def test_init_with_custom_content_types(self, sample_crew):
|
||||
"""Test CrewAgentExecutor initialization with custom content types."""
|
||||
custom_types = ['text', 'application/json']
|
||||
executor = CrewAgentExecutor(sample_crew, supported_content_types=custom_types)
|
||||
|
||||
assert executor.supported_content_types == custom_types
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_success(self, crew_executor, mock_context, mock_event_queue):
|
||||
"""Test successful crew execution."""
|
||||
mock_output = CrewOutput(raw="Test response", json_dict=None)
|
||||
|
||||
with patch.object(crew_executor, '_execute_crew_async', return_value=mock_output):
|
||||
await crew_executor.execute(mock_context, mock_event_queue)
|
||||
|
||||
mock_event_queue.enqueue_event.assert_called_once()
|
||||
|
||||
assert len(crew_executor._running_tasks) == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_with_validation_error(self, crew_executor, mock_event_queue):
|
||||
"""Test execution with validation error."""
|
||||
bad_context = Mock(spec=RequestContext)
|
||||
bad_context.get_user_input.return_value = ""
|
||||
|
||||
with pytest.raises(ServerError):
|
||||
await crew_executor.execute(bad_context, mock_event_queue)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_with_crew_error(self, crew_executor, mock_context, mock_event_queue):
|
||||
"""Test execution when crew raises an error."""
|
||||
with patch.object(crew_executor, '_execute_crew_async', side_effect=Exception("Crew error")):
|
||||
with pytest.raises(ServerError):
|
||||
await crew_executor.execute(mock_context, mock_event_queue)
|
||||
|
||||
mock_event_queue.enqueue_event.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_existing_task(self, crew_executor, mock_event_queue):
|
||||
"""Test cancelling an existing task."""
|
||||
cancel_context = Mock(spec=RequestContext)
|
||||
cancel_context.task_id = "test-task-123"
|
||||
|
||||
async def dummy_task():
|
||||
await asyncio.sleep(10)
|
||||
|
||||
mock_task = asyncio.create_task(dummy_task())
|
||||
from crewai.a2a.crew_agent_executor import TaskInfo
|
||||
from datetime import datetime
|
||||
task_info = TaskInfo(task=mock_task, started_at=datetime.now())
|
||||
crew_executor._running_tasks["test-task-123"] = task_info
|
||||
|
||||
result = await crew_executor.cancel(cancel_context, mock_event_queue)
|
||||
|
||||
assert result is None
|
||||
assert "test-task-123" not in crew_executor._running_tasks
|
||||
assert mock_task.cancelled()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_nonexistent_task(self, crew_executor, mock_event_queue):
|
||||
"""Test cancelling a task that doesn't exist."""
|
||||
cancel_context = Mock(spec=RequestContext)
|
||||
cancel_context.task_id = "nonexistent-task"
|
||||
|
||||
with pytest.raises(ServerError):
|
||||
await crew_executor.cancel(cancel_context, mock_event_queue)
|
||||
|
||||
def test_convert_output_to_parts_with_raw(self, crew_executor):
|
||||
"""Test converting crew output with raw content to A2A parts."""
|
||||
output = Mock()
|
||||
output.raw = "Test response"
|
||||
output.json_dict = None
|
||||
parts = crew_executor._convert_output_to_parts(output)
|
||||
|
||||
assert len(parts) == 1
|
||||
assert parts[0].root.text == "Test response"
|
||||
|
||||
def test_convert_output_to_parts_with_json(self, crew_executor):
|
||||
"""Test converting crew output with JSON data to A2A parts."""
|
||||
output = Mock()
|
||||
output.raw = "Test response"
|
||||
output.json_dict = {"key": "value"}
|
||||
parts = crew_executor._convert_output_to_parts(output)
|
||||
|
||||
assert len(parts) == 2
|
||||
assert parts[0].root.text == "Test response"
|
||||
assert '"key": "value"' in parts[1].root.text
|
||||
|
||||
def test_convert_output_to_parts_empty(self, crew_executor):
|
||||
"""Test converting empty crew output to A2A parts."""
|
||||
output = ""
|
||||
parts = crew_executor._convert_output_to_parts(output)
|
||||
|
||||
assert len(parts) == 1
|
||||
assert parts[0].root.text == "Crew execution completed successfully"
|
||||
|
||||
def test_validate_request_valid(self, crew_executor, mock_context):
|
||||
"""Test request validation with valid input."""
|
||||
error = crew_executor._validate_request(mock_context)
|
||||
assert error is None
|
||||
|
||||
def test_validate_request_empty_input(self, crew_executor):
|
||||
"""Test request validation with empty input."""
|
||||
context = Mock(spec=RequestContext)
|
||||
context.get_user_input.return_value = ""
|
||||
|
||||
error = crew_executor._validate_request(context)
|
||||
assert error == "Empty or missing user input"
|
||||
|
||||
def test_validate_request_whitespace_input(self, crew_executor):
|
||||
"""Test request validation with whitespace-only input."""
|
||||
context = Mock(spec=RequestContext)
|
||||
context.get_user_input.return_value = " \n\t "
|
||||
|
||||
error = crew_executor._validate_request(context)
|
||||
assert error == "Empty or missing user input"
|
||||
|
||||
def test_validate_request_exception(self, crew_executor):
|
||||
"""Test request validation when get_user_input raises exception."""
|
||||
context = Mock(spec=RequestContext)
|
||||
context.get_user_input.side_effect = Exception("Input error")
|
||||
|
||||
error = crew_executor._validate_request(context)
|
||||
assert "Failed to extract user input" in error
|
||||
|
||||
|
||||
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
|
||||
def test_import_error_handling():
|
||||
"""Test that import errors are handled gracefully when A2A is not available."""
|
||||
with pytest.raises(ImportError, match="A2A integration requires"):
|
||||
pass
|
||||
56
tests/a2a/test_exceptions.py
Normal file
56
tests/a2a/test_exceptions.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Tests for A2A custom exceptions."""
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from crewai.a2a.crew_agent_executor import (
|
||||
A2AServerError,
|
||||
TransportError,
|
||||
ExecutionError
|
||||
)
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestA2AExceptions:
|
||||
"""Test A2A custom exception classes."""
|
||||
|
||||
def test_a2a_server_error_base(self):
|
||||
"""Test A2AServerError base exception."""
|
||||
error = A2AServerError("Base error message")
|
||||
|
||||
assert str(error) == "Base error message"
|
||||
assert isinstance(error, Exception)
|
||||
|
||||
def test_transport_error_inheritance(self):
|
||||
"""Test TransportError inherits from A2AServerError."""
|
||||
error = TransportError("Transport configuration failed")
|
||||
|
||||
assert str(error) == "Transport configuration failed"
|
||||
assert isinstance(error, A2AServerError)
|
||||
assert isinstance(error, Exception)
|
||||
|
||||
def test_execution_error_inheritance(self):
|
||||
"""Test ExecutionError inherits from A2AServerError."""
|
||||
error = ExecutionError("Crew execution failed")
|
||||
|
||||
assert str(error) == "Crew execution failed"
|
||||
assert isinstance(error, A2AServerError)
|
||||
assert isinstance(error, Exception)
|
||||
|
||||
def test_exception_raising(self):
|
||||
"""Test that exceptions can be raised and caught."""
|
||||
with pytest.raises(TransportError) as exc_info:
|
||||
raise TransportError("Test transport error")
|
||||
|
||||
assert str(exc_info.value) == "Test transport error"
|
||||
|
||||
with pytest.raises(ExecutionError) as exc_info:
|
||||
raise ExecutionError("Test execution error")
|
||||
|
||||
assert str(exc_info.value) == "Test execution error"
|
||||
|
||||
with pytest.raises(A2AServerError):
|
||||
raise TransportError("Should be caught as base class")
|
||||
122
tests/a2a/test_integration.py
Normal file
122
tests/a2a/test_integration.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Integration tests for CrewAI A2A functionality."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
|
||||
try:
|
||||
from crewai.a2a import CrewAgentExecutor, create_a2a_app
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestA2AIntegration:
|
||||
"""Integration tests for A2A functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
def sample_crew(self):
|
||||
"""Create a sample crew for integration testing."""
|
||||
from unittest.mock import Mock
|
||||
mock_crew = Mock()
|
||||
mock_crew.agents = []
|
||||
mock_crew.tasks = []
|
||||
return mock_crew
|
||||
|
||||
def test_end_to_end_integration(self, sample_crew):
|
||||
"""Test end-to-end A2A integration."""
|
||||
executor = CrewAgentExecutor(sample_crew)
|
||||
|
||||
assert executor.crew == sample_crew
|
||||
assert isinstance(executor.supported_content_types, list)
|
||||
|
||||
with patch('crewai.a2a.server.A2AStarletteApplication') as mock_app_class:
|
||||
with patch('crewai.a2a.server.DefaultRequestHandler') as mock_handler_class:
|
||||
with patch('crewai.a2a.server.InMemoryTaskStore') as mock_task_store_class:
|
||||
mock_handler = Mock()
|
||||
mock_app_instance = Mock()
|
||||
mock_built_app = Mock()
|
||||
mock_task_store = Mock()
|
||||
|
||||
mock_task_store_class.return_value = mock_task_store
|
||||
mock_handler_class.return_value = mock_handler
|
||||
mock_app_class.return_value = mock_app_instance
|
||||
mock_app_instance.build.return_value = mock_built_app
|
||||
|
||||
app = create_a2a_app(executor)
|
||||
|
||||
mock_task_store_class.assert_called_once()
|
||||
mock_handler_class.assert_called_once_with(executor, mock_task_store)
|
||||
mock_app_class.assert_called_once()
|
||||
assert app == mock_built_app
|
||||
|
||||
def test_crew_with_multiple_agents(self):
|
||||
"""Test A2A integration with multi-agent crew."""
|
||||
from unittest.mock import Mock
|
||||
crew = Mock()
|
||||
crew.agents = [Mock(), Mock()]
|
||||
crew.tasks = [Mock(), Mock()]
|
||||
|
||||
executor = CrewAgentExecutor(crew)
|
||||
assert executor.crew == crew
|
||||
assert len(executor.crew.agents) == 2
|
||||
assert len(executor.crew.tasks) == 2
|
||||
|
||||
def test_custom_content_types(self, sample_crew):
|
||||
"""Test A2A integration with custom content types."""
|
||||
custom_types = ['text', 'application/json', 'image/png']
|
||||
executor = CrewAgentExecutor(
|
||||
sample_crew,
|
||||
supported_content_types=custom_types
|
||||
)
|
||||
|
||||
assert executor.supported_content_types == custom_types
|
||||
|
||||
@patch('uvicorn.run')
|
||||
def test_server_startup_integration(self, mock_uvicorn_run, sample_crew):
|
||||
"""Test server startup integration."""
|
||||
from crewai.a2a import start_a2a_server
|
||||
|
||||
executor = CrewAgentExecutor(sample_crew)
|
||||
|
||||
with patch('crewai.a2a.server.create_a2a_app') as mock_create_app:
|
||||
mock_app = Mock()
|
||||
mock_create_app.return_value = mock_app
|
||||
|
||||
start_a2a_server(
|
||||
executor,
|
||||
host="127.0.0.1",
|
||||
port=9999,
|
||||
transport="starlette"
|
||||
)
|
||||
|
||||
mock_create_app.assert_called_once_with(
|
||||
executor,
|
||||
transport="starlette",
|
||||
agent_name=None,
|
||||
agent_description=None
|
||||
)
|
||||
mock_uvicorn_run.assert_called_once_with(
|
||||
mock_app,
|
||||
host="127.0.0.1",
|
||||
port=9999
|
||||
)
|
||||
|
||||
|
||||
def test_optional_import_in_main_module():
|
||||
"""Test that A2A classes are optionally imported in main module."""
|
||||
import crewai
|
||||
|
||||
if A2A_AVAILABLE:
|
||||
assert hasattr(crewai, 'CrewAgentExecutor')
|
||||
assert hasattr(crewai, 'start_a2a_server')
|
||||
assert hasattr(crewai, 'create_a2a_app')
|
||||
|
||||
assert 'CrewAgentExecutor' in crewai.__all__
|
||||
assert 'start_a2a_server' in crewai.__all__
|
||||
assert 'create_a2a_app' in crewai.__all__
|
||||
else:
|
||||
assert not hasattr(crewai, 'CrewAgentExecutor')
|
||||
assert not hasattr(crewai, 'start_a2a_server')
|
||||
assert not hasattr(crewai, 'create_a2a_app')
|
||||
134
tests/a2a/test_server.py
Normal file
134
tests/a2a/test_server.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Tests for A2A server utilities."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
try:
|
||||
from crewai.a2a import start_a2a_server, create_a2a_app
|
||||
from a2a.server.agent_execution.agent_executor import AgentExecutor
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestA2AServer:
|
||||
"""Test cases for A2A server utilities."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_agent_executor(self):
|
||||
"""Create a mock AgentExecutor."""
|
||||
return Mock(spec=AgentExecutor)
|
||||
|
||||
@patch('uvicorn.run')
|
||||
@patch('crewai.a2a.server.create_a2a_app')
|
||||
def test_start_a2a_server_default(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
|
||||
"""Test starting A2A server with default parameters."""
|
||||
mock_app = Mock()
|
||||
mock_create_app.return_value = mock_app
|
||||
|
||||
start_a2a_server(mock_agent_executor)
|
||||
|
||||
mock_create_app.assert_called_once_with(
|
||||
mock_agent_executor,
|
||||
transport="starlette",
|
||||
agent_name=None,
|
||||
agent_description=None
|
||||
)
|
||||
|
||||
mock_uvicorn_run.assert_called_once_with(
|
||||
mock_app,
|
||||
host="localhost",
|
||||
port=10001
|
||||
)
|
||||
|
||||
@patch('uvicorn.run')
|
||||
@patch('crewai.a2a.server.create_a2a_app')
|
||||
def test_start_a2a_server_custom(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
|
||||
"""Test starting A2A server with custom parameters."""
|
||||
mock_app = Mock()
|
||||
mock_create_app.return_value = mock_app
|
||||
|
||||
start_a2a_server(
|
||||
mock_agent_executor,
|
||||
host="0.0.0.0",
|
||||
port=8080,
|
||||
transport="fastapi"
|
||||
)
|
||||
|
||||
mock_create_app.assert_called_once_with(
|
||||
mock_agent_executor,
|
||||
transport="fastapi",
|
||||
agent_name=None,
|
||||
agent_description=None
|
||||
)
|
||||
|
||||
mock_uvicorn_run.assert_called_once_with(
|
||||
mock_app,
|
||||
host="0.0.0.0",
|
||||
port=8080
|
||||
)
|
||||
|
||||
@patch('crewai.a2a.server.A2AStarletteApplication')
|
||||
@patch('crewai.a2a.server.DefaultRequestHandler')
|
||||
@patch('crewai.a2a.server.InMemoryTaskStore')
|
||||
def test_create_a2a_app_starlette(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
|
||||
"""Test creating A2A app with Starlette transport."""
|
||||
mock_handler = Mock()
|
||||
mock_app_instance = Mock()
|
||||
mock_built_app = Mock()
|
||||
mock_task_store = Mock()
|
||||
|
||||
mock_task_store_class.return_value = mock_task_store
|
||||
mock_handler_class.return_value = mock_handler
|
||||
mock_app_class.return_value = mock_app_instance
|
||||
mock_app_instance.build.return_value = mock_built_app
|
||||
|
||||
result = create_a2a_app(mock_agent_executor, transport="starlette")
|
||||
|
||||
mock_task_store_class.assert_called_once()
|
||||
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
|
||||
mock_app_class.assert_called_once()
|
||||
mock_app_instance.build.assert_called_once()
|
||||
|
||||
assert result == mock_built_app
|
||||
|
||||
def test_create_a2a_app_fastapi(self, mock_agent_executor):
|
||||
"""Test creating A2A app with FastAPI transport raises error."""
|
||||
from crewai.a2a.exceptions import TransportError
|
||||
with pytest.raises(TransportError, match="FastAPI transport is not available"):
|
||||
create_a2a_app(
|
||||
mock_agent_executor,
|
||||
transport="fastapi",
|
||||
agent_name="Custom Agent",
|
||||
agent_description="Custom description"
|
||||
)
|
||||
|
||||
@patch('crewai.a2a.server.A2AStarletteApplication')
|
||||
@patch('crewai.a2a.server.DefaultRequestHandler')
|
||||
@patch('crewai.a2a.server.InMemoryTaskStore')
|
||||
def test_create_a2a_app_default_transport(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
|
||||
"""Test creating A2A app with default transport."""
|
||||
mock_handler = Mock()
|
||||
mock_app_instance = Mock()
|
||||
mock_built_app = Mock()
|
||||
mock_task_store = Mock()
|
||||
|
||||
mock_task_store_class.return_value = mock_task_store
|
||||
mock_handler_class.return_value = mock_handler
|
||||
mock_app_class.return_value = mock_app_instance
|
||||
mock_app_instance.build.return_value = mock_built_app
|
||||
|
||||
result = create_a2a_app(mock_agent_executor)
|
||||
|
||||
mock_task_store_class.assert_called_once()
|
||||
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
|
||||
mock_app_class.assert_called_once()
|
||||
assert result == mock_built_app
|
||||
|
||||
|
||||
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
|
||||
def test_server_import_error_handling():
|
||||
"""Test that import errors are handled gracefully when A2A is not available."""
|
||||
with pytest.raises(ImportError, match="A2A integration requires"):
|
||||
pass
|
||||
53
tests/a2a/test_server_config.py
Normal file
53
tests/a2a/test_server_config.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Tests for ServerConfig dataclass."""
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from crewai.a2a.server import ServerConfig
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestServerConfig:
|
||||
"""Test ServerConfig dataclass functionality."""
|
||||
|
||||
def test_server_config_defaults(self):
|
||||
"""Test ServerConfig with default values."""
|
||||
config = ServerConfig()
|
||||
|
||||
assert config.host == "localhost"
|
||||
assert config.port == 10001
|
||||
assert config.transport == "starlette"
|
||||
assert config.agent_name is None
|
||||
assert config.agent_description is None
|
||||
|
||||
def test_server_config_custom_values(self):
|
||||
"""Test ServerConfig with custom values."""
|
||||
config = ServerConfig(
|
||||
host="0.0.0.0",
|
||||
port=8080,
|
||||
transport="custom",
|
||||
agent_name="Test Agent",
|
||||
agent_description="A test agent"
|
||||
)
|
||||
|
||||
assert config.host == "0.0.0.0"
|
||||
assert config.port == 8080
|
||||
assert config.transport == "custom"
|
||||
assert config.agent_name == "Test Agent"
|
||||
assert config.agent_description == "A test agent"
|
||||
|
||||
def test_server_config_partial_override(self):
|
||||
"""Test ServerConfig with partial value override."""
|
||||
config = ServerConfig(
|
||||
port=9000,
|
||||
agent_name="Custom Agent"
|
||||
)
|
||||
|
||||
assert config.host == "localhost" # default
|
||||
assert config.port == 9000 # custom
|
||||
assert config.transport == "starlette" # default
|
||||
assert config.agent_name == "Custom Agent" # custom
|
||||
assert config.agent_description is None # default
|
||||
51
tests/a2a/test_task_info.py
Normal file
51
tests/a2a/test_task_info.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Tests for TaskInfo dataclass."""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from unittest.mock import Mock
|
||||
|
||||
try:
|
||||
from crewai.a2a.crew_agent_executor import TaskInfo
|
||||
A2A_AVAILABLE = True
|
||||
except ImportError:
|
||||
A2A_AVAILABLE = False
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
|
||||
class TestTaskInfo:
|
||||
"""Test TaskInfo dataclass functionality."""
|
||||
|
||||
def test_task_info_creation(self):
|
||||
"""Test TaskInfo creation with required fields."""
|
||||
mock_task = Mock()
|
||||
started_at = datetime.now()
|
||||
|
||||
task_info = TaskInfo(task=mock_task, started_at=started_at)
|
||||
|
||||
assert task_info.task == mock_task
|
||||
assert task_info.started_at == started_at
|
||||
assert task_info.status == "running"
|
||||
|
||||
def test_task_info_with_custom_status(self):
|
||||
"""Test TaskInfo creation with custom status."""
|
||||
mock_task = Mock()
|
||||
started_at = datetime.now()
|
||||
|
||||
task_info = TaskInfo(
|
||||
task=mock_task,
|
||||
started_at=started_at,
|
||||
status="completed"
|
||||
)
|
||||
|
||||
assert task_info.status == "completed"
|
||||
|
||||
def test_task_info_status_update(self):
|
||||
"""Test TaskInfo status can be updated."""
|
||||
mock_task = Mock()
|
||||
started_at = datetime.now()
|
||||
|
||||
task_info = TaskInfo(task=mock_task, started_at=started_at)
|
||||
assert task_info.status == "running"
|
||||
|
||||
task_info.status = "cancelled"
|
||||
assert task_info.status == "cancelled"
|
||||
Reference in New Issue
Block a user