mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-23 23:28:15 +00:00
fix: add security validation for output_file paths
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -214,12 +214,43 @@ class Task(BaseModel):
|
|||||||
@field_validator("output_file")
|
@field_validator("output_file")
|
||||||
@classmethod
|
@classmethod
|
||||||
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
|
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
|
||||||
"""Validate the output file path."""
|
"""Validate the output file path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: The output file path to validate. Can be None or a string.
|
||||||
|
If the path contains template variables (e.g. {var}), leading slashes are preserved.
|
||||||
|
For regular paths, leading slashes are stripped.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The validated and potentially modified path, or None if no path was provided.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If the path contains invalid characters, path traversal attempts,
|
||||||
|
or other security concerns.
|
||||||
|
"""
|
||||||
if value is None:
|
if value is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Basic security checks
|
||||||
|
if ".." in value:
|
||||||
|
raise ValueError("Path traversal attempts are not allowed in output_file paths")
|
||||||
|
|
||||||
|
if any(char in value for char in ['|', '>', '<', '&', ';', '$']):
|
||||||
|
raise ValueError("Shell special characters are not allowed in output_file paths")
|
||||||
|
|
||||||
|
# Check for absolute paths that might bypass restrictions
|
||||||
|
if value.startswith('~') or value.startswith('$'):
|
||||||
|
raise ValueError("Shell expansion characters are not allowed in output_file paths")
|
||||||
|
|
||||||
# Don't strip leading slash if it's a template path with variables
|
# Don't strip leading slash if it's a template path with variables
|
||||||
if "{" in value or "}" in value:
|
if "{" in value or "}" in value:
|
||||||
|
# Validate template variable format
|
||||||
|
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
|
||||||
|
for var in template_vars:
|
||||||
|
if not var.isidentifier():
|
||||||
|
raise ValueError(f"Invalid template variable name: {var}")
|
||||||
return value
|
return value
|
||||||
|
|
||||||
# Strip leading slash for regular paths
|
# Strip leading slash for regular paths
|
||||||
if value.startswith("/"):
|
if value.startswith("/"):
|
||||||
return value[1:]
|
return value[1:]
|
||||||
@@ -399,33 +430,86 @@ class Task(BaseModel):
|
|||||||
tasks_slices = [self.description, output]
|
tasks_slices = [self.description, output]
|
||||||
return "\n".join(tasks_slices)
|
return "\n".join(tasks_slices)
|
||||||
|
|
||||||
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
|
_original_output_file: Optional[str] = None
|
||||||
"""Interpolate inputs into the task description, expected output, and output file path."""
|
|
||||||
|
def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None:
|
||||||
|
"""Interpolate inputs into the task description, expected output, and output file path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Dictionary mapping template variables to their values.
|
||||||
|
Supported value types are strings, integers, and floats.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If a required template variable is missing from inputs.
|
||||||
|
"""
|
||||||
if self._original_description is None:
|
if self._original_description is None:
|
||||||
self._original_description = self.description
|
self._original_description = self.description
|
||||||
if self._original_expected_output is None:
|
if self._original_expected_output is None:
|
||||||
self._original_expected_output = self.expected_output
|
self._original_expected_output = self.expected_output
|
||||||
if self.output_file is not None and not hasattr(self, "_original_output_file"):
|
if self.output_file is not None and self._original_output_file is None:
|
||||||
self._original_output_file = self.output_file
|
self._original_output_file = self.output_file
|
||||||
|
|
||||||
if inputs:
|
if not inputs:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
self.description = self._original_description.format(**inputs)
|
self.description = self._original_description.format(**inputs)
|
||||||
|
except KeyError as e:
|
||||||
|
raise ValueError(f"Missing required template variable '{e.args[0]}' in description") from e
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Error interpolating description: {str(e)}") from e
|
||||||
|
|
||||||
|
try:
|
||||||
self.expected_output = self.interpolate_only(
|
self.expected_output = self.interpolate_only(
|
||||||
input_string=self._original_expected_output, inputs=inputs
|
input_string=self._original_expected_output, inputs=inputs
|
||||||
)
|
)
|
||||||
if self.output_file is not None:
|
except (KeyError, ValueError) as e:
|
||||||
|
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
|
||||||
|
|
||||||
|
if self.output_file is not None:
|
||||||
|
try:
|
||||||
self.output_file = self.interpolate_only(
|
self.output_file = self.interpolate_only(
|
||||||
input_string=self._original_output_file, inputs=inputs
|
input_string=self._original_output_file, inputs=inputs
|
||||||
)
|
)
|
||||||
|
except (KeyError, ValueError) as e:
|
||||||
|
raise ValueError(f"Error interpolating output_file path: {str(e)}") from e
|
||||||
|
|
||||||
def interpolate_only(self, input_string: str, inputs: Dict[str, Any]) -> str:
|
def interpolate_only(self, input_string: str, inputs: Dict[str, Union[str, int, float]]) -> str:
|
||||||
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched."""
|
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
|
||||||
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
|
||||||
|
|
||||||
for key in inputs.keys():
|
Args:
|
||||||
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
input_string: The string containing template variables to interpolate.
|
||||||
|
inputs: Dictionary mapping template variables to their values.
|
||||||
|
Supported value types are strings, integers, and floats.
|
||||||
|
|
||||||
return escaped_string.format(**inputs)
|
Returns:
|
||||||
|
The interpolated string with all template variables replaced with their values.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If a required template variable is missing from inputs.
|
||||||
|
KeyError: If a template variable is not found in the inputs dictionary.
|
||||||
|
"""
|
||||||
|
if not input_string:
|
||||||
|
raise ValueError("Input string cannot be empty")
|
||||||
|
if not inputs:
|
||||||
|
raise ValueError("Inputs dictionary cannot be empty")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Validate input types
|
||||||
|
for key, value in inputs.items():
|
||||||
|
if not isinstance(value, (str, int, float)):
|
||||||
|
raise ValueError(f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}")
|
||||||
|
|
||||||
|
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
||||||
|
|
||||||
|
for key in inputs.keys():
|
||||||
|
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
||||||
|
|
||||||
|
return escaped_string.format(**inputs)
|
||||||
|
except KeyError as e:
|
||||||
|
raise KeyError(f"Template variable '{e.args[0]}' not found in inputs dictionary") from e
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError(f"Error during string interpolation: {str(e)}") from e
|
||||||
|
|
||||||
def increment_tools_errors(self) -> None:
|
def increment_tools_errors(self) -> None:
|
||||||
"""Increment the tools errors counter."""
|
"""Increment the tools errors counter."""
|
||||||
|
|||||||
@@ -875,3 +875,25 @@ def test_key():
|
|||||||
assert (
|
assert (
|
||||||
task.key == hash
|
task.key == hash
|
||||||
), "The key should be the hash of the non-interpolated description."
|
), "The key should be the hash of the non-interpolated description."
|
||||||
|
|
||||||
|
|
||||||
|
def test_output_file_validation():
|
||||||
|
"""Test output file path validation."""
|
||||||
|
# Valid paths
|
||||||
|
assert Task(output_file="output.txt").output_file == "output.txt"
|
||||||
|
assert Task(output_file="/tmp/output.txt").output_file == "tmp/output.txt"
|
||||||
|
assert Task(output_file="{dir}/output_{date}.txt").output_file == "{dir}/output_{date}.txt"
|
||||||
|
|
||||||
|
# Invalid paths
|
||||||
|
with pytest.raises(ValueError, match="Path traversal"):
|
||||||
|
Task(output_file="../output.txt")
|
||||||
|
with pytest.raises(ValueError, match="Path traversal"):
|
||||||
|
Task(output_file="folder/../output.txt")
|
||||||
|
with pytest.raises(ValueError, match="Shell special characters"):
|
||||||
|
Task(output_file="output.txt | rm -rf /")
|
||||||
|
with pytest.raises(ValueError, match="Shell expansion"):
|
||||||
|
Task(output_file="~/output.txt")
|
||||||
|
with pytest.raises(ValueError, match="Shell expansion"):
|
||||||
|
Task(output_file="$HOME/output.txt")
|
||||||
|
with pytest.raises(ValueError, match="Invalid template variable"):
|
||||||
|
Task(output_file="{invalid-name}/output.txt")
|
||||||
|
|||||||
Reference in New Issue
Block a user