from pathlib import Path import subprocess from typing import Any from urllib.parse import quote import webbrowser from crewai_core.plus_api import CreateCrewPayload from rich.console import Console from crewai_cli import git from crewai_cli.command import BaseCommand, PlusAPIMixin from crewai_cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL from crewai_cli.deploy.archive import create_project_zip from crewai_cli.deploy.validate import DeployValidator, Severity, render_report from crewai_cli.utils import fetch_and_json_env_file, get_project_name console = Console() _MISSING_LOCKFILE_ERROR_CODES = {"missing_lockfile"} _DEPLOYMENT_ID_KEYS = ("deployment_id", "deploymentId") _DEPLOYMENT_FALLBACK_IDENTIFIER_KEYS = ("id", "uuid") def _run_predeploy_validation( skip_validate: bool, ignored_error_codes: set[str] | None = None, ) -> bool: """Run pre-deploy validation unless skipped. Returns True if deployment should proceed, False if it should abort. """ if skip_validate: console.print( "[yellow]Skipping pre-deploy validation (--skip-validate).[/yellow]" ) return True console.print("Running pre-deploy validation...", style="bold blue") validator = DeployValidator() validator.run() ignored_error_codes = ignored_error_codes or set() visible_results = [ result for result in validator.results if result.severity is not Severity.ERROR or result.code not in ignored_error_codes ] render_report(visible_results) blocking_errors = [ result for result in validator.errors if result.code not in ignored_error_codes ] if blocking_errors: console.print( "\n[bold red]Pre-deploy validation failed. " "Fix the issues above or re-run with --skip-validate.[/bold red]" ) return False return True def _display_git_repository_help() -> None: """Explain how to prepare a new project for deployment.""" console.print( "Initialized a local Git repository and created an initial commit.", style="green", ) def _display_git_remote_help() -> None: """Explain that ZIP deployment will be used without an origin remote.""" console.print( "No origin remote found. Deploying from a ZIP upload instead.", style="yellow", ) def _env_summary(env_vars: dict[str, str]) -> str: """Return a compact description of environment variables for prompts.""" if not env_vars: return "0 env vars" keys = ", ".join(sorted(env_vars)) return f"{len(env_vars)} env vars: {keys}" def _deployment_identifier(json_response: dict[str, Any]) -> str | None: """Return the best available identifier for a deployment show URL.""" deployment = json_response.get("deployment") for key in _DEPLOYMENT_ID_KEYS: value = json_response.get(key) if value: return str(value) if isinstance(deployment, dict): for key in _DEPLOYMENT_ID_KEYS + _DEPLOYMENT_FALLBACK_IDENTIFIER_KEYS: value = deployment.get(key) if value: return str(value) for key in _DEPLOYMENT_FALLBACK_IDENTIFIER_KEYS: value = json_response.get(key) if value: return str(value) return None def _deployment_page_url(base_url: str, json_response: dict[str, Any]) -> str | None: """Build the CrewAI deployment show URL for a response payload.""" identifier = _deployment_identifier(json_response) if not identifier: return None return ( f"{base_url.rstrip('/')}/crewai_plus/deployments/{quote(identifier, safe='')}" ) def _needs_lockfile_for_deploy(project_root: Path | None = None) -> bool: """Return True when deploy should create the project's first lockfile.""" root = project_root or Path.cwd() if not (root / "pyproject.toml").is_file(): return False return not (root / "uv.lock").is_file() and not (root / "poetry.lock").is_file() def _ensure_lockfile_for_deploy() -> None: """Create a uv lockfile before deploy when a project has not been run yet.""" if not _needs_lockfile_for_deploy(): return from crewai_cli.install_crew import install_crew console.print( "No lockfile found. Installing dependencies before deployment...", style="bold blue", ) try: install_crew([], raise_on_error=True) except subprocess.CalledProcessError as e: raise SystemExit(e.returncode) from e except Exception as e: raise SystemExit(1) from e def _prepare_project_for_deploy(skip_validate: bool) -> bool: """Validate deploy inputs before creating a missing lockfile.""" if skip_validate: _run_predeploy_validation(skip_validate) _ensure_lockfile_for_deploy() return True needs_lockfile = _needs_lockfile_for_deploy() ignored_error_codes = _MISSING_LOCKFILE_ERROR_CODES if needs_lockfile else None if not _run_predeploy_validation( skip_validate, ignored_error_codes=ignored_error_codes, ): return False if not needs_lockfile: return True _ensure_lockfile_for_deploy() return _run_predeploy_validation(skip_validate) class DeployCommand(BaseCommand, PlusAPIMixin): """ A class to handle deployment-related operations for CrewAI projects. """ def __init__(self) -> None: """ Initialize the DeployCommand with project name and API client. """ BaseCommand.__init__(self) PlusAPIMixin.__init__(self, telemetry=self._telemetry) self.project_name = get_project_name(require=True) def _standard_no_param_error_message(self) -> None: """ Display a standard error message when no UUID or project name is available. """ console.print( "No UUID provided, project pyproject.toml not found or with error.", style="bold red", ) def _display_deployment_info(self, json_response: dict[str, Any]) -> None: """ Display deployment information. Args: json_response (Dict[str, Any]): The deployment information to display. """ console.print("Deploying the crew...\n", style="bold blue") for key, value in json_response.items(): console.print(f"{key.title()}: [green]{value}[/green]") console.print("\nTo check the status of the deployment, run:") console.print("crewai deploy status") console.print(" or") console.print(f'crewai deploy status --uuid "{json_response["uuid"]}"') self._open_deployment_page(json_response) def _display_logs(self, log_messages: list[dict[str, Any]]) -> None: """ Display log messages. Args: log_messages (List[Dict[str, Any]]): The log messages to display. """ for log_message in log_messages: console.print( f"{log_message['timestamp']} - {log_message['level']}: {log_message['message']}" ) def _open_deployment_page(self, json_response: dict[str, Any]) -> None: """Open the deployment show page in the user's browser when possible.""" base_url = str( getattr(self.plus_api_client, "base_url", None) or DEFAULT_CREWAI_ENTERPRISE_URL ) deployment_url = _deployment_page_url(base_url, json_response) if not deployment_url: return console.print(f"\nOpening deployment page: [blue]{deployment_url}[/blue]") try: opened = webbrowser.open(deployment_url) except Exception: opened = False if not opened: console.print( "Could not open the deployment page automatically.", style="yellow", ) def deploy(self, uuid: str | None = None, skip_validate: bool = False) -> None: """ Deploy a crew using either UUID or project name. Args: uuid (Optional[str]): The UUID of the crew to deploy. skip_validate (bool): Skip pre-deploy validation checks. """ if not _prepare_project_for_deploy(skip_validate): return self._telemetry.start_deployment_span(uuid) console.print("Starting deployment...", style="bold blue") repository = self._prepare_git_repository() remote_repo_url = repository.origin_url() if repository else None if remote_repo_url and uuid: response = self.plus_api_client.deploy_by_uuid(uuid) elif remote_repo_url and self.project_name: response = self.plus_api_client.deploy_by_name(self.project_name) elif uuid: _display_git_remote_help() env_vars = fetch_and_json_env_file() response = self._update_crew_from_zip(uuid, repository, env_vars) elif self.project_name: _display_git_remote_help() deployment_uuid = self._deployment_uuid_by_name() env_vars = fetch_and_json_env_file() response = self._update_crew_from_zip( deployment_uuid, repository, env_vars, ) else: self._standard_no_param_error_message() return self._validate_response(response) self._display_deployment_info(response.json()) def _deployment_uuid_by_name(self) -> str: """Resolve the current project's deployment UUID by project name.""" if not self.project_name: raise ValueError("project_name is required to find a deployment") response = self.plus_api_client.crew_status_by_name(self.project_name) self._validate_response(response) json_response = response.json() uuid = json_response.get("uuid") if not uuid: raise ValueError("Deployment status response did not include a uuid") return str(uuid) def create_crew(self, confirm: bool = False, skip_validate: bool = False) -> None: """ Create a new crew deployment. Args: confirm (bool): Whether to skip the interactive confirmation prompt. skip_validate (bool): Skip pre-deploy validation checks. """ if not _prepare_project_for_deploy(skip_validate): return self._telemetry.create_crew_deployment_span() console.print("Creating deployment...", style="bold blue") env_vars = fetch_and_json_env_file() repository = self._prepare_git_repository() remote_repo_url = repository.origin_url() if repository else None if remote_repo_url: self._confirm_input(env_vars, remote_repo_url, confirm) payload = self._create_payload(env_vars, remote_repo_url) response = self.plus_api_client.create_crew(payload) else: _display_git_remote_help() response = self._create_crew_from_zip(env_vars, repository, confirm) self._validate_response(response) self._display_creation_success(response.json()) def _prepare_git_repository(self) -> git.Repository | None: """Prepare Git for deploy while preserving remote deploy when possible.""" try: repository = git.Repository(fetch=False) except ValueError as exc: if "not a Git repository" not in str(exc): console.print( f"{exc} Continuing with ZIP deployment.", style="yellow", ) return None try: repository = git.Repository.initialize() except Exception as init_error: console.print( "Git auto-setup did not complete. Continuing with ZIP deployment.", style="yellow", ) console.print(str(init_error), style="dim") try: return git.Repository(fetch=False) except Exception as repository_error: console.print(str(repository_error), style="dim") return None _display_git_repository_help() return repository remote_repo_url = repository.origin_url() if remote_repo_url: try: repository.fetch() except ValueError as fetch_error: console.print( "Could not fetch from origin. Continuing with remote deployment.", style="yellow", ) console.print(str(fetch_error), style="dim") try: if repository.create_initial_commit_if_needed(): console.print( "Created an initial Git commit for this project.", style="green", ) except Exception as commit_error: console.print( "Could not create an initial Git commit. " "Continuing with remote deployment.", style="yellow", ) console.print(str(commit_error), style="dim") return repository try: if repository.create_initial_commit_if_needed(): console.print( "Created an initial Git commit for this project.", style="green", ) except Exception as commit_error: console.print( "Could not create an initial Git commit. " "Continuing with ZIP deployment using Git file listing.", style="yellow", ) console.print(str(commit_error), style="dim") return repository return repository def _create_crew_from_zip( self, env_vars: dict[str, str], repository: git.Repository | None, confirm: bool, ) -> Any: """Create a deployment by uploading a project ZIP archive.""" if not self.project_name: raise ValueError("project_name is required to create a ZIP deployment") console.print("Preparing project ZIP...", style="bold blue") zip_file_path = create_project_zip(self.project_name, repository=repository) try: self._confirm_zip_input(env_vars, confirm) console.print("Uploading project ZIP...", style="bold blue") return self.plus_api_client.create_crew_from_zip( zip_file_path, name=self.project_name, env=env_vars, ) finally: zip_file_path.unlink(missing_ok=True) def _update_crew_from_zip( self, uuid: str, repository: git.Repository | None, env_vars: dict[str, str], ) -> Any: """Update an existing deployment by uploading a project ZIP archive.""" if not self.project_name: raise ValueError("project_name is required to update a ZIP deployment") console.print("Preparing project ZIP...", style="bold blue") zip_file_path = create_project_zip(self.project_name, repository=repository) try: console.print("Uploading project ZIP...", style="bold blue") return self.plus_api_client.update_crew_from_zip( uuid, zip_file_path, env=env_vars, ) finally: zip_file_path.unlink(missing_ok=True) def _confirm_input( self, env_vars: dict[str, str], remote_repo_url: str, confirm: bool ) -> None: """ Confirm input parameters with the user. Args: env_vars (Dict[str, str]): Environment variables. remote_repo_url (str): Remote repository URL. confirm (bool): Whether to confirm input. """ if not confirm: input(f"Press Enter to continue with {_env_summary(env_vars)}") input( f"Press Enter to continue with the following remote repository: {remote_repo_url}\n" ) def _confirm_zip_input(self, env_vars: dict[str, str], confirm: bool) -> None: """Prompt before ZIP upload unless confirmation was already supplied.""" if not confirm: input(f"Press Enter to continue with {_env_summary(env_vars)}") def _create_payload( self, env_vars: dict[str, str], remote_repo_url: str, ) -> CreateCrewPayload: """ Create the payload for crew creation. Args: remote_repo_url (str): Remote repository URL. env_vars (Dict[str, str]): Environment variables. Returns: Dict[str, Any]: The payload for crew creation. """ if not self.project_name: raise ValueError("project_name is required to create a deployment payload") return { "deploy": { "name": self.project_name, "repo_clone_url": remote_repo_url, "env": env_vars, } } def _display_creation_success(self, json_response: dict[str, Any]) -> None: """ Display success message after crew creation. Args: json_response (Dict[str, Any]): The response containing crew information. """ console.print("Deployment created successfully!\n", style="bold green") console.print( f"Name: {self.project_name} ({json_response['uuid']})", style="bold green" ) console.print(f"Status: {json_response['status']}", style="bold green") console.print("\nTo (re)deploy the crew, run:") console.print("crewai deploy push") console.print(" or") console.print(f"crewai deploy push --uuid {json_response['uuid']}") self._open_deployment_page(json_response) def list_crews(self) -> None: """ List all available crews. """ console.print("Listing all Crews\n", style="bold blue") response = self.plus_api_client.list_crews() json_response = response.json() if response.status_code == 200: self._display_crews(json_response) else: self._display_no_crews_message() def _display_crews(self, crews_data: list[dict[str, Any]]) -> None: """ Display the list of crews. Args: crews_data (List[Dict[str, Any]]): List of crew data to display. """ for crew_data in crews_data: console.print( f"- {crew_data['name']} ({crew_data['uuid']}) [blue]{crew_data['status']}[/blue]" ) def _display_no_crews_message(self) -> None: """ Display a message when no crews are available. """ console.print("You don't have any Crews yet. Let's create one!", style="yellow") console.print(" crewai create crew ", style="green") def get_crew_status(self, uuid: str | None = None) -> None: """ Get the status of a crew. Args: uuid (Optional[str]): The UUID of the crew to check. """ console.print("Fetching deployment status...", style="bold blue") if uuid: response = self.plus_api_client.crew_status_by_uuid(uuid) elif self.project_name: response = self.plus_api_client.crew_status_by_name(self.project_name) else: self._standard_no_param_error_message() return self._validate_response(response) self._display_crew_status(response.json()) def _display_crew_status(self, status_data: dict[str, str]) -> None: """ Display the status of a crew. Args: status_data (Dict[str, str]): The status data to display. """ console.print(f"Name:\t {status_data['name']}") console.print(f"Status:\t {status_data['status']}") def get_crew_logs(self, uuid: str | None, log_type: str = "deployment") -> None: """ Get logs for a crew. Args: uuid (Optional[str]): The UUID of the crew to get logs for. log_type (str): The type of logs to retrieve (default: "deployment"). """ self._telemetry.get_crew_logs_span(uuid, log_type) console.print(f"Fetching {log_type} logs...", style="bold blue") if uuid: response = self.plus_api_client.crew_by_uuid(uuid, log_type) elif self.project_name: response = self.plus_api_client.crew_by_name(self.project_name, log_type) else: self._standard_no_param_error_message() return self._validate_response(response) self._display_logs(response.json()) def remove_crew(self, uuid: str | None) -> None: """ Remove a crew deployment. Args: uuid (Optional[str]): The UUID of the crew to remove. """ self._telemetry.remove_crew_span(uuid) console.print("Removing deployment...", style="bold blue") if uuid: response = self.plus_api_client.delete_crew_by_uuid(uuid) elif self.project_name: response = self.plus_api_client.delete_crew_by_name(self.project_name) else: self._standard_no_param_error_message() return if response.status_code == 204: console.print( f"Crew '{self.project_name}' removed successfully.", style="green" ) else: console.print( f"Failed to remove crew '{self.project_name}'", style="bold red" )