"""Build deployable ZIP archives for CrewAI projects."""

from __future__ import annotations

from pathlib import Path
import re
import shutil
import tempfile
from typing import Any
import zipfile

from crewai_cli import git
from crewai_cli.deploy.validate import normalize_package_name
from crewai_cli.utils import parse_toml


# Path components that exclude a file from the archive. The check in
# `_is_excluded` is applied to every component of the relative path,
# including the file name itself.
_EXCLUDED_DIRS = {
    ".crewai",
    ".git",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    ".tox",
    ".venv",
    "__pycache__",
    "build",
    "dist",
    "env",
    "venv",
}

# File names that are never shipped.
_EXCLUDED_FILES = {
    ".DS_Store",
    ".env",
}

# `.env.*` files are excluded unless they are one of these template names.
_ALLOWED_ENV_EXAMPLES = {
    ".env.example",
    ".env.sample",
}

# File suffixes that are never shipped.
_EXCLUDED_SUFFIXES = {
    ".pyc",
    ".pyo",
}

# Captures the key of a `key = value` line (bare, double- or single-quoted)
# in the named group "key", which `_script_key` reads.
_SCRIPT_KEY_PATTERN = re.compile(
    r"^\s*(?P<key>[A-Za-z0-9_.-]+|\"[^\"]+\"|'[^']+')\s*="
)

# Matches a TOML table header line, either `[table]` or `[[array.of.tables]]`,
# optionally followed by a comment. Either form ends the preceding table.
_SECTION_PATTERN = re.compile(r"^\s*\[{1,2}[^\]]+\]{1,2}\s*(?:#.*)?$")


def create_project_zip(
    project_name: str,
    *,
    project_dir: Path | None = None,
    repository: git.Repository | None = None,
) -> Path:
    """Create a deployable ZIP archive for a CrewAI project.

    Args:
        project_name: Used as the prefix of the temporary archive file name.
        project_dir: Project root; defaults to the current working directory.
        repository: Optional Git repository used to select the files to ship.

    Returns:
        Path of the created ZIP file. The file is created with
        ``delete=False``, so removing it is up to the caller.

    Raises:
        ValueError: If no deployable files are found.
    """
    root = (project_dir or Path.cwd()).resolve()
    files = _project_files(root, repository)
    if not files:
        raise ValueError("No deployable project files were found.")

    staged_root = _stage_project(root, files)
    try:
        # Created inside the try block so the staging directory is removed
        # even when the temporary archive file cannot be created.
        archive_handle = tempfile.NamedTemporaryFile(
            prefix=f"{project_name}-",
            suffix=".zip",
            delete=False,
        )
        archive_path = Path(archive_handle.name)
        archive_handle.close()

        try:
            with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zip_file:
                # Sorted so the member order does not depend on directory
                # iteration order.
                for relative_path in sorted(_walk_files(staged_root)):
                    absolute_path = staged_root / relative_path
                    zip_file.write(absolute_path, relative_path.as_posix())
        except Exception:
            # Do not leave a partially written archive behind.
            archive_path.unlink(missing_ok=True)
            raise
    finally:
        shutil.rmtree(staged_root, ignore_errors=True)

    return archive_path


def _project_files(root: Path, repository: git.Repository | None = None) -> list[Path]:
    """Return project-relative files to include in the archive.

    An explicitly passed repository is used as-is. Otherwise a
    ``git.Repository`` is constructed for ``root``; if that raises
    ``ValueError`` the directory tree is walked instead.
    """
    if repository is not None:
        return _repository_project_files(root, repository)

    try:
        repository = git.Repository(path=str(root), fetch=False)
    except ValueError:
        repository = None

    if repository is not None:
        return _repository_project_files(root, repository)

    return [
        path
        for path in _walk_files(root)
        if not _is_excluded(path) and _is_regular_file(root / path)
    ]


def _repository_project_files(root: Path, repository: git.Repository) -> list[Path]:
    """Return deployable files from Git while applying local safety excludes."""
    files = [Path(path) for path in repository.deployable_files()]
    return [
        path
        for path in files
        if not _is_excluded(path) and _is_regular_file(root / path)
    ]


def _walk_files(root: Path) -> list[Path]:
    """List regular files below root as project-relative paths."""
    return [
        path.relative_to(root) for path in root.rglob("*") if _is_regular_file(path)
    ]


def _is_regular_file(path: Path) -> bool:
    """Return True for regular files, excluding symlinks to files."""
    return path.is_file() and not path.is_symlink()


def _is_excluded(path: Path) -> bool:
    """Return True when a file should be omitted from deployment ZIPs.

    A path is excluded when any of its components is in ``_EXCLUDED_DIRS``,
    its name is in ``_EXCLUDED_FILES``, its name starts with ``.env.`` and is
    not an allowed example file, or its suffix is in ``_EXCLUDED_SUFFIXES``.
    """
    parts = set(path.parts)
    if parts.intersection(_EXCLUDED_DIRS):
        return True

    name = path.name
    if name in _EXCLUDED_FILES:
        return True
    if name.startswith(".env.") and name not in _ALLOWED_ENV_EXAMPLES:
        return True

    return path.suffix in _EXCLUDED_SUFFIXES


def _stage_project(root: Path, files: list[Path]) -> Path:
    """Copy archive files into a temporary staging directory.

    JSON crew projects additionally get generated Python wrapper files. On
    any failure the staging directory is removed before re-raising.

    Returns:
        Path of the staging directory; the caller is responsible for
        deleting it.
    """
    staging_root = Path(tempfile.mkdtemp(prefix="crewai-deploy-"))
    try:
        for relative_path in files:
            source = root / relative_path
            # Skip anything that is not (or is no longer) a regular file.
            if not _is_regular_file(source):
                continue

            destination = staging_root / relative_path
            destination.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(source, destination)

        if _is_json_crew_project(staging_root):
            _add_json_crew_deploy_wrapper(staging_root)
    except Exception:
        shutil.rmtree(staging_root, ignore_errors=True)
        raise

    return staging_root


def _is_json_crew_project(root: Path) -> bool:
    """Return True for JSON crew projects that need a Python deploy wrapper.

    That is the case when ``crew.jsonc`` or ``crew.json`` exists at the root,
    ``[tool.crewai].type`` is not ``"flow"``, and ``src/<package>/crew.py``
    does not exist.

    Raises:
        ValueError: If no package name can be derived from ``[project].name``.
    """
    if not ((root / "crew.jsonc").is_file() or (root / "crew.json").is_file()):
        return False

    project = _read_pyproject(root)
    tool_config = project.get("tool") or {}
    crewai_config = tool_config.get("crewai") if isinstance(tool_config, dict) else None
    declared_type = (
        crewai_config.get("type") if isinstance(crewai_config, dict) else None
    )
    if declared_type == "flow":
        return False

    package_name = _package_name(root)
    if package_name is None:
        raise ValueError(
            "Could not derive a valid Python package name from [project].name."
        )

    return not (root / "src" / package_name / "crew.py").is_file()


def _read_pyproject(root: Path) -> dict[str, Any]:
    """Read pyproject.toml, returning an empty mapping on missing or invalid data."""
    pyproject_path = root / "pyproject.toml"
    if not pyproject_path.is_file():
        return {}

    try:
        # Read as UTF-8 explicitly (as `_ensure_project_scripts` does) rather
        # than relying on the locale's default encoding.
        pyproject = parse_toml(pyproject_path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately best-effort: unreadable or invalid files count as empty.
        return {}

    return pyproject if isinstance(pyproject, dict) else {}


def _package_name(root: Path) -> str | None:
    """Return the normalized Python package name for the project.

    Returns None when ``[project].name`` is missing, not a non-blank string,
    or normalizes to an empty value.
    """
    project = _read_pyproject(root).get("project")
    if not isinstance(project, dict):
        return None

    name = project.get("name")
    if not isinstance(name, str) or not name.strip():
        return None

    package_name = normalize_package_name(name)
    return package_name or None


def _class_name(package_name: str) -> str:
    """Return the generated wrapper class name for a package.

    Alphanumeric runs of ``package_name`` are capitalized and joined. An
    empty result becomes ``JsonCrew``; a result starting with a digit is
    prefixed with ``Crew``.
    """
    parts = [part for part in re.split(r"[^a-zA-Z0-9]+", package_name) if part]
    class_name = "".join(part[:1].upper() + part[1:] for part in parts)
    if not class_name:
        return "JsonCrew"
    if class_name[0].isdigit():
        return f"Crew{class_name}"
    return class_name


def _add_json_crew_deploy_wrapper(root: Path) -> None:
    """Add Python wrapper files required to deploy a JSON crew project.

    Writes ``__init__.py``, ``crew.py``, ``main.py`` and empty
    ``config/agents.yaml`` / ``config/tasks.yaml`` files under
    ``src/<package>/``, then ensures pyproject.toml has the script entries.

    Raises:
        ValueError: If no package name can be derived from ``[project].name``.
    """
    package_name = _package_name(root)
    if package_name is None:
        raise ValueError(
            "Could not derive a valid Python package name from [project].name."
        )

    package_dir = root / "src" / package_name
    config_dir = package_dir / "config"
    config_dir.mkdir(parents=True, exist_ok=True)

    class_name = _class_name(package_name)
    # Prefer crew.jsonc when both definition files exist.
    crew_filename = "crew.jsonc" if (root / "crew.jsonc").is_file() else "crew.json"

    (package_dir / "__init__.py").write_text("", encoding="utf-8")
    (config_dir / "agents.yaml").write_text("{}\n", encoding="utf-8")
    (config_dir / "tasks.yaml").write_text("{}\n", encoding="utf-8")
    (package_dir / "crew.py").write_text(
        _json_crew_py(class_name, crew_filename),
        encoding="utf-8",
    )
    (package_dir / "main.py").write_text(
        _json_main_py(package_name, class_name),
        encoding="utf-8",
    )
    _ensure_project_scripts(root, package_name)


def _json_crew_py(class_name: str, crew_filename: str) -> str:
    """Render the generated crew.py module for a JSON crew."""
    return f'''from pathlib import Path

from crewai import Crew
from crewai.project import CrewBase, crew
from crewai.project.crew_loader import load_crew


def _crew_path() -> Path:
    return Path(__file__).resolve().parents[2] / "{crew_filename}"


@CrewBase
class {class_name}:
    """Compatibility wrapper for a JSON-defined CrewAI project."""

    @crew
    def crew(self) -> Crew:
        crew_instance, default_inputs = load_crew(_crew_path())
        self.default_inputs = default_inputs
        return crew_instance
'''


def _json_main_py(package_name: str, class_name: str) -> str:
    """Render the generated main.py entrypoints for a JSON crew."""
    return f"""#!/usr/bin/env python
import json
import sys

from {package_name}.crew import {class_name}


def _load():
    wrapper = {class_name}()
    crew = wrapper.crew()
    return crew, getattr(wrapper, "default_inputs", {{}})


def run():
    crew, inputs = _load()
    return crew.kickoff(inputs=inputs)


def train():
    crew, inputs = _load()
    return crew.train(
        n_iterations=int(sys.argv[1]),
        filename=sys.argv[2],
        inputs=inputs,
    )


def replay():
    crew, _ = _load()
    return crew.replay(task_id=sys.argv[1])


def test():
    crew, inputs = _load()
    return crew.test(
        n_iterations=int(sys.argv[1]),
        eval_llm=sys.argv[2],
        inputs=inputs,
    )


def run_with_trigger():
    if len(sys.argv) < 2:
        raise ValueError("No trigger payload provided.")

    crew, inputs = _load()
    trigger_payload = json.loads(sys.argv[1])
    return crew.kickoff(
        inputs={{**inputs, "crewai_trigger_payload": trigger_payload}}
    )
"""


def _ensure_project_scripts(root: Path, package_name: str) -> None:
    """Ensure generated wrappers have project script entrypoints.

    Rewrites ``root / "pyproject.toml"`` in place; does nothing when the file
    does not exist.
    """
    pyproject_path = root / "pyproject.toml"
    if not pyproject_path.is_file():
        return

    content = pyproject_path.read_text(encoding="utf-8")
    entries = _project_script_entries(package_name)
    pyproject_path.write_text(
        _update_project_scripts(content, entries),
        encoding="utf-8",
    )


def _project_script_entries(package_name: str) -> dict[str, str]:
    """Return script entrypoints required by the generated JSON wrapper."""
    return {
        package_name: f"{package_name}.main:run",
        "run_crew": f"{package_name}.main:run",
        "train": f"{package_name}.main:train",
        "replay": f"{package_name}.main:replay",
        "test": f"{package_name}.main:test",
        "run_with_trigger": f"{package_name}.main:run_with_trigger",
    }


def _update_project_scripts(content: str, entries: dict[str, str]) -> str:
    """Add or replace generated script entries in pyproject.toml content.

    Without a ``[project.scripts]`` table, a new one is appended. Otherwise
    existing keys found in ``entries`` are rewritten in place and the missing
    ones are inserted at the end of that table.

    Returns:
        The updated file content, ending with a single newline.
    """
    lines = content.rstrip().splitlines()
    header_index = _project_scripts_header_index(lines)
    if header_index is None:
        return content.rstrip() + _project_scripts_block(entries)

    end_index = _section_end_index(lines, header_index + 1)
    seen: set[str] = set()
    for index in range(header_index + 1, end_index):
        key = _script_key(lines[index])
        if key is not None and key in entries:
            lines[index] = _script_line(key, entries[key])
            seen.add(key)

    missing_lines = [
        _script_line(key, value) for key, value in entries.items() if key not in seen
    ]
    lines[end_index:end_index] = missing_lines
    return "\n".join(lines).rstrip() + "\n"


def _project_scripts_header_index(lines: list[str]) -> int | None:
    """Return the line index of the project scripts table, if present.

    A trailing comment on the header line is ignored so that an existing
    table is not missed and then duplicated.
    """
    for index, line in enumerate(lines):
        header = line.split("#", 1)[0].strip()
        if header == "[project.scripts]":
            return index
    return None


def _section_end_index(lines: list[str], start_index: int) -> int:
    """Return the exclusive end index for a TOML table section.

    This is the index of the first table header at or after ``start_index``,
    or ``len(lines)`` when there is none.
    """
    for index in range(start_index, len(lines)):
        if _SECTION_PATTERN.match(lines[index]):
            return index
    return len(lines)


def _script_key(line: str) -> str | None:
    """Return the script key for a pyproject script line.

    Surrounding quotes are stripped; None is returned when the line is not a
    ``key = value`` line.
    """
    match = _SCRIPT_KEY_PATTERN.match(line)
    if not match:
        return None

    key = match.group("key")
    if key.startswith(("'", '"')) and key.endswith(("'", '"')):
        return key[1:-1]
    return key


def _script_line(key: str, value: str) -> str:
    """Render a project script TOML entry."""
    return f'{key} = "{value}"'


def _project_scripts_block(entries: dict[str, str]) -> str:
    """Render a project scripts TOML table.

    The block starts with two newlines so it can be appended directly to
    right-stripped file content.
    """
    lines = ["", "", "[project.scripts]"]
    lines.extend(_script_line(key, value) for key, value in entries.items())
    return "\n".join(lines) + "\n"