quality: publish ananke gate results to pushgateway

This commit is contained in:
Brad Stein 2026-04-10 13:53:42 -03:00
parent a6d4f71d02
commit 3721e2ad2a
4 changed files with 339 additions and 8 deletions

View File

@ -122,6 +122,8 @@ Gate order:
Installer behavior:
- `scripts/install.sh` runs the quality gate by default
- override only for emergency break/fix: `ANANKE_ENFORCE_QUALITY_GATE=0`
- host quality runs keep writing local `ananke_quality_gate_*` metrics and also publish `platform_quality_gate_runs_total{suite="ananke",status="ok"|"failed"}` to Pushgateway for shared Grafana panels
- override the Pushgateway target when running outside cluster DNS: `ANANKE_QUALITY_PUSHGATEWAY_URL=http://... ./scripts/quality_gate.sh`
## Growing with the lab

View File

@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Publish Ananke quality-gate counters to Pushgateway."""
from __future__ import annotations
import argparse
import os
import re
import sys
import time
import urllib.error
import urllib.request
# Fallback Pushgateway base URL. parse_args() uses it for --pushgateway-url when
# neither ANANKE_QUALITY_PUSHGATEWAY_URL nor PUSHGATEWAY_URL is set.
DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
def _escape_label(value: str) -> str:
    """Return *value* with backslash, newline and double-quote characters backslash-escaped.

    The result is safe to place between the double quotes of a label value.
    """
    # A single translate pass is equivalent to the chained replaces: each input
    # character is mapped exactly once, so inserted backslashes are never re-escaped.
    escape_table = str.maketrans({"\\": "\\\\", "\n": "\\n", '"': '\\"'})
    return value.translate(escape_table)
def _label_str(labels: dict[str, str]) -> str:
    """Render *labels* as a ``{name="value",...}`` label block.

    Labels whose value is empty are left out; when nothing remains the result
    is an empty string rather than ``{}``.
    """
    rendered = []
    for name, raw_value in labels.items():
        if not raw_value:
            continue
        rendered.append(f'{name}="{_escape_label(raw_value)}"')
    if not rendered:
        return ""
    return "{" + ",".join(rendered) + "}"
def _read_http(url: str, timeout_seconds: float) -> str:
    """Fetch *url* and return the response body decoded as UTF-8.

    Undecodable bytes are replaced rather than raising. An ``HTTPError`` is
    re-raised to the caller after its body has been drained and closed.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
            raw_body = response.read()
    except urllib.error.HTTPError as http_error:
        # Drain and close the error response before propagating it.
        http_error.read()
        http_error.close()
        raise
    return raw_body.decode("utf-8", errors="replace")
def _post_text(url: str, payload: str, timeout_seconds: float, attempts: int, retry_delay_seconds: float) -> None:
    """POST *payload* as ``text/plain`` to *url*, retrying on any failure.

    Up to *attempts* tries are made, sleeping *retry_delay_seconds* between
    consecutive tries. Returns as soon as one try succeeds.

    Raises:
        RuntimeError: when no try succeeded; the last underlying error is
            chained as the cause.
    """
    failure: Exception | None = None
    # Count down the tries that remain after the current one.
    for remaining in range(attempts - 1, -1, -1):
        request = urllib.request.Request(
            url,
            data=payload.encode("utf-8"),
            method="POST",
            headers={"Content-Type": "text/plain"},
        )
        try:
            with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
                if response.status >= 400:
                    raise RuntimeError(f"push failed status={response.status}")
            return
        except Exception as exc:  # pragma: no cover - exercised via tests
            failure = exc
        if remaining:
            time.sleep(retry_delay_seconds)
    raise RuntimeError(f"push failed after {attempts} attempt(s): {failure}") from failure
# One `name="value"` pair inside a sample's label block; the value may contain
# backslash-escaped characters (including escaped double quotes).
_LABEL_PAIR_RE = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')
# A backslash escape inside a label value.
_LABEL_ESCAPE_RE = re.compile(r"\\(.)")


def _parse_sample_labels(label_block: str) -> dict[str, str]:
    """Parse the text between ``{`` and ``}`` of a sample line into a name -> value dict."""
    return {
        match.group(1): _LABEL_ESCAPE_RE.sub(
            lambda esc: "\n" if esc.group(1) == "n" else esc.group(1),
            match.group(2),
        )
        for match in _LABEL_PAIR_RE.finditer(label_block)
    }


def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str], timeout_seconds: float) -> float:
    """Return the current value of *metric* on the Pushgateway for the given *labels*.

    The gateway's ``/metrics`` page is fetched and scanned for the first sample
    of *metric* that carries every label in *labels* with exactly the given
    value (additional labels on the sample are ignored).

    Labels are compared by parsed name and unescaped value, so ``job="x"`` does
    not match a sample that only has ``exported_job="x"``, and label values
    containing whitespace or ``}`` do not disturb value extraction.

    Returns:
        The sample value, or ``0.0`` when no sample matches or the first
        matching sample's value is not a number.

    Raises:
        Whatever ``_read_http`` raises when the page cannot be fetched.
    """
    text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics", timeout_seconds)
    prefix = metric + "{"
    for line in text.splitlines():
        if not line.startswith(prefix):
            continue
        # The sample value follows the LAST "}", because label values may contain "}".
        label_block, closing, remainder = line[len(prefix):].rpartition("}")
        if not closing:
            continue
        found = _parse_sample_labels(label_block)
        if any(found.get(name) != value for name, value in labels.items()):
            continue
        fields = remainder.split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0
def _build_payload(suite: str, trigger: str, ok_count: int, failed_count: int) -> str:
    """Build the text-format body that is pushed to the Pushgateway.

    The body holds the ``platform_quality_gate_runs_total`` counter for the
    ``ok`` and ``failed`` statuses plus an ``ananke_quality_gate_publish_info``
    gauge labelled with *suite* and *trigger*. It always ends with a newline.

    *suite* is escaped in the counter lines as well, so that a value containing
    a quote, backslash or newline cannot produce a malformed sample line (the
    info line was already escaped via ``_label_str``).
    """
    suite_label = _escape_label(suite)
    info_labels = _label_str({"suite": suite, "trigger": trigger})
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE ananke_quality_gate_publish_info gauge",
        f"ananke_quality_gate_publish_info{info_labels} 1",
    ]
    return "\n".join(lines) + "\n"
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the publisher's command-line arguments.

    Most options take their default from an environment variable. The numeric
    defaults are handed to argparse as strings so that argparse performs the
    ``type`` conversion itself: a malformed environment value is then reported
    as a normal argparse usage error, and only when the option is not given
    explicitly on the command line (previously it raised ``ValueError`` while
    the parser was still being built, even if the flag overrode it).

    Args:
        argv: Argument list without the program name.

    Returns:
        The populated namespace.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--pushgateway-url",
        default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_URL", os.getenv("PUSHGATEWAY_URL", DEFAULT_PUSHGATEWAY_URL)),
    )
    parser.add_argument(
        "--job-name",
        default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_JOB", "platform-quality-ci"),
    )
    parser.add_argument("--suite", default=os.getenv("SUITE_NAME", "ananke"))
    parser.add_argument("--trigger", default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_TRIGGER", "host"))
    parser.add_argument("--local-ok", type=int, required=True)
    parser.add_argument("--local-failed", type=int, required=True)
    # String defaults below are converted lazily by argparse through `type`.
    parser.add_argument(
        "--timeout-seconds",
        type=float,
        default=os.getenv("ANANKE_QUALITY_PUSH_TIMEOUT_SECONDS", "10"),
    )
    parser.add_argument(
        "--attempts",
        type=int,
        default=os.getenv("ANANKE_QUALITY_PUSH_ATTEMPTS", "3"),
    )
    parser.add_argument(
        "--retry-delay-seconds",
        type=float,
        default=os.getenv("ANANKE_QUALITY_PUSH_RETRY_DELAY_SECONDS", "1"),
    )
    parser.add_argument("--dry-run", action="store_true")
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Publish the quality-gate counters and return a process exit code.

    The counters already stored on the Pushgateway are read first and the
    larger of the local and remote value is published for each status, so a
    push never lowers a counter. Failing to read the remote counters is
    tolerated: the local values are used and the error is appended to the
    summary line. With ``--dry-run`` the payload is written to stdout and
    nothing is pushed.

    Args:
        argv: Argument list without the program name; ``None`` means
            ``sys.argv[1:]``. An explicit empty list is honoured as "no
            arguments" instead of silently falling back to ``sys.argv``.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: propagated from ``_post_text`` when the push fails.
    """
    args = parse_args(sys.argv[1:] if argv is None else argv)

    remote_counts = {"ok": 0, "failed": 0}
    remote_error = ""
    try:
        for status in ("ok", "failed"):
            remote_counts[status] = int(
                _fetch_existing_counter(
                    args.pushgateway_url,
                    "platform_quality_gate_runs_total",
                    {"job": args.job_name, "suite": args.suite, "status": status},
                    args.timeout_seconds,
                )
            )
    except Exception as exc:
        # Deliberately best-effort: an unreadable gateway must not block the push.
        remote_error = str(exc)

    resolved_ok = max(args.local_ok, remote_counts["ok"])
    resolved_failed = max(args.local_failed, remote_counts["failed"])
    payload = _build_payload(args.suite, args.trigger, resolved_ok, resolved_failed)
    if args.dry_run:
        sys.stdout.write(payload)
        return 0

    # NOTE(review): job name and suite are interpolated into the URL path as-is;
    # values containing "/" would change the grouping key - confirm callers never pass such values.
    push_url = f"{args.pushgateway_url.rstrip('/')}/metrics/job/{args.job_name}/suite/{args.suite}"
    _post_text(push_url, payload, args.timeout_seconds, max(args.attempts, 1), max(args.retry_delay_seconds, 0.0))

    summary = f"[quality] published Pushgateway metrics suite={args.suite} job={args.job_name} ok={resolved_ok} failed={resolved_failed}"
    if remote_error:
        summary += f" remote_read_error={remote_error}"
    print(summary)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
from __future__ import annotations
import http.server
import socketserver
import threading
import unittest
import publish_quality_metrics as publisher
class _GatewayHandler(http.server.BaseHTTPRequestHandler):
    """Minimal in-process stand-in for a Pushgateway, configured via class attributes."""

    # Body served for GET /metrics.
    metrics_text = ""
    # When true, GET /metrics answers 500 with the body "boom".
    fail_metrics_read = False
    # (request path, decoded body) of every POST received.
    posts: list[tuple[str, str]] = []

    def do_GET(self) -> None:  # noqa: N802
        """Serve ``metrics_text`` on /metrics, 500 when failing is enabled, 404 elsewhere."""
        if self.path != "/metrics":
            status, trailer = 404, b""
        elif self.fail_metrics_read:
            status, trailer = 500, b"boom"
        else:
            payload = self.metrics_text.encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
            return
        self.send_response(status)
        self.end_headers()
        if trailer:
            self.wfile.write(trailer)

    def do_POST(self) -> None:  # noqa: N802
        """Record the request path and body, then answer 202."""
        length = int(self.headers.get("Content-Length", "0"))
        text = self.rfile.read(length).decode("utf-8")
        self.posts.append((self.path, text))
        self.send_response(202)
        self.end_headers()

    def log_message(self, format: str, *args: object) -> None:  # noqa: A003
        """Suppress the default per-request logging."""
        return
class PublishQualityMetricsTest(unittest.TestCase):
    """End-to-end tests that run ``publisher.main`` against a local fake gateway."""

    def setUp(self) -> None:
        # Reset the handler's class-level state, then serve it on an ephemeral port.
        _GatewayHandler.metrics_text = ""
        _GatewayHandler.fail_metrics_read = False
        _GatewayHandler.posts = []
        self.server = socketserver.TCPServer(("127.0.0.1", 0), _GatewayHandler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        port = self.server.server_address[1]
        self.base_url = f"http://127.0.0.1:{port}"

    def tearDown(self) -> None:
        self.server.shutdown()
        self.server.server_close()
        self.thread.join(timeout=5)

    def _publish(self, local_ok: str, local_failed: str, trigger: str | None = None) -> int:
        """Run the publisher against the fake gateway and return its exit code."""
        argv = [
            "--pushgateway-url",
            self.base_url,
            "--job-name",
            "platform-quality-ci",
            "--suite",
            "ananke",
        ]
        if trigger is not None:
            argv += ["--trigger", trigger]
        argv += ["--local-ok", local_ok, "--local-failed", local_failed]
        return publisher.main(argv)

    def test_publish_uses_remote_high_water_mark(self) -> None:
        _GatewayHandler.metrics_text = (
            '# TYPE platform_quality_gate_runs_total counter\n'
            'platform_quality_gate_runs_total{job="platform-quality-ci",suite="ananke",status="ok"} 7\n'
            'platform_quality_gate_runs_total{job="platform-quality-ci",suite="ananke",status="failed"} 1'
        )

        exit_code = self._publish("5", "2", trigger="host")

        self.assertEqual(exit_code, 0)
        self.assertEqual(len(_GatewayHandler.posts), 1)
        path, body = _GatewayHandler.posts[0]
        self.assertEqual(path, "/metrics/job/platform-quality-ci/suite/ananke")
        # Remote "ok" (7) beats local (5); local "failed" (2) beats remote (1).
        self.assertIn('platform_quality_gate_runs_total{suite="ananke",status="ok"} 7', body)
        self.assertIn('platform_quality_gate_runs_total{suite="ananke",status="failed"} 2', body)
        self.assertIn('ananke_quality_gate_publish_info{suite="ananke",trigger="host"} 1', body)

    def test_publish_falls_back_to_local_counters_when_metrics_read_fails(self) -> None:
        _GatewayHandler.fail_metrics_read = True

        exit_code = self._publish("11", "3")

        self.assertEqual(exit_code, 0)
        self.assertEqual(len(_GatewayHandler.posts), 1)
        _, body = _GatewayHandler.posts[0]
        self.assertIn('platform_quality_gate_runs_total{suite="ananke",status="ok"} 11', body)
        self.assertIn('platform_quality_gate_runs_total{suite="ananke",status="failed"} 3', body)


if __name__ == "__main__":
    unittest.main()

View File

@ -5,6 +5,16 @@ REPO_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
# Local metrics file and persisted counter state; each default can be overridden
# through the matching ANANKE_QUALITY_* environment variable.
QUALITY_METRICS_ENABLED="${ANANKE_QUALITY_METRICS_ENABLED:-1}"
QUALITY_METRICS_FILE="${ANANKE_QUALITY_METRICS_FILE:-/var/lib/ananke/quality-gate.prom}"
QUALITY_STATE_FILE="${ANANKE_QUALITY_STATE_FILE:-/var/lib/ananke/quality-gate.state}"
# Pushgateway publishing. Any ENABLED value other than "1", or an empty URL,
# makes publish_quality_metrics a no-op. The URL falls back to PUSHGATEWAY_URL
# and then to the in-cluster service address.
QUALITY_PUSHGATEWAY_ENABLED="${ANANKE_QUALITY_PUSHGATEWAY_ENABLED:-1}"
QUALITY_PUSHGATEWAY_URL="${ANANKE_QUALITY_PUSHGATEWAY_URL:-${PUSHGATEWAY_URL:-http://platform-quality-gateway.monitoring.svc.cluster.local:9091}}"
QUALITY_PUSHGATEWAY_JOB="${ANANKE_QUALITY_PUSHGATEWAY_JOB:-platform-quality-ci}"
QUALITY_PUSHGATEWAY_TRIGGER="${ANANKE_QUALITY_PUSHGATEWAY_TRIGGER:-host}"
# Snapshot of the most recent run, filled in by write_quality_metrics. The
# ok/failed counts are what publish_quality_metrics hands to the publisher script.
QUALITY_LAST_OK=0
QUALITY_LAST_FAILED=0
QUALITY_LAST_SUCCESS=0
QUALITY_LAST_RUN_TS=0
QUALITY_SUCCESS_PERCENT="0.00"
read_quality_counter() {
local key="$1"
@ -23,14 +33,14 @@ read_quality_counter() {
write_quality_metrics() {
local exit_code="$1"
if [[ "${QUALITY_METRICS_ENABLED}" != "1" ]]; then
return 0
fi
local metrics_dir state_dir
metrics_dir="$(dirname "${QUALITY_METRICS_FILE}")"
state_dir="$(dirname "${QUALITY_STATE_FILE}")"
mkdir -p "${metrics_dir}" "${state_dir}" >/dev/null 2>&1 || return 0
mkdir -p "${state_dir}" >/dev/null 2>&1 || return 0
if [[ "${QUALITY_METRICS_ENABLED}" == "1" ]]; then
mkdir -p "${metrics_dir}" >/dev/null 2>&1 || return 0
fi
local ok failed total last_success now success_percent
ok="$(read_quality_counter ok)"
@ -45,12 +55,20 @@ write_quality_metrics() {
total=$((ok + failed))
now="$(date +%s)"
success_percent="$(awk -v ok="${ok}" -v total="${total}" 'BEGIN { if (total <= 0) { print "0.00" } else { printf "%.2f", (ok * 100.0) / total } }')"
QUALITY_LAST_OK="${ok}"
QUALITY_LAST_FAILED="${failed}"
QUALITY_LAST_SUCCESS="${last_success}"
QUALITY_LAST_RUN_TS="${now}"
QUALITY_SUCCESS_PERCENT="${success_percent}"
local tmp_metrics tmp_state
tmp_metrics="$(mktemp "${metrics_dir}/quality-gate.prom.XXXXXX")"
local tmp_metrics="" tmp_state
if [[ "${QUALITY_METRICS_ENABLED}" == "1" ]]; then
tmp_metrics="$(mktemp "${metrics_dir}/quality-gate.prom.XXXXXX")"
fi
tmp_state="$(mktemp "${state_dir}/quality-gate.state.XXXXXX")"
cat > "${tmp_metrics}" <<EOF
if [[ "${QUALITY_METRICS_ENABLED}" == "1" ]]; then
cat > "${tmp_metrics}" <<EOF
# HELP ananke_quality_gate_runs_total Total Ananke quality gate runs by status.
# TYPE ananke_quality_gate_runs_total counter
ananke_quality_gate_runs_total{suite="ananke",status="ok"} ${ok}
@ -65,6 +83,7 @@ ananke_quality_gate_last_run_timestamp_seconds{suite="ananke"} ${now}
# TYPE ananke_quality_gate_success_percent gauge
ananke_quality_gate_success_percent{suite="ananke"} ${success_percent}
EOF
fi
cat > "${tmp_state}" <<EOF
ok=${ok}
@ -73,14 +92,40 @@ last_success=${last_success}
last_run=${now}
EOF
mv -f "${tmp_metrics}" "${QUALITY_METRICS_FILE}"
mv -f "${tmp_state}" "${QUALITY_STATE_FILE}"
if [[ "${QUALITY_METRICS_ENABLED}" == "1" ]]; then
mv -f "${tmp_metrics}" "${QUALITY_METRICS_FILE}"
fi
}
# Push the ok/failed counters recorded in QUALITY_LAST_OK / QUALITY_LAST_FAILED
# to the Pushgateway via scripts/publish_quality_metrics.py.
# Does nothing when publishing is disabled or no URL is configured; a missing
# python3 or a failed publish only prints a warning to stderr.
publish_quality_metrics() {
  [[ "${QUALITY_PUSHGATEWAY_ENABLED}" == "1" ]] || return 0
  [[ -n "${QUALITY_PUSHGATEWAY_URL}" ]] || return 0

  if ! command -v python3 >/dev/null 2>&1; then
    echo "[quality] warning: python3 not found; skipping Pushgateway publish" >&2
    return 0
  fi

  local -a publisher_cmd=(
    python3 "${REPO_DIR}/scripts/publish_quality_metrics.py"
    --pushgateway-url "${QUALITY_PUSHGATEWAY_URL}"
    --job-name "${QUALITY_PUSHGATEWAY_JOB}"
    --suite "ananke"
    --trigger "${QUALITY_PUSHGATEWAY_TRIGGER}"
    --local-ok "${QUALITY_LAST_OK}"
    --local-failed "${QUALITY_LAST_FAILED}"
  )

  # A failed publish is reported but is not treated as an error of this function.
  "${publisher_cmd[@]}" && return 0
  echo "[quality] warning: Pushgateway publish failed for suite=ananke url=${QUALITY_PUSHGATEWAY_URL}" >&2
}
# Final step of the quality gate: record and publish metrics, then exit with
# the gate's own exit code.
#   $1 - exit code of the quality gate run
quality_gate_finalize() {
local exit_code="$1"
# Disable errexit so a metrics failure cannot replace the gate's exit code.
set +e
write_quality_metrics "${exit_code}" || true
# Publish after writing: write_quality_metrics fills in the QUALITY_LAST_* values that get pushed.
publish_quality_metrics || true
exit "${exit_code}"
}