mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
fix: prevent path traversal in knowledge source file paths (#4547)
Add path boundary validation in convert_to_path() to ensure resolved paths stay within the knowledge directory. This prevents attackers from using '../' sequences to read arbitrary files outside the knowledge dir. Fixes: - BaseFileKnowledgeSource.convert_to_path() - ExcelKnowledgeSource.convert_to_path() - CrewDoclingSource.validate_content() inline path construction Added tests covering path traversal rejection and valid path acceptance. Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -84,8 +84,20 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
raise ValueError("No storage found to save documents.")
|
||||
|
||||
def convert_to_path(self, path: Path | str) -> Path:
|
||||
"""Convert a path to a Path object."""
|
||||
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
|
||||
"""Convert a path to a Path object.
|
||||
|
||||
Validates that the resolved path stays within the knowledge directory
|
||||
to prevent path traversal attacks.
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
result = (Path(KNOWLEDGE_DIRECTORY) / path).resolve()
|
||||
knowledge_dir = Path(KNOWLEDGE_DIRECTORY).resolve()
|
||||
if not result.is_relative_to(knowledge_dir):
|
||||
raise ValueError(
|
||||
f"Path '{path}' is outside the allowed knowledge directory"
|
||||
)
|
||||
return result
|
||||
return path
|
||||
|
||||
def _process_file_paths(self) -> list[Path]:
|
||||
"""Convert file_path to a list of Path objects."""
|
||||
|
||||
@@ -129,7 +129,12 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid URL: {path}. Error: {e!s}") from e
|
||||
else:
|
||||
local_path = Path(KNOWLEDGE_DIRECTORY + "/" + path)
|
||||
local_path = (Path(KNOWLEDGE_DIRECTORY) / path).resolve()
|
||||
knowledge_dir = Path(KNOWLEDGE_DIRECTORY).resolve()
|
||||
if not local_path.is_relative_to(knowledge_dir):
|
||||
raise ValueError(
|
||||
f"Path '{path}' is outside the allowed knowledge directory"
|
||||
)
|
||||
if local_path.exists():
|
||||
processed_paths.append(local_path)
|
||||
else:
|
||||
|
||||
@@ -130,8 +130,20 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
return content_dict
|
||||
|
||||
def convert_to_path(self, path: Path | str) -> Path:
|
||||
"""Convert a path to a Path object."""
|
||||
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
|
||||
"""Convert a path to a Path object.
|
||||
|
||||
Validates that the resolved path stays within the knowledge directory
|
||||
to prevent path traversal attacks.
|
||||
"""
|
||||
if isinstance(path, str):
|
||||
result = (Path(KNOWLEDGE_DIRECTORY) / path).resolve()
|
||||
knowledge_dir = Path(KNOWLEDGE_DIRECTORY).resolve()
|
||||
if not result.is_relative_to(knowledge_dir):
|
||||
raise ValueError(
|
||||
f"Path '{path}' is outside the allowed knowledge directory"
|
||||
)
|
||||
return result
|
||||
return path
|
||||
|
||||
def _import_dependencies(self) -> ModuleType:
|
||||
"""Dynamically import dependencies."""
|
||||
|
||||
@@ -603,6 +603,70 @@ def test_file_path_validation():
|
||||
PDFKnowledgeSource()
|
||||
|
||||
|
||||
def test_path_traversal_blocked_in_base_file_knowledge_source(tmpdir):
|
||||
"""Test that path traversal sequences are blocked in BaseFileKnowledgeSource.convert_to_path()."""
|
||||
traversal_paths = [
|
||||
"../../../etc/passwd",
|
||||
"../../secret_file.txt",
|
||||
"subdir/../../.env",
|
||||
"../outside.txt",
|
||||
]
|
||||
for malicious_path in traversal_paths:
|
||||
with pytest.raises(ValueError, match="outside the allowed knowledge directory"):
|
||||
TextFileKnowledgeSource(file_paths=[malicious_path])
|
||||
|
||||
|
||||
def test_path_traversal_blocked_in_excel_knowledge_source():
|
||||
"""Test that path traversal sequences are blocked in ExcelKnowledgeSource.convert_to_path()."""
|
||||
traversal_paths = [
|
||||
"../../../etc/passwd",
|
||||
"../../secret_file.xlsx",
|
||||
"subdir/../../.env",
|
||||
]
|
||||
for malicious_path in traversal_paths:
|
||||
with pytest.raises(ValueError, match="outside the allowed knowledge directory"):
|
||||
ExcelKnowledgeSource(file_paths=[malicious_path])
|
||||
|
||||
|
||||
def test_path_traversal_blocked_in_crew_docling_source():
|
||||
"""Test that path traversal sequences are blocked in CrewDoclingSource.validate_content()."""
|
||||
pytest.importorskip("docling")
|
||||
|
||||
traversal_paths = [
|
||||
"../../../etc/passwd",
|
||||
"../../secret_file.pdf",
|
||||
"subdir/../../.env",
|
||||
]
|
||||
for malicious_path in traversal_paths:
|
||||
with pytest.raises(ValueError, match="outside the allowed knowledge directory"):
|
||||
CrewDoclingSource(file_paths=[malicious_path])
|
||||
|
||||
|
||||
def test_valid_paths_still_work_in_convert_to_path(tmpdir):
|
||||
"""Test that valid paths within the knowledge directory are accepted."""
|
||||
import os
|
||||
|
||||
knowledge_dir = Path("knowledge")
|
||||
knowledge_dir.mkdir(exist_ok=True)
|
||||
test_file = knowledge_dir / "test_valid.txt"
|
||||
test_file.write_text("test content")
|
||||
|
||||
try:
|
||||
source = TextFileKnowledgeSource(file_paths=[test_file])
|
||||
assert len(source.safe_file_paths) == 1
|
||||
finally:
|
||||
test_file.unlink(missing_ok=True)
|
||||
|
||||
|
||||
def test_path_object_bypasses_traversal_check(tmpdir):
|
||||
"""Test that Path objects (not strings) bypass the traversal check since they are explicit."""
|
||||
file_path = Path(tmpdir.join("test.txt"))
|
||||
file_path.write_text("test content")
|
||||
|
||||
source = TextFileKnowledgeSource(file_paths=[file_path])
|
||||
assert source.safe_file_paths == [file_path]
|
||||
|
||||
|
||||
def test_hash_based_id_generation_without_doc_id(mock_vector_db):
|
||||
"""Test that documents without doc_id generate hash-based IDs. Duplicates are deduplicated before upsert."""
|
||||
import hashlib
|
||||
|
||||
Reference in New Issue
Block a user