"""Fetch a cluster-state snapshot over HTTP and render it as a compact text summary.

``SnapshotProvider`` downloads and caches the raw snapshot, ``build_summary``
reduces it to the sections that get reported, and ``summary_text`` renders
those sections as one ``label: values`` line each.

The snapshot is external JSON, so every helper validates the shape of what it
reads and silently skips malformed pieces instead of raising.
"""

import logging
import math
import time
from typing import Any

import httpx

from atlasbot.config import Settings

log = logging.getLogger(__name__)

_BYTES_KB = 1024
_BYTES_MB = 1024 * 1024
_BYTES_GB = 1024 * 1024 * 1024
# Length of a ``[first, second]`` pair such as the ``value`` of a restart entry.
_VALUE_PAIR_LEN = 2
# Usage dimensions reported per node, in output order.
_USAGE_KEYS = ("cpu", "ram", "net", "io", "disk")
# Usage dimensions that are rendered as byte rates instead of plain numbers.
_RATE_KEYS = frozenset({"net", "io"})


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list[Any]:
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _safe_int(value: Any) -> int:
    """Coerce *value* to ``int``; falsy or unconvertible values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _safe_number(value: Any) -> float:
    """Coerce *value* to ``float``; unconvertible or NaN values become 0.0."""
    try:
        numeric = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return 0.0 if math.isnan(numeric) else numeric


class SnapshotProvider:
    """Fetch the snapshot from ``settings.ariadne_state_url`` with a time-based cache."""

    def __init__(self, settings: Settings) -> None:
        """Store *settings* and start with an empty cache."""
        self._settings = settings
        self._cache: dict[str, Any] = {}
        self._cache_ts = 0.0

    def _cache_valid(self) -> bool:
        """Return True while the cached snapshot is younger than the TTL (at least 5s)."""
        age = time.monotonic() - self._cache_ts
        return age < max(5, self._settings.snapshot_ttl_sec)

    def get(self) -> dict[str, Any] | None:
        """Return the current snapshot, refreshing it over HTTP when the cache is stale.

        Returns:
            The snapshot dict, or the last cached snapshot when no URL is
            configured or the fetch fails, or ``None`` when nothing was ever
            cached.
        """
        if self._cache and self._cache_valid():
            return self._cache
        url = self._settings.ariadne_state_url
        if not url:
            return self._cache or None

        headers: dict[str, str] = {}
        token = self._settings.ariadne_state_token
        if token:
            headers["x-internal-token"] = token

        # Best-effort boundary: any transport, HTTP-status or decode error
        # degrades to the stale cache instead of propagating to the caller.
        try:
            resp = httpx.get(url, headers=headers, timeout=10.0)
            resp.raise_for_status()
            payload = resp.json()
        except Exception as exc:
            log.warning("snapshot fetch failed", extra={"extra": {"error": str(exc)}})
            return self._cache or None

        if not isinstance(payload, dict):
            # A non-object body is not a usable snapshot; never hand it to
            # callers, which expect ``dict | None``.
            detail = f"unexpected payload type: {type(payload).__name__}"
            log.warning("snapshot fetch failed", extra={"extra": {"error": detail}})
            return self._cache or None

        self._cache = payload
        self._cache_ts = time.monotonic()
        return payload


def _node_usage_top(series: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return ``{"node", "value"}`` for the entry with the largest numeric value.

    Entries that are not dicts, or whose ``value`` is not convertible to a
    finite-comparable float (including NaN), are ignored. Returns ``None``
    when no usable entry exists.
    """
    if not isinstance(series, (list, tuple)):
        return None
    best: dict[str, Any] | None = None
    for entry in series:
        if not isinstance(entry, dict):
            continue
        try:
            numeric = float(entry.get("value"))
        except (TypeError, ValueError):
            continue
        if math.isnan(numeric):
            # NaN never compares greater/less, so it would stick as "best".
            continue
        if best is None or numeric > best["value"]:
            best = {"node": entry.get("node"), "value": numeric}
    return best


def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]:
    """Reduce a raw snapshot to the sections used for reporting.

    Returns an empty dict for a missing/empty snapshot. Otherwise the result
    always contains ``workloads`` and ``flux`` (possibly empty) plus whichever
    other sections the snapshot provides.
    """
    if not snapshot:
        return {}
    nodes_detail = _nodes_detail(snapshot)
    metrics = _metrics(snapshot)

    summary: dict[str, Any] = {}
    if isinstance(snapshot.get("nodes_summary"), dict):
        summary["nodes_summary"] = snapshot.get("nodes_summary")
    if metrics:
        summary["metrics"] = metrics
    summary.update(_build_nodes(snapshot))
    summary.update(_build_pressure(snapshot))
    summary.update(_build_hardware(nodes_detail))
    summary.update(_build_node_ages(nodes_detail))
    summary.update(_build_capacity(metrics))
    summary.update(_build_pods(metrics))
    summary.update(_build_namespace_pods(snapshot))
    summary.update(_build_namespace_nodes(snapshot))
    summary.update(_build_node_pods(snapshot))
    summary.update(_build_pod_issues(snapshot))
    summary.update(_build_workload_health(snapshot))
    summary.update(_build_events(snapshot))
    summary.update(_build_postgres(metrics))
    summary.update(_build_hottest(metrics))
    summary.update(_build_pvc(metrics))
    summary.update(_build_workloads(snapshot))
    summary.update(_build_flux(snapshot))
    return summary


def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
    """Return ``snapshot["nodes_detail"]`` when it is a list, else ``[]``."""
    return _as_list(snapshot.get("nodes_detail"))


def _metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Return ``snapshot["metrics"]`` when it is a dict, else ``{}``."""
    return _as_dict(snapshot.get("metrics"))


def _build_nodes(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Build the ``nodes`` section (total / ready / not_ready counts)."""
    nodes_summary = _as_dict(snapshot.get("nodes_summary"))
    if not nodes_summary:
        return {}
    return {
        "nodes": {
            "total": nodes_summary.get("total"),
            "ready": nodes_summary.get("ready"),
            "not_ready": nodes_summary.get("not_ready"),
        }
    }


def _build_pressure(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Build the ``pressure_nodes`` section from ``nodes_summary.pressure_nodes``."""
    nodes_summary = _as_dict(snapshot.get("nodes_summary"))
    pressure = _as_dict(nodes_summary.get("pressure_nodes"))
    return {"pressure_nodes": pressure} if pressure else {}


def _build_hardware(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
    """Group node names by their ``hardware`` class (``"unknown"`` when unset)."""
    by_class: dict[str, list[str]] = {}
    for node in nodes_detail or []:
        if not isinstance(node, dict):
            continue
        name = node.get("name")
        if name:
            by_class.setdefault(node.get("hardware") or "unknown", []).append(name)
    if not by_class:
        return {}
    return {"hardware": {key: sorted(names) for key, names in by_class.items()}}


def _build_node_ages(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
    """Build ``node_ages``: the five nodes with the largest ``age_hours``."""
    ages: list[dict[str, Any]] = []
    for node in nodes_detail or []:
        if not isinstance(node, dict):
            continue
        name = node.get("name")
        age = node.get("age_hours")
        if name and isinstance(age, (int, float)):
            ages.append({"name": name, "age_hours": age})
    if not ages:
        return {}
    # Negated key (rather than reverse=True) keeps the original tie order.
    ages.sort(key=lambda item: -(item.get("age_hours") or 0))
    return {"node_ages": ages[:5]}


def _build_pods(metrics: dict[str, Any]) -> dict[str, Any]:
    """Build the ``pods`` section from the per-phase pod counters in *metrics*."""
    pods = {
        "running": metrics.get("pods_running"),
        "pending": metrics.get("pods_pending"),
        "failed": metrics.get("pods_failed"),
        "succeeded": metrics.get("pods_succeeded"),
    }
    if all(value is None for value in pods.values()):
        return {}
    return {"pods": pods}


def _build_capacity(metrics: dict[str, Any]) -> dict[str, Any]:
    """Build the ``capacity`` section (capacity and allocatable cpu/mem/pods)."""
    if not metrics:
        return {}
    capacity = {
        "cpu": metrics.get("capacity_cpu"),
        "allocatable_cpu": metrics.get("allocatable_cpu"),
        "mem_bytes": metrics.get("capacity_mem_bytes"),
        "allocatable_mem_bytes": metrics.get("allocatable_mem_bytes"),
        "pods": metrics.get("capacity_pods"),
        "allocatable_pods": metrics.get("allocatable_pods"),
    }
    if all(value is None for value in capacity.values()):
        return {}
    return {"capacity": capacity}


def _build_namespace_pods(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``namespace_pods`` list."""
    namespaces = snapshot.get("namespace_pods")
    if not isinstance(namespaces, list) or not namespaces:
        return {}
    return {"namespace_pods": namespaces}


def _build_namespace_nodes(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``namespace_nodes`` list."""
    namespace_nodes = snapshot.get("namespace_nodes")
    if not isinstance(namespace_nodes, list) or not namespace_nodes:
        return {}
    return {"namespace_nodes": namespace_nodes}


def _build_node_pods(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``node_pods`` list."""
    node_pods = snapshot.get("node_pods")
    if not isinstance(node_pods, list) or not node_pods:
        return {}
    return {"node_pods": node_pods}


def _build_pod_issues(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``pod_issues`` dict."""
    pod_issues = snapshot.get("pod_issues")
    if not isinstance(pod_issues, dict) or not pod_issues:
        return {}
    return {"pod_issues": pod_issues}


def _build_workload_health(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Build ``workloads_health``; requires all three workload kinds to be dicts."""
    health = snapshot.get("workloads_health")
    if not isinstance(health, dict) or not health:
        return {}
    deployments = health.get("deployments")
    statefulsets = health.get("statefulsets")
    daemonsets = health.get("daemonsets")
    if not all(isinstance(kind, dict) for kind in (deployments, statefulsets, daemonsets)):
        return {}
    return {
        "workloads_health": {
            "deployments": deployments,
            "statefulsets": statefulsets,
            "daemonsets": daemonsets,
        }
    }


def _build_events(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``events`` dict."""
    events = snapshot.get("events")
    if not isinstance(events, dict) or not events:
        return {}
    return {"events": events}


def _build_postgres(metrics: dict[str, Any]) -> dict[str, Any]:
    """Build the ``postgres`` section from ``metrics.postgres_connections``."""
    postgres = _as_dict(metrics.get("postgres_connections"))
    if not postgres:
        return {}
    return {
        "postgres": {
            "used": postgres.get("used"),
            "max": postgres.get("max"),
            "hottest_db": postgres.get("hottest_db"),
        }
    }


def _build_hottest(metrics: dict[str, Any]) -> dict[str, Any]:
    """Build ``hottest``: the top node per usage dimension in ``metrics.node_usage``."""
    node_usage = _as_dict(metrics.get("node_usage"))
    hottest: dict[str, Any] = {}
    for key in _USAGE_KEYS:
        top = _node_usage_top(node_usage.get(key, []))
        if top:
            hottest[key] = top
    return {"hottest": hottest} if hottest else {}


def _build_pvc(metrics: dict[str, Any]) -> dict[str, Any]:
    """Pass through a non-empty ``metrics.pvc_usage_top`` list."""
    pvc_usage = _as_list(metrics.get("pvc_usage_top"))
    return {"pvc_usage_top": pvc_usage} if pvc_usage else {}


def _build_workloads(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Return the ``workloads`` list; the key is always present, possibly empty."""
    return {"workloads": _as_list(snapshot.get("workloads"))}


def _build_flux(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Return the ``flux`` dict; the key is always present, possibly empty."""
    return {"flux": _as_dict(snapshot.get("flux"))}


def _format_float(value: Any) -> str:
    """Format *value* with up to two decimals, trimming trailing zeros.

    Values that cannot be converted to ``float`` are returned via ``str()``.
    """
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return str(value)
    return f"{numeric:.2f}".rstrip("0").rstrip(".")


def _format_rate_bytes(value: Any) -> str:
    """Format a byte rate as ``B/s``, ``KB/s`` or ``MB/s`` (1024-based)."""
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return str(value)
    if numeric >= _BYTES_MB:
        return f"{numeric / _BYTES_MB:.2f} MB/s"
    if numeric >= _BYTES_KB:
        return f"{numeric / _BYTES_KB:.2f} KB/s"
    return f"{numeric:.2f} B/s"


def _format_bytes(value: Any) -> str:
    """Format a byte count as ``B``, ``KB``, ``MB`` or ``GB`` (1024-based)."""
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return str(value)
    if numeric >= _BYTES_GB:
        return f"{numeric / _BYTES_GB:.2f} GB"
    if numeric >= _BYTES_MB:
        return f"{numeric / _BYTES_MB:.2f} MB"
    if numeric >= _BYTES_KB:
        return f"{numeric / _BYTES_KB:.2f} KB"
    return f"{numeric:.2f} B"


def _format_kv_map(values: dict[str, Any]) -> str:
    """Render a mapping as ``key=value`` pairs joined by ``", "``."""
    return ", ".join(f"{key}={value}" for key, value in values.items())


def _format_names(names: list[str]) -> str:
    """Render *names* sorted and comma-separated; empty input gives ``""``."""
    if not names:
        return ""
    return ", ".join(sorted(names))


def _metric_rows(entries: Any) -> list[tuple[dict[str, Any], Any]]:
    """Return ``(metric_labels, value)`` for every well-formed entry in *entries*.

    Non-list input yields nothing, non-dict entries are dropped, and a missing
    or non-dict ``metric`` becomes an empty label dict.
    """
    return [
        (_as_dict(entry.get("metric")), entry.get("value"))
        for entry in _as_list(entries)
        if isinstance(entry, dict)
    ]


def _top_by_pods(items: list[Any], name_key: str, limit: int) -> list[dict[str, Any]]:
    """Return up to *limit* dict items, most ``pods_total`` first, ties by *name_key*."""
    rows = [item for item in items if isinstance(item, dict)]
    rows.sort(
        key=lambda item: (-_safe_int(item.get("pods_total")), str(item.get(name_key) or ""))
    )
    return rows[:limit]


def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the node-count line plus not-ready names, archs and roles."""
    nodes = _as_dict(summary.get("nodes"))
    if not nodes:
        return
    nodes_summary = _as_dict(summary.get("nodes_summary"))

    workers = _as_dict(nodes_summary.get("workers"))
    workers_total = workers.get("total")
    workers_ready = workers.get("ready")
    workers_str = ""
    if workers_total is not None and workers_ready is not None:
        workers_str = f", workers_ready={workers_ready}/{workers_total}"
    lines.append(
        "nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format(
            total=nodes.get("total"),
            ready=nodes.get("ready"),
            not_ready=nodes.get("not_ready"),
            workers=workers_str,
        )
    )

    # str() coercion keeps sorted()/join() from raising on non-string names.
    not_ready_names = _format_names(
        [str(name) for name in _as_list(nodes_summary.get("not_ready_names")) if name]
    )
    if not_ready_names:
        lines.append("nodes_not_ready: " + not_ready_names)
    by_arch = nodes_summary.get("by_arch")
    if isinstance(by_arch, dict) and by_arch:
        lines.append("archs: " + _format_kv_map(by_arch))
    by_role = nodes_summary.get("by_role")
    if isinstance(by_role, dict) and by_role:
        lines.append("roles: " + _format_kv_map(by_role))


def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the per-hardware-class node counts and names."""
    hardware = _as_dict(summary.get("hardware"))
    if not hardware:
        return
    parts = []
    for key, names in hardware.items():
        if not isinstance(names, list):
            continue
        label = f"{key}={len(names)}"
        name_list = _format_names([str(name) for name in names if name])
        if name_list:
            label = f"{label} ({name_list})"
        parts.append(label)
    if parts:
        lines.append("hardware: " + "; ".join(sorted(parts)))


def _append_node_ages(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the first three ``node_ages`` entries as ``name=<hours>h``."""
    ages = _as_list(summary.get("node_ages"))
    if not ages:
        return
    parts = []
    for entry in ages[:3]:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name")
        age = entry.get("age_hours")
        if name and isinstance(age, (int, float)):
            parts.append(f"{name}={_format_float(age)}h")
    if parts:
        lines.append("node_age_top: " + "; ".join(parts))


def _append_pressure(lines: list[str], summary: dict[str, Any]) -> None:
    """Append node counts and names per pressure condition, sorted by condition."""
    pressure = summary.get("pressure_nodes")
    if not isinstance(pressure, dict) or not pressure:
        return
    parts = []
    for cond, nodes in sorted(pressure.items()):
        # Only real sequences can be counted and listed; skip anything else.
        if not nodes or not isinstance(nodes, (list, tuple)):
            continue
        count = len(nodes)
        name_list = _format_names([str(name) for name in nodes if name])
        parts.append(f"{cond}={count} ({name_list})" if name_list else f"{cond}={count}")
    if parts:
        lines.append("node_pressure: " + "; ".join(parts))


def _append_pods(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the pod counts per phase."""
    pods = _as_dict(summary.get("pods"))
    if not pods:
        return
    lines.append(
        "pods: running={running}, pending={pending}, failed={failed}, succeeded={succeeded}".format(
            running=pods.get("running"),
            pending=pods.get("pending"),
            failed=pods.get("failed"),
            succeeded=pods.get("succeeded"),
        )
    )


def _append_capacity(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the capacity / allocatable figures that are present."""
    capacity = _as_dict(summary.get("capacity"))
    if not capacity:
        return
    # (summary key, output label, formatter) in output order.
    fields = (
        ("cpu", "cpu", _format_float),
        ("allocatable_cpu", "alloc_cpu", _format_float),
        ("mem_bytes", "mem", _format_bytes),
        ("allocatable_mem_bytes", "alloc_mem", _format_bytes),
        ("pods", "pods", _format_float),
        ("allocatable_pods", "alloc_pods", _format_float),
    )
    parts = [
        f"{label}={fmt(capacity.get(key))}"
        for key, label, fmt in fields
        if capacity.get(key) is not None
    ]
    if parts:
        lines.append("capacity: " + "; ".join(parts))


def _append_namespace_pods(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the eight namespaces with the most pods."""
    namespaces = summary.get("namespace_pods")
    if not isinstance(namespaces, list) or not namespaces:
        return
    parts = []
    for item in _top_by_pods(namespaces, "namespace", 8):
        name = item.get("namespace")
        if not name:
            continue
        label = f"{name}={item.get('pods_total')}"
        running = item.get("pods_running")
        if running is not None:
            label = f"{label} (running={running})"
        parts.append(label)
    if parts:
        lines.append("namespaces_top: " + "; ".join(parts))


def _append_namespace_nodes(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the eight largest namespaces with their primary node."""
    namespace_nodes = summary.get("namespace_nodes")
    if not isinstance(namespace_nodes, list) or not namespace_nodes:
        return
    parts = []
    for item in _top_by_pods(namespace_nodes, "namespace", 8):
        namespace = item.get("namespace")
        if not namespace:
            continue
        label = f"{namespace}={item.get('pods_total')}"
        primary = item.get("primary_node")
        if primary:
            label = f"{label} (primary={primary})"
        parts.append(label)
    if parts:
        lines.append("namespace_nodes_top: " + "; ".join(parts))


def _append_node_pods(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the eight nodes with the most pods and their top namespaces."""
    node_pods = summary.get("node_pods")
    if not isinstance(node_pods, list) or not node_pods:
        return
    parts = []
    for item in _top_by_pods(node_pods, "node", 8):
        node = item.get("node")
        if not node:
            continue
        pairs = item.get("namespaces_top")
        if not isinstance(pairs, (list, tuple)):
            pairs = ()
        # Each element is expected to be a (namespace, count) pair; anything
        # else is dropped instead of breaking tuple unpacking.
        ns_label = ", ".join(
            f"{pair[0]}={pair[1]}"
            for pair in pairs
            if isinstance(pair, (list, tuple)) and len(pair) == _VALUE_PAIR_LEN
        )
        label = f"{node}={item.get('pods_total')}"
        if ns_label:
            label = f"{label} ({ns_label})"
        parts.append(label)
    if parts:
        lines.append("node_pods_top: " + "; ".join(parts))


def _append_pod_issues(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the pod-issue counts line and the top offending pods line."""
    pod_issues = _as_dict(summary.get("pod_issues"))
    if not pod_issues:
        return
    for text in (_format_pod_issue_counts(pod_issues), _format_pod_issue_top(pod_issues)):
        if text:
            lines.append(text)


def _format_pod_issue_counts(pod_issues: dict[str, Any]) -> str:
    """Render the Failed / Pending / Unknown counts, or ``""`` when none exist."""
    counts = _as_dict(pod_issues.get("counts"))
    parts = [
        f"{key}={counts.get(key)}" for key in ("Failed", "Pending", "Unknown") if key in counts
    ]
    if not parts:
        return ""
    return "pod_issues: " + "; ".join(parts)


def _format_pod_issue_top(pod_issues: dict[str, Any]) -> str:
    """Render the first five issue items as ``ns/pod(phase,r=restarts)``."""
    top = []
    for item in _as_list(pod_issues.get("items"))[:5]:
        if not isinstance(item, dict):
            continue
        namespace = item.get("namespace")
        pod = item.get("pod")
        if not namespace or not pod:
            continue
        phase = item.get("phase") or ""
        restarts = item.get("restarts") or 0
        top.append(f"{namespace}/{pod}({phase},r={restarts})")
    if not top:
        return ""
    return "pod_issues_top: " + "; ".join(top)


def _append_workload_health(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the not-ready counts per workload kind plus their total."""
    health = _as_dict(summary.get("workloads_health"))
    if not health:
        return
    deployments = _as_dict(health.get("deployments"))
    statefulsets = _as_dict(health.get("statefulsets"))
    daemonsets = _as_dict(health.get("daemonsets"))
    total_not_ready = sum(
        _safe_int(kind.get("not_ready")) for kind in (deployments, statefulsets, daemonsets)
    )
    lines.append(
        "workloads_not_ready: "
        f"deployments={deployments.get('not_ready', 0)}, "
        f"statefulsets={statefulsets.get('not_ready', 0)}, "
        f"daemonsets={daemonsets.get('not_ready', 0)} "
        f"(total={total_not_ready})"
    )


def _append_node_usage_stats(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the average node usage per dimension from ``metrics.node_usage_stats``."""
    stats = _as_dict(_as_dict(summary.get("metrics")).get("node_usage_stats"))
    if not stats:
        return
    parts = []
    for key in _USAGE_KEYS:
        avg = _as_dict(stats.get(key)).get("avg")
        if avg is None:
            continue
        formatter = _format_rate_bytes if key in _RATE_KEYS else _format_float
        parts.append(f"{key}={formatter(avg)}")
    if parts:
        lines.append("node_usage_avg: " + "; ".join(parts))


def _append_events(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the warning-event total and the three most frequent reasons."""
    events = _as_dict(summary.get("events"))
    if not events:
        return
    total = events.get("warnings_total")
    if total is None:
        return
    by_reason = _as_dict(events.get("warnings_by_reason"))
    if not by_reason:
        lines.append(f"warnings: total={total}")
        return
    # Counts come from external JSON; coerce so a null/string count cannot
    # break the ordering.
    top = sorted(
        by_reason.items(),
        key=lambda item: (-_safe_number(item[1]), str(item[0])),
    )[:3]
    reasons = "; ".join(f"{reason}={count}" for reason, count in top)
    lines.append(f"warnings: total={total}; top={reasons}")


def _append_pvc_usage(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the PVC usage percentages from ``pvc_usage_top``."""
    pvc_usage = summary.get("pvc_usage_top")
    if not isinstance(pvc_usage, list) or not pvc_usage:
        return
    parts = []
    for metric, value in _metric_rows(pvc_usage):
        namespace = metric.get("namespace")
        pvc = metric.get("persistentvolumeclaim")
        if namespace and pvc and value is not None:
            parts.append(f"{namespace}/{pvc}={_format_float(value)}%")
    if parts:
        lines.append("pvc_usage_top: " + "; ".join(parts))


def _append_namespace_usage(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the top namespaces by CPU and by memory."""
    metrics = _as_dict(summary.get("metrics"))
    sections = (
        ("namespace_cpu_top", _format_float),
        ("namespace_mem_top", _format_bytes),
    )
    for key, formatter in sections:
        parts = []
        for metric, value in _metric_rows(metrics.get(key)):
            namespace = metric.get("namespace")
            if namespace and value is not None:
                parts.append(f"{namespace}={formatter(value)}")
        if parts:
            lines.append(f"{key}: " + "; ".join(parts))


def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the top pods by CPU and by memory."""
    metrics = _as_dict(summary.get("metrics"))
    sections = (
        ("pod_cpu_top", _format_float),
        ("pod_mem_top", _format_bytes),
    )
    for key, formatter in sections:
        parts = []
        for metric, value in _metric_rows(metrics.get(key)):
            namespace = metric.get("namespace")
            pod = metric.get("pod")
            if namespace and pod and value is not None:
                parts.append(f"{namespace}/{pod}={formatter(value)}")
        if parts:
            lines.append(f"{key}: " + "; ".join(parts))


def _append_restarts(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the pods listed in ``metrics.top_restarts_1h`` with their counts.

    Each entry's ``value`` must be a list of at least two items; the second
    item is rendered as the count.
    """
    metrics = _as_dict(summary.get("metrics"))
    parts = []
    for metric, value in _metric_rows(metrics.get("top_restarts_1h")):
        if not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
            continue
        namespace = metric.get("namespace")
        pod = metric.get("pod")
        if namespace and pod:
            parts.append(f"{namespace}/{pod}={_format_float(value[1])}")
    if parts:
        lines.append("restarts_1h_top: " + "; ".join(parts))


def _append_job_failures(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the jobs listed in ``metrics.job_failures_24h`` with their counts."""
    metrics = _as_dict(summary.get("metrics"))
    parts = []
    for metric, value in _metric_rows(metrics.get("job_failures_24h")):
        namespace = metric.get("namespace")
        job_name = metric.get("job_name") or metric.get("job")
        if namespace and job_name and value is not None:
            parts.append(f"{namespace}/{job_name}={_format_float(value)}")
    if parts:
        lines.append("job_failures_24h: " + "; ".join(parts))


def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the Postgres connection usage line."""
    postgres = _as_dict(summary.get("postgres"))
    if not postgres:
        return
    lines.append(
        "postgres: used={used}, max={max}, hottest_db={hottest}".format(
            used=postgres.get("used"),
            max=postgres.get("max"),
            hottest=postgres.get("hottest_db") or "",
        )
    )


def _append_hottest(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the hottest node per usage dimension."""
    hottest = _as_dict(summary.get("hottest"))
    if not hottest:
        return
    parts = []
    for key, entry in hottest.items():
        if not isinstance(entry, dict):
            continue
        node = entry.get("node")
        if not node:
            continue
        formatter = _format_rate_bytes if key in _RATE_KEYS else _format_float
        parts.append(f"{key}={node} ({formatter(entry.get('value'))})")
    if parts:
        lines.append("hottest: " + "; ".join(parts))


def _append_workloads(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the workload total and the five workloads with the most pods."""
    workloads = summary.get("workloads")
    if not isinstance(workloads, list) or not workloads:
        return
    lines.append(f"workloads: total={len(workloads)}")
    parts = []
    for item in _top_by_pods(workloads, "workload", 5):
        namespace = item.get("namespace")
        name = item.get("workload")
        if not (namespace and name):
            continue
        label = f"{namespace}/{name}={item.get('pods_total')}"
        primary = item.get("primary_node")
        if primary:
            label = f"{label} (primary={primary})"
        parts.append(label)
    if parts:
        lines.append("workloads_top: " + "; ".join(parts))


def _append_flux(lines: list[str], summary: dict[str, Any]) -> None:
    """Append ``flux.not_ready`` when it is present."""
    not_ready = _as_dict(summary.get("flux")).get("not_ready")
    if not_ready is not None:
        lines.append(f"flux_not_ready: {not_ready}")


def _append_units_windows(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the units and windows legends, using fixed defaults when absent."""
    metrics = _as_dict(summary.get("metrics"))
    units = _as_dict(metrics.get("units"))
    windows = _as_dict(metrics.get("windows"))
    if units:
        lines.append("units: " + _format_kv_map(units))
    else:
        lines.append("units: cpu_pct, ram_pct, net=bytes_per_sec, io=bytes_per_sec")
    if windows:
        lines.append("windows: " + _format_kv_map(windows))
    else:
        lines.append("windows: rates=5m, restarts=1h")


def summary_text(snapshot: dict[str, Any] | None) -> str:
    """Render *snapshot* as newline-separated summary lines.

    Returns ``""`` for a missing/empty snapshot. The order of the sections is
    fixed by the order of the calls below.
    """
    summary = build_summary(snapshot)
    if not summary:
        return ""
    lines: list[str] = []
    _append_nodes(lines, summary)
    _append_pressure(lines, summary)
    _append_hardware(lines, summary)
    _append_node_ages(lines, summary)
    _append_capacity(lines, summary)
    _append_pods(lines, summary)
    _append_namespace_pods(lines, summary)
    _append_namespace_nodes(lines, summary)
    _append_node_pods(lines, summary)
    _append_pod_issues(lines, summary)
    _append_workload_health(lines, summary)
    _append_events(lines, summary)
    _append_node_usage_stats(lines, summary)
    _append_namespace_usage(lines, summary)
    _append_pod_usage(lines, summary)
    _append_restarts(lines, summary)
    _append_job_failures(lines, summary)
    _append_postgres(lines, summary)
    _append_hottest(lines, summary)
    _append_pvc_usage(lines, summary)
    _append_workloads(lines, summary)
    _append_flux(lines, summary)
    _append_units_windows(lines, summary)
    return "\n".join(lines)