ensure embeddings are persisted

This commit is contained in:
Lorenze Jay
2024-11-14 18:31:07 -08:00
parent 98a708ca15
commit 10f445e18a
16 changed files with 196 additions and 33 deletions

View File

@@ -136,6 +136,7 @@ def log_tasks_outputs() -> None:
@click.option("-l", "--long", is_flag=True, help="Reset LONG TERM memory") @click.option("-l", "--long", is_flag=True, help="Reset LONG TERM memory")
@click.option("-s", "--short", is_flag=True, help="Reset SHORT TERM memory") @click.option("-s", "--short", is_flag=True, help="Reset SHORT TERM memory")
@click.option("-e", "--entities", is_flag=True, help="Reset ENTITIES memory") @click.option("-e", "--entities", is_flag=True, help="Reset ENTITIES memory")
@click.option("-kn", "--knowledge", is_flag=True, help="Reset KNOWLEDGE")
@click.option( @click.option(
"-k", "-k",
"--kickoff-outputs", "--kickoff-outputs",
@@ -143,7 +144,7 @@ def log_tasks_outputs() -> None:
help="Reset LATEST KICKOFF TASK OUTPUTS", help="Reset LATEST KICKOFF TASK OUTPUTS",
) )
@click.option("-a", "--all", is_flag=True, help="Reset ALL memories") @click.option("-a", "--all", is_flag=True, help="Reset ALL memories")
def reset_memories(long, short, entities, kickoff_outputs, all): def reset_memories(long, short, entities, knowledge, kickoff_outputs, all):
""" """
Reset the crew memories (long, short, entity, latest_crew_kickoff_ouputs). This will delete all the data saved. Reset the crew memories (long, short, entity, latest_crew_kickoff_ouputs). This will delete all the data saved.
""" """
@@ -153,7 +154,7 @@ def reset_memories(long, short, entities, kickoff_outputs, all):
"Please specify at least one memory type to reset using the appropriate flags." "Please specify at least one memory type to reset using the appropriate flags."
) )
return return
reset_memories_command(long, short, entities, kickoff_outputs, all) reset_memories_command(long, short, entities, knowledge, kickoff_outputs, all)
except Exception as e: except Exception as e:
click.echo(f"An error occurred while resetting memories: {e}", err=True) click.echo(f"An error occurred while resetting memories: {e}", err=True)

View File

@@ -5,9 +5,12 @@ from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None: def reset_memories_command(
long, short, entity, kickoff_outputs, all, knowledge
) -> None:
""" """
Reset the crew memories. Reset the crew memories.
@@ -17,6 +20,7 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
entity (bool): Whether to reset the entity memory. entity (bool): Whether to reset the entity memory.
kickoff_outputs (bool): Whether to reset the latest kickoff task outputs. kickoff_outputs (bool): Whether to reset the latest kickoff task outputs.
all (bool): Whether to reset all memories. all (bool): Whether to reset all memories.
knowledge (bool): Whether to reset the knowledge.
""" """
try: try:
@@ -25,6 +29,7 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
EntityMemory().reset() EntityMemory().reset()
LongTermMemory().reset() LongTermMemory().reset()
TaskOutputStorageHandler().reset() TaskOutputStorageHandler().reset()
KnowledgeStorage().reset()
click.echo("All memories have been reset.") click.echo("All memories have been reset.")
else: else:
if long: if long:
@@ -40,6 +45,9 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
if kickoff_outputs: if kickoff_outputs:
TaskOutputStorageHandler().reset() TaskOutputStorageHandler().reset()
click.echo("Latest Kickoff outputs stored has been reset.") click.echo("Latest Kickoff outputs stored has been reset.")
if knowledge:
KnowledgeStorage().reset()
click.echo("Knowledge has been reset.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while resetting the memories: {e}", err=True) click.echo(f"An error occurred while resetting the memories: {e}", err=True)

View File

@@ -39,7 +39,7 @@ class FastEmbed(BaseEmbedder):
if not FASTEMBED_AVAILABLE: if not FASTEMBED_AVAILABLE:
raise ImportError( raise ImportError(
"FastEmbed is not installed. Please install it with: " "FastEmbed is not installed. Please install it with: "
"pip install fastembed or pip install fastembed-gpu for GPU support" "uv pip install fastembed or uv pip install fastembed-gpu for GPU support"
) )
self.model = TextEmbedding( self.model = TextEmbedding(

View File

@@ -1,10 +1,11 @@
from typing import List from typing import List, Optional
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.embedder.base_embedder import BaseEmbedder from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.embedder.fastembed import FastEmbed from crewai.knowledge.embedder.fastembed import FastEmbed
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
class Knowledge(BaseModel): class Knowledge(BaseModel):
@@ -12,6 +13,8 @@ class Knowledge(BaseModel):
embedder: BaseEmbedder = Field(default_factory=FastEmbed) embedder: BaseEmbedder = Field(default_factory=FastEmbed)
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
agents: List[str] = Field(default_factory=list)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
def __init__(self, **data): def __init__(self, **data):
super().__init__(**data) super().__init__(**data)
@@ -19,35 +22,45 @@ class Knowledge(BaseModel):
for source in self.sources: for source in self.sources:
source.add(self.embedder) source.add(self.embedder)
def query(self, query: str, top_k: int = 3) -> List[str]: def query(
self, query: str, top_k: int = 3, preference: Optional[str] = None
) -> List[str]:
""" """
Query across all knowledge sources to find the most relevant information. Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks. Returns the top_k most relevant chunks.
""" """
if not self.sources: # if not self.sources:
return [] # return []
results = self.storage.search(
[query],
top_k,
filter={"preference": preference} if preference else None,
score_threshold=0.35,
)
return results
# Collect all chunks and embeddings from all sources # Collect all chunks and embeddings from all sources
all_chunks = [] # all_chunks = []
all_embeddings = [] # all_embeddings = []
for source in self.sources: # for source in self.sources:
all_chunks.extend(source.chunks) # all_chunks.extend(source.chunks)
all_embeddings.extend(source.get_embeddings()) # all_embeddings.extend(source.get_embeddings())
# Embed the query # # Embed the query
query_embedding = self.embedder.embed_text(query) # query_embedding = self.embedder.embed_text(query)
# Calculate similarities # # Calculate similarities
similarities = [] # similarities = []
for idx, embedding in enumerate(all_embeddings): # for idx, embedding in enumerate(all_embeddings):
similarity = query_embedding.dot(embedding) # similarity = query_embedding.dot(embedding)
similarities.append((similarity, idx)) # similarities.append((similarity, idx))
# Sort by similarity # # Sort by similarity
similarities.sort(reverse=True, key=lambda x: x[0]) # similarities.sort(reverse=True, key=lambda x: x[0])
# Get top_k results # # Get top_k results
top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]] # top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]]
return top_chunks # return top_chunks

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from pydantic import Field from pydantic import Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from typing import Dict, Any
class BaseFileKnowledgeSource(BaseKnowledgeSource): class BaseFileKnowledgeSource(BaseKnowledgeSource):
@@ -22,3 +23,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource):
if not self.file_path.is_file(): if not self.file_path.is_file():
raise ValueError(f"Path is not a file: {self.file_path}") raise ValueError(f"Path is not a file: {self.file_path}")
return "" return ""
def _save_documents(self, metadata: Dict[str, Any]):
"""Save the documents to the storage."""
self.storage.save(self.chunks, metadata)

View File

@@ -1,10 +1,12 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List from typing import List, Optional
import numpy as np import numpy as np
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.embedder.base_embedder import BaseEmbedder from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from typing import Dict, Any
class BaseKnowledgeSource(BaseModel, ABC): class BaseKnowledgeSource(BaseModel, ABC):
@@ -16,6 +18,8 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list) chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict)
@abstractmethod @abstractmethod
def load_content(self): def load_content(self):
@@ -37,3 +41,10 @@ class BaseKnowledgeSource(BaseModel, ABC):
text[i : i + self.chunk_size] text[i : i + self.chunk_size]
for i in range(0, len(text), self.chunk_size - self.chunk_overlap) for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
] ]
def _save_documents(self, metadata: Dict[str, Any]):
"""
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
self.storage.save(self.chunks, metadata)

View File

@@ -29,6 +29,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

@@ -39,6 +39,7 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

@@ -41,6 +41,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

@@ -41,6 +41,7 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

@@ -1,4 +1,4 @@
from typing import List from typing import List, Dict, Any
from pydantic import Field from pydantic import Field
@@ -28,6 +28,8 @@ class StringKnowledgeSource(BaseKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
print("adding")
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

@@ -24,6 +24,7 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks) new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings # Save the embeddings
self.chunk_embeddings.extend(new_embeddings) self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""

View File

View File

@@ -0,0 +1,110 @@
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities.paths import db_storage_path
from typing import Optional, List
import chromadb
import numpy as np
from typing import Dict, Any
import uuid
import contextlib
import io
import logging
@contextlib.contextmanager
def suppress_logging(
logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
level=logging.ERROR,
):
logger = logging.getLogger(logger_name)
original_level = logger.getEffectiveLevel()
logger.setLevel(level)
with (
contextlib.redirect_stdout(io.StringIO()),
contextlib.redirect_stderr(io.StringIO()),
contextlib.suppress(UserWarning),
):
yield
logger.setLevel(original_level)
class KnowledgeStorage(BaseRAGStorage):
"""
Extends Storage to handle embeddings for memory entries, improving
search efficiency.
"""
collection: Optional[chromadb.Collection] = None
def __init__(self, embedder_config=None):
self.embedder_config = (
embedder_config or self._create_default_embedding_function()
)
self._initialize_app()
def _sanitize_role(self, role: str) -> str:
return role.replace(" ", "_")
def search(
self,
query: List[str],
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
with suppress_logging():
if self.collection:
fetched = self.collection.query(
query_texts=query,
n_results=limit,
where=filter,
)
print("Fetched", fetched)
results = []
for i in range(len(fetched["ids"][0])):
result = {
"id": fetched["ids"][0][i],
"metadata": fetched["metadatas"][0][i],
"context": fetched["documents"][0][i],
"score": fetched["distances"][0][i],
}
if result["score"] >= score_threshold:
results.append(result)
return results
else:
raise Exception("Collection not initialized")
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
chroma_client = chromadb.PersistentClient(
path=f"{db_storage_path()}/knowledge",
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
self.collection = self.app.get_or_create_collection(name="knowledge")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
if self.app:
self.app.reset()
def save(self, documents: List[str], metadata: Dict[str, Any]):
if self.collection:
self.collection.add(
documents=documents,
metadatas=metadata,
ids=[str(uuid.uuid4())],
)
else:
raise Exception("Collection not initialized")
def _create_default_embedding_function(self):
from crewai.knowledge.embedder.fastembed import FastEmbed
return FastEmbed().embed_texts

View File

@@ -102,3 +102,11 @@ class ContextualMemory:
f"- {result['memory']}" for result in user_memories f"- {result['memory']}" for result in user_memories
) )
return f"User memories/preferences:\n{formatted_memories}" return f"User memories/preferences:\n{formatted_memories}"
# TODO: set this up
# def _fetch_knowledge_context(self, query: str) -> str:
# """
# Fetches relevant knowledge from Knowledge Storage.
# """
# knowledge_results = self.knowledge.query(query)
# return "\n".join([result["context"] for result in knowledge_results])

View File

@@ -55,12 +55,12 @@ class BaseRAGStorage(ABC):
"""Reset the storage.""" """Reset the storage."""
pass pass
@abstractmethod # @abstractmethod
def _generate_embedding( # def _generate_embedding(
self, text: str, metadata: Optional[Dict[str, Any]] = None # self, text: str, metadata: Optional[Dict[str, Any]] = None
) -> Any: # ) -> Any:
"""Generate an embedding for the given text and metadata.""" # """Generate an embedding for the given text and metadata."""
pass # pass
@abstractmethod @abstractmethod
def _initialize_app(self): def _initialize_app(self):