From 774a3b8ecdd68fce34fb2eb132c3a2f7f9df7aa1 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 7 Apr 2026 00:49:14 -0700 Subject: [PATCH] fix: address three review comments on path/URL validation - safe_path._is_private_or_reserved: after unwrapping IPv4-mapped IPv6 to IPv4, only check against IPv4 networks to avoid TypeError when comparing an IPv4Address against IPv6Network objects. - safe_path.validate_file_path: handle filesystem-root base_dir ('/') by not appending os.sep when the base already ends with a separator, preventing the '//'-prefix bug. - rag_tool.add: path-detection heuristic now checks for both '/' and os.sep so forward-slash paths are caught on Windows as well as Unix. Co-Authored-By: Claude Sonnet 4.6 --- .../src/crewai_tools/tools/rag/rag_tool.py | 5 ++- .../src/crewai_tools/utilities/safe_path.py | 33 ++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py b/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py index ace1c4bec..eb7e9cefd 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/rag/rag_tool.py @@ -288,9 +288,12 @@ class RagTool(BaseTool): validated_args.append(arg) continue - # Check if it looks like a file path (not a plain text string) + # Check if it looks like a file path (not a plain text string). + # Check both os.sep (backslash on Windows) and "/" so that + # forward-slash paths like "sub/file.txt" are caught on all platforms. if ( os.path.sep in source_ref + or "/" in source_ref or source_ref.startswith(".") or os.path.isabs(source_ref) ): diff --git a/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py b/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py index 6d059c086..254ac82e9 100644 --- a/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py +++ b/lib/crewai-tools/src/crewai_tools/utilities/safe_path.py @@ -64,11 +64,16 @@ def validate_file_path(path: str, base_dir: str | None = None) -> str: os.path.join(resolved_base, path) if not os.path.isabs(path) else path ) - # Ensure the resolved path is within the base directory - if ( - not resolved_path.startswith(resolved_base + os.sep) - and resolved_path != resolved_base - ): + # Ensure the resolved path is within the base directory. + # When resolved_base already ends with a separator (e.g. the filesystem + # root "/"), appending os.sep would double it ("//"), so use the base + # as-is in that case. + prefix = ( + resolved_base + if resolved_base.endswith(os.sep) + else resolved_base + os.sep + ) + if not resolved_path.startswith(prefix) and resolved_path != resolved_base: raise ValueError( f"Path '{path}' resolves to '{resolved_path}' which is outside " f"the allowed directory '{resolved_base}'. " @@ -105,28 +110,40 @@ def validate_directory_path(path: str, base_dir: str | None = None) -> str: # --------------------------------------------------------------------------- # Private and reserved IP ranges that should not be accessed -_BLOCKED_NETWORKS = [ +_BLOCKED_IPV4_NETWORKS = [ ipaddress.ip_network("10.0.0.0/8"), ipaddress.ip_network("172.16.0.0/12"), ipaddress.ip_network("192.168.0.0/16"), ipaddress.ip_network("127.0.0.0/8"), ipaddress.ip_network("169.254.0.0/16"), # Link-local / cloud metadata ipaddress.ip_network("0.0.0.0/32"), +] + +_BLOCKED_IPV6_NETWORKS = [ ipaddress.ip_network("::1/128"), ipaddress.ip_network("::/128"), ipaddress.ip_network("fc00::/7"), # Unique local addresses ipaddress.ip_network("fe80::/10"), # Link-local IPv6 ] +_BLOCKED_NETWORKS = _BLOCKED_IPV4_NETWORKS + _BLOCKED_IPV6_NETWORKS + def _is_private_or_reserved(ip_str: str) -> bool: """Check if an IP address is private, reserved, or otherwise unsafe.""" try: addr = ipaddress.ip_address(ip_str) - # Check IPv4-mapped IPv6 addresses (e.g., ::ffff:127.0.0.1) + # Unwrap IPv4-mapped IPv6 addresses (e.g., ::ffff:127.0.0.1) to IPv4 + # so they are only checked against IPv4 networks (avoids TypeError when + # an IPv4Address is compared against an IPv6Network). if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped: addr = addr.ipv4_mapped - return any(addr in network for network in _BLOCKED_NETWORKS) + networks = ( + _BLOCKED_IPV4_NETWORKS + if isinstance(addr, ipaddress.IPv4Address) + else _BLOCKED_IPV6_NETWORKS + ) + return any(addr in network for network in networks) except ValueError: return True # If we can't parse, block it