added knowledge to agent level (#1655)

* added knowledge to agent level

* linted

* added doc

* added from suggestions

* added test

* fixes from discussion

* fix docs

* fix test

* rm cassette for knowledge_sources test as its a mock and update agent doc string

* fix test

* rm unused

* linted
This commit is contained in:
Lorenze Jay
2024-11-27 11:33:07 -08:00
committed by GitHub
parent 366bbbbea3
commit c6a6c918e0
10 changed files with 654 additions and 91 deletions

View File

@@ -5,8 +5,8 @@ from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.logger import Logger
from crewai.utilities.constants import DEFAULT_SCORE_THRESHOLD
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
@@ -18,24 +18,33 @@ class Knowledge(BaseModel):
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None, **data):
def __init__(
self,
collection_name: str,
sources: List[BaseKnowledgeSource],
embedder_config: Optional[Dict[str, Any]] = None,
storage: Optional[KnowledgeStorage] = None,
**data,
):
super().__init__(**data)
self.storage = KnowledgeStorage(embedder_config=embedder_config or None)
try:
for source in self.sources:
source.add()
except Exception as e:
Logger(verbose=True).log(
"warning",
f"Failed to init knowledge: {e}",
color="yellow",
if storage:
self.storage = storage
else:
self.storage = KnowledgeStorage(
embedder_config=embedder_config, collection_name=collection_name
)
self.sources = sources
self.storage.initialize_knowledge_storage()
for source in sources:
source.storage = self.storage
source.add()
def query(
self, query: List[str], limit: int = 3, preference: Optional[str] = None
@@ -52,3 +61,8 @@ class Knowledge(BaseModel):
score_threshold=DEFAULT_SCORE_THRESHOLD,
)
return results
def _add_sources(self):
for source in self.sources:
source.storage = self.storage
source.add()

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
import numpy as np
from pydantic import BaseModel, ConfigDict, Field
@@ -18,6 +18,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict)
collection_name: Optional[str] = Field(default=None)
@abstractmethod
def load_content(self) -> Dict[Any, str]:

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from pydantic import Field
@@ -9,6 +9,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries plain text content using embeddings."""
content: str = Field(...)
collection_name: Optional[str] = Field(default=None)
def model_post_init(self, _):
"""Post-initialization method to validate content."""

View File

@@ -3,12 +3,16 @@ import io
import logging
import chromadb
import os
import chromadb.errors
from crewai.utilities.paths import db_storage_path
from typing import Optional, List
from typing import Dict, Any
from typing import Optional, List, Dict, Any, Union
from crewai.utilities import EmbeddingConfigurator
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
import hashlib
from chromadb.config import Settings
from chromadb.api import ClientAPI
from crewai.utilities.logger import Logger
@contextlib.contextmanager
@@ -35,9 +39,16 @@ class KnowledgeStorage(BaseKnowledgeStorage):
"""
collection: Optional[chromadb.Collection] = None
collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None):
self._initialize_app(embedder_config or {})
def __init__(
self,
embedder_config: Optional[Dict[str, Any]] = None,
collection_name: Optional[str] = None,
):
self.collection_name = collection_name
self._set_embedder_config(embedder_config)
def search(
self,
@@ -67,43 +78,75 @@ class KnowledgeStorage(BaseKnowledgeStorage):
else:
raise Exception("Collection not initialized")
def _initialize_app(self, embedder_config: Optional[Dict[str, Any]] = None):
import chromadb
from chromadb.config import Settings
self._set_embedder_config(embedder_config)
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=f"{db_storage_path()}/knowledge",
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
self.collection = self.app.get_or_create_collection(name="knowledge")
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
else "knowledge"
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder_config
)
else:
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
if self.app:
self.app.reset()
else:
base_path = os.path.join(db_storage_path(), "knowledge")
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
def save(
self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
self,
documents: List[str],
metadata: Union[Dict[str, Any], List[Dict[str, Any]]],
):
if self.collection:
metadatas = [metadata] if isinstance(metadata, dict) else metadata
try:
metadatas = [metadata] if isinstance(metadata, dict) else metadata
ids = [
hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents
]
ids = [
hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents
]
self.collection.upsert(
documents=documents,
metadatas=metadatas,
ids=ids,
)
self.collection.upsert(
documents=documents,
metadatas=metadatas,
ids=ids,
)
except chromadb.errors.InvalidDimensionException as e:
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except Exception as e:
Logger(verbose=True).log(
"error", f"Failed to upsert documents: {e}", "red"
)
raise
else:
raise Exception("Collection not initialized")

View File

@@ -0,0 +1,12 @@
from typing import Any, Dict, List
def extract_knowledge_context(knowledge_snippets: List[Dict[str, Any]]) -> str:
"""Extract knowledge from the task prompt."""
valid_snippets = [
result["context"]
for result in knowledge_snippets
if result and result.get("context")
]
snippet = "\n".join(valid_snippets)
return f"Additional Information: {snippet}" if valid_snippets else ""