snapshot: add usage summaries

This commit is contained in:
Brad Stein 2026-01-28 20:41:29 -03:00
parent e35f7714c3
commit 6914b3b070

View File

@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
_BYTES_KB = 1024
_BYTES_MB = 1024 * 1024
_BYTES_GB = 1024 * 1024 * 1024
_VALUE_PAIR_LEN = 2
@ -192,6 +193,20 @@ def _format_rate_bytes(value: Any) -> str:
return f"{numeric:.2f} B/s"
def _format_bytes(value: Any) -> str:
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
if numeric >= _BYTES_GB:
return f"{numeric / _BYTES_GB:.2f} GB"
if numeric >= _BYTES_MB:
return f"{numeric / _BYTES_MB:.2f} MB"
if numeric >= _BYTES_KB:
return f"{numeric / _BYTES_KB:.2f} KB"
return f"{numeric:.2f} B"
def _format_kv_map(values: dict[str, Any]) -> str:
parts = []
for key, value in values.items():
@ -292,6 +307,52 @@ def _append_namespace_pods(lines: list[str], summary: dict[str, Any]) -> None:
lines.append("namespaces_top: " + "; ".join(parts))
def _append_node_usage_stats(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
stats = metrics.get("node_usage_stats") if isinstance(metrics.get("node_usage_stats"), dict) else {}
if not stats:
return
parts = []
for key in ("cpu", "ram", "net", "io"):
entry = stats.get(key) if isinstance(stats.get(key), dict) else {}
avg = entry.get("avg")
if avg is None:
continue
if key in {"net", "io"}:
value = _format_rate_bytes(avg)
else:
value = _format_float(avg)
parts.append(f"{key}={value}")
if parts:
lines.append("node_usage_avg: " + "; ".join(parts))
def _append_namespace_usage(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
cpu_top = metrics.get("namespace_cpu_top") if isinstance(metrics.get("namespace_cpu_top"), list) else []
mem_top = metrics.get("namespace_mem_top") if isinstance(metrics.get("namespace_mem_top"), list) else []
if cpu_top:
parts = []
for entry in cpu_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_float(value)}")
if parts:
lines.append("namespace_cpu_top: " + "; ".join(parts))
if mem_top:
parts = []
for entry in mem_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_bytes(value)}")
if parts:
lines.append("namespace_mem_top: " + "; ".join(parts))
def _append_restarts(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
top_restarts = metrics.get("top_restarts_1h") or []
@ -403,6 +464,8 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
_append_hardware(lines, summary)
_append_pods(lines, summary)
_append_namespace_pods(lines, summary)
_append_node_usage_stats(lines, summary)
_append_namespace_usage(lines, summary)
_append_restarts(lines, summary)
_append_postgres(lines, summary)
_append_hottest(lines, summary)