Files
crewAI/src/crewai/knowledge/knowledge.py
Devin AI 4f72839fea Fix #5805: Add metadata_filter support to Knowledge querying pipeline
- Add KnowledgeConfig class with results_limit, score_threshold, metadata_filter
- Add knowledge_config field to Agent
- Update Knowledge.query() to forward filter and score_threshold to storage
- Update Crew.query_knowledge() to accept and forward filter params
- Fix BaseKnowledgeSource._save_documents() to pass self.metadata to storage
- Wire Agent.execute_task() to use knowledge_config for both agent and crew queries
- Add 10 tests covering all changes

Co-Authored-By: João <joao@crewai.com>
2026-05-14 08:16:13 +00:00

72 lines
2.3 KiB
Python

import os
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
# Disable tokenizers parallelism at import time (removes logging from fastembed).
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})
class Knowledge(BaseModel):
    """
    Knowledge is a collection of sources and setup for the vector store to save and query relevant context.

    Args:
        sources: Knowledge sources whose content is added to the storage.
        storage: Storage backend that sources are written to and that
            queries are run against.
        embedder_config: Optional embedder configuration; passed to the
            KnowledgeStorage that is created when no storage is supplied.
        collection_name: Collection name; passed to the KnowledgeStorage
            that is created when no storage is supplied.
    """

    sources: List[BaseKnowledgeSource] = Field(default_factory=list)
    model_config = ConfigDict(arbitrary_types_allowed=True)
    storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
    embedder_config: Optional[Dict[str, Any]] = None
    collection_name: Optional[str] = None

    def __init__(
        self,
        collection_name: str,
        sources: List[BaseKnowledgeSource],
        embedder_config: Optional[Dict[str, Any]] = None,
        storage: Optional[KnowledgeStorage] = None,
        **data,
    ):
        """
        Build the knowledge base, initialize its storage and ingest every source.

        Args:
            collection_name: Collection name for the storage created here.
            sources: Knowledge sources to attach to the storage and add.
            embedder_config: Optional embedder configuration for the storage
                created here.
            storage: Optional pre-built storage; when given it is used as-is
                and no new KnowledgeStorage is created from the arguments.
            **data: Remaining keyword arguments forwarded to BaseModel.
        """
        super().__init__(**data)
        # Only a missing storage triggers the fallback; an explicit instance
        # is always honoured, whatever its truth value.
        if storage is not None:
            self.storage = storage
        else:
            self.storage = KnowledgeStorage(
                embedder_config=embedder_config, collection_name=collection_name
            )
        # Record the constructor arguments on the declared fields; they are
        # named parameters, so they never reach BaseModel through **data.
        self.collection_name = collection_name
        self.embedder_config = embedder_config
        self.sources = sources
        self.storage.initialize_knowledge_storage()
        self._add_sources()

    def query(
        self,
        query: List[str],
        limit: int = 3,
        filter: Optional[dict] = None,
        score_threshold: float = 0.35,
    ) -> List[Dict[str, Any]]:
        """
        Query across all knowledge sources to find the most relevant information.

        Args:
            query: Query strings handed to the storage search.
            limit: Result limit forwarded to the storage search.
            filter: Optional filter forwarded to the storage search. The name
                shadows the builtin but is kept for caller compatibility.
            score_threshold: Score threshold forwarded to the storage search.

        Returns:
            The results returned by the storage search, unchanged.
        """
        return self.storage.search(
            query,
            limit,
            filter=filter,
            score_threshold=score_threshold,
        )

    def _add_sources(self):
        """Point every source at this instance's storage and add it."""
        for source in self.sources:
            source.storage = self.storage
            source.add()