#!/usr/bin/env python3
"""Publish titan-iac quality-gate results to Pushgateway."""

from __future__ import annotations

import json
import os
from glob import glob
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET

from ci.scripts import publish_test_metrics_quality as _quality_helpers

# Module-level aliases for the helpers implemented in publish_test_metrics_quality.
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
_build_check_statuses = _quality_helpers._build_check_statuses
_combine_statuses = _quality_helpers._combine_statuses
_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status
_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500
_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status
_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent
_load_optional_json = _quality_helpers._load_optional_json
_normalize_result_status = _quality_helpers._normalize_result_status

# JUnit <testsuite> attributes that are aggregated into counters.
_JUNIT_COUNTERS = ("tests", "failures", "errors", "skipped")


def _escape_label(value: str) -> str:
    """Escape a Prometheus label value without changing its content.

    Backslashes are escaped first so the backslashes introduced for newlines
    and double quotes are not escaped a second time.
    """
    return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def _label_str(labels: dict[str, str]) -> str:
    """Render a stable Prometheus label set from a mapping.

    Labels with an empty value are omitted. Returns ``""`` (no braces) when
    nothing is left to render.
    """
    parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
    return "{" + ",".join(parts) + "}" if parts else ""


def _read_text(url: str) -> str:
    """Fetch a plain-text response body from the given URL (10 s timeout, UTF-8)."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")


def _post_text(url: str, payload: str) -> None:
    """PUT a plain-text payload and fail on any 4xx/5xx response.

    Raises:
        RuntimeError: if the response status is >= 400. ``urlopen`` normally
            raises ``urllib.error.HTTPError`` for such responses on its own;
            the explicit check is a second guard.
    """
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "text/plain"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        if response.status >= 400:
            raise RuntimeError(f"push failed with status={response.status}")


def _junit_suites(root: ET.Element) -> list[ET.Element]:
    """Return the ``<testsuite>`` elements reachable from a JUnit document root.

    A ``<testsuite>`` root is returned as the single suite; a ``<testsuites>``
    root yields its direct ``<testsuite>`` children; any other root yields
    nothing.
    """
    if root.tag == "testsuite":
        return [root]
    if root.tag == "testsuites":
        return [elem for elem in root if elem.tag == "testsuite"]
    return []


def _parse_junit(path: str) -> dict[str, int]:
    """Parse a JUnit XML file into aggregate test counters.

    Returns a dict with the keys ``tests``, ``failures``, ``errors`` and
    ``skipped``. A missing file yields all zeros. Counter attributes that are
    not finite numbers are counted as 0.
    """
    totals = dict.fromkeys(_JUNIT_COUNTERS, 0)
    if not os.path.exists(path):
        return totals

    root = ET.parse(path).getroot()
    for suite in _junit_suites(root):
        for key in totals:
            raw_value = suite.attrib.get(key, "0")
            try:
                # Go through float() so values such as "3.0" are accepted.
                totals[key] += int(float(raw_value))
            except (ValueError, OverflowError):
                # Non-numeric, NaN or infinite attribute: contributes nothing.
                continue
    return totals


def _collect_junit_totals(pattern: str) -> dict[str, int]:
    """Sum JUnit counters across every XML file matching the pattern."""
    totals = dict.fromkeys(_JUNIT_COUNTERS, 0)
    for path in sorted(glob(pattern)):
        parsed = _parse_junit(path)
        for key in totals:
            totals[key] += parsed[key]
    return totals


def _collect_junit_cases(pattern: str) -> list[tuple[str, str]]:
    """Collect individual JUnit test-case statuses for flaky-test trend panels.

    Returns ``(full_name, status)`` tuples in file order (files sorted by
    path). ``full_name`` is ``classname.name`` when a classname is present,
    else just ``name``; cases without a name are skipped. ``status`` is
    ``"failed"`` when the case has a ``<failure>`` or ``<error>`` child,
    ``"skipped"`` when it has a ``<skipped>`` child, otherwise ``"passed"``.
    """
    cases: list[tuple[str, str]] = []
    for path in sorted(glob(pattern)):
        if not os.path.exists(path):
            continue
        root = ET.parse(path).getroot()
        for suite in _junit_suites(root):
            for test_case in suite.findall("testcase"):
                case_name = test_case.attrib.get("name", "").strip()
                class_name = test_case.attrib.get("classname", "").strip()
                if not case_name:
                    continue
                full_name = f"{class_name}.{case_name}" if class_name else case_name
                status = "passed"
                if test_case.find("failure") is not None or test_case.find("error") is not None:
                    status = "failed"
                elif test_case.find("skipped") is not None:
                    status = "skipped"
                cases.append((full_name, status))
    return cases


def _read_exit_code(path: str) -> int:
    """Read the quality-gate exit code, defaulting to failure (1) if missing or unparsable."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return int(handle.read().strip())
    except (FileNotFoundError, ValueError):
        return 1


def _load_summary(path: str) -> dict:
    """Load the JSON quality-gate summary, returning an empty mapping on error.

    A missing file, invalid JSON, or valid JSON whose top-level value is not
    an object all yield ``{}`` so callers can rely on ``.get``.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _summary_float(summary: dict, key: str) -> float:
    """Extract a float-like value from the summary, defaulting to 0.0."""
    value = summary.get(key)
    if isinstance(value, (int, float)):
        return float(value)
    return 0.0


def _summary_int(summary: dict, key: str) -> int:
    """Extract an int-like value from the summary, defaulting to 0.

    Floats are truncated with ``int()``.
    """
    value = summary.get(key)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    return 0


def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Return the current counter value for a labeled metric if present.

    Fetches ``<pushgateway_url>/metrics`` and returns the value of the first
    sample line that starts with ``metric{`` and contains every requested
    ``key="value"`` pair. Returns 0.0 when no line matches or the matched
    value is not a number.
    """
    text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
    prefix = metric + "{"
    # Exposition output carries escaped label values, so compare against the
    # escaped form of what we are looking for.
    needles = [f'{key}="{_escape_label(value)}"' for key, value in labels.items()]
    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        if not all(needle in line for needle in needles):
            continue
        # Read the value from after the closing brace so whitespace inside a
        # label value cannot shift the field position.
        fields = line.rpartition("}")[2].split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0


def _build_payload(
    suite: str,
    status: str,
    tests: dict[str, int],
    test_cases: list[tuple[str, str]],
    ok_count: int,
    failed_count: int,
    branch: str,
    build_number: str,
    summary: dict | None = None,
    workspace_line_coverage_percent: float = 0.0,
    source_lines_over_500: int = 0,
    check_statuses: dict[str, str] | None = None,
) -> str:
    """Build the Pushgateway payload for the current suite run.

    Args:
        suite: Suite name, used as the ``suite`` label on every sample.
        status: Run status; ``"ok"`` and ``"failed"`` set the matching
            run-status gauge to 1.
        tests: Aggregate counters with keys ``tests``, ``failures``,
            ``errors`` and ``skipped``.
        test_cases: ``(test_name, status)`` pairs emitted one sample each;
            when empty a single ``__no_test_cases__`` placeholder is emitted.
        ok_count: Value written for the ``status="ok"`` run counter.
        failed_count: Value written for the ``status="failed"`` run counter.
        branch: Branch label for the build-info samples (``"unknown"`` if empty).
        build_number: Build-number label for build-info (``"unknown"`` if empty).
        summary: Accepted for interface compatibility; not read here.
        workspace_line_coverage_percent: Coverage gauge, rendered with 3 decimals.
        source_lines_over_500: Value of the over-500-lines gauge.
        check_statuses: Per-check results; when truthy, one sample is emitted
            for each name in ``CANONICAL_CHECKS`` (``"not_applicable"`` when
            a check is absent from the mapping).

    Returns:
        The text-exposition payload, terminated by a newline.
    """
    passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
    failures = tests["failures"]
    errors = tests["errors"]
    skipped = tests["skipped"]
    # Escape once: the suite name is interpolated into many label sets below
    # and must be escaped like every other dynamic label value.
    suite_label = _escape_label(suite)
    build_labels = _label_str(
        {
            "suite": suite,
            "branch": branch or "unknown",
            "build_number": build_number or "unknown",
        }
    )
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE titan_iac_quality_gate_tests_total gauge",
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {passed}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {failures}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {errors}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {skipped}',
        "# TYPE titan_iac_quality_gate_run_status gauge",
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="ok"}} {1 if status == "ok" else 0}',
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="failed"}} {1 if status == "failed" else 0}',
        "# TYPE platform_quality_gate_build_info gauge",
        f"platform_quality_gate_build_info{build_labels} 1",
        "# TYPE titan_iac_quality_gate_build_info gauge",
        f"titan_iac_quality_gate_build_info{build_labels} 1",
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {workspace_line_coverage_percent:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
    ]
    if check_statuses:
        lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
        for check_name in CANONICAL_CHECKS:
            check_status = check_statuses.get(check_name, "not_applicable")
            lines.append(
                f'titan_iac_quality_gate_checks_total{{suite="{suite_label}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
            )
    lines.append("# TYPE platform_quality_gate_test_case_result gauge")
    if test_cases:
        for test_name, test_status in test_cases:
            lines.append(
                f'platform_quality_gate_test_case_result{{suite="{suite_label}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
            )
    else:
        # Placeholder sample emitted when no test cases were collected.
        lines.append(
            f'platform_quality_gate_test_case_result{{suite="{suite_label}",test="__no_test_cases__",status="skipped"}} 1'
        )
    return "\n".join(lines) + "\n"


def main() -> int:
    """Publish the quality-gate metrics and print a compact run summary.

    Configuration is read from environment variables (``SUITE_NAME``,
    ``PUSHGATEWAY_URL``, ``QUALITY_GATE_JOB_NAME``, ``JUNIT_GLOB``/``JUNIT_PATH``,
    ``QUALITY_GATE_EXIT_CODE_PATH``/``GLUE_EXIT_CODE_PATH``,
    ``QUALITY_GATE_SUMMARY_PATH``, ``BRANCH_NAME``/``GIT_BRANCH``,
    ``BUILD_NUMBER``, ``QUALITY_GATE_SONARQUBE_REPORT``,
    ``QUALITY_GATE_IRONBANK_REPORT``, ``QUALITY_GATE_IRONBANK_REQUIRED``).

    Returns:
        0 after the payload has been pushed and the summary printed.
    """
    suite = os.getenv("SUITE_NAME", "titan_iac")
    pushgateway_url = os.getenv(
        "PUSHGATEWAY_URL",
        "http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
    )
    job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
    junit_glob = os.getenv("JUNIT_GLOB", os.getenv("JUNIT_PATH", "build/junit-*.xml"))
    exit_code_path = os.getenv(
        "QUALITY_GATE_EXIT_CODE_PATH",
        os.getenv("GLUE_EXIT_CODE_PATH", "build/quality-gate.rc"),
    )
    summary_path = os.getenv("QUALITY_GATE_SUMMARY_PATH", "build/quality-gate-summary.json")
    branch = os.getenv("BRANCH_NAME") or os.getenv("GIT_BRANCH") or "unknown"
    if branch.startswith("origin/"):
        branch = branch[len("origin/") :]
    build_number = os.getenv("BUILD_NUMBER", "")

    tests = _collect_junit_totals(junit_glob)
    test_cases = _collect_junit_cases(junit_glob)
    exit_code = _read_exit_code(exit_code_path)
    status = "ok" if exit_code == 0 else "failed"

    summary = _load_summary(summary_path)
    # Fall back to the inference helpers when the summary has no positive value.
    workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
    if workspace_line_coverage_percent <= 0:
        workspace_line_coverage_percent = _infer_workspace_coverage_percent(summary, "build/coverage-unit.xml")
    source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
    if source_lines_over_500 <= 0:
        source_lines_over_500 = _infer_source_lines_over_500(summary)

    sonarqube_report = _load_optional_json(
        os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json")
    )
    supply_chain_report = _load_optional_json(
        os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json")
    )
    supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    check_statuses = _build_check_statuses(
        summary=summary,
        tests=tests,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        sonarqube_report=sonarqube_report,
        supply_chain_report=supply_chain_report,
        supply_chain_required=supply_chain_required,
    )

    # The counter is maintained client-side: read the currently published
    # values, bump the one matching this run, and push both back.
    ok_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "ok"},
        )
    )
    failed_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "failed"},
        )
    )
    if status == "ok":
        ok_count += 1
    else:
        failed_count += 1

    payload = _build_payload(
        suite=suite,
        status=status,
        tests=tests,
        test_cases=test_cases,
        ok_count=ok_count,
        failed_count=failed_count,
        branch=branch,
        build_number=build_number,
        summary=summary,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        check_statuses=check_statuses,
    )
    push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
    _post_text(push_url, payload)

    # Separate name so the loaded gate summary above is not shadowed.
    run_summary = {
        "suite": suite,
        "status": status,
        "tests_total": tests["tests"],
        "tests_failed": tests["failures"],
        "tests_error": tests["errors"],
        "tests_skipped": tests["skipped"],
        "ok_count": ok_count,
        "failed_count": failed_count,
        "checks_recorded": len(check_statuses),
        "workspace_line_coverage_percent": workspace_line_coverage_percent,
        "source_lines_over_500": source_lines_over_500,
    }
    print(json.dumps(run_summary, sort_keys=True))
    return 0


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())