From 0d86700a666cd7a921beb4e0ba34026fed2dd4c1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 27 Jan 2026 05:41:46 -0300 Subject: [PATCH] feat: enrich cluster state snapshot --- ariadne/services/cluster_state.py | 309 ++++++++++++++++++++++++++++++ tests/test_cluster_state.py | 31 ++- 2 files changed, 336 insertions(+), 4 deletions(-) diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index ef370cb..b794327 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -16,6 +16,26 @@ from ..utils.logging import get_logger logger = get_logger(__name__) _VALUE_PAIR_LEN = 2 +_WORKLOAD_LABEL_KEYS = ( + "app.kubernetes.io/name", + "app", + "k8s-app", + "app.kubernetes.io/instance", + "release", +) +_SYSTEM_NAMESPACES = { + "kube-system", + "kube-public", + "kube-node-lease", + "flux-system", + "monitoring", + "logging", + "traefik", + "cert-manager", + "maintenance", + "postgres", + "vault", +} @dataclass(frozen=True) @@ -68,6 +88,112 @@ def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]: } +def _node_labels(labels: dict[str, Any]) -> dict[str, Any]: + if not isinstance(labels, dict): + return {} + keep: dict[str, Any] = {} + for key, value in labels.items(): + if key.startswith("node-role.kubernetes.io/"): + keep[key] = value + if key in { + "kubernetes.io/arch", + "kubernetes.io/hostname", + "beta.kubernetes.io/arch", + "hardware", + "jetson", + }: + keep[key] = value + return keep + + +def _node_addresses(status: dict[str, Any]) -> dict[str, str]: + addresses = status.get("addresses") if isinstance(status.get("addresses"), list) else [] + output: dict[str, str] = {} + for addr in addresses: + if not isinstance(addr, dict): + continue + addr_type = addr.get("type") + addr_value = addr.get("address") + if isinstance(addr_type, str) and isinstance(addr_value, str): + output[addr_type] = addr_value + return output + + +def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]: + 
details: list[dict[str, Any]] = []
+    for node in _items(payload):
+        metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
+        status = node.get("status") if isinstance(node.get("status"), dict) else {}
+        node_info = status.get("nodeInfo") if isinstance(status.get("nodeInfo"), dict) else {}
+        labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
+        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
+        if not name:
+            continue
+        roles = _node_roles(labels)
+        details.append(
+            {
+                "name": name,
+                "ready": _node_ready(status.get("conditions")),
+                "roles": roles,
+                "is_worker": _node_is_worker(labels),
+                "labels": _node_labels(labels),
+                "hardware": _hardware_hint(labels, node_info),
+                "arch": node_info.get("architecture") or "",
+                "os": node_info.get("operatingSystem") or "",
+                "kernel": node_info.get("kernelVersion") or "",
+                "kubelet": node_info.get("kubeletVersion") or "",
+                "container_runtime": node_info.get("containerRuntimeVersion") or "",
+                "addresses": _node_addresses(status),
+            }
+        )
+    details.sort(key=lambda item: item.get("name") or "")
+    return details
+
+
+def _node_roles(labels: dict[str, Any]) -> list[str]:
+    roles: list[str] = []
+    for key in labels.keys():
+        if key.startswith("node-role.kubernetes.io/"):
+            role = key.split("/", 1)[-1]
+            if role:
+                roles.append(role)
+    return sorted(set(roles))
+
+
+def _node_is_worker(labels: dict[str, Any]) -> bool:
+    if "node-role.kubernetes.io/control-plane" in labels:
+        return False
+    if "node-role.kubernetes.io/master" in labels:
+        return False
+    # Any node that is not control-plane/master counts as a worker,
+    # whether or not it carries an explicit worker role label.
+    return True
+
+
+def _hardware_hint(labels: dict[str, Any], node_info: dict[str, Any]) -> str:
+    result = "unknown"
+    if str(labels.get("jetson") or "").lower() == "true":
+        result = "jetson"
+    else:
+        hardware = (labels.get("hardware") or "").strip().lower()
+        if hardware:
+            result = hardware
+        else:
+            kernel = 
str(node_info.get("kernelVersion") or "").lower() + os_image = str(node_info.get("osImage") or "").lower() + if "tegra" in kernel or "jetson" in os_image: + result = "jetson" + elif "raspi" in kernel or "bcm2711" in kernel: + result = "rpi" + else: + arch = str(node_info.get("architecture") or "").lower() + if arch == "amd64": + result = "amd64" + elif arch == "arm64": + result = "arm64-unknown" + return result + + def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]: if not isinstance(conditions, list): return None, "", "" @@ -116,6 +242,82 @@ def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]: } +def _namespace_allowed(namespace: str) -> bool: + return bool(namespace) and namespace not in _SYSTEM_NAMESPACES + + +def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]: + for key in _WORKLOAD_LABEL_KEYS: + value = labels.get(key) + if isinstance(value, str) and value: + return value, f"label:{key}" + return "", "" + + +def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]: + owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else [] + for owner in owners: + if not isinstance(owner, dict): + continue + name = owner.get("name") + kind = owner.get("kind") + if isinstance(name, str) and name: + return name, f"owner:{kind or 'unknown'}" + return "", "" + + +def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]: + labels = meta.get("labels") if isinstance(meta.get("labels"), dict) else {} + name, source = _workload_from_labels(labels) + if name: + return name, source + return _owner_reference(meta) + + +def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]: + workloads: dict[tuple[str, str], dict[str, Any]] = {} + for pod in _items(payload): + metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} + spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} + status = pod.get("status") if 
isinstance(pod.get("status"), dict) else {} + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + if not _namespace_allowed(namespace): + continue + workload, source = _pod_workload(metadata) + if not workload: + continue + node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else "" + phase = status.get("phase") if isinstance(status.get("phase"), str) else "" + key = (namespace, workload) + entry = workloads.setdefault( + key, + { + "namespace": namespace, + "workload": workload, + "source": source, + "nodes": {}, + "pods_total": 0, + "pods_running": 0, + }, + ) + entry["pods_total"] += 1 + if phase == "Running": + entry["pods_running"] += 1 + if node: + nodes = entry["nodes"] + nodes[node] = nodes.get(node, 0) + 1 + output: list[dict[str, Any]] = [] + for entry in workloads.values(): + nodes = entry.get("nodes") or {} + primary = "" + if isinstance(nodes, dict) and nodes: + primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0] + entry["primary_node"] = primary + output.append(entry) + output.sort(key=lambda item: (item.get("namespace") or "", item.get("workload") or "")) + return output + + def _vm_query(expr: str) -> list[dict[str, Any]] | None: base = settings.vm_url if not base: @@ -164,6 +366,99 @@ def _vm_vector(expr: str) -> list[dict[str, Any]]: return output +def _vm_topk(expr: str, label_key: str) -> dict[str, Any] | None: + result = _vm_vector(expr) + if not result: + return None + metric = result[0].get("metric") if isinstance(result[0], dict) else {} + value = result[0].get("value") + label = metric.get(label_key) if isinstance(metric, dict) else None + return {"label": label or "", "value": value, "metric": metric} + + +def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for item in _vm_vector(expr): + metric = item.get("metric") if isinstance(item.get("metric"), dict) else {} + label = metric.get(label_key) + 
value = item.get("value") + if isinstance(label, str) and label: + output.append({"node": label, "value": value}) + output.sort(key=lambda item: item.get("node") or "") + return output + + +def _postgres_connections(errors: list[str]) -> dict[str, Any]: + postgres: dict[str, Any] = {} + try: + postgres["used"] = _vm_scalar("sum(pg_stat_activity_count)") + postgres["max"] = _vm_scalar("max(pg_settings_max_connections)") + postgres["hottest_db"] = _vm_topk( + "topk(1, sum by (datname) (pg_stat_activity_count))", + "datname", + ) + except Exception as exc: + errors.append(f"postgres: {exc}") + return postgres + + +def _hottest_nodes(errors: list[str]) -> dict[str, Any]: + hottest: dict[str, Any] = {} + try: + hottest["cpu"] = _vm_topk( + 'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))) * 100) ' + '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")', + "node", + ) + hottest["ram"] = _vm_topk( + 'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) ' + '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")', + "node", + ) + hottest["net"] = _vm_topk( + 'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{device!~"lo"}[5m]) ' + '+ rate(node_network_transmit_bytes_total{device!~"lo"}[5m]))) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")', + "node", + ) + hottest["io"] = _vm_topk( + 'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) ' + '* on(instance) group_left(node) 
label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")', + "node", + ) + except Exception as exc: + errors.append(f"hottest: {exc}") + return hottest + + +def _node_usage(errors: list[str]) -> dict[str, Any]: + usage: dict[str, Any] = {} + try: + usage["cpu"] = _vm_node_metric( + 'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))) * 100) ' + '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', + "node", + ) + usage["ram"] = _vm_node_metric( + 'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) ' + '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', + "node", + ) + usage["net"] = _vm_node_metric( + 'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{device!~"lo"}[5m]) ' + '+ rate(node_network_transmit_bytes_total{device!~"lo"}[5m]))) * on(instance) group_left(node) ' + 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', + "node", + ) + usage["io"] = _vm_node_metric( + 'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) ' + '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', + "node", + ) + except Exception as exc: + errors.append(f"node_usage: {exc}") + return usage + + def _summarize_metrics(errors: list[str]) -> dict[str, Any]: metrics: dict[str, Any] = {} try: @@ -180,6 +475,9 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: ) except Exception as exc: errors.append(f"vm: {exc}") + metrics["postgres_connections"] = _postgres_connections(errors) + metrics["hottest_nodes"] = _hottest_nodes(errors) + metrics["node_usage"] = _node_usage(errors) return metrics @@ -188,9 
+486,11 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: collected_at = datetime.now(timezone.utc) nodes: dict[str, Any] | None = None + node_details: list[dict[str, Any]] = [] try: payload = get_json("/api/v1/nodes") nodes = _summarize_nodes(payload) + node_details = _node_details(payload) except Exception as exc: errors.append(f"nodes: {exc}") @@ -203,12 +503,21 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: except Exception as exc: errors.append(f"flux: {exc}") + workloads: list[dict[str, Any]] = [] + try: + pods_payload = get_json("/api/v1/pods?limit=5000") + workloads = _summarize_workloads(pods_payload) + except Exception as exc: + errors.append(f"pods: {exc}") + metrics = _summarize_metrics(errors) snapshot = { "collected_at": collected_at.isoformat(), "nodes": nodes or {}, + "nodes_detail": node_details, "flux": kustomizations or {}, + "workloads": workloads, "metrics": metrics, "errors": errors, } diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py index d8b3d0c..381c2c1 100644 --- a/tests/test_cluster_state.py +++ b/tests/test_cluster_state.py @@ -23,15 +23,36 @@ def test_collect_cluster_state(monkeypatch) -> None: return { "items": [ { - "metadata": {"name": "node-a"}, - "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + "metadata": {"name": "node-a", "labels": {"kubernetes.io/arch": "arm64"}}, + "status": { + "conditions": [{"type": "Ready", "status": "True"}], + "nodeInfo": {"architecture": "arm64"}, + "addresses": [{"type": "InternalIP", "address": "10.0.0.1"}], + }, }, { - "metadata": {"name": "node-b"}, - "status": {"conditions": [{"type": "Ready", "status": "False"}]}, + "metadata": {"name": "node-b", "labels": {"kubernetes.io/arch": "amd64"}}, + "status": { + "conditions": [{"type": "Ready", "status": "False"}], + "nodeInfo": {"architecture": "amd64"}, + }, }, ] } + if path.startswith("/api/v1/pods"): + return { + "items": [ + { + "metadata": { + "name": 
"jellyfin-0", + "namespace": "media", + "labels": {"app": "jellyfin"}, + }, + "spec": {"nodeName": "node-a"}, + "status": {"phase": "Running"}, + } + ] + } return { "items": [ { @@ -55,6 +76,8 @@ def test_collect_cluster_state(monkeypatch) -> None: assert snapshot["nodes"]["total"] == 2 assert snapshot["nodes"]["ready"] == 1 assert snapshot["flux"]["not_ready"] == 1 + assert snapshot["nodes_detail"] + assert snapshot["workloads"] assert summary.nodes_total == 2 assert summary.nodes_ready == 1 assert summary.pods_running == 5.0