Fix #2762: Make CSV knowledge sources detect and load file updates

Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
Devin AI
2025-05-06 00:04:39 +00:00
parent dabf02a90d
commit 14579a7861
4 changed files with 205 additions and 0 deletions

92
manual_test_csv_update.py Normal file
View File

@@ -0,0 +1,92 @@
"""
Manual test script to verify CSV knowledge source update functionality.
This script creates a CSV file, loads it as a knowledge source, updates the file,
and verifies that the updated content is detected and loaded.
"""
import os
import time
from pathlib import Path
import tempfile
import sys
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
def test_csv_knowledge_source_updates():
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
with tempfile.TemporaryDirectory() as tmpdir:
csv_path = Path(tmpdir) / "test_updates.csv"
initial_csv_content = [
["name", "age", "city"],
["John", "30", "New York"],
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
]
with open(csv_path, "w") as f:
for row in initial_csv_content:
f.write(",".join(row) + "\n")
print(f"Created CSV file at {csv_path}")
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
if not hasattr(csv_source, 'files_have_changed'):
print("❌ TEST FAILED: files_have_changed method not found in CSVKnowledgeSource")
return False
if not hasattr(csv_source, '_file_mtimes'):
print("❌ TEST FAILED: _file_mtimes attribute not found in CSVKnowledgeSource")
return False
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
if not hasattr(knowledge, '_check_and_reload_sources'):
print("❌ TEST FAILED: _check_and_reload_sources method not found in Knowledge")
return False
print("✅ All required methods and attributes exist")
updated_csv_content = [
["name", "age", "city"],
["John", "30", "Boston"], # Changed city
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
["Eve", "22", "Miami"], # Added new person
]
print("\nWaiting for 1 second before updating file...")
time.sleep(1)
with open(csv_path, "w") as f:
for row in updated_csv_content:
f.write(",".join(row) + "\n")
print(f"Updated CSV file at {csv_path}")
if not csv_source.files_have_changed():
print("❌ TEST FAILED: files_have_changed did not detect file modification")
return False
print("✅ files_have_changed correctly detected file modification")
csv_source._record_file_mtimes()
csv_source.content = csv_source.load_content()
content_str = str(csv_source.content)
if "Boston" in content_str and "Eve" in content_str and "Miami" in content_str:
print("✅ Content was correctly updated with new data")
else:
print("❌ TEST FAILED: Content was not updated with new data")
return False
print("\n✅ TEST PASSED: CSV knowledge source correctly detects and loads file updates")
return True
if __name__ == "__main__":
success = test_csv_knowledge_source_updates()
sys.exit(0 if success else 1)

View File

@@ -54,6 +54,8 @@ class Knowledge(BaseModel):
"""
if self.storage is None:
raise ValueError("Storage is not initialized.")
self._check_and_reload_sources()
results = self.storage.search(
query,
@@ -61,6 +63,14 @@ class Knowledge(BaseModel):
score_threshold=score_threshold,
)
return results
def _check_and_reload_sources(self):
"""Check if any sources have changed and reload them if necessary."""
for source in self.sources:
if hasattr(source, 'files_have_changed') and source.files_have_changed():
source._record_file_mtimes() # Update timestamps
source.content = source.load_content()
source.add() # Reload and update storage
def add_sources(self):
try:

View File

@@ -43,7 +43,15 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Post-initialization method to load content."""
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self._record_file_mtimes()
self.content = self.load_content()
def _record_file_mtimes(self):
"""Record modification times of all files."""
self._file_mtimes = {}
for path in self.safe_file_paths:
if path.exists() and path.is_file():
self._file_mtimes[path] = path.stat().st_mtime
@abstractmethod
def load_content(self) -> Dict[Path, str]:
@@ -107,3 +115,14 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
)
return [self.convert_to_path(path) for path in path_list]
def files_have_changed(self) -> bool:
"""Check if any of the files have been modified since they were last loaded."""
for path in self.safe_file_paths:
if not path.exists() or not path.is_file():
continue
current_mtime = path.stat().st_mtime
if path not in self._file_mtimes or current_mtime > self._file_mtimes[path]:
self._logger.log("info", f"File {path} has been modified. Reloading data.")
return True
return False

View File

@@ -0,0 +1,84 @@
import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
@patch('crewai.knowledge.storage.knowledge_storage.KnowledgeStorage.search')
@patch('crewai.knowledge.source.csv_knowledge_source.CSVKnowledgeSource.add')
def test_csv_knowledge_source_updates(mock_add, mock_search, tmpdir):
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
mock_search.side_effect = [
[{"context": "name,age,city\nJohn,30,New York\nAlice,25,San Francisco\nBob,28,Chicago"}],
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}],
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}]
]
csv_path = tmpdir / "test_updates.csv"
initial_csv_content = [
["name", "age", "city"],
["John", "30", "New York"],
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
]
with open(csv_path, "w") as f:
for row in initial_csv_content:
f.write(",".join(row) + "\n")
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
original_files_have_changed = csv_source.files_have_changed
files_changed_called = [False]
def spy_files_have_changed():
files_changed_called[0] = True
return original_files_have_changed()
csv_source.files_have_changed = spy_files_have_changed
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
assert hasattr(knowledge, '_check_and_reload_sources'), "Knowledge class is missing _check_and_reload_sources method"
initial_results = knowledge.query(["John"])
assert any("John" in result["context"] for result in initial_results)
assert any("New York" in result["context"] for result in initial_results)
mock_add.reset_mock()
files_changed_called[0] = False
updated_csv_content = [
["name", "age", "city"],
["John", "30", "Boston"], # Changed city
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
["Eve", "22", "Miami"], # Added new person
]
time.sleep(1)
with open(csv_path, "w") as f:
for row in updated_csv_content:
f.write(",".join(row) + "\n")
updated_results = knowledge.query(["John"])
assert files_changed_called[0], "files_have_changed method was not called during query"
assert mock_add.called, "add method was not called to reload the data"
assert any("John" in result["context"] for result in updated_results)
assert any("Boston" in result["context"] for result in updated_results)
assert not any("New York" in result["context"] for result in updated_results)
new_results = knowledge.query(["Eve"])
assert any("Eve" in result["context"] for result in new_results)
assert any("Miami" in result["context"] for result in new_results)