From bdb94ffbe1558148b4d3fbe30a2b0172a641d75d Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Sat, 31 Jan 2026 21:37:59 -0300
Subject: [PATCH] cluster: add lexicon and cross stats

---
 ariadne/services/cluster_state.py | 183 ++++++++++++++++++++++++++++++
 1 file changed, 183 insertions(+)

diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index a481a56..461af6a 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -90,6 +90,10 @@ _ALERT_TOP_LIMIT = 10
 _POD_REASON_LIMIT = 10
 _POD_REASON_TREND_LIMIT = 10
 _NAMESPACE_ISSUE_LIMIT = 8
+_CROSS_NODE_TOP = 3
+_CROSS_NAMESPACE_TOP = 3
+_CROSS_PVC_TOP = 3
+_CROSS_WORKLOAD_TOP = 3
 _POD_TERMINATED_REASONS = {
     "oom_killed": "OOMKilled",
     "error": "Error",
@@ -2654,6 +2658,183 @@ def _events_summary(events: dict[str, Any]) -> dict[str, Any]:
     }
 
 
+def _build_lexicon() -> dict[str, Any]:
+    """Return the static glossary published under the ``lexicon`` key.
+
+    ``terms`` is a list of ``{"term", "meaning"}`` dicts and ``aliases``
+    maps a query phrase to its meaning.  A new structure is built on every
+    call, so callers may mutate the result freely.
+    """
+    terms = [
+        {
+            "term": "hottest",
+            "meaning": "highest utilization for a metric (cpu, ram, net, io, load_index).",
+        },
+        {
+            "term": "pressure",
+            "meaning": "node condition flags (MemoryPressure, DiskPressure, PIDPressure, NetworkUnavailable).",
+        },
+        {
+            "term": "load_index",
+            "meaning": "composite load score derived from cpu, ram, net, io.",
+        },
+        {"term": "top", "meaning": "highest values within a category."},
+        {"term": "pods", "meaning": "running workload instances on a node or namespace."},
+        {"term": "workload", "meaning": "deployment/statefulset/daemonset grouping."},
+    ]
+    aliases = {
+        "hot node": "node with highest load_index",
+        "hottest by cpu": "node with highest cpu utilization",
+        "hottest by ram": "node with highest ram utilization",
+        "pressure node": "node with pressure condition flags",
+    }
+    return {"terms": terms, "aliases": aliases}
+
+
+def _top_named_entries(
+    entries: list[dict[str, Any]],
+    name_key: str,
+    limit: int,
+) -> list[dict[str, Any]]:
+    """Return the ``limit`` highest-valued entries as name/value dicts.
+
+    Items that are not dicts, or whose ``name_key`` is not a non-empty
+    string, are skipped.  A missing, unparsable or NaN ``value`` counts as
+    ``0.0``, so the entry is kept and the ordering stays well defined.
+    Entries with equal values keep their input order.
+    """
+    output: list[dict[str, Any]] = []
+    for entry in entries or []:
+        if not isinstance(entry, dict):
+            continue
+        name = entry.get(name_key)
+        if not isinstance(name, str) or not name:
+            continue
+        try:
+            numeric = float(entry.get("value"))
+        except (TypeError, ValueError, OverflowError):
+            numeric = 0.0
+        if numeric != numeric:
+            # NaN never compares equal to itself and would scramble the sort.
+            numeric = 0.0
+        output.append({"name": name, "value": numeric})
+    # reverse=True preserves input order among equal values.
+    output.sort(key=lambda item: item["value"], reverse=True)
+    # A negative limit would slice from the tail; treat it as "no entries".
+    return output[: max(limit, 0)]
+
+
+def _cross_node_metric_top(
+    metrics: dict[str, Any],
+    node_context: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    """Join the top nodes per usage metric with their node context.
+
+    For each of cpu/ram/net/io/disk found as a list in
+    ``metrics["node_usage"]``, the ``_CROSS_NODE_TOP`` highest nodes are
+    emitted together with the matching ``node_context`` fields.  Fields are
+    ``None`` when the node has no context entry.
+    """
+    node_usage = metrics.get("node_usage")
+    usage = node_usage if isinstance(node_usage, dict) else {}
+    # Only string names can match: _top_named_entries yields string names.
+    node_map = {
+        entry["node"]: entry
+        for entry in node_context or []
+        if isinstance(entry, dict) and isinstance(entry.get("node"), str)
+    }
+    output: list[dict[str, Any]] = []
+    for metric in ("cpu", "ram", "net", "io", "disk"):
+        series = usage.get(metric)
+        if not isinstance(series, list):
+            continue
+        for top in _top_named_entries(series, "node", _CROSS_NODE_TOP):
+            node = top["name"]
+            context = node_map.get(node, {})
+            output.append(
+                {
+                    "metric": metric,
+                    "node": node,
+                    "value": top["value"],
+                    "cpu": context.get("cpu"),
+                    "ram": context.get("ram"),
+                    "net": context.get("net"),
+                    "io": context.get("io"),
+                    "disk": context.get("disk"),
+                    "load_index": context.get("load_index"),
+                    "pods_total": context.get("pods_total"),
+                    "hardware": context.get("hardware"),
+                    "roles": context.get("roles"),
+                    "pressure_flags": context.get("pressure_flags"),
+                }
+            )
+    return output
+
+
+def _cross_namespace_metric_top(
+    metrics: dict[str, Any],
+    namespace_context: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    """Join the top namespaces per metric with their namespace context.
+
+    For each of cpu/mem/net/io/restarts found as a list in
+    ``metrics["namespace_top"]``, the ``_CROSS_NAMESPACE_TOP`` highest
+    namespaces are emitted together with the matching ``namespace_context``
+    fields.  Fields are ``None`` (``nodes_top``: ``[]``) when the namespace
+    has no context entry.
+    """
+    namespace_top = metrics.get("namespace_top")
+    top = namespace_top if isinstance(namespace_top, dict) else {}
+    # Only string names can match: _top_named_entries yields string names.
+    namespace_map = {
+        entry["namespace"]: entry
+        for entry in namespace_context or []
+        if isinstance(entry, dict) and isinstance(entry.get("namespace"), str)
+    }
+    output: list[dict[str, Any]] = []
+    for metric in ("cpu", "mem", "net", "io", "restarts"):
+        series = top.get(metric)
+        if not isinstance(series, list):
+            continue
+        for entry in _top_named_entries(series, "namespace", _CROSS_NAMESPACE_TOP):
+            namespace = entry["name"]
+            context = namespace_map.get(namespace, {})
+            output.append(
+                {
+                    "metric": metric,
+                    "namespace": namespace,
+                    "value": entry["value"],
+                    "pods_total": context.get("pods_total"),
+                    "pods_running": context.get("pods_running"),
+                    "cpu_ratio": context.get("cpu_ratio"),
+                    "mem_ratio": context.get("mem_ratio"),
+                    "primary_node": context.get("primary_node"),
+                    "nodes_top": context.get("nodes_top") or [],
+                }
+            )
+    return output
+
+
+def _build_cross_stats(
+    metrics: dict[str, Any],
+    node_context: list[dict[str, Any]],
+    namespace_context: list[dict[str, Any]],
+    workloads: list[dict[str, Any]],
+) -> dict[str, Any]:
+    """Assemble the ``cross_stats`` section of the cluster state.
+
+    Combines the per-metric node and namespace leaders with the capped
+    results of ``_pvc_top`` and ``_workload_nodes_top``.
+    """
+    return {
+        "node_metric_top": _cross_node_metric_top(metrics, node_context),
+        "namespace_metric_top": _cross_namespace_metric_top(metrics, namespace_context),
+        # ``or []`` also covers an explicit ``None`` value, not just a missing key.
+        "pvc_top": _pvc_top(metrics.get("pvc_usage_top") or [])[:_CROSS_PVC_TOP],
+        "workload_top": _workload_nodes_top(workloads, _CROSS_WORKLOAD_TOP),
+    }
+
+
 def _node_context(
     node_details: list[dict[str, Any]],
     node_load: list[dict[str, Any]],
@@ -3444,5 +3625,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         "anomalies": anomalies,
         "health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies),
         "events": _events_summary(events),
+        "lexicon": _build_lexicon(),
+        "cross_stats": _build_cross_stats(metrics, node_context, namespace_context, workloads),
         "unknowns": errors,
     }