# bstein-dev-home/testing/ci/quality_gate.py
"""Unified quality gate for the repo's managed production scope."""
# The docstring must precede the __future__ import: a future statement may only
# be preceded by the module docstring and comments, and placing the string
# after the import would leave module.__doc__ unset.
from __future__ import annotations

import argparse
import ast
import json
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable

# Repo root: this file lives at <root>/testing/ci/quality_gate.py.
ROOT = Path(__file__).resolve().parents[2]
DEFAULT_CONTRACT = ROOT / "testing" / "quality_contract.json"
DEFAULT_BACKEND_COVERAGE = ROOT / "build" / "backend-coverage.xml"
DEFAULT_FRONTEND_COVERAGE = ROOT / "frontend" / "coverage" / "coverage-summary.json"
# Extensions eligible for the line-budget (loc) check; other files are skipped.
TEXT_EXTENSIONS = {".py", ".js", ".mjs", ".ts", ".vue", ".json", ".yaml", ".yml"}
@dataclass(frozen=True)
class GateIssue:
    """Describe one violated gate condition."""

    check: str  # gate category: "loc", "docstring", or "coverage"
    path: str  # path of the offending file, as passed to the check
    message: str  # human-readable explanation of the violation
def load_contract(path: Path) -> dict:
    """Read and parse the JSON gate contract stored at *path*."""
    raw_text = path.read_text()
    return json.loads(raw_text)
def _resolve(path_str: str) -> Path:
    """Interpret *path_str* against the repo root unless it is absolute."""
    candidate = Path(path_str)
    if candidate.is_absolute():
        return candidate
    return ROOT / candidate
def _count_lines(path: Path) -> int:
return len(path.read_text().splitlines())
def check_file_sizes(paths: Iterable[Path], *, max_lines: int = 500) -> list[GateIssue]:
    """Flag text files that exceed the maximum line budget."""
    flagged: list[GateIssue] = []
    for candidate in paths:
        # Only existing files with a recognized text extension are budgeted.
        if not candidate.exists():
            continue
        if candidate.suffix.lower() not in TEXT_EXTENSIONS:
            continue
        total = _count_lines(candidate)
        if total <= max_lines:
            continue
        flagged.append(
            GateIssue("loc", str(candidate), f"{total} lines exceeds {max_lines}")
        )
    return flagged
def _python_node_issues(path: Path) -> list[GateIssue]:
    """Require docstrings on all functions and classes in a Python module."""
    tree = ast.parse(path.read_text())
    # Collect every def/async def/class node that lacks a docstring.
    undocumented = [
        node
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
        and not ast.get_docstring(node)
    ]
    return [
        GateIssue(
            "docstring",
            str(path),
            f"missing docstring on {type(node).__name__} {node.name}",
        )
        for node in undocumented
    ]
# Matches a named (optionally exported) JS function declaration; group 1 is the name.
_FUNCTION_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(")
# Matches a named JS class declaration; group 1 is the class name.
_CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_$][\w$]*)\s*")
def _has_js_contract(lines: list[str], index: int) -> bool:
"""Check whether the nearest leading comment block documents a JS function."""
seen_comment = False
for pos in range(index - 1, -1, -1):
raw = lines[pos].rstrip()
stripped = raw.strip()
if not stripped:
if seen_comment:
continue
continue
if stripped.startswith("//"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.startswith("*"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.endswith("*/"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.startswith("/**"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
break
return seen_comment and any(
marker in line for line in lines[max(0, index - 6):index] for marker in ("WHY:", "@param", "@returns")
)
def _js_node_issues(path: Path) -> list[GateIssue]:
    """Require leading contract comments for named JS functions and classes."""
    source_lines = path.read_text().splitlines()
    findings: list[GateIssue] = []
    for position, text in enumerate(source_lines):
        hit = _FUNCTION_RE.match(text)
        if hit is None:
            hit = _CLASS_RE.match(text)
        if hit is None:
            continue  # not a named function/class declaration
        if _has_js_contract(source_lines, position):
            continue  # documented — nothing to report
        findings.append(
            GateIssue(
                "docstring",
                str(path),
                f"missing contract comment on {hit.group(1)}",
            )
        )
    return findings
def check_docstrings(paths: Iterable[Path]) -> list[GateIssue]:
    """Check that managed production files document non-trivial definitions."""
    js_like = {".js", ".mjs", ".ts", ".vue"}
    issues: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        ext = candidate.suffix.lower()
        # Dispatch by extension: AST-based check for Python, regex-based for JS-like files.
        if ext == ".py":
            issues += _python_node_issues(candidate)
        elif ext in js_like:
            issues += _js_node_issues(candidate)
    return issues
def _normalize_key(value: str) -> str:
return value.replace("\\", "/").lstrip("./")
def _path_suffixes(value: str) -> set[str]:
    """Return every trailing path suffix of *value*, including the full path."""
    parts = _normalize_key(value).split("/")
    suffixes: set[str] = set()
    for start in range(len(parts)):
        suffixes.add("/".join(parts[start:]))
    return suffixes
def _coverage_lookup(report: dict, wanted: str) -> dict | None:
    """Return the first report entry whose key matches *wanted* or one of its path suffixes."""
    target = _normalize_key(wanted)
    suffixes = _path_suffixes(target)
    # Returning the first match in dict order is equivalent to the original's
    # collect-all-then-take-first behavior.
    for key, entry in report.items():
        if not isinstance(entry, dict) or "lines" not in entry:
            continue
        normalized = _normalize_key(key)
        matched = (
            normalized == target
            or normalized in suffixes
            or any(normalized.endswith(f"/{tail}") for tail in suffixes)
        )
        if matched:
            return entry
    return None
def _load_frontend_coverage(path: Path) -> dict:
data = json.loads(path.read_text())
return {key: value for key, value in data.items() if isinstance(value, dict)}
def _load_backend_coverage(path: Path) -> dict[str, dict[str, float]]:
    """Parse a Cobertura-style XML report into per-file percentage metrics.

    Each ``<class>`` element's ``line-rate``/``branch-rate`` (0..1 fractions)
    is scaled to a 0..100 percentage keyed by the normalized filename.
    """
    report: dict[str, dict[str, float]] = {}
    xml_root = ET.parse(path).getroot()
    for node in xml_root.findall(".//class"):
        filename = node.attrib.get("filename")
        if not filename:
            continue
        line_rate = float(node.attrib.get("line-rate", "0"))
        branch_rate = float(node.attrib.get("branch-rate", "0"))
        report[_normalize_key(filename)] = {
            "lines": line_rate * 100,
            "branches": branch_rate * 100,
        }
    return report
def check_coverage(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
    threshold: float = 95.0,
) -> list[GateIssue]:
    """Check the per-file coverage floor for the managed production scope.

    Files outside ``backend/`` or ``frontend/`` are silently skipped; a file
    missing from its report, or below *threshold* line coverage, yields an issue.
    """
    backend_cov: dict = {}
    if backend_report.exists():
        backend_cov = _load_backend_coverage(backend_report)
    frontend_cov: dict = {}
    if frontend_report.exists():
        frontend_cov = _load_frontend_coverage(frontend_report)
    issues: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.is_absolute():
            rel = candidate.relative_to(ROOT).as_posix()
        else:
            rel = _normalize_key(str(candidate))
        if rel.startswith("backend/"):
            metrics = _coverage_lookup(backend_cov, rel)
            if metrics is None:
                issues.append(GateIssue("coverage", rel, "missing from backend coverage report"))
                continue
            # Backend metrics store the line percentage directly as a float.
            pct = metrics["lines"]
            if pct < threshold:
                issues.append(GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%"))
        elif rel.startswith("frontend/"):
            lookup = rel.split("frontend/", 1)[1]
            metrics = _coverage_lookup(frontend_cov, lookup)
            if metrics is None:
                issues.append(GateIssue("coverage", rel, "missing from frontend coverage report"))
                continue
            # Frontend summary nests the percentage under lines.pct.
            pct = metrics.get("lines", {}).get("pct", 0.0)
            if pct < threshold:
                issues.append(GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%"))
    return issues
def compute_workspace_line_coverage(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
) -> float:
    """Compute the mean line coverage percentage across managed coverage files.

    Files that are missing, unmatched in either report, or outside the
    ``backend/``/``frontend/`` prefixes contribute no sample; returns 0.0 when
    no samples were collected.
    """
    backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {}
    frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {}
    samples: list[float] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        rel = (
            candidate.relative_to(ROOT).as_posix()
            if candidate.is_absolute()
            else _normalize_key(str(candidate))
        )
        if rel.startswith("backend/"):
            metrics = _coverage_lookup(backend_cov, rel)
            if metrics:
                samples.append(float(metrics.get("lines", 0.0)))
        elif rel.startswith("frontend/"):
            metrics = _coverage_lookup(frontend_cov, rel.split("frontend/", 1)[1])
            if metrics:
                lines = metrics.get("lines")
                if isinstance(lines, dict):
                    samples.append(float(lines.get("pct", 0.0)))
    if not samples:
        return 0.0
    return round(sum(samples) / len(samples), 3)
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the quality-gate runner."""
    parser = argparse.ArgumentParser(description="Run the repo's unified quality gate")
    # Each flag: (name, default, help text).
    options = (
        ("--contract", str(DEFAULT_CONTRACT), "Path to the JSON gate contract"),
        ("--backend-coverage", str(DEFAULT_BACKEND_COVERAGE), "Backend coverage XML"),
        ("--frontend-coverage", str(DEFAULT_FRONTEND_COVERAGE), "Frontend coverage summary JSON"),
        ("--report", str(ROOT / "build" / "quality-gate.json"), "Write a JSON report here"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser
def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage: Path) -> tuple[list[GateIssue], dict]:
    """Run every gate check declared in the contract and build the JSON report.

    Returns the flat list of issues and the serializable report dict.
    Raises KeyError if the contract lacks any of the three required file lists.
    """
    contract = load_contract(contract_path)
    # Resolve each scope's entries to absolute paths under the repo root.
    scopes = {
        key: [_resolve(entry) for entry in contract[key]]
        for key in ("managed_files", "docstring_files", "coverage_files")
    }
    max_lines = int(contract.get("max_lines", 500))
    threshold = float(contract.get("coverage_threshold_pct", 95))
    loc_issues = check_file_sizes(scopes["managed_files"], max_lines=max_lines)
    doc_issues = check_docstrings(scopes["docstring_files"])
    coverage_issues = check_coverage(
        scopes["coverage_files"],
        backend_report=backend_coverage,
        frontend_report=frontend_coverage,
        threshold=threshold,
    )
    issues = [*loc_issues, *doc_issues, *coverage_issues]
    workspace_line_coverage = compute_workspace_line_coverage(
        scopes["coverage_files"],
        backend_report=backend_coverage,
        frontend_report=frontend_coverage,
    )
    report = {
        "managed_files": [str(path.relative_to(ROOT)) for path in scopes["managed_files"]],
        "docstring_files": [str(path.relative_to(ROOT)) for path in scopes["docstring_files"]],
        "coverage_files": [str(path.relative_to(ROOT)) for path in scopes["coverage_files"]],
        "max_lines": max_lines,
        "coverage_threshold_pct": threshold,
        "workspace_line_coverage_percent": workspace_line_coverage,
        "source_lines_over_500": len(loc_issues),
        "issue_count": len(issues),
        "issues": [issue.__dict__ for issue in issues],
    }
    return issues, report
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: run the gate, write the JSON report, print the outcome.

    Returns 0 on a clean pass and 1 when any gate issue was found.
    """
    args = _build_parser().parse_args(argv)
    contract_path = _resolve(args.contract)
    backend_coverage = _resolve(args.backend_coverage)
    frontend_coverage = _resolve(args.frontend_coverage)
    report_path = _resolve(args.report)
    issues, report = run_gate(
        contract_path,
        backend_coverage=backend_coverage,
        frontend_coverage=frontend_coverage,
    )
    # Always persist the report, pass or fail.
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n")
    for issue in issues:
        print(f"{issue.check}: {issue.path}: {issue.message}")
    if issues:
        print(f"quality gate failed: {len(issues)} issue(s)")
        return 1
    print(f"quality gate passed: {len(report['managed_files'])} managed files checked")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())