From e28a564e330b6330e238f1f760efcd95ad79e23d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Mar 2026 22:11:54 +0000 Subject: [PATCH] feat: add sandlock as lightweight OS-level sandboxing backend for code execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses #5150 - adds sandlock (Landlock + seccomp-bpf) as a new execution backend for CodeInterpreterTool, providing kernel-level process isolation without requiring Docker. Changes: - Add 'execution_backend' parameter to CodeInterpreterTool with options: 'auto' (default), 'docker', 'sandlock', 'unsafe' - Add sandbox configuration options: sandbox_fs_read, sandbox_fs_write, sandbox_max_memory_mb, sandbox_max_processes, sandbox_timeout - Add run_code_in_sandlock() method using sandlock's Sandbox/Policy API - Add _check_sandlock_available() to verify Linux + sandlock installation - Add _build_sandlock_policy() to construct sandlock Policy from config - Update run_code_safety() to fall back to sandlock when Docker unavailable - Update error messages to mention sandlock as an alternative - Add 'sandlock' optional dependency in pyproject.toml - Add 18 new tests covering all sandlock integration paths Co-Authored-By: João --- lib/crewai-tools/pyproject.toml | 3 + .../code_interpreter_tool.py | 282 +++++++++++++- .../tests/tools/test_code_interpreter_tool.py | 368 +++++++++++++++++- 3 files changed, 629 insertions(+), 24 deletions(-) diff --git a/lib/crewai-tools/pyproject.toml b/lib/crewai-tools/pyproject.toml index 69cb9df17..b5c9dfe9c 100644 --- a/lib/crewai-tools/pyproject.toml +++ b/lib/crewai-tools/pyproject.toml @@ -140,6 +140,9 @@ contextual = [ "contextual-client>=0.1.0", "nest-asyncio>=1.6.0", ] +sandlock = [ + "sandlock>=0.2.0", +] [build-system] diff --git a/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py b/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py index 9ad969966..55581d90e 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/code_interpreter_tool/code_interpreter_tool.py @@ -1,16 +1,41 @@ """Code Interpreter Tool for executing Python code in isolated environments. -This module provides a tool for executing Python code either in a Docker container for -safe isolation or directly in a restricted sandbox. It includes mechanisms for blocking -potentially unsafe operations and importing restricted modules. +This module provides a tool for executing Python code either in a Docker container, +a sandlock process sandbox, or directly in a restricted sandbox. It includes mechanisms +for blocking potentially unsafe operations and importing restricted modules. + +Execution backends (in order of preference): + 1. Docker: Full container isolation (~200ms startup) + 2. Sandlock: Kernel-level process sandbox via Landlock + seccomp-bpf (~1ms startup) + 3. Unsafe: Direct execution on the host (no isolation, trusted code only) + +Example usage:: + + from crewai_tools import CodeInterpreterTool + + # Auto-select best available backend (Docker > Sandlock > error) + tool = CodeInterpreterTool() + + # Explicitly use sandlock backend + tool = CodeInterpreterTool( + execution_backend="sandlock", + sandbox_fs_read=["/usr/lib/python3", "/workspace"], + sandbox_fs_write=["/workspace/output"], + sandbox_max_memory_mb=512, + ) + + # Use unsafe mode (only for trusted code) + tool = CodeInterpreterTool(unsafe_mode=True) """ import importlib.util import os +import platform import subprocess import sys +import tempfile from types import ModuleType -from typing import Any, ClassVar, TypedDict +from typing import Any, ClassVar, Literal, TypedDict from crewai.tools import BaseTool from docker import ( # type: ignore[import-untyped] @@ -56,7 +81,7 @@ class SandboxPython: sandbox escape attacks via Python object introspection. Attackers can recover the original __import__ function and bypass all restrictions. - DO NOT USE for untrusted code execution. Use Docker containers instead. + DO NOT USE for untrusted code execution. Use Docker containers or sandlock instead. This class attempts to restrict access to dangerous modules and built-in functions but provides no real security boundary against a motivated attacker. @@ -146,8 +171,34 @@ class CodeInterpreterTool(BaseTool): """A tool for executing Python code in isolated environments. This tool provides functionality to run Python code either in a Docker container - for safe isolation or directly in a restricted sandbox. It can handle installing + for safe isolation, in a sandlock process sandbox for lightweight kernel-level + isolation, or directly in a restricted sandbox. It can handle installing Python packages and executing arbitrary Python code. + + Attributes: + execution_backend: The execution backend to use. One of ``"auto"``, + ``"docker"``, ``"sandlock"``, or ``"unsafe"``. Defaults to ``"auto"`` + which tries Docker first, then sandlock, then raises an error. + sandbox_fs_read: List of filesystem paths to allow read access in sandlock. + sandbox_fs_write: List of filesystem paths to allow write access in sandlock. + sandbox_max_memory_mb: Maximum memory in MB for sandlock execution. + sandbox_max_processes: Maximum number of processes for sandlock execution. + sandbox_timeout: Timeout in seconds for sandlock execution. + + Example:: + + # Auto-select best available backend + tool = CodeInterpreterTool() + result = tool.run(code="print('hello')", libraries_used=[]) + + # Explicitly use sandlock with custom policy + tool = CodeInterpreterTool( + execution_backend="sandlock", + sandbox_fs_read=["/usr/lib/python3"], + sandbox_fs_write=["/tmp/output"], + sandbox_max_memory_mb=256, + ) + result = tool.run(code="print(2 + 2)", libraries_used=[]) """ name: str = "Code Interpreter" @@ -159,6 +210,13 @@ class CodeInterpreterTool(BaseTool): user_docker_base_url: str | None = None unsafe_mode: bool = False + execution_backend: Literal["auto", "docker", "sandlock", "unsafe"] = "auto" + sandbox_fs_read: list[str] = Field(default_factory=list) + sandbox_fs_write: list[str] = Field(default_factory=list) + sandbox_max_memory_mb: int | None = None + sandbox_max_processes: int | None = None + sandbox_timeout: int | None = None + @staticmethod def _get_installed_package_path() -> str: """Gets the installation path of the crewai_tools package. @@ -226,8 +284,17 @@ class CodeInterpreterTool(BaseTool): if not code: return "No code provided to execute." - if self.unsafe_mode: + # Handle legacy unsafe_mode flag + if self.unsafe_mode or self.execution_backend == "unsafe": return self.run_code_unsafe(code, libraries_used) + + if self.execution_backend == "docker": + return self.run_code_in_docker(code, libraries_used) + + if self.execution_backend == "sandlock": + return self.run_code_in_sandlock(code, libraries_used) + + # Auto mode: try Docker first, then sandlock, then raise error return self.run_code_safety(code, libraries_used) @staticmethod @@ -301,11 +368,184 @@ class CodeInterpreterTool(BaseTool): Printer.print("Docker is not installed", color="bold_purple") return False + @staticmethod + def _check_sandlock_available() -> bool: + """Checks if sandlock is installed and the system supports it. + + Verifies that: + 1. The sandlock package is importable + 2. The system is running Linux (sandlock requires Linux kernel features) + + Returns: + True if sandlock is available and the system supports it, False otherwise. + """ + if platform.system() != "Linux": + Printer.print( + "Sandlock requires Linux (Landlock + seccomp-bpf). " + "Use Docker on macOS/Windows.", + color="bold_purple", + ) + return False + + if importlib.util.find_spec("sandlock") is None: + Printer.print( + "Sandlock is not installed. Install with: pip install sandlock", + color="bold_purple", + ) + return False + + return True + + def _build_sandlock_policy(self, work_dir: str) -> Any: + """Builds a sandlock Policy with the configured sandbox parameters. + + Constructs a sandlock Policy object using the tool's configuration for + filesystem access, memory limits, process limits, and other constraints. + + Args: + work_dir: The working directory for the sandbox (writable). + + Returns: + A sandlock Policy object configured with the appropriate restrictions. + """ + from sandlock import Policy # type: ignore[import-untyped] + + # Default readable paths for Python execution + default_readable = [ + "/usr", + "/lib", + "/lib64", + "/etc/alternatives", + ] + + # Add Python-specific paths + python_path = os.path.dirname(os.path.dirname(sys.executable)) + if python_path not in default_readable: + default_readable.append(python_path) + + # Include site-packages for installed libraries + for path in sys.path: + if path and os.path.isdir(path) and path not in default_readable: + default_readable.append(path) + + fs_readable = list(set(default_readable + self.sandbox_fs_read)) + fs_writable = list(set([work_dir] + self.sandbox_fs_write)) + + policy_kwargs: dict[str, Any] = { + "fs_readable": fs_readable, + "fs_writable": fs_writable, + "isolate_ipc": True, + "clean_env": True, + "env": {"PATH": "/usr/bin:/bin", "HOME": work_dir}, + } + + if self.sandbox_max_memory_mb is not None: + policy_kwargs["max_memory"] = f"{self.sandbox_max_memory_mb}M" + + if self.sandbox_max_processes is not None: + policy_kwargs["max_processes"] = self.sandbox_max_processes + + return Policy(**policy_kwargs) + + def run_code_in_sandlock(self, code: str, libraries_used: list[str]) -> str: + """Runs Python code in a sandlock process sandbox. + + Uses sandlock's Landlock + seccomp-bpf kernel-level isolation to execute + code in a confined process. This provides stronger isolation than the + Python-level SandboxPython (which is vulnerable to escape attacks) while + being much lighter than Docker (~1ms vs ~200ms startup). + + Libraries are installed in a temporary directory before sandbox activation. + + Args: + code: The Python code to execute as a string. + libraries_used: A list of Python library names to install before execution. + + Returns: + The output of the executed code as a string, or an error message + if execution failed. + + Raises: + RuntimeError: If sandlock is not available or the system doesn't support it. + """ + if not self._check_sandlock_available(): + raise RuntimeError( + "Sandlock is not available. Ensure sandlock is installed " + "(pip install sandlock) and you are running on Linux 5.13+." + ) + + from sandlock import Sandbox # type: ignore[import-untyped] + + Printer.print( + "Running code in sandlock sandbox (Landlock + seccomp-bpf)", + color="bold_blue", + ) + + with tempfile.TemporaryDirectory(prefix="crewai_sandbox_") as work_dir: + # Install libraries before entering the sandbox + if libraries_used: + Printer.print( + f"Installing libraries: {', '.join(libraries_used)}", + color="bold_purple", + ) + for library in libraries_used: + subprocess.run( # noqa: S603 + [ + sys.executable, + "-m", + "pip", + "install", + "--target", + os.path.join(work_dir, "libs"), + library, + ], + check=False, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # Write the code to a temporary file + code_file = os.path.join(work_dir, "script.py") + with open(code_file, "w") as f: # noqa: PTH123 + f.write(code) + + # Build the sandbox policy + policy = self._build_sandlock_policy(work_dir) + + # Build the command with PYTHONPATH for installed libraries + env_pythonpath = os.path.join(work_dir, "libs") + cmd = [ + sys.executable, + "-c", + ( + f"import sys; sys.path.insert(0, '{env_pythonpath}'); " + f"exec(open('{code_file}').read())" + ), + ] + + timeout = self.sandbox_timeout if self.sandbox_timeout is not None else 60 + + try: + result = Sandbox(policy).run(cmd, timeout=timeout) + output = result.stdout if hasattr(result, "stdout") else str(result) + if hasattr(result, "returncode") and result.returncode != 0: + stderr = result.stderr if hasattr(result, "stderr") else "" + return ( + f"Something went wrong while running the code: " + f"\n{stderr or output}" + ) + return output + except Exception as e: + return f"An error occurred in sandlock sandbox: {e!s}" + def run_code_safety(self, code: str, libraries_used: list[str]) -> str: """Runs code in the safest available environment. - Requires Docker to be available for secure code execution. Fails closed - if Docker is not available to prevent sandbox escape vulnerabilities. + Tries execution backends in order of isolation strength: + 1. Docker (full container isolation) + 2. Sandlock (kernel-level process sandbox, Linux only) + + Fails closed if neither backend is available. Args: code: The Python code to execute as a string. @@ -315,18 +555,24 @@ class CodeInterpreterTool(BaseTool): The output of the executed code as a string. Raises: - RuntimeError: If Docker is not available, as the restricted sandbox - is vulnerable to escape attacks and should not be used - for untrusted code execution. + RuntimeError: If no secure execution backend is available. """ if self._check_docker_available(): return self.run_code_in_docker(code, libraries_used) + if self._check_sandlock_available(): + Printer.print( + "Docker unavailable, falling back to sandlock sandbox.", + color="bold_yellow", + ) + return self.run_code_in_sandlock(code, libraries_used) + error_msg = ( - "Docker is required for safe code execution but is not available. " - "The restricted sandbox fallback has been removed due to security vulnerabilities " - "that allow sandbox escape via Python object introspection. " - "Please install Docker (https://docs.docker.com/get-docker/) or use unsafe_mode=True " + "No secure execution backend is available. " + "Install Docker (https://docs.docker.com/get-docker/) for full container isolation, " + "or install sandlock (pip install sandlock) on Linux 5.13+ for lightweight " + "kernel-level sandboxing via Landlock + seccomp-bpf. " + "Alternatively, use unsafe_mode=True or execution_backend='unsafe' " "if you trust the code source and understand the security risks." ) Printer.print(error_msg, color="bold_red") @@ -372,8 +618,8 @@ class CodeInterpreterTool(BaseTool): - Access any Python module including os, subprocess, sys, etc. - Execute arbitrary commands on the host system - Use run_code_in_docker() for secure code execution, or run_code_unsafe() - if you explicitly acknowledge the security risks. + Use run_code_in_docker() or run_code_in_sandlock() for secure code execution, + or run_code_unsafe() if you explicitly acknowledge the security risks. Args: code: The Python code to execute as a string. diff --git a/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py b/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py index 5b0144790..c9d1dda79 100644 --- a/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py +++ b/lib/crewai-tools/tests/tools/test_code_interpreter_tool.py @@ -1,5 +1,6 @@ import sys -from unittest.mock import patch +from types import SimpleNamespace +from unittest.mock import MagicMock, patch from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import ( CodeInterpreterTool, @@ -23,6 +24,24 @@ def docker_unavailable_mock(): yield mock +@pytest.fixture +def sandlock_unavailable_mock(): + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.CodeInterpreterTool._check_sandlock_available", + return_value=False, + ) as mock: + yield mock + + +@pytest.fixture +def sandlock_available_mock(): + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.CodeInterpreterTool._check_sandlock_available", + return_value=True, + ) as mock: + yield mock + + @patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker_from_env") def test_run_code_in_docker(docker_mock, printer_mock): tool = CodeInterpreterTool() @@ -77,8 +96,10 @@ print("This is line 2")""" ) -def test_docker_unavailable_raises_error(printer_mock, docker_unavailable_mock): - """Test that execution fails when Docker is unavailable in safe mode.""" +def test_docker_and_sandlock_unavailable_raises_error( + printer_mock, docker_unavailable_mock, sandlock_unavailable_mock +): + """Test that execution fails when both Docker and sandlock are unavailable.""" tool = CodeInterpreterTool() code = """ result = 2 + 2 @@ -86,9 +107,9 @@ print(result) """ with pytest.raises(RuntimeError) as exc_info: tool.run(code=code, libraries_used=[]) - - assert "Docker is required for safe code execution" in str(exc_info.value) - assert "sandbox escape" in str(exc_info.value) + + assert "No secure execution backend is available" in str(exc_info.value) + assert "sandlock" in str(exc_info.value) def test_restricted_sandbox_running_with_blocked_modules(): @@ -206,6 +227,341 @@ result = eval("5/1") assert 5.0 == result +# --- Sandlock backend tests --- + + +def test_sandlock_fallback_when_docker_unavailable( + printer_mock, docker_unavailable_mock, sandlock_available_mock +): + """Test that sandlock is used as fallback when Docker is unavailable.""" + tool = CodeInterpreterTool() + code = "print('hello')" + + with patch.object( + CodeInterpreterTool, + "run_code_in_sandlock", + return_value="hello\n", + ) as sandlock_run_mock: + result = tool.run(code=code, libraries_used=[]) + + assert result == "hello\n" + sandlock_run_mock.assert_called_once_with(code, []) + + +def test_execution_backend_sandlock_calls_sandlock( + printer_mock, sandlock_available_mock +): + """Test that execution_backend='sandlock' routes to sandlock.""" + tool = CodeInterpreterTool(execution_backend="sandlock") + code = "print('test')" + + with patch.object( + CodeInterpreterTool, + "run_code_in_sandlock", + return_value="test\n", + ) as mock_sandlock: + result = tool.run(code=code, libraries_used=[]) + + assert result == "test\n" + mock_sandlock.assert_called_once_with(code, []) + + +def test_execution_backend_docker_calls_docker(printer_mock): + """Test that execution_backend='docker' routes directly to Docker.""" + tool = CodeInterpreterTool(execution_backend="docker") + code = "print('test')" + + with patch.object( + CodeInterpreterTool, + "run_code_in_docker", + return_value="test\n", + ) as mock_docker: + result = tool.run(code=code, libraries_used=[]) + + assert result == "test\n" + mock_docker.assert_called_once_with(code, []) + + +def test_execution_backend_unsafe_calls_unsafe(printer_mock): + """Test that execution_backend='unsafe' routes to unsafe mode.""" + tool = CodeInterpreterTool(execution_backend="unsafe") + code = "result = 42" + + result = tool.run(code=code, libraries_used=[]) + assert result == 42 + + +def test_sandlock_check_not_linux(printer_mock): + """Test that sandlock is unavailable on non-Linux systems.""" + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system", + return_value="Darwin", + ): + assert CodeInterpreterTool._check_sandlock_available() is False + + +def test_sandlock_check_not_installed(printer_mock): + """Test that sandlock is unavailable when not installed.""" + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system", + return_value="Linux", + ): + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.importlib.util.find_spec", + return_value=None, + ): + assert CodeInterpreterTool._check_sandlock_available() is False + + +def test_sandlock_check_available_on_linux(printer_mock): + """Test that sandlock is available on Linux when installed.""" + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system", + return_value="Linux", + ): + with patch( + "crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.importlib.util.find_spec", + return_value=MagicMock(), # non-None means installed + ): + assert CodeInterpreterTool._check_sandlock_available() is True + + +def test_sandlock_run_raises_when_unavailable(printer_mock): + """Test that run_code_in_sandlock raises RuntimeError when sandlock is unavailable.""" + tool = CodeInterpreterTool() + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=False + ): + with pytest.raises(RuntimeError) as exc_info: + tool.run_code_in_sandlock("print('hello')", []) + assert "Sandlock is not available" in str(exc_info.value) + + +def test_sandlock_run_success(printer_mock): + """Test sandlock execution with successful output.""" + tool = CodeInterpreterTool() + code = "print('hello from sandlock')" + + sandbox_result = SimpleNamespace( + stdout="hello from sandlock\n", stderr="", returncode=0 + ) + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.return_value = sandbox_result + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + result = tool.run_code_in_sandlock(code, []) + + assert result == "hello from sandlock\n" + + +def test_sandlock_run_with_error(printer_mock): + """Test sandlock execution when the code returns an error.""" + tool = CodeInterpreterTool() + code = "print(1/0)" + + sandbox_result = SimpleNamespace( + stdout="", stderr="ZeroDivisionError: division by zero", returncode=1 + ) + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.return_value = sandbox_result + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + result = tool.run_code_in_sandlock(code, []) + + assert "Something went wrong" in result + assert "ZeroDivisionError" in result + + +def test_sandlock_run_with_exception(printer_mock): + """Test sandlock execution when an exception occurs.""" + tool = CodeInterpreterTool() + code = "print('hello')" + + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.side_effect = OSError("Landlock not supported") + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + result = tool.run_code_in_sandlock(code, []) + + assert "An error occurred in sandlock sandbox" in result + assert "Landlock not supported" in result + + +@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run") +def test_sandlock_installs_libraries_to_temp_dir( + subprocess_run_mock, printer_mock +): + """Test that sandlock installs libraries to a temporary directory.""" + tool = CodeInterpreterTool() + code = "result = 1" + libraries_used = ["numpy"] + + sandbox_result = SimpleNamespace(stdout="", stderr="", returncode=0) + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.return_value = sandbox_result + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + tool.run_code_in_sandlock(code, libraries_used) + + # Check that subprocess.run was called for pip install with --target + pip_calls = [ + c for c in subprocess_run_mock.call_args_list + if "--target" in c[0][0] + ] + assert len(pip_calls) == 1 + args = pip_calls[0][0][0] + assert args[0] == sys.executable + assert "--target" in args + assert "numpy" in args + + +def test_sandlock_custom_policy_params(printer_mock): + """Test that custom sandbox parameters are passed to the policy.""" + tool = CodeInterpreterTool( + sandbox_fs_read=["/custom/read"], + sandbox_fs_write=["/custom/write"], + sandbox_max_memory_mb=256, + sandbox_max_processes=5, + ) + + mock_policy_cls = MagicMock() + mock_sandlock_module = MagicMock() + mock_sandlock_module.Policy = mock_policy_cls + + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + tool._build_sandlock_policy("/tmp/work") + + mock_policy_cls.assert_called_once() + call_kwargs = mock_policy_cls.call_args[1] + assert "/custom/read" in call_kwargs["fs_readable"] + assert "/custom/write" in call_kwargs["fs_writable"] + assert "/tmp/work" in call_kwargs["fs_writable"] + assert call_kwargs["max_memory"] == "256M" + assert call_kwargs["max_processes"] == 5 + assert call_kwargs["isolate_ipc"] is True + assert call_kwargs["clean_env"] is True + + +def test_sandlock_default_policy_no_memory_limit(printer_mock): + """Test that default policy omits max_memory when not configured.""" + tool = CodeInterpreterTool() + + mock_policy_cls = MagicMock() + mock_sandlock_module = MagicMock() + mock_sandlock_module.Policy = mock_policy_cls + + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + tool._build_sandlock_policy("/tmp/work") + + call_kwargs = mock_policy_cls.call_args[1] + assert "max_memory" not in call_kwargs + assert "max_processes" not in call_kwargs + + +def test_sandlock_timeout_default(printer_mock): + """Test that sandlock uses the default 60s timeout.""" + tool = CodeInterpreterTool() + code = "print('hello')" + + sandbox_result = SimpleNamespace(stdout="hello\n", stderr="", returncode=0) + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.return_value = sandbox_result + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + tool.run_code_in_sandlock(code, []) + + # Verify timeout=60 was passed + run_call = mock_sandbox_instance.run + assert run_call.call_args[1]["timeout"] == 60 + + +def test_sandlock_custom_timeout(printer_mock): + """Test that sandlock uses a custom timeout when configured.""" + tool = CodeInterpreterTool(sandbox_timeout=30) + code = "print('hello')" + + sandbox_result = SimpleNamespace(stdout="hello\n", stderr="", returncode=0) + mock_sandbox_instance = MagicMock() + mock_sandbox_instance.run.return_value = sandbox_result + mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance) + mock_policy_cls = MagicMock() + + with patch.object( + CodeInterpreterTool, "_check_sandlock_available", return_value=True + ): + mock_sandlock_module = MagicMock() + mock_sandlock_module.Sandbox = mock_sandbox_cls + mock_sandlock_module.Policy = mock_policy_cls + with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}): + tool.run_code_in_sandlock(code, []) + + run_call = mock_sandbox_instance.run + assert run_call.call_args[1]["timeout"] == 30 + + +def test_auto_mode_prefers_docker_over_sandlock(printer_mock): + """Test that auto mode tries Docker first before sandlock.""" + tool = CodeInterpreterTool() + code = "print('hello')" + + with patch.object( + CodeInterpreterTool, "_check_docker_available", return_value=True + ): + with patch.object( + CodeInterpreterTool, "run_code_in_docker", return_value="hello\n" + ) as mock_docker: + with patch.object( + CodeInterpreterTool, + "run_code_in_sandlock", + return_value="hello\n", + ) as mock_sandlock: + result = tool.run(code=code, libraries_used=[]) + + mock_docker.assert_called_once() + mock_sandlock.assert_not_called() + assert result == "hello\n" + + @pytest.mark.xfail( reason=( "run_code_in_restricted_sandbox is known to be vulnerable to sandbox "