mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-13 14:32:47 +00:00
* fix: add SSRF and path traversal protections
  CVE-2026-2286: validate_url blocks non-http/https schemes, private IPs, loopback, link-local, reserved addresses. Applied to 11 web tools.
  CVE-2026-2285: validate_path confines file access to the working directory. Applied to 7 file and directory tools.
* fix: drop unused assignment from validate_url call
* fix: DNS rebinding protection and allow_private flag
  Rewrite validated URLs to use the resolved IP, preventing DNS rebinding between validation and request time. SDK-based tools use pin_ip=False since they manage their own HTTP clients. Add allow_private flag for deployments that need internal network access.
* fix: unify security utilities and restore RAG chokepoint validation
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor: move validation to security/ package + address review comments
  - Move safe_path.py to crewai_tools/security/; add safe_url.py re-export
  - Keep utilities/safe_path.py as a backwards-compat shim
  - Update all 21 import sites to use crewai_tools.security.safe_path
  - files_compressor_tool: validate output_path (user-controlled)
  - serper_scrape_website_tool: call validate_url() before building payload
  - brightdata_unlocker: validate_url() already called without assignment (no-op fix)
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor: move validation to security/ package, keep utilities/ as compat shim
  - security/safe_path.py is the canonical location for all validation
  - utilities/safe_path.py re-exports for backward compatibility
  - All tool imports already point to security.safe_path
  - All review comments already addressed in prior commits
* fix: move validation outside try/except blocks, use correct directory validator
  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* fix: use resolved paths from validation to prevent symlink TOCTOU, remove unused safe_url.py
---------
Co-authored-by: Alex <alex@crewai.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
352 lines
12 KiB
Python
352 lines
12 KiB
Python
from abc import ABC, abstractmethod
|
|
import os
|
|
from typing import Any, Literal, cast
|
|
|
|
from crewai.rag.core.base_embeddings_callable import EmbeddingFunction
|
|
from crewai.rag.embeddings.factory import build_embedder
|
|
from crewai.rag.embeddings.types import ProviderSpec
|
|
from crewai.tools import BaseTool
|
|
from pydantic import (
|
|
BaseModel,
|
|
ConfigDict,
|
|
Field,
|
|
TypeAdapter,
|
|
ValidationError,
|
|
field_validator,
|
|
model_validator,
|
|
)
|
|
from typing_extensions import Self, Unpack
|
|
|
|
from crewai_tools.tools.rag.types import (
|
|
AddDocumentParams,
|
|
ContentItem,
|
|
RagToolConfig,
|
|
VectorDbConfig,
|
|
)
|
|
|
|
|
|
def _validate_embedding_config(
|
|
value: dict[str, Any] | ProviderSpec,
|
|
) -> dict[str, Any] | ProviderSpec:
|
|
"""Validate embedding config and provide clearer error messages for union validation.
|
|
|
|
This pre-validator catches Pydantic ValidationErrors from the ProviderSpec union
|
|
and provides a cleaner, more focused error message that only shows the relevant
|
|
provider's validation errors instead of all 18 union members.
|
|
|
|
Args:
|
|
value: The embedding configuration dictionary or validated ProviderSpec.
|
|
|
|
Returns:
|
|
A validated ProviderSpec instance, or the original value if already validated
|
|
or missing required fields.
|
|
|
|
Raises:
|
|
ValueError: If the configuration is invalid for the specified provider.
|
|
"""
|
|
if not isinstance(value, dict):
|
|
return value
|
|
|
|
provider = value.get("provider")
|
|
if not provider:
|
|
return value
|
|
|
|
try:
|
|
type_adapter: TypeAdapter[ProviderSpec] = TypeAdapter(ProviderSpec)
|
|
return type_adapter.validate_python(value)
|
|
except ValidationError as e:
|
|
provider_key = f"{provider.lower()}providerspec"
|
|
provider_errors = [
|
|
err for err in e.errors() if provider_key in str(err.get("loc", "")).lower()
|
|
]
|
|
|
|
if provider_errors:
|
|
error_msgs = []
|
|
for err in provider_errors:
|
|
loc_parts = err["loc"]
|
|
if str(loc_parts[0]).lower() == provider_key:
|
|
loc_parts = loc_parts[1:]
|
|
loc = ".".join(str(x) for x in loc_parts)
|
|
error_msgs.append(f" - {loc}: {err['msg']}")
|
|
|
|
raise ValueError(
|
|
f"Invalid configuration for embedding provider '{provider}':\n"
|
|
+ "\n".join(error_msgs)
|
|
) from e
|
|
|
|
raise
|
|
|
|
|
|
class Adapter(BaseModel, ABC):
    """Abstract base class for RAG adapters."""

    # Adapters may hold non-pydantic objects, so arbitrary field types are
    # allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @abstractmethod
    def query(
        self,
        question: str,
        similarity_threshold: float | None = None,
        limit: int | None = None,
    ) -> str:
        """Answer ``question`` from the knowledge base.

        Args:
            question: The question to look up.
            similarity_threshold: Optional per-call threshold; how ``None`` is
                treated is up to the implementation.
            limit: Optional per-call result limit; how ``None`` is treated is
                up to the implementation.

        Returns:
            The answer as a string.
        """

    @abstractmethod
    def add(
        self,
        *args: ContentItem,
        **kwargs: Unpack[AddDocumentParams],
    ) -> None:
        """Store content in the knowledge base.

        Args:
            *args: Content items to add.
            **kwargs: Additional document parameters.
        """
|
|
|
|
|
|
class RagTool(BaseTool):
    # Tool that queries a knowledge base through an ``Adapter``. When no
    # adapter is supplied, a ``CrewAIRagAdapter`` is built from ``config``
    # once the model has been validated (see ``_ensure_adapter``).

    class _AdapterPlaceholder(Adapter):
        # Sentinel default for ``adapter``; ``_ensure_adapter`` swaps it for a
        # real adapter, so neither method is expected to be reached.

        def query(
            self,
            question: str,
            similarity_threshold: float | None = None,
            limit: int | None = None,
        ) -> str:
            raise NotImplementedError

        def add(
            self,
            *args: ContentItem,
            **kwargs: Unpack[AddDocumentParams],
        ) -> None:
            raise NotImplementedError

    name: str = "Knowledge base"
    description: str = "A knowledge base that can be used to answer questions."
    summarize: bool = False
    similarity_threshold: float = 0.6
    limit: int = 5
    collection_name: str = "rag_tool_collection"
    adapter: Adapter = Field(default_factory=_AdapterPlaceholder)
    config: RagToolConfig = Field(
        default_factory=RagToolConfig,
        description="Configuration format accepted by RagTool.",
    )

    @field_validator("config", mode="before")
    @classmethod
    def _validate_config(cls, value: Any) -> Any:
        """Validate config with improved error messages for embedding providers.

        Args:
            value: The raw ``config`` value handed to the model.

        Returns:
            ``value`` unchanged when it is not a dict or has no truthy
            ``embedding_model`` entry; otherwise a shallow copy whose
            ``embedding_model`` has been passed through
            ``_validate_embedding_config``.

        Raises:
            ValueError: Propagated from ``_validate_embedding_config``.
        """
        if not isinstance(value, dict):
            return value

        embedding_model = value.get("embedding_model")
        if not embedding_model:
            return value

        # Build a new dict instead of assigning into ``value`` so the dict the
        # caller passed in is not rewritten as a side effect of validation.
        return {
            **value,
            "embedding_model": _validate_embedding_config(embedding_model),
        }

    @model_validator(mode="after")
    def _ensure_adapter(self) -> Self:
        """Replace the placeholder adapter with a ``CrewAIRagAdapter``.

        An adapter supplied by the caller is left untouched.

        Returns:
            This instance.
        """
        if not isinstance(self.adapter, RagTool._AdapterPlaceholder):
            return self

        # Imported lazily, as in the original implementation.
        from crewai_tools.adapters.crewai_rag_adapter import CrewAIRagAdapter

        provider_cfg = self._parse_config(self.config)
        self.adapter = CrewAIRagAdapter(
            collection_name=self.collection_name,
            summarize=self.summarize,
            similarity_threshold=self.similarity_threshold,
            limit=self.limit,
            config=provider_cfg,
        )
        return self

    def _parse_config(self, config: RagToolConfig) -> Any:
        """Normalize the RagToolConfig into a provider-specific config object.

        Defaults to 'chromadb' with no extra provider config if none is supplied.

        Args:
            config: The tool configuration; may be empty.

        Returns:
            The object produced by ``_create_provider_config``.

        Raises:
            ValueError: If the configured vector database provider is not one
                of the supported ones, or the embedding config is invalid.
        """
        if not config:
            return self._create_provider_config("chromadb", {}, None)

        vectordb_cfg = cast(VectorDbConfig, config.get("vectordb", {}))
        provider: Literal["chromadb", "qdrant"] = vectordb_cfg.get(
            "provider", "chromadb"
        )
        provider_config: dict[str, Any] = vectordb_cfg.get("config", {})

        supported = ("chromadb", "qdrant")
        if provider not in supported:
            raise ValueError(
                f"Unsupported vector database provider: '{provider}'. "
                f"CrewAI RAG currently supports: {', '.join(supported)}."
            )

        embedding_spec: ProviderSpec | None = config.get("embedding_model")
        if embedding_spec:
            embedding_spec = cast(
                ProviderSpec, _validate_embedding_config(embedding_spec)
            )

        embedding_function = build_embedder(embedding_spec) if embedding_spec else None
        return self._create_provider_config(
            provider, provider_config, embedding_function
        )

    @staticmethod
    def _create_provider_config(
        provider: Literal["chromadb", "qdrant"],
        provider_config: dict[str, Any],
        embedding_function: EmbeddingFunction[Any] | None,
    ) -> Any:
        """Instantiate provider config with optional embedding_function injected.

        Args:
            provider: Which vector database config class to instantiate.
            provider_config: Keyword arguments for that config class; copied,
                never mutated.
            embedding_function: Added to the keyword arguments as
                ``embedding_function`` when not ``None``.

        Returns:
            A ``ChromaDBConfig`` or ``QdrantConfig`` instance.

        Raises:
            ValueError: If ``provider`` is neither "chromadb" nor "qdrant".
        """
        # The keyword arguments are assembled the same way for every provider.
        kwargs = dict(provider_config)
        if embedding_function is not None:
            kwargs["embedding_function"] = embedding_function

        if provider == "chromadb":
            from crewai.rag.chromadb.config import ChromaDBConfig

            return ChromaDBConfig(**kwargs)

        if provider == "qdrant":
            from crewai.rag.qdrant.config import QdrantConfig

            return QdrantConfig(**kwargs)

        raise ValueError(f"Unhandled provider: {provider}")

    def add(
        self,
        *args: ContentItem,
        **kwargs: Unpack[AddDocumentParams],
    ) -> None:
        """Add content to the knowledge base.

        Every positional item and every path/URL keyword argument is validated
        before anything is handed to the adapter. File paths are replaced by
        the value returned from validation.

        Args:
            *args: Content items to add (strings, paths, or document dicts)
            data_type: DataType enum or string (e.g., "file", "pdf_file", "text")
            path: Path to file or directory, alias to positional arg
            file_path: Alias for path
            metadata: Additional metadata to attach to documents
            url: URL to fetch content from
            website: Website URL to scrape
            github_url: GitHub repository URL
            youtube_url: YouTube video URL
            directory_path: Path to directory

        Raises:
            ValueError: If a URL or file path is rejected by validation; the
                message starts with "Blocked unsafe".

        Examples:
            rag_tool.add("path/to/document.pdf", data_type=DataType.PDF_FILE)

            # Keyword argument (documented API)
            rag_tool.add(path="path/to/document.pdf", data_type="file")
            rag_tool.add(file_path="path/to/document.pdf", data_type="pdf_file")

            # Auto-detect type from extension
            rag_tool.add("path/to/document.pdf")  # auto-detects PDF
        """
        # Validate file paths and URLs before adding to prevent
        # unauthorized file reads and SSRF.
        from urllib.parse import urlparse

        from crewai_tools.security.safe_path import validate_file_path, validate_url

        def _check_url(value: str, label: str) -> None:
            try:
                validate_url(value)
            except ValueError as e:
                raise ValueError(f"Blocked unsafe {label}: {e}") from e

        def _check_path(value: str, label: str) -> str:
            try:
                return validate_file_path(value)
            except ValueError as e:
                raise ValueError(f"Blocked unsafe {label}: {e}") from e

        def _looks_like_path(ref: str) -> bool:
            # Check both os.sep (backslash on Windows) and "/" so that
            # forward-slash paths like "sub/file.txt" are caught on all
            # platforms.
            return (
                os.path.sep in ref
                or "/" in ref
                or ref.startswith(".")
                or os.path.isabs(ref)
            )

        validated_args: list[ContentItem] = []
        for arg in args:
            if isinstance(arg, dict):
                source_ref = str(arg.get("source", arg.get("content", "")))
            else:
                source_ref = str(arg)

            # Only urlparse's own errors are swallowed here; a rejection from
            # URL validation must propagate so it is never silently bypassed.
            try:
                parsed = urlparse(source_ref)
            except (ValueError, AttributeError):
                parsed = None

            if parsed is not None and parsed.scheme in ("http", "https", "file"):
                _check_url(source_ref, "URL")
                validated_args.append(arg)
                continue

            # Anything that looks like a file path (not a plain text string)
            # is validated and swapped for the resolved path.
            if _looks_like_path(source_ref):
                resolved_ref = _check_path(source_ref, "file path")
                # Use the resolved path to prevent symlink TOCTOU
                if isinstance(arg, dict):
                    arg = {**arg}
                    if "source" in arg:
                        arg["source"] = resolved_ref
                    elif "content" in arg:
                        arg["content"] = resolved_ref
                else:
                    arg = resolved_ref

            validated_args.append(arg)

        # Validate keyword path/URL arguments — these are equally user-controlled
        # and must not bypass the checks applied to positional args.
        # ``cast`` returns the same dict object, so writes below land in
        # ``kwargs``; it only permits indexing by a non-literal key.
        checked_kwargs = cast(dict[str, Any], kwargs)
        for key in ("path", "file_path", "directory_path"):
            if checked_kwargs.get(key) is not None:
                checked_kwargs[key] = _check_path(str(checked_kwargs[key]), key)
        for key in ("url", "website", "github_url", "youtube_url"):
            if checked_kwargs.get(key) is not None:
                _check_url(str(checked_kwargs[key]), key)

        self.adapter.add(*validated_args, **kwargs)

    def _run(
        self,
        query: str,
        similarity_threshold: float | None = None,
        limit: int | None = None,
    ) -> str:
        # Per-call values win; ``None`` falls back to the tool-level settings.
        threshold = (
            similarity_threshold
            if similarity_threshold is not None
            else self.similarity_threshold
        )
        result_limit = limit if limit is not None else self.limit
        return f"Relevant Content:\n{self.adapter.query(query, similarity_threshold=threshold, limit=result_limit)}"
|