Feat/cli deploy (#1240)

* feat: set basic structure deploy commands

* feat: add first iteration of CLI Deploy

* feat: some minor refactor

* feat: Add api, Deploy command and update cli

* feat: Remove test token

* feat: add auth0 lib, update cli and improve code

* feat: update code and decouple auth

* fix: parts of the code

* feat: Add token manager to encrypt access token and get and save tokens

* feat: add audience to costants

* feat: add subsystem saving credentials and remove comment of type hinting

* feat: add get crew version to send on header of request

* feat: add docstrings

* feat: add tests for authentication module

* feat: add tests for utils

* feat: add unit tests for cl

* feat: add tests

* feat: add deploy man tests

* feat: fix type checking issue

* feat: rename tests to pass ci

* feat: fix pr issues

* feat: fix get crewai versoin

* fix: add timeout for tests.yml
This commit is contained in:
Eduardo Chiarotti
2024-08-23 10:20:03 -03:00
committed by GitHub
parent 4736604b4d
commit f5246039e5
18 changed files with 1481 additions and 18 deletions

View File

@@ -0,0 +1,3 @@
from .main import AuthenticationCommand
__all__ = ["AuthenticationCommand"]

View File

@@ -0,0 +1,4 @@
ALGORITHMS = ["RS256"]
AUTH0_DOMAIN = "dev-jzsr0j8zs0atl5ha.us.auth0.com"
AUTH0_CLIENT_ID = "CZtyRHuVW80HbLSjk4ggXNzjg4KAt7Oe"
AUTH0_AUDIENCE = "https://dev-jzsr0j8zs0atl5ha.us.auth0.com/api/v2/"

View File

@@ -0,0 +1,75 @@
import time
import webbrowser
from typing import Any, Dict
import requests
from rich.console import Console
from .constants import AUTH0_AUDIENCE, AUTH0_CLIENT_ID, AUTH0_DOMAIN
from .utils import TokenManager, validate_token
console = Console()
class AuthenticationCommand:
DEVICE_CODE_URL = f"https://{AUTH0_DOMAIN}/oauth/device/code"
TOKEN_URL = f"https://{AUTH0_DOMAIN}/oauth/token"
def __init__(self):
self.token_manager = TokenManager()
def signup(self) -> None:
"""Sign up to CrewAI+"""
console.print("Signing Up to CrewAI+ \n", style="bold blue")
device_code_data = self._get_device_code()
self._display_auth_instructions(device_code_data)
return self._poll_for_token(device_code_data)
def _get_device_code(self) -> Dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
"client_id": AUTH0_CLIENT_ID,
"scope": "openid",
"audience": AUTH0_AUDIENCE,
}
response = requests.post(url=self.DEVICE_CODE_URL, data=device_code_payload)
response.raise_for_status()
return response.json()
def _display_auth_instructions(self, device_code_data: Dict[str, str]) -> None:
"""Display the authentication instructions to the user."""
console.print("1. Navigate to: ", device_code_data["verification_uri_complete"])
console.print("2. Enter the following code: ", device_code_data["user_code"])
webbrowser.open(device_code_data["verification_uri_complete"])
def _poll_for_token(self, device_code_data: Dict[str, Any]) -> None:
"""Poll the server for the token."""
token_payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code_data["device_code"],
"client_id": AUTH0_CLIENT_ID,
}
attempts = 0
while True and attempts < 5:
response = requests.post(self.TOKEN_URL, data=token_payload)
token_data = response.json()
if response.status_code == 200:
validate_token(token_data["id_token"])
expires_in = 360000 # Token expiration time in seconds
self.token_manager.save_tokens(token_data["access_token"], expires_in)
console.print("\nWelcome to CrewAI+ !!", style="green")
return
if token_data["error"] not in ("authorization_pending", "slow_down"):
raise requests.HTTPError(token_data["error_description"])
time.sleep(device_code_data["interval"])
attempts += 1
console.print(
"Timeout: Failed to get the token. Please try again.", style="bold red"
)

View File

@@ -0,0 +1,144 @@
import json
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from auth0.authentication.token_verifier import (
AsymmetricSignatureVerifier,
TokenVerifier,
)
from cryptography.fernet import Fernet
from .constants import AUTH0_CLIENT_ID, AUTH0_DOMAIN
def validate_token(id_token: str) -> None:
"""
Verify the token and its precedence
:param id_token:
"""
jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
issuer = f"https://{AUTH0_DOMAIN}/"
signature_verifier = AsymmetricSignatureVerifier(jwks_url)
token_verifier = TokenVerifier(
signature_verifier=signature_verifier, issuer=issuer, audience=AUTH0_CLIENT_ID
)
token_verifier.verify(id_token)
class TokenManager:
def __init__(self, file_path: str = "tokens.enc") -> None:
"""
Initialize the TokenManager class.
:param file_path: The file path to store the encrypted tokens. Default is "tokens.enc".
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""
Get or create the encryption key.
:return: The encryption key.
"""
key_filename = "secret.key"
key = self.read_secure_file(key_filename)
if key is not None:
return key
new_key = Fernet.generate_key()
self.save_secure_file(key_filename, new_key)
return new_key
def save_tokens(self, access_token: str, expires_in: int) -> None:
"""
Save the access token and its expiration time.
:param access_token: The access token to save.
:param expires_in: The expiration time of the access token in seconds.
"""
expiration_time = datetime.now() + timedelta(seconds=expires_in)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self.save_secure_file(self.file_path, encrypted_data)
def get_token(self) -> Optional[str]:
"""
Get the access token if it is valid and not expired.
:return: The access token if valid and not expired, otherwise None.
"""
encrypted_data = self.read_secure_file(self.file_path)
decrypted_data = self.fernet.decrypt(encrypted_data)
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return data["access_token"]
def get_secure_storage_path(self) -> Path:
"""
Get the secure storage path based on the operating system.
:return: The secure storage path.
"""
if sys.platform == "win32":
# Windows: Use %LOCALAPPDATA%
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
# macOS: Use ~/Library/Application Support
base_path = os.path.expanduser("~/Library/Application Support")
else:
# Linux and other Unix-like: Use ~/.local/share
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def save_secure_file(self, filename: str, content: bytes) -> None:
"""
Save the content to a secure file.
:param filename: The name of the file.
:param content: The content to save.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
with open(file_path, "wb") as f:
f.write(content)
# Set appropriate permissions (read/write for owner only)
os.chmod(file_path, 0o600)
def read_secure_file(self, filename: str) -> Optional[bytes]:
"""
Read the content of a secure file.
:param filename: The name of the file.
:return: The content of the file if it exists, otherwise None.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if not file_path.exists():
return None
with open(file_path, "rb") as f:
return f.read()