mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-28 18:28:30 +00:00
Compare commits
4 Commits
lorenze/ad
...
devin/1757
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8661c63cb | ||
|
|
a6513e14f2 | ||
|
|
77065f2151 | ||
|
|
101cee8a27 |
@@ -1,31 +1,31 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from pydantic import Field, field_validator
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
|
||||
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.paths import get_knowledge_directory
|
||||
|
||||
|
||||
class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
"""Base class for knowledge sources that load content from files."""
|
||||
|
||||
_logger: Logger = Logger(verbose=True)
|
||||
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
||||
file_path: Path | list[Path] | str | list[str] | None = Field(
|
||||
default=None,
|
||||
description="[Deprecated] The path to the file. Use file_paths instead.",
|
||||
)
|
||||
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
||||
file_paths: Path | list[Path] | str | list[str] | None = Field(
|
||||
default_factory=list, description="The path to the file"
|
||||
)
|
||||
content: Dict[Path, str] = Field(init=False, default_factory=dict)
|
||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||
safe_file_paths: List[Path] = Field(default_factory=list)
|
||||
content: dict[Path, str] = Field(init=False, default_factory=dict)
|
||||
storage: KnowledgeStorage | None = Field(default=None)
|
||||
safe_file_paths: list[Path] = Field(default_factory=list)
|
||||
|
||||
@field_validator("file_path", "file_paths", mode="before")
|
||||
@classmethod
|
||||
def validate_file_path(cls, v, info):
|
||||
"""Validate that at least one of file_path or file_paths is provided."""
|
||||
# Single check if both are None, O(1) instead of nested conditions
|
||||
@@ -46,9 +46,8 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
self.content = self.load_content()
|
||||
|
||||
@abstractmethod
|
||||
def load_content(self) -> Dict[Path, str]:
|
||||
def load_content(self) -> dict[Path, str]:
|
||||
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
|
||||
pass
|
||||
|
||||
def validate_content(self):
|
||||
"""Validate the paths."""
|
||||
@@ -74,11 +73,11 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
else:
|
||||
raise ValueError("No storage found to save documents.")
|
||||
|
||||
def convert_to_path(self, path: Union[Path, str]) -> Path:
|
||||
def convert_to_path(self, path: Path | str) -> Path:
|
||||
"""Convert a path to a Path object."""
|
||||
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
|
||||
return Path(get_knowledge_directory() + "/" + path) if isinstance(path, str) else path
|
||||
|
||||
def _process_file_paths(self) -> List[Path]:
|
||||
def _process_file_paths(self) -> list[Path]:
|
||||
"""Convert file_path to a list of Path objects."""
|
||||
|
||||
if hasattr(self, "file_path") and self.file_path is not None:
|
||||
@@ -93,7 +92,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
raise ValueError("Your source must be provided with a file_paths: []")
|
||||
|
||||
# Convert single path to list
|
||||
path_list: List[Union[Path, str]] = (
|
||||
path_list: list[Path | str] = (
|
||||
[self.file_paths]
|
||||
if isinstance(self.file_paths, (str, Path))
|
||||
else list(self.file_paths)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
from typing import Iterator, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
try:
|
||||
@@ -16,8 +16,8 @@ except ImportError:
|
||||
from pydantic import Field
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.paths import get_knowledge_directory
|
||||
|
||||
|
||||
class CrewDoclingSource(BaseKnowledgeSource):
|
||||
@@ -35,11 +35,11 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
|
||||
_logger: Logger = Logger(verbose=True)
|
||||
|
||||
file_path: Optional[List[Union[Path, str]]] = Field(default=None)
|
||||
file_paths: List[Union[Path, str]] = Field(default_factory=list)
|
||||
chunks: List[str] = Field(default_factory=list)
|
||||
safe_file_paths: List[Union[Path, str]] = Field(default_factory=list)
|
||||
content: List["DoclingDocument"] = Field(default_factory=list)
|
||||
file_path: list[Path | str] | None = Field(default=None)
|
||||
file_paths: list[Path | str] = Field(default_factory=list)
|
||||
chunks: list[str] = Field(default_factory=list)
|
||||
safe_file_paths: list[Path | str] = Field(default_factory=list)
|
||||
content: list["DoclingDocument"] = Field(default_factory=list)
|
||||
document_converter: "DocumentConverter" = Field(
|
||||
default_factory=lambda: DocumentConverter(
|
||||
allowed_formats=[
|
||||
@@ -66,7 +66,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
self.safe_file_paths = self.validate_content()
|
||||
self.content = self._load_content()
|
||||
|
||||
def _load_content(self) -> List["DoclingDocument"]:
|
||||
def _load_content(self) -> list["DoclingDocument"]:
|
||||
try:
|
||||
return self._convert_source_to_docling_documents()
|
||||
except ConversionError as e:
|
||||
@@ -88,7 +88,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
self.chunks.extend(list(new_chunks_iterable))
|
||||
self._save_documents()
|
||||
|
||||
def _convert_source_to_docling_documents(self) -> List["DoclingDocument"]:
|
||||
def _convert_source_to_docling_documents(self) -> list["DoclingDocument"]:
|
||||
conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
|
||||
return [result.document for result in conv_results_iter]
|
||||
|
||||
@@ -97,8 +97,8 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
for chunk in chunker.chunk(doc):
|
||||
yield chunk.text
|
||||
|
||||
def validate_content(self) -> List[Union[Path, str]]:
|
||||
processed_paths: List[Union[Path, str]] = []
|
||||
def validate_content(self) -> list[Path | str]:
|
||||
processed_paths: list[Path | str] = []
|
||||
for path in self.file_paths:
|
||||
if isinstance(path, str):
|
||||
if path.startswith(("http://", "https://")):
|
||||
@@ -108,9 +108,9 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
else:
|
||||
raise ValueError(f"Invalid URL format: {path}")
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid URL: {path}. Error: {str(e)}")
|
||||
raise ValueError(f"Invalid URL: {path}. Error: {e!s}") from e
|
||||
else:
|
||||
local_path = Path(KNOWLEDGE_DIRECTORY + "/" + path)
|
||||
local_path = Path(get_knowledge_directory() + "/" + path)
|
||||
if local_path.exists():
|
||||
processed_paths.append(local_path)
|
||||
else:
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterator, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from pydantic import Field, field_validator
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
|
||||
from crewai.utilities.logger import Logger
|
||||
from crewai.utilities.paths import get_knowledge_directory
|
||||
|
||||
|
||||
class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
@@ -16,18 +14,19 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
|
||||
_logger: Logger = Logger(verbose=True)
|
||||
|
||||
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
||||
file_path: Path | list[Path] | str | list[str] | None = Field(
|
||||
default=None,
|
||||
description="[Deprecated] The path to the file. Use file_paths instead.",
|
||||
)
|
||||
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
|
||||
file_paths: Path | list[Path] | str | list[str] | None = Field(
|
||||
default_factory=list, description="The path to the file"
|
||||
)
|
||||
chunks: List[str] = Field(default_factory=list)
|
||||
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
|
||||
safe_file_paths: List[Path] = Field(default_factory=list)
|
||||
chunks: list[str] = Field(default_factory=list)
|
||||
content: dict[Path, dict[str, str]] = Field(default_factory=dict)
|
||||
safe_file_paths: list[Path] = Field(default_factory=list)
|
||||
|
||||
@field_validator("file_path", "file_paths", mode="before")
|
||||
@classmethod
|
||||
def validate_file_path(cls, v, info):
|
||||
"""Validate that at least one of file_path or file_paths is provided."""
|
||||
# Single check if both are None, O(1) instead of nested conditions
|
||||
@@ -41,7 +40,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
raise ValueError("Either file_path or file_paths must be provided")
|
||||
return v
|
||||
|
||||
def _process_file_paths(self) -> List[Path]:
|
||||
def _process_file_paths(self) -> list[Path]:
|
||||
"""Convert file_path to a list of Path objects."""
|
||||
|
||||
if hasattr(self, "file_path") and self.file_path is not None:
|
||||
@@ -56,7 +55,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
raise ValueError("Your source must be provided with a file_paths: []")
|
||||
|
||||
# Convert single path to list
|
||||
path_list: List[Union[Path, str]] = (
|
||||
path_list: list[Path | str] = (
|
||||
[self.file_paths]
|
||||
if isinstance(self.file_paths, (str, Path))
|
||||
else list(self.file_paths)
|
||||
@@ -100,7 +99,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
self.validate_content()
|
||||
self.content = self._load_content()
|
||||
|
||||
def _load_content(self) -> Dict[Path, Dict[str, str]]:
|
||||
def _load_content(self) -> dict[Path, dict[str, str]]:
|
||||
"""Load and preprocess Excel file content from multiple sheets.
|
||||
|
||||
Each sheet's content is converted to CSV format and stored.
|
||||
@@ -126,21 +125,21 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
content_dict[file_path] = sheet_dict
|
||||
return content_dict
|
||||
|
||||
def convert_to_path(self, path: Union[Path, str]) -> Path:
|
||||
def convert_to_path(self, path: Path | str) -> Path:
|
||||
"""Convert a path to a Path object."""
|
||||
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
|
||||
return Path(get_knowledge_directory() + "/" + path) if isinstance(path, str) else path
|
||||
|
||||
def _import_dependencies(self):
|
||||
"""Dynamically import dependencies."""
|
||||
try:
|
||||
import pandas as pd
|
||||
import pandas as pd # type: ignore[import-untyped]
|
||||
|
||||
return pd
|
||||
except ImportError as e:
|
||||
missing_package = str(e).split()[-1]
|
||||
raise ImportError(
|
||||
f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
|
||||
)
|
||||
) from e
|
||||
|
||||
def add(self) -> None:
|
||||
"""
|
||||
@@ -161,7 +160,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
|
||||
self.chunks.extend(new_chunks)
|
||||
self._save_documents()
|
||||
|
||||
def _chunk_text(self, text: str) -> List[str]:
|
||||
def _chunk_text(self, text: str) -> list[str]:
|
||||
"""Utility method to split text into chunks."""
|
||||
return [
|
||||
text[i : i + self.chunk_size]
|
||||
|
||||
@@ -25,7 +25,17 @@ def get_project_directory_name():
|
||||
|
||||
if project_directory_name:
|
||||
return project_directory_name
|
||||
else:
|
||||
cwd = Path.cwd()
|
||||
project_directory_name = cwd.name
|
||||
return project_directory_name
|
||||
cwd = Path.cwd()
|
||||
return cwd.name
|
||||
|
||||
|
||||
def get_knowledge_directory():
|
||||
"""Returns the knowledge directory path from environment variable or default."""
|
||||
knowledge_dir = os.environ.get("CREWAI_KNOWLEDGE_FILE_DIR")
|
||||
|
||||
if knowledge_dir:
|
||||
knowledge_path = Path(knowledge_dir)
|
||||
if not knowledge_path.exists():
|
||||
raise ValueError(f"Knowledge directory does not exist: {knowledge_dir}")
|
||||
return str(knowledge_path)
|
||||
return "knowledge"
|
||||
|
||||
101
tests/knowledge/test_external_knowledge_directory.py
Normal file
101
tests/knowledge/test_external_knowledge_directory.py
Normal file
@@ -0,0 +1,101 @@
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
|
||||
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
|
||||
|
||||
|
||||
class TestExternalKnowledgeDirectory:
|
||||
def test_text_file_source_with_external_directory(self):
|
||||
"""Test that TextFileKnowledgeSource works with external directory."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
test_file = Path(temp_dir) / "test.txt"
|
||||
test_content = "This is a test file for external knowledge directory."
|
||||
test_file.write_text(test_content)
|
||||
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
|
||||
try:
|
||||
source = TextFileKnowledgeSource(file_paths=["test.txt"])
|
||||
|
||||
assert len(source.content) == 1
|
||||
loaded_content = next(iter(source.content.values()))
|
||||
assert loaded_content == test_content
|
||||
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_json_file_source_with_external_directory(self):
|
||||
"""Test that JSONKnowledgeSource works with external directory."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
test_file = Path(temp_dir) / "test.json"
|
||||
test_data = {"name": "John", "age": 30, "city": "New York"}
|
||||
import json
|
||||
test_file.write_text(json.dumps(test_data))
|
||||
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
|
||||
try:
|
||||
source = JSONKnowledgeSource(file_paths=["test.json"])
|
||||
|
||||
assert len(source.content) == 1
|
||||
loaded_content = next(iter(source.content.values()))
|
||||
assert "John" in loaded_content
|
||||
assert "30" in loaded_content
|
||||
assert "New York" in loaded_content
|
||||
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_knowledge_source_fallback_to_default(self):
|
||||
"""Test that knowledge sources fall back to default directory when env var not set."""
|
||||
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
knowledge_dir = Path("knowledge")
|
||||
knowledge_dir.mkdir(exist_ok=True)
|
||||
test_file = knowledge_dir / "test_fallback.txt"
|
||||
test_content = "This is a test file for default knowledge directory."
|
||||
|
||||
try:
|
||||
test_file.write_text(test_content)
|
||||
|
||||
source = TextFileKnowledgeSource(file_paths=["test_fallback.txt"])
|
||||
|
||||
assert len(source.content) == 1
|
||||
loaded_content = next(iter(source.content.values()))
|
||||
assert loaded_content == test_content
|
||||
|
||||
finally:
|
||||
if test_file.exists():
|
||||
test_file.unlink()
|
||||
|
||||
def test_knowledge_source_with_absolute_path_ignores_env_var(self):
|
||||
"""Test that absolute paths ignore the environment variable."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
test_file = Path(temp_dir) / "test_absolute.txt"
|
||||
test_content = "This is a test file with absolute path."
|
||||
test_file.write_text(test_content)
|
||||
|
||||
with tempfile.TemporaryDirectory() as other_dir:
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = other_dir
|
||||
try:
|
||||
source = TextFileKnowledgeSource(file_paths=[str(test_file)])
|
||||
|
||||
assert len(source.content) == 1
|
||||
loaded_content = next(iter(source.content.values()))
|
||||
assert loaded_content == test_content
|
||||
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_knowledge_source_error_with_invalid_external_directory(self):
|
||||
"""Test that proper error is raised when external directory doesn't exist."""
|
||||
invalid_dir = "/path/that/does/not/exist"
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = invalid_dir
|
||||
try:
|
||||
with pytest.raises(ValueError, match="Knowledge directory does not exist"):
|
||||
TextFileKnowledgeSource(file_paths=["test.txt"])
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
63
tests/utilities/test_knowledge_directory.py
Normal file
63
tests/utilities/test_knowledge_directory.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
from crewai.utilities.paths import get_knowledge_directory
|
||||
|
||||
|
||||
class TestKnowledgeDirectory:
|
||||
def test_default_knowledge_directory(self):
|
||||
"""Test that default knowledge directory is returned when env var not set."""
|
||||
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
result = get_knowledge_directory()
|
||||
assert result == "knowledge"
|
||||
|
||||
def test_custom_knowledge_directory(self):
|
||||
"""Test that custom directory is returned when env var is set."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
|
||||
try:
|
||||
result = get_knowledge_directory()
|
||||
assert result == temp_dir
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_invalid_knowledge_directory(self):
|
||||
"""Test that ValueError is raised for non-existent directory."""
|
||||
invalid_dir = "/path/that/does/not/exist"
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = invalid_dir
|
||||
try:
|
||||
with pytest.raises(ValueError, match="Knowledge directory does not exist"):
|
||||
get_knowledge_directory()
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_relative_path_knowledge_directory(self):
|
||||
"""Test that relative paths work correctly."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
sub_dir = Path(temp_dir) / "knowledge_files"
|
||||
sub_dir.mkdir()
|
||||
|
||||
original_cwd = os.getcwd()
|
||||
try:
|
||||
os.chdir(temp_dir)
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = "knowledge_files"
|
||||
|
||||
result = get_knowledge_directory()
|
||||
assert result == str(sub_dir)
|
||||
finally:
|
||||
os.chdir(original_cwd)
|
||||
if "CREWAI_KNOWLEDGE_FILE_DIR" in os.environ:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
|
||||
def test_absolute_path_knowledge_directory(self):
|
||||
"""Test that absolute paths work correctly."""
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
os.environ["CREWAI_KNOWLEDGE_FILE_DIR"] = temp_dir
|
||||
try:
|
||||
result = get_knowledge_directory()
|
||||
assert result == temp_dir
|
||||
finally:
|
||||
del os.environ["CREWAI_KNOWLEDGE_FILE_DIR"]
|
||||
Reference in New Issue
Block a user