2026-04-11 00:02:59 -03:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""Evaluate Pegasus quality checks from test artifacts and source files."""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
import subprocess
|
|
|
|
|
import sys
|
|
|
|
|
from dataclasses import dataclass, asdict
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Iterable
|
|
|
|
|
|
|
|
|
|
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Well-known top-level directories, all anchored at the repository root.
BACKEND = ROOT.joinpath("backend")
FRONTEND = ROOT.joinpath("frontend")
BUILD = ROOT.joinpath("build")

# Suffixes a file must carry to be considered as production source
# (Dockerfile / Jenkinsfile are matched by name elsewhere).
PROD_EXTS = {".py", ".tsx", ".ts", ".go"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class GateIssue:
    """One finding produced by a quality check."""

    # Short identifier of the check that raised the finding (e.g. "loc").
    check: str
    # Location the finding refers to: a file path or a component name.
    path: str
    # Human-readable explanation of what is wrong.
    detail: str
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class GateReport:
    """Aggregated outcome of one gate evaluation."""

    # True when no check reported an issue.
    ok: bool
    # Every finding collected across all checks.
    issues: list[GateIssue]
    # Number of production files that were inspected.
    file_count: int
    # Backend coverage percentage keyed by file path.
    backend_coverage: dict[str, float]
    # Frontend coverage keyed by file path, then by metric name.
    frontend_coverage: dict[str, dict[str, float]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run(cmd: list[str], cwd: Path | None = None) -> tuple[int, str]:
|
|
|
|
|
proc = subprocess.run(cmd, cwd=cwd or ROOT, capture_output=True, text=True)
|
|
|
|
|
return proc.returncode, proc.stdout + proc.stderr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _rel(path: Path) -> str:
    """Return *path* relative to ``ROOT`` as a forward-slash string.

    The coverage parsers in this file normalise their keys to forward
    slashes, so the relative path is rendered with ``as_posix()`` to stay
    comparable with those keys on every platform.

    Args:
        path: File to express relative to the repository root; it is
            resolved first.

    Returns:
        The ROOT-relative path using ``/`` separators.

    Raises:
        ValueError: If the resolved path is not located under ``ROOT``.
    """
    return path.resolve().relative_to(ROOT).as_posix()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _is_production_path(rel: Path) -> bool:
    """Decide from its ROOT-relative path alone whether a file is gated."""
    name = rel.name
    parts = rel.parts
    suffix = rel.suffix

    # Tests and agent instructions are never production code.
    if name.endswith(("_test.go", ".test.ts", ".test.tsx")) or name == "AGENTS.md":
        return False

    is_pipeline_file = name in {"Dockerfile", "Jenkinsfile"}
    if suffix not in PROD_EXTS and not is_pipeline_file:
        return False

    # Only components *below* the repository root are inspected, so a
    # checkout that happens to live under e.g. ``/srv/build/`` is not
    # excluded wholesale.
    if any(part in {".git", "node_modules", "dist", "coverage", "build"} for part in parts):
        return False

    top = parts[0]
    if top == "backend" and suffix == ".go":
        return True
    if top == "frontend" and len(parts) >= 3 and parts[1] == "src" and suffix in {".ts", ".tsx"}:
        # Test helpers, shared type folders and declaration files are exempt.
        return parts[2] not in {"test", "types"} and not name.endswith(".d.ts")
    if is_pipeline_file:
        return True
    return top == "scripts" and suffix == ".py"


def _production_files() -> list[Path]:
    """Collect the production files under ``ROOT`` that the gate inspects.

    Included are Go files under ``backend``, TypeScript files under
    ``frontend/src`` (except its ``test`` / ``types`` entries and ``.d.ts``
    files), Python files under ``scripts``, and any ``Dockerfile`` or
    ``Jenkinsfile``. Test files, ``AGENTS.md`` and anything inside ``.git``,
    ``node_modules``, ``dist``, ``coverage`` or ``build`` are skipped.

    Returns:
        The matching paths, de-duplicated and sorted.
    """
    selected = {
        path
        for path in ROOT.rglob("*")
        # The name-based test is cheap, so it runs before the stat call.
        if _is_production_path(path.relative_to(ROOT)) and path.is_file()
    }
    return sorted(selected)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _line_count(path: Path) -> int:
|
|
|
|
|
return len(path.read_text(encoding="utf-8").splitlines())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_loc(files: Iterable[Path]) -> list[GateIssue]:
|
|
|
|
|
issues: list[GateIssue] = []
|
|
|
|
|
for path in files:
|
|
|
|
|
if _line_count(path) > 500:
|
|
|
|
|
issues.append(GateIssue("loc", _rel(path), f"{_line_count(path)} lines exceeds 500"))
|
|
|
|
|
return issues
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _go_exported_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
|
|
|
|
|
issues: list[GateIssue] = []
|
|
|
|
|
for path in files:
|
|
|
|
|
if path.suffix != ".go" or path.name.endswith("_test.go"):
|
|
|
|
|
continue
|
|
|
|
|
lines = path.read_text(encoding="utf-8").splitlines()
|
|
|
|
|
for idx, line in enumerate(lines):
|
|
|
|
|
m = re.match(r"^(func|type|var|const)\s+([A-Z][A-Za-z0-9_]*)", line.strip())
|
|
|
|
|
if not m:
|
|
|
|
|
continue
|
|
|
|
|
prev = idx - 1
|
|
|
|
|
while prev >= 0 and not lines[prev].strip():
|
|
|
|
|
prev -= 1
|
|
|
|
|
if prev < 0 or not lines[prev].lstrip().startswith("//"):
|
|
|
|
|
issues.append(GateIssue("go-doc", _rel(path), f"exported {m.group(1)} {m.group(2)} lacks leading comment"))
|
|
|
|
|
return issues
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ts_export_comment_issues(files: Iterable[Path]) -> list[GateIssue]:
|
|
|
|
|
issues: list[GateIssue] = []
|
|
|
|
|
for path in files:
|
|
|
|
|
if path.suffix not in {".ts", ".tsx"} or path.name.endswith(".test.ts") or path.name.endswith(".test.tsx"):
|
|
|
|
|
continue
|
|
|
|
|
lines = path.read_text(encoding="utf-8").splitlines()
|
|
|
|
|
for idx, line in enumerate(lines):
|
|
|
|
|
if not re.match(r"^\s*export\s+(function|const|type)\s+[A-Z_a-z]", line):
|
|
|
|
|
continue
|
|
|
|
|
prev = idx - 1
|
|
|
|
|
while prev >= 0 and not lines[prev].strip():
|
|
|
|
|
prev -= 1
|
|
|
|
|
if prev < 0 or not lines[prev].lstrip().startswith("/**"):
|
|
|
|
|
issues.append(GateIssue("ts-doc", _rel(path), "exported symbol lacks JSDoc contract"))
|
|
|
|
|
return issues
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _go_vet() -> list[GateIssue]:
    """Run ``go vet ./...`` in the backend directory.

    Returns:
        An empty list when the command exits with status 0, otherwise a
        single ``"go-vet"`` issue carrying the command output (or a generic
        message when there is no output).
    """
    status, log = _run(["go", "vet", "./..."], cwd=BACKEND)
    if status != 0:
        return [GateIssue("go-vet", "backend", log.strip() or "go vet failed")]
    return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _tsc_check() -> list[GateIssue]:
    """Type-check the frontend with ``tsc --noEmit`` (invoked via ``npm exec``).

    Returns:
        An empty list when the command exits with status 0, otherwise a
        single ``"tsc"`` issue carrying the command output (or a generic
        message when there is no output).
    """
    status, log = _run(["npm", "exec", "tsc", "--", "--noEmit"], cwd=FRONTEND)
    if status != 0:
        return [GateIssue("tsc", "frontend", log.strip() or "tsc failed")]
    return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_go_coverage(path: Path) -> dict[str, float]:
|
|
|
|
|
if not path.exists():
|
|
|
|
|
return {}
|
|
|
|
|
_, output = _run(["go", "tool", "cover", "-func", str(path)], cwd=BACKEND)
|
|
|
|
|
cover: dict[str, float] = {}
|
|
|
|
|
for line in output.splitlines():
|
|
|
|
|
if "\t" not in line or line.startswith("total:"):
|
|
|
|
|
continue
|
|
|
|
|
parts = line.split("\t")
|
|
|
|
|
file_part = parts[0].split(":")[0]
|
|
|
|
|
pct = float(parts[-1].rstrip("%"))
|
|
|
|
|
file_norm = file_part.replace("\\", "/")
|
|
|
|
|
if "/backend/" in file_norm:
|
|
|
|
|
file_norm = "backend/" + file_norm.split("/backend/", 1)[1]
|
|
|
|
|
elif file_norm.startswith("backend/"):
|
|
|
|
|
pass
|
|
|
|
|
else:
|
|
|
|
|
file_norm = file_norm.rsplit("/", 1)[-1]
|
|
|
|
|
cover[file_norm] = pct
|
|
|
|
|
return cover
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_frontend_coverage(path: Path) -> dict[str, dict[str, float]]:
|
|
|
|
|
if not path.exists():
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
def _pct(value: object) -> float:
|
|
|
|
|
if isinstance(value, (int, float)):
|
|
|
|
|
return float(value)
|
|
|
|
|
if isinstance(value, str):
|
|
|
|
|
match = re.search(r"[0-9]+(?:\.[0-9]+)?", value)
|
|
|
|
|
if match:
|
|
|
|
|
return float(match.group(0))
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
def _normalize_key(raw: str) -> str:
|
|
|
|
|
key = raw.replace("\\", "/")
|
|
|
|
|
if key.startswith("./"):
|
|
|
|
|
key = key[2:]
|
|
|
|
|
for prefix in (str(ROOT).replace("\\", "/") + "/", str(FRONTEND).replace("\\", "/") + "/"):
|
|
|
|
|
if key.startswith(prefix):
|
|
|
|
|
key = key[len(prefix):]
|
|
|
|
|
break
|
|
|
|
|
key = key.lstrip("/")
|
|
|
|
|
if key.startswith("frontend/"):
|
|
|
|
|
return key
|
|
|
|
|
return f"frontend/{key}"
|
|
|
|
|
|
|
|
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
out: dict[str, dict[str, float]] = {}
|
|
|
|
|
for file_path, summary in payload.items():
|
|
|
|
|
if file_path == "total":
|
|
|
|
|
continue
|
|
|
|
|
lines = summary.get("lines") or {}
|
|
|
|
|
statements = summary.get("statements") or {}
|
|
|
|
|
out[_normalize_key(str(file_path))] = {
|
|
|
|
|
"lines": _pct(lines.get("pct")),
|
|
|
|
|
"statements": _pct(statements.get("pct")),
|
|
|
|
|
}
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_coverage(files: Iterable[Path]) -> list[GateIssue]:
    """Flag Go and TypeScript files whose line coverage is below 95%.

    Coverage figures are read from ``coverage-backend.out`` and
    ``frontend-coverage/coverage-summary.json`` under ``BUILD``. A file with
    no recorded figure counts as 0%. Files of any other type are not checked.

    Args:
        files: Production files to look up.

    Returns:
        One ``"coverage"`` issue per file below the threshold.
    """
    found: list[GateIssue] = []
    backend = _parse_go_coverage(BUILD / "coverage-backend.out")
    frontend = _parse_frontend_coverage(BUILD / "frontend-coverage" / "coverage-summary.json")

    for path in files:
        rel = _rel(path)
        if path.suffix == ".go":
            pct = backend.get(rel, 0.0)
            detail = f"go line coverage {pct:.2f}% < 95%"
        elif path.suffix in {".ts", ".tsx"}:
            pct = frontend.get(rel, {}).get("lines", 0.0)
            detail = f"frontend line coverage {pct:.2f}% < 95%"
        else:
            continue
        if pct < 95.0:
            found.append(GateIssue("coverage", rel, detail))
    return found
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def evaluate() -> GateReport:
    """Run every check, persist a JSON summary and return the report.

    Checks run in this order: Go doc comments, TypeScript JSDoc, file length,
    ``go vet``, ``tsc`` and coverage. The summary is written to
    ``gate-summary.json`` inside ``BUILD``, which is created if missing.

    Returns:
        The assembled report; ``ok`` is true only when no issue was found.
    """
    files = _production_files()

    issues = [
        *_go_exported_comment_issues(files),
        *_ts_export_comment_issues(files),
        *_check_loc(files),
        *_go_vet(),
        *_tsc_check(),
        *_check_coverage(files),
    ]

    report = GateReport(
        ok=not issues,
        issues=issues,
        file_count=len(files),
        backend_coverage=_parse_go_coverage(BUILD / "coverage-backend.out"),
        frontend_coverage=_parse_frontend_coverage(BUILD / "frontend-coverage" / "coverage-summary.json"),
    )

    summary = {
        "ok": report.ok,
        "issues": [asdict(issue) for issue in report.issues],
        "file_count": report.file_count,
        "backend_coverage": report.backend_coverage,
        "frontend_coverage": report.frontend_coverage,
    }
    BUILD.mkdir(exist_ok=True)
    summary_path = BUILD / "gate-summary.json"
    summary_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
    return report
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: evaluate the gate and print the findings.

    The optional positional ``mode`` argument is ``enforce`` (default) or
    ``report``. The outcome and the list of issues are printed as JSON.

    Returns:
        1 when running in ``enforce`` mode and at least one issue was found,
        otherwise 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "mode",
        nargs="?",
        # An ordered sequence keeps the usage and error text stable between
        # runs; a set would list the choices in arbitrary order.
        choices=("report", "enforce"),
        default="enforce",
        help="'enforce' exits non-zero when issues are found; 'report' only prints them",
    )
    args = parser.parse_args()

    report = evaluate()
    print(json.dumps({"ok": report.ok, "issues": [asdict(i) for i in report.issues]}, indent=2))
    if args.mode == "enforce" and not report.ok:
        return 1
    return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|