Compare commits

...

3 Commits

Author SHA1 Message Date
Rip&Tear
df4495dc06 Potential fix for pull request finding 'Empty except'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-06-19 14:46:26 +08:00
Rip&Tear
714cd11d04 Potential fix for pull request finding 'Empty except'
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
2026-06-19 14:42:16 +08:00
Rip&Tear
750684ca36 fix: enforce owner-only permissions on credential files
Credentials stored at rest were left world-readable on multi-user hosts:

- TokenManager._get_secure_storage_path() documented its credential dir as
  mode 0o700 but created it via mkdir() with default perms (0o755), leaving
  the Fernet secret.key and encrypted tokens.enc in a traversable dir.
- Settings.dump() persisted tool_repository_password (plaintext) to
  settings.json via open("w"), producing a 0o644 file, and created the
  config dir at 0o755 — despite the sibling token_manager already writing
  secrets atomically at 0o600.

Fixes:
- TokenManager: chmod the credential dir to 0o700 after mkdir (robust against
  umask and pre-existing dirs).
- Settings: write settings.json atomically at 0o600 (mkstemp + chmod +
  os.replace) and chmod the dedicated config dir to 0o700. The /tmp and cwd
  fallback parents are deliberately not chmod'd; the 0o600 file mode protects
  the credential there.

Adds regression tests asserting 0o600 files and 0o700 dirs, and that shared
fallback dirs are not globally tightened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-19 14:23:30 +08:00
4 changed files with 150 additions and 2 deletions

View File

@@ -1,5 +1,8 @@
import json
import os
import shutil
import stat
import sys
import tempfile
import unittest
from datetime import datetime, timedelta
@@ -146,3 +149,55 @@ class TestSettings(unittest.TestCase):
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
class TestSettingsFilePermissions(unittest.TestCase):
"""Regression tests: credentials in settings.json must not be world-readable."""
def setUp(self):
self.test_dir = Path(tempfile.mkdtemp())
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
@unittest.skipIf(sys.platform == "win32", "POSIX permission semantics")
def test_dump_writes_owner_only_file(self):
config_path = self.test_dir / "settings.json"
old_umask = os.umask(0o022)
try:
settings = Settings(
config_path=config_path, tool_repository_password="hunter2"
)
settings.dump()
finally:
os.umask(old_umask)
mode = stat.S_IMODE(config_path.stat().st_mode)
self.assertEqual(mode, 0o600, f"expected 0o600, got {oct(mode)}")
@unittest.skipIf(sys.platform == "win32", "POSIX permission semantics")
def test_dedicated_config_dir_is_owner_only(self):
config_path = self.test_dir / "crewai" / "settings.json"
old_umask = os.umask(0o022)
try:
Settings(config_path=config_path, tool_repository_username="u")
finally:
os.umask(old_umask)
mode = stat.S_IMODE(config_path.parent.stat().st_mode)
self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}")
@unittest.skipIf(sys.platform == "win32", "POSIX permission semantics")
def test_shared_fallback_dir_is_not_chmodded(self):
"""The system temp dir (a fallback parent) must never be globally chmod'd."""
from crewai_core.settings import _ensure_dir_mode
tmp_root = Path(tempfile.gettempdir())
before = stat.S_IMODE(tmp_root.stat().st_mode)
_ensure_dir_mode(tmp_root)
after = stat.S_IMODE(tmp_root.stat().st_mode)
self.assertEqual(before, after)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,6 +1,9 @@
"""Tests for TokenManager with atomic file operations."""
import json
import os
import stat
import sys
import tempfile
import unittest
from datetime import datetime, timedelta
@@ -285,5 +288,50 @@ class TestAtomicFileOperations(unittest.TestCase):
tm._delete_secure_file("nonexistent.txt")
class TestSecureStoragePathPermissions(unittest.TestCase):
"""Test that the credential directory is created with restrictive permissions."""
@unittest.skipIf(sys.platform == "win32", "POSIX permission semantics")
def test_storage_path_is_owner_only(self) -> None:
"""The credential directory must be mode 0o700 even under a permissive umask."""
with tempfile.TemporaryDirectory() as base:
old_umask = os.umask(0o022)
try:
with (
patch("crewai_core.token_manager.sys.platform", "linux"),
patch(
"crewai_core.token_manager.os.path.expanduser",
return_value=base,
),
):
storage_path = TokenManager._get_secure_storage_path()
finally:
os.umask(old_umask)
self.assertTrue(storage_path.is_dir())
mode = stat.S_IMODE(storage_path.stat().st_mode)
self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}")
@unittest.skipIf(sys.platform == "win32", "POSIX permission semantics")
def test_existing_loose_dir_is_tightened(self) -> None:
"""A pre-existing world-traversable directory is corrected to 0o700."""
with tempfile.TemporaryDirectory() as base:
loose = Path(base) / "crewai" / "credentials"
loose.mkdir(parents=True)
loose.chmod(0o755)
with (
patch("crewai_core.token_manager.sys.platform", "linux"),
patch(
"crewai_core.token_manager.os.path.expanduser",
return_value=base,
),
):
storage_path = TokenManager._get_secure_storage_path()
mode = stat.S_IMODE(storage_path.stat().st_mode)
self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}")
if __name__ == "__main__":
unittest.main()

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import json
from logging import getLogger
import os
from pathlib import Path
import tempfile
from typing import Any
@@ -25,6 +26,41 @@ logger = getLogger(__name__)
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
def _ensure_dir_mode(directory: Path) -> None:
"""Tighten a dedicated config directory to 0o700.
Skips directories shared with other users or content (the system temp dir
and the current working directory), which are used as best-effort fallbacks
by :func:`get_writable_config_path` and must not be globally chmod'd. Secret
files written there are still protected by their own 0o600 mode.
"""
try:
shared = {Path(tempfile.gettempdir()).resolve(), Path.cwd().resolve()}
if directory.resolve() in shared:
return
directory.chmod(0o700)
except OSError as e:
logger.debug(
"Could not enforce 0o700 on config directory %s (best-effort): %s",
directory,
e,
)
def _write_secure_json(path: Path, data: dict[str, Any]) -> None:
"""Atomically write ``data`` as JSON to ``path`` with owner-only (0o600) mode."""
fd, tmp = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.")
try:
with os.fdopen(fd, "w") as f:
json.dump(data, f, indent=4)
os.chmod(tmp, 0o600)
os.replace(tmp, path)
except BaseException:
if os.path.exists(tmp):
os.unlink(tmp)
raise
def get_writable_config_path() -> Path | None:
"""Find a writable location for the config file with fallback options.
@@ -43,6 +79,7 @@ def get_writable_config_path() -> Path | None:
for config_path in fallback_paths:
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
_ensure_dir_mode(config_path.parent)
test_file = config_path.parent / ".crewai_write_test"
try:
test_file.write_text("test")
@@ -153,6 +190,7 @@ class Settings(BaseModel):
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
_ensure_dir_mode(config_path.parent)
except Exception:
merged_data = {**data}
super().__init__(config_path=Path("/dev/null"), **merged_data)
@@ -194,8 +232,7 @@ class Settings(BaseModel):
existing_data = {}
updated_data = {**existing_data, **self.model_dump(exclude_unset=True)}
with self.config_path.open("w") as f:
json.dump(updated_data, f, indent=4)
_write_secure_json(self.config_path, updated_data)
except Exception: # noqa: S110
pass

View File

@@ -95,6 +95,14 @@ class TokenManager:
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
# Enforce the documented 0o700 mode: mkdir is subject to umask and does
# not adjust the mode of a pre-existing directory, so chmod explicitly.
try:
storage_path.chmod(0o700)
except OSError:
# Best-effort permission hardening only: some platforms/filesystems
# may reject chmod here, and token operations should still proceed.
pass
return storage_path