#!/usr/bin/env python3
"""Publish Pegasus test-suite results to Prometheus via Pushgateway.

Inputs:
- Backend JUnit XML and frontend JUnit XML
- Backend/frontend coverage summaries

Outputs pushed:
- platform_quality_gate_runs_total{suite="pegasus",status="ok|failed"}
- pegasus_quality_gate_tests_total{suite="pegasus",result=*}
- platform_quality_gate_test_case_result{suite="pegasus",test=*,status=*}
- pegasus_quality_gate_coverage_percent{suite="pegasus"}
- platform_quality_gate_workspace_line_coverage_percent{suite="pegasus"}
- platform_quality_gate_source_lines_over_500_total{suite="pegasus"}
"""

from __future__ import annotations

import json
import os
import re
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path

SOURCE_SCAN_ROOTS = ("backend", "frontend/src", "scripts", "testing")
SOURCE_EXTENSIONS = {".go", ".py", ".ts", ".tsx", ".sh"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}

# One labelled sample line of the text exposition format: name{labels} value
_SAMPLE_LINE_RE = re.compile(
    r"^(?P<name>[A-Za-z_:][A-Za-z0-9_:]*)\{(?P<labels>.*)\}\s+(?P<value>\S+)"
)
# One key="value" pair inside the braces; the value may contain escaped chars.
_LABEL_PAIR_RE = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')


def _escape_label(value: str) -> str:
    """Escape backslash, newline and double quote for use inside a label value."""
    # Backslashes must be handled first so later escapes are not doubled.
    return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def _label_str(labels: dict[str, str]) -> str:
    """Render *labels* as ``{k="v",...}``; empty values are dropped.

    Returns an empty string when no label has a non-empty value.
    """
    parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
    return "{" + ",".join(parts) + "}" if parts else ""


def _read_text(path: Path) -> str:
    """Return the UTF-8 contents of *path*, or ``""`` if it does not exist."""
    if not path.exists():
        return ""
    return path.read_text(encoding="utf-8")


def _as_int(node: ET.Element, name: str) -> int:
    """Read attribute *name* of *node* as an int; missing or unparsable -> 0."""
    raw = node.attrib.get(name) or "0"
    try:
        # Go through float so values such as "3.0" are accepted.
        return int(float(raw))
    except (ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: garbage or NaN.
        return 0


def _junit_suites(path: Path) -> list[ET.Element]:
    """Parse *path* and return its ``<testsuite>`` elements.

    Accepts a ``<testsuite>`` root or a ``<testsuites>`` root with direct
    ``<testsuite>`` children; any other root yields an empty list.
    Raises ``xml.etree.ElementTree.ParseError`` on malformed XML.
    """
    root = ET.parse(path).getroot()
    if root.tag == "testsuite":
        return [root]
    if root.tag == "testsuites":
        return list(root.findall("testsuite"))
    return []


def _load_junit(path: Path) -> dict[str, int]:
    """Sum the tests/failures/errors/skipped attributes over all suites in *path*.

    A missing file yields all-zero totals.
    """
    totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    if not path.exists():
        return totals
    for suite in _junit_suites(path):
        for key in totals:
            totals[key] += _as_int(suite, key)
    return totals


def _load_junit_cases(path: Path) -> list[tuple[str, str]]:
    """Return ``(test_id, status)`` for every named ``<testcase>`` in *path*.

    ``test_id`` is ``classname::name`` when a classname is present, else the
    bare name. Status is the first of failure / error / skipped that has a
    child element, otherwise ``"passed"``. A missing file yields ``[]``.
    """
    if not path.exists():
        return []
    cases: list[tuple[str, str]] = []
    for suite in _junit_suites(path):
        for case in suite.findall("testcase"):
            name = (case.attrib.get("name") or "").strip()
            if not name:
                continue
            classname = (case.attrib.get("classname") or "").strip()
            test_id = f"{classname}::{name}" if classname else name
            if case.find("failure") is not None:
                status = "failed"
            elif case.find("error") is not None:
                status = "error"
            elif case.find("skipped") is not None:
                status = "skipped"
            else:
                status = "passed"
            cases.append((test_id, status))
    return cases


def _load_backend_coverage_percent(path: Path) -> float:
    """Read a bare float from *path*; missing or unparsable -> 0.0."""
    if not path.exists():
        return 0.0
    try:
        return float(path.read_text(encoding="utf-8").strip())
    except ValueError:
        return 0.0


def _load_frontend_coverage_percent(path: Path) -> float:
    """Return ``total.lines.pct`` from a coverage summary JSON file.

    Any missing file, malformed JSON or unexpected shape yields 0.0, matching
    the tolerance of the other loaders in this module.
    """
    payload = _load_json(path)
    if not payload:
        return 0.0
    total = payload.get("total")
    lines = total.get("lines") if isinstance(total, dict) else None
    pct = lines.get("pct") if isinstance(lines, dict) else None
    if isinstance(pct, (int, float)):
        return float(pct)
    return 0.0


def _read_test_exit_code(path: Path) -> int:
    """Read an integer exit code from *path*; missing or unparsable -> 1."""
    if not path.exists():
        return 1
    raw = path.read_text(encoding="utf-8").strip()
    try:
        return int(raw)
    except ValueError:
        return 1


def _load_gate_summary(path: Path) -> dict[str, object]:
    """Load the gate summary JSON object from *path*.

    Falls back to ``{"ok": False, "issues": []}`` when the file is missing,
    unreadable, malformed, or does not contain a JSON object.
    """
    summary = _load_json(path)
    if summary is None:
        return {"ok": False, "issues": []}
    return summary


def _find_counter_value(text: str, metric: str, labels: dict[str, str]) -> float:
    """Find the first sample of *metric* in *text* carrying all *labels*.

    Labels are parsed and compared by exact name and value, so ``suite="x"``
    does not match ``test_suite="x"``. Returns 0.0 when nothing matches or the
    matched sample's value is not a float.
    """
    wanted = {key: _escape_label(val) for key, val in labels.items()}
    for line in text.splitlines():
        match = _SAMPLE_LINE_RE.match(line)
        if match is None or match.group("name") != metric:
            continue
        found = dict(_LABEL_PAIR_RE.findall(match.group("labels")))
        if any(found.get(key, "") != val for key, val in wanted.items()):
            continue
        try:
            return float(match.group("value"))
        except ValueError:
            return 0.0
    return 0.0


def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Read the current value of a counter from the Pushgateway ``/metrics`` page.

    Returns 0.0 when the page cannot be fetched or no matching sample exists.
    """
    text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
    if not text:
        return 0.0
    return _find_counter_value(text, metric, labels)


def _read_http(url: str) -> str:
    """GET *url* and return the body as text; any failure yields ``""``."""
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except Exception:
        # Deliberately best-effort: callers treat "" as "no existing value".
        return ""


def _post_text(url: str, payload: str) -> None:
    """Send *payload* as ``text/plain`` to *url* using HTTP PUT.

    NOTE(review): despite the name this issues PUT, not POST; per the
    Pushgateway API a PUT replaces the whole metric group - confirm intended.

    Raises ``RuntimeError`` on a status >= 400; ``urlopen`` itself may raise
    for transport or HTTP errors.
    """
    req = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "text/plain"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        if resp.status >= 400:
            raise RuntimeError(f"push failed status={resp.status}")


def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
    """Count source files under ``SOURCE_SCAN_ROOTS`` longer than *max_lines*.

    Only files whose suffix is in ``SOURCE_EXTENSIONS`` are considered; scan
    roots that do not exist are skipped.
    """
    count = 0
    for rel_root in SOURCE_SCAN_ROOTS:
        base = repo_root / rel_root
        if not base.exists():
            continue
        for path in base.rglob("*"):
            if not path.is_file() or path.suffix not in SOURCE_EXTENSIONS:
                continue
            text = path.read_text(encoding="utf-8", errors="ignore")
            if len(text.splitlines()) > max_lines:
                count += 1
    return count


def _load_json(path: Path) -> dict | None:
    """Load a JSON object from *path*.

    Returns ``None`` when the file is missing, unreadable, malformed, or its
    top-level value is not an object.
    """
    if not path.exists():
        return None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both JSON and UTF-8 decode errors.
        return None
    return payload if isinstance(payload, dict) else None


def _status_from_candidates(candidates: list[object]) -> str:
    """Map the first string candidate to ``"ok"`` / ``"failed"``.

    The string is ``"ok"`` when, stripped and lower-cased, it is one of
    ``QUALITY_SUCCESS_STATES``. With no string candidate at all the result is
    ``"failed"``.
    """
    for value in candidates:
        if isinstance(value, str):
            return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
    return "failed"


def _sonarqube_check_status(build_dir: Path) -> str:
    """Return ``ok`` / ``failed`` / ``not_applicable`` for the SonarQube report.

    The report path comes from ``QUALITY_GATE_SONARQUBE_REPORT`` or defaults to
    ``<build_dir>/sonarqube-quality-gate.json``. A missing or empty report is
    ``not_applicable``.
    """
    report = _load_json(
        Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json")))
    )
    if not report:
        return "not_applicable"
    project_status = report.get("projectStatus")
    quality_gate = report.get("qualityGate")
    return _status_from_candidates(
        [
            report.get("status"),
            project_status.get("status") if isinstance(project_status, dict) else None,
            quality_gate.get("status") if isinstance(quality_gate, dict) else None,
        ]
    )


def _supply_chain_check_status(build_dir: Path) -> str:
    """Return ``ok`` / ``failed`` / ``not_applicable`` for the compliance report.

    The report path comes from ``QUALITY_GATE_IRONBANK_REPORT`` or defaults to
    ``<build_dir>/ironbank-compliance.json``. A boolean ``compliant`` field
    wins; otherwise ``status`` / ``result`` / ``compliance`` strings are tried.
    """
    report = _load_json(
        Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json")))
    )
    if not report:
        return "not_applicable"
    compliant = report.get("compliant")
    if isinstance(compliant, bool):
        return "ok" if compliant else "failed"
    return _status_from_candidates(
        [report.get("status"), report.get("result"), report.get("compliance")]
    )


def _build_payload_lines(
    *,
    suite: str,
    ok_count: float,
    failed_count: float,
    backend_suite_result: str,
    frontend_suite_result: str,
    passed: int,
    totals: dict[str, int],
    coverage_pct: float,
    source_lines_over_500: int,
    gate_ok: bool,
    gate_issue_count: int,
    build_labels: dict[str, str],
    test_case_base_labels: dict[str, str],
    test_cases: list[tuple[str, str]],
    checks: dict[str, str],
) -> list[str]:
    """Render every metric as text-exposition lines, one family at a time.

    Each ``# TYPE`` line is immediately followed by that family's samples,
    because the exposition format requires a family's lines to be contiguous.
    Identical ``(test, status)`` pairs are emitted once so the payload never
    contains two samples with the same label set.
    """
    suite_label = _escape_label(suite)
    gate_result = "ok" if gate_ok else "failed"
    build_label_str = _label_str(build_labels)
    unique_cases = list(dict.fromkeys(test_cases))  # order-preserving de-dup

    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count:.0f}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count:.0f}',
        "# TYPE pegasus_test_suite_result gauge",
        f'pegasus_test_suite_result{{test_suite="backend",status="{backend_suite_result}"}} 1',
        f'pegasus_test_suite_result{{test_suite="frontend",status="{frontend_suite_result}"}} 1',
        "# TYPE pegasus_quality_gate_tests_total gauge",
        f'pegasus_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {passed}',
        f'pegasus_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {totals["failures"]}',
        f'pegasus_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {totals["errors"]}',
        f'pegasus_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {totals["skipped"]}',
        "# TYPE pegasus_quality_gate_coverage_percent gauge",
        f'pegasus_quality_gate_coverage_percent{{suite="{suite_label}"}} {coverage_pct:.3f}',
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {coverage_pct:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
        "# TYPE pegasus_quality_gate_status gauge",
        f'pegasus_quality_gate_status{{suite="{suite_label}",result="{gate_result}"}} 1',
        "# TYPE pegasus_quality_gate_issues_total gauge",
        f'pegasus_quality_gate_issues_total{{suite="{suite_label}"}} {gate_issue_count}',
        "# TYPE platform_quality_gate_build_info gauge",
        f"platform_quality_gate_build_info{build_label_str} 1",
        "# TYPE pegasus_quality_gate_build_info gauge",
        f"pegasus_quality_gate_build_info{build_label_str} 1",
        "# TYPE platform_quality_gate_test_case_result gauge",
    ]
    for test_name, test_status in unique_cases:
        case_labels = {**test_case_base_labels, "test": test_name, "status": test_status}
        lines.append(f"platform_quality_gate_test_case_result{_label_str(case_labels)} 1")
    lines.append("# TYPE pegasus_quality_gate_checks_total gauge")
    lines.extend(
        f'pegasus_quality_gate_checks_total{{suite="{suite_label}",check="{check_name}",result="{check_status}"}} 1'
        for check_name, check_status in checks.items()
    )
    return lines


def main() -> int:
    """Collect test/coverage/gate results, push them, and write a summary.

    Input locations and run metadata are read from environment variables with
    the defaults shown below. Returns 0 after a successful push; exceptions
    from the push or from malformed JUnit XML propagate to the caller.
    """
    repo_root = Path(__file__).resolve().parents[1]
    build_dir = repo_root / "build"
    suite = os.getenv("SUITE_NAME", "pegasus")
    pushgateway_url = os.getenv(
        "PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
    )

    backend_junit = Path(os.getenv("BACKEND_JUNIT_XML", "build/junit-backend.xml"))
    frontend_junit = Path(os.getenv("FRONTEND_JUNIT_XML", "build/junit-frontend.xml"))
    backend_cov = Path(os.getenv("BACKEND_COVERAGE_PERCENT_FILE", "build/coverage-backend-percent.txt"))
    frontend_cov = Path(
        os.getenv("FRONTEND_COVERAGE_JSON", "build/frontend-coverage/coverage-summary.json")
    )
    backend_rc_file = Path(os.getenv("BACKEND_TEST_RC_FILE", "build/backend-test.rc"))
    frontend_rc_file = Path(os.getenv("FRONTEND_TEST_RC_FILE", "build/frontend-test.rc"))
    gate_summary = _load_gate_summary(Path(os.getenv("GATE_SUMMARY_FILE", "build/gate-summary.json")))

    b = _load_junit(backend_junit)
    f = _load_junit(frontend_junit)
    test_cases = _load_junit_cases(backend_junit) + _load_junit_cases(frontend_junit)
    if not test_cases:
        # Always emit at least one series for the test-case metric.
        test_cases = [("__no_test_cases__", "skipped")]

    totals = {key: b[key] + f[key] for key in ("tests", "failures", "errors", "skipped")}
    passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)

    backend_pct = _load_backend_coverage_percent(backend_cov)
    frontend_pct = _load_frontend_coverage_percent(frontend_cov)
    coverage_pct = (backend_pct + frontend_pct) / 2 if (backend_pct or frontend_pct) else 0.0

    backend_rc = _read_test_exit_code(backend_rc_file)
    frontend_rc = _read_test_exit_code(frontend_rc_file)
    backend_suite_result = "passed" if backend_rc == 0 else "failed"
    frontend_suite_result = "passed" if frontend_rc == 0 else "failed"

    branch = os.getenv("BRANCH_NAME") or os.getenv("GIT_BRANCH") or "unknown"
    if branch.startswith("origin/"):
        branch = branch[len("origin/") :]
    build_number = os.getenv("BUILD_NUMBER", "")
    jenkins_job = os.getenv("JOB_NAME", "pegasus")
    commit = os.getenv("GIT_COMMIT", "")

    labels = {
        "suite": suite,
        "branch": branch,
        "build_number": build_number,
        "jenkins_job": jenkins_job,
        "commit": commit,
    }
    test_case_base_labels = {
        "suite": suite,
        "branch": branch,
        "build_number": build_number or "unknown",
        "jenkins_job": jenkins_job,
    }

    gate_ok = bool(gate_summary.get("ok"))
    gate_issues = gate_summary.get("issues") or []
    source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)

    run_is_ok = (
        gate_ok
        and backend_rc == 0
        and frontend_rc == 0
        and totals["tests"] > 0
        and totals["failures"] == 0
        and totals["errors"] == 0
    )
    outcome = "ok" if run_is_ok else "failed"

    checks = {
        "tests": "ok" if outcome == "ok" else "failed",
        "coverage": "ok" if coverage_pct >= 95.0 else "failed",
        "loc": "ok" if source_lines_over_500 == 0 else "failed",
        "docs_naming": "ok" if not gate_issues else "failed",
        "gate_glue": "ok",
        "sonarqube": _sonarqube_check_status(build_dir),
        "supply_chain": _supply_chain_check_status(build_dir),
    }

    job_name = "platform-quality-ci"
    # The run counters live in the Pushgateway: read the previous values,
    # bump the one matching this run, and push both back.
    ok_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": job_name, "suite": suite, "status": "ok"},
    )
    failed_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": job_name, "suite": suite, "status": "failed"},
    )
    if outcome == "ok":
        ok_count += 1
    else:
        failed_count += 1

    payload_lines = _build_payload_lines(
        suite=suite,
        ok_count=ok_count,
        failed_count=failed_count,
        backend_suite_result=backend_suite_result,
        frontend_suite_result=frontend_suite_result,
        passed=passed,
        totals=totals,
        coverage_pct=coverage_pct,
        source_lines_over_500=source_lines_over_500,
        gate_ok=gate_ok,
        gate_issue_count=len(gate_issues),
        build_labels=labels,
        test_case_base_labels=test_case_base_labels,
        test_cases=test_cases,
        checks=checks,
    )
    payload = "\n".join(payload_lines) + "\n"
    push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
    _post_text(push_url, payload)

    summary = {
        "suite": suite,
        "tests_total": totals["tests"],
        "tests_passed": passed,
        "tests_failed": totals["failures"],
        "tests_errors": totals["errors"],
        "tests_skipped": totals["skipped"],
        "coverage_percent": round(coverage_pct, 3),
        "source_lines_over_500": source_lines_over_500,
        "outcome": outcome,
        "backend_rc": backend_rc,
        "frontend_rc": frontend_rc,
        "ok_counter": ok_count,
        "failed_counter": failed_count,
    }
    summary_path = Path("build/metrics-summary.json")
    # The push already succeeded; do not fail just because build/ is absent.
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    print(json.dumps(summary, indent=2))
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except Exception as exc:
        print(f"metrics push failed: {exc}")
        raise