1721 lines
67 KiB
Python

import logging
import math
import time
from typing import Any

import httpx

from atlasbot.config import Settings
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Binary (1024-based) unit sizes used by the byte and byte-rate formatters below.
_BYTES_KB = 1024
_BYTES_MB = 1024 * 1024
_BYTES_GB = 1024 * 1024 * 1024
# Not referenced in the visible part of this module; presumably the expected
# length of a two-element [key, value] pair checked elsewhere (TODO confirm).
_VALUE_PAIR_LEN = 2
class SnapshotProvider:
    """Fetch the cluster snapshot over HTTP and remember the last good copy."""

    def __init__(self, settings: Settings) -> None:
        """Keep *settings* and start with an empty cache."""
        self._settings = settings
        self._cache: dict[str, Any] = {}
        self._cache_ts = 0.0

    def _cache_valid(self) -> bool:
        """Return True while the cache is younger than the TTL (at least 5 seconds)."""
        age = time.monotonic() - self._cache_ts
        ttl = max(5, self._settings.snapshot_ttl_sec)
        return age < ttl

    def get(self) -> dict[str, Any] | None:
        """Return the current snapshot, refreshing it when the cache is stale.

        Returns:
            The cached snapshot while it is still valid, otherwise a freshly
            fetched dict payload. When no URL is configured, the request
            fails, or the payload is not a dict, the previous cache is
            returned, or ``None`` if nothing was ever cached.
        """
        if self._cache and self._cache_valid():
            return self._cache
        settings = self._settings
        url = settings.ariadne_state_url
        if not url:
            return self._cache or None
        token = settings.ariadne_state_token
        request_headers = {"x-internal-token": token} if token else {}
        try:
            response = httpx.get(url, headers=request_headers, timeout=10.0)
            response.raise_for_status()
            body = response.json()
        except Exception as exc:
            # Best effort: a failed refresh falls back to the stale cache below.
            log.warning("snapshot fetch failed", extra={"extra": {"error": str(exc)}})
        else:
            if isinstance(body, dict):
                self._cache = body
                self._cache_ts = time.monotonic()
                return body
        return self._cache or None
def _node_usage_top(series: list[dict[str, Any]]) -> dict[str, Any] | None:
best = None
for entry in series or []:
if not isinstance(entry, dict):
continue
node = entry.get("node")
value = entry.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best is None or numeric > best["value"]:
best = {"node": node, "value": numeric}
return best
def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]:
    """Assemble the flattened cluster summary from a raw snapshot.

    Each ``_build_*`` helper contributes zero or more top-level keys. They run
    in a fixed order because two of them read what earlier helpers added:
    ``_build_hardware_usage`` needs ``hardware_by_node`` and
    ``_build_cluster_watchlist`` receives the summary built so far.

    Args:
        snapshot: Raw snapshot mapping, or a falsy value when unavailable.

    Returns:
        The summary mapping, or ``{}`` when *snapshot* is falsy.
    """
    if not snapshot:
        return {}
    nodes_detail = _nodes_detail(snapshot)
    metrics = _metrics(snapshot)
    summary: dict[str, Any] = {}

    nodes_summary = snapshot.get("nodes_summary")
    if isinstance(nodes_summary, dict):
        summary["nodes_summary"] = nodes_summary
    if metrics:
        summary["metrics"] = metrics
    jobs = snapshot.get("jobs")
    if isinstance(jobs, dict):
        summary["jobs"] = jobs

    for builder, source in (
        (_build_nodes, snapshot),
        (_build_pressure, snapshot),
        (_build_hardware, nodes_detail),
        (_build_hardware_by_node, nodes_detail),
    ):
        summary.update(builder(source))

    summary.update(_build_hardware_usage(metrics, summary.get("hardware_by_node")))

    for builder, source in (
        (_build_node_facts, nodes_detail),
        (_build_node_ages, nodes_detail),
        (_build_node_taints, nodes_detail),
        (_build_capacity, metrics),
        (_build_pods, metrics),
        (_build_namespace_pods, snapshot),
        (_build_namespace_nodes, snapshot),
        (_build_node_pods, snapshot),
        (_build_node_pods_top, metrics),
        (_build_pod_issues, snapshot),
        (_build_workload_health, snapshot),
        (_build_events, snapshot),
        (_build_event_summary, snapshot),
        (_build_postgres, metrics),
        (_build_hottest, metrics),
        (_build_pvc, metrics),
        (_build_namespace_capacity, metrics),
        (_build_namespace_capacity_summary, metrics),
        (_build_longhorn, snapshot),
        (_build_root_disk_headroom, metrics),
        (_build_node_load, metrics),
        (_build_node_load_summary, metrics),
    ):
        summary.update(builder(source))

    # The watchlist is derived from everything collected up to this point.
    summary.update(_build_cluster_watchlist(summary))

    for builder in (_build_workloads, _build_flux):
        summary.update(builder(snapshot))

    _merge_cluster_summary(snapshot, summary)
    return summary
def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) -> None:
cluster_summary = snapshot.get("summary") if isinstance(snapshot.get("summary"), dict) else {}
if not cluster_summary:
return
signals = cluster_summary.get("signals")
profiles = cluster_summary.get("profiles")
inventory = cluster_summary.get("inventory")
topology = cluster_summary.get("topology")
if isinstance(signals, list):
summary["signals"] = signals
if isinstance(profiles, dict):
summary["profiles"] = profiles
if isinstance(inventory, dict):
summary["inventory"] = inventory
if isinstance(topology, dict):
summary["topology"] = topology
def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
items = snapshot.get("nodes_detail")
return items if isinstance(items, list) else []
def _metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _build_nodes(snapshot: dict[str, Any]) -> dict[str, Any]:
nodes_summary = snapshot.get("nodes_summary") if isinstance(snapshot.get("nodes_summary"), dict) else {}
if not nodes_summary:
return {}
return {
"nodes": {
"total": nodes_summary.get("total"),
"ready": nodes_summary.get("ready"),
"not_ready": nodes_summary.get("not_ready"),
}
}
def _build_pressure(snapshot: dict[str, Any]) -> dict[str, Any]:
nodes_summary = snapshot.get("nodes_summary") if isinstance(snapshot.get("nodes_summary"), dict) else {}
pressure = nodes_summary.get("pressure_nodes") if isinstance(nodes_summary.get("pressure_nodes"), dict) else {}
if not pressure:
return {}
return {"pressure_nodes": pressure}
def _build_hardware(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
hardware: dict[str, list[str]] = {}
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
name = node.get("name")
hardware_class = node.get("hardware") or "unknown"
if name:
hardware.setdefault(hardware_class, []).append(name)
if not hardware:
return {}
return {"hardware": {key: sorted(value) for key, value in hardware.items()}}
def _build_hardware_by_node(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
mapping: dict[str, str] = {}
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
name = node.get("name")
if isinstance(name, str) and name:
hardware = node.get("hardware") or "unknown"
mapping[name] = str(hardware)
return {"hardware_by_node": mapping} if mapping else {}
def _build_hardware_usage(metrics: dict[str, Any], hardware_by_node: dict[str, Any] | None) -> dict[str, Any]:
if not isinstance(hardware_by_node, dict) or not hardware_by_node:
return {}
node_load = metrics.get("node_load") if isinstance(metrics.get("node_load"), list) else []
if not node_load:
return {}
buckets: dict[str, dict[str, list[float]]] = {}
for entry in node_load:
if not isinstance(entry, dict):
continue
node = entry.get("node")
if not isinstance(node, str) or not node:
continue
hardware = hardware_by_node.get(node, "unknown")
bucket = buckets.setdefault(str(hardware), {"load_index": [], "cpu": [], "ram": [], "net": [], "io": []})
for key in ("load_index", "cpu", "ram", "net", "io"):
value = entry.get(key)
if isinstance(value, (int, float)):
bucket[key].append(float(value))
output: list[dict[str, Any]] = []
for hardware, metrics_bucket in buckets.items():
row: dict[str, Any] = {"hardware": hardware}
for key, values in metrics_bucket.items():
if values:
row[key] = sum(values) / len(values)
output.append(row)
output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("hardware") or ""))
return {"hardware_usage_avg": output}
def _build_node_ages(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
ages: list[dict[str, Any]] = []
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
name = node.get("name")
age = node.get("age_hours")
if name and isinstance(age, (int, float)):
ages.append({"name": name, "age_hours": age})
ages.sort(key=lambda item: -(item.get("age_hours") or 0))
return {"node_ages": ages[:5]} if ages else {}
def _count_values(nodes_detail: list[dict[str, Any]], key: str) -> dict[str, int]:
counts: dict[str, int] = {}
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
value = node.get(key)
if isinstance(value, str) and value:
counts[value] = counts.get(value, 0) + 1
return counts
def _build_node_facts(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
if not nodes_detail:
return {}
role_counts: dict[str, int] = {}
for node in nodes_detail:
if not isinstance(node, dict):
continue
if node.get("is_worker"):
role_counts["worker"] = role_counts.get("worker", 0) + 1
roles = node.get("roles")
if isinstance(roles, list):
for role in roles:
if isinstance(role, str) and role:
role_counts[role] = role_counts.get(role, 0) + 1
return {
"node_arch_counts": _count_values(nodes_detail, "arch"),
"node_os_counts": _count_values(nodes_detail, "os"),
"node_kubelet_versions": _count_values(nodes_detail, "kubelet"),
"node_kernel_versions": _count_values(nodes_detail, "kernel"),
"node_runtime_versions": _count_values(nodes_detail, "container_runtime"),
"node_role_counts": role_counts,
}
def _build_node_taints(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
taints: dict[str, list[str]] = {}
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
name = node.get("name")
if not name:
continue
entries = node.get("taints") if isinstance(node.get("taints"), list) else []
for entry in entries:
if not isinstance(entry, dict):
continue
key = entry.get("key")
effect = entry.get("effect")
if isinstance(key, str) and isinstance(effect, str):
label = f"{key}:{effect}"
taints.setdefault(label, []).append(name)
if not taints:
return {}
return {"node_taints": {key: sorted(names) for key, names in taints.items()}}
def _build_root_disk_headroom(metrics: dict[str, Any]) -> dict[str, Any]:
node_usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
disk = node_usage.get("disk") if isinstance(node_usage.get("disk"), list) else []
if not disk:
return {}
entries = []
for entry in disk:
if not isinstance(entry, dict):
continue
node = entry.get("node")
try:
used_pct = float(entry.get("value"))
except (TypeError, ValueError):
continue
headroom = max(0.0, 100.0 - used_pct)
if node:
entries.append({"node": node, "headroom_pct": headroom, "used_pct": used_pct})
entries.sort(key=lambda item: (item.get("headroom_pct") or 0.0, item.get("node") or ""))
return {"root_disk_low_headroom": entries[:5]} if entries else {}
def _build_longhorn(snapshot: dict[str, Any]) -> dict[str, Any]:
longhorn = snapshot.get("longhorn")
return {"longhorn": longhorn} if isinstance(longhorn, dict) and longhorn else {}
def _build_node_load(metrics: dict[str, Any]) -> dict[str, Any]:
node_load = metrics.get("node_load")
if not isinstance(node_load, list) or not node_load:
return {}
return {"node_load": node_load}
def _build_pods(metrics: dict[str, Any]) -> dict[str, Any]:
pods = {
"running": metrics.get("pods_running"),
"pending": metrics.get("pods_pending"),
"failed": metrics.get("pods_failed"),
"succeeded": metrics.get("pods_succeeded"),
}
if not any(value is not None for value in pods.values()):
return {}
return {"pods": pods}
def _build_capacity(metrics: dict[str, Any]) -> dict[str, Any]:
if not metrics:
return {}
capacity = {
"cpu": metrics.get("capacity_cpu"),
"allocatable_cpu": metrics.get("allocatable_cpu"),
"mem_bytes": metrics.get("capacity_mem_bytes"),
"allocatable_mem_bytes": metrics.get("allocatable_mem_bytes"),
"pods": metrics.get("capacity_pods"),
"allocatable_pods": metrics.get("allocatable_pods"),
}
if not any(value is not None for value in capacity.values()):
return {}
return {"capacity": capacity}
def _build_namespace_pods(snapshot: dict[str, Any]) -> dict[str, Any]:
namespaces = snapshot.get("namespace_pods")
if not isinstance(namespaces, list) or not namespaces:
return {}
return {"namespace_pods": namespaces}
def _build_namespace_nodes(snapshot: dict[str, Any]) -> dict[str, Any]:
namespace_nodes = snapshot.get("namespace_nodes")
if not isinstance(namespace_nodes, list) or not namespace_nodes:
return {}
return {"namespace_nodes": namespace_nodes}
def _build_node_pods(snapshot: dict[str, Any]) -> dict[str, Any]:
node_pods = snapshot.get("node_pods")
if not isinstance(node_pods, list) or not node_pods:
return {}
return {"node_pods": node_pods}
def _build_node_pods_top(metrics: dict[str, Any]) -> dict[str, Any]:
top = metrics.get("node_pods_top")
if not isinstance(top, list) or not top:
return {}
return {"node_pods_top": top}
def _build_pod_issues(snapshot: dict[str, Any]) -> dict[str, Any]:
pod_issues = snapshot.get("pod_issues")
if not isinstance(pod_issues, dict) or not pod_issues:
return {}
return {"pod_issues": pod_issues}
def _build_workload_health(snapshot: dict[str, Any]) -> dict[str, Any]:
health = snapshot.get("workloads_health")
if not isinstance(health, dict) or not health:
return {}
deployments = health.get("deployments")
statefulsets = health.get("statefulsets")
daemonsets = health.get("daemonsets")
if not isinstance(deployments, dict) or not isinstance(statefulsets, dict) or not isinstance(daemonsets, dict):
return {}
return {
"workloads_health": {
"deployments": deployments,
"statefulsets": statefulsets,
"daemonsets": daemonsets,
}
}
def _build_events(snapshot: dict[str, Any]) -> dict[str, Any]:
events = snapshot.get("events")
if not isinstance(events, dict) or not events:
return {}
return {"events": events}
def _build_event_summary(snapshot: dict[str, Any]) -> dict[str, Any]:
events = snapshot.get("events")
if not isinstance(events, dict) or not events:
return {}
summary = {}
if isinstance(events.get("warnings_top_reason"), dict):
summary["warnings_top_reason"] = events.get("warnings_top_reason")
if events.get("warnings_latest"):
summary["warnings_latest"] = events.get("warnings_latest")
return {"event_summary": summary} if summary else {}
def _build_postgres(metrics: dict[str, Any]) -> dict[str, Any]:
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if not postgres:
return {}
return {
"postgres": {
"used": postgres.get("used"),
"max": postgres.get("max"),
"hottest_db": postgres.get("hottest_db"),
"by_db": postgres.get("by_db"),
}
}
def _build_hottest(metrics: dict[str, Any]) -> dict[str, Any]:
    """Find the busiest node per resource in ``metrics["node_usage"]``.

    Returns ``{"hottest": {resource: top_entry}}`` for the resources among
    cpu, ram, net, io and disk that have a top entry, or ``{}`` when none do.
    """
    node_usage = metrics.get("node_usage")
    if not isinstance(node_usage, dict):
        node_usage = {}
    hottest: dict[str, Any] = {}
    for resource in ("cpu", "ram", "net", "io", "disk"):
        leader = _node_usage_top(node_usage.get(resource, []))
        if leader:
            hottest[resource] = leader
    if hottest:
        return {"hottest": hottest}
    return {}
def _build_pvc(metrics: dict[str, Any]) -> dict[str, Any]:
pvc_usage = metrics.get("pvc_usage_top") if isinstance(metrics.get("pvc_usage_top"), list) else []
if not pvc_usage:
return {}
return {"pvc_usage_top": pvc_usage}
def _build_namespace_capacity(metrics: dict[str, Any]) -> dict[str, Any]:
capacity = metrics.get("namespace_capacity")
if not isinstance(capacity, list) or not capacity:
return {}
return {"namespace_capacity": capacity}
def _build_namespace_capacity_summary(metrics: dict[str, Any]) -> dict[str, Any]:
summary = metrics.get("namespace_capacity_summary")
if not isinstance(summary, dict) or not summary:
return {}
return {"namespace_capacity_summary": summary}
def _build_node_load_summary(metrics: dict[str, Any]) -> dict[str, Any]:
summary = metrics.get("node_load_summary")
if not isinstance(summary, dict) or not summary:
return {}
return {"node_load_summary": summary}
def _build_workloads(snapshot: dict[str, Any]) -> dict[str, Any]:
workloads = snapshot.get("workloads") if isinstance(snapshot.get("workloads"), list) else []
return {"workloads": workloads}
def _build_flux(snapshot: dict[str, Any]) -> dict[str, Any]:
flux = snapshot.get("flux") if isinstance(snapshot.get("flux"), dict) else {}
return {"flux": flux}
def _format_float(value: Any) -> str:
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
return f"{numeric:.2f}".rstrip("0").rstrip(".")
def _format_rate_bytes(value: Any) -> str:
    """Render a bytes-per-second figure as ``B/s``, ``KB/s`` or ``MB/s``.

    Units are 1024-based. Values that cannot be converted with ``float()``
    are returned via ``str()``.
    """
    try:
        rate = float(value)
    except (TypeError, ValueError):
        return str(value)
    # Largest unit first so the first threshold reached wins.
    for threshold, unit in ((_BYTES_MB, "MB/s"), (_BYTES_KB, "KB/s")):
        if rate >= threshold:
            return f"{rate / threshold:.2f} {unit}"
    return f"{rate:.2f} B/s"
def _format_bytes(value: Any) -> str:
    """Render a byte count as ``B``, ``KB``, ``MB`` or ``GB``.

    Units are 1024-based. Values that cannot be converted with ``float()``
    are returned via ``str()``.
    """
    try:
        size = float(value)
    except (TypeError, ValueError):
        return str(value)
    # Largest unit first so the first threshold reached wins.
    for threshold, unit in ((_BYTES_GB, "GB"), (_BYTES_MB, "MB"), (_BYTES_KB, "KB")):
        if size >= threshold:
            return f"{size / threshold:.2f} {unit}"
    return f"{size:.2f} B"
def _format_kv_map(values: dict[str, Any]) -> str:
parts = []
for key, value in values.items():
parts.append(f"{key}={value}")
return ", ".join(parts)
def _format_names(names: list[str]) -> str:
if not names:
return ""
return ", ".join(sorted(names))
def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None:
nodes = summary.get("nodes") if isinstance(summary.get("nodes"), dict) else {}
if not nodes:
return
workers = {}
if isinstance(summary.get("nodes_summary"), dict):
workers = summary["nodes_summary"].get("workers") or {}
workers_total = workers.get("total")
workers_ready = workers.get("ready")
workers_str = ""
if workers_total is not None and workers_ready is not None:
workers_str = f", workers_ready={workers_ready}/{workers_total}"
lines.append(
"nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format(
total=nodes.get("total"),
ready=nodes.get("ready"),
not_ready=nodes.get("not_ready"),
workers=workers_str,
)
)
if not isinstance(summary.get("nodes_summary"), dict):
return
not_ready_names = summary["nodes_summary"].get("not_ready_names") or []
if not_ready_names:
lines.append("nodes_not_ready: " + _format_names(not_ready_names))
by_arch = summary["nodes_summary"].get("by_arch") or {}
if isinstance(by_arch, dict) and by_arch:
lines.append("archs: " + _format_kv_map(by_arch))
by_role = summary["nodes_summary"].get("by_role") or {}
if isinstance(by_role, dict) and by_role:
lines.append("roles: " + _format_kv_map(by_role))
def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None:
    """Append one ``hardware:`` line listing node counts and names per class.

    Classes whose value is not a list are skipped; the rendered parts are
    sorted before joining.
    """
    hardware = summary.get("hardware")
    if not isinstance(hardware, dict) or not hardware:
        return
    rendered: list[str] = []
    for hw_class, members in hardware.items():
        if not isinstance(members, list):
            continue
        count_text = f"{hw_class}={len(members)}"
        names_text = _format_names([str(member) for member in members if member])
        rendered.append(f"{count_text} ({names_text})" if names_text else count_text)
    if rendered:
        lines.append("hardware: " + "; ".join(sorted(rendered)))
def _append_node_ages(lines: list[str], summary: dict[str, Any]) -> None:
    """Append a ``node_age_top:`` line built from the first three ``node_ages`` rows.

    Rows need a truthy ``name`` and a numeric ``age_hours`` to be rendered.
    """
    ages = summary.get("node_ages")
    if not isinstance(ages, list) or not ages:
        return
    rendered: list[str] = []
    for row in ages[:3]:
        if not isinstance(row, dict):
            continue
        node_name = row.get("name")
        hours = row.get("age_hours")
        if not node_name or not isinstance(hours, (int, float)):
            continue
        rendered.append(f"{node_name}={_format_float(hours)}h")
    if rendered:
        lines.append("node_age_top: " + "; ".join(rendered))
def _append_node_taints(lines: list[str], summary: dict[str, Any]) -> None:
    """Append one ``node_taints:`` line with node counts and names per taint.

    Taints whose value is not a list are skipped; the rendered parts are
    sorted before joining.
    """
    taints = summary.get("node_taints")
    if not isinstance(taints, dict) or not taints:
        return
    rendered: list[str] = []
    for taint, members in taints.items():
        if not isinstance(members, list):
            continue
        names_text = _format_names([str(member) for member in members if member])
        count_text = f"{taint}={len(members)}"
        rendered.append(f"{count_text} ({names_text})" if names_text else count_text)
    if rendered:
        lines.append("node_taints: " + "; ".join(sorted(rendered)))
def _append_node_facts(lines: list[str], summary: dict[str, Any]) -> None:
def top_counts(label: str, counts: dict[str, int], limit: int = 4) -> None:
if not counts:
return
top = sorted(counts.items(), key=lambda item: (-item[1], item[0]))[:limit]
rendered = "; ".join([f"{name}={count}" for name, count in top])
if rendered:
lines.append(f"{label}: {rendered}")
top_counts("node_arch", summary.get("node_arch_counts") or {})
top_counts("node_os", summary.get("node_os_counts") or {})
top_counts("node_kubelet_versions", summary.get("node_kubelet_versions") or {})
top_counts("node_kernel_versions", summary.get("node_kernel_versions") or {})
top_counts("node_runtime_versions", summary.get("node_runtime_versions") or {})
top_counts("node_roles", summary.get("node_role_counts") or {})
def _append_pressure(lines: list[str], summary: dict[str, Any]) -> None:
    """Append a ``node_pressure:`` line with node counts and names per condition.

    Conditions are rendered in sorted order; conditions with no nodes are
    skipped.
    """
    pressure = summary.get("pressure_nodes")
    if not (isinstance(pressure, dict) and pressure):
        return
    rendered: list[str] = []
    for condition, affected in sorted(pressure.items()):
        if not affected:
            continue
        names_text = _format_names([str(node) for node in affected if node])
        count_text = f"{condition}={len(affected)}"
        rendered.append(f"{count_text} ({names_text})" if names_text else count_text)
    if rendered:
        lines.append("node_pressure: " + "; ".join(rendered))
def _append_pods(lines: list[str], summary: dict[str, Any]) -> None:
pods = summary.get("pods") if isinstance(summary.get("pods"), dict) else {}
if not pods:
return
lines.append(
"pods: running={running}, pending={pending}, failed={failed}, succeeded={succeeded}".format(
running=pods.get("running"),
pending=pods.get("pending"),
failed=pods.get("failed"),
succeeded=pods.get("succeeded"),
)
)
def _append_capacity(lines: list[str], summary: dict[str, Any]) -> None:
    """Append a ``capacity:`` line for every capacity figure that is not ``None``.

    Memory figures are rendered as byte sizes, everything else as plain
    numbers.
    """
    capacity = summary.get("capacity")
    if not isinstance(capacity, dict) or not capacity:
        return
    # (summary key, label in the output line, formatter)
    fields = (
        ("cpu", "cpu", _format_float),
        ("allocatable_cpu", "alloc_cpu", _format_float),
        ("mem_bytes", "mem", _format_bytes),
        ("allocatable_mem_bytes", "alloc_mem", _format_bytes),
        ("pods", "pods", _format_float),
        ("allocatable_pods", "alloc_pods", _format_float),
    )
    rendered = [
        f"{label}={render(capacity.get(key))}"
        for key, label, render in fields
        if capacity.get(key) is not None
    ]
    if rendered:
        lines.append("capacity: " + "; ".join(rendered))
def _append_namespace_pods(lines: list[str], summary: dict[str, Any]) -> None:
namespaces = summary.get("namespace_pods")
if not isinstance(namespaces, list) or not namespaces:
return
top = sorted(
(item for item in namespaces if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("namespace") or ""),
)[:8]
parts = []
for item in top:
name = item.get("namespace")
total = item.get("pods_total")
running = item.get("pods_running")
if not name:
continue
label = f"{name}={total}"
if running is not None:
label = f"{label} (running={running})"
parts.append(label)
if parts:
lines.append("namespaces_top: " + "; ".join(parts))
def _append_namespace_nodes(lines: list[str], summary: dict[str, Any]) -> None:
namespace_nodes = summary.get("namespace_nodes")
if not isinstance(namespace_nodes, list) or not namespace_nodes:
return
top = sorted(
(item for item in namespace_nodes if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("namespace") or ""),
)[:8]
parts = []
for item in top:
namespace = item.get("namespace")
pods_total = item.get("pods_total")
primary = item.get("primary_node")
if namespace:
label = f"{namespace}={pods_total}"
if primary:
label = f"{label} (primary={primary})"
parts.append(label)
if parts:
lines.append("namespace_nodes_top: " + "; ".join(parts))
def _append_node_pods(lines: list[str], summary: dict[str, Any]) -> None:
node_pods = summary.get("node_pods")
if not isinstance(node_pods, list) or not node_pods:
return
top = sorted(
(item for item in node_pods if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("node") or ""),
)[:8]
max_entry = None
for entry in node_pods:
if not isinstance(entry, dict):
continue
pods_total = entry.get("pods_total")
try:
pods_value = int(pods_total)
except (TypeError, ValueError):
continue
if max_entry is None or pods_value > max_entry["pods_total"]:
max_entry = {
"node": entry.get("node"),
"pods_total": pods_value,
"namespaces_top": entry.get("namespaces_top") or [],
}
parts = []
for item in top:
node = item.get("node")
pods_total = item.get("pods_total")
namespaces = item.get("namespaces_top") or []
ns_label = ""
if namespaces:
ns_label = ", ".join([f"{name}={count}" for name, count in namespaces])
if node:
label = f"{node}={pods_total}"
if ns_label:
label = f"{label} ({ns_label})"
parts.append(label)
if parts:
lines.append("node_pods_top: " + "; ".join(parts))
if max_entry and isinstance(max_entry.get("node"), str):
ns_label = ""
namespaces = max_entry.get("namespaces_top") or []
if namespaces:
ns_label = ", ".join([f"{name}={count}" for name, count in namespaces])
label = f"{max_entry.get('node')}={max_entry.get('pods_total')}"
if ns_label:
label = f"{label} ({ns_label})"
lines.append("node_pods_max: " + label)
def _append_pod_issues(lines: list[str], summary: dict[str, Any]) -> None:
    """Append the pod-issue lines: counts, top items, pending pods and reasons.

    Each formatter returns a ready-made line or ``""``; empty results are
    dropped.
    """
    pod_issues = summary.get("pod_issues")
    if not isinstance(pod_issues, dict) or not pod_issues:
        return
    formatters = (
        _format_pod_issue_counts,
        _format_pod_issue_top,
        _format_pod_pending_oldest,
        _format_pod_pending_over_15m,
        _format_pod_waiting_reasons,
    )
    for formatter in formatters:
        rendered = formatter(pod_issues)
        if rendered:
            lines.append(rendered)
def _format_pod_issue_counts(pod_issues: dict[str, Any]) -> str:
counts = pod_issues.get("counts") if isinstance(pod_issues.get("counts"), dict) else {}
if not counts:
return ""
parts = []
for key in ("Failed", "Pending", "Unknown"):
if key in counts:
parts.append(f"{key}={counts.get(key)}")
return "pod_issues: " + "; ".join(parts) if parts else ""
def _format_pod_issue_top(pod_issues: dict[str, Any]) -> str:
items = pod_issues.get("items") if isinstance(pod_issues.get("items"), list) else []
if not items:
return ""
top = []
for item in items[:5]:
if not isinstance(item, dict):
continue
namespace = item.get("namespace")
pod = item.get("pod")
if not namespace or not pod:
continue
phase = item.get("phase") or ""
restarts = item.get("restarts") or 0
top.append(f"{namespace}/{pod}({phase},r={restarts})")
return "pod_issues_top: " + "; ".join(top) if top else ""
def _format_pod_pending_oldest(pod_issues: dict[str, Any]) -> str:
    """Render the first five ``pending_oldest`` pods as a ``pods_pending_oldest:`` line.

    Items need ``namespace``, ``pod`` and a non-``None`` ``age_hours``; a
    ``reason`` is appended in parentheses when present. Returns ``""`` when
    nothing qualifies.
    """
    pending = pod_issues.get("pending_oldest")
    if not isinstance(pending, list) or not pending:
        return ""
    rendered: list[str] = []
    for item in pending[:5]:
        if not isinstance(item, dict):
            continue
        namespace, pod, age = item.get("namespace"), item.get("pod"), item.get("age_hours")
        if not (namespace and pod) or age is None:
            continue
        text = f"{namespace}/{pod}={_format_float(age)}h"
        reason = item.get("reason") or ""
        rendered.append(f"{text} ({reason})" if reason else text)
    if not rendered:
        return ""
    return "pods_pending_oldest: " + "; ".join(rendered)
def _format_pod_waiting_reasons(pod_issues: dict[str, Any]) -> str:
reasons = pod_issues.get("waiting_reasons") if isinstance(pod_issues.get("waiting_reasons"), dict) else {}
if not reasons:
return ""
pairs = sorted(reasons.items(), key=lambda item: (-item[1], item[0]))[:5]
return "pod_waiting_reasons: " + "; ".join([f"{key}={val}" for key, val in pairs])
def _format_pod_pending_over_15m(pod_issues: dict[str, Any]) -> str:
count = pod_issues.get("pending_over_15m")
if count is None:
return ""
try:
count_val = int(count)
except (TypeError, ValueError):
return ""
return f"pods_pending_over_15m: {count_val}"
def _append_workload_health(lines: list[str], summary: dict[str, Any]) -> None:
health = summary.get("workloads_health") if isinstance(summary.get("workloads_health"), dict) else {}
if not health:
return
deployments = health.get("deployments") if isinstance(health.get("deployments"), dict) else {}
statefulsets = health.get("statefulsets") if isinstance(health.get("statefulsets"), dict) else {}
daemonsets = health.get("daemonsets") if isinstance(health.get("daemonsets"), dict) else {}
total_not_ready = 0
for entry in (deployments, statefulsets, daemonsets):
total_not_ready += int(entry.get("not_ready") or 0)
lines.append(
"workloads_not_ready: "
f"deployments={deployments.get('not_ready', 0)}, "
f"statefulsets={statefulsets.get('not_ready', 0)}, "
f"daemonsets={daemonsets.get('not_ready', 0)} "
f"(total={total_not_ready})"
)
def _append_node_usage_stats(lines: list[str], summary: dict[str, Any]) -> None:
    """Append a ``node_usage_avg:`` line from ``metrics["node_usage_stats"]``.

    Net and io averages are rendered as byte rates, the rest as plain
    numbers. Resources without an ``avg`` are skipped.
    """
    metrics = summary.get("metrics")
    stats = metrics.get("node_usage_stats") if isinstance(metrics, dict) else None
    if not isinstance(stats, dict) or not stats:
        return
    rate_resources = {"net", "io"}
    rendered: list[str] = []
    for resource in ("cpu", "ram", "net", "io", "disk"):
        entry = stats.get(resource)
        average = entry.get("avg") if isinstance(entry, dict) else None
        if average is None:
            continue
        formatter = _format_rate_bytes if resource in rate_resources else _format_float
        rendered.append(f"{resource}={formatter(average)}")
    if rendered:
        lines.append("node_usage_avg: " + "; ".join(rendered))
def _append_events(lines: list[str], summary: dict[str, Any]) -> None:
events = summary.get("events") if isinstance(summary.get("events"), dict) else {}
if not events:
return
total = events.get("warnings_total")
by_reason = events.get("warnings_by_reason") if isinstance(events.get("warnings_by_reason"), dict) else {}
if total is None:
return
if by_reason:
top = sorted(by_reason.items(), key=lambda item: (-item[1], item[0]))[:3]
reasons = "; ".join([f"{reason}={count}" for reason, count in top])
lines.append(f"warnings: total={total}; top={reasons}")
else:
lines.append(f"warnings: total={total}")
def _append_pvc_usage(lines: list[str], summary: dict[str, Any]) -> None:
pvc_usage = summary.get("pvc_usage_top")
if not isinstance(pvc_usage, list) or not pvc_usage:
return
parts = []
for entry in pvc_usage:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
pvc = metric.get("persistentvolumeclaim")
value = entry.get("value")
if namespace and pvc:
parts.append(f"{namespace}/{pvc}={_format_float(value)}%")
if parts:
lines.append("pvc_usage_top: " + "; ".join(parts))
def _append_root_disk_headroom(lines: list[str], summary: dict[str, Any]) -> None:
    """Append a ``root_disk_low_headroom:`` line with free percent per node.

    Rows need a truthy ``node`` and a non-``None`` ``headroom_pct``.
    """
    rows = summary.get("root_disk_low_headroom")
    if not isinstance(rows, list) or not rows:
        return
    rendered: list[str] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        node = row.get("node")
        free_pct = row.get("headroom_pct")
        if not node or free_pct is None:
            continue
        rendered.append(f"{node}={_format_float(free_pct)}%")
    if rendered:
        lines.append("root_disk_low_headroom: " + "; ".join(rendered))
def _append_longhorn(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
longhorn = summary.get("longhorn") if isinstance(summary.get("longhorn"), dict) else {}
if not longhorn:
return
total = longhorn.get("total")
attached = longhorn.get("attached_count")
detached = longhorn.get("detached_count")
degraded = longhorn.get("degraded_count")
by_state = longhorn.get("by_state") if isinstance(longhorn.get("by_state"), dict) else {}
by_robust = longhorn.get("by_robustness") if isinstance(longhorn.get("by_robustness"), dict) else {}
if total is not None:
if attached is None and detached is None and degraded is None:
unhealthy = longhorn.get("unhealthy_count")
lines.append(
"longhorn: total={total}, unhealthy={unhealthy}".format(
total=total,
unhealthy=unhealthy if unhealthy is not None else 0,
)
)
else:
lines.append(
"longhorn: total={total}, attached={attached}, detached={detached}, degraded={degraded}".format(
total=total,
attached=attached if attached is not None else 0,
detached=detached if detached is not None else 0,
degraded=degraded if degraded is not None else 0,
)
)
if by_state:
lines.append("longhorn_state: " + _format_kv_map(by_state))
if by_robust:
lines.append("longhorn_robustness: " + _format_kv_map(by_robust))
unhealthy_items = longhorn.get("unhealthy")
if isinstance(unhealthy_items, list) and unhealthy_items:
parts = []
for entry in unhealthy_items[:5]:
if not isinstance(entry, dict):
continue
name = entry.get("name")
state = entry.get("state")
robustness = entry.get("robustness")
if name:
label = name
if state or robustness:
label = f"{label}({state},{robustness})"
parts.append(label)
if parts:
lines.append("longhorn_unhealthy_top: " + "; ".join(parts))
def _append_namespace_usage(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
cpu_top = metrics.get("namespace_cpu_top") if isinstance(metrics.get("namespace_cpu_top"), list) else []
mem_top = metrics.get("namespace_mem_top") if isinstance(metrics.get("namespace_mem_top"), list) else []
if cpu_top:
parts = []
for entry in cpu_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_float(value)}")
if parts:
lines.append("namespace_cpu_top: " + "; ".join(parts))
if mem_top:
parts = []
for entry in mem_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_bytes(value)}")
if parts:
lines.append("namespace_mem_top: " + "; ".join(parts))
def _append_namespace_requests(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
cpu_req = metrics.get("namespace_cpu_requests_top") if isinstance(metrics.get("namespace_cpu_requests_top"), list) else []
mem_req = metrics.get("namespace_mem_requests_top") if isinstance(metrics.get("namespace_mem_requests_top"), list) else []
if cpu_req:
parts = []
for entry in cpu_req:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_float(value)}")
if parts:
lines.append("namespace_cpu_requests_top: " + "; ".join(parts))
if mem_req:
parts = []
for entry in mem_req:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_bytes(value)}")
if parts:
lines.append("namespace_mem_requests_top: " + "; ".join(parts))
def _append_namespace_io_net(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
net_top = metrics.get("namespace_net_top") if isinstance(metrics.get("namespace_net_top"), list) else []
io_top = metrics.get("namespace_io_top") if isinstance(metrics.get("namespace_io_top"), list) else []
if net_top:
parts = []
for entry in net_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_rate_bytes(value)}")
if parts:
lines.append("namespace_net_top: " + "; ".join(parts))
if io_top:
parts = []
for entry in io_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
value = entry.get("value")
if namespace:
parts.append(f"{namespace}={_format_rate_bytes(value)}")
if parts:
lines.append("namespace_io_top: " + "; ".join(parts))
def _append_pod_usage(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901, PLR0912
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
cpu_top = metrics.get("pod_cpu_top") if isinstance(metrics.get("pod_cpu_top"), list) else []
cpu_top_node = (
metrics.get("pod_cpu_top_node")
if isinstance(metrics.get("pod_cpu_top_node"), list)
else []
)
mem_top = metrics.get("pod_mem_top") if isinstance(metrics.get("pod_mem_top"), list) else []
mem_top_node = (
metrics.get("pod_mem_top_node")
if isinstance(metrics.get("pod_mem_top_node"), list)
else []
)
if cpu_top:
parts = []
for entry in cpu_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
pod = metric.get("pod")
value = entry.get("value")
if namespace and pod and value is not None:
parts.append(f"{namespace}/{pod}={_format_float(value)}")
if parts:
lines.append("pod_cpu_top: " + "; ".join(parts))
if cpu_top_node:
parts = []
for entry in cpu_top_node:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
pod = metric.get("pod")
node = metric.get("node")
value = entry.get("value")
if namespace and pod and node and value is not None:
parts.append(f"{node}:{namespace}/{pod}={_format_float(value)}")
if parts:
lines.append("pod_cpu_top_node: " + "; ".join(parts))
if mem_top:
parts = []
for entry in mem_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
pod = metric.get("pod")
value = entry.get("value")
if namespace and pod and value is not None:
parts.append(f"{namespace}/{pod}={_format_bytes(value)}")
if parts:
lines.append("pod_mem_top: " + "; ".join(parts))
if mem_top_node:
parts = []
for entry in mem_top_node:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
pod = metric.get("pod")
node = metric.get("node")
value = entry.get("value")
if namespace and pod and node and value is not None:
parts.append(f"{node}:{namespace}/{pod}={_format_bytes(value)}")
if parts:
lines.append("pod_mem_top_node: " + "; ".join(parts))
def _append_restarts(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
top_restarts = metrics.get("top_restarts_1h") or []
if not isinstance(top_restarts, list) or not top_restarts:
top_restarts = []
parts = []
for entry in top_restarts:
metric = entry.get("metric") if isinstance(entry, dict) else {}
value = entry.get("value") if isinstance(entry, dict) else []
if not isinstance(metric, dict) or not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
continue
namespace = metric.get("namespace")
pod = metric.get("pod")
count = _format_float(value[1])
if namespace and pod:
parts.append(f"{namespace}/{pod}={count}")
if parts:
lines.append("restarts_1h_top: " + "; ".join(parts))
else:
lines.append("restarts_1h_top: none")
ns_top = metrics.get("restart_namespace_top") or []
if isinstance(ns_top, list) and ns_top:
ns_parts = []
for entry in ns_top:
metric = entry.get("metric") if isinstance(entry, dict) else {}
value = entry.get("value")
namespace = metric.get("namespace") if isinstance(metric, dict) else None
if namespace and value is not None:
ns_parts.append(f"{namespace}={_format_float(value)}")
if ns_parts:
lines.append("restarts_1h_namespace_top: " + "; ".join(ns_parts))
else:
lines.append("restarts_1h_namespace_top: none")
def _append_job_failures(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
failures = metrics.get("job_failures_24h") if isinstance(metrics.get("job_failures_24h"), list) else []
if not failures:
return
parts = []
for entry in failures:
metric = entry.get("metric") if isinstance(entry, dict) else {}
namespace = metric.get("namespace")
job_name = metric.get("job_name") or metric.get("job")
value = entry.get("value")
if namespace and job_name and value is not None:
parts.append(f"{namespace}/{job_name}={_format_float(value)}")
if parts:
lines.append("job_failures_24h: " + "; ".join(parts))
def _append_jobs(lines: list[str], summary: dict[str, Any]) -> None:
jobs = summary.get("jobs") if isinstance(summary.get("jobs"), dict) else {}
if not jobs:
return
totals_line = _format_jobs_totals(jobs)
if totals_line:
lines.append(totals_line)
failing_line = _format_jobs_failing(jobs)
if failing_line:
lines.append(failing_line)
active_line = _format_jobs_active_oldest(jobs)
if active_line:
lines.append(active_line)
def _format_jobs_totals(jobs: dict[str, Any]) -> str:
totals = jobs.get("totals") if isinstance(jobs.get("totals"), dict) else {}
if not totals:
return ""
return "jobs: total={total}, active={active}, failed={failed}, succeeded={succeeded}".format(
total=totals.get("total"),
active=totals.get("active"),
failed=totals.get("failed"),
succeeded=totals.get("succeeded"),
)
def _format_jobs_failing(jobs: dict[str, Any]) -> str:
failing = jobs.get("failing") if isinstance(jobs.get("failing"), list) else []
if not failing:
return ""
parts = []
for item in failing[:5]:
if not isinstance(item, dict):
continue
namespace = item.get("namespace")
name = item.get("job")
failed = item.get("failed")
age = item.get("age_hours")
if namespace and name and failed is not None:
label = f"{namespace}/{name}={failed}"
if age is not None:
label = f"{label} ({_format_float(age)}h)"
parts.append(label)
return "jobs_failing_top: " + "; ".join(parts) if parts else ""
def _format_jobs_active_oldest(jobs: dict[str, Any]) -> str:
active_oldest = jobs.get("active_oldest") if isinstance(jobs.get("active_oldest"), list) else []
if not active_oldest:
return ""
parts = []
for item in active_oldest[:5]:
if not isinstance(item, dict):
continue
namespace = item.get("namespace")
name = item.get("job")
age = item.get("age_hours")
if namespace and name and age is not None:
parts.append(f"{namespace}/{name}={_format_float(age)}h")
return "jobs_active_oldest: " + "; ".join(parts) if parts else ""
def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None:
postgres = summary.get("postgres") if isinstance(summary.get("postgres"), dict) else {}
if not postgres:
return
hottest = postgres.get("hottest_db") or ""
lines.append(
"postgres: used={used}, max={max}, hottest_db={hottest}".format(
used=postgres.get("used"),
max=postgres.get("max"),
hottest=hottest,
)
)
by_db = postgres.get("by_db")
if isinstance(by_db, list) and by_db:
parts = []
for entry in by_db:
metric = entry.get("metric") if isinstance(entry, dict) else {}
value = entry.get("value")
if isinstance(value, list) and len(value) >= _VALUE_PAIR_LEN:
value = value[1]
name = metric.get("datname") if isinstance(metric, dict) else None
if name and value is not None:
parts.append(f"{name}={_format_float(value)}")
if parts:
lines.append("postgres_connections_by_db: " + "; ".join(parts))
def _append_hottest(lines: list[str], summary: dict[str, Any]) -> None:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
if not hottest:
return
hardware_map = summary.get("hardware_by_node")
if not isinstance(hardware_map, dict):
hardware_map = {}
parts = []
for key, entry in hottest.items():
if not isinstance(entry, dict):
continue
node = entry.get("node")
hardware = hardware_map.get(node) if node else None
if key in {"net", "io"}:
value = _format_rate_bytes(entry.get("value"))
else:
value = _format_float(entry.get("value"))
if node:
label = node
if hardware:
label = f"{label} [{hardware}]"
parts.append(f"{key}={label} ({value})")
if parts:
lines.append("hottest: " + "; ".join(parts))
def _append_workloads(lines: list[str], summary: dict[str, Any]) -> None:
workloads = summary.get("workloads")
if not isinstance(workloads, list) or not workloads:
return
lines.append(f"workloads: total={len(workloads)}")
top_workloads = sorted(
(item for item in workloads if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("workload") or ""),
)[:5]
if not top_workloads:
return
parts = []
for item in top_workloads:
namespace = item.get("namespace")
name = item.get("workload")
pods_total = item.get("pods_total")
primary = item.get("primary_node")
if namespace and name:
label = f"{namespace}/{name}={pods_total}"
if primary:
label = f"{label} (primary={primary})"
parts.append(label)
if parts:
lines.append("workloads_top: " + "; ".join(parts))
def _append_topology(lines: list[str], summary: dict[str, Any]) -> None:
topology = summary.get("topology") if isinstance(summary.get("topology"), dict) else {}
if not topology:
return
nodes = topology.get("nodes") if isinstance(topology.get("nodes"), list) else []
workloads = topology.get("workloads") if isinstance(topology.get("workloads"), list) else []
if nodes:
parts = []
for entry in nodes[:5]:
if not isinstance(entry, dict):
continue
node = entry.get("node")
top = entry.get("workloads_top") if isinstance(entry.get("workloads_top"), list) else []
if not node or not top:
continue
items = ", ".join([f"{name}({count})" for name, count in top if name and count is not None])
if items:
parts.append(f"{node}={items}")
if parts:
lines.append("node_workloads_top: " + "; ".join(parts))
if workloads:
parts = []
for entry in workloads[:5]:
if not isinstance(entry, dict):
continue
namespace = entry.get("namespace")
name = entry.get("workload")
nodes_top = entry.get("nodes_top") if isinstance(entry.get("nodes_top"), list) else []
if not namespace or not name:
continue
nodes_label = ", ".join([f"{node}:{count}" for node, count in nodes_top if node])
label = f"{namespace}/{name}"
if nodes_label:
label = f"{label} [{nodes_label}]"
parts.append(label)
if parts:
lines.append("workload_nodes_top: " + "; ".join(parts))
def _append_flux(lines: list[str], summary: dict[str, Any]) -> None:
flux = summary.get("flux") if isinstance(summary.get("flux"), dict) else {}
if not flux:
return
not_ready = flux.get("not_ready")
if not_ready is not None:
lines.append(f"flux_not_ready: {not_ready}")
items = flux.get("items")
if isinstance(items, list) and items:
parts = []
for item in items[:10]:
if not isinstance(item, dict):
continue
name = item.get("name") or ""
namespace = item.get("namespace") or ""
reason = item.get("reason") or ""
suspended = item.get("suspended")
label = f"{namespace}/{name}".strip("/")
if reason:
label = f"{label} ({reason})"
if suspended:
label = f"{label} [suspended]"
if label:
parts.append(label)
if parts:
lines.append("flux_not_ready_items: " + "; ".join(parts))
def _append_signals(lines: list[str], summary: dict[str, Any]) -> None:
signals = summary.get("signals") if isinstance(summary.get("signals"), list) else []
if not signals:
return
lines.append("signals:")
for entry in signals[:8]:
if not isinstance(entry, dict):
continue
scope = entry.get("scope") or ""
target = entry.get("target") or ""
metric = entry.get("metric") or ""
current = entry.get("current")
delta = entry.get("delta_pct")
severity = entry.get("severity") or ""
detail = f"{scope}:{target} {metric}={current}"
if delta is not None:
detail += f" delta={delta}%"
if severity:
detail += f" severity={severity}"
lines.append(f"- {detail}")
def _append_profiles(lines: list[str], summary: dict[str, Any]) -> None:
profiles = summary.get("profiles") if isinstance(summary.get("profiles"), dict) else {}
if not profiles:
return
nodes = profiles.get("nodes") if isinstance(profiles.get("nodes"), list) else []
namespaces = profiles.get("namespaces") if isinstance(profiles.get("namespaces"), list) else []
workloads = profiles.get("workloads") if isinstance(profiles.get("workloads"), list) else []
if nodes:
lines.append("node_profiles:")
for entry in nodes[:3]:
if not isinstance(entry, dict):
continue
lines.append(
f"- {entry.get('node')}: load={entry.get('load_index')} cpu={entry.get('cpu')} ram={entry.get('ram')} "
f"pods={entry.get('pods_total')} hw={entry.get('hardware')}"
)
if namespaces:
lines.append("namespace_profiles:")
for entry in namespaces[:3]:
if not isinstance(entry, dict):
continue
lines.append(
f"- {entry.get('namespace')}: pods={entry.get('pods_total')} cpu={entry.get('cpu_usage')} "
f"mem={entry.get('mem_usage')} primary={entry.get('primary_node')}"
)
if workloads:
lines.append("workload_profiles:")
for entry in workloads[:3]:
if not isinstance(entry, dict):
continue
lines.append(
f"- {entry.get('namespace')}/{entry.get('workload')}: pods={entry.get('pods_total')} "
f"running={entry.get('pods_running')} node={entry.get('primary_node')}"
)
def _append_units_windows(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
units = metrics.get("units") if isinstance(metrics.get("units"), dict) else {}
windows = metrics.get("windows") if isinstance(metrics.get("windows"), dict) else {}
if units:
lines.append("units: " + _format_kv_map(units))
else:
lines.append("units: cpu_pct, ram_pct, net=bytes_per_sec, io=bytes_per_sec")
if windows:
lines.append("windows: " + _format_kv_map(windows))
else:
lines.append("windows: rates=5m, restarts=1h")
def _append_node_load_summary(lines: list[str], summary: dict[str, Any]) -> None:
node_load = summary.get("node_load_summary")
if not isinstance(node_load, dict) or not node_load:
return
hardware_by_node = summary.get("hardware_by_node")
hardware_by_node = hardware_by_node if isinstance(hardware_by_node, dict) else {}
top = node_load.get("top")
if isinstance(top, list) and top:
parts = []
for entry in top[:5]:
if not isinstance(entry, dict):
continue
node = entry.get("node") or ""
load = entry.get("load_index")
cpu = entry.get("cpu")
ram = entry.get("ram")
io = entry.get("io")
net = entry.get("net")
pods_total = entry.get("pods_total")
label = f"{node} idx={_format_float(load)}"
if node and node in hardware_by_node:
label += f" hw={hardware_by_node.get(node)}"
if isinstance(pods_total, (int, float)):
label += f" pods={int(pods_total)}"
label += f" cpu={_format_float(cpu)} ram={_format_float(ram)}"
label += f" io={_format_rate_bytes(io)} net={_format_rate_bytes(net)}"
parts.append(label)
if parts:
lines.append("node_load_top: " + "; ".join(parts))
outliers = node_load.get("outliers")
if isinstance(outliers, list) and outliers:
names = [entry.get("node") for entry in outliers if isinstance(entry, dict)]
names = [name for name in names if isinstance(name, str) and name]
if names:
lines.append("node_load_outliers: " + _format_names(names))
def _append_hardware_usage(lines: list[str], summary: dict[str, Any]) -> None:
usage = summary.get("hardware_usage_avg")
if not isinstance(usage, list) or not usage:
return
parts = []
for entry in usage[:5]:
if not isinstance(entry, dict):
continue
hardware = entry.get("hardware")
load = entry.get("load_index")
cpu = entry.get("cpu")
ram = entry.get("ram")
io = entry.get("io")
net = entry.get("net")
if not hardware:
continue
label = f"{hardware} idx={_format_float(load)}"
label += f" cpu={_format_float(cpu)} ram={_format_float(ram)}"
label += f" io={_format_rate_bytes(io)} net={_format_rate_bytes(net)}"
parts.append(label)
if parts:
lines.append("hardware_usage_avg: " + "; ".join(parts))
def _append_cluster_watchlist(lines: list[str], summary: dict[str, Any]) -> None:
watchlist = summary.get("cluster_watchlist")
if not isinstance(watchlist, list) or not watchlist:
return
lines.append("cluster_watchlist: " + "; ".join(watchlist))
def _build_cluster_watchlist(summary: dict[str, Any]) -> dict[str, Any]:
items: list[str] = []
nodes_summary = summary.get("nodes_summary") if isinstance(summary.get("nodes_summary"), dict) else {}
not_ready = int(nodes_summary.get("not_ready") or 0)
if not_ready > 0:
items.append(f"not_ready_nodes={not_ready}")
pressure = summary.get("pressure_nodes") if isinstance(summary.get("pressure_nodes"), dict) else {}
pressure_nodes = pressure.get("names") if isinstance(pressure.get("names"), list) else []
if pressure_nodes:
items.append(f"pressure_nodes={len(pressure_nodes)}")
pod_issues = summary.get("pod_issues") if isinstance(summary.get("pod_issues"), dict) else {}
pending_over = int(pod_issues.get("pending_over_15m") or 0)
if pending_over > 0:
items.append(f"pods_pending_over_15m={pending_over}")
workloads = summary.get("workloads_health") if isinstance(summary.get("workloads_health"), dict) else {}
deployments = workloads.get("deployments") if isinstance(workloads.get("deployments"), dict) else {}
statefulsets = workloads.get("statefulsets") if isinstance(workloads.get("statefulsets"), dict) else {}
daemonsets = workloads.get("daemonsets") if isinstance(workloads.get("daemonsets"), dict) else {}
total_not_ready = int(deployments.get("not_ready") or 0) + int(statefulsets.get("not_ready") or 0) + int(daemonsets.get("not_ready") or 0)
if total_not_ready > 0:
items.append(f"workloads_not_ready={total_not_ready}")
flux = summary.get("flux") if isinstance(summary.get("flux"), dict) else {}
flux_not_ready = int(flux.get("not_ready") or 0)
if flux_not_ready > 0:
items.append(f"flux_not_ready={flux_not_ready}")
pvc_usage = summary.get("pvc_usage_top") if isinstance(summary.get("pvc_usage_top"), list) else []
high_pvc = [entry for entry in pvc_usage if isinstance(entry, dict) and (entry.get("value") or 0) >= 90]
if high_pvc:
items.append("pvc_usage>=90%")
return {"cluster_watchlist": items} if items else {}
def _capacity_ratio_parts(entries: list[dict[str, Any]], ratio_key: str, usage_key: str, req_key: str) -> list[str]:
parts: list[str] = []
for entry in entries[:5]:
if not isinstance(entry, dict):
continue
ns = entry.get("namespace") or ""
ratio = entry.get(ratio_key)
usage = entry.get(usage_key)
req = entry.get(req_key)
if ns:
parts.append(
f"{ns}={_format_float(ratio)} (usage={_format_float(usage)} req={_format_float(req)})"
)
return parts
def _capacity_headroom_parts(entries: list[dict[str, Any]]) -> list[str]:
parts: list[str] = []
for entry in entries[:5]:
if not isinstance(entry, dict):
continue
ns = entry.get("namespace") or ""
headroom = entry.get("headroom")
if ns:
parts.append(f"{ns}={_format_float(headroom)}")
return parts
def _append_namespace_capacity_summary(lines: list[str], summary: dict[str, Any]) -> None: # noqa: C901
cap = summary.get("namespace_capacity_summary")
if not isinstance(cap, dict) or not cap:
return
cpu_ratio = cap.get("cpu_ratio_top")
if isinstance(cpu_ratio, list):
parts = _capacity_ratio_parts(cpu_ratio, "cpu_usage_ratio", "cpu_usage", "cpu_requests")
if parts:
lines.append("namespace_cpu_ratio_top: " + "; ".join(parts))
mem_ratio = cap.get("mem_ratio_top")
if isinstance(mem_ratio, list):
parts = _capacity_ratio_parts(mem_ratio, "mem_usage_ratio", "mem_usage", "mem_requests")
if parts:
lines.append("namespace_mem_ratio_top: " + "; ".join(parts))
cpu_headroom = cap.get("cpu_headroom_low")
if isinstance(cpu_headroom, list):
parts = _capacity_headroom_parts(cpu_headroom)
if parts:
lines.append("namespace_cpu_headroom_low: " + "; ".join(parts))
mem_headroom = cap.get("mem_headroom_low")
if isinstance(mem_headroom, list):
parts = _capacity_headroom_parts(mem_headroom)
if parts:
lines.append("namespace_mem_headroom_low: " + "; ".join(parts))
cpu_over = cap.get("cpu_overcommitted")
mem_over = cap.get("mem_overcommitted")
if cpu_over is not None or mem_over is not None:
lines.append(f"namespace_overcommitted: cpu={cpu_over} mem={mem_over}")
cpu_over_names = cap.get("cpu_overcommitted_names")
if isinstance(cpu_over_names, list) and cpu_over_names:
names = [name for name in cpu_over_names if isinstance(name, str) and name]
if names:
lines.append("namespace_cpu_overcommitted_names: " + _format_names(names))
mem_over_names = cap.get("mem_overcommitted_names")
if isinstance(mem_over_names, list) and mem_over_names:
names = [name for name in mem_over_names if isinstance(name, str) and name]
if names:
lines.append("namespace_mem_overcommitted_names: " + _format_names(names))
def _append_workloads_by_namespace(lines: list[str], summary: dict[str, Any]) -> None:
workloads = summary.get("workloads")
if not isinstance(workloads, list) or not workloads:
return
by_ns: dict[str, list[dict[str, Any]]] = {}
for item in workloads:
if not isinstance(item, dict):
continue
ns = item.get("namespace") or ""
name = item.get("workload") or ""
if not ns or not name:
continue
by_ns.setdefault(ns, []).append(item)
for ns, items in sorted(by_ns.items()):
items.sort(
key=lambda item: (-int(item.get("pods_total") or 0), item.get("workload") or "")
)
parts = []
for entry in items[:2]:
name = entry.get("workload") or ""
pods = entry.get("pods_total")
primary = entry.get("primary_node")
label = f"{name}({pods})" if pods is not None else name
if primary:
label = f"{label}@{primary}"
if label:
parts.append(label)
if parts:
lines.append(f"workloads_top_{ns}: " + "; ".join(parts))
def summary_text(snapshot: dict[str, Any] | None) -> str:
    """Render *snapshot* as newline-separated summary text.

    Builds the summary with ``build_summary`` and returns an empty string
    when that summary is empty. Otherwise the text starts with a fixed
    cluster banner, an optional ``snapshot:`` line carrying ``collected_at``
    and ``snapshot_version``, and then the output of every section appender
    in a fixed order.
    """
    summary = build_summary(snapshot)
    if not summary:
        return ""

    lines: list[str] = ["atlas_cluster: Titan Lab Atlas Kubernetes cluster (internal)."]

    stamp = snapshot if isinstance(snapshot, dict) else {}
    bits = [
        f"{label}={value}"
        for label, value in (
            ("collected_at", stamp.get("collected_at")),
            ("version", stamp.get("snapshot_version")),
        )
        if value
    ]
    if bits:
        lines.append("snapshot: " + ", ".join(bits))

    # Order matters: it is the order in which sections appear in the text.
    sections = (
        _append_nodes,
        _append_pressure,
        _append_hardware,
        _append_node_facts,
        _append_node_ages,
        _append_node_taints,
        _append_capacity,
        _append_pods,
        _append_namespace_pods,
        _append_namespace_nodes,
        _append_node_pods,
        _append_pod_issues,
        _append_workload_health,
        _append_events,
        _append_node_usage_stats,
        _append_namespace_usage,
        _append_namespace_requests,
        _append_namespace_io_net,
        _append_pod_usage,
        _append_restarts,
        _append_job_failures,
        _append_jobs,
        _append_postgres,
        _append_hottest,
        _append_pvc_usage,
        _append_root_disk_headroom,
        _append_namespace_capacity_summary,
        _append_longhorn,
        _append_workloads,
        _append_topology,
        _append_workloads_by_namespace,
        _append_node_load_summary,
        _append_cluster_watchlist,
        _append_hardware_usage,
        _append_flux,
        _append_signals,
        _append_profiles,
        _append_units_windows,
    )
    for append_section in sections:
        append_section(lines, summary)
    return "\n".join(lines)