230 lines
8.4 KiB
Python
Executable File
230 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Publish Ananke quality-gate counters to Pushgateway."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
|
|
# Pushgateway base URL used as the last-resort default for --pushgateway-url,
# i.e. when neither ANANKE_QUALITY_PUSHGATEWAY_URL nor PUSHGATEWAY_URL is set.
DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
|
|
|
|
|
|
def _escape_label(value: str) -> str:
    """Escape *value* for use inside a double-quoted label value.

    Backslash becomes ``\\\\``, a newline becomes ``\\n`` and a double quote
    becomes ``\\"``; every other character is passed through unchanged.
    """
    # A single translation pass gives the same result as chained replaces
    # because none of the replacement texts introduces a newline or a quote.
    escape_table = str.maketrans({"\\": "\\\\", "\n": "\\n", '"': '\\"'})
    return value.translate(escape_table)
|
|
|
|
|
|
def _label_str(labels: dict[str, str]) -> str:
    """Render *labels* as a ``{key="value",...}`` label block.

    Labels whose value is empty (falsy) are left out. When no label remains,
    an empty string is returned instead of ``{}``.
    """
    rendered = ",".join(
        f'{name}="{_escape_label(text)}"' for name, text in labels.items() if text
    )
    if not rendered:
        return ""
    return "{" + rendered + "}"
|
|
|
|
|
|
def _read_http(url: str, timeout_seconds: float) -> str:
    """Fetch *url* and return the response body as text.

    The body is decoded as UTF-8; undecodable bytes are replaced rather than
    raising. An ``urllib.error.HTTPError`` is drained and closed before it is
    re-raised to the caller.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
            raw_body = response.read()
            return raw_body.decode("utf-8", errors="replace")
    except urllib.error.HTTPError as http_error:
        # Consume and release the error response before propagating it.
        http_error.read()
        http_error.close()
        raise
|
|
|
|
|
|
def _post_text(url: str, payload: str, timeout_seconds: float, attempts: int, retry_delay_seconds: float) -> None:
    """POST *payload* as ``text/plain`` to *url*, retrying on any failure.

    Args:
        url: Target URL of the push.
        payload: Text body; encoded as UTF-8 once, before the first attempt.
        timeout_seconds: Timeout handed to ``urlopen`` for each attempt.
        attempts: Maximum number of tries. Values below 1 are treated as 1 so
            the push is always tried at least once.
        retry_delay_seconds: Pause between failed attempts. Negative values
            are treated as 0 so ``time.sleep`` cannot raise.

    Raises:
        RuntimeError: When every attempt failed. The last underlying error is
            attached as the cause and included in the message.
    """
    total_attempts = max(attempts, 1)
    delay_seconds = max(retry_delay_seconds, 0.0)
    body = payload.encode("utf-8")
    last_error: Exception | None = None
    for attempt in range(1, total_attempts + 1):
        req = urllib.request.Request(
            url,
            data=body,
            method="POST",
            headers={"Content-Type": "text/plain"},
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout_seconds) as resp:
                # The default opener raises HTTPError for these statuses; this
                # check covers openers that return the response instead.
                if resp.status >= 400:
                    raise RuntimeError(f"push failed status={resp.status}")
                return
        except Exception as exc:  # pragma: no cover - exercised via tests
            if isinstance(exc, urllib.error.HTTPError):
                # Release the error response, as _read_http does.
                exc.close()
            last_error = exc
            if attempt < total_attempts:
                time.sleep(delay_seconds)
    raise RuntimeError(f"push failed after {total_attempts} attempt(s): {last_error}") from last_error
|
|
|
|
|
|
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str], timeout_seconds: float) -> float:
    """Return the current value of a labelled sample exposed by the gateway.

    Reads ``<pushgateway_url>/metrics`` and scans it for the first sample of
    *metric* whose label set contains every ``key: value`` pair in *labels*.

    Args:
        pushgateway_url: Base URL of the gateway; a trailing ``/`` is ignored.
        metric: Exact metric name to look for (only labelled samples match).
        labels: Label pairs that must all be present with exactly these values.
        timeout_seconds: Timeout for the HTTP read.

    Returns:
        The sample value as a float, or ``0.0`` when no sample matches or the
        first matching sample's value cannot be parsed.

    Raises:
        Whatever ``_read_http`` raises for the metrics request.
    """
    import re

    text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics", timeout_seconds)

    # Tokenise `name="value"` pairs; the value pattern consumes escaped
    # characters so text inside a quoted value is never mistaken for a label.
    label_pattern = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')
    # Compare in escaped form, because that is how values appear in the text.
    wanted = {
        key: value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
        for key, value in labels.items()
    }
    prefix = metric + "{"

    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        closing = line.rfind("}")
        if closing < 0:
            continue
        present = dict(label_pattern.findall(line[len(prefix):closing]))
        if any(present.get(key) != value for key, value in wanted.items()):
            continue
        # The value is the first field after the label block; splitting the
        # whole line would break on label values that contain whitespace.
        fields = line[closing + 1:].split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0
|
|
|
|
|
|
def _parse_checks(raw_checks: list[str]) -> list[tuple[str, str]]:
    """Turn ``name:status`` strings into ``(name, status)`` tuples.

    Each item is split on its first ``:``. The name is stripped, the status is
    stripped and lower-cased. Items without a colon, with an empty name, or
    with a status other than ``ok``/``failed`` are silently dropped. Input
    order is preserved.
    """
    accepted_statuses = {"ok", "failed"}
    accepted: list[tuple[str, str]] = []
    for entry in raw_checks:
        raw_name, separator, raw_status = entry.partition(":")
        check_name = raw_name.strip()
        check_status = raw_status.strip().lower()
        if separator and check_name and check_status in accepted_statuses:
            accepted.append((check_name, check_status))
    return accepted
|
|
|
|
|
|
def _build_payload(
    suite: str,
    trigger: str,
    ok_count: int,
    failed_count: int,
    tests_passed: int,
    tests_failed: int,
    tests_error: int,
    tests_skipped: int,
    coverage_percent: float,
    source_lines_over_500: int,
    checks: list[tuple[str, str]],
) -> str:
    """Build the text-format metrics payload for one quality-gate run.

    Args:
        suite: Suite name, emitted as the ``suite`` label on every sample.
        trigger: Trigger name, emitted on the ``publish_info`` sample.
        ok_count: Value for the ``status="ok"`` run counter.
        failed_count: Value for the ``status="failed"`` run counter.
        tests_passed: Passed-test count; negative values are emitted as 0.
        tests_failed: Failed-test count; negative values are emitted as 0.
        tests_error: Errored-test count; negative values are emitted as 0.
        tests_skipped: Skipped-test count; negative values are emitted as 0.
        coverage_percent: Line coverage, written with three decimals.
        source_lines_over_500: Count emitted as the over-500 gauge; negative
            values are emitted as 0.
        checks: ``(check_name, result)`` pairs, one sample per pair.

    Returns:
        The payload, one line per TYPE declaration or sample, ending with a
        newline.
    """
    # Escape once so a quote, backslash or newline in the suite name cannot
    # break the label syntax of any sample below.
    suite_label = _escape_label(suite)
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE ananke_quality_gate_tests_total gauge",
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {max(tests_passed, 0)}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {max(tests_failed, 0)}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {max(tests_error, 0)}',
        f'ananke_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {max(tests_skipped, 0)}',
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {coverage_percent:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {max(source_lines_over_500, 0)}',
        "# TYPE ananke_quality_gate_checks_total gauge",
    ]
    # The exposition format expects a family's samples to follow its TYPE
    # line as one group, so the check samples go here, before publish_info.
    lines.extend(
        f'ananke_quality_gate_checks_total{{suite="{suite_label}",'
        f'check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
        for check_name, check_status in checks
    )
    lines.append("# TYPE ananke_quality_gate_publish_info gauge")
    lines.append(f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1')
    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the publisher's command-line options from *argv*.

    Connection and naming options take their defaults from environment
    variables, read when this function runs. ``--local-ok`` and
    ``--local-failed`` are required; the remaining numeric options default
    to zero. ``--check`` may be repeated.
    """
    env = os.getenv
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Where to publish and under which identity.
    fallback_url = env("PUSHGATEWAY_URL", DEFAULT_PUSHGATEWAY_URL)
    add("--pushgateway-url", default=env("ANANKE_QUALITY_PUSHGATEWAY_URL", fallback_url))
    add("--job-name", default=env("ANANKE_QUALITY_PUSHGATEWAY_JOB", "platform-quality-ci"))
    add("--suite", default=env("SUITE_NAME", "ananke"))
    add("--trigger", default=env("ANANKE_QUALITY_PUSHGATEWAY_TRIGGER", "host"))

    # Run results.
    for required_flag in ("--local-ok", "--local-failed"):
        add(required_flag, type=int, required=True)
    for counter_flag in ("--tests-passed", "--tests-failed", "--tests-error", "--tests-skipped"):
        add(counter_flag, type=int, default=0)
    add("--coverage-percent", type=float, default=0.0)
    add("--source-lines-over-500", type=int, default=0)
    add("--check", action="append", default=[], help="check_name:ok|failed")

    # Transport tuning.
    add("--timeout-seconds", type=float, default=float(env("ANANKE_QUALITY_PUSH_TIMEOUT_SECONDS", "10")))
    add("--attempts", type=int, default=int(env("ANANKE_QUALITY_PUSH_ATTEMPTS", "3")))
    add("--retry-delay-seconds", type=float, default=float(env("ANANKE_QUALITY_PUSH_RETRY_DELAY_SECONDS", "1")))
    add("--dry-run", action="store_true")

    return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Publish one quality-gate result to the Pushgateway.

    Steps:
      1. Parse options (from *argv*, or ``sys.argv[1:]`` when *argv* is None).
      2. Best-effort read of the run counters currently on the gateway; a
         failed read leaves them at 0 and is reported in the summary line.
      3. Use the larger of the local and remote value for each run counter.
      4. Build the payload; with ``--dry-run`` write it to stdout and stop.
      5. Otherwise POST it to the gateway and print a one-line summary.

    Args:
        argv: Argument list without the program name. ``None`` means "use the
            process arguments"; an explicit empty list is parsed as given.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: Propagated from ``_post_text`` when the push fails.
    """
    from urllib.parse import quote

    # `argv or sys.argv[1:]` would silently replace an explicit empty list
    # with the process arguments, so test for None instead.
    args = parse_args(sys.argv[1:] if argv is None else argv)

    remote_counts = {"ok": 0, "failed": 0}
    remote_error = ""
    try:
        for status in ("ok", "failed"):
            remote_counts[status] = int(
                _fetch_existing_counter(
                    args.pushgateway_url,
                    "platform_quality_gate_runs_total",
                    {"job": args.job_name, "suite": args.suite, "status": status},
                    args.timeout_seconds,
                )
            )
    except Exception as exc:
        # Deliberately best-effort: publishing proceeds with local values.
        remote_error = str(exc)

    resolved_ok = max(args.local_ok, remote_counts["ok"])
    resolved_failed = max(args.local_failed, remote_counts["failed"])

    checks = _parse_checks(args.check)
    if not checks:
        # No usable --check given: emit a single overall "gate" check.
        checks = [("gate", "ok" if args.local_failed <= 0 else "failed")]

    payload = _build_payload(
        args.suite,
        args.trigger,
        resolved_ok,
        resolved_failed,
        args.tests_passed,
        args.tests_failed,
        args.tests_error,
        args.tests_skipped,
        args.coverage_percent,
        args.source_lines_over_500,
        checks,
    )

    if args.dry_run:
        sys.stdout.write(payload)
        return 0

    # Percent-encode the path segments so spaces and similar characters in
    # the job or suite name still yield a valid URL.
    # NOTE(review): values containing "/" may additionally need the
    # Pushgateway's base64 label form; confirm if such names are expected.
    push_url = (
        f"{args.pushgateway_url.rstrip('/')}/metrics"
        f"/job/{quote(args.job_name, safe='')}/suite/{quote(args.suite, safe='')}"
    )
    _post_text(push_url, payload, args.timeout_seconds, max(args.attempts, 1), max(args.retry_delay_seconds, 0.0))

    summary = (
        f"[quality] published Pushgateway metrics suite={args.suite} job={args.job_name} ok={resolved_ok} "
        f"failed={resolved_failed} checks={len(checks)} coverage={args.coverage_percent:.2f} "
        f"over_500={max(args.source_lines_over_500, 0)}"
    )
    if remote_error:
        summary += f" remote_read_error={remote_error}"
    print(summary)
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|