#!/usr/bin/env python3 """Publish titan-iac quality-gate results to Pushgateway.""" from __future__ import annotations import json import os from glob import glob import urllib.error import urllib.request import xml.etree.ElementTree as ET def _escape_label(value: str) -> str: """Escape a Prometheus label value without changing its content.""" return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"') def _label_str(labels: dict[str, str]) -> str: """Render a stable Prometheus label set from a mapping.""" parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val] return "{" + ",".join(parts) + "}" if parts else "" def _read_text(url: str) -> str: """Fetch a plain-text response body from the given URL.""" with urllib.request.urlopen(url, timeout=10) as response: return response.read().decode("utf-8") def _post_text(url: str, payload: str) -> None: """PUT a plain-text payload and fail on any 4xx/5xx response.""" request = urllib.request.Request( url, data=payload.encode("utf-8"), method="PUT", headers={"Content-Type": "text/plain"}, ) with urllib.request.urlopen(request, timeout=10) as response: if response.status >= 400: raise RuntimeError(f"push failed with status={response.status}") def _parse_junit(path: str) -> dict[str, int]: """Parse a JUnit XML file into aggregate test counters.""" if not os.path.exists(path): return {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} tree = ET.parse(path) root = tree.getroot() totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} suites: list[ET.Element] if root.tag == "testsuite": suites = [root] elif root.tag == "testsuites": suites = [elem for elem in root if elem.tag == "testsuite"] else: suites = [] for suite in suites: for key in totals: raw_value = suite.attrib.get(key, "0") try: totals[key] += int(float(raw_value)) except ValueError: totals[key] += 0 return totals def _collect_junit_totals(pattern: str) -> dict[str, int]: """Sum JUnit counters across every XML file matching the pattern.""" totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} for path in sorted(glob(pattern)): parsed = _parse_junit(path) for key in totals: totals[key] += parsed[key] return totals def _read_exit_code(path: str) -> int: """Read the quality-gate exit code, defaulting to failure if missing.""" try: with open(path, "r", encoding="utf-8") as handle: return int(handle.read().strip()) except (FileNotFoundError, ValueError): return 1 def _load_summary(path: str) -> dict: """Load the JSON quality-gate summary, returning an empty mapping on error.""" try: with open(path, "r", encoding="utf-8") as handle: return json.load(handle) except (FileNotFoundError, json.JSONDecodeError): return {} def _summary_float(summary: dict, key: str) -> float: """Extract a float-like value from the summary, defaulting to 0.0.""" value = summary.get(key) if isinstance(value, (int, float)): return float(value) return 0.0 def _summary_int(summary: dict, key: str) -> int: """Extract an int-like value from the summary, defaulting to 0.""" value = summary.get(key) if isinstance(value, int): return value if isinstance(value, float): return int(value) return 0 def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: """Return the current counter value for a labeled metric if present.""" text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics") for line in text.splitlines(): if not line.startswith(metric + "{"): continue if any(f'{key}="{value}"' not in line for key, value in labels.items()): continue parts = line.split() if len(parts) < 2: continue try: return float(parts[1]) except ValueError: return 0.0 return 0.0 def _build_payload( suite: str, status: str, tests: dict[str, int], ok_count: int, failed_count: int, branch: str, build_number: str, summary: dict | None = None, workspace_line_coverage_percent: float = 0.0, source_lines_over_500: int = 0, ) -> str: """Build the Pushgateway payload for the current suite run.""" passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0) build_labels = _label_str( { "suite": suite, "branch": branch or "unknown", "build_number": build_number or "unknown", } ) lines = [ "# TYPE platform_quality_gate_runs_total counter", f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count}', f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count}', "# TYPE titan_iac_quality_gate_tests_total gauge", f'titan_iac_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}', f'titan_iac_quality_gate_tests_total{{suite="{suite}",result="failed"}} {tests["failures"]}', f'titan_iac_quality_gate_tests_total{{suite="{suite}",result="error"}} {tests["errors"]}', f'titan_iac_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {tests["skipped"]}', "# TYPE titan_iac_quality_gate_run_status gauge", f'titan_iac_quality_gate_run_status{{suite="{suite}",status="ok"}} {1 if status == "ok" else 0}', f'titan_iac_quality_gate_run_status{{suite="{suite}",status="failed"}} {1 if status == "failed" else 0}', "# TYPE titan_iac_quality_gate_build_info gauge", f"titan_iac_quality_gate_build_info{build_labels} 1", "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge", f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}', "# TYPE platform_quality_gate_source_lines_over_500_total gauge", f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', ] results = summary.get("results", []) if isinstance(summary, dict) else [] if results: lines.append("# TYPE titan_iac_quality_gate_checks_total gauge") for result in results: check_name = result.get("name") check_status = result.get("status") if not check_name or not check_status: continue lines.append( f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1' ) return "\n".join(lines) + "\n" def main() -> int: """Publish the quality-gate metrics and print a compact run summary.""" suite = os.getenv("SUITE_NAME", "titan_iac") pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091") job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci") junit_glob = os.getenv("JUNIT_GLOB", os.getenv("JUNIT_PATH", "build/junit-*.xml")) exit_code_path = os.getenv("QUALITY_GATE_EXIT_CODE_PATH", os.getenv("GLUE_EXIT_CODE_PATH", "build/quality-gate.rc")) summary_path = os.getenv("QUALITY_GATE_SUMMARY_PATH", "build/quality-gate-summary.json") branch = os.getenv("BRANCH_NAME", os.getenv("GIT_BRANCH", "")) build_number = os.getenv("BUILD_NUMBER", "") tests = _collect_junit_totals(junit_glob) exit_code = _read_exit_code(exit_code_path) status = "ok" if exit_code == 0 else "failed" summary = _load_summary(summary_path) workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent") source_lines_over_500 = _summary_int(summary, "source_lines_over_500") ok_count = int( _fetch_existing_counter( pushgateway_url, "platform_quality_gate_runs_total", {"job": job_name, "suite": suite, "status": "ok"}, ) ) failed_count = int( _fetch_existing_counter( pushgateway_url, "platform_quality_gate_runs_total", {"job": job_name, "suite": suite, "status": "failed"}, ) ) if status == "ok": ok_count += 1 else: failed_count += 1 payload = _build_payload( suite=suite, status=status, tests=tests, ok_count=ok_count, failed_count=failed_count, branch=branch, build_number=build_number, summary=summary, workspace_line_coverage_percent=workspace_line_coverage_percent, source_lines_over_500=source_lines_over_500, ) push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}" _post_text(push_url, payload) summary = { "suite": suite, "status": status, "tests_total": tests["tests"], "tests_failed": tests["failures"], "tests_error": tests["errors"], "tests_skipped": tests["skipped"], "ok_count": ok_count, "failed_count": failed_count, "checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0, "workspace_line_coverage_percent": workspace_line_coverage_percent, "source_lines_over_500": source_lines_over_500, } print(json.dumps(summary, sort_keys=True)) return 0 if __name__ == "__main__": raise SystemExit(main())