fix: re-validate interpolated output_file against untrusted inputs

The output_file field validator accepts {var} templates unchecked, and the
concrete path produced by interpolate_inputs_and_add_conversation_history was
assigned without re-validation. An untrusted crew.kickoff(inputs=...) value
could inject '..', an absolute path, or ~/$ expansion into a templated
output_file and write outside the working directory.

Validate the interpolated variable values (only those appearing in the
output_file template) for traversal, absolute paths, shell expansion, and
shell metacharacters before interpolation. The developer-authored template
(including an absolute base directory) stays trusted, so legitimate templated
paths are unaffected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Rip&Tear
2026-07-03 00:36:41 +08:00
parent e8dced8a2d
commit 246a8b37e0
2 changed files with 115 additions and 8 deletions

View File

@@ -10,7 +10,7 @@ from hashlib import md5
import inspect
import json
import logging
from pathlib import Path
from pathlib import Path, PurePosixPath, PureWindowsPath
import threading
from typing import (
Annotated,
@@ -504,6 +504,29 @@ class Task(BaseModel):
if value is None:
return None
if "{" in value or "}" in value:
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
if not var.isidentifier():
raise ValueError(f"Invalid template variable name: {var}")
# Literal portions are still checked here; the fully interpolated
# path is re-validated at runtime (see
# interpolate_inputs_and_add_conversation_history) because template
# variables may be filled from untrusted kickoff inputs.
cls._sanitize_output_file_path(value)
return value
return cls._sanitize_output_file_path(value)
@staticmethod
def _sanitize_output_file_path(value: str) -> str:
"""Enforce path-safety on an ``output_file`` value.
Shared by the field validator's literal and template branches. Rejects
traversal sequences, shell expansion, and shell metacharacters, and
strips a leading ``/`` so a literal path stays relative to the working
directory.
"""
if ".." in value:
raise ValueError(
"Path traversal attempts are not allowed in output_file paths"
@@ -519,17 +542,53 @@ class Task(BaseModel):
"Shell special characters are not allowed in output_file paths"
)
if "{" in value or "}" in value:
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
if not var.isidentifier():
raise ValueError(f"Invalid template variable name: {var}")
return value
if value.startswith("/"):
return value[1:]
return value
def _validate_output_file_input_values(
self, inputs: dict[str, str | int | float | dict[str, Any] | list[Any]]
) -> None:
"""Reject untrusted input values that would escape the output path.
Only the variables that actually appear in the ``output_file`` template
are checked. The developer-authored template is trusted (it may contain
an absolute base directory), but a value substituted into it must not
introduce path traversal (``..``), an absolute path, a home/variable
expansion (``~``/``$``), or shell metacharacters that would redirect the
write outside the intended location.
"""
if not self._original_output_file:
return
template_vars = [
part.split("}")[0] for part in self._original_output_file.split("{")[1:]
]
for var in template_vars:
if var not in inputs:
continue
value = str(inputs[var])
if ".." in value:
raise ValueError(
f"Invalid value for output_file variable '{var}': path "
"traversal sequences ('..') are not allowed"
)
if value.startswith(("~", "$")):
raise ValueError(
f"Invalid value for output_file variable '{var}': shell "
"expansion characters are not allowed"
)
if any(char in value for char in ["|", ">", "<", "&", ";"]):
raise ValueError(
f"Invalid value for output_file variable '{var}': shell "
"special characters are not allowed"
)
if PurePosixPath(value).is_absolute() or PureWindowsPath(value).is_absolute():
raise ValueError(
f"Invalid value for output_file variable '{var}': absolute "
"paths are not allowed"
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> Task:
"""Set attributes based on the agent configuration."""
@@ -1026,6 +1085,12 @@ Follow these guidelines:
raise ValueError(f"Error interpolating expected_output: {e!s}") from e
if self.output_file is not None:
# Values interpolated into the output path may come from untrusted
# kickoff inputs. The developer-authored template (including any
# absolute base directory) is trusted, but an injected value must
# not introduce path traversal, an absolute path, or shell
# expansion that would escape the intended location.
self._validate_output_file_input_values(inputs)
try:
self.output_file = interpolate_only(
input_string=self._original_output_file, inputs=inputs

View File

@@ -931,6 +931,48 @@ def test_interpolate_inputs(tmp_path):
assert task.output_file == str(tmp_path / "ML" / "output_2025.txt")
@pytest.mark.parametrize(
("template", "malicious_inputs", "expected_error"),
[
("reports/{name}.md", {"name": "../../../../tmp/pwn"}, "path traversal"),
("{p}", {"p": "/tmp/abs_pwn"}, "absolute paths"),
("{p}", {"p": "~/.bashrc"}, "shell expansion"),
("{p}", {"p": "x;rm -rf /"}, "shell special characters"),
("{p}", {"p": r"C:\Windows\evil"}, "absolute paths"),
],
)
def test_interpolate_output_file_rejects_unsafe_inputs(
template, malicious_inputs, expected_error
):
"""Untrusted inputs must not escape the output_file path via interpolation."""
task = Task(
description="do {p} {name}".replace("{p}", "x").replace("{name}", "x"),
expected_output="e",
output_file=template,
)
with pytest.raises(ValueError, match=expected_error):
task.interpolate_inputs_and_add_conversation_history(inputs=malicious_inputs)
def test_interpolate_output_file_allows_safe_inputs(tmp_path):
"""Safe input values and developer-chosen absolute base paths still work."""
task = Task(
description="d",
expected_output="e",
output_file="reports/{name}.md",
)
task.interpolate_inputs_and_add_conversation_history(inputs={"name": "q3_summary"})
assert task.output_file == "reports/q3_summary.md"
abs_task = Task(
description="d",
expected_output="e",
output_file=str(tmp_path / "{topic}" / "out.md"),
)
abs_task.interpolate_inputs_and_add_conversation_history(inputs={"topic": "sales"})
assert abs_task.output_file == str(tmp_path / "sales" / "out.md")
def test_interpolate_only():
"""Test the interpolate_only method for various scenarios including JSON structure preservation."""