From b58a1138b3b1ee2419983d533f26a22f14d34288 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 28 Jan 2026 20:29:24 -0300 Subject: [PATCH] snapshot: surface units and windows --- atlasbot/snapshot/builder.py | 316 ++++++++++++++++++++--------------- 1 file changed, 183 insertions(+), 133 deletions(-) diff --git a/atlasbot/snapshot/builder.py b/atlasbot/snapshot/builder.py index 077b381..786ef41 100644 --- a/atlasbot/snapshot/builder.py +++ b/atlasbot/snapshot/builder.py @@ -8,6 +8,10 @@ from atlasbot.config import Settings log = logging.getLogger(__name__) +_BYTES_KB = 1024 +_BYTES_MB = 1024 * 1024 +_VALUE_PAIR_LEN = 2 + class SnapshotProvider: def __init__(self, settings: Settings) -> None: @@ -173,10 +177,10 @@ def _format_rate_bytes(value: Any) -> str: numeric = float(value) except (TypeError, ValueError): return str(value) - if numeric >= 1024 * 1024: - return f"{numeric / (1024 * 1024):.2f} MB/s" - if numeric >= 1024: - return f"{numeric / 1024:.2f} KB/s" + if numeric >= _BYTES_MB: + return f"{numeric / _BYTES_MB:.2f} MB/s" + if numeric >= _BYTES_KB: + return f"{numeric / _BYTES_KB:.2f} KB/s" return f"{numeric:.2f} B/s" @@ -193,138 +197,184 @@ def _format_names(names: list[str]) -> str: return ", ".join(sorted(names)) +def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None: + nodes = summary.get("nodes") if isinstance(summary.get("nodes"), dict) else {} + if not nodes: + return + workers = {} + if isinstance(summary.get("nodes_summary"), dict): + workers = summary["nodes_summary"].get("workers") or {} + workers_total = workers.get("total") + workers_ready = workers.get("ready") + workers_str = "" + if workers_total is not None and workers_ready is not None: + workers_str = f", workers_ready={workers_ready}/{workers_total}" + lines.append( + "nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format( + total=nodes.get("total"), + ready=nodes.get("ready"), + not_ready=nodes.get("not_ready"), + workers=workers_str, + ) + ) + if not isinstance(summary.get("nodes_summary"), dict): + return + not_ready_names = summary["nodes_summary"].get("not_ready_names") or [] + if not_ready_names: + lines.append("nodes_not_ready: " + _format_names(not_ready_names)) + by_arch = summary["nodes_summary"].get("by_arch") or {} + if isinstance(by_arch, dict) and by_arch: + lines.append("archs: " + _format_kv_map(by_arch)) + by_role = summary["nodes_summary"].get("by_role") or {} + if isinstance(by_role, dict) and by_role: + lines.append("roles: " + _format_kv_map(by_role)) + + +def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None: + hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {} + if not hardware: + return + parts = [] + for key, names in hardware.items(): + if not isinstance(names, list): + continue + label = f"{key}={len(names)}" + name_list = _format_names([str(name) for name in names if name]) + if name_list: + label = f"{label} ({name_list})" + parts.append(label) + if parts: + lines.append("hardware: " + "; ".join(sorted(parts))) + + +def _append_pods(lines: list[str], summary: dict[str, Any]) -> None: + pods = summary.get("pods") if isinstance(summary.get("pods"), dict) else {} + if not pods: + return + lines.append( + "pods: running={running}, pending={pending}, failed={failed}, succeeded={succeeded}".format( + running=pods.get("running"), + pending=pods.get("pending"), + failed=pods.get("failed"), + succeeded=pods.get("succeeded"), + ) + ) + + +def _append_restarts(lines: list[str], summary: dict[str, Any]) -> None: + metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {} + top_restarts = metrics.get("top_restarts_1h") or [] + if not isinstance(top_restarts, list) or not top_restarts: + return + parts = [] + for entry in top_restarts: + metric = entry.get("metric") if isinstance(entry, dict) else {} + value = entry.get("value") if isinstance(entry, dict) else [] + if not isinstance(metric, dict) or not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN: + continue + namespace = metric.get("namespace") + pod = metric.get("pod") + count = _format_float(value[1]) + if namespace and pod: + parts.append(f"{namespace}/{pod}={count}") + if parts: + lines.append("restarts_1h_top: " + "; ".join(parts)) + + +def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None: + postgres = summary.get("postgres") if isinstance(summary.get("postgres"), dict) else {} + if not postgres: + return + hottest = postgres.get("hottest_db") or "" + lines.append( + "postgres: used={used}, max={max}, hottest_db={hottest}".format( + used=postgres.get("used"), + max=postgres.get("max"), + hottest=hottest, + ) + ) + + +def _append_hottest(lines: list[str], summary: dict[str, Any]) -> None: + hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {} + if not hottest: + return + parts = [] + for key, entry in hottest.items(): + if not isinstance(entry, dict): + continue + node = entry.get("node") + if key in {"net", "io"}: + value = _format_rate_bytes(entry.get("value")) + else: + value = _format_float(entry.get("value")) + if node: + parts.append(f"{key}={node} ({value})") + if parts: + lines.append("hottest: " + "; ".join(parts)) + + +def _append_workloads(lines: list[str], summary: dict[str, Any]) -> None: + workloads = summary.get("workloads") + if not isinstance(workloads, list) or not workloads: + return + lines.append(f"workloads: total={len(workloads)}") + top_workloads = sorted( + (item for item in workloads if isinstance(item, dict)), + key=lambda item: (-int(item.get("pods_total") or 0), item.get("workload") or ""), + )[:5] + if not top_workloads: + return + parts = [] + for item in top_workloads: + namespace = item.get("namespace") + name = item.get("workload") + pods_total = item.get("pods_total") + primary = item.get("primary_node") + if namespace and name: + label = f"{namespace}/{name}={pods_total}" + if primary: + label = f"{label} (primary={primary})" + parts.append(label) + if parts: + lines.append("workloads_top: " + "; ".join(parts)) + + +def _append_flux(lines: list[str], summary: dict[str, Any]) -> None: + flux = summary.get("flux") if isinstance(summary.get("flux"), dict) else {} + if not flux: + return + not_ready = flux.get("not_ready") + if not_ready is not None: + lines.append(f"flux_not_ready: {not_ready}") + + +def _append_units_windows(lines: list[str], summary: dict[str, Any]) -> None: + metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {} + units = metrics.get("units") if isinstance(metrics.get("units"), dict) else {} + windows = metrics.get("windows") if isinstance(metrics.get("windows"), dict) else {} + if units: + lines.append("units: " + _format_kv_map(units)) + else: + lines.append("units: cpu_pct, ram_pct, net=bytes_per_sec, io=bytes_per_sec") + if windows: + lines.append("windows: " + _format_kv_map(windows)) + else: + lines.append("windows: rates=5m, restarts=1h") + + def summary_text(snapshot: dict[str, Any] | None) -> str: summary = build_summary(snapshot) if not summary: return "" lines: list[str] = [] - - nodes = summary.get("nodes") if isinstance(summary.get("nodes"), dict) else {} - if nodes: - workers = {} - if isinstance(summary.get("nodes_summary"), dict): - workers = summary["nodes_summary"].get("workers") or {} - workers_total = workers.get("total") - workers_ready = workers.get("ready") - workers_str = "" - if workers_total is not None and workers_ready is not None: - workers_str = f", workers_ready={workers_ready}/{workers_total}" - lines.append( - "nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format( - total=nodes.get("total"), - ready=nodes.get("ready"), - not_ready=nodes.get("not_ready"), - workers=workers_str, - ) - ) - if isinstance(summary.get("nodes_summary"), dict): - not_ready_names = summary["nodes_summary"].get("not_ready_names") or [] - if not_ready_names: - lines.append("nodes_not_ready: " + _format_names(not_ready_names)) - by_arch = summary["nodes_summary"].get("by_arch") or {} - if isinstance(by_arch, dict) and by_arch: - lines.append("archs: " + _format_kv_map(by_arch)) - by_role = summary["nodes_summary"].get("by_role") or {} - if isinstance(by_role, dict) and by_role: - lines.append("roles: " + _format_kv_map(by_role)) - - hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {} - if hardware: - parts = [] - for key, names in hardware.items(): - if not isinstance(names, list): - continue - label = f"{key}={len(names)}" - name_list = _format_names([str(name) for name in names if name]) - if name_list: - label = f"{label} ({name_list})" - parts.append(label) - if parts: - lines.append("hardware: " + "; ".join(sorted(parts))) - - pods = summary.get("pods") if isinstance(summary.get("pods"), dict) else {} - if pods: - lines.append( - "pods: running={running}, pending={pending}, failed={failed}, succeeded={succeeded}".format( - running=pods.get("running"), - pending=pods.get("pending"), - failed=pods.get("failed"), - succeeded=pods.get("succeeded"), - ) - ) - if isinstance(summary.get("metrics"), dict): - top_restarts = summary["metrics"].get("top_restarts_1h") or [] - if isinstance(top_restarts, list) and top_restarts: - parts = [] - for entry in top_restarts: - metric = entry.get("metric") if isinstance(entry, dict) else {} - value = entry.get("value") if isinstance(entry, dict) else [] - if not isinstance(metric, dict) or not isinstance(value, list) or len(value) < 2: - continue - namespace = metric.get("namespace") - pod = metric.get("pod") - count = _format_float(value[1]) - if namespace and pod: - parts.append(f"{namespace}/{pod}={count}") - if parts: - lines.append("restarts_1h_top: " + "; ".join(parts)) - - postgres = summary.get("postgres") if isinstance(summary.get("postgres"), dict) else {} - if postgres: - hottest = postgres.get("hottest_db") or "" - lines.append( - "postgres: used={used}, max={max}, hottest_db={hottest}".format( - used=postgres.get("used"), - max=postgres.get("max"), - hottest=hottest, - ) - ) - - hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {} - if hottest: - parts = [] - for key, entry in hottest.items(): - if not isinstance(entry, dict): - continue - node = entry.get("node") - if key in {"net", "io"}: - value = _format_rate_bytes(entry.get("value")) - else: - value = _format_float(entry.get("value")) - if node: - parts.append(f"{key}={node} ({value})") - if parts: - lines.append("hottest: " + "; ".join(parts)) - - workloads = summary.get("workloads") - if isinstance(workloads, list) and workloads: - lines.append(f"workloads: total={len(workloads)}") - top_workloads = sorted( - (item for item in workloads if isinstance(item, dict)), - key=lambda item: (-int(item.get("pods_total") or 0), item.get("workload") or ""), - )[:5] - if top_workloads: - parts = [] - for item in top_workloads: - namespace = item.get("namespace") - name = item.get("workload") - pods_total = item.get("pods_total") - primary = item.get("primary_node") - if namespace and name: - label = f"{namespace}/{name}={pods_total}" - if primary: - label = f"{label} (primary={primary})" - parts.append(label) - if parts: - lines.append("workloads_top: " + "; ".join(parts)) - - flux = summary.get("flux") if isinstance(summary.get("flux"), dict) else {} - if flux: - not_ready = flux.get("not_ready") - if not_ready is not None: - lines.append(f"flux_not_ready: {not_ready}") - - lines.append("units: cpu_pct, ram_pct, net=bytes_per_sec, io=bytes_per_sec") - lines.append("windows: rates=5m, restarts=1h") - + _append_nodes(lines, summary) + _append_hardware(lines, summary) + _append_pods(lines, summary) + _append_restarts(lines, summary) + _append_postgres(lines, summary) + _append_hottest(lines, summary) + _append_workloads(lines, summary) + _append_flux(lines, summary) + _append_units_windows(lines, summary) return "\n".join(lines)