from __future__ import annotations """Unified quality gate for the repo's managed production scope.""" import argparse import ast import json import re import xml.etree.ElementTree as ET from dataclasses import dataclass from pathlib import Path from typing import Iterable ROOT = Path(__file__).resolve().parents[2] DEFAULT_CONTRACT = ROOT / "testing" / "quality_contract.json" DEFAULT_BACKEND_COVERAGE = ROOT / "build" / "backend-coverage.xml" DEFAULT_FRONTEND_COVERAGE = ROOT / "frontend" / "coverage" / "coverage-summary.json" TEXT_EXTENSIONS = {".py", ".js", ".mjs", ".ts", ".vue", ".css", ".json", ".yaml", ".yml"} DOCSTRING_MIN_LINES = 10 @dataclass(frozen=True) class GateIssue: """Describe one violated gate condition.""" check: str path: str message: str def load_contract(path: Path) -> dict: """Load the JSON gate contract from disk.""" return json.loads(path.read_text()) def _resolve(path_str: str) -> Path: path = Path(path_str) return path if path.is_absolute() else ROOT / path def _count_lines(path: Path) -> int: return len(path.read_text().splitlines()) def check_file_sizes(paths: Iterable[Path], *, max_lines: int = 500) -> list[GateIssue]: """Flag text files that exceed the maximum line budget.""" issues: list[GateIssue] = [] for path in paths: if not path.exists() or path.suffix.lower() not in TEXT_EXTENSIONS: continue lines = _count_lines(path) if lines > max_lines: issues.append(GateIssue("loc", str(path), f"{lines} lines exceeds {max_lines}")) return issues def _node_span(node: ast.AST) -> int: """Return the physical source span for a parsed Python definition.""" start = getattr(node, "lineno", 0) end = getattr(node, "end_lineno", start) return max(end - start + 1, 1) def _is_nontrivial_python_node(node: ast.AST) -> bool: """Decide whether a Python definition needs an explicit contract. WHY: the gate should document public APIs and meaningful logic without forcing noisy docstrings on tiny private glue helpers. """ name = getattr(node, "name", "") if isinstance(node, ast.ClassDef): return not name.startswith("_") or _node_span(node) >= DOCSTRING_MIN_LINES if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): return False if name.startswith("__") and name.endswith("__"): return _node_span(node) >= DOCSTRING_MIN_LINES if not name.startswith("_"): return True return _node_span(node) >= DOCSTRING_MIN_LINES def _python_node_issues(path: Path) -> list[GateIssue]: """Require docstrings on non-trivial Python functions and classes.""" issues: list[GateIssue] = [] tree = ast.parse(path.read_text()) for node in ast.walk(tree): if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): continue if not _is_nontrivial_python_node(node): continue if ast.get_docstring(node): continue issues.append(GateIssue("docstring", str(path), f"missing docstring on {node.__class__.__name__} {node.name}")) return issues _FUNCTION_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(") _CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_$][\w$]*)\s*") def _has_js_contract(lines: list[str], index: int) -> bool: """Check whether the nearest leading comment block documents a JS function.""" seen_comment = False for pos in range(index - 1, -1, -1): raw = lines[pos].rstrip() stripped = raw.strip() if not stripped: if seen_comment: continue continue if stripped.startswith("//"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.startswith("*"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.endswith("*/"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue if stripped.startswith("/**"): seen_comment = True if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped: return True continue break return seen_comment and any( marker in line for line in lines[max(0, index - 6):index] for marker in ("WHY:", "@param", "@returns") ) def _is_nontrivial_js_definition(lines: list[str], index: int) -> bool: """Decide whether a JavaScript definition needs a leading contract comment.""" current = lines[index] exported = "export" in current.split("function", 1)[0].split("class", 1)[0] if exported: return True depth = 0 for offset, line in enumerate(lines[index:], start=1): depth += line.count("{") depth -= line.count("}") if offset >= DOCSTRING_MIN_LINES: return True if offset > 1 and depth <= 0: return False return False def _js_node_issues(path: Path) -> list[GateIssue]: """Require leading contract comments for non-trivial JS functions/classes.""" lines = path.read_text().splitlines() issues: list[GateIssue] = [] for index, line in enumerate(lines): match = _FUNCTION_RE.match(line) or _CLASS_RE.match(line) if not match: continue if not _is_nontrivial_js_definition(lines, index): continue name = match.group(1) if _has_js_contract(lines, index): continue issues.append(GateIssue("docstring", str(path), f"missing contract comment on {name}")) return issues def check_docstrings(paths: Iterable[Path]) -> list[GateIssue]: """Check that managed production files document non-trivial definitions.""" issues: list[GateIssue] = [] for path in paths: if not path.exists(): continue suffix = path.suffix.lower() if suffix == ".py": issues.extend(_python_node_issues(path)) elif suffix in {".js", ".mjs", ".ts", ".vue"}: issues.extend(_js_node_issues(path)) return issues def _normalize_key(value: str) -> str: return value.replace("\\", "/").lstrip("./") def _path_suffixes(value: str) -> set[str]: parts = _normalize_key(value).split("/") return {"/".join(parts[index:]) for index in range(len(parts))} def _coverage_lookup(report: dict, wanted: str) -> dict | None: wanted_key = _normalize_key(wanted) wanted_suffixes = _path_suffixes(wanted_key) candidates = [] for key, value in report.items(): if not isinstance(value, dict) or "lines" not in value: continue normalized = _normalize_key(key) if normalized == wanted_key or normalized in wanted_suffixes or any(normalized.endswith(f"/{suffix}") for suffix in wanted_suffixes): candidates.append(value) if candidates: return candidates[0] return None def _load_frontend_coverage(path: Path) -> dict: data = json.loads(path.read_text()) return {key: value for key, value in data.items() if isinstance(value, dict)} def _load_backend_coverage(path: Path) -> dict[str, dict[str, float]]: root = ET.parse(path).getroot() report: dict[str, dict[str, float]] = {} for class_node in root.findall(".//class"): filename = class_node.attrib.get("filename") if not filename: continue report[_normalize_key(filename)] = { "lines": float(class_node.attrib.get("line-rate", "0")) * 100, "branches": float(class_node.attrib.get("branch-rate", "0")) * 100, } return report def check_coverage( paths: Iterable[Path], *, backend_report: Path, frontend_report: Path, threshold: float = 95.0, ) -> list[GateIssue]: """Check the per-file coverage floor for the managed production scope.""" issues: list[GateIssue] = [] backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {} frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {} for path in paths: if not path.exists(): continue rel = path.relative_to(ROOT).as_posix() if path.is_absolute() else _normalize_key(str(path)) if rel.startswith("backend/"): metrics = _coverage_lookup(backend_cov, rel) if metrics is None: issues.append(GateIssue("coverage", rel, "missing from backend coverage report")) continue if metrics["lines"] < threshold: issues.append(GateIssue("coverage", rel, f"line coverage {metrics['lines']:.2f}% below {threshold}%")) elif rel.startswith("frontend/"): lookup = rel.split("frontend/", 1)[1] metrics = _coverage_lookup(frontend_cov, lookup) if metrics is None: issues.append(GateIssue("coverage", rel, "missing from frontend coverage report")) continue pct = metrics.get("lines", {}).get("pct", 0.0) if pct < threshold: issues.append(GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%")) return issues def _coverage_values_for_paths( paths: Iterable[Path], *, backend_report: Path, frontend_report: Path, ) -> list[float]: """Return per-file line coverage values for tracked backend/frontend files.""" backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {} frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {} values: list[float] = [] for path in paths: if not path.exists(): continue rel = path.relative_to(ROOT).as_posix() if path.is_absolute() else _normalize_key(str(path)) if rel.startswith("backend/"): metrics = _coverage_lookup(backend_cov, rel) if metrics is None: continue values.append(float(metrics.get("lines", 0.0))) continue if rel.startswith("frontend/"): lookup = rel.split("frontend/", 1)[1] metrics = _coverage_lookup(frontend_cov, lookup) if metrics is None: continue pct = metrics.get("lines", {}).get("pct", 0.0) values.append(float(pct)) return values def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run the repo's unified quality gate") parser.add_argument("--contract", default=str(DEFAULT_CONTRACT), help="Path to the JSON gate contract") parser.add_argument("--backend-coverage", default=str(DEFAULT_BACKEND_COVERAGE), help="Backend coverage XML") parser.add_argument("--frontend-coverage", default=str(DEFAULT_FRONTEND_COVERAGE), help="Frontend coverage summary JSON") parser.add_argument("--report", default=str(ROOT / "build" / "quality-gate.json"), help="Write a JSON report here") return parser def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage: Path) -> tuple[list[GateIssue], dict]: contract = load_contract(contract_path) managed_files = [_resolve(path) for path in contract["managed_files"]] docstring_files = [_resolve(path) for path in contract["docstring_files"]] coverage_files = [_resolve(path) for path in contract["coverage_files"]] max_lines = int(contract.get("max_lines", 500)) threshold = float(contract.get("coverage_threshold_pct", 95)) issues: list[GateIssue] = [] issues.extend(check_file_sizes(managed_files, max_lines=max_lines)) issues.extend(check_docstrings(docstring_files)) issues.extend(check_coverage(coverage_files, backend_report=backend_coverage, frontend_report=frontend_coverage, threshold=threshold)) coverage_values = _coverage_values_for_paths( coverage_files, backend_report=backend_coverage, frontend_report=frontend_coverage, ) workspace_line_coverage_percent = round(sum(coverage_values) / len(coverage_values), 3) if coverage_values else 0.0 source_lines_over_500 = sum(1 for issue in issues if issue.check == "loc") report = { "managed_files": [str(path.relative_to(ROOT)) for path in managed_files], "docstring_files": [str(path.relative_to(ROOT)) for path in docstring_files], "coverage_files": [str(path.relative_to(ROOT)) for path in coverage_files], "max_lines": max_lines, "coverage_threshold_pct": threshold, "workspace_line_coverage_percent": workspace_line_coverage_percent, "source_lines_over_500": source_lines_over_500, "issue_count": len(issues), "issues": [issue.__dict__ for issue in issues], } return issues, report def main(argv: list[str] | None = None) -> int: parser = _build_parser() args = parser.parse_args(argv) backend_coverage = _resolve(args.backend_coverage) frontend_coverage = _resolve(args.frontend_coverage) report_path = _resolve(args.report) issues, report = run_gate(_resolve(args.contract), backend_coverage=backend_coverage, frontend_coverage=frontend_coverage) report_path.parent.mkdir(parents=True, exist_ok=True) report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n") for issue in issues: print(f"{issue.check}: {issue.path}: {issue.message}") if issues: print(f"quality gate failed: {len(issues)} issue(s)") return 1 print(f"quality gate passed: {len(report['managed_files'])} managed files checked") return 0 if __name__ == "__main__": raise SystemExit(main())