cluster-state: refactor anomalies

This commit is contained in:
Brad Stein 2026-01-30 00:14:10 -03:00
parent 56ea582c97
commit d5086a0b98

View File

@ -64,6 +64,7 @@ _PHASE_SEVERITY = {
_PENDING_15M_HOURS = 0.25 _PENDING_15M_HOURS = 0.25
_LOAD_TOP_COUNT = 5 _LOAD_TOP_COUNT = 5
_NAMESPACE_TOP_COUNT = 5 _NAMESPACE_TOP_COUNT = 5
_PVC_PRESSURE_THRESHOLD = 80.0
def _node_usage_by_hardware(node_load: list[dict[str, Any]], node_details: list[dict[str, Any]]) -> list[dict[str, Any]]: def _node_usage_by_hardware(node_load: list[dict[str, Any]], node_details: list[dict[str, Any]]) -> list[dict[str, Any]]:
@ -1939,6 +1940,17 @@ def _build_anomalies(
events: dict[str, Any], events: dict[str, Any],
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
anomalies: list[dict[str, Any]] = [] anomalies: list[dict[str, Any]] = []
_append_pod_anomalies(anomalies, metrics)
_append_workload_anomalies(anomalies, workloads_health)
_append_flux_anomalies(anomalies, kustomizations)
_append_job_failure_anomalies(anomalies, metrics)
_append_pvc_anomalies(anomalies, metrics)
_append_node_anomalies(anomalies, nodes_summary)
_append_event_anomalies(anomalies, events)
return anomalies
def _append_pod_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
pods_pending = metrics.get("pods_pending") or 0 pods_pending = metrics.get("pods_pending") or 0
pods_failed = metrics.get("pods_failed") or 0 pods_failed = metrics.get("pods_failed") or 0
if pods_pending: if pods_pending:
@ -1957,6 +1969,11 @@ def _build_anomalies(
"summary": f"{int(pods_failed)} pods failed", "summary": f"{int(pods_failed)} pods failed",
} }
) )
def _append_workload_anomalies(
anomalies: list[dict[str, Any]], workloads_health: dict[str, Any]
) -> None:
for key in ("deployments", "statefulsets", "daemonsets"): for key in ("deployments", "statefulsets", "daemonsets"):
entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {} entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {}
not_ready = entry.get("not_ready") or 0 not_ready = entry.get("not_ready") or 0
@ -1969,6 +1986,9 @@ def _build_anomalies(
"items": entry.get("items"), "items": entry.get("items"),
} }
) )
def _append_flux_anomalies(anomalies: list[dict[str, Any]], kustomizations: dict[str, Any]) -> None:
flux_not_ready = (kustomizations or {}).get("not_ready") or 0 flux_not_ready = (kustomizations or {}).get("not_ready") or 0
if flux_not_ready: if flux_not_ready:
anomalies.append( anomalies.append(
@ -1979,6 +1999,9 @@ def _build_anomalies(
"items": (kustomizations or {}).get("items"), "items": (kustomizations or {}).get("items"),
} }
) )
def _append_job_failure_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
job_failures = metrics.get("job_failures_24h") or [] job_failures = metrics.get("job_failures_24h") or []
job_failures = [ job_failures = [
entry for entry in job_failures if isinstance(entry, dict) and (entry.get("value") or 0) > 0 entry for entry in job_failures if isinstance(entry, dict) and (entry.get("value") or 0) > 0
@ -1992,53 +2015,73 @@ def _build_anomalies(
"items": job_failures[:5], "items": job_failures[:5],
} }
) )
pvc_top = _pvc_top(metrics.get("pvc_usage_top") or [])
pvc_pressure = [entry for entry in pvc_top if (entry.get("used_percent") or 0) >= 80]
def _append_pvc_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
pvc_pressure = _pvc_pressure_entries(metrics)
if pvc_pressure: if pvc_pressure:
anomalies.append( anomalies.append(
{ {
"kind": "pvc_pressure", "kind": "pvc_pressure",
"severity": "warning", "severity": "warning",
"summary": "PVCs above 80% usage", "summary": f"PVCs above {_PVC_PRESSURE_THRESHOLD:.0f}% usage",
"items": pvc_pressure[:5], "items": pvc_pressure[:5],
} }
) )
if nodes_summary:
pressure_nodes = nodes_summary.get("pressure_nodes") or {}
flagged = [ def _pvc_pressure_entries(metrics: dict[str, Any]) -> list[dict[str, Any]]:
name for names in pressure_nodes.values() if isinstance(names, list) for name in names if name pvc_top = _pvc_top(metrics.get("pvc_usage_top") or [])
] return [
if flagged: entry
anomalies.append( for entry in pvc_top
{ if isinstance(entry, dict)
"kind": "node_pressure", and isinstance(entry.get("used_percent"), (int, float))
"severity": "warning", and float(entry.get("used_percent") or 0) >= _PVC_PRESSURE_THRESHOLD
"summary": f"{len(flagged)} nodes report pressure", ]
"items": sorted(set(flagged)),
}
) def _append_node_anomalies(anomalies: list[dict[str, Any]], nodes_summary: dict[str, Any]) -> None:
unschedulable = nodes_summary.get("unschedulable_nodes") or [] if not nodes_summary:
if unschedulable: return
anomalies.append( pressure_nodes = nodes_summary.get("pressure_nodes") or {}
{ flagged = [
"kind": "unschedulable_nodes", name for names in pressure_nodes.values() if isinstance(names, list) for name in names if name
"severity": "info", ]
"summary": f"{len(unschedulable)} nodes unschedulable", if flagged:
"items": unschedulable, anomalies.append(
} {
) "kind": "node_pressure",
if events: "severity": "warning",
warnings = events.get("warnings_total") or 0 "summary": f"{len(flagged)} nodes report pressure",
if warnings: "items": sorted(set(flagged)),
anomalies.append( }
{ )
"kind": "event_warnings", unschedulable = nodes_summary.get("unschedulable_nodes") or []
"severity": "info", if unschedulable:
"summary": f"{int(warnings)} warning events", anomalies.append(
"items": events.get("warnings") or [], {
} "kind": "unschedulable_nodes",
) "severity": "info",
return anomalies "summary": f"{len(unschedulable)} nodes unschedulable",
"items": unschedulable,
}
)
def _append_event_anomalies(anomalies: list[dict[str, Any]], events: dict[str, Any]) -> None:
if not events:
return
warnings = events.get("warnings_total") or 0
if warnings:
anomalies.append(
{
"kind": "event_warnings",
"severity": "info",
"summary": f"{int(warnings)} warning events",
"items": events.get("warnings") or [],
}
)
def _health_bullets( def _health_bullets(