import base64 import hashlib import secrets import textwrap import webbrowser from typing import Any, Dict import requests from rich.console import Console from crewai.cli.tools.main import ToolCommand from .constants import ( WORKOS_AUTHORIZE_URL, WORKOS_CLIENT_ID, WORKOS_DOMAIN, WORKOS_TOKEN_URL, ) from .utils import TokenManager, validate_token console = Console() import socket from urllib.parse import parse_qs, urlparse class AuthenticationCommand: CODE_VERIFIER = secrets.token_urlsafe(64) CODE_CHALLENGE = ( base64.urlsafe_b64encode(hashlib.sha256(CODE_VERIFIER.encode()).digest()) .rstrip(b"=") .decode("utf-8") ) NONCE = secrets.token_hex(6) STATE = secrets.token_hex(9) SOCKET_HOST = "0.0.0.0" SOCKET_PORT = 49152 def __init__(self): self.token_manager = TokenManager() self.auth_url = self._get_auth_url() def login(self) -> None: """Login or Sign Up to CrewAI Enterprise""" console.print("Signing in to CrewAI enterprise... \n", style="bold blue") # 1. Get the auth URL. Upon successful authentication, browser will redirect back to CLI with the 'code' parameter. console.print( f"1. Navigate to [bold blue][link={self.auth_url}]this link.[/link][/bold blue] (it should open automatically in a few seconds...)", style="bold", ) webbrowser.open(self.auth_url) # 2. Listen for the auth response from the browser, and upon receiving the 'code' parameter, authenticate the user. redirect_url_params = self._listen_for_auth_response() console.print( "2. Login successful. Retrieving your [bold blue]access tokens[/bold blue]...", style="bold", ) auth_response = self._authenticate(redirect_url_params) # 3. Validate the JWT token signature, extract the access and refresh tokens and save them to the token manager. access_token, refresh_token, user_info = self._validate_and_extract_tokens( auth_response ) self.token_manager.save_access_token(access_token, auth_response["expires_in"]) self.token_manager.save_refresh_token(refresh_token) # 4. Sign in to the tool repository. console.print( "3. All good. Now signing you in to [bold blue]tool repository[/bold blue]...", style="bold", ) self._sign_in_to_tool_repository() # 5. Wrap up. console.print( f"4. Done! You are now signed in to CrewAI enterprise. Welcome, [bold cyan]{user_info.get('name')}[/bold cyan].", style="bold green", ) return None def _get_auth_url(self) -> str: return ( f"{WORKOS_AUTHORIZE_URL}?" f"response_type=code&" f"client_id={WORKOS_CLIENT_ID}&" f"redirect_uri=http://localhost:{self.SOCKET_PORT}&" f"scope=openid+profile+email+offline_access&" f"code_challenge={self.CODE_CHALLENGE}&" f"code_challenge_method=S256&" f"nonce={self.NONCE}&" f"state={self.STATE}" ) def _listen_for_auth_response(self) -> dict[str, str]: """ Listen for the authentication response from the browser. Returns: dict[str, str]: The URL parameters passed in the querystring of the redirect URL. """ redirect_url_params = {} with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: server_socket.bind((self.SOCKET_HOST, self.SOCKET_PORT)) server_socket.listen(1) console.print("> Waiting for browser login and approval...", style="yellow") conn, addr = server_socket.accept() with conn: request = conn.recv(1024).decode("utf-8") # Extract the request line (first line of the HTTP request) request_line = request.splitlines()[0] method, path, _ = request_line.split() # Parse the URL path to get query string parameters parsed_url = urlparse(path) redirect_url_params = parse_qs(parsed_url.query) # Convert values from lists to single values if appropriate redirect_url_params = { k: v[0] if len(v) == 1 else v for k, v in redirect_url_params.items() } # Prepare the HTTP response with success message and JS that attempts to close the tab. html_body = self._html_response_body() http_response = f"HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: {len(html_body.encode('utf-8'))}\r\nConnection: close\r\n\r\n{html_body}" conn.sendall(http_response.encode("utf-8")) server_socket.close() console.print("> Response received. Proceeding to login...", style="green") return redirect_url_params def _authenticate(self, params) -> dict[str, str]: response = requests.post( WORKOS_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": WORKOS_CLIENT_ID, "code": params["code"], "redirect_uri": f"http://localhost:{self.SOCKET_PORT}", "code_verifier": self.CODE_VERIFIER, }, ) if response.status_code != 200: console.print( f"❌ Failed to sign in to CrewAI enterprise. \nRun [bold]crewai login[/bold] and try logging in again.\n", style="red", ) raise SystemExit return response.json() def _validate_and_extract_tokens( self, response_dict: dict[str, str] ) -> [str, str, dict[str, str]]: user_info = {} try: validate_token(response_dict["access_token"]) user_info = validate_token(response_dict["id_token"], "id_token") except Exception as e: console.print( f"❌ Failure validating JWT token signature, login failed. \nRun [bold]crewai login[/bold] to try logging in again.\n\n Error: {e}", style="red", ) raise SystemExit return response_dict["access_token"], response_dict["refresh_token"], user_info def _sign_in_to_tool_repository(self) -> None: try: ToolCommand().login() except Exception as e: console.print( "\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.", style="yellow", ) console.print( "Other features will work normally, but you may experience limitations " "with downloading and publishing tools." "\nRun [bold]crewai login[/bold] to try logging in again.\n", style="yellow", ) console.print(f"Error: {e}", style="red") def _html_response_body(self) -> str: html_body = textwrap.dedent("""\
Your authentication through CLI was successful. This browser tab should close automatically.
If it doesn't close in a few seconds, you may close it manually.