#!/usr/bin/env python3
"""Evaluate Pegasus quality checks from test artifacts and source files."""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Iterable

ROOT = Path(__file__).resolve().parents[1]
BACKEND = ROOT / "backend"
FRONTEND = ROOT / "frontend"
BUILD = ROOT / "build"
PROD_EXTS = {".go", ".ts", ".tsx", ".py"}

# Directory names whose contents are never treated as production sources.
_SKIPPED_DIRS = frozenset({".git", "node_modules", "dist", "coverage", "build"})
# Extension-less files that are gated wherever they appear in the tree.
_INFRA_FILES = frozenset({"Dockerfile", "Jenkinsfile"})
# File-name endings that mark test sources.
_TEST_SUFFIXES = ("_test.go", ".test.ts", ".test.tsx")
_TS_SUFFIXES = frozenset({".ts", ".tsx"})

_MAX_LINES = 500
_MIN_COVERAGE_PCT = 95.0

_GO_PROFILE = BUILD / "coverage-backend.out"
_FRONTEND_SUMMARY = BUILD / "frontend-coverage" / "coverage-summary.json"

_GO_EXPORTED_RE = re.compile(r"^(func|type|var|const)\s+([A-Z][A-Za-z0-9_]*)")
_TS_EXPORT_RE = re.compile(r"^\s*export\s+(function|const|type)\s+[A-Z_a-z]")
_NUMBER_RE = re.compile(r"[0-9]+(?:\.[0-9]+)?")


@dataclass
class GateIssue:
    """One failed check reported by the gate."""

    check: str  # check identifier, e.g. "loc", "go-doc", "ts-doc", "coverage"
    path: str  # ROOT-relative file path, or a component name such as "backend"
    detail: str  # human-readable description of the failure


@dataclass
class GateReport:
    """Aggregate result of one gate evaluation."""

    ok: bool  # True when no issue was found
    issues: list[GateIssue]
    file_count: int  # number of production files that were inspected
    backend_coverage: dict[str, float]  # file -> statement coverage percent
    frontend_coverage: dict[str, dict[str, float]]  # file -> {"lines", "statements"}


def _run(cmd: list[str], cwd: Path | None = None) -> tuple[int, str]:
    """Run *cmd* and return ``(return code, stdout + stderr)``.

    The command runs in *cwd*, or in ROOT when *cwd* is not given. A command
    that cannot be started at all (missing executable, missing working
    directory) is reported as return code 127 with an explanatory message
    instead of raising, so callers turn it into a regular gate issue.
    """
    try:
        proc = subprocess.run(cmd, cwd=cwd or ROOT, capture_output=True, text=True)
    except OSError as exc:
        return 127, f"failed to run {' '.join(cmd)}: {exc}"
    return proc.returncode, proc.stdout + proc.stderr


def _rel(path: Path) -> str:
    """Return *path* relative to ROOT, always using ``/`` separators.

    Forward slashes keep the result comparable with the coverage keys, which
    are normalised the same way.

    Raises:
        ValueError: if the resolved path is not located under ROOT.
    """
    return path.resolve().relative_to(ROOT).as_posix()


def _is_production_file(rel: Path) -> bool:
    """Decide whether the ROOT-relative path *rel* is subject to the gate."""
    name, suffix, parts = rel.name, rel.suffix, rel.parts
    if name.endswith(_TEST_SUFFIXES) or name == "AGENTS.md":
        return False
    if suffix not in PROD_EXTS and name not in _INFRA_FILES:
        return False
    if not _SKIPPED_DIRS.isdisjoint(parts):
        return False
    if name in _INFRA_FILES:
        return True
    top = parts[0]
    if top == "backend":
        return suffix == ".go"
    if top == "frontend":
        # Only frontend/src/** counts, minus test helpers, type-only
        # directories and ambient declaration files.
        return (
            suffix in _TS_SUFFIXES
            and len(parts) >= 3
            and parts[1] == "src"
            and parts[2] not in {"test", "types"}
            and not name.endswith(".d.ts")
        )
    if top == "scripts":
        return suffix == ".py"
    return False


def _production_files() -> list[Path]:
    """Return the sorted production files found under ROOT.

    Exclusion rules are applied to the path relative to ROOT, so the location
    of the checkout itself (for instance a CI workspace below a directory
    named ``build``) cannot exclude every file.
    """
    return sorted(
        path
        for path in ROOT.rglob("*")
        if path.is_file() and _is_production_file(path.relative_to(ROOT))
    )


def _line_count(path: Path) -> int:
    """Return the number of lines in the UTF-8 text file *path*."""
    return len(path.read_text(encoding="utf-8").splitlines())


def _check_loc(files: Iterable[Path]) -> list[GateIssue]:
    """Report every file that is longer than the line limit."""
    issues: list[GateIssue] = []
    for path in files:
        count = _line_count(path)  # read once; reused in the message
        if count > _MAX_LINES:
            issues.append(
                GateIssue("loc", _rel(path), f"{count} lines exceeds {_MAX_LINES}")
            )
    return issues


def _previous_nonblank_index(lines: list[str], idx: int) -> int:
    """Return the index of the closest non-blank line above *idx*, or -1."""
    prev = idx - 1
    while prev >= 0 and not lines[prev].strip():
        prev -= 1
    return prev


def _go_exported_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
    """Report exported Go declarations without a ``//`` comment above them.

    A declaration is any line that, once stripped, starts with ``func``,
    ``type``, ``var`` or ``const`` followed by a capitalised identifier.
    Blank lines between the comment and the declaration are tolerated.
    """
    issues: list[GateIssue] = []
    for path in files:
        if path.suffix != ".go" or path.name.endswith("_test.go"):
            continue
        lines = path.read_text(encoding="utf-8").splitlines()
        for idx, line in enumerate(lines):
            match = _GO_EXPORTED_RE.match(line.strip())
            if match is None:
                continue
            prev = _previous_nonblank_index(lines, idx)
            if prev < 0 or not lines[prev].lstrip().startswith("//"):
                issues.append(
                    GateIssue(
                        "go-doc",
                        _rel(path),
                        f"exported {match.group(1)} {match.group(2)} lacks leading comment",
                    )
                )
    return issues


def _has_jsdoc_above(lines: list[str], idx: int) -> bool:
    """Tell whether the closest non-blank text above ``lines[idx]`` is JSDoc.

    Accepts both the single-line form (``/** ... */``) and a multi-line block
    whose last line is the one directly above the declaration.
    """
    prev = _previous_nonblank_index(lines, idx)
    if prev < 0:
        return False
    closing = lines[prev].strip()
    if closing.startswith("/**"):
        return True
    # Anything else must be the closing line of a multi-line comment. If the
    # comment also opens on this line it is not a JSDoc block.
    if not closing.endswith("*/") or "/*" in closing:
        return False
    for above in reversed(lines[:prev]):
        candidate = above.strip()
        if "*/" in candidate:
            # An earlier comment ends here, so the closer had no opener.
            return False
        if candidate.startswith("/*"):
            return candidate.startswith("/**")
    return False


def _ts_export_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
    """Report exported TypeScript symbols that have no JSDoc block above them.

    Only ``export function``, ``export const`` and ``export type`` lines are
    inspected.
    """
    issues: list[GateIssue] = []
    for path in files:
        if path.suffix not in _TS_SUFFIXES or path.name.endswith((".test.ts", ".test.tsx")):
            continue
        lines = path.read_text(encoding="utf-8").splitlines()
        for idx, line in enumerate(lines):
            if _TS_EXPORT_RE.match(line) and not _has_jsdoc_above(lines, idx):
                issues.append(
                    GateIssue("ts-doc", _rel(path), "exported symbol lacks JSDoc contract")
                )
    return issues


def _go_vet() -> list[GateIssue]:
    """Run ``go vet ./...`` in the backend and report a failure, if any."""
    rc, output = _run(["go", "vet", "./..."], cwd=BACKEND)
    if rc == 0:
        return []
    return [GateIssue("go-vet", "backend", output.strip() or "go vet failed")]


def _tsc_check() -> list[GateIssue]:
    """Run ``tsc --noEmit`` in the frontend and report a failure, if any."""
    rc, output = _run(["npm", "exec", "tsc", "--", "--noEmit"], cwd=FRONTEND)
    if rc == 0:
        return []
    return [GateIssue("tsc", "frontend", output.strip() or "tsc failed")]


def _normalize_go_file(file_part: str) -> str:
    """Map a file name from a Go coverage profile to a gate key.

    Paths that contain ``/backend/`` or start with ``backend/`` become
    ``backend/...``; anything else is reduced to its base name.
    """
    file_norm = file_part.replace("\\", "/")
    if "/backend/" in file_norm:
        return "backend/" + file_norm.split("/backend/", 1)[1]
    if file_norm.startswith("backend/"):
        return file_norm
    return file_norm.rsplit("/", 1)[-1]


def _parse_go_coverage(path: Path) -> dict[str, float]:
    """Return per-file statement coverage from a Go coverage profile.

    The profile is parsed directly. Each data line has the form
    ``file:startLine.startCol,endLine.endCol numStatements hitCount``. A block
    listed more than once is merged and counts as covered when any of its
    entries was hit. Percentages are statement-weighted per file and rounded
    to one decimal, the precision printed by ``go tool cover -func``.

    Returns an empty dict when *path* does not exist. Lines that do not match
    the profile format are skipped.
    """
    if not path.exists():
        return {}

    # (file, span) -> (statement count, block was hit at least once)
    blocks: dict[tuple[str, str], tuple[int, bool]] = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("mode:"):
            continue
        try:
            location, stmt_text, hits_text = line.rsplit(" ", 2)
            statements, hits = int(stmt_text), int(hits_text)
        except ValueError:
            continue
        # Split on the last ":" so drive letters in the file name survive.
        file_part, sep, span = location.rpartition(":")
        if not sep:
            continue
        key = (file_part, span)
        seen_hit = blocks.get(key, (0, False))[1]
        blocks[key] = (statements, seen_hit or hits > 0)

    totals: dict[str, list[int]] = {}  # file -> [all statements, covered statements]
    for (file_part, _span), (statements, hit) in blocks.items():
        tally = totals.setdefault(_normalize_go_file(file_part), [0, 0])
        tally[0] += statements
        if hit:
            tally[1] += statements

    return {
        name: round(100.0 * covered / total, 1) if total else 0.0
        for name, (total, covered) in totals.items()
    }


def _coverage_pct(value: object) -> float:
    """Coerce a coverage ``pct`` value (number or text) to float, else 0.0."""
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        match = _NUMBER_RE.search(value)
        if match:
            return float(match.group(0))
    return 0.0


def _normalize_frontend_key(raw: str) -> str:
    """Map a file key from the frontend summary to ``frontend/...`` form."""
    key = raw.replace("\\", "/")
    if key.startswith("./"):
        key = key[2:]
    for base in (ROOT, FRONTEND):
        prefix = str(base).replace("\\", "/") + "/"
        if key.startswith(prefix):
            key = key[len(prefix):]
            break
    key = key.lstrip("/")
    return key if key.startswith("frontend/") else f"frontend/{key}"


def _parse_frontend_coverage(path: Path) -> dict[str, dict[str, float]]:
    """Return per-file line and statement coverage from a JSON summary.

    The ``total`` entry is skipped, as is any entry whose value is not a JSON
    object. Returns an empty dict when *path* does not exist.
    """
    if not path.exists():
        return {}
    payload = json.loads(path.read_text(encoding="utf-8"))
    out: dict[str, dict[str, float]] = {}
    for file_path, summary in payload.items():
        if file_path == "total" or not isinstance(summary, dict):
            continue
        lines = summary.get("lines") or {}
        statements = summary.get("statements") or {}
        out[_normalize_frontend_key(str(file_path))] = {
            "lines": _coverage_pct(lines.get("pct")),
            "statements": _coverage_pct(statements.get("pct")),
        }
    return out


def _check_coverage(
    files: Iterable[Path],
    go_cov: dict[str, float] | None = None,
    fe_cov: dict[str, dict[str, float]] | None = None,
) -> list[GateIssue]:
    """Report Go and TypeScript files whose coverage is below the threshold.

    Args:
        files: production files to check; other suffixes are ignored.
        go_cov: parsed backend coverage; read from the build artifact when
            omitted.
        fe_cov: parsed frontend coverage; read from the build artifact when
            omitted.

    A file without a coverage entry counts as 0% covered.
    """
    if go_cov is None:
        go_cov = _parse_go_coverage(_GO_PROFILE)
    if fe_cov is None:
        fe_cov = _parse_frontend_coverage(_FRONTEND_SUMMARY)

    issues: list[GateIssue] = []
    for path in files:
        rel = _rel(path)
        if path.suffix == ".go":
            label, pct = "go", go_cov.get(rel, 0.0)
        elif path.suffix in _TS_SUFFIXES:
            label, pct = "frontend", fe_cov.get(rel, {}).get("lines", 0.0)
        else:
            continue
        if pct < _MIN_COVERAGE_PCT:
            issues.append(
                GateIssue(
                    "coverage",
                    rel,
                    f"{label} line coverage {pct:.2f}% < {_MIN_COVERAGE_PCT:g}%",
                )
            )
    return issues


def evaluate() -> GateReport:
    """Run every check, write ``build/gate-summary.json`` and return the report."""
    files = _production_files()
    # Parse each coverage artifact once; the same data feeds the coverage
    # check and the report.
    go_cov = _parse_go_coverage(_GO_PROFILE)
    fe_cov = _parse_frontend_coverage(_FRONTEND_SUMMARY)

    issues: list[GateIssue] = []
    issues.extend(_go_exported_comment_issues(files))
    issues.extend(_ts_export_comment_issues(files))
    issues.extend(_check_loc(files))
    issues.extend(_go_vet())
    issues.extend(_tsc_check())
    issues.extend(_check_coverage(files, go_cov, fe_cov))

    report = GateReport(
        ok=not issues,
        issues=issues,
        file_count=len(files),
        backend_coverage=go_cov,
        frontend_coverage=fe_cov,
    )
    BUILD.mkdir(exist_ok=True)
    (BUILD / "gate-summary.json").write_text(
        json.dumps(asdict(report), indent=2) + "\n",
        encoding="utf-8",
    )
    return report


def main() -> int:
    """Command-line entry point.

    Prints the outcome as JSON. Returns 1 when the mode is ``enforce`` (the
    default) and the gate failed, otherwise 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", nargs="?", choices=("report", "enforce"), default="enforce")
    args = parser.parse_args()
    report = evaluate()
    print(json.dumps({"ok": report.ok, "issues": [asdict(i) for i in report.issues]}, indent=2))
    if args.mode == "enforce" and not report.ok:
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())