diff --git a/src/crewai/cli/authentication/main.py b/src/crewai/cli/authentication/main.py index 2fd9c632b..72f176655 100644 --- a/src/crewai/cli/authentication/main.py +++ b/src/crewai/cli/authentication/main.py @@ -1,4 +1,7 @@ -import time +import base64 +import hashlib +import secrets +import textwrap import webbrowser from typing import Any, Dict @@ -7,87 +10,216 @@ from rich.console import Console from crewai.cli.tools.main import ToolCommand -from .constants import AUTH0_AUDIENCE, AUTH0_CLIENT_ID, AUTH0_DOMAIN +from .constants import ( + WORKOS_AUTHORIZE_URL, + WORKOS_CLIENT_ID, + WORKOS_DOMAIN, + WORKOS_TOKEN_URL, +) from .utils import TokenManager, validate_token console = Console() +import socket +from urllib.parse import parse_qs, urlparse + class AuthenticationCommand: - DEVICE_CODE_URL = f"https://{AUTH0_DOMAIN}/oauth/device/code" - TOKEN_URL = f"https://{AUTH0_DOMAIN}/oauth/token" + CODE_VERIFIER = secrets.token_urlsafe(64) + CODE_CHALLENGE = ( + base64.urlsafe_b64encode(hashlib.sha256(CODE_VERIFIER.encode()).digest()) + .rstrip(b"=") + .decode("utf-8") + ) + NONCE = secrets.token_hex(6) + STATE = secrets.token_hex(9) + SOCKET_HOST = "0.0.0.0" + SOCKET_PORT = 49152 def __init__(self): self.token_manager = TokenManager() + self.auth_url = self._get_auth_url() def login(self) -> None: """Login or Sign Up to CrewAI Enterprise""" - return self._poll_for_token(device_code_data) + console.print("Signing in to CrewAI enterprise... \n", style="bold blue") - def _get_device_code(self) -> Dict[str, Any]: - """Get the device code to authenticate the user.""" - - device_code_payload = { - "client_id": AUTH0_CLIENT_ID, - "scope": "openid", - "audience": AUTH0_AUDIENCE, - } - response = requests.post( - url=self.DEVICE_CODE_URL, data=device_code_payload, timeout=20 + # 1. Get the auth URL. Upon successful authentication, browser will redirect back to CLI with the 'code' parameter. + console.print( + f"1. Navigate to [bold blue][link={self.auth_url}]this link.[/link][/bold blue] (it should open automatically in a few seconds...)", + style="bold", ) - response.raise_for_status() + webbrowser.open(self.auth_url) + + # 2. Listen for the auth response from the browser, and upon receiving the 'code' parameter, authenticate the user. + redirect_url_params = self._listen_for_auth_response() + console.print( + "2. Login successful. Retrieving your [bold blue]access tokens[/bold blue]...", + style="bold", + ) + auth_response = self._authenticate(redirect_url_params) + + # 3. Validate the JWT token signature, extract the access and refresh tokens and save them to the token manager. + access_token, refresh_token, user_info = self._validate_and_extract_tokens( + auth_response + ) + self.token_manager.save_access_token(access_token, auth_response["expires_in"]) + self.token_manager.save_refresh_token(refresh_token) + + # 4. Sign in to the tool repository. + console.print( + "3. All good. Now signing you in to [bold blue]tool repository[/bold blue]...", + style="bold", + ) + self._sign_in_to_tool_repository() + + # 5. Wrap up. + console.print( + f"4. Done! You are now signed in to CrewAI enterprise. Welcome, [bold cyan]{user_info.get('name')}[/bold cyan].", + style="bold green", + ) + return None + + def _get_auth_url(self) -> str: + return ( + f"{WORKOS_AUTHORIZE_URL}?" + f"response_type=code&" + f"client_id={WORKOS_CLIENT_ID}&" + f"redirect_uri=http://localhost:{self.SOCKET_PORT}&" + f"scope=openid+profile+email+offline_access&" + f"code_challenge={self.CODE_CHALLENGE}&" + f"code_challenge_method=S256&" + f"nonce={self.NONCE}&" + f"state={self.STATE}" + ) + + def _listen_for_auth_response(self) -> dict[str, str]: + """ + Listen for the authentication response from the browser. + + Returns: + dict[str, str]: The URL parameters passed in the querystring of the redirect URL. + """ + + redirect_url_params = {} + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + server_socket.bind((self.SOCKET_HOST, self.SOCKET_PORT)) + server_socket.listen(1) + console.print("> Waiting for browser login and approval...", style="yellow") + + conn, addr = server_socket.accept() + with conn: + request = conn.recv(1024).decode("utf-8") + + # Extract the request line (first line of the HTTP request) + request_line = request.splitlines()[0] + method, path, _ = request_line.split() + + # Parse the URL path to get query string parameters + parsed_url = urlparse(path) + redirect_url_params = parse_qs(parsed_url.query) + + # Convert values from lists to single values if appropriate + redirect_url_params = { + k: v[0] if len(v) == 1 else v + for k, v in redirect_url_params.items() + } + + # Prepare the HTTP response with success message and JS that attempts to close the tab. + html_body = self._html_response_body() + http_response = f"HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {len(html_body.encode('utf-8'))}\r\nConnection: close\r\n\r\n{html_body}" + conn.sendall(http_response.encode("utf-8")) + + server_socket.close() + console.print("> Response received. Proceeding to login...", style="green") + + return redirect_url_params + + def _authenticate(self, params) -> dict[str, str]: + response = requests.post( + WORKOS_TOKEN_URL, + data={ + "grant_type": "authorization_code", + "client_id": WORKOS_CLIENT_ID, + "code": params["code"], + "redirect_uri": f"http://localhost:{self.SOCKET_PORT}", + "code_verifier": self.CODE_VERIFIER, + }, + ) + + if response.status_code != 200: + console.print( + f"❌ Failed to sign in to CrewAI enterprise. \nRun [bold]crewai login[/bold] and try logging in again.\n", + style="red", + ) + raise SystemExit + return response.json() - def _display_auth_instructions(self, device_code_data: Dict[str, str]) -> None: - """Display the authentication instructions to the user.""" - console.print("1. Navigate to: ", device_code_data["verification_uri_complete"]) - console.print("2. Enter the following code: ", device_code_data["user_code"]) - webbrowser.open(device_code_data["verification_uri_complete"]) + def _validate_and_extract_tokens( + self, response_dict: dict[str, str] + ) -> [str, str, dict[str, str]]: + user_info = {} + try: + validate_token(response_dict["access_token"]) + user_info = validate_token(response_dict["id_token"], "id_token") + except Exception as e: + console.print( + f"❌ Failure validating JWT token signature, login failed. \nRun [bold]crewai login[/bold] to try logging in again.\n\n Error: {e}", + style="red", + ) + raise SystemExit - def _poll_for_token(self, device_code_data: Dict[str, Any]) -> None: - """Poll the server for the token.""" - token_payload = { - "grant_type": "urn:ietf:params:oauth:grant-type:device_code", - "device_code": device_code_data["device_code"], - "client_id": AUTH0_CLIENT_ID, - } + return response_dict["access_token"], response_dict["refresh_token"], user_info - attempts = 0 - while True and attempts < 5: - response = requests.post(self.TOKEN_URL, data=token_payload, timeout=30) - token_data = response.json() + def _sign_in_to_tool_repository(self) -> None: + try: + ToolCommand().login() + except Exception as e: + console.print( + "\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.", + style="yellow", + ) + console.print( + "Other features will work normally, but you may experience limitations " + "with downloading and publishing tools." + "\nRun [bold]crewai login[/bold] to try logging in again.\n", + style="yellow", + ) + console.print(f"Error: {e}", style="red") - if response.status_code == 200: - validate_token(token_data["id_token"]) - expires_in = 360000 # Token expiration time in seconds - self.token_manager.save_tokens(token_data["access_token"], expires_in) - - try: - ToolCommand().login() - except Exception: - console.print( - "\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.", - style="yellow", - ) - console.print( - "Other features will work normally, but you may experience limitations " - "with downloading and publishing tools." - "\nRun [bold]crewai login[/bold] to try logging in again.\n", - style="yellow", - ) - - console.print( - "\n[bold green]Welcome to CrewAI Enterprise![/bold green]\n" - ) - return - - if token_data["error"] not in ("authorization_pending", "slow_down"): - raise requests.HTTPError(token_data["error_description"]) - - time.sleep(device_code_data["interval"]) - attempts += 1 - - console.print( - "Timeout: Failed to get the token. Please try again.", style="bold red" - ) + def _html_response_body(self) -> str: + html_body = textwrap.dedent("""\ + + +
+ + +Your authentication through CLI was successful. This browser tab should close automatically.
+If it doesn't close in a few seconds, you may close it manually.
+