from __future__ import annotations from datetime import datetime, timezone import ariadne.services.cluster_state as cluster_state class DummyStorage: def __init__(self) -> None: self.snapshot = None self.keep = None def record_cluster_state(self, snapshot): # type: ignore[no-untyped-def] self.snapshot = snapshot def prune_cluster_state(self, keep: int) -> None: self.keep = keep def test_collect_cluster_state(monkeypatch) -> None: def fake_get_json(path: str): if path.endswith("/nodes"): return { "items": [ { "metadata": {"name": "node-a", "labels": {"kubernetes.io/arch": "arm64"}}, "status": { "conditions": [{"type": "Ready", "status": "True"}], "nodeInfo": {"architecture": "arm64"}, "addresses": [{"type": "InternalIP", "address": "10.0.0.1"}], }, }, { "metadata": {"name": "node-b", "labels": {"kubernetes.io/arch": "amd64"}}, "status": { "conditions": [{"type": "Ready", "status": "False"}], "nodeInfo": {"architecture": "amd64"}, }, }, ] } if path.startswith("/api/v1/pods"): return { "items": [ { "metadata": { "name": "jellyfin-0", "namespace": "media", "labels": {"app": "jellyfin"}, }, "spec": {"nodeName": "node-a"}, "status": {"phase": "Running"}, } ] } return { "items": [ { "metadata": {"name": "apps", "namespace": "flux-system"}, "spec": {"suspend": False}, "status": {"conditions": [{"type": "Ready", "status": "True"}]}, }, { "metadata": {"name": "broken", "namespace": "flux-system"}, "spec": {"suspend": False}, "status": {"conditions": [{"type": "Ready", "status": "False", "reason": "Fail"}]}, }, ] } monkeypatch.setattr(cluster_state, "get_json", fake_get_json) monkeypatch.setattr(cluster_state, "_vm_scalar", lambda _expr: 5.0) monkeypatch.setattr(cluster_state, "_vm_vector", lambda _expr: []) snapshot, summary = cluster_state.collect_cluster_state() assert snapshot["nodes"]["total"] == 2 assert snapshot["nodes"]["ready"] == 1 assert snapshot["flux"]["not_ready"] == 1 assert snapshot["nodes_summary"]["total"] == 2 assert snapshot["nodes_summary"]["ready"] == 1 assert snapshot["nodes_detail"] assert snapshot["workloads"] assert snapshot["namespace_pods"] assert snapshot["namespace_pods"][0]["namespace"] == "media" assert "node_usage_stats" in snapshot["metrics"] assert snapshot["metrics"]["namespace_cpu_top"] == [] assert snapshot["metrics"]["namespace_mem_top"] == [] assert summary.nodes_total == 2 assert summary.nodes_ready == 1 assert summary.pods_running == 5.0 def test_run_cluster_state_records(monkeypatch) -> None: dummy = DummyStorage() snapshot = {"collected_at": datetime.now(timezone.utc).isoformat()} summary = cluster_state.ClusterStateSummary(1, 1, 1.0, 0, 0) monkeypatch.setattr(cluster_state, "collect_cluster_state", lambda: (snapshot, summary)) result = cluster_state.run_cluster_state(dummy) assert result == summary assert dummy.snapshot == snapshot assert dummy.keep == cluster_state.settings.cluster_state_keep