413 lines
14 KiB
Python

import logging
import math
import time
from typing import Any

import httpx

from atlasbot.config import Settings
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Thresholds (binary multiples) used when scaling byte rates to KB/s and MB/s.
_BYTES_KB = 1024
_BYTES_MB = 1024 * 1024
# Minimum length of a restart entry's "value" list; index 1 is read as the count.
_VALUE_PAIR_LEN = 2
class SnapshotProvider:
    """Fetch the cluster snapshot over HTTP and keep the last good copy in memory."""

    def __init__(self, settings: Settings) -> None:
        """Store ``settings`` and start with an empty cache."""
        self._settings = settings
        self._cache: dict[str, Any] = {}
        self._cache_ts = 0.0

    def _cache_valid(self) -> bool:
        """Return True while the cache is younger than the TTL.

        The TTL is ``settings.snapshot_ttl_sec`` with a floor of 5.
        """
        ttl = max(5, self._settings.snapshot_ttl_sec)
        age = time.monotonic() - self._cache_ts
        return age < ttl

    def get(self) -> dict[str, Any] | None:
        """Return the current snapshot, refreshing it when the cache is stale.

        Falls back to the previously cached snapshot (or ``None`` when there
        is none) if no URL is configured, the request fails, or the response
        body is not a JSON object.
        """
        if self._cache and self._cache_valid():
            return self._cache

        url = self._settings.ariadne_state_url
        if not url:
            return self._cache or None

        token = self._settings.ariadne_state_token
        request_headers = {"x-internal-token": token} if token else {}
        try:
            response = httpx.get(url, headers=request_headers, timeout=10.0)
            response.raise_for_status()
            body = response.json()
            if isinstance(body, dict):
                self._cache = body
                self._cache_ts = time.monotonic()
                return body
        except Exception as exc:
            # Best effort: any failure is logged and the stale cache is served.
            log.warning("snapshot fetch failed", extra={"extra": {"error": str(exc)}})
        return self._cache or None
def _node_usage_top(series: list[dict[str, Any]]) -> dict[str, Any] | None:
best = None
for entry in series or []:
if not isinstance(entry, dict):
continue
node = entry.get("node")
value = entry.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best is None or numeric > best["value"]:
best = {"node": node, "value": numeric}
return best
def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not snapshot:
return {}
nodes_detail = _nodes_detail(snapshot)
metrics = _metrics(snapshot)
summary: dict[str, Any] = {}
if isinstance(snapshot.get("nodes_summary"), dict):
summary["nodes_summary"] = snapshot.get("nodes_summary")
if metrics:
summary["metrics"] = metrics
summary.update(_build_nodes(snapshot))
summary.update(_build_hardware(nodes_detail))
summary.update(_build_pods(metrics))
summary.update(_build_namespace_pods(snapshot))
summary.update(_build_postgres(metrics))
summary.update(_build_hottest(metrics))
summary.update(_build_workloads(snapshot))
summary.update(_build_flux(snapshot))
return summary
def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
items = snapshot.get("nodes_detail")
return items if isinstance(items, list) else []
def _metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _build_nodes(snapshot: dict[str, Any]) -> dict[str, Any]:
nodes_summary = snapshot.get("nodes_summary") if isinstance(snapshot.get("nodes_summary"), dict) else {}
if not nodes_summary:
return {}
return {
"nodes": {
"total": nodes_summary.get("total"),
"ready": nodes_summary.get("ready"),
"not_ready": nodes_summary.get("not_ready"),
}
}
def _build_hardware(nodes_detail: list[dict[str, Any]]) -> dict[str, Any]:
hardware: dict[str, list[str]] = {}
for node in nodes_detail or []:
if not isinstance(node, dict):
continue
name = node.get("name")
hardware_class = node.get("hardware") or "unknown"
if name:
hardware.setdefault(hardware_class, []).append(name)
if not hardware:
return {}
return {"hardware": {key: sorted(value) for key, value in hardware.items()}}
def _build_pods(metrics: dict[str, Any]) -> dict[str, Any]:
pods = {
"running": metrics.get("pods_running"),
"pending": metrics.get("pods_pending"),
"failed": metrics.get("pods_failed"),
"succeeded": metrics.get("pods_succeeded"),
}
if not any(value is not None for value in pods.values()):
return {}
return {"pods": pods}
def _build_namespace_pods(snapshot: dict[str, Any]) -> dict[str, Any]:
namespaces = snapshot.get("namespace_pods")
if not isinstance(namespaces, list) or not namespaces:
return {}
return {"namespace_pods": namespaces}
def _build_postgres(metrics: dict[str, Any]) -> dict[str, Any]:
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if not postgres:
return {}
return {
"postgres": {
"used": postgres.get("used"),
"max": postgres.get("max"),
"hottest_db": postgres.get("hottest_db"),
}
}
def _build_hottest(metrics: dict[str, Any]) -> dict[str, Any]:
    """Build the ``hottest`` section: the top node per resource in ``node_usage``.

    Looks at the ``cpu``, ``ram``, ``net`` and ``io`` series and keeps those
    for which ``_node_usage_top`` finds an entry. Returns ``{}`` when none do.
    """
    node_usage = metrics.get("node_usage")
    if not isinstance(node_usage, dict):
        node_usage = {}
    hottest: dict[str, Any] = {}
    for resource in ("cpu", "ram", "net", "io"):
        leader = _node_usage_top(node_usage.get(resource, []))
        if leader:
            hottest[resource] = leader
    return {"hottest": hottest} if hottest else {}
def _build_workloads(snapshot: dict[str, Any]) -> dict[str, Any]:
workloads = snapshot.get("workloads") if isinstance(snapshot.get("workloads"), list) else []
return {"workloads": workloads}
def _build_flux(snapshot: dict[str, Any]) -> dict[str, Any]:
flux = snapshot.get("flux") if isinstance(snapshot.get("flux"), dict) else {}
return {"flux": flux}
def _format_float(value: Any) -> str:
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
return f"{numeric:.2f}".rstrip("0").rstrip(".")
def _format_rate_bytes(value: Any) -> str:
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
if numeric >= _BYTES_MB:
return f"{numeric / _BYTES_MB:.2f} MB/s"
if numeric >= _BYTES_KB:
return f"{numeric / _BYTES_KB:.2f} KB/s"
return f"{numeric:.2f} B/s"
def _format_kv_map(values: dict[str, Any]) -> str:
parts = []
for key, value in values.items():
parts.append(f"{key}={value}")
return ", ".join(parts)
def _format_names(names: list[str]) -> str:
if not names:
return ""
return ", ".join(sorted(names))
def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None:
nodes = summary.get("nodes") if isinstance(summary.get("nodes"), dict) else {}
if not nodes:
return
workers = {}
if isinstance(summary.get("nodes_summary"), dict):
workers = summary["nodes_summary"].get("workers") or {}
workers_total = workers.get("total")
workers_ready = workers.get("ready")
workers_str = ""
if workers_total is not None and workers_ready is not None:
workers_str = f", workers_ready={workers_ready}/{workers_total}"
lines.append(
"nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format(
total=nodes.get("total"),
ready=nodes.get("ready"),
not_ready=nodes.get("not_ready"),
workers=workers_str,
)
)
if not isinstance(summary.get("nodes_summary"), dict):
return
not_ready_names = summary["nodes_summary"].get("not_ready_names") or []
if not_ready_names:
lines.append("nodes_not_ready: " + _format_names(not_ready_names))
by_arch = summary["nodes_summary"].get("by_arch") or {}
if isinstance(by_arch, dict) and by_arch:
lines.append("archs: " + _format_kv_map(by_arch))
by_role = summary["nodes_summary"].get("by_role") or {}
if isinstance(by_role, dict) and by_role:
lines.append("roles: " + _format_kv_map(by_role))
def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None:
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
if not hardware:
return
parts = []
for key, names in hardware.items():
if not isinstance(names, list):
continue
label = f"{key}={len(names)}"
name_list = _format_names([str(name) for name in names if name])
if name_list:
label = f"{label} ({name_list})"
parts.append(label)
if parts:
lines.append("hardware: " + "; ".join(sorted(parts)))
def _append_pods(lines: list[str], summary: dict[str, Any]) -> None:
pods = summary.get("pods") if isinstance(summary.get("pods"), dict) else {}
if not pods:
return
lines.append(
"pods: running={running}, pending={pending}, failed={failed}, succeeded={succeeded}".format(
running=pods.get("running"),
pending=pods.get("pending"),
failed=pods.get("failed"),
succeeded=pods.get("succeeded"),
)
)
def _append_namespace_pods(lines: list[str], summary: dict[str, Any]) -> None:
namespaces = summary.get("namespace_pods")
if not isinstance(namespaces, list) or not namespaces:
return
top = sorted(
(item for item in namespaces if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("namespace") or ""),
)[:8]
parts = []
for item in top:
name = item.get("namespace")
total = item.get("pods_total")
running = item.get("pods_running")
if not name:
continue
label = f"{name}={total}"
if running is not None:
label = f"{label} (running={running})"
parts.append(label)
if parts:
lines.append("namespaces_top: " + "; ".join(parts))
def _append_restarts(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
top_restarts = metrics.get("top_restarts_1h") or []
if not isinstance(top_restarts, list) or not top_restarts:
return
parts = []
for entry in top_restarts:
metric = entry.get("metric") if isinstance(entry, dict) else {}
value = entry.get("value") if isinstance(entry, dict) else []
if not isinstance(metric, dict) or not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
continue
namespace = metric.get("namespace")
pod = metric.get("pod")
count = _format_float(value[1])
if namespace and pod:
parts.append(f"{namespace}/{pod}={count}")
if parts:
lines.append("restarts_1h_top: " + "; ".join(parts))
def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None:
postgres = summary.get("postgres") if isinstance(summary.get("postgres"), dict) else {}
if not postgres:
return
hottest = postgres.get("hottest_db") or ""
lines.append(
"postgres: used={used}, max={max}, hottest_db={hottest}".format(
used=postgres.get("used"),
max=postgres.get("max"),
hottest=hottest,
)
)
def _append_hottest(lines: list[str], summary: dict[str, Any]) -> None:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
if not hottest:
return
parts = []
for key, entry in hottest.items():
if not isinstance(entry, dict):
continue
node = entry.get("node")
if key in {"net", "io"}:
value = _format_rate_bytes(entry.get("value"))
else:
value = _format_float(entry.get("value"))
if node:
parts.append(f"{key}={node} ({value})")
if parts:
lines.append("hottest: " + "; ".join(parts))
def _append_workloads(lines: list[str], summary: dict[str, Any]) -> None:
workloads = summary.get("workloads")
if not isinstance(workloads, list) or not workloads:
return
lines.append(f"workloads: total={len(workloads)}")
top_workloads = sorted(
(item for item in workloads if isinstance(item, dict)),
key=lambda item: (-int(item.get("pods_total") or 0), item.get("workload") or ""),
)[:5]
if not top_workloads:
return
parts = []
for item in top_workloads:
namespace = item.get("namespace")
name = item.get("workload")
pods_total = item.get("pods_total")
primary = item.get("primary_node")
if namespace and name:
label = f"{namespace}/{name}={pods_total}"
if primary:
label = f"{label} (primary={primary})"
parts.append(label)
if parts:
lines.append("workloads_top: " + "; ".join(parts))
def _append_flux(lines: list[str], summary: dict[str, Any]) -> None:
flux = summary.get("flux") if isinstance(summary.get("flux"), dict) else {}
if not flux:
return
not_ready = flux.get("not_ready")
if not_ready is not None:
lines.append(f"flux_not_ready: {not_ready}")
def _append_units_windows(lines: list[str], summary: dict[str, Any]) -> None:
metrics = summary.get("metrics") if isinstance(summary.get("metrics"), dict) else {}
units = metrics.get("units") if isinstance(metrics.get("units"), dict) else {}
windows = metrics.get("windows") if isinstance(metrics.get("windows"), dict) else {}
if units:
lines.append("units: " + _format_kv_map(units))
else:
lines.append("units: cpu_pct, ram_pct, net=bytes_per_sec, io=bytes_per_sec")
if windows:
lines.append("windows: " + _format_kv_map(windows))
else:
lines.append("windows: rates=5m, restarts=1h")
def summary_text(snapshot: dict[str, Any] | None) -> str:
    """Render ``snapshot`` as a newline-joined plain-text summary.

    Returns ``""`` when ``build_summary`` yields nothing. Otherwise each
    ``_append_*`` helper contributes its lines in a fixed order.
    """
    summary = build_summary(snapshot)
    if not summary:
        return ""
    appenders = (
        _append_nodes,
        _append_hardware,
        _append_pods,
        _append_namespace_pods,
        _append_restarts,
        _append_postgres,
        _append_hottest,
        _append_workloads,
        _append_flux,
        _append_units_windows,
    )
    lines: list[str] = []
    for append_section in appenders:
        append_section(lines, summary)
    return "\n".join(lines)