From cb03ee60b8e51b5914cf00674179cc047de1b0c6 Mon Sep 17 00:00:00 2001
From: Lorenze Jay
Date: Fri, 15 Nov 2024 15:28:07 -0800
Subject: [PATCH] improvements all around Knowledge class

---
 src/crewai/agent.py                           | 28 +++++++------
 src/crewai/crew.py                            | 16 +++++++
 src/crewai/knowledge/knowledge.py             | 37 +++---------
 .../source/base_file_knowledge_source.py      | 28 ++++++++-----
 .../knowledge/source/base_knowledge_source.py |  4 +-
 .../knowledge/source/csv_knowledge_source.py  |  2 +-
 .../source/excel_knowledge_source.py          |  2 +-
 .../knowledge/source/json_knowledge_source.py |  2 +-
 .../knowledge/source/pdf_knowledge_source.py  | 42 +++++++++++--------
 .../source/string_knowledge_source.py         |  3 +-
 .../source/text_file_knowledge_source.py      |  2 +-
 .../knowledge/storage/knowledge_storage.py    | 27 ++++++------
 .../memory/contextual/contextual_memory.py    |  8 ----
 13 files changed, 99 insertions(+), 102 deletions(-)

diff --git a/src/crewai/agent.py b/src/crewai/agent.py
index db8635617..8937ada84 100644
--- a/src/crewai/agent.py
+++ b/src/crewai/agent.py
@@ -1,7 +1,7 @@
 import os
 import shutil
 import subprocess
-from typing import Any, List, Literal, Optional, Union
+from typing import Any, List, Literal, Optional, Union, Dict, Any
 
 from pydantic import Field, InstanceOf, PrivateAttr, model_validator
 
@@ -10,7 +10,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.crew_agent_executor import CrewAgentExecutor
 from crewai.cli.constants import ENV_VARS
 from crewai.knowledge.knowledge import Knowledge
-from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.llm import LLM
 from crewai.memory.contextual.contextual_memory import ContextualMemory
 from crewai.tools import BaseTool
@@ -88,10 +87,6 @@ class Agent(BaseAgent):
     llm: Union[str, InstanceOf[LLM], Any] = Field(
         description="Language model that will run the agent.", default=None
     )
-    knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
-        default=None,
-        description="Knowledge sources for the agent.",
-    )
     function_calling_llm: Optional[Any] = Field(
         description="Language model that will run the agent.", default=None
     )
@@ -237,8 +232,8 @@ class Agent(BaseAgent):
             self._validate_docker_installation()
 
         # Initialize the Knowledge object if knowledge_sources are provided
-        if self.knowledge_sources:
-            self._knowledge = Knowledge(sources=self.knowledge_sources)
+        if self.crew and self.crew.knowledge:
+            self._knowledge = self.crew.knowledge
         else:
             self._knowledge = None
 
@@ -288,12 +283,19 @@ class Agent(BaseAgent):
             task_prompt += self.i18n.slice("memory").format(memory=memory)
 
         # Integrate the knowledge base
-        if self._knowledge:
-            # Query the knowledge base for relevant information
-            knowledge_snippets = self._knowledge.query(query=task.prompt())
+        if self.crew and self.crew.knowledge:
+            knowledge_snippets: List[Dict[str, Any]] = self.crew.knowledge.query(
+                [task.prompt()]
+            )
             if knowledge_snippets:
-                formatted_knowledge = "\n".join(knowledge_snippets)
-                task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
+                valid_snippets = [
+                    result["context"]
+                    for result in knowledge_snippets
+                    if result and result.get("context")
+                ]
+                if valid_snippets:
+                    formatted_knowledge = "\n".join(valid_snippets)
+                    task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
 
         tools = tools or self.tools or []
         self.create_agent_executor(tools=tools, task=task)
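Note (not part of the patch): agents no longer own knowledge_sources; execute_task now reads the crew-level Knowledge introduced in the crew.py diff below and keeps only hits that carry a "context" payload. A minimal sketch of that filtering step, with an illustrative helper name:

    from typing import Any, Dict, List


    def collect_snippets(results: List[Dict[str, Any]]) -> List[str]:
        # Mirror the filtering added to Agent.execute_task: drop empty hits
        # and keep only the "context" field of each result dict.
        return [r["context"] for r in results if r and r.get("context")]


    # knowledge_snippets = crew.knowledge.query([task.prompt()])
    # task_prompt += "\n\nAdditional Information:\n" + "\n".join(
    #     collect_snippets(knowledge_snippets)
    # )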
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 04820adf8..59c8fd8d8 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -27,6 +27,8 @@ from crewai.llm import LLM
 from crewai.memory.entity.entity_memory import EntityMemory
 from crewai.memory.long_term.long_term_memory import LongTermMemory
 from crewai.memory.short_term.short_term_memory import ShortTermMemory
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from crewai.memory.user.user_memory import UserMemory
 from crewai.process import Process
 from crewai.task import Task
@@ -193,6 +195,13 @@ class Crew(BaseModel):
         default=[],
         description="List of execution logs for tasks",
     )
+    knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
+        default=None,
+        description="Knowledge sources for the agent.",
+    )
+    knowledge: Optional[Knowledge] = Field(
+        default=None, description="Knowledge Source for the crew."
+    )
 
     @field_validator("id", mode="before")
     @classmethod
@@ -267,6 +276,13 @@ class Crew(BaseModel):
             self._user_memory = None
         return self
 
+    @model_validator(mode="after")
+    def create_crew_knowledge(self) -> "Crew":
+        self.knowledge = Knowledge(
+            sources=self.knowledge_sources or [], embedder_config=self.embedder
+        )
+        return self
+
     @model_validator(mode="after")
     def check_manager_llm(self):
         """Validates that the language model is set when using hierarchical process."""
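Usage sketch (not part of the patch): the crew now builds its own Knowledge in the create_crew_knowledge validator, so knowledge is configured once at the crew level. This assumes StringKnowledgeSource accepts a plain content string (its diff appears further down) and that the embedder dict follows the existing crew embedder config shape:

    from crewai import Agent, Crew, Task
    from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource

    source = StringKnowledgeSource(content="Our refund window is 30 days.")

    agent = Agent(role="Support rep", goal="Answer policy questions", backstory="Knows the docs")
    task = Task(
        description="How long is the refund window?",
        expected_output="A short answer",
        agent=agent,
    )

    crew = Crew(
        agents=[agent],
        tasks=[task],
        knowledge_sources=[source],
        embedder={"provider": "openai"},  # assumed config shape, reused by create_crew_knowledge
    )
    # create_crew_knowledge() runs as a model validator, so crew.knowledge is
    # populated before kickoff and execute_task can query it.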
""" - # if not self.sources: - # return [] results = self.storage.search( - [query], - top_k, + query, + limit, filter={"preference": preference} if preference else None, score_threshold=0.35, ) return results - - # Collect all chunks and embeddings from all sources - # all_chunks = [] - # all_embeddings = [] - - # for source in self.sources: - # all_chunks.extend(source.chunks) - # all_embeddings.extend(source.get_embeddings()) - - # # Embed the query - # query_embedding = self.embedder.embed_text(query) - - # # Calculate similarities - # similarities = [] - # for idx, embedding in enumerate(all_embeddings): - # similarity = query_embedding.dot(embedding) - # similarities.append((similarity, idx)) - - # # Sort by similarity - # similarities.sort(reverse=True, key=lambda x: x[0]) - - # # Get top_k results - # top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]] - - # return top_chunks diff --git a/src/crewai/knowledge/source/base_file_knowledge_source.py b/src/crewai/knowledge/source/base_file_knowledge_source.py index 12739b7ad..4c3e2e3f9 100644 --- a/src/crewai/knowledge/source/base_file_knowledge_source.py +++ b/src/crewai/knowledge/source/base_file_knowledge_source.py @@ -1,29 +1,37 @@ from pathlib import Path +from typing import Union, List from pydantic import Field from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from typing import Dict, Any +from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage class BaseFileKnowledgeSource(BaseKnowledgeSource): """Base class for knowledge sources that load content from files.""" - file_path: Path = Field(...) - content: str = Field(init=False, default="") + file_path: Union[Path, List[Path]] = Field(...) + content: Dict[Path, str] = Field(init=False, default_factory=dict) + storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) + metadata: Dict[str, Any] = Field(default_factory=dict) def model_post_init(self, context): """Post-initialization method to load content.""" self.content = self.load_content() - def load_content(self) -> str: + def load_content(self) -> Dict[Path, str]: """Load and preprocess file content. 
diff --git a/src/crewai/knowledge/source/base_file_knowledge_source.py b/src/crewai/knowledge/source/base_file_knowledge_source.py
index 12739b7ad..4c3e2e3f9 100644
--- a/src/crewai/knowledge/source/base_file_knowledge_source.py
+++ b/src/crewai/knowledge/source/base_file_knowledge_source.py
@@ -1,29 +1,37 @@
 from pathlib import Path
+from typing import Union, List
 
 from pydantic import Field
 
 from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
 from typing import Dict, Any
+from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
 
 
 class BaseFileKnowledgeSource(BaseKnowledgeSource):
     """Base class for knowledge sources that load content from files."""
 
-    file_path: Path = Field(...)
-    content: str = Field(init=False, default="")
+    file_path: Union[Path, List[Path]] = Field(...)
+    content: Dict[Path, str] = Field(init=False, default_factory=dict)
+    storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
+    metadata: Dict[str, Any] = Field(default_factory=dict)
 
     def model_post_init(self, context):
         """Post-initialization method to load content."""
         self.content = self.load_content()
 
-    def load_content(self) -> str:
+    def load_content(self) -> Dict[Path, str]:
         """Load and preprocess file content. Should be overridden by subclasses."""
-        if not self.file_path.exists():
-            raise FileNotFoundError(f"File not found: {self.file_path}")
-        if not self.file_path.is_file():
-            raise ValueError(f"Path is not a file: {self.file_path}")
-        return ""
+        paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
 
-    def _save_documents(self, metadata: Dict[str, Any]):
+        for path in paths:
+            if not path.exists():
+                raise FileNotFoundError(f"File not found: {path}")
+            if not path.is_file():
+                raise ValueError(f"Path is not a file: {path}")
+        return {}
+
+    def save_documents(self, metadata: Dict[str, Any]):
         """Save the documents to the storage."""
-        self.storage.save(self.chunks, metadata)
+        chunk_metadatas = [metadata.copy() for _ in self.chunks]
+        self.storage.save(self.chunks, chunk_metadatas)
diff --git a/src/crewai/knowledge/source/base_knowledge_source.py b/src/crewai/knowledge/source/base_knowledge_source.py
index d9e67e911..4eb58df72 100644
--- a/src/crewai/knowledge/source/base_knowledge_source.py
+++ b/src/crewai/knowledge/source/base_knowledge_source.py
@@ -12,7 +12,7 @@ from typing import Dict, Any
 class BaseKnowledgeSource(BaseModel, ABC):
     """Abstract base class for knowledge sources."""
 
-    chunk_size: int = 1000
+    chunk_size: int = 4000
     chunk_overlap: int = 200
     chunks: List[str] = Field(default_factory=list)
     chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
@@ -42,7 +42,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
             for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
         ]
 
-    def _save_documents(self, metadata: Dict[str, Any]):
+    def save_documents(self, metadata: Dict[str, Any]):
         """
         Save the documents to the storage.
         This method should be called after the chunks and embeddings are generated.
diff --git a/src/crewai/knowledge/source/csv_knowledge_source.py b/src/crewai/knowledge/source/csv_knowledge_source.py
index 6c283b6c7..bf7af5b0e 100644
--- a/src/crewai/knowledge/source/csv_knowledge_source.py
+++ b/src/crewai/knowledge/source/csv_knowledge_source.py
@@ -29,7 +29,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
         new_embeddings = embedder.embed_chunks(new_chunks)
         # Save the embeddings
         self.chunk_embeddings.extend(new_embeddings)
-        self._save_documents(metadata=self.metadata)
+        self.save_documents(metadata=self.metadata)
 
     def _chunk_text(self, text: str) -> List[str]:
         """Utility method to split text into chunks."""
diff --git a/src/crewai/knowledge/source/excel_knowledge_source.py b/src/crewai/knowledge/source/excel_knowledge_source.py
index 0c1dcd034..608657076 100644
--- a/src/crewai/knowledge/source/excel_knowledge_source.py
+++ b/src/crewai/knowledge/source/excel_knowledge_source.py
@@ -39,7 +39,7 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
         new_embeddings = embedder.embed_chunks(new_chunks)
         # Save the embeddings
         self.chunk_embeddings.extend(new_embeddings)
-        self._save_documents(metadata=self.metadata)
+        self.save_documents(metadata=self.metadata)
 
     def _chunk_text(self, text: str) -> List[str]:
         """Utility method to split text into chunks."""
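For context on the chunk_size bump above: _chunk_text slides a window of chunk_size characters forward by chunk_size - chunk_overlap, so the new defaults yield 4000-character chunks overlapping by 200. A standalone illustration (not the library code itself):

    from typing import List

    CHUNK_SIZE = 4000    # new default from the base_knowledge_source diff
    CHUNK_OVERLAP = 200  # unchanged


    def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
        # Same windowing as BaseKnowledgeSource._chunk_text: step = size - overlap.
        return [text[i : i + size] for i in range(0, len(text), size - overlap)]


    print(len(chunk_text("x" * 10_000)))  # 3 chunks: [0:4000], [3800:7800], [7600:10000]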
diff --git a/src/crewai/knowledge/source/json_knowledge_source.py b/src/crewai/knowledge/source/json_knowledge_source.py
index 8b37ed75b..b5b0dcbf1 100644
--- a/src/crewai/knowledge/source/json_knowledge_source.py
+++ b/src/crewai/knowledge/source/json_knowledge_source.py
@@ -41,7 +41,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
         new_embeddings = embedder.embed_chunks(new_chunks)
         # Save the embeddings
         self.chunk_embeddings.extend(new_embeddings)
-        self._save_documents(metadata=self.metadata)
+        self.save_documents(metadata=self.metadata)
 
     def _chunk_text(self, text: str) -> List[str]:
         """Utility method to split text into chunks."""
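The per-format sources above now call the public save_documents(), which fans a single metadata dict out to one copy per chunk before handing parallel lists to storage. A toy sketch of that fan-out (FakeStorage is a stand-in, not the real KnowledgeStorage):

    from typing import Any, Dict, List


    class FakeStorage:
        # Stand-in that only checks the shape of the call.
        def save(self, documents: List[str], metadata: List[Dict[str, Any]]) -> None:
            assert len(documents) == len(metadata)


    def save_documents(chunks: List[str], metadata: Dict[str, Any], storage: FakeStorage) -> None:
        # Same fan-out as BaseFileKnowledgeSource.save_documents above.
        chunk_metadatas = [metadata.copy() for _ in chunks]
        storage.save(chunks, chunk_metadatas)


    save_documents(["chunk a", "chunk b"], {"preference": "docs"}, FakeStorage())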
""" - new_chunks = self._chunk_text(self.content) - self.chunks.extend(new_chunks) - # Compute embeddings for the new chunks - new_embeddings = embedder.embed_chunks(new_chunks) - # Save the embeddings - self.chunk_embeddings.extend(new_embeddings) - self._save_documents(metadata=self.metadata) + for _, text in self.content.items(): + new_chunks = self._chunk_text(text) + self.chunks.extend(new_chunks) + # Compute embeddings for the new chunks + new_embeddings = embedder.embed_chunks(new_chunks) + # Save the embeddings + self.chunk_embeddings.extend(new_embeddings) + self.save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/string_knowledge_source.py b/src/crewai/knowledge/source/string_knowledge_source.py index 563e8c403..e9e72334f 100644 --- a/src/crewai/knowledge/source/string_knowledge_source.py +++ b/src/crewai/knowledge/source/string_knowledge_source.py @@ -28,8 +28,7 @@ class StringKnowledgeSource(BaseKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) - print("adding") - self._save_documents(metadata=self.metadata) + self.save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/source/text_file_knowledge_source.py b/src/crewai/knowledge/source/text_file_knowledge_source.py index c9f59adca..b8195da5c 100644 --- a/src/crewai/knowledge/source/text_file_knowledge_source.py +++ b/src/crewai/knowledge/source/text_file_knowledge_source.py @@ -24,7 +24,7 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource): new_embeddings = embedder.embed_chunks(new_chunks) # Save the embeddings self.chunk_embeddings.extend(new_embeddings) - self._save_documents(metadata=self.metadata) + self.save_documents(metadata=self.metadata) def _chunk_text(self, text: str) -> List[str]: """Utility method to split text into chunks.""" diff --git a/src/crewai/knowledge/storage/knowledge_storage.py b/src/crewai/knowledge/storage/knowledge_storage.py index 17724cbe9..d79a192a0 100644 --- a/src/crewai/knowledge/storage/knowledge_storage.py +++ b/src/crewai/knowledge/storage/knowledge_storage.py @@ -1,14 +1,12 @@ -from crewai.memory.storage.base_rag_storage import BaseRAGStorage -from crewai.utilities.paths import db_storage_path -from typing import Optional, List - -import chromadb -import numpy as np -from typing import Dict, Any import uuid import contextlib import io import logging +import chromadb + +from crewai.utilities.paths import db_storage_path +from typing import Optional, List +from typing import Dict, Any @contextlib.contextmanager @@ -28,7 +26,7 @@ def suppress_logging( logger.setLevel(original_level) -class KnowledgeStorage(BaseRAGStorage): +class KnowledgeStorage: """ Extends Storage to handle embeddings for memory entries, improving search efficiency. 
diff --git a/src/crewai/knowledge/storage/knowledge_storage.py b/src/crewai/knowledge/storage/knowledge_storage.py
index 17724cbe9..d79a192a0 100644
--- a/src/crewai/knowledge/storage/knowledge_storage.py
+++ b/src/crewai/knowledge/storage/knowledge_storage.py
@@ -1,14 +1,12 @@
-from crewai.memory.storage.base_rag_storage import BaseRAGStorage
-from crewai.utilities.paths import db_storage_path
-from typing import Optional, List
-
-import chromadb
-import numpy as np
-from typing import Dict, Any
 import uuid
 import contextlib
 import io
 import logging
+import chromadb
+
+from crewai.utilities.paths import db_storage_path
+from typing import Optional, List
+from typing import Dict, Any
 
 
 @contextlib.contextmanager
@@ -28,7 +26,7 @@ def suppress_logging(
         logger.setLevel(original_level)
 
 
-class KnowledgeStorage(BaseRAGStorage):
+class KnowledgeStorage:
     """
     Extends Storage to handle embeddings for memory entries, improving
     search efficiency.
@@ -42,9 +40,6 @@ class KnowledgeStorage(BaseRAGStorage):
         )
         self._initialize_app()
 
-    def _sanitize_role(self, role: str) -> str:
-        return role.replace(" ", "_")
-
     def search(
         self,
         query: List[str],
@@ -94,12 +89,16 @@ class KnowledgeStorage(BaseRAGStorage):
         if self.app:
             self.app.reset()
 
-    def save(self, documents: List[str], metadata: Dict[str, Any]):
+    def save(
+        self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
+    ):
         if self.collection:
+            metadatas = [metadata] if isinstance(metadata, dict) else metadata
+
             self.collection.add(
                 documents=documents,
-                metadatas=metadata,
-                ids=[str(uuid.uuid4())],
+                metadatas=metadatas,
+                ids=[str(uuid.uuid4()) for _ in range(len(documents))],
             )
         else:
             raise Exception("Collection not initialized")
diff --git a/src/crewai/memory/contextual/contextual_memory.py b/src/crewai/memory/contextual/contextual_memory.py
index 98494ab98..9598fe6ee 100644
--- a/src/crewai/memory/contextual/contextual_memory.py
+++ b/src/crewai/memory/contextual/contextual_memory.py
@@ -102,11 +102,3 @@ class ContextualMemory:
             f"- {result['memory']}" for result in user_memories
         )
         return f"User memories/preferences:\n{formatted_memories}"
-
-    # TODO: set this up
-    # def _fetch_knowledge_context(self, query: str) -> str:
-    #     """
-    #     Fetches relevant knowledge from Knowledge Storage.
-    #     """
-    #     knowledge_results = self.knowledge.query(query)
-    #     return "\n".join([result["context"] for result in knowledge_results])
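On the storage side, save() now accepts either one metadata dict or a list of per-document dicts, and generates one UUID per document instead of a single id. A hedged sketch of both call shapes, assuming the default KnowledgeStorage() constructor initializes the backing collection as the __init__ shown above suggests:

    from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

    storage = KnowledgeStorage()

    # Single dict: wrapped into a one-element list internally, so pair it with
    # a single document.
    storage.save(["only chunk"], {"preference": "docs"})

    # Per-document metadata: documents, metadatas, and generated ids stay parallel.
    storage.save(
        ["chunk a", "chunk b"],
        [{"preference": "docs"}, {"preference": "docs"}],
    )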