mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-04 05:38:33 +00:00
* feat: agent metaclass, refactor a2a to wrappers * feat: a2a schemas and utils * chore: move agent class, update imports * refactor: organize imports to avoid circularity, add a2a to console * feat: pass response_model through call chain * feat: add standard openapi spec serialization to tools and structured output * feat: a2a events * chore: add a2a to pyproject * docs: minimal base for learn docs * fix: adjust a2a conversation flow, allow llm to decide exit until max_retries * fix: inject agent skills into initial prompt * fix: format agent card as json in prompt * refactor: simplify A2A agent prompt formatting and improve skill display * chore: wide cleanup * chore: cleanup logic, add auth cache, use json for messages in prompt * chore: update docs * fix: doc snippets formatting * feat: optimize A2A agent card fetching and improve error reporting * chore: move imports to top of file * chore: refactor hasattr check * chore: add httpx-auth, update lockfile * feat: create base public api * chore: cleanup modules, add docstrings, types * fix: exclude extra fields in prompt * chore: update docs * tests: update to correct import * chore: lint for ruff, add missing import * fix: tweak openai streaming logic for response model * tests: add reimport for test * tests: add reimport for test * fix: don't set a2a attr if not set * fix: don't set a2a attr if not set * chore: update cassettes * tests: fix tests * fix: use instructor and dont pass response_format for litellm * chore: consolidate event listeners, add typing * fix: address race condition in test, update cassettes * tests: add correct mocks, rerun cassette for json * tests: update cassette * chore: regenerate cassette after new run * fix: make token manager access-safe * fix: make token manager access-safe * merge * chore: update test and cassete for output pydantic * fix: tweak to disallow deadlock * chore: linter * fix: adjust event ordering for threading * fix: use conditional for batch check * tests: tweak 
for emission * tests: simplify api + event check * fix: ensure non-function calling llms see json formatted string * tests: tweak message comparison * fix: use internal instructor for litellm structure responses --------- Co-authored-by: Mike Plachta <mike@crewai.com>
167 lines
7.1 KiB
Python
167 lines
7.1 KiB
Python
import json
|
|
import unittest
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from cryptography.fernet import Fernet
|
|
|
|
from crewai.cli.shared.token_manager import TokenManager
|
|
|
|
|
|
class TestTokenManager(unittest.TestCase):
    """Unit tests for ``TokenManager`` with key and file access mocked out."""

    @patch("crewai.cli.shared.token_manager.TokenManager._get_or_create_key")
    def setUp(self, mock_get_key):
        """Build the ``TokenManager`` under test while key lookup is patched.

        The patch is only active for the duration of ``setUp``; the tests below
        call the real ``_get_or_create_key`` unless they patch it themselves.
        """
        mock_get_key.return_value = Fernet.generate_key()
        self.token_manager = TokenManager()

    def _encrypted_token_record(self, access_token, expiration):
        """Return the encrypted JSON record used to stub ``read_secure_file``.

        Args:
            access_token: Token string stored under ``"access_token"``.
            expiration: ``datetime`` stored (ISO formatted) under ``"expiration"``.
        """
        record = {
            "access_token": access_token,
            "expiration": expiration.isoformat(),
        }
        return self.token_manager.fernet.encrypt(json.dumps(record).encode())

    @patch("crewai.cli.shared.token_manager.TokenManager.read_secure_file")
    @patch("crewai.cli.shared.token_manager.TokenManager.save_secure_file")
    @patch("crewai.cli.shared.token_manager.TokenManager._get_or_create_key")
    def test_get_or_create_key_existing(self, mock_get_or_create, mock_save, mock_read):
        """A new manager exposes the value of ``_get_or_create_key`` as ``key``."""
        mock_key = Fernet.generate_key()
        mock_get_or_create.return_value = mock_key

        token_manager = TokenManager()

        self.assertEqual(token_manager.key, mock_key)

    @patch("crewai.cli.shared.token_manager.Fernet.generate_key")
    @patch("crewai.cli.shared.token_manager.TokenManager.read_secure_file")
    @patch("crewai.cli.shared.token_manager.TokenManager.save_secure_file")
    @patch("crewai.cli.shared.token_manager.TokenManager._acquire_lock")
    @patch("crewai.cli.shared.token_manager.TokenManager._release_lock")
    @patch("builtins.open", new_callable=unittest.mock.mock_open)
    def test_get_or_create_key_new(
        self,
        mock_open,
        mock_release_lock,
        mock_acquire_lock,
        mock_save,
        mock_read,
        mock_generate,
    ):
        """With no stored key, a key is generated, saved and returned."""
        mock_key = b"new_key"
        mock_read.return_value = None
        mock_generate.return_value = mock_key

        result = self.token_manager._get_or_create_key()

        self.assertEqual(result, mock_key)
        # read_secure_file is called twice: once for fast path, once inside lock
        self.assertEqual(mock_read.call_count, 2)
        mock_read.assert_called_with("secret.key")
        mock_generate.assert_called_once()
        mock_save.assert_called_once_with("secret.key", mock_key)
        # Verify lock was acquired and released
        mock_acquire_lock.assert_called_once()
        mock_release_lock.assert_called_once()

    @patch("crewai.cli.shared.token_manager.TokenManager.save_secure_file")
    def test_save_tokens(self, mock_save):
        """``save_tokens`` stores an encrypted JSON record in ``tokens.enc``."""
        access_token = "test_token"
        expires_at = int((datetime.now() + timedelta(seconds=3600)).timestamp())

        self.token_manager.save_tokens(access_token, expires_at)

        mock_save.assert_called_once()
        args = mock_save.call_args[0]
        self.assertEqual(args[0], "tokens.enc")
        # The second positional argument is the ciphertext; decrypt it with the
        # manager's own Fernet instance to inspect what was persisted.
        data = json.loads(self.token_manager.fernet.decrypt(args[1]))
        self.assertEqual(data["access_token"], access_token)
        expiration = datetime.fromisoformat(data["expiration"])
        self.assertEqual(expiration, datetime.fromtimestamp(expires_at))

    @patch("crewai.cli.shared.token_manager.TokenManager.read_secure_file")
    def test_get_token_valid(self, mock_read):
        """A stored token expiring one hour from now is returned."""
        access_token = "test_token"
        mock_read.return_value = self._encrypted_token_record(
            access_token, datetime.now() + timedelta(hours=1)
        )

        result = self.token_manager.get_token()

        self.assertEqual(result, access_token)

    @patch("crewai.cli.shared.token_manager.TokenManager.read_secure_file")
    def test_get_token_expired(self, mock_read):
        """A stored token that expired one hour ago yields ``None``."""
        mock_read.return_value = self._encrypted_token_record(
            "test_token", datetime.now() - timedelta(hours=1)
        )

        result = self.token_manager.get_token()

        self.assertIsNone(result)

    @patch("crewai.cli.shared.token_manager.TokenManager.get_secure_storage_path")
    @patch("builtins.open", new_callable=unittest.mock.mock_open)
    @patch("crewai.cli.shared.token_manager.os.chmod")
    def test_save_secure_file(self, mock_chmod, mock_open, mock_get_path):
        """``save_secure_file`` writes in ``"wb"`` mode and chmods to ``0o600``."""
        mock_path = MagicMock()
        mock_get_path.return_value = mock_path
        filename = "test_file.txt"
        content = b"test_content"

        self.token_manager.save_secure_file(filename, content)

        # ``storage_path / filename`` is observed through the mock's __truediv__.
        mock_path.__truediv__.assert_called_once_with(filename)
        file_path = mock_path.__truediv__.return_value
        mock_open.assert_called_once_with(file_path, "wb")
        mock_open().write.assert_called_once_with(content)
        mock_chmod.assert_called_once_with(file_path, 0o600)

    @patch("crewai.cli.shared.token_manager.TokenManager.get_secure_storage_path")
    @patch(
        "builtins.open", new_callable=unittest.mock.mock_open, read_data=b"test_content"
    )
    def test_read_secure_file_exists(self, mock_open, mock_get_path):
        """An existing file is opened in ``"rb"`` mode and its bytes returned."""
        mock_path = MagicMock()
        mock_get_path.return_value = mock_path
        file_path = mock_path.__truediv__.return_value
        file_path.exists.return_value = True
        filename = "test_file.txt"

        result = self.token_manager.read_secure_file(filename)

        self.assertEqual(result, b"test_content")
        mock_path.__truediv__.assert_called_once_with(filename)
        mock_open.assert_called_once_with(file_path, "rb")

    @patch("crewai.cli.shared.token_manager.TokenManager.get_secure_storage_path")
    def test_read_secure_file_not_exists(self, mock_get_path):
        """``read_secure_file`` returns ``None`` when the file does not exist."""
        mock_path = MagicMock()
        mock_get_path.return_value = mock_path
        mock_path.__truediv__.return_value.exists.return_value = False
        filename = "test_file.txt"

        result = self.token_manager.read_secure_file(filename)

        self.assertIsNone(result)
        mock_path.__truediv__.assert_called_once_with(filename)

    @patch("crewai.cli.shared.token_manager.TokenManager.get_secure_storage_path")
    def test_clear_tokens(self, mock_get_path):
        """``clear_tokens`` unlinks ``tokens.enc`` with ``missing_ok=True``."""
        mock_path = MagicMock()
        mock_get_path.return_value = mock_path

        self.token_manager.clear_tokens()

        mock_path.__truediv__.assert_called_once_with("tokens.enc")
        mock_path.__truediv__.return_value.unlink.assert_called_once_with(
            missing_ok=True
        )

    @patch("crewai.cli.shared.token_manager.Fernet.generate_key")
    @patch("crewai.cli.shared.token_manager.TokenManager.read_secure_file")
    @patch("crewai.cli.shared.token_manager.TokenManager.save_secure_file")
    @patch("builtins.open", side_effect=OSError(9, "Bad file descriptor"))
    def test_get_or_create_key_oserror_fallback(
        self, mock_open, mock_save, mock_read, mock_generate
    ):
        """Test that OSError during file locking falls back to lock-free creation."""
        # Use a literal key rather than calling Fernet.generate_key() here:
        # the patch above replaces generate_key on the Fernet class itself
        # (assuming token_manager's Fernet is the class imported by this
        # module), so calling it from the test would return a MagicMock and
        # bump mock_generate.call_count, making the assertion below vacuous.
        mock_key = b"fallback_key"
        mock_read.return_value = None
        mock_generate.return_value = mock_key

        result = self.token_manager._get_or_create_key()

        self.assertEqual(result, mock_key)
        self.assertGreaterEqual(mock_generate.call_count, 1)
        self.assertGreaterEqual(mock_save.call_count, 1)