from __future__ import annotations

from datetime import datetime, timezone

import ariadne.services.cluster_state as cluster_state


class DummyStorage:
    """Minimal stand-in for the storage backend used by run_cluster_state."""

    def __init__(self) -> None:
        self.snapshot = None
        self.keep = None

    def record_cluster_state(self, snapshot):  # type: ignore[no-untyped-def]
        self.snapshot = snapshot

    def prune_cluster_state(self, keep: int) -> None:
        self.keep = keep


def test_collect_cluster_state(monkeypatch) -> None:
    """collect_cluster_state aggregates node, Flux, and metric data into a snapshot."""

    def fake_get_json(path: str):
        # Stub the Kubernetes API: node listings for "/nodes", Flux resources otherwise.
        if path.endswith("/nodes"):
            return {
                "items": [
                    {
                        "metadata": {"name": "node-a"},
                        "status": {"conditions": [{"type": "Ready", "status": "True"}]},
                    },
                    {
                        "metadata": {"name": "node-b"},
                        "status": {"conditions": [{"type": "Ready", "status": "False"}]},
                    },
                ]
            }
        return {
            "items": [
                {
                    "metadata": {"name": "apps", "namespace": "flux-system"},
                    "spec": {"suspend": False},
                    "status": {"conditions": [{"type": "Ready", "status": "True"}]},
                },
                {
                    "metadata": {"name": "broken", "namespace": "flux-system"},
                    "spec": {"suspend": False},
                    "status": {"conditions": [{"type": "Ready", "status": "False", "reason": "Fail"}]},
                },
            ]
        }

    monkeypatch.setattr(cluster_state, "get_json", fake_get_json)
    # Stub the metric query helpers so no metrics backend is needed.
    monkeypatch.setattr(cluster_state, "_vm_scalar", lambda _expr: 5.0)
    monkeypatch.setattr(cluster_state, "_vm_vector", lambda _expr: [])

    snapshot, summary = cluster_state.collect_cluster_state()

    assert snapshot["nodes"]["total"] == 2
    assert snapshot["nodes"]["ready"] == 1
    assert snapshot["flux"]["not_ready"] == 1
    assert summary.nodes_total == 2
    assert summary.nodes_ready == 1
    assert summary.pods_running == 5.0


def test_run_cluster_state_records(monkeypatch) -> None:
    """run_cluster_state records the snapshot and prunes using the configured retention."""
    dummy = DummyStorage()
    snapshot = {"collected_at": datetime.now(timezone.utc).isoformat()}
    summary = cluster_state.ClusterStateSummary(1, 1, 1.0, 0, 0)
    monkeypatch.setattr(cluster_state, "collect_cluster_state", lambda: (snapshot, summary))

    result = cluster_state.run_cluster_state(dummy)

    assert result == summary
    assert dummy.snapshot == snapshot
    assert dummy.keep == cluster_state.settings.cluster_state_keep