feat: enrich cluster state snapshot
This commit is contained in:
parent
0675b8b688
commit
0d86700a66
@ -16,6 +16,26 @@ from ..utils.logging import get_logger
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Expected element count of a metric value pair — presumably the
# [timestamp, value] shape of a VictoriaMetrics sample; usage is not
# visible in this chunk, TODO confirm against _vm_scalar/_vm_vector.
_VALUE_PAIR_LEN = 2

# Pod label keys inspected, in priority order, to derive a workload name.
_WORKLOAD_LABEL_KEYS = (
    "app.kubernetes.io/name",
    "app",
    "k8s-app",
    "app.kubernetes.io/instance",
    "release",
)

# Namespaces excluded from the workload summary (cluster infrastructure).
_SYSTEM_NAMESPACES = {
    "kube-system",
    "kube-public",
    "kube-node-lease",
    "flux-system",
    "monitoring",
    "logging",
    "traefik",
    "cert-manager",
    "maintenance",
    "postgres",
    "vault",
}
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@ -68,6 +88,112 @@ def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def _node_labels(labels: dict[str, Any]) -> dict[str, Any]:
|
||||
if not isinstance(labels, dict):
|
||||
return {}
|
||||
keep: dict[str, Any] = {}
|
||||
for key, value in labels.items():
|
||||
if key.startswith("node-role.kubernetes.io/"):
|
||||
keep[key] = value
|
||||
if key in {
|
||||
"kubernetes.io/arch",
|
||||
"kubernetes.io/hostname",
|
||||
"beta.kubernetes.io/arch",
|
||||
"hardware",
|
||||
"jetson",
|
||||
}:
|
||||
keep[key] = value
|
||||
return keep
|
||||
|
||||
|
||||
def _node_addresses(status: dict[str, Any]) -> dict[str, str]:
|
||||
addresses = status.get("addresses") if isinstance(status.get("addresses"), list) else []
|
||||
output: dict[str, str] = {}
|
||||
for addr in addresses:
|
||||
if not isinstance(addr, dict):
|
||||
continue
|
||||
addr_type = addr.get("type")
|
||||
addr_value = addr.get("address")
|
||||
if isinstance(addr_type, str) and isinstance(addr_value, str):
|
||||
output[addr_type] = addr_value
|
||||
return output
|
||||
|
||||
|
||||
def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Build a per-node detail record list from a /api/v1/nodes payload.

    Nodes without a usable ``metadata.name`` are skipped; the result is
    sorted by node name for deterministic snapshots.
    """
    records: list[dict[str, Any]] = []
    for node in _items(payload):
        meta = node.get("metadata")
        meta = meta if isinstance(meta, dict) else {}
        status = node.get("status")
        status = status if isinstance(status, dict) else {}
        info = status.get("nodeInfo")
        info = info if isinstance(info, dict) else {}
        labels = meta.get("labels")
        labels = labels if isinstance(labels, dict) else {}
        node_name = meta.get("name")
        if not isinstance(node_name, str) or not node_name:
            continue
        records.append(
            {
                "name": node_name,
                "ready": _node_ready(status.get("conditions")),
                "roles": _node_roles(labels),
                "is_worker": _node_is_worker(labels),
                "labels": _node_labels(labels),
                "hardware": _hardware_hint(labels, info),
                "arch": info.get("architecture") or "",
                "os": info.get("operatingSystem") or "",
                "kernel": info.get("kernelVersion") or "",
                "kubelet": info.get("kubeletVersion") or "",
                "container_runtime": info.get("containerRuntimeVersion") or "",
                "addresses": _node_addresses(status),
            }
        )
    return sorted(records, key=lambda rec: rec.get("name") or "")
|
||||
|
||||
|
||||
def _node_roles(labels: dict[str, Any]) -> list[str]:
|
||||
roles: list[str] = []
|
||||
for key in labels.keys():
|
||||
if key.startswith("node-role.kubernetes.io/"):
|
||||
role = key.split("/", 1)[-1]
|
||||
if role:
|
||||
roles.append(role)
|
||||
return sorted(set(roles))
|
||||
|
||||
|
||||
def _node_is_worker(labels: dict[str, Any]) -> bool:
|
||||
if "node-role.kubernetes.io/control-plane" in labels:
|
||||
return False
|
||||
if "node-role.kubernetes.io/master" in labels:
|
||||
return False
|
||||
if "node-role.kubernetes.io/worker" in labels:
|
||||
return True
|
||||
return True
|
||||
|
||||
|
||||
def _hardware_hint(labels: dict[str, Any], node_info: dict[str, Any]) -> str:
|
||||
result = "unknown"
|
||||
if str(labels.get("jetson") or "").lower() == "true":
|
||||
result = "jetson"
|
||||
else:
|
||||
hardware = (labels.get("hardware") or "").strip().lower()
|
||||
if hardware:
|
||||
result = hardware
|
||||
else:
|
||||
kernel = str(node_info.get("kernelVersion") or "").lower()
|
||||
os_image = str(node_info.get("osImage") or "").lower()
|
||||
if "tegra" in kernel or "jetson" in os_image:
|
||||
result = "jetson"
|
||||
elif "raspi" in kernel or "bcm2711" in kernel:
|
||||
result = "rpi"
|
||||
else:
|
||||
arch = str(node_info.get("architecture") or "").lower()
|
||||
if arch == "amd64":
|
||||
result = "amd64"
|
||||
elif arch == "arm64":
|
||||
result = "arm64-unknown"
|
||||
return result
|
||||
|
||||
|
||||
def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]:
|
||||
if not isinstance(conditions, list):
|
||||
return None, "", ""
|
||||
@ -116,6 +242,82 @@ def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
def _namespace_allowed(namespace: str) -> bool:
    """True for non-empty namespaces that are not cluster infrastructure."""
    if not namespace:
        return False
    return namespace not in _SYSTEM_NAMESPACES
|
||||
|
||||
|
||||
def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    """Return (workload name, provenance) from well-known pod labels.

    Walks ``_WORKLOAD_LABEL_KEYS`` in priority order; the provenance string
    is ``"label:<key>"``.  Returns ("", "") when nothing matches.
    """
    candidates = ((key, labels.get(key)) for key in _WORKLOAD_LABEL_KEYS)
    for key, candidate in candidates:
        if isinstance(candidate, str) and candidate:
            return candidate, f"label:{key}"
    return "", ""
|
||||
|
||||
|
||||
def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
|
||||
owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
|
||||
for owner in owners:
|
||||
if not isinstance(owner, dict):
|
||||
continue
|
||||
name = owner.get("name")
|
||||
kind = owner.get("kind")
|
||||
if isinstance(name, str) and name:
|
||||
return name, f"owner:{kind or 'unknown'}"
|
||||
return "", ""
|
||||
|
||||
|
||||
def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
    """Resolve a pod's workload name, preferring labels over ownerReferences."""
    raw_labels = meta.get("labels")
    pod_labels = raw_labels if isinstance(raw_labels, dict) else {}
    workload, origin = _workload_from_labels(pod_labels)
    return (workload, origin) if workload else _owner_reference(meta)
|
||||
|
||||
|
||||
def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Aggregate a /api/v1/pods payload into per-(namespace, workload) rows.

    Pods in system namespaces and pods whose workload name cannot be
    resolved (from labels or ownerReferences) are skipped.  Each row carries
    total/running pod counts, a per-node pod distribution, and the node
    hosting the most pods ("primary_node").
    """
    # Keyed by (namespace, workload) so all pods of one workload merge.
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    for pod in _items(payload):
        # Defensive extraction: tolerate malformed items in the API payload.
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        workload, source = _pod_workload(metadata)
        if not workload:
            continue
        node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        key = (namespace, workload)
        # First pod seen for this workload creates the aggregate entry;
        # "source" therefore records how the FIRST pod's name was resolved.
        entry = workloads.setdefault(
            key,
            {
                "namespace": namespace,
                "workload": workload,
                "source": source,
                "nodes": {},
                "pods_total": 0,
                "pods_running": 0,
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        if node:
            # Count pods per node so we can later pick the primary node.
            nodes = entry["nodes"]
            nodes[node] = nodes.get(node, 0) + 1
    output: list[dict[str, Any]] = []
    for entry in workloads.values():
        nodes = entry.get("nodes") or {}
        primary = ""
        if isinstance(nodes, dict) and nodes:
            # Highest pod count wins; ties break alphabetically by node name.
            primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
        entry["primary_node"] = primary
        output.append(entry)
    # Stable ordering for deterministic snapshots.
    output.sort(key=lambda item: (item.get("namespace") or "", item.get("workload") or ""))
    return output
|
||||
|
||||
|
||||
def _vm_query(expr: str) -> list[dict[str, Any]] | None:
|
||||
base = settings.vm_url
|
||||
if not base:
|
||||
@ -164,6 +366,99 @@ def _vm_vector(expr: str) -> list[dict[str, Any]]:
|
||||
return output
|
||||
|
||||
|
||||
def _vm_topk(expr: str, label_key: str) -> dict[str, Any] | None:
    """Run a topk-style instant query and return its first sample.

    Returns None when the query yields no samples; otherwise a dict with
    the requested label's value (empty string when missing), the raw
    sample value, and the metric label set.
    """
    result = _vm_vector(expr)
    if not result:
        return None
    sample = result[0]
    if isinstance(sample, dict):
        metric = sample.get("metric")
        value = sample.get("value")
    else:
        # BUG FIX: the original isinstance-guarded only the "metric" lookup;
        # a non-dict first sample then crashed on result[0].get("value").
        metric = {}
        value = None
    label = metric.get(label_key) if isinstance(metric, dict) else None
    return {"label": label or "", "value": value, "metric": metric}
|
||||
|
||||
|
||||
def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]:
    """Run an instant query and return per-node samples sorted by node name.

    Each sample becomes ``{"node": <label value>, "value": <raw value>}``;
    samples without a non-empty string node label are dropped.
    """
    rows: list[dict[str, Any]] = []
    for sample in _vm_vector(expr):
        raw_metric = sample.get("metric")
        metric_labels = raw_metric if isinstance(raw_metric, dict) else {}
        node_name = metric_labels.get(label_key)
        if isinstance(node_name, str) and node_name:
            rows.append({"node": node_name, "value": sample.get("value")})
    rows.sort(key=lambda row: row.get("node") or "")
    return rows
|
||||
|
||||
|
||||
def _postgres_connections(errors: list[str]) -> dict[str, Any]:
    """Collect Postgres connection stats from the metrics backend (best effort).

    Queries current/max connection counts and the busiest database; any
    failure is appended to *errors* instead of raising, so partial results
    are still returned.
    """
    stats: dict[str, Any] = {}
    try:
        stats["used"] = _vm_scalar("sum(pg_stat_activity_count)")
        stats["max"] = _vm_scalar("max(pg_settings_max_connections)")
        stats["hottest_db"] = _vm_topk(
            "topk(1, sum by (datname) (pg_stat_activity_count))",
            "datname",
        )
    except Exception as exc:
        errors.append(f"postgres: {exc}")
    return stats
|
||||
|
||||
|
||||
def _hottest_nodes(errors: list[str]) -> dict[str, Any]:
    """Find the single busiest node for CPU, RAM, network, and disk I/O.

    Each PromQL expression joins node_exporter instance metrics onto node
    names via node_uname_info, takes topk(1), and uses label_replace so the
    node name survives for _vm_topk to extract.  Query failures append to
    *errors* instead of raising, so partial results are still returned.
    """
    hottest: dict[str, Any] = {}
    try:
        # CPU: busiest node by 5m-average non-idle CPU percentage.
        hottest["cpu"] = _vm_topk(
            'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))) * 100) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # RAM: highest used-memory percentage (total minus available).
        hottest["ram"] = _vm_topk(
            'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # Network: highest combined RX+TX byte rate, loopback excluded.
        hottest["net"] = _vm_topk(
            'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{device!~"lo"}[5m]) '
            '+ rate(node_network_transmit_bytes_total{device!~"lo"}[5m]))) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # Disk I/O: highest combined read+write byte rate.
        hottest["io"] = _vm_topk(
            'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
    except Exception as exc:
        errors.append(f"hottest: {exc}")
    return hottest
|
||||
|
||||
|
||||
def _node_usage(errors: list[str]) -> dict[str, Any]:
    """Collect per-node CPU/RAM/network/disk-I/O usage from the metrics backend.

    Same PromQL join pattern as _hottest_nodes (instance metrics joined to
    node names via node_uname_info) but without topk, so _vm_node_metric
    returns one entry per node.  Query failures append to *errors* instead
    of raising, so partial results are still returned.
    """
    usage: dict[str, Any] = {}
    try:
        # CPU: 5m-average non-idle percentage per node.
        usage["cpu"] = _vm_node_metric(
            'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))) * 100) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        # RAM: used-memory percentage (total minus available) per node.
        usage["ram"] = _vm_node_metric(
            'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        # Network: combined RX+TX byte rate per node, loopback excluded.
        usage["net"] = _vm_node_metric(
            'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{device!~"lo"}[5m]) '
            '+ rate(node_network_transmit_bytes_total{device!~"lo"}[5m]))) * on(instance) group_left(node) '
            'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
        # Disk I/O: combined read+write byte rate per node.
        usage["io"] = _vm_node_metric(
            'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
    except Exception as exc:
        errors.append(f"node_usage: {exc}")
    return usage
|
||||
|
||||
|
||||
def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
|
||||
metrics: dict[str, Any] = {}
|
||||
try:
|
||||
@ -180,6 +475,9 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
|
||||
)
|
||||
except Exception as exc:
|
||||
errors.append(f"vm: {exc}")
|
||||
metrics["postgres_connections"] = _postgres_connections(errors)
|
||||
metrics["hottest_nodes"] = _hottest_nodes(errors)
|
||||
metrics["node_usage"] = _node_usage(errors)
|
||||
return metrics
|
||||
|
||||
|
||||
@ -188,9 +486,11 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
|
||||
collected_at = datetime.now(timezone.utc)
|
||||
|
||||
nodes: dict[str, Any] | None = None
|
||||
node_details: list[dict[str, Any]] = []
|
||||
try:
|
||||
payload = get_json("/api/v1/nodes")
|
||||
nodes = _summarize_nodes(payload)
|
||||
node_details = _node_details(payload)
|
||||
except Exception as exc:
|
||||
errors.append(f"nodes: {exc}")
|
||||
|
||||
@ -203,12 +503,21 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
|
||||
except Exception as exc:
|
||||
errors.append(f"flux: {exc}")
|
||||
|
||||
workloads: list[dict[str, Any]] = []
|
||||
try:
|
||||
pods_payload = get_json("/api/v1/pods?limit=5000")
|
||||
workloads = _summarize_workloads(pods_payload)
|
||||
except Exception as exc:
|
||||
errors.append(f"pods: {exc}")
|
||||
|
||||
metrics = _summarize_metrics(errors)
|
||||
|
||||
snapshot = {
|
||||
"collected_at": collected_at.isoformat(),
|
||||
"nodes": nodes or {},
|
||||
"nodes_detail": node_details,
|
||||
"flux": kustomizations or {},
|
||||
"workloads": workloads,
|
||||
"metrics": metrics,
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
@ -23,15 +23,36 @@ def test_collect_cluster_state(monkeypatch) -> None:
|
||||
return {
|
||||
"items": [
|
||||
{
|
||||
"metadata": {"name": "node-a"},
|
||||
"status": {"conditions": [{"type": "Ready", "status": "True"}]},
|
||||
"metadata": {"name": "node-a", "labels": {"kubernetes.io/arch": "arm64"}},
|
||||
"status": {
|
||||
"conditions": [{"type": "Ready", "status": "True"}],
|
||||
"nodeInfo": {"architecture": "arm64"},
|
||||
"addresses": [{"type": "InternalIP", "address": "10.0.0.1"}],
|
||||
},
|
||||
},
|
||||
{
|
||||
"metadata": {"name": "node-b"},
|
||||
"status": {"conditions": [{"type": "Ready", "status": "False"}]},
|
||||
"metadata": {"name": "node-b", "labels": {"kubernetes.io/arch": "amd64"}},
|
||||
"status": {
|
||||
"conditions": [{"type": "Ready", "status": "False"}],
|
||||
"nodeInfo": {"architecture": "amd64"},
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
if path.startswith("/api/v1/pods"):
|
||||
return {
|
||||
"items": [
|
||||
{
|
||||
"metadata": {
|
||||
"name": "jellyfin-0",
|
||||
"namespace": "media",
|
||||
"labels": {"app": "jellyfin"},
|
||||
},
|
||||
"spec": {"nodeName": "node-a"},
|
||||
"status": {"phase": "Running"},
|
||||
}
|
||||
]
|
||||
}
|
||||
return {
|
||||
"items": [
|
||||
{
|
||||
@ -55,6 +76,8 @@ def test_collect_cluster_state(monkeypatch) -> None:
|
||||
assert snapshot["nodes"]["total"] == 2
|
||||
assert snapshot["nodes"]["ready"] == 1
|
||||
assert snapshot["flux"]["not_ready"] == 1
|
||||
assert snapshot["nodes_detail"]
|
||||
assert snapshot["workloads"]
|
||||
assert summary.nodes_total == 2
|
||||
assert summary.nodes_ready == 1
|
||||
assert summary.pods_running == 5.0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user