Files
crewAI/src/crewai/knowledge/knowledge.py
Greyson LaLonde 2ba48dd82a fix: add type annotations and exclude tests from mypy
- Add type: ignore for mem0 import
- Fix tool_usage.py cache_function None check
- Change _execute_without_timeout return type to Any
- Add type annotations to multiple functions:
  - add_sources() -> None
  - log() with proper parameter types
  - stop_rpm_counter() -> None
  - EventListener.__new__() -> Self
  - setup_listeners() -> None
  - Memory class __init__ methods -> None
  - TaskEvaluator.__init__() -> None
  - get_skipped_task_output() -> TaskOutput
- Exclude tests directory from mypy checks in pyproject.toml
- Update deprecated typing imports to use built-in types
2025-09-04 11:11:59 -04:00

78 lines
2.4 KiB
Python

import os
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
class Knowledge(BaseModel):
"""
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[Dict[str, Any]] = None
"""
sources: list[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[dict[str, Any]] = None
collection_name: Optional[str] = None
def __init__(
self,
collection_name: str,
sources: list[BaseKnowledgeSource],
embedder: Optional[dict[str, Any]] = None,
storage: Optional[KnowledgeStorage] = None,
**data,
):
super().__init__(**data)
if storage:
self.storage = storage
else:
self.storage = KnowledgeStorage(
embedder=embedder, collection_name=collection_name
)
self.sources = sources
self.storage.initialize_knowledge_storage()
def query(
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> list[dict[str, Any]]:
"""
Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks.
Raises:
ValueError: If storage is not initialized.
"""
if self.storage is None:
raise ValueError("Storage is not initialized.")
results = self.storage.search(
query,
limit=results_limit,
score_threshold=score_threshold,
)
return results
def add_sources(self) -> None:
try:
for source in self.sources:
source.storage = self.storage
source.add()
except Exception as e:
raise e
def reset(self) -> None:
if self.storage:
self.storage.reset()
else:
raise ValueError("Storage is not initialized.")