[SECURITY] Fix F-001: Remove vulnerable sandbox fallback in CodeInterpreterTool

CRITICAL SECURITY FIX
=====================

Vulnerability: Sandbox escape in CodeInterpreterTool fallback leads to host RCE

Impact:
- Removed bypassable Python sandbox that could be escaped via object introspection
- Attackers could previously execute arbitrary code on host when Docker unavailable

Changes:
- Removed SandboxPython class entirely (insecure by design)
- Removed run_code_in_restricted_sandbox() fallback method
- Implemented fail-safe behavior: raises RuntimeError when Docker unavailable
- Fixed command injection in unsafe_mode library installation (os.system -> subprocess)
- Enhanced security warnings and documentation

Security Model:
- Safe mode (default): Requires Docker, fails safely if unavailable
- Unsafe mode: Explicit opt-in, clear warnings, no protections

Breaking Change:
- Code execution now requires Docker or explicit unsafe_mode=True
- Previous silent fallback to vulnerable sandbox is removed

Testing:
- Updated all tests to reflect new fail-safe behavior
- Added tests for Docker unavailable scenarios
- Verified subprocess usage for library installation

Refs: F-001, SECURITY_FIX_F001.md
Docs: https://docs.crewai.com/en/tools/ai-ml/codeinterpretertool

Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>
This commit is contained in:
Cursor Agent
2026-03-09 14:06:31 +00:00
parent 51dc1199a3
commit 0c10f13c90
3 changed files with 372 additions and 206 deletions

View File

@@ -1,15 +1,17 @@
"""Code Interpreter Tool for executing Python code in isolated environments.
This module provides a tool for executing Python code either in a Docker container for
safe isolation or directly in a restricted sandbox. It includes mechanisms for blocking
potentially unsafe operations and importing restricted modules.
This module provides a tool for executing Python code in a Docker container for
safe isolation. Docker is required for secure code execution.
SECURITY: This tool executes arbitrary code. Docker isolation is mandatory for
untrusted code. The tool will fail if Docker is not available to prevent
sandbox escape vulnerabilities.
"""
import importlib.util
import os
import subprocess
from types import ModuleType
from typing import Any, ClassVar, TypedDict
from typing import Any, TypedDict
from crewai.tools import BaseTool
from docker import ( # type: ignore[import-untyped]
@@ -49,104 +51,23 @@ class CodeInterpreterSchema(BaseModel):
)
class SandboxPython:
"""A restricted Python execution environment for running code safely.
This class provides methods to safely execute Python code by restricting access to
potentially dangerous modules and built-in functions. It creates a sandboxed
environment where harmful operations are blocked.
"""
BLOCKED_MODULES: ClassVar[set[str]] = {
"os",
"sys",
"subprocess",
"shutil",
"importlib",
"inspect",
"tempfile",
"sysconfig",
"builtins",
}
UNSAFE_BUILTINS: ClassVar[set[str]] = {
"exec",
"eval",
"open",
"compile",
"input",
"globals",
"locals",
"vars",
"help",
"dir",
}
@staticmethod
def restricted_import(
name: str,
custom_globals: dict[str, Any] | None = None,
custom_locals: dict[str, Any] | None = None,
fromlist: list[str] | None = None,
level: int = 0,
) -> ModuleType:
"""A restricted import function that blocks importing of unsafe modules.
Args:
name: The name of the module to import.
custom_globals: Global namespace to use.
custom_locals: Local namespace to use.
fromlist: List of items to import from the module.
level: The level value passed to __import__.
Returns:
The imported module if allowed.
Raises:
ImportError: If the module is in the blocked modules list.
"""
if name in SandboxPython.BLOCKED_MODULES:
raise ImportError(f"Importing '{name}' is not allowed.")
return __import__(name, custom_globals, custom_locals, fromlist or (), level)
@staticmethod
def safe_builtins() -> dict[str, Any]:
"""Creates a dictionary of built-in functions with unsafe ones removed.
Returns:
A dictionary of safe built-in functions and objects.
"""
import builtins
safe_builtins = {
k: v
for k, v in builtins.__dict__.items()
if k not in SandboxPython.UNSAFE_BUILTINS
}
safe_builtins["__import__"] = SandboxPython.restricted_import
return safe_builtins
@staticmethod
def exec(code: str, locals_: dict[str, Any]) -> None:
"""Executes Python code in a restricted environment.
Args:
code: The Python code to execute as a string.
locals_: A dictionary that will be used for local variable storage.
"""
exec(code, {"__builtins__": SandboxPython.safe_builtins()}, locals_) # noqa: S102
class CodeInterpreterTool(BaseTool):
"""A tool for executing Python code in isolated environments.
"""A tool for executing Python code in isolated Docker containers.
This tool provides functionality to run Python code either in a Docker container
for safe isolation or directly in a restricted sandbox. It can handle installing
Python packages and executing arbitrary Python code.
This tool provides functionality to run Python code in a Docker container
for safe isolation. Docker is required for secure code execution.
Security Model:
- Docker container provides process, filesystem, and network isolation
- Code execution fails if Docker is unavailable (fail-safe)
- unsafe_mode bypasses all protections (use only in trusted environments)
For more information, see:
https://docs.crewai.com/en/tools/ai-ml/codeinterpretertool#docker-container-recommended
"""
name: str = "Code Interpreter"
description: str = "Interprets Python3 code strings with a final print statement."
description: str = "Interprets Python3 code strings with a final print statement. Requires Docker for secure execution."
args_schema: type[BaseModel] = CodeInterpreterSchema
default_image_tag: str = "code-interpreter:latest"
code: str | None = None
@@ -271,12 +192,10 @@ class CodeInterpreterTool(BaseTool):
"""Checks if Docker is available and running on the system.
Attempts to run the 'docker info' command to verify Docker availability.
Prints appropriate messages if Docker is not installed or not running.
Returns:
True if Docker is available and running, False otherwise.
"""
try:
subprocess.run(
["docker", "info"], # noqa: S607
@@ -286,32 +205,44 @@ class CodeInterpreterTool(BaseTool):
timeout=1,
)
return True
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
Printer.print(
"Docker is installed but not running or inaccessible.",
color="bold_purple",
)
return False
except FileNotFoundError:
Printer.print("Docker is not installed", color="bold_purple")
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
return False
def run_code_safety(self, code: str, libraries_used: list[str]) -> str:
"""Runs code in the safest available environment.
"""Runs code in a Docker container for safe isolation.
Attempts to run code in Docker if available, falls back to a restricted
sandbox if Docker is not available.
Requires Docker to be installed and running. Fails with an error message
if Docker is not available, preventing sandbox escape vulnerabilities.
Args:
code: The Python code to execute as a string.
libraries_used: A list of Python library names to install before execution.
Returns:
The output of the executed code as a string.
The output of the executed code as a string, or an error message if
Docker is not available.
Raises:
RuntimeError: If Docker is not available and code execution is attempted.
"""
if self._check_docker_available():
return self.run_code_in_docker(code, libraries_used)
return self.run_code_in_restricted_sandbox(code)
if not self._check_docker_available():
error_msg = (
"SECURITY ERROR: Docker is required for safe code execution but is not available.\n\n"
"Docker provides essential isolation to prevent sandbox escape attacks.\n"
"Please install and start Docker, then try again.\n\n"
"For installation instructions, see:\n"
"- https://docs.docker.com/get-docker/\n"
"- https://docs.crewai.com/en/tools/ai-ml/codeinterpretertool#docker-container-recommended\n\n"
"If you are in a trusted environment and understand the risks, you can use unsafe_mode=True,\n"
"but this is NOT recommended for production use or untrusted code."
)
Printer.print(error_msg, color="bold_red")
raise RuntimeError(
"Docker is required for safe code execution. "
"Install Docker or use unsafe_mode=True (not recommended)."
)
return self.run_code_in_docker(code, libraries_used)
def run_code_in_docker(self, code: str, libraries_used: list[str]) -> str:
"""Runs Python code in a Docker container for safe isolation.
@@ -340,34 +271,20 @@ class CodeInterpreterTool(BaseTool):
return f"Something went wrong while running the code: \n{exec_result.output.decode('utf-8')}"
return exec_result.output.decode("utf-8")
@staticmethod
def run_code_in_restricted_sandbox(code: str) -> str:
"""Runs Python code in a restricted sandbox environment.
Executes the code with restricted access to potentially dangerous modules and
built-in functions for basic safety when Docker is not available.
Args:
code: The Python code to execute as a string.
Returns:
The value of the 'result' variable from the executed code,
or an error message if execution failed.
"""
Printer.print("Running code in restricted sandbox", color="yellow")
exec_locals: dict[str, Any] = {}
try:
SandboxPython.exec(code=code, locals_=exec_locals)
return exec_locals.get("result", "No result variable found.")
except Exception as e:
return f"An error occurred: {e!s}"
@staticmethod
def run_code_unsafe(code: str, libraries_used: list[str]) -> str:
"""Runs code directly on the host machine without any safety restrictions.
WARNING: This mode is unsafe and should only be used in trusted environments
with code from trusted sources.
WARNING: This mode bypasses all security controls and executes code directly
on the host system. Use ONLY in trusted environments with trusted code.
SECURITY RISKS:
- No process isolation
- No filesystem restrictions
- No network restrictions
- Full access to host system resources
- Potential for system compromise
Args:
code: The Python code to execute as a string.
@@ -377,12 +294,23 @@ class CodeInterpreterTool(BaseTool):
The value of the 'result' variable from the executed code,
or an error message if execution failed.
"""
Printer.print("WARNING: Running code in unsafe mode", color="bold_magenta")
# Install libraries on the host machine
for library in libraries_used:
os.system(f"pip install {library}") # noqa: S605
Printer.print(
"⚠️ WARNING: Running code in UNSAFE mode - no security controls active!",
color="bold_red",
)
for library in libraries_used:
try:
subprocess.run(
["pip", "install", library],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=30,
)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
return f"Failed to install library '{library}': {e!s}"
# Execute the code
try:
exec_locals: dict[str, Any] = {}
exec(code, {}, exec_locals) # noqa: S102

View File

@@ -1,10 +1,11 @@
import subprocess
from unittest.mock import patch
import pytest
from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import (
CodeInterpreterTool,
SandboxPython,
)
import pytest
@pytest.fixture
@@ -76,99 +77,91 @@ print("This is line 2")"""
)
def test_restricted_sandbox_basic_code_execution(printer_mock, docker_unavailable_mock):
"""Test basic code execution."""
def test_docker_unavailable_fails_safely(printer_mock, docker_unavailable_mock):
"""Test that code execution fails when Docker is unavailable."""
tool = CodeInterpreterTool()
code = """
result = 2 + 2
print(result)
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
assert result == 4
with pytest.raises(RuntimeError) as exc_info:
tool.run(code=code, libraries_used=[])
assert "Docker is required for safe code execution" in str(exc_info.value)
assert printer_mock.called
call_args = printer_mock.call_args
assert "SECURITY ERROR" in call_args[0][0]
assert call_args[1]["color"] == "bold_red"
def test_restricted_sandbox_running_with_blocked_modules(
printer_mock, docker_unavailable_mock
):
"""Test that restricted modules cannot be imported."""
def test_docker_unavailable_suggests_unsafe_mode(printer_mock, docker_unavailable_mock):
"""Test that error message suggests unsafe_mode as alternative."""
tool = CodeInterpreterTool()
restricted_modules = SandboxPython.BLOCKED_MODULES
code = "result = 1 + 1"
for module in restricted_modules:
code = f"""
import {module}
result = "Import succeeded"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
with pytest.raises(RuntimeError) as exc_info:
tool.run(code=code, libraries_used=[])
assert f"An error occurred: Importing '{module}' is not allowed" in result
def test_restricted_sandbox_running_with_blocked_builtins(
printer_mock, docker_unavailable_mock
):
"""Test that restricted builtins are not available."""
tool = CodeInterpreterTool()
restricted_builtins = SandboxPython.UNSAFE_BUILTINS
for builtin in restricted_builtins:
code = f"""
{builtin}("test")
result = "Builtin available"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
assert f"An error occurred: name '{builtin}' is not defined" in result
def test_restricted_sandbox_running_with_no_result_variable(
printer_mock, docker_unavailable_mock
):
"""Test behavior when no result variable is set."""
tool = CodeInterpreterTool()
code = """
x = 10
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
assert result == "No result variable found."
error_output = printer_mock.call_args[0][0]
assert "unsafe_mode=True" in error_output
assert "NOT recommended" in error_output
assert "docs.crewai.com" in error_output
def test_unsafe_mode_running_with_no_result_variable(
printer_mock, docker_unavailable_mock
):
"""Test behavior when no result variable is set."""
"""Test behavior when no result variable is set in unsafe mode."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = """
x = 10
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"WARNING: Running code in unsafe mode", color="bold_magenta"
"⚠️ WARNING: Running code in UNSAFE mode - no security controls active!",
color="bold_red",
)
assert result == "No result variable found."
def test_unsafe_mode_running_unsafe_code(printer_mock, docker_unavailable_mock):
"""Test behavior when no result variable is set."""
"""Test that unsafe mode allows unrestricted code execution."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = """
import os
os.system("ls -la")
result = eval("5/1")
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"WARNING: Running code in unsafe mode", color="bold_magenta"
"⚠️ WARNING: Running code in UNSAFE mode - no security controls active!",
color="bold_red",
)
assert 5.0 == result
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run")
def test_unsafe_mode_library_installation(subprocess_mock, printer_mock, docker_unavailable_mock):
"""Test that unsafe mode properly installs libraries using subprocess."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 42"
libraries = ["numpy", "pandas"]
subprocess_mock.return_value = None
tool.run(code=code, libraries_used=libraries)
assert subprocess_mock.call_count == 2
subprocess_mock.assert_any_call(
["pip", "install", "numpy"],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=30,
)
subprocess_mock.assert_any_call(
["pip", "install", "pandas"],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=30,
)