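"""Cluster state collection and summarization.

Summarizes Kubernetes API payloads (nodes, pods, workloads, Flux
kustomizations, Longhorn volumes, events, jobs) and VictoriaMetrics query
results into compact dictionaries for downstream metrics export and storage.
"""
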
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

import httpx

from ..db.storage import Storage
from ..k8s.client import get_json
from ..metrics.metrics import set_cluster_state_metrics
from ..settings import settings
from ..utils.logging import get_logger

logger = get_logger(__name__)

_VALUE_PAIR_LEN = 2
_RATE_WINDOW = "5m"
_RESTARTS_WINDOW = "1h"
_NODE_UNAME_LABEL = 'node_uname_info{nodename!=""}'
_WORKLOAD_LABEL_KEYS = (
    "app.kubernetes.io/name",
    "app",
    "k8s-app",
    "app.kubernetes.io/instance",
    "release",
)
_SYSTEM_NAMESPACES = {
    "kube-system",
    "kube-public",
    "kube-node-lease",
    "flux-system",
    "monitoring",
    "logging",
    "traefik",
    "cert-manager",
    "maintenance",
    "postgres",
    "vault",
}
_WORKLOAD_ALLOWED_NAMESPACES = {
    "maintenance",
}
_CAPACITY_KEYS = {
    "cpu",
    "memory",
    "pods",
    "ephemeral-storage",
}
_PRESSURE_TYPES = {
    "MemoryPressure",
    "DiskPressure",
    "PIDPressure",
    "NetworkUnavailable",
}
_EVENTS_MAX = 20
_EVENT_WARNING = "Warning"
_PHASE_SEVERITY = {
    "Failed": 3,
    "Pending": 2,
    "Unknown": 1,
}
_PENDING_15M_HOURS = 0.25
_LOAD_TOP_COUNT = 5
_NAMESPACE_TOP_COUNT = 5


def _node_usage_by_hardware(node_load: list[dict[str, Any]], node_details: list[dict[str, Any]]) -> list[dict[str, Any]]:
    if not node_load or not node_details:
        return []
    hardware_by_node = _hardware_map(node_details)
    buckets: dict[str, dict[str, list[float]]] = {}
    for entry in node_load:
        if not isinstance(entry, dict):
            continue
        node = entry.get("node")
        if not isinstance(node, str) or not node:
            continue
        hardware = hardware_by_node.get(node, "unknown")
        _append_hardware_usage(buckets, str(hardware), entry)
    return _finalize_hardware_usage(buckets)


def _hardware_map(node_details: list[dict[str, Any]]) -> dict[str, str]:
    mapping: dict[str, str] = {}
    for node in node_details:
        if not isinstance(node, dict):
            continue
        name = node.get("name")
        if isinstance(name, str) and name:
            mapping[name] = str(node.get("hardware") or "unknown")
    return mapping


def _append_hardware_usage(buckets: dict[str, dict[str, list[float]]], hardware: str, entry: dict[str, Any]) -> None:
    bucket = buckets.setdefault(hardware, {"load_index": [], "cpu": [], "ram": [], "net": [], "io": []})
    for key in ("load_index", "cpu", "ram", "net", "io"):
        value = entry.get(key)
        if isinstance(value, (int, float)):
            bucket[key].append(float(value))


def _finalize_hardware_usage(buckets: dict[str, dict[str, list[float]]]) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for hardware, metrics in buckets.items():
        row: dict[str, Any] = {"hardware": hardware}
        for key, values in metrics.items():
            if values:
                row[key] = sum(values) / len(values)
        output.append(row)
    output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("hardware") or ""))
    return output


@dataclass(frozen=True)
class ClusterStateSummary:
    nodes_total: int | None
    nodes_ready: int | None
    pods_running: int | None
    kustomizations_not_ready: int | None
    errors: int


def _items(payload: dict[str, Any]) -> list[dict[str, Any]]:
    items = payload.get("items") if isinstance(payload.get("items"), list) else []
    return [item for item in items if isinstance(item, dict)]


def _node_ready(conditions: Any) -> bool:
    if not isinstance(conditions, list):
        return False
    for condition in conditions:
        if not isinstance(condition, dict):
            continue
        if condition.get("type") == "Ready":
            return condition.get("status") == "True"
    return False


def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]:
    names: list[str] = []
    not_ready: list[str] = []
    for node in _items(payload):
        metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
        status = node.get("status") if isinstance(node.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        if not name:
            continue
        names.append(name)
        if not _node_ready(status.get("conditions")):
            not_ready.append(name)
    names.sort()
    not_ready.sort()
    total = len(names)
    ready = total - len(not_ready)
    return {
        "total": total,
        "ready": ready,
        "not_ready": len(not_ready),
        "names": names,
        "not_ready_names": not_ready,
    }


def _node_labels(labels: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(labels, dict):
        return {}
    keep: dict[str, Any] = {}
    for key, value in labels.items():
        if key.startswith("node-role.kubernetes.io/"):
            keep[key] = value
        if key in {
            "kubernetes.io/arch",
            "kubernetes.io/hostname",
            "beta.kubernetes.io/arch",
            "hardware",
            "jetson",
        }:
            keep[key] = value
    return keep


def _node_addresses(status: dict[str, Any]) -> dict[str, str]:
    addresses = status.get("addresses") if isinstance(status.get("addresses"), list) else []
    output: dict[str, str] = {}
    for addr in addresses:
        if not isinstance(addr, dict):
            continue
        addr_type = addr.get("type")
        addr_value = addr.get("address")
        if isinstance(addr_type, str) and isinstance(addr_value, str):
            output[addr_type] = addr_value
    return output


def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
    details: list[dict[str, Any]] = []
    for node in _items(payload):
        metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
        spec = node.get("spec") if isinstance(node.get("spec"), dict) else {}
        status = node.get("status") if isinstance(node.get("status"), dict) else {}
        node_info = status.get("nodeInfo") if isinstance(status.get("nodeInfo"), dict) else {}
        labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        if not name:
            continue
        roles = _node_roles(labels)
        conditions = _node_pressure_conditions(status.get("conditions"))
        created_at = metadata.get("creationTimestamp") if isinstance(metadata.get("creationTimestamp"), str) else ""
        taints = _node_taints(spec.get("taints"))
        details.append(
            {
                "name": name,
                "ready": _node_ready(status.get("conditions")),
                "roles": roles,
                "is_worker": _node_is_worker(labels),
                "labels": _node_labels(labels),
                "hardware": _hardware_hint(labels, node_info),
                "arch": node_info.get("architecture") or "",
                "os": node_info.get("operatingSystem") or "",
                "kernel": node_info.get("kernelVersion") or "",
                "kubelet": node_info.get("kubeletVersion") or "",
                "container_runtime": node_info.get("containerRuntimeVersion") or "",
                "addresses": _node_addresses(status),
                "created_at": created_at,
                "age_hours": _age_hours(created_at),
                "taints": taints,
                "unschedulable": bool(spec.get("unschedulable")),
                "capacity": _node_capacity(status.get("capacity")),
                "allocatable": _node_capacity(status.get("allocatable")),
                "pressure": conditions,
            }
        )
    details.sort(key=lambda item: item.get("name") or "")
    return details


def _age_hours(timestamp: str) -> float | None:
    if not timestamp:
        return None
    try:
        parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    except ValueError:
        return None
    return round((datetime.now(timezone.utc) - parsed).total_seconds() / 3600, 1)
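
# Illustrative note (not from the original source): Kubernetes timestamps are
# RFC 3339 strings such as "2024-01-01T12:00:00Z". The replace("Z", "+00:00")
# shim keeps datetime.fromisoformat() working on Python < 3.11, which does not
# accept a trailing "Z". For example, roughly 36 hours after that instant,
# _age_hours("2024-01-01T12:00:00Z") would return 36.0.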


def _node_age_stats(details: list[dict[str, Any]]) -> dict[str, Any]:
    ages: list[tuple[str, float]] = []
    for node in details:
        name = node.get("name") if isinstance(node, dict) else ""
        age = node.get("age_hours")
        if isinstance(name, str) and name and isinstance(age, (int, float)):
            ages.append((name, float(age)))
    if not ages:
        return {}
    ages.sort(key=lambda item: item[1])
    values = [age for _, age in ages]
    return {
        "min": round(min(values), 1),
        "max": round(max(values), 1),
        "avg": round(sum(values) / len(values), 1),
        "youngest": [{"name": name, "age_hours": age} for name, age in ages[:5]],
        "oldest": [{"name": name, "age_hours": age} for name, age in ages[-5:]],
    }


def _node_flagged(details: list[dict[str, Any]], key: str) -> list[str]:
    names: list[str] = []
    for node in details:
        name = node.get("name") if isinstance(node, dict) else ""
        if not isinstance(name, str) or not name:
            continue
        if node.get(key):
            names.append(name)
    names.sort()
    return names


def _node_taints(raw: Any) -> list[dict[str, str]]:
    if not isinstance(raw, list):
        return []
    taints: list[dict[str, str]] = []
    for entry in raw:
        if not isinstance(entry, dict):
            continue
        key = entry.get("key")
        effect = entry.get("effect")
        value = entry.get("value")
        if isinstance(key, str) and isinstance(effect, str):
            taints.append(
                {
                    "key": key,
                    "value": value if isinstance(value, str) else "",
                    "effect": effect,
                }
            )
    return taints


def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
    summary = {
        "total": 0,
        "ready": 0,
        "workers": {"total": 0, "ready": 0},
        "by_hardware": {},
        "by_arch": {},
        "by_role": {},
        "not_ready_names": [],
        "pressure_nodes": {key: [] for key in _PRESSURE_TYPES},
        "age_stats": {},
        "tainted_nodes": [],
        "unschedulable_nodes": [],
    }
    not_ready: list[str] = []
    for node in details:
        name = _apply_node_summary(summary, node)
        if name and not node.get("ready"):
            not_ready.append(name)
    not_ready.sort()
    summary["not_ready_names"] = not_ready
    for cond_type in summary["pressure_nodes"]:
        summary["pressure_nodes"][cond_type].sort()
    summary["age_stats"] = _node_age_stats(details)
    summary["tainted_nodes"] = _node_flagged(details, "taints")
    summary["unschedulable_nodes"] = _node_flagged(details, "unschedulable")
    return summary


def _apply_node_summary(summary: dict[str, Any], node: dict[str, Any]) -> str:
    name = node.get("name") if isinstance(node, dict) else ""
    if not isinstance(name, str) or not name:
        return ""
    summary["total"] += 1
    ready = bool(node.get("ready"))
    if ready:
        summary["ready"] += 1
    if node.get("is_worker"):
        summary["workers"]["total"] += 1
        if ready:
            summary["workers"]["ready"] += 1
    hardware = node.get("hardware") or "unknown"
    arch = node.get("arch") or "unknown"
    summary["by_hardware"][hardware] = summary["by_hardware"].get(hardware, 0) + 1
    summary["by_arch"][arch] = summary["by_arch"].get(arch, 0) + 1
    for role in node.get("roles") or []:
        summary["by_role"][role] = summary["by_role"].get(role, 0) + 1
    _apply_pressure(summary, node, name)
    return name


def _apply_pressure(summary: dict[str, Any], node: dict[str, Any], name: str) -> None:
    pressure = node.get("pressure") or {}
    if not isinstance(pressure, dict):
        return
    for cond_type, active in pressure.items():
        if active and cond_type in summary["pressure_nodes"]:
            summary["pressure_nodes"][cond_type].append(name)


def _node_capacity(raw: Any) -> dict[str, str]:
    if not isinstance(raw, dict):
        return {}
    output: dict[str, str] = {}
    for key in _CAPACITY_KEYS:
        value = raw.get(key)
        if isinstance(value, (str, int, float)) and value != "":
            output[key] = str(value)
    return output


def _node_pressure_conditions(conditions: Any) -> dict[str, bool]:
    if not isinstance(conditions, list):
        return {}
    pressure: dict[str, bool] = {}
    for condition in conditions:
        if not isinstance(condition, dict):
            continue
        cond_type = condition.get("type")
        if cond_type in _PRESSURE_TYPES:
            pressure[cond_type] = condition.get("status") == "True"
    return pressure


def _node_roles(labels: dict[str, Any]) -> list[str]:
    roles: list[str] = []
    for key in labels.keys():
        if key.startswith("node-role.kubernetes.io/"):
            role = key.split("/", 1)[-1]
            if role:
                roles.append(role)
    return sorted(set(roles))


def _node_is_worker(labels: dict[str, Any]) -> bool:
    if "node-role.kubernetes.io/control-plane" in labels:
        return False
    if "node-role.kubernetes.io/master" in labels:
        return False
    if "node-role.kubernetes.io/worker" in labels:
        return True
    return True


def _hardware_hint(labels: dict[str, Any], node_info: dict[str, Any]) -> str:
    result = "unknown"
    if str(labels.get("jetson") or "").lower() == "true":
        result = "jetson"
    else:
        hardware = (labels.get("hardware") or "").strip().lower()
        if hardware:
            result = hardware
        else:
            kernel = str(node_info.get("kernelVersion") or "").lower()
            os_image = str(node_info.get("osImage") or "").lower()
            if "tegra" in kernel or "jetson" in os_image:
                result = "jetson"
            elif "raspi" in kernel or "bcm2711" in kernel:
                result = "rpi"
            else:
                arch = str(node_info.get("architecture") or "").lower()
                if arch == "amd64":
                    result = "amd64"
                elif arch == "arm64":
                    result = "arm64-unknown"
    return result


def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]:
    if not isinstance(conditions, list):
        return None, "", ""
    for condition in conditions:
        if not isinstance(condition, dict):
            continue
        if condition.get("type") != cond_type:
            continue
        status = condition.get("status")
        if status == "True":
            return True, condition.get("reason") or "", condition.get("message") or ""
        if status == "False":
            return False, condition.get("reason") or "", condition.get("message") or ""
        return None, condition.get("reason") or "", condition.get("message") or ""
    return None, "", ""


def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]:
    not_ready: list[dict[str, Any]] = []
    for item in _items(payload):
        metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
        spec = item.get("spec") if isinstance(item.get("spec"), dict) else {}
        status = item.get("status") if isinstance(item.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        conditions = status.get("conditions")
        ready, reason, message = _condition_status(conditions, "Ready")
        suspended = bool(spec.get("suspend"))
        if ready is True and not suspended:
            continue
        not_ready.append(
            {
                "name": name,
                "namespace": namespace,
                "ready": ready,
                "suspended": suspended,
                "reason": reason,
                "message": message,
            }
        )
    not_ready.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
    return {
        "total": len(_items(payload)),
        "not_ready": len(not_ready),
        "items": not_ready,
    }


def _namespace_allowed(namespace: str) -> bool:
    if not namespace:
        return False
    if namespace in _WORKLOAD_ALLOWED_NAMESPACES:
        return True
    return namespace not in _SYSTEM_NAMESPACES


def _event_timestamp(event: dict[str, Any]) -> str:
    for key in ("eventTime", "lastTimestamp", "firstTimestamp"):
        value = event.get(key)
        if isinstance(value, str) and value:
            return value
    return ""


def _event_sort_key(timestamp: str) -> float:
    if not timestamp:
        return 0.0
    try:
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp()
    except ValueError:
        return 0.0


def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]:
    warnings: list[dict[str, Any]] = []
    by_reason: dict[str, int] = {}
    by_namespace: dict[str, int] = {}
    for event in _items(payload):
        metadata = event.get("metadata") if isinstance(event.get("metadata"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        event_type = event.get("type") if isinstance(event.get("type"), str) else ""
        if event_type != _EVENT_WARNING:
            continue
        reason = event.get("reason") if isinstance(event.get("reason"), str) else ""
        message = event.get("message") if isinstance(event.get("message"), str) else ""
        count = event.get("count") if isinstance(event.get("count"), int) else 1
        involved = (
            event.get("involvedObject") if isinstance(event.get("involvedObject"), dict) else {}
        )
        timestamp = _event_timestamp(event)
        warnings.append(
            {
                "namespace": namespace,
                "reason": reason,
                "message": message,
                "count": count,
                "last_seen": timestamp,
                "object_kind": involved.get("kind") or "",
                "object_name": involved.get("name") or "",
            }
        )
        if reason:
            by_reason[reason] = by_reason.get(reason, 0) + count
        if namespace:
            by_namespace[namespace] = by_namespace.get(namespace, 0) + count
    warnings.sort(key=lambda item: _event_sort_key(item.get("last_seen") or ""), reverse=True)
    top = warnings[:_EVENTS_MAX]
    top_reason = ""
    top_reason_count = 0
    if by_reason:
        top_reason, top_reason_count = sorted(
            by_reason.items(), key=lambda item: (-item[1], item[0])
        )[0]
    latest_warning = top[0] if top else None
    return {
        "warnings_total": len(warnings),
        "warnings_by_reason": by_reason,
        "warnings_by_namespace": by_namespace,
        "warnings_recent": top,
        "warnings_top_reason": {"reason": top_reason, "count": top_reason_count},
        "warnings_latest": latest_warning,
    }


def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    for key in _WORKLOAD_LABEL_KEYS:
        value = labels.get(key)
        if isinstance(value, str) and value:
            return value, f"label:{key}"
    return "", ""


def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
    owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
    for owner in owners:
        if not isinstance(owner, dict):
            continue
        name = owner.get("name")
        kind = owner.get("kind")
        if isinstance(name, str) and name:
            return name, f"owner:{kind or 'unknown'}"
    return "", ""


def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
    labels = meta.get("labels") if isinstance(meta.get("labels"), dict) else {}
    name, source = _workload_from_labels(labels)
    if name:
        return name, source
    return _owner_reference(meta)


def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        workload, source = _pod_workload(metadata)
        if not workload:
            continue
        node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        key = (namespace, workload)
        entry = workloads.setdefault(
            key,
            {
                "namespace": namespace,
                "workload": workload,
                "source": source,
                "nodes": {},
                "pods_total": 0,
                "pods_running": 0,
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        if node:
            nodes = entry["nodes"]
            nodes[node] = nodes.get(node, 0) + 1
    output: list[dict[str, Any]] = []
    for entry in workloads.values():
        nodes = entry.get("nodes") or {}
        primary = ""
        if isinstance(nodes, dict) and nodes:
            primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
        entry["primary_node"] = primary
        output.append(entry)
    output.sort(key=lambda item: (item.get("namespace") or "", item.get("workload") or ""))
    return output


def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    namespaces: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        entry = namespaces.setdefault(
            namespace,
            {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "pods_pending": 0,
                "pods_failed": 0,
                "pods_succeeded": 0,
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        elif phase == "Pending":
            entry["pods_pending"] += 1
        elif phase == "Failed":
            entry["pods_failed"] += 1
        elif phase == "Succeeded":
            entry["pods_succeeded"] += 1
    output = list(namespaces.values())
    output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or ""))
    return output


def _summarize_namespace_nodes(payload: dict[str, Any]) -> list[dict[str, Any]]:
    namespaces: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
        if not node:
            continue
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        entry = namespaces.setdefault(
            namespace,
            {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "nodes": {},
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        nodes = entry["nodes"]
        nodes[node] = nodes.get(node, 0) + 1
    output: list[dict[str, Any]] = []
    for entry in namespaces.values():
        nodes = entry.get("nodes") or {}
        primary = ""
        if isinstance(nodes, dict) and nodes:
            primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
        entry["primary_node"] = primary
        output.append(entry)
    output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or ""))
    return output


_NODE_PHASE_KEYS = {
    "Running": "pods_running",
    "Pending": "pods_pending",
    "Failed": "pods_failed",
    "Succeeded": "pods_succeeded",
}


def _summarize_node_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    nodes: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        context = _node_pod_context(pod)
        if not context:
            continue
        node, namespace, phase = context
        entry = _node_pod_entry(nodes, node)
        _node_pod_apply(entry, namespace, phase)
    return _node_pod_finalize(nodes)


def _node_pod_context(pod: dict[str, Any]) -> tuple[str, str, str] | None:
    metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
    namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
    if not _namespace_allowed(namespace):
        return None
    spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
    node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
    if not node:
        return None
    status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
    phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
    return node, namespace, phase


def _node_pod_entry(nodes: dict[str, dict[str, Any]], node: str) -> dict[str, Any]:
    return nodes.setdefault(
        node,
        {
            "node": node,
            "pods_total": 0,
            "pods_running": 0,
            "pods_pending": 0,
            "pods_failed": 0,
            "pods_succeeded": 0,
            "namespaces": {},
        },
    )


def _node_pod_apply(entry: dict[str, Any], namespace: str, phase: str) -> None:
    entry["pods_total"] += 1
    phase_key = _NODE_PHASE_KEYS.get(phase)
    if phase_key:
        entry[phase_key] += 1
    if namespace:
        namespaces = entry["namespaces"]
        namespaces[namespace] = namespaces.get(namespace, 0) + 1


def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for entry in nodes.values():
        namespaces = entry.get("namespaces") or {}
        if isinstance(namespaces, dict):
            entry["namespaces_top"] = sorted(
                namespaces.items(), key=lambda item: (-item[1], item[0])
            )[:3]
        output.append(entry)
    output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("node") or ""))
    return output


def _node_pods_top(node_pods: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for entry in node_pods[:limit]:
        if not isinstance(entry, dict):
            continue
        output.append(
            {
                "node": entry.get("node"),
                "pods_total": entry.get("pods_total"),
                "pods_running": entry.get("pods_running"),
                "namespaces_top": entry.get("namespaces_top") or [],
            }
        )
    return output


def _record_pending_pod(
    pending_oldest: list[dict[str, Any]],
    info: dict[str, Any],
) -> bool:
    age_hours = info.get("age_hours")
    if age_hours is None:
        return False
    pending_oldest.append(info)
    return age_hours >= _PENDING_15M_HOURS


def _update_pod_issue(
    pod: dict[str, Any],
    acc: dict[str, Any],
) -> None:
    metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
    status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
    spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
    namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
    name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
    created_at = (
        metadata.get("creationTimestamp")
        if isinstance(metadata.get("creationTimestamp"), str)
        else ""
    )
    age_hours = _age_hours(created_at)
    if not name or not namespace:
        return
    phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
    restarts = 0
    waiting_reasons: list[str] = []
    for container in status.get("containerStatuses") or []:
        if not isinstance(container, dict):
            continue
        restarts += int(container.get("restartCount") or 0)
        state = container.get("state") if isinstance(container.get("state"), dict) else {}
        waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {}
        reason = waiting.get("reason")
        if isinstance(reason, str) and reason:
            waiting_reasons.append(reason)
            acc["waiting_reasons"][reason] = acc["waiting_reasons"].get(reason, 0) + 1
    phase_reason = status.get("reason")
    if isinstance(phase_reason, str) and phase_reason:
        acc["phase_reasons"][phase_reason] = acc["phase_reasons"].get(phase_reason, 0) + 1
    if phase in acc["counts"]:
        acc["counts"][phase] += 1
    if phase in _PHASE_SEVERITY or restarts > 0:
        acc["items"].append(
            {
                "namespace": namespace,
                "pod": name,
                "node": spec.get("nodeName") or "",
                "phase": phase,
                "reason": status.get("reason") or "",
                "restarts": restarts,
                "waiting_reasons": sorted(set(waiting_reasons)),
                "created_at": created_at,
                "age_hours": age_hours,
            }
        )
    if phase == "Pending":
        info = {
            "namespace": namespace,
            "pod": name,
            "node": spec.get("nodeName") or "",
            "age_hours": age_hours,
            "reason": status.get("reason") or "",
        }
        if _record_pending_pod(acc["pending_oldest"], info):
            acc["pending_over_15m"] += 1


def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
    acc = {
        "items": [],
        "counts": {key: 0 for key in _PHASE_SEVERITY},
        "pending_oldest": [],
        "pending_over_15m": 0,
        "waiting_reasons": {},
        "phase_reasons": {},
    }
    for pod in _items(payload):
        if isinstance(pod, dict):
            _update_pod_issue(pod, acc)
    items = acc["items"]
    items.sort(
        key=lambda item: (
            -_PHASE_SEVERITY.get(item.get("phase") or "", 0),
            -(item.get("restarts") or 0),
            item.get("namespace") or "",
            item.get("pod") or "",
        )
    )
    pending_oldest = acc["pending_oldest"]
    pending_oldest.sort(key=lambda item: -(item.get("age_hours") or 0.0))
    return {
        "counts": acc["counts"],
        "items": items[:20],
        "pending_oldest": pending_oldest[:10],
        "pending_over_15m": acc["pending_over_15m"],
        "waiting_reasons": acc["waiting_reasons"],
        "phase_reasons": acc["phase_reasons"],
    }


def _summarize_jobs(payload: dict[str, Any]) -> dict[str, Any]:
    totals = {"total": 0, "active": 0, "failed": 0, "succeeded": 0}
    by_namespace: dict[str, dict[str, int]] = {}
    failing: list[dict[str, Any]] = []
    active_oldest: list[dict[str, Any]] = []
    for job in _items(payload):
        metadata = job.get("metadata") if isinstance(job.get("metadata"), dict) else {}
        status = job.get("status") if isinstance(job.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        created_at = (
            metadata.get("creationTimestamp")
            if isinstance(metadata.get("creationTimestamp"), str)
            else ""
        )
        if not name or not namespace:
            continue
        active = int(status.get("active") or 0)
        failed = int(status.get("failed") or 0)
        succeeded = int(status.get("succeeded") or 0)
        totals["total"] += 1
        totals["active"] += active
        totals["failed"] += failed
        totals["succeeded"] += succeeded
        entry = by_namespace.setdefault(namespace, {"active": 0, "failed": 0, "succeeded": 0})
        entry["active"] += active
        entry["failed"] += failed
        entry["succeeded"] += succeeded
        age_hours = _age_hours(created_at)
        if failed > 0:
            failing.append(
                {
                    "namespace": namespace,
                    "job": name,
                    "failed": failed,
                    "age_hours": age_hours,
                }
            )
        if active > 0 and age_hours is not None:
            active_oldest.append(
                {
                    "namespace": namespace,
                    "job": name,
                    "active": active,
                    "age_hours": age_hours,
                }
            )
    failing.sort(
        key=lambda item: (
            -(item.get("failed") or 0),
            -(item.get("age_hours") or 0.0),
            item.get("namespace") or "",
            item.get("job") or "",
        )
    )
    active_oldest.sort(key=lambda item: -(item.get("age_hours") or 0.0))
    namespace_summary = [
        {
            "namespace": ns,
            "active": stats.get("active", 0),
            "failed": stats.get("failed", 0),
            "succeeded": stats.get("succeeded", 0),
        }
        for ns, stats in by_namespace.items()
    ]
    namespace_summary.sort(
        key=lambda item: (
            -(item.get("active") or 0),
            -(item.get("failed") or 0),
            item.get("namespace") or "",
        )
    )
    return {
        "totals": totals,
        "by_namespace": namespace_summary[:20],
        "failing": failing[:20],
        "active_oldest": active_oldest[:20],
    }


def _summarize_deployments(payload: dict[str, Any]) -> dict[str, Any]:
    items = _items(payload)
    unhealthy: list[dict[str, Any]] = []
    for dep in items:
        metadata = dep.get("metadata") if isinstance(dep.get("metadata"), dict) else {}
        spec = dep.get("spec") if isinstance(dep.get("spec"), dict) else {}
        status = dep.get("status") if isinstance(dep.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(spec.get("replicas") or 0)
        ready = int(status.get("readyReplicas") or 0)
        available = int(status.get("availableReplicas") or 0)
        updated = int(status.get("updatedReplicas") or 0)
        if desired <= 0:
            continue
        if ready < desired or available < desired:
            unhealthy.append(
                {
                    "name": name,
                    "namespace": namespace,
                    "desired": desired,
                    "ready": ready,
                    "available": available,
                    "updated": updated,
                }
            )
    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
    return {
        "total": len(items),
        "not_ready": len(unhealthy),
        "items": unhealthy,
    }


def _summarize_statefulsets(payload: dict[str, Any]) -> dict[str, Any]:
    items = _items(payload)
    unhealthy: list[dict[str, Any]] = []
    for st in items:
        metadata = st.get("metadata") if isinstance(st.get("metadata"), dict) else {}
        spec = st.get("spec") if isinstance(st.get("spec"), dict) else {}
        status = st.get("status") if isinstance(st.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(spec.get("replicas") or 0)
        ready = int(status.get("readyReplicas") or 0)
        current = int(status.get("currentReplicas") or 0)
        updated = int(status.get("updatedReplicas") or 0)
        if desired <= 0:
            continue
        if ready < desired:
            unhealthy.append(
                {
                    "name": name,
                    "namespace": namespace,
                    "desired": desired,
                    "ready": ready,
                    "current": current,
                    "updated": updated,
                }
            )
    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
    return {
        "total": len(items),
        "not_ready": len(unhealthy),
        "items": unhealthy,
    }


def _summarize_daemonsets(payload: dict[str, Any]) -> dict[str, Any]:
    items = _items(payload)
    unhealthy: list[dict[str, Any]] = []
    for ds in items:
        metadata = ds.get("metadata") if isinstance(ds.get("metadata"), dict) else {}
        status = ds.get("status") if isinstance(ds.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(status.get("desiredNumberScheduled") or 0)
        ready = int(status.get("numberReady") or 0)
        updated = int(status.get("updatedNumberScheduled") or 0)
        if desired <= 0:
            continue
        if ready < desired:
            unhealthy.append(
                {
                    "name": name,
                    "namespace": namespace,
                    "desired": desired,
                    "ready": ready,
                    "updated": updated,
                }
            )
    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
    return {
        "total": len(items),
        "not_ready": len(unhealthy),
        "items": unhealthy,
    }


def _summarize_workload_health(
    deployments: dict[str, Any],
    statefulsets: dict[str, Any],
    daemonsets: dict[str, Any],
) -> dict[str, Any]:
    return {
        "deployments": deployments,
        "statefulsets": statefulsets,
        "daemonsets": daemonsets,
    }


def _fetch_nodes(errors: list[str]) -> tuple[dict[str, Any], list[dict[str, Any]], dict[str, Any]]:
    nodes: dict[str, Any] = {}
    details: list[dict[str, Any]] = []
    summary: dict[str, Any] = {}
    try:
        payload = get_json("/api/v1/nodes")
        nodes = _summarize_nodes(payload)
        details = _node_details(payload)
        summary = _summarize_inventory(details)
    except Exception as exc:
        errors.append(f"nodes: {exc}")
    return nodes, details, summary


def _fetch_flux(errors: list[str]) -> dict[str, Any]:
    try:
        payload = get_json(
            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
        )
        return _summarize_kustomizations(payload)
    except Exception as exc:
        errors.append(f"flux: {exc}")
        return {}


def _fetch_pods(
    errors: list[str],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
    workloads: list[dict[str, Any]] = []
    namespace_pods: list[dict[str, Any]] = []
    namespace_nodes: list[dict[str, Any]] = []
    node_pods: list[dict[str, Any]] = []
    pod_issues: dict[str, Any] = {}
    try:
        pods_payload = get_json("/api/v1/pods?limit=5000")
        workloads = _summarize_workloads(pods_payload)
        namespace_pods = _summarize_namespace_pods(pods_payload)
        namespace_nodes = _summarize_namespace_nodes(pods_payload)
        node_pods = _summarize_node_pods(pods_payload)
        pod_issues = _summarize_pod_issues(pods_payload)
    except Exception as exc:
        errors.append(f"pods: {exc}")
    return workloads, namespace_pods, namespace_nodes, node_pods, pod_issues


def _fetch_jobs(errors: list[str]) -> dict[str, Any]:
    try:
        jobs_payload = get_json("/apis/batch/v1/jobs?limit=2000")
        return _summarize_jobs(jobs_payload)
    except Exception as exc:
        errors.append(f"jobs: {exc}")
        return {}


def _summarize_longhorn_volumes(payload: dict[str, Any]) -> dict[str, Any]:
    items = _items(payload)
    if not items:
        return {}
    by_state: dict[str, int] = {}
    by_robustness: dict[str, int] = {}
    degraded: list[dict[str, Any]] = []
    attached_count = 0
    detached_count = 0
    degraded_count = 0
    for volume in items:
        metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {}
        status = volume.get("status") if isinstance(volume.get("status"), dict) else {}
        spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        if not name:
            continue
        state = status.get("state") if isinstance(status.get("state"), str) else "unknown"
        robustness = (
            status.get("robustness") if isinstance(status.get("robustness"), str) else "unknown"
        )
        state_lower = state.lower()
        robustness_lower = robustness.lower()
        by_state[state] = by_state.get(state, 0) + 1
        by_robustness[robustness] = by_robustness.get(robustness, 0) + 1
        if state_lower == "attached":
            attached_count += 1
        elif state_lower == "detached":
            detached_count += 1
        if robustness_lower in {"degraded", "faulted"}:
            degraded_count += 1
            degraded.append(
                {
                    "name": name,
                    "state": state,
                    "robustness": robustness,
                    "size": spec.get("size"),
                    "actual_size": status.get("actualSize"),
                }
            )
    degraded.sort(key=lambda item: item.get("name") or "")
    return {
        "total": len(items),
        "by_state": by_state,
        "by_robustness": by_robustness,
        "attached_count": attached_count,
        "detached_count": detached_count,
        "degraded": degraded,
        "degraded_count": degraded_count,
    }


def _fetch_longhorn(errors: list[str]) -> dict[str, Any]:
    try:
        payload = get_json(
            "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes"
        )
        return _summarize_longhorn_volumes(payload)
    except Exception as exc:
        errors.append(f"longhorn: {exc}")
        return {}


def _fetch_workload_health(errors: list[str]) -> dict[str, Any]:
    try:
        deployments_payload = get_json("/apis/apps/v1/deployments?limit=2000")
        statefulsets_payload = get_json("/apis/apps/v1/statefulsets?limit=2000")
        daemonsets_payload = get_json("/apis/apps/v1/daemonsets?limit=2000")
        deployments = _summarize_deployments(deployments_payload)
        statefulsets = _summarize_statefulsets(statefulsets_payload)
        daemonsets = _summarize_daemonsets(daemonsets_payload)
        return _summarize_workload_health(deployments, statefulsets, daemonsets)
    except Exception as exc:
        errors.append(f"workloads_health: {exc}")
        return {}


def _fetch_events(errors: list[str]) -> dict[str, Any]:
    try:
        events_payload = get_json("/api/v1/events?limit=2000")
        return _summarize_events(events_payload)
    except Exception as exc:
        errors.append(f"events: {exc}")
        return {}


def _vm_query(expr: str) -> list[dict[str, Any]] | None:
    base = settings.vm_url
    if not base:
        return None
    url = f"{base.rstrip('/')}/api/v1/query"
    params = {"query": expr}
    with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
        resp = client.get(url, params=params)
        resp.raise_for_status()
        payload = resp.json()
    if payload.get("status") != "success":
        return None
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    result = data.get("result")
    return result if isinstance(result, list) else None
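
# Illustrative, not from the original source: _vm_query targets the
# Prometheus-compatible instant-query API that VictoriaMetrics exposes, so a
# successful response looks roughly like
#   {"status": "success",
#    "data": {"resultType": "vector",
#             "result": [{"metric": {"node": "worker-1"},
#                         "value": [1700000000.0, "42.5"]}]}}
# _vm_scalar() and _vm_vector() below parse exactly that
# [timestamp, "value"] pair shape.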


def _vm_scalar(expr: str) -> float | None:
    result = _vm_query(expr)
    if not result:
        return None
    value = result[0].get("value") if isinstance(result[0], dict) else None
    if not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
        return None
    try:
        return float(value[1])
    except (TypeError, ValueError):
        return None


def _vm_vector(expr: str) -> list[dict[str, Any]]:
    result = _vm_query(expr) or []
    output: list[dict[str, Any]] = []
    for item in result:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        value = item.get("value") if isinstance(item.get("value"), list) else []
        if len(value) < _VALUE_PAIR_LEN:
            continue
        try:
            numeric = float(value[1])
        except (TypeError, ValueError):
            continue
        output.append({"metric": metric, "value": numeric})
    return output


def _filter_namespace_vector(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for item in entries:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        namespace = metric.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        if namespace in _SYSTEM_NAMESPACES:
            continue
        output.append(item)
    return output


def _vm_topk(expr: str, label_key: str) -> dict[str, Any] | None:
    result = _vm_vector(expr)
    if not result:
        return None
    metric = result[0].get("metric") if isinstance(result[0], dict) else {}
    value = result[0].get("value")
    label = metric.get(label_key) if isinstance(metric, dict) else None
    return {"label": label or "", "value": value, "metric": metric}


def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for item in _vm_vector(expr):
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        label = metric.get(label_key)
        value = item.get("value")
        if isinstance(label, str) and label:
            output.append({"node": label, "value": value})
    output.sort(key=lambda item: item.get("node") or "")
    return output


def _postgres_connections(errors: list[str]) -> dict[str, Any]:
    postgres: dict[str, Any] = {}
    try:
        postgres["used"] = _vm_scalar("sum(pg_stat_activity_count)")
        postgres["max"] = _vm_scalar("max(pg_settings_max_connections)")
        postgres["by_db"] = _vm_vector(
            "topk(5, sum by (datname) (pg_stat_activity_count))"
        )
        postgres["hottest_db"] = _vm_topk(
            "topk(1, sum by (datname) (pg_stat_activity_count))",
            "datname",
        )
    except Exception as exc:
        errors.append(f"postgres: {exc}")
    return postgres


def _hottest_nodes(errors: list[str]) -> dict[str, Any]:
    hottest: dict[str, Any] = {}
    try:
        hottest["cpu"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["ram"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            f'/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["net"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["io"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
    except Exception as exc:
        errors.append(f"hottest: {exc}")
    return hottest


def _node_usage(errors: list[str]) -> dict[str, Any]:
    usage: dict[str, Any] = {}
    try:
        usage["cpu"] = _vm_node_metric(
            f'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        usage["ram"] = _vm_node_metric(
            'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        usage["net"] = _vm_node_metric(
            f'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) '
            'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        usage["io"] = _vm_node_metric(
            f'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        usage["disk"] = _vm_node_metric(
            'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} '
            '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) '
            'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
    except Exception as exc:
        errors.append(f"node_usage: {exc}")
    return usage


def _pvc_usage(errors: list[str]) -> list[dict[str, Any]]:
    try:
        entries = _vm_vector(
            "topk(5, max by (namespace,persistentvolumeclaim) "
            "(kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100))"
        )
        return _filter_namespace_vector(entries)
    except Exception as exc:
        errors.append(f"pvc_usage: {exc}")
        return []


def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]:
    values: list[float] = []
    for entry in series:
        if not isinstance(entry, dict):
            continue
        try:
            values.append(float(entry.get("value")))
        except (TypeError, ValueError):
            continue
    if not values:
        return {}
    return {
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
    }


def _vm_namespace_totals(expr: str) -> dict[str, float]:
    totals: dict[str, float] = {}
    for item in _vm_vector(expr):
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        namespace = metric.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        try:
            totals[namespace] = float(item.get("value"))
        except (TypeError, ValueError):
            continue
    return totals


def _build_namespace_capacity(
    cpu_usage: dict[str, float],
    cpu_requests: dict[str, float],
    mem_usage: dict[str, float],
    mem_requests: dict[str, float],
) -> list[dict[str, Any]]:
    namespaces = sorted(set(cpu_usage) | set(cpu_requests) | set(mem_usage) | set(mem_requests))
    output: list[dict[str, Any]] = []
    for namespace in namespaces:
        cpu_used = cpu_usage.get(namespace)
        cpu_req = cpu_requests.get(namespace)
        mem_used = mem_usage.get(namespace)
        mem_req = mem_requests.get(namespace)
        cpu_ratio = None
        mem_ratio = None
        if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)) and cpu_req > 0:
            cpu_ratio = cpu_used / cpu_req
        if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)) and mem_req > 0:
            mem_ratio = mem_used / mem_req
        output.append(
            {
                "namespace": namespace,
                "cpu_usage": cpu_used,
                "cpu_requests": cpu_req,
                "cpu_usage_ratio": cpu_ratio,
                "mem_usage": mem_used,
                "mem_requests": mem_req,
                "mem_usage_ratio": mem_ratio,
            }
        )
    output.sort(
        key=lambda item: (
            -(item.get("cpu_requests") or 0),
            -(item.get("mem_requests") or 0),
            item.get("namespace") or "",
        )
    )
    return output


def _node_usage_profile(
    node_usage: dict[str, list[dict[str, Any]]],
    node_details: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    usage: dict[str, dict[str, Any]] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        for item in node_usage.get(key, []) or []:
            node = item.get("node")
            value = item.get("value")
            if not isinstance(node, str) or not node:
                continue
            if not isinstance(value, (int, float)):
                continue
            usage.setdefault(node, {})[key] = float(value)
    max_values: dict[str, float] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        values = [entry.get(key) for entry in usage.values() if isinstance(entry.get(key), (int, float))]
        max_values[key] = max(values) if values else 0.0

    detail_map: dict[str, dict[str, Any]] = {
        entry.get("name"): entry for entry in node_details if isinstance(entry, dict)
    }
    pod_map: dict[str, dict[str, Any]] = {
        entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)
    }

    output: list[dict[str, Any]] = []
    for node, entry in usage.items():
        detail = detail_map.get(node, {})
        pressure = detail.get("pressure") if isinstance(detail.get("pressure"), dict) else {}
        pressure_count = sum(1 for value in pressure.values() if value)
        taints = detail.get("taints") if isinstance(detail.get("taints"), list) else []
        unschedulable = bool(detail.get("unschedulable"))
        pods_total = None
        pod_entry = pod_map.get(node)
        if isinstance(pod_entry, dict):
            pods_total = pod_entry.get("pods_total")

        normalized: dict[str, float] = {}
        for key in ("cpu", "ram", "disk", "net", "io"):
            raw = entry.get(key)
            max_val = max_values.get(key) or 0.0
            if isinstance(raw, (int, float)) and max_val > 0:
                normalized[f"{key}_norm"] = raw / max_val
        norm_values = [v for v in normalized.values() if isinstance(v, (int, float))]
        load_index = sum(norm_values) / len(norm_values) if norm_values else None
        output.append(
            {
                "node": node,
                "cpu": entry.get("cpu"),
                "ram": entry.get("ram"),
                "disk": entry.get("disk"),
                "net": entry.get("net"),
                "io": entry.get("io"),
                **normalized,
                "pressure_flags": pressure,
                "pressure_count": pressure_count,
                "taints": taints,
                "unschedulable": unschedulable,
                "pods_total": pods_total,
                "load_index": load_index,
            }
        )
    output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
    return output


def _percentile(values: list[float], percentile: float) -> float | None:
    if not values:
        return None
    ordered = sorted(values)
    idx = int(round((len(ordered) - 1) * percentile))
    idx = min(max(idx, 0), len(ordered) - 1)
    return ordered[idx]
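
# Illustrative example (not from the original source): this is a nearest-rank
# percentile over the sorted values, so for ten samples 10, 20, ..., 100,
# _percentile([...], 0.9) computes idx = round(9 * 0.9) = 8 and returns 90.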


def _node_load_summary(node_load: list[dict[str, Any]]) -> dict[str, Any]:
    items = [
        entry
        for entry in node_load
        if isinstance(entry, dict) and isinstance(entry.get("load_index"), (int, float))
    ]
    if not items:
        return {}
    values = [float(entry.get("load_index") or 0) for entry in items]
    avg = sum(values) / len(values)
    variance = sum((value - avg) ** 2 for value in values) / len(values)
    stddev = variance**0.5
    top = sorted(items, key=lambda item: -(item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    bottom = sorted(items, key=lambda item: (item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    outliers = [
        item
        for item in items
        if isinstance(item.get("load_index"), (int, float))
        and item.get("load_index") >= avg + stddev
    ]
    outliers.sort(key=lambda item: -(item.get("load_index") or 0))
    return {
        "avg": round(avg, 3),
        "p90": round(_percentile(values, 0.9) or 0.0, 3),
        "min": round(min(values), 3),
        "max": round(max(values), 3),
        "top": top,
        "bottom": bottom,
        "outliers": outliers[:_LOAD_TOP_COUNT],
    }


def _namespace_capacity_summary(capacity: list[dict[str, Any]]) -> dict[str, Any]:
|
|
if not capacity:
|
|
return {}
|
|
cpu_ratio = [
|
|
entry
|
|
for entry in capacity
|
|
if isinstance(entry, dict) and isinstance(entry.get("cpu_usage_ratio"), (int, float))
|
|
]
|
|
mem_ratio = [
|
|
entry
|
|
for entry in capacity
|
|
if isinstance(entry, dict) and isinstance(entry.get("mem_usage_ratio"), (int, float))
|
|
]
|
|
cpu_ratio.sort(key=lambda item: -(item.get("cpu_usage_ratio") or 0))
|
|
mem_ratio.sort(key=lambda item: -(item.get("mem_usage_ratio") or 0))
|
|
cpu_headroom: list[dict[str, Any]] = []
|
|
mem_headroom: list[dict[str, Any]] = []
|
|
for entry in capacity:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
cpu_used = entry.get("cpu_usage")
|
|
cpu_req = entry.get("cpu_requests")
|
|
mem_used = entry.get("mem_usage")
|
|
mem_req = entry.get("mem_requests")
|
|
if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)):
|
|
cpu_headroom.append(
|
|
{
|
|
"namespace": entry.get("namespace"),
|
|
"headroom": cpu_req - cpu_used,
|
|
"usage": cpu_used,
|
|
"requests": cpu_req,
|
|
"ratio": entry.get("cpu_usage_ratio"),
|
|
}
|
|
)
|
|
if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)):
|
|
mem_headroom.append(
|
|
{
|
|
"namespace": entry.get("namespace"),
|
|
"headroom": mem_req - mem_used,
|
|
"usage": mem_used,
|
|
"requests": mem_req,
|
|
"ratio": entry.get("mem_usage_ratio"),
|
|
}
|
|
)
|
|
cpu_headroom.sort(key=lambda item: (item.get("headroom") or 0))
|
|
mem_headroom.sort(key=lambda item: (item.get("headroom") or 0))
|
|
cpu_over_names = [
|
|
entry.get("namespace")
|
|
for entry in cpu_ratio
|
|
if (entry.get("cpu_usage_ratio") or 0) > 1 and entry.get("namespace")
|
|
]
|
|
mem_over_names = [
|
|
entry.get("namespace")
|
|
for entry in mem_ratio
|
|
if (entry.get("mem_usage_ratio") or 0) > 1 and entry.get("namespace")
|
|
]
|
|
over_cpu = len(cpu_over_names)
|
|
over_mem = len(mem_over_names)
|
|
return {
|
|
"cpu_ratio_top": cpu_ratio[:_NAMESPACE_TOP_COUNT],
|
|
"mem_ratio_top": mem_ratio[:_NAMESPACE_TOP_COUNT],
|
|
"cpu_headroom_low": cpu_headroom[:_NAMESPACE_TOP_COUNT],
|
|
"mem_headroom_low": mem_headroom[:_NAMESPACE_TOP_COUNT],
|
|
"cpu_overcommitted": over_cpu,
|
|
"mem_overcommitted": over_mem,
|
|
"cpu_overcommitted_names": sorted({name for name in cpu_over_names if isinstance(name, str)}),
|
|
"mem_overcommitted_names": sorted({name for name in mem_over_names if isinstance(name, str)}),
|
|
}
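

# Illustrative sketch with assumed numbers: a namespace using 0.3 CPU cores
# against 1.0 requested has headroom 0.7; usage above requests yields
# negative headroom and a usage ratio > 1, which counts it as overcommitted.
#
#   _namespace_capacity_summary([{
#       "namespace": "demo",  # hypothetical namespace
#       "cpu_usage": 1.2, "cpu_requests": 1.0, "cpu_usage_ratio": 1.2,
#       "mem_usage": 5e8, "mem_requests": 1e9, "mem_usage_ratio": 0.5,
#   }])
#   # -> cpu_overcommitted == 1, mem_overcommitted == 0,
#   #    cpu_headroom_low[0]["headroom"] == -0.2 (approximately, float math)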


def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
    """Query VictoriaMetrics for cluster-wide usage summaries; failures append to errors."""
    metrics: dict[str, Any] = {}
    try:
        # Counts and capacity totals. These are the pre-2.0 kube-state-metrics
        # names (e.g. kube_node_status_capacity_cpu_cores); newer releases
        # expose kube_node_status_capacity{resource="cpu"} instead.
        metrics["nodes_total"] = _vm_scalar("count(kube_node_info)")
        metrics["nodes_ready"] = _vm_scalar(
            'count(kube_node_status_condition{condition="Ready",status="true"})'
        )
        metrics["capacity_cpu"] = _vm_scalar("sum(kube_node_status_capacity_cpu_cores)")
        metrics["allocatable_cpu"] = _vm_scalar("sum(kube_node_status_allocatable_cpu_cores)")
        metrics["capacity_mem_bytes"] = _vm_scalar("sum(kube_node_status_capacity_memory_bytes)")
        metrics["allocatable_mem_bytes"] = _vm_scalar("sum(kube_node_status_allocatable_memory_bytes)")
        metrics["capacity_pods"] = _vm_scalar("sum(kube_node_status_capacity_pods)")
        metrics["allocatable_pods"] = _vm_scalar("sum(kube_node_status_allocatable_pods)")
        metrics["pods_running"] = _vm_scalar('sum(kube_pod_status_phase{phase="Running"})')
        metrics["pods_pending"] = _vm_scalar('sum(kube_pod_status_phase{phase="Pending"})')
        metrics["pods_failed"] = _vm_scalar('sum(kube_pod_status_phase{phase="Failed"})')
        metrics["pods_succeeded"] = _vm_scalar('sum(kube_pod_status_phase{phase="Succeeded"})')
        metrics["top_restarts_1h"] = _vm_vector(
            f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
        )
        metrics["restart_namespace_top"] = _filter_namespace_vector(
            _vm_vector(
                f"topk(5, sum by (namespace) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
            )
        )
        metrics["pod_cpu_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        # The on(namespace,pod) group_left(node) join copies the node label
        # from kube_pod_info onto each pod's usage series, so the top pods
        # can be grouped by the node they run on.
        metrics["pod_cpu_top_node"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (node,namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]) * on (namespace,pod) group_left(node) kube_pod_info))'
            )
        )
        metrics["pod_mem_top"] = _filter_namespace_vector(
            _vm_vector(
                'topk(5, sum by (namespace,pod) (container_memory_working_set_bytes{namespace!=""}))'
            )
        )
        metrics["pod_mem_top_node"] = _filter_namespace_vector(
            _vm_vector(
                'topk(5, sum by (node,namespace,pod) (container_memory_working_set_bytes{namespace!=""} * on (namespace,pod) group_left(node) kube_pod_info))'
            )
        )
        metrics["job_failures_24h"] = _vm_vector(
            "topk(5, sum by (namespace,job_name) (increase(kube_job_status_failed[24h])))"
        )
    except Exception as exc:
        errors.append(f"vm: {exc}")
    metrics["postgres_connections"] = _postgres_connections(errors)
    metrics["hottest_nodes"] = _hottest_nodes(errors)
    metrics["node_usage"] = _node_usage(errors)
    node_usage = metrics.get("node_usage", {})
    metrics["node_usage_stats"] = {
        "cpu": _usage_stats(node_usage.get("cpu", [])),
        "ram": _usage_stats(node_usage.get("ram", [])),
        "net": _usage_stats(node_usage.get("net", [])),
        "io": _usage_stats(node_usage.get("io", [])),
        "disk": _usage_stats(node_usage.get("disk", [])),
    }
    try:
        metrics["namespace_cpu_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        metrics["namespace_mem_top"] = _filter_namespace_vector(
            _vm_vector(
                'topk(5, sum by (namespace) (container_memory_working_set_bytes{namespace!=""}))'
            )
        )
        metrics["namespace_cpu_requests_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace) (kube_pod_container_resource_requests_cpu_cores))"
            )
        )
        metrics["namespace_mem_requests_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace) (kube_pod_container_resource_requests_memory_bytes))"
            )
        )
        metrics["namespace_net_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace) (rate(container_network_receive_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]) + rate(container_network_transmit_bytes_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        metrics["namespace_io_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace) (rate(container_fs_reads_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]) + rate(container_fs_writes_bytes_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        # Usage and requests per namespace feed the capacity table, which in
        # turn drives _namespace_capacity_summary below.
        namespace_cpu_usage = _vm_namespace_totals(
            f'sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))'
        )
        namespace_cpu_requests = _vm_namespace_totals(
            "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)"
        )
        namespace_mem_usage = _vm_namespace_totals(
            'sum by (namespace) (container_memory_working_set_bytes{namespace!=""})'
        )
        namespace_mem_requests = _vm_namespace_totals(
            "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)"
        )
        metrics["namespace_capacity"] = _build_namespace_capacity(
            namespace_cpu_usage,
            namespace_cpu_requests,
            namespace_mem_usage,
            namespace_mem_requests,
        )
    except Exception as exc:
        errors.append(f"namespace_usage: {exc}")
    metrics["namespace_capacity_summary"] = _namespace_capacity_summary(
        metrics.get("namespace_capacity", [])
    )
    metrics["pvc_usage_top"] = _pvc_usage(errors)
    # Unit and window hints so consumers can render values without guessing.
    metrics["units"] = {
        "cpu": "percent",
        "ram": "percent",
        "net": "bytes_per_sec",
        "io": "bytes_per_sec",
        "disk": "percent",
        "restarts": "count",
        "pod_cpu": "cores",
        "pod_mem": "bytes",
        "pod_cpu_top_node": "cores",
        "pod_mem_top_node": "bytes",
        "job_failures_24h": "count",
        "namespace_cpu": "cores",
        "namespace_mem": "bytes",
        "namespace_cpu_requests": "cores",
        "namespace_mem_requests": "bytes",
        "namespace_net": "bytes_per_sec",
        "namespace_io": "bytes_per_sec",
        "pvc_used_percent": "percent",
        "capacity_cpu": "cores",
        "allocatable_cpu": "cores",
        "capacity_mem_bytes": "bytes",
        "allocatable_mem_bytes": "bytes",
        "capacity_pods": "count",
        "allocatable_pods": "count",
    }
    metrics["windows"] = {
        "rates": _RATE_WINDOW,
        "restarts": _RESTARTS_WINDOW,
    }
    return metrics
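

# A minimal sketch of the kind of instant query the _vm_* helpers issue
# against a Prometheus-compatible API (VictoriaMetrics here). The URL is a
# placeholder assumption; the real endpoint comes from settings elsewhere
# in this module.
#
#   resp = httpx.get(
#       "http://victoria-metrics:8428/api/v1/query",  # assumed address
#       params={"query": "count(kube_node_info)"},
#       timeout=10.0,
#   )
#   # resp.json() -> {"status": "success", "data": {"resultType": "vector",
#   #                 "result": [{"metric": {}, "value": [1700000000, "3"]}]}}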


def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
    """Assemble a full cluster snapshot plus a compact summary; collection errors accumulate instead of raising."""
    errors: list[str] = []
    collected_at = datetime.now(timezone.utc)

    nodes, node_details, node_summary = _fetch_nodes(errors)
    kustomizations = _fetch_flux(errors)
    workloads, namespace_pods, namespace_nodes, node_pods, pod_issues = _fetch_pods(errors)
    jobs = _fetch_jobs(errors)
    longhorn = _fetch_longhorn(errors)
    workload_health = _fetch_workload_health(errors)
    events = _fetch_events(errors)

    metrics = _summarize_metrics(errors)
    metrics["node_pods_top"] = _node_pods_top(node_pods)
    metrics["node_load"] = _node_usage_profile(
        metrics.get("node_usage", {}),
        node_details,
        node_pods,
    )
    metrics["node_load_summary"] = _node_load_summary(metrics.get("node_load", []))
    metrics["node_load_by_hardware"] = _node_usage_by_hardware(metrics.get("node_load", []), node_details)

    snapshot = {
        "collected_at": collected_at.isoformat(),
        "nodes": nodes,
        "nodes_summary": node_summary,
        "nodes_detail": node_details,
        "flux": kustomizations,
        "workloads": workloads,
        "namespace_pods": namespace_pods,
        "namespace_nodes": namespace_nodes,
        "node_pods": node_pods,
        "pod_issues": pod_issues,
        "jobs": jobs,
        "longhorn": longhorn,
        "workloads_health": workload_health,
        "events": events,
        "metrics": metrics,
        "errors": errors,
    }

    summary = ClusterStateSummary(
        nodes_total=(nodes or {}).get("total"),
        nodes_ready=(nodes or {}).get("ready"),
        pods_running=metrics.get("pods_running"),
        kustomizations_not_ready=(kustomizations or {}).get("not_ready"),
        errors=len(errors),
    )
    set_cluster_state_metrics(
        collected_at,
        summary.nodes_total,
        summary.nodes_ready,
        summary.pods_running,
        summary.kustomizations_not_ready,
    )
    return snapshot, summary
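

# Hedged usage sketch: only the return shape comes from the function above;
# the logging mirrors this module's structured style but is illustrative.
#
#   snapshot, summary = collect_cluster_state()
#   if summary.errors:
#       logger.info(
#           "cluster state collected with errors",
#           extra={"event": "cluster_state", "errors": summary.errors},
#       )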


def run_cluster_state(storage: Storage) -> ClusterStateSummary:
    """Collect a snapshot, persist it, and prune old records per settings."""
    snapshot, summary = collect_cluster_state()
    try:
        storage.record_cluster_state(snapshot)
        storage.prune_cluster_state(settings.cluster_state_keep)
    except Exception as exc:
        # Persistence failures should not break collection, but they are
        # operationally significant, so log above info level.
        logger.warning(
            "cluster state storage failed",
            extra={"event": "cluster_state", "status": "error", "detail": str(exc)},
        )
    return summary
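

# Hedged usage sketch: Storage construction and job scheduling live elsewhere
# in this service; the call below only illustrates the entry point.
#
#   storage = Storage(...)  # constructor arguments are defined in db.storage
#   summary = run_cluster_state(storage)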