pegasus/testing/pegasus_gate.py

271 lines
9.0 KiB
Python
Raw Permalink Normal View History

2026-04-11 00:02:59 -03:00
#!/usr/bin/env python3
"""Evaluate Pegasus quality checks from test artifacts and source files."""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Iterable
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Working directory for the Go commands (go vet).
BACKEND = ROOT / "backend"
# Working directory for the TypeScript compiler check.
FRONTEND = ROOT / "frontend"
# Coverage artifacts are read from here and gate-summary.json is written here.
BUILD = ROOT / "build"
# Suffixes that pass the first filter in _production_files().
PROD_EXTS = {".go", ".ts", ".tsx", ".py"}
@dataclass
class GateIssue:
    """A single finding raised by one of the gate checks."""

    # Identifier of the check that produced the finding, e.g. "loc" or "tsc".
    check: str
    # Root-relative file path, or a component name such as "backend".
    path: str
    # Human-readable description of what failed.
    detail: str
@dataclass
class GateReport:
    """Aggregated outcome of one gate run."""

    # True when no check produced an issue.
    ok: bool
    # Every finding collected from the individual checks.
    issues: list[GateIssue]
    # Number of production files that were inspected.
    file_count: int
    # Backend coverage percentage keyed by normalised file path.
    backend_coverage: dict[str, float]
    # Frontend coverage keyed by file path; each value holds the
    # "lines" and "statements" percentages.
    frontend_coverage: dict[str, dict[str, float]]
def _run(cmd: list[str], cwd: Path | None = None) -> tuple[int, str]:
proc = subprocess.run(cmd, cwd=cwd or ROOT, capture_output=True, text=True)
return proc.returncode, proc.stdout + proc.stderr
def _rel(path: Path) -> str:
    """Return *path* relative to ``ROOT`` as a forward-slash string.

    The result is used as a lookup key into the coverage maps, whose keys
    are normalised to ``/`` separators, so the path is rendered with
    ``as_posix()`` rather than the OS-specific separator.

    Raises:
        ValueError: if the resolved path is not located under ``ROOT``.
    """
    return path.resolve().relative_to(ROOT).as_posix()
def _production_files() -> list[Path]:
    """Return the sorted list of production files the gate inspects.

    A file is selected when it is one of:
      * a ``.go`` file under ``backend/``;
      * a ``.ts``/``.tsx`` file under ``frontend/src/`` that is not inside
        the ``test`` or ``types`` sub-directory and is not a ``.d.ts`` file;
      * a ``Dockerfile`` or ``Jenkinsfile`` anywhere in the repository;
      * a ``.py`` file under ``scripts/``.

    Test files (``*_test.go``, ``*.test.ts``, ``*.test.tsx``), ``AGENTS.md``
    and anything below ``.git``, ``node_modules``, ``dist``, ``coverage`` or
    ``build`` are always skipped.
    """
    skipped_dirs = {".git", "node_modules", "dist", "coverage", "build"}
    pipeline_names = {"Dockerfile", "Jenkinsfile"}
    test_suffixes = ("_test.go", ".test.ts", ".test.tsx")

    files: list[Path] = []
    for path in ROOT.rglob("*"):
        name = path.name
        if name.endswith(test_suffixes) or name == "AGENTS.md":
            continue
        if path.suffix not in PROD_EXTS and name not in pipeline_names:
            continue
        rel = path.relative_to(ROOT)
        # Match excluded directory names against the path *inside* the
        # repository only. Checking the absolute path would silently drop
        # every file when the checkout itself lives under a directory that
        # happens to be called "build", "dist" or "coverage".
        if any(part in skipped_dirs for part in rel.parts):
            continue
        # Cheap name-based filters run first; stat the entry only now.
        if not path.is_file():
            continue

        top = rel.parts[0]
        if top == "backend" and path.suffix == ".go":
            files.append(path)
        elif (
            top == "frontend"
            and len(rel.parts) >= 3
            and rel.parts[1] == "src"
            and path.suffix in {".ts", ".tsx"}
        ):
            if rel.parts[2] in {"test", "types"} or name.endswith(".d.ts"):
                continue
            files.append(path)
        elif name in pipeline_names:
            files.append(path)
        elif top == "scripts" and path.suffix == ".py":
            files.append(path)
    return sorted(set(files))
def _line_count(path: Path) -> int:
return len(path.read_text(encoding="utf-8").splitlines())
def _check_loc(files: Iterable[Path], max_lines: int = 500) -> list[GateIssue]:
    """Report every file whose line count exceeds *max_lines*.

    Args:
        files: Paths to measure.
        max_lines: Largest allowed line count; defaults to 500.

    Returns:
        One ``"loc"`` issue per offending file.
    """
    issues: list[GateIssue] = []
    for path in files:
        # Count once and reuse the value for both the test and the message,
        # so an oversized file is not read from disk twice.
        count = _line_count(path)
        if count > max_lines:
            issues.append(GateIssue("loc", _rel(path), f"{count} lines exceeds {max_lines}"))
    return issues
def _go_exported_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
issues: list[GateIssue] = []
for path in files:
if path.suffix != ".go" or path.name.endswith("_test.go"):
continue
lines = path.read_text(encoding="utf-8").splitlines()
for idx, line in enumerate(lines):
m = re.match(r"^(func|type|var|const)\s+([A-Z][A-Za-z0-9_]*)", line.strip())
if not m:
continue
prev = idx - 1
while prev >= 0 and not lines[prev].strip():
prev -= 1
if prev < 0 or not lines[prev].lstrip().startswith("//"):
issues.append(GateIssue("go-doc", _rel(path), f"exported {m.group(1)} {m.group(2)} lacks leading comment"))
return issues
def _ts_export_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
issues: list[GateIssue] = []
for path in files:
if path.suffix not in {".ts", ".tsx"} or path.name.endswith(".test.ts") or path.name.endswith(".test.tsx"):
continue
lines = path.read_text(encoding="utf-8").splitlines()
for idx, line in enumerate(lines):
if not re.match(r"^\s*export\s+(function|const|type)\s+[A-Z_a-z]", line):
continue
prev = idx - 1
while prev >= 0 and not lines[prev].strip():
prev -= 1
if prev < 0 or not lines[prev].lstrip().startswith("/**"):
issues.append(GateIssue("ts-doc", _rel(path), "exported symbol lacks JSDoc contract"))
return issues
def _go_vet() -> list[GateIssue]:
    """Run ``go vet ./...`` in the backend directory.

    Returns an empty list on success, otherwise a single ``"go-vet"`` issue
    carrying the tool output (or a fixed fallback text when it is empty).
    """
    exit_code, text = _run(["go", "vet", "./..."], cwd=BACKEND)
    if exit_code != 0:
        message = text.strip() or "go vet failed"
        return [GateIssue("go-vet", "backend", message)]
    return []
def _tsc_check() -> list[GateIssue]:
    """Type-check the frontend with ``tsc --noEmit`` via ``npm exec``.

    Returns an empty list on success, otherwise a single ``"tsc"`` issue
    carrying the compiler output (or a fixed fallback text when it is empty).
    """
    exit_code, text = _run(["npm", "exec", "tsc", "--", "--noEmit"], cwd=FRONTEND)
    if exit_code != 0:
        message = text.strip() or "tsc failed"
        return [GateIssue("tsc", "frontend", message)]
    return []
def _parse_go_coverage(path: Path) -> dict[str, float]:
if not path.exists():
return {}
_, output = _run(["go", "tool", "cover", "-func", str(path)], cwd=BACKEND)
cover: dict[str, float] = {}
for line in output.splitlines():
if "\t" not in line or line.startswith("total:"):
continue
parts = line.split("\t")
file_part = parts[0].split(":")[0]
pct = float(parts[-1].rstrip("%"))
file_norm = file_part.replace("\\", "/")
if "/backend/" in file_norm:
file_norm = "backend/" + file_norm.split("/backend/", 1)[1]
elif file_norm.startswith("backend/"):
pass
else:
file_norm = file_norm.rsplit("/", 1)[-1]
cover[file_norm] = pct
return cover
def _parse_frontend_coverage(path: Path) -> dict[str, dict[str, float]]:
if not path.exists():
return {}
def _pct(value: object) -> float:
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
match = re.search(r"[0-9]+(?:\.[0-9]+)?", value)
if match:
return float(match.group(0))
return 0.0
def _normalize_key(raw: str) -> str:
key = raw.replace("\\", "/")
if key.startswith("./"):
key = key[2:]
for prefix in (str(ROOT).replace("\\", "/") + "/", str(FRONTEND).replace("\\", "/") + "/"):
if key.startswith(prefix):
key = key[len(prefix):]
break
key = key.lstrip("/")
if key.startswith("frontend/"):
return key
return f"frontend/{key}"
payload = json.loads(path.read_text(encoding="utf-8"))
out: dict[str, dict[str, float]] = {}
for file_path, summary in payload.items():
if file_path == "total":
continue
lines = summary.get("lines") or {}
statements = summary.get("statements") or {}
out[_normalize_key(str(file_path))] = {
"lines": _pct(lines.get("pct")),
"statements": _pct(statements.get("pct")),
}
return out
def _check_coverage(files: Iterable[Path]) -> list[GateIssue]:
    """Report every Go or TypeScript file whose line coverage is below 95%.

    Backend numbers are read from ``coverage-backend.out`` and frontend
    numbers from ``frontend-coverage/coverage-summary.json``, both under
    ``BUILD``. A file that is absent from the coverage data counts as 0%.
    Files with other suffixes are ignored.
    """
    backend = _parse_go_coverage(BUILD / "coverage-backend.out")
    frontend = _parse_frontend_coverage(BUILD / "frontend-coverage" / "coverage-summary.json")

    issues: list[GateIssue] = []
    for path in files:
        rel = _rel(path)
        if path.suffix == ".go":
            label, pct = "go", backend.get(rel, 0.0)
        elif path.suffix in {".ts", ".tsx"}:
            label, pct = "frontend", frontend.get(rel, {}).get("lines", 0.0)
        else:
            continue
        if pct < 95.0:
            issues.append(GateIssue("coverage", rel, f"{label} line coverage {pct:.2f}% < 95%"))
    return issues
def evaluate() -> GateReport:
    """Run every gate check, write the JSON summary and return the report.

    Checks run in this order: Go doc comments, TypeScript JSDoc, file
    length, ``go vet``, ``tsc``, coverage. The report is ``ok`` only when no
    check produced an issue.

    Side effects: creates ``BUILD`` if needed and writes
    ``gate-summary.json`` there (the report serialised with a 2-space
    indent and a trailing newline).
    """
    files = _production_files()

    issues: list[GateIssue] = []
    issues.extend(_go_exported_comment_issues(files))
    issues.extend(_ts_export_comment_issues(files))
    issues.extend(_check_loc(files))
    issues.extend(_go_vet())
    issues.extend(_tsc_check())
    issues.extend(_check_coverage(files))

    # Coverage is parsed here for the summary; _check_coverage() does its
    # own parsing internally and does not expose the maps.
    report = GateReport(
        ok=not issues,
        issues=issues,
        file_count=len(files),
        backend_coverage=_parse_go_coverage(BUILD / "coverage-backend.out"),
        frontend_coverage=_parse_frontend_coverage(
            BUILD / "frontend-coverage" / "coverage-summary.json"
        ),
    )

    BUILD.mkdir(exist_ok=True)
    # asdict() recurses into the nested GateIssue entries and keeps the
    # field order, so the summary layout follows the dataclass definition.
    summary = json.dumps(asdict(report), indent=2) + "\n"
    (BUILD / "gate-summary.json").write_text(summary, encoding="utf-8")
    return report
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; return the process exit status.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]`` (the default
            when ``None``).

    The optional positional ``mode`` is ``enforce`` (default) or ``report``.
    The outcome and issues are printed as JSON in both modes. The return
    value is 1 only in ``enforce`` mode when the gate is not ok; ``report``
    mode always returns 0.
    """
    parser = argparse.ArgumentParser()
    # A tuple keeps the order of the choices stable in usage and error
    # messages; a set would print them in arbitrary order.
    parser.add_argument("mode", nargs="?", choices=("report", "enforce"), default="enforce")
    args = parser.parse_args(argv)

    report = evaluate()
    print(json.dumps({"ok": report.ok, "issues": [asdict(i) for i in report.issues]}, indent=2))
    return 1 if args.mode == "enforce" and not report.ok else 0
# Script entry: hand main()'s return value to the interpreter as exit status.
if __name__ == "__main__":
    sys.exit(main())