Support ZIP deployment fallback and JSON crew project env runs (#6166)

* Update crewAI CLI with various enhancements and fixes

- Updated `create_json_crew.py` to require `crewai[tools]>=1.14.7`.
- Enhanced `git.py` with improved repository initialization, including automatic initial commit creation and exclusion patterns for initial commits.
- Modified `install_crew.py` to allow error handling during installation with an optional `raise_on_error` parameter.
- Expanded `plus_api.py` to include methods for creating and updating crews from ZIP files.
- Introduced a new `archive.py` for creating deployable ZIP archives of CrewAI projects, ensuring local artifacts are excluded.
- Updated `run_crew.py` to manage JSON crew dependencies and run crews in the project's environment.
- Enhanced deployment logic in `main.py` to handle ZIP uploads and improve user feedback during deployment processes.
- Added tests for new functionalities and ensured existing tests reflect recent changes in behavior and requirements.

* fix(cli): address deploy zip review feedback

* fix(cli): sync missing lockfile before deploy

* fix(cli): preserve remote deploy on git setup warnings

* test(cli): use single deploy main import style

* fix(cli): skip project install for json crew sync

* fix(cli): load json runner from source checkout

* fix(cli): skip json crew sync when locked

* fix(cli): address deploy zip review feedback

* fix(cli): pass env on zip redeploy

* fix(cli): harden json run and zip fallback

* fix(cli): validate before deploy lock install

* fix(cli): respect poetry lock for json runs

* fix(cli): align json zip wrapper detection

* fix(deps): bump starlette audit floor

* fix(cli): avoid auth retry for deploy exits

* fix(cli): update json zip script entrypoints
This commit is contained in:
João Moura
2026-06-15 18:46:54 -03:00
committed by GitHub
parent a5cc6f6d0e
commit 53c2284484
20 changed files with 2449 additions and 96 deletions

View File

@@ -13,6 +13,10 @@ from crewai_cli.plus_api import PlusAPI
console = Console()
class AuthenticationRequiredError(SystemExit):
"""Raised when a Plus API command needs the user to log in first."""
class BaseCommand:
def __init__(self) -> None:
self._telemetry = Telemetry()
@@ -31,7 +35,7 @@ class PlusAPIMixin:
style="bold red",
)
console.print("Run 'crewai login' to sign up/login.", style="bold green")
raise SystemExit from None
raise AuthenticationRequiredError from None
def _validate_response(self, response: httpx.Response) -> None:
"""Handle and display error messages from API responses.

View File

@@ -84,7 +84,7 @@ description = "{name} using crewAI"
authors = [{{ name = "Your Name", email = "you@example.com" }}]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=1.15"
"crewai[tools]>=1.14.7"
]
[build-system]

View File

@@ -0,0 +1,409 @@
from __future__ import annotations
from pathlib import Path
import re
import shutil
import tempfile
from typing import Any
import zipfile
from crewai_cli import git
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import parse_toml
_EXCLUDED_DIRS = {
".crewai",
".git",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".tox",
".venv",
"__pycache__",
"build",
"dist",
"env",
"venv",
}
_EXCLUDED_FILES = {
".DS_Store",
".env",
}
_ALLOWED_ENV_EXAMPLES = {
".env.example",
".env.sample",
}
_EXCLUDED_SUFFIXES = {
".pyc",
".pyo",
}
_SCRIPT_KEY_PATTERN = re.compile(r"^\s*(?P<key>[A-Za-z0-9_.-]+|\"[^\"]+\"|'[^']+')\s*=")
_SECTION_PATTERN = re.compile(r"^\s*\[[^\]]+\]\s*(?:#.*)?$")
def create_project_zip(
project_name: str,
*,
project_dir: Path | None = None,
repository: git.Repository | None = None,
) -> Path:
"""Create a deployable ZIP archive for a CrewAI project."""
root = (project_dir or Path.cwd()).resolve()
files = _project_files(root, repository)
if not files:
raise ValueError("No deployable project files were found.")
staged_root = _stage_project(root, files)
archive_handle = tempfile.NamedTemporaryFile(
prefix=f"{project_name}-",
suffix=".zip",
delete=False,
)
archive_path = Path(archive_handle.name)
archive_handle.close()
try:
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
for relative_path in _walk_files(staged_root):
absolute_path = staged_root / relative_path
zip_file.write(absolute_path, relative_path.as_posix())
finally:
shutil.rmtree(staged_root, ignore_errors=True)
return archive_path
def _project_files(root: Path, repository: git.Repository | None = None) -> list[Path]:
"""Return project-relative files to include in the archive."""
if repository is not None:
return _repository_project_files(root, repository)
try:
repository = git.Repository(path=str(root), fetch=False)
except ValueError:
repository = None
if repository is not None:
return _repository_project_files(root, repository)
return [
path
for path in _walk_files(root)
if not _is_excluded(path) and _is_regular_file(root / path)
]
def _repository_project_files(root: Path, repository: git.Repository) -> list[Path]:
"""Return deployable files from Git while applying local safety excludes."""
files = [Path(path) for path in repository.deployable_files()]
return [
path
for path in files
if not _is_excluded(path) and _is_regular_file(root / path)
]
def _walk_files(root: Path) -> list[Path]:
"""List regular files below root as project-relative paths."""
return [
path.relative_to(root) for path in root.rglob("*") if _is_regular_file(path)
]
def _is_regular_file(path: Path) -> bool:
"""Return True for regular files, excluding symlinks to files."""
return path.is_file() and not path.is_symlink()
def _is_excluded(path: Path) -> bool:
"""Return True when a file should be omitted from deployment ZIPs."""
parts = set(path.parts)
if parts.intersection(_EXCLUDED_DIRS):
return True
name = path.name
if name in _EXCLUDED_FILES:
return True
if name.startswith(".env.") and name not in _ALLOWED_ENV_EXAMPLES:
return True
return path.suffix in _EXCLUDED_SUFFIXES
def _stage_project(root: Path, files: list[Path]) -> Path:
"""Copy archive files into a temporary staging directory."""
staging_root = Path(tempfile.mkdtemp(prefix="crewai-deploy-"))
try:
for relative_path in files:
source = root / relative_path
if not _is_regular_file(source):
continue
destination = staging_root / relative_path
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
if _is_json_crew_project(staging_root):
_add_json_crew_deploy_wrapper(staging_root)
except Exception:
shutil.rmtree(staging_root, ignore_errors=True)
raise
return staging_root
def _is_json_crew_project(root: Path) -> bool:
"""Return True for JSON crew projects that need a Python deploy wrapper."""
if not ((root / "crew.jsonc").is_file() or (root / "crew.json").is_file()):
return False
project = _read_pyproject(root)
tool_config = project.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
if declared_type == "flow":
return False
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
return not (root / "src" / package_name / "crew.py").is_file()
def _read_pyproject(root: Path) -> dict[str, Any]:
"""Read pyproject.toml, returning an empty mapping on missing or invalid data."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return {}
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return {}
return pyproject if isinstance(pyproject, dict) else {}
def _package_name(root: Path) -> str | None:
"""Return the normalized Python package name for the project."""
project = _read_pyproject(root).get("project")
if not isinstance(project, dict):
return None
name = project.get("name")
if not isinstance(name, str) or not name.strip():
return None
package_name = normalize_package_name(name)
return package_name or None
def _class_name(package_name: str) -> str:
"""Return the generated wrapper class name for a package."""
parts = [part for part in re.split(r"[^a-zA-Z0-9]+", package_name) if part]
class_name = "".join(part[:1].upper() + part[1:] for part in parts)
if not class_name:
return "JsonCrew"
if class_name[0].isdigit():
return f"Crew{class_name}"
return class_name
def _add_json_crew_deploy_wrapper(root: Path) -> None:
"""Add Python wrapper files required to deploy a JSON crew project."""
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
package_dir = root / "src" / package_name
config_dir = package_dir / "config"
config_dir.mkdir(parents=True, exist_ok=True)
class_name = _class_name(package_name)
crew_filename = "crew.jsonc" if (root / "crew.jsonc").is_file() else "crew.json"
(package_dir / "__init__.py").write_text("", encoding="utf-8")
(config_dir / "agents.yaml").write_text("{}\n", encoding="utf-8")
(config_dir / "tasks.yaml").write_text("{}\n", encoding="utf-8")
(package_dir / "crew.py").write_text(
_json_crew_py(class_name, crew_filename),
encoding="utf-8",
)
(package_dir / "main.py").write_text(
_json_main_py(package_name, class_name),
encoding="utf-8",
)
_ensure_project_scripts(root, package_name)
def _json_crew_py(class_name: str, crew_filename: str) -> str:
"""Render the generated crew.py module for a JSON crew."""
return f'''from pathlib import Path
from crewai import Crew
from crewai.project import CrewBase, crew
from crewai.project.crew_loader import load_crew
def _crew_path() -> Path:
return Path(__file__).resolve().parents[2] / "{crew_filename}"
@CrewBase
class {class_name}:
"""Compatibility wrapper for a JSON-defined CrewAI project."""
@crew
def crew(self) -> Crew:
crew_instance, default_inputs = load_crew(_crew_path())
self.default_inputs = default_inputs
return crew_instance
'''
def _json_main_py(package_name: str, class_name: str) -> str:
"""Render the generated main.py entrypoints for a JSON crew."""
return f"""#!/usr/bin/env python
import json
import sys
from {package_name}.crew import {class_name}
def _load():
wrapper = {class_name}()
crew = wrapper.crew()
return crew, getattr(wrapper, "default_inputs", {{}})
def run():
crew, inputs = _load()
return crew.kickoff(inputs=inputs)
def train():
crew, inputs = _load()
return crew.train(
n_iterations=int(sys.argv[1]),
filename=sys.argv[2],
inputs=inputs,
)
def replay():
crew, _ = _load()
return crew.replay(task_id=sys.argv[1])
def test():
crew, inputs = _load()
return crew.test(
n_iterations=int(sys.argv[1]),
eval_llm=sys.argv[2],
inputs=inputs,
)
def run_with_trigger():
if len(sys.argv) < 2:
raise ValueError("No trigger payload provided.")
crew, inputs = _load()
trigger_payload = json.loads(sys.argv[1])
return crew.kickoff(
inputs={{**inputs, "crewai_trigger_payload": trigger_payload}}
)
"""
def _ensure_project_scripts(root: Path, package_name: str) -> None:
"""Ensure generated wrappers have project script entrypoints."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return
content = pyproject_path.read_text(encoding="utf-8")
entries = _project_script_entries(package_name)
pyproject_path.write_text(
_update_project_scripts(content, entries),
encoding="utf-8",
)
def _project_script_entries(package_name: str) -> dict[str, str]:
"""Return script entrypoints required by the generated JSON wrapper."""
return {
package_name: f"{package_name}.main:run",
"run_crew": f"{package_name}.main:run",
"train": f"{package_name}.main:train",
"replay": f"{package_name}.main:replay",
"test": f"{package_name}.main:test",
"run_with_trigger": f"{package_name}.main:run_with_trigger",
}
def _update_project_scripts(content: str, entries: dict[str, str]) -> str:
"""Add or replace generated script entries in pyproject.toml content."""
lines = content.rstrip().splitlines()
header_index = _project_scripts_header_index(lines)
if header_index is None:
return content.rstrip() + _project_scripts_block(entries)
end_index = _section_end_index(lines, header_index + 1)
seen: set[str] = set()
for index in range(header_index + 1, end_index):
key = _script_key(lines[index])
if key in entries:
lines[index] = _script_line(key, entries[key])
seen.add(key)
missing_lines = [
_script_line(key, value) for key, value in entries.items() if key not in seen
]
lines[end_index:end_index] = missing_lines
return "\n".join(lines).rstrip() + "\n"
def _project_scripts_header_index(lines: list[str]) -> int | None:
"""Return the line index of the project scripts table, if present."""
for index, line in enumerate(lines):
if line.strip() == "[project.scripts]":
return index
return None
def _section_end_index(lines: list[str], start_index: int) -> int:
"""Return the exclusive end index for a TOML table section."""
for index in range(start_index, len(lines)):
if _SECTION_PATTERN.match(lines[index]):
return index
return len(lines)
def _script_key(line: str) -> str | None:
"""Return the script key for a pyproject script line."""
match = _SCRIPT_KEY_PATTERN.match(line)
if not match:
return None
key = match.group("key")
if key.startswith(("'", '"')) and key.endswith(("'", '"')):
return key[1:-1]
return key
def _script_line(key: str, value: str) -> str:
"""Render a project script TOML entry."""
return f'{key} = "{value}"'
def _project_scripts_block(entries: dict[str, str]) -> str:
"""Render a project scripts TOML table."""
lines = ["", "", "[project.scripts]"]
lines.extend(_script_line(key, value) for key, value in entries.items())
return "\n".join(lines) + "\n"

View File

@@ -1,3 +1,5 @@
from pathlib import Path
import subprocess
from typing import Any
from crewai_core.plus_api import CreateCrewPayload
@@ -5,14 +7,19 @@ from rich.console import Console
from crewai_cli import git
from crewai_cli.command import BaseCommand, PlusAPIMixin
from crewai_cli.deploy.validate import validate_project
from crewai_cli.deploy.archive import create_project_zip
from crewai_cli.deploy.validate import DeployValidator, Severity, render_report
from crewai_cli.utils import fetch_and_json_env_file, get_project_name
console = Console()
_MISSING_LOCKFILE_ERROR_CODES = {"missing_lockfile"}
def _run_predeploy_validation(skip_validate: bool) -> bool:
def _run_predeploy_validation(
skip_validate: bool,
ignored_error_codes: set[str] | None = None,
) -> bool:
"""Run pre-deploy validation unless skipped.
Returns True if deployment should proceed, False if it should abort.
@@ -24,8 +31,22 @@ def _run_predeploy_validation(skip_validate: bool) -> bool:
return True
console.print("Running pre-deploy validation...", style="bold blue")
validator = validate_project()
if not validator.ok:
validator = DeployValidator()
validator.run()
ignored_error_codes = ignored_error_codes or set()
visible_results = [
result
for result in validator.results
if result.severity is not Severity.ERROR
or result.code not in ignored_error_codes
]
render_report(visible_results)
blocking_errors = [
result for result in validator.errors if result.code not in ignored_error_codes
]
if blocking_errors:
console.print(
"\n[bold red]Pre-deploy validation failed. "
"Fix the issues above or re-run with --skip-validate.[/bold red]"
@@ -37,34 +58,74 @@ def _run_predeploy_validation(skip_validate: bool) -> bool:
def _display_git_repository_help() -> None:
"""Explain how to prepare a new project for deployment."""
console.print(
"Deployment requires a Git repository with an origin remote.",
style="bold red",
"Initialized a local Git repository and created an initial commit.",
style="green",
)
console.print(
"CrewAI AMP deploys from the remote repository URL, so commit and push "
"this project first, then run deploy again.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git init")
console.print(" git add .")
console.print(' git commit -m "Initial crew"')
console.print(" git branch -M main")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin main")
def _display_git_remote_help() -> None:
"""Explain how to add a remote to an existing Git repository."""
console.print("No remote repository URL found.", style="bold red")
"""Explain that ZIP deployment will be used without an origin remote."""
console.print(
"CrewAI AMP deploys from the origin remote. Add a remote, push your "
"latest commit, then run deploy again.",
"No origin remote found. Deploying from a ZIP upload instead.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin HEAD")
def _env_summary(env_vars: dict[str, str]) -> str:
"""Return a compact description of environment variables for prompts."""
if not env_vars:
return "0 env vars"
keys = ", ".join(sorted(env_vars))
return f"{len(env_vars)} env vars: {keys}"
def _needs_lockfile_for_deploy(project_root: Path | None = None) -> bool:
"""Return True when deploy should create the project's first lockfile."""
root = project_root or Path.cwd()
if not (root / "pyproject.toml").is_file():
return False
return not (root / "uv.lock").is_file() and not (root / "poetry.lock").is_file()
def _ensure_lockfile_for_deploy() -> None:
"""Create a uv lockfile before deploy when a project has not been run yet."""
if not _needs_lockfile_for_deploy():
return
from crewai_cli.install_crew import install_crew
console.print(
"No lockfile found. Installing dependencies before deployment...",
style="bold blue",
)
try:
install_crew([], raise_on_error=True)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
raise SystemExit(1) from e
def _prepare_project_for_deploy(skip_validate: bool) -> bool:
"""Validate deploy inputs before creating a missing lockfile."""
if skip_validate:
_run_predeploy_validation(skip_validate)
_ensure_lockfile_for_deploy()
return True
needs_lockfile = _needs_lockfile_for_deploy()
ignored_error_codes = _MISSING_LOCKFILE_ERROR_CODES if needs_lockfile else None
if not _run_predeploy_validation(
skip_validate,
ignored_error_codes=ignored_error_codes,
):
return False
if not needs_lockfile:
return True
_ensure_lockfile_for_deploy()
return _run_predeploy_validation(skip_validate)
class DeployCommand(BaseCommand, PlusAPIMixin):
@@ -125,14 +186,30 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
uuid (Optional[str]): The UUID of the crew to deploy.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
if not _prepare_project_for_deploy(skip_validate):
return
self._telemetry.start_deployment_span(uuid)
console.print("Starting deployment...", style="bold blue")
if uuid:
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
if remote_repo_url and uuid:
response = self.plus_api_client.deploy_by_uuid(uuid)
elif self.project_name:
elif remote_repo_url and self.project_name:
response = self.plus_api_client.deploy_by_name(self.project_name)
elif uuid:
_display_git_remote_help()
env_vars = fetch_and_json_env_file()
response = self._update_crew_from_zip(uuid, repository, env_vars)
elif self.project_name:
_display_git_remote_help()
deployment_uuid = self._deployment_uuid_by_name()
env_vars = fetch_and_json_env_file()
response = self._update_crew_from_zip(
deployment_uuid,
repository,
env_vars,
)
else:
self._standard_no_param_error_message()
return
@@ -140,6 +217,19 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._validate_response(response)
self._display_deployment_info(response.json())
def _deployment_uuid_by_name(self) -> str:
"""Resolve the current project's deployment UUID by project name."""
if not self.project_name:
raise ValueError("project_name is required to find a deployment")
response = self.plus_api_client.crew_status_by_name(self.project_name)
self._validate_response(response)
json_response = response.json()
uuid = json_response.get("uuid")
if not uuid:
raise ValueError("Deployment status response did not include a uuid")
return str(uuid)
def create_crew(self, confirm: bool = False, skip_validate: bool = False) -> None:
"""
Create a new crew deployment.
@@ -148,29 +238,143 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
confirm (bool): Whether to skip the interactive confirmation prompt.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
if not _prepare_project_for_deploy(skip_validate):
return
self._telemetry.create_crew_deployment_span()
console.print("Creating deployment...", style="bold blue")
env_vars = fetch_and_json_env_file()
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
try:
remote_repo_url = git.Repository().origin_url()
except ValueError:
_display_git_repository_help()
return
if remote_repo_url is None:
if remote_repo_url:
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
else:
_display_git_remote_help()
return
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
response = self._create_crew_from_zip(env_vars, repository, confirm)
self._validate_response(response)
self._display_creation_success(response.json())
def _prepare_git_repository(self) -> git.Repository | None:
"""Prepare Git for deploy while preserving remote deploy when possible."""
try:
repository = git.Repository(fetch=False)
except ValueError as exc:
if "not a Git repository" not in str(exc):
console.print(
f"{exc} Continuing with ZIP deployment.",
style="yellow",
)
return None
try:
repository = git.Repository.initialize()
except Exception as init_error:
console.print(
"Git auto-setup did not complete. Continuing with ZIP deployment.",
style="yellow",
)
console.print(str(init_error), style="dim")
try:
return git.Repository(fetch=False)
except Exception as repository_error:
console.print(str(repository_error), style="dim")
return None
_display_git_repository_help()
return repository
remote_repo_url = repository.origin_url()
if remote_repo_url:
try:
repository.fetch()
except ValueError as fetch_error:
console.print(
"Could not fetch from origin. Continuing with remote deployment.",
style="yellow",
)
console.print(str(fetch_error), style="dim")
try:
if repository.create_initial_commit_if_needed():
console.print(
"Created an initial Git commit for this project.",
style="green",
)
except Exception as commit_error:
console.print(
"Could not create an initial Git commit. "
"Continuing with remote deployment.",
style="yellow",
)
console.print(str(commit_error), style="dim")
return repository
try:
if repository.create_initial_commit_if_needed():
console.print(
"Created an initial Git commit for this project.",
style="green",
)
except Exception as commit_error:
console.print(
"Could not create an initial Git commit. "
"Continuing with ZIP deployment using Git file listing.",
style="yellow",
)
console.print(str(commit_error), style="dim")
return repository
return repository
def _create_crew_from_zip(
self,
env_vars: dict[str, str],
repository: git.Repository | None,
confirm: bool,
) -> Any:
"""Create a deployment by uploading a project ZIP archive."""
if not self.project_name:
raise ValueError("project_name is required to create a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
self._confirm_zip_input(env_vars, confirm)
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.create_crew_from_zip(
zip_file_path,
name=self.project_name,
env=env_vars,
)
finally:
zip_file_path.unlink(missing_ok=True)
def _update_crew_from_zip(
self,
uuid: str,
repository: git.Repository | None,
env_vars: dict[str, str],
) -> Any:
"""Update an existing deployment by uploading a project ZIP archive."""
if not self.project_name:
raise ValueError("project_name is required to update a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.update_crew_from_zip(
uuid,
zip_file_path,
env=env_vars,
)
finally:
zip_file_path.unlink(missing_ok=True)
def _confirm_input(
self, env_vars: dict[str, str], remote_repo_url: str, confirm: bool
) -> None:
@@ -183,11 +387,16 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
confirm (bool): Whether to confirm input.
"""
if not confirm:
input(f"Press Enter to continue with the following Env vars: {env_vars}")
input(f"Press Enter to continue with {_env_summary(env_vars)}")
input(
f"Press Enter to continue with the following remote repository: {remote_repo_url}\n"
)
def _confirm_zip_input(self, env_vars: dict[str, str], confirm: bool) -> None:
"""Prompt before ZIP upload unless confirmation was already supplied."""
if not confirm:
input(f"Press Enter to continue with {_env_summary(env_vars)}")
def _create_payload(
self,
env_vars: dict[str, str],

View File

@@ -1,9 +1,31 @@
from __future__ import annotations
from functools import cached_property
from pathlib import Path
import subprocess
_INITIAL_COMMIT_EXCLUDE_PATTERNS = [
".crewai/",
".env",
".env.*",
"!.env.example",
"!.env.sample",
".mypy_cache/",
".pytest_cache/",
".ruff_cache/",
".tox/",
".venv/",
"__pycache__/",
"build/",
"dist/",
"env/",
"venv/",
]
class Repository:
def __init__(self, path: str = ".") -> None:
def __init__(self, path: str = ".", fetch: bool = True) -> None:
self.path = path
if not self.is_git_installed():
@@ -12,7 +34,8 @@ class Repository:
if not self.is_git_repo:
raise ValueError(f"{self.path} is not a Git repository.")
self.fetch()
if fetch:
self.fetch()
@staticmethod
def is_git_installed() -> bool:
@@ -30,7 +53,33 @@ class Repository:
def fetch(self) -> None:
"""Fetch latest updates from the remote."""
subprocess.run(["git", "fetch"], cwd=self.path, check=True) # noqa: S607
command = ["git", "fetch"]
result = subprocess.run( # noqa: S603
command,
cwd=self.path,
capture_output=True,
text=True,
)
if result.returncode == 0:
return
if "No remote repository specified" in result.stderr:
return
details = result.stderr.strip() or result.stdout.strip() or "no output"
raise ValueError(
f"Git fetch failed with exit code {result.returncode} "
f"for command {command!r}: {details}"
)
@classmethod
def initialize(cls, path: str = ".") -> Repository:
"""Initialize a Git repository and create an initial commit if needed."""
if not cls.is_git_installed():
raise ValueError("Git is not installed or not found in your PATH.")
subprocess.run(["git", "init"], cwd=path, check=True) # noqa: S607
repository = cls(path=path, fetch=False)
repository.create_initial_commit_if_needed()
return repository
def status(self) -> str:
"""Get the git status in porcelain format."""
@@ -71,6 +120,74 @@ class Repository:
return False
return True
def has_commits(self) -> bool:
"""Return True if the repository has at least one commit."""
try:
subprocess.run(
["git", "rev-parse", "--verify", "HEAD"], # noqa: S607
cwd=self.path,
capture_output=True,
check=True,
text=True,
)
return True
except subprocess.CalledProcessError:
return False
def create_initial_commit_if_needed(self) -> bool:
"""Create a local initial commit when the repository has no commits."""
if self.has_commits():
return False
self._ensure_initial_commit_excludes()
subprocess.run(["git", "add", "."], cwd=self.path, check=True) # noqa: S607
command = [
"git",
"-c",
"user.name=CrewAI",
"-c",
"user.email=deploy@crewai.com",
"commit",
"--allow-empty",
"-m",
"Initial crew",
]
subprocess.run( # noqa: S603
command,
cwd=self.path,
check=True,
)
return True
def _ensure_initial_commit_excludes(self) -> None:
"""Add local-only ignore patterns before auto-staging an initial commit."""
exclude_file = Path(self.path) / ".git" / "info" / "exclude"
exclude_file.parent.mkdir(parents=True, exist_ok=True)
existing = exclude_file.read_text() if exclude_file.exists() else ""
existing_lines = set(existing.splitlines())
missing_patterns = [
pattern
for pattern in _INITIAL_COMMIT_EXCLUDE_PATTERNS
if pattern not in existing_lines
]
if not missing_patterns:
return
prefix = "" if existing.endswith("\n") or not existing else "\n"
patterns = "\n".join(missing_patterns)
exclude_file.write_text(
f"{existing}{prefix}# CrewAI deploy auto-commit excludes\n{patterns}\n"
)
def deployable_files(self) -> list[str]:
"""Return files tracked by Git or untracked and not ignored."""
output = subprocess.check_output(
["git", "ls-files", "--cached", "--others", "--exclude-standard"], # noqa: S607
cwd=self.path,
encoding="utf-8",
)
return [line for line in output.splitlines() if line]
def origin_url(self) -> str | None:
"""Get the Git repository's remote URL."""
try:

View File

@@ -1,20 +1,77 @@
from pathlib import Path
import subprocess
import click
from crewai_cli.utils import build_env_with_all_tool_credentials
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import build_env_with_all_tool_credentials, parse_toml
def _find_json_crew_file(project_root: Path | None = None) -> Path | None:
"""Return the JSON crew definition path when present."""
root = project_root or Path.cwd()
for filename in ("crew.jsonc", "crew.json"):
crew_path = root / filename
if crew_path.is_file():
return crew_path
return None
def _is_json_crew_project(project_root: Path | None = None) -> bool:
"""Return True for JSON crew projects that do not need package install."""
root = project_root or Path.cwd()
if _find_json_crew_file(root) is None:
return False
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return True
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return True
if not isinstance(pyproject, dict):
return True
tool_config = pyproject.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
project_config = pyproject.get("project") or {}
project_name = (
project_config.get("name") if isinstance(project_config, dict) else None
)
if isinstance(project_name, str):
package_name = normalize_package_name(project_name)
if package_name and (root / "src" / package_name / "crew.py").is_file():
return False
return declared_type != "flow"
# Be mindful about changing this.
# on some environments we don't use this command but instead uv sync directly
# so if you expect this to support more things you will need to replicate it there
# ask @joaomdmoura if you are unsure
def install_crew(proxy_options: list[str]) -> None:
def install_crew(
proxy_options: list[str],
*,
raise_on_error: bool = False,
install_project: bool | None = None,
) -> None:
"""
Install the crew by running the UV command to lock and install.
"""
try:
command = ["uv", "sync", *proxy_options]
if install_project is None:
install_project = not _is_json_crew_project()
command = ["uv", "sync"]
if not install_project and "--no-install-project" not in proxy_options:
command.append("--no-install-project")
command.extend(proxy_options)
# Inject tool repository credentials so uv can authenticate
# against private package indexes (e.g. crewai tool repository).
@@ -22,11 +79,21 @@ def install_crew(proxy_options: list[str]) -> None:
# project depends on tools from a private index.
env = build_env_with_all_tool_credentials()
subprocess.run(command, check=True, capture_output=False, text=True, env=env) # noqa: S603
subprocess.run( # noqa: S603
command,
check=True,
capture_output=False,
text=True,
env=env,
)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True)
if raise_on_error:
raise
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
if raise_on_error:
raise

View File

@@ -1,12 +1,95 @@
"""Re-export of ``crewai_core.plus_api.PlusAPI``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.plus_api`` directly.
"""
"""CrewAI CLI API client extensions."""
from __future__ import annotations
from crewai_core.plus_api import PlusAPI as PlusAPI
from pathlib import Path
from typing import Any, Literal, cast
from urllib.parse import urljoin
from crewai_core.plus_api import PlusAPI as _CorePlusAPI
import httpx
HttpMethod = Literal["GET", "POST", "PATCH", "DELETE"]
class PlusAPI(_CorePlusAPI):
"""CLI API client.
The ZIP deployment methods live here as well as in newer crewai-core
versions so editable CLI installs still work when an older crewai-core is
present in the runtime environment.
"""
def _make_multipart_request(
self,
method: HttpMethod,
endpoint: str,
*,
zip_file_path: str | Path,
data: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
"""Send an authenticated multipart request containing a project ZIP."""
url = urljoin(self.base_url, endpoint)
headers = dict(cast(dict[str, str], self.headers))
headers.pop("Content-Type", None)
path = Path(zip_file_path)
request_kwargs: dict[str, Any] = {"headers": headers}
if data is not None:
request_kwargs["data"] = data
if timeout is not None:
request_kwargs["timeout"] = timeout
with (
path.open("rb") as file_handle,
httpx.Client(trust_env=False, verify=verify) as client,
):
files = {
"zip_file": (path.name, file_handle, "application/zip"),
}
return client.request(method, url, files=files, **request_kwargs)
def create_crew_from_zip(
self,
zip_file_path: str | Path,
*,
name: str | None = None,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Create a crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if name:
data["name"] = name
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/zip",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def update_crew_from_zip(
self,
uuid: str,
zip_file_path: str | Path,
*,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Update an existing crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/{uuid}/zip_update",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
__all__ = ["PlusAPI"]

View File

@@ -35,6 +35,46 @@ class CrewType(Enum):
# crewai.utilities.string_utils (_VARIABLE_PATTERN), including hyphens —
# otherwise placeholders are interpolated at runtime but never prompted for.
_INPUT_PLACEHOLDER_RE = re.compile(r"(?<!{){([A-Za-z_][A-Za-z0-9_\-]*)}(?!})")
_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV = "CREWAI_CLI_RUNNER_PACKAGE_DIR"
_CREWAI_RUNNER_SOURCE_DIR_ENV = "CREWAI_RUNNER_SOURCE_DIR"
_JSON_CREW_RUNNER_CODE = """
import importlib.util
import os
from pathlib import Path
import sys
source_dir = os.environ.get("CREWAI_RUNNER_SOURCE_DIR")
if source_dir:
sys.path.insert(0, source_dir)
package_dir = Path(os.environ["CREWAI_CLI_RUNNER_PACKAGE_DIR"])
package_spec = importlib.util.spec_from_file_location(
"crewai_cli",
package_dir / "__init__.py",
submodule_search_locations=[str(package_dir)],
)
if package_spec is None or package_spec.loader is None:
raise ImportError(f"Cannot load CrewAI CLI package from {package_dir}")
package = importlib.util.module_from_spec(package_spec)
sys.modules["crewai_cli"] = package
package_spec.loader.exec_module(package)
module_path = package_dir / "run_crew.py"
module_spec = importlib.util.spec_from_file_location("crewai_cli.run_crew", module_path)
if module_spec is None or module_spec.loader is None:
raise ImportError(f"Cannot load CrewAI CLI runner from {module_path}")
module = importlib.util.module_from_spec(module_spec)
sys.modules["crewai_cli.run_crew"] = module
module_spec.loader.exec_module(module)
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
module._run_json_crew(
trained_agents_file=os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV)
)
""".strip()
def _has_json_crew() -> bool:
@@ -216,24 +256,148 @@ def _run_json_crew(trained_agents_file: str | None = None) -> Any:
return app._crew_result
def _has_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project already has a dependency lockfile."""
return _has_uv_lockfile(project_root) or _has_poetry_lockfile(project_root)
def _has_uv_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project has a uv lockfile."""
root = project_root or Path.cwd()
return (root / "uv.lock").is_file()
def _has_poetry_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project has a Poetry lockfile."""
root = project_root or Path.cwd()
return (root / "poetry.lock").is_file()
def _uses_poetry_lockfile(project_root: Path | None = None) -> bool:
"""Return True when Poetry is the only available lock source."""
return _has_poetry_lockfile(project_root) and not _has_uv_lockfile(project_root)
def _has_project_venv(project_root: Path | None = None) -> bool:
"""Return True when the project already has a local uv environment."""
root = project_root or Path.cwd()
return (root / ".venv").is_dir()
def _install_json_crew_dependencies_if_needed() -> None:
"""Prepare JSON crew dependencies without mutating existing lockfiles."""
project_root = Path.cwd()
if not (project_root / "pyproject.toml").is_file():
return
has_uv_lockfile = _has_uv_lockfile(project_root)
has_lockfile = has_uv_lockfile or _has_poetry_lockfile(project_root)
if has_lockfile and _has_project_venv(project_root):
return
if _uses_poetry_lockfile(project_root):
return
from crewai_cli.install_crew import install_crew
try:
if has_uv_lockfile:
click.echo("Syncing dependencies from lockfile...")
install_crew(["--frozen"], raise_on_error=True)
else:
click.echo("Installing dependencies...")
install_crew([], raise_on_error=True)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
raise SystemExit(1) from e
def _find_local_crewai_source_dir() -> Path | None:
"""Return the repo's CrewAI source dir when running from a source checkout."""
for parent in Path(__file__).resolve().parents:
candidate = parent / "lib" / "crewai" / "src"
if (candidate / "crewai" / "project" / "json_loader.py").is_file():
return candidate
return None
def _json_crew_run_command(project_root: Path | None = None) -> list[str]:
"""Return the project-environment command for running JSON crews."""
if _uses_poetry_lockfile(project_root):
return ["poetry", "run", "python", "-c", _JSON_CREW_RUNNER_CODE]
return ["uv", "run", "--no-sync", "python", "-c", _JSON_CREW_RUNNER_CODE]
def _run_json_crew_in_project_env(trained_agents_file: str | None = None) -> Any:
"""Run JSON crews from the project's uv-managed environment."""
if not (Path.cwd() / "pyproject.toml").is_file():
return _run_json_crew(trained_agents_file=trained_agents_file)
_install_json_crew_dependencies_if_needed()
command = _json_crew_run_command()
env = build_env_with_all_tool_credentials()
env[_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV] = str(Path(__file__).resolve().parent)
if local_crewai_source_dir := _find_local_crewai_source_dir():
env[_CREWAI_RUNNER_SOURCE_DIR_ENV] = str(local_crewai_source_dir)
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
subprocess.run( # noqa: S603
command,
capture_output=False,
text=True,
check=True,
env=env,
)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
click.echo(f"An unexpected error occurred while running the JSON crew: {e}")
raise SystemExit(1) from e
return None
def _chain_deploy() -> None:
from rich.console import Console
console = Console()
def print_system_exit_failure(exc: SystemExit) -> None:
if isinstance(exc.code, int):
detail = f" with exit code {exc.code}"
elif exc.code:
detail = f": {exc.code}"
else:
detail = ""
console.print(f"\nDeploy failed{detail}\n", style="bold red")
try:
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli.deploy.main import DeployCommand
console.print("\nStarting deployment…\n", style="bold #FF5A50")
DeployCommand().create_crew(confirm=False, skip_validate=True)
except SystemExit:
DeployCommand().create_crew(confirm=True, skip_validate=True)
except AuthenticationRequiredError:
from crewai_cli.authentication.main import AuthenticationCommand
console.print()
AuthenticationCommand().login()
try:
DeployCommand().create_crew(confirm=False, skip_validate=True)
DeployCommand().create_crew(confirm=True, skip_validate=True)
except AuthenticationRequiredError:
console.print(
"\nDeploy failed: authentication is still required.\n",
style="bold red",
)
except SystemExit as e:
print_system_exit_failure(e)
except Exception as e:
console.print(f"\nDeploy failed: {e}\n", style="bold red")
except SystemExit as e:
print_system_exit_failure(e)
except Exception as e:
console.print(f"\nDeploy failed: {e}\n", style="bold red")
@@ -315,7 +479,7 @@ def run_crew(trained_agents_file: str | None = None) -> None:
"""
# JSON crew projects take precedence
if _has_json_crew():
_run_json_crew(trained_agents_file=trained_agents_file)
_run_json_crew_in_project_env(trained_agents_file=trained_agents_file)
return
crewai_version = get_crewai_version()