From fc2a482df14add392b40fac53580c24f61e25fa8 Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Thu, 29 Jan 2026 13:05:04 -0300
Subject: [PATCH] snapshot: add node load and namespace capacity

---
 ariadne/services/cluster_state.py | 171 ++++++++++++++++++++++++++++++
 1 file changed, 171 insertions(+)

diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index ba03392..cd18306 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -487,11 +487,20 @@ def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]:
         by_namespace[namespace] = by_namespace.get(namespace, 0) + count
     warnings.sort(key=lambda item: _event_sort_key(item.get("last_seen") or ""), reverse=True)
     top = warnings[:_EVENTS_MAX]
+    top_reason = ""
+    top_reason_count = 0
+    if by_reason:
+        top_reason, top_reason_count = sorted(
+            by_reason.items(), key=lambda item: (-item[1], item[0])
+        )[0]
+    latest_warning = top[0] if top else None
     return {
         "warnings_total": len(warnings),
         "warnings_by_reason": by_reason,
         "warnings_by_namespace": by_namespace,
         "warnings_recent": top,
+        "warnings_top_reason": {"reason": top_reason, "count": top_reason_count},
+        "warnings_latest": latest_warning,
     }
 
 
@@ -712,6 +721,22 @@ def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]
     return output
 
 
+def _node_pods_top(node_pods: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
+    output: list[dict[str, Any]] = []
+    for entry in node_pods[:limit]:
+        if not isinstance(entry, dict):
+            continue
+        output.append(
+            {
+                "node": entry.get("node"),
+                "pods_total": entry.get("pods_total"),
+                "pods_running": entry.get("pods_running"),
+                "namespaces_top": entry.get("namespaces_top") or [],
+            }
+        )
+    return output
+
+
 def _record_pending_pod(
     pending_oldest: list[dict[str, Any]],
     info: dict[str, Any],
@@ -1343,6 +1368,128 @@ def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]:
     }
 
 
+def _vm_namespace_totals(expr: str) -> dict[str, float]:
+    totals: dict[str, float] = {}
+    for item in _vm_vector(expr):
+        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
+        namespace = metric.get("namespace")
+        if not isinstance(namespace, str) or not namespace:
+            continue
+        try:
+            totals[namespace] = float(item.get("value"))
+        except (TypeError, ValueError):
+            continue
+    return totals
+
+
+def _build_namespace_capacity(
+    cpu_usage: dict[str, float],
+    cpu_requests: dict[str, float],
+    mem_usage: dict[str, float],
+    mem_requests: dict[str, float],
+) -> list[dict[str, Any]]:
+    namespaces = sorted(set(cpu_usage) | set(cpu_requests) | set(mem_usage) | set(mem_requests))
+    output: list[dict[str, Any]] = []
+    for namespace in namespaces:
+        cpu_used = cpu_usage.get(namespace)
+        cpu_req = cpu_requests.get(namespace)
+        mem_used = mem_usage.get(namespace)
+        mem_req = mem_requests.get(namespace)
+        cpu_ratio = None
+        mem_ratio = None
+        if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)) and cpu_req > 0:
+            cpu_ratio = cpu_used / cpu_req
+        if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)) and mem_req > 0:
+            mem_ratio = mem_used / mem_req
+        output.append(
+            {
+                "namespace": namespace,
+                "cpu_usage": cpu_used,
+                "cpu_requests": cpu_req,
+                "cpu_usage_ratio": cpu_ratio,
+                "mem_usage": mem_used,
+                "mem_requests": mem_req,
+                "mem_usage_ratio": mem_ratio,
+            }
+        )
+    output.sort(
+        key=lambda item: (
+            -(item.get("cpu_requests") or 0),
+            -(item.get("mem_requests") or 0),
+            item.get("namespace") or "",
+        )
+    )
+    return output
+
+
+def _node_usage_profile(
+    node_usage: dict[str, list[dict[str, Any]]],
+    node_details: list[dict[str, Any]],
+    node_pods: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    usage: dict[str, dict[str, Any]] = {}
+    for key in ("cpu", "ram", "disk", "net", "io"):
+        for item in node_usage.get(key, []) or []:
+            node = item.get("node")
+            value = item.get("value")
+            if not isinstance(node, str) or not node:
+                continue
+            if not isinstance(value, (int, float)):
+                continue
+            usage.setdefault(node, {})[key] = float(value)
+    max_values: dict[str, float] = {}
+    for key in ("cpu", "ram", "disk", "net", "io"):
+        values = [entry.get(key) for entry in usage.values() if isinstance(entry.get(key), (int, float))]
+        max_values[key] = max(values) if values else 0.0
+
+    detail_map: dict[str, dict[str, Any]] = {
+        entry.get("name"): entry for entry in node_details if isinstance(entry, dict)
+    }
+    pod_map: dict[str, dict[str, Any]] = {
+        entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)
+    }
+
+    output: list[dict[str, Any]] = []
+    for node, entry in usage.items():
+        detail = detail_map.get(node, {})
+        pressure = detail.get("pressure") if isinstance(detail.get("pressure"), dict) else {}
+        pressure_count = sum(1 for value in pressure.values() if value)
+        taints = detail.get("taints") if isinstance(detail.get("taints"), list) else []
+        unschedulable = bool(detail.get("unschedulable"))
+        pods_total = None
+        pod_entry = pod_map.get(node)
+        if isinstance(pod_entry, dict):
+            pods_total = pod_entry.get("pods_total")
+
+        normalized: dict[str, float] = {}
+        for key in ("cpu", "ram", "disk", "net", "io"):
+            raw = entry.get(key)
+            max_val = max_values.get(key) or 0.0
+            if isinstance(raw, (int, float)) and max_val > 0:
+                normalized[f"{key}_norm"] = raw / max_val
+        norm_values = [v for v in normalized.values() if isinstance(v, (int, float))]
+        load_index = sum(norm_values) / len(norm_values) if norm_values else None
+        output.append(
+            {
+                "node": node,
+                "cpu": entry.get("cpu"),
+                "ram": entry.get("ram"),
+                "disk": entry.get("disk"),
+                "net": entry.get("net"),
+                "io": entry.get("io"),
+                **normalized,
+                "pressure_flags": pressure,
+                "pressure_count": pressure_count,
+                "taints": taints,
+                "unschedulable": unschedulable,
+                "pods_total": pods_total,
+                "load_index": load_index,
+            }
+        )
+    output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
+    return output
+
+
 def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
     metrics: dict[str, Any] = {}
     try:
@@ -1434,6 +1581,24 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
                 f"topk(5, sum by (namespace) (rate(container_fs_reads_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}]) + rate(container_fs_writes_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}])))"
             )
         )
+        namespace_cpu_usage = _vm_namespace_totals(
+            f'sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))'
+        )
+        namespace_cpu_requests = _vm_namespace_totals(
+            "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)"
+        )
+        namespace_mem_usage = _vm_namespace_totals(
+            'sum by (namespace) (container_memory_working_set_bytes{namespace!=""})'
+        )
+        namespace_mem_requests = _vm_namespace_totals(
+            "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)"
+        )
+        metrics["namespace_capacity"] = _build_namespace_capacity(
+            namespace_cpu_usage,
+            namespace_cpu_requests,
+            namespace_mem_usage,
+            namespace_mem_requests,
+        )
     except Exception as exc:
         errors.append(f"namespace_usage: {exc}")
     metrics["pvc_usage_top"] = _pvc_usage(errors)
@@ -1483,6 +1648,12 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
 
     events = _fetch_events(errors)
     metrics = _summarize_metrics(errors)
+    metrics["node_pods_top"] = _node_pods_top(node_pods)
+    metrics["node_load"] = _node_usage_profile(
+        metrics.get("node_usage", {}),
+        node_details,
+        node_pods,
+    )
 
     snapshot = {
         "collected_at": collected_at.isoformat(),