Fix symlink path traversal in skill cache extraction

`_safe_extractall` in `crewai.experimental.skills.cache` (the Python < 3.12
fallback used by `SkillCacheManager.store` to unpack registry-downloaded skill
tarballs into `~/.crewai/skills/`) validated each member's *name* against the
destination but never validated symlink/hardlink *targets*. A malicious skill
archive could plant a symlink escaping the destination (e.g.
`link -> /home/user/.ssh`) followed by a regular member written through it
(`link/authorized_keys`), escaping `dest` even though every member name
resolves inside it — the classic symlink-extraction traversal.

This is the same bug fixed in 32d2da1 for the CLI's `_safe_extractall`
(`crewai_cli.experimental.skills.main`); the duplicate copy in the core
library's skill cache was missed. The 3.12+ path
(`extractall(..., filter="data")`) already blocks this; the fallback now
mirrors it by rejecting absolute link targets and any link target that
resolves outside the destination directory.

Adds regression tests covering absolute and relative escaping symlinks plus
benign in-tree symlinks and ordinary archives.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Rip&Tear
2026-06-19 13:11:15 +08:00
parent 32d2da1bfb
commit 589e590dd7
2 changed files with 136 additions and 1 deletions

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import tarfile
from typing import TypedDict
@@ -127,12 +128,38 @@ class SkillCacheManager:
def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
"""Path-traversal-safe extraction for Python < 3.12."""
"""Path-traversal-safe extraction for Python < 3.12.
Validates both the member's own path and, for symlink/hardlink members,
the link target. Without the link-target check a malicious archive can
plant a symlink that escapes ``dest`` (e.g. ``link -> /home/user/.ssh``)
followed by a regular member written *through* that link
(``link/authorized_keys``), escaping ``dest`` even though every member
name resolves inside it. This mirrors the protection that
``tarfile.extractall(..., filter="data")`` provides on Python >= 3.12.
"""
dest_resolved = dest.resolve()
for member in tf.getmembers():
member_path = (dest / member.name).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member.name!r}")
if member.issym() or member.islnk():
link_target = member.linkname
# Absolute link targets always escape the destination.
if os.path.isabs(link_target):
raise ValueError(
f"Blocked link target escaping destination: "
f"{member.name!r} -> {link_target!r}"
)
# Hardlink names are relative to the archive root; symlink
# targets are relative to the member's own directory.
anchor = dest if member.islnk() else (dest / member.name).parent
resolved_target = (anchor / link_target).resolve()
if not resolved_target.is_relative_to(dest_resolved):
raise ValueError(
f"Blocked link target escaping destination: "
f"{member.name!r} -> {link_target!r}"
)
tf.extractall(dest) # noqa: S202

View File

@@ -0,0 +1,108 @@
"""Regression tests for path-traversal-safe archive extraction in the cache.
Guards against symlink/hardlink-based path traversal in the Python < 3.12
extraction fallback (`_safe_extractall`) used by `SkillCacheManager.store`.
The 3.12+ path relies on `tarfile.extractall(..., filter="data")`; the
fallback must provide the same protection by validating link targets, not
just member names.
"""
from __future__ import annotations
import io
import tarfile
from pathlib import Path
import pytest
from crewai.experimental.skills.cache import _safe_extractall
def _tar_from_members(build) -> tarfile.TarFile:
"""Build an in-memory tar archive via `build(tf)` and return it for reading."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
build(tf)
buf.seek(0)
return tarfile.open(fileobj=buf, mode="r")
def test_blocks_symlink_escaping_destination(tmp_path: Path) -> None:
"""A symlink whose target escapes dest, plus a file written through it,
must be rejected before anything is extracted."""
outside = tmp_path / "outside"
outside.mkdir()
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
link = tarfile.TarInfo("link")
link.type = tarfile.SYMTYPE
link.linkname = str(outside) # absolute path outside dest
tf.addfile(link)
payload = b"pwned"
info = tarfile.TarInfo("link/evil.txt")
info.size = len(payload)
tf.addfile(info, io.BytesIO(payload))
with _tar_from_members(build) as tf:
with pytest.raises(ValueError, match="escaping destination"):
_safe_extractall(tf, dest)
assert not (outside / "evil.txt").exists()
def test_blocks_relative_symlink_escaping_destination(tmp_path: Path) -> None:
"""A relative symlink (../..) that escapes dest is also rejected."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
link = tarfile.TarInfo("sub/link")
link.type = tarfile.SYMTYPE
link.linkname = "../../outside" # escapes dest from sub/
tf.addfile(link)
with _tar_from_members(build) as tf:
with pytest.raises(ValueError, match="escaping destination"):
_safe_extractall(tf, dest)
def test_allows_benign_relative_symlink(tmp_path: Path) -> None:
"""A symlink that stays within dest is permitted."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
payload = b"hi"
info = tarfile.TarInfo("real.txt")
info.size = len(payload)
tf.addfile(info, io.BytesIO(payload))
link = tarfile.TarInfo("alias.txt")
link.type = tarfile.SYMTYPE
link.linkname = "real.txt" # stays inside dest
tf.addfile(link)
with _tar_from_members(build) as tf:
_safe_extractall(tf, dest)
assert (dest / "real.txt").read_bytes() == b"hi"
def test_allows_benign_archive(tmp_path: Path) -> None:
"""An ordinary archive of regular files extracts correctly."""
dest = tmp_path / "dest"
dest.mkdir()
def build(tf: tarfile.TarFile) -> None:
for name, body in (("SKILL.md", b"# skill"), ("scripts/run.py", b"print(1)")):
payload = body
info = tarfile.TarInfo(name)
info.size = len(payload)
tf.addfile(info, io.BytesIO(payload))
with _tar_from_members(build) as tf:
_safe_extractall(tf, dest)
assert (dest / "SKILL.md").read_bytes() == b"# skill"
assert (dest / "scripts" / "run.py").read_bytes() == b"print(1)"