"""Tests for TokenManager with atomic file operations.""" import json import os import stat import sys import tempfile import unittest from datetime import datetime, timedelta from pathlib import Path from unittest.mock import patch from cryptography.fernet import Fernet from crewai_core.token_manager import TokenManager class TestTokenManager(unittest.TestCase): """Test cases for TokenManager.""" @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def setUp(self, mock_get_key: unittest.mock.MagicMock) -> None: """Set up test fixtures.""" mock_get_key.return_value = Fernet.generate_key() self.token_manager = TokenManager() @patch("crewai_core.token_manager.TokenManager._read_secure_file") @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_get_or_create_key_existing( self, mock_get_or_create: unittest.mock.MagicMock, mock_read: unittest.mock.MagicMock, ) -> None: """Test that existing key is returned when present.""" mock_key = Fernet.generate_key() mock_get_or_create.return_value = mock_key token_manager = TokenManager() result = token_manager.key self.assertEqual(result, mock_key) def test_get_or_create_key_new(self) -> None: """Test that new key is created when none exists.""" mock_key = Fernet.generate_key() with ( patch.object(self.token_manager, "_read_secure_file", return_value=None) as mock_read, patch.object(self.token_manager, "_atomic_create_secure_file", return_value=True) as mock_atomic_create, patch("crewai_core.token_manager.Fernet.generate_key", return_value=mock_key) as mock_generate, ): result = self.token_manager._get_or_create_key() self.assertEqual(result, mock_key) mock_read.assert_called_with("secret.key") mock_generate.assert_called_once() mock_atomic_create.assert_called_once_with("secret.key", mock_key) def test_get_or_create_key_race_condition(self) -> None: """Test that another process's key is used when atomic create fails.""" our_key = Fernet.generate_key() their_key = Fernet.generate_key() with ( patch.object(self.token_manager, "_read_secure_file", side_effect=[None, their_key]) as mock_read, patch.object(self.token_manager, "_atomic_create_secure_file", return_value=False) as mock_atomic_create, patch("crewai_core.token_manager.Fernet.generate_key", return_value=our_key), ): result = self.token_manager._get_or_create_key() self.assertEqual(result, their_key) self.assertEqual(mock_read.call_count, 2) @patch("crewai_core.token_manager.TokenManager._atomic_write_secure_file") def test_save_tokens( self, mock_write: unittest.mock.MagicMock ) -> None: """Test saving tokens encrypts and writes atomically.""" access_token = "test_token" expires_at = int((datetime.now() + timedelta(seconds=3600)).timestamp()) self.token_manager.save_tokens(access_token, expires_at) mock_write.assert_called_once() args = mock_write.call_args[0] self.assertEqual(args[0], "tokens.enc") decrypted_data = self.token_manager.fernet.decrypt(args[1]) data = json.loads(decrypted_data) self.assertEqual(data["access_token"], access_token) expiration = datetime.fromisoformat(data["expiration"]) self.assertEqual(expiration, datetime.fromtimestamp(expires_at)) @patch("crewai_core.token_manager.TokenManager._read_secure_file") def test_get_token_valid( self, mock_read: unittest.mock.MagicMock ) -> None: """Test getting a valid non-expired token.""" access_token = "test_token" expiration = (datetime.now() + timedelta(hours=1)).isoformat() data = {"access_token": access_token, "expiration": expiration} encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode()) mock_read.return_value = encrypted_data result = self.token_manager.get_token() self.assertEqual(result, access_token) @patch("crewai_core.token_manager.TokenManager._read_secure_file") def test_get_token_expired( self, mock_read: unittest.mock.MagicMock ) -> None: """Test that expired token returns None.""" access_token = "test_token" expiration = (datetime.now() - timedelta(hours=1)).isoformat() data = {"access_token": access_token, "expiration": expiration} encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode()) mock_read.return_value = encrypted_data result = self.token_manager.get_token() self.assertIsNone(result) @patch("crewai_core.token_manager.TokenManager._read_secure_file") def test_get_token_not_found( self, mock_read: unittest.mock.MagicMock ) -> None: """Test that missing token file returns None.""" mock_read.return_value = None result = self.token_manager.get_token() self.assertIsNone(result) @patch("crewai_core.token_manager.TokenManager._delete_secure_file") def test_clear_tokens( self, mock_delete: unittest.mock.MagicMock ) -> None: """Test clearing tokens deletes the token file.""" self.token_manager.clear_tokens() mock_delete.assert_called_once_with("tokens.enc") class TestAtomicFileOperations(unittest.TestCase): """Test atomic file operations directly.""" def setUp(self) -> None: """Set up test fixtures with temp directory.""" self.temp_dir = tempfile.mkdtemp() self.original_get_path = TokenManager._get_secure_storage_path def mock_get_path() -> Path: return Path(self.temp_dir) TokenManager._get_secure_storage_path = staticmethod(mock_get_path) def tearDown(self) -> None: """Clean up temp directory.""" TokenManager._get_secure_storage_path = staticmethod(self.original_get_path) import shutil shutil.rmtree(self.temp_dir, ignore_errors=True) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_atomic_create_new_file( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test atomic create succeeds for new file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() result = tm._atomic_create_secure_file("test.txt", b"content") self.assertTrue(result) file_path = Path(self.temp_dir) / "test.txt" self.assertTrue(file_path.exists()) self.assertEqual(file_path.read_bytes(), b"content") self.assertEqual(file_path.stat().st_mode & 0o777, 0o600) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_atomic_create_existing_file( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test atomic create fails for existing file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() file_path = Path(self.temp_dir) / "test.txt" file_path.write_bytes(b"original") result = tm._atomic_create_secure_file("test.txt", b"new content") self.assertFalse(result) self.assertEqual(file_path.read_bytes(), b"original") @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_atomic_write_new_file( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test atomic write creates new file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() tm._atomic_write_secure_file("test.txt", b"content") file_path = Path(self.temp_dir) / "test.txt" self.assertTrue(file_path.exists()) self.assertEqual(file_path.read_bytes(), b"content") self.assertEqual(file_path.stat().st_mode & 0o777, 0o600) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_atomic_write_overwrites( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test atomic write overwrites existing file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() file_path = Path(self.temp_dir) / "test.txt" file_path.write_bytes(b"original") tm._atomic_write_secure_file("test.txt", b"new content") self.assertEqual(file_path.read_bytes(), b"new content") @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_atomic_write_no_temp_file_on_success( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test that temp file is cleaned up after successful write.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() tm._atomic_write_secure_file("test.txt", b"content") temp_files = list(Path(self.temp_dir).glob(".test.txt.*")) self.assertEqual(len(temp_files), 0) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_read_secure_file_exists( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test reading existing file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() file_path = Path(self.temp_dir) / "test.txt" file_path.write_bytes(b"content") result = tm._read_secure_file("test.txt") self.assertEqual(result, b"content") @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_read_secure_file_not_exists( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test reading non-existent file returns None.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() result = tm._read_secure_file("nonexistent.txt") self.assertIsNone(result) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_delete_secure_file_exists( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test deleting existing file.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() file_path = Path(self.temp_dir) / "test.txt" file_path.write_bytes(b"content") tm._delete_secure_file("test.txt") self.assertFalse(file_path.exists()) @patch("crewai_core.token_manager.TokenManager._get_or_create_key") def test_delete_secure_file_not_exists( self, mock_get_key: unittest.mock.MagicMock ) -> None: """Test deleting non-existent file doesn't raise.""" mock_get_key.return_value = Fernet.generate_key() tm = TokenManager() tm._delete_secure_file("nonexistent.txt") class TestSecureStoragePathPermissions(unittest.TestCase): """Test that the credential directory is created with restrictive permissions.""" @unittest.skipIf(sys.platform == "win32", "POSIX permission semantics") def test_storage_path_is_owner_only(self) -> None: """The credential directory must be mode 0o700 even under a permissive umask.""" with tempfile.TemporaryDirectory() as base: old_umask = os.umask(0o022) try: with ( patch("crewai_core.token_manager.sys.platform", "linux"), patch( "crewai_core.token_manager.os.path.expanduser", return_value=base, ), ): storage_path = TokenManager._get_secure_storage_path() finally: os.umask(old_umask) self.assertTrue(storage_path.is_dir()) mode = stat.S_IMODE(storage_path.stat().st_mode) self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}") @unittest.skipIf(sys.platform == "win32", "POSIX permission semantics") def test_existing_loose_dir_is_tightened(self) -> None: """A pre-existing world-traversable directory is corrected to 0o700.""" with tempfile.TemporaryDirectory() as base: loose = Path(base) / "crewai" / "credentials" loose.mkdir(parents=True) loose.chmod(0o755) with ( patch("crewai_core.token_manager.sys.platform", "linux"), patch( "crewai_core.token_manager.os.path.expanduser", return_value=base, ), ): storage_path = TokenManager._get_secure_storage_path() mode = stat.S_IMODE(storage_path.stat().st_mode) self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}") if __name__ == "__main__": unittest.main()