diff --git a/src/crewai/agent.py b/src/crewai/agent.py index d17cbbdfe..4b163e05a 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,7 +1,7 @@ import os import shutil import subprocess -from typing import Any, List, Literal, Optional, Union +from typing import Any, List, Literal, Optional, Union, Dict from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -10,6 +10,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.crew_agent_executor import CrewAgentExecutor from crewai.cli.constants import ENV_VARS from crewai.llm import LLM +from crewai.knowledge.knowledge import Knowledge from crewai.memory.contextual.contextual_memory import ContextualMemory from crewai.tools import BaseTool from crewai.tools.agent_tools.agent_tools import AgentTools @@ -120,9 +121,14 @@ class Agent(BaseAgent): default="safe", description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).", ) + knowledge: Optional[Dict[str, Any]] = Field( + default=None, + description="Knowledge for the agent. Add knowledge sources to the knowledge object.", + ) @model_validator(mode="after") def post_init_setup(self): + self._set_knowledge() self.agent_ops_agent_name = self.role unnacepted_attributes = [ "AWS_ACCESS_KEY_ID", @@ -235,6 +241,28 @@ class Agent(BaseAgent): self.cache_handler = CacheHandler() self.set_cache_handler(self.cache_handler) + def _set_knowledge(self): + try: + if self.knowledge: + knowledge_agent_name = f"{self.role.replace(' ', '_')}" + if isinstance(self.knowledge, dict): + knowledge_data = self.knowledge.copy() + knowledge_data["store_dir"] = knowledge_agent_name + self.knowledge = Knowledge(**knowledge_data) + self.knowledge.storage.initialize_knowledge_storage() + try: + for source in self.knowledge.sources: + source.storage = self.knowledge.storage + source.add() + except Exception as e: + self._logger.log( + "warning", + f"Failed to init knowledge: {knowledge_agent_name} {e}", + color="yellow", + ) + except (TypeError, ValueError) as e: + raise ValueError(f"Invalid Knowledge Configuration: {str(e)}") + def execute_task( self, task: Any, @@ -273,17 +301,20 @@ class Agent(BaseAgent): if memory.strip() != "": task_prompt += self.i18n.slice("memory").format(memory=memory) - # Integrate the knowledge base + if self.knowledge and isinstance(self.knowledge, Knowledge): + agent_knowledge_snippets = self.knowledge.query([task.prompt()]) + agent_knowledge_context = self._extract_knowledge_context( + agent_knowledge_snippets + ) + if agent_knowledge_context: + task_prompt += agent_knowledge_context + if self.crew and self.crew.knowledge: knowledge_snippets = self.crew.knowledge.query([task.prompt()]) - valid_snippets = [ - result["context"] - for result in knowledge_snippets - if result and result.get("context") - ] - if valid_snippets: - formatted_knowledge = "\n".join(valid_snippets) - task_prompt += f"\n\nAdditional Information:\n{formatted_knowledge}" + + crew_knowledge_context = self._extract_knowledge_context(knowledge_snippets) + if crew_knowledge_context: + task_prompt += crew_knowledge_context tools = tools or self.tools or [] self.create_agent_executor(tools=tools, task=task) @@ -490,6 +521,18 @@ class Agent(BaseAgent): f"Docker is not running. Please start Docker to use code execution with agent: {self.role}" ) + def _extract_knowledge_context( + self, knowledge_snippets: List[Dict[str, Any]] + ) -> str: + """Extract knowledge from the task prompt.""" + valid_snippets = [ + result["context"] + for result in knowledge_snippets + if result and result.get("context") + ] + snippet = "\n".join(valid_snippets) + return f"Additional Information: {snippet}" if valid_snippets else "" + @staticmethod def __tools_names(tools) -> str: return ", ".join([t.name for t in tools]) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 4c3886a3f..c06bd611c 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -203,10 +203,10 @@ class Crew(BaseModel): description="List of execution logs for tasks", ) knowledge: Optional[Dict[str, Any]] = Field( - default=None, description="Knowledge for the crew. Add knowledge sources to the knowledge object." + default=None, + description="Knowledge for the crew. Add knowledge sources to the knowledge object.", ) - @field_validator("id", mode="before") @classmethod def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: @@ -282,11 +282,23 @@ class Crew(BaseModel): @model_validator(mode="after") def create_crew_knowledge(self) -> "Crew": + """Create the knowledge for the crew.""" if self.knowledge: try: - self.knowledge = Knowledge(**self.knowledge) if isinstance(self.knowledge, dict) else self.knowledge - except (TypeError, ValueError) as e: - raise ValueError(f"Invalid knowledge configuration: {str(e)}") + self.knowledge = ( + Knowledge(**self.knowledge, store_dir="crew") + if isinstance(self.knowledge, dict) + else self.knowledge + ) + self.knowledge.storage.initialize_knowledge_storage() + + for source in self.knowledge.sources: + source.storage = self.knowledge.storage + source.add() + except Exception as e: + self._logger.log( + "warning", f"Failed to init knowledge: {e}", color="yellow" + ) return self @model_validator(mode="after") diff --git a/src/crewai/knowledge/knowledge.py b/src/crewai/knowledge/knowledge.py index cf2907e67..2622144e7 100644 --- a/src/crewai/knowledge/knowledge.py +++ b/src/crewai/knowledge/knowledge.py @@ -7,6 +7,7 @@ from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage from crewai.utilities.logger import Logger from crewai.utilities.constants import DEFAULT_SCORE_THRESHOLD + os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed @@ -18,23 +19,26 @@ class Knowledge(BaseModel): storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) embedder_config: Optional[Dict[str, Any]] = None """ + sources: List[BaseKnowledgeSource] = Field(default_factory=list) model_config = ConfigDict(arbitrary_types_allowed=True) storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) embedder_config: Optional[Dict[str, Any]] = None + store_dir: Optional[str] = None - def __init__(self, embedder_config: Optional[Dict[str, Any]] = None, **data): + def __init__( + self, + store_dir: str, + embedder_config: Optional[Dict[str, Any]] = None, + storage: Optional[KnowledgeStorage] = None, + **data, + ): super().__init__(**data) - self.storage = KnowledgeStorage(embedder_config=embedder_config or None) - - try: - for source in self.sources: - source.add() - except Exception as e: - Logger(verbose=True).log( - "warning", - f"Failed to init knowledge: {e}", - color="yellow", + if storage: + self.storage = storage + else: + self.storage = KnowledgeStorage( + embedder_config=embedder_config, store_dir=store_dir ) def query( diff --git a/src/crewai/knowledge/source/base_knowledge_source.py b/src/crewai/knowledge/source/base_knowledge_source.py index bb4c69cf3..d62f08709 100644 --- a/src/crewai/knowledge/source/base_knowledge_source.py +++ b/src/crewai/knowledge/source/base_knowledge_source.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional import numpy as np from pydantic import BaseModel, ConfigDict, Field @@ -18,6 +18,7 @@ class BaseKnowledgeSource(BaseModel, ABC): model_config = ConfigDict(arbitrary_types_allowed=True) storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage) metadata: Dict[str, Any] = Field(default_factory=dict) + store_dir: Optional[str] = Field(default=None) @abstractmethod def load_content(self) -> Dict[Any, str]: diff --git a/src/crewai/knowledge/source/string_knowledge_source.py b/src/crewai/knowledge/source/string_knowledge_source.py index d4c22e3c1..7e33d7fe4 100644 --- a/src/crewai/knowledge/source/string_knowledge_source.py +++ b/src/crewai/knowledge/source/string_knowledge_source.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Optional from pydantic import Field @@ -9,6 +9,7 @@ class StringKnowledgeSource(BaseKnowledgeSource): """A knowledge source that stores and queries plain text content using embeddings.""" content: str = Field(...) + store_dir: Optional[str] = Field(default=None) def model_post_init(self, _): """Post-initialization method to validate content.""" diff --git a/src/crewai/knowledge/storage/knowledge_storage.py b/src/crewai/knowledge/storage/knowledge_storage.py index b3d5ba750..e23ddd0f2 100644 --- a/src/crewai/knowledge/storage/knowledge_storage.py +++ b/src/crewai/knowledge/storage/knowledge_storage.py @@ -4,11 +4,11 @@ import logging import chromadb import os from crewai.utilities.paths import db_storage_path -from typing import Optional, List -from typing import Dict, Any +from typing import Optional, List, Dict, Any from crewai.utilities import EmbeddingConfigurator from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage import hashlib +from chromadb.config import Settings @contextlib.contextmanager @@ -35,9 +35,16 @@ class KnowledgeStorage(BaseKnowledgeStorage): """ collection: Optional[chromadb.Collection] = None + store_dir: Optional[str] = "knowledge" + app: Optional[chromadb.PersistentClient] = None - def __init__(self, embedder_config: Optional[Dict[str, Any]] = None): - self._initialize_app(embedder_config or {}) + def __init__( + self, + embedder_config: Optional[Dict[str, Any]] = None, + store_dir: Optional[str] = None, + ): + self.embedder_config = embedder_config + self.store_dir = store_dir def search( self, @@ -67,27 +74,33 @@ class KnowledgeStorage(BaseKnowledgeStorage): else: raise Exception("Collection not initialized") - def _initialize_app(self, embedder_config: Optional[Dict[str, Any]] = None): - import chromadb - from chromadb.config import Settings - - self._set_embedder_config(embedder_config) - + def initialize_knowledge_storage(self): + base_path = os.path.join(db_storage_path(), "knowledge") chroma_client = chromadb.PersistentClient( - path=f"{db_storage_path()}/knowledge", + path=base_path, settings=Settings(allow_reset=True), ) self.app = chroma_client try: - self.collection = self.app.get_or_create_collection(name="knowledge") + collection_name = ( + f"knowledge_{self.store_dir}" if self.store_dir else "knowledge" + ) + self.collection = self.app.get_or_create_collection(name=collection_name) except Exception: raise Exception("Failed to create or get collection") def reset(self): if self.app: self.app.reset() + else: + base_path = os.path.join(db_storage_path(), "knowledge") + self.app = chromadb.PersistentClient( + path=base_path, + settings=Settings(allow_reset=True), + ) + self.app.reset() def save( self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]] @@ -95,9 +108,7 @@ class KnowledgeStorage(BaseKnowledgeStorage): if self.collection: metadatas = [metadata] if isinstance(metadata, dict) else metadata - ids = [ - hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents - ] + ids = [hashlib.sha256(doc.encode("utf-8")).hexdigest() for doc in documents] self.collection.upsert( documents=documents,