Compare commits

...

16 Commits

Author SHA1 Message Date
Joao Moura
2cb9f4bc35 fix(cli): avoid auth retry for deploy exits 2026-06-15 14:26:40 -07:00
Joao Moura
26f7e4cbbf fix(deps): bump starlette audit floor 2026-06-15 14:23:28 -07:00
Joao Moura
6f4684b34b fix(cli): align json zip wrapper detection 2026-06-15 14:17:38 -07:00
Joao Moura
fa3dc8c651 fix(cli): respect poetry lock for json runs 2026-06-15 12:05:26 -07:00
Joao Moura
99face4c17 fix(cli): validate before deploy lock install 2026-06-15 12:05:26 -07:00
Joao Moura
664b3a0174 fix(cli): harden json run and zip fallback 2026-06-15 12:05:26 -07:00
Joao Moura
df712e079a fix(cli): pass env on zip redeploy 2026-06-15 12:05:26 -07:00
Joao Moura
dbbdd77449 fix(cli): address deploy zip review feedback 2026-06-15 12:05:26 -07:00
Joao Moura
b8c89a81f6 fix(cli): skip json crew sync when locked 2026-06-15 12:05:26 -07:00
Joao Moura
ce74cdf02c fix(cli): load json runner from source checkout 2026-06-15 12:05:26 -07:00
Joao Moura
bfc817bbd3 fix(cli): skip project install for json crew sync 2026-06-15 12:05:26 -07:00
Joao Moura
c4027a1fb4 test(cli): use single deploy main import style 2026-06-15 12:05:26 -07:00
Joao Moura
35d58501df fix(cli): preserve remote deploy on git setup warnings 2026-06-15 12:05:26 -07:00
Joao Moura
2c0b127ff2 fix(cli): sync missing lockfile before deploy 2026-06-15 12:05:26 -07:00
Joao Moura
49bff078af fix(cli): address deploy zip review feedback 2026-06-15 12:05:26 -07:00
Joao Moura
b11132d7ee Update crewAI CLI with various enhancements and fixes
- Updated `create_json_crew.py` to require `crewai[tools]>=1.14.7`.
- Enhanced `git.py` with improved repository initialization, including automatic initial commit creation and exclusion patterns for initial commits.
- Modified `install_crew.py` to allow error handling during installation with an optional `raise_on_error` parameter.
- Expanded `plus_api.py` to include methods for creating and updating crews from ZIP files.
- Introduced a new `archive.py` for creating deployable ZIP archives of CrewAI projects, ensuring local artifacts are excluded.
- Updated `run_crew.py` to manage JSON crew dependencies and run crews in the project's environment.
- Enhanced deployment logic in `main.py` to handle ZIP uploads and improve user feedback during deployment processes.
- Added tests for new functionalities and ensured existing tests reflect recent changes in behavior and requirements.
2026-06-15 12:05:26 -07:00
20 changed files with 2345 additions and 96 deletions

View File

@@ -13,6 +13,10 @@ from crewai_cli.plus_api import PlusAPI
console = Console()
class AuthenticationRequiredError(SystemExit):
"""Raised when a Plus API command needs the user to log in first."""
class BaseCommand:
def __init__(self) -> None:
self._telemetry = Telemetry()
@@ -31,7 +35,7 @@ class PlusAPIMixin:
style="bold red",
)
console.print("Run 'crewai login' to sign up/login.", style="bold green")
raise SystemExit from None
raise AuthenticationRequiredError from None
def _validate_response(self, response: httpx.Response) -> None:
"""Handle and display error messages from API responses.

View File

@@ -84,7 +84,7 @@ description = "{name} using crewAI"
authors = [{{ name = "Your Name", email = "you@example.com" }}]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=1.15"
"crewai[tools]>=1.14.7"
]
[build-system]

View File

@@ -0,0 +1,342 @@
from __future__ import annotations
from pathlib import Path
import re
import shutil
import tempfile
from typing import Any
import zipfile
from crewai_cli import git
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import parse_toml
_EXCLUDED_DIRS = {
".crewai",
".git",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".tox",
".venv",
"__pycache__",
"build",
"dist",
"env",
"venv",
}
_EXCLUDED_FILES = {
".DS_Store",
".env",
}
_ALLOWED_ENV_EXAMPLES = {
".env.example",
".env.sample",
}
_EXCLUDED_SUFFIXES = {
".pyc",
".pyo",
}
def create_project_zip(
project_name: str,
*,
project_dir: Path | None = None,
repository: git.Repository | None = None,
) -> Path:
"""Create a deployable ZIP archive for a CrewAI project."""
root = (project_dir or Path.cwd()).resolve()
files = _project_files(root, repository)
if not files:
raise ValueError("No deployable project files were found.")
staged_root = _stage_project(root, files)
archive_handle = tempfile.NamedTemporaryFile(
prefix=f"{project_name}-",
suffix=".zip",
delete=False,
)
archive_path = Path(archive_handle.name)
archive_handle.close()
try:
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
for relative_path in _walk_files(staged_root):
absolute_path = staged_root / relative_path
zip_file.write(absolute_path, relative_path.as_posix())
finally:
shutil.rmtree(staged_root, ignore_errors=True)
return archive_path
def _project_files(root: Path, repository: git.Repository | None = None) -> list[Path]:
"""Return project-relative files to include in the archive."""
if repository is not None:
return _repository_project_files(root, repository)
try:
repository = git.Repository(path=str(root), fetch=False)
except ValueError:
repository = None
if repository is not None:
return _repository_project_files(root, repository)
return [
path
for path in _walk_files(root)
if not _is_excluded(path) and _is_regular_file(root / path)
]
def _repository_project_files(root: Path, repository: git.Repository) -> list[Path]:
"""Return deployable files from Git while applying local safety excludes."""
files = [Path(path) for path in repository.deployable_files()]
return [
path
for path in files
if not _is_excluded(path) and _is_regular_file(root / path)
]
def _walk_files(root: Path) -> list[Path]:
"""List regular files below root as project-relative paths."""
return [
path.relative_to(root) for path in root.rglob("*") if _is_regular_file(path)
]
def _is_regular_file(path: Path) -> bool:
"""Return True for regular files, excluding symlinks to files."""
return path.is_file() and not path.is_symlink()
def _is_excluded(path: Path) -> bool:
"""Return True when a file should be omitted from deployment ZIPs."""
parts = set(path.parts)
if parts.intersection(_EXCLUDED_DIRS):
return True
name = path.name
if name in _EXCLUDED_FILES:
return True
if name.startswith(".env.") and name not in _ALLOWED_ENV_EXAMPLES:
return True
return path.suffix in _EXCLUDED_SUFFIXES
def _stage_project(root: Path, files: list[Path]) -> Path:
"""Copy archive files into a temporary staging directory."""
staging_root = Path(tempfile.mkdtemp(prefix="crewai-deploy-"))
try:
for relative_path in files:
source = root / relative_path
if not _is_regular_file(source):
continue
destination = staging_root / relative_path
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
if _is_json_crew_project(staging_root):
_add_json_crew_deploy_wrapper(staging_root)
except Exception:
shutil.rmtree(staging_root, ignore_errors=True)
raise
return staging_root
def _is_json_crew_project(root: Path) -> bool:
"""Return True for JSON crew projects that need a Python deploy wrapper."""
if not ((root / "crew.jsonc").is_file() or (root / "crew.json").is_file()):
return False
project = _read_pyproject(root)
tool_config = project.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
if declared_type == "flow":
return False
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
return not (root / "src" / package_name / "crew.py").is_file()
def _read_pyproject(root: Path) -> dict[str, Any]:
"""Read pyproject.toml, returning an empty mapping on missing or invalid data."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return {}
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return {}
return pyproject if isinstance(pyproject, dict) else {}
def _package_name(root: Path) -> str | None:
"""Return the normalized Python package name for the project."""
project = _read_pyproject(root).get("project")
if not isinstance(project, dict):
return None
name = project.get("name")
if not isinstance(name, str) or not name.strip():
return None
package_name = normalize_package_name(name)
return package_name or None
def _class_name(package_name: str) -> str:
"""Return the generated wrapper class name for a package."""
parts = [part for part in re.split(r"[^a-zA-Z0-9]+", package_name) if part]
class_name = "".join(part[:1].upper() + part[1:] for part in parts)
if not class_name:
return "JsonCrew"
if class_name[0].isdigit():
return f"Crew{class_name}"
return class_name
def _add_json_crew_deploy_wrapper(root: Path) -> None:
"""Add Python wrapper files required to deploy a JSON crew project."""
package_name = _package_name(root)
if package_name is None:
raise ValueError(
"Could not derive a valid Python package name from [project].name."
)
package_dir = root / "src" / package_name
config_dir = package_dir / "config"
config_dir.mkdir(parents=True, exist_ok=True)
class_name = _class_name(package_name)
crew_filename = "crew.jsonc" if (root / "crew.jsonc").is_file() else "crew.json"
(package_dir / "__init__.py").write_text("", encoding="utf-8")
(config_dir / "agents.yaml").write_text("{}\n", encoding="utf-8")
(config_dir / "tasks.yaml").write_text("{}\n", encoding="utf-8")
(package_dir / "crew.py").write_text(
_json_crew_py(class_name, crew_filename),
encoding="utf-8",
)
(package_dir / "main.py").write_text(
_json_main_py(package_name, class_name),
encoding="utf-8",
)
_ensure_project_scripts(root, package_name)
def _json_crew_py(class_name: str, crew_filename: str) -> str:
"""Render the generated crew.py module for a JSON crew."""
return f'''from pathlib import Path
from crewai import Crew
from crewai.project import CrewBase, crew
from crewai.project.crew_loader import load_crew
def _crew_path() -> Path:
return Path(__file__).resolve().parents[2] / "{crew_filename}"
@CrewBase
class {class_name}:
"""Compatibility wrapper for a JSON-defined CrewAI project."""
@crew
def crew(self) -> Crew:
crew_instance, default_inputs = load_crew(_crew_path())
self.default_inputs = default_inputs
return crew_instance
'''
def _json_main_py(package_name: str, class_name: str) -> str:
"""Render the generated main.py entrypoints for a JSON crew."""
return f"""#!/usr/bin/env python
import json
import sys
from {package_name}.crew import {class_name}
def _load():
wrapper = {class_name}()
crew = wrapper.crew()
return crew, getattr(wrapper, "default_inputs", {{}})
def run():
crew, inputs = _load()
return crew.kickoff(inputs=inputs)
def train():
crew, inputs = _load()
return crew.train(
n_iterations=int(sys.argv[1]),
filename=sys.argv[2],
inputs=inputs,
)
def replay():
crew, _ = _load()
return crew.replay(task_id=sys.argv[1])
def test():
crew, inputs = _load()
return crew.test(
n_iterations=int(sys.argv[1]),
eval_llm=sys.argv[2],
inputs=inputs,
)
def run_with_trigger():
if len(sys.argv) < 2:
raise ValueError("No trigger payload provided.")
crew, inputs = _load()
trigger_payload = json.loads(sys.argv[1])
return crew.kickoff(
inputs={{**inputs, "crewai_trigger_payload": trigger_payload}}
)
"""
def _ensure_project_scripts(root: Path, package_name: str) -> None:
"""Ensure generated wrappers have project script entrypoints."""
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return
content = pyproject_path.read_text(encoding="utf-8")
if "[project.scripts]" in content:
return
script_block = f'''
[project.scripts]
{package_name} = "{package_name}.main:run"
run_crew = "{package_name}.main:run"
train = "{package_name}.main:train"
replay = "{package_name}.main:replay"
test = "{package_name}.main:test"
run_with_trigger = "{package_name}.main:run_with_trigger"
'''
pyproject_path.write_text(content.rstrip() + script_block, encoding="utf-8")

View File

@@ -1,3 +1,5 @@
from pathlib import Path
import subprocess
from typing import Any
from crewai_core.plus_api import CreateCrewPayload
@@ -5,14 +7,19 @@ from rich.console import Console
from crewai_cli import git
from crewai_cli.command import BaseCommand, PlusAPIMixin
from crewai_cli.deploy.validate import validate_project
from crewai_cli.deploy.archive import create_project_zip
from crewai_cli.deploy.validate import DeployValidator, Severity, render_report
from crewai_cli.utils import fetch_and_json_env_file, get_project_name
console = Console()
_MISSING_LOCKFILE_ERROR_CODES = {"missing_lockfile"}
def _run_predeploy_validation(skip_validate: bool) -> bool:
def _run_predeploy_validation(
skip_validate: bool,
ignored_error_codes: set[str] | None = None,
) -> bool:
"""Run pre-deploy validation unless skipped.
Returns True if deployment should proceed, False if it should abort.
@@ -24,8 +31,22 @@ def _run_predeploy_validation(skip_validate: bool) -> bool:
return True
console.print("Running pre-deploy validation...", style="bold blue")
validator = validate_project()
if not validator.ok:
validator = DeployValidator()
validator.run()
ignored_error_codes = ignored_error_codes or set()
visible_results = [
result
for result in validator.results
if result.severity is not Severity.ERROR
or result.code not in ignored_error_codes
]
render_report(visible_results)
blocking_errors = [
result for result in validator.errors if result.code not in ignored_error_codes
]
if blocking_errors:
console.print(
"\n[bold red]Pre-deploy validation failed. "
"Fix the issues above or re-run with --skip-validate.[/bold red]"
@@ -37,34 +58,74 @@ def _run_predeploy_validation(skip_validate: bool) -> bool:
def _display_git_repository_help() -> None:
"""Explain how to prepare a new project for deployment."""
console.print(
"Deployment requires a Git repository with an origin remote.",
style="bold red",
"Initialized a local Git repository and created an initial commit.",
style="green",
)
console.print(
"CrewAI AMP deploys from the remote repository URL, so commit and push "
"this project first, then run deploy again.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git init")
console.print(" git add .")
console.print(' git commit -m "Initial crew"')
console.print(" git branch -M main")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin main")
def _display_git_remote_help() -> None:
"""Explain how to add a remote to an existing Git repository."""
console.print("No remote repository URL found.", style="bold red")
"""Explain that ZIP deployment will be used without an origin remote."""
console.print(
"CrewAI AMP deploys from the origin remote. Add a remote, push your "
"latest commit, then run deploy again.",
"No origin remote found. Deploying from a ZIP upload instead.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin HEAD")
def _env_summary(env_vars: dict[str, str]) -> str:
"""Return a compact description of environment variables for prompts."""
if not env_vars:
return "0 env vars"
keys = ", ".join(sorted(env_vars))
return f"{len(env_vars)} env vars: {keys}"
def _needs_lockfile_for_deploy(project_root: Path | None = None) -> bool:
"""Return True when deploy should create the project's first lockfile."""
root = project_root or Path.cwd()
if not (root / "pyproject.toml").is_file():
return False
return not (root / "uv.lock").is_file() and not (root / "poetry.lock").is_file()
def _ensure_lockfile_for_deploy() -> None:
"""Create a uv lockfile before deploy when a project has not been run yet."""
if not _needs_lockfile_for_deploy():
return
from crewai_cli.install_crew import install_crew
console.print(
"No lockfile found. Installing dependencies before deployment...",
style="bold blue",
)
try:
install_crew([], raise_on_error=True)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
raise SystemExit(1) from e
def _prepare_project_for_deploy(skip_validate: bool) -> bool:
"""Validate deploy inputs before creating a missing lockfile."""
if skip_validate:
_run_predeploy_validation(skip_validate)
_ensure_lockfile_for_deploy()
return True
needs_lockfile = _needs_lockfile_for_deploy()
ignored_error_codes = _MISSING_LOCKFILE_ERROR_CODES if needs_lockfile else None
if not _run_predeploy_validation(
skip_validate,
ignored_error_codes=ignored_error_codes,
):
return False
if not needs_lockfile:
return True
_ensure_lockfile_for_deploy()
return _run_predeploy_validation(skip_validate)
class DeployCommand(BaseCommand, PlusAPIMixin):
@@ -125,14 +186,30 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
uuid (Optional[str]): The UUID of the crew to deploy.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
if not _prepare_project_for_deploy(skip_validate):
return
self._telemetry.start_deployment_span(uuid)
console.print("Starting deployment...", style="bold blue")
if uuid:
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
if remote_repo_url and uuid:
response = self.plus_api_client.deploy_by_uuid(uuid)
elif self.project_name:
elif remote_repo_url and self.project_name:
response = self.plus_api_client.deploy_by_name(self.project_name)
elif uuid:
_display_git_remote_help()
env_vars = fetch_and_json_env_file()
response = self._update_crew_from_zip(uuid, repository, env_vars)
elif self.project_name:
_display_git_remote_help()
deployment_uuid = self._deployment_uuid_by_name()
env_vars = fetch_and_json_env_file()
response = self._update_crew_from_zip(
deployment_uuid,
repository,
env_vars,
)
else:
self._standard_no_param_error_message()
return
@@ -140,6 +217,19 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._validate_response(response)
self._display_deployment_info(response.json())
def _deployment_uuid_by_name(self) -> str:
"""Resolve the current project's deployment UUID by project name."""
if not self.project_name:
raise ValueError("project_name is required to find a deployment")
response = self.plus_api_client.crew_status_by_name(self.project_name)
self._validate_response(response)
json_response = response.json()
uuid = json_response.get("uuid")
if not uuid:
raise ValueError("Deployment status response did not include a uuid")
return str(uuid)
def create_crew(self, confirm: bool = False, skip_validate: bool = False) -> None:
"""
Create a new crew deployment.
@@ -148,29 +238,143 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
confirm (bool): Whether to skip the interactive confirmation prompt.
skip_validate (bool): Skip pre-deploy validation checks.
"""
if not _run_predeploy_validation(skip_validate):
if not _prepare_project_for_deploy(skip_validate):
return
self._telemetry.create_crew_deployment_span()
console.print("Creating deployment...", style="bold blue")
env_vars = fetch_and_json_env_file()
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
try:
remote_repo_url = git.Repository().origin_url()
except ValueError:
_display_git_repository_help()
return
if remote_repo_url is None:
if remote_repo_url:
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
else:
_display_git_remote_help()
return
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
response = self._create_crew_from_zip(env_vars, repository, confirm)
self._validate_response(response)
self._display_creation_success(response.json())
def _prepare_git_repository(self) -> git.Repository | None:
"""Prepare Git for deploy while preserving remote deploy when possible."""
try:
repository = git.Repository(fetch=False)
except ValueError as exc:
if "not a Git repository" not in str(exc):
console.print(
f"{exc} Continuing with ZIP deployment.",
style="yellow",
)
return None
try:
repository = git.Repository.initialize()
except Exception as init_error:
console.print(
"Git auto-setup did not complete. Continuing with ZIP deployment.",
style="yellow",
)
console.print(str(init_error), style="dim")
try:
return git.Repository(fetch=False)
except Exception as repository_error:
console.print(str(repository_error), style="dim")
return None
_display_git_repository_help()
return repository
remote_repo_url = repository.origin_url()
if remote_repo_url:
try:
repository.fetch()
except ValueError as fetch_error:
console.print(
"Could not fetch from origin. Continuing with remote deployment.",
style="yellow",
)
console.print(str(fetch_error), style="dim")
try:
if repository.create_initial_commit_if_needed():
console.print(
"Created an initial Git commit for this project.",
style="green",
)
except Exception as commit_error:
console.print(
"Could not create an initial Git commit. "
"Continuing with remote deployment.",
style="yellow",
)
console.print(str(commit_error), style="dim")
return repository
try:
if repository.create_initial_commit_if_needed():
console.print(
"Created an initial Git commit for this project.",
style="green",
)
except Exception as commit_error:
console.print(
"Could not create an initial Git commit. "
"Continuing with ZIP deployment using Git file listing.",
style="yellow",
)
console.print(str(commit_error), style="dim")
return repository
return repository
def _create_crew_from_zip(
self,
env_vars: dict[str, str],
repository: git.Repository | None,
confirm: bool,
) -> Any:
"""Create a deployment by uploading a project ZIP archive."""
if not self.project_name:
raise ValueError("project_name is required to create a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
self._confirm_zip_input(env_vars, confirm)
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.create_crew_from_zip(
zip_file_path,
name=self.project_name,
env=env_vars,
)
finally:
zip_file_path.unlink(missing_ok=True)
def _update_crew_from_zip(
self,
uuid: str,
repository: git.Repository | None,
env_vars: dict[str, str],
) -> Any:
"""Update an existing deployment by uploading a project ZIP archive."""
if not self.project_name:
raise ValueError("project_name is required to update a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.update_crew_from_zip(
uuid,
zip_file_path,
env=env_vars,
)
finally:
zip_file_path.unlink(missing_ok=True)
def _confirm_input(
self, env_vars: dict[str, str], remote_repo_url: str, confirm: bool
) -> None:
@@ -183,11 +387,16 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
confirm (bool): Whether to confirm input.
"""
if not confirm:
input(f"Press Enter to continue with the following Env vars: {env_vars}")
input(f"Press Enter to continue with {_env_summary(env_vars)}")
input(
f"Press Enter to continue with the following remote repository: {remote_repo_url}\n"
)
def _confirm_zip_input(self, env_vars: dict[str, str], confirm: bool) -> None:
"""Prompt before ZIP upload unless confirmation was already supplied."""
if not confirm:
input(f"Press Enter to continue with {_env_summary(env_vars)}")
def _create_payload(
self,
env_vars: dict[str, str],

View File

@@ -1,9 +1,31 @@
from __future__ import annotations
from functools import cached_property
from pathlib import Path
import subprocess
_INITIAL_COMMIT_EXCLUDE_PATTERNS = [
".crewai/",
".env",
".env.*",
"!.env.example",
"!.env.sample",
".mypy_cache/",
".pytest_cache/",
".ruff_cache/",
".tox/",
".venv/",
"__pycache__/",
"build/",
"dist/",
"env/",
"venv/",
]
class Repository:
def __init__(self, path: str = ".") -> None:
def __init__(self, path: str = ".", fetch: bool = True) -> None:
self.path = path
if not self.is_git_installed():
@@ -12,7 +34,8 @@ class Repository:
if not self.is_git_repo:
raise ValueError(f"{self.path} is not a Git repository.")
self.fetch()
if fetch:
self.fetch()
@staticmethod
def is_git_installed() -> bool:
@@ -30,7 +53,33 @@ class Repository:
def fetch(self) -> None:
"""Fetch latest updates from the remote."""
subprocess.run(["git", "fetch"], cwd=self.path, check=True) # noqa: S607
command = ["git", "fetch"]
result = subprocess.run( # noqa: S603
command,
cwd=self.path,
capture_output=True,
text=True,
)
if result.returncode == 0:
return
if "No remote repository specified" in result.stderr:
return
details = result.stderr.strip() or result.stdout.strip() or "no output"
raise ValueError(
f"Git fetch failed with exit code {result.returncode} "
f"for command {command!r}: {details}"
)
@classmethod
def initialize(cls, path: str = ".") -> Repository:
"""Initialize a Git repository and create an initial commit if needed."""
if not cls.is_git_installed():
raise ValueError("Git is not installed or not found in your PATH.")
subprocess.run(["git", "init"], cwd=path, check=True) # noqa: S607
repository = cls(path=path, fetch=False)
repository.create_initial_commit_if_needed()
return repository
def status(self) -> str:
"""Get the git status in porcelain format."""
@@ -71,6 +120,74 @@ class Repository:
return False
return True
def has_commits(self) -> bool:
"""Return True if the repository has at least one commit."""
try:
subprocess.run(
["git", "rev-parse", "--verify", "HEAD"], # noqa: S607
cwd=self.path,
capture_output=True,
check=True,
text=True,
)
return True
except subprocess.CalledProcessError:
return False
def create_initial_commit_if_needed(self) -> bool:
"""Create a local initial commit when the repository has no commits."""
if self.has_commits():
return False
self._ensure_initial_commit_excludes()
subprocess.run(["git", "add", "."], cwd=self.path, check=True) # noqa: S607
command = [
"git",
"-c",
"user.name=CrewAI",
"-c",
"user.email=deploy@crewai.com",
"commit",
"--allow-empty",
"-m",
"Initial crew",
]
subprocess.run( # noqa: S603
command,
cwd=self.path,
check=True,
)
return True
def _ensure_initial_commit_excludes(self) -> None:
"""Add local-only ignore patterns before auto-staging an initial commit."""
exclude_file = Path(self.path) / ".git" / "info" / "exclude"
exclude_file.parent.mkdir(parents=True, exist_ok=True)
existing = exclude_file.read_text() if exclude_file.exists() else ""
existing_lines = set(existing.splitlines())
missing_patterns = [
pattern
for pattern in _INITIAL_COMMIT_EXCLUDE_PATTERNS
if pattern not in existing_lines
]
if not missing_patterns:
return
prefix = "" if existing.endswith("\n") or not existing else "\n"
patterns = "\n".join(missing_patterns)
exclude_file.write_text(
f"{existing}{prefix}# CrewAI deploy auto-commit excludes\n{patterns}\n"
)
def deployable_files(self) -> list[str]:
"""Return files tracked by Git or untracked and not ignored."""
output = subprocess.check_output(
["git", "ls-files", "--cached", "--others", "--exclude-standard"], # noqa: S607
cwd=self.path,
encoding="utf-8",
)
return [line for line in output.splitlines() if line]
def origin_url(self) -> str | None:
"""Get the Git repository's remote URL."""
try:

View File

@@ -1,20 +1,77 @@
from pathlib import Path
import subprocess
import click
from crewai_cli.utils import build_env_with_all_tool_credentials
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import build_env_with_all_tool_credentials, parse_toml
def _find_json_crew_file(project_root: Path | None = None) -> Path | None:
"""Return the JSON crew definition path when present."""
root = project_root or Path.cwd()
for filename in ("crew.jsonc", "crew.json"):
crew_path = root / filename
if crew_path.is_file():
return crew_path
return None
def _is_json_crew_project(project_root: Path | None = None) -> bool:
"""Return True for JSON crew projects that do not need package install."""
root = project_root or Path.cwd()
if _find_json_crew_file(root) is None:
return False
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return True
try:
pyproject = parse_toml(pyproject_path.read_text())
except Exception:
return True
if not isinstance(pyproject, dict):
return True
tool_config = pyproject.get("tool") or {}
crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
declared_type = (
crewai_config.get("type") if isinstance(crewai_config, dict) else None
)
project_config = pyproject.get("project") or {}
project_name = (
project_config.get("name") if isinstance(project_config, dict) else None
)
if isinstance(project_name, str):
package_name = normalize_package_name(project_name)
if package_name and (root / "src" / package_name / "crew.py").is_file():
return False
return declared_type != "flow"
# Be mindful about changing this.
# on some environments we don't use this command but instead uv sync directly
# so if you expect this to support more things you will need to replicate it there
# ask @joaomdmoura if you are unsure
def install_crew(proxy_options: list[str]) -> None:
def install_crew(
proxy_options: list[str],
*,
raise_on_error: bool = False,
install_project: bool | None = None,
) -> None:
"""
Install the crew by running the UV command to lock and install.
"""
try:
command = ["uv", "sync", *proxy_options]
if install_project is None:
install_project = not _is_json_crew_project()
command = ["uv", "sync"]
if not install_project and "--no-install-project" not in proxy_options:
command.append("--no-install-project")
command.extend(proxy_options)
# Inject tool repository credentials so uv can authenticate
# against private package indexes (e.g. crewai tool repository).
@@ -22,11 +79,21 @@ def install_crew(proxy_options: list[str]) -> None:
# project depends on tools from a private index.
env = build_env_with_all_tool_credentials()
subprocess.run(command, check=True, capture_output=False, text=True, env=env) # noqa: S603
subprocess.run( # noqa: S603
command,
check=True,
capture_output=False,
text=True,
env=env,
)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True)
if raise_on_error:
raise
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
if raise_on_error:
raise

View File

@@ -1,12 +1,95 @@
"""Re-export of ``crewai_core.plus_api.PlusAPI``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.plus_api`` directly.
"""
"""CrewAI CLI API client extensions."""
from __future__ import annotations
from crewai_core.plus_api import PlusAPI as PlusAPI
from pathlib import Path
from typing import Any, Literal, cast
from urllib.parse import urljoin
from crewai_core.plus_api import PlusAPI as _CorePlusAPI
import httpx
HttpMethod = Literal["GET", "POST", "PATCH", "DELETE"]
class PlusAPI(_CorePlusAPI):
"""CLI API client.
The ZIP deployment methods live here as well as in newer crewai-core
versions so editable CLI installs still work when an older crewai-core is
present in the runtime environment.
"""
def _make_multipart_request(
self,
method: HttpMethod,
endpoint: str,
*,
zip_file_path: str | Path,
data: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
"""Send an authenticated multipart request containing a project ZIP."""
url = urljoin(self.base_url, endpoint)
headers = dict(cast(dict[str, str], self.headers))
headers.pop("Content-Type", None)
path = Path(zip_file_path)
request_kwargs: dict[str, Any] = {"headers": headers}
if data is not None:
request_kwargs["data"] = data
if timeout is not None:
request_kwargs["timeout"] = timeout
with (
path.open("rb") as file_handle,
httpx.Client(trust_env=False, verify=verify) as client,
):
files = {
"zip_file": (path.name, file_handle, "application/zip"),
}
return client.request(method, url, files=files, **request_kwargs)
def create_crew_from_zip(
self,
zip_file_path: str | Path,
*,
name: str | None = None,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Create a crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if name:
data["name"] = name
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/zip",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def update_crew_from_zip(
self,
uuid: str,
zip_file_path: str | Path,
*,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Update an existing crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/{uuid}/zip_update",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
__all__ = ["PlusAPI"]

View File

@@ -35,6 +35,46 @@ class CrewType(Enum):
# crewai.utilities.string_utils (_VARIABLE_PATTERN), including hyphens —
# otherwise placeholders are interpolated at runtime but never prompted for.
_INPUT_PLACEHOLDER_RE = re.compile(r"(?<!{){([A-Za-z_][A-Za-z0-9_\-]*)}(?!})")
_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV = "CREWAI_CLI_RUNNER_PACKAGE_DIR"
_CREWAI_RUNNER_SOURCE_DIR_ENV = "CREWAI_RUNNER_SOURCE_DIR"
_JSON_CREW_RUNNER_CODE = """
import importlib.util
import os
from pathlib import Path
import sys
source_dir = os.environ.get("CREWAI_RUNNER_SOURCE_DIR")
if source_dir:
sys.path.insert(0, source_dir)
package_dir = Path(os.environ["CREWAI_CLI_RUNNER_PACKAGE_DIR"])
package_spec = importlib.util.spec_from_file_location(
"crewai_cli",
package_dir / "__init__.py",
submodule_search_locations=[str(package_dir)],
)
if package_spec is None or package_spec.loader is None:
raise ImportError(f"Cannot load CrewAI CLI package from {package_dir}")
package = importlib.util.module_from_spec(package_spec)
sys.modules["crewai_cli"] = package
package_spec.loader.exec_module(package)
module_path = package_dir / "run_crew.py"
module_spec = importlib.util.spec_from_file_location("crewai_cli.run_crew", module_path)
if module_spec is None or module_spec.loader is None:
raise ImportError(f"Cannot load CrewAI CLI runner from {module_path}")
module = importlib.util.module_from_spec(module_spec)
sys.modules["crewai_cli.run_crew"] = module
module_spec.loader.exec_module(module)
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
module._run_json_crew(
trained_agents_file=os.getenv(CREWAI_TRAINED_AGENTS_FILE_ENV)
)
""".strip()
def _has_json_crew() -> bool:
@@ -216,24 +256,148 @@ def _run_json_crew(trained_agents_file: str | None = None) -> Any:
return app._crew_result
def _has_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project already has a dependency lockfile."""
return _has_uv_lockfile(project_root) or _has_poetry_lockfile(project_root)
def _has_uv_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project has a uv lockfile."""
root = project_root or Path.cwd()
return (root / "uv.lock").is_file()
def _has_poetry_lockfile(project_root: Path | None = None) -> bool:
"""Return True when the project has a Poetry lockfile."""
root = project_root or Path.cwd()
return (root / "poetry.lock").is_file()
def _uses_poetry_lockfile(project_root: Path | None = None) -> bool:
"""Return True when Poetry is the only available lock source."""
return _has_poetry_lockfile(project_root) and not _has_uv_lockfile(project_root)
def _has_project_venv(project_root: Path | None = None) -> bool:
"""Return True when the project already has a local uv environment."""
root = project_root or Path.cwd()
return (root / ".venv").is_dir()
def _install_json_crew_dependencies_if_needed() -> None:
"""Prepare JSON crew dependencies without mutating existing lockfiles."""
project_root = Path.cwd()
if not (project_root / "pyproject.toml").is_file():
return
has_uv_lockfile = _has_uv_lockfile(project_root)
has_lockfile = has_uv_lockfile or _has_poetry_lockfile(project_root)
if has_lockfile and _has_project_venv(project_root):
return
if _uses_poetry_lockfile(project_root):
return
from crewai_cli.install_crew import install_crew
try:
if has_uv_lockfile:
click.echo("Syncing dependencies from lockfile...")
install_crew(["--frozen"], raise_on_error=True)
else:
click.echo("Installing dependencies...")
install_crew([], raise_on_error=True)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
raise SystemExit(1) from e
def _find_local_crewai_source_dir() -> Path | None:
"""Return the repo's CrewAI source dir when running from a source checkout."""
for parent in Path(__file__).resolve().parents:
candidate = parent / "lib" / "crewai" / "src"
if (candidate / "crewai" / "project" / "json_loader.py").is_file():
return candidate
return None
def _json_crew_run_command(project_root: Path | None = None) -> list[str]:
"""Return the project-environment command for running JSON crews."""
if _uses_poetry_lockfile(project_root):
return ["poetry", "run", "python", "-c", _JSON_CREW_RUNNER_CODE]
return ["uv", "run", "--no-sync", "python", "-c", _JSON_CREW_RUNNER_CODE]
def _run_json_crew_in_project_env(trained_agents_file: str | None = None) -> Any:
"""Run JSON crews from the project's uv-managed environment."""
if not (Path.cwd() / "pyproject.toml").is_file():
return _run_json_crew(trained_agents_file=trained_agents_file)
_install_json_crew_dependencies_if_needed()
command = _json_crew_run_command()
env = build_env_with_all_tool_credentials()
env[_CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV] = str(Path(__file__).resolve().parent)
if local_crewai_source_dir := _find_local_crewai_source_dir():
env[_CREWAI_RUNNER_SOURCE_DIR_ENV] = str(local_crewai_source_dir)
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
subprocess.run( # noqa: S603
command,
capture_output=False,
text=True,
check=True,
env=env,
)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
click.echo(f"An unexpected error occurred while running the JSON crew: {e}")
raise SystemExit(1) from e
return None
def _chain_deploy() -> None:
from rich.console import Console
console = Console()
def print_system_exit_failure(exc: SystemExit) -> None:
if isinstance(exc.code, int):
detail = f" with exit code {exc.code}"
elif exc.code:
detail = f": {exc.code}"
else:
detail = ""
console.print(f"\nDeploy failed{detail}\n", style="bold red")
try:
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli.deploy.main import DeployCommand
console.print("\nStarting deployment…\n", style="bold #FF5A50")
DeployCommand().create_crew(confirm=False, skip_validate=True)
except SystemExit:
DeployCommand().create_crew(confirm=True, skip_validate=True)
except AuthenticationRequiredError:
from crewai_cli.authentication.main import AuthenticationCommand
console.print()
AuthenticationCommand().login()
try:
DeployCommand().create_crew(confirm=False, skip_validate=True)
DeployCommand().create_crew(confirm=True, skip_validate=True)
except AuthenticationRequiredError:
console.print(
"\nDeploy failed: authentication is still required.\n",
style="bold red",
)
except SystemExit as e:
print_system_exit_failure(e)
except Exception as e:
console.print(f"\nDeploy failed: {e}\n", style="bold red")
except SystemExit as e:
print_system_exit_failure(e)
except Exception as e:
console.print(f"\nDeploy failed: {e}\n", style="bold red")
@@ -315,7 +479,7 @@ def run_crew(trained_agents_file: str | None = None) -> None:
"""
# JSON crew projects take precedence
if _has_json_crew():
_run_json_crew(trained_agents_file=trained_agents_file)
_run_json_crew_in_project_env(trained_agents_file=trained_agents_file)
return
crewai_version = get_crewai_version()

View File

@@ -0,0 +1,233 @@
from pathlib import Path
import subprocess
import zipfile
import pytest
from crewai_cli.deploy.archive import create_project_zip
def test_create_project_zip_excludes_local_artifacts(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
(tmp_path / "src").mkdir()
(tmp_path / "src" / "main.py").write_text("print('hello')\n")
(tmp_path / ".env").write_text("OPENAI_API_KEY=secret\n")
(tmp_path / ".env.example").write_text("OPENAI_API_KEY=\n")
(tmp_path / "__pycache__").mkdir()
(tmp_path / "__pycache__" / "main.pyc").write_bytes(b"compiled")
(tmp_path / ".git").mkdir()
(tmp_path / ".git" / "config").write_text("[core]\n")
archive_path = create_project_zip("demo", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
archive_path.unlink(missing_ok=True)
assert names == {
"pyproject.toml",
"uv.lock",
"src/main.py",
".env.example",
}
def test_create_project_zip_uses_repository_file_list(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
(tmp_path / "ignored.txt").write_text("ignored\n")
class RepositoryStub:
def deployable_files(self) -> list[str]:
return ["pyproject.toml", "uv.lock"]
archive_path = create_project_zip(
"demo",
project_dir=tmp_path,
repository=RepositoryStub(), # type: ignore[arg-type]
)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
archive_path.unlink(missing_ok=True)
assert names == {"pyproject.toml", "uv.lock"}
def test_create_project_zip_without_repository_uses_git_ignore_rules(
tmp_path: Path,
):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / ".gitignore").write_text("node_modules/\nsecret.txt\n")
(tmp_path / "src").mkdir()
(tmp_path / "src" / "main.py").write_text("print('hello')\n")
(tmp_path / "node_modules").mkdir()
(tmp_path / "node_modules" / "package.json").write_text("{}\n")
(tmp_path / "secret.txt").write_text("secret\n")
try:
subprocess.run(
["git", "init"],
cwd=tmp_path,
capture_output=True,
check=True,
text=True,
)
except (FileNotFoundError, subprocess.CalledProcessError) as exc:
pytest.skip(f"git is not available in this environment: {exc}")
archive_path = create_project_zip("demo", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
archive_path.unlink(missing_ok=True)
assert names == {
".gitignore",
"pyproject.toml",
"src/main.py",
}
def test_create_project_zip_does_not_fallback_when_repository_listing_fails(
tmp_path: Path,
):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
class RepositoryStub:
def deployable_files(self) -> list[str]:
raise RuntimeError("git listing failed")
with pytest.raises(RuntimeError, match="git listing failed"):
create_project_zip(
"demo",
project_dir=tmp_path,
repository=RepositoryStub(), # type: ignore[arg-type]
)
def test_create_project_zip_excludes_symlinked_files(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
outside_file = tmp_path.parent / f"{tmp_path.name}-secret.txt"
outside_file.write_text("secret\n")
archive_path: Path | None = None
try:
try:
(tmp_path / "external-secret.txt").symlink_to(outside_file)
except OSError as exc:
pytest.skip(f"symlinks are not supported in this environment: {exc}")
archive_path = create_project_zip("demo", project_dir=tmp_path)
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
if archive_path is not None:
archive_path.unlink(missing_ok=True)
outside_file.unlink(missing_ok=True)
assert names == {"pyproject.toml"}
def test_create_project_zip_adds_json_project_wrapper(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "json_crew"
version = "0.1.0"
dependencies = ["crewai[tools]>=1.15"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.crewai]
type = "crew"
""".strip()
+ "\n"
)
(tmp_path / "agents").mkdir()
(tmp_path / "agents" / "researcher.jsonc").write_text("{}\n")
(tmp_path / "crew.jsonc").write_text("{}\n")
archive_path = create_project_zip("json_crew", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
crew_py = archive.read("src/json_crew/crew.py").decode()
main_py = archive.read("src/json_crew/main.py").decode()
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
assert "uv.lock" not in names
assert "crew.jsonc" in names
assert "agents/researcher.jsonc" in names
assert "src/json_crew/__init__.py" in names
assert "src/json_crew/crew.py" in names
assert "src/json_crew/main.py" in names
assert "src/json_crew/config/agents.yaml" in names
assert "src/json_crew/config/tasks.yaml" in names
assert "load_crew(_crew_path())" in crew_py
assert "JsonCrew" in crew_py
assert "from json_crew.crew import JsonCrew" in main_py
assert "run_crew = \"json_crew.main:run\"" in pyproject
@pytest.mark.parametrize(
"tool_config",
[
'tool = "invalid"\n',
'[tool]\ncrewai = "invalid"\n',
],
)
def test_create_project_zip_adds_json_wrapper_for_malformed_tool_config(
tmp_path: Path, tool_config: str
):
(tmp_path / "pyproject.toml").write_text(
f"""
[project]
name = "json_crew"
version = "0.1.0"
{tool_config}
""".strip()
+ "\n"
)
(tmp_path / "crew.jsonc").write_text("{}\n")
archive_path = create_project_zip("json_crew", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
assert "src/json_crew/crew.py" in names
assert "src/json_crew/main.py" in names
assert "run_crew = \"json_crew.main:run\"" in pyproject
def test_create_project_zip_rejects_empty_normalized_package_name(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "!!!"
version = "0.1.0"
[tool.crewai]
type = "crew"
""".strip()
+ "\n"
)
(tmp_path / "crew.jsonc").write_text("{}\n")
with pytest.raises(
ValueError,
match=r"Could not derive a valid Python package name",
):
create_project_zip("invalid", project_dir=tmp_path)

View File

@@ -1,16 +1,172 @@
import sys
import unittest
from io import StringIO
from pathlib import Path
import subprocess
from unittest.mock import MagicMock, Mock, patch
import pytest
import json
import crewai_cli.deploy.main as deploy_main
import httpx
from crewai_cli.deploy.main import DeployCommand
from crewai_cli.deploy.validate import Severity, ValidationResult
from crewai_cli.utils import parse_toml
def test_ensure_lockfile_for_deploy_runs_install_when_lock_missing(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
calls = []
def fake_install_crew(proxy_options, *, raise_on_error=False):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
deploy_main._ensure_lockfile_for_deploy()
assert calls == [([], True)]
def test_ensure_lockfile_for_deploy_skips_when_lock_exists(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
calls = []
def fake_install_crew(proxy_options, *, raise_on_error=False):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
deploy_main._ensure_lockfile_for_deploy()
assert calls == []
def test_ensure_lockfile_for_deploy_skips_without_pyproject(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
calls = []
def fake_install_crew(proxy_options, *, raise_on_error=False):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
deploy_main._ensure_lockfile_for_deploy()
assert calls == []
def test_ensure_lockfile_for_deploy_failure_exits_nonzero(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
def fake_install_crew(proxy_options, *, raise_on_error=False):
raise subprocess.CalledProcessError(42, ["uv", "sync"])
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
with pytest.raises(SystemExit) as exc_info:
deploy_main._ensure_lockfile_for_deploy()
assert exc_info.value.code == 42
class _FakeDeployValidator:
def __init__(self, results: list[ValidationResult]):
self.results = results
@property
def errors(self) -> list[ValidationResult]:
return [
result
for result in self.results
if result.severity is Severity.ERROR
]
def run(self) -> list[ValidationResult]:
return self.results
def test_prepare_project_for_deploy_blocks_install_when_validation_fails(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
install_calls = []
rendered_results = []
missing_lockfile = ValidationResult(
Severity.ERROR,
"missing_lockfile",
"Expected to find a lockfile",
)
invalid_config = ValidationResult(
Severity.ERROR,
"invalid_crew_json",
"crew.jsonc has invalid configuration",
)
monkeypatch.setattr(
deploy_main,
"DeployValidator",
lambda: _FakeDeployValidator([missing_lockfile, invalid_config]),
)
monkeypatch.setattr(deploy_main, "render_report", rendered_results.append)
def fake_install_crew(proxy_options, *, raise_on_error=False):
install_calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
assert deploy_main._prepare_project_for_deploy(skip_validate=False) is False
assert install_calls == []
assert [[result.code for result in results] for results in rendered_results] == [
["invalid_crew_json"]
]
def test_prepare_project_for_deploy_creates_missing_lock_after_validation(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
install_calls = []
missing_lockfile = ValidationResult(
Severity.ERROR,
"missing_lockfile",
"Expected to find a lockfile",
)
validators = [
_FakeDeployValidator([missing_lockfile]),
_FakeDeployValidator([]),
]
def fake_validator():
return validators.pop(0)
def fake_install_crew(proxy_options, *, raise_on_error=False):
install_calls.append((proxy_options, raise_on_error))
(tmp_path / "uv.lock").write_text("# lock\n")
monkeypatch.setattr(deploy_main, "DeployValidator", fake_validator)
monkeypatch.setattr(deploy_main, "render_report", lambda results: None)
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
assert deploy_main._prepare_project_for_deploy(skip_validate=False) is True
assert install_calls == [([], True)]
assert validators == []
class TestDeployCommand(unittest.TestCase):
@patch("crewai_cli.command.get_auth_token")
@patch("crewai_cli.deploy.main.get_project_name")
@@ -28,19 +184,25 @@ class TestDeployCommand(unittest.TestCase):
self.mock_get_auth_token.return_value = "test_token"
self.mock_get_project_name.return_value = "test_project"
self.deploy_command = DeployCommand()
self.deploy_command = deploy_main.DeployCommand()
self.mock_client = self.deploy_command.plus_api_client
def test_init_success(self):
self.assertEqual(self.deploy_command.project_name, "test_project")
self.mock_plus_api.assert_called_once_with(api_key="test_token")
@patch("builtins.input")
def test_confirm_zip_input_only_confirms_env_vars(self, mock_input):
self.deploy_command._confirm_zip_input({"MODEL": "openai/gpt-5"}, False)
mock_input.assert_called_once_with("Press Enter to continue with 1 env vars: MODEL")
@patch("crewai_cli.command.get_auth_token")
def test_init_failure(self, mock_get_auth_token):
mock_get_auth_token.side_effect = Exception("Auth failed")
with self.assertRaises(SystemExit):
DeployCommand()
deploy_main.DeployCommand()
def test_validate_response_successful_response(self):
mock_response = Mock(spec=httpx.Response)
@@ -123,8 +285,15 @@ class TestDeployCommand(unittest.TestCase):
)
self.assertIn("2023-01-01 - INFO: Test log", fake_out.getvalue())
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_uuid(self, mock_display):
def test_deploy_with_uuid(self, mock_display, mock_repository):
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
@@ -135,8 +304,15 @@ class TestDeployCommand(unittest.TestCase):
self.mock_client.deploy_by_uuid.assert_called_once_with("test-uuid")
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_project_name(self, mock_display):
def test_deploy_with_project_name(self, mock_display, mock_repository):
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
@@ -147,17 +323,142 @@ class TestDeployCommand(unittest.TestCase):
self.mock_client.deploy_by_name.assert_called_once_with("test_project")
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_remote_keeps_remote_path_when_fetch_fails(
self, mock_display, mock_repository, mock_create_project_zip
):
repository = mock_repository.return_value
repository.origin_url.return_value = "https://github.com/test/repo.git"
repository.fetch.side_effect = ValueError("fetch failed")
repository.create_initial_commit_if_needed.return_value = False
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.deploy_by_name.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.deploy(skip_validate=True)
output = fake_out.getvalue()
mock_repository.assert_called_once_with(fetch=False)
repository.fetch.assert_called_once_with()
self.assertIn("Continuing with remote deployment", output)
self.mock_client.deploy_by_name.assert_called_once_with("test_project")
self.mock_client.update_crew_from_zip.assert_not_called()
mock_create_project_zip.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_remote_keeps_remote_path_when_initial_commit_fails(
self, mock_display, mock_repository, mock_create_project_zip
):
repository = mock_repository.return_value
repository.origin_url.return_value = "https://github.com/test/repo.git"
repository.create_initial_commit_if_needed.side_effect = RuntimeError(
"commit failed"
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.deploy_by_name.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.deploy(skip_validate=True)
output = fake_out.getvalue()
mock_repository.assert_called_once_with(fetch=False)
repository.fetch.assert_called_once_with()
self.assertIn("Continuing with remote deployment", output)
self.mock_client.deploy_by_name.assert_called_once_with("test_project")
self.mock_client.update_crew_from_zip.assert_not_called()
mock_create_project_zip.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository.origin_url")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_uuid_without_remote_updates_from_zip(
self, mock_display, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.update_crew_from_zip.return_value = mock_response
self.deploy_command.deploy(uuid="test-uuid", skip_validate=True)
self.mock_client.update_crew_from_zip.assert_called_once_with(
"test-uuid",
Path("/tmp/test_project.zip"),
env={"ENV_VAR": "value"},
)
self.mock_client.deploy_by_uuid.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_project_name_without_remote_updates_from_zip(
self, mock_display, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
status_response = MagicMock()
status_response.status_code = 200
status_response.is_success = True
status_response.json.return_value = {"uuid": "test-uuid"}
update_response = MagicMock()
update_response.status_code = 200
update_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.crew_status_by_name.return_value = status_response
self.mock_client.update_crew_from_zip.return_value = update_response
self.deploy_command.deploy(skip_validate=True)
self.mock_client.crew_status_by_name.assert_called_once_with("test_project")
self.mock_client.update_crew_from_zip.assert_called_once_with(
"test-uuid",
Path("/tmp/test_project.zip"),
env={"ENV_VAR": "value"},
)
self.mock_client.deploy_by_name.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("builtins.input")
@pytest.mark.timeout(180)
def test_create_crew(self, mock_input, mock_git_origin_url, mock_fetch_env):
def test_create_crew(self, mock_input, mock_repository, mock_fetch_env):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_git_origin_url.return_value = "https://github.com/test/repo.git"
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_input.return_value = ""
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "new-uuid", "status": "created"}
self.mock_client.create_crew.return_value = mock_response
@@ -166,38 +467,127 @@ class TestDeployCommand(unittest.TestCase):
self.assertIn("Deployment created successfully!", fake_out.getvalue())
self.assertIn("new-uuid", fake_out.getvalue())
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
def test_create_crew_without_git_repo_shows_setup_help(
self, mock_repository, mock_fetch_env
def test_create_crew_without_git_repo_initializes_and_uses_zip(
self, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.side_effect = ValueError("not a Git repository")
initialized_repository = MagicMock()
initialized_repository.origin_url.return_value = None
mock_repository.initialize.return_value = initialized_repository
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "zip-uuid", "status": "created"}
self.mock_client.create_crew_from_zip.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.create_crew(skip_validate=True)
self.deploy_command.create_crew(confirm=True, skip_validate=True)
output = fake_out.getvalue()
self.assertIn("Deployment requires a Git repository", output)
self.assertIn("git init", output)
self.assertIn("git remote add origin <your-repo-url>", output)
self.assertIn("Initialized a local Git repository", output)
self.assertIn("Deploying from a ZIP upload", output)
mock_repository.initialize.assert_called_once_with()
mock_create_project_zip.assert_called_once_with(
"test_project", repository=initialized_repository
)
self.mock_client.create_crew_from_zip.assert_called_once_with(
Path("/tmp/test_project.zip"),
name="test_project",
env={"ENV_VAR": "value"},
)
self.mock_client.create_crew.assert_not_called()
@patch("crewai_cli.deploy.main.git.Repository")
def test_prepare_git_repository_returns_repo_when_init_commit_fails(
self, mock_repository
):
recovered_repository = MagicMock()
mock_repository.side_effect = [
ValueError("not a Git repository"),
recovered_repository,
]
mock_repository.initialize.side_effect = RuntimeError("commit failed")
with patch("sys.stdout", new=StringIO()) as fake_out:
repository = self.deploy_command._prepare_git_repository()
self.assertIs(repository, recovered_repository)
self.assertIn("Git auto-setup did not complete", fake_out.getvalue())
mock_repository.initialize.assert_called_once_with()
self.assertEqual(mock_repository.call_count, 2)
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
def test_create_crew_without_remote_shows_remote_help(
self, mock_repository, mock_fetch_env
def test_create_crew_without_remote_uses_zip(
self, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "zip-uuid", "status": "created"}
self.mock_client.create_crew_from_zip.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.create_crew(skip_validate=True)
self.deploy_command.create_crew(confirm=True, skip_validate=True)
output = fake_out.getvalue()
self.assertIn("No remote repository URL found.", output)
self.assertIn("git remote add origin <your-repo-url>", output)
self.assertIn("git push -u origin HEAD", output)
self.assertIn("No origin remote found.", output)
self.assertIn("Deploying from a ZIP upload", output)
mock_create_project_zip.assert_called_once_with(
"test_project", repository=mock_repository.return_value
)
self.mock_client.create_crew_from_zip.assert_called_once_with(
Path("/tmp/test_project.zip"),
name="test_project",
env={"ENV_VAR": "value"},
)
self.mock_client.create_crew.assert_not_called()
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
def test_create_crew_without_remote_uses_git_file_list_when_commit_fails(
self, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
repository = mock_repository.return_value
repository.origin_url.return_value = None
repository.create_initial_commit_if_needed.side_effect = RuntimeError(
"commit failed"
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "zip-uuid", "status": "created"}
self.mock_client.create_crew_from_zip.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.create_crew(confirm=True, skip_validate=True)
output = fake_out.getvalue()
self.assertIn("Continuing with ZIP deployment using Git", output)
self.assertIn("file listing", output)
mock_create_project_zip.assert_called_once_with(
"test_project", repository=repository
)
self.mock_client.create_crew_from_zip.assert_called_once_with(
Path("/tmp/test_project.zip"),
name="test_project",
env={"ENV_VAR": "value"},
)
self.mock_client.create_crew.assert_not_called()
def test_list_crews(self):

View File

@@ -481,7 +481,7 @@ def test_modern_crewai_pin_does_not_warn(tmp_path: Path) -> None:
def test_create_crew_aborts_on_validation_error(tmp_path: Path) -> None:
"""`crewai deploy create` must not contact the API when validation fails."""
from unittest.mock import MagicMock, patch as mock_patch
from unittest.mock import patch as mock_patch
from crewai_cli.deploy.main import DeployCommand
@@ -490,10 +490,10 @@ def test_create_crew_aborts_on_validation_error(tmp_path: Path) -> None:
mock_patch("crewai_cli.deploy.main.get_project_name", return_value="p"),
mock_patch("crewai_cli.command.PlusAPI") as mock_api,
mock_patch(
"crewai_cli.deploy.main.validate_project"
) as mock_validate,
"crewai_cli.deploy.main._prepare_project_for_deploy",
return_value=False,
),
):
mock_validate.return_value = MagicMock(ok=False)
cmd = DeployCommand()
cmd.create_crew()
assert not cmd.plus_api_client.create_crew.called

View File

@@ -19,6 +19,7 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli import run_crew
from crewai_cli.crew_run_tui import CrewRunApp
@@ -68,7 +69,7 @@ def test_chain_deploy_skips_validation_after_auth_retry(monkeypatch) -> None:
create_calls.append(kwargs)
FakeDeployCommand.attempts += 1
if FakeDeployCommand.attempts == 1:
raise SystemExit(1)
raise AuthenticationRequiredError
class FakeAuthenticationCommand:
def login(self) -> None:
@@ -83,12 +84,38 @@ def test_chain_deploy_skips_validation_after_auth_retry(monkeypatch) -> None:
run_crew._chain_deploy()
assert create_calls == [
{"confirm": False, "skip_validate": True},
{"confirm": False, "skip_validate": True},
{"confirm": True, "skip_validate": True},
{"confirm": True, "skip_validate": True},
]
assert login_calls == [True]
def test_chain_deploy_does_not_login_for_deploy_exit(monkeypatch, capsys) -> None:
create_calls: list[dict[str, object]] = []
login_calls: list[bool] = []
class FakeDeployCommand:
def create_crew(self, **kwargs) -> None:
create_calls.append(kwargs)
raise SystemExit(42)
class FakeAuthenticationCommand:
def login(self) -> None:
login_calls.append(True)
monkeypatch.setattr("crewai_cli.deploy.main.DeployCommand", FakeDeployCommand)
monkeypatch.setattr(
"crewai_cli.authentication.main.AuthenticationCommand",
FakeAuthenticationCommand,
)
run_crew._chain_deploy()
assert create_calls == [{"confirm": True, "skip_validate": True}]
assert login_calls == []
assert "Deploy failed with exit code 42" in capsys.readouterr().out
def test_plan_step_status_updates_only_the_explicit_step() -> None:
app = _app_with_plan()

View File

@@ -31,6 +31,18 @@ def test_is_git_not_installed(fp):
Repository(path=".")
def test_fetch_failure_raises_value_error(fp):
fp.register(["git", "--version"], stdout="git version 2.30.0\n")
fp.register(["git", "rev-parse", "--is-inside-work-tree"], stdout="true\n")
fp.register(["git", "fetch"], returncode=128, stderr="remote unavailable\n")
with pytest.raises(
ValueError,
match=r"Git fetch failed with exit code 128 for command \['git', 'fetch'\]: remote unavailable",
):
Repository(path=".")
def test_status(fp, repository):
fp.register(
["git", "status", "--branch", "--porcelain"],
@@ -99,3 +111,45 @@ def test_origin_url(fp, repository):
stdout="https://github.com/user/repo.git\n",
)
assert repository.origin_url() == "https://github.com/user/repo.git"
def test_initialize_creates_initial_commit(fp, tmp_path):
fp.register(["git", "--version"], stdout="git version 2.30.0\n")
fp.register(["git", "init"], stdout="")
fp.register(["git", "--version"], stdout="git version 2.30.0\n")
fp.register(["git", "rev-parse", "--is-inside-work-tree"], stdout="true\n")
fp.register(["git", "rev-parse", "--verify", "HEAD"], returncode=1)
fp.register(["git", "add", "."], stdout="")
fp.register(
[
"git",
"-c",
"user.name=CrewAI",
"-c",
"user.email=deploy@crewai.com",
"commit",
"--allow-empty",
"-m",
"Initial crew",
],
stdout="",
)
repo = Repository.initialize(path=str(tmp_path))
assert repo.path == str(tmp_path)
exclude_file = tmp_path / ".git" / "info" / "exclude"
exclude_text = exclude_file.read_text()
assert ".env" in exclude_text
assert "!.env.example" in exclude_text
assert "!.env.sample" in exclude_text
assert "__pycache__/" in exclude_text
def test_deployable_files_uses_git_excludes(fp, repository):
fp.register(
["git", "ls-files", "--cached", "--others", "--exclude-standard"],
stdout="pyproject.toml\nsrc/main.py\n",
)
assert repository.deployable_files() == ["pyproject.toml", "src/main.py"]

View File

@@ -0,0 +1,102 @@
from pathlib import Path
import subprocess
import pytest
import crewai_cli.install_crew as install_crew_module
@pytest.fixture(autouse=True)
def _tool_credentials(monkeypatch):
monkeypatch.setattr(
install_crew_module,
"build_env_with_all_tool_credentials",
lambda: {"CREWAI_TEST": "1"},
)
def test_install_crew_json_project_skips_project_install(
fp, monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "json_crew"
[tool.crewai]
type = "crew"
""".strip()
)
(tmp_path / "crew.jsonc").write_text("{}\n")
fp.register(["uv", "sync", "--no-install-project"], stdout="")
install_crew_module.install_crew([])
def test_install_crew_json_project_with_python_package_installs_project(
fp, monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "hybrid-crew"
[tool.crewai]
type = "crew"
""".strip()
)
(tmp_path / "crew.jsonc").write_text("{}\n")
package_dir = tmp_path / "src" / "hybrid_crew"
package_dir.mkdir(parents=True)
(package_dir / "crew.py").write_text("class HybridCrew: ...\n")
fp.register(["uv", "sync"], stdout="")
install_crew_module.install_crew([])
def test_install_crew_flow_project_installs_project(fp, monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "flow_project"
[tool.crewai]
type = "flow"
""".strip()
)
(tmp_path / "crew.jsonc").write_text("{}\n")
fp.register(["uv", "sync"], stdout="")
install_crew_module.install_crew([])
def test_install_crew_classic_project_installs_project(
fp, monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'classic'\n")
fp.register(["uv", "sync"], stdout="")
install_crew_module.install_crew([])
def test_install_crew_install_project_false_adds_no_install_project(fp):
fp.register(["uv", "sync", "--no-install-project", "--frozen"], stdout="")
install_crew_module.install_crew(["--frozen"], install_project=False)
def test_install_crew_reraises_sync_failure_when_requested(fp):
fp.register(["uv", "sync"], returncode=1, stderr="sync failed\n")
with pytest.raises(subprocess.CalledProcessError):
install_crew_module.install_crew([], raise_on_error=True)
def test_install_crew_swallows_sync_failure_by_default(fp):
fp.register(["uv", "sync"], returncode=1, stderr="sync failed\n")
install_crew_module.install_crew([])

View File

@@ -292,6 +292,36 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_cli.plus_api.PlusAPI._make_multipart_request")
def test_create_crew_from_zip(self, mock_make_multipart_request):
self.api.create_crew_from_zip(
"/tmp/test.zip",
name="test_crew",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/zip",
zip_file_path="/tmp/test.zip",
data={"name": "test_crew", "env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_cli.plus_api.PlusAPI._make_multipart_request")
def test_update_crew_from_zip(self, mock_make_multipart_request):
self.api.update_crew_from_zip(
"test_uuid",
"/tmp/test.zip",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/test_uuid/zip_update",
zip_file_path="/tmp/test.zip",
data={"env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):

View File

@@ -2,6 +2,8 @@
import os
from pathlib import Path
import subprocess
import sys
import pytest
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
@@ -14,16 +16,340 @@ def test_run_crew_forwards_trained_agents_file_to_json_crews(monkeypatch):
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: True)
called: dict = {}
def fake_run_json_crew(trained_agents_file=None):
def fake_run_json_crew_in_project_env(trained_agents_file=None):
called["trained_agents_file"] = trained_agents_file
monkeypatch.setattr(run_crew_module, "_run_json_crew", fake_run_json_crew)
monkeypatch.setattr(
run_crew_module,
"_run_json_crew_in_project_env",
fake_run_json_crew_in_project_env,
)
run_crew_module.run_crew(trained_agents_file="some.pkl")
assert called == {"trained_agents_file": "some.pkl"}
def test_json_run_uses_project_env_when_pyproject_exists(monkeypatch, tmp_path: Path):
"""JSON crew runs should execute inside the project uv environment."""
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
install_calls = []
subprocess_calls = []
monkeypatch.setattr(
run_crew_module,
"_install_json_crew_dependencies_if_needed",
lambda: install_calls.append(True),
)
monkeypatch.setattr(
run_crew_module,
"build_env_with_all_tool_credentials",
lambda: {"EXISTING": "value"},
)
def fake_subprocess_run(command, **kwargs):
subprocess_calls.append((command, kwargs))
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
run_crew_module._run_json_crew_in_project_env(
trained_agents_file="trained.pkl"
)
expected_env = {
"EXISTING": "value",
run_crew_module._CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV: str(
Path(run_crew_module.__file__).resolve().parent
),
CREWAI_TRAINED_AGENTS_FILE_ENV: "trained.pkl",
}
if local_crewai_source_dir := run_crew_module._find_local_crewai_source_dir():
expected_env[run_crew_module._CREWAI_RUNNER_SOURCE_DIR_ENV] = str(
local_crewai_source_dir
)
assert install_calls == [True]
assert subprocess_calls == [
(
[
"uv",
"run",
"--no-sync",
"python",
"-c",
run_crew_module._JSON_CREW_RUNNER_CODE,
],
{
"capture_output": False,
"text": True,
"check": True,
"env": expected_env,
},
)
]
def test_json_run_uses_poetry_run_for_poetry_lock_without_uv_lock(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "poetry.lock").write_text("# lock\n")
monkeypatch.setattr(
run_crew_module,
"_install_json_crew_dependencies_if_needed",
lambda: None,
)
monkeypatch.setattr(
run_crew_module,
"build_env_with_all_tool_credentials",
lambda: {},
)
subprocess_calls = []
def fake_subprocess_run(command, **kwargs):
subprocess_calls.append((command, kwargs))
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
run_crew_module._run_json_crew_in_project_env()
expected_env = {
run_crew_module._CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV: str(
Path(run_crew_module.__file__).resolve().parent
),
}
if local_crewai_source_dir := run_crew_module._find_local_crewai_source_dir():
expected_env[run_crew_module._CREWAI_RUNNER_SOURCE_DIR_ENV] = str(
local_crewai_source_dir
)
assert subprocess_calls == [
(
[
"poetry",
"run",
"python",
"-c",
run_crew_module._JSON_CREW_RUNNER_CODE,
],
{
"capture_output": False,
"text": True,
"check": True,
"env": expected_env,
},
)
]
def test_json_runner_code_loads_current_cli_package_over_project_env(tmp_path: Path):
old_parent = tmp_path / "old"
old_pkg = old_parent / "crewai_cli"
old_pkg.mkdir(parents=True)
(old_pkg / "__init__.py").write_text("")
(old_pkg / "run_crew.py").write_text("raise ImportError('old package used')\n")
old_crewai_project = old_parent / "crewai" / "project"
old_crewai_project.mkdir(parents=True)
(old_parent / "crewai" / "__init__.py").write_text("")
(old_crewai_project / "__init__.py").write_text("")
(old_crewai_project / "json_loader.py").write_text(
"raise ImportError('old crewai used')\n"
)
current_pkg = tmp_path / "current" / "crewai_cli"
current_pkg.mkdir(parents=True)
marker = tmp_path / "marker.txt"
(current_pkg / "__init__.py").write_text("")
(current_pkg / "run_crew.py").write_text(
"from pathlib import Path\n"
"from crewai.project.json_loader import SOURCE\n"
"def _run_json_crew(trained_agents_file=None):\n"
f" Path({str(marker)!r}).write_text(SOURCE + ':' + (trained_agents_file or ''))\n"
)
current_crewai_project = tmp_path / "current_crewai_src" / "crewai" / "project"
current_crewai_project.mkdir(parents=True)
(tmp_path / "current_crewai_src" / "crewai" / "__init__.py").write_text("")
(current_crewai_project / "__init__.py").write_text("")
(current_crewai_project / "json_loader.py").write_text("SOURCE = 'current'\n")
env = os.environ.copy()
env["PYTHONPATH"] = str(old_parent)
env[run_crew_module._CREWAI_CLI_RUNNER_PACKAGE_DIR_ENV] = str(current_pkg)
env[run_crew_module._CREWAI_RUNNER_SOURCE_DIR_ENV] = str(
tmp_path / "current_crewai_src"
)
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = "trained.pkl"
subprocess.run(
[sys.executable, "-c", run_crew_module._JSON_CREW_RUNNER_CODE],
check=True,
env=env,
cwd=tmp_path,
)
assert marker.read_text() == "current:trained.pkl"
def test_json_run_without_pyproject_runs_in_process(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
called: dict = {}
def fake_run_json_crew(trained_agents_file=None):
called["trained_agents_file"] = trained_agents_file
return "result"
monkeypatch.setattr(run_crew_module, "_run_json_crew", fake_run_json_crew)
assert (
run_crew_module._run_json_crew_in_project_env(
trained_agents_file="trained.pkl"
)
== "result"
)
assert called == {"trained_agents_file": "trained.pkl"}
def test_json_project_env_run_failure_exits_nonzero(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
monkeypatch.setattr(
run_crew_module, "_install_json_crew_dependencies_if_needed", lambda: None
)
monkeypatch.setattr(
run_crew_module, "build_env_with_all_tool_credentials", lambda: {}
)
def fake_subprocess_run(command, **kwargs):
raise subprocess.CalledProcessError(7, command)
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
with pytest.raises(SystemExit) as exc_info:
run_crew_module._run_json_crew_in_project_env()
assert exc_info.value.code == 7
def test_json_run_installs_dependencies_when_pyproject_has_no_lockfile(
monkeypatch, tmp_path: Path
):
"""JSON crew runs should lock/sync project dependencies only when needed."""
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
calls = []
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
calls.append((proxy_options, raise_on_error, install_project))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies_if_needed()
assert calls == [([], True, None)]
def test_json_run_syncs_frozen_when_uv_lock_exists_without_venv(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
calls = []
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
calls.append((proxy_options, raise_on_error, install_project))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies_if_needed()
assert calls == [(["--frozen"], True, None)]
def test_json_run_skips_uv_sync_when_only_poetry_lock_exists_without_venv(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "poetry.lock").write_text("# lock\n")
calls = []
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
calls.append((proxy_options, raise_on_error, install_project))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies_if_needed()
assert calls == []
@pytest.mark.parametrize("lockfile", ["uv.lock", "poetry.lock"])
def test_json_run_skips_dependency_install_when_lockfile_and_venv_exist(
monkeypatch, tmp_path: Path, lockfile: str
):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / lockfile).write_text("# lock\n")
(tmp_path / ".venv").mkdir()
calls = []
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
calls.append((proxy_options, raise_on_error, install_project))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies_if_needed()
assert calls == []
def test_json_run_skips_dependency_install_without_pyproject(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
calls = []
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies_if_needed()
assert calls == []
def test_json_run_install_failure_exits_nonzero(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
def fake_install_crew(
proxy_options, *, raise_on_error=False, install_project=None
):
raise subprocess.CalledProcessError(42, ["uv", "sync"])
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
with pytest.raises(SystemExit) as exc_info:
run_crew_module._install_json_crew_dependencies_if_needed()
assert exc_info.value.code == 42
def test_run_json_crew_exports_trained_agents_env(monkeypatch, tmp_path: Path):
"""JSON crews run in-process, so the pickle path must land in the env var."""
monkeypatch.chdir(tmp_path)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Final, Literal, TypedDict, cast
from urllib.parse import urljoin
@@ -190,6 +191,36 @@ class PlusAPI:
with httpx.Client(trust_env=False, verify=verify) as client:
return client.request(method, url, **request_kwargs)
def _make_multipart_request(
self,
method: HttpMethod,
endpoint: str,
*,
zip_file_path: str | Path,
data: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
"""Send an authenticated multipart request containing a project ZIP."""
url = urljoin(self.base_url, endpoint)
headers = dict(cast(dict[str, str], self.headers))
headers.pop("Content-Type", None)
path = Path(zip_file_path)
request_kwargs: dict[str, Any] = {"headers": headers}
if data is not None:
request_kwargs["data"] = data
if timeout is not None:
request_kwargs["timeout"] = timeout
with (
path.open("rb") as file_handle,
httpx.Client(trust_env=False, verify=verify) as client,
):
files = {
"zip_file": (path.name, file_handle, "application/zip"),
}
return client.request(method, url, files=files, **request_kwargs)
def login_to_tool_repository(
self, user_identifier: str | None = None
) -> httpx.Response:
@@ -312,6 +343,46 @@ class PlusAPI:
def create_crew(self, payload: CreateCrewPayload) -> httpx.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def create_crew_from_zip(
self,
zip_file_path: str | Path,
*,
name: str | None = None,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Create a crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if name:
data["name"] = name
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/zip",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def update_crew_from_zip(
self,
uuid: str,
zip_file_path: str | Path,
*,
env: dict[str, str] | None = None,
) -> httpx.Response:
"""Update an existing crew deployment from a local project ZIP archive."""
data: dict[str, str] = {}
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/{uuid}/zip_update",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def get_organizations(self) -> httpx.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)

View File

@@ -345,6 +345,36 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_core.plus_api.PlusAPI._make_multipart_request")
def test_create_crew_from_zip(self, mock_make_multipart_request):
self.api.create_crew_from_zip(
"/tmp/test.zip",
name="test_crew",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/zip",
zip_file_path="/tmp/test.zip",
data={"name": "test_crew", "env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.PlusAPI._make_multipart_request")
def test_update_crew_from_zip(self, mock_make_multipart_request):
self.api.update_crew_from_zip(
"test_uuid",
"/tmp/test.zip",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/test_uuid/zip_update",
zip_file_path="/tmp/test.zip",
data={"env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):

View File

@@ -192,7 +192,7 @@ exclude-newer = "3 days"
# docling-core 2.74.0 has GHSA-j5xp-7m2f-49jv, GHSA-jmmv-h3mp-59v8; force 2.74.1+.
# pip <26.1.1 has GHSA-58qw-9mgm-455v (archive handling); OSV considers 26.1.1 unaffected.
# paramiko <5.0.0 has GHSA-r374-rxx8-8654 (SHA-1 in rsakey.py); OSV considers 5.0.0 unaffected. Transitive via composio-core.
# starlette <1.0.1 has PYSEC-2026-161 (missing Host header validation poisons request.url.path, bypassing path-based auth). Transitive via fastapi.
# starlette <1.3.1 has PYSEC-2026-161, GHSA-jp82-jpqv-5vv3, and GHSA-82w8-qh3p-5jfq. Transitive via fastapi.
# litellm 1.83.8+ hard-pins openai==2.24.0, missing openai.types.responses used by crewai;
# override to >=2.30.0 (the version litellm 1.83.7 used) until upstream relaxes the pin.
override-dependencies = [
@@ -218,7 +218,7 @@ override-dependencies = [
"docling-core[chunking]>=2.74.1",
"pydantic-settings>=2.14.0",
"paramiko>=5.0.0",
"starlette>=1.0.1",
"starlette>=1.3.1",
]
[tool.uv.workspace]

8
uv.lock generated
View File

@@ -43,7 +43,7 @@ overrides = [
{ name = "pypdf", specifier = ">=6.10.2,<7" },
{ name = "python-multipart", specifier = ">=0.0.27,<1" },
{ name = "rich", specifier = ">=13.7.1" },
{ name = "starlette", specifier = ">=1.0.1" },
{ name = "starlette", specifier = ">=1.3.1" },
{ name = "transformers", marker = "python_full_version >= '3.10'", specifier = ">=5.4.0" },
{ name = "urllib3", specifier = ">=2.7.0" },
{ name = "uv", specifier = ">=0.11.15,<1" },
@@ -8666,15 +8666,15 @@ wheels = [
[[package]]
name = "starlette"
version = "1.2.1"
version = "1.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/25/44/ec35f1b6e83094b997da438a02c8c9b0ade2b1e84cfc48bd4656780760a6/starlette-1.2.1.tar.gz", hash = "sha256:9b9b5ebb992e67d6093741e63c2f59e4f6fff986f81163c087867bd7b924b3f6", size = 2701854, upload-time = "2026-05-31T01:07:51.847Z" }
sdist = { url = "https://files.pythonhosted.org/packages/eb/e3/7c1dc7381d9f8ab7d854328ebfa884e62cb3f3d8549ddfd37c7814f42afa/starlette-1.3.1.tar.gz", hash = "sha256:05d0213193f2fbaae60e2ecb593b4add4262ad4e46536b54abe36f11a71724e0", size = 2703240, upload-time = "2026-06-12T09:23:11.602Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1c/54/196d0c1db10af76baa4f64894448505d60d3cdf70ef92cbb35f46a4e4c71/starlette-1.2.1-py3-none-any.whl", hash = "sha256:4de0082d08c8f6764a85a54cf1120d6939507a19905c7768acad2a9f875d2b89", size = 73350, upload-time = "2026-05-31T01:07:50.09Z" },
{ url = "https://files.pythonhosted.org/packages/ec/bb/2799cc2ede3ed41131f8975621e7213dfc7ef4acbbaadfa440f32500c370/starlette-1.3.1-py3-none-any.whl", hash = "sha256:c7372aae11c3c3f26a42df7bd626cec2f47d03483d261d369516a615a53714c6", size = 73632, upload-time = "2026-06-12T09:23:10.017Z" },
]
[[package]]