# ariadne/ariadne/metrics/metrics.py
#
# 116 lines
# 3.6 KiB
# Python

from __future__ import annotations
from datetime import datetime
from prometheus_client import Counter, Gauge, Histogram
# --- Task execution metrics -------------------------------------------------
# Both metrics share the same ("task", "status") label pair so that run counts
# and durations can be correlated per task and per outcome.

# Number of task runs, incremented once per run by record_task_run().
TASK_RUNS_TOTAL = Counter(
    "ariadne_task_runs_total",
    "Ariadne task runs by status",
    labelnames=["task", "status"],
)

# Distribution of task run durations; bucket upper bounds are in seconds.
TASK_DURATION_SECONDS = Histogram(
    "ariadne_task_duration_seconds",
    "Ariadne task durations in seconds",
    labelnames=["task", "status"],
    buckets=(0.5, 1, 2, 5, 10, 30, 60, 120, 300),
)
# --- Scheduler state metrics ------------------------------------------------
# One time series per scheduled task (label: "task"). All of these are written
# by record_schedule_state().

# Timestamp of the most recent run, regardless of outcome.
SCHEDULE_LAST_RUN_TS = Gauge(
    "ariadne_schedule_last_run_timestamp_seconds",
    "Last schedule run timestamp",
    labelnames=["task"],
)

# Timestamp of the most recent run that succeeded.
SCHEDULE_LAST_SUCCESS_TS = Gauge(
    "ariadne_schedule_last_success_timestamp_seconds",
    "Last successful schedule run timestamp",
    labelnames=["task"],
)

# Timestamp of the most recent run that failed.
SCHEDULE_LAST_ERROR_TS = Gauge(
    "ariadne_schedule_last_error_timestamp_seconds",
    "Last failed schedule run timestamp",
    labelnames=["task"],
)

# Timestamp at which the task is next due to run.
SCHEDULE_NEXT_RUN_TS = Gauge(
    "ariadne_schedule_next_run_timestamp_seconds",
    "Next scheduled run timestamp",
    labelnames=["task"],
)

# Outcome flag for the last run: 1 means ok, 0 means error.
SCHEDULE_STATUS = Gauge(
    "ariadne_schedule_last_status",
    "Last schedule status (1=ok,0=error)",
    labelnames=["task"],
)
# --- Access request metrics -------------------------------------------------
# Current number of access requests per status. Declared as a Gauge because
# set_access_request_counts() overwrites the value with an absolute count
# rather than incrementing it.
ACCESS_REQUESTS = Gauge(
    "ariadne_access_requests_total",
    "Access requests by status",
    labelnames=["status"],
)
# --- Cluster state metrics --------------------------------------------------
# Unlabelled gauges, all written by set_cluster_state_metrics().

# Timestamp of the snapshot the gauges below were taken from.
CLUSTER_STATE_LAST_TS = Gauge(
    "ariadne_cluster_state_timestamp_seconds",
    "Last cluster state snapshot timestamp",
)

# Total number of nodes in the cluster.
CLUSTER_STATE_NODES_TOTAL = Gauge(
    "ariadne_cluster_nodes_total",
    "Cluster nodes total",
)

# Number of nodes reporting Ready.
CLUSTER_STATE_NODES_READY = Gauge(
    "ariadne_cluster_nodes_ready",
    "Cluster nodes Ready",
)

# Number of pods in the Running phase.
CLUSTER_STATE_PODS_RUNNING = Gauge(
    "ariadne_cluster_pods_running",
    "Cluster pods Running",
)

# Number of Flux kustomizations that are not Ready.
CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY = Gauge(
    "ariadne_cluster_kustomizations_not_ready",
    "Flux kustomizations not Ready",
)
def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
    """Record the outcome of a single task run.

    Increments ``TASK_RUNS_TOTAL`` for the ``(task, status)`` label pair. If
    ``duration_sec`` is not ``None`` it is also observed on
    ``TASK_DURATION_SECONDS`` under the same labels; a ``None`` duration
    leaves the histogram untouched.

    Args:
        task: Value for the ``task`` label.
        status: Value for the ``status`` label.
        duration_sec: Run duration in seconds, or ``None`` when unknown.
    """
    run_labels = {"task": task, "status": status}
    TASK_RUNS_TOTAL.labels(**run_labels).inc()

    if duration_sec is None:
        return
    TASK_DURATION_SECONDS.labels(**run_labels).observe(duration_sec)
def record_schedule_state(
    task: str,
    last_run_ts: float | None,
    last_success_ts: float | None,
    next_run_ts: float | None,
    ok: bool | None,
) -> None:
    """Publish the scheduler gauges for one task.

    Each timestamp gauge is updated only when its value is truthy, so both
    ``None`` and ``0`` leave the previously exported value in place.

    When ``ok`` is ``None`` the status and last-error gauges are not touched.
    Otherwise ``SCHEDULE_STATUS`` is set to 1 (truthy ``ok``) or 0, and when
    ``ok`` is exactly ``False`` with a truthy ``last_run_ts``, that timestamp
    is also recorded as the last error time.

    Args:
        task: Value for the ``task`` label on every gauge.
        last_run_ts: Timestamp of the most recent run.
        last_success_ts: Timestamp of the most recent successful run.
        next_run_ts: Timestamp of the next planned run.
        ok: Outcome of the last run, or ``None`` when there is none to report.
    """
    timestamp_gauges = (
        (SCHEDULE_LAST_RUN_TS, last_run_ts),
        (SCHEDULE_LAST_SUCCESS_TS, last_success_ts),
        (SCHEDULE_NEXT_RUN_TS, next_run_ts),
    )
    for gauge, ts in timestamp_gauges:
        # Truthiness check on purpose: a missing (None) or zero timestamp
        # must not overwrite the gauge.
        if ts:
            gauge.labels(task=task).set(ts)

    if ok is None:
        return

    SCHEDULE_STATUS.labels(task=task).set(1 if ok else 0)
    # A failed run stamps the error gauge with the time of that run.
    if ok is False and last_run_ts:
        SCHEDULE_LAST_ERROR_TS.labels(task=task).set(last_run_ts)
def set_access_request_counts(counts: dict[str, int]) -> None:
    """Export the number of access requests for each status.

    Every key of ``counts`` becomes the ``status`` label value and the mapped
    number becomes the gauge value. Statuses that are not present in
    ``counts`` are not modified by this call.

    Args:
        counts: Mapping of status name to the number of requests in it.
    """
    for status_label, request_count in counts.items():
        status_gauge = ACCESS_REQUESTS.labels(status=status_label)
        status_gauge.set(request_count)
def set_cluster_state_metrics(
    collected_at: datetime,
    nodes_total: int | None,
    nodes_ready: int | None,
    pods_running: float | None,
    kustomizations_not_ready: int | None,
) -> None:
    """Export one cluster-state snapshot to the cluster gauges.

    The snapshot timestamp gauge is always set from
    ``collected_at.timestamp()``. Each remaining gauge is set only when its
    reading is not ``None``; a ``None`` reading leaves the previous value in
    place, while ``0`` is a valid reading and is exported.

    Args:
        collected_at: When the snapshot was taken.
        nodes_total: Total node count, or ``None`` if unavailable.
        nodes_ready: Ready node count, or ``None`` if unavailable.
        pods_running: Running pod count, or ``None`` if unavailable.
        kustomizations_not_ready: Count of Flux kustomizations that are not
            Ready, or ``None`` if unavailable.
    """
    CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())

    optional_readings = (
        (CLUSTER_STATE_NODES_TOTAL, nodes_total),
        (CLUSTER_STATE_NODES_READY, nodes_ready),
        (CLUSTER_STATE_PODS_RUNNING, pods_running),
        (CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY, kustomizations_not_ready),
    )
    for gauge, reading in optional_readings:
        # Explicit None check: zero is a meaningful value for these gauges.
        if reading is not None:
            gauge.set(reading)