Improvements all around the Knowledge class

This commit is contained in:
Lorenze Jay
2024-11-15 15:28:07 -08:00
parent 10f445e18a
commit cb03ee60b8
13 changed files with 99 additions and 102 deletions

View File

@@ -1,7 +1,7 @@
import os
import shutil
import subprocess
from typing import Any, List, Literal, Optional, Union
from typing import Any, List, Literal, Optional, Union, Dict, Any
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -10,7 +10,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools import BaseTool
@@ -88,10 +87,6 @@ class Agent(BaseAgent):
llm: Union[str, InstanceOf[LLM], Any] = Field(
description="Language model that will run the agent.", default=None
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
function_calling_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
)
@@ -237,8 +232,8 @@ class Agent(BaseAgent):
self._validate_docker_installation()
# Initialize the Knowledge object if knowledge_sources are provided
if self.knowledge_sources:
self._knowledge = Knowledge(sources=self.knowledge_sources)
if self.crew and self.crew.knowledge:
self._knowledge = self.crew.knowledge
else:
self._knowledge = None
@@ -288,12 +283,19 @@ class Agent(BaseAgent):
task_prompt += self.i18n.slice("memory").format(memory=memory)
# Integrate the knowledge base
if self._knowledge:
# Query the knowledge base for relevant information
knowledge_snippets = self._knowledge.query(query=task.prompt())
if self.crew and self.crew.knowledge:
knowledge_snippets: List[Dict[str, Any]] = self.crew.knowledge.query(
[task.prompt()]
)
if knowledge_snippets:
formatted_knowledge = "\n".join(knowledge_snippets)
task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
valid_snippets = [
result["context"]
for result in knowledge_snippets
if result and result.get("context")
]
if valid_snippets:
formatted_knowledge = "\n".join(valid_snippets)
task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)

View File

@@ -27,6 +27,8 @@ from crewai.llm import LLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.memory.user.user_memory import UserMemory
from crewai.process import Process
from crewai.task import Task
@@ -193,6 +195,13 @@ class Crew(BaseModel):
default=[],
description="List of execution logs for tasks",
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
knowledge: Optional[Knowledge] = Field(
default=None, description="Knowledge Source for the crew."
)
@field_validator("id", mode="before")
@classmethod
@@ -267,6 +276,13 @@ class Crew(BaseModel):
self._user_memory = None
return self
@model_validator(mode="after")
def create_crew_knowledge(self) -> "Crew":
self.knowledge = Knowledge(
sources=self.knowledge_sources or [], embedder_config=self.embedder
)
return self
@model_validator(mode="after")
def check_manager_llm(self):
"""Validates that the language model is set when using hierarchical process."""

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, ConfigDict, Field
@@ -23,44 +23,17 @@ class Knowledge(BaseModel):
source.add(self.embedder)
def query(
self, query: str, top_k: int = 3, preference: Optional[str] = None
) -> List[str]:
self, query: List[str], limit: int = 3, preference: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Query across all knowledge sources to find the most relevant information.
Returns the most relevant chunks, up to ``limit`` results.
"""
# if not self.sources:
# return []
results = self.storage.search(
[query],
top_k,
query,
limit,
filter={"preference": preference} if preference else None,
score_threshold=0.35,
)
return results
# Collect all chunks and embeddings from all sources
# all_chunks = []
# all_embeddings = []
# for source in self.sources:
# all_chunks.extend(source.chunks)
# all_embeddings.extend(source.get_embeddings())
# # Embed the query
# query_embedding = self.embedder.embed_text(query)
# # Calculate similarities
# similarities = []
# for idx, embedding in enumerate(all_embeddings):
# similarity = query_embedding.dot(embedding)
# similarities.append((similarity, idx))
# # Sort by similarity
# similarities.sort(reverse=True, key=lambda x: x[0])
# # Get top_k results
# top_chunks = [all_chunks[idx] for _, idx in similarities[:top_k]]
# return top_chunks

View File

@@ -1,29 +1,37 @@
from pathlib import Path
from typing import Union, List
from pydantic import Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from typing import Dict, Any
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
class BaseFileKnowledgeSource(BaseKnowledgeSource):
"""Base class for knowledge sources that load content from files."""
file_path: Path = Field(...)
content: str = Field(init=False, default="")
file_path: Union[Path, List[Path]] = Field(...)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict)
def model_post_init(self, context):
"""Post-initialization method to load content."""
self.content = self.load_content()
def load_content(self) -> str:
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess file content. Should be overridden by subclasses."""
if not self.file_path.exists():
raise FileNotFoundError(f"File not found: {self.file_path}")
if not self.file_path.is_file():
raise ValueError(f"Path is not a file: {self.file_path}")
return ""
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
def _save_documents(self, metadata: Dict[str, Any]):
for path in paths:
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
raise ValueError(f"Path is not a file: {path}")
return {}
def save_documents(self, metadata: Dict[str, Any]):
"""Save the documents to the storage."""
self.storage.save(self.chunks, metadata)
chunk_metadatas = [metadata.copy() for _ in self.chunks]
self.storage.save(self.chunks, chunk_metadatas)

View File

@@ -12,7 +12,7 @@ from typing import Dict, Any
class BaseKnowledgeSource(BaseModel, ABC):
"""Abstract base class for knowledge sources."""
chunk_size: int = 1000
chunk_size: int = 4000
chunk_overlap: int = 200
chunks: List[str] = Field(default_factory=list)
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
@@ -42,7 +42,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]
def _save_documents(self, metadata: Dict[str, Any]):
def save_documents(self, metadata: Dict[str, Any]):
"""
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.

View File

@@ -29,7 +29,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -39,7 +39,7 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -41,7 +41,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -1,4 +1,5 @@
from typing import List
from typing import List, Dict
from pathlib import Path
from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -7,17 +8,23 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledge
class PDFKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries PDF file content using embeddings."""
def load_content(self) -> str:
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess PDF file content."""
super().load_content() # Validate the file path
super().load_content() # Validate the file paths
pdfplumber = self._import_pdfplumber()
text = ""
with pdfplumber.open(self.file_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
return text
paths = [self.file_path] if isinstance(self.file_path, Path) else self.file_path
content = {}
for path in paths:
text = ""
with pdfplumber.open(path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
content[path] = text
return content
def _import_pdfplumber(self):
"""Dynamically import pdfplumber."""
@@ -35,13 +42,14 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
Add PDF file content to the knowledge source, chunk it, compute embeddings,
and save the embeddings.
"""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
# Compute embeddings for the new chunks
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
for _, text in self.content.items():
new_chunks = self._chunk_text(text)
self.chunks.extend(new_chunks)
# Compute embeddings for the new chunks
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -28,8 +28,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
print("adding")
self._save_documents(metadata=self.metadata)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -24,7 +24,7 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource):
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
self._save_documents(metadata=self.metadata)
self.save_documents(metadata=self.metadata)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""

View File

@@ -1,14 +1,12 @@
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities.paths import db_storage_path
from typing import Optional, List
import chromadb
import numpy as np
from typing import Dict, Any
import uuid
import contextlib
import io
import logging
import chromadb
from crewai.utilities.paths import db_storage_path
from typing import Optional, List
from typing import Dict, Any
@contextlib.contextmanager
@@ -28,7 +26,7 @@ def suppress_logging(
logger.setLevel(original_level)
class KnowledgeStorage(BaseRAGStorage):
class KnowledgeStorage:
"""
Extends Storage to handle embeddings for memory entries, improving
search efficiency.
@@ -42,9 +40,6 @@ class KnowledgeStorage(BaseRAGStorage):
)
self._initialize_app()
def _sanitize_role(self, role: str) -> str:
return role.replace(" ", "_")
def search(
self,
query: List[str],
@@ -94,12 +89,16 @@ class KnowledgeStorage(BaseRAGStorage):
if self.app:
self.app.reset()
def save(self, documents: List[str], metadata: Dict[str, Any]):
def save(
self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
):
if self.collection:
metadatas = [metadata] if isinstance(metadata, dict) else metadata
self.collection.add(
documents=documents,
metadatas=metadata,
ids=[str(uuid.uuid4())],
metadatas=metadatas,
ids=[str(uuid.uuid4()) for _ in range(len(documents))],
)
else:
raise Exception("Collection not initialized")

View File

@@ -102,11 +102,3 @@ class ContextualMemory:
f"- {result['memory']}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"
# TODO: set this up
# def _fetch_knowledge_context(self, query: str) -> str:
# """
# Fetches relevant knowledge from Knowledge Storage.
# """
# knowledge_results = self.knowledge.query(query)
# return "\n".join([result["context"] for result in knowledge_results])