feat: add sandlock as lightweight OS-level sandboxing backend for code execution

Addresses #5150 - adds sandlock (Landlock + seccomp-bpf) as a new execution
backend for CodeInterpreterTool, providing kernel-level process isolation
without requiring Docker.

Changes:
- Add 'execution_backend' parameter to CodeInterpreterTool with options:
  'auto' (default), 'docker', 'sandlock', 'unsafe'
- Add sandbox configuration options: sandbox_fs_read, sandbox_fs_write,
  sandbox_max_memory_mb, sandbox_max_processes, sandbox_timeout
- Add run_code_in_sandlock() method using sandlock's Sandbox/Policy API
- Add _check_sandlock_available() to verify Linux + sandlock installation
- Add _build_sandlock_policy() to construct sandlock Policy from config
- Update run_code_safety() to fall back to sandlock when Docker unavailable
- Update error messages to mention sandlock as an alternative
- Add 'sandlock' optional dependency in pyproject.toml
- Add 18 new tests covering all sandlock integration paths

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2026-03-27 22:11:54 +00:00
parent 9fe0c15549
commit e28a564e33
3 changed files with 629 additions and 24 deletions

View File

@@ -1,5 +1,6 @@
import sys
from unittest.mock import patch
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import (
CodeInterpreterTool,
@@ -23,6 +24,24 @@ def docker_unavailable_mock():
yield mock
@pytest.fixture
def sandlock_unavailable_mock():
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.CodeInterpreterTool._check_sandlock_available",
return_value=False,
) as mock:
yield mock
@pytest.fixture
def sandlock_available_mock():
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.CodeInterpreterTool._check_sandlock_available",
return_value=True,
) as mock:
yield mock
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.docker_from_env")
def test_run_code_in_docker(docker_mock, printer_mock):
tool = CodeInterpreterTool()
@@ -77,8 +96,10 @@ print("This is line 2")"""
)
def test_docker_unavailable_raises_error(printer_mock, docker_unavailable_mock):
"""Test that execution fails when Docker is unavailable in safe mode."""
def test_docker_and_sandlock_unavailable_raises_error(
printer_mock, docker_unavailable_mock, sandlock_unavailable_mock
):
"""Test that execution fails when both Docker and sandlock are unavailable."""
tool = CodeInterpreterTool()
code = """
result = 2 + 2
@@ -86,9 +107,9 @@ print(result)
"""
with pytest.raises(RuntimeError) as exc_info:
tool.run(code=code, libraries_used=[])
assert "Docker is required for safe code execution" in str(exc_info.value)
assert "sandbox escape" in str(exc_info.value)
assert "No secure execution backend is available" in str(exc_info.value)
assert "sandlock" in str(exc_info.value)
def test_restricted_sandbox_running_with_blocked_modules():
@@ -206,6 +227,341 @@ result = eval("5/1")
assert 5.0 == result
# --- Sandlock backend tests ---
def test_sandlock_fallback_when_docker_unavailable(
printer_mock, docker_unavailable_mock, sandlock_available_mock
):
"""Test that sandlock is used as fallback when Docker is unavailable."""
tool = CodeInterpreterTool()
code = "print('hello')"
with patch.object(
CodeInterpreterTool,
"run_code_in_sandlock",
return_value="hello\n",
) as sandlock_run_mock:
result = tool.run(code=code, libraries_used=[])
assert result == "hello\n"
sandlock_run_mock.assert_called_once_with(code, [])
def test_execution_backend_sandlock_calls_sandlock(
printer_mock, sandlock_available_mock
):
"""Test that execution_backend='sandlock' routes to sandlock."""
tool = CodeInterpreterTool(execution_backend="sandlock")
code = "print('test')"
with patch.object(
CodeInterpreterTool,
"run_code_in_sandlock",
return_value="test\n",
) as mock_sandlock:
result = tool.run(code=code, libraries_used=[])
assert result == "test\n"
mock_sandlock.assert_called_once_with(code, [])
def test_execution_backend_docker_calls_docker(printer_mock):
"""Test that execution_backend='docker' routes directly to Docker."""
tool = CodeInterpreterTool(execution_backend="docker")
code = "print('test')"
with patch.object(
CodeInterpreterTool,
"run_code_in_docker",
return_value="test\n",
) as mock_docker:
result = tool.run(code=code, libraries_used=[])
assert result == "test\n"
mock_docker.assert_called_once_with(code, [])
def test_execution_backend_unsafe_calls_unsafe(printer_mock):
"""Test that execution_backend='unsafe' routes to unsafe mode."""
tool = CodeInterpreterTool(execution_backend="unsafe")
code = "result = 42"
result = tool.run(code=code, libraries_used=[])
assert result == 42
def test_sandlock_check_not_linux(printer_mock):
"""Test that sandlock is unavailable on non-Linux systems."""
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system",
return_value="Darwin",
):
assert CodeInterpreterTool._check_sandlock_available() is False
def test_sandlock_check_not_installed(printer_mock):
"""Test that sandlock is unavailable when not installed."""
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system",
return_value="Linux",
):
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.importlib.util.find_spec",
return_value=None,
):
assert CodeInterpreterTool._check_sandlock_available() is False
def test_sandlock_check_available_on_linux(printer_mock):
"""Test that sandlock is available on Linux when installed."""
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.platform.system",
return_value="Linux",
):
with patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.importlib.util.find_spec",
return_value=MagicMock(), # non-None means installed
):
assert CodeInterpreterTool._check_sandlock_available() is True
def test_sandlock_run_raises_when_unavailable(printer_mock):
"""Test that run_code_in_sandlock raises RuntimeError when sandlock is unavailable."""
tool = CodeInterpreterTool()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=False
):
with pytest.raises(RuntimeError) as exc_info:
tool.run_code_in_sandlock("print('hello')", [])
assert "Sandlock is not available" in str(exc_info.value)
def test_sandlock_run_success(printer_mock):
"""Test sandlock execution with successful output."""
tool = CodeInterpreterTool()
code = "print('hello from sandlock')"
sandbox_result = SimpleNamespace(
stdout="hello from sandlock\n", stderr="", returncode=0
)
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.return_value = sandbox_result
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
result = tool.run_code_in_sandlock(code, [])
assert result == "hello from sandlock\n"
def test_sandlock_run_with_error(printer_mock):
"""Test sandlock execution when the code returns an error."""
tool = CodeInterpreterTool()
code = "print(1/0)"
sandbox_result = SimpleNamespace(
stdout="", stderr="ZeroDivisionError: division by zero", returncode=1
)
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.return_value = sandbox_result
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
result = tool.run_code_in_sandlock(code, [])
assert "Something went wrong" in result
assert "ZeroDivisionError" in result
def test_sandlock_run_with_exception(printer_mock):
"""Test sandlock execution when an exception occurs."""
tool = CodeInterpreterTool()
code = "print('hello')"
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.side_effect = OSError("Landlock not supported")
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
result = tool.run_code_in_sandlock(code, [])
assert "An error occurred in sandlock sandbox" in result
assert "Landlock not supported" in result
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run")
def test_sandlock_installs_libraries_to_temp_dir(
subprocess_run_mock, printer_mock
):
"""Test that sandlock installs libraries to a temporary directory."""
tool = CodeInterpreterTool()
code = "result = 1"
libraries_used = ["numpy"]
sandbox_result = SimpleNamespace(stdout="", stderr="", returncode=0)
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.return_value = sandbox_result
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
tool.run_code_in_sandlock(code, libraries_used)
# Check that subprocess.run was called for pip install with --target
pip_calls = [
c for c in subprocess_run_mock.call_args_list
if "--target" in c[0][0]
]
assert len(pip_calls) == 1
args = pip_calls[0][0][0]
assert args[0] == sys.executable
assert "--target" in args
assert "numpy" in args
def test_sandlock_custom_policy_params(printer_mock):
"""Test that custom sandbox parameters are passed to the policy."""
tool = CodeInterpreterTool(
sandbox_fs_read=["/custom/read"],
sandbox_fs_write=["/custom/write"],
sandbox_max_memory_mb=256,
sandbox_max_processes=5,
)
mock_policy_cls = MagicMock()
mock_sandlock_module = MagicMock()
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
tool._build_sandlock_policy("/tmp/work")
mock_policy_cls.assert_called_once()
call_kwargs = mock_policy_cls.call_args[1]
assert "/custom/read" in call_kwargs["fs_readable"]
assert "/custom/write" in call_kwargs["fs_writable"]
assert "/tmp/work" in call_kwargs["fs_writable"]
assert call_kwargs["max_memory"] == "256M"
assert call_kwargs["max_processes"] == 5
assert call_kwargs["isolate_ipc"] is True
assert call_kwargs["clean_env"] is True
def test_sandlock_default_policy_no_memory_limit(printer_mock):
"""Test that default policy omits max_memory when not configured."""
tool = CodeInterpreterTool()
mock_policy_cls = MagicMock()
mock_sandlock_module = MagicMock()
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
tool._build_sandlock_policy("/tmp/work")
call_kwargs = mock_policy_cls.call_args[1]
assert "max_memory" not in call_kwargs
assert "max_processes" not in call_kwargs
def test_sandlock_timeout_default(printer_mock):
"""Test that sandlock uses the default 60s timeout."""
tool = CodeInterpreterTool()
code = "print('hello')"
sandbox_result = SimpleNamespace(stdout="hello\n", stderr="", returncode=0)
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.return_value = sandbox_result
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
tool.run_code_in_sandlock(code, [])
# Verify timeout=60 was passed
run_call = mock_sandbox_instance.run
assert run_call.call_args[1]["timeout"] == 60
def test_sandlock_custom_timeout(printer_mock):
"""Test that sandlock uses a custom timeout when configured."""
tool = CodeInterpreterTool(sandbox_timeout=30)
code = "print('hello')"
sandbox_result = SimpleNamespace(stdout="hello\n", stderr="", returncode=0)
mock_sandbox_instance = MagicMock()
mock_sandbox_instance.run.return_value = sandbox_result
mock_sandbox_cls = MagicMock(return_value=mock_sandbox_instance)
mock_policy_cls = MagicMock()
with patch.object(
CodeInterpreterTool, "_check_sandlock_available", return_value=True
):
mock_sandlock_module = MagicMock()
mock_sandlock_module.Sandbox = mock_sandbox_cls
mock_sandlock_module.Policy = mock_policy_cls
with patch.dict("sys.modules", {"sandlock": mock_sandlock_module}):
tool.run_code_in_sandlock(code, [])
run_call = mock_sandbox_instance.run
assert run_call.call_args[1]["timeout"] == 30
def test_auto_mode_prefers_docker_over_sandlock(printer_mock):
"""Test that auto mode tries Docker first before sandlock."""
tool = CodeInterpreterTool()
code = "print('hello')"
with patch.object(
CodeInterpreterTool, "_check_docker_available", return_value=True
):
with patch.object(
CodeInterpreterTool, "run_code_in_docker", return_value="hello\n"
) as mock_docker:
with patch.object(
CodeInterpreterTool,
"run_code_in_sandlock",
return_value="hello\n",
) as mock_sandlock:
result = tool.run(code=code, libraries_used=[])
mock_docker.assert_called_once()
mock_sandlock.assert_not_called()
assert result == "hello\n"
@pytest.mark.xfail(
reason=(
"run_code_in_restricted_sandbox is known to be vulnerable to sandbox "