mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
3 Commits
1.2.1
...
devin/1747
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
43866c0f61 | ||
|
|
f1a91c506b | ||
|
|
30486acb4d |
@@ -736,6 +736,214 @@ recent_news = SpaceNewsKnowledgeSource(
|
||||
)
|
||||
```
|
||||
|
||||
## Custom Knowledge Storage with pgvector
|
||||
|
||||
CrewAI allows you to use custom knowledge storage backends to store and retrieve knowledge. One powerful option is using PostgreSQL with the pgvector extension, which provides efficient vector similarity search capabilities.
|
||||
|
||||
### Prerequisites
|
||||
|
||||
Before using pgvector as your knowledge storage backend, you need to:
|
||||
|
||||
1. Set up a PostgreSQL database with the pgvector extension installed
|
||||
2. Install the required Python packages
|
||||
|
||||
#### PostgreSQL Setup
|
||||
|
||||
```bash
|
||||
# Install PostgreSQL (Ubuntu example)
|
||||
sudo apt update
|
||||
sudo apt install postgresql postgresql-contrib
|
||||
|
||||
# Connect to PostgreSQL
|
||||
sudo -u postgres psql
|
||||
|
||||
# Create a database
|
||||
CREATE DATABASE crewai_knowledge;
|
||||
|
||||
# Connect to the database
|
||||
\c crewai_knowledge
|
||||
|
||||
# Install the pgvector extension
|
||||
CREATE EXTENSION vector;
|
||||
|
||||
# Create a user (optional)
|
||||
CREATE USER crewai WITH PASSWORD 'your_password';
|
||||
GRANT ALL PRIVILEGES ON DATABASE crewai_knowledge TO crewai;
|
||||
```
|
||||
|
||||
#### Python Dependencies
|
||||
|
||||
Add these dependencies to your project:
|
||||
|
||||
```bash
|
||||
# Install required packages
|
||||
uv add sqlalchemy pgvector psycopg2-binary
|
||||
```
|
||||
|
||||
### Using pgvector Knowledge Storage
|
||||
|
||||
Here's how to use pgvector as your knowledge storage backend in CrewAI:
|
||||
|
||||
```python
|
||||
from crewai import Agent, Task, Crew, Process
|
||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
|
||||
|
||||
# Create a connection string for PostgreSQL
|
||||
connection_string = "postgresql://username:password@localhost:5432/crewai_knowledge"
|
||||
|
||||
# Create a custom knowledge storage
|
||||
pgvector_storage = PGVectorKnowledgeStorage(
|
||||
connection_string=connection_string,
|
||||
embedding_dimension=1536, # Dimension for OpenAI embeddings
|
||||
)
|
||||
|
||||
# Create a knowledge source
|
||||
content = "CrewAI is a framework for orchestrating role-playing autonomous agents."
|
||||
string_source = StringKnowledgeSource(
|
||||
content=content,
|
||||
storage=pgvector_storage # Use pgvector storage
|
||||
)
|
||||
|
||||
# Create an agent with the knowledge store
|
||||
agent = Agent(
|
||||
role="CrewAI Expert",
|
||||
goal="Explain CrewAI concepts accurately.",
|
||||
backstory="You are an expert in the CrewAI framework.",
|
||||
knowledge_sources=[string_source],
|
||||
)
|
||||
|
||||
# Create a task
|
||||
task = Task(
|
||||
description="Answer this question about CrewAI: {question}",
|
||||
expected_output="A detailed answer about CrewAI.",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
# Create a crew with the knowledge sources
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=True,
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
# Run the crew
|
||||
result = crew.kickoff(inputs={"question": "What is CrewAI?"})
|
||||
```
|
||||
|
||||
### Configuration Options
|
||||
|
||||
The `PGVectorKnowledgeStorage` class supports the following configuration options:
|
||||
|
||||
| Option | Description | Default |
|
||||
|--------|-------------|---------|
|
||||
| `connection_string` | PostgreSQL connection string | Required |
|
||||
| `embedder` | Embedding configuration | OpenAI embeddings |
|
||||
| `table_name` | Name of the table to store documents | "documents" |
|
||||
| `embedding_dimension` | Dimension of the embedding vectors | 1536 |
|
||||
|
||||
#### Connection String Format
|
||||
|
||||
The PostgreSQL connection string follows this format:
|
||||
```
|
||||
postgresql://username:password@hostname:port/database_name
|
||||
```
|
||||
|
||||
#### Custom Embedding Models
|
||||
|
||||
You can configure custom embedding models just like with the default knowledge storage:
|
||||
|
||||
```python
|
||||
pgvector_storage = PGVectorKnowledgeStorage(
|
||||
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
|
||||
embedder={
|
||||
"provider": "openai",
|
||||
"config": {
|
||||
"model": "text-embedding-3-large"
|
||||
}
|
||||
},
|
||||
embedding_dimension=3072, # Dimension for text-embedding-3-large
|
||||
)
|
||||
```
|
||||
|
||||
### Advanced Usage
|
||||
|
||||
#### Custom Table Names
|
||||
|
||||
You can specify a custom table name to store your documents:
|
||||
|
||||
```python
|
||||
pgvector_storage = PGVectorKnowledgeStorage(
|
||||
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
|
||||
table_name="my_custom_documents_table"
|
||||
)
|
||||
```
|
||||
|
||||
#### Multiple Knowledge Collections
|
||||
|
||||
You can create multiple knowledge collections by using different table names:
|
||||
|
||||
```python
|
||||
# Create a storage for product knowledge
|
||||
product_storage = PGVectorKnowledgeStorage(
|
||||
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
|
||||
table_name="product_knowledge"
|
||||
)
|
||||
|
||||
# Create a storage for customer knowledge
|
||||
customer_storage = PGVectorKnowledgeStorage(
|
||||
connection_string="postgresql://username:password@localhost:5432/crewai_knowledge",
|
||||
table_name="customer_knowledge"
|
||||
)
|
||||
```
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
#### Common Issues
|
||||
|
||||
1. **pgvector Extension Not Found**
|
||||
|
||||
Error: `ERROR: could not load library "/usr/local/lib/postgresql/pgvector.so"`
|
||||
|
||||
Solution: Make sure the pgvector extension is properly installed in your PostgreSQL instance:
|
||||
```sql
|
||||
CREATE EXTENSION vector;
|
||||
```
|
||||
|
||||
2. **Dimension Mismatch**
|
||||
|
||||
Error: `ERROR: vector dimensions do not match`
|
||||
|
||||
Solution: Ensure that the `embedding_dimension` parameter matches the dimension of your embedding model.
|
||||
|
||||
3. **Connection Issues**
|
||||
|
||||
Error: `Could not connect to PostgreSQL server`
|
||||
|
||||
Solution: Check your connection string and make sure the PostgreSQL server is running and accessible.
|
||||
|
||||
#### Performance Tips
|
||||
|
||||
1. **Create an Index**
|
||||
|
||||
For better performance with large datasets, create an index on the embedding column:
|
||||
|
||||
```sql
|
||||
CREATE INDEX ON documents USING hnsw (embedding vector_l2_ops);
|
||||
```
|
||||
|
||||
2. **Batch Processing**
|
||||
|
||||
When saving large numbers of documents, process them in batches to avoid memory issues:
|
||||
|
||||
```python
|
||||
batch_size = 100
|
||||
for i in range(0, len(documents), batch_size):
|
||||
batch = documents[i:i+batch_size]
|
||||
pgvector_storage.save(batch)
|
||||
```
|
||||
|
||||
## Best Practices
|
||||
|
||||
<AccordionGroup>
|
||||
|
||||
@@ -67,6 +67,11 @@ docling = [
|
||||
aisuite = [
|
||||
"aisuite>=0.1.10",
|
||||
]
|
||||
pgvector = [
|
||||
"pgvector>=0.2.0",
|
||||
"sqlalchemy>=2.0.0",
|
||||
"psycopg2-binary>=2.9.0",
|
||||
]
|
||||
|
||||
[tool.uv]
|
||||
dev-dependencies = [
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
try:
|
||||
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
|
||||
__all__ = ["PGVectorKnowledgeStorage"]
|
||||
except ImportError:
|
||||
__all__ = []
|
||||
|
||||
220
src/crewai/knowledge/storage/pgvector_knowledge_storage.py
Normal file
220
src/crewai/knowledge/storage/pgvector_knowledge_storage.py
Normal file
@@ -0,0 +1,220 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from sqlalchemy import create_engine, Column, String, Text
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
|
||||
from crewai.utilities import EmbeddingConfigurator
|
||||
|
||||
try:
|
||||
from pgvector.sqlalchemy import Vector
|
||||
HAS_PGVECTOR = True
|
||||
except ImportError:
|
||||
HAS_PGVECTOR = False
|
||||
class VectorType:
|
||||
def __init__(self, dimensions: int):
|
||||
self.dimensions = dimensions
|
||||
Vector = VectorType # type: ignore
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class Document(Base): # type: ignore
|
||||
"""SQLAlchemy model for document storage with pgvector."""
|
||||
__tablename__ = "documents"
|
||||
|
||||
id = Column(String, primary_key=True)
|
||||
content = Column(Text)
|
||||
doc_metadata = Column(Text) # JSON serialized metadata
|
||||
embedding: Column = Column(Vector(1536)) # Adjust dimension based on embedding model
|
||||
|
||||
class PGVectorKnowledgeStorage(BaseKnowledgeStorage):
|
||||
"""
|
||||
Knowledge storage implementation using pgvector.
|
||||
|
||||
This class provides an implementation of BaseKnowledgeStorage using PostgreSQL
|
||||
with the pgvector extension for vector similarity search.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
connection_string: str,
|
||||
embedder: Optional[Dict[str, Any]] = None,
|
||||
table_name: str = "documents",
|
||||
embedding_dimension: int = 1536,
|
||||
):
|
||||
"""
|
||||
Initialize the pgvector knowledge storage.
|
||||
|
||||
Args:
|
||||
connection_string: PostgreSQL connection string
|
||||
embedder: Configuration dictionary for the embedder
|
||||
table_name: Name of the table to store documents
|
||||
embedding_dimension: Dimension of the embedding vectors
|
||||
"""
|
||||
if not HAS_PGVECTOR:
|
||||
raise ImportError(
|
||||
"pgvector is not installed. Please install it with: pip install pgvector"
|
||||
)
|
||||
|
||||
self.connection_string = connection_string
|
||||
self.table_name = table_name
|
||||
self.embedding_dimension = embedding_dimension
|
||||
|
||||
self.engine = create_engine(connection_string)
|
||||
self.Session = sessionmaker(bind=self.engine)
|
||||
|
||||
self._set_embedder_config(embedder)
|
||||
|
||||
Base.metadata.create_all(self.engine)
|
||||
|
||||
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
|
||||
"""
|
||||
Set the embedding configuration for the knowledge storage.
|
||||
|
||||
Args:
|
||||
embedder_config: Configuration dictionary for the embedder.
|
||||
If None or empty, defaults to the default embedding function.
|
||||
"""
|
||||
self.embedder = (
|
||||
EmbeddingConfigurator().configure_embedder(embedder)
|
||||
if embedder
|
||||
else self._create_default_embedding_function()
|
||||
)
|
||||
|
||||
def search(
|
||||
self,
|
||||
query: List[str],
|
||||
limit: int = 3,
|
||||
filter: Optional[dict] = None,
|
||||
score_threshold: float = 0.35,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Search for documents in the knowledge base.
|
||||
|
||||
Args:
|
||||
query: List of query strings
|
||||
limit: Maximum number of results to return
|
||||
filter: Optional metadata filter
|
||||
score_threshold: Minimum similarity score threshold
|
||||
|
||||
Returns:
|
||||
List of search results with id, metadata, context, and score
|
||||
"""
|
||||
session = self.Session()
|
||||
|
||||
try:
|
||||
query_embedding = self.embedder([query[0]])[0]
|
||||
|
||||
sql_query = text("""
|
||||
SELECT id, content, doc_metadata, 1 - (embedding <=> :query_embedding) as similarity
|
||||
FROM :table_name
|
||||
ORDER BY embedding <=> :query_embedding
|
||||
LIMIT :limit
|
||||
""").bindparams(
|
||||
query_embedding=query_embedding,
|
||||
limit=limit,
|
||||
table_name=self.table_name
|
||||
)
|
||||
|
||||
results = session.execute(
|
||||
sql_query,
|
||||
{"query_embedding": query_embedding, "limit": limit}
|
||||
).fetchall()
|
||||
|
||||
formatted_results = []
|
||||
for row in results:
|
||||
similarity = float(row[3])
|
||||
if similarity >= score_threshold:
|
||||
formatted_results.append({
|
||||
"id": row[0],
|
||||
"context": row[1],
|
||||
"metadata": row[2], # Keep the key as 'metadata' for API compatibility
|
||||
"score": similarity,
|
||||
})
|
||||
|
||||
return formatted_results
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def save(
|
||||
self,
|
||||
documents: List[str],
|
||||
metadata: Optional[Dict[str, Any] | List[Dict[str, Any]]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Save documents to the knowledge base.
|
||||
|
||||
Args:
|
||||
documents: List of document strings
|
||||
metadata: Optional metadata for the documents
|
||||
"""
|
||||
session = self.Session()
|
||||
|
||||
try:
|
||||
unique_docs = {}
|
||||
|
||||
for idx, doc in enumerate(documents):
|
||||
doc_id = hashlib.sha256(doc.encode("utf-8")).hexdigest()
|
||||
doc_metadata = None
|
||||
if metadata is not None:
|
||||
if isinstance(metadata, list):
|
||||
doc_metadata = metadata[idx]
|
||||
else:
|
||||
doc_metadata = metadata
|
||||
unique_docs[doc_id] = (doc, doc_metadata)
|
||||
|
||||
docs_list = [doc for doc, _ in unique_docs.values()]
|
||||
embeddings = self.embedder(docs_list)
|
||||
|
||||
for i, (doc_id, (doc, meta)) in enumerate(unique_docs.items()):
|
||||
embedding = embeddings[i]
|
||||
|
||||
existing = session.query(Document).filter(Document.id == doc_id).first()
|
||||
|
||||
if existing:
|
||||
setattr(existing, "content", doc)
|
||||
setattr(existing, "doc_metadata", str(meta) if meta else None)
|
||||
setattr(existing, "embedding", embedding)
|
||||
else:
|
||||
new_doc = Document(
|
||||
id=doc_id,
|
||||
content=doc,
|
||||
doc_metadata=str(meta) if meta else None,
|
||||
embedding=embedding,
|
||||
)
|
||||
session.add(new_doc)
|
||||
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
logging.error(f"Failed to save documents: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the knowledge base by dropping and recreating the table."""
|
||||
session = self.Session()
|
||||
try:
|
||||
session.query(Document).delete()
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
logging.error(f"Failed to reset knowledge base: {e}")
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
def _create_default_embedding_function(self):
|
||||
"""Create a default embedding function for the knowledge storage."""
|
||||
from chromadb.utils.embedding_functions.openai_embedding_function import (
|
||||
OpenAIEmbeddingFunction,
|
||||
)
|
||||
|
||||
return OpenAIEmbeddingFunction(
|
||||
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
|
||||
)
|
||||
88
tests/knowledge/pgvector_knowledge_storage_test.py
Normal file
88
tests/knowledge/pgvector_knowledge_storage_test.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from crewai.knowledge.storage.pgvector_knowledge_storage import PGVectorKnowledgeStorage
|
||||
|
||||
class MockSession:
|
||||
def __init__(self):
|
||||
self.queries = []
|
||||
self.commits = 0
|
||||
self.rollbacks = 0
|
||||
self.closes = 0
|
||||
|
||||
def query(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def filter(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def first(self):
|
||||
return None
|
||||
|
||||
def add(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def commit(self):
|
||||
self.commits += 1
|
||||
|
||||
def rollback(self):
|
||||
self.rollbacks += 1
|
||||
|
||||
def close(self):
|
||||
self.closes += 1
|
||||
|
||||
def execute(self, *args, **kwargs):
|
||||
return self
|
||||
|
||||
def fetchall(self):
|
||||
return [
|
||||
("doc1", "This is a test document", '{"source": "test"}', 0.9),
|
||||
("doc2", "Another test document", '{"source": "test"}', 0.8),
|
||||
("doc3", "A third test document", '{"source": "test"}', 0.7),
|
||||
]
|
||||
|
||||
@pytest.fixture
|
||||
def mock_embedder():
|
||||
return lambda x: [[0.1] * 1536 for _ in range(len(x))]
|
||||
|
||||
@pytest.fixture
|
||||
def mock_session():
|
||||
return MockSession()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_session_maker(mock_session):
|
||||
def session_maker():
|
||||
return mock_session
|
||||
return session_maker
|
||||
|
||||
@pytest.fixture
|
||||
def mock_engine():
|
||||
return MagicMock()
|
||||
|
||||
@pytest.fixture
|
||||
def pgvector_storage(mock_embedder, mock_session_maker, mock_engine):
|
||||
with patch("crewai.knowledge.storage.pgvector_knowledge_storage.create_engine", return_value=mock_engine), \
|
||||
patch("crewai.knowledge.storage.pgvector_knowledge_storage.sessionmaker", return_value=mock_session_maker), \
|
||||
patch("crewai.knowledge.storage.pgvector_knowledge_storage.Base.metadata.create_all"):
|
||||
storage = PGVectorKnowledgeStorage(connection_string="postgresql://test:test@localhost:5432/test")
|
||||
storage.embedder = mock_embedder
|
||||
return storage
|
||||
|
||||
def test_search(pgvector_storage, mock_session):
|
||||
results = pgvector_storage.search(["test query"], limit=3, score_threshold=0.5)
|
||||
|
||||
assert len(results) == 3
|
||||
assert results[0]["id"] == "doc1"
|
||||
assert results[0]["context"] == "This is a test document"
|
||||
assert results[0]["score"] == 0.9
|
||||
|
||||
assert mock_session.closes == 1
|
||||
|
||||
def test_save(pgvector_storage, mock_session):
|
||||
documents = ["Document 1", "Document 2"]
|
||||
metadata = [{"source": "test1"}, {"source": "test2"}]
|
||||
|
||||
pgvector_storage.save(documents, metadata)
|
||||
|
||||
assert mock_session.commits == 1
|
||||
assert mock_session.closes == 1
|
||||
Reference in New Issue
Block a user