diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index 5ac52d0..d438d9a 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -52,6 +52,10 @@ _BASELINE_DELTA_WARN = 50.0
 _BASELINE_DELTA_CRIT = 100.0
 _SIGNAL_LIMIT = 15
 _PROFILE_LIMIT = 6
+_WORKLOAD_INDEX_LIMIT = 20
+_NODE_WORKLOAD_LIMIT = 12
+_NODE_WORKLOAD_TOP = 3
+_EVENTS_SUMMARY_LIMIT = 5
 _CAPACITY_KEYS = {
     "cpu",
     "memory",
@@ -2035,10 +2039,99 @@ def _workload_nodes_top(workloads: list[dict[str, Any]], limit: int = 5) -> list
     return output
 
 
+def _node_workload_map(workloads: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
+    mapping: dict[str, dict[str, int]] = {}
+    for entry in workloads:
+        if not isinstance(entry, dict):
+            continue
+        namespace = entry.get("namespace")
+        workload = entry.get("workload")
+        if not isinstance(workload, str) or not workload:
+            continue
+        nodes = entry.get("nodes")
+        if not isinstance(nodes, dict):
+            continue
+        key = f"{namespace}/{workload}" if isinstance(namespace, str) and namespace else workload
+        for node, count in nodes.items():
+            if not isinstance(node, str) or not node:
+                continue
+            if not isinstance(count, int):
+                try:
+                    count = int(count)
+                except (TypeError, ValueError):
+                    continue
+            if count <= 0:
+                continue
+            mapping.setdefault(node, {})[key] = mapping.setdefault(node, {}).get(key, 0) + count
+    return mapping
+
+
+def _node_workloads_top(
+    workload_map: dict[str, dict[str, int]],
+    limit_nodes: int = _NODE_WORKLOAD_LIMIT,
+    limit_workloads: int = _NODE_WORKLOAD_TOP,
+) -> list[dict[str, Any]]:
+    output: list[dict[str, Any]] = []
+    for node, workloads in workload_map.items():
+        if not isinstance(node, str) or not node or not isinstance(workloads, dict):
+            continue
+        total = sum(count for count in workloads.values() if isinstance(count, int))
+        top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:limit_workloads]
+        output.append({"node": node, "pods_total": total, "workloads_top": top})
+    output.sort(key=lambda item: (-(item.get("pods_total") or 0), item.get("node") or ""))
+    return output[:limit_nodes]
+
+
+def _workload_index(workloads: list[dict[str, Any]], limit: int = _WORKLOAD_INDEX_LIMIT) -> list[dict[str, Any]]:
+    entries = [entry for entry in workloads if isinstance(entry, dict)]
+    entries.sort(
+        key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or "", item.get("workload") or ""),
+    )
+    output: list[dict[str, Any]] = []
+    for entry in entries[:limit]:
+        nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
+        nodes_top = (
+            sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
+            if isinstance(nodes, dict)
+            else []
+        )
+        output.append(
+            {
+                "namespace": entry.get("namespace"),
+                "workload": entry.get("workload"),
+                "pods_total": entry.get("pods_total"),
+                "pods_running": entry.get("pods_running"),
+                "primary_node": entry.get("primary_node"),
+                "nodes_top": nodes_top,
+            }
+        )
+    return output
+
+
+def _events_summary(events: dict[str, Any]) -> dict[str, Any]:
+    if not isinstance(events, dict):
+        return {}
+    by_namespace = events.get("warnings_by_namespace") if isinstance(events.get("warnings_by_namespace"), dict) else {}
+    top_namespace = ""
+    top_namespace_count = 0
+    if by_namespace:
+        top_namespace, top_namespace_count = sorted(
+            by_namespace.items(), key=lambda item: (-item[1], item[0])
+        )[0]
+    return {
+        "warnings_total": events.get("warnings_total"),
+        "top_reason": events.get("warnings_top_reason"),
+        "top_namespace": {"namespace": top_namespace, "count": top_namespace_count},
+        "latest": events.get("warnings_latest"),
+        "recent": (events.get("warnings_recent") or [])[:_EVENTS_SUMMARY_LIMIT],
+    }
+
+
 def _node_context(
     node_details: list[dict[str, Any]],
     node_load: list[dict[str, Any]],
     node_baseline: dict[str, dict[str, dict[str, float]]],
+    node_workloads: dict[str, dict[str, int]],
 ) -> list[dict[str, Any]]:
     load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)}
     output: list[dict[str, Any]] = []
@@ -2057,6 +2150,8 @@ def _node_context(
             delta = _baseline_delta(current, stats)
             if delta is not None:
                 deltas[key] = delta
+        workloads = node_workloads.get(name, {}) if isinstance(node_workloads, dict) else {}
+        workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
         output.append(
             {
                 "node": name,
@@ -2078,6 +2173,7 @@
                 "load_index": load_entry.get("load_index"),
                 "baseline": baseline,
                 "baseline_delta": deltas,
+                "workloads_top": workloads_top,
             }
         )
     output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
@@ -2250,8 +2346,13 @@
     return signals[:_SIGNAL_LIMIT]
 
 
-def _node_profiles(node_context: list[dict[str, Any]], node_pods: list[dict[str, Any]]) -> list[dict[str, Any]]:
+def _node_profiles(
+    node_context: list[dict[str, Any]],
+    node_pods: list[dict[str, Any]],
+    node_workloads: dict[str, dict[str, int]],
+) -> list[dict[str, Any]]:
     pod_map = {entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)}
+    workload_map = node_workloads or {}
     profiles: list[dict[str, Any]] = []
     for entry in node_context:
         if not isinstance(entry, dict):
@@ -2260,6 +2361,8 @@ def _node_profiles(node_context: list[dict[str, Any]], node_pods: list[dict[str,
         if not isinstance(node, str) or not node:
             continue
         pods = pod_map.get(node, {})
+        workloads = workload_map.get(node, {})
+        workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
         profiles.append(
             {
                 "node": node,
@@ -2270,6 +2373,7 @@ def _node_profiles(node_context: list[dict[str, Any]], node_pods: list[dict[str,
                 "pods_total": pods.get("pods_total"),
                 "pods_running": pods.get("pods_running"),
                 "namespaces_top": pods.get("namespaces_top") or [],
+                "workloads_top": workloads_top,
                 "load_index": entry.get("load_index"),
                 "cpu": entry.get("cpu"),
                 "ram": entry.get("ram"),
@@ -2337,9 +2441,10 @@
     namespace_context: list[dict[str, Any]],
     node_pods: list[dict[str, Any]],
     workloads: list[dict[str, Any]],
+    node_workloads: dict[str, dict[str, int]],
 ) -> dict[str, Any]:
     return {
-        "nodes": _node_profiles(node_context, node_pods),
+        "nodes": _node_profiles(node_context, node_pods, node_workloads),
         "namespaces": _namespace_profiles(namespace_context),
        "workloads": _workload_profiles(workloads),
     }
@@ -2747,10 +2852,12 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         metrics.get("namespace_capacity", []),
         metrics.get("namespace_baseline_map", {}),
     )
+    node_workloads = _node_workload_map(workloads)
     node_context = _node_context(
         node_details,
         metrics.get("node_load", []),
        metrics.get("node_baseline_map", {}),
+        node_workloads,
     )
     signals = _build_signals(
         metrics,
@@ -2760,7 +2867,7 @@
         pod_issues,
         kustomizations,
     )
-    profiles = _build_profiles(node_context, namespace_context, node_pods, workloads)
+    profiles = _build_profiles(node_context, namespace_context, node_pods, workloads, node_workloads)
     summary = {
         "generated_at": collected_at.isoformat(),
         "windows": metrics.get("windows", {}),
@@ -2791,11 +2898,17 @@
             "node_namespaces": metrics.get("node_pods_top", []),
             "workload_nodes": _workload_nodes_top(workloads, 5),
         },
+        "topology": {
+            "nodes": _node_workloads_top(node_workloads),
+            "workloads": _workload_index(workloads),
+            "namespaces": _namespace_nodes_top(namespace_context, 12),
+        },
        "attention_ranked": _build_attention_ranked(metrics, node_context, pod_issues, workload_health),
        "signals": signals,
        "profiles": profiles,
        "anomalies": anomalies,
        "health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies),
+        "events": _events_summary(events),
        "unknowns": errors,
     }
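
A minimal sketch of how the two new topology helpers compose, assuming the patched module is importable. The sample entries below are invented; only the function names and the workload-entry shape (namespace / workload / nodes) come from the patch:

from ariadne.services.cluster_state import _node_workload_map, _node_workloads_top

# Invented sample: each per-workload "nodes" dict maps node name -> pod count.
workloads = [
    {"namespace": "media", "workload": "plex", "nodes": {"node-a": 1}},
    {"namespace": "infra", "workload": "dns", "nodes": {"node-a": 2, "node-b": 1}},
    {"workload": "orphan", "nodes": {"node-b": "3"}},  # no namespace; str count is coerced
]

# _node_workload_map inverts workload -> nodes into node -> workload counts.
node_map = _node_workload_map(workloads)
# {"node-a": {"media/plex": 1, "infra/dns": 2},
#  "node-b": {"infra/dns": 1, "orphan": 3}}

# _node_workloads_top ranks nodes by total pods, keeping the busiest workloads per node.
print(_node_workloads_top(node_map, limit_nodes=2, limit_workloads=2))
# [{"node": "node-b", "pods_total": 4, "workloads_top": [("orphan", 3), ("infra/dns", 1)]},
#  {"node": "node-a", "pods_total": 3, "workloads_top": [("infra/dns", 2), ("media/plex", 1)]}]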
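
Likewise for the events rollup: _events_summary reads only the warnings_* keys visible in the patch. The values below are made up; warnings_recent entries pass through unchanged apart from the _EVENTS_SUMMARY_LIMIT truncation:

from ariadne.services.cluster_state import _events_summary

events = {
    "warnings_total": 7,
    "warnings_top_reason": "BackOff",
    "warnings_by_namespace": {"media": 4, "infra": 3},
    "warnings_latest": "2024-05-01T12:00:00Z",
    "warnings_recent": ["w1", "w2"],  # stand-ins; real entries are collector-defined
}

print(_events_summary(events))
# {"warnings_total": 7, "top_reason": "BackOff",
#  "top_namespace": {"namespace": "media", "count": 4},
#  "latest": "2024-05-01T12:00:00Z", "recent": ["w1", "w2"]}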