cluster-state: enrich snapshot substrate

This commit is contained in:
Brad Stein 2026-01-30 00:09:53 -03:00
parent 3d21506ff0
commit 56ea582c97
2 changed files with 327 additions and 0 deletions

View File

@ -1764,6 +1764,12 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
metrics["namespace_capacity_summary"] = _namespace_capacity_summary(
metrics.get("namespace_capacity", []),
)
metrics["namespace_totals"] = {
"cpu": _namespace_totals_list(namespace_cpu_usage),
"mem": _namespace_totals_list(namespace_mem_usage),
"cpu_requests": _namespace_totals_list(namespace_cpu_requests),
"mem_requests": _namespace_totals_list(namespace_mem_requests),
}
metrics["pvc_usage_top"] = _pvc_usage(errors)
metrics["units"] = {
"cpu": "percent",
@ -1798,6 +1804,273 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
return metrics
def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
entries = [
{"namespace": name, "value": value}
for name, value in totals.items()
if isinstance(name, str) and name
]
entries.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or ""))
return entries
def _vector_to_named(entries: list[dict[str, Any]], label_key: str, name_key: str) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
value = item.get("value")
label = metric.get(label_key) if isinstance(metric, dict) else None
if not isinstance(label, str) or not label:
continue
output.append({name_key: label, "value": value, "metric": metric})
output.sort(key=lambda item: (-(item.get("value") or 0), item.get(name_key) or ""))
return output
def _pvc_top(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
namespace = metric.get("namespace")
pvc = metric.get("persistentvolumeclaim")
if not isinstance(namespace, str) or not isinstance(pvc, str):
continue
output.append(
{
"namespace": namespace,
"pvc": pvc,
"used_percent": item.get("value"),
}
)
output.sort(key=lambda item: (-(item.get("used_percent") or 0), item.get("namespace") or ""))
return output
def _namespace_context(
namespace_pods: list[dict[str, Any]],
namespace_nodes: list[dict[str, Any]],
namespace_capacity: list[dict[str, Any]],
) -> list[dict[str, Any]]:
node_map = {entry.get("namespace"): entry for entry in namespace_nodes if isinstance(entry, dict)}
cap_map = {entry.get("namespace"): entry for entry in namespace_capacity if isinstance(entry, dict)}
output: list[dict[str, Any]] = []
for entry in namespace_pods:
if not isinstance(entry, dict):
continue
namespace = entry.get("namespace")
if not isinstance(namespace, str) or not namespace:
continue
nodes_entry = node_map.get(namespace, {})
cap_entry = cap_map.get(namespace, {})
nodes = nodes_entry.get("nodes") if isinstance(nodes_entry.get("nodes"), dict) else {}
top_nodes: list[dict[str, Any]] = []
if isinstance(nodes, dict):
top_nodes = [
{"node": name, "pods": count}
for name, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[:3]
]
output.append(
{
"namespace": namespace,
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"pods_pending": entry.get("pods_pending"),
"pods_failed": entry.get("pods_failed"),
"pods_succeeded": entry.get("pods_succeeded"),
"primary_node": nodes_entry.get("primary_node"),
"nodes_top": top_nodes,
"cpu_usage": cap_entry.get("cpu_usage"),
"cpu_requests": cap_entry.get("cpu_requests"),
"cpu_ratio": cap_entry.get("cpu_usage_ratio"),
"mem_usage": cap_entry.get("mem_usage"),
"mem_requests": cap_entry.get("mem_requests"),
"mem_ratio": cap_entry.get("mem_usage_ratio"),
}
)
output.sort(key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or ""))
return output
def _node_context(
node_details: list[dict[str, Any]],
node_load: list[dict[str, Any]],
) -> list[dict[str, Any]]:
load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)}
output: list[dict[str, Any]] = []
for entry in node_details:
if not isinstance(entry, dict):
continue
name = entry.get("name")
if not isinstance(name, str) or not name:
continue
load_entry = load_map.get(name, {})
output.append(
{
"node": name,
"ready": entry.get("ready"),
"roles": entry.get("roles"),
"is_worker": entry.get("is_worker"),
"hardware": entry.get("hardware"),
"arch": entry.get("arch"),
"os": entry.get("os"),
"taints": entry.get("taints"),
"unschedulable": entry.get("unschedulable"),
"pressure_flags": entry.get("pressure"),
"pods_total": load_entry.get("pods_total"),
"cpu": load_entry.get("cpu"),
"ram": load_entry.get("ram"),
"disk": load_entry.get("disk"),
"net": load_entry.get("net"),
"io": load_entry.get("io"),
"load_index": load_entry.get("load_index"),
}
)
output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
return output
def _build_anomalies(
    metrics: dict[str, Any],
    nodes_summary: dict[str, Any],
    workloads_health: dict[str, Any],
    kustomizations: dict[str, Any],
    events: dict[str, Any],
) -> list[dict[str, Any]]:
    """Derive a flat list of anomaly records from collected cluster data.

    Each anomaly carries a ``kind``, ``severity`` and human-readable
    ``summary``; most also include an ``items`` payload with supporting
    detail. The list is built in a fixed order (pods, workloads, Flux,
    jobs, PVCs, nodes, events).
    """
    findings: list[dict[str, Any]] = []

    def note(kind: str, severity: str, summary: str, **extra: Any) -> None:
        # Small builder so every record has the same base shape.
        findings.append({"kind": kind, "severity": severity, "summary": summary, **extra})

    pending = metrics.get("pods_pending") or 0
    failed = metrics.get("pods_failed") or 0
    if pending:
        note("pods_pending", "warning", f"{int(pending)} pods pending")
    if failed:
        note("pods_failed", "critical", f"{int(failed)} pods failed")

    for kind in ("deployments", "statefulsets", "daemonsets"):
        raw = workloads_health.get(kind)
        health = raw if isinstance(raw, dict) else {}
        stuck = health.get("not_ready") or 0
        if stuck:
            note(
                f"{kind}_not_ready",
                "warning",
                f"{int(stuck)} {kind} not ready",
                items=health.get("items"),
            )

    flux = kustomizations or {}
    flux_stuck = flux.get("not_ready") or 0
    if flux_stuck:
        note(
            "flux_not_ready",
            "warning",
            f"{int(flux_stuck)} Flux kustomizations not ready",
            items=flux.get("items"),
        )

    failed_jobs = [
        entry
        for entry in (metrics.get("job_failures_24h") or [])
        if isinstance(entry, dict) and (entry.get("value") or 0) > 0
    ]
    if failed_jobs:
        note("job_failures_24h", "warning", "Job failures in last 24h", items=failed_jobs[:5])

    pressured_pvcs = [
        row
        for row in _pvc_top(metrics.get("pvc_usage_top") or [])
        if (row.get("used_percent") or 0) >= 80
    ]
    if pressured_pvcs:
        note("pvc_pressure", "warning", "PVCs above 80% usage", items=pressured_pvcs[:5])

    if nodes_summary:
        pressure = nodes_summary.get("pressure_nodes") or {}
        flagged = [
            node
            for group in pressure.values()
            if isinstance(group, list)
            for node in group
            if node
        ]
        if flagged:
            note(
                "node_pressure",
                "warning",
                f"{len(flagged)} nodes report pressure",
                items=sorted(set(flagged)),
            )
        cordoned = nodes_summary.get("unschedulable_nodes") or []
        if cordoned:
            note(
                "unschedulable_nodes",
                "info",
                f"{len(cordoned)} nodes unschedulable",
                items=cordoned,
            )

    if events:
        warning_count = events.get("warnings_total") or 0
        if warning_count:
            note(
                "event_warnings",
                "info",
                f"{int(warning_count)} warning events",
                items=events.get("warnings") or [],
            )
    return findings
def _health_bullets(
metrics: dict[str, Any],
nodes_summary: dict[str, Any],
workloads_health: dict[str, Any],
anomalies: list[dict[str, Any]],
) -> list[str]:
bullets: list[str] = []
nodes_total = metrics.get("nodes_total")
nodes_ready = metrics.get("nodes_ready")
if nodes_total is not None and nodes_ready is not None:
bullets.append(f"Nodes ready: {int(nodes_ready)}/{int(nodes_total)}")
pods_running = metrics.get("pods_running") or 0
pods_pending = metrics.get("pods_pending") or 0
pods_failed = metrics.get("pods_failed") or 0
bullets.append(f"Pods: {int(pods_running)} running, {int(pods_pending)} pending, {int(pods_failed)} failed")
not_ready = 0
for key in ("deployments", "statefulsets", "daemonsets"):
entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {}
not_ready += int(entry.get("not_ready") or 0)
if not_ready:
bullets.append(f"Workloads not ready: {not_ready}")
else:
bullets.append("Workloads: all ready")
if anomalies:
top = anomalies[0].get("summary") if isinstance(anomalies[0], dict) else None
if isinstance(top, str) and top:
bullets.append(f"Top concern: {top}")
return bullets[:4]
def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
errors: list[str] = []
collected_at = datetime.now(timezone.utc)
@ -1819,9 +2092,55 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
)
metrics["node_load_summary"] = _node_load_summary(metrics.get("node_load", []))
metrics["node_load_by_hardware"] = _node_usage_by_hardware(metrics.get("node_load", []), node_details)
metrics["namespace_top"] = {
"cpu": _vector_to_named(metrics.get("namespace_cpu_top", []), "namespace", "namespace"),
"mem": _vector_to_named(metrics.get("namespace_mem_top", []), "namespace", "namespace"),
"net": _vector_to_named(metrics.get("namespace_net_top", []), "namespace", "namespace"),
"io": _vector_to_named(metrics.get("namespace_io_top", []), "namespace", "namespace"),
"restarts": _vector_to_named(metrics.get("restart_namespace_top", []), "namespace", "namespace"),
}
anomalies = _build_anomalies(metrics, node_summary, workload_health, kustomizations, events)
namespace_context = _namespace_context(
namespace_pods,
namespace_nodes,
metrics.get("namespace_capacity", []),
)
node_context = _node_context(node_details, metrics.get("node_load", []))
summary = {
"generated_at": collected_at.isoformat(),
"windows": metrics.get("windows", {}),
"counts": {
"nodes_total": metrics.get("nodes_total"),
"nodes_ready": metrics.get("nodes_ready"),
"pods_running": metrics.get("pods_running"),
"pods_pending": metrics.get("pods_pending"),
"pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"),
},
"top": {
"namespace_cpu": (metrics.get("namespace_totals", {}) or {}).get("cpu", [])[:5],
"namespace_mem": (metrics.get("namespace_totals", {}) or {}).get("mem", [])[:5],
"namespace_pods": namespace_pods[:5],
"node_pods": metrics.get("node_pods_top", []),
"node_load": metrics.get("node_load_summary", {}).get("top", []),
"node_hottest": metrics.get("hottest_nodes", {}),
"postgres": metrics.get("postgres_connections", {}),
"pvc_usage": _pvc_top(metrics.get("pvc_usage_top", [])),
},
"anomalies": anomalies,
"health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies),
"unknowns": errors,
}
snapshot = {
"collected_at": collected_at.isoformat(),
"snapshot_version": "v2",
"summary": summary,
"context": {
"nodes": node_context,
"namespaces": namespace_context,
},
"nodes": nodes,
"nodes_summary": node_summary,
"nodes_detail": node_details,

View File

@ -143,6 +143,14 @@ def test_collect_cluster_state(monkeypatch) -> None:
assert snapshot["metrics"]["pod_mem_top"] == []
assert snapshot["metrics"]["job_failures_24h"] == []
assert snapshot["metrics"]["pvc_usage_top"] == []
assert snapshot["summary"]["counts"]["nodes_total"] == 5.0
assert snapshot["summary"]["counts"]["nodes_ready"] == 5.0
assert snapshot["summary"]["counts"]["pods_running"] == 5.0
assert snapshot["summary"]["top"]["namespace_pods"][0]["namespace"] == "media"
assert snapshot["summary"]["health_bullets"]
assert snapshot["summary"]["unknowns"] == []
assert snapshot["context"]["nodes"]
assert snapshot["context"]["namespaces"]
assert summary.nodes_total == 2
assert summary.nodes_ready == 1
assert summary.pods_running == 5.0