test(ariadne): cover cluster state domains

This commit is contained in:
codex 2026-04-21 02:22:47 -03:00
parent f0e161ba8b
commit cbe774acfd
3 changed files with 576 additions and 0 deletions

View File

@ -0,0 +1,148 @@
from __future__ import annotations
from ariadne.services import cluster_state_anomalies as anomalies
from ariadne.services import cluster_state_attention as attention
from ariadne.services import cluster_state_health as health
from ariadne.services import cluster_state_profiles as profiles
from ariadne.services import cluster_state_relationships as relationships
from ariadne.services import cluster_state_signals as signals
from ariadne.services.cluster_state_contract import SignalContext
def test_relationship_context_and_cross_stats() -> None:
namespace_context = relationships._namespace_context(
[{"namespace": "apps", "pods_total": 4, "pods_running": 3, "primary_node": "titan-1"}],
[{"namespace": "apps", "nodes": {"titan-1": 3}, "primary_node": "titan-1"}],
[
{
"namespace": "apps",
"cpu_usage": 2.0,
"mem_usage": 4.0,
"cpu_usage_ratio": 1.5,
"mem_usage_ratio": 0.5,
}
],
{"apps": {"cpu": {"avg": 1.0}, "mem": {"avg": 2.0}}},
)
assert namespace_context[0]["baseline_delta"]["cpu"] == 100.0
assert relationships._namespace_nodes_top(namespace_context, 1)[0]["namespace"] == "apps"
workloads = [{"namespace": "apps", "workload": "api", "nodes": {"titan-1": 2}, "pods_total": 2}]
node_workloads = relationships._node_workload_map(workloads)
assert node_workloads["titan-1"] == {"apps/api": 2}
assert relationships._workload_nodes_top(workloads, 1)[0]["workload"] == "api"
assert relationships._node_workloads_top(node_workloads)[0]["node"] == "titan-1"
assert relationships._workload_index(workloads)[0]["workload"] == "api"
node_context = relationships._node_context(
[
{
"name": "titan-1",
"ready": True,
"hardware": "rpi5",
"arch": "arm64",
"roles": ["worker"],
}
],
[{"node": "titan-1", "cpu": 80.0, "ram": 40.0, "load_index": 0.9}],
{"titan-1": {"cpu": {"avg": 40.0}, "ram": {"avg": 20.0}}},
node_workloads,
)
assert node_context[0]["baseline_delta"]["cpu"] == 100.0
metrics = {
"node_usage": {"cpu": [{"node": "titan-1", "value": 80.0}]},
"namespace_top": {"cpu": [{"namespace": "apps", "value": 2.0}]},
"pvc_usage_top": [{"metric": {"namespace": "apps", "persistentvolumeclaim": "data"}, "value": 91.0}],
}
assert relationships._cross_node_metric_top(metrics, node_context)[0]["node"] == "titan-1"
assert relationships._cross_namespace_metric_top(metrics, namespace_context)[0]["namespace"] == "apps"
assert relationships._build_cross_stats(metrics, node_context, namespace_context, workloads)["node_metric_top"]
assert relationships._build_lexicon()["aliases"]["hot node"]
assert relationships._delta_top(node_context, "cpu")[0]["severity"] == "critical"
assert relationships._reason_top({"OOMKilled": 2})[0]["reason"] == "OOMKilled"
def test_health_anomaly_signal_profile_and_attention_domains() -> None:
metrics = {
"nodes_total": 2,
"nodes_ready": 1,
"pods_running": 8,
"pods_pending": 2,
"pods_failed": 1,
"job_failures_24h": [{"value": 1, "metric": {"job_name": "job"}}],
"pvc_usage_top": [{"metric": {"namespace": "apps", "persistentvolumeclaim": "data"}, "value": 92.0}],
"top_restarts_1h": [{"metric": {"namespace": "apps", "pod": "api"}, "value": 3}],
}
nodes_summary = {"pressure_nodes": {"DiskPressure": ["titan-1"]}, "unschedulable_nodes": ["titan-2"]}
workloads_health = {
"deployments": {"not_ready": 1, "items": [{"namespace": "apps", "name": "api", "desired": 2, "ready": 1}]},
"statefulsets": {"not_ready": 0, "items": []},
"daemonsets": {"not_ready": 0, "items": []},
}
pod_issues = {
"pending_over_15m": 2,
"counts": {"Failed": 1},
"waiting_reasons": {"CrashLoopBackOff": 3},
"phase_reasons": {"Evicted": 1},
}
kustomizations = {"not_ready": 1, "items": [{"name": "apps"}]}
events = {"warnings_total": 1, "warnings": [{"reason": "BackOff"}]}
anomaly_rows = anomalies._build_anomalies(metrics, nodes_summary, workloads_health, kustomizations, events)
assert {row["kind"] for row in anomaly_rows} >= {"pods_pending", "pvc_pressure", "flux_not_ready"}
assert anomalies._severity_rank("critical") == 0
assert anomalies._pvc_pressure_signals(metrics)[0]["target"] == "apps/data"
bullets = health._health_bullets(metrics, nodes_summary, workloads_health, anomaly_rows)
assert bullets[0] == "Nodes ready: 1/2"
assert health._workload_not_ready_items(workloads_health)[0]["name"] == "api"
assert health._pod_restarts_top(metrics)[0]["pod"] == "api"
node_context = [
{
"node": "titan-1",
"ready": True,
"hardware": "rpi5",
"arch": "arm64",
"roles": ["worker"],
"cpu": 90.0,
"ram": 85.0,
"disk": 95.0,
"net": 50.0,
"io": 60.0,
"load_index": 0.95,
"baseline": {"net": {"max": 10.0}, "io": {"max": 20.0}},
"baseline_delta": {"cpu": 100.0},
"pressure_flags": ["DiskPressure"],
}
]
namespace_context = [
{
"namespace": "apps",
"pods_total": 4,
"pods_running": 3,
"primary_node": "titan-1",
"nodes_top": [("titan-1", 4)],
"cpu_usage": 2.0,
"mem_usage": 4.0,
"cpu_ratio": 1.5,
"mem_ratio": 0.5,
"baseline_delta": {"cpu": 100.0},
"baseline": {"cpu": {"avg": 1.0}},
}
]
context = SignalContext(metrics, node_context, namespace_context, workloads_health, pod_issues, kustomizations)
assert signals._pod_issue_summary(pod_issues, metrics)["waiting_reasons_top"][0]["reason"] == "CrashLoopBackOff"
assert signals._build_signals(context)
node_pods = [{"node": "titan-1", "pods_total": 4, "pods_running": 3, "namespaces_top": [("apps", 4)]}]
node_workloads = {"titan-1": {"apps/api": 2}}
workloads = [{"namespace": "apps", "workload": "api", "pods_total": 2, "pods_running": 1, "nodes": {"titan-1": 2}}]
built_profiles = profiles._build_profiles(node_context, namespace_context, node_pods, workloads, node_workloads)
assert built_profiles["nodes"][0]["node"] == "titan-1"
assert built_profiles["namespaces"][0]["namespace"] == "apps"
assert built_profiles["workloads"][0]["workload"] == "api"
ranked = attention._build_attention_ranked(metrics, node_context, pod_issues, workloads_health)
assert ranked[0]["score"] > 0
assert attention._node_attention_score(node_context[0])[0] > 0

View File

@ -0,0 +1,210 @@
from __future__ import annotations
from ariadne.services import cluster_state_fetchers as fetchers
from ariadne.services import cluster_state_flux_events as flux_events
from ariadne.services import cluster_state_nodes as nodes
from ariadne.services import cluster_state_pods as pods
from ariadne.services import cluster_state_workloads as workloads
def test_node_summary_inventory_and_hardware_usage() -> None:
payload = {
"items": [
{
"metadata": {
"name": "titan-1",
"labels": {"kubernetes.io/arch": "arm64", "hardware": "rpi5"},
"creationTimestamp": "2026-01-01T00:00:00Z",
},
"spec": {"unschedulable": True, "taints": [{"key": "dedicated", "effect": "NoSchedule"}]},
"status": {
"conditions": [
{"type": "Ready", "status": "True"},
{"type": "DiskPressure", "status": "True", "reason": "Full", "message": "disk full"},
],
"capacity": {"cpu": "4", "memory": "8Gi", "pods": "110"},
"allocatable": {"cpu": "3", "memory": "7Gi"},
"nodeInfo": {"architecture": "arm64"},
"addresses": [{"type": "InternalIP", "address": "10.0.0.1"}],
},
},
{"metadata": {"name": "titan-2", "labels": {}}, "status": {"conditions": []}},
]
}
summary = nodes._summarize_nodes(payload)
assert summary["total"] == 2
assert summary["not_ready"] == 1
details = nodes._node_details(payload)
assert details[0]["pressure"]["DiskPressure"] is True
assert details[0]["taints"][0]["key"] == "dedicated"
assert nodes._node_labels({"node-role.kubernetes.io/control-plane": "", "other": "skip"})
assert nodes._node_addresses({"addresses": [{"type": "Hostname", "address": "titan-1"}]}) == {"Hostname": "titan-1"}
assert nodes._node_capacity({"cpu": "4", "unknown": "skip"}) == {"cpu": "4"}
assert nodes._node_pressure_conditions([{"type": "PIDPressure", "status": "True"}])["PIDPressure"] is True
assert nodes._node_roles({"node-role.kubernetes.io/control-plane": ""}) == ["control-plane"]
assert nodes._node_is_worker({"node-role.kubernetes.io/control-plane": ""}) is False
assert nodes._hardware_hint({"jetson": "true"}, {"architecture": "arm64"}) == "jetson"
assert nodes._condition_status([], "Ready") == (None, "", "")
assert nodes._age_hours("not-a-date") is None
assert nodes._node_age_stats(details)["oldest"]
assert nodes._node_flagged(details, "unschedulable") == ["titan-1"]
assert nodes._summarize_inventory(details)["unschedulable_nodes"] == ["titan-1"]
assert nodes._hardware_groups(details)[0]["hardware"]
assert nodes._pressure_summary(nodes._summarize_inventory(details))["total"] == 1
usage = nodes._node_usage_by_hardware([{"node": "titan-1", "cpu": 80.0, "load_index": 0.5}], details)
assert usage[0]["hardware"] == "rpi5"
def test_flux_and_event_summaries() -> None:
flux_payload = {
"items": [
{
"metadata": {"name": "apps", "namespace": "flux-system"},
"spec": {"suspend": False},
"status": {"conditions": [{"type": "Ready", "status": "True"}]},
},
{
"metadata": {"name": "broken", "namespace": "flux-system"},
"spec": {"suspend": True},
"status": {"conditions": [{"type": "Ready", "status": "False", "reason": "Bad", "message": "no"}]},
},
]
}
assert flux_events._summarize_kustomizations(flux_payload)["not_ready"] == 1
assert flux_events._namespace_allowed("apps") is True
assert flux_events._namespace_allowed("kube-system") is False
assert flux_events._event_sort_key("bad") == 0.0
events = flux_events._summarize_events(
{
"items": [
{
"metadata": {"namespace": "apps"},
"type": "Warning",
"reason": "BackOff",
"message": "retry",
"count": 2,
"lastTimestamp": "2026-01-01T00:00:00Z",
"involvedObject": {"kind": "Pod", "name": "api"},
},
{"metadata": {"namespace": "kube-system"}, "type": "Warning", "reason": "Ignored"},
{"metadata": {"namespace": "apps"}, "type": "Normal"},
]
}
)
assert events["warnings_total"] == 1
assert events["warnings_top_reason"]["reason"] == "BackOff"
def test_pod_summaries_and_issue_detection() -> None:
payload = {
"items": [
{
"metadata": {
"name": "api-1",
"namespace": "apps",
"labels": {"app": "api"},
"creationTimestamp": "2026-01-01T00:00:00Z",
},
"spec": {"nodeName": "titan-1"},
"status": {
"phase": "Pending",
"reason": "Unschedulable",
"containerStatuses": [
{"restartCount": 2, "state": {"waiting": {"reason": "CrashLoopBackOff"}}}
],
},
},
{
"metadata": {
"name": "worker-1",
"namespace": "apps",
"ownerReferences": [{"name": "worker", "kind": "ReplicaSet"}],
},
"spec": {"nodeName": "titan-2"},
"status": {"phase": "Running"},
},
]
}
assert pods._workload_from_labels({"app": "api"}) == ("api", "label:app")
assert pods._owner_reference({"ownerReferences": [{"name": "rs", "kind": "ReplicaSet"}]}) == ("rs", "owner:ReplicaSet")
assert pods._pod_workload({"labels": {}, "ownerReferences": [{"name": "rs"}]})[0] == "rs"
assert pods._summarize_workloads(payload)[0]["workload"] == "api"
assert pods._summarize_namespace_pods(payload)[0]["pods_total"] == 2
assert pods._summarize_namespace_nodes(payload)[0]["primary_node"]
node_pods = pods._summarize_node_pods(payload)
assert pods._node_pods_top(node_pods)[0]["node"] == "titan-1"
issues = pods._summarize_pod_issues(payload)
assert issues["counts"]["Pending"] == 1
assert issues["waiting_reasons"]["CrashLoopBackOff"] == 1
def test_workload_job_longhorn_and_fetch_summaries(monkeypatch) -> None:
jobs = workloads._summarize_jobs(
{
"items": [
{
"metadata": {"name": "backup", "namespace": "apps", "creationTimestamp": "2026-01-01T00:00:00Z"},
"status": {"failed": 1, "succeeded": 0, "active": 1},
}
]
}
)
assert jobs["totals"]["failed"] == 1
deployments = workloads._summarize_deployments(
{"items": [{"metadata": {"name": "api", "namespace": "apps"}, "spec": {"replicas": 2}, "status": {"readyReplicas": 1}}]}
)
statefulsets = workloads._summarize_statefulsets(
{"items": [{"metadata": {"name": "db", "namespace": "apps"}, "spec": {"replicas": 2}, "status": {"readyReplicas": 1}}]}
)
daemonsets = workloads._summarize_daemonsets(
{"items": [{"metadata": {"name": "agent", "namespace": "apps"}, "status": {"desiredNumberScheduled": 2, "numberReady": 1}}]}
)
health = workloads._summarize_workload_health(deployments, statefulsets, daemonsets)
assert health["deployments"]["not_ready"] == 1
longhorn = workloads._summarize_longhorn_volumes(
{
"items": [
{
"metadata": {"name": "pvc-data"},
"spec": {"size": "1Gi"},
"status": {"state": "attached", "robustness": "degraded", "actualSize": "500Mi"},
}
]
}
)
assert longhorn["degraded_count"] == 1
def fake_get_json(path: str):
if path.endswith("/nodes"):
return {"items": []}
if path.startswith("/api/v1/pods"):
return {"items": []}
if path.startswith("/apis/batch"):
return {"items": []}
if "longhorn" in path:
return {"items": []}
if "deployments" in path or "statefulsets" in path or "daemonsets" in path:
return {"items": []}
if path.startswith("/api/v1/events"):
return {"items": []}
return {"items": []}
monkeypatch.setattr(fetchers, "_get_json", fake_get_json)
errors: list[str] = []
assert fetchers._fetch_nodes(errors)[0]["total"] == 0
assert fetchers._fetch_flux(errors)["not_ready"] == 0
assert fetchers._fetch_pods(errors)[0] == []
assert fetchers._fetch_jobs(errors)["totals"]["total"] == 0
assert fetchers._fetch_longhorn(errors) == {}
assert fetchers._fetch_workload_health(errors)["deployments"]["total"] == 0
assert fetchers._fetch_events(errors)["warnings_total"] == 0
assert errors == []
monkeypatch.setattr(fetchers, "_get_json", lambda _path: (_ for _ in ()).throw(RuntimeError("boom")))
errors = []
assert fetchers._fetch_jobs(errors) == {}
assert errors == ["jobs: boom"]

View File

@ -0,0 +1,218 @@
from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import cluster_state_vm_client as vm_client
from ariadne.services import cluster_state_vm_trends as vm_trends
from ariadne.services import cluster_state_vm_usage as vm_usage
class _FakeResponse:
def __init__(self, payload):
self.payload = payload
def raise_for_status(self) -> None:
if isinstance(self.payload, Exception):
raise self.payload
def json(self):
return self.payload
class _FakeClient:
payload = {"status": "success", "data": {"result": []}}
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
def get(self, *args, **kwargs):
return _FakeResponse(self.payload)
def _metric(name: str, value: float, **labels):
return {"metric": {name: labels.pop(name, "target"), **labels}, "value": value}
def test_vm_query_scalar_vector_and_alert_helpers(monkeypatch) -> None:
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(vm_url="", cluster_state_vm_timeout_sec=1.0))
assert vm_client._vm_query("up") is None
_FakeClient.payload = {
"status": "success",
"data": {"result": [{"metric": {"node": "titan-1"}, "value": [1, "42.5"]}]},
}
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(vm_url="http://victoria", cluster_state_vm_timeout_sec=1.0))
monkeypatch.setattr(vm_client.httpx, "Client", _FakeClient)
assert vm_client._vm_query("up")[0]["metric"]["node"] == "titan-1"
assert vm_client._vm_scalar("up") == 42.5
assert vm_client._vm_vector("up") == [{"metric": {"node": "titan-1"}, "value": 42.5}]
_FakeClient.payload = {"status": "error", "data": {"result": []}}
assert vm_client._vm_query("bad") is None
monkeypatch.setattr(vm_client, "_vm_query", lambda _expr: [{"metric": {}, "value": [1]}])
assert vm_client._vm_scalar("short") is None
assert vm_client._vm_vector("short") == []
def test_vm_client_alerts_and_namespace_filters(monkeypatch) -> None:
entries = [
{"metric": {"alertname": "NodeDown", "severity": "critical"}, "value": 2},
{"metric": {"alertname": "", "severity": "warning"}, "value": 1},
]
assert vm_client._alert_entries(entries) == [
{"alert": "NodeDown", "severity": "critical", "value": 2}
]
monkeypatch.setattr(vm_client, "_vm_vector", lambda _expr: entries)
assert vm_client._vm_alerts_now()[0]["alert"] == "NodeDown"
assert vm_client._vm_alerts_trend("1h")[0]["severity"] == "critical"
filtered = vm_client._filter_namespace_vector(
[
{"metric": {"namespace": "kube-system"}, "value": 1},
{"metric": {"namespace": "apps"}, "value": 2},
{"metric": {"namespace": ""}, "value": 3},
]
)
assert filtered == [{"metric": {"namespace": "apps"}, "value": 2}]
alerts = [
{"labels": {"alertname": "DiskHot", "severity": "warning"}},
{"labels": {"alertname": "CPUHot", "severity": "critical"}},
{"labels": {}},
]
assert vm_client._summarize_alerts(alerts)["by_severity"] == {"warning": 1, "critical": 1}
def test_vm_client_topk_baselines_and_window_series(monkeypatch) -> None:
monkeypatch.setattr(
vm_client,
"_vm_vector",
lambda _expr: [
{"metric": {"node": "titan-1", "namespace": "apps"}, "value": 9.0},
{"metric": {"node": "titan-2", "namespace": "apps"}, "value": 4.0},
],
)
assert vm_client._vm_topk("expr", "node") == {
"label": "titan-1",
"metric": {"node": "titan-1", "namespace": "apps"},
"value": 9.0,
}
assert vm_client._vm_node_metric("expr", "node")[0] == {"node": "titan-1", "value": 9.0}
baseline = vm_client._vm_baseline_map("expr", "node", "24h")
assert baseline["titan-1"]["avg"] == 9.0
assert vm_client._baseline_map_to_list(baseline, "node")[0]["node"] == "titan-1"
assert vm_client._limit_entries([{"value": 1}, {"value": 2}], 1) == [{"value": 1}]
series = vm_client._vm_window_series("expr", "node", "node", "1h")
assert series["avg"][0]["node"] == "titan-1"
trends = vm_client._build_metric_trends({"cpu": "expr"}, "node", "node", ("1h",), 2)
assert trends["cpu"]["1h"]["avg"][0]["node"] == "titan-1"
monkeypatch.setattr(vm_client, "_vm_scalar", lambda _expr: 7.5)
assert vm_client._vm_scalar_window("expr", "1h", "max_over_time") == 7.5
assert vm_client._scalar_trends("expr", ("1h",))["1h"]["avg"] == 7.5
assert "nodes_ready" in vm_client._cluster_trends()
assert "not_ready" in vm_client._node_condition_trends()
def test_vm_trend_helpers(monkeypatch) -> None:
monkeypatch.setattr(
vm_trends,
"_scalar_trends",
lambda _expr, windows: {window: {"avg": 3.0, "max": 4.0, "min": 2.0} for window in windows},
)
monkeypatch.setattr(
vm_trends,
"_vm_vector",
lambda expr: [
{
"metric": {
"namespace": "apps",
"pod": "api",
"job_name": "api",
"reason": "CrashLoopBackOff",
"persistentvolumeclaim": "data",
},
"value": 3.0,
}
],
)
assert vm_trends._pod_reason_totals({"crash": "CrashLoopBackOff"}, "waiting")["crash"]["1h"]["avg"] == 3.0
assert "cpu" in vm_trends._node_usage_exprs()
assert "mem" in vm_trends._namespace_usage_exprs()
assert "cpu_requests" in vm_trends._namespace_request_exprs()
assert vm_trends._restart_namespace_trend("1h")[0]["namespace"] == "apps"
assert vm_trends._job_failure_trend("1h")[0]["job"] == "api"
assert vm_trends._pod_reason_entries("expr", 5)[0]["pod"] == "api"
assert vm_trends._namespace_reason_entries("expr", 5)[0]["namespace"] == "apps"
assert vm_trends._pod_waiting_now()["crash_loop"][0]["pod"] == "api"
assert vm_trends._pod_waiting_trends()["crash_loop"]["1h"][0]["pod"] == "api"
assert vm_trends._pod_terminated_now()["oom_killed"][0]["pod"] == "api"
assert vm_trends._pod_terminated_trends()["oom_killed"]["1h"][0]["pod"] == "api"
assert vm_trends._pvc_usage_trends()["1h"][0]["namespace"] == "apps"
monkeypatch.setattr(vm_trends, "_vm_scalar_window", lambda _expr, _window, _fn: 2.0)
assert vm_trends._pods_phase_trends()["1h"]["running"]["avg"] == 2.0
def test_vm_usage_helpers(monkeypatch) -> None:
monkeypatch.setattr(vm_usage, "_vm_scalar", lambda _expr: 10.0)
monkeypatch.setattr(
vm_usage,
"_vm_vector",
lambda _expr: [{"metric": {"namespace": "apps", "persistentvolumeclaim": "data"}, "value": 88.0}],
)
monkeypatch.setattr(vm_usage, "_vm_topk", lambda _expr, label: {label: "top", "value": 9.0})
monkeypatch.setattr(vm_usage, "_vm_node_metric", lambda _expr, label: [{label: "titan-1", "value": 50.0}])
errors: list[str] = []
assert vm_usage._postgres_connections(errors)["used"] == 10.0
assert vm_usage._hottest_nodes(errors)["cpu"]["node"] == "top"
assert vm_usage._node_usage(errors)["cpu"][0]["node"] == "titan-1"
assert vm_usage._pvc_usage(errors)[0]["metric"]["persistentvolumeclaim"] == "data"
assert vm_usage._usage_stats([{"value": 2}, {"value": 4}]) == {"min": 2.0, "max": 4.0, "avg": 3.0}
assert vm_usage._vm_namespace_totals("expr") == {"apps": 88.0}
capacity = vm_usage._build_namespace_capacity({"apps": 2.0}, {"apps": 1.0}, {"apps": 4.0}, {"apps": 2.0})
assert capacity[0]["cpu_usage_ratio"] == 2.0
profile = vm_usage._node_usage_profile(
{"cpu": [{"node": "titan-1", "value": 50.0}], "ram": [{"node": "titan-1", "value": 25.0}]},
[{"name": "titan-1", "pressure": {"DiskPressure": True}, "taints": [], "unschedulable": False}],
[{"node": "titan-1", "pods_total": 4}],
)
assert profile[0]["pressure_count"] == 1
assert vm_usage._percentile([1, 2, 3], 0.9) == 3
assert vm_usage._node_load_summary([{"node": "titan-1", "load_index": 1.0}])["max"] == 1.0
assert vm_usage._namespace_capacity_summary(capacity)["cpu_overcommitted"] == 1
def test_vm_usage_error_paths(monkeypatch) -> None:
def boom(*args, **kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(vm_usage, "_vm_scalar", boom)
monkeypatch.setattr(vm_usage, "_vm_vector", boom)
monkeypatch.setattr(vm_usage, "_vm_topk", boom)
monkeypatch.setattr(vm_usage, "_vm_node_metric", boom)
errors: list[str] = []
assert vm_usage._postgres_connections(errors) == {}
assert vm_usage._hottest_nodes(errors) == {}
assert vm_usage._node_usage(errors) == {}
assert vm_usage._pvc_usage(errors) == []
assert errors
assert vm_usage._usage_stats([{"value": "bad"}]) == {}
assert vm_usage._percentile([], 0.5) is None
assert vm_usage._node_load_summary([]) == {}
assert vm_usage._namespace_capacity_summary([]) == {}