# ---------------------------------------------------------------------------
# Load ``scripts/classify_ci_checks.py`` by file path and re-export its API
# ---------------------------------------------------------------------------

_SCRIPT_PATH = (
    Path(__file__).resolve().parents[3].joinpath("scripts", "classify_ci_checks.py")
)

_spec = importlib.util.spec_from_file_location("classify_ci_checks", _SCRIPT_PATH)
assert _spec is not None and _spec.loader is not None
_mod = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_mod)

classify = _mod.classify
main = _mod.main
PASSED = _mod.PASSED
FAILED = _mod.FAILED
PENDING = _mod.PENDING
NO_CHECKS = _mod.NO_CHECKS
POLICY_BLOCKED = _mod.POLICY_BLOCKED
ALL_STATES = _mod.ALL_STATES


# ---------------------------------------------------------------------------
# Payload builders and small drivers shared by the test classes
# ---------------------------------------------------------------------------


def _make_check_run(
    name: str = "ci",
    status: str = "completed",
    conclusion: str = "success",
    started_at: str = "2026-01-01T00:00:00Z",
    completed_at: str = "2026-01-01T00:05:00Z",
) -> dict[str, Any]:
    """Return a minimal check-run dict; defaults describe a successful run."""
    return dict(
        name=name,
        status=status,
        conclusion=conclusion,
        started_at=started_at,
        completed_at=completed_at,
    )


def _make_commit_status(
    context: str = "ci/status",
    state: str = "success",
    updated_at: str = "2026-01-01T00:05:00Z",
) -> dict[str, str]:
    """Return a minimal commit-status dict; defaults describe a success."""
    return dict(context=context, state=state, updated_at=updated_at)


def _classify_runs(*runs: dict[str, Any]) -> dict[str, Any]:
    """Classify a payload that carries only a ``check_runs`` list."""
    return classify({"check_runs": list(runs)})


def _classify_statuses(*statuses: dict[str, str]) -> dict[str, Any]:
    """Classify a payload with an empty ``check_runs`` list plus statuses."""
    return classify({"check_runs": [], "statuses": list(statuses)})


def _run_cli(tmp_path: Path, payload: dict[str, Any]) -> int:
    """Write *payload* to ``input.json`` under *tmp_path* and run ``main``."""
    target = tmp_path / "input.json"
    target.write_text(json.dumps(payload))
    return main([str(target)])


# ===================================================================
# Category: no_checks
# ===================================================================


class TestNoChecks:
    """No check runs and no statuses must yield ``no_checks``."""

    def test_empty_check_runs_list(self) -> None:
        outcome = _classify_runs()
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_empty_payload(self) -> None:
        outcome = classify({})
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_empty_check_runs_and_statuses(self) -> None:
        outcome = _classify_statuses()
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_summary_message(self) -> None:
        assert "No CI checks found" in _classify_runs()["summary"]


# ===================================================================
# Category: passed
# ===================================================================


class TestPassed:
    """Every check finished without a blocking conclusion -> ``passed``."""

    def test_single_success(self) -> None:
        outcome = _classify_runs(_make_check_run())
        assert outcome["state"] == PASSED
        assert outcome["total"] == 1

    def test_multiple_successes(self) -> None:
        job_names = ("lint", "tests (3.10)", "tests (3.12)")
        outcome = _classify_runs(*(_make_check_run(name=job) for job in job_names))
        assert outcome["state"] == PASSED
        assert outcome["total"] == 3

    def test_neutral_conclusion_counts_as_passed(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="neutral"))
        assert outcome["state"] == PASSED

    def test_skipped_conclusion_counts_as_passed(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="skipped"))
        assert outcome["state"] == PASSED

    def test_commit_status_success(self) -> None:
        outcome = _classify_statuses(_make_commit_status(state="success"))
        assert outcome["state"] == PASSED

    def test_mixed_check_runs_and_statuses_all_pass(self) -> None:
        payload = {
            "check_runs": [_make_check_run(name="build")],
            "statuses": [_make_commit_status(context="deploy", state="success")],
        }
        outcome = classify(payload)
        assert outcome["state"] == PASSED
        assert outcome["total"] == 2


# ===================================================================
# Category: failed
# ===================================================================


class TestFailed:
    """One failing check is enough for ``failed``."""

    def test_single_failure(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="failure"))
        assert outcome["state"] == FAILED

    def test_timed_out(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="timed_out"))
        assert outcome["state"] == FAILED

    def test_cancelled(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="cancelled"))
        assert outcome["state"] == FAILED

    def test_startup_failure(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="startup_failure"))
        assert outcome["state"] == FAILED

    def test_failure_among_successes(self) -> None:
        outcome = _classify_runs(
            _make_check_run(name="lint"),
            _make_check_run(name="tests", conclusion="failure"),
            _make_check_run(name="build"),
        )
        assert outcome["state"] == FAILED
        assert outcome["total"] == 3

    def test_commit_status_failure(self) -> None:
        outcome = _classify_statuses(_make_commit_status(state="failure"))
        assert outcome["state"] == FAILED

    def test_commit_status_error(self) -> None:
        outcome = _classify_statuses(_make_commit_status(state="error"))
        assert outcome["state"] == FAILED

    def test_failed_overrides_pending(self) -> None:
        """A failure outranks a check that is still running."""
        outcome = _classify_runs(
            _make_check_run(name="lint", status="in_progress", conclusion=""),
            _make_check_run(name="tests", conclusion="failure"),
        )
        assert outcome["state"] == FAILED


# ===================================================================
# Category: pending
# ===================================================================


class TestPending:
    """A queued or running check (and no failure) -> ``pending``."""

    def test_queued(self) -> None:
        outcome = _classify_runs(_make_check_run(status="queued", conclusion=""))
        assert outcome["state"] == PENDING

    def test_in_progress(self) -> None:
        outcome = _classify_runs(_make_check_run(status="in_progress", conclusion=""))
        assert outcome["state"] == PENDING

    def test_waiting(self) -> None:
        outcome = _classify_runs(_make_check_run(status="waiting", conclusion=""))
        assert outcome["state"] == PENDING

    def test_pending_among_successes(self) -> None:
        outcome = _classify_runs(
            _make_check_run(name="lint"),
            _make_check_run(name="tests", status="in_progress", conclusion=""),
        )
        assert outcome["state"] == PENDING

    def test_commit_status_pending(self) -> None:
        outcome = _classify_statuses(_make_commit_status(state="pending"))
        assert outcome["state"] == PENDING


# ===================================================================
# Category: policy_blocked
# ===================================================================


class TestPolicyBlocked:
    """An ``action_required`` conclusion -> ``policy_blocked``."""

    def test_action_required(self) -> None:
        outcome = _classify_runs(_make_check_run(conclusion="action_required"))
        assert outcome["state"] == POLICY_BLOCKED

    def test_policy_blocked_overrides_failed(self) -> None:
        """``policy_blocked`` outranks ``failed``."""
        outcome = _classify_runs(
            _make_check_run(name="lint", conclusion="failure"),
            _make_check_run(name="review", conclusion="action_required"),
        )
        assert outcome["state"] == POLICY_BLOCKED

    def test_policy_blocked_overrides_pending(self) -> None:
        outcome = _classify_runs(
            _make_check_run(name="build", status="in_progress", conclusion=""),
            _make_check_run(name="policy", conclusion="action_required"),
        )
        assert outcome["state"] == POLICY_BLOCKED


# ===================================================================
# Output contract / metadata retention
# ===================================================================


class TestOutputContract:
    """The result dict keeps a fixed shape and carries source metadata."""

    def test_result_keys(self) -> None:
        outcome = _classify_runs(_make_check_run())
        assert set(outcome.keys()) == {"state", "total", "summary", "checks"}

    def test_state_is_a_known_value(self) -> None:
        for conclusion in ("success", "failure", "action_required"):
            outcome = _classify_runs(_make_check_run(conclusion=conclusion))
            assert outcome["state"] in ALL_STATES

    def test_check_metadata_retained(self) -> None:
        run = _make_check_run(name="my-job", conclusion="success")
        first = _classify_runs(run)["checks"][0]
        assert first["name"] == "my-job"
        assert first["status"] == "completed"
        assert first["conclusion"] == "success"
        assert first["started_at"] == "2026-01-01T00:00:00Z"
        assert first["completed_at"] == "2026-01-01T00:05:00Z"

    def test_commit_status_metadata_retained(self) -> None:
        status = _make_commit_status(context="ci/deploy", state="success")
        first = _classify_statuses(status)["checks"][0]
        assert first["name"] == "ci/deploy"
        assert first["status"] == "success"

    def test_result_is_json_serialisable(self) -> None:
        payload = {
            "check_runs": [_make_check_run()],
            "statuses": [_make_commit_status()],
        }
        outcome = classify(payload)
        assert json.loads(json.dumps(outcome)) == outcome

    def test_total_matches_checks_length(self) -> None:
        outcome = _classify_runs(*(_make_check_run(name=f"job-{i}") for i in range(5)))
        assert outcome["total"] == len(outcome["checks"]) == 5


# ===================================================================
# CLI entry-point (main)
# ===================================================================


class TestCLI:
    """Exit codes returned by ``main()`` for each state and for bad input."""

    def test_exit_code_passed(self, tmp_path: Path) -> None:
        assert _run_cli(tmp_path, {"check_runs": [_make_check_run()]}) == 0

    def test_exit_code_failed(self, tmp_path: Path) -> None:
        payload = {"check_runs": [_make_check_run(conclusion="failure")]}
        assert _run_cli(tmp_path, payload) == 1

    def test_exit_code_pending(self, tmp_path: Path) -> None:
        payload = {"check_runs": [_make_check_run(status="queued", conclusion="")]}
        assert _run_cli(tmp_path, payload) == 2

    def test_exit_code_no_checks(self, tmp_path: Path) -> None:
        assert _run_cli(tmp_path, {"check_runs": []}) == 2

    def test_exit_code_policy_blocked(self, tmp_path: Path) -> None:
        payload = {"check_runs": [_make_check_run(conclusion="action_required")]}
        assert _run_cli(tmp_path, payload) == 1

    def test_invalid_json_returns_error(self, tmp_path: Path) -> None:
        broken = tmp_path / "bad.json"
        broken.write_text("NOT JSON")
        assert main([str(broken)]) == 1

    def test_missing_file_returns_error(self) -> None:
        assert main(["/nonexistent/path.json"]) == 1


# ===================================================================
# Edge cases
# ===================================================================


class TestEdgeCases:
    """Boundary inputs: sparse dicts, odd casing, large and mixed payloads."""

    def test_check_run_with_missing_fields(self) -> None:
        """A check run without name/timestamps still classifies cleanly."""
        outcome = _classify_runs({"status": "completed", "conclusion": "success"})
        assert outcome["state"] == PASSED
        first = outcome["checks"][0]
        assert first["name"] == ""
        assert first["started_at"] == ""

    def test_case_insensitive_conclusion(self) -> None:
        """Upper-case conclusions are matched like lower-case ones."""
        outcome = _classify_runs(_make_check_run(conclusion="FAILURE"))
        assert outcome["state"] == FAILED

    def test_case_insensitive_status(self) -> None:
        outcome = _classify_runs(_make_check_run(status="IN_PROGRESS", conclusion=""))
        assert outcome["state"] == PENDING

    def test_stale_conclusion_is_not_failure(self) -> None:
        """``stale`` does not block the aggregate state."""
        outcome = _classify_runs(_make_check_run(conclusion="stale"))
        assert outcome["state"] == PASSED

    def test_large_number_of_checks(self) -> None:
        """Five hundred successful runs classify without error."""
        outcome = _classify_runs(
            *(_make_check_run(name=f"job-{i}") for i in range(500))
        )
        assert outcome["state"] == PASSED
        assert outcome["total"] == 500

    def test_mixed_all_states(self) -> None:
        """With every kind of check present, ``policy_blocked`` wins."""
        outcome = _classify_runs(
            _make_check_run(name="pass", conclusion="success"),
            _make_check_run(name="fail", conclusion="failure"),
            _make_check_run(name="pend", status="queued", conclusion=""),
            _make_check_run(name="block", conclusion="action_required"),
        )
        assert outcome["state"] == POLICY_BLOCKED
        assert outcome["total"] == 4
# ---------------------------------------------------------------------------
# Public state constants
# ---------------------------------------------------------------------------

PASSED: str = "passed"
FAILED: str = "failed"
PENDING: str = "pending"
NO_CHECKS: str = "no_checks"
POLICY_BLOCKED: str = "policy_blocked"

ALL_STATES: frozenset[str] = frozenset(
    {PASSED, FAILED, PENDING, NO_CHECKS, POLICY_BLOCKED}
)

# ---------------------------------------------------------------------------
# Internal mapping helpers
# ---------------------------------------------------------------------------

# GitHub check-run conclusions that map to *failed*
_FAILED_CONCLUSIONS: frozenset[str] = frozenset(
    {"failure", "timed_out", "cancelled", "startup_failure"}
)

# GitHub check-run conclusions that map to *policy_blocked*
_POLICY_CONCLUSIONS: frozenset[str] = frozenset({"action_required"})

# GitHub check-run statuses that map to *pending*.  GitHub documents
# ``requested`` and ``pending`` alongside ``queued``/``in_progress``/``waiting``;
# leaving them out would let an unfinished run fall through to *passed*.
_PENDING_STATUSES: frozenset[str] = frozenset(
    {"queued", "in_progress", "waiting", "requested", "pending"}
)

# GitHub commit-status states that map to *failed*
_FAILED_COMMIT_STATES: frozenset[str] = frozenset({"failure", "error"})

# GitHub commit-status states that map to *pending*
_PENDING_COMMIT_STATES: frozenset[str] = frozenset({"pending"})


# ---------------------------------------------------------------------------
# Core classifier
# ---------------------------------------------------------------------------


def _extract_check_metadata(check: dict[str, Any]) -> dict[str, Any]:
    """Extract audit-relevant metadata from a single check run or status.

    Check runs and commit statuses use different field names; each output
    field falls back from the check-run key to the commit-status key and
    finally to an empty string.  Values are copied as-is (no case folding)
    so the source data stays recognisable in the output.

    Parameters
    ----------
    check:
        A single check-run or commit-status object from the GitHub API.

    Returns
    -------
    dict:
        Dict with the keys ``name``, ``status``, ``conclusion``,
        ``started_at`` and ``completed_at``.
    """
    return {
        "name": check.get("name") or check.get("context") or "",
        "status": check.get("status") or check.get("state") or "",
        "conclusion": check.get("conclusion") or "",
        "started_at": check.get("started_at") or "",
        "completed_at": check.get("completed_at") or check.get("updated_at") or "",
    }


def _validated_entries(value: Any, label: str) -> list[dict[str, Any]]:
    """Return *value* as a list of check objects.

    Anything that is not a list (a missing key, JSON ``null``, a value of the
    wrong type) is treated as "no entries".

    Raises
    ------
    TypeError:
        If an element of the list is not a dict; *label* names the field in
        the error message.
    """
    if not isinstance(value, list):
        return []
    for index, item in enumerate(value):
        if not isinstance(item, dict):
            raise TypeError(
                f"{label}[{index}] must be a JSON object, got {type(item).__name__}"
            )
    return value


def classify(payload: dict[str, Any] | list[dict[str, Any]]) -> dict[str, Any]:
    """Classify CI check data into a deterministic state.

    Accepts the JSON body returned by the GitHub ``check-runs`` endpoint
    (which wraps runs in ``{"total_count": N, "check_runs": [...]}``),
    a plain list of check-run objects, or a combined payload that also
    includes commit statuses under the ``statuses`` key.  ``check_runs``
    or ``statuses`` values that are missing, ``None`` or not a list are
    treated as empty.

    Aggregate priority: ``no_checks`` (nothing to look at), then
    ``policy_blocked``, ``failed``, ``pending``, and finally ``passed``.

    Parameters
    ----------
    payload:
        Raw GitHub API response, a dict with ``check_runs`` and/or
        ``statuses`` lists, or a bare list of check-run objects.

    Returns
    -------
    dict:
        Deterministic JSON-serialisable result with keys ``state``,
        ``total``, ``summary``, and ``checks`` (source metadata).

    Raises
    ------
    TypeError:
        If *payload* is neither a dict nor a list, or if a check entry is
        not a dict.

    Examples
    --------
    >>> result = classify({"check_runs": [], "statuses": []})
    >>> result["state"]
    'no_checks'

    >>> result = classify({
    ...     "check_runs": [
    ...         {"name": "lint", "status": "completed", "conclusion": "success"}
    ...     ]
    ... })
    >>> result["state"]
    'passed'

    >>> classify([{"name": "lint", "status": "queued"}])["state"]
    'pending'
    """
    # Normalise input.  The list test has to come first: a list has no
    # ``.get``, so probing for "check_runs" before it would raise.
    if isinstance(payload, list):
        check_runs = _validated_entries(payload, "payload")
        statuses: list[dict[str, Any]] = []
    elif isinstance(payload, dict):
        check_runs = _validated_entries(payload.get("check_runs"), "check_runs")
        statuses = _validated_entries(payload.get("statuses"), "statuses")
    else:
        raise TypeError(
            f"payload must be a JSON object or array, got {type(payload).__name__}"
        )

    all_metadata: list[dict[str, Any]] = []
    has_policy_blocked = False
    has_failed = False
    has_pending = False

    # --- Classify check runs ---
    for cr in check_runs:
        all_metadata.append(_extract_check_metadata(cr))
        status = (cr.get("status") or "").lower()
        conclusion = (cr.get("conclusion") or "").lower()

        if conclusion in _POLICY_CONCLUSIONS:
            has_policy_blocked = True
        elif conclusion in _FAILED_CONCLUSIONS:
            has_failed = True
        elif status in _PENDING_STATUSES:
            has_pending = True
        # completed + success/neutral/skipped/stale -> not a problem

    # --- Classify commit statuses ---
    for cs in statuses:
        all_metadata.append(_extract_check_metadata(cs))
        commit_state = (cs.get("state") or "").lower()

        if commit_state in _FAILED_COMMIT_STATES:
            has_failed = True
        elif commit_state in _PENDING_COMMIT_STATES:
            has_pending = True

    # --- Determine aggregate state (priority order) ---
    total = len(all_metadata)

    if total == 0:
        state = NO_CHECKS
    elif has_policy_blocked:
        state = POLICY_BLOCKED
    elif has_failed:
        state = FAILED
    elif has_pending:
        state = PENDING
    else:
        state = PASSED

    return {
        "state": state,
        "total": total,
        "summary": _build_summary(state, all_metadata),
        "checks": all_metadata,
    }


def _build_summary(state: str, checks: list[dict[str, Any]]) -> str:
    """Build a human-readable one-line summary.

    Each check is counted under its conclusion, or under its status when
    the conclusion is empty, or under ``unknown`` when both are empty.
    Bucket names are lower-cased (classification is case-insensitive too)
    and listed alphabetically, e.g. ``"1 failure, 2 success (3 total)"``.

    Parameters
    ----------
    state:
        The classified state string.  Not used for the text at present;
        kept so the call signature stays stable.
    checks:
        List of normalized check metadata dicts.

    Returns
    -------
    str:
        Human-readable summary string, or ``"No CI checks found"`` when
        *checks* is empty.
    """
    total = len(checks)
    if total == 0:
        return "No CI checks found"

    counts: dict[str, int] = {}
    for check in checks:
        bucket = str(
            check.get("conclusion") or check.get("status") or "unknown"
        ).lower()
        counts[bucket] = counts.get(bucket, 0) + 1

    # Sorted so the same set of checks always yields the same string.
    parts = [f"{count} {bucket}" for bucket, count in sorted(counts.items())]
    return f"{', '.join(parts)} ({total} total)"


# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------

# Exit codes aligned to state severity
_EXIT_CODES: dict[str, int] = {
    PASSED: 0,
    FAILED: 1,
    POLICY_BLOCKED: 1,
    PENDING: 2,
    NO_CHECKS: 2,
}


def main(argv: list[str] | None = None) -> int:
    """CLI entry-point: read JSON from *stdin* or a file and classify.

    On success the classification is printed to stdout as indented JSON.
    If the input cannot be read, decoded or parsed, or does not have a
    supported shape, a ``{"error": ...}`` object is printed to stderr.

    Parameters
    ----------
    argv:
        Command-line arguments (default: ``sys.argv[1:]``).  The first
        argument, if any, is the path of a JSON file; otherwise stdin is
        read.

    Returns
    -------
    int:
        Exit code (0 = passed, 1 = failed/blocked or unusable input,
        2 = pending/no checks).
    """
    args = argv if argv is not None else sys.argv[1:]

    try:
        if args:
            # JSON interchange is UTF-8; do not depend on the locale default.
            with open(args[0], encoding="utf-8") as fh:
                raw = fh.read()
        else:
            raw = sys.stdin.read()

        payload = json.loads(raw)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(json.dumps({"error": str(exc)}), file=sys.stderr)  # noqa: T201
        return 1

    try:
        result = classify(payload)
    except TypeError as exc:
        # Valid JSON with an unsupported shape (scalar payload, non-object
        # entries): report it like any other bad input, not as a traceback.
        print(json.dumps({"error": str(exc)}), file=sys.stderr)  # noqa: T201
        return 1

    print(json.dumps(result, indent=2))  # noqa: T201
    return _EXIT_CODES.get(result["state"], 1)


if __name__ == "__main__":
    raise SystemExit(main())