Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
6bdad873a0 feat: add deterministic CI check-state classifier for PR triage
Closes #4576

- Add scripts/classify_ci_checks.py with deterministic JSON state output
- Categories: passed, failed, pending, no_checks, policy_blocked
- Source check metadata retained for audit/review
- Handles both GitHub check runs and commit statuses
- CLI entry-point with meaningful exit codes
- Add 45 automated tests covering all categories and edge cases

Co-Authored-By: João <joao@crewai.com>
2026-02-24 11:53:20 +00:00
2 changed files with 736 additions and 0 deletions

View File

@@ -0,0 +1,443 @@
"""Tests for the deterministic CI check-state classifier.
Covers every category defined in the acceptance criteria for issue #4576:
- passed
- failed
- pending
- no_checks
- policy_blocked
Also validates that source check metadata is retained for audit/review,
and that the output contract (JSON shape) is stable.
"""
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
from typing import Any
import pytest
# ---------------------------------------------------------------------------
# Dynamic import of the classifier script from ``scripts/``
# ---------------------------------------------------------------------------
# Locate the classifier script relative to this test module: go three levels
# above the directory containing this file, then into ``scripts/``.
# NOTE(review): ``parents[3]`` presumably resolves to the repository root --
# confirm against the real location of this test file.
_SCRIPT_PATH = Path(__file__).resolve().parents[3] / "scripts" / "classify_ci_checks.py"
# Load the script by file path under the module name ``classify_ci_checks``.
_spec = importlib.util.spec_from_file_location("classify_ci_checks", _SCRIPT_PATH)
# Fail fast at collection time if no spec/loader could be created for the path.
assert _spec is not None and _spec.loader is not None
_mod = importlib.util.module_from_spec(_spec)
# Execute the script's top-level code so its functions and constants exist.
_spec.loader.exec_module(_mod)
# Re-export the names the tests below use, so they can be referenced unqualified.
classify = _mod.classify
main = _mod.main
PASSED = _mod.PASSED
FAILED = _mod.FAILED
PENDING = _mod.PENDING
NO_CHECKS = _mod.NO_CHECKS
POLICY_BLOCKED = _mod.POLICY_BLOCKED
ALL_STATES = _mod.ALL_STATES
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_check_run(
    name: str = "ci",
    status: str = "completed",
    conclusion: str = "success",
    started_at: str = "2026-01-01T00:00:00Z",
    completed_at: str = "2026-01-01T00:05:00Z",
) -> dict[str, Any]:
    """Return a minimal check-run dict shaped like a GitHub API entry.

    Every argument maps one-to-one onto the key of the same name; the
    defaults describe a successfully completed run called ``ci``.
    """
    keys = ("name", "status", "conclusion", "started_at", "completed_at")
    values = (name, status, conclusion, started_at, completed_at)
    # Pair keys with values positionally; insertion order follows ``keys``.
    return dict(zip(keys, values))
def _make_commit_status(
    context: str = "ci/status",
    state: str = "success",
    updated_at: str = "2026-01-01T00:05:00Z",
) -> dict[str, str]:
    """Return a minimal commit-status dict shaped like a GitHub API entry.

    The defaults describe a successful status reported under ``ci/status``.
    """
    return dict(context=context, state=state, updated_at=updated_at)
# ===================================================================
# Category: no_checks
# ===================================================================
class TestNoChecks:
    """A payload with no check runs and no commit statuses is ``no_checks``."""

    def test_empty_check_runs_list(self) -> None:
        payload = {"check_runs": []}
        outcome = classify(payload)
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_empty_payload(self) -> None:
        # No keys at all: neither ``check_runs`` nor ``statuses`` is present.
        outcome = classify({})
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_empty_check_runs_and_statuses(self) -> None:
        payload = {"check_runs": [], "statuses": []}
        outcome = classify(payload)
        assert outcome["state"] == NO_CHECKS
        assert outcome["total"] == 0

    def test_summary_message(self) -> None:
        summary = classify({"check_runs": []})["summary"]
        assert "No CI checks found" in summary
# ===================================================================
# Category: passed
# ===================================================================
class TestPassed:
    """Payloads in which no check fails, blocks, or is still running are ``passed``."""

    def test_single_success(self) -> None:
        payload = {"check_runs": [_make_check_run()]}
        outcome = classify(payload)
        assert outcome["state"] == PASSED
        assert outcome["total"] == 1

    def test_multiple_successes(self) -> None:
        job_names = ("lint", "tests (3.10)", "tests (3.12)")
        payload = {"check_runs": [_make_check_run(name=job) for job in job_names]}
        outcome = classify(payload)
        assert outcome["state"] == PASSED
        assert outcome["total"] == 3

    def test_neutral_conclusion_counts_as_passed(self) -> None:
        neutral_run = _make_check_run(conclusion="neutral")
        outcome = classify({"check_runs": [neutral_run]})
        assert outcome["state"] == PASSED

    def test_skipped_conclusion_counts_as_passed(self) -> None:
        skipped_run = _make_check_run(conclusion="skipped")
        outcome = classify({"check_runs": [skipped_run]})
        assert outcome["state"] == PASSED

    def test_commit_status_success(self) -> None:
        ok_status = _make_commit_status(state="success")
        outcome = classify({"check_runs": [], "statuses": [ok_status]})
        assert outcome["state"] == PASSED

    def test_mixed_check_runs_and_statuses_all_pass(self) -> None:
        build_run = _make_check_run(name="build")
        deploy_status = _make_commit_status(context="deploy", state="success")
        outcome = classify({"check_runs": [build_run], "statuses": [deploy_status]})
        assert outcome["state"] == PASSED
        # One check run plus one commit status.
        assert outcome["total"] == 2
# ===================================================================
# Category: failed
# ===================================================================
class TestFailed:
    """Any failing check run or commit status makes the aggregate ``failed``."""

    def test_single_failure(self) -> None:
        failing = _make_check_run(conclusion="failure")
        outcome = classify({"check_runs": [failing]})
        assert outcome["state"] == FAILED

    def test_timed_out(self) -> None:
        timed_out = _make_check_run(conclusion="timed_out")
        outcome = classify({"check_runs": [timed_out]})
        assert outcome["state"] == FAILED

    def test_cancelled(self) -> None:
        cancelled = _make_check_run(conclusion="cancelled")
        outcome = classify({"check_runs": [cancelled]})
        assert outcome["state"] == FAILED

    def test_startup_failure(self) -> None:
        broken_start = _make_check_run(conclusion="startup_failure")
        outcome = classify({"check_runs": [broken_start]})
        assert outcome["state"] == FAILED

    def test_failure_among_successes(self) -> None:
        lint = _make_check_run(name="lint")
        tests = _make_check_run(name="tests", conclusion="failure")
        build = _make_check_run(name="build")
        outcome = classify({"check_runs": [lint, tests, build]})
        assert outcome["state"] == FAILED
        assert outcome["total"] == 3

    def test_commit_status_failure(self) -> None:
        bad_status = _make_commit_status(state="failure")
        outcome = classify({"check_runs": [], "statuses": [bad_status]})
        assert outcome["state"] == FAILED

    def test_commit_status_error(self) -> None:
        errored_status = _make_commit_status(state="error")
        outcome = classify({"check_runs": [], "statuses": [errored_status]})
        assert outcome["state"] == FAILED

    def test_failed_overrides_pending(self) -> None:
        """A failure outranks a check that is still running."""
        still_running = _make_check_run(name="lint", status="in_progress", conclusion="")
        failing = _make_check_run(name="tests", conclusion="failure")
        outcome = classify({"check_runs": [still_running, failing]})
        assert outcome["state"] == FAILED
# ===================================================================
# Category: pending
# ===================================================================
class TestPending:
    """A queued, waiting, or running check makes the aggregate ``pending``."""

    def test_queued(self) -> None:
        queued = _make_check_run(status="queued", conclusion="")
        outcome = classify({"check_runs": [queued]})
        assert outcome["state"] == PENDING

    def test_in_progress(self) -> None:
        running = _make_check_run(status="in_progress", conclusion="")
        outcome = classify({"check_runs": [running]})
        assert outcome["state"] == PENDING

    def test_waiting(self) -> None:
        waiting = _make_check_run(status="waiting", conclusion="")
        outcome = classify({"check_runs": [waiting]})
        assert outcome["state"] == PENDING

    def test_pending_among_successes(self) -> None:
        finished = _make_check_run(name="lint")
        running = _make_check_run(name="tests", status="in_progress", conclusion="")
        outcome = classify({"check_runs": [finished, running]})
        assert outcome["state"] == PENDING

    def test_commit_status_pending(self) -> None:
        pending_status = _make_commit_status(state="pending")
        outcome = classify({"check_runs": [], "statuses": [pending_status]})
        assert outcome["state"] == PENDING
# ===================================================================
# Category: policy_blocked
# ===================================================================
class TestPolicyBlocked:
    """An ``action_required`` conclusion makes the aggregate ``policy_blocked``."""

    def test_action_required(self) -> None:
        gated = _make_check_run(conclusion="action_required")
        outcome = classify({"check_runs": [gated]})
        assert outcome["state"] == POLICY_BLOCKED

    def test_policy_blocked_overrides_failed(self) -> None:
        """Among non-empty payloads, policy_blocked outranks failed."""
        failing = _make_check_run(name="lint", conclusion="failure")
        gated = _make_check_run(name="review", conclusion="action_required")
        outcome = classify({"check_runs": [failing, gated]})
        assert outcome["state"] == POLICY_BLOCKED

    def test_policy_blocked_overrides_pending(self) -> None:
        running = _make_check_run(name="build", status="in_progress", conclusion="")
        gated = _make_check_run(name="policy", conclusion="action_required")
        outcome = classify({"check_runs": [running, gated]})
        assert outcome["state"] == POLICY_BLOCKED
# ===================================================================
# Output contract / metadata retention
# ===================================================================
class TestOutputContract:
    """The result dict keeps a fixed key set and carries per-check metadata."""

    def test_result_keys(self) -> None:
        outcome = classify({"check_runs": [_make_check_run()]})
        expected_keys = {"state", "total", "summary", "checks"}
        assert set(outcome.keys()) == expected_keys

    def test_state_is_a_known_value(self) -> None:
        for conclusion in ("success", "failure", "action_required"):
            run = _make_check_run(conclusion=conclusion)
            outcome = classify({"check_runs": [run]})
            assert outcome["state"] in ALL_STATES

    def test_check_metadata_retained(self) -> None:
        run = _make_check_run(name="my-job", conclusion="success")
        retained = classify({"check_runs": [run]})["checks"][0]
        assert retained["name"] == "my-job"
        assert retained["status"] == "completed"
        assert retained["conclusion"] == "success"
        assert retained["started_at"] == "2026-01-01T00:00:00Z"
        assert retained["completed_at"] == "2026-01-01T00:05:00Z"

    def test_commit_status_metadata_retained(self) -> None:
        status = _make_commit_status(context="ci/deploy", state="success")
        retained = classify({"check_runs": [], "statuses": [status]})["checks"][0]
        # For commit statuses, ``context`` and ``state`` surface as name/status.
        assert retained["name"] == "ci/deploy"
        assert retained["status"] == "success"

    def test_result_is_json_serialisable(self) -> None:
        payload = {
            "check_runs": [_make_check_run()],
            "statuses": [_make_commit_status()],
        }
        outcome = classify(payload)
        encoded = json.dumps(outcome)
        assert json.loads(encoded) == outcome

    def test_total_matches_checks_length(self) -> None:
        runs = [_make_check_run(name=f"job-{index}") for index in range(5)]
        outcome = classify({"check_runs": runs})
        assert outcome["total"] == len(outcome["checks"]) == 5
# ===================================================================
# CLI entry-point (main)
# ===================================================================
class TestCLI:
    """Exercise ``main()``, the command-line wrapper around ``classify``."""

    @staticmethod
    def _run_main_on(payload: dict[str, Any], directory: Path) -> int:
        """Write *payload* as JSON to ``input.json`` in *directory* and run ``main`` on it."""
        target = directory / "input.json"
        target.write_text(json.dumps(payload))
        return main([str(target)])

    def test_exit_code_passed(self, tmp_path: Path) -> None:
        exit_code = self._run_main_on({"check_runs": [_make_check_run()]}, tmp_path)
        assert exit_code == 0

    def test_exit_code_failed(self, tmp_path: Path) -> None:
        failing = _make_check_run(conclusion="failure")
        exit_code = self._run_main_on({"check_runs": [failing]}, tmp_path)
        assert exit_code == 1

    def test_exit_code_pending(self, tmp_path: Path) -> None:
        queued = _make_check_run(status="queued", conclusion="")
        exit_code = self._run_main_on({"check_runs": [queued]}, tmp_path)
        assert exit_code == 2

    def test_exit_code_no_checks(self, tmp_path: Path) -> None:
        exit_code = self._run_main_on({"check_runs": []}, tmp_path)
        assert exit_code == 2

    def test_exit_code_policy_blocked(self, tmp_path: Path) -> None:
        gated = _make_check_run(conclusion="action_required")
        exit_code = self._run_main_on({"check_runs": [gated]}, tmp_path)
        assert exit_code == 1

    def test_invalid_json_returns_error(self, tmp_path: Path) -> None:
        broken = tmp_path / "bad.json"
        broken.write_text("NOT JSON")
        exit_code = main([str(broken)])
        assert exit_code == 1

    def test_missing_file_returns_error(self) -> None:
        exit_code = main(["/nonexistent/path.json"])
        assert exit_code == 1
# ===================================================================
# Edge cases
# ===================================================================
class TestEdgeCases:
    """Boundary inputs: sparse entries, odd casing, large and mixed payloads."""

    def test_check_run_with_missing_fields(self) -> None:
        """Absent optional fields surface as empty strings in the metadata."""
        sparse_run = {"status": "completed", "conclusion": "success"}
        outcome = classify({"check_runs": [sparse_run]})
        assert outcome["state"] == PASSED
        retained = outcome["checks"][0]
        assert retained["name"] == ""
        assert retained["started_at"] == ""

    def test_case_insensitive_conclusion(self) -> None:
        """An upper-case conclusion is classified like its lower-case form."""
        shouting = _make_check_run(conclusion="FAILURE")
        outcome = classify({"check_runs": [shouting]})
        assert outcome["state"] == FAILED

    def test_case_insensitive_status(self) -> None:
        shouting = _make_check_run(status="IN_PROGRESS", conclusion="")
        outcome = classify({"check_runs": [shouting]})
        assert outcome["state"] == PENDING

    def test_stale_conclusion_is_not_failure(self) -> None:
        """A ``stale`` conclusion does not block the aggregate."""
        stale = _make_check_run(conclusion="stale")
        outcome = classify({"check_runs": [stale]})
        assert outcome["state"] == PASSED

    def test_large_number_of_checks(self) -> None:
        """Five hundred passing runs classify without error."""
        many_runs = [_make_check_run(name=f"job-{index}") for index in range(500)]
        outcome = classify({"check_runs": many_runs})
        assert outcome["state"] == PASSED
        assert outcome["total"] == 500

    def test_mixed_all_states(self) -> None:
        """With every kind of check present, policy_blocked wins."""
        passing = _make_check_run(name="pass", conclusion="success")
        failing = _make_check_run(name="fail", conclusion="failure")
        queued = _make_check_run(name="pend", status="queued", conclusion="")
        gated = _make_check_run(name="block", conclusion="action_required")
        outcome = classify({"check_runs": [passing, failing, queued, gated]})
        assert outcome["state"] == POLICY_BLOCKED
        assert outcome["total"] == 4

View File

@@ -0,0 +1,293 @@
"""Deterministic CI check-state classifier for CrewAI PR triage.
Normalizes raw GitHub CI check data into a deterministic JSON contract
so that cross-repo planning and execution can rely on a stable,
machine-readable CI-state output.
Categories
----------
- ``passed`` -- every check completed successfully
- ``failed`` -- at least one check failed, timed-out, or was cancelled
- ``pending`` -- at least one check is still queued or in progress
- ``no_checks`` -- the PR has no associated check runs or commit statuses
- ``policy_blocked`` -- at least one check requires manual action (e.g. review)
Usage
-----
Pipe JSON from the GitHub Checks API (or a compatible payload) into stdin::
gh api repos/{owner}/{repo}/commits/{ref}/check-runs | python scripts/classify_ci_checks.py
Or supply a file path as the first positional argument::
python scripts/classify_ci_checks.py checks.json
The script prints a single JSON object to stdout and exits with code 0 for
``passed``, 1 for ``failed``/``policy_blocked``, and 2 for ``pending``/``no_checks``.
Example output::
{
"state": "failed",
"total": 3,
"summary": "1 failure, 2 success (3 total)",
"checks": [
{
"name": "tests (3.12)",
"status": "completed",
"conclusion": "failure",
"started_at": "2026-02-24T10:00:00Z",
"completed_at": "2026-02-24T10:05:00Z"
},
...
]
}
"""
from __future__ import annotations
import json
import sys
from typing import Any
# ---------------------------------------------------------------------------
# Public state constants
# ---------------------------------------------------------------------------
# The five aggregate states the classifier can report.
PASSED: str = "passed"
FAILED: str = "failed"
PENDING: str = "pending"
NO_CHECKS: str = "no_checks"
POLICY_BLOCKED: str = "policy_blocked"
ALL_STATES: frozenset[str] = frozenset(
    {PASSED, FAILED, PENDING, NO_CHECKS, POLICY_BLOCKED}
)
# ---------------------------------------------------------------------------
# Internal mapping helpers
# ---------------------------------------------------------------------------
# GitHub check-run conclusions that map to *failed*
_FAILED_CONCLUSIONS: frozenset[str] = frozenset(
    {"failure", "timed_out", "cancelled", "startup_failure"}
)
# GitHub check-run conclusions that map to *policy_blocked*
_POLICY_CONCLUSIONS: frozenset[str] = frozenset({"action_required"})
# GitHub check-run statuses that map to *pending*.
# The check-runs API documents six statuses: queued, in_progress, completed,
# waiting, requested, pending.  Every status other than ``completed`` means
# the run has not finished; leaving ``requested``/``pending`` out would let
# such a run fall through the classifier and be counted as passed.
_PENDING_STATUSES: frozenset[str] = frozenset(
    {"queued", "in_progress", "waiting", "requested", "pending"}
)
# GitHub commit-status states that map to *failed*
_FAILED_COMMIT_STATES: frozenset[str] = frozenset({"failure", "error"})
# GitHub commit-status states that map to *pending*
_PENDING_COMMIT_STATES: frozenset[str] = frozenset({"pending"})
# ---------------------------------------------------------------------------
# Core classifier
# ---------------------------------------------------------------------------
def _extract_check_metadata(check: dict[str, Any]) -> dict[str, Any]:
    """Reduce one check run or commit status to its audit-relevant fields.

    Check runs and commit statuses use different key names; each output
    field is taken from the first of its candidate keys that holds a
    truthy value, and falls back to an empty string.

    Parameters
    ----------
    check:
        A single check-run or commit-status object from the GitHub API.

    Returns
    -------
    dict:
        Dict with the keys ``name``, ``status``, ``conclusion``,
        ``started_at`` and ``completed_at``; values are copied unmodified.
    """

    def first_present(*candidates: str) -> Any:
        # Same semantics as ``a or b or ""``: the first truthy value wins.
        for key in candidates:
            value = check.get(key)
            if value:
                return value
        return ""

    return {
        "name": first_present("name", "context"),
        "status": first_present("status", "state"),
        "conclusion": first_present("conclusion"),
        "started_at": first_present("started_at"),
        "completed_at": first_present("completed_at", "updated_at"),
    }
def classify(payload: dict[str, Any] | list[dict[str, Any]]) -> dict[str, Any]:
    """Classify CI check data into a deterministic state.

    Accepts the JSON body returned by the GitHub ``check-runs`` endpoint
    (which wraps runs in ``{"total_count": N, "check_runs": [...]}``),
    a plain list of check-run objects, or a combined payload that also
    includes commit statuses under the ``statuses`` key.

    The aggregate state is chosen in priority order: ``no_checks`` (nothing
    to classify), then ``policy_blocked``, ``failed``, ``pending``, and
    finally ``passed`` when nothing else applies.

    Parameters
    ----------
    payload:
        Raw GitHub API response, a dict with ``check_runs`` and/or
        ``statuses`` lists, or a bare list of check-run objects.  A
        ``check_runs`` / ``statuses`` value that is not a list (for
        example JSON ``null``) is treated as empty.

    Returns
    -------
    dict:
        Deterministic JSON-serialisable result with keys ``state``,
        ``total``, ``summary``, and ``checks`` (source metadata).

    Raises
    ------
    TypeError:
        If *payload* is neither a dict nor a list.

    Examples
    --------
    >>> result = classify({"check_runs": [], "statuses": []})
    >>> result["state"]
    'no_checks'
    >>> result = classify({
    ...     "check_runs": [
    ...         {"name": "lint", "status": "completed", "conclusion": "success"}
    ...     ]
    ... })
    >>> result["state"]
    'passed'
    >>> classify([{"name": "lint", "status": "queued"}])["state"]
    'pending'
    """
    # Normalise input.  The list case must be tested *before* any ``.get``
    # call: lists have no ``.get``, so probing for "check_runs" first would
    # raise AttributeError and the list branch could never be reached.
    check_runs: list[dict[str, Any]]
    statuses: list[dict[str, Any]]
    if isinstance(payload, list):
        check_runs = payload
        statuses = []
    elif isinstance(payload, dict):
        raw_runs = payload.get("check_runs")
        raw_statuses = payload.get("statuses")
        check_runs = raw_runs if isinstance(raw_runs, list) else []
        # ``"statuses": null`` would otherwise break the loop below.
        statuses = raw_statuses if isinstance(raw_statuses, list) else []
    else:
        raise TypeError(
            "payload must be a dict or a list of check runs, "
            f"got {type(payload).__name__}"
        )

    all_metadata: list[dict[str, Any]] = []
    has_policy_blocked = False
    has_failed = False
    has_pending = False

    # --- Classify check runs ---
    for cr in check_runs:
        all_metadata.append(_extract_check_metadata(cr))
        # Comparison is case-insensitive; the retained metadata keeps the
        # original spelling.
        status = (cr.get("status") or "").lower()
        conclusion = (cr.get("conclusion") or "").lower()
        if conclusion in _POLICY_CONCLUSIONS:
            has_policy_blocked = True
        elif conclusion in _FAILED_CONCLUSIONS:
            has_failed = True
        elif status in _PENDING_STATUSES:
            has_pending = True
        # Anything else (e.g. completed + success/neutral/skipped/stale)
        # raises no flag.

    # --- Classify commit statuses ---
    for cs in statuses:
        all_metadata.append(_extract_check_metadata(cs))
        commit_state = (cs.get("state") or "").lower()
        if commit_state in _FAILED_COMMIT_STATES:
            has_failed = True
        elif commit_state in _PENDING_COMMIT_STATES:
            has_pending = True

    # --- Determine aggregate state (priority order) ---
    total = len(all_metadata)
    if total == 0:
        overall = NO_CHECKS
    elif has_policy_blocked:
        overall = POLICY_BLOCKED
    elif has_failed:
        overall = FAILED
    elif has_pending:
        overall = PENDING
    else:
        overall = PASSED

    return {
        "state": overall,
        "total": total,
        "summary": _build_summary(overall, all_metadata),
        "checks": all_metadata,
    }
def _build_summary(state: str, checks: list[dict[str, Any]]) -> str:
    """Build a human-readable one-line summary.

    Each check is counted under its conclusion, or under its status when
    the conclusion is empty, or under ``unknown`` when both are empty.
    Labels are lower-cased so that spellings differing only in case
    (``FAILURE`` / ``failure``) share one bucket.  Buckets are listed in
    alphabetical order, so the same input always yields the same text.

    Parameters
    ----------
    state:
        The classified state string.  Not used when building the text.
    checks:
        List of normalized check metadata dicts.

    Returns
    -------
    str:
        ``"No CI checks found"`` for an empty list, otherwise a string
        such as ``"1 failure, 2 success (3 total)"``.
    """
    total = len(checks)
    if total == 0:
        return "No CI checks found"

    # Count by conclusion/status bucket
    counts: dict[str, int] = {}
    for check in checks:
        # ``or`` also covers keys that are present but hold None.
        label = check.get("conclusion") or check.get("status") or "unknown"
        bucket = str(label).lower()
        counts[bucket] = counts.get(bucket, 0) + 1

    parts = [f"{count} {bucket}" for bucket, count in sorted(counts.items())]
    return f"{', '.join(parts)} ({total} total)"
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
# Exit codes aligned to state severity
# Maps each classifier state to the process exit code that ``main`` returns:
# 0 = passed, 1 = failed or policy_blocked, 2 = pending or no_checks.
_EXIT_CODES: dict[str, int] = {
    PASSED: 0,
    FAILED: 1,
    POLICY_BLOCKED: 1,
    PENDING: 2,
    NO_CHECKS: 2,
}
def main(argv: list[str] | None = None) -> int:
    """CLI entry-point: read JSON from *stdin* or a file and classify.

    On success the classification result is printed to stdout as indented
    JSON.  If the input cannot be read, decoded, or parsed, or if it is
    valid JSON that is neither an object nor an array, a JSON object of
    the form ``{"error": "..."}`` is printed to stderr instead.

    Parameters
    ----------
    argv:
        Command-line arguments (default: ``sys.argv[1:]``).  The first
        argument, when present, is the path of the JSON file to read;
        otherwise the JSON is read from stdin.

    Returns
    -------
    int:
        Exit code (0 = passed, 1 = failed/blocked or input error,
        2 = pending/no checks).
    """
    args = sys.argv[1:] if argv is None else argv
    try:
        if args:
            # JSON interchange is UTF-8; do not depend on the locale default.
            with open(args[0], encoding="utf-8") as fh:
                raw = fh.read()
        else:
            raw = sys.stdin.read()
        payload = json.loads(raw)
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(json.dumps({"error": str(exc)}), file=sys.stderr)  # noqa: T201
        return 1

    if isinstance(payload, list):
        # A bare array of check runs: wrap it in the object form.
        payload = {"check_runs": payload}
    elif not isinstance(payload, dict):
        # Valid JSON but not a usable payload (e.g. ``42`` or ``"text"``):
        # report it like any other input error instead of crashing.
        message = (
            "expected a JSON object or array of check runs, "
            f"got {type(payload).__name__}"
        )
        print(json.dumps({"error": message}), file=sys.stderr)  # noqa: T201
        return 1

    result = classify(payload)
    print(json.dumps(result, indent=2))  # noqa: T201
    # Unknown states fall back to 1 so they are never mistaken for success.
    return _EXIT_CODES.get(result["state"], 1)
# Script entry: run the CLI and hand its return value to the interpreter
# as the process exit status.
if __name__ == "__main__":
    sys.exit(main())