Compare commits

..

6 Commits

Author SHA1 Message Date
Devin AI
06a5689e8a Implement AI code review suggestions
- Enhanced SSL function documentation with detailed examples and environment variable precedence
- Added CA bundle file format validation (.pem, .crt, .cer) with warnings for unsupported formats
- Improved error handling with structured solutions and current CA bundle path display
- Added comprehensive tests for file format validation and warning scenarios
- Enhanced user guidance for SSL certificate configuration issues

Addresses feedback from joaomdmoura's AI code review for better documentation,
error handling, and path validation as requested.

Co-Authored-By: João <joao@crewai.com>
2025-06-09 14:46:50 +00:00
Devin AI
2a48e24d98 Address CI failures and code review feedback
- Remove unused pytest imports from test files (fixes lint errors)
- Fix CodeQL security alert by using exact URL validation instead of substring check
- Enhance SSL function documentation with detailed environment variable precedence
- Improve error handling in fetch_provider_data with current CA bundle path display
- Add more helpful guidance for SSL certificate configuration issues

Addresses feedback from AI code review and resolves CI lint/security failures.

Co-Authored-By: João <joao@crewai.com>
2025-06-09 14:35:04 +00:00
Devin AI
4649f00cab Fix SSL certificate verification in provider data fetching
- Add get_ssl_verify_config() function to respect SSL environment variables
- Support REQUESTS_CA_BUNDLE, SSL_CERT_FILE, CURL_CA_BUNDLE env vars
- Fallback to certifi.where() when no custom CA bundle is specified
- Improve error handling for SSL verification failures
- Add comprehensive tests for SSL configuration scenarios

Fixes #2978

Co-Authored-By: João <joao@crewai.com>
2025-06-09 14:27:16 +00:00
Lucas Gomide
8a37b535ed docs: improve docs about planning LLM usage (#2977) 2025-06-09 10:17:04 -04:00
Lucas Gomide
e6ac1311e7 build: upgrade LiteLLM to support latest Openai version (#2963)
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-06-09 08:55:12 -04:00
Akshit Madan
b0d89698fd docs: added Maxim support for Agent Observability (#2861)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* docs: added Maxim support for Agent Observability

* enhanced the maxim integration doc page as per the github PR reviewer bot suggestions

* Update maxim-observability.mdx

* Update maxim-observability.mdx

- Fixed Python version, >=3.10
- added expected_output field in Task
- Removed marketing links and added github link

* added maxim in observability

---------

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-06-08 13:39:01 -04:00
24 changed files with 363 additions and 1443 deletions

View File

@@ -1,184 +0,0 @@
# A2A Protocol Integration
CrewAI supports the A2A (Agent-to-Agent) protocol, enabling your crews to participate in remote agent interoperability. This allows CrewAI crews to be exposed as remotely accessible agents that can communicate with other A2A-compatible systems.
## Overview
The A2A protocol is Google's standard for agent interoperability that enables bidirectional communication between agents. CrewAI's A2A integration provides:
- **Remote Interoperability**: Expose crews as A2A-compatible agents
- **Bidirectional Communication**: Enable full-duplex agent interactions
- **Protocol Compliance**: Full support for A2A specifications
- **Transport Flexibility**: Support for multiple transport protocols
## Installation
A2A support is available as an optional dependency:
```bash
pip install crewai[a2a]
```
## Basic Usage
### Creating an A2A Server
```python
from crewai import Agent, Crew, Task
from crewai.a2a import CrewAgentExecutor, start_a2a_server
# Create your crew
agent = Agent(
role="Assistant",
goal="Help users with their queries",
backstory="A helpful AI assistant"
)
task = Task(
description="Help with: {query}",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
# Create A2A executor
executor = CrewAgentExecutor(crew)
# Start A2A server
start_a2a_server(executor, host="0.0.0.0", port=10001)
```
### Custom Configuration
```python
from crewai.a2a import CrewAgentExecutor, create_a2a_app
# Create executor with custom content types
executor = CrewAgentExecutor(
crew=crew,
supported_content_types=['text', 'application/json', 'image/png']
)
# Create custom A2A app
app = create_a2a_app(
executor,
agent_name="My Research Crew",
agent_description="A specialized research and analysis crew",
transport="starlette"
)
# Run with custom ASGI server
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
```
## Key Features
### CrewAgentExecutor
The `CrewAgentExecutor` class wraps CrewAI crews to implement the A2A `AgentExecutor` interface:
- **Asynchronous Execution**: Crews run asynchronously within the A2A protocol
- **Task Management**: Automatic handling of task lifecycle and cancellation
- **Error Handling**: Robust error handling with A2A-compliant responses
- **Output Conversion**: Automatic conversion of crew outputs to A2A artifacts
### Server Utilities
Convenience functions for starting A2A servers:
- `start_a2a_server()`: Quick server startup with default configuration
- `create_a2a_app()`: Create custom A2A applications for advanced use cases
## Protocol Compliance
CrewAI's A2A integration provides full protocol compliance:
- **Agent Cards**: Automatic generation of agent capability descriptions
- **Task Execution**: Asynchronous task processing with event queues
- **Artifact Management**: Conversion of crew outputs to A2A artifacts
- **Error Handling**: A2A-compliant error responses and status codes
## Use Cases
### Remote Agent Networks
Expose CrewAI crews as part of larger agent networks:
```python
# Multi-agent system with specialized crews
research_crew = create_research_crew()
analysis_crew = create_analysis_crew()
writing_crew = create_writing_crew()
# Expose each as A2A agents on different ports
start_a2a_server(CrewAgentExecutor(research_crew), port=10001)
start_a2a_server(CrewAgentExecutor(analysis_crew), port=10002)
start_a2a_server(CrewAgentExecutor(writing_crew), port=10003)
```
### Cross-Platform Integration
Enable CrewAI crews to work with other agent frameworks:
```python
# CrewAI crew accessible to other A2A-compatible systems
executor = CrewAgentExecutor(crew)
start_a2a_server(executor, host="0.0.0.0", port=10001)
# Other systems can now invoke this crew remotely
```
## Advanced Configuration
### Custom Agent Cards
```python
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
# Custom agent card for specialized capabilities
agent_card = AgentCard(
name="Specialized Research Crew",
description="Advanced research and analysis capabilities",
version="2.0.0",
capabilities=AgentCapabilities(
streaming=True,
pushNotifications=False
),
skills=[
AgentSkill(
id="research",
name="Research Analysis",
description="Comprehensive research and analysis",
tags=["research", "analysis", "data"]
)
]
)
```
### Error Handling
The A2A integration includes comprehensive error handling:
- **Validation Errors**: Input validation with clear error messages
- **Execution Errors**: Crew execution errors converted to A2A artifacts
- **Cancellation**: Proper task cancellation support
- **Timeouts**: Configurable timeout handling
## Best Practices
1. **Resource Management**: Monitor crew resource usage in server environments
2. **Error Handling**: Implement proper error handling in crew tasks
3. **Security**: Use appropriate authentication and authorization
4. **Monitoring**: Monitor A2A server performance and health
5. **Scaling**: Consider load balancing for high-traffic scenarios
## Limitations
- **Optional Dependency**: A2A support requires additional dependencies
- **Transport Support**: Currently supports Starlette transport only
- **Synchronous Crews**: Crews execute synchronously within async A2A context
## Examples
See the `examples/a2a_integration_example.py` file for a complete working example of A2A integration with CrewAI.

View File

@@ -29,6 +29,10 @@ my_crew = Crew(
From this point on, your crew will have planning enabled, and the tasks will be planned before each iteration.
<Warning>
When planning is enabled, crewAI will use `gpt-4o-mini` as the default LLM for planning, which requires a valid OpenAI API key. Since your agents might be using different LLMs, this could cause confusion if you don't have an OpenAI API key configured or if you're experiencing unexpected behavior related to LLM API calls.
</Warning>
#### Planning LLM
Now you can define the LLM that will be used to plan the tasks.

View File

@@ -201,6 +201,7 @@
"observability/arize-phoenix",
"observability/langfuse",
"observability/langtrace",
"observability/maxim",
"observability/mlflow",
"observability/openlit",
"observability/opik",

View File

@@ -0,0 +1,152 @@
---
title: Maxim Integration
description: Start Agent monitoring, evaluation, and observability
icon: bars-staggered
---
# Maxim Integration
Maxim AI provides comprehensive agent monitoring, evaluation, and observability for your CrewAI applications. With Maxim's one-line integration, you can easily trace and analyse agent interactions, performance metrics, and more.
## Features: One Line Integration
- **End-to-End Agent Tracing**: Monitor the complete lifecycle of your agents
- **Performance Analytics**: Track latency, tokens consumed, and costs
- **Hyperparameter Monitoring**: View the configuration details of your agent runs
- **Tool Call Tracking**: Observe when and how agents use their tools
- **Advanced Visualisation**: Understand agent trajectories through intuitive dashboards
## Getting Started
### Prerequisites
- Python version >=3.10
- A Maxim account ([sign up here](https://getmaxim.ai/))
- A CrewAI project
### Installation
Install the Maxim SDK via pip:
```python
pip install maxim-py>=3.6.2
```
Or add it to your `requirements.txt`:
```
maxim-py>=3.6.2
```
### Basic Setup
### 1. Set up environment variables
```python
### Environment Variables Setup
# Create a `.env` file in your project root:
# Maxim API Configuration
MAXIM_API_KEY=your_api_key_here
MAXIM_LOG_REPO_ID=your_repo_id_here
```
### 2. Import the required packages
```python
from crewai import Agent, Task, Crew, Process
from maxim import Maxim
from maxim.logger.crewai import instrument_crewai
```
### 3. Initialise Maxim with your API key
```python
# Initialize Maxim logger
logger = Maxim().logger()
# Instrument CrewAI with just one line
instrument_crewai(logger)
```
### 4. Create and run your CrewAI application as usual
```python
# Create your agent
researcher = Agent(
role='Senior Research Analyst',
goal='Uncover cutting-edge developments in AI',
backstory="You are an expert researcher at a tech think tank...",
verbose=True,
llm=llm
)
# Define the task
research_task = Task(
description="Research the latest AI advancements...",
expected_output="",
agent=researcher
)
# Configure and run the crew
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
try:
result = crew.kickoff()
finally:
maxim.cleanup() # Ensure cleanup happens even if errors occur
```
That's it! All your CrewAI agent interactions will now be logged and available in your Maxim dashboard.
Check this Google Colab Notebook for a quick reference - [Notebook](https://colab.research.google.com/drive/1ZKIZWsmgQQ46n8TH9zLsT1negKkJA6K8?usp=sharing)
## Viewing Your Traces
After running your CrewAI application:
![Example trace in Maxim showing agent interactions](https://raw.githubusercontent.com/maximhq/maxim-docs/master/images/Screenshot2025-05-14at12.10.58PM.png)
1. Log in to your [Maxim Dashboard](https://getmaxim.ai/dashboard)
2. Navigate to your repository
3. View detailed agent traces, including:
- Agent conversations
- Tool usage patterns
- Performance metrics
- Cost analytics
## Troubleshooting
### Common Issues
- **No traces appearing**: Ensure your API key and repository ID are correc
- Ensure you've **called `instrument_crewai()`** ***before*** running your crew. This initializes logging hooks correctly.
- Set `debug=True` in your `instrument_crewai()` call to surface any internal errors:
```python
instrument_crewai(logger, debug=True)
```
- Configure your agents with `verbose=True` to capture detailed logs:
```python
agent = CrewAgent(..., verbose=True)
```
- Double-check that `instrument_crewai()` is called **before** creating or executing agents. This might be obvious, but it's a common oversight.
### Support
If you encounter any issues:
- Check the [Maxim Documentation](https://getmaxim.ai/docs)
- Maxim Github [Link](https://github.com/maximhq)

View File

@@ -1,64 +0,0 @@
"""Example: CrewAI A2A Integration
This example demonstrates how to expose a CrewAI crew as an A2A (Agent-to-Agent)
protocol server for remote interoperability.
Requirements:
pip install crewai[a2a]
"""
from crewai import Agent, Crew, Task
from crewai.a2a import CrewAgentExecutor, start_a2a_server
def main():
"""Create and start an A2A server with a CrewAI crew."""
researcher = Agent(
role="Research Analyst",
goal="Provide comprehensive research and analysis on any topic",
backstory=(
"You are an experienced research analyst with expertise in "
"gathering, analyzing, and synthesizing information from various sources."
),
verbose=True
)
research_task = Task(
description=(
"Research and analyze the topic: {query}\n"
"Provide a comprehensive overview including:\n"
"- Key concepts and definitions\n"
"- Current trends and developments\n"
"- Important considerations\n"
"- Relevant examples or case studies"
),
agent=researcher,
expected_output="A detailed research report with analysis and insights"
)
research_crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
executor = CrewAgentExecutor(
crew=research_crew,
supported_content_types=['text', 'text/plain', 'application/json']
)
print("Starting A2A server with CrewAI research crew...")
print("Server will be available at http://localhost:10001")
print("Use the A2A CLI or SDK to interact with the crew remotely")
start_a2a_server(
executor,
host="0.0.0.0",
port=10001,
transport="starlette"
)
if __name__ == "__main__":
main()

View File

@@ -11,7 +11,7 @@ dependencies = [
# Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
"litellm==1.68.0",
"litellm==1.72.0",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
@@ -65,8 +65,8 @@ mem0 = ["mem0ai>=0.1.94"]
docling = [
"docling>=2.12.0",
]
a2a = [
"a2a-sdk>=0.0.1",
aisuite = [
"aisuite>=0.1.10",
]
[tool.uv]

View File

@@ -32,13 +32,3 @@ __all__ = [
"TaskOutput",
"LLMGuardrail",
]
try:
from crewai.a2a import ( # noqa: F401
CrewAgentExecutor,
start_a2a_server,
create_a2a_app
)
__all__.extend(["CrewAgentExecutor", "start_a2a_server", "create_a2a_app"])
except ImportError:
pass

View File

@@ -1,62 +0,0 @@
"""A2A (Agent-to-Agent) protocol integration for CrewAI.
This module provides integration with the A2A protocol to enable remote agent
interoperability. It allows CrewAI crews to be exposed as A2A-compatible agents
that can communicate with other agents following the A2A protocol standard.
The integration is optional and requires the 'a2a' extra dependency:
pip install crewai[a2a]
Example:
from crewai import Agent, Crew, Task
from crewai.a2a import CrewAgentExecutor, start_a2a_server
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
task = Task(description="Help with {query}", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
executor = CrewAgentExecutor(crew)
start_a2a_server(executor, host="localhost", port=8080)
"""
try:
from .crew_agent_executor import CrewAgentExecutor
from .server import start_a2a_server, create_a2a_app
from .server_config import ServerConfig
from .task_info import TaskInfo
from .exceptions import A2AServerError, TransportError, ExecutionError
__all__ = [
"CrewAgentExecutor",
"start_a2a_server",
"create_a2a_app",
"ServerConfig",
"TaskInfo",
"A2AServerError",
"TransportError",
"ExecutionError"
]
except ImportError:
import warnings
warnings.warn(
"A2A integration requires the 'a2a' extra dependency. "
"Install with: pip install crewai[a2a]",
ImportWarning
)
def _missing_dependency(*args, **kwargs):
raise ImportError(
"A2A integration requires the 'a2a' extra dependency. "
"Install with: pip install crewai[a2a]"
)
CrewAgentExecutor = _missing_dependency # type: ignore
start_a2a_server = _missing_dependency # type: ignore
create_a2a_app = _missing_dependency # type: ignore
ServerConfig = _missing_dependency # type: ignore
TaskInfo = _missing_dependency # type: ignore
A2AServerError = _missing_dependency # type: ignore
TransportError = _missing_dependency # type: ignore
ExecutionError = _missing_dependency # type: ignore
__all__ = []

View File

@@ -1,255 +0,0 @@
"""CrewAI Agent Executor for A2A Protocol Integration.
This module implements the A2A AgentExecutor interface to enable CrewAI crews
to participate in the Agent-to-Agent protocol for remote interoperability.
"""
import asyncio
import json
import logging
from typing import Any, Dict, Optional
from crewai import Crew
from crewai.crew import CrewOutput
from .task_info import TaskInfo
try:
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.types import (
InvalidParamsError,
Part,
Task,
TextPart,
UnsupportedOperationError,
)
from a2a.utils import completed_task, new_artifact
from a2a.utils.errors import ServerError
except ImportError:
raise ImportError(
"A2A integration requires the 'a2a' extra dependency. "
"Install with: pip install crewai[a2a]"
)
logger = logging.getLogger(__name__)
class CrewAgentExecutor(AgentExecutor):
"""A2A Agent Executor that wraps CrewAI crews for remote interoperability.
This class implements the A2A AgentExecutor interface to enable CrewAI crews
to be exposed as remotely interoperable agents following the A2A protocol.
Args:
crew: The CrewAI crew to expose as an A2A agent
supported_content_types: List of supported content types for input
Example:
from crewai import Agent, Crew, Task
from crewai.a2a import CrewAgentExecutor
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
task = Task(description="Help with {query}", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
executor = CrewAgentExecutor(crew)
"""
def __init__(
self,
crew: Crew,
supported_content_types: Optional[list[str]] = None
):
"""Initialize the CrewAgentExecutor.
Args:
crew: The CrewAI crew to wrap
supported_content_types: List of supported content types
"""
self.crew = crew
self.supported_content_types = supported_content_types or [
'text', 'text/plain'
]
self._running_tasks: Dict[str, TaskInfo] = {}
async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
"""Execute the crew with the given context and publish results to event queue.
This method extracts the user input from the request context, executes
the CrewAI crew, and publishes the results as A2A artifacts.
Args:
context: The A2A request context containing task details
event_queue: Queue for publishing execution events and results
Raises:
ServerError: If validation fails or execution encounters an error
"""
error = self._validate_request(context)
if error:
logger.error(f"Request validation failed: {error}")
raise ServerError(error=InvalidParamsError())
query = context.get_user_input()
task_id = context.task_id
context_id = context.context_id
if not task_id or not context_id:
raise ServerError(error=InvalidParamsError())
logger.info(f"Executing crew for task {task_id} with query: {query}")
try:
inputs = {"query": query}
execution_task = asyncio.create_task(
self._execute_crew_async(inputs)
)
from datetime import datetime
self._running_tasks[task_id] = TaskInfo(
task=execution_task,
started_at=datetime.now(),
status="running"
)
result = await execution_task
self._running_tasks.pop(task_id, None)
logger.info(f"Crew execution completed for task {task_id}")
parts = self._convert_output_to_parts(result)
messages = [context.message] if context.message else []
event_queue.enqueue_event(
completed_task(
task_id,
context_id,
[new_artifact(parts, f"crew_output_{task_id}")],
messages,
)
)
except asyncio.CancelledError:
logger.info(f"Task {task_id} was cancelled")
self._running_tasks.pop(task_id, None)
raise
except Exception as e:
logger.error(f"Error executing crew for task {task_id}: {e}")
self._running_tasks.pop(task_id, None)
error_parts = [
Part(root=TextPart(text=f"Error executing crew: {str(e)}"))
]
messages = [context.message] if context.message else []
event_queue.enqueue_event(
completed_task(
task_id,
context_id,
[new_artifact(error_parts, f"error_{task_id}")],
messages,
)
)
raise ServerError(
error=InvalidParamsError()
) from e
async def cancel(
self,
request: RequestContext,
event_queue: EventQueue
) -> Task | None:
"""Cancel a running crew execution.
Args:
request: The A2A request context for the task to cancel
event_queue: Event queue for publishing cancellation events
Returns:
None (cancellation is handled internally)
Raises:
ServerError: If the task cannot be cancelled
"""
task_id = request.task_id
if task_id in self._running_tasks:
task_info = self._running_tasks[task_id]
task_info.task.cancel()
task_info.update_status("cancelled")
try:
await task_info.task
except asyncio.CancelledError:
logger.info(f"Successfully cancelled task {task_id}")
pass
self._running_tasks.pop(task_id, None)
return None
else:
logger.warning(f"Task {task_id} not found for cancellation")
raise ServerError(error=UnsupportedOperationError())
async def _execute_crew_async(self, inputs: Dict[str, Any]) -> CrewOutput:
"""Execute the crew asynchronously.
Args:
inputs: Input parameters for the crew
Returns:
The crew execution output
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self.crew.kickoff, inputs)
def _convert_output_to_parts(self, result: CrewOutput) -> list[Part]:
"""Convert CrewAI output to A2A Parts.
Args:
result: The crew execution result
Returns:
List of A2A Parts representing the output
"""
parts = []
if hasattr(result, 'raw') and result.raw:
parts.append(Part(root=TextPart(text=str(result.raw))))
elif result:
parts.append(Part(root=TextPart(text=str(result))))
if hasattr(result, 'json_dict') and result.json_dict:
json_output = json.dumps(result.json_dict, indent=2)
parts.append(Part(root=TextPart(text=json_output)))
if not parts:
parts.append(Part(root=TextPart(text="Crew execution completed successfully")))
return parts
def _validate_request(self, context: RequestContext) -> Optional[str]:
"""Validate the incoming request context.
Args:
context: The A2A request context to validate
Returns:
Error message if validation fails, None if valid
"""
try:
user_input = context.get_user_input()
if not user_input or not user_input.strip():
return "Empty or missing user input"
return None
except Exception as e:
return f"Failed to extract user input: {e}"

View File

@@ -1,16 +0,0 @@
"""Custom exceptions for A2A integration."""
class A2AServerError(Exception):
"""Base exception for A2A server errors."""
pass
class TransportError(A2AServerError):
"""Error related to transport configuration."""
pass
class ExecutionError(A2AServerError):
"""Error during crew execution."""
pass

View File

@@ -1,151 +0,0 @@
"""A2A Server utilities for CrewAI integration.
This module provides convenience functions for starting A2A servers with CrewAI
crews, supporting multiple transport protocols and configurations.
"""
import logging
from typing import Optional
from .exceptions import TransportError
from .server_config import ServerConfig
try:
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
except ImportError:
raise ImportError(
"A2A integration requires the 'a2a' extra dependency. "
"Install with: pip install crewai[a2a]"
)
logger = logging.getLogger(__name__)
def start_a2a_server(
agent_executor: AgentExecutor,
host: str = "localhost",
port: int = 10001,
transport: str = "starlette",
config: Optional[ServerConfig] = None,
**kwargs
) -> None:
"""Start an A2A server with the given agent executor.
This is a convenience function that creates and starts an A2A server
with the specified configuration.
Args:
agent_executor: The A2A agent executor to serve
host: Host address to bind the server to
port: Port number to bind the server to
transport: Transport protocol to use ("starlette" or "fastapi")
config: Optional ServerConfig object to override individual parameters
**kwargs: Additional arguments passed to the server
Example:
from crewai import Agent, Crew, Task
from crewai.a2a import CrewAgentExecutor, start_a2a_server
agent = Agent(role="Assistant", goal="Help users", backstory="Helpful AI")
task = Task(description="Help with {query}", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
executor = CrewAgentExecutor(crew)
start_a2a_server(executor, host="0.0.0.0", port=8080)
"""
if config:
host = config.host
port = config.port
transport = config.transport
app = create_a2a_app(
agent_executor,
transport=transport,
agent_name=config.agent_name if config else None,
agent_description=config.agent_description if config else None,
**kwargs
)
logger.info(f"Starting A2A server on {host}:{port} using {transport} transport")
try:
import uvicorn
uvicorn.run(app, host=host, port=port)
except ImportError:
raise ImportError("uvicorn is required to run the A2A server. Install with: pip install uvicorn")
def create_a2a_app(
agent_executor: AgentExecutor,
transport: str = "starlette",
agent_name: Optional[str] = None,
agent_description: Optional[str] = None,
**kwargs
):
"""Create an A2A application with the given agent executor.
This function creates an A2A server application that can be run
with any ASGI server.
Args:
agent_executor: The A2A agent executor to serve
transport: Transport protocol to use ("starlette" or "fastapi")
agent_name: Optional name for the agent
agent_description: Optional description for the agent
**kwargs: Additional arguments passed to the transport
Returns:
ASGI application ready to be served
Example:
from crewai.a2a import CrewAgentExecutor, create_a2a_app
executor = CrewAgentExecutor(crew)
app = create_a2a_app(
executor,
agent_name="My Crew Agent",
agent_description="A helpful CrewAI agent"
)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
"""
agent_card = AgentCard(
name=agent_name or "CrewAI Agent",
description=agent_description or "A CrewAI agent exposed via A2A protocol",
version="1.0.0",
capabilities=AgentCapabilities(
streaming=True,
pushNotifications=False
),
defaultInputModes=["text"],
defaultOutputModes=["text"],
skills=[
AgentSkill(
id="crew_execution",
name="Crew Execution",
description="Execute CrewAI crew tasks with multiple agents",
examples=["Process user queries", "Coordinate multi-agent workflows"],
tags=["crewai", "multi-agent", "workflow"]
)
],
url="https://github.com/crewAIInc/crewAI"
)
task_store = InMemoryTaskStore()
request_handler = DefaultRequestHandler(agent_executor, task_store)
if transport.lower() == "fastapi":
raise TransportError("FastAPI transport is not available in the current A2A SDK version")
else:
app_instance = A2AStarletteApplication(
agent_card=agent_card,
http_handler=request_handler,
**kwargs
)
return app_instance.build()

View File

@@ -1,25 +0,0 @@
"""Server configuration for A2A integration."""
from dataclasses import dataclass
from typing import Optional
@dataclass
class ServerConfig:
"""Configuration for A2A server.
This class encapsulates server settings to improve readability
and flexibility for server setups.
Attributes:
host: Host address to bind the server to
port: Port number to bind the server to
transport: Transport protocol to use ("starlette" or "fastapi")
agent_name: Optional name for the agent
agent_description: Optional description for the agent
"""
host: str = "localhost"
port: int = 10001
transport: str = "starlette"
agent_name: Optional[str] = None
agent_description: Optional[str] = None

View File

@@ -1,47 +0,0 @@
"""Task information tracking for A2A integration."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import asyncio
@dataclass
class TaskInfo:
"""Information about a running task in the A2A executor.
This class tracks the lifecycle and status of tasks being executed
by the CrewAgentExecutor, providing better task management capabilities.
Attributes:
task: The asyncio task being executed
started_at: When the task was started
status: Current status of the task ("running", "completed", "cancelled", "failed")
"""
task: asyncio.Task
started_at: datetime
status: str = "running"
def update_status(self, new_status: str) -> None:
"""Update the task status.
Args:
new_status: The new status to set
"""
self.status = new_status
@property
def is_running(self) -> bool:
"""Check if the task is currently running."""
return self.status == "running" and not self.task.done()
@property
def duration(self) -> Optional[float]:
"""Get the duration of the task in seconds.
Returns:
Duration in seconds if task is completed, None if still running
"""
if self.task.done():
return (datetime.now() - self.started_at).total_seconds()
return None

View File

@@ -1,8 +1,10 @@
import json
import os
import time
from collections import defaultdict
from pathlib import Path
import certifi
import click
import requests
@@ -153,6 +155,41 @@ def read_cache_file(cache_file):
return None
def get_ssl_verify_config():
"""
Get SSL verification configuration from environment variables or use certifi default.
Environment Variables (checked in order of precedence):
REQUESTS_CA_BUNDLE: Path to the primary CA bundle file.
SSL_CERT_FILE: Path to the secondary CA bundle file.
CURL_CA_BUNDLE: Path to the tertiary CA bundle file.
Returns:
str: Path to CA bundle file or certifi default path.
Example:
>>> get_ssl_verify_config()
'/path/to/ca-bundle.pem'
>>> os.environ['REQUESTS_CA_BUNDLE'] = '/custom/ca-bundle.pem'
>>> get_ssl_verify_config()
'/custom/ca-bundle.pem'
"""
for env_var in ['REQUESTS_CA_BUNDLE', 'SSL_CERT_FILE', 'CURL_CA_BUNDLE']:
ca_bundle = os.environ.get(env_var)
if ca_bundle:
ca_path = Path(ca_bundle)
if ca_path.is_file() and ca_path.suffix in ['.pem', '.crt', '.cer']:
return str(ca_path)
elif ca_path.is_file():
click.secho(f"Warning: CA bundle file {ca_bundle} may not be in expected format (.pem, .crt, .cer)", fg="yellow")
return str(ca_path)
else:
click.secho(f"Warning: CA bundle path {ca_bundle} from {env_var} does not exist", fg="yellow")
return certifi.where()
def fetch_provider_data(cache_file):
"""
Fetches provider data from a specified URL and caches it to a file.
@@ -163,13 +200,22 @@ def fetch_provider_data(cache_file):
Returns:
- dict or None: The fetched provider data or None if the operation fails.
"""
ssl_config = get_ssl_verify_config()
try:
response = requests.get(JSON_URL, stream=True, timeout=60)
response = requests.get(JSON_URL, stream=True, timeout=60, verify=ssl_config)
response.raise_for_status()
data = download_data(response)
with open(cache_file, "w") as f:
json.dump(data, f)
return data
except requests.exceptions.SSLError as e:
click.secho(f"SSL certificate verification failed: {e}", fg="red")
click.secho(f"Current CA bundle path: {ssl_config}", fg="yellow")
click.secho("Solutions:", fg="cyan")
click.secho(" 1. Set REQUESTS_CA_BUNDLE environment variable to your CA bundle path", fg="yellow")
click.secho(" 2. Ensure your CA bundle file is in .pem, .crt, or .cer format", fg="yellow")
click.secho(" 3. Contact your system administrator for the correct CA bundle", fg="yellow")
return None
except requests.RequestException as e:
click.secho(f"Error fetching provider data: {e}", fg="red")
except json.JSONDecodeError:

View File

@@ -1 +0,0 @@
"""Tests for CrewAI A2A integration."""

View File

@@ -1,198 +0,0 @@
"""Tests for CrewAgentExecutor class."""
import asyncio
import pytest
from unittest.mock import Mock, patch
from crewai.crews.crew_output import CrewOutput
try:
from crewai.a2a import CrewAgentExecutor
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
pass # Imports handled in test methods as needed
from a2a.utils.errors import ServerError
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestCrewAgentExecutor:
"""Test cases for CrewAgentExecutor."""
@pytest.fixture
def sample_crew(self):
"""Create a sample crew for testing."""
from unittest.mock import Mock
mock_crew = Mock()
mock_crew.agents = []
mock_crew.tasks = []
return mock_crew
@pytest.fixture
def crew_executor(self, sample_crew):
"""Create a CrewAgentExecutor for testing."""
return CrewAgentExecutor(sample_crew)
@pytest.fixture
def mock_context(self):
"""Create a mock RequestContext."""
from a2a.types import Message, Part, TextPart
context = Mock(spec=RequestContext)
context.task_id = "test-task-123"
context.context_id = "test-context-456"
context.message = Message(
messageId="msg-123",
taskId="test-task-123",
contextId="test-context-456",
role="user",
parts=[Part(root=TextPart(text="Test message"))]
)
context.get_user_input.return_value = "Test query"
return context
@pytest.fixture
def mock_event_queue(self):
"""Create a mock EventQueue."""
return Mock(spec=EventQueue)
def test_init(self, sample_crew):
"""Test CrewAgentExecutor initialization."""
executor = CrewAgentExecutor(sample_crew)
assert executor.crew == sample_crew
assert executor.supported_content_types == ['text', 'text/plain']
assert executor._running_tasks == {}
def test_init_with_custom_content_types(self, sample_crew):
"""Test CrewAgentExecutor initialization with custom content types."""
custom_types = ['text', 'application/json']
executor = CrewAgentExecutor(sample_crew, supported_content_types=custom_types)
assert executor.supported_content_types == custom_types
@pytest.mark.asyncio
async def test_execute_success(self, crew_executor, mock_context, mock_event_queue):
"""Test successful crew execution."""
mock_output = CrewOutput(raw="Test response", json_dict=None)
with patch.object(crew_executor, '_execute_crew_async', return_value=mock_output):
await crew_executor.execute(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
assert len(crew_executor._running_tasks) == 0
@pytest.mark.asyncio
async def test_execute_with_validation_error(self, crew_executor, mock_event_queue):
"""Test execution with validation error."""
bad_context = Mock(spec=RequestContext)
bad_context.get_user_input.return_value = ""
with pytest.raises(ServerError):
await crew_executor.execute(bad_context, mock_event_queue)
@pytest.mark.asyncio
async def test_execute_with_crew_error(self, crew_executor, mock_context, mock_event_queue):
"""Test execution when crew raises an error."""
with patch.object(crew_executor, '_execute_crew_async', side_effect=Exception("Crew error")):
with pytest.raises(ServerError):
await crew_executor.execute(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
@pytest.mark.asyncio
async def test_cancel_existing_task(self, crew_executor, mock_event_queue):
"""Test cancelling an existing task."""
cancel_context = Mock(spec=RequestContext)
cancel_context.task_id = "test-task-123"
async def dummy_task():
await asyncio.sleep(10)
mock_task = asyncio.create_task(dummy_task())
from crewai.a2a.crew_agent_executor import TaskInfo
from datetime import datetime
task_info = TaskInfo(task=mock_task, started_at=datetime.now())
crew_executor._running_tasks["test-task-123"] = task_info
result = await crew_executor.cancel(cancel_context, mock_event_queue)
assert result is None
assert "test-task-123" not in crew_executor._running_tasks
assert mock_task.cancelled()
@pytest.mark.asyncio
async def test_cancel_nonexistent_task(self, crew_executor, mock_event_queue):
"""Test cancelling a task that doesn't exist."""
cancel_context = Mock(spec=RequestContext)
cancel_context.task_id = "nonexistent-task"
with pytest.raises(ServerError):
await crew_executor.cancel(cancel_context, mock_event_queue)
def test_convert_output_to_parts_with_raw(self, crew_executor):
"""Test converting crew output with raw content to A2A parts."""
output = Mock()
output.raw = "Test response"
output.json_dict = None
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 1
assert parts[0].root.text == "Test response"
def test_convert_output_to_parts_with_json(self, crew_executor):
"""Test converting crew output with JSON data to A2A parts."""
output = Mock()
output.raw = "Test response"
output.json_dict = {"key": "value"}
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 2
assert parts[0].root.text == "Test response"
assert '"key": "value"' in parts[1].root.text
def test_convert_output_to_parts_empty(self, crew_executor):
"""Test converting empty crew output to A2A parts."""
output = ""
parts = crew_executor._convert_output_to_parts(output)
assert len(parts) == 1
assert parts[0].root.text == "Crew execution completed successfully"
def test_validate_request_valid(self, crew_executor, mock_context):
"""Test request validation with valid input."""
error = crew_executor._validate_request(mock_context)
assert error is None
def test_validate_request_empty_input(self, crew_executor):
"""Test request validation with empty input."""
context = Mock(spec=RequestContext)
context.get_user_input.return_value = ""
error = crew_executor._validate_request(context)
assert error == "Empty or missing user input"
def test_validate_request_whitespace_input(self, crew_executor):
"""Test request validation with whitespace-only input."""
context = Mock(spec=RequestContext)
context.get_user_input.return_value = " \n\t "
error = crew_executor._validate_request(context)
assert error == "Empty or missing user input"
def test_validate_request_exception(self, crew_executor):
"""Test request validation when get_user_input raises exception."""
context = Mock(spec=RequestContext)
context.get_user_input.side_effect = Exception("Input error")
error = crew_executor._validate_request(context)
assert "Failed to extract user input" in error
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
def test_import_error_handling():
"""Test that import errors are handled gracefully when A2A is not available."""
with pytest.raises(ImportError, match="A2A integration requires"):
pass

View File

@@ -1,56 +0,0 @@
"""Tests for A2A custom exceptions."""
import pytest
try:
from crewai.a2a.crew_agent_executor import (
A2AServerError,
TransportError,
ExecutionError
)
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestA2AExceptions:
"""Test A2A custom exception classes."""
def test_a2a_server_error_base(self):
"""Test A2AServerError base exception."""
error = A2AServerError("Base error message")
assert str(error) == "Base error message"
assert isinstance(error, Exception)
def test_transport_error_inheritance(self):
"""Test TransportError inherits from A2AServerError."""
error = TransportError("Transport configuration failed")
assert str(error) == "Transport configuration failed"
assert isinstance(error, A2AServerError)
assert isinstance(error, Exception)
def test_execution_error_inheritance(self):
"""Test ExecutionError inherits from A2AServerError."""
error = ExecutionError("Crew execution failed")
assert str(error) == "Crew execution failed"
assert isinstance(error, A2AServerError)
assert isinstance(error, Exception)
def test_exception_raising(self):
"""Test that exceptions can be raised and caught."""
with pytest.raises(TransportError) as exc_info:
raise TransportError("Test transport error")
assert str(exc_info.value) == "Test transport error"
with pytest.raises(ExecutionError) as exc_info:
raise ExecutionError("Test execution error")
assert str(exc_info.value) == "Test execution error"
with pytest.raises(A2AServerError):
raise TransportError("Should be caught as base class")

View File

@@ -1,122 +0,0 @@
"""Integration tests for CrewAI A2A functionality."""
import pytest
from unittest.mock import Mock, patch
try:
from crewai.a2a import CrewAgentExecutor, create_a2a_app
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestA2AIntegration:
"""Integration tests for A2A functionality."""
@pytest.fixture
def sample_crew(self):
"""Create a sample crew for integration testing."""
from unittest.mock import Mock
mock_crew = Mock()
mock_crew.agents = []
mock_crew.tasks = []
return mock_crew
def test_end_to_end_integration(self, sample_crew):
"""Test end-to-end A2A integration."""
executor = CrewAgentExecutor(sample_crew)
assert executor.crew == sample_crew
assert isinstance(executor.supported_content_types, list)
with patch('crewai.a2a.server.A2AStarletteApplication') as mock_app_class:
with patch('crewai.a2a.server.DefaultRequestHandler') as mock_handler_class:
with patch('crewai.a2a.server.InMemoryTaskStore') as mock_task_store_class:
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
app = create_a2a_app(executor)
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(executor, mock_task_store)
mock_app_class.assert_called_once()
assert app == mock_built_app
def test_crew_with_multiple_agents(self):
"""Test A2A integration with multi-agent crew."""
from unittest.mock import Mock
crew = Mock()
crew.agents = [Mock(), Mock()]
crew.tasks = [Mock(), Mock()]
executor = CrewAgentExecutor(crew)
assert executor.crew == crew
assert len(executor.crew.agents) == 2
assert len(executor.crew.tasks) == 2
def test_custom_content_types(self, sample_crew):
"""Test A2A integration with custom content types."""
custom_types = ['text', 'application/json', 'image/png']
executor = CrewAgentExecutor(
sample_crew,
supported_content_types=custom_types
)
assert executor.supported_content_types == custom_types
@patch('uvicorn.run')
def test_server_startup_integration(self, mock_uvicorn_run, sample_crew):
"""Test server startup integration."""
from crewai.a2a import start_a2a_server
executor = CrewAgentExecutor(sample_crew)
with patch('crewai.a2a.server.create_a2a_app') as mock_create_app:
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(
executor,
host="127.0.0.1",
port=9999,
transport="starlette"
)
mock_create_app.assert_called_once_with(
executor,
transport="starlette",
agent_name=None,
agent_description=None
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="127.0.0.1",
port=9999
)
def test_optional_import_in_main_module():
"""Test that A2A classes are optionally imported in main module."""
import crewai
if A2A_AVAILABLE:
assert hasattr(crewai, 'CrewAgentExecutor')
assert hasattr(crewai, 'start_a2a_server')
assert hasattr(crewai, 'create_a2a_app')
assert 'CrewAgentExecutor' in crewai.__all__
assert 'start_a2a_server' in crewai.__all__
assert 'create_a2a_app' in crewai.__all__
else:
assert not hasattr(crewai, 'CrewAgentExecutor')
assert not hasattr(crewai, 'start_a2a_server')
assert not hasattr(crewai, 'create_a2a_app')

View File

@@ -1,134 +0,0 @@
"""Tests for A2A server utilities."""
import pytest
from unittest.mock import Mock, patch
try:
from crewai.a2a import start_a2a_server, create_a2a_app
from a2a.server.agent_execution.agent_executor import AgentExecutor
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestA2AServer:
"""Test cases for A2A server utilities."""
@pytest.fixture
def mock_agent_executor(self):
"""Create a mock AgentExecutor."""
return Mock(spec=AgentExecutor)
@patch('uvicorn.run')
@patch('crewai.a2a.server.create_a2a_app')
def test_start_a2a_server_default(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
"""Test starting A2A server with default parameters."""
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(mock_agent_executor)
mock_create_app.assert_called_once_with(
mock_agent_executor,
transport="starlette",
agent_name=None,
agent_description=None
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="localhost",
port=10001
)
@patch('uvicorn.run')
@patch('crewai.a2a.server.create_a2a_app')
def test_start_a2a_server_custom(self, mock_create_app, mock_uvicorn_run, mock_agent_executor):
"""Test starting A2A server with custom parameters."""
mock_app = Mock()
mock_create_app.return_value = mock_app
start_a2a_server(
mock_agent_executor,
host="0.0.0.0",
port=8080,
transport="fastapi"
)
mock_create_app.assert_called_once_with(
mock_agent_executor,
transport="fastapi",
agent_name=None,
agent_description=None
)
mock_uvicorn_run.assert_called_once_with(
mock_app,
host="0.0.0.0",
port=8080
)
@patch('crewai.a2a.server.A2AStarletteApplication')
@patch('crewai.a2a.server.DefaultRequestHandler')
@patch('crewai.a2a.server.InMemoryTaskStore')
def test_create_a2a_app_starlette(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
"""Test creating A2A app with Starlette transport."""
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
result = create_a2a_app(mock_agent_executor, transport="starlette")
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
mock_app_class.assert_called_once()
mock_app_instance.build.assert_called_once()
assert result == mock_built_app
def test_create_a2a_app_fastapi(self, mock_agent_executor):
"""Test creating A2A app with FastAPI transport raises error."""
from crewai.a2a.exceptions import TransportError
with pytest.raises(TransportError, match="FastAPI transport is not available"):
create_a2a_app(
mock_agent_executor,
transport="fastapi",
agent_name="Custom Agent",
agent_description="Custom description"
)
@patch('crewai.a2a.server.A2AStarletteApplication')
@patch('crewai.a2a.server.DefaultRequestHandler')
@patch('crewai.a2a.server.InMemoryTaskStore')
def test_create_a2a_app_default_transport(self, mock_task_store_class, mock_handler_class, mock_app_class, mock_agent_executor):
"""Test creating A2A app with default transport."""
mock_handler = Mock()
mock_app_instance = Mock()
mock_built_app = Mock()
mock_task_store = Mock()
mock_task_store_class.return_value = mock_task_store
mock_handler_class.return_value = mock_handler
mock_app_class.return_value = mock_app_instance
mock_app_instance.build.return_value = mock_built_app
result = create_a2a_app(mock_agent_executor)
mock_task_store_class.assert_called_once()
mock_handler_class.assert_called_once_with(mock_agent_executor, mock_task_store)
mock_app_class.assert_called_once()
assert result == mock_built_app
@pytest.mark.skipif(A2A_AVAILABLE, reason="Testing import error handling")
def test_server_import_error_handling():
"""Test that import errors are handled gracefully when A2A is not available."""
with pytest.raises(ImportError, match="A2A integration requires"):
pass

View File

@@ -1,53 +0,0 @@
"""Tests for ServerConfig dataclass."""
import pytest
try:
from crewai.a2a.server import ServerConfig
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestServerConfig:
"""Test ServerConfig dataclass functionality."""
def test_server_config_defaults(self):
"""Test ServerConfig with default values."""
config = ServerConfig()
assert config.host == "localhost"
assert config.port == 10001
assert config.transport == "starlette"
assert config.agent_name is None
assert config.agent_description is None
def test_server_config_custom_values(self):
"""Test ServerConfig with custom values."""
config = ServerConfig(
host="0.0.0.0",
port=8080,
transport="custom",
agent_name="Test Agent",
agent_description="A test agent"
)
assert config.host == "0.0.0.0"
assert config.port == 8080
assert config.transport == "custom"
assert config.agent_name == "Test Agent"
assert config.agent_description == "A test agent"
def test_server_config_partial_override(self):
"""Test ServerConfig with partial value override."""
config = ServerConfig(
port=9000,
agent_name="Custom Agent"
)
assert config.host == "localhost" # default
assert config.port == 9000 # custom
assert config.transport == "starlette" # default
assert config.agent_name == "Custom Agent" # custom
assert config.agent_description is None # default

View File

@@ -1,51 +0,0 @@
"""Tests for TaskInfo dataclass."""
import pytest
from datetime import datetime
from unittest.mock import Mock
try:
from crewai.a2a.crew_agent_executor import TaskInfo
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
@pytest.mark.skipif(not A2A_AVAILABLE, reason="A2A integration not available")
class TestTaskInfo:
"""Test TaskInfo dataclass functionality."""
def test_task_info_creation(self):
"""Test TaskInfo creation with required fields."""
mock_task = Mock()
started_at = datetime.now()
task_info = TaskInfo(task=mock_task, started_at=started_at)
assert task_info.task == mock_task
assert task_info.started_at == started_at
assert task_info.status == "running"
def test_task_info_with_custom_status(self):
"""Test TaskInfo creation with custom status."""
mock_task = Mock()
started_at = datetime.now()
task_info = TaskInfo(
task=mock_task,
started_at=started_at,
status="completed"
)
assert task_info.status == "completed"
def test_task_info_status_update(self):
"""Test TaskInfo status can be updated."""
mock_task = Mock()
started_at = datetime.now()
task_info = TaskInfo(task=mock_task, started_at=started_at)
assert task_info.status == "running"
task_info.status = "cancelled"
assert task_info.status == "cancelled"

View File

@@ -1,6 +1,4 @@
import pytest
from crewai.cli.constants import ENV_VARS, MODELS, PROVIDERS
from crewai.cli.constants import ENV_VARS, JSON_URL, MODELS, PROVIDERS
def test_huggingface_in_providers():
@@ -21,3 +19,9 @@ def test_huggingface_models():
"""Test that Huggingface models are properly configured."""
assert "huggingface" in MODELS
assert len(MODELS["huggingface"]) > 0
def test_json_url_is_https():
"""Test that JSON_URL uses HTTPS for secure connection."""
assert JSON_URL.startswith("https://")
assert JSON_URL == "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"

View File

@@ -0,0 +1,142 @@
import os
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
import requests
from crewai.cli.provider import fetch_provider_data, get_ssl_verify_config
class TestSSLConfiguration:
def test_get_ssl_verify_config_with_requests_ca_bundle(self):
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_path = temp_file.name
try:
with patch.dict(os.environ, {'REQUESTS_CA_BUNDLE': temp_path}):
result = get_ssl_verify_config()
assert result == temp_path
finally:
os.unlink(temp_path)
def test_get_ssl_verify_config_with_ssl_cert_file(self):
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_path = temp_file.name
try:
with patch.dict(os.environ, {'SSL_CERT_FILE': temp_path}, clear=True):
result = get_ssl_verify_config()
assert result == temp_path
finally:
os.unlink(temp_path)
def test_get_ssl_verify_config_with_curl_ca_bundle(self):
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_path = temp_file.name
try:
with patch.dict(os.environ, {'CURL_CA_BUNDLE': temp_path}, clear=True):
result = get_ssl_verify_config()
assert result == temp_path
finally:
os.unlink(temp_path)
def test_get_ssl_verify_config_precedence(self):
with tempfile.NamedTemporaryFile(delete=False) as temp_file1:
temp_path1 = temp_file1.name
with tempfile.NamedTemporaryFile(delete=False) as temp_file2:
temp_path2 = temp_file2.name
try:
with patch.dict(os.environ, {
'REQUESTS_CA_BUNDLE': temp_path1,
'SSL_CERT_FILE': temp_path2
}):
result = get_ssl_verify_config()
assert result == temp_path1
finally:
os.unlink(temp_path1)
os.unlink(temp_path2)
def test_get_ssl_verify_config_invalid_file(self):
with patch.dict(os.environ, {'REQUESTS_CA_BUNDLE': '/nonexistent/file'}, clear=True):
with patch('certifi.where', return_value='/path/to/certifi/cacert.pem'):
result = get_ssl_verify_config()
assert result == '/path/to/certifi/cacert.pem'
def test_get_ssl_verify_config_fallback_to_certifi(self):
with patch.dict(os.environ, {}, clear=True):
with patch('certifi.where', return_value='/path/to/certifi/cacert.pem'):
result = get_ssl_verify_config()
assert result == '/path/to/certifi/cacert.pem'
def test_get_ssl_verify_config_file_format_validation(self):
"""Test that CA bundle file format validation works correctly."""
with tempfile.NamedTemporaryFile(suffix=".pem", delete=False) as temp_file:
temp_path = temp_file.name
try:
with patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": temp_path}):
result = get_ssl_verify_config()
assert result == temp_path
finally:
os.unlink(temp_path)
def test_get_ssl_verify_config_unsupported_format_warning(self):
"""Test that unsupported file formats still work but show warning."""
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as temp_file:
temp_path = temp_file.name
try:
with patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": temp_path}):
with patch('click.secho') as mock_secho:
result = get_ssl_verify_config()
assert result == temp_path
mock_secho.assert_called_with(
f"Warning: CA bundle file {temp_path} may not be in expected format (.pem, .crt, .cer)",
fg="yellow"
)
finally:
os.unlink(temp_path)
class TestFetchProviderDataSSL:
def test_fetch_provider_data_uses_ssl_config(self):
cache_file = Path("/tmp/test_cache.json")
mock_response = Mock()
mock_response.headers = {'content-length': '100'}
mock_response.iter_content.return_value = [b'{"test": "data"}']
with patch('requests.get', return_value=mock_response) as mock_get:
with patch('crewai.cli.provider.get_ssl_verify_config', return_value='/custom/ca/bundle.pem'):
fetch_provider_data(cache_file)
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
assert kwargs['verify'] == '/custom/ca/bundle.pem'
if cache_file.exists():
cache_file.unlink()
def test_fetch_provider_data_ssl_error_handling(self):
cache_file = Path("/tmp/test_cache.json")
with patch('requests.get', side_effect=requests.exceptions.SSLError("SSL verification failed")):
with patch('click.secho') as mock_secho:
result = fetch_provider_data(cache_file)
assert result is None
mock_secho.assert_any_call("SSL certificate verification failed: SSL verification failed", fg="red")
mock_secho.assert_any_call("Solutions:", fg="cyan")
mock_secho.assert_any_call(" 1. Set REQUESTS_CA_BUNDLE environment variable to your CA bundle path", fg="yellow")
def test_fetch_provider_data_general_request_error(self):
cache_file = Path("/tmp/test_cache.json")
with patch('requests.get', side_effect=requests.exceptions.RequestException("Network error")):
with patch('click.secho') as mock_secho:
result = fetch_provider_data(cache_file)
assert result is None
mock_secho.assert_any_call("Error fetching provider data: Network error", fg="red")

14
uv.lock generated
View File

@@ -820,7 +820,7 @@ requires-dist = [
{ name = "json-repair", specifier = ">=0.25.2" },
{ name = "json5", specifier = ">=0.10.0" },
{ name = "jsonref", specifier = ">=1.1.0" },
{ name = "litellm", specifier = "==1.68.0" },
{ name = "litellm", specifier = "==1.72.0" },
{ name = "mem0ai", marker = "extra == 'mem0'", specifier = ">=0.1.94" },
{ name = "onnxruntime", specifier = "==1.22.0" },
{ name = "openai", specifier = ">=1.13.3" },
@@ -2245,7 +2245,7 @@ wheels = [
[[package]]
name = "litellm"
version = "1.68.0"
version = "1.72.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiohttp" },
@@ -2260,9 +2260,9 @@ dependencies = [
{ name = "tiktoken" },
{ name = "tokenizers" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ba/22/138545b646303ca3f4841b69613c697b9d696322a1386083bb70bcbba60b/litellm-1.68.0.tar.gz", hash = "sha256:9fb24643db84dfda339b64bafca505a2eef857477afbc6e98fb56512c24dbbfa", size = 7314051 }
sdist = { url = "https://files.pythonhosted.org/packages/55/d3/f1a8c9c9ffdd3bab1bc410254c8140b1346f05a01b8c6b37c48b56abb4b0/litellm-1.72.0.tar.gz", hash = "sha256:135022b9b8798f712ffa84e71ac419aa4310f1d0a70f79dd2007f7ef3a381e43", size = 8082337 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/10/af/1e344bc8aee41445272e677d802b774b1f8b34bdc3bb5697ba30f0fb5d52/litellm-1.68.0-py3-none-any.whl", hash = "sha256:3bca38848b1a5236b11aa6b70afa4393b60880198c939e582273f51a542d4759", size = 7684460 },
{ url = "https://files.pythonhosted.org/packages/c2/98/bec08f5a3e504013db6f52b5fd68375bd92b463c91eb454d5a6460e957af/litellm-1.72.0-py3-none-any.whl", hash = "sha256:88360a7ae9aa9c96278ae1bb0a459226f909e711c5d350781296d0640386a824", size = 7979630 },
]
[[package]]
@@ -3123,7 +3123,7 @@ wheels = [
[[package]]
name = "openai"
version = "1.68.2"
version = "1.78.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -3135,9 +3135,9 @@ dependencies = [
{ name = "tqdm" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3f/6b/6b002d5d38794645437ae3ddb42083059d556558493408d39a0fcea608bc/openai-1.68.2.tar.gz", hash = "sha256:b720f0a95a1dbe1429c0d9bb62096a0d98057bcda82516f6e8af10284bdd5b19", size = 413429 }
sdist = { url = "https://files.pythonhosted.org/packages/d1/7c/7c48bac9be52680e41e99ae7649d5da3a0184cd94081e028897f9005aa03/openai-1.78.0.tar.gz", hash = "sha256:254aef4980688468e96cbddb1f348ed01d274d02c64c6c69b0334bf001fb62b3", size = 442652 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fd/34/cebce15f64eb4a3d609a83ac3568d43005cc9a1cba9d7fde5590fd415423/openai-1.68.2-py3-none-any.whl", hash = "sha256:24484cb5c9a33b58576fdc5acf0e5f92603024a4e39d0b99793dfa1eb14c2b36", size = 606073 },
{ url = "https://files.pythonhosted.org/packages/cc/41/d64a6c56d0ec886b834caff7a07fc4d43e1987895594b144757e7a6b90d7/openai-1.78.0-py3-none-any.whl", hash = "sha256:1ade6a48cd323ad8a7715e7e1669bb97a17e1a5b8a916644261aaef4bf284778", size = 680407 },
]
[[package]]