from __future__ import annotations from ariadne.services import cluster_state_metric_collectors as collectors def test_collect_vm_core_records_query_errors(monkeypatch) -> None: def boom(_expr): raise RuntimeError("vm unavailable") metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_vm_scalar", boom) collectors._collect_vm_core(metrics, errors) assert errors == ["vm: vm unavailable"] def test_collect_node_metrics_builds_and_reports_baselines(monkeypatch) -> None: metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_postgres_connections", lambda errors: {"used": 2}) monkeypatch.setattr(collectors, "_hottest_nodes", lambda errors: {"cpu": {"node": "titan-1"}}) monkeypatch.setattr( collectors, "_node_usage", lambda errors: {"cpu": [{"value": 2.0}], "ram": [], "net": [], "io": [], "disk": []}, ) monkeypatch.setattr(collectors, "_usage_stats", lambda entries: {"count": len(entries)}) monkeypatch.setattr(collectors, "_node_usage_exprs", lambda: {"cpu": "expr"}) monkeypatch.setattr(collectors, "_vm_baseline_map", lambda _expr, _label, _window: {"titan-1": {"avg": 2.0}}) monkeypatch.setattr( collectors, "_baseline_map_to_list", lambda baseline, label: [{label: name, **stats} for name, stats in baseline.items()], ) collectors._collect_node_metrics(metrics, errors) assert metrics["node_usage_stats"]["cpu"] == {"count": 1} assert metrics["node_baseline"]["cpu"] == [{"node": "titan-1", "avg": 2.0}] assert metrics["node_baseline_map"]["titan-1"]["cpu"] == {"avg": 2.0} assert errors == [] def test_collect_node_metrics_records_baseline_errors(monkeypatch) -> None: metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_postgres_connections", lambda errors: {}) monkeypatch.setattr(collectors, "_hottest_nodes", lambda errors: {}) monkeypatch.setattr(collectors, "_node_usage", lambda errors: {}) monkeypatch.setattr(collectors, "_usage_stats", lambda entries: {}) monkeypatch.setattr(collectors, "_node_usage_exprs", lambda: (_ for _ in ()).throw(RuntimeError("baseline down"))) collectors._collect_node_metrics(metrics, errors) assert errors == ["baseline: baseline down"] def test_collect_trend_issue_and_alert_errors(monkeypatch) -> None: metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_build_metric_trends", lambda *args: (_ for _ in ()).throw(RuntimeError("trend down"))) monkeypatch.setattr(collectors, "_namespace_reason_entries", lambda *args: (_ for _ in ()).throw(RuntimeError("issue down"))) monkeypatch.setattr(collectors, "_vm_alerts_now", lambda: (_ for _ in ()).throw(RuntimeError("alerts down"))) collectors._collect_trend_metrics(metrics, errors) collectors._collect_issue_metrics(metrics, errors) collectors._collect_alert_metrics(metrics, errors) assert "trends: trend down" in errors assert "issues: issue down" in errors assert "alerts: alerts down" in errors def test_collect_namespace_metrics_builds_baselines_and_totals(monkeypatch) -> None: metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_vm_vector", lambda _expr: []) monkeypatch.setattr(collectors, "_filter_namespace_vector", lambda entries: entries) monkeypatch.setattr(collectors, "_vm_namespace_totals", lambda expr: {"apps": 2.0} if "cpu" in expr else {"apps": 4.0}) monkeypatch.setattr(collectors, "_build_namespace_capacity", lambda *args: [{"namespace": "apps", "cpu_usage_ratio": 2.0}]) monkeypatch.setattr(collectors, "_namespace_usage_exprs", lambda: {"cpu": "expr"}) monkeypatch.setattr(collectors, "_vm_baseline_map", lambda _expr, _label, _window: {"apps": {"avg": 2.0}}) monkeypatch.setattr( collectors, "_baseline_map_to_list", lambda baseline, label: [{label: name, **stats} for name, stats in baseline.items()], ) monkeypatch.setattr(collectors, "_namespace_capacity_summary", lambda capacity: {"count": len(capacity)}) collectors._collect_namespace_metrics(metrics, errors) assert metrics["namespace_totals"]["cpu"] == [{"namespace": "apps", "value": 2.0}] assert metrics["namespace_baseline"]["cpu"] == [{"namespace": "apps", "avg": 2.0}] assert metrics["namespace_baseline_map"]["apps"]["cpu"] == {"avg": 2.0} assert metrics["namespace_capacity_summary"] == {"count": 1} assert errors == [] def test_collect_namespace_metrics_records_usage_and_baseline_errors(monkeypatch) -> None: metrics: dict[str, object] = {} errors: list[str] = [] monkeypatch.setattr(collectors, "_vm_vector", lambda _expr: (_ for _ in ()).throw(RuntimeError("usage down"))) monkeypatch.setattr(collectors, "_namespace_usage_exprs", lambda: (_ for _ in ()).throw(RuntimeError("baseline down"))) monkeypatch.setattr(collectors, "_namespace_capacity_summary", lambda capacity: {"count": len(capacity)}) collectors._collect_namespace_metrics(metrics, errors) assert "namespace_usage: usage down" in errors assert "baseline: baseline down" in errors assert metrics["namespace_capacity_summary"] == {"count": 0} def test_namespace_totals_list_filters_invalid_names() -> None: assert collectors._namespace_totals_list({"apps": 2.0, "": 9.0, 3: 4.0}) == [ {"namespace": "apps", "value": 2.0} ]