From 30a9377594f5d5ba71a52b63bb29ec6308a0e981 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 28 Jan 2026 22:33:26 -0300 Subject: [PATCH] cluster_state: add namespace/node pod summaries --- ariadne/services/cluster_state.py | 117 ++++++++++++++++++++++++++++++ tests/test_cluster_state.py | 2 + 2 files changed, 119 insertions(+) diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index 127aaa9..70445b3 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -397,6 +397,117 @@ def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]: return output +def _summarize_namespace_nodes(payload: dict[str, Any]) -> list[dict[str, Any]]: + namespaces: dict[str, dict[str, Any]] = {} + for pod in _items(payload): + metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} + spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} + status = pod.get("status") if isinstance(pod.get("status"), dict) else {} + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + if not _namespace_allowed(namespace): + continue + node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else "" + if not node: + continue + phase = status.get("phase") if isinstance(status.get("phase"), str) else "" + entry = namespaces.setdefault( + namespace, + { + "namespace": namespace, + "pods_total": 0, + "pods_running": 0, + "nodes": {}, + }, + ) + entry["pods_total"] += 1 + if phase == "Running": + entry["pods_running"] += 1 + nodes = entry["nodes"] + nodes[node] = nodes.get(node, 0) + 1 + output: list[dict[str, Any]] = [] + for entry in namespaces.values(): + nodes = entry.get("nodes") or {} + primary = "" + if isinstance(nodes, dict) and nodes: + primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0] + entry["primary_node"] = primary + output.append(entry) + output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or "")) + return output + + +_NODE_PHASE_KEYS = { + "Running": "pods_running", + "Pending": "pods_pending", + "Failed": "pods_failed", + "Succeeded": "pods_succeeded", +} + + +def _summarize_node_pods(payload: dict[str, Any]) -> list[dict[str, Any]]: + nodes: dict[str, dict[str, Any]] = {} + for pod in _items(payload): + context = _node_pod_context(pod) + if not context: + continue + node, namespace, phase = context + entry = _node_pod_entry(nodes, node) + _node_pod_apply(entry, namespace, phase) + return _node_pod_finalize(nodes) + + +def _node_pod_context(pod: dict[str, Any]) -> tuple[str, str, str] | None: + metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + if not _namespace_allowed(namespace): + return None + spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} + node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else "" + if not node: + return None + status = pod.get("status") if isinstance(pod.get("status"), dict) else {} + phase = status.get("phase") if isinstance(status.get("phase"), str) else "" + return node, namespace, phase + + +def _node_pod_entry(nodes: dict[str, dict[str, Any]], node: str) -> dict[str, Any]: + return nodes.setdefault( + node, + { + "node": node, + "pods_total": 0, + "pods_running": 0, + "pods_pending": 0, + "pods_failed": 0, + "pods_succeeded": 0, + "namespaces": {}, + }, + ) + + +def _node_pod_apply(entry: dict[str, Any], namespace: str, phase: str) -> None: + entry["pods_total"] += 1 + phase_key = _NODE_PHASE_KEYS.get(phase) + if phase_key: + entry[phase_key] += 1 + if namespace: + namespaces = entry["namespaces"] + namespaces[namespace] = namespaces.get(namespace, 0) + 1 + + +def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for entry in nodes.values(): + namespaces = entry.get("namespaces") or {} + if isinstance(namespaces, dict): + entry["namespaces_top"] = sorted( + namespaces.items(), key=lambda item: (-item[1], item[0]) + )[:3] + output.append(entry) + output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("node") or "")) + return output + + def _vm_query(expr: str) -> list[dict[str, Any]] | None: base = settings.vm_url if not base: @@ -651,10 +762,14 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: workloads: list[dict[str, Any]] = [] namespace_pods: list[dict[str, Any]] = [] + namespace_nodes: list[dict[str, Any]] = [] + node_pods: list[dict[str, Any]] = [] try: pods_payload = get_json("/api/v1/pods?limit=5000") workloads = _summarize_workloads(pods_payload) namespace_pods = _summarize_namespace_pods(pods_payload) + namespace_nodes = _summarize_namespace_nodes(pods_payload) + node_pods = _summarize_node_pods(pods_payload) except Exception as exc: errors.append(f"pods: {exc}") @@ -668,6 +783,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "flux": kustomizations or {}, "workloads": workloads, "namespace_pods": namespace_pods, + "namespace_nodes": namespace_nodes, + "node_pods": node_pods, "metrics": metrics, "errors": errors, } diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py index 4877a61..c215809 100644 --- a/tests/test_cluster_state.py +++ b/tests/test_cluster_state.py @@ -82,6 +82,8 @@ def test_collect_cluster_state(monkeypatch) -> None: assert snapshot["workloads"] assert snapshot["namespace_pods"] assert snapshot["namespace_pods"][0]["namespace"] == "media" + assert snapshot["namespace_nodes"] + assert snapshot["node_pods"] assert "node_usage_stats" in snapshot["metrics"] assert snapshot["metrics"]["namespace_cpu_top"] == [] assert snapshot["metrics"]["namespace_mem_top"] == []