# bstein-dev-home/testing/ci/quality_gate.py
"""Unified quality gate for the repo's managed production scope."""
from __future__ import annotations

import argparse
import ast
import json
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
# Repository root: this file lives at <root>/testing/ci/quality_gate.py.
ROOT = Path(__file__).resolve().parents[2]
# Default locations of the gate contract and the coverage artifacts.
DEFAULT_CONTRACT = ROOT / "testing" / "quality_contract.json"
DEFAULT_BACKEND_COVERAGE = ROOT / "build" / "backend-coverage.xml"
DEFAULT_FRONTEND_COVERAGE = ROOT / "frontend" / "coverage" / "coverage-summary.json"
# Only files with these extensions are subject to the line-count budget.
TEXT_EXTENSIONS = {".py", ".js", ".mjs", ".ts", ".vue", ".css", ".json", ".yaml", ".yml"}
# Private/dunder definitions spanning at least this many lines need docs.
DOCSTRING_MIN_LINES = 10
@dataclass(frozen=True)
class GateIssue:
    """Describe one violated gate condition."""

    # Gate category that failed: "loc", "docstring", or "coverage".
    check: str
    # File path (absolute or repo-relative) where the issue was found.
    path: str
    # Human-readable explanation of the violation.
    message: str
def load_contract(path: Path) -> dict:
    """Read and parse the JSON gate contract stored at *path*."""
    raw = path.read_text()
    return json.loads(raw)
def _resolve(path_str: str) -> Path:
path = Path(path_str)
return path if path.is_absolute() else ROOT / path
def _count_lines(path: Path) -> int:
return len(path.read_text().splitlines())
def check_file_sizes(paths: Iterable[Path], *, max_lines: int = 500) -> list[GateIssue]:
    """Flag text files whose physical line count exceeds *max_lines*.

    Missing files and non-text extensions are skipped silently.
    """
    issues: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.suffix.lower() not in TEXT_EXTENSIONS:
            continue
        total = _count_lines(candidate)
        if total > max_lines:
            issues.append(GateIssue("loc", str(candidate), f"{total} lines exceeds {max_lines}"))
    return issues
def _node_span(node: ast.AST) -> int:
"""Return the physical source span for a parsed Python definition."""
start = getattr(node, "lineno", 0)
end = getattr(node, "end_lineno", start)
return max(end - start + 1, 1)
def _is_nontrivial_python_node(node: ast.AST) -> bool:
    """Decide whether a Python definition needs an explicit contract.

    WHY: the gate should document public APIs and meaningful logic without
    forcing noisy docstrings on tiny private glue helpers.
    """
    name = getattr(node, "name", "")
    big_enough = _node_span(node) >= DOCSTRING_MIN_LINES
    if isinstance(node, ast.ClassDef):
        # Public classes always need docs; private ones only when sizeable.
        return big_enough or not name.startswith("_")
    if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
        return False
    is_dunder = name.startswith("__") and name.endswith("__")
    if not is_dunder and not name.startswith("_"):
        # Public functions always need docs.
        return True
    # Dunders and private helpers need docs only when sizeable.
    return big_enough
def _python_node_issues(path: Path) -> list[GateIssue]:
    """Require docstrings on non-trivial Python functions and classes."""
    found: list[GateIssue] = []
    tree = ast.parse(path.read_text())
    definition_types = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    for node in ast.walk(tree):
        if not isinstance(node, definition_types):
            continue
        if _is_nontrivial_python_node(node) and not ast.get_docstring(node):
            found.append(
                GateIssue(
                    "docstring",
                    str(path),
                    f"missing docstring on {node.__class__.__name__} {node.name}",
                )
            )
    return found
_FUNCTION_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(")
_CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_$][\w$]*)\s*")
def _has_js_contract(lines: list[str], index: int) -> bool:
"""Check whether the nearest leading comment block documents a JS function."""
seen_comment = False
for pos in range(index - 1, -1, -1):
raw = lines[pos].rstrip()
stripped = raw.strip()
if not stripped:
if seen_comment:
continue
continue
if stripped.startswith("//"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.startswith("*"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.endswith("*/"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
if stripped.startswith("/**"):
seen_comment = True
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
return True
continue
break
return seen_comment and any(
marker in line for line in lines[max(0, index - 6):index] for marker in ("WHY:", "@param", "@returns")
)
def _is_nontrivial_js_definition(lines: list[str], index: int) -> bool:
"""Decide whether a JavaScript definition needs a leading contract comment."""
current = lines[index]
exported = "export" in current.split("function", 1)[0].split("class", 1)[0]
if exported:
return True
depth = 0
for offset, line in enumerate(lines[index:], start=1):
depth += line.count("{")
depth -= line.count("}")
if offset >= DOCSTRING_MIN_LINES:
return True
if offset > 1 and depth <= 0:
return False
return False
def _js_node_issues(path: Path) -> list[GateIssue]:
    """Require leading contract comments for non-trivial JS functions/classes."""
    source_lines = path.read_text().splitlines()
    found: list[GateIssue] = []
    for position, text in enumerate(source_lines):
        hit = _FUNCTION_RE.match(text) or _CLASS_RE.match(text)
        if hit is None:
            continue
        if not _is_nontrivial_js_definition(source_lines, position):
            continue
        if not _has_js_contract(source_lines, position):
            found.append(
                GateIssue(
                    "docstring",
                    str(path),
                    f"missing contract comment on {hit.group(1)}",
                )
            )
    return found
def check_docstrings(paths: Iterable[Path]) -> list[GateIssue]:
    """Check that managed production files document non-trivial definitions."""
    found: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        ext = candidate.suffix.lower()
        if ext == ".py":
            found.extend(_python_node_issues(candidate))
        elif ext in {".js", ".mjs", ".ts", ".vue"}:
            found.extend(_js_node_issues(candidate))
    return found
def _normalize_key(value: str) -> str:
return value.replace("\\", "/").lstrip("./")
def _path_suffixes(value: str) -> set[str]:
    """Return every trailing path suffix of *value*, including the full path."""
    segments = _normalize_key(value).split("/")
    suffixes: set[str] = set()
    for start in range(len(segments)):
        suffixes.add("/".join(segments[start:]))
    return suffixes
def _coverage_lookup(report: dict, wanted: str) -> dict | None:
    """Find the coverage entry for *wanted*, tolerating path-prefix mismatches.

    Returns the first entry (in report order) whose normalized key equals
    *wanted*, is a trailing suffix of it, or ends with one of its suffixes.
    """
    target = _normalize_key(wanted)
    target_suffixes = _path_suffixes(target)
    for key, entry in report.items():
        if not isinstance(entry, dict) or "lines" not in entry:
            continue
        normalized = _normalize_key(key)
        if normalized == target or normalized in target_suffixes:
            return entry
        if any(normalized.endswith(f"/{suffix}") for suffix in target_suffixes):
            return entry
    return None
def _load_frontend_coverage(path: Path) -> dict:
data = json.loads(path.read_text())
return {key: value for key, value in data.items() if isinstance(value, dict)}
def _load_backend_coverage(path: Path) -> dict[str, dict[str, float]]:
    """Parse a Cobertura-style coverage XML into per-file percentage metrics.

    Each <class> element's line-rate/branch-rate (0..1) is scaled to 0..100
    and keyed by its normalized filename.
    """
    document_root = ET.parse(path).getroot()
    metrics_by_file: dict[str, dict[str, float]] = {}
    for class_node in document_root.findall(".//class"):
        filename = class_node.attrib.get("filename")
        if not filename:
            continue
        line_rate = float(class_node.attrib.get("line-rate", "0"))
        branch_rate = float(class_node.attrib.get("branch-rate", "0"))
        metrics_by_file[_normalize_key(filename)] = {
            "lines": line_rate * 100,
            "branches": branch_rate * 100,
        }
    return metrics_by_file
def check_coverage(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
    threshold: float = 95.0,
) -> list[GateIssue]:
    """Check the per-file coverage floor for the managed production scope.

    Files outside backend/ or frontend/ are ignored; tracked files missing
    from the relevant report are themselves reported as issues.
    """
    backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {}
    frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {}
    found: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.is_absolute():
            rel = candidate.relative_to(ROOT).as_posix()
        else:
            rel = _normalize_key(str(candidate))
        if rel.startswith("backend/"):
            metrics = _coverage_lookup(backend_cov, rel)
            if metrics is None:
                found.append(GateIssue("coverage", rel, "missing from backend coverage report"))
            elif metrics["lines"] < threshold:
                found.append(
                    GateIssue("coverage", rel, f"line coverage {metrics['lines']:.2f}% below {threshold}%")
                )
        elif rel.startswith("frontend/"):
            # Frontend summaries are keyed relative to the frontend/ root.
            metrics = _coverage_lookup(frontend_cov, rel.split("frontend/", 1)[1])
            if metrics is None:
                found.append(GateIssue("coverage", rel, "missing from frontend coverage report"))
            else:
                pct = metrics.get("lines", {}).get("pct", 0.0)
                if pct < threshold:
                    found.append(
                        GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%")
                    )
    return found
def _coverage_values_for_paths(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
) -> list[float]:
    """Return per-file line coverage values for tracked backend/frontend files."""
    backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {}
    frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {}
    collected: list[float] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.is_absolute():
            rel = candidate.relative_to(ROOT).as_posix()
        else:
            rel = _normalize_key(str(candidate))
        if rel.startswith("backend/"):
            metrics = _coverage_lookup(backend_cov, rel)
            if metrics is not None:
                collected.append(float(metrics.get("lines", 0.0)))
        elif rel.startswith("frontend/"):
            metrics = _coverage_lookup(frontend_cov, rel.split("frontend/", 1)[1])
            if metrics is not None:
                collected.append(float(metrics.get("lines", {}).get("pct", 0.0)))
    return collected
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the quality gate."""
    parser = argparse.ArgumentParser(description="Run the repo's unified quality gate")
    options = (
        ("--contract", str(DEFAULT_CONTRACT), "Path to the JSON gate contract"),
        ("--backend-coverage", str(DEFAULT_BACKEND_COVERAGE), "Backend coverage XML"),
        ("--frontend-coverage", str(DEFAULT_FRONTEND_COVERAGE), "Frontend coverage summary JSON"),
        ("--report", str(ROOT / "build" / "quality-gate.json"), "Write a JSON report here"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser
def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage: Path) -> tuple[list[GateIssue], dict]:
    """Run every gate check from the contract and build the JSON report payload.

    Returns the flat issue list plus the report dict written by main().
    """
    contract = load_contract(contract_path)
    managed_files = [_resolve(entry) for entry in contract["managed_files"]]
    docstring_files = [_resolve(entry) for entry in contract["docstring_files"]]
    coverage_files = [_resolve(entry) for entry in contract["coverage_files"]]
    max_lines = int(contract.get("max_lines", 500))
    threshold = float(contract.get("coverage_threshold_pct", 95))

    issues: list[GateIssue] = []
    issues.extend(check_file_sizes(managed_files, max_lines=max_lines))
    issues.extend(check_docstrings(docstring_files))
    issues.extend(
        check_coverage(
            coverage_files,
            backend_report=backend_coverage,
            frontend_report=frontend_coverage,
            threshold=threshold,
        )
    )

    coverage_values = _coverage_values_for_paths(
        coverage_files,
        backend_report=backend_coverage,
        frontend_report=frontend_coverage,
    )
    if coverage_values:
        workspace_line_coverage_percent = round(sum(coverage_values) / len(coverage_values), 3)
    else:
        workspace_line_coverage_percent = 0.0
    source_lines_over_500 = sum(1 for issue in issues if issue.check == "loc")

    report = {
        "managed_files": [str(entry.relative_to(ROOT)) for entry in managed_files],
        "docstring_files": [str(entry.relative_to(ROOT)) for entry in docstring_files],
        "coverage_files": [str(entry.relative_to(ROOT)) for entry in coverage_files],
        "max_lines": max_lines,
        "coverage_threshold_pct": threshold,
        "workspace_line_coverage_percent": workspace_line_coverage_percent,
        "source_lines_over_500": source_lines_over_500,
        "issue_count": len(issues),
        "issues": [issue.__dict__ for issue in issues],
    }
    return issues, report
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: run the gate, write the report, return the exit code."""
    args = _build_parser().parse_args(argv)
    backend_coverage = _resolve(args.backend_coverage)
    frontend_coverage = _resolve(args.frontend_coverage)
    report_path = _resolve(args.report)
    issues, report = run_gate(
        _resolve(args.contract),
        backend_coverage=backend_coverage,
        frontend_coverage=frontend_coverage,
    )
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n")
    for issue in issues:
        print(f"{issue.check}: {issue.path}: {issue.message}")
    if issues:
        print(f"quality gate failed: {len(issues)} issue(s)")
        return 1
    print(f"quality gate passed: {len(report['managed_files'])} managed files checked")
    return 0
if __name__ == "__main__":
    # Surface the gate's exit code (0 pass, 1 fail) to the shell.
    raise SystemExit(main())