mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-22 15:28:30 +00:00
Compare commits
8 Commits
docs/train
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79547fba25 | ||
|
|
171f8b63fd | ||
|
|
72df165b07 | ||
|
|
63eccf5e30 | ||
|
|
a98a44afb2 | ||
|
|
6e0f1fe38d | ||
|
|
c2bf2b3210 | ||
|
|
14579a7861 |
92
manual_test_csv_update.py
Normal file
92
manual_test_csv_update.py
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
"""
|
||||||
|
Manual test script to verify CSV knowledge source update functionality.
|
||||||
|
This script creates a CSV file, loads it as a knowledge source, updates the file,
|
||||||
|
and verifies that the updated content is detected and loaded.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from crewai.knowledge.knowledge import Knowledge
|
||||||
|
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
|
||||||
|
|
||||||
|
|
||||||
|
def test_csv_knowledge_source_updates():
|
||||||
|
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
csv_path = Path(tmpdir) / "test_updates.csv"
|
||||||
|
|
||||||
|
initial_csv_content = [
|
||||||
|
["name", "age", "city"],
|
||||||
|
["John", "30", "New York"],
|
||||||
|
["Alice", "25", "San Francisco"],
|
||||||
|
["Bob", "28", "Chicago"],
|
||||||
|
]
|
||||||
|
|
||||||
|
with open(csv_path, "w") as f:
|
||||||
|
for row in initial_csv_content:
|
||||||
|
f.write(",".join(row) + "\n")
|
||||||
|
|
||||||
|
print(f"Created CSV file at {csv_path}")
|
||||||
|
|
||||||
|
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
|
||||||
|
|
||||||
|
if not hasattr(csv_source, 'files_have_changed'):
|
||||||
|
print("❌ TEST FAILED: files_have_changed method not found in CSVKnowledgeSource")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not hasattr(csv_source, '_file_mtimes'):
|
||||||
|
print("❌ TEST FAILED: _file_mtimes attribute not found in CSVKnowledgeSource")
|
||||||
|
return False
|
||||||
|
|
||||||
|
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
|
||||||
|
|
||||||
|
if not hasattr(knowledge, '_check_and_reload_sources'):
|
||||||
|
print("❌ TEST FAILED: _check_and_reload_sources method not found in Knowledge")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print("✅ All required methods and attributes exist")
|
||||||
|
|
||||||
|
updated_csv_content = [
|
||||||
|
["name", "age", "city"],
|
||||||
|
["John", "30", "Boston"], # Changed city
|
||||||
|
["Alice", "25", "San Francisco"],
|
||||||
|
["Bob", "28", "Chicago"],
|
||||||
|
["Eve", "22", "Miami"], # Added new person
|
||||||
|
]
|
||||||
|
|
||||||
|
print("\nWaiting for 1 second before updating file...")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
with open(csv_path, "w") as f:
|
||||||
|
for row in updated_csv_content:
|
||||||
|
f.write(",".join(row) + "\n")
|
||||||
|
|
||||||
|
print(f"Updated CSV file at {csv_path}")
|
||||||
|
|
||||||
|
if not csv_source.files_have_changed():
|
||||||
|
print("❌ TEST FAILED: files_have_changed did not detect file modification")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print("✅ files_have_changed correctly detected file modification")
|
||||||
|
|
||||||
|
csv_source._record_file_mtimes()
|
||||||
|
csv_source.content = csv_source.load_content()
|
||||||
|
|
||||||
|
content_str = str(csv_source.content)
|
||||||
|
if "Boston" in content_str and "Eve" in content_str and "Miami" in content_str:
|
||||||
|
print("✅ Content was correctly updated with new data")
|
||||||
|
else:
|
||||||
|
print("❌ TEST FAILED: Content was not updated with new data")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print("\n✅ TEST PASSED: CSV knowledge source correctly detects and loads file updates")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
success = test_csv_knowledge_source_updates()
|
||||||
|
sys.exit(0 if success else 1)
|
||||||
@@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict, Field
|
|||||||
|
|
||||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||||
|
from crewai.utilities.logger import Logger
|
||||||
|
|
||||||
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
|
||||||
|
|
||||||
@@ -12,10 +13,19 @@ os.environ["TOKENIZERS_PARALLELISM"] = "false" # removes logging from fastembed
|
|||||||
class Knowledge(BaseModel):
|
class Knowledge(BaseModel):
|
||||||
"""
|
"""
|
||||||
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
|
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
|
||||||
|
|
||||||
|
This class manages knowledge sources and provides methods to query them for relevant information.
|
||||||
|
It automatically detects and reloads file-based knowledge sources when their underlying files change.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
||||||
|
The knowledge sources to use for querying.
|
||||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||||
|
The storage backend for knowledge embeddings.
|
||||||
embedder: Optional[Dict[str, Any]] = None
|
embedder: Optional[Dict[str, Any]] = None
|
||||||
|
Configuration for the embedding model.
|
||||||
|
collection_name: Optional[str] = None
|
||||||
|
Name of the collection to use for storage.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
||||||
@@ -23,6 +33,7 @@ class Knowledge(BaseModel):
|
|||||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||||
embedder: Optional[Dict[str, Any]] = None
|
embedder: Optional[Dict[str, Any]] = None
|
||||||
collection_name: Optional[str] = None
|
collection_name: Optional[str] = None
|
||||||
|
_logger: Logger = Logger(verbose=True)
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -55,6 +66,8 @@ class Knowledge(BaseModel):
|
|||||||
if self.storage is None:
|
if self.storage is None:
|
||||||
raise ValueError("Storage is not initialized.")
|
raise ValueError("Storage is not initialized.")
|
||||||
|
|
||||||
|
self._check_and_reload_sources()
|
||||||
|
|
||||||
results = self.storage.search(
|
results = self.storage.search(
|
||||||
query,
|
query,
|
||||||
limit=results_limit,
|
limit=results_limit,
|
||||||
@@ -62,6 +75,65 @@ class Knowledge(BaseModel):
|
|||||||
)
|
)
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def _check_and_reload_sources(self):
|
||||||
|
"""
|
||||||
|
Check if any file-based knowledge sources have changed and reload them if necessary.
|
||||||
|
|
||||||
|
This method detects modifications to source files by comparing their modification timestamps
|
||||||
|
with previously recorded values. When changes are detected, the source is reloaded and
|
||||||
|
the storage is updated with the new content.
|
||||||
|
|
||||||
|
The method handles various file-related exceptions with specific error messages:
|
||||||
|
- FileNotFoundError: When a source file no longer exists
|
||||||
|
- PermissionError: When there are permission issues accessing a file
|
||||||
|
- IOError: When there are I/O errors reading a file
|
||||||
|
- ValueError: When there are issues with file content format
|
||||||
|
- Other unexpected exceptions are also caught and logged
|
||||||
|
|
||||||
|
Each exception is logged with appropriate context to aid in troubleshooting.
|
||||||
|
"""
|
||||||
|
for source in self.sources:
|
||||||
|
source_name = source.__class__.__name__
|
||||||
|
try:
|
||||||
|
if hasattr(source, 'files_have_changed') and source.files_have_changed():
|
||||||
|
self._logger.log("info", f"Reloading modified source: {source_name}")
|
||||||
|
|
||||||
|
# Update file modification timestamps
|
||||||
|
try:
|
||||||
|
source._record_file_mtimes()
|
||||||
|
except (PermissionError, IOError) as e:
|
||||||
|
self._logger.log("warning", f"Could not record file timestamps for {source_name}: {str(e)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
source.content = source.load_content()
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
self._logger.log("error", f"File not found when loading content for {source_name}: {str(e)}")
|
||||||
|
continue
|
||||||
|
except PermissionError as e:
|
||||||
|
self._logger.log("error", f"Permission error when loading content for {source_name}: {str(e)}")
|
||||||
|
continue
|
||||||
|
except IOError as e:
|
||||||
|
self._logger.log("error", f"IO error when loading content for {source_name}: {str(e)}")
|
||||||
|
continue
|
||||||
|
except ValueError as e:
|
||||||
|
self._logger.log("error", f"Invalid content format in {source_name}: {str(e)}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
source.add()
|
||||||
|
self._logger.log("info", f"Successfully reloaded and updated {source_name}")
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.log("error", f"Failed to update storage for {source_name}: {str(e)}")
|
||||||
|
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
self._logger.log("error", f"File not found when checking for updates in {source_name}: {str(e)}")
|
||||||
|
except PermissionError as e:
|
||||||
|
self._logger.log("error", f"Permission error when checking for updates in {source_name}: {str(e)}")
|
||||||
|
except IOError as e:
|
||||||
|
self._logger.log("error", f"IO error when checking for updates in {source_name}: {str(e)}")
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.log("error", f"Unexpected error when checking for updates in {source_name}: {str(e)}")
|
||||||
|
|
||||||
def add_sources(self):
|
def add_sources(self):
|
||||||
try:
|
try:
|
||||||
for source in self.sources:
|
for source in self.sources:
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import os
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, List, Optional, Union
|
from typing import Dict, List, Optional, Union
|
||||||
@@ -11,9 +12,24 @@ from crewai.utilities.logger import Logger
|
|||||||
|
|
||||||
|
|
||||||
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||||
"""Base class for knowledge sources that load content from files."""
|
"""
|
||||||
|
Base class for knowledge sources that load content from files.
|
||||||
|
|
||||||
|
This class provides common functionality for file-based knowledge sources,
|
||||||
|
including file path validation, content loading, and change detection.
|
||||||
|
It automatically tracks file modification times to detect when files have
|
||||||
|
been updated and need to be reloaded.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
file_path: Deprecated. Use file_paths instead.
|
||||||
|
file_paths: Path(s) to the file(s) containing knowledge data.
|
||||||
|
content: Dictionary mapping file paths to their loaded content.
|
||||||
|
storage: Storage backend for the knowledge data.
|
||||||
|
safe_file_paths: Validated list of Path objects.
|
||||||
|
"""
|
||||||
|
|
||||||
_logger: Logger = Logger(verbose=True)
|
_logger: Logger = Logger(verbose=True)
|
||||||
|
|
||||||
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
||||||
default=None,
|
default=None,
|
||||||
description="[Deprecated] The path to the file. Use file_paths instead.",
|
description="[Deprecated] The path to the file. Use file_paths instead.",
|
||||||
@@ -43,8 +59,35 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
|||||||
"""Post-initialization method to load content."""
|
"""Post-initialization method to load content."""
|
||||||
self.safe_file_paths = self._process_file_paths()
|
self.safe_file_paths = self._process_file_paths()
|
||||||
self.validate_content()
|
self.validate_content()
|
||||||
|
self._record_file_mtimes()
|
||||||
self.content = self.load_content()
|
self.content = self.load_content()
|
||||||
|
|
||||||
|
def _record_file_mtimes(self):
|
||||||
|
"""
|
||||||
|
Record modification times of all files.
|
||||||
|
|
||||||
|
This method stores the current modification timestamps of all files
|
||||||
|
in the _file_mtimes dictionary. These timestamps are later used to
|
||||||
|
detect when files have been modified and need to be reloaded.
|
||||||
|
|
||||||
|
Thread-safe: Uses a lock to prevent concurrent modifications.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._file_mtimes = {}
|
||||||
|
for path in self.safe_file_paths:
|
||||||
|
try:
|
||||||
|
if path.exists() and path.is_file():
|
||||||
|
if os.access(path, os.R_OK):
|
||||||
|
self._file_mtimes[path] = path.stat().st_mtime
|
||||||
|
else:
|
||||||
|
self._logger.log("warning", f"File {path} is not readable.")
|
||||||
|
except PermissionError as e:
|
||||||
|
self._logger.log("error", f"Permission error when recording file timestamp for {path}: {str(e)}")
|
||||||
|
except IOError as e:
|
||||||
|
self._logger.log("error", f"IO error when recording file timestamp for {path}: {str(e)}")
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.log("error", f"Unexpected error when recording file timestamp for {path}: {str(e)}")
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def load_content(self) -> Dict[Path, str]:
|
def load_content(self) -> Dict[Path, str]:
|
||||||
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
|
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
|
||||||
@@ -107,3 +150,41 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
|||||||
)
|
)
|
||||||
|
|
||||||
return [self.convert_to_path(path) for path in path_list]
|
return [self.convert_to_path(path) for path in path_list]
|
||||||
|
|
||||||
|
def files_have_changed(self) -> bool:
|
||||||
|
"""
|
||||||
|
Check if any of the files have been modified since they were last loaded.
|
||||||
|
|
||||||
|
This method compares the current modification timestamps of files with the
|
||||||
|
previously recorded timestamps to detect changes. When a file has been modified,
|
||||||
|
it logs the change and returns True to trigger a reload.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if any file has been modified, False otherwise.
|
||||||
|
"""
|
||||||
|
for path in self.safe_file_paths:
|
||||||
|
try:
|
||||||
|
if not path.exists():
|
||||||
|
self._logger.log("warning", f"File {path} no longer exists.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not path.is_file():
|
||||||
|
self._logger.log("warning", f"Path {path} is not a file.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not os.access(path, os.R_OK):
|
||||||
|
self._logger.log("warning", f"File {path} is not readable.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
current_mtime = path.stat().st_mtime
|
||||||
|
if path not in self._file_mtimes or current_mtime > self._file_mtimes[path]:
|
||||||
|
self._logger.log("info", f"File {path} has been modified. Reloading data.")
|
||||||
|
return True
|
||||||
|
except PermissionError as e:
|
||||||
|
self._logger.log("error", f"Permission error when checking file {path}: {str(e)}")
|
||||||
|
except IOError as e:
|
||||||
|
self._logger.log("error", f"IO error when checking file {path}: {str(e)}")
|
||||||
|
except Exception as e:
|
||||||
|
self._logger.log("error", f"Unexpected error when checking file {path}: {str(e)}")
|
||||||
|
|
||||||
|
return False
|
||||||
|
|||||||
85
tests/knowledge/test_csv_knowledge_source_updates.py
Normal file
85
tests/knowledge/test_csv_knowledge_source_updates.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import os
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from crewai.knowledge.knowledge import Knowledge
|
||||||
|
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
|
||||||
|
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||||
|
|
||||||
|
|
||||||
|
@patch('crewai.knowledge.storage.knowledge_storage.KnowledgeStorage.search')
|
||||||
|
@patch('crewai.knowledge.source.csv_knowledge_source.CSVKnowledgeSource.add')
|
||||||
|
def test_csv_knowledge_source_updates(mock_add, mock_search, tmpdir):
|
||||||
|
"""Test that CSVKnowledgeSource properly detects and loads updates to CSV files."""
|
||||||
|
mock_search.side_effect = [
|
||||||
|
[{"context": "name,age,city\nJohn,30,New York\nAlice,25,San Francisco\nBob,28,Chicago"}],
|
||||||
|
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}],
|
||||||
|
[{"context": "name,age,city\nJohn,30,Boston\nAlice,25,San Francisco\nBob,28,Chicago\nEve,22,Miami"}]
|
||||||
|
]
|
||||||
|
|
||||||
|
csv_path = str(tmpdir / "test_updates.csv")
|
||||||
|
|
||||||
|
initial_csv_content = [
|
||||||
|
["name", "age", "city"],
|
||||||
|
["John", "30", "New York"],
|
||||||
|
["Alice", "25", "San Francisco"],
|
||||||
|
["Bob", "28", "Chicago"],
|
||||||
|
]
|
||||||
|
|
||||||
|
with open(csv_path, "w") as f:
|
||||||
|
for row in initial_csv_content:
|
||||||
|
f.write(",".join(row) + "\n")
|
||||||
|
|
||||||
|
csv_source = CSVKnowledgeSource(file_paths=[csv_path])
|
||||||
|
|
||||||
|
original_files_have_changed = csv_source.files_have_changed
|
||||||
|
files_changed_called = [False]
|
||||||
|
|
||||||
|
def spy_files_have_changed():
|
||||||
|
files_changed_called[0] = True
|
||||||
|
return original_files_have_changed()
|
||||||
|
|
||||||
|
csv_source.files_have_changed = spy_files_have_changed
|
||||||
|
|
||||||
|
knowledge = Knowledge(sources=[csv_source], collection_name="test_updates")
|
||||||
|
|
||||||
|
assert hasattr(knowledge, '_check_and_reload_sources'), "Knowledge class is missing _check_and_reload_sources method"
|
||||||
|
|
||||||
|
initial_results = knowledge.query(["John"])
|
||||||
|
assert any("John" in result["context"] for result in initial_results)
|
||||||
|
assert any("New York" in result["context"] for result in initial_results)
|
||||||
|
|
||||||
|
mock_add.reset_mock()
|
||||||
|
files_changed_called[0] = False
|
||||||
|
|
||||||
|
updated_csv_content = [
|
||||||
|
["name", "age", "city"],
|
||||||
|
["John", "30", "Boston"], # Changed city
|
||||||
|
["Alice", "25", "San Francisco"],
|
||||||
|
["Bob", "28", "Chicago"],
|
||||||
|
["Eve", "22", "Miami"], # Added new person
|
||||||
|
]
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
csv_path_str = str(csv_path)
|
||||||
|
with open(csv_path_str, "w") as f:
|
||||||
|
for row in updated_csv_content:
|
||||||
|
f.write(",".join(row) + "\n")
|
||||||
|
|
||||||
|
updated_results = knowledge.query(["John"])
|
||||||
|
|
||||||
|
assert files_changed_called[0], "files_have_changed method was not called during query"
|
||||||
|
|
||||||
|
assert mock_add.called, "add method was not called to reload the data"
|
||||||
|
|
||||||
|
assert any("John" in result["context"] for result in updated_results)
|
||||||
|
assert any("Boston" in result["context"] for result in updated_results)
|
||||||
|
assert not any("New York" in result["context"] for result in updated_results)
|
||||||
|
|
||||||
|
new_results = knowledge.query(["Eve"])
|
||||||
|
assert any("Eve" in result["context"] for result in new_results)
|
||||||
|
assert any("Miami" in result["context"] for result in new_results)
|
||||||
Reference in New Issue
Block a user