Compare commits

...

8 Commits

Author SHA1 Message Date
Devin AI
79547fba25 Remove lock usage entirely to fix pickling issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:20:18 +00:00
Devin AI
171f8b63fd Replace RLock with threading.Lock to fix pickling issues
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:19:24 +00:00
Devin AI
72df165b07 Fix RLock pickling issue in BaseFileKnowledgeSource
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:15:45 +00:00
Devin AI
63eccf5e30 Improve error handling and documentation in Knowledge._check_and_reload_sources
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:14:25 +00:00
Devin AI
a98a44afb2 Fix test file path handling for CSVKnowledgeSource
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:12:12 +00:00
Devin AI
6e0f1fe38d Address code review comments: improve error handling, add thread safety, enhance documentation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:11:28 +00:00
Devin AI
c2bf2b3210 Fix import sorting in manual test script
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:06:18 +00:00
Devin AI
14579a7861 Fix #2762: Make CSV knowledge sources detect and load file updates
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-06 00:04:39 +00:00
4 changed files with 331 additions and 1 deletions

92
manual_test_csv_update.py Normal file
View File

@@ -0,0 +1,92 @@
"""
Manual test script to verify CSV knowledge source update functionality.
This script creates a CSV file, loads it as a knowledge source, updates the file,
and verifies that the updated content is detected and loaded.
"""
import os
import sys
import tempfile
import time
from pathlib import Path
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
def test_csv_knowledge_source_updates():
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
with tempfile.TemporaryDirectory() as tmpdir:
csv_path = Path(tmpdir) / "test_updates.csv"
initial_csv_content = [
["name", "age", "city"],
["John", "30", "New York"],
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
]
with open(csv_path, "w") as f:
for row in initial_csv_content:
f.write(",".join(row) + "\n")
print(f"Created CSV file at {csv_path}")
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
if not hasattr(csv_source, 'files_have_changed'):
print("❌ TEST FAILED: files_have_changed method not found in CSVKnowledgeSource")
return False
if not hasattr(csv_source, '_file_mtimes'):
print("❌ TEST FAILED: _file_mtimes attribute not found in CSVKnowledgeSource")
return False
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
if not hasattr(knowledge, '_check_and_reload_sources'):
print("❌ TEST FAILED: _check_and_reload_sources method not found in Knowledge")
return False
print("✅ All required methods and attributes exist")
updated_csv_content = [
["name", "age", "city"],
["John", "30", "Boston"], # Changed city
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
["Eve", "22", "Miami"], # Added new person
]
print("\nWaiting for 1 second before updating file...")
time.sleep(1)
with open(csv_path, "w") as f:
for row in updated_csv_content:
f.write(",".join(row) + "\n")
print(f"Updated CSV file at {csv_path}")
if not csv_source.files_have_changed():
print("❌ TEST FAILED: files_have_changed did not detect file modification")
return False
print("✅ files_have_changed correctly detected file modification")
csv_source._record_file_mtimes()
csv_source.content = csv_source.load_content()
content_str = str(csv_source.content)
if "Boston" in content_str and "Eve" in content_str and "Miami" in content_str:
print("✅ Content was correctly updated with new data")
else:
print("❌ TEST FAILED: Content was not updated with new data")
return False
print("\n✅ TEST PASSED: CSV knowledge source correctly detects and loads file updates")
return True
if __name__ == "__main__":
success = test_csv_knowledge_source_updates()
sys.exit(0 if success else 1)

View File

@@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict, Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.logger import Logger
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
@@ -12,10 +13,19 @@ os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
class Knowledge(BaseModel):
"""
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
This class manages knowledge sources and provides methods to query them for relevant information.
It automatically detects and reloads file-based knowledge sources when their underlying files change.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
The knowledge sources to use for querying.
storage: Optional[KnowledgeStorage] = Field(default=None)
The storage backend for knowledge embeddings.
embedder: Optional[Dict[str, Any]] = None
Configuration for the embedding model.
collection_name: Optional[str] = None
Name of the collection to use for storage.
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
@@ -23,6 +33,7 @@ class Knowledge(BaseModel):
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None
_logger: Logger = Logger(verbose=True)
def __init__(
self,
@@ -54,6 +65,8 @@ class Knowledge(BaseModel):
"""
if self.storage is None:
raise ValueError("Storage is not initialized.")
self._check_and_reload_sources()
results = self.storage.search(
query,
@@ -61,6 +74,65 @@ class Knowledge(BaseModel):
score_threshold=score_threshold,
)
return results
def _check_and_reload_sources(self):
"""
Check if any file-based knowledge sources have changed and reload them if necessary.
This method detects modifications to source files by comparing their modification timestamps
with previously recorded values. When changes are detected, the source is reloaded and
the storage is updated with the new content.
The method handles various file-related exceptions with specific error messages:
- FileNotFoundError: When a source file no longer exists
- PermissionError: When there are permission issues accessing a file
- IOError: When there are I/O errors reading a file
- ValueError: When there are issues with file content format
- Other unexpected exceptions are also caught and logged
Each exception is logged with appropriate context to aid in troubleshooting.
"""
for source in self.sources:
source_name = source.__class__.__name__
try:
if hasattr(source, 'files_have_changed') and source.files_have_changed():
self._logger.log("info", f"Reloading modified source: {source_name}")
# Update file modification timestamps
try:
source._record_file_mtimes()
except (PermissionError, IOError) as e:
self._logger.log("warning", f"Could not record file timestamps for {source_name}: {str(e)}")
try:
source.content = source.load_content()
except FileNotFoundError as e:
self._logger.log("error", f"File not found when loading content for {source_name}: {str(e)}")
continue
except PermissionError as e:
self._logger.log("error", f"Permission error when loading content for {source_name}: {str(e)}")
continue
except IOError as e:
self._logger.log("error", f"IO error when loading content for {source_name}: {str(e)}")
continue
except ValueError as e:
self._logger.log("error", f"Invalid content format in {source_name}: {str(e)}")
continue
try:
source.add()
self._logger.log("info", f"Successfully reloaded and updated {source_name}")
except Exception as e:
self._logger.log("error", f"Failed to update storage for {source_name}: {str(e)}")
except FileNotFoundError as e:
self._logger.log("error", f"File not found when checking for updates in {source_name}: {str(e)}")
except PermissionError as e:
self._logger.log("error", f"Permission error when checking for updates in {source_name}: {str(e)}")
except IOError as e:
self._logger.log("error", f"IO error when checking for updates in {source_name}: {str(e)}")
except Exception as e:
self._logger.log("error", f"Unexpected error when checking for updates in {source_name}: {str(e)}")
def add_sources(self):
try:

View File

@@ -1,3 +1,4 @@
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union
@@ -11,9 +12,24 @@ from crewai.utilities.logger import Logger
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Base class for knowledge sources that load content from files."""
"""
Base class for knowledge sources that load content from files.
This class provides common functionality for file-based knowledge sources,
including file path validation, content loading, and change detection.
It automatically tracks file modification times to detect when files have
been updated and need to be reloaded.
Attributes:
file_path: Deprecated. Use file_paths instead.
file_paths: Path(s) to the file(s) containing knowledge data.
content: Dictionary mapping file paths to their loaded content.
storage: Storage backend for the knowledge data.
safe_file_paths: Validated list of Path objects.
"""
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
@@ -43,7 +59,34 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Post-initialization method to load content."""
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self._record_file_mtimes()
self.content = self.load_content()
def _record_file_mtimes(self):
"""
Record modification times of all files.
This method stores the current modification timestamps of all files
in the _file_mtimes dictionary. These timestamps are later used to
detect when files have been modified and need to be reloaded.
Thread-safe: Uses a lock to prevent concurrent modifications.
"""
with self._lock:
self._file_mtimes = {}
for path in self.safe_file_paths:
try:
if path.exists() and path.is_file():
if os.access(path, os.R_OK):
self._file_mtimes[path] = path.stat().st_mtime
else:
self._logger.log("warning", f"File {path} is not readable.")
except PermissionError as e:
self._logger.log("error", f"Permission error when recording file timestamp for {path}: {str(e)}")
except IOError as e:
self._logger.log("error", f"IO error when recording file timestamp for {path}: {str(e)}")
except Exception as e:
self._logger.log("error", f"Unexpected error when recording file timestamp for {path}: {str(e)}")
@abstractmethod
def load_content(self) -> Dict[Path, str]:
@@ -107,3 +150,41 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
)
return [self.convert_to_path(path) for path in path_list]
def files_have_changed(self) -> bool:
"""
Check if any of the files have been modified since they were last loaded.
This method compares the current modification timestamps of files with the
previously recorded timestamps to detect changes. When a file has been modified,
it logs the change and returns True to trigger a reload.
Returns:
bool: True if any file has been modified, False otherwise.
"""
for path in self.safe_file_paths:
try:
if not path.exists():
self._logger.log("warning", f"File {path} no longer exists.")
continue
if not path.is_file():
self._logger.log("warning", f"Path {path} is not a file.")
continue
if not os.access(path, os.R_OK):
self._logger.log("warning", f"File {path} is not readable.")
continue
current_mtime = path.stat().st_mtime
if path not in self._file_mtimes or current_mtime > self._file_mtimes[path]:
self._logger.log("info", f"File {path} has been modified. Reloading data.")
return True
except PermissionError as e:
self._logger.log("error", f"Permission error when checking file {path}: {str(e)}")
except IOError as e:
self._logger.log("error", f"IO error when checking file {path}: {str(e)}")
except Exception as e:
self._logger.log("error", f"Unexpected error when checking file {path}: {str(e)}")
return False

View File

@@ -0,0 +1,85 @@
import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
@patch('crewai.knowledge.storage.knowledge_storage.KnowledgeStorage.search')
@patch('crewai.knowledge.source.csv_knowledge_source.CSVKnowledgeSource.add')
def test_csv_knowledge_source_updates(mock_add, mock_search, tmpdir):
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
mock_search.side_effect = [
[{"context": "name,age,city\nJohn,30,New York\nAlice,25,San Francisco\nBob,28,Chicago"}],
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}],
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}]
]
csv_path = str(tmpdir / "test_updates.csv")
initial_csv_content = [
["name", "age", "city"],
["John", "30", "New York"],
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
]
with open(csv_path, "w") as f:
for row in initial_csv_content:
f.write(",".join(row) + "\n")
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
original_files_have_changed = csv_source.files_have_changed
files_changed_called = [False]
def spy_files_have_changed():
files_changed_called[0] = True
return original_files_have_changed()
csv_source.files_have_changed = spy_files_have_changed
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
assert hasattr(knowledge, '_check_and_reload_sources'), "Knowledge class is missing _check_and_reload_sources method"
initial_results = knowledge.query(["John"])
assert any("John" in result["context"] for result in initial_results)
assert any("New York" in result["context"] for result in initial_results)
mock_add.reset_mock()
files_changed_called[0] = False
updated_csv_content = [
["name", "age", "city"],
["John", "30", "Boston"], # Changed city
["Alice", "25", "San Francisco"],
["Bob", "28", "Chicago"],
["Eve", "22", "Miami"], # Added new person
]
time.sleep(1)
csv_path_str = str(csv_path)
with open(csv_path_str, "w") as f:
for row in updated_csv_content:
f.write(",".join(row) + "\n")
updated_results = knowledge.query(["John"])
assert files_changed_called[0], "files_have_changed method was not called during query"
assert mock_add.called, "add method was not called to reload the data"
assert any("John" in result["context"] for result in updated_results)
assert any("Boston" in result["context"] for result in updated_results)
assert not any("New York" in result["context"] for result in updated_results)
new_results = knowledge.query(["Eve"])
assert any("Eve" in result["context"] for result in new_results)
assert any("Miami" in result["context"] for result in new_results)