ariadne/tests/unit/services/test_cluster_state_vm_domains.py

282 lines
11 KiB
Python

from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import cluster_state_vm_client as vm_client
from ariadne.services import cluster_state_vm_trends as vm_trends
from ariadne.services import cluster_state_vm_usage as vm_usage
class _FakeResponse:
def __init__(self, payload):
self.payload = payload
def raise_for_status(self) -> None:
if isinstance(self.payload, Exception):
raise self.payload
def json(self):
return self.payload
class _FakeClient:
payload = {"status": "success", "data": {"result": []}}
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
def get(self, *args, **kwargs):
return _FakeResponse(self.payload)
def _metric(name: str, value: float, **labels):
return {"metric": {name: labels.pop(name, "target"), **labels}, "value": value}
def test_vm_query_scalar_vector_and_alert_helpers(monkeypatch) -> None:
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(vm_url="", cluster_state_vm_timeout_sec=1.0))
assert vm_client._vm_query("up") is None
_FakeClient.payload = {
"status": "success",
"data": {"result": [{"metric": {"node": "titan-1"}, "value": [1, "42.5"]}]},
}
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(vm_url="http://victoria", cluster_state_vm_timeout_sec=1.0))
monkeypatch.setattr(vm_client.httpx, "Client", _FakeClient)
assert vm_client._vm_query("up")[0]["metric"]["node"] == "titan-1"
assert vm_client._vm_scalar("up") == 42.5
assert vm_client._vm_vector("up") == [{"metric": {"node": "titan-1"}, "value": 42.5}]
_FakeClient.payload = {"status": "error", "data": {"result": []}}
assert vm_client._vm_query("bad") is None
monkeypatch.setattr(vm_client, "_vm_query", lambda _expr: [{"metric": {}, "value": [1]}])
assert vm_client._vm_scalar("short") is None
assert vm_client._vm_vector("short") == []
def test_vm_client_rejects_empty_and_malformed_values(monkeypatch) -> None:
monkeypatch.setattr(vm_client, "_vm_query", lambda _expr: [])
assert vm_client._vm_scalar("empty") is None
monkeypatch.setattr(vm_client, "_vm_query", lambda _expr: [{"metric": {}, "value": [1, "bad"]}])
assert vm_client._vm_scalar("bad") is None
monkeypatch.setattr(
vm_client,
"_vm_query",
lambda _expr: [
"bad",
{"metric": {"node": "titan-1"}, "value": [1, "not-a-number"]},
{"metric": {"node": "titan-2"}, "value": [1, "2"]},
],
)
assert vm_client._vm_vector("mixed") == [{"metric": {"node": "titan-2"}, "value": 2.0}]
def test_vm_client_alerts_and_namespace_filters(monkeypatch) -> None:
entries = [
None,
{"metric": {"alertname": "NodeDown", "severity": "critical"}, "value": 2},
{"metric": {"alertname": "", "severity": "warning"}, "value": 1},
]
assert vm_client._alert_entries(entries) == [
{"alert": "NodeDown", "severity": "critical", "value": 2}
]
monkeypatch.setattr(vm_client, "_vm_vector", lambda _expr: entries)
assert vm_client._vm_alerts_now()[0]["alert"] == "NodeDown"
assert vm_client._vm_alerts_trend("1h")[0]["severity"] == "critical"
filtered = vm_client._filter_namespace_vector(
[
{"metric": {"namespace": "kube-system"}, "value": 1},
{"metric": {"namespace": "apps"}, "value": 2},
{"metric": {"namespace": ""}, "value": 3},
"bad",
]
)
assert filtered == [{"metric": {"namespace": "apps"}, "value": 2}]
alerts = [
{"labels": {"alertname": "DiskHot", "severity": "warning"}},
{"labels": {"alertname": "CPUHot", "severity": "critical"}},
{"labels": {}},
]
assert vm_client._summarize_alerts(alerts)["by_severity"] == {"warning": 1, "critical": 1}
def test_vm_client_alertmanager_success_and_errors(monkeypatch) -> None:
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(alertmanager_url="", cluster_state_vm_timeout_sec=1.0))
errors: list[str] = []
assert vm_client._alertmanager_alerts(errors) == []
assert errors == []
_FakeClient.payload = [{"labels": {"alertname": "DiskHot"}}, "bad"]
monkeypatch.setattr(vm_client, "settings", SimpleNamespace(alertmanager_url="http://alertmanager", cluster_state_vm_timeout_sec=1.0))
monkeypatch.setattr(vm_client.httpx, "Client", _FakeClient)
assert vm_client._alertmanager_alerts(errors) == [{"labels": {"alertname": "DiskHot"}}]
_FakeClient.payload = RuntimeError("alertmanager down")
assert vm_client._alertmanager_alerts(errors) == []
assert errors[-1] == "alertmanager: alertmanager down"
_FakeClient.payload = {"unexpected": "shape"}
assert vm_client._alertmanager_alerts(errors) == []
def test_vm_client_topk_baselines_and_window_series(monkeypatch) -> None:
monkeypatch.setattr(
vm_client,
"_vm_vector",
lambda _expr: [
{"metric": {"node": "titan-1", "namespace": "apps"}, "value": 9.0},
{"metric": {"node": "titan-2", "namespace": "apps"}, "value": 4.0},
],
)
assert vm_client._vm_topk("expr", "node") == {
"label": "titan-1",
"metric": {"node": "titan-1", "namespace": "apps"},
"value": 9.0,
}
assert vm_client._vm_node_metric("expr", "node")[0] == {"node": "titan-1", "value": 9.0}
baseline = vm_client._vm_baseline_map("expr", "node", "24h")
assert baseline["titan-1"]["avg"] == 9.0
assert vm_client._baseline_map_to_list(baseline, "node")[0]["node"] == "titan-1"
assert vm_client._baseline_map_to_list({"": {"avg": 10.0}, "titan-3": {"avg": 1.0}}, "node") == [
{"node": "titan-3", "avg": 1.0, "max": None}
]
assert vm_client._limit_entries([{"value": 1}, {"value": 2}], 1) == [{"value": 1}]
assert vm_client._limit_entries([{"value": 1}], 0) == []
monkeypatch.setattr(
vm_client,
"_vm_vector",
lambda _expr: [
{"metric": {}, "value": 1.0},
{"metric": {"node": ""}, "value": 2.0},
{"metric": {"node": "titan-1"}, "value": 3.0},
],
)
assert vm_client._vm_baseline_map("expr", "node", "24h") == {"titan-1": {"avg": 3.0, "max": 3.0}}
monkeypatch.setattr(
vm_client,
"_vm_vector",
lambda _expr: [
{"metric": {"node": "titan-1", "namespace": "apps"}, "value": 9.0},
{"metric": {"node": "titan-2", "namespace": "apps"}, "value": 4.0},
],
)
series = vm_client._vm_window_series("expr", "node", "node", "1h")
assert series["avg"][0]["node"] == "titan-1"
trends = vm_client._build_metric_trends({"cpu": "expr"}, "node", "node", ("1h",), 2)
assert trends["cpu"]["1h"]["avg"][0]["node"] == "titan-1"
monkeypatch.setattr(vm_client, "_vm_scalar", lambda _expr: 7.5)
assert vm_client._vm_scalar_window("expr", "1h", "max_over_time") == 7.5
assert vm_client._scalar_trends("expr", ("1h",))["1h"]["avg"] == 7.5
assert "nodes_ready" in vm_client._cluster_trends()
assert "not_ready" in vm_client._node_condition_trends()
def test_vm_trend_helpers(monkeypatch) -> None:
monkeypatch.setattr(
vm_trends,
"_scalar_trends",
lambda _expr, windows: {window: {"avg": 3.0, "max": 4.0, "min": 2.0} for window in windows},
)
monkeypatch.setattr(
vm_trends,
"_vm_vector",
lambda expr: [
{
"metric": {
"namespace": "apps",
"pod": "api",
"job_name": "api",
"reason": "CrashLoopBackOff",
"persistentvolumeclaim": "data",
},
"value": 3.0,
}
],
)
assert vm_trends._pod_reason_totals({"crash": "CrashLoopBackOff"}, "waiting")["crash"]["1h"]["avg"] == 3.0
assert "cpu" in vm_trends._node_usage_exprs()
assert "mem" in vm_trends._namespace_usage_exprs()
assert "cpu_requests" in vm_trends._namespace_request_exprs()
assert vm_trends._restart_namespace_trend("1h")[0]["namespace"] == "apps"
assert vm_trends._job_failure_trend("1h")[0]["job"] == "api"
assert vm_trends._pod_reason_entries("expr", 5)[0]["pod"] == "api"
assert vm_trends._namespace_reason_entries("expr", 5)[0]["namespace"] == "apps"
assert vm_trends._pod_waiting_now()["crash_loop"][0]["pod"] == "api"
assert vm_trends._pod_waiting_trends()["crash_loop"]["1h"][0]["pod"] == "api"
assert vm_trends._pod_terminated_now()["oom_killed"][0]["pod"] == "api"
assert vm_trends._pod_terminated_trends()["oom_killed"]["1h"][0]["pod"] == "api"
assert vm_trends._pvc_usage_trends()["1h"][0]["namespace"] == "apps"
monkeypatch.setattr(vm_trends, "_vm_scalar_window", lambda _expr, _window, _fn: 2.0)
assert vm_trends._pods_phase_trends()["1h"]["running"]["avg"] == 2.0
def test_vm_usage_helpers(monkeypatch) -> None:
monkeypatch.setattr(vm_usage, "_vm_scalar", lambda _expr: 10.0)
monkeypatch.setattr(
vm_usage,
"_vm_vector",
lambda _expr: [{"metric": {"namespace": "apps", "persistentvolumeclaim": "data"}, "value": 88.0}],
)
monkeypatch.setattr(vm_usage, "_vm_topk", lambda _expr, label: {label: "top", "value": 9.0})
monkeypatch.setattr(vm_usage, "_vm_node_metric", lambda _expr, label: [{label: "titan-1", "value": 50.0}])
errors: list[str] = []
assert vm_usage._postgres_connections(errors)["used"] == 10.0
assert vm_usage._hottest_nodes(errors)["cpu"]["node"] == "top"
assert vm_usage._node_usage(errors)["cpu"][0]["node"] == "titan-1"
assert vm_usage._pvc_usage(errors)[0]["metric"]["persistentvolumeclaim"] == "data"
assert vm_usage._usage_stats([{"value": 2}, {"value": 4}]) == {"min": 2.0, "max": 4.0, "avg": 3.0}
assert vm_usage._vm_namespace_totals("expr") == {"apps": 88.0}
capacity = vm_usage._build_namespace_capacity({"apps": 2.0}, {"apps": 1.0}, {"apps": 4.0}, {"apps": 2.0})
assert capacity[0]["cpu_usage_ratio"] == 2.0
profile = vm_usage._node_usage_profile(
{"cpu": [{"node": "titan-1", "value": 50.0}], "ram": [{"node": "titan-1", "value": 25.0}]},
[{"name": "titan-1", "pressure": {"DiskPressure": True}, "taints": [], "unschedulable": False}],
[{"node": "titan-1", "pods_total": 4}],
)
assert profile[0]["pressure_count"] == 1
assert vm_usage._percentile([1, 2, 3], 0.9) == 3
assert vm_usage._node_load_summary([{"node": "titan-1", "load_index": 1.0}])["max"] == 1.0
assert vm_usage._namespace_capacity_summary(capacity)["cpu_overcommitted"] == 1
def test_vm_usage_error_paths(monkeypatch) -> None:
def boom(*args, **kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(vm_usage, "_vm_scalar", boom)
monkeypatch.setattr(vm_usage, "_vm_vector", boom)
monkeypatch.setattr(vm_usage, "_vm_topk", boom)
monkeypatch.setattr(vm_usage, "_vm_node_metric", boom)
errors: list[str] = []
assert vm_usage._postgres_connections(errors) == {}
assert vm_usage._hottest_nodes(errors) == {}
assert vm_usage._node_usage(errors) == {}
assert vm_usage._pvc_usage(errors) == []
assert errors
assert vm_usage._usage_stats([{"value": "bad"}]) == {}
assert vm_usage._percentile([], 0.5) is None
assert vm_usage._node_load_summary([]) == {}
assert vm_usage._namespace_capacity_summary([]) == {}