From ef756ff1fa78aae742d0d73859a2a0d864268d44 Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Thu, 29 Jan 2026 01:52:58 -0300
Subject: [PATCH] cluster_state: add pressure and pod issues

---
 ariadne/services/cluster_state.py | 162 ++++++++++++++++++++++++++----
 tests/test_cluster_state.py       |   2 +
 2 files changed, 146 insertions(+), 18 deletions(-)

diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index 70445b3..e3db568 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -41,6 +41,23 @@ _SYSTEM_NAMESPACES = {
 _WORKLOAD_ALLOWED_NAMESPACES = {
     "maintenance",
 }
+_CAPACITY_KEYS = {
+    "cpu",
+    "memory",
+    "pods",
+    "ephemeral-storage",
+}
+_PRESSURE_TYPES = {
+    "MemoryPressure",
+    "DiskPressure",
+    "PIDPressure",
+    "NetworkUnavailable",
+}
+_PHASE_SEVERITY = {
+    "Failed": 3,
+    "Pending": 2,
+    "Unknown": 1,
+}
 
 
 @dataclass(frozen=True)
@@ -135,6 +152,7 @@ def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
         if not name:
             continue
         roles = _node_roles(labels)
+        conditions = _node_pressure_conditions(status.get("conditions"))
         details.append(
             {
                 "name": name,
@@ -149,6 +167,9 @@ def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
                 "kubelet": node_info.get("kubeletVersion") or "",
                 "container_runtime": node_info.get("containerRuntimeVersion") or "",
                 "addresses": _node_addresses(status),
+                "capacity": _node_capacity(status.get("capacity")),
+                "allocatable": _node_capacity(status.get("allocatable")),
+                "pressure": conditions,
             }
         )
     details.sort(key=lambda item: item.get("name") or "")
@@ -164,33 +185,75 @@ def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
         "by_arch": {},
         "by_role": {},
         "not_ready_names": [],
+        "pressure_nodes": {key: [] for key in _PRESSURE_TYPES},
     }
     not_ready: list[str] = []
     for node in details:
-        name = node.get("name") if isinstance(node, dict) else ""
-        if not isinstance(name, str) or not name:
-            continue
-        summary["total"] += 1
-        ready = bool(node.get("ready"))
-        if ready:
-            summary["ready"] += 1
-        else:
+        name = _apply_node_summary(summary, node)
+        if name and not node.get("ready"):
             not_ready.append(name)
-        if node.get("is_worker"):
-            summary["workers"]["total"] += 1
-            if ready:
-                summary["workers"]["ready"] += 1
-        hardware = node.get("hardware") or "unknown"
-        arch = node.get("arch") or "unknown"
-        summary["by_hardware"][hardware] = summary["by_hardware"].get(hardware, 0) + 1
-        summary["by_arch"][arch] = summary["by_arch"].get(arch, 0) + 1
-        for role in node.get("roles") or []:
-            summary["by_role"][role] = summary["by_role"].get(role, 0) + 1
     not_ready.sort()
     summary["not_ready_names"] = not_ready
+    for cond_type in summary["pressure_nodes"]:
+        summary["pressure_nodes"][cond_type].sort()
     return summary
 
 
+def _apply_node_summary(summary: dict[str, Any], node: dict[str, Any]) -> str:
+    name = node.get("name") if isinstance(node, dict) else ""
+    if not isinstance(name, str) or not name:
+        return ""
+    summary["total"] += 1
+    ready = bool(node.get("ready"))
+    if ready:
+        summary["ready"] += 1
+    if node.get("is_worker"):
+        summary["workers"]["total"] += 1
+        if ready:
+            summary["workers"]["ready"] += 1
+    hardware = node.get("hardware") or "unknown"
+    arch = node.get("arch") or "unknown"
+    summary["by_hardware"][hardware] = summary["by_hardware"].get(hardware, 0) + 1
+    summary["by_arch"][arch] = summary["by_arch"].get(arch, 0) + 1
+    for role in node.get("roles") or []:
+        summary["by_role"][role] = summary["by_role"].get(role, 0) + 1
+    _apply_pressure(summary, node, name)
+    return name
+
+
+def _apply_pressure(summary: dict[str, Any], node: dict[str, Any], name: str) -> None:
+    pressure = node.get("pressure") or {}
+    if not isinstance(pressure, dict):
+        return
+    for cond_type, active in pressure.items():
+        if active and cond_type in summary["pressure_nodes"]:
+            summary["pressure_nodes"][cond_type].append(name)
+
+
+def _node_capacity(raw: Any) -> dict[str, str]:
+    if not isinstance(raw, dict):
+        return {}
+    output: dict[str, str] = {}
+    for key in _CAPACITY_KEYS:
+        value = raw.get(key)
+        if isinstance(value, (str, int, float)) and value != "":
+            output[key] = str(value)
+    return output
+
+
+def _node_pressure_conditions(conditions: Any) -> dict[str, bool]:
+    if not isinstance(conditions, list):
+        return {}
+    pressure: dict[str, bool] = {}
+    for condition in conditions:
+        if not isinstance(condition, dict):
+            continue
+        cond_type = condition.get("type")
+        if cond_type in _PRESSURE_TYPES:
+            pressure[cond_type] = condition.get("status") == "True"
+    return pressure
+
+
 def _node_roles(labels: dict[str, Any]) -> list[str]:
     roles: list[str] = []
     for key in labels.keys():
@@ -508,6 +571,54 @@ def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]
     return output
 
 
+def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
+    items: list[dict[str, Any]] = []
+    counts: dict[str, int] = {key: 0 for key in _PHASE_SEVERITY}
+    for pod in _items(payload):
+        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
+        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
+        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
+        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
+        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
+        if not name or not namespace:
+            continue
+        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
+        restarts = 0
+        waiting_reasons: list[str] = []
+        for container in status.get("containerStatuses") or []:
+            if not isinstance(container, dict):
+                continue
+            restarts += int(container.get("restartCount") or 0)
+            state = container.get("state") if isinstance(container.get("state"), dict) else {}
+            waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {}
+            reason = waiting.get("reason")
+            if isinstance(reason, str) and reason:
+                waiting_reasons.append(reason)
+        if phase in counts:
+            counts[phase] += 1
+        if phase in _PHASE_SEVERITY or restarts > 0:
+            items.append(
+                {
+                    "namespace": namespace,
+                    "pod": name,
+                    "node": spec.get("nodeName") or "",
+                    "phase": phase,
+                    "reason": status.get("reason") or "",
+                    "restarts": restarts,
+                    "waiting_reasons": sorted(set(waiting_reasons)),
+                }
+            )
+    items.sort(
+        key=lambda item: (
+            -_PHASE_SEVERITY.get(item.get("phase") or "", 0),
+            -(item.get("restarts") or 0),
+            item.get("namespace") or "",
+            item.get("pod") or "",
+        )
+    )
+    return {"counts": counts, "items": items[:20]}
+
+
 def _vm_query(expr: str) -> list[dict[str, Any]] | None:
     base = settings.vm_url
     if not base:
@@ -689,6 +800,12 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
     metrics["nodes_ready"] = _vm_scalar(
         "count(kube_node_status_condition{condition=\"Ready\",status=\"true\"})"
     )
+    metrics["capacity_cpu"] = _vm_scalar("sum(kube_node_status_capacity_cpu_cores)")
+    metrics["allocatable_cpu"] = _vm_scalar("sum(kube_node_status_allocatable_cpu_cores)")
+    metrics["capacity_mem_bytes"] = _vm_scalar("sum(kube_node_status_capacity_memory_bytes)")
+    metrics["allocatable_mem_bytes"] = _vm_scalar("sum(kube_node_status_allocatable_memory_bytes)")
+    metrics["capacity_pods"] = _vm_scalar("sum(kube_node_status_capacity_pods)")
+    metrics["allocatable_pods"] = _vm_scalar("sum(kube_node_status_allocatable_pods)")
     metrics["pods_running"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Running\"})")
     metrics["pods_pending"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Pending\"})")
     metrics["pods_failed"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Failed\"})")
@@ -728,6 +845,12 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
         "restarts": "count",
         "namespace_cpu": "cores",
         "namespace_mem": "bytes",
+        "capacity_cpu": "cores",
+        "allocatable_cpu": "cores",
+        "capacity_mem_bytes": "bytes",
+        "allocatable_mem_bytes": "bytes",
+        "capacity_pods": "count",
+        "allocatable_pods": "count",
     }
     metrics["windows"] = {
         "rates": _RATE_WINDOW,
@@ -764,12 +887,14 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
     namespace_pods: list[dict[str, Any]] = []
     namespace_nodes: list[dict[str, Any]] = []
    node_pods: list[dict[str, Any]] = []
+    pod_issues: dict[str, Any] = {}
     try:
         pods_payload = get_json("/api/v1/pods?limit=5000")
         workloads = _summarize_workloads(pods_payload)
         namespace_pods = _summarize_namespace_pods(pods_payload)
         namespace_nodes = _summarize_namespace_nodes(pods_payload)
         node_pods = _summarize_node_pods(pods_payload)
+        pod_issues = _summarize_pod_issues(pods_payload)
     except Exception as exc:
         errors.append(f"pods: {exc}")
 
@@ -785,6 +910,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         "namespace_pods": namespace_pods,
         "namespace_nodes": namespace_nodes,
         "node_pods": node_pods,
+        "pod_issues": pod_issues,
         "metrics": metrics,
         "errors": errors,
     }
diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py
index c215809..db5e358 100644
--- a/tests/test_cluster_state.py
+++ b/tests/test_cluster_state.py
@@ -78,12 +78,14 @@ def test_collect_cluster_state(monkeypatch) -> None:
     assert snapshot["flux"]["not_ready"] == 1
     assert snapshot["nodes_summary"]["total"] == 2
     assert snapshot["nodes_summary"]["ready"] == 1
+    assert "pressure_nodes" in snapshot["nodes_summary"]
     assert snapshot["nodes_detail"]
     assert snapshot["workloads"]
    assert snapshot["namespace_pods"]
     assert snapshot["namespace_pods"][0]["namespace"] == "media"
     assert snapshot["namespace_nodes"]
     assert snapshot["node_pods"]
+    assert "pod_issues" in snapshot
     assert "node_usage_stats" in snapshot["metrics"]
     assert snapshot["metrics"]["namespace_cpu_top"] == []
     assert snapshot["metrics"]["namespace_mem_top"] == []