cluster_state: add topology and event summaries

Brad Stein 2026-01-30 17:04:22 -03:00
parent 0ad5f2afae
commit 37284ea7ac


@@ -52,6 +52,10 @@ _BASELINE_DELTA_WARN = 50.0
_BASELINE_DELTA_CRIT = 100.0
_SIGNAL_LIMIT = 15
_PROFILE_LIMIT = 6
_WORKLOAD_INDEX_LIMIT = 20
_NODE_WORKLOAD_LIMIT = 12
_NODE_WORKLOAD_TOP = 3
_EVENTS_SUMMARY_LIMIT = 5
_CAPACITY_KEYS = {
"cpu", "cpu",
"memory", "memory",
@@ -2035,10 +2039,99 @@ def _workload_nodes_top(workloads: list[dict[str, Any]], limit: int = 5) -> list
    return output


def _node_workload_map(workloads: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
    mapping: dict[str, dict[str, int]] = {}
    for entry in workloads:
        if not isinstance(entry, dict):
            continue
        namespace = entry.get("namespace")
        workload = entry.get("workload")
        if not isinstance(workload, str) or not workload:
            continue
        nodes = entry.get("nodes")
        if not isinstance(nodes, dict):
            continue
        key = f"{namespace}/{workload}" if isinstance(namespace, str) and namespace else workload
        for node, count in nodes.items():
            if not isinstance(node, str) or not node:
                continue
            if not isinstance(count, int):
                try:
                    count = int(count)
                except (TypeError, ValueError):
                    continue
            if count <= 0:
                continue
            bucket = mapping.setdefault(node, {})
            bucket[key] = bucket.get(key, 0) + count
    return mapping

def _node_workloads_top(
    workload_map: dict[str, dict[str, int]],
    limit_nodes: int = _NODE_WORKLOAD_LIMIT,
    limit_workloads: int = _NODE_WORKLOAD_TOP,
) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for node, workloads in workload_map.items():
        if not isinstance(node, str) or not node or not isinstance(workloads, dict):
            continue
        total = sum(count for count in workloads.values() if isinstance(count, int))
        top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:limit_workloads]
        output.append({"node": node, "pods_total": total, "workloads_top": top})
    output.sort(key=lambda item: (-(item.get("pods_total") or 0), item.get("node") or ""))
    return output[:limit_nodes]

def _workload_index(workloads: list[dict[str, Any]], limit: int = _WORKLOAD_INDEX_LIMIT) -> list[dict[str, Any]]:
    entries = [entry for entry in workloads if isinstance(entry, dict)]
    entries.sort(
        key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or "", item.get("workload") or ""),
    )
    output: list[dict[str, Any]] = []
    for entry in entries[:limit]:
        nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
        nodes_top = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
        output.append(
            {
                "namespace": entry.get("namespace"),
                "workload": entry.get("workload"),
                "pods_total": entry.get("pods_total"),
                "pods_running": entry.get("pods_running"),
                "primary_node": entry.get("primary_node"),
                "nodes_top": nodes_top,
            }
        )
    return output

def _events_summary(events: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(events, dict):
        return {}
    by_namespace = events.get("warnings_by_namespace")
    if not isinstance(by_namespace, dict):
        by_namespace = {}
    top_namespace = ""
    top_namespace_count = 0
    if by_namespace:
        top_namespace, top_namespace_count = sorted(
            by_namespace.items(), key=lambda item: (-item[1], item[0])
        )[0]
    return {
        "warnings_total": events.get("warnings_total"),
        "top_reason": events.get("warnings_top_reason"),
        "top_namespace": {"namespace": top_namespace, "count": top_namespace_count},
        "latest": events.get("warnings_latest"),
        "recent": (events.get("warnings_recent") or [])[:_EVENTS_SUMMARY_LIMIT],
    }
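
For illustration, here is how _events_summary condenses a hypothetical events payload; the input keys mirror the getters above, and all counts and messages are invented:

events = {
    "warnings_total": 7,
    "warnings_top_reason": "BackOff",
    "warnings_by_namespace": {"media": 4, "infra": 3},
    "warnings_latest": "2026-01-30T19:55:00Z",
    "warnings_recent": ["BackOff pod/plex", "Unhealthy pod/dns"],
}
print(_events_summary(events))
# {'warnings_total': 7, 'top_reason': 'BackOff',
#  'top_namespace': {'namespace': 'media', 'count': 4},
#  'latest': '2026-01-30T19:55:00Z',
#  'recent': ['BackOff pod/plex', 'Unhealthy pod/dns']}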

def _node_context(
    node_details: list[dict[str, Any]],
    node_load: list[dict[str, Any]],
    node_baseline: dict[str, dict[str, dict[str, float]]],
    node_workloads: dict[str, dict[str, int]],
) -> list[dict[str, Any]]:
    load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)}
    output: list[dict[str, Any]] = []
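
As a sanity check on the new helpers, a minimal hypothetical example of _node_workload_map inverting per-workload node counts into a per-node map, and _node_workloads_top ranking the result (node and workload names are invented):

workloads = [
    {"namespace": "media", "workload": "plex", "nodes": {"node-a": 1}},
    {"namespace": "infra", "workload": "dns", "nodes": {"node-a": 2, "node-b": "1"}},
    {"workload": "orphan", "nodes": {"node-b": 1}},  # no namespace: keyed by workload alone
]
node_map = _node_workload_map(workloads)
# {'node-a': {'media/plex': 1, 'infra/dns': 2}, 'node-b': {'infra/dns': 1, 'orphan': 1}}
# (the string count "1" is coerced to int; non-numeric counts are dropped)
print(_node_workloads_top(node_map, limit_nodes=2, limit_workloads=1))
# [{'node': 'node-a', 'pods_total': 3, 'workloads_top': [('infra/dns', 2)]},
#  {'node': 'node-b', 'pods_total': 2, 'workloads_top': [('infra/dns', 1)]}]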
@@ -2057,6 +2150,8 @@ def _node_context(
            delta = _baseline_delta(current, stats)
            if delta is not None:
                deltas[key] = delta
        workloads = node_workloads.get(name, {}) if isinstance(node_workloads, dict) else {}
        workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
        output.append(
            {
                "node": name,
@@ -2078,6 +2173,7 @@ def _node_context(
"load_index": load_entry.get("load_index"), "load_index": load_entry.get("load_index"),
"baseline": baseline, "baseline": baseline,
"baseline_delta": deltas, "baseline_delta": deltas,
"workloads_top": workloads_top,
} }
) )
output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or "")) output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
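
One detail worth calling out: every workloads_top in this change sorts with the key (-count, name), which orders by count descending and breaks ties alphabetically, so the summaries stay deterministic between runs. A tiny illustration with made-up counts:

counts = {"web": 2, "api": 2, "batch": 5}
print(sorted(counts.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP])
# [('batch', 5), ('api', 2), ('web', 2)]  -- equal counts fall back to name order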
@@ -2250,8 +2346,13 @@ def _build_signals(
    return signals[:_SIGNAL_LIMIT]


def _node_profiles(
    node_context: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
    node_workloads: dict[str, dict[str, int]],
) -> list[dict[str, Any]]:
    pod_map = {entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)}
    workload_map = node_workloads or {}
    profiles: list[dict[str, Any]] = []
    for entry in node_context:
        if not isinstance(entry, dict):
@@ -2260,6 +2361,8 @@ def _node_profiles(
        if not isinstance(node, str) or not node:
            continue
        pods = pod_map.get(node, {})
        workloads = workload_map.get(node, {})
        workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
        profiles.append(
            {
                "node": node,
@@ -2270,6 +2373,7 @@ def _node_profiles(
"pods_total": pods.get("pods_total"), "pods_total": pods.get("pods_total"),
"pods_running": pods.get("pods_running"), "pods_running": pods.get("pods_running"),
"namespaces_top": pods.get("namespaces_top") or [], "namespaces_top": pods.get("namespaces_top") or [],
"workloads_top": workloads_top,
"load_index": entry.get("load_index"), "load_index": entry.get("load_index"),
"cpu": entry.get("cpu"), "cpu": entry.get("cpu"),
"ram": entry.get("ram"), "ram": entry.get("ram"),
@@ -2337,9 +2441,10 @@ def _build_profiles(
    namespace_context: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
    workloads: list[dict[str, Any]],
    node_workloads: dict[str, dict[str, int]],
) -> dict[str, Any]:
    return {
        "nodes": _node_profiles(node_context, node_pods, node_workloads),
        "namespaces": _namespace_profiles(namespace_context),
        "workloads": _workload_profiles(workloads),
    }
@@ -2747,10 +2852,12 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
metrics.get("namespace_capacity", []), metrics.get("namespace_capacity", []),
metrics.get("namespace_baseline_map", {}), metrics.get("namespace_baseline_map", {}),
) )
node_workloads = _node_workload_map(workloads)
node_context = _node_context( node_context = _node_context(
node_details, node_details,
metrics.get("node_load", []), metrics.get("node_load", []),
metrics.get("node_baseline_map", {}), metrics.get("node_baseline_map", {}),
node_workloads,
) )
signals = _build_signals( signals = _build_signals(
metrics, metrics,
@@ -2760,7 +2867,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
        pod_issues,
        kustomizations,
    )
    profiles = _build_profiles(node_context, namespace_context, node_pods, workloads, node_workloads)
    summary = {
        "generated_at": collected_at.isoformat(),
        "windows": metrics.get("windows", {}),
@@ -2791,11 +2898,17 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
"node_namespaces": metrics.get("node_pods_top", []), "node_namespaces": metrics.get("node_pods_top", []),
"workload_nodes": _workload_nodes_top(workloads, 5), "workload_nodes": _workload_nodes_top(workloads, 5),
}, },
"topology": {
"nodes": _node_workloads_top(node_workloads),
"workloads": _workload_index(workloads),
"namespaces": _namespace_nodes_top(namespace_context, 12),
},
"attention_ranked": _build_attention_ranked(metrics, node_context, pod_issues, workload_health), "attention_ranked": _build_attention_ranked(metrics, node_context, pod_issues, workload_health),
"signals": signals, "signals": signals,
"profiles": profiles, "profiles": profiles,
"anomalies": anomalies, "anomalies": anomalies,
"health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies), "health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies),
"events": _events_summary(events),
"unknowns": errors, "unknowns": errors,
} }
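
For reference, a sketch of how the new topology block might render in the emitted summary. The structure follows _node_workloads_top and _workload_index above, but every value here is invented:

topology = {
    "nodes": [{"node": "node-a", "pods_total": 3, "workloads_top": [("infra/dns", 2)]}],
    "workloads": [
        {
            "namespace": "infra",
            "workload": "dns",
            "pods_total": 3,
            "pods_running": 3,
            "primary_node": "node-a",
            "nodes_top": [("node-a", 2), ("node-b", 1)],
        }
    ],
    "namespaces": [...],  # from _namespace_nodes_top(namespace_context, 12); shape not shown in this diff
}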