Compare commits

..

8 Commits

Author SHA1 Message Date
Devin AI
6b354201cb Fix lint issues: suppress unused chromadb import warnings and update remaining external memory test
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:45:39 +00:00
Devin AI
5566f4716b Update external memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:38:50 +00:00
Devin AI
ea5f6b592c Update memory tests to handle optional ChromaDB dependency
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:37:44 +00:00
Devin AI
a0057afe45 Fix Collection type annotation and test mocking issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:27:06 +00:00
Devin AI
ebcb6c6f90 Fix CI failures: restore missing error classes and resolve type issues
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:19:30 +00:00
Devin AI
c7e83a7529 Update optional dependencies tests
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:46 +00:00
Devin AI
f0b1cc23f4 Add ChromaDBRequiredError class and improve error handling
Co-Authored-By: João <joao@crewai.com>
2025-05-30 09:10:35 +00:00
Devin AI
7b129fc847 Fix #2919: Make chromadb an optional dependency to resolve package conflicts
Co-Authored-By: João <joao@crewai.com>
2025-05-30 08:56:35 +00:00
23 changed files with 1746 additions and 1967 deletions

View File

@@ -14,7 +14,7 @@ jobs:
timeout-minutes: 15
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12']
steps:
- name: Checkout code
uses: actions/checkout@v4

View File

@@ -403,7 +403,7 @@ In addition to the sequential process, you can use the hierarchical process, whi
## Key Features
CrewAI stands apart as a lean, standalone, high-performance multi-AI Agent framework delivering simplicity, flexibility, and precise control—free from the complexity and limitations found in other agent frameworks.
CrewAI stands apart as a lean, standalone, high-performance framework delivering simplicity, flexibility, and precise control—free from the complexity and limitations found in other agent frameworks.
- **Standalone & Lean**: Completely independent from other frameworks like LangChain, offering faster execution and lighter resource demands.
- **Flexible & Precise**: Easily orchestrate autonomous agents through intuitive [Crews](https://docs.crewai.com/concepts/crews) or precise [Flows](https://docs.crewai.com/concepts/flows), achieving perfect balance for your needs.

View File

@@ -85,12 +85,7 @@
{
"group": "MCP Integration",
"pages": [
"mcp/overview",
"mcp/stdio",
"mcp/sse",
"mcp/streamable-http",
"mcp/multiple-servers",
"mcp/security"
"mcp/crewai-mcp-integration"
]
},
{

View File

@@ -22,7 +22,7 @@ Watch this video tutorial for a step-by-step demonstration of the installation p
<Note>
**Python Version Requirements**
CrewAI requires `Python >=3.10 and <=3.13`. Here's how to check your version:
CrewAI requires `Python >=3.10 and <3.13`. Here's how to check your version:
```bash
python3 --version
```

View File

@@ -0,0 +1,243 @@
---
title: 'MCP Servers as Tools in CrewAI'
description: 'Learn how to integrate MCP servers as tools in your CrewAI agents using the `crewai-tools` library.'
icon: 'plug'
---
## Overview
The [Model Context Protocol](https://modelcontextprotocol.io/introduction) (MCP) provides a standardized way for AI agents to provide context to LLMs by communicating with external services, known as MCP Servers.
The `crewai-tools` library extends CrewAI's capabilities by allowing you to seamlessly integrate tools from these MCP servers into your agents.
This gives your crews access to a vast ecosystem of functionalities. For now, we support **Standard Input/Output** (Stdio) and **Server-Sent Events** (SSE) transport mechanisms.
<Info>
We will also be integrating **Streamable HTTP** transport in the near future.
Streamable HTTP is designed for efficient, bi-directional communication over a single HTTP connection.
</Info>
## Video Tutorial
Watch this video tutorial for a comprehensive guide on MCP integration with CrewAI:
<iframe
width="100%"
height="400"
src="https://www.youtube.com/embed/TpQ45lAZh48"
title="CrewAI MCP Integration Guide"
frameborder="0"
style={{ borderRadius: '10px' }}
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen
></iframe>
## Installation
Before you start using MCP with `crewai-tools`, you need to install the `mcp` extra `crewai-tools` dependency with the following command:
```shell
uv pip install 'crewai-tools[mcp]'
```
### Integrating MCP Tools with `MCPServerAdapter`
The `MCPServerAdapter` class from `crewai-tools` is the primary way to connect to an MCP server and make its tools available to your CrewAI agents.
It supports different transport mechanisms, primarily **Stdio** (for local servers) and **SSE** (Server-Sent Events).You have two main options for managing the connection lifecycle:
### Option 1: Fully Managed Connection (Recommended)
Using a Python context manager (`with` statement) is the recommended approach. It automatically handles starting and stopping the connection to the MCP server.
**For a local Stdio-based MCP server:**
```python
from crewai import Agent, Task, Crew
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters
import os
server_params=StdioServerParameters(
command="uxv", # Or your python3 executable i.e. "python3"
args=["mock_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from Stdio MCP server: {[tool.name for tool in tools]}")
# Example: Using the tools from the Stdio MCP server in a CrewAI Agent
agent = Agent(
role="Web Information Retriever",
goal="Scrape content from a specified URL.",
backstory="An AI that can fetch and process web page data via an MCP tool.",
tools=tools,
verbose=True,
)
task = Task(
description="Scrape content from a specified URL.",
expected_output="Scraped content from the specified URL.",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
)
result = crew.kickoff()
print(result)
```
**For a remote SSE-based MCP server:**
```python
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew
server_params = {"url": "http://localhost:8000/sse"}
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from SSE MCP server: {[tool.name for tool in tools]}")
# Example: Using the tools from the SSE MCP server in a CrewAI Agent
agent = Agent(
role="Web Information Retriever",
goal="Scrape content from a specified URL.",
backstory="An AI that can fetch and process web page data via an MCP tool.",
tools=tools,
verbose=True,
)
task = Task(
description="Scrape content from a specified URL.",
expected_output="Scraped content from the specified URL.",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
)
result = crew.kickoff()
print(result)
```
### Option 2: More control over the MCP server connection lifecycle
If you need finer-grained control over the MCP server connection lifecycle, you can instantiate `MCPServerAdapter` directly and manage its `start()` and `stop()` methods.
<Info>
You **MUST** call `mcp_server_adapter.stop()` to ensure the connection is closed and resources are released. Using a `try...finally` block is highly recommended.
</Info>
#### Stdio Transport Example (Manual)
```python
from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew
import os
stdio_params = StdioServerParameters(
command="uvx", # Or your python3 executable i.e. "python3"
args=["--quiet", "your-mcp-server@0.1.3"],
env={"UV_PYTHON": "3.12", **os.environ},
)
mcp_server_adapter = MCPServerAdapter(server_params=stdio_params)
try:
mcp_server_adapter.start() # Manually start the connection
tools = mcp_server_adapter.tools
print(f"Available tools (manual Stdio): {[tool.name for tool in tools]}")
# Use 'tools' with your Agent, Task, Crew setup as in Option 1
agent = Agent(
role="Medical Researcher",
goal="Find recent studies on a given topic using PubMed.",
backstory="An AI assistant specialized in biomedical literature research.",
tools=tools,
verbose=True
)
task = Task(
description="Search for recent articles on 'crispr gene editing'.",
expected_output="A summary of the top 3 recent articles.",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential
)
result = crew.kickoff()
print(result)
finally:
print("Stopping Stdio MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
```
#### SSE Transport Example (Manual)
```python
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew, Process
from mcp import StdioServerParameters
server_params = {"url": "http://localhost:8000/sse"}
try:
mcp_server_adapter = MCPServerAdapter(server_params)
mcp_server_adapter.start()
tools = mcp_server_adapter.tools
print(f"Available tools (manual SSE): {[tool.name for tool in tools]}")
agent = Agent(
role="Medical Researcher",
goal="Find recent studies on a given topic using PubMed.",
backstory="An AI assistant specialized in biomedical literature research.",
tools=tools,
verbose=True
)
task = Task(
description="Search for recent articles on 'crispr gene editing'.",
expected_output="A summary of the top 3 recent articles.",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential
)
result = crew.kickoff()
print(result)
finally:
print("Stopping SSE MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
```
## Staying Safe with MCP
<Warning>
Always ensure that you trust an MCP Server before using it.
</Warning>
#### Security Warning: DNS Rebinding Attacks
SSE transports can be vulnerable to DNS rebinding attacks if not properly secured.
To prevent this:
1. **Always validate Origin headers** on incoming SSE connections to ensure they come from expected sources
2. **Avoid binding servers to all network interfaces** (0.0.0.0) when running locally - bind only to localhost (127.0.0.1) instead
3. **Implement proper authentication** for all SSE connections
Without these protections, attackers could use DNS rebinding to interact with local MCP servers from remote websites.
For more details, see the [MCP Transport Security](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations) documentation.
### Limitations
* **Supported Primitives**: Currently, `MCPServerAdapter` primarily supports adapting MCP `tools`.
Other MCP primitives like `prompts` or `resources` are not directly integrated as CrewAI components through this adapter at this time.
* **Output Handling**: The adapter typically processes the primary text output from an MCP tool (e.g., `.content[0].text`). Complex or multi-modal outputs might require custom handling if not fitting this pattern.

View File

@@ -1,64 +0,0 @@
---
title: Connecting to Multiple MCP Servers
description: Learn how to use MCPServerAdapter in CrewAI to connect to multiple MCP servers simultaneously and aggregate their tools.
icon: layer-group
---
## Overview
`MCPServerAdapter` in `crewai-tools` allows you to connect to multiple MCP servers concurrently. This is useful when your agents need to access tools distributed across different services or environments. The adapter aggregates tools from all specified servers, making them available to your CrewAI agents.
## Configuration
To connect to multiple servers, you provide a list of server parameter dictionaries to `MCPServerAdapter`. Each dictionary in the list should define the parameters for one MCP server.
Supported transport types for each server in the list include `stdio`, `sse`, and `streamable-http`.
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters # Needed for Stdio example
# Define parameters for multiple MCP servers
server_params_list = [
# Streamable HTTP Server
{
"url": "http://localhost:8001/mcp",
"transport": "streamable-http"
},
# SSE Server
{
"url": "http://localhost:8000/sse",
"transport": "sse"
},
# StdIO Server
StdioServerParameters(
command="python3",
args=["servers/your_stdio_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
]
try:
with MCPServerAdapter(server_params_list) as aggregated_tools:
print(f"Available aggregated tools: {[tool.name for tool in aggregated_tools]}")
multi_server_agent = Agent(
role="Versatile Assistant",
goal="Utilize tools from local Stdio, remote SSE, and remote HTTP MCP servers.",
backstory="An AI agent capable of leveraging a diverse set of tools from multiple sources.",
tools=aggregated_tools, # All tools are available here
verbose=True,
)
... # Your other agent, tasks, and crew code here
except Exception as e:
print(f"Error connecting to or using multiple MCP servers (Managed): {e}")
print("Ensure all MCP servers are running and accessible with correct configurations.")
```
## Connection Management
When using the context manager (`with` statement), `MCPServerAdapter` handles the lifecycle (start and stop) of all connections to the configured MCP servers. This simplifies resource management and ensures that all connections are properly closed when the context is exited.

View File

@@ -1,164 +0,0 @@
---
title: 'MCP Servers as Tools in CrewAI'
description: 'Learn how to integrate MCP servers as tools in your CrewAI agents using the `crewai-tools` library.'
icon: plug
---
## Overview
The [Model Context Protocol](https://modelcontextprotocol.io/introduction) (MCP) provides a standardized way for AI agents to provide context to LLMs by communicating with external services, known as MCP Servers.
The `crewai-tools` library extends CrewAI's capabilities by allowing you to seamlessly integrate tools from these MCP servers into your agents.
This gives your crews access to a vast ecosystem of functionalities.
We currently support the following transport mechanisms:
- **Stdio**: for local servers (communication via standard input/output between processes on the same machine)
- **Server-Sent Events (SSE)**: for remote servers (unidirectional, real-time data streaming from server to client over HTTP)
- **Streamable HTTP**: for remote servers (flexible, potentially bi-directional communication over HTTP, often utilizing SSE for server-to-client streams)
## Video Tutorial
Watch this video tutorial for a comprehensive guide on MCP integration with CrewAI:
<iframe
width="100%"
height="400"
src="https://www.youtube.com/embed/TpQ45lAZh48"
title="CrewAI MCP Integration Guide"
frameborder="0"
style={{ borderRadius: '10px' }}
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen
></iframe>
## Installation
Before you start using MCP with `crewai-tools`, you need to install the `mcp` extra `crewai-tools` dependency with the following command:
```shell
uv pip install 'crewai-tools[mcp]'
```
## Key Concepts & Getting Started
The `MCPServerAdapter` class from `crewai-tools` is the primary way to connect to an MCP server and make its tools available to your CrewAI agents. It supports different transport mechanisms and simplifies connection management.
Using a Python context manager (`with` statement) is the **recommended approach** for `MCPServerAdapter`. It automatically handles starting and stopping the connection to the MCP server.
```python
from crewai import Agent
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters # For Stdio Server
# Example server_params (choose one based on your server type):
# 1. Stdio Server:
server_params=StdioServerParameters(
command="python3",
args=["servers/your_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
# 2. SSE Server:
server_params = {
"url": "http://localhost:8000/sse",
"transport": "sse"
}
# 3. Streamable HTTP Server:
server_params = {
"url": "http://localhost:8001/mcp",
"transport": "streamable-http"
}
# Example usage (uncomment and adapt once server_params is set):
with MCPServerAdapter(server_params) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
my_agent = Agent(
role="MCP Tool User",
goal="Utilize tools from an MCP server.",
backstory="I can connect to MCP servers and use their tools.",
tools=mcp_tools, # Pass the loaded tools to your agent
reasoning=True,
verbose=True
)
# ... rest of your crew setup ...
```
This general pattern shows how to integrate tools. For specific examples tailored to each transport, refer to the detailed guides below.
## Explore MCP Integrations
<CardGroup cols={2}>
<Card
title="Stdio Transport"
icon="server"
href="/mcp/stdio"
color="#3B82F6"
>
Connect to local MCP servers via standard input/output. Ideal for scripts and local executables.
</Card>
<Card
title="SSE Transport"
icon="wifi"
href="/mcp/sse"
color="#10B981"
>
Integrate with remote MCP servers using Server-Sent Events for real-time data streaming.
</Card>
<Card
title="Streamable HTTP Transport"
icon="globe"
href="/mcp/streamable-http"
color="#F59E0B"
>
Utilize flexible Streamable HTTP for robust communication with remote MCP servers.
</Card>
<Card
title="Connecting to Multiple Servers"
icon="layer-group"
href="/mcp/multiple-servers"
color="#8B5CF6"
>
Aggregate tools from several MCP servers simultaneously using a single adapter.
</Card>
<Card
title="Security Considerations"
icon="lock"
href="/mcp/security"
color="#EF4444"
>
Review important security best practices for MCP integration to keep your agents safe.
</Card>
</CardGroup>
Checkout this repository for full demos and examples of MCP integration with CrewAI! 👇
<Card
title="GitHub Repository"
icon="github"
href="https://github.com/tonykipkemboi/crewai-mcp-demo"
target="_blank"
>
CrewAI MCP Demo
</Card>
## Staying Safe with MCP
<Warning>
Always ensure that you trust an MCP Server before using it.
</Warning>
#### Security Warning: DNS Rebinding Attacks
SSE transports can be vulnerable to DNS rebinding attacks if not properly secured.
To prevent this:
1. **Always validate Origin headers** on incoming SSE connections to ensure they come from expected sources
2. **Avoid binding servers to all network interfaces** (0.0.0.0) when running locally - bind only to localhost (127.0.0.1) instead
3. **Implement proper authentication** for all SSE connections
Without these protections, attackers could use DNS rebinding to interact with local MCP servers from remote websites.
For more details, see the [Anthropic's MCP Transport Security docs](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations).
### Limitations
* **Supported Primitives**: Currently, `MCPServerAdapter` primarily supports adapting MCP `tools`.
Other MCP primitives like `prompts` or `resources` are not directly integrated as CrewAI components through this adapter at this time.
* **Output Handling**: The adapter typically processes the primary text output from an MCP tool (e.g., `.content[0].text`). Complex or multi-modal outputs might require custom handling if not fitting this pattern.

View File

@@ -1,166 +0,0 @@
---
title: MCP Security Considerations
description: Learn about important security best practices when integrating MCP servers with your CrewAI agents.
icon: lock
---
## Overview
<Warning>
The most critical aspect of MCP security is **trust**. You should **only** connect your CrewAI agents to MCP servers that you fully trust.
</Warning>
When integrating external services like MCP (Model Context Protocol) servers into your CrewAI agents, security is paramount.
MCP servers can execute code, access data, or interact with other systems based on the tools they expose.
It's crucial to understand the implications and follow best practices to protect your applications and data.
### Risks
- Execute arbitrary code on the machine where the agent is running (especially with `Stdio` transport if the server can control the command executed).
- Expose sensitive data from your agent or its environment.
- Manipulate your agent's behavior in unintended ways, including making unauthorized API calls on your behalf.
- Hijack your agent's reasoning process through sophisticated prompt injection techniques (see below).
### 1. Trusting MCP Servers
<Warning>
**Only connect to MCP servers that you trust.**
</Warning>
Before configuring `MCPServerAdapter` to connect to an MCP server, ensure you know:
- **Who operates the server?** Is it a known, reputable service, or an internal server under your control?
- **What tools does it expose?** Understand the capabilities of the tools. Could they be misused if an attacker gained control or if the server itself is malicious?
- **What data does it access or process?** Be aware of any sensitive information that might be sent to or handled by the MCP server.
Avoid connecting to unknown or unverified MCP servers, especially if your agents handle sensitive tasks or data.
### 2. Secure Prompt Injection via Tool Metadata: The "Model Control Protocol" Risk
A significant and subtle risk is the potential for prompt injection through tool metadata. Here's how it works:
1. When your CrewAI agent connects to an MCP server, it typically requests a list of available tools.
2. The MCP server responds with metadata for each tool, including its name, description, and parameter descriptions.
3. Your agent's underlying Language Model (LLM) uses this metadata to understand how and when to use the tools. This metadata is often incorporated into the LLM's system prompt or context.
4. A malicious MCP server can craft its tool metadata (names, descriptions) to include hidden or overt instructions. These instructions can act as a prompt injection, effectively telling your LLM to behave in a certain way, reveal sensitive information, or perform malicious actions.
**Crucially, this attack can occur simply by connecting to a malicious server and listing its tools, even if your agent never explicitly decides to *use* any of those tools.** The mere exposure to the malicious metadata can be enough to compromise the agent's behavior.
**Mitigation:**
* **Extreme Caution with Untrusted Servers:** Reiterate: *Do not connect to MCP servers you do not fully trust.* The risk of metadata injection makes this paramount.
### Stdio Transport Security
Stdio (Standard Input/Output) transport is typically used for local MCP servers running on the same machine as your CrewAI application.
- **Process Isolation**: While generally safer as it doesn't involve network exposure by default, ensure the script or command run by `StdioServerParameters` is from a trusted source and has appropriate file system permissions. A malicious Stdio server script could still harm your local system.
- **Input Sanitization**: If your Stdio server script takes complex inputs derived from agent interactions, ensure the script itself sanitizes these inputs to prevent command injection or other vulnerabilities within the script's logic.
- **Resource Limits**: Be mindful that a local Stdio server process consumes local resources (CPU, memory). Ensure it's well-behaved and won't exhaust system resources.
### Confused Deputy Attacks
The [Confused Deputy Problem](https://en.wikipedia.org/wiki/Confused_deputy_problem) is a classic security vulnerability that can manifest in MCP integrations, especially when an MCP server acts as a proxy to other third-party services (e.g., Google Calendar, GitHub) that use OAuth 2.0 for authorization.
**Scenario:**
1. An MCP server (let's call it `MCP-Proxy`) allows your agent to interact with `ThirdPartyAPI`.
2. `MCP-Proxy` uses its own single, static `client_id` when talking to `ThirdPartyAPI`'s authorization server.
3. You, as the user, legitimately authorize `MCP-Proxy` to access `ThirdPartyAPI` on your behalf. During this, `ThirdPartyAPI`'s auth server might set a cookie in your browser indicating your consent for `MCP-Proxy`'s `client_id`.
4. An attacker crafts a malicious link. This link initiates an OAuth flow with `MCP-Proxy`, but is designed to trick `ThirdPartyAPI`'s auth server.
5. If you click this link, and `ThirdPartyAPI`'s auth server sees your existing consent cookie for `MCP-Proxy`'s `client_id`, it might *skip* asking for your consent again.
6. `MCP-Proxy` might then be tricked into forwarding an authorization code (for `ThirdPartyAPI`) to the attacker, or an MCP authorization code that the attacker can use to impersonate you to `MCP-Proxy`.
**Mitigation (Primarily for MCP Server Developers):**
* MCP proxy servers using static client IDs for downstream services **must** obtain explicit user consent for *each client application or agent* connecting to them *before* initiating an OAuth flow with the third-party service. This means `MCP-Proxy` itself should show a consent screen.
**CrewAI User Implication:**
* Be cautious if an MCP server redirects you for multiple OAuth authentications, especially if it seems unexpected or if the permissions requested are overly broad.
* Prefer MCP servers that clearly delineate their own identity versus the third-party services they might proxy.
### Remote Transport Security (SSE & Streamable HTTP)
When connecting to remote MCP servers via Server-Sent Events (SSE) or Streamable HTTP, standard web security practices are essential.
### SSE Security Considerations
### a. DNS Rebinding Attacks (Especially for SSE)
<Critical>
**Protect against DNS Rebinding Attacks.**
</Critical>
DNS rebinding allows an attacker-controlled website to bypass the same-origin policy and make requests to servers on the user's local network (e.g., `localhost`) or intranet. This is particularly risky if you run an MCP server locally (e.g., for development) and an agent in a browser-like environment (though less common for typical CrewAI backend setups) or if the MCP server is on an internal network.
**Mitigation Strategies for MCP Server Implementers:**
- **Validate `Origin` and `Host` Headers**: MCP servers (especially SSE ones) should validate the `Origin` and/or `Host` HTTP headers to ensure requests are coming from expected domains/clients.
- **Bind to `localhost` (127.0.0.1)**: When running MCP servers locally for development, bind them to `127.0.0.1` instead of `0.0.0.0`. This prevents them from being accessible from other machines on the network.
- **Authentication**: Require authentication for all connections to your MCP server if it's not intended for public anonymous access.
### b. Use HTTPS
- **Encrypt Data in Transit**: Always use HTTPS (HTTP Secure) for the URLs of remote MCP servers. This encrypts the communication between your CrewAI application and the MCP server, protecting against eavesdropping and man-in-the-middle attacks. `MCPServerAdapter` will respect the scheme (`http` or `https`) provided in the URL.
### c. Token Passthrough (Anti-Pattern)
This is primarily a concern for MCP server developers but understanding it helps in choosing secure servers.
"Token passthrough" is when an MCP server accepts an access token from your CrewAI agent (which might be a token for a *different* service, say `ServiceA`) and simply passes it through to another downstream API (`ServiceB`) without proper validation. Specifically, `ServiceB` (or the MCP server itself) should only accept tokens that were explicitly issued *for them* (i.e., the 'audience' claim in the token matches the server/service).
**Risks:**
* Bypasses security controls (like rate limiting or fine-grained permissions) on the MCP server or the downstream API.
* Breaks audit trails and accountability.
* Allows misuse of stolen tokens.
**Mitigation (For MCP Server Developers):**
* MCP servers **MUST NOT** accept tokens that were not explicitly issued for them. They must validate the token's audience claim.
**CrewAI User Implication:**
* While not directly controllable by the user, this highlights the importance of connecting to well-designed MCP servers that adhere to security best practices.
#### Authentication and Authorization
- **Verify Identity**: If the MCP server provides sensitive tools or access to private data, it MUST implement strong authentication mechanisms to verify the identity of the client (your CrewAI application). This could involve API keys, OAuth tokens, or other standard methods.
- **Principle of Least Privilege**: Ensure the credentials used by `MCPServerAdapter` (if any) have only the necessary permissions to access the required tools.
### d. Input Validation and Sanitization
- **Input Validation is Critical**: MCP servers **must** rigorously validate all inputs received from agents *before* processing them or passing them to tools. This is a primary defense against many common vulnerabilities:
- **Command Injection:** If a tool constructs shell commands, SQL queries, or other interpreted language statements based on input, the server must meticulously sanitize this input to prevent malicious commands from being injected and executed.
- **Path Traversal:** If a tool accesses files based on input parameters, the server must validate and sanitize these paths to prevent access to unauthorized files or directories (e.g., by blocking `../` sequences).
- **Data Type & Range Checks:** Servers must ensure that input data conforms to the expected data types (e.g., string, number, boolean) and falls within acceptable ranges or adheres to defined formats (e.g., regex for URLs).
- **JSON Schema Validation:** All tool parameters should be strictly validated against their defined JSON schema. This helps catch malformed requests early.
- **Client-Side Awareness**: While server-side validation is paramount, as a CrewAI user, be mindful of the data your agents are constructed to send to MCP tools, especially if interacting with less-trusted or new MCP servers.
### e. Rate Limiting and Resource Management
- **Prevent Abuse**: MCP servers should implement rate limiting to prevent abuse, whether intentional (Denial of Service attacks) or unintentional (e.g., a misconfigured agent making too many requests).
- **Client-Side Retries**: Implement sensible retry logic in your CrewAI tasks if transient network issues or server rate limits are expected, but avoid aggressive retries that could exacerbate server load.
## 4. Secure MCP Server Implementation Advice (For Developers)
If you are developing an MCP server that CrewAI agents might connect to, consider these best practices in addition to the points above:
- **Follow Secure Coding Practices**: Adhere to standard secure coding principles for your chosen language and framework (e.g., OWASP Top 10).
- **Principle of Least Privilege**: Ensure the process running the MCP server (especially for `Stdio`) has only the minimum necessary permissions. Tools themselves should also operate with the least privilege required to perform their function.
- **Dependency Management**: Keep all server-side dependencies, including operating system packages, language runtimes, and third-party libraries, up-to-date to patch known vulnerabilities. Use tools to scan for vulnerable dependencies.
- **Secure Defaults**: Design your server and its tools to be secure by default. For example, features that could be risky should be off by default or require explicit opt-in with clear warnings.
- **Access Control for Tools**: Implement robust mechanisms to control which authenticated and authorized agents or users can access specific tools, especially those that are powerful, sensitive, or incur costs.
- **Secure Error Handling**: Servers should not expose detailed internal error messages, stack traces, or debugging information to the client, as these can reveal internal workings or potential vulnerabilities. Log errors comprehensively on the server-side for diagnostics.
- **Comprehensive Logging and Monitoring**: Implement detailed logging of security-relevant events (e.g., authentication attempts, tool invocations, errors, authorization changes). Monitor these logs for suspicious activity or abuse patterns.
- **Adherence to MCP Authorization Spec**: If implementing authentication and authorization, strictly follow the [MCP Authorization specification](https://modelcontextprotocol.io/specification/draft/basic/authorization) and relevant [OAuth 2.0 security best practices](https://datatracker.ietf.org/doc/html/rfc9700).
- **Regular Security Audits**: If your MCP server handles sensitive data, performs critical operations, or is publicly exposed, consider periodic security audits by qualified professionals.
## 5. Further Reading
For more detailed information on MCP security, refer to the official documentation:
- **[MCP Transport Security](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations)**
By understanding these security considerations and implementing best practices, you can safely leverage the power of MCP servers in your CrewAI projects.
These are by no means exhaustive, but they cover the most common and critical security concerns.
The threats will continue to evolve, so it's important to stay informed and adapt your security measures accordingly.

View File

@@ -1,150 +0,0 @@
---
title: SSE Transport
description: Learn how to connect CrewAI to remote MCP servers using Server-Sent Events (SSE) for real-time communication.
icon: wifi
---
## Overview
Server-Sent Events (SSE) provide a standard way for a web server to send updates to a client over a single, long-lived HTTP connection. In the context of MCP, SSE is used for remote servers to stream data (like tool responses) to your CrewAI application in real-time.
## Key Concepts
- **Remote Servers**: SSE is suitable for MCP servers hosted remotely.
- **Unidirectional Stream**: Typically, SSE is a one-way communication channel from server to client.
- **`MCPServerAdapter` Configuration**: For SSE, you'll provide the server's URL and specify the transport type.
## Connecting via SSE
You can connect to an SSE-based MCP server using two main approaches for managing the connection lifecycle:
### 1. Fully Managed Connection (Recommended)
Using a Python context manager (`with` statement) is the recommended approach. It automatically handles establishing and closing the connection to the SSE MCP server.
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "http://localhost:8000/sse", # Replace with your actual SSE server URL
"transport": "sse"
}
# Using MCPServerAdapter with a context manager
try:
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from SSE MCP server: {[tool.name for tool in tools]}")
# Example: Using a tool from the SSE MCP server
sse_agent = Agent(
role="Remote Service User",
goal="Utilize a tool provided by a remote SSE MCP server.",
backstory="An AI agent that connects to external services via SSE.",
tools=tools,
reasoning=True,
verbose=True,
)
sse_task = Task(
description="Fetch real-time stock updates for 'AAPL' using an SSE tool.",
expected_output="The latest stock price for AAPL.",
agent=sse_agent,
markdown=True
)
sse_crew = Crew(
agents=[sse_agent],
tasks=[sse_task],
verbose=True,
process=Process.sequential
)
if tools: # Only kickoff if tools were loaded
result = sse_crew.kickoff() # Add inputs={'stock_symbol': 'AAPL'} if tool requires it
print("\nCrew Task Result (SSE - Managed):\n", result)
else:
print("Skipping crew kickoff as tools were not loaded (check server connection).")
except Exception as e:
print(f"Error connecting to or using SSE MCP server (Managed): {e}")
print("Ensure the SSE MCP server is running and accessible at the specified URL.")
```
<Note>
Replace `"http://localhost:8000/sse"` with the actual URL of your SSE MCP server.
</Note>
### 2. Manual Connection Lifecycle
If you need finer-grained control, you can manage the `MCPServerAdapter` connection lifecycle manually.
<Info>
You **MUST** call `mcp_server_adapter.stop()` to ensure the connection is closed and resources are released. Using a `try...finally` block is highly recommended.
</Info>
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "http://localhost:8000/sse", # Replace with your actual SSE server URL
"transport": "sse"
}
mcp_server_adapter = None
try:
mcp_server_adapter = MCPServerAdapter(server_params)
mcp_server_adapter.start()
tools = mcp_server_adapter.tools
print(f"Available tools (manual SSE): {[tool.name for tool in tools]}")
manual_sse_agent = Agent(
role="Remote Data Analyst",
goal="Analyze data fetched from a remote SSE MCP server using manual connection management.",
backstory="An AI skilled in handling SSE connections explicitly.",
tools=tools,
verbose=True
)
analysis_task = Task(
description="Fetch and analyze the latest user activity trends from the SSE server.",
expected_output="A summary report of user activity trends.",
agent=manual_sse_agent
)
analysis_crew = Crew(
agents=[manual_sse_agent],
tasks=[analysis_task],
verbose=True,
process=Process.sequential
)
result = analysis_crew.kickoff()
print("\nCrew Task Result (SSE - Manual):\n", result)
except Exception as e:
print(f"An error occurred during manual SSE MCP integration: {e}")
print("Ensure the SSE MCP server is running and accessible.")
finally:
if mcp_server_adapter and mcp_server_adapter.is_connected:
print("Stopping SSE MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
elif mcp_server_adapter:
print("SSE MCP server adapter was not connected. No stop needed or start failed.")
```
## Security Considerations for SSE
<Warning>
**DNS Rebinding Attacks**: SSE transports can be vulnerable to DNS rebinding attacks if the MCP server is not properly secured. This could allow malicious websites to interact with local or intranet-based MCP servers.
</Warning>
To mitigate this risk:
- MCP server implementations should **validate `Origin` headers** on incoming SSE connections.
- When running local SSE MCP servers for development, **bind only to `localhost` (`127.0.0.1`)** rather than all network interfaces (`0.0.0.0`).
- Implement **proper authentication** for all SSE connections if they expose sensitive tools or data.
For a comprehensive overview of security best practices, please refer to our [Security Considerations](./security.mdx) page and the official [MCP Transport Security documentation](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations).

View File

@@ -1,134 +0,0 @@
---
title: Stdio Transport
description: Learn how to connect CrewAI to local MCP servers using the Stdio (Standard Input/Output) transport mechanism.
icon: server
---
## Overview
The Stdio (Standard Input/Output) transport is designed for connecting `MCPServerAdapter` to local MCP servers that communicate over their standard input and output streams. This is typically used when the MCP server is a script or executable running on the same machine as your CrewAI application.
## Key Concepts
- **Local Execution**: Stdio transport manages a locally running process for the MCP server.
- **`StdioServerParameters`**: This class from the `mcp` library is used to configure the command, arguments, and environment variables for launching the Stdio server.
## Connecting via Stdio
You can connect to an Stdio-based MCP server using two main approaches for managing the connection lifecycle:
### 1. Fully Managed Connection (Recommended)
Using a Python context manager (`with` statement) is the recommended approach. It automatically handles starting the MCP server process and stopping it when the context is exited.
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters
import os
# Create a StdioServerParameters object
server_params=StdioServerParameters(
command="python3",
args=["servers/your_stdio_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from Stdio MCP server: {[tool.name for tool in tools]}")
# Example: Using the tools from the Stdio MCP server in a CrewAI Agent
research_agent = Agent(
role="Local Data Processor",
goal="Process data using a local Stdio-based tool.",
backstory="An AI that leverages local scripts via MCP for specialized tasks.",
tools=tools,
reasoning=True,
verbose=True,
)
processing_task = Task(
description="Process the input data file 'data.txt' and summarize its contents.",
expected_output="A summary of the processed data.",
agent=research_agent,
markdown=True
)
data_crew = Crew(
agents=[research_agent],
tasks=[processing_task],
verbose=True,
process=Process.sequential
)
result = data_crew.kickoff()
print("\nCrew Task Result (Stdio - Managed):\n", result)
```
### 2. Manual Connection Lifecycle
If you need finer-grained control over when the Stdio MCP server process is started and stopped, you can manage the `MCPServerAdapter` lifecycle manually.
<Info>
You **MUST** call `mcp_server_adapter.stop()` to ensure the server process is terminated and resources are released. Using a `try...finally` block is highly recommended.
</Info>
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters
import os
# Create a StdioServerParameters object
stdio_params=StdioServerParameters(
command="python3",
args=["servers/your_stdio_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
mcp_server_adapter = MCPServerAdapter(server_params=stdio_params)
try:
mcp_server_adapter.start() # Manually start the connection and server process
tools = mcp_server_adapter.tools
print(f"Available tools (manual Stdio): {[tool.name for tool in tools]}")
# Example: Using the tools with your Agent, Task, Crew setup
manual_agent = Agent(
role="Local Task Executor",
goal="Execute a specific local task using a manually managed Stdio tool.",
backstory="An AI proficient in controlling local processes via MCP.",
tools=tools,
verbose=True
)
manual_task = Task(
description="Execute the 'perform_analysis' command via the Stdio tool.",
expected_output="Results of the analysis.",
agent=manual_agent
)
manual_crew = Crew(
agents=[manual_agent],
tasks=[manual_task],
verbose=True,
process=Process.sequential
)
result = manual_crew.kickoff() # Actual inputs depend on your tool
print("\nCrew Task Result (Stdio - Manual):\n", result)
except Exception as e:
print(f"An error occurred during manual Stdio MCP integration: {e}")
finally:
if mcp_server_adapter and mcp_server_adapter.is_connected: # Check if connected before stopping
print("Stopping Stdio MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
elif mcp_server_adapter: # If adapter exists but not connected (e.g. start failed)
print("Stdio MCP server adapter was not connected. No stop needed or start failed.")
```
Remember to replace placeholder paths and commands with your actual Stdio server details. The `env` parameter in `StdioServerParameters` can
be used to set environment variables for the server process, which can be useful for configuring its behavior or providing necessary paths (like `PYTHONPATH`).

View File

@@ -1,135 +0,0 @@
---
title: Streamable HTTP Transport
description: Learn how to connect CrewAI to remote MCP servers using the flexible Streamable HTTP transport.
icon: globe
---
## Overview
Streamable HTTP transport provides a flexible way to connect to remote MCP servers. It's often built upon HTTP and can support various communication patterns, including request-response and streaming, sometimes utilizing Server-Sent Events (SSE) for server-to-client streams within a broader HTTP interaction.
## Key Concepts
- **Remote Servers**: Designed for MCP servers hosted remotely.
- **Flexibility**: Can support more complex interaction patterns than plain SSE, potentially including bi-directional communication if the server implements it.
- **`MCPServerAdapter` Configuration**: You'll need to provide the server's base URL for MCP communication and specify `"streamable-http"` as the transport type.
## Connecting via Streamable HTTP
You have two primary methods for managing the connection lifecycle with a Streamable HTTP MCP server:
### 1. Fully Managed Connection (Recommended)
The recommended approach is to use a Python context manager (`with` statement), which handles the connection's setup and teardown automatically.
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "http://localhost:8001/mcp", # Replace with your actual Streamable HTTP server URL
"transport": "streamable-http"
}
try:
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from Streamable HTTP MCP server: {[tool.name for tool in tools]}")
http_agent = Agent(
role="HTTP Service Integrator",
goal="Utilize tools from a remote MCP server via Streamable HTTP.",
backstory="An AI agent adept at interacting with complex web services.",
tools=tools,
verbose=True,
)
http_task = Task(
description="Perform a complex data query using a tool from the Streamable HTTP server.",
expected_output="The result of the complex data query.",
agent=http_agent,
)
http_crew = Crew(
agents=[http_agent],
tasks=[http_task],
verbose=True,
process=Process.sequential
)
result = http_crew.kickoff()
print("\nCrew Task Result (Streamable HTTP - Managed):\n", result)
except Exception as e:
print(f"Error connecting to or using Streamable HTTP MCP server (Managed): {e}")
print("Ensure the Streamable HTTP MCP server is running and accessible at the specified URL.")
```
**Note:** Replace `"http://localhost:8001/mcp"` with the actual URL of your Streamable HTTP MCP server.
### 2. Manual Connection Lifecycle
For scenarios requiring more explicit control, you can manage the `MCPServerAdapter` connection manually.
<Info>
It is **critical** to call `mcp_server_adapter.stop()` when you are done to close the connection and free up resources. A `try...finally` block is the safest way to ensure this.
</Info>
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools import MCPServerAdapter
server_params = {
"url": "http://localhost:8001/mcp", # Replace with your actual Streamable HTTP server URL
"transport": "streamable-http"
}
mcp_server_adapter = None
try:
mcp_server_adapter = MCPServerAdapter(server_params)
mcp_server_adapter.start()
tools = mcp_server_adapter.tools
print(f"Available tools (manual Streamable HTTP): {[tool.name for tool in tools]}")
manual_http_agent = Agent(
role="Advanced Web Service User",
goal="Interact with an MCP server using manually managed Streamable HTTP connections.",
backstory="An AI specialist in fine-tuning HTTP-based service integrations.",
tools=tools,
verbose=True
)
data_processing_task = Task(
description="Submit data for processing and retrieve results via Streamable HTTP.",
expected_output="Processed data or confirmation.",
agent=manual_http_agent
)
data_crew = Crew(
agents=[manual_http_agent],
tasks=[data_processing_task],
verbose=True,
process=Process.sequential
)
result = data_crew.kickoff()
print("\nCrew Task Result (Streamable HTTP - Manual):\n", result)
except Exception as e:
print(f"An error occurred during manual Streamable HTTP MCP integration: {e}")
print("Ensure the Streamable HTTP MCP server is running and accessible.")
finally:
if mcp_server_adapter and mcp_server_adapter.is_connected:
print("Stopping Streamable HTTP MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
elif mcp_server_adapter:
print("Streamable HTTP MCP server adapter was not connected. No stop needed or start failed.")
```
## Security Considerations
When using Streamable HTTP transport, general web security best practices are paramount:
- **Use HTTPS**: Always prefer HTTPS (HTTP Secure) for your MCP server URLs to encrypt data in transit.
- **Authentication**: Implement robust authentication mechanisms if your MCP server exposes sensitive tools or data.
- **Input Validation**: Ensure your MCP server validates all incoming requests and parameters.
For a comprehensive guide on securing your MCP integrations, please refer to our [Security Considerations](./security.mdx) page and the official [MCP Transport Security documentation](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations).

View File

@@ -3,7 +3,7 @@ name = "crewai"
version = "0.121.1"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<3.14"
requires-python = ">=3.10,<3.13"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
@@ -21,9 +21,6 @@ dependencies = [
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0",
# Data Handling
"chromadb>=0.5.23",
"tokenizers>=0.20.3",
"onnxruntime==1.22.0",
"openpyxl>=3.1.5",
"pyvis>=0.3.2",
# Authentication and Security
@@ -49,9 +46,13 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.45.0"]
embeddings = [
"tiktoken~=0.8.0"
"tiktoken~=0.7.0"
]
storage = [
"chromadb>=0.5.23"
]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [
"pdfplumber>=0.11.4",
]
@@ -101,27 +102,6 @@ exclude = ["cli/templates"]
[tool.bandit]
exclude_dirs = ["src/crewai/cli/templates"]
# PyTorch index configuration, since torch 2.5.0 is not compatible with python 3.13
[[tool.uv.index]]
name = "pytorch-nightly"
url = "https://download.pytorch.org/whl/nightly/cpu"
explicit = true
[[tool.uv.index]]
name = "pytorch"
url = "https://download.pytorch.org/whl/cpu"
explicit = true
[tool.uv.sources]
torch = [
{ index = "pytorch-nightly", marker = "python_version >= '3.13'" },
{ index = "pytorch", marker = "python_version < '3.13'" },
]
torchvision = [
{ index = "pytorch-nightly", marker = "python_version >= '3.13'" },
{ index = "pytorch", marker = "python_version < '3.13'" },
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@@ -0,0 +1,93 @@
from pathlib import Path
from typing import List, Optional, Union
import numpy as np
from .base_embedder import BaseEmbedder
try:
from fastembed_gpu import TextEmbedding # type: ignore
FASTEMBED_AVAILABLE = True
except ImportError:
try:
from fastembed import TextEmbedding
FASTEMBED_AVAILABLE = True
except ImportError:
FASTEMBED_AVAILABLE = False
class FastEmbed(BaseEmbedder):
"""
A wrapper class for text embedding models using FastEmbed
"""
def __init__(
self,
model_name: str = "BAAI/bge-small-en-v1.5",
cache_dir: Optional[Union[str, Path]] = None,
):
"""
Initialize the embedding model
Args:
model_name: Name of the model to use
cache_dir: Directory to cache the model
gpu: Whether to use GPU acceleration
"""
if not FASTEMBED_AVAILABLE:
raise ImportError(
"FastEmbed is not installed. Please install it with: "
"uv pip install fastembed or uv pip install fastembed-gpu for GPU support"
)
self.model = TextEmbedding(
model_name=model_name,
cache_dir=str(cache_dir) if cache_dir else None,
)
def embed_chunks(self, chunks: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
List of embeddings
"""
embeddings = list(self.model.embed(chunks))
return embeddings
def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
List of embeddings
"""
embeddings = list(self.model.embed(texts))
return embeddings
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding for a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
return self.embed_texts([text])[0]
@property
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
# Generate a test embedding to get dimensions
test_embed = self.embed_text("test")
return len(test_embed)

View File

@@ -6,16 +6,25 @@ import os
import shutil
from typing import Any, Dict, List, Optional, Union
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
try:
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
HAS_CHROMADB = True
except ImportError:
chromadb = None # type: ignore
ClientAPI = Any # type: ignore
OneOrMany = Any # type: ignore
Settings = Any # type: ignore
HAS_CHROMADB = False
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
@@ -43,7 +52,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
search efficiency.
"""
collection: Optional[chromadb.Collection] = None
collection: Optional[Any] = None # type: ignore
collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None
@@ -62,6 +71,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
with suppress_logging():
if self.collection:
fetched = self.collection.query(
@@ -84,48 +96,63 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def reset(self):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
try:
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def save(
self,
documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
if not self.collection:
raise Exception("Collection not initialized")
@@ -156,7 +183,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = (
final_metadata: Optional[OneOrMany[Any]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata
)
@@ -165,29 +192,38 @@ class KnowledgeStorage(BaseKnowledgeStorage):
metadatas=final_metadata,
ids=filtered_ids,
)
except chromadb.errors.InvalidDimensionException as e:
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
except Exception as e:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
if HAS_CHROMADB and isinstance(e, chromadb.errors.InvalidDimensionException):
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
else:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("knowledge storage")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("knowledge storage")
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage.

View File

@@ -6,11 +6,17 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
try:
from chromadb.api import ClientAPI
HAS_CHROMADB = True
except ImportError:
ClientAPI = Any # type: ignore
HAS_CHROMADB = False
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.errors import ChromaDBRequiredError
from crewai.utilities.paths import db_storage_path
@@ -60,26 +66,32 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
if not HAS_CHROMADB:
raise ChromaDBRequiredError("memory storage")
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except ImportError:
raise ChromaDBRequiredError("memory storage")
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
@@ -165,10 +177,16 @@ class RAGStorage(BaseRAGStorage):
)
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("memory storage")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("memory storage")

View File

@@ -1,8 +1,29 @@
import os
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, Optional, cast, Protocol, Sequence, TYPE_CHECKING, TypeVar, List, Union
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
from crewai.utilities.errors import ChromaDBRequiredError
if TYPE_CHECKING:
from numpy import ndarray
from numpy import dtype, floating, signedinteger, unsignedinteger
try:
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
Documents = List[str] # type: ignore
Embeddings = List[List[float]] # type: ignore
class EmbeddingFunction(Protocol): # type: ignore
"""Protocol for embedding functions when ChromaDB is not available."""
def __call__(self, input: List[str]) -> List[List[float]]: ...
def validate_embedding_function(func: Any) -> None: # type: ignore
"""Stub for validate_embedding_function when ChromaDB is not available."""
pass
class EmbeddingConfigurator:
@@ -26,6 +47,9 @@ class EmbeddingConfigurator:
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
if embedder_config is None:
return self._create_default_embedding_function()
@@ -47,129 +71,189 @@ class EmbeddingConfigurator:
@staticmethod
def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
try:
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
except ImportError:
raise ChromaDBRequiredError("embedding functionality")
@staticmethod
def _configure_watson(config, model_name):
@@ -182,7 +266,7 @@ class EmbeddingConfigurator:
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e
class WatsonEmbeddingFunction(EmbeddingFunction):
class WatsonEmbeddingFunction:
def __call__(self, input: Documents) -> Embeddings:
if isinstance(input, str):
input = [input]
@@ -212,6 +296,9 @@ class EmbeddingConfigurator:
@staticmethod
def _configure_custom(config):
if not HAS_CHROMADB:
raise ChromaDBRequiredError("embedding functionality")
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:

View File

@@ -0,0 +1,62 @@
"""Custom error classes for CrewAI."""
from typing import Optional
class ChromaDBRequiredError(ImportError):
"""Error raised when ChromaDB is required but not installed."""
def __init__(self, feature: str):
"""Initialize the error with a specific feature name.
Args:
feature: The name of the feature that requires ChromaDB.
"""
message = (
f"ChromaDB is required for {feature} features. "
"Please install it with 'pip install crewai[storage]'"
)
super().__init__(message)
class DatabaseOperationError(Exception):
"""Base exception class for database operation errors."""
def __init__(self, message: str, original_error: Optional[Exception] = None):
"""Initialize the database operation error.
Args:
message: The error message to display
original_error: The original exception that caused this error, if any
"""
super().__init__(message)
self.original_error = original_error
class DatabaseError:
"""Standardized error message templates for database operations."""
INIT_ERROR: str = "Database initialization error: {}"
SAVE_ERROR: str = "Error saving task outputs: {}"
UPDATE_ERROR: str = "Error updating task outputs: {}"
LOAD_ERROR: str = "Error loading task outputs: {}"
DELETE_ERROR: str = "Error deleting task outputs: {}"
@classmethod
def format_error(cls, template: str, error: Exception) -> str:
"""Format an error message with the given template and error.
Args:
template: The error message template to use
error: The exception to format into the template
Returns:
The formatted error message
"""
return template.format(str(error))
class AgentRepositoryError(Exception):
"""Exception raised when an agent repository is not found."""
...

View File

@@ -2384,6 +2384,16 @@ def test_multiple_conditional_tasks(researcher, writer):
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2412,6 +2422,16 @@ def test_using_contextual_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_long_term_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2441,6 +2461,16 @@ def test_using_contextual_memory_with_long_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_warning_long_term_memory_without_entity_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2478,6 +2508,16 @@ def test_warning_long_term_memory_without_entity_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_long_term_memory_with_memory_flag():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2513,6 +2553,16 @@ def test_long_term_memory_with_memory_flag():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_short_term_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",
@@ -2542,6 +2592,16 @@ def test_using_contextual_memory_with_short_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_disabled_memory_using_contextual_memory():
from unittest.mock import patch
# Check if ChromaDB is available
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
math_researcher = Agent(
role="Researcher",

View File

@@ -169,6 +169,15 @@ def test_crew_external_memory_reset(mem_type, crew_with_external_memory):
def test_crew_external_memory_save_with_memory_flag(
mem_method, crew_with_external_memory
):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method:
@@ -181,6 +190,15 @@ def test_crew_external_memory_save_with_memory_flag(
def test_crew_external_memory_save_using_crew_without_memory_flag(
mem_method, crew_with_external_memory_without_memory_flag
):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
with patch(
f"crewai.memory.external.external_memory.ExternalMemory.{mem_method}"
) as mock_method:

View File

@@ -7,10 +7,28 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@pytest.fixture
def long_term_memory():
"""Fixture to create a LongTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
return LongTermMemory()
def test_save_and_search(long_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = LongTermMemoryItem(
agent="test_agent",
task="test_task",

View File

@@ -12,6 +12,15 @@ from crewai.task import Task
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
@@ -29,6 +38,15 @@ def short_term_memory():
def test_save_and_search(short_term_memory):
try:
import chromadb # noqa: F401
HAS_CHROMADB = True
except ImportError:
HAS_CHROMADB = False
if not HAS_CHROMADB:
pytest.skip("ChromaDB is required for this test")
memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value
test value test value test value test value test value test value

View File

@@ -0,0 +1,80 @@
import pytest
import importlib
import sys
from unittest.mock import patch
from crewai.utilities.errors import ChromaDBRequiredError
def test_import_without_chromadb():
"""Test that crewai can be imported without chromadb."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.memory.storage.rag_storage",
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai import Agent, Task, Crew, Process
agent = Agent(role="Test Agent", goal="Test Goal", backstory="Test Backstory")
task = Task(description="Test Task", agent=agent)
_ = Crew(agents=[agent], tasks=[task], process=Process.sequential)
def test_memory_storage_without_chromadb():
"""Test that memory storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.memory.storage.rag_storage" in sys.modules:
importlib.reload(sys.modules["crewai.memory.storage.rag_storage"])
from crewai.memory.storage.rag_storage import RAGStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = RAGStorage("memory", allow_reset=True, crew=None)
assert "ChromaDB is required for memory storage" in str(excinfo.value)
def test_knowledge_storage_without_chromadb():
"""Test that knowledge storage raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
modules_to_reload = [
"crewai.knowledge.storage.knowledge_storage",
"crewai.utilities.embedding_configurator"
]
for module in modules_to_reload:
if module in sys.modules:
importlib.reload(sys.modules[module])
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
storage = KnowledgeStorage()
storage.initialize_knowledge_storage()
assert "ChromaDB is required for knowledge storage" in str(excinfo.value)
def test_embedding_configurator_without_chromadb():
"""Test that embedding configurator raises appropriate error when chromadb is not available."""
with patch.dict(sys.modules, {"chromadb": None, "chromadb.errors": None, "chromadb.api": None, "chromadb.config": None}):
if "crewai.utilities.embedding_configurator" in sys.modules:
importlib.reload(sys.modules["crewai.utilities.embedding_configurator"])
from crewai.utilities.embedding_configurator import EmbeddingConfigurator, HAS_CHROMADB
assert not HAS_CHROMADB
with pytest.raises(ChromaDBRequiredError) as excinfo:
configurator = EmbeddingConfigurator()
configurator.configure_embedder()
assert "ChromaDB is required for embedding functionality" in str(excinfo.value)

1782
uv.lock generated

File diff suppressed because it is too large Load Diff