diff --git a/manual_test_csv_update.py b/manual_test_csv_update.py new file mode 100644 index 000000000..801cc163b --- /dev/null +++ b/manual_test_csv_update.py @@ -0,0 +1,92 @@ +""" +Manual test script to verify CSV knowledge source update functionality. +This script creates a CSV file, loads it as a knowledge source, updates the file, +and verifies that the updated content is detected and loaded. +""" + +import os +import time +from pathlib import Path +import tempfile +import sys + +from crewai.knowledge.knowledge import Knowledge +from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource + + +def test_csv_knowledge_source_updates(): + """Test that CSVKnowledgeSource properly detects and loads updates to CSV files.""" + with tempfile.TemporaryDirectory() as tmpdir: + csv_path = Path(tmpdir) / "test_updates.csv" + + initial_csv_content = [ + ["name", "age", "city"], + ["John", "30", "New York"], + ["Alice", "25", "San Francisco"], + ["Bob", "28", "Chicago"], + ] + + with open(csv_path, "w") as f: + for row in initial_csv_content: + f.write(",".join(row) + "\n") + + print(f"Created CSV file at {csv_path}") + + csv_source = CSVKnowledgeSource(file_paths=[csv_path]) + + if not hasattr(csv_source, 'files_have_changed'): + print("❌ TEST FAILED: files_have_changed method not found in CSVKnowledgeSource") + return False + + if not hasattr(csv_source, '_file_mtimes'): + print("❌ TEST FAILED: _file_mtimes attribute not found in CSVKnowledgeSource") + return False + + knowledge = Knowledge(sources=[csv_source], collection_name="test_updates") + + if not hasattr(knowledge, '_check_and_reload_sources'): + print("❌ TEST FAILED: _check_and_reload_sources method not found in Knowledge") + return False + + print("✅ All required methods and attributes exist") + + updated_csv_content = [ + ["name", "age", "city"], + ["John", "30", "Boston"], # Changed city + ["Alice", "25", "San Francisco"], + ["Bob", "28", "Chicago"], + ["Eve", "22", "Miami"], # Added new person + ] + + print("\nWaiting for 1 second before updating file...") + time.sleep(1) + + with open(csv_path, "w") as f: + for row in updated_csv_content: + f.write(",".join(row) + "\n") + + print(f"Updated CSV file at {csv_path}") + + if not csv_source.files_have_changed(): + print("❌ TEST FAILED: files_have_changed did not detect file modification") + return False + + print("✅ files_have_changed correctly detected file modification") + + csv_source._record_file_mtimes() + csv_source.content = csv_source.load_content() + + content_str = str(csv_source.content) + if "Boston" in content_str and "Eve" in content_str and "Miami" in content_str: + print("✅ Content was correctly updated with new data") + else: + print("❌ TEST FAILED: Content was not updated with new data") + return False + + print("\n✅ TEST PASSED: CSV knowledge source correctly detects and loads file updates") + return True + + +if __name__ == "__main__": + success = test_csv_knowledge_source_updates() + sys.exit(0 if success else 1) diff --git a/src/crewai/knowledge/knowledge.py b/src/crewai/knowledge/knowledge.py index 2340dec90..b5435883e 100644 --- a/src/crewai/knowledge/knowledge.py +++ b/src/crewai/knowledge/knowledge.py @@ -54,6 +54,8 @@ class Knowledge(BaseModel): """ if self.storage is None: raise ValueError("Storage is not initialized.") + + self._check_and_reload_sources() results = self.storage.search( query, @@ -61,6 +63,14 @@ class Knowledge(BaseModel): score_threshold=score_threshold, ) return results + + def _check_and_reload_sources(self): + """Check if any sources have changed and reload them if necessary.""" + for source in self.sources: + if hasattr(source, 'files_have_changed') and source.files_have_changed(): + source._record_file_mtimes() # Update timestamps + source.content = source.load_content() + source.add() # Reload and update storage def add_sources(self): try: diff --git a/src/crewai/knowledge/source/base_file_knowledge_source.py b/src/crewai/knowledge/source/base_file_knowledge_source.py index 4c4b9b337..3618952db 100644 --- a/src/crewai/knowledge/source/base_file_knowledge_source.py +++ b/src/crewai/knowledge/source/base_file_knowledge_source.py @@ -43,7 +43,15 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC): """Post-initialization method to load content.""" self.safe_file_paths = self._process_file_paths() self.validate_content() + self._record_file_mtimes() self.content = self.load_content() + + def _record_file_mtimes(self): + """Record modification times of all files.""" + self._file_mtimes = {} + for path in self.safe_file_paths: + if path.exists() and path.is_file(): + self._file_mtimes[path] = path.stat().st_mtime @abstractmethod def load_content(self) -> Dict[Path, str]: @@ -107,3 +115,14 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC): ) return [self.convert_to_path(path) for path in path_list] + + def files_have_changed(self) -> bool: + """Check if any of the files have been modified since they were last loaded.""" + for path in self.safe_file_paths: + if not path.exists() or not path.is_file(): + continue + current_mtime = path.stat().st_mtime + if path not in self._file_mtimes or current_mtime > self._file_mtimes[path]: + self._logger.log("info", f"File {path} has been modified. Reloading data.") + return True + return False diff --git a/tests/knowledge/test_csv_knowledge_source_updates.py b/tests/knowledge/test_csv_knowledge_source_updates.py new file mode 100644 index 000000000..b5bad859b --- /dev/null +++ b/tests/knowledge/test_csv_knowledge_source_updates.py @@ -0,0 +1,84 @@ +import os +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.knowledge.knowledge import Knowledge +from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource +from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage + + +@patch('crewai.knowledge.storage.knowledge_storage.KnowledgeStorage.search') +@patch('crewai.knowledge.source.csv_knowledge_source.CSVKnowledgeSource.add') +def test_csv_knowledge_source_updates(mock_add, mock_search, tmpdir): + """Test that CSVKnowledgeSource properly detects and loads updates to CSV files.""" + mock_search.side_effect = [ + [{"context": "name,age,city\nJohn,30,New York\nAlice,25,San Francisco\nBob,28,Chicago"}], + [{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}], + [{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}] + ] + + csv_path = tmpdir / "test_updates.csv" + + initial_csv_content = [ + ["name", "age", "city"], + ["John", "30", "New York"], + ["Alice", "25", "San Francisco"], + ["Bob", "28", "Chicago"], + ] + + with open(csv_path, "w") as f: + for row in initial_csv_content: + f.write(",".join(row) + "\n") + + csv_source = CSVKnowledgeSource(file_paths=[csv_path]) + + original_files_have_changed = csv_source.files_have_changed + files_changed_called = [False] + + def spy_files_have_changed(): + files_changed_called[0] = True + return original_files_have_changed() + + csv_source.files_have_changed = spy_files_have_changed + + knowledge = Knowledge(sources=[csv_source], collection_name="test_updates") + + assert hasattr(knowledge, '_check_and_reload_sources'), "Knowledge class is missing _check_and_reload_sources method" + + initial_results = knowledge.query(["John"]) + assert any("John" in result["context"] for result in initial_results) + assert any("New York" in result["context"] for result in initial_results) + + mock_add.reset_mock() + files_changed_called[0] = False + + updated_csv_content = [ + ["name", "age", "city"], + ["John", "30", "Boston"], # Changed city + ["Alice", "25", "San Francisco"], + ["Bob", "28", "Chicago"], + ["Eve", "22", "Miami"], # Added new person + ] + + time.sleep(1) + + with open(csv_path, "w") as f: + for row in updated_csv_content: + f.write(",".join(row) + "\n") + + updated_results = knowledge.query(["John"]) + + assert files_changed_called[0], "files_have_changed method was not called during query" + + assert mock_add.called, "add method was not called to reload the data" + + assert any("John" in result["context"] for result in updated_results) + assert any("Boston" in result["context"] for result in updated_results) + assert not any("New York" in result["context"] for result in updated_results) + + new_results = knowledge.query(["Eve"]) + assert any("Eve" in result["context"] for result in new_results) + assert any("Miami" in result["context"] for result in new_results)