added knowledge to agent level

This commit is contained in:
Lorenze Jay
2024-11-25 15:28:42 -08:00
parent 8cf1cd5a62
commit 6c6c60318c
6 changed files with 115 additions and 43 deletions

View File

@@ -1,7 +1,7 @@
import os
import shutil
import subprocess
from typing import Any, List, Literal, Optional, Union
from typing import Any, List, Literal, Optional, Union, Dict
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -10,6 +10,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS
from crewai.llm import LLM
from crewai.knowledge.knowledge import Knowledge
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
@@ -120,9 +121,14 @@ class Agent(BaseAgent):
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
)
knowledge: Optional[Dict[str, Any]] = Field(
default=None,
description="Knowledge for the agent. Add knowledge sources to the knowledge object.",
)
@model_validator(mode="after")
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
unnacepted_attributes = [
"AWS_ACCESS_KEY_ID",
@@ -235,6 +241,28 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def _set_knowledge(self):
try:
if self.knowledge:
knowledge_agent_name = f"{self.role.replace(' ', '_')}"
if isinstance(self.knowledge, dict):
knowledge_data = self.knowledge.copy()
knowledge_data["store_dir"] = knowledge_agent_name
self.knowledge = Knowledge(**knowledge_data)
self.knowledge.storage.initialize_knowledge_storage()
try:
for source in self.knowledge.sources:
source.storage = self.knowledge.storage
source.add()
except Exception as e:
self._logger.log(
"warning",
f"Failed to init knowledge: {knowledge_agent_name} {e}",
color="yellow",
)
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
def execute_task(
self,
task: Any,
@@ -273,17 +301,20 @@ class Agent(BaseAgent):
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
# Integrate the knowledge base
if self.knowledge and isinstance(self.knowledge, Knowledge):
agent_knowledge_snippets = self.knowledge.query([task.prompt()])
agent_knowledge_context = self._extract_knowledge_context(
agent_knowledge_snippets
)
if agent_knowledge_context:
task_prompt += agent_knowledge_context
if self.crew and self.crew.knowledge:
knowledge_snippets = self.crew.knowledge.query([task.prompt()])
valid_snippets = [
result["context"]
for result in knowledge_snippets
if result and result.get("context")
]
if valid_snippets:
formatted_knowledge = "\n".join(valid_snippets)
task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
crew_knowledge_context = self._extract_knowledge_context(knowledge_snippets)
if crew_knowledge_context:
task_prompt += crew_knowledge_context
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
@@ -490,6 +521,18 @@ class Agent(BaseAgent):
f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
)
def _extract_knowledge_context(
self, knowledge_snippets: List[Dict[str, Any]]
) -> str:
"""Extract knowledge from the task prompt."""
valid_snippets = [
result["context"]
for result in knowledge_snippets
if result and result.get("context")
]
snippet = "\n".join(valid_snippets)
return f"Additional Information: {snippet}" if valid_snippets else ""
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -203,10 +203,10 @@ class Crew(BaseModel):
description="List of execution logs for tasks",
)
knowledge: Optional[Dict[str, Any]] = Field(
default=None, description="Knowledge for the crew. Add knowledge sources to the knowledge object."
default=None,
description="Knowledge for the crew. Add knowledge sources to the knowledge object.",
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
@@ -282,11 +282,23 @@ class Crew(BaseModel):
@model_validator(mode="after")
def create_crew_knowledge(self) -> "Crew":
"""Create the knowledge for the crew."""
if self.knowledge:
try:
self.knowledge = Knowledge(**self.knowledge) if isinstance(self.knowledge, dict) else self.knowledge
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid knowledge configuration: {str(e)}")
self.knowledge = (
Knowledge(**self.knowledge, store_dir="crew")
if isinstance(self.knowledge, dict)
else self.knowledge
)
self.knowledge.storage.initialize_knowledge_storage()
for source in self.knowledge.sources:
source.storage = self.knowledge.storage
source.add()
except Exception as e:
self._logger.log(
"warning", f"Failed to init knowledge: {e}", color="yellow"
)
return self
@model_validator(mode="after")

View File

@@ -7,6 +7,7 @@ from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.logger import Logger
from crewai.utilities.constants import DEFAULT_SCORE_THRESHOLD
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
@@ -18,23 +19,26 @@ class Knowledge(BaseModel):
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
store_dir: Optional[str] = None
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None, **data):
def __init__(
self,
store_dir: str,
embedder_config: Optional[Dict[str, Any]] = None,
storage: Optional[KnowledgeStorage] = None,
**data,
):
super().__init__(**data)
self.storage = KnowledgeStorage(embedder_config=embedder_config or None)
try:
for source in self.sources:
source.add()
except Exception as e:
Logger(verbose=True).log(
"warning",
f"Failed to init knowledge: {e}",
color="yellow",
if storage:
self.storage = storage
else:
self.storage = KnowledgeStorage(
embedder_config=embedder_config, store_dir=store_dir
)
def query(

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional
import numpy as np
from pydantic import BaseModel, ConfigDict, Field
@@ -18,6 +18,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict)
store_dir: Optional[str] = Field(default=None)
@abstractmethod
def load_content(self) -> Dict[Any, str]:

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from pydantic import Field
@@ -9,6 +9,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries plain text content using embeddings."""
content: str = Field(...)
store_dir: Optional[str] = Field(default=None)
def model_post_init(self, _):
"""Post-initialization method to validate content."""

View File

@@ -4,11 +4,11 @@ import logging
import chromadb
import os
from crewai.utilities.paths import db_storage_path
from typing import Optional, List
from typing import Dict, Any
from typing import Optional, List, Dict, Any
from crewai.utilities import EmbeddingConfigurator
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
import hashlib
from chromadb.config import Settings
@contextlib.contextmanager
@@ -35,9 +35,16 @@ class KnowledgeStorage(BaseKnowledgeStorage):
"""
collection: Optional[chromadb.Collection] = None
store_dir: Optional[str] = "knowledge"
app: Optional[chromadb.PersistentClient] = None
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None):
self._initialize_app(embedder_config or {})
def __init__(
self,
embedder_config: Optional[Dict[str, Any]] = None,
store_dir: Optional[str] = None,
):
self.embedder_config = embedder_config
self.store_dir = store_dir
def search(
self,
@@ -67,27 +74,33 @@ class KnowledgeStorage(BaseKnowledgeStorage):
else:
raise Exception("Collection not initialized")
def _initialize_app(self, embedder_config: Optional[Dict[str, Any]] = None):
import chromadb
from chromadb.config import Settings
self._set_embedder_config(embedder_config)
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=f"{db_storage_path()}/knowledge",
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
self.collection = self.app.get_or_create_collection(name="knowledge")
collection_name = (
f"knowledge_{self.store_dir}" if self.store_dir else "knowledge"
)
self.collection = self.app.get_or_create_collection(name=collection_name)
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
if self.app:
self.app.reset()
else:
base_path = os.path.join(db_storage_path(), "knowledge")
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
def save(
self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
@@ -95,9 +108,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
if self.collection:
metadatas = [metadata] if isinstance(metadata, dict) else metadata
ids = [
hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents
]
ids = [hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents]
self.collection.upsert(
documents=documents,