From af4ee468403122bf6ed605bb28288b8d9f255344 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 26 Jan 2026 03:31:42 -0300 Subject: [PATCH] feat: add cluster state snapshots and metrics --- ariadne/app.py | 24 +++ ariadne/db/schema.py | 11 ++ ariadne/db/storage.py | 46 ++++++ ariadne/metrics/metrics.py | 41 +++++ ariadne/services/cluster_state.py | 243 ++++++++++++++++++++++++++++++ ariadne/settings.py | 18 +++ tests/test_cluster_state.py | 73 +++++++++ tests/test_metrics.py | 8 +- tests/test_storage.py | 21 +++ 9 files changed, 484 insertions(+), 1 deletion(-) create mode 100644 ariadne/services/cluster_state.py create mode 100644 tests/test_cluster_state.py diff --git a/ariadne/app.py b/ariadne/app.py index 94c36a6..eed4701 100644 --- a/ariadne/app.py +++ b/ariadne/app.py @@ -16,6 +16,7 @@ from .db.storage import Storage, TaskRunRecord from .manager.provisioning import ProvisioningManager from .metrics.metrics import record_task_run from .scheduler.cron import CronScheduler +from .services.cluster_state import run_cluster_state from .services.comms import comms from .services.firefly import firefly from .services.keycloak_admin import keycloak_admin @@ -338,6 +339,11 @@ def _startup() -> None: settings.comms_seed_room_cron, lambda: comms.run_seed_room(wait=True), ) + scheduler.add_task( + "schedule.cluster_state", + settings.cluster_state_cron, + lambda: run_cluster_state(storage), + ) scheduler.start() logger.info( "ariadne started", @@ -362,6 +368,7 @@ def _startup() -> None: "comms_reset_room_cron": settings.comms_reset_room_cron, "comms_seed_room_cron": settings.comms_seed_room_cron, "keycloak_profile_cron": settings.keycloak_profile_cron, + "cluster_state_cron": settings.cluster_state_cron, }, ) @@ -486,6 +493,23 @@ def list_audit_task_runs( return JSONResponse({"task_runs": output}) +@app.get("/api/admin/cluster/state") +def get_cluster_state(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_admin(ctx) + snapshot = storage.latest_cluster_state() + if not snapshot: + raise HTTPException(status_code=404, detail="cluster state unavailable") + return JSONResponse(snapshot) + + +@app.get("/api/internal/cluster/state") +def get_cluster_state_internal() -> JSONResponse: + snapshot = storage.latest_cluster_state() + if not snapshot: + raise HTTPException(status_code=404, detail="cluster state unavailable") + return JSONResponse(snapshot) + + @app.post("/api/admin/access/requests/{username}/approve") async def approve_access_request( username: str, diff --git a/ariadne/db/schema.py b/ariadne/db/schema.py index e3d4c15..b146e5b 100644 --- a/ariadne/db/schema.py +++ b/ariadne/db/schema.py @@ -31,6 +31,17 @@ ARIADNE_TABLES_SQL = [ ) """, """ + CREATE TABLE IF NOT EXISTS ariadne_cluster_state ( + id BIGSERIAL PRIMARY KEY, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + snapshot JSONB NOT NULL + ) + """, + """ + CREATE INDEX IF NOT EXISTS ariadne_cluster_state_created_idx + ON ariadne_cluster_state (created_at DESC) + """, + """ CREATE TABLE IF NOT EXISTS ariadne_events ( id BIGSERIAL PRIMARY KEY, event_type TEXT NOT NULL, diff --git a/ariadne/db/storage.py b/ariadne/db/storage.py index e7b24b4..664feb3 100644 --- a/ariadne/db/storage.py +++ b/ariadne/db/storage.py @@ -262,6 +262,52 @@ class Storage: ), ) + def record_cluster_state(self, snapshot: dict[str, Any]) -> None: + payload = json.dumps(snapshot, ensure_ascii=True) + self._db.execute( + """ + INSERT INTO ariadne_cluster_state (snapshot, created_at) + VALUES (%s, NOW()) + """, + (payload,), + ) + + def 
prune_cluster_state(self, keep: int) -> None:
+        if keep <= 0:
+            return
+        self._db.execute(
+            """
+            DELETE FROM ariadne_cluster_state
+            WHERE id IN (
+                SELECT id FROM ariadne_cluster_state
+                ORDER BY created_at DESC
+                OFFSET %s
+            )
+            """,
+            (keep,),
+        )
+
+    def latest_cluster_state(self) -> dict[str, Any] | None:
+        row = self._db.fetchone(
+            """
+            SELECT snapshot, created_at
+            FROM ariadne_cluster_state
+            ORDER BY created_at DESC
+            LIMIT 1
+            """
+        )
+        if not row:
+            return None
+        snapshot = row.get("snapshot")
+        if isinstance(snapshot, dict):
+            return snapshot
+        if isinstance(snapshot, str):
+            try:
+                return json.loads(snapshot)
+            except json.JSONDecodeError:
+                return None
+        return None
+
     def record_event(self, event_type: str, detail: dict[str, Any] | str | None) -> None:
         payload = detail
         if isinstance(detail, dict):
diff --git a/ariadne/metrics/metrics.py b/ariadne/metrics/metrics.py
index e31c893..843b393 100644
--- a/ariadne/metrics/metrics.py
+++ b/ariadne/metrics/metrics.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+from datetime import datetime
+
 from prometheus_client import Counter, Gauge, Histogram
 
 
@@ -47,6 +49,27 @@ ACCESS_REQUESTS = Gauge(
     ["status"],
 )
 
+CLUSTER_STATE_LAST_TS = Gauge(
+    "ariadne_cluster_state_timestamp_seconds",
+    "Last cluster state snapshot timestamp",
+)
+CLUSTER_STATE_NODES_TOTAL = Gauge(
+    "ariadne_cluster_nodes_total",
+    "Cluster nodes total",
+)
+CLUSTER_STATE_NODES_READY = Gauge(
+    "ariadne_cluster_nodes_ready",
+    "Cluster nodes Ready",
+)
+CLUSTER_STATE_PODS_RUNNING = Gauge(
+    "ariadne_cluster_pods_running",
+    "Cluster pods Running",
+)
+CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY = Gauge(
+    "ariadne_cluster_kustomizations_not_ready",
+    "Flux kustomizations not Ready",
+)
+
 
 def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
     TASK_RUNS_TOTAL.labels(task=task, status=status).inc()
@@ -76,3 +99,21 @@ def record_schedule_state(
 def set_access_request_counts(counts: dict[str, int]) -> None:
     for status, count in counts.items():
         ACCESS_REQUESTS.labels(status=status).set(count)
+
+
+def set_cluster_state_metrics(
+    collected_at: datetime,
+    nodes_total: int | None,
+    nodes_ready: int | None,
+    pods_running: float | None,
+    kustomizations_not_ready: int | None,
+) -> None:
+    CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
+    if nodes_total is not None:
+        CLUSTER_STATE_NODES_TOTAL.set(nodes_total)
+    if nodes_ready is not None:
+        CLUSTER_STATE_NODES_READY.set(nodes_ready)
+    if pods_running is not None:
+        CLUSTER_STATE_PODS_RUNNING.set(pods_running)
+    if kustomizations_not_ready is not None:
+        CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY.set(kustomizations_not_ready)
diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
new file mode 100644
index 0000000..ef370cb
--- /dev/null
+++ b/ariadne/services/cluster_state.py
@@ -0,0 +1,243 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from typing import Any
+
+import httpx
+
+from ..db.storage import Storage
+from ..k8s.client import get_json
+from ..metrics.metrics import set_cluster_state_metrics
+from ..settings import settings
+from ..utils.logging import get_logger
+
+
+logger = get_logger(__name__)
+
+_VALUE_PAIR_LEN = 2
+
+
+@dataclass(frozen=True)
+class ClusterStateSummary:
+    nodes_total: int | None
+    nodes_ready: int | None
+    pods_running: float | None
+    kustomizations_not_ready: int | None
+    errors: int
+
+
+def _items(payload: dict[str, Any]) -> list[dict[str, Any]]:
+    items = 
payload.get("items") if isinstance(payload.get("items"), list) else [] + return [item for item in items if isinstance(item, dict)] + + +def _node_ready(conditions: Any) -> bool: + if not isinstance(conditions, list): + return False + for condition in conditions: + if not isinstance(condition, dict): + continue + if condition.get("type") == "Ready": + return condition.get("status") == "True" + return False + + +def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]: + names: list[str] = [] + not_ready: list[str] = [] + for node in _items(payload): + metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {} + status = node.get("status") if isinstance(node.get("status"), dict) else {} + name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" + if not name: + continue + names.append(name) + if not _node_ready(status.get("conditions")): + not_ready.append(name) + names.sort() + not_ready.sort() + total = len(names) + ready = total - len(not_ready) + return { + "total": total, + "ready": ready, + "not_ready": len(not_ready), + "names": names, + "not_ready_names": not_ready, + } + + +def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]: + if not isinstance(conditions, list): + return None, "", "" + for condition in conditions: + if not isinstance(condition, dict): + continue + if condition.get("type") != cond_type: + continue + status = condition.get("status") + if status == "True": + return True, condition.get("reason") or "", condition.get("message") or "" + if status == "False": + return False, condition.get("reason") or "", condition.get("message") or "" + return None, condition.get("reason") or "", condition.get("message") or "" + return None, "", "" + + +def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]: + not_ready: list[dict[str, Any]] = [] + for item in _items(payload): + metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {} + spec = item.get("spec") if isinstance(item.get("spec"), dict) else {} + status = item.get("status") if isinstance(item.get("status"), dict) else {} + name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + conditions = status.get("conditions") + ready, reason, message = _condition_status(conditions, "Ready") + suspended = bool(spec.get("suspend")) + if ready is True and not suspended: + continue + not_ready.append( + { + "name": name, + "namespace": namespace, + "ready": ready, + "suspended": suspended, + "reason": reason, + "message": message, + } + ) + not_ready.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or "")) + return { + "total": len(_items(payload)), + "not_ready": len(not_ready), + "items": not_ready, + } + + +def _vm_query(expr: str) -> list[dict[str, Any]] | None: + base = settings.vm_url + if not base: + return None + url = f"{base.rstrip('/')}/api/v1/query" + params = {"query": expr} + with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client: + resp = client.get(url, params=params) + resp.raise_for_status() + payload = resp.json() + if payload.get("status") != "success": + return None + data = payload.get("data") if isinstance(payload.get("data"), dict) else {} + result = data.get("result") + return result if isinstance(result, list) else None + + +def _vm_scalar(expr: str) -> float | None: + result = _vm_query(expr) + if not result: + return None + value = 
result[0].get("value") if isinstance(result[0], dict) else None + if not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN: + return None + try: + return float(value[1]) + except (TypeError, ValueError): + return None + + +def _vm_vector(expr: str) -> list[dict[str, Any]]: + result = _vm_query(expr) or [] + output: list[dict[str, Any]] = [] + for item in result: + if not isinstance(item, dict): + continue + metric = item.get("metric") if isinstance(item.get("metric"), dict) else {} + value = item.get("value") if isinstance(item.get("value"), list) else [] + if len(value) < _VALUE_PAIR_LEN: + continue + try: + numeric = float(value[1]) + except (TypeError, ValueError): + continue + output.append({"metric": metric, "value": numeric}) + return output + + +def _summarize_metrics(errors: list[str]) -> dict[str, Any]: + metrics: dict[str, Any] = {} + try: + metrics["nodes_total"] = _vm_scalar("count(kube_node_info)") + metrics["nodes_ready"] = _vm_scalar( + "count(kube_node_status_condition{condition=\"Ready\",status=\"true\"})" + ) + metrics["pods_running"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Running\"})") + metrics["pods_pending"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Pending\"})") + metrics["pods_failed"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Failed\"})") + metrics["pods_succeeded"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Succeeded\"})") + metrics["top_restarts_1h"] = _vm_vector( + "topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[1h])))" + ) + except Exception as exc: + errors.append(f"vm: {exc}") + return metrics + + +def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: + errors: list[str] = [] + collected_at = datetime.now(timezone.utc) + + nodes: dict[str, Any] | None = None + try: + payload = get_json("/api/v1/nodes") + nodes = _summarize_nodes(payload) + except Exception as exc: + errors.append(f"nodes: {exc}") + + kustomizations: dict[str, Any] | None = None + try: + payload = get_json( + "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations" + ) + kustomizations = _summarize_kustomizations(payload) + except Exception as exc: + errors.append(f"flux: {exc}") + + metrics = _summarize_metrics(errors) + + snapshot = { + "collected_at": collected_at.isoformat(), + "nodes": nodes or {}, + "flux": kustomizations or {}, + "metrics": metrics, + "errors": errors, + } + + summary = ClusterStateSummary( + nodes_total=(nodes or {}).get("total"), + nodes_ready=(nodes or {}).get("ready"), + pods_running=metrics.get("pods_running"), + kustomizations_not_ready=(kustomizations or {}).get("not_ready"), + errors=len(errors), + ) + set_cluster_state_metrics( + collected_at, + summary.nodes_total, + summary.nodes_ready, + summary.pods_running, + summary.kustomizations_not_ready, + ) + return snapshot, summary + + +def run_cluster_state(storage: Storage) -> ClusterStateSummary: + snapshot, summary = collect_cluster_state() + try: + storage.record_cluster_state(snapshot) + storage.prune_cluster_state(settings.cluster_state_keep) + except Exception as exc: + logger.info( + "cluster state storage failed", + extra={"event": "cluster_state", "status": "error", "detail": str(exc)}, + ) + return summary diff --git a/ariadne/settings.py b/ariadne/settings.py index 3af91e2..339cfc9 100644 --- a/ariadne/settings.py +++ b/ariadne/settings.py @@ -186,6 +186,8 @@ class Settings: provision_retry_cooldown_sec: float schedule_tick_sec: float k8s_api_timeout_sec: float + vm_url: str + 
cluster_state_vm_timeout_sec: float mailu_sync_cron: str nextcloud_sync_cron: str @@ -206,6 +208,8 @@ class Settings: comms_reset_room_cron: str comms_seed_room_cron: str keycloak_profile_cron: str + cluster_state_cron: str + cluster_state_keep: int opensearch_url: str opensearch_limit_bytes: int @@ -455,6 +459,18 @@ class Settings: "keycloak_profile_cron": _env("ARIADNE_SCHEDULE_KEYCLOAK_PROFILE", "0 */6 * * *"), } + @classmethod + def _cluster_state_config(cls) -> dict[str, Any]: + return { + "vm_url": _env( + "ARIADNE_VM_URL", + "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428", + ).rstrip("/"), + "cluster_state_vm_timeout_sec": _env_float("ARIADNE_CLUSTER_STATE_VM_TIMEOUT_SEC", 5.0), + "cluster_state_cron": _env("ARIADNE_SCHEDULE_CLUSTER_STATE", "*/15 * * * *"), + "cluster_state_keep": _env_int("ARIADNE_CLUSTER_STATE_KEEP", 168), + } + @classmethod def _opensearch_config(cls) -> dict[str, Any]: return { @@ -481,6 +497,7 @@ class Settings: image_cfg = cls._image_sweeper_config() vaultwarden_cfg = cls._vaultwarden_config() schedule_cfg = cls._schedule_config() + cluster_cfg = cls._cluster_state_config() opensearch_cfg = cls._opensearch_config() portal_db = _env("PORTAL_DATABASE_URL", "") @@ -518,6 +535,7 @@ class Settings: **image_cfg, **vaultwarden_cfg, **schedule_cfg, + **cluster_cfg, **opensearch_cfg, ) diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py new file mode 100644 index 0000000..d8b3d0c --- /dev/null +++ b/tests/test_cluster_state.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +import ariadne.services.cluster_state as cluster_state + + +class DummyStorage: + def __init__(self) -> None: + self.snapshot = None + self.keep = None + + def record_cluster_state(self, snapshot): # type: ignore[no-untyped-def] + self.snapshot = snapshot + + def prune_cluster_state(self, keep: int) -> None: + self.keep = keep + + +def test_collect_cluster_state(monkeypatch) -> None: + def fake_get_json(path: str): + if path.endswith("/nodes"): + return { + "items": [ + { + "metadata": {"name": "node-a"}, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": {"name": "node-b"}, + "status": {"conditions": [{"type": "Ready", "status": "False"}]}, + }, + ] + } + return { + "items": [ + { + "metadata": {"name": "apps", "namespace": "flux-system"}, + "spec": {"suspend": False}, + "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + }, + { + "metadata": {"name": "broken", "namespace": "flux-system"}, + "spec": {"suspend": False}, + "status": {"conditions": [{"type": "Ready", "status": "False", "reason": "Fail"}]}, + }, + ] + } + + monkeypatch.setattr(cluster_state, "get_json", fake_get_json) + monkeypatch.setattr(cluster_state, "_vm_scalar", lambda _expr: 5.0) + monkeypatch.setattr(cluster_state, "_vm_vector", lambda _expr: []) + + snapshot, summary = cluster_state.collect_cluster_state() + assert snapshot["nodes"]["total"] == 2 + assert snapshot["nodes"]["ready"] == 1 + assert snapshot["flux"]["not_ready"] == 1 + assert summary.nodes_total == 2 + assert summary.nodes_ready == 1 + assert summary.pods_running == 5.0 + + +def test_run_cluster_state_records(monkeypatch) -> None: + dummy = DummyStorage() + snapshot = {"collected_at": datetime.now(timezone.utc).isoformat()} + summary = cluster_state.ClusterStateSummary(1, 1, 1.0, 0, 0) + + monkeypatch.setattr(cluster_state, "collect_cluster_state", lambda: (snapshot, summary)) + + result = 
cluster_state.run_cluster_state(dummy) + assert result == summary + assert dummy.snapshot == snapshot + assert dummy.keep == cluster_state.settings.cluster_state_keep diff --git a/tests/test_metrics.py b/tests/test_metrics.py index 19dc1ea..d82481d 100644 --- a/tests/test_metrics.py +++ b/tests/test_metrics.py @@ -1,7 +1,13 @@ from __future__ import annotations -from ariadne.metrics.metrics import set_access_request_counts +from datetime import datetime, timezone + +from ariadne.metrics.metrics import set_access_request_counts, set_cluster_state_metrics def test_set_access_request_counts() -> None: set_access_request_counts({"pending": 2, "approved": 1}) + + +def test_set_cluster_state_metrics() -> None: + set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1) diff --git a/tests/test_storage.py b/tests/test_storage.py index 5c2ff44..c15c021 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -343,3 +343,24 @@ def test_update_schedule_state_executes() -> None: ) ) assert db.executed + + +def test_record_cluster_state_executes() -> None: + db = DummyDB() + storage = Storage(db) + storage.record_cluster_state({"ok": True}) + assert db.executed + + +def test_prune_cluster_state_skips_zero() -> None: + db = DummyDB() + storage = Storage(db) + storage.prune_cluster_state(0) + assert not db.executed + + +def test_latest_cluster_state_parses_json() -> None: + db = DummyDB(row={"snapshot": "{\"ok\": true}", "created_at": datetime.now()}) + storage = Storage(db) + snapshot = storage.latest_cluster_state() + assert snapshot == {"ok": True}
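
Note on the defaults: with ARIADNE_SCHEDULE_CLUSTER_STATE at "*/15 * * * *" and ARIADNE_CLUSTER_STATE_KEEP at 168, snapshots land four times per hour and prune_cluster_state keeps only the newest 168 rows, so the table retains roughly 168 / 4 = 42 hours of history; keeping 672 rows would cover a full week at the same cadence.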
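
For consumers of the new read endpoints, a minimal sketch of fetching the latest snapshot over the unauthenticated internal route; the base URL below is an assumption (substitute the real in-cluster Service address), and the field names follow the snapshot dict built in collect_cluster_state():

    import httpx

    # Hypothetical in-cluster address for the ariadne Service; adjust host
    # and port to match the actual deployment.
    BASE_URL = "http://ariadne.ariadne.svc.cluster.local:8080"


    def fetch_cluster_state(base_url: str = BASE_URL) -> dict | None:
        # GET /api/internal/cluster/state returns the latest stored snapshot,
        # or 404 until run_cluster_state has recorded one.
        resp = httpx.get(f"{base_url}/api/internal/cluster/state", timeout=5.0)
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()


    snapshot = fetch_cluster_state()
    if snapshot is None:
        print("cluster state unavailable")
    else:
        nodes = snapshot.get("nodes", {})
        flux = snapshot.get("flux", {})
        print(f"collected_at: {snapshot.get('collected_at')}")
        print(f"nodes ready: {nodes.get('ready')}/{nodes.get('total')}")
        print(f"kustomizations not ready: {flux.get('not_ready')}")
        for err in snapshot.get("errors", []):
            print(f"collector error: {err}")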