ariadne/tests/test_cluster_state.py

99 lines
3.6 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
import ariadne.services.cluster_state as cluster_state
class DummyStorage:
def __init__(self) -> None:
self.snapshot = None
self.keep = None
def record_cluster_state(self, snapshot): # type: ignore[no-untyped-def]
self.snapshot = snapshot
def prune_cluster_state(self, keep: int) -> None:
self.keep = keep
def test_collect_cluster_state(monkeypatch) -> None:
def fake_get_json(path: str):
if path.endswith("/nodes"):
return {
"items": [
{
"metadata": {"name": "node-a", "labels": {"kubernetes.io/arch": "arm64"}},
"status": {
"conditions": [{"type": "Ready", "status": "True"}],
"nodeInfo": {"architecture": "arm64"},
"addresses": [{"type": "InternalIP", "address": "10.0.0.1"}],
},
},
{
"metadata": {"name": "node-b", "labels": {"kubernetes.io/arch": "amd64"}},
"status": {
"conditions": [{"type": "Ready", "status": "False"}],
"nodeInfo": {"architecture": "amd64"},
},
},
]
}
if path.startswith("/api/v1/pods"):
return {
"items": [
{
"metadata": {
"name": "jellyfin-0",
"namespace": "media",
"labels": {"app": "jellyfin"},
},
"spec": {"nodeName": "node-a"},
"status": {"phase": "Running"},
}
]
}
return {
"items": [
{
"metadata": {"name": "apps", "namespace": "flux-system"},
"spec": {"suspend": False},
"status": {"conditions": [{"type": "Ready", "status": "True"}]},
},
{
"metadata": {"name": "broken", "namespace": "flux-system"},
"spec": {"suspend": False},
"status": {"conditions": [{"type": "Ready", "status": "False", "reason": "Fail"}]},
},
]
}
monkeypatch.setattr(cluster_state, "get_json", fake_get_json)
monkeypatch.setattr(cluster_state, "_vm_scalar", lambda _expr: 5.0)
monkeypatch.setattr(cluster_state, "_vm_vector", lambda _expr: [])
snapshot, summary = cluster_state.collect_cluster_state()
assert snapshot["nodes"]["total"] == 2
assert snapshot["nodes"]["ready"] == 1
assert snapshot["flux"]["not_ready"] == 1
assert snapshot["nodes_summary"]["total"] == 2
assert snapshot["nodes_summary"]["ready"] == 1
assert snapshot["nodes_detail"]
assert snapshot["workloads"]
assert summary.nodes_total == 2
assert summary.nodes_ready == 1
assert summary.pods_running == 5.0
def test_run_cluster_state_records(monkeypatch) -> None:
dummy = DummyStorage()
snapshot = {"collected_at": datetime.now(timezone.utc).isoformat()}
summary = cluster_state.ClusterStateSummary(1, 1, 1.0, 0, 0)
monkeypatch.setattr(cluster_state, "collect_cluster_state", lambda: (snapshot, summary))
result = cluster_state.run_cluster_state(dummy)
assert result == summary
assert dummy.snapshot == snapshot
assert dummy.keep == cluster_state.settings.cluster_state_keep