Files
crewAI/tests/cli/authentication/test_utils.py
Bowen Liang 0b0f2d30ab sort imports with isort rules by ruff linter (#1730)
* sort imports

* update

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
Co-authored-by: Eduardo Chiarotti <dudumelgaco@hotmail.com>
2024-12-11 10:46:53 -05:00

149 lines
6.1 KiB
Python

import json
import unittest
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from cryptography.fernet import Fernet
from crewai.cli.authentication.utils import TokenManager, validate_token
class TestValidateToken(unittest.TestCase):
@patch("crewai.cli.authentication.utils.AsymmetricSignatureVerifier")
@patch("crewai.cli.authentication.utils.TokenVerifier")
def test_validate_token(self, mock_token_verifier, mock_asymmetric_verifier):
mock_verifier_instance = mock_token_verifier.return_value
mock_id_token = "mock_id_token"
validate_token(mock_id_token)
mock_asymmetric_verifier.assert_called_once_with(
"https://crewai.us.auth0.com/.well-known/jwks.json"
)
mock_token_verifier.assert_called_once_with(
signature_verifier=mock_asymmetric_verifier.return_value,
issuer="https://crewai.us.auth0.com/",
audience="DEVC5Fw6NlRoSzmDCcOhVq85EfLBjKa8",
)
mock_verifier_instance.verify.assert_called_once_with(mock_id_token)
class TestTokenManager(unittest.TestCase):
def setUp(self):
self.token_manager = TokenManager()
@patch("crewai.cli.authentication.utils.TokenManager.read_secure_file")
@patch("crewai.cli.authentication.utils.TokenManager.save_secure_file")
@patch("crewai.cli.authentication.utils.TokenManager._get_or_create_key")
def test_get_or_create_key_existing(self, mock_get_or_create, mock_save, mock_read):
mock_key = Fernet.generate_key()
mock_get_or_create.return_value = mock_key
token_manager = TokenManager()
result = token_manager.key
self.assertEqual(result, mock_key)
@patch("crewai.cli.authentication.utils.Fernet.generate_key")
@patch("crewai.cli.authentication.utils.TokenManager.read_secure_file")
@patch("crewai.cli.authentication.utils.TokenManager.save_secure_file")
def test_get_or_create_key_new(self, mock_save, mock_read, mock_generate):
mock_key = b"new_key"
mock_read.return_value = None
mock_generate.return_value = mock_key
result = self.token_manager._get_or_create_key()
self.assertEqual(result, mock_key)
mock_read.assert_called_once_with("secret.key")
mock_generate.assert_called_once()
mock_save.assert_called_once_with("secret.key", mock_key)
@patch("crewai.cli.authentication.utils.TokenManager.save_secure_file")
def test_save_tokens(self, mock_save):
access_token = "test_token"
expires_in = 3600
self.token_manager.save_tokens(access_token, expires_in)
mock_save.assert_called_once()
args = mock_save.call_args[0]
self.assertEqual(args[0], "tokens.enc")
decrypted_data = self.token_manager.fernet.decrypt(args[1])
data = json.loads(decrypted_data)
self.assertEqual(data["access_token"], access_token)
expiration = datetime.fromisoformat(data["expiration"])
self.assertAlmostEqual(
expiration,
datetime.now() + timedelta(seconds=expires_in),
delta=timedelta(seconds=1),
)
@patch("crewai.cli.authentication.utils.TokenManager.read_secure_file")
def test_get_token_valid(self, mock_read):
access_token = "test_token"
expiration = (datetime.now() + timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertEqual(result, access_token)
@patch("crewai.cli.authentication.utils.TokenManager.read_secure_file")
def test_get_token_expired(self, mock_read):
access_token = "test_token"
expiration = (datetime.now() - timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertIsNone(result)
@patch("crewai.cli.authentication.utils.TokenManager.get_secure_storage_path")
@patch("builtins.open", new_callable=unittest.mock.mock_open)
@patch("crewai.cli.authentication.utils.os.chmod")
def test_save_secure_file(self, mock_chmod, mock_open, mock_get_path):
mock_path = MagicMock()
mock_get_path.return_value = mock_path
filename = "test_file.txt"
content = b"test_content"
self.token_manager.save_secure_file(filename, content)
mock_path.__truediv__.assert_called_once_with(filename)
mock_open.assert_called_once_with(mock_path.__truediv__.return_value, "wb")
mock_open().write.assert_called_once_with(content)
mock_chmod.assert_called_once_with(mock_path.__truediv__.return_value, 0o600)
@patch("crewai.cli.authentication.utils.TokenManager.get_secure_storage_path")
@patch(
"builtins.open", new_callable=unittest.mock.mock_open, read_data=b"test_content"
)
def test_read_secure_file_exists(self, mock_open, mock_get_path):
mock_path = MagicMock()
mock_get_path.return_value = mock_path
mock_path.__truediv__.return_value.exists.return_value = True
filename = "test_file.txt"
result = self.token_manager.read_secure_file(filename)
self.assertEqual(result, b"test_content")
mock_path.__truediv__.assert_called_once_with(filename)
mock_open.assert_called_once_with(mock_path.__truediv__.return_value, "rb")
@patch("crewai.cli.authentication.utils.TokenManager.get_secure_storage_path")
def test_read_secure_file_not_exists(self, mock_get_path):
mock_path = MagicMock()
mock_get_path.return_value = mock_path
mock_path.__truediv__.return_value.exists.return_value = False
filename = "test_file.txt"
result = self.token_manager.read_secure_file(filename)
self.assertIsNone(result)
mock_path.__truediv__.assert_called_once_with(filename)