Add metadata support to Knowledge Source classes

- Implement _coerce_to_records function to handle both string and dict formats
- Update KnowledgeStorage.save() to accept list[str] | list[dict[str, Any]]
- Add metadata (filepath, chunk_index, source_type) to all Knowledge Source classes:
  - TextFileKnowledgeSource
  - PDFKnowledgeSource
  - CSVKnowledgeSource
  - JSONKnowledgeSource
  - ExcelKnowledgeSource (includes sheet_name for multi-sheet files)
  - StringKnowledgeSource
  - CrewDoclingSource
- Add comprehensive tests for metadata functionality
- Maintain backward compatibility with existing string-based chunks

Fixes #3812

Co-Authored-By: João <joao@crewai.com>
Devin AI authored on 2025-10-30 09:04:35 +00:00
parent 2e9eb8c32d
commit 82bb304147
10 changed files with 4626 additions and 4058 deletions
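
In the new format, each chunk is a dict carrying "content" plus a "metadata" dict instead of a bare string. A minimal sketch of the shape the updated sources emit and KnowledgeStorage.save() accepts (names come from the diffs below; the values are hypothetical):

    # Plain strings remain valid input to save() for backward compatibility;
    # dicts add provenance metadata alongside the chunk text.
    chunks = [
        "a legacy plain-string chunk",
        {
            "content": "first chunk of report.txt",
            "metadata": {
                "filepath": "/data/report.txt",  # hypothetical path
                "chunk_index": 0,
                "source_type": "text_file",
            },
        },
    ]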


@@ -94,9 +94,19 @@ class CrewDoclingSource(BaseKnowledgeSource):
     def add(self) -> None:
         if self.content is None:
             return
-        for doc in self.content:
-            new_chunks_iterable = self._chunk_doc(doc)
-            self.chunks.extend(list(new_chunks_iterable))
+        for doc_index, doc in enumerate(self.content):
+            filepath = self.safe_file_paths[doc_index] if doc_index < len(self.safe_file_paths) else "unknown"
+            chunk_index = 0
+            for chunk_text in self._chunk_doc(doc):
+                self.chunks.append({
+                    "content": chunk_text,
+                    "metadata": {
+                        "filepath": str(filepath),
+                        "chunk_index": chunk_index,
+                        "source_type": "docling",
+                    }
+                })
+                chunk_index += 1
         self._save_documents()

     def _convert_source_to_docling_documents(self) -> list[DoclingDocument]:


@@ -21,14 +21,20 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
     def add(self) -> None:
         """
-        Add CSV file content to the knowledge source, chunk it, compute embeddings,
+        Add CSV file content to the knowledge source, chunk it with metadata,
         and save the embeddings.
         """
-        content_str = (
-            str(self.content) if isinstance(self.content, dict) else self.content
-        )
-        new_chunks = self._chunk_text(content_str)
-        self.chunks.extend(new_chunks)
+        for filepath, text in self.content.items():
+            text_chunks = self._chunk_text(text)
+            for chunk_index, chunk in enumerate(text_chunks):
+                self.chunks.append({
+                    "content": chunk,
+                    "metadata": {
+                        "filepath": str(filepath),
+                        "chunk_index": chunk_index,
+                        "source_type": "csv",
+                    }
+                })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:


@@ -142,21 +142,34 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
     def add(self) -> None:
         """
-        Add Excel file content to the knowledge source, chunk it, compute embeddings,
+        Add Excel file content to the knowledge source, chunk it with metadata,
         and save the embeddings.
         """
-        # Convert dictionary values to a single string if content is a dictionary
-        # Updated to account for .xlsx workbooks with multiple tabs/sheets
-        content_str = ""
-        for value in self.content.values():
-            if isinstance(value, dict):
-                for sheet_value in value.values():
-                    content_str += str(sheet_value) + "\n"
-            else:
-                content_str += str(value) + "\n"
-        new_chunks = self._chunk_text(content_str)
-        self.chunks.extend(new_chunks)
+        for filepath, sheets in self.content.items():
+            if isinstance(sheets, dict):
+                for sheet_name, sheet_content in sheets.items():
+                    text_chunks = self._chunk_text(str(sheet_content))
+                    for chunk_index, chunk in enumerate(text_chunks):
+                        self.chunks.append({
+                            "content": chunk,
+                            "metadata": {
+                                "filepath": str(filepath),
+                                "sheet_name": sheet_name,
+                                "chunk_index": chunk_index,
+                                "source_type": "excel",
+                            }
+                        })
+            else:
+                text_chunks = self._chunk_text(str(sheets))
+                for chunk_index, chunk in enumerate(text_chunks):
+                    self.chunks.append({
+                        "content": chunk,
+                        "metadata": {
+                            "filepath": str(filepath),
+                            "chunk_index": chunk_index,
+                            "source_type": "excel",
+                        }
+                    })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:
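
A note on the dict branch above: it assumes self.content maps each workbook path to a {sheet_name: sheet_content} dict, and chunk_index restarts at 0 for every sheet. A minimal sketch under that assumption (values are hypothetical):

    # Hypothetical self.content for a two-sheet workbook:
    content = {"/data/book.xlsx": {"Q1": "Name,Revenue\n...", "Q2": "Name,Revenue\n..."}}
    # A chunk emitted for sheet "Q1" would then look like:
    # {"content": "...", "metadata": {"filepath": "/data/book.xlsx",
    #  "sheet_name": "Q1", "chunk_index": 0, "source_type": "excel"}}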


@@ -34,14 +34,20 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
     def add(self) -> None:
         """
-        Add JSON file content to the knowledge source, chunk it, compute embeddings,
+        Add JSON file content to the knowledge source, chunk it with metadata,
         and save the embeddings.
         """
-        content_str = (
-            str(self.content) if isinstance(self.content, dict) else self.content
-        )
-        new_chunks = self._chunk_text(content_str)
-        self.chunks.extend(new_chunks)
+        for filepath, text in self.content.items():
+            text_chunks = self._chunk_text(text)
+            for chunk_index, chunk in enumerate(text_chunks):
+                self.chunks.append({
+                    "content": chunk,
+                    "metadata": {
+                        "filepath": str(filepath),
+                        "chunk_index": chunk_index,
+                        "source_type": "json",
+                    }
+                })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:


@@ -36,12 +36,20 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
     def add(self) -> None:
         """
-        Add PDF file content to the knowledge source, chunk it, compute embeddings,
+        Add PDF file content to the knowledge source, chunk it with metadata,
         and save the embeddings.
         """
-        for text in self.content.values():
-            new_chunks = self._chunk_text(text)
-            self.chunks.extend(new_chunks)
+        for filepath, text in self.content.items():
+            text_chunks = self._chunk_text(text)
+            for chunk_index, chunk in enumerate(text_chunks):
+                self.chunks.append({
+                    "content": chunk,
+                    "metadata": {
+                        "filepath": str(filepath),
+                        "chunk_index": chunk_index,
+                        "source_type": "pdf",
+                    }
+                })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:


@@ -19,9 +19,16 @@ class StringKnowledgeSource(BaseKnowledgeSource):
             raise ValueError("StringKnowledgeSource only accepts string content")

     def add(self) -> None:
-        """Add string content to the knowledge source, chunk it, compute embeddings, and save them."""
-        new_chunks = self._chunk_text(self.content)
-        self.chunks.extend(new_chunks)
+        """Add string content to the knowledge source, chunk it with metadata, and save them."""
+        text_chunks = self._chunk_text(self.content)
+        for chunk_index, chunk in enumerate(text_chunks):
+            self.chunks.append({
+                "content": chunk,
+                "metadata": {
+                    "chunk_index": chunk_index,
+                    "source_type": "string",
+                }
+            })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:


@@ -17,12 +17,20 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource):
     def add(self) -> None:
         """
-        Add text file content to the knowledge source, chunk it, compute embeddings,
+        Add text file content to the knowledge source, chunk it with metadata,
        and save the embeddings.
         """
-        for text in self.content.values():
-            new_chunks = self._chunk_text(text)
-            self.chunks.extend(new_chunks)
+        for filepath, text in self.content.items():
+            text_chunks = self._chunk_text(text)
+            for chunk_index, chunk in enumerate(text_chunks):
+                self.chunks.append({
+                    "content": chunk,
+                    "metadata": {
+                        "filepath": str(filepath),
+                        "chunk_index": chunk_index,
+                        "source_type": "text_file",
+                    }
+                })
         self._save_documents()

     def _chunk_text(self, text: str) -> list[str]:


@@ -1,5 +1,6 @@
 import logging
 import traceback
+from collections.abc import Mapping, Sequence
 from typing import Any, cast

 import warnings
@@ -16,6 +17,72 @@ from crewai.rag.types import BaseRecord, SearchResult
 from crewai.utilities.logger import Logger


+def _coerce_to_records(documents: Sequence[Any]) -> list[BaseRecord]:
+    """Convert various document formats to BaseRecord format.
+
+    Supports:
+    - str: Simple string content
+    - dict: With 'content' key and optional 'metadata' and 'doc_id'
+
+    Args:
+        documents: Sequence of documents in various formats
+
+    Returns:
+        List of BaseRecord dictionaries with content and optional metadata
+    """
+    records: list[BaseRecord] = []
+    for d in documents:
+        if isinstance(d, str):
+            records.append({"content": d})
+        elif isinstance(d, Mapping):
+            if "content" not in d:
+                continue
+            content = d["content"]
+            if content is None or (isinstance(content, str) and not content):
+                continue
+            content_str = str(content)
+            rec: BaseRecord = {"content": content_str}
+            if "metadata" in d:
+                metadata_raw = d["metadata"]
+                if isinstance(metadata_raw, Mapping):
+                    sanitized_metadata: dict[str, str | int | float | bool] = {}
+                    for k, v in metadata_raw.items():
+                        if isinstance(v, (str, int, float, bool)):
+                            sanitized_metadata[str(k)] = v
+                        elif v is None:
+                            sanitized_metadata[str(k)] = ""
+                        else:
+                            sanitized_metadata[str(k)] = str(v)
+                    rec["metadata"] = sanitized_metadata
+                elif isinstance(metadata_raw, list):
+                    sanitized_list: list[Mapping[str, str | int | float | bool]] = []
+                    for item in metadata_raw:
+                        if isinstance(item, Mapping):
+                            sanitized_item: dict[str, str | int | float | bool] = {}
+                            for k, v in item.items():
+                                if isinstance(v, (str, int, float, bool)):
+                                    sanitized_item[str(k)] = v
+                                elif v is None:
+                                    sanitized_item[str(k)] = ""
+                                else:
+                                    sanitized_item[str(k)] = str(v)
+                            sanitized_list.append(sanitized_item)
+                    if sanitized_list:
+                        rec["metadata"] = sanitized_list
+            if "doc_id" in d and isinstance(d["doc_id"], str):
+                rec["doc_id"] = d["doc_id"]
+            records.append(rec)
+    return records
+
+
 class KnowledgeStorage(BaseKnowledgeStorage):
     """
     Extends Storage to handle embeddings for memory entries, improving
@@ -98,7 +165,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
                 f"Error during knowledge reset: {e!s}\n{traceback.format_exc()}"
             )

-    def save(self, documents: list[str]) -> None:
+    def save(self, documents: list[str] | list[dict[str, Any]]) -> None:
         try:
             client = self._get_client()
             collection_name = (
@@ -108,7 +175,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
             )
             client.get_or_create_collection(collection_name=collection_name)

-            rag_documents: list[BaseRecord] = [{"content": doc} for doc in documents]
+            rag_documents: list[BaseRecord] = _coerce_to_records(documents)
             client.add_documents(
                 collection_name=collection_name, documents=rag_documents
             )

@@ -0,0 +1,400 @@
"""Test Knowledge Source metadata functionality."""

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import _coerce_to_records
class TestCoerceToRecords:
    """Test the _coerce_to_records function."""

    def test_coerce_string_list(self):
        """Test coercing a list of strings."""
        documents = ["chunk1", "chunk2", "chunk3"]
        result = _coerce_to_records(documents)
        assert len(result) == 3
        assert result[0]["content"] == "chunk1"
        assert result[1]["content"] == "chunk2"
        assert result[2]["content"] == "chunk3"
        assert "metadata" not in result[0]

    def test_coerce_dict_with_metadata(self):
        """Test coercing dictionaries with metadata."""
        documents = [
            {
                "content": "chunk1",
                "metadata": {
                    "filepath": "/path/to/file.txt",
                    "chunk_index": 0,
                    "source_type": "text_file",
                }
            },
            {
                "content": "chunk2",
                "metadata": {
                    "filepath": "/path/to/file.txt",
                    "chunk_index": 1,
                    "source_type": "text_file",
                }
            }
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 2
        assert result[0]["content"] == "chunk1"
        assert result[0]["metadata"]["filepath"] == "/path/to/file.txt"
        assert result[0]["metadata"]["chunk_index"] == 0
        assert result[0]["metadata"]["source_type"] == "text_file"
        assert result[1]["content"] == "chunk2"
        assert result[1]["metadata"]["chunk_index"] == 1

    def test_coerce_mixed_formats(self):
        """Test coercing mixed string and dict formats."""
        documents = [
            "plain string chunk",
            {
                "content": "dict chunk",
                "metadata": {"source_type": "test"}
            }
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 2
        assert result[0]["content"] == "plain string chunk"
        assert "metadata" not in result[0]
        assert result[1]["content"] == "dict chunk"
        assert result[1]["metadata"]["source_type"] == "test"

    def test_coerce_empty_content_skipped(self):
        """Test that empty content is skipped."""
        documents = [
            {"content": "valid chunk"},
            {"content": None},
            {"content": ""},
            {"content": "another valid chunk"}
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 2
        assert result[0]["content"] == "valid chunk"
        assert result[1]["content"] == "another valid chunk"

    def test_coerce_missing_content_skipped(self):
        """Test that dicts without a content key are skipped."""
        documents = [
            {"content": "valid chunk"},
            {"metadata": {"some": "data"}},
            {"content": "another valid chunk"}
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 2
        assert result[0]["content"] == "valid chunk"
        assert result[1]["content"] == "another valid chunk"

    def test_coerce_with_doc_id(self):
        """Test coercing documents with doc_id."""
        documents = [
            {
                "content": "chunk with id",
                "doc_id": "doc123",
                "metadata": {"source_type": "test"}
            }
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 1
        assert result[0]["content"] == "chunk with id"
        assert result[0]["doc_id"] == "doc123"
        assert result[0]["metadata"]["source_type"] == "test"

    def test_coerce_metadata_type_conversion(self):
        """Test that metadata values are properly converted to allowed types."""
        documents = [
            {
                "content": "test chunk",
                "metadata": {
                    "string_val": "text",
                    "int_val": 42,
                    "float_val": 3.14,
                    "bool_val": True,
                    "none_val": None,
                    "other_val": {"nested": "dict"}
                }
            }
        ]
        result = _coerce_to_records(documents)
        assert len(result) == 1
        metadata = result[0]["metadata"]
        assert metadata["string_val"] == "text"
        assert metadata["int_val"] == 42
        assert metadata["float_val"] == 3.14
        assert metadata["bool_val"] is True
        assert metadata["none_val"] == ""
        assert isinstance(metadata["other_val"], str)
class TestTextFileKnowledgeSourceMetadata:
    """Test TextFileKnowledgeSource metadata functionality."""

    def test_text_file_chunks_have_metadata(self, tmpdir):
        """Test that text file chunks include metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        content = "This is a test file. " * 100
        file_path = Path(tmpdir.join("test.txt"))
        with open(file_path, "w") as f:
            f.write(content)

        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = TextFileKnowledgeSource(
                file_paths=[file_path],
                storage=KnowledgeStorage(),
                chunk_size=100,
                chunk_overlap=10
            )
            source.add()

            assert len(source.chunks) > 0
            for i, chunk in enumerate(source.chunks):
                assert isinstance(chunk, dict)
                assert "content" in chunk
                assert "metadata" in chunk
                assert chunk["metadata"]["filepath"] == str(file_path)
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "text_file"

            mock_save.assert_called_once()
            saved_chunks = mock_save.call_args[0][0]
            assert len(saved_chunks) == len(source.chunks)
class TestPDFKnowledgeSourceMetadata:
    """Test PDFKnowledgeSource metadata functionality."""

    @patch('crewai.knowledge.source.pdf_knowledge_source.PDFKnowledgeSource._import_pdfplumber')
    def test_pdf_chunks_have_metadata(self, mock_import, tmpdir):
        """Test that PDF chunks include metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        mock_pdf = MagicMock()
        mock_page = MagicMock()
        mock_page.extract_text.return_value = "PDF content. " * 50
        mock_pdf.pages = [mock_page]
        mock_pdfplumber = MagicMock()
        mock_pdfplumber.open.return_value.__enter__.return_value = mock_pdf
        mock_import.return_value = mock_pdfplumber

        file_path = Path(tmpdir.join("test.pdf"))
        file_path.touch()

        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = PDFKnowledgeSource(
                file_paths=[file_path],
                storage=KnowledgeStorage(),
                chunk_size=100,
                chunk_overlap=10
            )
            source.add()

            assert len(source.chunks) > 0
            for i, chunk in enumerate(source.chunks):
                assert isinstance(chunk, dict)
                assert "content" in chunk
                assert "metadata" in chunk
                assert chunk["metadata"]["filepath"] == str(file_path)
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "pdf"
class TestCSVKnowledgeSourceMetadata:
    """Test CSVKnowledgeSource metadata functionality."""

    def test_csv_chunks_have_metadata(self, tmpdir):
        """Test that CSV chunks include metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        csv_content = "Name,Age,City\nJohn,30,NYC\nJane,25,LA\n" * 20
        file_path = Path(tmpdir.join("test.csv"))
        with open(file_path, "w") as f:
            f.write(csv_content)

        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = CSVKnowledgeSource(
                file_paths=[file_path],
                storage=KnowledgeStorage(),
                chunk_size=100,
                chunk_overlap=10
            )
            source.add()

            assert len(source.chunks) > 0
            for i, chunk in enumerate(source.chunks):
                assert isinstance(chunk, dict)
                assert "content" in chunk
                assert "metadata" in chunk
                assert chunk["metadata"]["filepath"] == str(file_path)
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "csv"
class TestJSONKnowledgeSourceMetadata:
    """Test JSONKnowledgeSource metadata functionality."""

    def test_json_chunks_have_metadata(self, tmpdir):
        """Test that JSON chunks include metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        json_content = '{"users": [{"name": "John", "age": 30}, {"name": "Jane", "age": 25}]}'
        file_path = Path(tmpdir.join("test.json"))
        with open(file_path, "w") as f:
            f.write(json_content)

        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = JSONKnowledgeSource(
                file_paths=[file_path],
                storage=KnowledgeStorage(),
                chunk_size=50,
                chunk_overlap=5
            )
            source.add()

            assert len(source.chunks) > 0
            for i, chunk in enumerate(source.chunks):
                assert isinstance(chunk, dict)
                assert "content" in chunk
                assert "metadata" in chunk
                assert chunk["metadata"]["filepath"] == str(file_path)
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "json"
class TestStringKnowledgeSourceMetadata:
    """Test StringKnowledgeSource metadata functionality."""

    def test_string_chunks_have_metadata(self):
        """Test that string chunks include metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        content = "This is a test string. " * 50
        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = StringKnowledgeSource(
                content=content,
                storage=KnowledgeStorage(),
                chunk_size=100,
                chunk_overlap=10
            )
            source.add()

            assert len(source.chunks) > 0
            for i, chunk in enumerate(source.chunks):
                assert isinstance(chunk, dict)
                assert "content" in chunk
                assert "metadata" in chunk
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "string"
                assert "filepath" not in chunk["metadata"]
class TestMultipleFilesMetadata:
    """Test metadata for multiple files."""

    def test_multiple_text_files_have_distinct_metadata(self, tmpdir):
        """Test that multiple files have distinct metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        file1 = Path(tmpdir.join("file1.txt"))
        file2 = Path(tmpdir.join("file2.txt"))
        with open(file1, "w") as f:
            f.write("Content from file 1. " * 50)
        with open(file2, "w") as f:
            f.write("Content from file 2. " * 50)

        with patch.object(KnowledgeStorage, 'save') as mock_save:
            source = TextFileKnowledgeSource(
                file_paths=[file1, file2],
                storage=KnowledgeStorage(),
                chunk_size=100,
                chunk_overlap=10
            )
            source.add()

            file1_chunks = [c for c in source.chunks if c["metadata"]["filepath"] == str(file1)]
            file2_chunks = [c for c in source.chunks if c["metadata"]["filepath"] == str(file2)]
            assert len(file1_chunks) > 0
            assert len(file2_chunks) > 0
            for i, chunk in enumerate(file1_chunks):
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "text_file"
            for i, chunk in enumerate(file2_chunks):
                assert chunk["metadata"]["chunk_index"] == i
                assert chunk["metadata"]["source_type"] == "text_file"
class TestBackwardCompatibility:
    """Test backward compatibility with existing code."""

    def test_storage_accepts_string_list(self):
        """Test that storage still accepts plain string lists."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        with patch('crewai.knowledge.storage.knowledge_storage.get_rag_client') as mock_client:
            mock_client_instance = MagicMock()
            mock_client.return_value = mock_client_instance

            storage = KnowledgeStorage()
            documents = ["chunk1", "chunk2", "chunk3"]
            storage.save(documents)

            mock_client_instance.add_documents.assert_called_once()
            saved_docs = mock_client_instance.add_documents.call_args[1]["documents"]
            assert len(saved_docs) == 3
            assert all("content" in doc for doc in saved_docs)

    def test_storage_accepts_dict_list(self):
        """Test that storage accepts dict lists with metadata."""
        from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage

        with patch('crewai.knowledge.storage.knowledge_storage.get_rag_client') as mock_client:
            mock_client_instance = MagicMock()
            mock_client.return_value = mock_client_instance

            storage = KnowledgeStorage()
            documents = [
                {
                    "content": "chunk1",
                    "metadata": {"filepath": "/path/to/file.txt", "chunk_index": 0}
                },
                {
                    "content": "chunk2",
                    "metadata": {"filepath": "/path/to/file.txt", "chunk_index": 1}
                }
            ]
            storage.save(documents)

            mock_client_instance.add_documents.assert_called_once()
            saved_docs = mock_client_instance.add_documents.call_args[1]["documents"]
            assert len(saved_docs) == 2
            assert all("content" in doc for doc in saved_docs)
            assert all("metadata" in doc for doc in saved_docs)

uv.lock (generated): 8079 changed lines; file diff suppressed because it is too large.