mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-03 14:09:24 +00:00
fix: add SSRF and path traversal protections (#5315)
* fix: add SSRF and path traversal protections CVE-2026-2286: validate_url blocks non-http/https schemes, private IPs, loopback, link-local, reserved addresses. Applied to 11 web tools. CVE-2026-2285: validate_path confines file access to the working directory. Applied to 7 file and directory tools. * fix: drop unused assignment from validate_url call * fix: DNS rebinding protection and allow_private flag Rewrite validated URLs to use the resolved IP, preventing DNS rebinding between validation and request time. SDK-based tools use pin_ip=False since they manage their own HTTP clients. Add allow_private flag for deployments that need internal network access. * fix: unify security utilities and restore RAG chokepoint validation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor: move validation to security/ package + address review comments - Move safe_path.py to crewai_tools/security/; add safe_url.py re-export - Keep utilities/safe_path.py as a backwards-compat shim - Update all 21 import sites to use crewai_tools.security.safe_path - files_compressor_tool: validate output_path (user-controlled) - serper_scrape_website_tool: call validate_url() before building payload - brightdata_unlocker: validate_url() already called without assignment (no-op fix) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor: move validation to security/ package, keep utilities/ as compat shim - security/safe_path.py is the canonical location for all validation - utilities/safe_path.py re-exports for backward compatibility - All tool imports already point to security.safe_path - All review comments already addressed in prior commits * fix: move validation outside try/except blocks, use correct directory validator Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: use resolved paths from validation to prevent symlink TOCTOU, remove unused safe_url.py --------- Co-authored-by: Alex <alex@crewai.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
205
lib/crewai-tools/src/crewai_tools/security/safe_path.py
Normal file
205
lib/crewai-tools/src/crewai_tools/security/safe_path.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Path and URL validation utilities for crewai-tools.
|
||||
|
||||
Provides validation for file paths and URLs to prevent unauthorized
|
||||
file access and server-side request forgery (SSRF) when tools accept
|
||||
user-controlled or LLM-controlled inputs at runtime.
|
||||
|
||||
Set CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true to bypass validation (not
|
||||
recommended for production).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
|
||||
|
||||
|
||||
def _is_escape_hatch_enabled() -> bool:
|
||||
"""Check if the unsafe paths escape hatch is enabled."""
|
||||
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# File path validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def validate_file_path(path: str, base_dir: str | None = None) -> str:
|
||||
"""Validate that a file path is safe to read.
|
||||
|
||||
Resolves symlinks and ``..`` components, then checks that the resolved
|
||||
path falls within *base_dir* (defaults to the current working directory).
|
||||
|
||||
Args:
|
||||
path: The file path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory.
|
||||
"""
|
||||
if _is_escape_hatch_enabled():
|
||||
logger.warning(
|
||||
"%s is enabled — skipping file path validation for: %s",
|
||||
_UNSAFE_PATHS_ENV,
|
||||
path,
|
||||
)
|
||||
return os.path.realpath(path)
|
||||
|
||||
if base_dir is None:
|
||||
base_dir = os.getcwd()
|
||||
|
||||
resolved_base = os.path.realpath(base_dir)
|
||||
resolved_path = os.path.realpath(
|
||||
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
|
||||
)
|
||||
|
||||
# Ensure the resolved path is within the base directory.
|
||||
# When resolved_base already ends with a separator (e.g. the filesystem
|
||||
# root "/"), appending os.sep would double it ("//"), so use the base
|
||||
# as-is in that case.
|
||||
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
|
||||
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
|
||||
raise ValueError(
|
||||
f"Path '{path}' resolves to '{resolved_path}' which is outside "
|
||||
f"the allowed directory '{resolved_base}'. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
|
||||
)
|
||||
|
||||
return resolved_path
|
||||
|
||||
|
||||
def validate_directory_path(path: str, base_dir: str | None = None) -> str:
|
||||
"""Validate that a directory path is safe to read.
|
||||
|
||||
Same as :func:`validate_file_path` but also checks that the path
|
||||
is an existing directory.
|
||||
|
||||
Args:
|
||||
path: The directory path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory or is not a directory.
|
||||
"""
|
||||
validated = validate_file_path(path, base_dir)
|
||||
if not os.path.isdir(validated):
|
||||
raise ValueError(f"Path '{validated}' is not a directory.")
|
||||
return validated
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# URL validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Private and reserved IP ranges that should not be accessed
|
||||
_BLOCKED_IPV4_NETWORKS = [
|
||||
ipaddress.ip_network("10.0.0.0/8"),
|
||||
ipaddress.ip_network("172.16.0.0/12"),
|
||||
ipaddress.ip_network("192.168.0.0/16"),
|
||||
ipaddress.ip_network("127.0.0.0/8"),
|
||||
ipaddress.ip_network("169.254.0.0/16"), # Link-local / cloud metadata
|
||||
ipaddress.ip_network("0.0.0.0/32"),
|
||||
]
|
||||
|
||||
_BLOCKED_IPV6_NETWORKS = [
|
||||
ipaddress.ip_network("::1/128"),
|
||||
ipaddress.ip_network("::/128"),
|
||||
ipaddress.ip_network("fc00::/7"), # Unique local addresses
|
||||
ipaddress.ip_network("fe80::/10"), # Link-local IPv6
|
||||
]
|
||||
|
||||
|
||||
def _is_private_or_reserved(ip_str: str) -> bool:
|
||||
"""Check if an IP address is private, reserved, or otherwise unsafe."""
|
||||
try:
|
||||
addr = ipaddress.ip_address(ip_str)
|
||||
# Unwrap IPv4-mapped IPv6 addresses (e.g., ::ffff:127.0.0.1) to IPv4
|
||||
# so they are only checked against IPv4 networks (avoids TypeError when
|
||||
# an IPv4Address is compared against an IPv6Network).
|
||||
if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped:
|
||||
addr = addr.ipv4_mapped
|
||||
networks = (
|
||||
_BLOCKED_IPV4_NETWORKS
|
||||
if isinstance(addr, ipaddress.IPv4Address)
|
||||
else _BLOCKED_IPV6_NETWORKS
|
||||
)
|
||||
return any(addr in network for network in networks)
|
||||
except ValueError:
|
||||
return True # If we can't parse, block it
|
||||
|
||||
|
||||
def validate_url(url: str) -> str:
|
||||
"""Validate that a URL is safe to fetch.
|
||||
|
||||
Blocks ``file://`` scheme entirely. For ``http``/``https``, resolves
|
||||
DNS and checks that the target IP is not private or reserved (prevents
|
||||
SSRF to internal services and cloud metadata endpoints).
|
||||
|
||||
Args:
|
||||
url: The URL to validate.
|
||||
|
||||
Returns:
|
||||
The validated URL string.
|
||||
|
||||
Raises:
|
||||
ValueError: If the URL uses a blocked scheme or resolves to a
|
||||
private/reserved IP address.
|
||||
"""
|
||||
if _is_escape_hatch_enabled():
|
||||
logger.warning(
|
||||
"%s is enabled — skipping URL validation for: %s",
|
||||
_UNSAFE_PATHS_ENV,
|
||||
url,
|
||||
)
|
||||
return url
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# Block file:// scheme
|
||||
if parsed.scheme == "file":
|
||||
raise ValueError(
|
||||
f"file:// URLs are not allowed: '{url}'. "
|
||||
f"Use a file path instead, or set {_UNSAFE_PATHS_ENV}=true to bypass."
|
||||
)
|
||||
|
||||
# Only allow http and https
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError(
|
||||
f"URL scheme '{parsed.scheme}' is not allowed. Only http and https are supported."
|
||||
)
|
||||
|
||||
if not parsed.hostname:
|
||||
raise ValueError(f"URL has no hostname: '{url}'")
|
||||
|
||||
# Resolve DNS and check IPs
|
||||
try:
|
||||
addrinfos = socket.getaddrinfo(
|
||||
parsed.hostname, parsed.port or (443 if parsed.scheme == "https" else 80)
|
||||
)
|
||||
except socket.gaierror as exc:
|
||||
raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'") from exc
|
||||
|
||||
for _family, _, _, _, sockaddr in addrinfos:
|
||||
ip_str = str(sockaddr[0])
|
||||
if _is_private_or_reserved(ip_str):
|
||||
raise ValueError(
|
||||
f"URL '{url}' resolves to private/reserved IP {ip_str}. "
|
||||
f"Access to internal networks is not allowed. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass."
|
||||
)
|
||||
|
||||
return url
|
||||
@@ -7,6 +7,8 @@ from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
class BrightDataConfig(BaseModel):
|
||||
API_URL: str = "https://api.brightdata.com/request"
|
||||
@@ -134,6 +136,7 @@ class BrightDataWebUnlockerTool(BaseTool):
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
validate_url(url)
|
||||
try:
|
||||
response = requests.post(
|
||||
self.base_url, json=payload, headers=headers, timeout=30
|
||||
|
||||
@@ -3,6 +3,8 @@ from typing import Any
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class ContextualAICreateAgentSchema(BaseModel):
|
||||
"""Schema for contextual create agent tool."""
|
||||
@@ -47,6 +49,7 @@ class ContextualAICreateAgentTool(BaseTool):
|
||||
document_paths: list[str],
|
||||
) -> str:
|
||||
"""Create a complete RAG pipeline with documents."""
|
||||
resolved_paths = [validate_file_path(doc_path) for doc_path in document_paths]
|
||||
try:
|
||||
import os
|
||||
|
||||
@@ -56,7 +59,7 @@ class ContextualAICreateAgentTool(BaseTool):
|
||||
|
||||
# Upload documents
|
||||
document_ids = []
|
||||
for doc_path in document_paths:
|
||||
for doc_path in resolved_paths:
|
||||
if not os.path.exists(doc_path):
|
||||
raise FileNotFoundError(f"Document not found: {doc_path}")
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class ContextualAIParseSchema(BaseModel):
|
||||
"""Schema for contextual parse tool."""
|
||||
@@ -45,6 +47,7 @@ class ContextualAIParseTool(BaseTool):
|
||||
"""Parse a document using Contextual AI's parser."""
|
||||
if output_types is None:
|
||||
output_types = ["markdown-per-page"]
|
||||
file_path = validate_file_path(file_path)
|
||||
try:
|
||||
import json
|
||||
import os
|
||||
|
||||
@@ -4,6 +4,8 @@ from typing import Any
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_directory_path
|
||||
|
||||
|
||||
class FixedDirectoryReadToolSchema(BaseModel):
|
||||
"""Input for DirectoryReadTool."""
|
||||
@@ -39,6 +41,7 @@ class DirectoryReadTool(BaseTool):
|
||||
if directory is None:
|
||||
raise ValueError("Directory must be provided.")
|
||||
|
||||
directory = validate_directory_path(directory)
|
||||
if directory[-1] == "/":
|
||||
directory = directory[:-1]
|
||||
files_list = [
|
||||
|
||||
@@ -3,8 +3,8 @@ from typing import Any
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.rag.data_types import DataType
|
||||
from crewai_tools.security.safe_path import validate_directory_path
|
||||
from crewai_tools.tools.rag.rag_tool import RagTool
|
||||
from crewai_tools.utilities.safe_path import validate_directory_path
|
||||
|
||||
|
||||
class FixedDirectorySearchToolSchema(BaseModel):
|
||||
@@ -38,7 +38,7 @@ class DirectorySearchTool(RagTool):
|
||||
self._generate_description()
|
||||
|
||||
def add(self, directory: str) -> None: # type: ignore[override]
|
||||
validate_directory_path(directory)
|
||||
directory = validate_directory_path(directory)
|
||||
super().add(directory, data_type=DataType.DIRECTORY)
|
||||
|
||||
def _run( # type: ignore[override]
|
||||
|
||||
@@ -3,6 +3,8 @@ from typing import Any
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class FileReadToolSchema(BaseModel):
|
||||
"""Input for FileReadTool."""
|
||||
@@ -76,6 +78,7 @@ class FileReadTool(BaseTool):
|
||||
if file_path is None:
|
||||
return "Error: No file path provided. Please provide a file path either in the constructor or as an argument."
|
||||
|
||||
file_path = validate_file_path(file_path)
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
if start_line == 1 and line_count is None:
|
||||
|
||||
@@ -5,6 +5,8 @@ import zipfile
|
||||
from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class FileCompressorToolInput(BaseModel):
|
||||
"""Input schema for FileCompressorTool."""
|
||||
@@ -40,12 +42,15 @@ class FileCompressorTool(BaseTool):
|
||||
overwrite: bool = False,
|
||||
format: str = "zip",
|
||||
) -> str:
|
||||
input_path = validate_file_path(input_path)
|
||||
if not os.path.exists(input_path):
|
||||
return f"Input path '{input_path}' does not exist."
|
||||
|
||||
if not output_path:
|
||||
output_path = self._generate_output_path(input_path, format)
|
||||
|
||||
output_path = validate_file_path(output_path)
|
||||
|
||||
format_extension = {
|
||||
"zip": ".zip",
|
||||
"tar": ".tar",
|
||||
|
||||
@@ -5,6 +5,8 @@ from typing import Any
|
||||
from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
try:
|
||||
from firecrawl import FirecrawlApp # type: ignore[import-untyped]
|
||||
@@ -106,6 +108,7 @@ class FirecrawlCrawlWebsiteTool(BaseTool):
|
||||
if not self._firecrawl:
|
||||
raise RuntimeError("FirecrawlApp not properly initialized")
|
||||
|
||||
url = validate_url(url)
|
||||
return self._firecrawl.crawl(url=url, poll_interval=2, **self.config)
|
||||
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ from typing import Any
|
||||
from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
try:
|
||||
from firecrawl import FirecrawlApp # type: ignore[import-untyped]
|
||||
@@ -106,6 +108,7 @@ class FirecrawlScrapeWebsiteTool(BaseTool):
|
||||
if not self._firecrawl:
|
||||
raise RuntimeError("FirecrawlApp not properly initialized")
|
||||
|
||||
url = validate_url(url)
|
||||
return self._firecrawl.scrape(url=url, **self.config)
|
||||
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@ from typing import Any, Literal
|
||||
from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
class HyperbrowserLoadToolSchema(BaseModel):
|
||||
url: str = Field(description="Website URL")
|
||||
@@ -119,6 +121,7 @@ class HyperbrowserLoadTool(BaseTool):
|
||||
) from e
|
||||
|
||||
params = self._prepare_params(params)
|
||||
url = validate_url(url)
|
||||
|
||||
if operation == "scrape":
|
||||
scrape_params = StartScrapeJobParams(url=url, **params)
|
||||
|
||||
@@ -4,6 +4,8 @@ from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
class JinaScrapeWebsiteToolInput(BaseModel):
|
||||
"""Input schema for JinaScrapeWebsiteTool."""
|
||||
@@ -45,6 +47,7 @@ class JinaScrapeWebsiteTool(BaseTool):
|
||||
"Website URL must be provided either during initialization or execution"
|
||||
)
|
||||
|
||||
url = validate_url(url)
|
||||
response = requests.get(
|
||||
f"https://r.jina.ai/{url}", headers=self.headers, timeout=15
|
||||
)
|
||||
|
||||
@@ -11,6 +11,8 @@ from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.types import LLMMessage
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class OCRToolSchema(BaseModel):
|
||||
"""Input schema for Optical Character Recognition Tool.
|
||||
@@ -98,5 +100,6 @@ class OCRTool(BaseTool):
|
||||
Returns:
|
||||
str: Base64-encoded image data as a UTF-8 string.
|
||||
"""
|
||||
image_path = validate_file_path(image_path)
|
||||
with open(image_path, "rb") as image_file:
|
||||
return base64.b64encode(image_file.read()).decode()
|
||||
|
||||
@@ -251,7 +251,7 @@ class RagTool(BaseTool):
|
||||
# unauthorized file reads and SSRF.
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from crewai_tools.utilities.safe_path import validate_file_path, validate_url
|
||||
from crewai_tools.security.safe_path import validate_file_path, validate_url
|
||||
|
||||
def _check_url(value: str, label: str) -> None:
|
||||
try:
|
||||
@@ -259,9 +259,9 @@ class RagTool(BaseTool):
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Blocked unsafe {label}: {e}") from e
|
||||
|
||||
def _check_path(value: str, label: str) -> None:
|
||||
def _check_path(value: str, label: str) -> str:
|
||||
try:
|
||||
validate_file_path(value)
|
||||
return validate_file_path(value)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Blocked unsafe {label}: {e}") from e
|
||||
|
||||
@@ -298,21 +298,32 @@ class RagTool(BaseTool):
|
||||
or os.path.isabs(source_ref)
|
||||
):
|
||||
try:
|
||||
validate_file_path(source_ref)
|
||||
resolved_ref = validate_file_path(source_ref)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Blocked unsafe file path: {e}") from e
|
||||
# Use the resolved path to prevent symlink TOCTOU
|
||||
if isinstance(arg, dict):
|
||||
arg = {**arg}
|
||||
if "source" in arg:
|
||||
arg["source"] = resolved_ref
|
||||
elif "content" in arg:
|
||||
arg["content"] = resolved_ref
|
||||
else:
|
||||
arg = resolved_ref
|
||||
|
||||
validated_args.append(arg)
|
||||
|
||||
# Validate keyword path/URL arguments — these are equally user-controlled
|
||||
# and must not bypass the checks applied to positional args.
|
||||
if "path" in kwargs and kwargs.get("path") is not None:
|
||||
_check_path(str(kwargs["path"]), "path")
|
||||
kwargs["path"] = _check_path(str(kwargs["path"]), "path")
|
||||
if "file_path" in kwargs and kwargs.get("file_path") is not None:
|
||||
_check_path(str(kwargs["file_path"]), "file_path")
|
||||
kwargs["file_path"] = _check_path(str(kwargs["file_path"]), "file_path")
|
||||
|
||||
if "directory_path" in kwargs and kwargs.get("directory_path") is not None:
|
||||
_check_path(str(kwargs["directory_path"]), "directory_path")
|
||||
kwargs["directory_path"] = _check_path(
|
||||
str(kwargs["directory_path"]), "directory_path"
|
||||
)
|
||||
|
||||
if "url" in kwargs and kwargs.get("url") is not None:
|
||||
_check_url(str(kwargs["url"]), "url")
|
||||
|
||||
@@ -5,6 +5,8 @@ from crewai.tools import BaseTool
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
@@ -81,6 +83,7 @@ class ScrapeElementFromWebsiteTool(BaseTool):
|
||||
if website_url is None or css_element is None:
|
||||
raise ValueError("Both website_url and css_element must be provided.")
|
||||
|
||||
website_url = validate_url(website_url)
|
||||
page = requests.get(
|
||||
website_url,
|
||||
headers=self.headers,
|
||||
|
||||
@@ -5,6 +5,8 @@ from typing import Any
|
||||
from pydantic import Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
try:
|
||||
from bs4 import BeautifulSoup
|
||||
@@ -73,6 +75,7 @@ class ScrapeWebsiteTool(BaseTool):
|
||||
if website_url is None:
|
||||
raise ValueError("Website URL must be provided.")
|
||||
|
||||
website_url = validate_url(website_url)
|
||||
page = requests.get(
|
||||
website_url,
|
||||
timeout=15,
|
||||
|
||||
@@ -5,6 +5,8 @@ from typing import Any, Literal
|
||||
from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
|
||||
@@ -72,6 +74,7 @@ class ScrapflyScrapeWebsiteTool(BaseTool):
|
||||
) -> str | None:
|
||||
from scrapfly import ScrapeConfig
|
||||
|
||||
url = validate_url(url)
|
||||
scrape_config = scrape_config if scrape_config is not None else {}
|
||||
try:
|
||||
response = self.scrapfly.scrape( # type: ignore[union-attr]
|
||||
|
||||
@@ -5,6 +5,8 @@ from crewai.tools import BaseTool, EnvVar
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
|
||||
|
||||
class SerperScrapeWebsiteInput(BaseModel):
|
||||
"""Input schema for SerperScrapeWebsite."""
|
||||
@@ -42,6 +44,7 @@ class SerperScrapeWebsiteTool(BaseTool):
|
||||
Returns:
|
||||
Scraped website content as a string
|
||||
"""
|
||||
validate_url(url)
|
||||
try:
|
||||
# Serper API endpoint
|
||||
api_url = "https://scrape.serper.dev"
|
||||
|
||||
@@ -5,6 +5,7 @@ from crewai.tools import EnvVar
|
||||
from pydantic import BaseModel, Field
|
||||
import requests
|
||||
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
from crewai_tools.tools.rag.rag_tool import RagTool
|
||||
|
||||
|
||||
@@ -48,6 +49,7 @@ class SerplyWebpageToMarkdownTool(RagTool):
|
||||
if self.proxy_location and not self.headers.get("X-Proxy-Location"):
|
||||
self.headers["X-Proxy-Location"] = self.proxy_location
|
||||
|
||||
validate_url(url)
|
||||
data = {"url": url, "method": "GET", "response_type": "markdown"}
|
||||
response = requests.request(
|
||||
"POST",
|
||||
|
||||
@@ -7,6 +7,8 @@ from crewai.tools import BaseTool, EnvVar
|
||||
from crewai.utilities.types import LLMMessage
|
||||
from pydantic import BaseModel, Field, PrivateAttr, field_validator
|
||||
|
||||
from crewai_tools.security.safe_path import validate_file_path
|
||||
|
||||
|
||||
class ImagePromptSchema(BaseModel):
|
||||
"""Input for Vision Tool."""
|
||||
@@ -135,5 +137,6 @@ class VisionTool(BaseTool):
|
||||
Returns:
|
||||
Base64-encoded image data
|
||||
"""
|
||||
image_path = validate_file_path(image_path)
|
||||
with open(image_path, "rb") as image_file:
|
||||
return base64.b64encode(image_file.read()).decode()
|
||||
|
||||
@@ -3,6 +3,7 @@ from typing import Any
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai_tools.rag.data_types import DataType
|
||||
from crewai_tools.security.safe_path import validate_url
|
||||
from crewai_tools.tools.rag.rag_tool import RagTool
|
||||
|
||||
|
||||
@@ -37,6 +38,7 @@ class WebsiteSearchTool(RagTool):
|
||||
self._generate_description()
|
||||
|
||||
def add(self, website: str) -> None: # type: ignore[override]
|
||||
website = validate_url(website)
|
||||
super().add(website, data_type=DataType.WEBSITE)
|
||||
|
||||
def _run( # type: ignore[override]
|
||||
|
||||
@@ -1,205 +1,10 @@
|
||||
"""Path and URL validation utilities for crewai-tools.
|
||||
"""Backward-compatible re-export from crewai_tools.security.safe_path."""
|
||||
|
||||
Provides validation for file paths and URLs to prevent unauthorized
|
||||
file access and server-side request forgery (SSRF) when tools accept
|
||||
user-controlled or LLM-controlled inputs at runtime.
|
||||
|
||||
Set CREWAI_TOOLS_ALLOW_UNSAFE_PATHS=true to bypass validation (not
|
||||
recommended for production).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
from urllib.parse import urlparse
|
||||
from crewai_tools.security.safe_path import (
|
||||
validate_directory_path,
|
||||
validate_file_path,
|
||||
validate_url,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_UNSAFE_PATHS_ENV = "CREWAI_TOOLS_ALLOW_UNSAFE_PATHS"
|
||||
|
||||
|
||||
def _is_escape_hatch_enabled() -> bool:
|
||||
"""Check if the unsafe paths escape hatch is enabled."""
|
||||
return os.environ.get(_UNSAFE_PATHS_ENV, "").lower() in ("true", "1", "yes")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# File path validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def validate_file_path(path: str, base_dir: str | None = None) -> str:
|
||||
"""Validate that a file path is safe to read.
|
||||
|
||||
Resolves symlinks and ``..`` components, then checks that the resolved
|
||||
path falls within *base_dir* (defaults to the current working directory).
|
||||
|
||||
Args:
|
||||
path: The file path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory.
|
||||
"""
|
||||
if _is_escape_hatch_enabled():
|
||||
logger.warning(
|
||||
"%s is enabled — skipping file path validation for: %s",
|
||||
_UNSAFE_PATHS_ENV,
|
||||
path,
|
||||
)
|
||||
return os.path.realpath(path)
|
||||
|
||||
if base_dir is None:
|
||||
base_dir = os.getcwd()
|
||||
|
||||
resolved_base = os.path.realpath(base_dir)
|
||||
resolved_path = os.path.realpath(
|
||||
os.path.join(resolved_base, path) if not os.path.isabs(path) else path
|
||||
)
|
||||
|
||||
# Ensure the resolved path is within the base directory.
|
||||
# When resolved_base already ends with a separator (e.g. the filesystem
|
||||
# root "/"), appending os.sep would double it ("//"), so use the base
|
||||
# as-is in that case.
|
||||
prefix = resolved_base if resolved_base.endswith(os.sep) else resolved_base + os.sep
|
||||
if not resolved_path.startswith(prefix) and resolved_path != resolved_base:
|
||||
raise ValueError(
|
||||
f"Path '{path}' resolves to '{resolved_path}' which is outside "
|
||||
f"the allowed directory '{resolved_base}'. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass this check."
|
||||
)
|
||||
|
||||
return resolved_path
|
||||
|
||||
|
||||
def validate_directory_path(path: str, base_dir: str | None = None) -> str:
|
||||
"""Validate that a directory path is safe to read.
|
||||
|
||||
Same as :func:`validate_file_path` but also checks that the path
|
||||
is an existing directory.
|
||||
|
||||
Args:
|
||||
path: The directory path to validate.
|
||||
base_dir: Allowed root directory. Defaults to ``os.getcwd()``.
|
||||
|
||||
Returns:
|
||||
The resolved, validated absolute path.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path escapes the allowed directory or is not a directory.
|
||||
"""
|
||||
validated = validate_file_path(path, base_dir)
|
||||
if not os.path.isdir(validated):
|
||||
raise ValueError(f"Path '{validated}' is not a directory.")
|
||||
return validated
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# URL validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Private and reserved IP ranges that should not be accessed
|
||||
_BLOCKED_IPV4_NETWORKS = [
|
||||
ipaddress.ip_network("10.0.0.0/8"),
|
||||
ipaddress.ip_network("172.16.0.0/12"),
|
||||
ipaddress.ip_network("192.168.0.0/16"),
|
||||
ipaddress.ip_network("127.0.0.0/8"),
|
||||
ipaddress.ip_network("169.254.0.0/16"), # Link-local / cloud metadata
|
||||
ipaddress.ip_network("0.0.0.0/32"),
|
||||
]
|
||||
|
||||
_BLOCKED_IPV6_NETWORKS = [
|
||||
ipaddress.ip_network("::1/128"),
|
||||
ipaddress.ip_network("::/128"),
|
||||
ipaddress.ip_network("fc00::/7"), # Unique local addresses
|
||||
ipaddress.ip_network("fe80::/10"), # Link-local IPv6
|
||||
]
|
||||
|
||||
|
||||
def _is_private_or_reserved(ip_str: str) -> bool:
|
||||
"""Check if an IP address is private, reserved, or otherwise unsafe."""
|
||||
try:
|
||||
addr = ipaddress.ip_address(ip_str)
|
||||
# Unwrap IPv4-mapped IPv6 addresses (e.g., ::ffff:127.0.0.1) to IPv4
|
||||
# so they are only checked against IPv4 networks (avoids TypeError when
|
||||
# an IPv4Address is compared against an IPv6Network).
|
||||
if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped:
|
||||
addr = addr.ipv4_mapped
|
||||
networks = (
|
||||
_BLOCKED_IPV4_NETWORKS
|
||||
if isinstance(addr, ipaddress.IPv4Address)
|
||||
else _BLOCKED_IPV6_NETWORKS
|
||||
)
|
||||
return any(addr in network for network in networks)
|
||||
except ValueError:
|
||||
return True # If we can't parse, block it
|
||||
|
||||
|
||||
def validate_url(url: str) -> str:
|
||||
"""Validate that a URL is safe to fetch.
|
||||
|
||||
Blocks ``file://`` scheme entirely. For ``http``/``https``, resolves
|
||||
DNS and checks that the target IP is not private or reserved (prevents
|
||||
SSRF to internal services and cloud metadata endpoints).
|
||||
|
||||
Args:
|
||||
url: The URL to validate.
|
||||
|
||||
Returns:
|
||||
The validated URL string.
|
||||
|
||||
Raises:
|
||||
ValueError: If the URL uses a blocked scheme or resolves to a
|
||||
private/reserved IP address.
|
||||
"""
|
||||
if _is_escape_hatch_enabled():
|
||||
logger.warning(
|
||||
"%s is enabled — skipping URL validation for: %s",
|
||||
_UNSAFE_PATHS_ENV,
|
||||
url,
|
||||
)
|
||||
return url
|
||||
|
||||
parsed = urlparse(url)
|
||||
|
||||
# Block file:// scheme
|
||||
if parsed.scheme == "file":
|
||||
raise ValueError(
|
||||
f"file:// URLs are not allowed: '{url}'. "
|
||||
f"Use a file path instead, or set {_UNSAFE_PATHS_ENV}=true to bypass."
|
||||
)
|
||||
|
||||
# Only allow http and https
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError(
|
||||
f"URL scheme '{parsed.scheme}' is not allowed. Only http and https are supported."
|
||||
)
|
||||
|
||||
if not parsed.hostname:
|
||||
raise ValueError(f"URL has no hostname: '{url}'")
|
||||
|
||||
# Resolve DNS and check IPs
|
||||
try:
|
||||
addrinfos = socket.getaddrinfo(
|
||||
parsed.hostname, parsed.port or (443 if parsed.scheme == "https" else 80)
|
||||
)
|
||||
except socket.gaierror as exc:
|
||||
raise ValueError(f"Could not resolve hostname: '{parsed.hostname}'") from exc
|
||||
|
||||
for _family, _, _, _, sockaddr in addrinfos:
|
||||
ip_str = str(sockaddr[0])
|
||||
if _is_private_or_reserved(ip_str):
|
||||
raise ValueError(
|
||||
f"URL '{url}' resolves to private/reserved IP {ip_str}. "
|
||||
f"Access to internal networks is not allowed. "
|
||||
f"Set {_UNSAFE_PATHS_ENV}=true to bypass."
|
||||
)
|
||||
|
||||
return url
|
||||
__all__ = ["validate_directory_path", "validate_file_path", "validate_url"]
|
||||
|
||||
@@ -6,7 +6,7 @@ import os
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai_tools.utilities.safe_path import (
|
||||
from crewai_tools.security.safe_path import (
|
||||
validate_directory_path,
|
||||
validate_file_path,
|
||||
validate_url,
|
||||
|
||||
Reference in New Issue
Block a user