cluster: enrich time-aware snapshot

Brad Stein 2026-01-30 20:52:29 -03:00
parent 4c011ca6f1
commit 7ee450228f


@@ -19,6 +19,12 @@ _VALUE_PAIR_LEN = 2
_RATE_WINDOW = "5m"
_RESTARTS_WINDOW = "1h"
_BASELINE_WINDOW = "24h"
_TREND_WINDOWS = ("1h", "6h", "24h")
_TREND_NODE_LIMIT = 30
_TREND_NAMESPACE_LIMIT = 20
_TREND_PVC_LIMIT = 10
_TREND_JOB_LIMIT = 10
_TREND_POD_LIMIT = 15
_NODE_DISK_ALERT = 80.0
_NODE_CPU_ALERT = 80.0
_NODE_RAM_ALERT = 80.0
@@ -354,6 +360,37 @@ def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
    return summary

def _hardware_groups(details: list[dict[str, Any]]) -> list[dict[str, Any]]:
    groups: dict[str, list[str]] = {}
    for node in details:
        if not isinstance(node, dict):
            continue
        name = node.get("name")
        if not isinstance(name, str) or not name:
            continue
        hardware = str(node.get("hardware") or "unknown")
        groups.setdefault(hardware, []).append(name)
    output: list[dict[str, Any]] = []
    for hardware, nodes in groups.items():
        nodes.sort()
        output.append({"hardware": hardware, "count": len(nodes), "nodes": nodes})
    output.sort(key=lambda item: (-(item.get("count") or 0), item.get("hardware") or ""))
    return output
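# Illustrative only, with hypothetical node names: given
#   [{"name": "pi-a", "hardware": "rpi4"}, {"name": "pi-b", "hardware": "rpi4"}]
# _hardware_groups returns
#   [{"hardware": "rpi4", "count": 2, "nodes": ["pi-a", "pi-b"]}]
# sorted by descending count, then hardware name.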

def _pressure_summary(nodes_summary: dict[str, Any]) -> dict[str, Any]:
    pressure_nodes = nodes_summary.get("pressure_nodes") if isinstance(nodes_summary, dict) else {}
    summary: dict[str, Any] = {"by_type": {}, "total": 0}
    if isinstance(pressure_nodes, dict):
        for cond, names in pressure_nodes.items():
            count = len(names) if isinstance(names, list) else 0
            summary["by_type"][cond] = count
            summary["total"] += count
    unschedulable = (nodes_summary.get("unschedulable_nodes") if isinstance(nodes_summary, dict) else None) or []
    summary["unschedulable"] = len(unschedulable) if isinstance(unschedulable, list) else 0
    return summary
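# Shape sketch with a hypothetical input: given
#   {"pressure_nodes": {"MemoryPressure": ["node-1"]}, "unschedulable_nodes": []}
# _pressure_summary yields
#   {"by_type": {"MemoryPressure": 1}, "total": 1, "unschedulable": 0}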

def _apply_node_summary(summary: dict[str, Any], node: dict[str, Any]) -> str:
    name = node.get("name") if isinstance(node, dict) else ""
    if not isinstance(name, str) or not name:
@@ -1371,6 +1408,61 @@ def _baseline_map_to_list(
    return output

def _limit_entries(entries: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]:
    if limit <= 0:
        return []
    return entries[:limit]

def _vm_window_series(
    expr: str,
    label_key: str,
    name_key: str,
    window: str,
) -> dict[str, list[dict[str, Any]]]:
    avg = _vector_to_named(
        _vm_vector(f"avg_over_time(({expr})[{window}])"),
        label_key,
        name_key,
    )
    max_values = _vector_to_named(
        _vm_vector(f"max_over_time(({expr})[{window}])"),
        label_key,
        name_key,
    )
    p95 = _vector_to_named(
        _vm_vector(f"quantile_over_time(0.95, ({expr})[{window}])"),
        label_key,
        name_key,
    )
    return {"avg": avg, "max": max_values, "p95": p95}

def _trim_window_series(series: dict[str, list[dict[str, Any]]], limit: int) -> dict[str, list[dict[str, Any]]]:
    return {key: _limit_entries(entries, limit) for key, entries in series.items()}

def _build_metric_trends(
    exprs: dict[str, str],
    label_key: str,
    name_key: str,
    windows: tuple[str, ...],
    limit: int,
) -> dict[str, dict[str, dict[str, list[dict[str, Any]]]]]:
    trends: dict[str, dict[str, dict[str, list[dict[str, Any]]]]] = {}
    for metric, expr in exprs.items():
        metric_trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
        for window in windows:
            series = _vm_window_series(expr, label_key, name_key, window)
            metric_trends[window] = _trim_window_series(series, limit)
        trends[metric] = metric_trends
    return trends
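# Resulting nesting is metric -> window -> aggregate -> top-N entries, e.g.
# (keys illustrative):
#   {"cpu": {"1h": {"avg": [...], "max": [...], "p95": [...]}, "6h": {...}, "24h": {...}}}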

def _vm_scalar_window(expr: str, window: str, fn: str) -> float | None:
    return _vm_scalar(f"{fn}(({expr})[{window}])")
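# e.g. _vm_scalar_window('sum(kube_pod_status_phase{phase="Running"})', "6h",
# "max_over_time") runs:
#   max_over_time((sum(kube_pod_status_phase{phase="Running"}))[6h])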

def _node_usage_exprs() -> dict[str, str]:
    return {
        "cpu": (
@@ -1405,6 +1497,74 @@ def _namespace_usage_exprs() -> dict[str, str]:
    }

def _namespace_request_exprs() -> dict[str, str]:
    return {
        "cpu_requests": "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)",
        "mem_requests": "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)",
    }

def _restart_namespace_trend(window: str) -> list[dict[str, Any]]:
    entries = _vm_vector(
        f"topk({_TREND_NAMESPACE_LIMIT}, sum by (namespace) (increase(kube_pod_container_status_restarts_total[{window}])))"
    )
    entries = _filter_namespace_vector(entries)
    return _vector_to_named(entries, "namespace", "namespace")
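# For window="1h" the generated query is (20 = _TREND_NAMESPACE_LIMIT):
#   topk(20, sum by (namespace) (increase(kube_pod_container_status_restarts_total[1h])))
# and the result passes through _filter_namespace_vector like the other
# namespace top lists.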

def _job_failure_trend(window: str) -> list[dict[str, Any]]:
    entries = _vm_vector(
        f"topk({_TREND_JOB_LIMIT}, sum by (namespace,job_name) (increase(kube_job_status_failed[{window}])))"
    )
    output: list[dict[str, Any]] = []
    for item in entries:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        namespace = metric.get("namespace")
        job = metric.get("job_name")
        if not isinstance(namespace, str) or not isinstance(job, str):
            continue
        output.append(
            {
                "namespace": namespace,
                "job": job,
                "value": item.get("value"),
            }
        )
    output.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or "", item.get("job") or ""))
    return output
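# Entries come out flattened and sorted by failure count descending, e.g.
# (hypothetical values):
#   {"namespace": "batch", "job": "nightly-backup", "value": 2.0}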

def _pods_phase_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    phases = {
        "running": "sum(kube_pod_status_phase{phase=\"Running\"})",
        "pending": "sum(kube_pod_status_phase{phase=\"Pending\"})",
        "failed": "sum(kube_pod_status_phase{phase=\"Failed\"})",
    }
    trends: dict[str, dict[str, dict[str, float | None]]] = {}
    for window in _TREND_WINDOWS:
        window_entry: dict[str, dict[str, float | None]] = {}
        for name, expr in phases.items():
            window_entry[name] = {
                "avg": _vm_scalar_window(expr, window, "avg_over_time"),
                "max": _vm_scalar_window(expr, window, "max_over_time"),
            }
        trends[window] = window_entry
    return trends
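# Shape: window -> phase -> {"avg", "max"}, e.g. (values hypothetical):
#   {"1h": {"running": {"avg": 142.0, "max": 145.0}, "pending": {...}, "failed": {...}}, ...}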

def _pvc_usage_trends() -> dict[str, list[dict[str, Any]]]:
    trends: dict[str, list[dict[str, Any]]] = {}
    expr = "kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100"
    for window in _TREND_WINDOWS:
        entries = _vm_vector(
            f"topk({_TREND_PVC_LIMIT}, max_over_time(({expr})[{window}]))"
        )
        trends[window] = _pvc_top(entries)
    return trends
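# For window="24h" the generated query is (10 = _TREND_PVC_LIMIT):
#   topk(10, max_over_time((kubelet_volume_stats_used_bytes
#            / kubelet_volume_stats_capacity_bytes * 100)[24h]))
# so each window reports the PVCs that peaked highest, not just current usage.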

def _postgres_connections(errors: list[str]) -> dict[str, Any]:
    postgres: dict[str, Any] = {}
    try:
@@ -1802,6 +1962,41 @@ def _collect_node_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
        errors.append(f"baseline: {exc}")

def _collect_trend_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    try:
        metrics["node_trends"] = _build_metric_trends(
            _node_usage_exprs(),
            "node",
            "node",
            _TREND_WINDOWS,
            _TREND_NODE_LIMIT,
        )
        metrics["namespace_trends"] = _build_metric_trends(
            _namespace_usage_exprs(),
            "namespace",
            "namespace",
            _TREND_WINDOWS,
            _TREND_NAMESPACE_LIMIT,
        )
        metrics["namespace_request_trends"] = _build_metric_trends(
            _namespace_request_exprs(),
            "namespace",
            "namespace",
            _TREND_WINDOWS,
            _TREND_NAMESPACE_LIMIT,
        )
        metrics["restart_trends"] = {
            window: _restart_namespace_trend(window) for window in _TREND_WINDOWS
        }
        metrics["job_failure_trends"] = {
            window: _job_failure_trend(window) for window in _TREND_WINDOWS
        }
        metrics["pods_phase_trends"] = _pods_phase_trends()
        metrics["pvc_usage_trends"] = _pvc_usage_trends()
    except Exception as exc:
        errors.append(f"trends: {exc}")

def _collect_namespace_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    try:
        metrics["namespace_cpu_top"] = _filter_namespace_vector(
@@ -1906,6 +2101,7 @@ def _finalize_metrics(metrics: dict[str, Any]) -> None:
    metrics["windows"] = {
        "rates": _RATE_WINDOW,
        "restarts": _RESTARTS_WINDOW,
        "trend": _TREND_WINDOWS,
    }
@@ -1913,12 +2109,43 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
    metrics: dict[str, Any] = {}
    _collect_vm_core(metrics, errors)
    _collect_node_metrics(metrics, errors)
    _collect_trend_metrics(metrics, errors)
    _collect_namespace_metrics(metrics, errors)
    metrics["pvc_usage_top"] = _pvc_usage(errors)
    metrics["trend_summary"] = _trend_summary(metrics)
    _finalize_metrics(metrics)
    return metrics

def _trend_summary(metrics: dict[str, Any]) -> dict[str, Any]:
    node_trends = metrics.get("node_trends", {}) if isinstance(metrics.get("node_trends"), dict) else {}
    namespace_trends = (
        metrics.get("namespace_trends", {}) if isinstance(metrics.get("namespace_trends"), dict) else {}
    )
    restarts = metrics.get("restart_trends", {}) if isinstance(metrics.get("restart_trends"), dict) else {}
    job_failures = (
        metrics.get("job_failure_trends", {}) if isinstance(metrics.get("job_failure_trends"), dict) else {}
    )
    summary: dict[str, Any] = {}
    for metric_key, target in (("cpu", "node_cpu"), ("ram", "node_ram")):
        metric_block = node_trends.get(metric_key, {}) if isinstance(node_trends.get(metric_key), dict) else {}
        summary[target] = {
            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
            for window in _TREND_WINDOWS
        }
    for metric_key, target in (("cpu", "namespace_cpu"), ("mem", "namespace_mem")):
        metric_block = namespace_trends.get(metric_key, {}) if isinstance(namespace_trends.get(metric_key), dict) else {}
        summary[target] = {
            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
            for window in _TREND_WINDOWS
        }
    summary["restarts"] = {window: _limit_entries(entries or [], 5) for window, entries in restarts.items()}
    summary["job_failures"] = {
        window: _limit_entries(entries or [], 5) for window, entries in job_failures.items()
    }
    return summary
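# _trend_summary keeps only the top 5 "avg" entries per window for node_cpu,
# node_ram, namespace_cpu and namespace_mem, plus the top 5 restart and
# job-failure rows: a compact window -> top-5 digest for the snapshot payload.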

def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
    entries = [
        {"namespace": name, "value": value}
@@ -2849,6 +3076,9 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
        "restarts": _vector_to_named(metrics.get("restart_namespace_top", []), "namespace", "namespace"),
    }
    hardware_groups = _hardware_groups(node_details)
    pressure_summary = _pressure_summary(node_summary)
    anomalies = _build_anomalies(metrics, node_summary, workload_health, kustomizations, events)
    namespace_context = _namespace_context(
        namespace_pods,
@@ -2878,7 +3108,10 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
        "generated_at": collected_at.isoformat(),
        "windows": metrics.get("windows", {}),
        "baseline_window": _BASELINE_WINDOW,
        "inventory": {
            **(node_summary or {}),
            "hardware_groups": hardware_groups,
        },
"counts": { "counts": {
"nodes_total": metrics.get("nodes_total"), "nodes_total": metrics.get("nodes_total"),
"nodes_ready": metrics.get("nodes_ready"), "nodes_ready": metrics.get("nodes_ready"),
@ -2887,6 +3120,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
"pods_failed": metrics.get("pods_failed"), "pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"), "pods_succeeded": metrics.get("pods_succeeded"),
}, },
"pressure_summary": pressure_summary,
"trend_summary": metrics.get("trend_summary"),
"top": { "top": {
"namespace_cpu": (metrics.get("namespace_totals", {}) or {}).get("cpu", [])[:5], "namespace_cpu": (metrics.get("namespace_totals", {}) or {}).get("cpu", [])[:5],
"namespace_mem": (metrics.get("namespace_totals", {}) or {}).get("mem", [])[:5], "namespace_mem": (metrics.get("namespace_totals", {}) or {}).get("mem", [])[:5],