#!/usr/bin/env python3
"""Publish Ananke quality-gate counters to Pushgateway."""

from __future__ import annotations

import argparse
import base64
import json
import os
from pathlib import Path
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
SOURCE_SCAN_ROOTS = ("cmd", "internal", "scripts", "testing")
SOURCE_EXTENSIONS = {".go", ".py", ".sh"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}


def _escape_label(value: str) -> str:
    """Escape backslashes, newlines and double quotes for use in a label value."""
    return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')


def _label_str(labels: dict[str, str]) -> str:
    """Render *labels* as a ``{k="v",...}`` label block.

    Labels with an empty value are dropped; an empty string is returned when
    no label remains.
    """
    parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
    return "{" + ",".join(parts) + "}" if parts else ""


def _read_http(url: str, timeout_seconds: float) -> str:
    """GET *url* and return the body decoded as UTF-8 (invalid bytes replaced).

    On an HTTP error the error response is drained and closed before the
    ``urllib.error.HTTPError`` is re-raised.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        exc.read()
        exc.close()
        raise


def _post_text(
    url: str,
    payload: str,
    timeout_seconds: float,
    attempts: int,
    retry_delay_seconds: float,
) -> None:
    """Send *payload* to *url* with an HTTP PUT, retrying on any failure.

    Up to *attempts* requests are made, sleeping *retry_delay_seconds* between
    them. Raises ``RuntimeError`` (chained to the last error) when every
    attempt failed.
    """
    last_error: Exception | None = None
    for attempt in range(1, attempts + 1):
        req = urllib.request.Request(
            url,
            data=payload.encode("utf-8"),
            method="PUT",
            headers={"Content-Type": "text/plain"},
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout_seconds) as resp:
                if resp.status >= 400:
                    raise RuntimeError(f"push failed status={resp.status}")
                return
        except Exception as exc:  # pragma: no cover - exercised via tests
            last_error = exc
            if attempt < attempts:
                time.sleep(retry_delay_seconds)
    raise RuntimeError(f"push failed after {attempts} attempt(s): {last_error}") from last_error


def _line_has_label(line: str, key: str, value: str) -> bool:
    """Return True when *line* carries the exact label ``key="value"``.

    The label must directly follow ``{`` or ``,`` so that a key such as
    ``job`` does not match a longer key that merely ends in ``job``.
    """
    needle = f'{key}="{_escape_label(value)}"'
    return ("{" + needle) in line or ("," + needle) in line


def _parse_counter_value(text: str, metric: str, labels: dict[str, str]) -> float:
    """Return the value of the first *metric* sample in *text* matching *labels*.

    Returns 0.0 when no sample matches or the matched value is not a number.
    """
    prefix = metric + "{"
    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        if not all(_line_has_label(line, key, value) for key, value in labels.items()):
            continue
        # The value follows the closing brace of the label block; splitting the
        # whole line on whitespace would break on label values with spaces.
        tail = line.rpartition("}")[2].split()
        if not tail:
            continue
        try:
            return float(tail[0])
        except ValueError:
            return 0.0
    return 0.0


def _fetch_existing_counter(
    pushgateway_url: str,
    metric: str,
    labels: dict[str, str],
    timeout_seconds: float,
) -> float:
    """Read ``<pushgateway_url>/metrics`` and return the matching sample value.

    Returns 0.0 when no sample of *metric* carries all of *labels*. HTTP and
    network errors from the read propagate to the caller.
    """
    text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics", timeout_seconds)
    return _parse_counter_value(text, metric, labels)


def _build_payload(
    suite: str,
    trigger: str,
    ok_count: int,
    failed_count: int,
    *,
    tests_passed: int,
    tests_failed: int,
    tests_errors: int,
    tests_skipped: int,
    coverage_percent: float,
    source_lines_over_500: int,
    checks: dict[str, str],
) -> str:
    """Build the text-format metrics payload for one suite.

    Label values are escaped, and the samples of each metric directly follow
    that metric's ``# TYPE`` line. The result ends with a newline.
    """
    suite_label = _escape_label(suite)
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE ananke_quality_gate_tests_total gauge",
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {tests_passed}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {tests_failed}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {tests_errors}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {tests_skipped}',
        "# TYPE ananke_quality_gate_coverage_percent gauge",
        f'ananke_quality_gate_coverage_percent{{suite="{suite_label}"}} {coverage_percent:.3f}',
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {coverage_percent:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
        "# TYPE ananke_quality_gate_checks_total gauge",
    ]
    # Keep the per-check samples grouped under their own TYPE line.
    lines.extend(
        f'ananke_quality_gate_checks_total{{suite="{suite_label}",'
        f'check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
        for check_name, check_status in checks.items()
    )
    lines.append("# TYPE ananke_quality_gate_publish_info gauge")
    lines.append(f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1')
    return "\n".join(lines) + "\n"


def _read_coverage_percent(path: str) -> float:
    """Return the float stored in the file at *path*.

    Returns 0.0 when *path* is empty, unreadable, or does not hold a number.
    """
    if not path:
        return 0.0
    try:
        raw = Path(path).read_text(encoding="utf-8").strip()
    except OSError:
        return 0.0
    try:
        return float(raw)
    except ValueError:
        return 0.0


def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
    """Count source files under the scan roots that exceed *max_lines* lines.

    Only files whose suffix is in ``SOURCE_EXTENSIONS`` are considered; scan
    roots that do not exist under *repo_root* are skipped.
    """
    count = 0
    for rel_root in SOURCE_SCAN_ROOTS:
        base = repo_root / rel_root
        if not base.exists():
            continue
        for path in base.rglob("*"):
            if not path.is_file():
                continue
            if path.suffix not in SOURCE_EXTENSIONS:
                continue
            lines = len(path.read_text(encoding="utf-8", errors="ignore").splitlines())
            if lines > max_lines:
                count += 1
    return count


def _parse_go_test_counts(output_path: Path) -> dict[str, int]:
    """Count ``--- PASS:``, ``--- FAIL:`` and ``--- SKIP:`` lines in a test log.

    Only markers at the start of a line are counted. ``errors`` is always 0,
    and every count is 0 when *output_path* does not exist.
    """
    if not output_path.exists():
        return {"passed": 0, "failed": 0, "errors": 0, "skipped": 0}
    text = output_path.read_text(encoding="utf-8", errors="ignore")
    return {
        "passed": len(re.findall(r"^--- PASS:", text, flags=re.M)),
        "failed": len(re.findall(r"^--- FAIL:", text, flags=re.M)),
        "errors": 0,
        "skipped": len(re.findall(r"^--- SKIP:", text, flags=re.M)),
    }


def _read_exit_code(path: Path) -> int:
    """Return the integer stored in *path*, or 1 when it is missing or invalid."""
    try:
        return int(path.read_text(encoding="utf-8").strip())
    except (OSError, ValueError):
        # A missing, unreadable or malformed file is treated as a failed gate.
        return 1


def _read_status(path: Path, default: str = "failed") -> str:
    """Normalise the status word stored in *path* to ``"ok"`` or ``"failed"``.

    Returns *default* when the file is missing, unreadable, or holds an
    unrecognised word.
    """
    try:
        raw = path.read_text(encoding="utf-8").strip().lower()
    except OSError:
        return default
    if raw in {"ok", "pass", "passed", "success"}:
        return "ok"
    if raw in {"failed", "fail", "error"}:
        return "failed"
    return default


def _load_json(path: Path) -> dict | None:
    """Return the JSON object stored in *path*.

    Returns None when the file is missing, cannot be parsed, or its top-level
    value is not an object.
    """
    if not path.exists():
        return None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None
    return payload if isinstance(payload, dict) else None


def _nested_status(report: dict, key: str) -> object:
    """Return ``report[key]["status"]`` when ``report[key]`` is a dict, else None."""
    section = report.get(key)
    return section.get("status") if isinstance(section, dict) else None


def _sonarqube_check_status(build_dir: Path) -> str:
    """Derive the ``sonarqube`` check result from the quality-gate report.

    The report path comes from ``QUALITY_GATE_SONARQUBE_REPORT`` or defaults to
    ``sonarqube-quality-gate.json`` in *build_dir*. Returns ``"not_applicable"``
    when no report (or an empty one) is available; otherwise the first string
    status found decides ``"ok"`` or ``"failed"``.
    """
    report = _load_json(
        Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json")))
    )
    if not report:
        return "not_applicable"
    status_candidates = [
        report.get("status"),
        _nested_status(report, "projectStatus"),
        _nested_status(report, "qualityGate"),
    ]
    for value in status_candidates:
        if isinstance(value, str):
            return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
    return "failed"


def _supply_chain_check_status(build_dir: Path) -> str:
    """Derive the ``supply_chain`` check result from the compliance report.

    The report path comes from ``QUALITY_GATE_IRONBANK_REPORT`` or defaults to
    ``ironbank-compliance.json`` in *build_dir*. A boolean ``compliant`` field
    takes precedence; otherwise the first string among ``status``, ``result``
    and ``compliance`` decides. Returns ``"not_applicable"`` when no report (or
    an empty one) is available.
    """
    report = _load_json(
        Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json")))
    )
    if not report:
        return "not_applicable"
    compliant = report.get("compliant")
    if isinstance(compliant, bool):
        return "ok" if compliant else "failed"
    status_candidates = [report.get("status"), report.get("result"), report.get("compliance")]
    for value in status_candidates:
        if isinstance(value, str):
            return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
    return "failed"


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line arguments; most defaults come from environment variables."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--pushgateway-url",
        default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_URL", os.getenv("PUSHGATEWAY_URL", DEFAULT_PUSHGATEWAY_URL)),
    )
    parser.add_argument(
        "--job-name",
        default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_JOB", "platform-quality-ci"),
    )
    parser.add_argument("--suite", default=os.getenv("SUITE_NAME", "ananke"))
    parser.add_argument("--trigger", default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_TRIGGER", "host"))
    parser.add_argument("--local-ok", type=int, required=True)
    parser.add_argument("--local-failed", type=int, required=True)
    parser.add_argument(
        "--coverage-percent-file",
        default=os.getenv("ANANKE_QUALITY_COVERAGE_PERCENT_FILE", "build/coverage-percent.txt"),
    )
    parser.add_argument(
        "--timeout-seconds",
        type=float,
        default=float(os.getenv("ANANKE_QUALITY_PUSH_TIMEOUT_SECONDS", "10")),
    )
    parser.add_argument(
        "--attempts",
        type=int,
        default=int(os.getenv("ANANKE_QUALITY_PUSH_ATTEMPTS", "3")),
    )
    parser.add_argument(
        "--retry-delay-seconds",
        type=float,
        default=float(os.getenv("ANANKE_QUALITY_PUSH_RETRY_DELAY_SECONDS", "1")),
    )
    parser.add_argument("--dry-run", action="store_true")
    return parser.parse_args(argv)


def _grouping_segment(name: str, value: str) -> str:
    """Render one ``name/value`` pair of the Pushgateway grouping-key URL path.

    Values containing ``/`` cannot be percent-escaped in the path, so they are
    sent in the ``name@base64/<url-safe base64>`` form; every other value is
    percent-encoded.
    """
    if "/" in value:
        encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii")
        return f"{name}@base64/{encoded}"
    return f"{name}/{urllib.parse.quote(value, safe='')}"


def main(argv: list[str] | None = None) -> int:
    """Collect quality-gate results and publish them to Pushgateway.

    Existing run counters are read back first, and the larger of the local and
    remote value is published; a failed read is reported in the summary rather
    than aborting. With ``--dry-run`` the payload is written to stdout instead
    of being pushed. Returns 0 on success; a failed push raises ``RuntimeError``.
    """
    # Only fall back to the process arguments when argv was not supplied at
    # all; an explicit empty list must be parsed as given.
    args = parse_args(sys.argv[1:] if argv is None else argv)
    repo_root = Path(__file__).resolve().parents[1]
    build_dir = repo_root / "build"

    remote_ok = 0
    remote_failed = 0
    remote_error = ""
    try:
        remote_ok = int(
            _fetch_existing_counter(
                args.pushgateway_url,
                "platform_quality_gate_runs_total",
                {"job": args.job_name, "suite": args.suite, "status": "ok"},
                args.timeout_seconds,
            )
        )
        remote_failed = int(
            _fetch_existing_counter(
                args.pushgateway_url,
                "platform_quality_gate_runs_total",
                {"job": args.job_name, "suite": args.suite, "status": "failed"},
                args.timeout_seconds,
            )
        )
    except Exception as exc:
        # Best effort: fall back to the local counters and report the error.
        remote_error = str(exc)

    resolved_ok = max(args.local_ok, remote_ok)
    resolved_failed = max(args.local_failed, remote_failed)
    coverage_percent = _read_coverage_percent(args.coverage_percent_file)
    source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
    tests = _parse_go_test_counts(Path(os.getenv("ANANKE_QUALITY_OUTPUT_FILE", str(build_dir / "quality-gate.out"))))
    gate_rc = _read_exit_code(Path(os.getenv("ANANKE_QUALITY_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))))
    docs_status = _read_status(
        Path(os.getenv("ANANKE_QUALITY_DOCS_STATUS_PATH", str(build_dir / "docs-naming.status")))
    )
    gate_failed = gate_rc != 0

    checks = {
        "tests": "failed" if gate_failed or tests["failed"] > 0 else "ok",
        "coverage": "ok" if coverage_percent >= 95.0 else "failed",
        "loc": "ok" if source_lines_over_500 == 0 else "failed",
        "docs_naming": docs_status,
        "gate_glue": "ok",
        "sonarqube": _sonarqube_check_status(build_dir),
        "supply_chain": _supply_chain_check_status(build_dir),
    }

    payload = _build_payload(
        args.suite,
        args.trigger,
        resolved_ok,
        resolved_failed,
        tests_passed=tests["passed"],
        tests_failed=tests["failed"],
        tests_errors=tests["errors"],
        tests_skipped=tests["skipped"],
        coverage_percent=coverage_percent,
        source_lines_over_500=source_lines_over_500,
        checks=checks,
    )

    if args.dry_run:
        sys.stdout.write(payload)
        return 0

    base_url = args.pushgateway_url.rstrip("/")
    job_segment = _grouping_segment("job", args.job_name)
    suite_segment = _grouping_segment("suite", args.suite)
    push_url = f"{base_url}/metrics/{job_segment}/{suite_segment}"
    _post_text(push_url, payload, args.timeout_seconds, max(args.attempts, 1), max(args.retry_delay_seconds, 0.0))

    summary = (
        f"[quality] published Pushgateway metrics suite={args.suite} job={args.job_name} ok={resolved_ok} "
        f"failed={resolved_failed} coverage={coverage_percent:.3f} source_lines_over_500={source_lines_over_500}"
    )
    if remote_error:
        summary += f" remote_read_error={remote_error}"
    print(summary)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())