Files
crewAI/lib/crewai/src/crewai/knowledge/source/string_knowledge_source.py
Greyson LaLonde d898d7c02c feat: async knowledge support (#4023)
* feat: add async support for tools, add async tool tests

* chore: improve tool decorator typing

* fix: ensure _run backward compat

* chore: update docs

* chore: make docstrings a little more readable

* feat: add async execution support to agent executor

* chore: add tests

* feat: add aiosqlite dep; regenerate lockfile

* feat: add async ops to memory feat; create tests

* feat: async knowledge support; add tests

* chore: regenerate lockfile
2025-12-04 10:27:52 -08:00

41 lines
1.4 KiB
Python

from typing import Any
from pydantic import Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
class StringKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries plain text content using embeddings."""
content: str = Field(...)
collection_name: str | None = Field(default=None)
def model_post_init(self, _: Any) -> None:
"""Post-initialization method to validate content."""
self.validate_content()
def validate_content(self) -> None:
"""Validate string content."""
if not isinstance(self.content, str):
raise ValueError("StringKnowledgeSource only accepts string content")
def add(self) -> None:
"""Add string content to the knowledge source, chunk it, compute embeddings, and save them."""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
self._save_documents()
async def aadd(self) -> None:
"""Add string content asynchronously."""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
await self._asave_documents()
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]