Update crewAI CLI with various enhancements and fixes

- Updated `create_json_crew.py` to require `crewai[tools]>=1.14.7`.
- Enhanced `git.py` with improved repository initialization, including automatic initial commit creation and exclusion patterns for initial commits.
- Modified `install_crew.py` to allow error handling during installation with an optional `raise_on_error` parameter.
- Expanded `plus_api.py` to include methods for creating and updating crews from ZIP files.
- Introduced a new `archive.py` for creating deployable ZIP archives of CrewAI projects, ensuring local artifacts are excluded.
- Updated `run_crew.py` to manage JSON crew dependencies and run crews in the project's environment.
- Enhanced deployment logic in `main.py` to handle ZIP uploads and improve user feedback during deployment processes.
- Added tests for new functionalities and ensured existing tests reflect recent changes in behavior and requirements.
This commit is contained in:
Joao Moura
2026-06-15 01:15:45 -07:00
parent bb477f8a91
commit bc248c2b9c
15 changed files with 1248 additions and 72 deletions

View File

@@ -84,7 +84,7 @@ description = "{name} using crewAI"
authors = [{{ name = "Your Name", email = "you@example.com" }}]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=1.15"
"crewai[tools]>=1.14.7"
]
[build-system]

View File

@@ -0,0 +1,307 @@
from __future__ import annotations
from pathlib import Path
import re
import shutil
import tempfile
from typing import Any
import zipfile
from crewai_cli import git
from crewai_cli.utils import parse_toml
_EXCLUDED_DIRS = {
".crewai",
".git",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".tox",
".venv",
"__pycache__",
"build",
"dist",
"env",
"venv",
}
_EXCLUDED_FILES = {
".DS_Store",
".env",
}
_ALLOWED_ENV_EXAMPLES = {
".env.example",
".env.sample",
}
_EXCLUDED_SUFFIXES = {
".pyc",
".pyo",
}
def create_project_zip(
project_name: str,
*,
project_dir: Path | None = None,
repository: git.Repository | None = None,
) -> Path:
"""Create a deployable ZIP archive for a CrewAI project."""
root = (project_dir or Path.cwd()).resolve()
files = _project_files(root, repository)
if not files:
raise ValueError("No deployable project files were found.")
staged_root = _stage_project(root, files)
archive_handle = tempfile.NamedTemporaryFile(
prefix=f"{project_name}-",
suffix=".zip",
delete=False,
)
archive_path = Path(archive_handle.name)
archive_handle.close()
try:
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
for relative_path in _walk_files(staged_root):
absolute_path = staged_root / relative_path
zip_file.write(absolute_path, relative_path.as_posix())
finally:
shutil.rmtree(staged_root, ignore_errors=True)
return archive_path
def _project_files(root: Path, repository: git.Repository | None = None) -> list[Path]:
if repository is not None:
try:
files = [Path(path) for path in repository.deployable_files()]
return [
path
for path in files
if not _is_excluded(path) and (root / path).is_file()
]
except Exception: # noqa: S110
pass
return [
path
for path in _walk_files(root)
if not _is_excluded(path) and (root / path).is_file()
]
def _walk_files(root: Path) -> list[Path]:
return [path.relative_to(root) for path in root.rglob("*") if path.is_file()]
def _is_excluded(path: Path) -> bool:
parts = set(path.parts)
if parts.intersection(_EXCLUDED_DIRS):
return True
name = path.name
if name in _EXCLUDED_FILES:
return True
if name.startswith(".env.") and name not in _ALLOWED_ENV_EXAMPLES:
return True
return path.suffix in _EXCLUDED_SUFFIXES
def _stage_project(root: Path, files: list[Path]) -> Path:
staging_root = Path(tempfile.mkdtemp(prefix="crewai-deploy-"))
try:
for relative_path in files:
destination = staging_root / relative_path
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(root / relative_path, destination)
if _is_json_crew_project(staging_root):
_add_json_crew_deploy_wrapper(staging_root)
except Exception:
shutil.rmtree(staging_root, ignore_errors=True)
raise
return staging_root
def _is_json_crew_project(root: Path) -> bool:
if not ((root / "crew.jsonc").is_file() or (root / "crew.json").is_file()):
return False
project = _read_pyproject(root)
tool_config = project.get("tool") or {}
if not isinstance(tool_config, dict):
return False
crewai_config = tool_config.get("crewai") or {}
if not isinstance(crewai_config, dict):
return False
declared_type = crewai_config.get("type")
if declared_type == "flow":
return False
package_name = _package_name(root)
if package_name is None:
return False
return not (root / "src" / package_name / "crew.py").is_file()
def _read_pyproject(root: Path) -> dict[str, Any]:
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return {}
try:
return parse_toml(pyproject_path.read_text())
except Exception:
return {}
def _package_name(root: Path) -> str | None:
project = _read_pyproject(root).get("project")
if not isinstance(project, dict):
return None
name = project.get("name")
if not isinstance(name, str) or not name.strip():
return None
folder = name.replace(" ", "_").replace("-", "_").lower()
return re.sub(r"[^a-zA-Z0-9_]", "", folder)
def _class_name(package_name: str) -> str:
parts = [part for part in re.split(r"[^a-zA-Z0-9]+", package_name) if part]
class_name = "".join(part[:1].upper() + part[1:] for part in parts)
if not class_name:
return "JsonCrew"
if class_name[0].isdigit():
return f"Crew{class_name}"
return class_name
def _add_json_crew_deploy_wrapper(root: Path) -> None:
package_name = _package_name(root)
if package_name is None:
return
package_dir = root / "src" / package_name
config_dir = package_dir / "config"
config_dir.mkdir(parents=True, exist_ok=True)
class_name = _class_name(package_name)
crew_filename = "crew.jsonc" if (root / "crew.jsonc").is_file() else "crew.json"
(package_dir / "__init__.py").write_text("", encoding="utf-8")
(config_dir / "agents.yaml").write_text("{}\n", encoding="utf-8")
(config_dir / "tasks.yaml").write_text("{}\n", encoding="utf-8")
(package_dir / "crew.py").write_text(
_json_crew_py(class_name, crew_filename),
encoding="utf-8",
)
(package_dir / "main.py").write_text(
_json_main_py(package_name, class_name),
encoding="utf-8",
)
_ensure_project_scripts(root, package_name)
def _json_crew_py(class_name: str, crew_filename: str) -> str:
return f'''from pathlib import Path
from crewai import Crew
from crewai.project import CrewBase, crew
from crewai.project.crew_loader import load_crew
def _crew_path() -> Path:
return Path(__file__).resolve().parents[2] / "{crew_filename}"
@CrewBase
class {class_name}:
"""Compatibility wrapper for a JSON-defined CrewAI project."""
@crew
def crew(self) -> Crew:
crew_instance, default_inputs = load_crew(_crew_path())
self.default_inputs = default_inputs
return crew_instance
'''
def _json_main_py(package_name: str, class_name: str) -> str:
return f"""#!/usr/bin/env python
import json
import sys
from {package_name}.crew import {class_name}
def _load():
wrapper = {class_name}()
crew = wrapper.crew()
return crew, getattr(wrapper, "default_inputs", {{}})
def run():
crew, inputs = _load()
return crew.kickoff(inputs=inputs)
def train():
crew, inputs = _load()
return crew.train(
n_iterations=int(sys.argv[1]),
filename=sys.argv[2],
inputs=inputs,
)
def replay():
crew, _ = _load()
return crew.replay(task_id=sys.argv[1])
def test():
crew, inputs = _load()
return crew.test(
n_iterations=int(sys.argv[1]),
eval_llm=sys.argv[2],
inputs=inputs,
)
def run_with_trigger():
if len(sys.argv) < 2:
raise ValueError("No trigger payload provided.")
crew, inputs = _load()
trigger_payload = json.loads(sys.argv[1])
return crew.kickoff(
inputs={{**inputs, "crewai_trigger_payload": trigger_payload}}
)
"""
def _ensure_project_scripts(root: Path, package_name: str) -> None:
pyproject_path = root / "pyproject.toml"
if not pyproject_path.is_file():
return
content = pyproject_path.read_text(encoding="utf-8")
if "[project.scripts]" in content:
return
script_block = f'''
[project.scripts]
{package_name} = "{package_name}.main:run"
run_crew = "{package_name}.main:run"
train = "{package_name}.main:train"
replay = "{package_name}.main:replay"
test = "{package_name}.main:test"
run_with_trigger = "{package_name}.main:run_with_trigger"
'''
pyproject_path.write_text(content.rstrip() + script_block, encoding="utf-8")

View File

@@ -5,6 +5,7 @@ from rich.console import Console
from crewai_cli import git
from crewai_cli.command import BaseCommand, PlusAPIMixin
from crewai_cli.deploy.archive import create_project_zip
from crewai_cli.deploy.validate import validate_project
from crewai_cli.utils import fetch_and_json_env_file, get_project_name
@@ -37,34 +38,24 @@ def _run_predeploy_validation(skip_validate: bool) -> bool:
def _display_git_repository_help() -> None:
"""Explain how to prepare a new project for deployment."""
console.print(
"Deployment requires a Git repository with an origin remote.",
style="bold red",
"Initialized a local Git repository and created an initial commit.",
style="green",
)
console.print(
"CrewAI AMP deploys from the remote repository URL, so commit and push "
"this project first, then run deploy again.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git init")
console.print(" git add .")
console.print(' git commit -m "Initial crew"')
console.print(" git branch -M main")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin main")
def _display_git_remote_help() -> None:
"""Explain how to add a remote to an existing Git repository."""
console.print("No remote repository URL found.", style="bold red")
"""Explain that ZIP deployment will be used without an origin remote."""
console.print(
"CrewAI AMP deploys from the origin remote. Add a remote, push your "
"latest commit, then run deploy again.",
"No origin remote found. Deploying from a ZIP upload instead.",
style="yellow",
)
console.print("\nSuggested setup:")
console.print(" git remote add origin <your-repo-url>")
console.print(" git push -u origin HEAD")
def _env_summary(env_vars: dict[str, str]) -> str:
if not env_vars:
return "0 env vars"
keys = ", ".join(sorted(env_vars))
return f"{len(env_vars)} env vars: {keys}"
class DeployCommand(BaseCommand, PlusAPIMixin):
@@ -129,10 +120,20 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
return
self._telemetry.start_deployment_span(uuid)
console.print("Starting deployment...", style="bold blue")
if uuid:
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
if remote_repo_url and uuid:
response = self.plus_api_client.deploy_by_uuid(uuid)
elif self.project_name:
elif remote_repo_url and self.project_name:
response = self.plus_api_client.deploy_by_name(self.project_name)
elif uuid:
_display_git_remote_help()
response = self._update_crew_from_zip(uuid, repository)
elif self.project_name:
_display_git_remote_help()
deployment_uuid = self._deployment_uuid_by_name()
response = self._update_crew_from_zip(deployment_uuid, repository)
else:
self._standard_no_param_error_message()
return
@@ -140,6 +141,18 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._validate_response(response)
self._display_deployment_info(response.json())
def _deployment_uuid_by_name(self) -> str:
if not self.project_name:
raise ValueError("project_name is required to find a deployment")
response = self.plus_api_client.crew_status_by_name(self.project_name)
self._validate_response(response)
json_response = response.json()
uuid = json_response.get("uuid")
if not uuid:
raise ValueError("Deployment status response did not include a uuid")
return str(uuid)
def create_crew(self, confirm: bool = False, skip_validate: bool = False) -> None:
"""
Create a new crew deployment.
@@ -153,24 +166,98 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._telemetry.create_crew_deployment_span()
console.print("Creating deployment...", style="bold blue")
env_vars = fetch_and_json_env_file()
repository = self._prepare_git_repository()
remote_repo_url = repository.origin_url() if repository else None
try:
remote_repo_url = git.Repository().origin_url()
except ValueError:
_display_git_repository_help()
return
if remote_repo_url is None:
if remote_repo_url:
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
else:
_display_git_remote_help()
return
self._confirm_input(env_vars, remote_repo_url, confirm)
payload = self._create_payload(env_vars, remote_repo_url)
response = self.plus_api_client.create_crew(payload)
response = self._create_crew_from_zip(env_vars, repository, confirm)
self._validate_response(response)
self._display_creation_success(response.json())
def _prepare_git_repository(self) -> git.Repository | None:
try:
repository = git.Repository()
except ValueError as exc:
if "not a Git repository" not in str(exc):
console.print(
f"{exc} Continuing with ZIP deployment.",
style="yellow",
)
return None
try:
repository = git.Repository.initialize()
except Exception as init_error:
console.print(
"Git auto-setup did not complete. Continuing with ZIP deployment.",
style="yellow",
)
console.print(str(init_error), style="dim")
return None
_display_git_repository_help()
return repository
try:
if repository.create_initial_commit_if_needed():
console.print(
"Created an initial Git commit for this project.",
style="green",
)
except Exception as commit_error:
console.print(
"Could not create an initial Git commit. Continuing with ZIP deployment.",
style="yellow",
)
console.print(str(commit_error), style="dim")
return None
return repository
def _create_crew_from_zip(
self,
env_vars: dict[str, str],
repository: git.Repository | None,
confirm: bool,
) -> Any:
if not self.project_name:
raise ValueError("project_name is required to create a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
self._confirm_zip_input(env_vars, confirm)
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.create_crew_from_zip(
zip_file_path,
name=self.project_name,
env=env_vars,
)
finally:
zip_file_path.unlink(missing_ok=True)
def _update_crew_from_zip(
self,
uuid: str,
repository: git.Repository | None,
) -> Any:
if not self.project_name:
raise ValueError("project_name is required to update a ZIP deployment")
console.print("Preparing project ZIP...", style="bold blue")
zip_file_path = create_project_zip(self.project_name, repository=repository)
try:
console.print("Uploading project ZIP...", style="bold blue")
return self.plus_api_client.update_crew_from_zip(uuid, zip_file_path)
finally:
zip_file_path.unlink(missing_ok=True)
def _confirm_input(
self, env_vars: dict[str, str], remote_repo_url: str, confirm: bool
) -> None:
@@ -183,11 +270,15 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
confirm (bool): Whether to confirm input.
"""
if not confirm:
input(f"Press Enter to continue with the following Env vars: {env_vars}")
input(f"Press Enter to continue with {_env_summary(env_vars)}")
input(
f"Press Enter to continue with the following remote repository: {remote_repo_url}\n"
)
def _confirm_zip_input(self, env_vars: dict[str, str], confirm: bool) -> None:
if not confirm:
input(f"Press Enter to continue with {_env_summary(env_vars)}")
def _create_payload(
self,
env_vars: dict[str, str],

View File

@@ -1,9 +1,29 @@
from __future__ import annotations
from functools import cached_property
from pathlib import Path
import subprocess
_INITIAL_COMMIT_EXCLUDE_PATTERNS = [
".crewai/",
".env",
".env.*",
".mypy_cache/",
".pytest_cache/",
".ruff_cache/",
".tox/",
".venv/",
"__pycache__/",
"build/",
"dist/",
"env/",
"venv/",
]
class Repository:
def __init__(self, path: str = ".") -> None:
def __init__(self, path: str = ".", fetch: bool = True) -> None:
self.path = path
if not self.is_git_installed():
@@ -12,7 +32,8 @@ class Repository:
if not self.is_git_repo:
raise ValueError(f"{self.path} is not a Git repository.")
self.fetch()
if fetch:
self.fetch()
@staticmethod
def is_git_installed() -> bool:
@@ -30,7 +51,33 @@ class Repository:
def fetch(self) -> None:
"""Fetch latest updates from the remote."""
subprocess.run(["git", "fetch"], cwd=self.path, check=True) # noqa: S607
result = subprocess.run(
["git", "fetch"], # noqa: S607
cwd=self.path,
capture_output=True,
text=True,
)
if result.returncode == 0:
return
if "No remote repository specified" in result.stderr:
return
raise subprocess.CalledProcessError(
result.returncode,
["git", "fetch"],
output=result.stdout,
stderr=result.stderr,
)
@classmethod
def initialize(cls, path: str = ".") -> Repository:
"""Initialize a Git repository and create an initial commit if needed."""
if not cls.is_git_installed():
raise ValueError("Git is not installed or not found in your PATH.")
subprocess.run(["git", "init"], cwd=path, check=True) # noqa: S607
repository = cls(path=path, fetch=False)
repository.create_initial_commit_if_needed()
return repository
def status(self) -> str:
"""Get the git status in porcelain format."""
@@ -71,6 +118,74 @@ class Repository:
return False
return True
def has_commits(self) -> bool:
"""Return True if the repository has at least one commit."""
try:
subprocess.run(
["git", "rev-parse", "--verify", "HEAD"], # noqa: S607
cwd=self.path,
capture_output=True,
check=True,
text=True,
)
return True
except subprocess.CalledProcessError:
return False
def create_initial_commit_if_needed(self) -> bool:
"""Create a local initial commit when the repository has no commits."""
if self.has_commits():
return False
self._ensure_initial_commit_excludes()
subprocess.run(["git", "add", "."], cwd=self.path, check=True) # noqa: S607
command = [
"git",
"-c",
"user.name=CrewAI",
"-c",
"user.email=deploy@crewai.com",
"commit",
"--allow-empty",
"-m",
"Initial crew",
]
subprocess.run( # noqa: S603
command,
cwd=self.path,
check=True,
)
return True
def _ensure_initial_commit_excludes(self) -> None:
"""Add local-only ignore patterns before auto-staging an initial commit."""
exclude_file = Path(self.path) / ".git" / "info" / "exclude"
exclude_file.parent.mkdir(parents=True, exist_ok=True)
existing = exclude_file.read_text() if exclude_file.exists() else ""
existing_lines = set(existing.splitlines())
missing_patterns = [
pattern
for pattern in _INITIAL_COMMIT_EXCLUDE_PATTERNS
if pattern not in existing_lines
]
if not missing_patterns:
return
prefix = "" if existing.endswith("\n") or not existing else "\n"
patterns = "\n".join(missing_patterns)
exclude_file.write_text(
f"{existing}{prefix}# CrewAI deploy auto-commit excludes\n{patterns}\n"
)
def deployable_files(self) -> list[str]:
"""Return files tracked by Git or untracked and not ignored."""
output = subprocess.check_output(
["git", "ls-files", "--cached", "--others", "--exclude-standard"], # noqa: S607
cwd=self.path,
encoding="utf-8",
)
return [line for line in output.splitlines() if line]
def origin_url(self) -> str | None:
"""Get the Git repository's remote URL."""
try:

View File

@@ -9,7 +9,7 @@ from crewai_cli.utils import build_env_with_all_tool_credentials
# on some environments we don't use this command but instead uv sync directly
# so if you expect this to support more things you will need to replicate it there
# ask @joaomdmoura if you are unsure
def install_crew(proxy_options: list[str]) -> None:
def install_crew(proxy_options: list[str], *, raise_on_error: bool = False) -> None:
"""
Install the crew by running the UV command to lock and install.
"""
@@ -22,11 +22,21 @@ def install_crew(proxy_options: list[str]) -> None:
# project depends on tools from a private index.
env = build_env_with_all_tool_credentials()
subprocess.run(command, check=True, capture_output=False, text=True, env=env) # noqa: S603
subprocess.run( # noqa: S603
command,
check=True,
capture_output=False,
text=True,
env=env,
)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True)
if raise_on_error:
raise
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
if raise_on_error:
raise

View File

@@ -1,12 +1,92 @@
"""Re-export of ``crewai_core.plus_api.PlusAPI``.
Kept as a stable import path for the CLI; new code should import from
``crewai_core.plus_api`` directly.
"""
"""CrewAI CLI API client extensions."""
from __future__ import annotations
from crewai_core.plus_api import PlusAPI as PlusAPI
from pathlib import Path
from typing import Any, Literal, cast
from urllib.parse import urljoin
from crewai_core.plus_api import PlusAPI as _CorePlusAPI
import httpx
HttpMethod = Literal["GET", "POST", "PATCH", "DELETE"]
class PlusAPI(_CorePlusAPI):
"""CLI API client.
The ZIP deployment methods live here as well as in newer crewai-core
versions so editable CLI installs still work when an older crewai-core is
present in the runtime environment.
"""
def _make_multipart_request(
self,
method: HttpMethod,
endpoint: str,
*,
zip_file_path: str | Path,
data: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
url = urljoin(self.base_url, endpoint)
headers = dict(cast(dict[str, str], self.headers))
headers.pop("Content-Type", None)
path = Path(zip_file_path)
request_kwargs: dict[str, Any] = {"headers": headers}
if data is not None:
request_kwargs["data"] = data
if timeout is not None:
request_kwargs["timeout"] = timeout
with (
path.open("rb") as file_handle,
httpx.Client(trust_env=False, verify=verify) as client,
):
files = {
"zip_file": (path.name, file_handle, "application/zip"),
}
return client.request(method, url, files=files, **request_kwargs)
def create_crew_from_zip(
self,
zip_file_path: str | Path,
*,
name: str | None = None,
env: dict[str, str] | None = None,
) -> httpx.Response:
data: dict[str, str] = {}
if name:
data["name"] = name
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/zip",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def update_crew_from_zip(
self,
uuid: str,
zip_file_path: str | Path,
*,
env: dict[str, str] | None = None,
) -> httpx.Response:
data: dict[str, str] = {}
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/{uuid}/zip_update",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
__all__ = ["PlusAPI"]

View File

@@ -35,6 +35,9 @@ class CrewType(Enum):
# crewai.utilities.string_utils (_VARIABLE_PATTERN), including hyphens —
# otherwise placeholders are interpolated at runtime but never prompted for.
_INPUT_PLACEHOLDER_RE = re.compile(r"(?<!{){([A-Za-z_][A-Za-z0-9_\-]*)}(?!})")
_JSON_CREW_RUNNER_CODE = (
"from crewai_cli.run_crew import _run_json_crew; _run_json_crew()"
)
def _has_json_crew() -> bool:
@@ -216,6 +219,49 @@ def _run_json_crew(trained_agents_file: str | None = None) -> Any:
return app._crew_result
def _install_json_crew_dependencies() -> None:
"""Lock and sync JSON crew projects before loading them in-process."""
if not (Path.cwd() / "pyproject.toml").is_file():
return
from crewai_cli.install_crew import install_crew
try:
click.echo("Installing dependencies...")
install_crew([], raise_on_error=True)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
raise SystemExit(1) from e
def _run_json_crew_in_project_env(trained_agents_file: str | None = None) -> Any:
"""Run JSON crews from the project's uv-managed environment."""
if not (Path.cwd() / "pyproject.toml").is_file():
return _run_json_crew(trained_agents_file=trained_agents_file)
_install_json_crew_dependencies()
command = ["uv", "run", "--no-sync", "python", "-c", _JSON_CREW_RUNNER_CODE]
env = build_env_with_all_tool_credentials()
if trained_agents_file:
env[CREWAI_TRAINED_AGENTS_FILE_ENV] = trained_agents_file
try:
subprocess.run( # noqa: S603
command,
capture_output=False,
text=True,
check=True,
env=env,
)
except subprocess.CalledProcessError as e:
raise SystemExit(e.returncode) from e
except Exception as e:
click.echo(f"An unexpected error occurred while running the JSON crew: {e}")
raise SystemExit(1) from e
def _chain_deploy() -> None:
from rich.console import Console
@@ -224,14 +270,14 @@ def _chain_deploy() -> None:
from crewai_cli.deploy.main import DeployCommand
console.print("\nStarting deployment…\n", style="bold #FF5A50")
DeployCommand().create_crew(confirm=False, skip_validate=True)
DeployCommand().create_crew(confirm=True, skip_validate=True)
except SystemExit:
from crewai_cli.authentication.main import AuthenticationCommand
console.print()
AuthenticationCommand().login()
try:
DeployCommand().create_crew(confirm=False, skip_validate=True)
DeployCommand().create_crew(confirm=True, skip_validate=True)
except Exception as e:
console.print(f"\nDeploy failed: {e}\n", style="bold red")
except Exception as e:
@@ -315,7 +361,7 @@ def run_crew(trained_agents_file: str | None = None) -> None:
"""
# JSON crew projects take precedence
if _has_json_crew():
_run_json_crew(trained_agents_file=trained_agents_file)
_run_json_crew_in_project_env(trained_agents_file=trained_agents_file)
return
crewai_version = get_crewai_version()

View File

@@ -0,0 +1,99 @@
from pathlib import Path
import zipfile
from crewai_cli.deploy.archive import create_project_zip
def test_create_project_zip_excludes_local_artifacts(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
(tmp_path / "src").mkdir()
(tmp_path / "src" / "main.py").write_text("print('hello')\n")
(tmp_path / ".env").write_text("OPENAI_API_KEY=secret\n")
(tmp_path / ".env.example").write_text("OPENAI_API_KEY=\n")
(tmp_path / "__pycache__").mkdir()
(tmp_path / "__pycache__" / "main.pyc").write_bytes(b"compiled")
(tmp_path / ".git").mkdir()
(tmp_path / ".git" / "config").write_text("[core]\n")
archive_path = create_project_zip("demo", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
archive_path.unlink(missing_ok=True)
assert names == {
"pyproject.toml",
"uv.lock",
"src/main.py",
".env.example",
}
def test_create_project_zip_uses_repository_file_list(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
(tmp_path / "uv.lock").write_text("# lock\n")
(tmp_path / "ignored.txt").write_text("ignored\n")
class RepositoryStub:
def deployable_files(self) -> list[str]:
return ["pyproject.toml", "uv.lock"]
archive_path = create_project_zip(
"demo",
project_dir=tmp_path,
repository=RepositoryStub(), # type: ignore[arg-type]
)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
finally:
archive_path.unlink(missing_ok=True)
assert names == {"pyproject.toml", "uv.lock"}
def test_create_project_zip_adds_json_project_wrapper(tmp_path: Path):
(tmp_path / "pyproject.toml").write_text(
"""
[project]
name = "json_crew"
version = "0.1.0"
dependencies = ["crewai[tools]>=1.15"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.crewai]
type = "crew"
""".strip()
+ "\n"
)
(tmp_path / "agents").mkdir()
(tmp_path / "agents" / "researcher.jsonc").write_text("{}\n")
(tmp_path / "crew.jsonc").write_text("{}\n")
archive_path = create_project_zip("json_crew", project_dir=tmp_path)
try:
with zipfile.ZipFile(archive_path) as archive:
names = set(archive.namelist())
crew_py = archive.read("src/json_crew/crew.py").decode()
main_py = archive.read("src/json_crew/main.py").decode()
pyproject = archive.read("pyproject.toml").decode()
finally:
archive_path.unlink(missing_ok=True)
assert "uv.lock" not in names
assert "crew.jsonc" in names
assert "agents/researcher.jsonc" in names
assert "src/json_crew/__init__.py" in names
assert "src/json_crew/crew.py" in names
assert "src/json_crew/main.py" in names
assert "src/json_crew/config/agents.yaml" in names
assert "src/json_crew/config/tasks.yaml" in names
assert "load_crew(_crew_path())" in crew_py
assert "JsonCrew" in crew_py
assert "from json_crew.crew import JsonCrew" in main_py
assert "run_crew = \"json_crew.main:run\"" in pyproject

View File

@@ -1,6 +1,7 @@
import sys
import unittest
from io import StringIO
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
import pytest
@@ -35,6 +36,12 @@ class TestDeployCommand(unittest.TestCase):
self.assertEqual(self.deploy_command.project_name, "test_project")
self.mock_plus_api.assert_called_once_with(api_key="test_token")
@patch("builtins.input")
def test_confirm_zip_input_only_confirms_env_vars(self, mock_input):
self.deploy_command._confirm_zip_input({"MODEL": "openai/gpt-5"}, False)
mock_input.assert_called_once_with("Press Enter to continue with 1 env vars: MODEL")
@patch("crewai_cli.command.get_auth_token")
def test_init_failure(self, mock_get_auth_token):
mock_get_auth_token.side_effect = Exception("Auth failed")
@@ -123,8 +130,15 @@ class TestDeployCommand(unittest.TestCase):
)
self.assertIn("2023-01-01 - INFO: Test log", fake_out.getvalue())
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_uuid(self, mock_display):
def test_deploy_with_uuid(self, mock_display, mock_repository):
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
@@ -135,8 +149,15 @@ class TestDeployCommand(unittest.TestCase):
self.mock_client.deploy_by_uuid.assert_called_once_with("test-uuid")
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_project_name(self, mock_display):
def test_deploy_with_project_name(self, mock_display, mock_repository):
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
@@ -147,17 +168,77 @@ class TestDeployCommand(unittest.TestCase):
self.mock_client.deploy_by_name.assert_called_once_with("test_project")
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_uuid_without_remote_updates_from_zip(
self, mock_display, mock_repository, mock_create_project_zip
):
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.update_crew_from_zip.return_value = mock_response
self.deploy_command.deploy(uuid="test-uuid", skip_validate=True)
self.mock_client.update_crew_from_zip.assert_called_once_with(
"test-uuid", Path("/tmp/test_project.zip")
)
self.mock_client.deploy_by_uuid.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("crewai_cli.deploy.main.DeployCommand._display_deployment_info")
def test_deploy_with_project_name_without_remote_updates_from_zip(
self, mock_display, mock_repository, mock_create_project_zip
):
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
status_response = MagicMock()
status_response.status_code = 200
status_response.is_success = True
status_response.json.return_value = {"uuid": "test-uuid"}
update_response = MagicMock()
update_response.status_code = 200
update_response.json.return_value = {"uuid": "test-uuid"}
self.mock_client.crew_status_by_name.return_value = status_response
self.mock_client.update_crew_from_zip.return_value = update_response
self.deploy_command.deploy(skip_validate=True)
self.mock_client.crew_status_by_name.assert_called_once_with("test_project")
self.mock_client.update_crew_from_zip.assert_called_once_with(
"test-uuid", Path("/tmp/test_project.zip")
)
self.mock_client.deploy_by_name.assert_not_called()
mock_display.assert_called_once_with({"uuid": "test-uuid"})
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository.origin_url")
@patch("crewai_cli.deploy.main.git.Repository")
@patch("builtins.input")
@pytest.mark.timeout(180)
def test_create_crew(self, mock_input, mock_git_origin_url, mock_fetch_env):
def test_create_crew(self, mock_input, mock_repository, mock_fetch_env):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_git_origin_url.return_value = "https://github.com/test/repo.git"
mock_repository.return_value.origin_url.return_value = (
"https://github.com/test/repo.git"
)
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_input.return_value = ""
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "new-uuid", "status": "created"}
self.mock_client.create_crew.return_value = mock_response
@@ -166,38 +247,73 @@ class TestDeployCommand(unittest.TestCase):
self.assertIn("Deployment created successfully!", fake_out.getvalue())
self.assertIn("new-uuid", fake_out.getvalue())
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
def test_create_crew_without_git_repo_shows_setup_help(
self, mock_repository, mock_fetch_env
def test_create_crew_without_git_repo_initializes_and_uses_zip(
self, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.side_effect = ValueError("not a Git repository")
initialized_repository = MagicMock()
initialized_repository.origin_url.return_value = None
mock_repository.initialize.return_value = initialized_repository
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "zip-uuid", "status": "created"}
self.mock_client.create_crew_from_zip.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.create_crew(skip_validate=True)
self.deploy_command.create_crew(confirm=True, skip_validate=True)
output = fake_out.getvalue()
self.assertIn("Deployment requires a Git repository", output)
self.assertIn("git init", output)
self.assertIn("git remote add origin <your-repo-url>", output)
self.assertIn("Initialized a local Git repository", output)
self.assertIn("Deploying from a ZIP upload", output)
mock_repository.initialize.assert_called_once_with()
mock_create_project_zip.assert_called_once_with(
"test_project", repository=initialized_repository
)
self.mock_client.create_crew_from_zip.assert_called_once_with(
Path("/tmp/test_project.zip"),
name="test_project",
env={"ENV_VAR": "value"},
)
self.mock_client.create_crew.assert_not_called()
@patch("crewai_cli.deploy.main.create_project_zip")
@patch("crewai_cli.deploy.main.fetch_and_json_env_file")
@patch("crewai_cli.deploy.main.git.Repository")
def test_create_crew_without_remote_shows_remote_help(
self, mock_repository, mock_fetch_env
def test_create_crew_without_remote_uses_zip(
self, mock_repository, mock_fetch_env, mock_create_project_zip
):
mock_fetch_env.return_value = {"ENV_VAR": "value"}
mock_repository.return_value.origin_url.return_value = None
mock_repository.return_value.create_initial_commit_if_needed.return_value = (
False
)
mock_create_project_zip.return_value = Path("/tmp/test_project.zip")
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.is_success = True
mock_response.json.return_value = {"uuid": "zip-uuid", "status": "created"}
self.mock_client.create_crew_from_zip.return_value = mock_response
with patch("sys.stdout", new=StringIO()) as fake_out:
self.deploy_command.create_crew(skip_validate=True)
self.deploy_command.create_crew(confirm=True, skip_validate=True)
output = fake_out.getvalue()
self.assertIn("No remote repository URL found.", output)
self.assertIn("git remote add origin <your-repo-url>", output)
self.assertIn("git push -u origin HEAD", output)
self.assertIn("No origin remote found.", output)
self.assertIn("Deploying from a ZIP upload", output)
mock_create_project_zip.assert_called_once_with(
"test_project", repository=mock_repository.return_value
)
self.mock_client.create_crew_from_zip.assert_called_once_with(
Path("/tmp/test_project.zip"),
name="test_project",
env={"ENV_VAR": "value"},
)
self.mock_client.create_crew.assert_not_called()
def test_list_crews(self):

View File

@@ -83,8 +83,8 @@ def test_chain_deploy_skips_validation_after_auth_retry(monkeypatch) -> None:
run_crew._chain_deploy()
assert create_calls == [
{"confirm": False, "skip_validate": True},
{"confirm": False, "skip_validate": True},
{"confirm": True, "skip_validate": True},
{"confirm": True, "skip_validate": True},
]
assert login_calls == [True]

View File

@@ -99,3 +99,43 @@ def test_origin_url(fp, repository):
stdout="https://github.com/user/repo.git\n",
)
assert repository.origin_url() == "https://github.com/user/repo.git"
def test_initialize_creates_initial_commit(fp, tmp_path):
fp.register(["git", "--version"], stdout="git version 2.30.0\n")
fp.register(["git", "init"], stdout="")
fp.register(["git", "--version"], stdout="git version 2.30.0\n")
fp.register(["git", "rev-parse", "--is-inside-work-tree"], stdout="true\n")
fp.register(["git", "rev-parse", "--verify", "HEAD"], returncode=1)
fp.register(["git", "add", "."], stdout="")
fp.register(
[
"git",
"-c",
"user.name=CrewAI",
"-c",
"user.email=deploy@crewai.com",
"commit",
"--allow-empty",
"-m",
"Initial crew",
],
stdout="",
)
repo = Repository.initialize(path=str(tmp_path))
assert repo.path == str(tmp_path)
exclude_file = tmp_path / ".git" / "info" / "exclude"
exclude_text = exclude_file.read_text()
assert ".env" in exclude_text
assert "__pycache__/" in exclude_text
def test_deployable_files_uses_git_excludes(fp, repository):
fp.register(
["git", "ls-files", "--cached", "--others", "--exclude-standard"],
stdout="pyproject.toml\nsrc/main.py\n",
)
assert repository.deployable_files() == ["pyproject.toml", "src/main.py"]

View File

@@ -292,6 +292,36 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_cli.plus_api.PlusAPI._make_multipart_request")
def test_create_crew_from_zip(self, mock_make_multipart_request):
self.api.create_crew_from_zip(
"/tmp/test.zip",
name="test_crew",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/zip",
zip_file_path="/tmp/test.zip",
data={"name": "test_crew", "env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_cli.plus_api.PlusAPI._make_multipart_request")
def test_update_crew_from_zip(self, mock_make_multipart_request):
self.api.update_crew_from_zip(
"test_uuid",
"/tmp/test.zip",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/test_uuid/zip_update",
zip_file_path="/tmp/test.zip",
data={"env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):

View File

@@ -2,6 +2,7 @@
import os
from pathlib import Path
import subprocess
import pytest
from crewai_core.constants import CREWAI_TRAINED_AGENTS_FILE_ENV
@@ -14,16 +15,159 @@ def test_run_crew_forwards_trained_agents_file_to_json_crews(monkeypatch):
monkeypatch.setattr(run_crew_module, "_has_json_crew", lambda: True)
called: dict = {}
def fake_run_json_crew(trained_agents_file=None):
def fake_run_json_crew_in_project_env(trained_agents_file=None):
called["trained_agents_file"] = trained_agents_file
monkeypatch.setattr(run_crew_module, "_run_json_crew", fake_run_json_crew)
monkeypatch.setattr(
run_crew_module,
"_run_json_crew_in_project_env",
fake_run_json_crew_in_project_env,
)
run_crew_module.run_crew(trained_agents_file="some.pkl")
assert called == {"trained_agents_file": "some.pkl"}
def test_json_run_uses_project_env_when_pyproject_exists(monkeypatch, tmp_path: Path):
"""JSON crew runs should execute inside the project uv environment."""
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
install_calls = []
subprocess_calls = []
monkeypatch.setattr(
run_crew_module,
"_install_json_crew_dependencies",
lambda: install_calls.append(True),
)
monkeypatch.setattr(
run_crew_module,
"build_env_with_all_tool_credentials",
lambda: {"EXISTING": "value"},
)
def fake_subprocess_run(command, **kwargs):
subprocess_calls.append((command, kwargs))
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
run_crew_module._run_json_crew_in_project_env(
trained_agents_file="trained.pkl"
)
assert install_calls == [True]
assert subprocess_calls == [
(
[
"uv",
"run",
"--no-sync",
"python",
"-c",
run_crew_module._JSON_CREW_RUNNER_CODE,
],
{
"capture_output": False,
"text": True,
"check": True,
"env": {
"EXISTING": "value",
CREWAI_TRAINED_AGENTS_FILE_ENV: "trained.pkl",
},
},
)
]
def test_json_run_without_pyproject_runs_in_process(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
called: dict = {}
def fake_run_json_crew(trained_agents_file=None):
called["trained_agents_file"] = trained_agents_file
return "result"
monkeypatch.setattr(run_crew_module, "_run_json_crew", fake_run_json_crew)
assert (
run_crew_module._run_json_crew_in_project_env(
trained_agents_file="trained.pkl"
)
== "result"
)
assert called == {"trained_agents_file": "trained.pkl"}
def test_json_project_env_run_failure_exits_nonzero(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
monkeypatch.setattr(run_crew_module, "_install_json_crew_dependencies", lambda: None)
monkeypatch.setattr(
run_crew_module, "build_env_with_all_tool_credentials", lambda: {}
)
def fake_subprocess_run(command, **kwargs):
raise subprocess.CalledProcessError(7, command)
monkeypatch.setattr(run_crew_module.subprocess, "run", fake_subprocess_run)
with pytest.raises(SystemExit) as exc_info:
run_crew_module._run_json_crew_in_project_env()
assert exc_info.value.code == 7
def test_json_run_installs_dependencies_when_pyproject_exists(
monkeypatch, tmp_path: Path
):
"""JSON crew runs should lock/sync project dependencies before loading."""
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
calls = []
def fake_install_crew(proxy_options, *, raise_on_error=False):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies()
assert calls == [([], True)]
def test_json_run_skips_dependency_install_without_pyproject(
monkeypatch, tmp_path: Path
):
monkeypatch.chdir(tmp_path)
calls = []
def fake_install_crew(proxy_options, *, raise_on_error=False):
calls.append((proxy_options, raise_on_error))
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
run_crew_module._install_json_crew_dependencies()
assert calls == []
def test_json_run_install_failure_exits_nonzero(monkeypatch, tmp_path: Path):
monkeypatch.chdir(tmp_path)
(tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\n")
def fake_install_crew(proxy_options, *, raise_on_error=False):
raise subprocess.CalledProcessError(42, ["uv", "sync"])
monkeypatch.setattr("crewai_cli.install_crew.install_crew", fake_install_crew)
with pytest.raises(SystemExit) as exc_info:
run_crew_module._install_json_crew_dependencies()
assert exc_info.value.code == 42
def test_run_json_crew_exports_trained_agents_env(monkeypatch, tmp_path: Path):
"""JSON crews run in-process, so the pickle path must land in the env var."""
monkeypatch.chdir(tmp_path)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Final, Literal, TypedDict, cast
from urllib.parse import urljoin
@@ -190,6 +191,35 @@ class PlusAPI:
with httpx.Client(trust_env=False, verify=verify) as client:
return client.request(method, url, **request_kwargs)
def _make_multipart_request(
self,
method: HttpMethod,
endpoint: str,
*,
zip_file_path: str | Path,
data: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
url = urljoin(self.base_url, endpoint)
headers = dict(cast(dict[str, str], self.headers))
headers.pop("Content-Type", None)
path = Path(zip_file_path)
request_kwargs: dict[str, Any] = {"headers": headers}
if data is not None:
request_kwargs["data"] = data
if timeout is not None:
request_kwargs["timeout"] = timeout
with (
path.open("rb") as file_handle,
httpx.Client(trust_env=False, verify=verify) as client,
):
files = {
"zip_file": (path.name, file_handle, "application/zip"),
}
return client.request(method, url, files=files, **request_kwargs)
def login_to_tool_repository(
self, user_identifier: str | None = None
) -> httpx.Response:
@@ -312,6 +342,44 @@ class PlusAPI:
def create_crew(self, payload: CreateCrewPayload) -> httpx.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def create_crew_from_zip(
self,
zip_file_path: str | Path,
*,
name: str | None = None,
env: dict[str, str] | None = None,
) -> httpx.Response:
data: dict[str, str] = {}
if name:
data["name"] = name
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/zip",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def update_crew_from_zip(
self,
uuid: str,
zip_file_path: str | Path,
*,
env: dict[str, str] | None = None,
) -> httpx.Response:
data: dict[str, str] = {}
if env:
data.update({f"env[{key}]": value for key, value in env.items()})
return self._make_multipart_request(
"POST",
f"{self.CREWS_RESOURCE}/{uuid}/zip_update",
zip_file_path=zip_file_path,
data=data or None,
timeout=300,
)
def get_organizations(self) -> httpx.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)

View File

@@ -345,6 +345,36 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_core.plus_api.PlusAPI._make_multipart_request")
def test_create_crew_from_zip(self, mock_make_multipart_request):
self.api.create_crew_from_zip(
"/tmp/test.zip",
name="test_crew",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/zip",
zip_file_path="/tmp/test.zip",
data={"name": "test_crew", "env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.PlusAPI._make_multipart_request")
def test_update_crew_from_zip(self, mock_make_multipart_request):
self.api.update_crew_from_zip(
"test_uuid",
"/tmp/test.zip",
env={"ENV_VAR": "value"},
)
mock_make_multipart_request.assert_called_once_with(
"POST",
"/crewai_plus/api/v1/crews/test_uuid/zip_update",
zip_file_path="/tmp/test.zip",
data={"env[ENV_VAR]": "value"},
timeout=300,
)
@patch("crewai_core.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):