From ce4a730f76ffdd39b211abb85b7dadf2c5c6ae77 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 29 Dec 2024 03:23:29 +0000 Subject: [PATCH] fix: add security validation for output_file paths Co-Authored-By: Joe Moura --- src/crewai/task.py | 110 +++++++++++++++++++++++++++++++++++++++------ tests/task_test.py | 22 +++++++++ 2 files changed, 119 insertions(+), 13 deletions(-) diff --git a/src/crewai/task.py b/src/crewai/task.py index 4817419a4..64291a0af 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -214,12 +214,43 @@ class Task(BaseModel): @field_validator("output_file") @classmethod def output_file_validation(cls, value: Optional[str]) -> Optional[str]: - """Validate the output file path.""" + """Validate the output file path. + + Args: + value: The output file path to validate. Can be None or a string. + If the path contains template variables (e.g. {var}), leading slashes are preserved. + For regular paths, leading slashes are stripped. + + Returns: + The validated and potentially modified path, or None if no path was provided. + + Raises: + ValueError: If the path contains invalid characters, path traversal attempts, + or other security concerns. + """ if value is None: return None + + # Basic security checks + if ".." in value: + raise ValueError("Path traversal attempts are not allowed in output_file paths") + + if any(char in value for char in ['|', '>', '<', '&', ';', '$']): + raise ValueError("Shell special characters are not allowed in output_file paths") + + # Check for absolute paths that might bypass restrictions + if value.startswith('~') or value.startswith('$'): + raise ValueError("Shell expansion characters are not allowed in output_file paths") + # Don't strip leading slash if it's a template path with variables - if "{" in value or "}" in value: + if "{" in value or "}" in value: + # Validate template variable format + template_vars = [part.split("}")[0] for part in value.split("{")[1:]] + for var in template_vars: + if not var.isidentifier(): + raise ValueError(f"Invalid template variable name: {var}") return value + # Strip leading slash for regular paths if value.startswith("/"): return value[1:] @@ -399,33 +430,86 @@ class Task(BaseModel): tasks_slices = [self.description, output] return "\n".join(tasks_slices) - def interpolate_inputs(self, inputs: Dict[str, Any]) -> None: - """Interpolate inputs into the task description, expected output, and output file path.""" + _original_output_file: Optional[str] = None + + def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None: + """Interpolate inputs into the task description, expected output, and output file path. + + Args: + inputs: Dictionary mapping template variables to their values. + Supported value types are strings, integers, and floats. + + Raises: + ValueError: If a required template variable is missing from inputs. + """ if self._original_description is None: self._original_description = self.description if self._original_expected_output is None: self._original_expected_output = self.expected_output - if self.output_file is not None and not hasattr(self, "_original_output_file"): + if self.output_file is not None and self._original_output_file is None: self._original_output_file = self.output_file - if inputs: + if not inputs: + return + + try: self.description = self._original_description.format(**inputs) + except KeyError as e: + raise ValueError(f"Missing required template variable '{e.args[0]}' in description") from e + except ValueError as e: + raise ValueError(f"Error interpolating description: {str(e)}") from e + + try: self.expected_output = self.interpolate_only( input_string=self._original_expected_output, inputs=inputs ) - if self.output_file is not None: + except (KeyError, ValueError) as e: + raise ValueError(f"Error interpolating expected_output: {str(e)}") from e + + if self.output_file is not None: + try: self.output_file = self.interpolate_only( input_string=self._original_output_file, inputs=inputs ) + except (KeyError, ValueError) as e: + raise ValueError(f"Error interpolating output_file path: {str(e)}") from e - def interpolate_only(self, input_string: str, inputs: Dict[str, Any]) -> str: - """Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.""" - escaped_string = input_string.replace("{", "{{").replace("}", "}}") + def interpolate_only(self, input_string: str, inputs: Dict[str, Union[str, int, float]]) -> str: + """Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched. + + Args: + input_string: The string containing template variables to interpolate. + inputs: Dictionary mapping template variables to their values. + Supported value types are strings, integers, and floats. + + Returns: + The interpolated string with all template variables replaced with their values. + + Raises: + ValueError: If a required template variable is missing from inputs. + KeyError: If a template variable is not found in the inputs dictionary. + """ + if not input_string: + raise ValueError("Input string cannot be empty") + if not inputs: + raise ValueError("Inputs dictionary cannot be empty") - for key in inputs.keys(): - escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}") + try: + # Validate input types + for key, value in inputs.items(): + if not isinstance(value, (str, int, float)): + raise ValueError(f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}") - return escaped_string.format(**inputs) + escaped_string = input_string.replace("{", "{{").replace("}", "}}") + + for key in inputs.keys(): + escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}") + + return escaped_string.format(**inputs) + except KeyError as e: + raise KeyError(f"Template variable '{e.args[0]}' not found in inputs dictionary") from e + except ValueError as e: + raise ValueError(f"Error during string interpolation: {str(e)}") from e def increment_tools_errors(self) -> None: """Increment the tools errors counter.""" diff --git a/tests/task_test.py b/tests/task_test.py index e9edf927f..198f53f3f 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -875,3 +875,25 @@ def test_key(): assert ( task.key == hash ), "The key should be the hash of the non-interpolated description." + + +def test_output_file_validation(): + """Test output file path validation.""" + # Valid paths + assert Task(output_file="output.txt").output_file == "output.txt" + assert Task(output_file="/tmp/output.txt").output_file == "tmp/output.txt" + assert Task(output_file="{dir}/output_{date}.txt").output_file == "{dir}/output_{date}.txt" + + # Invalid paths + with pytest.raises(ValueError, match="Path traversal"): + Task(output_file="../output.txt") + with pytest.raises(ValueError, match="Path traversal"): + Task(output_file="folder/../output.txt") + with pytest.raises(ValueError, match="Shell special characters"): + Task(output_file="output.txt | rm -rf /") + with pytest.raises(ValueError, match="Shell expansion"): + Task(output_file="~/output.txt") + with pytest.raises(ValueError, match="Shell expansion"): + Task(output_file="$HOME/output.txt") + with pytest.raises(ValueError, match="Invalid template variable"): + Task(output_file="{invalid-name}/output.txt")