Compare commits

...

3 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| Devin AI | 43866c0f61 | Fix CI: Rename metadata column to doc_metadata and fix SQL injection vulnerability (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-22 10:21:07 +00:00 |
| Devin AI | f1a91c506b | Fix CI: Make pgvector an optional dependency, fix SQL injection and type errors (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-22 10:15:58 +00:00 |
| Devin AI | 30486acb4d | Add documentation and implementation for custom pgvector knowledge storage (#2883) (Co-Authored-By: Joe Moura <joao@crewai.com>) | 2025-05-22 09:57:16 +00:00 |
5 changed files with 526 additions and 0 deletions

View File

@@ -736,6 +736,214 @@ recent_news = SpaceNewsKnowledgeSource(
)
```
## Custom Knowledge Storage with pgvector
CrewAI lets you plug in custom knowledge storage backends. One powerful option is PostgreSQL with the pgvector extension, which provides efficient vector similarity search.
### Prerequisites
Before using pgvector as your knowledge storage backend, you need to:
1. Set up a PostgreSQL database with the pgvector extension installed
2. Install the required Python packages
#### PostgreSQL Setup
```bash
# Install PostgreSQL (Ubuntu example)
sudo apt update
sudo apt install postgresql postgresql-contrib

# Install the pgvector extension for your PostgreSQL version
# (see https://github.com/pgvector/pgvector for packages and build instructions)

# Connect to PostgreSQL
sudo -u postgres psql
```
Then, inside `psql`, run:
```sql
-- Create a database
CREATE DATABASE crewai_knowledge;

-- Connect to the database
\c crewai_knowledge

-- Install the pgvector extension
CREATE EXTENSION vector;

-- Create a user (optional)
CREATE USER crewai WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE crewai_knowledge TO crewai;
```
#### Python Dependencies
Add these dependencies to your project:
```bash
# Install required packages
uv add sqlalchemy pgvector psycopg2-binary
```
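To confirm the packages installed correctly, a quick import check works (a sanity-check sketch, nothing more):
```python
# Sanity check: confirm the optional pgvector dependencies import cleanly.
import psycopg2
from sqlalchemy import create_engine
from pgvector.sqlalchemy import Vector

print("pgvector dependencies are available")
```
The project also defines a `pgvector` optional dependency group in `pyproject.toml`, so installing `crewai[pgvector]` should pull in the same packages once that extra is published.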
### Using pgvector Knowledge Storage
Here's how to use pgvector as your knowledge storage backend in CrewAI:
```python
from crewai import Agent, Task, Crew, Process
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
# Create a connection string for PostgreSQL
connection_string = "postgresql://username:password@localhost:5432/crewai_knowledge"
# Create a custom knowledge storage
pgvector_storage = PGVectorKnowledgeStorage(
connection_string=connection_string,
embedding_dimension=1536, # Dimension for OpenAI embeddings
)
# Create a knowledge source
content = "CrewAI is a framework for orchestrating role-playing autonomous agents."
string_source = StringKnowledgeSource(
content=content,
storage=pgvector_storage # Use pgvector storage
)
# Create an agent with the knowledge store
agent = Agent(
role="CrewAI Expert",
goal="Explain CrewAI concepts accurately.",
backstory="You are an expert in the CrewAI framework.",
knowledge_sources=[string_source],
)
# Create a task
task = Task(
description="Answer this question about CrewAI: {question}",
expected_output="A detailed answer about CrewAI.",
agent=agent,
)
# Create a crew with the knowledge sources
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential,
)
# Run the crew
result = crew.kickoff(inputs={"question": "What is CrewAI?"})
```
### Configuration Options
The `PGVectorKnowledgeStorage` class supports the following configuration options:
| Option | Description | Default |
|--------|-------------|---------|
| `connection_string` | PostgreSQL connection string | Required |
| `embedder` | Embedding configuration | OpenAI embeddings |
| `table_name` | Name of the table to store documents | "documents" |
| `embedding_dimension` | Dimension of the embedding vectors | 1536 |
#### Connection String Format
The PostgreSQL connection string follows this format:
```
postgresql://username:password@hostname:port/database_name
```
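In practice, build the connection string from configuration rather than hard-coding credentials. A minimal sketch (the environment variable names here are just an example), which also escapes special characters in the password:
```python
import os
from urllib.parse import quote_plus

# Example environment variable names; adjust to your deployment.
user = os.environ.get("PGUSER", "crewai")
password = quote_plus(os.environ.get("PGPASSWORD", "your_password"))  # escape special characters
host = os.environ.get("PGHOST", "localhost")
port = os.environ.get("PGPORT", "5432")
database = os.environ.get("PGDATABASE", "crewai_knowledge")

connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
```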
#### Custom Embedding Models
You can configure custom embedding models just like with the default knowledge storage:
```python
pgvector_storage = PGVectorKnowledgeStorage(
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
embedder={
"provider": "openai",
"config": {
"model": "text-embedding-3-large"
}
},
embedding_dimension=3072, # Dimension for text-embedding-3-large
)
```
### Advanced Usage
#### Custom Table Names
You can specify a custom table name to store your documents:
```python
pgvector_storage = PGVectorKnowledgeStorage(
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
table_name="my_custom_documents_table"
)
```
#### Multiple Knowledge Collections
You can create multiple knowledge collections by using different table names:
```python
# Create a storage for product knowledge
product_storage = PGVectorKnowledgeStorage(
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
table_name="product_knowledge"
)
# Create a storage for customer knowledge
customer_storage = PGVectorKnowledgeStorage(
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
table_name="customer_knowledge"
)
```
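Each storage can then back its own knowledge source. A short sketch reusing `StringKnowledgeSource` from the example above (the content strings are placeholders):
```python
# Attach each storage to its own knowledge source.
product_source = StringKnowledgeSource(
    content="Product catalog and pricing details...",
    storage=product_storage,
)
customer_source = StringKnowledgeSource(
    content="Customer profiles and support history...",
    storage=customer_storage,
)
```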
### Troubleshooting
#### Common Issues
1. **pgvector Extension Not Found**
Error: `ERROR: could not load library "/usr/local/lib/postgresql/pgvector.so"`
Solution: Make sure the pgvector extension is installed for your PostgreSQL version (see https://github.com/pgvector/pgvector), then enable it in your database:
```sql
CREATE EXTENSION vector;
```
2. **Dimension Mismatch**
Error: `ERROR: vector dimensions do not match`
Solution: Ensure that the `embedding_dimension` parameter matches the dimension of your embedding model.
3. **Connection Issues**
Error: `Could not connect to PostgreSQL server`
Solution: Check your connection string and make sure the PostgreSQL server is running and accessible; a quick connectivity check is sketched below.
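A minimal connectivity check, assuming `psycopg2` is installed and using the connection string format shown earlier; it also confirms the `vector` extension is enabled:
```python
import psycopg2

conn_str = "postgresql://username:password@localhost:5432/crewai_knowledge"
try:
    conn = psycopg2.connect(conn_str)
    with conn.cursor() as cur:
        cur.execute("SELECT extname FROM pg_extension WHERE extname = 'vector';")
        print("pgvector extension enabled:", cur.fetchone() is not None)
    conn.close()
except psycopg2.OperationalError as exc:
    print(f"Could not connect: {exc}")
```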
#### Performance Tips
1. **Create an Index**
For better performance with large datasets, create an index on the embedding column. The operator class should match the distance operator used for search (the storage backend orders by cosine distance, `<=>`):
```sql
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops);
```
2. **Batch Processing**
When saving large numbers of documents, process them in batches to avoid memory issues:
```python
batch_size = 100
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
pgvector_storage.save(batch)
```
## Best Practices
<AccordionGroup>

View File

@@ -67,6 +67,11 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
pgvector = [
"pgvector>=0.2.0",
"sqlalchemy>=2.0.0",
"psycopg2-binary>=2.9.0",
]
[tool.uv]
dev-dependencies = [

View File

@@ -0,0 +1,5 @@
try:
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
__all__ = ["PGVectorKnowledgeStorage"]
except ImportError:
__all__ = []

View File

@@ -0,0 +1,220 @@
from typing import Any, Dict, List, Optional
import hashlib
import logging
import os
from sqlalchemy import create_engine, Column, String, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.sql import text
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
try:
from pgvector.sqlalchemy import Vector
HAS_PGVECTOR = True
except ImportError:
HAS_PGVECTOR = False
class VectorType:
def __init__(self, dimensions: int):
self.dimensions = dimensions
Vector = VectorType # type: ignore
Base = declarative_base()
class Document(Base): # type: ignore
"""SQLAlchemy model for document storage with pgvector."""
__tablename__ = "documents"
id = Column(String, primary_key=True)
content = Column(Text)
doc_metadata = Column(Text) # JSON serialized metadata
embedding: Column = Column(Vector(1536)) # Adjust dimension based on embedding model
class PGVectorKnowledgeStorage(BaseKnowledgeStorage):
"""
Knowledge storage implementation using pgvector.
This class provides an implementation of BaseKnowledgeStorage using PostgreSQL
with the pgvector extension for vector similarity search.
"""
def __init__(
self,
connection_string: str,
embedder: Optional[Dict[str, Any]] = None,
table_name: str = "documents",
embedding_dimension: int = 1536,
):
"""
Initialize the pgvector knowledge storage.
Args:
connection_string: PostgreSQL connection string
embedder: Configuration dictionary for the embedder
table_name: Name of the table to store documents
embedding_dimension: Dimension of the embedding vectors
"""
if not HAS_PGVECTOR:
raise ImportError(
"pgvector is not installed. Please install it with: pip install pgvector"
)
self.connection_string = connection_string
self.table_name = table_name
self.embedding_dimension = embedding_dimension
self.engine = create_engine(connection_string)
self.Session = sessionmaker(bind=self.engine)
self._set_embedder_config(embedder)
Base.metadata.create_all(self.engine)
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""
Set the embedding configuration for the knowledge storage.
Args:
            embedder: Configuration dictionary for the embedder.
                If None or empty, falls back to the default embedding function.
"""
self.embedder = (
EmbeddingConfigurator().configure_embedder(embedder)
if embedder
else self._create_default_embedding_function()
)
def search(
self,
query: List[str],
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
"""
Search for documents in the knowledge base.
Args:
query: List of query strings
limit: Maximum number of results to return
filter: Optional metadata filter
score_threshold: Minimum similarity score threshold
Returns:
List of search results with id, metadata, context, and score
"""
session = self.Session()
try:
            query_embedding = self.embedder([query[0]])[0]
            # Bind the query vector as pgvector's text form "[x1,x2,...]" and cast it
            # server-side, so the parameter adapts regardless of the embedder's return type.
            embedding_param = "[" + ",".join(str(float(x)) for x in query_embedding) + "]"
            # Table names cannot be bound parameters; validate the identifier
            # before interpolating it to avoid SQL injection.
            if not self.table_name.isidentifier():
                raise ValueError(f"Invalid table name: {self.table_name}")
            sql_query = text(f"""
                SELECT id, content, doc_metadata,
                       1 - (embedding <=> CAST(:query_embedding AS vector)) AS similarity
                FROM {self.table_name}
                ORDER BY embedding <=> CAST(:query_embedding AS vector)
                LIMIT :limit
            """)
            results = session.execute(
                sql_query, {"query_embedding": embedding_param, "limit": limit}
            ).fetchall()
formatted_results = []
for row in results:
similarity = float(row[3])
if similarity >= score_threshold:
formatted_results.append({
"id": row[0],
"context": row[1],
"metadata": row[2], # Keep the key as 'metadata' for API compatibility
"score": similarity,
})
return formatted_results
finally:
session.close()
def save(
self,
documents: List[str],
metadata: Optional[Dict[str, Any] | List[Dict[str, Any]]] = None,
) -> None:
"""
Save documents to the knowledge base.
Args:
documents: List of document strings
metadata: Optional metadata for the documents
"""
session = self.Session()
try:
unique_docs = {}
for idx, doc in enumerate(documents):
doc_id = hashlib.sha256(doc.encode("utf-8")).hexdigest()
doc_metadata = None
if metadata is not None:
if isinstance(metadata, list):
doc_metadata = metadata[idx]
else:
doc_metadata = metadata
unique_docs[doc_id] = (doc, doc_metadata)
docs_list = [doc for doc, _ in unique_docs.values()]
embeddings = self.embedder(docs_list)
for i, (doc_id, (doc, meta)) in enumerate(unique_docs.items()):
embedding = embeddings[i]
existing = session.query(Document).filter(Document.id == doc_id).first()
if existing:
setattr(existing, "content", doc)
setattr(existing, "doc_metadata", str(meta) if meta else None)
setattr(existing, "embedding", embedding)
else:
new_doc = Document(
id=doc_id,
content=doc,
doc_metadata=str(meta) if meta else None,
embedding=embedding,
)
session.add(new_doc)
session.commit()
except Exception as e:
session.rollback()
logging.error(f"Failed to save documents: {e}")
raise
finally:
session.close()
def reset(self) -> None:
"""Reset the knowledge base by dropping and recreating the table."""
session = self.Session()
try:
session.query(Document).delete()
session.commit()
except Exception as e:
session.rollback()
logging.error(f"Failed to reset knowledge base: {e}")
raise
finally:
session.close()
def _create_default_embedding_function(self):
"""Create a default embedding function for the knowledge storage."""
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)

View File

@@ -0,0 +1,88 @@
import pytest
from unittest.mock import patch, MagicMock
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
class MockSession:
def __init__(self):
self.queries = []
self.commits = 0
self.rollbacks = 0
self.closes = 0
def query(self, *args, **kwargs):
return self
def filter(self, *args, **kwargs):
return self
def first(self):
return None
def add(self, *args, **kwargs):
pass
def commit(self):
self.commits += 1
def rollback(self):
self.rollbacks += 1
def close(self):
self.closes += 1
def execute(self, *args, **kwargs):
return self
def fetchall(self):
return [
("doc1", "This is a test document", '{"source": "test"}', 0.9),
("doc2", "Another test document", '{"source": "test"}', 0.8),
("doc3", "A third test document", '{"source": "test"}', 0.7),
]
@pytest.fixture
def mock_embedder():
return lambda x: [[0.1] * 1536 for _ in range(len(x))]
@pytest.fixture
def mock_session():
return MockSession()
@pytest.fixture
def mock_session_maker(mock_session):
def session_maker():
return mock_session
return session_maker
@pytest.fixture
def mock_engine():
return MagicMock()
@pytest.fixture
def pgvector_storage(mock_embedder, mock_session_maker, mock_engine):
with patch("crewai.knowledge.storage.pgvector_knowledge_storage.create_engine", return_value=mock_engine), \
patch("crewai.knowledge.storage.pgvector_knowledge_storage.sessionmaker", return_value=mock_session_maker), \
patch("crewai.knowledge.storage.pgvector_knowledge_storage.Base.metadata.create_all"):
storage = PGVectorKnowledgeStorage(connection_string="postgresql://test:test@localhost:5432/test")
storage.embedder = mock_embedder
return storage
def test_search(pgvector_storage, mock_session):
results = pgvector_storage.search(["test query"], limit=3, score_threshold=0.5)
assert len(results) == 3
assert results[0]["id"] == "doc1"
assert results[0]["context"] == "This is a test document"
assert results[0]["score"] == 0.9
assert mock_session.closes == 1
def test_save(pgvector_storage, mock_session):
documents = ["Document 1", "Document 2"]
metadata = [{"source": "test1"}, {"source": "test2"}]
pgvector_storage.save(documents, metadata)
assert mock_session.commits == 1
assert mock_session.closes == 1