Harden skill cache archive extraction

This commit is contained in:
Rip&Tear
2026-06-24 13:47:06 +08:00
parent 6c2e4410ec
commit 580878dbef
4 changed files with 125 additions and 6 deletions

View File

@@ -378,7 +378,7 @@ class SkillCommand(BaseCommand, PlusAPIMixin):
def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
"""Path-traversal-safe extraction for Python < 3.12.
"""Path-traversal-safe extraction for Python versions without tar filters.
Validates both the member's own path and, for symlink/hardlink members,
the link target. Without the link-target check a malicious archive can
@@ -386,7 +386,7 @@ def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
followed by a regular member written *through* that link
(``link/authorized_keys``), escaping ``dest`` even though every member
name resolves inside it. This mirrors the protection that
``tarfile.extractall(..., filter="data")`` provides on Python >= 3.12.
``tarfile.extractall(..., filter="data")`` provides when available.
"""
dest_resolved = dest.resolve()
for member in tf.getmembers():

View File

@@ -1,7 +1,7 @@
"""Regression tests for path-traversal-safe archive extraction.
Guards against symlink/hardlink-based path traversal in the Python < 3.12
extraction fallback (`_safe_extractall`). The 3.12+ path relies on
Guards against symlink/hardlink-based path traversal in the fallback used on
Python versions without tarfile extraction filters. The filtered path relies on
`tarfile.extractall(..., filter="data")`; the fallback must provide the same
protection by validating link targets, not just member names.
"""
@@ -67,6 +67,22 @@ def test_blocks_relative_symlink_escaping_destination(tmp_path: Path) -> None:
_safe_extractall(tf, dest)
def test_blocks_hardlink_escaping_destination(tmp_path: Path) -> None:
"""A hardlink whose target escapes dest is rejected."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
link = tarfile.TarInfo("escape")
link.type = tarfile.LNKTYPE
link.linkname = "../outside.txt" # escapes archive root
tf.addfile(link)
with _tar_from_members(build) as tf:
with pytest.raises(ValueError, match="escaping destination"):
_safe_extractall(tf, dest)
def test_allows_benign_relative_symlink(tmp_path: Path) -> None:
"""A symlink that stays within dest is permitted."""
dest = tmp_path / "dest"
@@ -86,6 +102,8 @@ def test_allows_benign_relative_symlink(tmp_path: Path) -> None:
_safe_extractall(tf, dest)
assert (dest / "real.txt").read_bytes() == b"hi"
assert (dest / "alias.txt").is_symlink()
assert (dest / "alias.txt").readlink() == Path("real.txt")
def test_allows_benign_archive(tmp_path: Path) -> None:

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import tarfile
from typing import TypedDict
@@ -127,12 +128,34 @@ class SkillCacheManager:
def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
"""Path-traversal-safe extraction for Python < 3.12."""
"""Path-traversal-safe extraction for Python versions without tar filters.
Validates both the member's own path and, for symlink/hardlink members,
the link target. Without the link-target check a malicious archive can
plant a symlink that escapes ``dest`` followed by a regular member written
through that link, escaping ``dest`` even though every member name resolves
inside it. This mirrors the protection that
``tarfile.extractall(..., filter="data")`` provides when available.
"""
dest_resolved = dest.resolve()
for member in tf.getmembers():
member_path = (dest / member.name).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member.name!r}")
if member.issym() or member.islnk():
link_target = member.linkname
if os.path.isabs(link_target):
raise ValueError(
f"Blocked link target escaping destination: "
f"{member.name!r} -> {link_target!r}"
)
anchor = dest if member.islnk() else (dest / member.name).parent
resolved_target = (anchor / link_target).resolve()
if not resolved_target.is_relative_to(dest_resolved):
raise ValueError(
f"Blocked link target escaping destination: "
f"{member.name!r} -> {link_target!r}"
)
tf.extractall(dest) # noqa: S202

View File

@@ -8,7 +8,9 @@ import json
import tarfile
from pathlib import Path
from crewai.experimental.skills.cache import SkillCacheManager
import pytest
from crewai.experimental.skills.cache import SkillCacheManager, _safe_extractall
def _make_tar_gz(files: dict[str, str]) -> bytes:
@@ -35,6 +37,15 @@ def _make_tar_gz(files: dict[str, str]) -> bytes:
return out.getvalue()
def _tar_from_members(build) -> tarfile.TarFile:
"""Build an in-memory tar archive via `build(tf)` and return it for reading."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
build(tf)
buf.seek(0)
return tarfile.open(fileobj=buf, mode="r")
class TestSkillCacheManager:
def test_get_cached_path_missing(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
@@ -113,3 +124,70 @@ class TestSkillCacheManager:
dest = cache.store("acme", "my-skill", None, archive)
meta = json.loads((dest / ".crewai_meta.json").read_text())
assert meta["version"] is None
def test_safe_extractall_blocks_symlink_escaping_cache_destination(
tmp_path: Path,
) -> None:
"""A symlink whose target escapes dest is rejected before extraction."""
outside = tmp_path / "outside"
outside.mkdir()
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
link = tarfile.TarInfo("link")
link.type = tarfile.SYMTYPE
link.linkname = str(outside)
tf.addfile(link)
payload = b"pwned"
info = tarfile.TarInfo("link/evil.txt")
info.size = len(payload)
tf.addfile(info, io.BytesIO(payload))
with _tar_from_members(build) as tf:
with pytest.raises(ValueError, match="escaping destination"):
_safe_extractall(tf, dest)
assert not (outside / "evil.txt").exists()
def test_safe_extractall_blocks_hardlink_escaping_cache_destination(
tmp_path: Path,
) -> None:
"""A hardlink whose target escapes dest is rejected."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
link = tarfile.TarInfo("escape")
link.type = tarfile.LNKTYPE
link.linkname = "../outside.txt"
tf.addfile(link)
with _tar_from_members(build) as tf:
with pytest.raises(ValueError, match="escaping destination"):
_safe_extractall(tf, dest)
def test_safe_extractall_allows_benign_cache_symlink(tmp_path: Path) -> None:
"""A symlink that stays within dest is permitted."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
payload = b"hi"
info = tarfile.TarInfo("real.txt")
info.size = len(payload)
tf.addfile(info, io.BytesIO(payload))
link = tarfile.TarInfo("alias.txt")
link.type = tarfile.SYMTYPE
link.linkname = "real.txt"
tf.addfile(link)
with _tar_from_members(build) as tf:
_safe_extractall(tf, dest)
assert (dest / "real.txt").read_bytes() == b"hi"
assert (dest / "alias.txt").is_symlink()
assert (dest / "alias.txt").readlink() == Path("real.txt")