Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
c8661c63cb fix: Suppress pandas import type error (pre-existing issue)
- Add type: ignore[import-untyped] comment for pandas import
- This is a pre-existing issue unrelated to external knowledge directory feature
- Allows CI type-checker to pass while maintaining type safety for new code

Co-Authored-By: João <joao@crewai.com>
2025-09-16 00:37:33 +00:00
Devin AI
a6513e14f2 fix: Add missing @classmethod decorators to field validators
- Add @classmethod decorator to validate_file_path in base_file_knowledge_source.py
- Add @classmethod decorator to validate_file_path in excel_knowledge_source.py
- Follows existing codebase pattern for Pydantic field validators (N805)

Co-Authored-By: João <joao@crewai.com>
2025-09-16 00:30:49 +00:00
Devin AI
77065f2151 fix: Address lint issues in external knowledge directory implementation
- Remove unnecessary variable assignment in paths.py (RET504)
- Add proper exception chaining in crew_docling_source.py and excel_knowledge_source.py (B904)
- Use next(iter(...)) instead of list(...)[0] in test files (RUF015)

Note: N805 error about cls vs self in field_validator is a false positive -
Pydantic field validators correctly use cls as first parameter

Co-Authored-By: João <joao@crewai.com>
2025-09-16 00:20:08 +00:00
Devin AI
101cee8a27 feat: Add support for external knowledge directory via CREWAI_KNOWLEDGE_FILE_DIR
- Add get_knowledge_directory() utility function following CREWAI_STORAGE_DIR pattern
- Update BaseFileKnowledgeSource, CrewDoclingSource, and ExcelKnowledgeSource to use new function
- Add comprehensive tests for utility function and knowledge source integration
- Maintain backward compatibility with default 'knowledge' directory
- Add proper error handling for non-existent directories

Fixes #3519

Co-Authored-By: João <joao@crewai.com>
2025-09-16 00:11:41 +00:00
6 changed files with 218 additions and 46 deletions

View File

@@ -1,31 +1,31 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Union
from pydantic import Field, field_validator from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger from crewai.utilities.logger import Logger
from crewai.utilities.paths import get_knowledge_directory
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC): class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Base class for knowledge sources that load content from files.""" """Base class for knowledge sources that load content from files."""
_logger: Logger = Logger(verbose=True) _logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field( file_path: Path | list[Path] | str | list[str] | None = Field(
default=None, default=None,
description="[Deprecated] The path to the file. Use file_paths instead.", description="[Deprecated] The path to the file. Use file_paths instead.",
) )
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field( file_paths: Path | list[Path] | str | list[str] | None = Field(
default_factory=list, description="The path to the file" default_factory=list, description="The path to the file"
) )
content: Dict[Path, str] = Field(init=False, default_factory=dict) content: dict[Path, str] = Field(init=False, default_factory=dict)
storage: Optional[KnowledgeStorage] = Field(default=None) storage: KnowledgeStorage | None = Field(default=None)
safe_file_paths: List[Path] = Field(default_factory=list) safe_file_paths: list[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before") @field_validator("file_path", "file_paths", mode="before")
@classmethod
def validate_file_path(cls, v, info): def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided.""" """Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions # Single check if both are None, O(1) instead of nested conditions
@@ -46,9 +46,8 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
self.content = self.load_content() self.content = self.load_content()
@abstractmethod @abstractmethod
def load_content(self) -> Dict[Path, str]: def load_content(self) -> dict[Path, str]:
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory.""" """Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
pass
def validate_content(self): def validate_content(self):
"""Validate the paths.""" """Validate the paths."""
@@ -74,11 +73,11 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
else: else:
raise ValueError("No storage found to save documents.") raise ValueError("No storage found to save documents.")
def convert_to_path(self, path: Union[Path, str]) -> Path: def convert_to_path(self, path: Path | str) -> Path:
"""Convert a path to a Path object.""" """Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path return Path(get_knowledge_directory() + "/" + path) if isinstance(path, str) else path
def _process_file_paths(self) -> List[Path]: def _process_file_paths(self) -> list[Path]:
"""Convert file_path to a list of Path objects.""" """Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None: if hasattr(self, "file_path") and self.file_path is not None:
@@ -93,7 +92,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
raise ValueError("Your source must be provided with a file_paths: []") raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list # Convert single path to list
path_list: List[Union[Path, str]] = ( path_list: list[Path | str] = (
[self.file_paths] [self.file_paths]
if isinstance(self.file_paths, (str, Path)) if isinstance(self.file_paths, (str, Path))
else list(self.file_paths) else list(self.file_paths)

View File

@@ -1,5 +1,5 @@
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from typing import Iterator, List, Optional, Union
from urllib.parse import urlparse from urllib.parse import urlparse
try: try:
@@ -16,8 +16,8 @@ except ImportError:
from pydantic import Field from pydantic import Field
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger from crewai.utilities.logger import Logger
from crewai.utilities.paths import get_knowledge_directory
class CrewDoclingSource(BaseKnowledgeSource): class CrewDoclingSource(BaseKnowledgeSource):
@@ -35,11 +35,11 @@ class CrewDoclingSource(BaseKnowledgeSource):
_logger: Logger = Logger(verbose=True) _logger: Logger = Logger(verbose=True)
file_path: Optional[List[Union[Path, str]]] = Field(default=None) file_path: list[Path | str] | None = Field(default=None)
file_paths: List[Union[Path, str]] = Field(default_factory=list) file_paths: list[Path | str] = Field(default_factory=list)
chunks: List[str] = Field(default_factory=list) chunks: list[str] = Field(default_factory=list)
safe_file_paths: List[Union[Path, str]] = Field(default_factory=list) safe_file_paths: list[Path | str] = Field(default_factory=list)
content: List["DoclingDocument"] = Field(default_factory=list) content: list["DoclingDocument"] = Field(default_factory=list)
document_converter: "DocumentConverter" = Field( document_converter: "DocumentConverter" = Field(
default_factory=lambda: DocumentConverter( default_factory=lambda: DocumentConverter(
allowed_formats=[ allowed_formats=[
@@ -66,7 +66,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
self.safe_file_paths = self.validate_content() self.safe_file_paths = self.validate_content()
self.content = self._load_content() self.content = self._load_content()
def _load_content(self) -> List["DoclingDocument"]: def _load_content(self) -> list["DoclingDocument"]:
try: try:
return self._convert_source_to_docling_documents() return self._convert_source_to_docling_documents()
except ConversionError as e: except ConversionError as e:
@@ -88,7 +88,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
self.chunks.extend(list(new_chunks_iterable)) self.chunks.extend(list(new_chunks_iterable))
self._save_documents() self._save_documents()
def _convert_source_to_docling_documents(self) -> List["DoclingDocument"]: def _convert_source_to_docling_documents(self) -> list["DoclingDocument"]:
conv_results_iter = self.document_converter.convert_all(self.safe_file_paths) conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
return [result.document for result in conv_results_iter] return [result.document for result in conv_results_iter]
@@ -97,8 +97,8 @@ class CrewDoclingSource(BaseKnowledgeSource):
for chunk in chunker.chunk(doc): for chunk in chunker.chunk(doc):
yield chunk.text yield chunk.text
def validate_content(self) -> List[Union[Path, str]]: def validate_content(self) -> list[Path | str]:
processed_paths: List[Union[Path, str]] = [] processed_paths: list[Path | str] = []
for path in self.file_paths: for path in self.file_paths:
if isinstance(path, str): if isinstance(path, str):
if path.startswith(("http://", "https://")): if path.startswith(("http://", "https://")):
@@ -108,9 +108,9 @@ class CrewDoclingSource(BaseKnowledgeSource):
else: else:
raise ValueError(f"Invalid URL format: {path}") raise ValueError(f"Invalid URL format: {path}")
except Exception as e: except Exception as e:
raise ValueError(f"Invalid URL: {path}. Error: {str(e)}") raise ValueError(f"Invalid URL: {path}. Error: {e!s}") from e
else: else:
local_path = Path(KNOWLEDGE_DIRECTORY + "/" + path) local_path = Path(get_knowledge_directory() + "/" + path)
if local_path.exists(): if local_path.exists():
processed_paths.append(local_path) processed_paths.append(local_path)
else: else:

View File

@@ -1,12 +1,10 @@
from pathlib import Path from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from pydantic import Field, field_validator from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger from crewai.utilities.logger import Logger
from crewai.utilities.paths import get_knowledge_directory
class ExcelKnowledgeSource(BaseKnowledgeSource): class ExcelKnowledgeSource(BaseKnowledgeSource):
@@ -16,18 +14,19 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
_logger: Logger = Logger(verbose=True) _logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field( file_path: Path | list[Path] | str | list[str] | None = Field(
default=None, default=None,
description="[Deprecated] The path to the file. Use file_paths instead.", description="[Deprecated] The path to the file. Use file_paths instead.",
) )
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field( file_paths: Path | list[Path] | str | list[str] | None = Field(
default_factory=list, description="The path to the file" default_factory=list, description="The path to the file"
) )
chunks: List[str] = Field(default_factory=list) chunks: list[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict) content: dict[Path, dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list) safe_file_paths: list[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before") @field_validator("file_path", "file_paths", mode="before")
@classmethod
def validate_file_path(cls, v, info): def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided.""" """Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions # Single check if both are None, O(1) instead of nested conditions
@@ -41,7 +40,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
raise ValueError("Either file_path or file_paths must be provided") raise ValueError("Either file_path or file_paths must be provided")
return v return v
def _process_file_paths(self) -> List[Path]: def _process_file_paths(self) -> list[Path]:
"""Convert file_path to a list of Path objects.""" """Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None: if hasattr(self, "file_path") and self.file_path is not None:
@@ -56,7 +55,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
raise ValueError("Your source must be provided with a file_paths: []") raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list # Convert single path to list
path_list: List[Union[Path, str]] = ( path_list: list[Path | str] = (
[self.file_paths] [self.file_paths]
if isinstance(self.file_paths, (str, Path)) if isinstance(self.file_paths, (str, Path))
else list(self.file_paths) else list(self.file_paths)
@@ -100,7 +99,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
self.validate_content() self.validate_content()
self.content = self._load_content() self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]: def _load_content(self) -> dict[Path, dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets. """Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored. Each sheet's content is converted to CSV format and stored.
@@ -126,21 +125,21 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
content_dict[file_path] = sheet_dict content_dict[file_path] = sheet_dict
return content_dict return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path: def convert_to_path(self, path: Path | str) -> Path:
"""Convert a path to a Path object.""" """Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path return Path(get_knowledge_directory() + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self): def _import_dependencies(self):
"""Dynamically import dependencies.""" """Dynamically import dependencies."""
try: try:
import pandas as pd import pandas as pd # type: ignore[import-untyped]
return pd return pd
except ImportError as e: except ImportError as e:
missing_package = str(e).split()[-1] missing_package = str(e).split()[-1]
raise ImportError( raise ImportError(
f"{missing_package} is not installed. Please install it with: pip install {missing_package}" f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
) ) from e
def add(self) -> None: def add(self) -> None:
""" """
@@ -161,7 +160,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
self.chunks.extend(new_chunks) self.chunks.extend(new_chunks)
self._save_documents() self._save_documents()
def _chunk_text(self, text: str) -> List[str]: def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks.""" """Utility method to split text into chunks."""
return [ return [
text[i : i + self.chunk_size] text[i : i + self.chunk_size]

View File

@@ -25,7 +25,17 @@ def get_project_directory_name():
if project_directory_name: if project_directory_name:
return project_directory_name return project_directory_name
else: cwd = Path.cwd()
cwd = Path.cwd() return cwd.name
project_directory_name = cwd.name
return project_directory_name
def get_knowledge_directory():
"""Returns the knowledge directory path from environment variable or default."""
knowledge_dir = os.environ.get("CREWAI_KNOWLEDGE_FILE_DIR")
if knowledge_dir:
knowledge_path = Path(knowledge_dir)
if not knowledge_path.exists():
raise ValueError(f"Knowledge directory does not exist: {knowledge_dir}")
return str(knowledge_path)
return "knowledge"

View File

@@ -0,0 +1,101 @@
import os
import tempfile
from pathlib import Path
import pytest
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
class TestExternalKnowledgeDirectory:
def test_text_file_source_with_external_directory(self):
"""Test that TextFileKnowledgeSource works with external directory."""
with tempfile.TemporaryDirectory() as temp_dir:
test_file = Path(temp_dir) / "test.txt"
test_content = "This is a test file for external knowledge directory."
test_file.write_text(test_content)
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
try:
source = TextFileKnowledgeSource(file_paths=["test.txt"])
assert len(source.content) == 1
loaded_content = next(iter(source.content.values()))
assert loaded_content == test_content
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_json_file_source_with_external_directory(self):
"""Test that JSONKnowledgeSource works with external directory."""
with tempfile.TemporaryDirectory() as temp_dir:
test_file = Path(temp_dir) / "test.json"
test_data = {"name": "John", "age": 30, "city": "New York"}
import json
test_file.write_text(json.dumps(test_data))
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
try:
source = JSONKnowledgeSource(file_paths=["test.json"])
assert len(source.content) == 1
loaded_content = next(iter(source.content.values()))
assert "John" in loaded_content
assert "30" in loaded_content
assert "New York" in loaded_content
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_knowledge_source_fallback_to_default(self):
"""Test that knowledge sources fall back to default directory when env var not set."""
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
knowledge_dir = Path("knowledge")
knowledge_dir.mkdir(exist_ok=True)
test_file = knowledge_dir / "test_fallback.txt"
test_content = "This is a test file for default knowledge directory."
try:
test_file.write_text(test_content)
source = TextFileKnowledgeSource(file_paths=["test_fallback.txt"])
assert len(source.content) == 1
loaded_content = next(iter(source.content.values()))
assert loaded_content == test_content
finally:
if test_file.exists():
test_file.unlink()
def test_knowledge_source_with_absolute_path_ignores_env_var(self):
"""Test that absolute paths ignore the environment variable."""
with tempfile.TemporaryDirectory() as temp_dir:
test_file = Path(temp_dir) / "test_absolute.txt"
test_content = "This is a test file with absolute path."
test_file.write_text(test_content)
with tempfile.TemporaryDirectory() as other_dir:
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = other_dir
try:
source = TextFileKnowledgeSource(file_paths=[str(test_file)])
assert len(source.content) == 1
loaded_content = next(iter(source.content.values()))
assert loaded_content == test_content
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_knowledge_source_error_with_invalid_external_directory(self):
"""Test that proper error is raised when external directory doesn't exist."""
invalid_dir = "/path/that/does/not/exist"
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = invalid_dir
try:
with pytest.raises(ValueError, match="Knowledge directory does not exist"):
TextFileKnowledgeSource(file_paths=["test.txt"])
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]

View File

@@ -0,0 +1,63 @@
import os
import tempfile
from pathlib import Path
import pytest
from crewai.utilities.paths import get_knowledge_directory
class TestKnowledgeDirectory:
def test_default_knowledge_directory(self):
"""Test that default knowledge directory is returned when env var not set."""
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
result = get_knowledge_directory()
assert result == "knowledge"
def test_custom_knowledge_directory(self):
"""Test that custom directory is returned when env var is set."""
with tempfile.TemporaryDirectory() as temp_dir:
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
try:
result = get_knowledge_directory()
assert result == temp_dir
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_invalid_knowledge_directory(self):
"""Test that ValueError is raised for non-existent directory."""
invalid_dir = "/path/that/does/not/exist"
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = invalid_dir
try:
with pytest.raises(ValueError, match="Knowledge directory does not exist"):
get_knowledge_directory()
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_relative_path_knowledge_directory(self):
"""Test that relative paths work correctly."""
with tempfile.TemporaryDirectory() as temp_dir:
sub_dir = Path(temp_dir) / "knowledge_files"
sub_dir.mkdir()
original_cwd = os.getcwd()
try:
os.chdir(temp_dir)
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = "knowledge_files"
result = get_knowledge_directory()
assert result == str(sub_dir)
finally:
os.chdir(original_cwd)
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
def test_absolute_path_knowledge_directory(self):
"""Test that absolute paths work correctly."""
with tempfile.TemporaryDirectory() as temp_dir:
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
try:
result = get_knowledge_directory()
assert result == temp_dir
finally:
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]