From 10f445e18a197f4faf9db36737c6dadd72536700 Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Thu, 14 Nov 2024 18:31:07 -0800 Subject: [PATCH] ensure embeddings are persisted --- src/crewai/cli/cli.py | 5 +- src/crewai/cli/reset_memories_command.py | 10 +- src/crewai/knowledge/embedder/fastembed.py | 2 +- src/crewai/knowledge/knowledge.py | 55 +++++---- .../source/base_file_knowledge_source.py | 5 + .../knowledge/source/base_knowledge_source.py | 13 ++- .../knowledge/source/csv_knowledge_source.py | 1 + .../source/excel_knowledge_source.py | 1 + .../knowledge/source/json_knowledge_source.py | 1 + .../knowledge/source/pdf_knowledge_source.py | 1 + .../source/string_knowledge_source.py | 4 +- .../source/text_file_knowledge_source.py | 1 + src/crewai/knowledge/storage/__init__.py | 0 .../knowledge/storage/knowledge_storage.py | 110 ++++++++++++++++++ .../memory/contextual/contextual_memory.py | 8 ++ src/crewai/memory/storage/base_rag_storage.py | 12 +- 16 files changed, 196 insertions(+), 33 deletions(-) create mode 100644 src/crewai/knowledge/storage/__init__.py create mode 100644 src/crewai/knowledge/storage/knowledge_storage.py diff --git a/src/crewai/cli/cli.py b/src/crewai/cli/cli.py index 0f43ff3f4..27358f865 100644 --- a/src/crewai/cli/cli.py +++ b/src/crewai/cli/cli.py @@ -136,6 +136,7 @@ def log_tasks_outputs() -> None: @click.option("-l", "--long", is_flag=True, help="Reset LONG TERM memory") @click.option("-s", "--short", is_flag=True, help="Reset SHORT TERM memory") @click.option("-e", "--entities", is_flag=True, help="Reset ENTITIES memory") +@click.option("-kn", "--knowledge", is_flag=True, help="Reset KNOWLEDGE") @click.option( "-k", "--kickoff-outputs", @@ -143,7 +144,7 @@ def log_tasks_outputs() -> None: help="Reset LATEST KICKOFF TASK OUTPUTS", ) @click.option("-a", "--all", is_flag=True, help="Reset ALL memories") -def reset_memories(long, short, entities, kickoff_outputs, all): +def reset_memories(long, short, entities, knowledge, kickoff_outputs, 
all): """ Reset the crew memories (long, short, entity, latest_crew_kickoff_ouputs). This will delete all the data saved. """ @@ -153,7 +154,7 @@ def reset_memories(long, short, entities, kickoff_outputs, all): "Please specify at least one memory type to reset using the appropriate flags." ) return - reset_memories_command(long, short, entities, kickoff_outputs, all) + reset_memories_command(long, short, entities, kickoff_outputs, all, knowledge) except Exception as e: click.echo(f"An error occurred while resetting memories: {e}", err=True) diff --git a/src/crewai/cli/reset_memories_command.py b/src/crewai/cli/reset_memories_command.py index c4808594f..e589fad29 100644 --- a/src/crewai/cli/reset_memories_command.py +++ b/src/crewai/cli/reset_memories_command.py @@ -5,9 +5,12 @@ from crewai.memory.entity.entity_memory import EntityMemory from crewai.memory.long_term.long_term_memory import LongTermMemory from crewai.memory.short_term.short_term_memory import ShortTermMemory from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler +from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage -def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None: +def reset_memories_command( + long, short, entity, kickoff_outputs, all, knowledge +) -> None: """ Reset the crew memories. @@ -17,6 +20,7 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None: entity (bool): Whether to reset the entity memory. kickoff_outputs (bool): Whether to reset the latest kickoff task outputs. all (bool): Whether to reset all memories. + knowledge (bool): Whether to reset the knowledge. 
""" try: @@ -25,6 +29,7 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None: EntityMemory().reset() LongTermMemory().reset() TaskOutputStorageHandler().reset() + KnowledgeStorage().reset() click.echo("All memories have been reset.") else: if long: @@ -40,6 +45,9 @@ def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None: if kickoff_outputs: TaskOutputStorageHandler().reset() click.echo("Latest Kickoff outputs stored has been reset.") + if knowledge: + KnowledgeStorage().reset() + click.echo("Knowledge has been reset.") except subprocess.CalledProcessError as e: click.echo(f"An error occurred while resetting the memories: {e}", err=True) diff --git a/src/crewai/knowledge/embedder/fastembed.py b/src/crewai/knowledge/embedder/fastembed.py index adff1cdbe..54db11643 100644 --- a/src/crewai/knowledge/embedder/fastembed.py +++ b/src/crewai/knowledge/embedder/fastembed.py @@ -39,7 +39,7 @@ class FastEmbed(BaseEmbedder): if not FASTEMBED_AVAILABLE: raise ImportError( "FastEmbed is not installed. 
Please install it with: " - "pip install fastembed or pip install fastembed-gpu for GPU support" + "uv pip install fastembed or uv pip install fastembed-gpu for GPU support" ) self.model = TextEmbedding( diff --git a/src/crewai/knowledge/knowledge.py b/src/crewai/knowledge/knowledge.py index ba4ac34a9..9f36d2ff2 100644 --- a/src/crewai/knowledge/knowledge.py +++ b/src/crewai/knowledge/knowledge.py @@ -1,10 +1,11 @@ -from typing import List +from typing import List, Optional from pydantic import BaseModel, ConfigDict, Field from crewai.knowledge.embedder.base_embedder import BaseEmbedder from crewai.knowledge.embedder.fastembed import FastEmbed from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource +from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage class Knowledge(BaseModel): @@ -12,6 +13,8 @@ class Knowledge(BaseModel): embedder: BaseEmbedder = Field(default_factory=FastEmbed) model_config = ConfigDict(arbitrary_types_allowed=True) + agents: List[str] = Field(default_factory=list) + storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) def __init__(self, **data): super().__init__(**data) @@ -19,35 +22,45 @@ class Knowledge(BaseModel): for source in self.sources: source.add(self.embedder) - def query(self, query: str, top_k: int = 3) -> List[str]: + def query( + self, query: str, top_k: int = 3, preference: Optional[str] = None + ) -> List[str]: """ Query across all knowledge sources to find the most relevant information. Returns the top_k most relevant chunks. 
""" - if not self.sources: - return [] + # if not self.sources: + # return [] + + results = self.storage.search( + [query], + top_k, + filter={"preference": preference} if preference else None, + score_threshold=0.35, + ) + return results # Collect all chunks and embeddings from all sources - all_chunks = [] - all_embeddings = [] + # all_chunks = [] + # all_embeddings = [] - for source in self.sources: - all_chunks.extend(source.chunks) - all_embeddings.extend(source.get_embeddings()) + # for source in self.sources: + # all_chunks.extend(source.chunks) + # all_embeddings.extend(source.get_embeddings()) - # Embed the query - query_embedding = self.embedder.embed_text(query) + # # Embed the query + # query_embedding = self.embedder.embed_text(query) - # Calculate similarities - similarities = [] - for idx, embedding in enumerate(all_embeddings): - similarity = query_embedding.dot(embedding) - similarities.append((similarity, idx)) + # # Calculate similarities + # similarities = [] + # for idx, embedding in enumerate(all_embeddings): + # similarity = query_embedding.dot(embedding) + # similarities.append((similarity, idx)) - # Sort by similarity - similarities.sort(reverse=True, key=lambda x: x[0]) + # # Sort by similarity + # similarities.sort(reverse=True, key=lambda x: x[0]) - # Get top_k results - top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]] + # # Get top_k results + # top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]] - return top_chunks + # return top_chunks diff --git a/src/crewai/knowledge/source/base_file_knowledge_source.py b/src/crewai/knowledge/source/base_file_knowledge_source.py index a658d2e30..12739b7ad 100644 --- a/src/crewai/knowledge/source/base_file_knowledge_source.py +++ b/src/crewai/knowledge/source/base_file_knowledge_source.py @@ -3,6 +3,7 @@ from pathlib import Path from pydantic import Field from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource +from typing import Dict, Any class 
BaseFileKnowledgeSource(BaseKnowledgeSource): @@ -22,3 +23,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource): if not self.file_path.is_file(): raise ValueError(f"Path is not a file: {self.file_path}") return "" + + def _save_documents(self, metadata: Dict[str, Any]): + """Save the documents to the storage.""" + self.storage.save(self.chunks, metadata) diff --git a/src/crewai/knowledge/source/base_knowledge_source.py b/src/crewai/knowledge/source/base_knowledge_source.py index 51675af68..d9e67e911 100644 --- a/src/crewai/knowledge/source/base_knowledge_source.py +++ b/src/crewai/knowledge/source/base_knowledge_source.py @@ -1,10 +1,12 @@ from abc import ABC, abstractmethod -from typing import List +from typing import List, Optional import numpy as np from pydantic import BaseModel, ConfigDict, Field from crewai.knowledge.embedder.base_embedder import BaseEmbedder +from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage +from typing import Dict, Any class BaseKnowledgeSource(BaseModel, ABC): @@ -16,6 +18,8 @@ class BaseKnowledgeSource(BaseModel, ABC): chunk_embeddings: List[np.ndarray] = Field(default_factory=list) model_config = ConfigDict(arbitrary_types_allowed=True) + storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) + metadata: Dict[str, Any] = Field(default_factory=dict) @abstractmethod def load_content(self): @@ -37,3 +41,10 @@ class BaseKnowledgeSource(BaseModel, ABC): text[i : i + self.chunk_size] for i in range(0, len(text), self.chunk_size - self.chunk_overlap) ] + + def _save_documents(self, metadata: Dict[str, Any]): + """ + Save the documents to the storage. + This method should be called after the chunks and embeddings are generated. 
+ """ + self.storage.save(self.chunks, metadata) diff --git a/src/crewai/knowledge/source/csv_knowledge_source.py b/src/crewai/knowledge/source/csv_knowledge_source.py index eea307a33..6c283b6c7 100644 --- a/src/crewai/knowledge/source/csv_knowledge_source.py +++ b/src/crewai/knowledge/source/csv_knowledge_source.py @@ -29,6 +29,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/excel_knowledge_source.py b/src/crewai/knowledge/source/excel_knowledge_source.py index fd17914e8..0c1dcd034 100644 --- a/src/crewai/knowledge/source/excel_knowledge_source.py +++ b/src/crewai/knowledge/source/excel_knowledge_source.py @@ -39,6 +39,7 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/json_knowledge_source.py b/src/crewai/knowledge/source/json_knowledge_source.py index 351e3e207..8b37ed75b 100644 --- a/src/crewai/knowledge/source/json_knowledge_source.py +++ b/src/crewai/knowledge/source/json_knowledge_source.py @@ -41,6 +41,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/pdf_knowledge_source.py b/src/crewai/knowledge/source/pdf_knowledge_source.py index 
1ca0ab356..34fa50182 100644 --- a/src/crewai/knowledge/source/pdf_knowledge_source.py +++ b/src/crewai/knowledge/source/pdf_knowledge_source.py @@ -41,6 +41,7 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/string_knowledge_source.py b/src/crewai/knowledge/source/string_knowledge_source.py index 9dd0ecd9f..563e8c403 100644 --- a/src/crewai/knowledge/source/string_knowledge_source.py +++ b/src/crewai/knowledge/source/string_knowledge_source.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Dict, Any from pydantic import Field @@ -28,6 +28,7 @@ class StringKnowledgeSource(BaseKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/text_file_knowledge_source.py b/src/crewai/knowledge/source/text_file_knowledge_source.py index fb14319e5..c9f59adca 100644 --- a/src/crewai/knowledge/source/text_file_knowledge_source.py +++ b/src/crewai/knowledge/source/text_file_knowledge_source.py @@ -24,6 +24,7 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) + self._save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/storage/__init__.py b/src/crewai/knowledge/storage/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git
a/src/crewai/knowledge/storage/knowledge_storage.py b/src/crewai/knowledge/storage/knowledge_storage.py
new file mode 100644
index 000000000..17724cbe9
--- /dev/null
+++ b/src/crewai/knowledge/storage/knowledge_storage.py
@@ -0,0 +1,156 @@
+from crewai.memory.storage.base_rag_storage import BaseRAGStorage
+from crewai.utilities.paths import db_storage_path
+from typing import Optional, List
+
+import chromadb
+import numpy as np
+from typing import Dict, Any
+import uuid
+import contextlib
+import io
+import logging
+
+
+@contextlib.contextmanager
+def suppress_logging(
+    logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
+    level=logging.ERROR,
+):
+    """
+    Temporarily raise a logger's level and discard stdout/stderr output.
+
+    The logger's previous effective level is put back on exit, including
+    when the wrapped block raises.
+    """
+    logger = logging.getLogger(logger_name)
+    original_level = logger.getEffectiveLevel()
+    logger.setLevel(level)
+    try:
+        with (
+            contextlib.redirect_stdout(io.StringIO()),
+            contextlib.redirect_stderr(io.StringIO()),
+            contextlib.suppress(UserWarning),
+        ):
+            yield
+    finally:
+        # Without this, an exception inside the block would leave the
+        # logger stuck at `level` for the rest of the process.
+        logger.setLevel(original_level)
+
+
+class KnowledgeStorage(BaseRAGStorage):
+    """
+    Persists knowledge-source chunks in a ChromaDB collection named
+    "knowledge" and searches them by query text.
+    """
+
+    collection: Optional[chromadb.Collection] = None
+
+    def __init__(self, embedder_config=None):
+        # NOTE(review): embedder_config is stored but is not passed to the
+        # collection created in _initialize_app -- confirm this is intended.
+        self.embedder_config = (
+            embedder_config or self._create_default_embedding_function()
+        )
+        self._initialize_app()
+
+    def _sanitize_role(self, role: str) -> str:
+        """Replace spaces in a role name with underscores."""
+        return role.replace(" ", "_")
+
+    def search(
+        self,
+        query: List[str],
+        limit: int = 3,
+        filter: Optional[dict] = None,
+        score_threshold: float = 0.35,
+    ) -> List[Dict[str, Any]]:
+        """
+        Query the collection and return matches for the first query text.
+
+        Each result is a dict with "id", "metadata", "context" and "score"
+        keys; only results whose score is >= score_threshold are returned.
+
+        Raises:
+            Exception: If the collection has not been initialized.
+        """
+        with suppress_logging():
+            if not self.collection:
+                raise Exception("Collection not initialized")
+
+            fetched = self.collection.query(
+                query_texts=query,
+                n_results=limit,
+                where=filter,
+            )
+            # NOTE(review): "score" is Chroma's distance value, where a
+            # smaller number usually means a closer match -- confirm that
+            # keeping scores >= score_threshold is the intended direction.
+            return [
+                {
+                    "id": doc_id,
+                    "metadata": doc_metadata,
+                    "context": document,
+                    "score": distance,
+                }
+                for doc_id, doc_metadata, document, distance in zip(
+                    fetched["ids"][0],
+                    fetched["metadatas"][0],
+                    fetched["documents"][0],
+                    fetched["distances"][0],
+                )
+                if distance >= score_threshold
+            ]
+
+    def _initialize_app(self):
+        """Open the persistent Chroma client and get or create the collection."""
+        import chromadb
+        from chromadb.config import Settings
+
+        chroma_client = chromadb.PersistentClient(
+            path=f"{db_storage_path()}/knowledge",
+            settings=Settings(allow_reset=True),
+        )
+
+        self.app = chroma_client
+
+        try:
+            self.collection = self.app.get_or_create_collection(name="knowledge")
+        except Exception as e:
+            # Chain the cause so the underlying Chroma error stays visible.
+            raise Exception("Failed to create or get collection") from e
+
+    def reset(self):
+        """Reset the underlying Chroma client, if one was created."""
+        if self.app:
+            self.app.reset()
+
+    def save(self, documents: List[str], metadata: Dict[str, Any]):
+        """
+        Add documents to the collection, one generated id per document.
+
+        Args:
+            documents: Text chunks to store.
+            metadata: Metadata attached to every document; when empty, no
+                metadata is sent.
+
+        Raises:
+            Exception: If the collection has not been initialized.
+        """
+        if not self.collection:
+            raise Exception("Collection not initialized")
+        if not documents:
+            return
+
+        self.collection.add(
+            documents=documents,
+            # Chroma needs ids/metadatas to line up with documents.
+            metadatas=[metadata for _ in documents] if metadata else None,
+            ids=[str(uuid.uuid4()) for _ in documents],
+        )
+
+    def _create_default_embedding_function(self):
+        """Return the default embedder callable (FastEmbed.embed_texts)."""
+        from crewai.knowledge.embedder.fastembed import FastEmbed
+
+        return FastEmbed().embed_texts
diff --git a/src/crewai/memory/contextual/contextual_memory.py b/src/crewai/memory/contextual/contextual_memory.py
index 9598fe6ee..98494ab98 100644
--- a/src/crewai/memory/contextual/contextual_memory.py +++ b/src/crewai/memory/contextual/contextual_memory.py @@ -102,3 +102,11 @@ class ContextualMemory: f"- {result['memory']}" for result in user_memories ) return f"User memories/preferences:\n{formatted_memories}" + + # TODO: set this up + # def _fetch_knowledge_context(self, query: str) -> str: + # """ + # Fetches relevant knowledge from Knowledge Storage. + # """ + # knowledge_results = self.knowledge.query(query) + # return "\n".join([result["context"] for result in knowledge_results]) diff --git a/src/crewai/memory/storage/base_rag_storage.py b/src/crewai/memory/storage/base_rag_storage.py index 10b82ebff..b50caa84f 100644 --- a/src/crewai/memory/storage/base_rag_storage.py +++ b/src/crewai/memory/storage/base_rag_storage.py @@ -55,12 +55,12 @@ class BaseRAGStorage(ABC): """Reset the storage.""" pass - @abstractmethod - def _generate_embedding( - self, text: str, metadata: Optional[Dict[str, Any]] = None - ) -> Any: - """Generate an embedding for the given text and metadata.""" - pass + # @abstractmethod + # def _generate_embedding( + # self, text: str, metadata: Optional[Dict[str, Any]] = None + # ) -> Any: + # """Generate an embedding for the given text and metadata.""" + # pass @abstractmethod def _initialize_app(self):