#!/usr/bin/env python3 """Publish Metis quality-gate test metrics to Pushgateway (Prometheus ingest).""" from __future__ import annotations import json import os from pathlib import Path import urllib.request import xml.etree.ElementTree as ET QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"} def _escape_label(value: str) -> str: return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"') def _label_str(labels: dict[str, str]) -> str: parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val] return "{" + ",".join(parts) + "}" if parts else "" def _load_coverage(path: str) -> float: with open(path, "r", encoding="utf-8") as handle: payload = json.load(handle) summary = payload.get("summary") or {} percent = summary.get("percent_covered") if isinstance(percent, (int, float)): return float(percent) raise RuntimeError("coverage summary missing percent_covered") def _load_junit(path: str) -> dict[str, int]: tree = ET.parse(path) root = tree.getroot() def _as_int(node: ET.Element, name: str) -> int: raw = node.attrib.get(name) or "0" try: return int(float(raw)) except ValueError: return 0 if root.tag == "testsuite": suites = [root] elif root.tag == "testsuites": suites = list(root.findall("testsuite")) else: suites = [] totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} for suite in suites: totals["tests"] += _as_int(suite, "tests") totals["failures"] += _as_int(suite, "failures") totals["errors"] += _as_int(suite, "errors") totals["skipped"] += _as_int(suite, "skipped") return totals def _load_junit_cases(path: str) -> list[tuple[str, str]]: tree = ET.parse(path) root = tree.getroot() if root.tag == "testsuite": suites = [root] elif root.tag == "testsuites": suites = list(root.findall("testsuite")) else: suites = [] cases: list[tuple[str, str]] = [] for suite in suites: for test_case in suite.findall("testcase"): case_name = (test_case.attrib.get("name") or "").strip() class_name = (test_case.attrib.get("classname") or "").strip() if not case_name: continue test_name = f"{class_name}::{case_name}" if class_name else case_name status = "passed" if test_case.find("failure") is not None or test_case.find("error") is not None: status = "failed" elif test_case.find("skipped") is not None: status = "skipped" cases.append((test_name, status)) return cases def _load_exit_code(path: str) -> int | None: if not path or not os.path.exists(path): return None with open(path, "r", encoding="utf-8") as handle: raw = handle.read().strip() if not raw: return None try: return int(raw) except ValueError as exc: raise RuntimeError(f"invalid test exit code {raw!r} in {path}") from exc def _post_text(url: str, payload: str) -> None: req = urllib.request.Request( url, data=payload.encode("utf-8"), method="PUT", headers={"Content-Type": "text/plain"}, ) with urllib.request.urlopen(req, timeout=10) as resp: if resp.status >= 400: raise RuntimeError(f"metrics push failed status={resp.status}") def _read_http(url: str) -> str: try: with urllib.request.urlopen(url, timeout=10) as resp: return resp.read().decode("utf-8", errors="replace") except Exception: return "" def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics") if not text: return 0.0 for line in text.splitlines(): if not line.startswith(metric + "{"): continue if any(f'{key}="{value}"' not in line for key, value in labels.items()): continue parts = line.split() if len(parts) < 2: continue try: return float(parts[1]) except ValueError: return 0.0 return 0.0 def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int: """Count source files above the configured line budget.""" count = 0 for rel_root in ("cmd", "pkg", "scripts", "testing"): base = repo_root / rel_root if not base.exists(): continue for path in base.rglob("*"): if not path.is_file(): continue if path.suffix not in {".go", ".py", ".sh"}: continue lines = len(path.read_text(encoding="utf-8", errors="ignore").splitlines()) if lines > max_lines: count += 1 return count def _load_json(path: Path) -> dict | None: if not path.exists(): return None try: payload = json.loads(path.read_text(encoding="utf-8")) except Exception: return None return payload if isinstance(payload, dict) else None def _sonarqube_check_status(build_dir: Path) -> str: report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json")))) if not report: return "not_applicable" status_candidates = [ report.get("status"), ((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None), ((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None), ] for value in status_candidates: if isinstance(value, str): return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed" return "failed" def _supply_chain_check_status(build_dir: Path) -> str: report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json")))) if not report: return "not_applicable" compliant = report.get("compliant") if isinstance(compliant, bool): return "ok" if compliant else "failed" status_candidates = [report.get("status"), report.get("result"), report.get("compliance")] for value in status_candidates: if isinstance(value, str): return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed" return "failed" def main() -> int: coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json") junit_path = os.getenv("JUNIT_XML", "build/junit.xml") test_exit_code_path = os.getenv("TEST_EXIT_CODE_PATH", "build/test.exitcode") docs_exit_code_path = os.getenv("DOCS_EXIT_CODE_PATH", "build/docs-naming.rc") pushgateway_url = os.getenv( "PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091" ).strip() suite = os.getenv("SUITE_NAME", "metis") branch = os.getenv("BRANCH_NAME", "") build_number = os.getenv("BUILD_NUMBER", "") commit = os.getenv("GIT_COMMIT", "") strict = os.getenv("METRICS_STRICT", "") == "1" repo_root = Path(__file__).resolve().parents[1] build_dir = repo_root / "build" if not os.path.exists(coverage_path): raise RuntimeError(f"missing coverage file {coverage_path}") if not os.path.exists(junit_path): raise RuntimeError(f"missing junit file {junit_path}") coverage = _load_coverage(coverage_path) totals = _load_junit(junit_path) test_cases = _load_junit_cases(junit_path) test_exit_code = _load_exit_code(test_exit_code_path) docs_exit_code = _load_exit_code(docs_exit_code_path) source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500) passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0) outcome = "ok" if ( (test_exit_code is not None and test_exit_code != 0) or totals["tests"] <= 0 or totals["failures"] > 0 or totals["errors"] > 0 ): outcome = "failed" checks = { "tests": "ok" if outcome == "ok" else "failed", "coverage": "ok" if coverage >= 95.0 else "failed", "loc": "ok" if source_lines_over_500 == 0 else "failed", "docs_naming": "ok" if docs_exit_code == 0 else "failed", "gate_glue": "ok", "sonarqube": _sonarqube_check_status(build_dir), "supply_chain": _supply_chain_check_status(build_dir), } ok_count = _fetch_existing_counter( pushgateway_url, "platform_quality_gate_runs_total", {"job": "platform-quality-ci", "suite": suite, "status": "ok"}, ) failed_count = _fetch_existing_counter( pushgateway_url, "platform_quality_gate_runs_total", {"job": "platform-quality-ci", "suite": suite, "status": "failed"}, ) if outcome == "ok": ok_count += 1 else: failed_count += 1 labels = { "job": "platform-quality-ci", "suite": suite, "branch": branch, "build_number": build_number, "commit": commit, } payload_lines = [ "# TYPE platform_quality_gate_runs_total counter", f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}', f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}', "# TYPE metis_quality_gate_tests_total gauge", f'metis_quality_gate_tests_total{{suite="{suite}",result="total"}} {totals["tests"]}', f'metis_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}', f'metis_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}', f'metis_quality_gate_tests_total{{suite="{suite}",result="error"}} {totals["errors"]}', f'metis_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {totals["skipped"]}', "# TYPE metis_quality_gate_run_status gauge", f'metis_quality_gate_run_status{{suite="{suite}",status="ok"}} {1 if outcome == "ok" else 0}', f'metis_quality_gate_run_status{{suite="{suite}",status="failed"}} {1 if outcome == "failed" else 0}', "# TYPE metis_quality_gate_coverage_percent gauge", f'metis_quality_gate_coverage_percent{{suite="{suite}"}} {coverage:.3f}', "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge", f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage:.3f}', "# TYPE platform_quality_gate_source_lines_over_500_total gauge", f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', "# TYPE metis_quality_gate_checks_total gauge", "# TYPE platform_quality_gate_test_case_result gauge", "# TYPE metis_quality_gate_build_info gauge", f"metis_quality_gate_build_info{_label_str(labels)} 1", ] if test_cases: payload_lines.extend( f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1' for test_name, test_status in test_cases ) else: payload_lines.append( f'platform_quality_gate_test_case_result{{suite="{suite}",test="__no_test_cases__",status="skipped"}} 1' ) payload_lines.extend( f'metis_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1' for check_name, check_status in checks.items() ) payload = "\n".join(payload_lines) + "\n" try: _post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{labels['job']}/suite/{suite}", payload) except Exception as exc: print(f"metrics push failed: {exc}") if strict: raise print( json.dumps( { "suite": suite, "outcome": outcome, "tests_total": totals["tests"], "tests_passed": passed, "tests_failed": totals["failures"], "tests_errors": totals["errors"], "tests_skipped": totals["skipped"], "coverage_percent": round(coverage, 3), "source_lines_over_500": source_lines_over_500, "test_exit_code": test_exit_code, }, indent=2, ) ) return 0 if __name__ == "__main__": raise SystemExit(main())