#!/usr/bin/env python3
"""Publish titan-iac quality-gate results to Pushgateway."""

from __future__ import annotations

import json
import os
from glob import glob
from pathlib import Path
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET

SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
CANONICAL_CHECKS = [
    "tests",
    "coverage",
    "loc",
    "docs_naming",
    "gate_glue",
    "sonarqube",
    "supply_chain",
]


def _escape_label(value: str) -> str:
    """Escape a Prometheus label value without changing its content.

    Backslashes are escaped first so the escapes added for newlines and
    double quotes are not themselves re-escaped.
    """
    return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def _label_str(labels: dict[str, str]) -> str:
    """Render a stable Prometheus label set from a mapping.

    Labels with an empty value are omitted; an empty string is returned when
    no label remains.
    """
    parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
    return "{" + ",".join(parts) + "}" if parts else ""


def _read_text(url: str) -> str:
    """Fetch a plain-text response body from the given URL (10 s timeout)."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")


def _post_text(url: str, payload: str) -> None:
    """PUT a plain-text payload and fail on any 4xx/5xx response.

    Raises:
        RuntimeError: if the response status is 400 or above. Errors raised
            by ``urlopen`` itself propagate unchanged.
    """
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "text/plain"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        if response.status >= 400:
            raise RuntimeError(f"push failed with status={response.status}")


def _parse_junit(path: str) -> dict[str, int]:
    """Parse a JUnit XML file into aggregate test counters.

    A missing file yields all-zero counters. Only a ``testsuite`` root or the
    direct ``testsuite`` children of a ``testsuites`` root are counted.
    """
    totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    if not os.path.exists(path):
        return totals
    root = ET.parse(path).getroot()
    suites: list[ET.Element]
    if root.tag == "testsuite":
        suites = [root]
    elif root.tag == "testsuites":
        suites = [elem for elem in root if elem.tag == "testsuite"]
    else:
        suites = []
    for suite in suites:
        for key in totals:
            raw_value = suite.attrib.get(key, "0")
            try:
                totals[key] += int(float(raw_value))
            except (ValueError, OverflowError):
                # Non-numeric, NaN or infinite attribute: count it as zero.
                continue
    return totals


def _collect_junit_totals(pattern: str) -> dict[str, int]:
    """Sum JUnit counters across every XML file matching the pattern."""
    totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    for path in sorted(glob(pattern)):
        parsed = _parse_junit(path)
        for key in totals:
            totals[key] += parsed[key]
    return totals


def _read_exit_code(path: str) -> int:
    """Read the quality-gate exit code, defaulting to failure if missing.

    Returns 1 when the file does not exist or does not contain an integer.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return int(handle.read().strip())
    except (FileNotFoundError, ValueError):
        return 1


def _load_summary(path: str) -> dict:
    """Load the JSON quality-gate summary, returning an empty mapping on error.

    A missing file, invalid JSON, or a JSON document whose top level is not
    an object all yield ``{}`` so callers can always use ``.get``.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _summary_float(summary: dict, key: str) -> float:
    """Extract a float-like value from the summary, defaulting to 0.0."""
    value = summary.get(key)
    if isinstance(value, (int, float)):
        return float(value)
    return 0.0


def _summary_int(summary: dict, key: str) -> int:
    """Extract an int-like value from the summary, defaulting to 0."""
    value = summary.get(key)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    return 0


def _normalize_result_status(value: str | None, default: str = "failed") -> str:
    """Map arbitrary check status text into canonical check result buckets.

    Args:
        value: Raw status. Falsy values return ``default``. Non-string values
            (for example a JSON boolean or number) are converted with
            ``str()`` instead of raising.
        default: Bucket returned for empty or unrecognised statuses.

    Returns:
        ``"ok"``, ``"not_applicable"``, ``"failed"`` or ``default``.
    """
    if not value:
        return default
    normalized = str(value).strip().lower()
    if normalized in SUCCESS_STATUSES:
        return "ok"
    if normalized in NOT_APPLICABLE_STATUSES:
        return "not_applicable"
    if normalized in FAILED_STATUSES:
        return "failed"
    return default


def _load_optional_json(path: str | None) -> dict:
    """Load an optional JSON report file, returning an empty object when absent.

    An empty path, a missing file, invalid JSON, or a JSON document whose top
    level is not an object all yield ``{}``.
    """
    if not path:
        return {}
    candidate = Path(path)
    if not candidate.exists():
        return {}
    try:
        loaded = json.loads(candidate.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _combine_statuses(statuses: list[str]) -> str:
    """Roll up many check statuses into one canonical result.

    Any ``failed`` wins; all ``not_applicable`` (or no statuses) gives
    ``not_applicable``; a mix of ``ok``/``not_applicable`` gives ``ok``; any
    other value is treated as ``failed``.
    """
    if not statuses:
        return "not_applicable"
    if any(status == "failed" for status in statuses):
        return "failed"
    if all(status == "not_applicable" for status in statuses):
        return "not_applicable"
    if all(status in {"ok", "not_applicable"} for status in statuses):
        return "ok"
    return "failed"


def _nested_status(report: dict, section: str) -> object:
    """Return ``report[section]["status"]`` when that section is an object."""
    node = report.get(section)
    return node.get("status") if isinstance(node, dict) else None


def _infer_sonarqube_status(report: dict) -> str:
    """Infer canonical SonarQube check status from its JSON report payload.

    Looks at ``projectStatus.status``, then ``qualityGate.status``, then the
    top-level ``status``. An empty report is ``not_applicable``; a report
    with no recognisable status is ``failed``.
    """
    if not report:
        return "not_applicable"
    status = (
        _nested_status(report, "projectStatus")
        or _nested_status(report, "qualityGate")
        or report.get("status")
    )
    return _normalize_result_status(str(status) if status is not None else None, default="failed")


def _infer_supply_chain_status(report: dict, required: bool) -> str:
    """Infer canonical supply-chain status from IronBank/artifact report payload.

    A boolean ``compliant`` field takes precedence over ``status``. When the
    check is required, a missing report, a missing status or a
    ``not_applicable`` status is reported as ``failed``.
    """
    if not report:
        return "failed" if required else "not_applicable"
    compliant = report.get("compliant")
    if isinstance(compliant, bool):
        return "ok" if compliant else "failed"
    status = report.get("status")
    if status is None:
        return "failed" if required else "not_applicable"
    normalized = _normalize_result_status(str(status), default="failed")
    if normalized == "not_applicable" and required:
        return "failed"
    return normalized


def _rollup_named(status_by_name: dict[str, str], names: list[str]) -> str | None:
    """Combine the statuses recorded under ``names``; ``None`` if none exist."""
    found = [status_by_name[name] for name in names if name in status_by_name]
    return _combine_statuses(found) if found else None


def _build_check_statuses(
    summary: dict | None,
    tests: dict[str, int],
    workspace_line_coverage_percent: float,
    source_lines_over_500: int,
    sonarqube_report: dict,
    supply_chain_report: dict,
    supply_chain_required: bool,
) -> dict[str, str]:
    """Generate the canonical quality-check status map for dashboarding.

    A status recorded under a canonical check name in ``summary["results"]``
    wins. Otherwise each check falls back to related result names or to the
    measured values passed in.

    Returns:
        Mapping with one entry per name in ``CANONICAL_CHECKS``.
    """
    raw_results = summary.get("results", []) if isinstance(summary, dict) else []
    if not isinstance(raw_results, list):
        raw_results = []
    status_by_name: dict[str, str] = {}
    for result in raw_results:
        if not isinstance(result, dict):
            continue
        check_name = str(result.get("name") or "").strip().lower()
        if not check_name:
            continue
        status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")

    # tests: explicit status, then related suites, then the JUnit counters.
    tests_status = status_by_name.get("tests")
    if not tests_status:
        tests_status = _rollup_named(status_by_name, ["unit", "integration", "e2e", "pytest", "test"])
    if not tests_status:
        if tests["tests"] > 0:
            tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
        else:
            tests_status = "not_applicable"

    # coverage: a value of 0 is treated as "no coverage data recorded".
    coverage_status = status_by_name.get("coverage")
    if not coverage_status:
        if workspace_line_coverage_percent > 0:
            coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
        else:
            coverage_status = "not_applicable"

    # loc
    loc_status = status_by_name.get("loc")
    if not loc_status:
        loc_status = "ok" if source_lines_over_500 == 0 else "failed"

    # docs + naming + lint hygiene
    docs_naming_status = (
        status_by_name.get("docs_naming")
        or _rollup_named(status_by_name, ["docs", "hygiene", "smell", "lint", "naming"])
        or "not_applicable"
    )

    # gate glue
    gate_glue_status = (
        status_by_name.get("gate_glue")
        or _rollup_named(status_by_name, ["glue", "gate"])
        or "not_applicable"
    )

    sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
    supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
        supply_chain_report,
        required=supply_chain_required,
    )

    return {
        "tests": tests_status,
        "coverage": coverage_status,
        "loc": loc_status,
        "docs_naming": docs_naming_status,
        "gate_glue": gate_glue_status,
        "sonarqube": sonarqube_status,
        "supply_chain": supply_chain_status,
    }


def _find_counter_value(text: str, metric: str, labels: dict[str, str]) -> float:
    """Find the first sample of ``metric`` in ``text`` carrying all ``labels``.

    Each wanted ``name="value"`` pair must start right after ``{`` or ``,``
    so that ``job="x"`` does not match ``exported_job="x"``. The sample value
    is read after the closing ``}`` so whitespace inside label values cannot
    shift it.

    Returns:
        The sample value, or 0.0 when no line matches or the first matching
        line has an unparseable value.
    """
    prefix = metric + "{"
    wanted = [f'{key}="{_escape_label(value)}"' for key, value in labels.items()]
    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        label_block, closing, remainder = line[len(metric):].rpartition("}")
        if not closing:
            continue
        if not all(("{" + pair) in label_block or ("," + pair) in label_block for pair in wanted):
            continue
        fields = remainder.split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0


def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Return the current counter value for a labeled metric if present.

    Reads ``<pushgateway_url>/metrics`` and returns 0.0 when no matching
    sample exists. Network errors from the fetch propagate to the caller.
    """
    text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
    return _find_counter_value(text, metric, labels)


def _build_payload(
    suite: str,
    status: str,
    tests: dict[str, int],
    ok_count: int,
    failed_count: int,
    branch: str,
    build_number: str,
    summary: dict | None = None,
    workspace_line_coverage_percent: float = 0.0,
    source_lines_over_500: int = 0,
    check_statuses: dict[str, str] | None = None,
) -> str:
    """Build the Pushgateway payload for the current suite run.

    Args:
        suite: Suite name; escaped before being used as a label value.
        status: ``"ok"`` or ``"failed"`` for this run.
        tests: JUnit counters (``tests``, ``failures``, ``errors``, ``skipped``).
        ok_count: Value to publish for the ``status="ok"`` run counter.
        failed_count: Value to publish for the ``status="failed"`` run counter.
        branch: Branch label; ``"unknown"`` when empty.
        build_number: Build label; ``"unknown"`` when empty.
        summary: Unused; kept so existing callers keep working.
        workspace_line_coverage_percent: Coverage gauge value.
        source_lines_over_500: Gauge value for the over-500-lines count.
        check_statuses: Optional per-check results; when given, one sample is
            emitted for every name in ``CANONICAL_CHECKS``.

    Returns:
        Text exposition payload terminated by a newline.
    """
    passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
    build_labels = _label_str(
        {
            "suite": suite,
            "branch": branch or "unknown",
            "build_number": build_number or "unknown",
        }
    )
    # Escape once so every hand-built label below stays well-formed.
    suite_label = _escape_label(suite)
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE titan_iac_quality_gate_tests_total gauge",
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {passed}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {tests["failures"]}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {tests["errors"]}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {tests["skipped"]}',
        "# TYPE titan_iac_quality_gate_run_status gauge",
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="ok"}} {1 if status == "ok" else 0}',
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="failed"}} {1 if status == "failed" else 0}',
        "# TYPE titan_iac_quality_gate_build_info gauge",
        f"titan_iac_quality_gate_build_info{build_labels} 1",
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {workspace_line_coverage_percent:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
    ]
    if check_statuses:
        lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
        for check_name in CANONICAL_CHECKS:
            check_status = check_statuses.get(check_name, "not_applicable")
            lines.append(
                f'titan_iac_quality_gate_checks_total{{suite="{suite_label}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
            )
    return "\n".join(lines) + "\n"


def main() -> int:
    """Publish the quality-gate metrics and print a compact run summary.

    Configuration comes from environment variables (see the ``os.getenv``
    calls for names and defaults). Returns 0 after a successful push; fetch
    and push errors propagate as exceptions.
    """
    suite = os.getenv("SUITE_NAME", "titan_iac")
    pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091")
    job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
    junit_glob = os.getenv("JUNIT_GLOB", os.getenv("JUNIT_PATH", "build/junit-*.xml"))
    exit_code_path = os.getenv("QUALITY_GATE_EXIT_CODE_PATH", os.getenv("GLUE_EXIT_CODE_PATH", "build/quality-gate.rc"))
    summary_path = os.getenv("QUALITY_GATE_SUMMARY_PATH", "build/quality-gate-summary.json")
    branch = os.getenv("BRANCH_NAME", os.getenv("GIT_BRANCH", ""))
    build_number = os.getenv("BUILD_NUMBER", "")

    tests = _collect_junit_totals(junit_glob)
    exit_code = _read_exit_code(exit_code_path)
    status = "ok" if exit_code == 0 else "failed"
    summary = _load_summary(summary_path)
    workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
    source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
    sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
    supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
    supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"}
    check_statuses = _build_check_statuses(
        summary=summary,
        tests=tests,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        sonarqube_report=sonarqube_report,
        supply_chain_report=supply_chain_report,
        supply_chain_required=supply_chain_required,
    )

    # The payload replaces the whole metric group, so the run counters are
    # carried forward by reading the currently published values first.
    ok_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "ok"},
        )
    )
    failed_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "failed"},
        )
    )
    if status == "ok":
        ok_count += 1
    else:
        failed_count += 1

    payload = _build_payload(
        suite=suite,
        status=status,
        tests=tests,
        ok_count=ok_count,
        failed_count=failed_count,
        branch=branch,
        build_number=build_number,
        summary=summary,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        check_statuses=check_statuses,
    )
    push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
    _post_text(push_url, payload)

    run_summary = {
        "suite": suite,
        "status": status,
        "tests_total": tests["tests"],
        "tests_failed": tests["failures"],
        "tests_error": tests["errors"],
        "tests_skipped": tests["skipped"],
        "ok_count": ok_count,
        "failed_count": failed_count,
        "checks_recorded": len(check_statuses),
        "workspace_line_coverage_percent": workspace_line_coverage_percent,
        "source_lines_over_500": source_lines_over_500,
    }
    print(json.dumps(run_summary, sort_keys=True))
    return 0


if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())