mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
added knowledge to agent level (#1655)
* added knowledge to agent level * linted * added doc * added from suggestions * added test * fixes from discussion * fix docs * fix test * rm cassette for knowledge_sources test as its a mock and update agent doc string * fix test * rm unused * linted
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Any, List, Literal, Optional, Union
|
||||
from typing import Any, List, Literal, Optional, Union, Dict
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
|
||||
@@ -10,6 +10,8 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
from crewai.cli.constants import ENV_VARS
|
||||
from crewai.llm import LLM
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.memory.contextual.contextual_memory import ContextualMemory
|
||||
from crewai.task import Task
|
||||
from crewai.tools import BaseTool
|
||||
@@ -19,6 +21,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
|
||||
from crewai.utilities.converter import generate_model_description
|
||||
from crewai.utilities.token_counter_callback import TokenCalcHandler
|
||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
|
||||
|
||||
|
||||
def mock_agent_ops_provider():
|
||||
@@ -65,6 +68,7 @@ class Agent(BaseAgent):
|
||||
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
|
||||
tools: Tools at agents disposal
|
||||
step_callback: Callback to be executed after each step of the agent execution.
|
||||
knowledge_sources: Knowledge sources for the agent.
|
||||
"""
|
||||
|
||||
_times_executed: int = PrivateAttr(default=0)
|
||||
@@ -122,9 +126,21 @@ class Agent(BaseAgent):
|
||||
default="safe",
|
||||
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
|
||||
)
|
||||
embedder_config: Optional[Dict[str, Any]] = Field(
|
||||
default=None,
|
||||
description="Embedder configuration for the agent.",
|
||||
)
|
||||
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
|
||||
default=None,
|
||||
description="Knowledge sources for the agent.",
|
||||
)
|
||||
_knowledge: Optional[Knowledge] = PrivateAttr(
|
||||
default=None,
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def post_init_setup(self):
|
||||
self._set_knowledge()
|
||||
self.agent_ops_agent_name = self.role
|
||||
unaccepted_attributes = [
|
||||
"AWS_ACCESS_KEY_ID",
|
||||
@@ -232,6 +248,21 @@ class Agent(BaseAgent):
|
||||
self.cache_handler = CacheHandler()
|
||||
self.set_cache_handler(self.cache_handler)
|
||||
|
||||
def _set_knowledge(self):
|
||||
try:
|
||||
if self.knowledge_sources:
|
||||
knowledge_agent_name = f"{self.role.replace(' ', '_')}"
|
||||
if isinstance(self.knowledge_sources, list) and all(
|
||||
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
|
||||
):
|
||||
self._knowledge = Knowledge(
|
||||
sources=self.knowledge_sources,
|
||||
embedder_config=self.embedder_config,
|
||||
collection_name=knowledge_agent_name,
|
||||
)
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
|
||||
|
||||
def execute_task(
|
||||
self,
|
||||
task: Task,
|
||||
@@ -286,17 +317,21 @@ class Agent(BaseAgent):
|
||||
if memory.strip() != "":
|
||||
task_prompt += self.i18n.slice("memory").format(memory=memory)
|
||||
|
||||
# Integrate the knowledge base
|
||||
if self.crew and self.crew.knowledge:
|
||||
knowledge_snippets = self.crew.knowledge.query([task.prompt()])
|
||||
valid_snippets = [
|
||||
result["context"]
|
||||
for result in knowledge_snippets
|
||||
if result and result.get("context")
|
||||
]
|
||||
if valid_snippets:
|
||||
formatted_knowledge = "\n".join(valid_snippets)
|
||||
task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}"
|
||||
if self._knowledge:
|
||||
agent_knowledge_snippets = self._knowledge.query([task.prompt()])
|
||||
if agent_knowledge_snippets:
|
||||
agent_knowledge_context = extract_knowledge_context(
|
||||
agent_knowledge_snippets
|
||||
)
|
||||
if agent_knowledge_context:
|
||||
task_prompt += agent_knowledge_context
|
||||
|
||||
if self.crew:
|
||||
knowledge_snippets = self.crew.query_knowledge([task.prompt()])
|
||||
if knowledge_snippets:
|
||||
crew_knowledge_context = extract_knowledge_context(knowledge_snippets)
|
||||
if crew_knowledge_context:
|
||||
task_prompt += crew_knowledge_context
|
||||
|
||||
tools = tools or self.tools or []
|
||||
self.create_agent_executor(tools=tools, task=task)
|
||||
|
||||
@@ -28,6 +28,7 @@ from crewai.memory.entity.entity_memory import EntityMemory
|
||||
from crewai.memory.long_term.long_term_memory import LongTermMemory
|
||||
from crewai.memory.short_term.short_term_memory import ShortTermMemory
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.memory.user.user_memory import UserMemory
|
||||
from crewai.process import Process
|
||||
from crewai.task import Task
|
||||
@@ -202,10 +203,13 @@ class Crew(BaseModel):
|
||||
default=[],
|
||||
description="List of execution logs for tasks",
|
||||
)
|
||||
knowledge: Optional[Dict[str, Any]] = Field(
|
||||
default=None, description="Knowledge for the crew. Add knowledge sources to the knowledge object."
|
||||
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
|
||||
default=None,
|
||||
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
|
||||
)
|
||||
_knowledge: Optional[Knowledge] = PrivateAttr(
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
@field_validator("id", mode="before")
|
||||
@classmethod
|
||||
@@ -282,11 +286,22 @@ class Crew(BaseModel):
|
||||
|
||||
@model_validator(mode="after")
|
||||
def create_crew_knowledge(self) -> "Crew":
|
||||
if self.knowledge:
|
||||
"""Create the knowledge for the crew."""
|
||||
if self.knowledge_sources:
|
||||
try:
|
||||
self.knowledge = Knowledge(**self.knowledge) if isinstance(self.knowledge, dict) else self.knowledge
|
||||
except (TypeError, ValueError) as e:
|
||||
raise ValueError(f"Invalid knowledge configuration: {str(e)}")
|
||||
if isinstance(self.knowledge_sources, list) and all(
|
||||
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
|
||||
):
|
||||
self._knowledge = Knowledge(
|
||||
sources=self.knowledge_sources,
|
||||
embedder_config=self.embedder,
|
||||
collection_name="crew",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self._logger.log(
|
||||
"warning", f"Failed to init knowledge: {e}", color="yellow"
|
||||
)
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
@@ -942,6 +957,11 @@ class Crew(BaseModel):
|
||||
result = self._execute_tasks(self.tasks, start_index, True)
|
||||
return result
|
||||
|
||||
def query_knowledge(self, query: List[str]) -> Union[List[Dict[str, Any]], None]:
|
||||
if self._knowledge:
|
||||
return self._knowledge.query(query)
|
||||
return None
|
||||
|
||||
def copy(self):
|
||||
"""Create a deep copy of the Crew."""
|
||||
|
||||
|
||||
@@ -5,8 +5,8 @@ from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.constants import DEFAULT_SCORE_THRESHOLD
|
||||
|
||||
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
|
||||
|
||||
|
||||
@@ -18,24 +18,33 @@ class Knowledge(BaseModel):
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
embedder_config: Optional[Dict[str, Any]] = None
|
||||
"""
|
||||
|
||||
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
embedder_config: Optional[Dict[str, Any]] = None
|
||||
collection_name: Optional[str] = None
|
||||
|
||||
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None, **data):
|
||||
def __init__(
|
||||
self,
|
||||
collection_name: str,
|
||||
sources: List[BaseKnowledgeSource],
|
||||
embedder_config: Optional[Dict[str, Any]] = None,
|
||||
storage: Optional[KnowledgeStorage] = None,
|
||||
**data,
|
||||
):
|
||||
super().__init__(**data)
|
||||
self.storage = KnowledgeStorage(embedder_config=embedder_config or None)
|
||||
|
||||
try:
|
||||
for source in self.sources:
|
||||
source.add()
|
||||
except Exception as e:
|
||||
Logger(verbose=True).log(
|
||||
"warning",
|
||||
f"Failed to init knowledge: {e}",
|
||||
color="yellow",
|
||||
if storage:
|
||||
self.storage = storage
|
||||
else:
|
||||
self.storage = KnowledgeStorage(
|
||||
embedder_config=embedder_config, collection_name=collection_name
|
||||
)
|
||||
self.sources = sources
|
||||
self.storage.initialize_knowledge_storage()
|
||||
for source in sources:
|
||||
source.storage = self.storage
|
||||
source.add()
|
||||
|
||||
def query(
|
||||
self, query: List[str], limit: int = 3, preference: Optional[str] = None
|
||||
@@ -52,3 +61,8 @@ class Knowledge(BaseModel):
|
||||
score_threshold=DEFAULT_SCORE_THRESHOLD,
|
||||
)
|
||||
return results
|
||||
|
||||
def _add_sources(self):
|
||||
for source in self.sources:
|
||||
source.storage = self.storage
|
||||
source.add()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Dict, Any
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
import numpy as np
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
@@ -18,6 +18,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
metadata: Dict[str, Any] = Field(default_factory=dict)
|
||||
collection_name: Optional[str] = Field(default=None)
|
||||
|
||||
@abstractmethod
|
||||
def load_content(self) -> Dict[Any, str]:
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import List
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
@@ -9,6 +9,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
|
||||
"""A knowledge source that stores and queries plain text content using embeddings."""
|
||||
|
||||
content: str = Field(...)
|
||||
collection_name: Optional[str] = Field(default=None)
|
||||
|
||||
def model_post_init(self, _):
|
||||
"""Post-initialization method to validate content."""
|
||||
|
||||
@@ -3,12 +3,16 @@ import io
|
||||
import logging
|
||||
import chromadb
|
||||
import os
|
||||
|
||||
import chromadb.errors
|
||||
from crewai.utilities.paths import db_storage_path
|
||||
from typing import Optional, List
|
||||
from typing import Dict, Any
|
||||
from typing import Optional, List, Dict, Any, Union
|
||||
from crewai.utilities import EmbeddingConfigurator
|
||||
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
|
||||
import hashlib
|
||||
from chromadb.config import Settings
|
||||
from chromadb.api import ClientAPI
|
||||
from crewai.utilities.logger import Logger
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
@@ -35,9 +39,16 @@ class KnowledgeStorage(BaseKnowledgeStorage):
|
||||
"""
|
||||
|
||||
collection: Optional[chromadb.Collection] = None
|
||||
collection_name: Optional[str] = "knowledge"
|
||||
app: Optional[ClientAPI] = None
|
||||
|
||||
def __init__(self, embedder_config: Optional[Dict[str, Any]] = None):
|
||||
self._initialize_app(embedder_config or {})
|
||||
def __init__(
|
||||
self,
|
||||
embedder_config: Optional[Dict[str, Any]] = None,
|
||||
collection_name: Optional[str] = None,
|
||||
):
|
||||
self.collection_name = collection_name
|
||||
self._set_embedder_config(embedder_config)
|
||||
|
||||
def search(
|
||||
self,
|
||||
@@ -67,43 +78,75 @@ class KnowledgeStorage(BaseKnowledgeStorage):
|
||||
else:
|
||||
raise Exception("Collection not initialized")
|
||||
|
||||
def _initialize_app(self, embedder_config: Optional[Dict[str, Any]] = None):
|
||||
import chromadb
|
||||
from chromadb.config import Settings
|
||||
|
||||
self._set_embedder_config(embedder_config)
|
||||
|
||||
def initialize_knowledge_storage(self):
|
||||
base_path = os.path.join(db_storage_path(), "knowledge")
|
||||
chroma_client = chromadb.PersistentClient(
|
||||
path=f"{db_storage_path()}/knowledge",
|
||||
path=base_path,
|
||||
settings=Settings(allow_reset=True),
|
||||
)
|
||||
|
||||
self.app = chroma_client
|
||||
|
||||
try:
|
||||
self.collection = self.app.get_or_create_collection(name="knowledge")
|
||||
collection_name = (
|
||||
f"knowledge_{self.collection_name}"
|
||||
if self.collection_name
|
||||
else "knowledge"
|
||||
)
|
||||
if self.app:
|
||||
self.collection = self.app.get_or_create_collection(
|
||||
name=collection_name, embedding_function=self.embedder_config
|
||||
)
|
||||
else:
|
||||
raise Exception("Vector Database Client not initialized")
|
||||
except Exception:
|
||||
raise Exception("Failed to create or get collection")
|
||||
|
||||
def reset(self):
|
||||
if self.app:
|
||||
self.app.reset()
|
||||
else:
|
||||
base_path = os.path.join(db_storage_path(), "knowledge")
|
||||
self.app = chromadb.PersistentClient(
|
||||
path=base_path,
|
||||
settings=Settings(allow_reset=True),
|
||||
)
|
||||
self.app.reset()
|
||||
|
||||
def save(
|
||||
self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
|
||||
self,
|
||||
documents: List[str],
|
||||
metadata: Union[Dict[str, Any], List[Dict[str, Any]]],
|
||||
):
|
||||
if self.collection:
|
||||
metadatas = [metadata] if isinstance(metadata, dict) else metadata
|
||||
try:
|
||||
metadatas = [metadata] if isinstance(metadata, dict) else metadata
|
||||
|
||||
ids = [
|
||||
hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents
|
||||
]
|
||||
ids = [
|
||||
hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents
|
||||
]
|
||||
|
||||
self.collection.upsert(
|
||||
documents=documents,
|
||||
metadatas=metadatas,
|
||||
ids=ids,
|
||||
)
|
||||
self.collection.upsert(
|
||||
documents=documents,
|
||||
metadatas=metadatas,
|
||||
ids=ids,
|
||||
)
|
||||
except chromadb.errors.InvalidDimensionException as e:
|
||||
Logger(verbose=True).log(
|
||||
"error",
|
||||
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
|
||||
"red",
|
||||
)
|
||||
raise ValueError(
|
||||
"Embedding dimension mismatch. Make sure you're using the same embedding model "
|
||||
"across all operations with this collection."
|
||||
"Try resetting the collection using `crewai reset-memories -a`"
|
||||
) from e
|
||||
except Exception as e:
|
||||
Logger(verbose=True).log(
|
||||
"error", f"Failed to upsert documents: {e}", "red"
|
||||
)
|
||||
raise
|
||||
else:
|
||||
raise Exception("Collection not initialized")
|
||||
|
||||
|
||||
12
src/crewai/knowledge/utils/knowledge_utils.py
Normal file
12
src/crewai/knowledge/utils/knowledge_utils.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from typing import Any, Dict, List
|
||||
|
||||
|
||||
def extract_knowledge_context(knowledge_snippets: List[Dict[str, Any]]) -> str:
|
||||
"""Extract knowledge from the task prompt."""
|
||||
valid_snippets = [
|
||||
result["context"]
|
||||
for result in knowledge_snippets
|
||||
if result and result.get("context")
|
||||
]
|
||||
snippet = "\n".join(valid_snippets)
|
||||
return f"Additional Information: {snippet}" if valid_snippets else ""
|
||||
Reference in New Issue
Block a user