pegasus/scripts/publish_test_metrics.py

419 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
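"""Publish Pegasus test-suite results to Prometheus via Pushgateway.

Inputs (every path can be overridden through an environment variable):
- Backend JUnit XML and frontend JUnit XML
- Backend/frontend coverage summaries
- Backend/frontend test exit-code files
- Quality-gate summary JSON
- Optional SonarQube quality-gate report and supply-chain compliance report
  (ironbank-compliance.json)

Outputs pushed:
- platform_quality_gate_runs_total{suite="pegasus",status="ok|failed"}
- pegasus_test_suite_result{test_suite="backend|frontend",status="passed|failed"}
- pegasus_quality_gate_tests_total{suite="pegasus",result=*}
- platform_quality_gate_test_case_result{suite="pegasus",test=*,status=*}
- pegasus_quality_gate_coverage_percent{suite="pegasus"}
- platform_quality_gate_workspace_line_coverage_percent{suite="pegasus"}
- platform_quality_gate_source_lines_over_500_total{suite="pegasus"}
- pegasus_quality_gate_status{suite="pegasus",result="ok|failed"}
- pegasus_quality_gate_issues_total{suite="pegasus"}
- pegasus_quality_gate_checks_total{suite="pegasus",check=*,result=*}
- platform_quality_gate_build_info and pegasus_quality_gate_build_info

A JSON summary of the run is also written to build/metrics-summary.json and
printed to stdout.
"""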
from __future__ import annotations

import json
import os
import re
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
# Directories, relative to the repository root, walked by the file-size scan.
SOURCE_SCAN_ROOTS = (
    "backend",
    "frontend/src",
    "scripts",
    "testing",
)

# File suffixes that the file-size scan treats as source code.
SOURCE_EXTENSIONS = {
    ".go",
    ".py",
    ".sh",
    ".ts",
    ".tsx",
}

# Report status strings (compared lower-cased) that count as a passing check.
QUALITY_SUCCESS_STATES = {
    "compliant",
    "ok",
    "pass",
    "passed",
    "success",
}

# Gate-issue check names that mark the combined "docs_naming" check as failed.
STYLE_ISSUE_CHECKS = {
    "docs",
    "docs_naming",
    "go-doc",
    "go-vet",
    "hygiene",
    "lint",
    "naming",
    "ts-doc",
    "tsc",
}
def _escape_label(value: str) -> str:
    """Escape *value* for use inside a double-quoted Prometheus label value.

    Backslashes, line feeds and double quotes are each replaced by their
    backslash-escaped form in a single pass over the string.
    """
    escapes = str.maketrans({"\\": "\\\\", "\n": "\\n", '"': '\\"'})
    return value.translate(escapes)
def _label_str(labels: dict[str, str]) -> str:
    """Render *labels* as a Prometheus label set such as ``{a="1",b="2"}``.

    Labels with an empty value are left out. When no label remains, an empty
    string is returned rather than ``{}``.
    """
    rendered = []
    for name, raw in labels.items():
        if not raw:
            continue
        rendered.append(f'{name}="{_escape_label(raw)}"')
    if not rendered:
        return ""
    return "{" + ",".join(rendered) + "}"
def _read_text(path: Path) -> str:
    """Return the UTF-8 contents of *path*, or ``""`` when it does not exist."""
    return path.read_text(encoding="utf-8") if path.exists() else ""
def _as_int(node: ET.Element, name: str) -> int:
    """Return attribute *name* of *node* as an integer, or 0 when unusable.

    The value is parsed as a float first so that counts written as ``"3.0"``
    are accepted; any fractional part is truncated.

    Args:
        node: XML element whose attribute is read.
        name: Attribute name, for example ``"tests"`` or ``"failures"``.

    Returns:
        The integer value, or 0 when the attribute is missing, empty,
        non-numeric, or not a finite number.
    """
    raw = node.attrib.get(name) or "0"
    try:
        return int(float(raw))
    except (ValueError, OverflowError):
        # ValueError covers non-numeric text and "nan"; OverflowError is what
        # int() raises for "inf" / "-inf".
        return 0
def _load_junit(path: Path) -> dict[str, int]:
    """Sum the test counters declared on the suites of a JUnit XML report.

    The root element may be a single ``<testsuite>`` or a ``<testsuites>``
    wrapper; in the latter case only its direct ``<testsuite>`` children are
    counted. Any other root element contributes nothing.

    Args:
        path: Location of the JUnit XML report.

    Returns:
        A dict with the keys ``tests``, ``failures``, ``errors`` and
        ``skipped``. All values are 0 when the file is missing or is not
        well-formed XML (for example a report truncated by a crashed test
        runner), so a broken report is handled like a missing one instead of
        aborting the metrics push.
    """
    totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    if not path.exists():
        return totals
    try:
        root = ET.parse(path).getroot()
    except ET.ParseError:
        return totals

    if root.tag == "testsuite":
        suites = [root]
    elif root.tag == "testsuites":
        suites = root.findall("testsuite")
    else:
        suites = []

    for suite in suites:
        # The counter keys are also the JUnit attribute names.
        for counter in totals:
            totals[counter] += _as_int(suite, counter)
    return totals
def _load_junit_cases(path: Path) -> list[tuple[str, str]]:
    """List the individual test cases of a JUnit XML report with their status.

    The root element may be a single ``<testsuite>`` or a ``<testsuites>``
    wrapper; only ``<testcase>`` elements that are direct children of those
    suites are read. Cases without a name are skipped.

    Args:
        path: Location of the JUnit XML report.

    Returns:
        ``(test_id, status)`` pairs in document order. ``test_id`` is
        ``"classname::name"`` when a classname is present, otherwise just the
        name. ``status`` is ``"failed"``, ``"error"``, ``"skipped"`` or
        ``"passed"``. The list is empty when the file is missing or is not
        well-formed XML, so a broken report is handled like a missing one.
    """
    if not path.exists():
        return []
    try:
        root = ET.parse(path).getroot()
    except ET.ParseError:
        return []

    if root.tag == "testsuite":
        suites = [root]
    elif root.tag == "testsuites":
        suites = root.findall("testsuite")
    else:
        suites = []

    # Child tag -> reported status, in precedence order: a case carrying both
    # <failure> and <error> is reported as failed.
    outcomes = (("failure", "failed"), ("error", "error"), ("skipped", "skipped"))

    cases: list[tuple[str, str]] = []
    for suite in suites:
        for case in suite.findall("testcase"):
            name = (case.attrib.get("name") or "").strip()
            if not name:
                continue
            classname = (case.attrib.get("classname") or "").strip()
            test_id = f"{classname}::{name}" if classname else name
            status = next(
                (label for tag, label in outcomes if case.find(tag) is not None),
                "passed",
            )
            cases.append((test_id, status))
    return cases
def _load_backend_coverage_percent(path: Path) -> float:
    """Read the backend coverage percentage stored as a bare number in *path*.

    Returns 0.0 when the file is missing or its stripped contents cannot be
    parsed as a float.
    """
    percent = 0.0
    if path.exists():
        try:
            # The read stays inside the try: a decoding failure is a
            # ValueError too and is treated the same as an unparsable number.
            percent = float(path.read_text(encoding="utf-8").strip())
        except ValueError:
            percent = 0.0
    return percent
def _load_frontend_coverage_percent(path: Path) -> float:
    """Read ``total.lines.pct`` from a frontend coverage summary JSON file.

    Args:
        path: Location of the coverage summary JSON.

    Returns:
        The line-coverage percentage as a float, or 0.0 when the file is
        missing, is not valid JSON, lacks the ``total.lines.pct`` path, or
        holds a non-numeric value there. Malformed input is treated as
        "no coverage" rather than aborting the metrics push.
    """
    if not path.exists():
        return 0.0
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        return 0.0

    # Walk total -> lines -> pct, tolerating any level that is not an object.
    total = payload.get("total") if isinstance(payload, dict) else None
    lines = total.get("lines") if isinstance(total, dict) else None
    pct = lines.get("pct") if isinstance(lines, dict) else None

    # bool is a subclass of int; a JSON true/false is not a percentage.
    if isinstance(pct, bool) or not isinstance(pct, (int, float)):
        return 0.0
    return float(pct)
def _read_test_exit_code(path: Path) -> int:
    """Read a test runner's exit code from *path*.

    A missing file or non-integer contents yield 1, so an absent or garbled
    record counts as a failed run.
    """
    failure = 1
    if not path.exists():
        return failure
    text = path.read_text(encoding="utf-8").strip()
    try:
        code = int(text)
    except ValueError:
        code = failure
    return code
2026-04-11 00:02:59 -03:00
def _load_gate_summary(path: Path) -> dict[str, object]:
    """Load the quality-gate summary JSON object from *path*.

    Args:
        path: Location of the gate summary JSON file.

    Returns:
        The decoded JSON object. When the file is missing, unreadable, not
        valid JSON, or its top-level value is not an object, the fallback
        ``{"ok": False, "issues": []}`` is returned so callers can always
        use ``.get()`` on the result and an absent summary reads as a
        failed gate.
    """
    fallback: dict[str, object] = {"ok": False, "issues": []}
    if not path.exists():
        return fallback
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: the file vanished or is unreadable; ValueError: invalid
        # JSON or invalid UTF-8.
        return fallback
    return payload if isinstance(payload, dict) else fallback
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Return the value currently exposed by the Pushgateway for one series.

    The gateway's ``/metrics`` page is fetched and scanned for the first
    sample of *metric* whose label set contains every entry of *labels*.
    Labels are compared by exact name and value, so ``suite="x"`` does not
    match a ``test_suite="x"`` label, and label values containing spaces do
    not disturb locating the sample value.

    Args:
        pushgateway_url: Base URL of the Pushgateway; a trailing slash is
            ignored.
        metric: Metric name to look for. Only samples that carry a label
            set are considered.
        labels: Label names and unescaped values the sample must carry. The
            sample may carry additional labels.

    Returns:
        The sample value, or 0.0 when the page cannot be fetched, no sample
        matches, or the first matching sample's value is not a number.
    """
    text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
    if not text:
        return 0.0
    # The exposition page shows label values in escaped form, so compare
    # against the escaped form of the wanted values.
    wanted = {key: _escape_label(value) for key, value in labels.items()}
    for line in text.splitlines():
        parsed = _parse_sample_line(line, metric)
        if parsed is None:
            continue
        found, value_text = parsed
        if any(found.get(key) != value for key, value in wanted.items()):
            continue
        try:
            return float(value_text)
        except ValueError:
            return 0.0
    return 0.0


def _parse_sample_line(line: str, metric: str) -> tuple[dict[str, str], str] | None:
    """Split one exposition line of *metric* into (escaped labels, value text).

    Returns ``None`` when the line belongs to another metric, has no label
    set, or is not shaped like ``name{label="value",...} value``.
    """
    prefix = metric + "{"
    if not line.startswith(prefix):
        return None
    # One label="value" pair plus an optional trailing comma. The value part
    # accepts backslash escapes, so an escaped quote does not end the value.
    pair = re.compile(r'\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*"((?:[^"\\]|\\.)*)"\s*,?\s*')
    found: dict[str, str] = {}
    pos = len(prefix)
    while True:
        match = pair.match(line, pos)
        if match is None:
            break
        found[match.group(1)] = match.group(2)
        pos = match.end()
    if not line.startswith("}", pos):
        return None
    # The first field after the label set is the value; an optional
    # timestamp may follow it.
    fields = line[pos + 1 :].split()
    if not fields:
        return None
    return found, fields[0]
def _read_http(url: str) -> str:
    """Fetch *url* and return its body as text, or ``""`` on any failure.

    The request uses a 10-second timeout. The body is decoded as UTF-8 with
    undecodable bytes replaced. Every exception is swallowed on purpose:
    callers treat an empty string as "nothing available".
    """
    body = ""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            raw = response.read()
        body = raw.decode("utf-8", errors="replace")
    except Exception:
        body = ""
    return body
def _post_text(url: str, payload: str) -> None:
    """Send *payload* to *url* as a UTF-8 ``text/plain`` body.

    Despite the function's name, the request method is PUT. The request uses
    a 10-second timeout.

    Raises:
        RuntimeError: If the response status is 400 or above.
        Exception: Whatever ``urllib.request.urlopen`` raises on failure.
    """
    body = payload.encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "text/plain"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        status = response.status
        if status >= 400:
            raise RuntimeError(f"push failed status={status}")
def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
    """Count source files under the scan roots that exceed *max_lines* lines.

    Each directory in ``SOURCE_SCAN_ROOTS`` that exists below *repo_root* is
    walked recursively. Only regular files whose suffix is in
    ``SOURCE_EXTENSIONS`` are considered, and files named ``*_test.go``,
    ``*.test.ts`` or ``*.test.tsx`` are excluded. Files are read as UTF-8
    with undecodable bytes ignored.
    """
    test_suffixes = ("_test.go", ".test.ts", ".test.tsx")
    oversized = 0
    for rel_root in SOURCE_SCAN_ROOTS:
        scan_dir = repo_root / rel_root
        if not scan_dir.exists():
            continue
        for candidate in scan_dir.rglob("*"):
            is_source = candidate.is_file() and candidate.suffix in SOURCE_EXTENSIONS
            if not is_source or candidate.name.endswith(test_suffixes):
                continue
            text = candidate.read_text(encoding="utf-8", errors="ignore")
            if len(text.splitlines()) > max_lines:
                oversized += 1
    return oversized
def _load_json(path: Path) -> dict | None:
    """Load a JSON object from *path*.

    Returns ``None`` when the file is missing, cannot be read or decoded, or
    its top-level value is not an object.
    """
    if path.exists():
        try:
            decoded = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            decoded = None
        if isinstance(decoded, dict):
            return decoded
    return None
def _sonarqube_check_status(build_dir: Path) -> str:
    """Derive the SonarQube check result from its quality-gate report.

    The report path comes from ``QUALITY_GATE_SONARQUBE_REPORT`` and defaults
    to ``sonarqube-quality-gate.json`` inside *build_dir*.

    Returns:
        ``"not_applicable"`` when the report is missing, unreadable or an
        empty object. Otherwise the first string found among ``status``,
        ``projectStatus.status`` and ``qualityGate.status`` decides:
        ``"ok"`` if it is one of ``QUALITY_SUCCESS_STATES`` (ignoring case
        and surrounding whitespace), else ``"failed"``. ``"failed"`` is also
        returned when none of those fields holds a string.
    """
    default_report = str(build_dir / "sonarqube-quality-gate.json")
    report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", default_report)))
    if not report:
        return "not_applicable"

    candidates = [report.get("status")]
    for section in ("projectStatus", "qualityGate"):
        nested = report.get(section)
        candidates.append(nested.get("status") if isinstance(nested, dict) else None)

    first_text = next((item for item in candidates if isinstance(item, str)), None)
    if first_text is None:
        return "failed"
    if first_text.strip().lower() in QUALITY_SUCCESS_STATES:
        return "ok"
    return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
    """Derive the supply-chain check result from the compliance report.

    The report path comes from ``QUALITY_GATE_IRONBANK_REPORT`` and defaults
    to ``ironbank-compliance.json`` inside *build_dir*.

    Returns:
        ``"not_applicable"`` when the report is missing, unreadable or an
        empty object. A boolean ``compliant`` field decides directly.
        Otherwise the first string found among ``status``, ``result`` and
        ``compliance`` decides: ``"ok"`` if it is one of
        ``QUALITY_SUCCESS_STATES`` (ignoring case and surrounding
        whitespace), else ``"failed"``. ``"failed"`` is also returned when
        no field is usable.
    """
    fallback_path = str(build_dir / "ironbank-compliance.json")
    report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", fallback_path)))
    if not report:
        return "not_applicable"

    verdict = report.get("compliant")
    if isinstance(verdict, bool):
        return "ok" if verdict else "failed"

    for field in ("status", "result", "compliance"):
        text = report.get(field)
        if isinstance(text, str):
            if text.strip().lower() in QUALITY_SUCCESS_STATES:
                return "ok"
            return "failed"
    return "failed"
def main() -> int:
    """Collect test, coverage and gate results and push them to the Pushgateway.

    Steps:
      1. Read the JUnit reports, coverage summaries, test exit codes and the
         gate summary. Their paths come from environment variables, and the
         relative defaults resolve against the current working directory.
      2. Derive the overall outcome and the per-check results.
      3. Read the current run counters from the Pushgateway and bump the one
         matching this run's outcome. The push replaces the whole metric
         group, so the counters have to be carried over by hand.
      4. Render the payload in the Prometheus text format and PUT it to the
         ``job/platform-quality-ci/suite/<suite>`` group.
      5. Write the run summary to ``build/metrics-summary.json`` and print it.

    Every label value is rendered through ``_label_str`` so that quotes,
    backslashes or newlines (from environment variables or test names)
    cannot corrupt the payload. Each ``# TYPE`` line is emitted directly
    before the samples of its metric.

    Returns:
        0 once the metrics have been pushed and the summary written.

    Raises:
        Exception: Whatever the push or the summary write raises; nothing is
            caught here.
    """
    repo_root = Path(__file__).resolve().parents[1]
    build_dir = repo_root / "build"
    suite = os.getenv("SUITE_NAME", "pegasus")
    pushgateway_url = os.getenv(
        "PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
    )

    # --- inputs -----------------------------------------------------------
    backend_junit = Path(os.getenv("BACKEND_JUNIT_XML", "build/junit-backend.xml"))
    frontend_junit = Path(os.getenv("FRONTEND_JUNIT_XML", "build/junit-frontend.xml"))
    backend_cov = Path(os.getenv("BACKEND_COVERAGE_PERCENT_FILE", "build/coverage-backend-percent.txt"))
    frontend_cov = Path(
        os.getenv("FRONTEND_COVERAGE_JSON", "build/frontend-coverage/coverage-summary.json")
    )
    backend_rc_file = Path(os.getenv("BACKEND_TEST_RC_FILE", "build/backend-test.rc"))
    frontend_rc_file = Path(os.getenv("FRONTEND_TEST_RC_FILE", "build/frontend-test.rc"))
    gate_summary = _load_gate_summary(Path(os.getenv("GATE_SUMMARY_FILE", "build/gate-summary.json")))

    # --- test totals and per-case results -----------------------------------
    backend_totals = _load_junit(backend_junit)
    frontend_totals = _load_junit(frontend_junit)
    test_cases = _load_junit_cases(backend_junit) + _load_junit_cases(frontend_junit)
    if not test_cases:
        # Placeholder series emitted when no individual case was found.
        test_cases = [("__no_test_cases__", "skipped")]
    totals = {
        key: backend_totals[key] + frontend_totals[key]
        for key in ("tests", "failures", "errors", "skipped")
    }
    passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)

    # --- coverage and exit codes --------------------------------------------
    backend_pct = _load_backend_coverage_percent(backend_cov)
    frontend_pct = _load_frontend_coverage_percent(frontend_cov)
    # Plain average of both sides; a missing side counts as 0 %.
    coverage_pct = (backend_pct + frontend_pct) / 2
    backend_rc = _read_test_exit_code(backend_rc_file)
    frontend_rc = _read_test_exit_code(frontend_rc_file)
    backend_suite_result = "passed" if backend_rc == 0 else "failed"
    frontend_suite_result = "passed" if frontend_rc == 0 else "failed"

    # --- build identity -------------------------------------------------------
    branch = os.getenv("BRANCH_NAME") or os.getenv("GIT_BRANCH") or "unknown"
    if branch.startswith("origin/"):
        branch = branch[len("origin/") :]
    build_number = os.getenv("BUILD_NUMBER", "")
    jenkins_job = os.getenv("JOB_NAME", "pegasus")
    commit = os.getenv("GIT_COMMIT", "")
    labels = {
        "suite": suite,
        "branch": branch,
        "build_number": build_number,
        "jenkins_job": jenkins_job,
        "commit": commit,
    }
    test_case_base_labels = {
        "suite": suite,
        "branch": branch,
        "build_number": build_number or "unknown",
        "jenkins_job": jenkins_job,
    }

    # --- gate evaluation --------------------------------------------------------
    gate_ok = bool(gate_summary.get("ok"))
    gate_issues = gate_summary.get("issues") or []
    source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
    issue_checks = {
        str(issue.get("check") or "").strip().lower()
        for issue in gate_issues
        if isinstance(issue, dict)
    }
    tests_ok = (
        backend_rc == 0
        and frontend_rc == 0
        and totals["tests"] > 0
        and totals["failures"] == 0
        and totals["errors"] == 0
    )
    outcome = "ok" if gate_ok and tests_ok else "failed"
    checks = {
        "tests": "ok" if tests_ok else "failed",
        "coverage": "ok" if coverage_pct >= 95.0 and "coverage" not in issue_checks else "failed",
        "loc": "ok" if source_lines_over_500 == 0 and "loc" not in issue_checks else "failed",
        "docs_naming": "ok" if not (issue_checks & STYLE_ISSUE_CHECKS) else "failed",
        "gate_glue": "ok",
        "sonarqube": _sonarqube_check_status(build_dir),
        "supply_chain": _supply_chain_check_status(build_dir),
    }

    # --- run counters (read-modify-write against the gateway) -----------------
    job_name = "platform-quality-ci"
    ok_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": job_name, "suite": suite, "status": "ok"},
    )
    failed_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": job_name, "suite": suite, "status": "failed"},
    )
    if outcome == "ok":
        ok_count += 1
    else:
        failed_count += 1

    # --- payload ------------------------------------------------------------------
    def sample(metric: str, metric_labels: dict[str, str], value: object) -> str:
        """Render one exposition line with escaped label values."""
        return f"{metric}{_label_str(metric_labels)} {value}"

    coverage_value = f"{coverage_pct:.3f}"
    payload_lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        sample("platform_quality_gate_runs_total", {"suite": suite, "status": "ok"}, f"{ok_count:.0f}"),
        sample(
            "platform_quality_gate_runs_total",
            {"suite": suite, "status": "failed"},
            f"{failed_count:.0f}",
        ),
        "# TYPE pegasus_test_suite_result gauge",
        sample("pegasus_test_suite_result", {"test_suite": "backend", "status": backend_suite_result}, 1),
        sample("pegasus_test_suite_result", {"test_suite": "frontend", "status": frontend_suite_result}, 1),
        "# TYPE pegasus_quality_gate_tests_total gauge",
        sample("pegasus_quality_gate_tests_total", {"suite": suite, "result": "passed"}, passed),
        sample("pegasus_quality_gate_tests_total", {"suite": suite, "result": "failed"}, totals["failures"]),
        sample("pegasus_quality_gate_tests_total", {"suite": suite, "result": "error"}, totals["errors"]),
        sample("pegasus_quality_gate_tests_total", {"suite": suite, "result": "skipped"}, totals["skipped"]),
        "# TYPE pegasus_quality_gate_coverage_percent gauge",
        sample("pegasus_quality_gate_coverage_percent", {"suite": suite}, coverage_value),
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        sample("platform_quality_gate_workspace_line_coverage_percent", {"suite": suite}, coverage_value),
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        sample("platform_quality_gate_source_lines_over_500_total", {"suite": suite}, source_lines_over_500),
        "# TYPE pegasus_quality_gate_status gauge",
        sample(
            "pegasus_quality_gate_status",
            {"suite": suite, "result": "ok" if gate_ok else "failed"},
            1,
        ),
        "# TYPE pegasus_quality_gate_issues_total gauge",
        sample("pegasus_quality_gate_issues_total", {"suite": suite}, len(gate_issues)),
        "# TYPE platform_quality_gate_build_info gauge",
        sample("platform_quality_gate_build_info", labels, 1),
        "# TYPE pegasus_quality_gate_build_info gauge",
        sample("pegasus_quality_gate_build_info", labels, 1),
        "# TYPE platform_quality_gate_test_case_result gauge",
    ]
    payload_lines.extend(
        sample(
            "platform_quality_gate_test_case_result",
            {**test_case_base_labels, "test": test_name, "status": test_status},
            1,
        )
        for test_name, test_status in test_cases
    )
    payload_lines.append("# TYPE pegasus_quality_gate_checks_total gauge")
    payload_lines.extend(
        sample(
            "pegasus_quality_gate_checks_total",
            {"suite": suite, "check": check_name, "result": check_status},
            1,
        )
        for check_name, check_status in checks.items()
    )
    payload = "\n".join(payload_lines) + "\n"

    push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
    _post_text(push_url, payload)

    # --- local summary --------------------------------------------------------------
    summary = {
        "suite": suite,
        "tests_total": totals["tests"],
        "tests_passed": passed,
        "tests_failed": totals["failures"],
        "tests_errors": totals["errors"],
        "tests_skipped": totals["skipped"],
        "coverage_percent": round(coverage_pct, 3),
        "source_lines_over_500": source_lines_over_500,
        "outcome": outcome,
        "backend_rc": backend_rc,
        "frontend_rc": frontend_rc,
        "ok_counter": ok_count,
        "failed_counter": failed_count,
    }
    summary_path = Path("build/metrics-summary.json")
    # Every input is optional, so build/ may not exist yet; do not fail after
    # a successful push just because the directory is missing.
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    summary_text = json.dumps(summary, indent=2)
    summary_path.write_text(summary_text, encoding="utf-8")
    print(summary_text)
    return 0
if __name__ == "__main__":
    # Script entry point: run main() and exit with its return code. Any
    # failure is announced in one line and then re-raised unchanged.
    try:
        exit_code = main()
    except Exception as exc:
        print(f"metrics push failed: {exc}")
        raise
    raise SystemExit(exit_code)