from __future__ import annotations """Unified quality gate for the repo's managed production scope.""" import argparse import ast import json import re import xml.etree.ElementTree as ET from dataclasses import dataclass from pathlib import Path from typing import Iterable ROOT = Path(__file__).resolve().parents[2] DEFAULT_CONTRACT = ROOT / "testing" / "quality_contract.json" DEFAULT_BACKEND_COVERAGE = ROOT / "build" / "backend-coverage.xml" DEFAULT_FRONTEND_COVERAGE = ROOT / "frontend" / "coverage" / "coverage-summary.json" TEXT_EXTENSIONS = {".py", ".js", ".mjs", ".ts", ".vue", ".json", ".yaml", ".yml"} @dataclass(frozen=True) class GateIssue: """Describe one violated gate condition.""" check: str path: str message: str def load_contract(path: Path) -> dict: """Load the JSON gate contract from disk.""" return json.loads(path.read_text()) def _resolve(path_str: str) -> Path: path = Path(path_str) return path if path.is_absolute() else ROOT / path def _count_lines(path: Path) -> int: return len(path.read_text().splitlines()) def check_file_sizes(paths: Iterable[Path], *, max_lines: int = 500) -> list[GateIssue]: """Flag text files that exceed the maximum line budget.""" issues: list[GateIssue] = [] for path in paths: if not path.exists() or path.suffix.lower() not in TEXT_EXTENSIONS: continue lines = _count_lines(path) if lines > max_lines: issues.append(GateIssue("loc", str(path), f"{lines} lines exceeds {max_lines}")) return issues def _python_node_issues(path: Path) -> list[GateIssue]: """Require docstrings on all functions and classes in a Python module.""" issues: list[GateIssue] = [] tree = ast.parse(path.read_text()) for node in ast.walk(tree): if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): continue if ast.get_docstring(node): continue issues.append(GateIssue("docstring", str(path), f"missing docstring on {node.__class__.__name__} {node.name}")) return issues _FUNCTION_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(") _CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_$][\w$]*)\s*") def _has_js_contract(lines: list[str], index: int) -> bool: """Check whether the nearest leading comment block documents a JS function.""" seen_comment = False for pos in range(index - 1, -1, -1): raw = lines[pos].rstrip() stripped = raw.strip() if not stripped: if seen_comment: continue continue if stripped.startswith("//"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.startswith("*"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.endswith("*/"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.startswith("/**"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue break return seen_comment and any( marker in line for line in lines[max(0, index - 6):index] for marker in ("WHY:", "@param", "@returns") ) def _js_node_issues(path: Path) -> list[GateIssue]: """Require leading contract comments for named JS functions and classes.""" lines = path.read_text().splitlines() issues: list[GateIssue] = [] for index, line in enumerate(lines): match = _FUNCTION_RE.match(line) or _CLASS_RE.match(line) if not match: continue name = match.group(1) if _has_js_contract(lines, index): continue issues.append(GateIssue("docstring", str(path), f"missing contract comment on {name}")) return issues def check_docstrings(paths: Iterable[Path]) -> list[GateIssue]: """Check that managed production files document non-trivial definitions.""" issues: list[GateIssue] = [] for path in paths: if not path.exists(): continue suffix = path.suffix.lower() if suffix == ".py": issues.extend(_python_node_issues(path)) elif suffix in {".js", ".mjs", ".ts", ".vue"}: issues.extend(_js_node_issues(path)) return issues def _normalize_key(value: str) -> str: return value.replace("\\", "/").lstrip("./") def _path_suffixes(value: str) -> set[str]: parts = _normalize_key(value).split("/") return {"/".join(parts[index:]) for index in range(len(parts))} def _coverage_lookup(report: dict, wanted: str) -> dict | None: wanted_key = _normalize_key(wanted) wanted_suffixes = _path_suffixes(wanted_key) candidates = [] for key, value in report.items(): if not isinstance(value, dict) or "lines" not in value: continue normalized = _normalize_key(key) if normalized == wanted_key or normalized in wanted_suffixes or any(normalized.endswith(f"/{suffix}") for suffix in wanted_suffixes): candidates.append(value) if candidates: return candidates[0] return None def _load_frontend_coverage(path: Path) -> dict: data = json.loads(path.read_text()) return {key: value for key, value in data.items() if isinstance(value, dict)} def _load_backend_coverage(path: Path) -> dict[str, dict[str, float]]: root = ET.parse(path).getroot() report: dict[str, dict[str, float]] = {} for class_node in root.findall(".//class"): filename = class_node.attrib.get("filename") if not filename: continue report[_normalize_key(filename)] = { "lines": float(class_node.attrib.get("line-rate", "0")) * 100, "branches": float(class_node.attrib.get("branch-rate", "0")) * 100, } return report def check_coverage( paths: Iterable[Path], *, backend_report: Path, frontend_report: Path, threshold: float = 95.0, ) -> list[GateIssue]: """Check the per-file coverage floor for the managed production scope.""" issues: list[GateIssue] = [] backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {} frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {} for path in paths: if not path.exists(): continue rel = path.relative_to(ROOT).as_posix() if path.is_absolute() else _normalize_key(str(path)) if rel.startswith("backend/"): metrics = _coverage_lookup(backend_cov, rel) if metrics is None: issues.append(GateIssue("coverage", rel, "missing from backend coverage report")) continue if metrics["lines"] < threshold: issues.append(GateIssue("coverage", rel, f"line coverage {metrics['lines']:.2f}% below {threshold}%")) elif rel.startswith("frontend/"): lookup = rel.split("frontend/", 1)[1] metrics = _coverage_lookup(frontend_cov, lookup) if metrics is None: issues.append(GateIssue("coverage", rel, "missing from frontend coverage report")) continue pct = metrics.get("lines", {}).get("pct", 0.0) if pct < threshold: issues.append(GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%")) return issues def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run the repo's unified quality gate") parser.add_argument("--contract", default=str(DEFAULT_CONTRACT), help="Path to the JSON gate contract") parser.add_argument("--backend-coverage", default=str(DEFAULT_BACKEND_COVERAGE), help="Backend coverage XML") parser.add_argument("--frontend-coverage", default=str(DEFAULT_FRONTEND_COVERAGE), help="Frontend coverage summary JSON") parser.add_argument("--report", default=str(ROOT / "build" / "quality-gate.json"), help="Write a JSON report here") return parser def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage: Path) -> tuple[list[GateIssue], dict]: contract = load_contract(contract_path) managed_files = [_resolve(path) for path in contract["managed_files"]] docstring_files = [_resolve(path) for path in contract["docstring_files"]] coverage_files = [_resolve(path) for path in contract["coverage_files"]] max_lines = int(contract.get("max_lines", 500)) threshold = float(contract.get("coverage_threshold_pct", 95)) issues: list[GateIssue] = [] issues.extend(check_file_sizes(managed_files, max_lines=max_lines)) issues.extend(check_docstrings(docstring_files)) issues.extend(check_coverage(coverage_files, backend_report=backend_coverage, frontend_report=frontend_coverage, threshold=threshold)) report = { "managed_files": [str(path.relative_to(ROOT)) for path in managed_files], "docstring_files": [str(path.relative_to(ROOT)) for path in docstring_files], "coverage_files": [str(path.relative_to(ROOT)) for path in coverage_files], "max_lines": max_lines, "coverage_threshold_pct": threshold, "issue_count": len(issues), "issues": [issue.__dict__ for issue in issues], } return issues, report def main(argv: list[str] | None = None) -> int: parser = _build_parser() args = parser.parse_args(argv) backend_coverage = _resolve(args.backend_coverage) frontend_coverage = _resolve(args.frontend_coverage) report_path = _resolve(args.report) issues, report = run_gate(_resolve(args.contract), backend_coverage=backend_coverage, frontend_coverage=frontend_coverage) report_path.parent.mkdir(parents=True, exist_ok=True) report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n") for issue in issues: print(f"{issue.check}: {issue.path}: {issue.message}") if issues: print(f"quality gate failed: {len(issues)} issue(s)") return 1 print(f"quality gate passed: {len(report['managed_files'])} managed files checked") return 0 if __name__ == "__main__": raise SystemExit(main())