# ariadne/ariadne/services/cluster_state.py

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import httpx
from ..db.storage import Storage
from ..k8s.client import get_json
from ..metrics.metrics import set_cluster_state_metrics
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)
_VALUE_PAIR_LEN = 2
_RATE_WINDOW = "5m"
_RESTARTS_WINDOW = "1h"
_WORKLOAD_LABEL_KEYS = (
"app.kubernetes.io/name",
"app",
"k8s-app",
"app.kubernetes.io/instance",
"release",
)
_SYSTEM_NAMESPACES = {
"kube-system",
"kube-public",
"kube-node-lease",
"flux-system",
"monitoring",
"logging",
"traefik",
"cert-manager",
"maintenance",
"postgres",
"vault",
}
_WORKLOAD_ALLOWED_NAMESPACES = {
"maintenance",
}
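# Note: the allow-list overrides the deny-list, so a namespace listed in both
# (currently "maintenance") still has its workloads reported; see
# _namespace_allowed() below.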
@dataclass(frozen=True)
class ClusterStateSummary:
nodes_total: int | None
nodes_ready: int | None
pods_running: int | None
kustomizations_not_ready: int | None
errors: int
def _items(payload: dict[str, Any]) -> list[dict[str, Any]]:
    items = payload.get("items")
    if not isinstance(items, list):
        return []
    return [item for item in items if isinstance(item, dict)]
def _node_ready(conditions: Any) -> bool:
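    """Return True only when a "Ready" condition reports status "True".

    Illustrative doctest (condition shape follows the Kubernetes NodeStatus
    API):

    >>> _node_ready([{"type": "Ready", "status": "True"}])
    True
    >>> _node_ready([{"type": "Ready", "status": "Unknown"}])
    False
    >>> _node_ready(None)
    False
    """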
if not isinstance(conditions, list):
return False
for condition in conditions:
if not isinstance(condition, dict):
continue
if condition.get("type") == "Ready":
return condition.get("status") == "True"
return False
def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]:
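    """Condense a v1/nodes list into counts plus sorted name lists.

    Illustrative doctest (payload trimmed to the fields this function reads):

    >>> _summarize_nodes({"items": [
    ...     {"metadata": {"name": "a"},
    ...      "status": {"conditions": [{"type": "Ready", "status": "True"}]}},
    ...     {"metadata": {"name": "b"}, "status": {}},
    ... ]}) == {"total": 2, "ready": 1, "not_ready": 1,
    ...         "names": ["a", "b"], "not_ready_names": ["b"]}
    True
    """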
names: list[str] = []
not_ready: list[str] = []
for node in _items(payload):
metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
status = node.get("status") if isinstance(node.get("status"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
if not name:
continue
names.append(name)
if not _node_ready(status.get("conditions")):
not_ready.append(name)
names.sort()
not_ready.sort()
total = len(names)
ready = total - len(not_ready)
return {
"total": total,
"ready": ready,
"not_ready": len(not_ready),
"names": names,
"not_ready_names": not_ready,
}
def _node_labels(labels: dict[str, Any]) -> dict[str, Any]:
if not isinstance(labels, dict):
return {}
keep: dict[str, Any] = {}
for key, value in labels.items():
if key.startswith("node-role.kubernetes.io/"):
keep[key] = value
if key in {
"kubernetes.io/arch",
"kubernetes.io/hostname",
"beta.kubernetes.io/arch",
"hardware",
"jetson",
}:
keep[key] = value
return keep
def _node_addresses(status: dict[str, Any]) -> dict[str, str]:
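    """Flatten status.addresses into a {type: address} mapping.

    Illustrative doctest (addresses are hypothetical):

    >>> _node_addresses({"addresses": [
    ...     {"type": "InternalIP", "address": "10.0.0.5"},
    ...     {"type": "Hostname", "address": "node-a"},
    ... ]}) == {"InternalIP": "10.0.0.5", "Hostname": "node-a"}
    True
    """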
addresses = status.get("addresses") if isinstance(status.get("addresses"), list) else []
output: dict[str, str] = {}
for addr in addresses:
if not isinstance(addr, dict):
continue
addr_type = addr.get("type")
addr_value = addr.get("address")
if isinstance(addr_type, str) and isinstance(addr_value, str):
output[addr_type] = addr_value
return output
def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
details: list[dict[str, Any]] = []
for node in _items(payload):
metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
status = node.get("status") if isinstance(node.get("status"), dict) else {}
node_info = status.get("nodeInfo") if isinstance(status.get("nodeInfo"), dict) else {}
labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
if not name:
continue
roles = _node_roles(labels)
details.append(
{
"name": name,
"ready": _node_ready(status.get("conditions")),
"roles": roles,
"is_worker": _node_is_worker(labels),
"labels": _node_labels(labels),
"hardware": _hardware_hint(labels, node_info),
"arch": node_info.get("architecture") or "",
"os": node_info.get("operatingSystem") or "",
"kernel": node_info.get("kernelVersion") or "",
"kubelet": node_info.get("kubeletVersion") or "",
"container_runtime": node_info.get("containerRuntimeVersion") or "",
"addresses": _node_addresses(status),
}
)
details.sort(key=lambda item: item.get("name") or "")
return details
def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
summary = {
"total": 0,
"ready": 0,
"workers": {"total": 0, "ready": 0},
"by_hardware": {},
"by_arch": {},
"by_role": {},
"not_ready_names": [],
}
not_ready: list[str] = []
for node in details:
name = node.get("name") if isinstance(node, dict) else ""
if not isinstance(name, str) or not name:
continue
summary["total"] += 1
ready = bool(node.get("ready"))
if ready:
summary["ready"] += 1
else:
not_ready.append(name)
if node.get("is_worker"):
summary["workers"]["total"] += 1
if ready:
summary["workers"]["ready"] += 1
hardware = node.get("hardware") or "unknown"
arch = node.get("arch") or "unknown"
summary["by_hardware"][hardware] = summary["by_hardware"].get(hardware, 0) + 1
summary["by_arch"][arch] = summary["by_arch"].get(arch, 0) + 1
for role in node.get("roles") or []:
summary["by_role"][role] = summary["by_role"].get(role, 0) + 1
not_ready.sort()
summary["not_ready_names"] = not_ready
return summary
def _node_roles(labels: dict[str, Any]) -> list[str]:
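    """Extract sorted role names from node-role.kubernetes.io/* labels.

    Illustrative doctest:

    >>> _node_roles({"node-role.kubernetes.io/control-plane": "",
    ...              "kubernetes.io/arch": "amd64"})
    ['control-plane']
    """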
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _node_is_worker(labels: dict[str, Any]) -> bool:
    if "node-role.kubernetes.io/control-plane" in labels:
        return False
    if "node-role.kubernetes.io/master" in labels:
        return False
    # Nodes without an explicit control-plane/master role label are treated
    # as workers, whether or not they carry the worker role label.
    return True
def _hardware_hint(labels: dict[str, Any], node_info: dict[str, Any]) -> str:
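    """Best-effort hardware classification from labels and nodeInfo.

    Precedence: explicit jetson/hardware labels, then kernel/OS fingerprints,
    then architecture. Illustrative doctest (label and kernel values are
    hypothetical):

    >>> _hardware_hint({"jetson": "true"}, {})
    'jetson'
    >>> _hardware_hint({}, {"kernelVersion": "5.10.104-tegra"})
    'jetson'
    >>> _hardware_hint({}, {"architecture": "amd64"})
    'amd64'
    """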
result = "unknown"
if str(labels.get("jetson") or "").lower() == "true":
result = "jetson"
else:
hardware = (labels.get("hardware") or "").strip().lower()
if hardware:
result = hardware
else:
kernel = str(node_info.get("kernelVersion") or "").lower()
os_image = str(node_info.get("osImage") or "").lower()
if "tegra" in kernel or "jetson" in os_image:
result = "jetson"
elif "raspi" in kernel or "bcm2711" in kernel:
result = "rpi"
else:
arch = str(node_info.get("architecture") or "").lower()
if arch == "amd64":
result = "amd64"
elif arch == "arm64":
result = "arm64-unknown"
return result
def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]:
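    """Resolve a typed condition to (True/False/None, reason, message).

    None means the condition is absent or reports an "Unknown" status.
    Illustrative doctest (reason and message values are hypothetical):

    >>> _condition_status([{"type": "Ready", "status": "False",
    ...                     "reason": "ReconcileFailed", "message": "boom"}],
    ...                   "Ready")
    (False, 'ReconcileFailed', 'boom')
    >>> _condition_status([], "Ready")
    (None, '', '')
    """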
if not isinstance(conditions, list):
return None, "", ""
for condition in conditions:
if not isinstance(condition, dict):
continue
if condition.get("type") != cond_type:
continue
status = condition.get("status")
if status == "True":
return True, condition.get("reason") or "", condition.get("message") or ""
if status == "False":
return False, condition.get("reason") or "", condition.get("message") or ""
return None, condition.get("reason") or "", condition.get("message") or ""
return None, "", ""
def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]:
not_ready: list[dict[str, Any]] = []
    items = _items(payload)
    for item in items:
metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
spec = item.get("spec") if isinstance(item.get("spec"), dict) else {}
status = item.get("status") if isinstance(item.get("status"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
conditions = status.get("conditions")
ready, reason, message = _condition_status(conditions, "Ready")
suspended = bool(spec.get("suspend"))
if ready is True and not suspended:
continue
not_ready.append(
{
"name": name,
"namespace": namespace,
"ready": ready,
"suspended": suspended,
"reason": reason,
"message": message,
}
)
not_ready.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
return {
"total": len(_items(payload)),
"not_ready": len(not_ready),
"items": not_ready,
}
def _namespace_allowed(namespace: str) -> bool:
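    """Apply the allow-list override first, then the system-namespace deny-list.

    Illustrative doctest:

    >>> _namespace_allowed("maintenance")
    True
    >>> _namespace_allowed("kube-system")
    False
    >>> _namespace_allowed("my-app")
    True
    """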
if not namespace:
return False
if namespace in _WORKLOAD_ALLOWED_NAMESPACES:
return True
return namespace not in _SYSTEM_NAMESPACES
def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
for key in _WORKLOAD_LABEL_KEYS:
value = labels.get(key)
if isinstance(value, str) and value:
return value, f"label:{key}"
return "", ""
def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
for owner in owners:
if not isinstance(owner, dict):
continue
name = owner.get("name")
kind = owner.get("kind")
if isinstance(name, str) and name:
return name, f"owner:{kind or 'unknown'}"
return "", ""
def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
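    """Name a pod's workload: well-known labels first, owner reference second.

    Illustrative doctest (label and owner values are hypothetical):

    >>> _pod_workload({"labels": {"app": "nginx"}})
    ('nginx', 'label:app')
    >>> _pod_workload({"ownerReferences": [{"kind": "ReplicaSet",
    ...                                     "name": "web-5d9c"}]})
    ('web-5d9c', 'owner:ReplicaSet')
    """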
labels = meta.get("labels") if isinstance(meta.get("labels"), dict) else {}
name, source = _workload_from_labels(labels)
if name:
return name, source
return _owner_reference(meta)
def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
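    """Group pods by (namespace, workload) and pick each group's primary node.

    The primary node is the one hosting the most pods, with ties broken
    alphabetically. Illustrative doctest (pod payload is hypothetical and
    trimmed to the fields read here):

    >>> out = _summarize_workloads({"items": [
    ...     {"metadata": {"namespace": "web", "labels": {"app": "site"}},
    ...      "spec": {"nodeName": "n1"}, "status": {"phase": "Running"}},
    ...     {"metadata": {"namespace": "web", "labels": {"app": "site"}},
    ...      "spec": {"nodeName": "n2"}, "status": {"phase": "Pending"}},
    ... ]})
    >>> out[0]["primary_node"], out[0]["pods_total"], out[0]["pods_running"]
    ('n1', 2, 1)
    """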
workloads: dict[tuple[str, str], dict[str, Any]] = {}
for pod in _items(payload):
metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
if not _namespace_allowed(namespace):
continue
workload, source = _pod_workload(metadata)
if not workload:
continue
node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
key = (namespace, workload)
entry = workloads.setdefault(
key,
{
"namespace": namespace,
"workload": workload,
"source": source,
"nodes": {},
"pods_total": 0,
"pods_running": 0,
},
)
entry["pods_total"] += 1
if phase == "Running":
entry["pods_running"] += 1
if node:
nodes = entry["nodes"]
nodes[node] = nodes.get(node, 0) + 1
output: list[dict[str, Any]] = []
for entry in workloads.values():
nodes = entry.get("nodes") or {}
primary = ""
if isinstance(nodes, dict) and nodes:
primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
entry["primary_node"] = primary
output.append(entry)
output.sort(key=lambda item: (item.get("namespace") or "", item.get("workload") or ""))
return output
def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
namespaces: dict[str, dict[str, Any]] = {}
for pod in _items(payload):
metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
if not _namespace_allowed(namespace):
continue
phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
entry = namespaces.setdefault(
namespace,
{
"namespace": namespace,
"pods_total": 0,
"pods_running": 0,
"pods_pending": 0,
"pods_failed": 0,
"pods_succeeded": 0,
},
)
entry["pods_total"] += 1
if phase == "Running":
entry["pods_running"] += 1
elif phase == "Pending":
entry["pods_pending"] += 1
elif phase == "Failed":
entry["pods_failed"] += 1
elif phase == "Succeeded":
entry["pods_succeeded"] += 1
output = list(namespaces.values())
output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or ""))
return output
def _vm_query(expr: str) -> list[dict[str, Any]] | None:
base = settings.vm_url
if not base:
return None
url = f"{base.rstrip('/')}/api/v1/query"
params = {"query": expr}
with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
resp = client.get(url, params=params)
resp.raise_for_status()
payload = resp.json()
if payload.get("status") != "success":
return None
data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
result = data.get("result")
return result if isinstance(result, list) else None
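# _vm_query() targets the Prometheus-compatible instant-query API that
# VictoriaMetrics exposes. A successful response is shaped roughly like
# (timestamp and value are illustrative):
#
#   {"status": "success",
#    "data": {"resultType": "vector",
#             "result": [{"metric": {"node": "n1"},
#                         "value": [1700000000, "42.5"]}]}}
#
# _vm_scalar() below reads result[0]["value"][1]; _vm_vector() keeps every
# series whose value parses as a float.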
def _vm_scalar(expr: str) -> float | None:
result = _vm_query(expr)
if not result:
return None
value = result[0].get("value") if isinstance(result[0], dict) else None
if not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
return None
try:
return float(value[1])
except (TypeError, ValueError):
return None
def _vm_vector(expr: str) -> list[dict[str, Any]]:
result = _vm_query(expr) or []
output: list[dict[str, Any]] = []
for item in result:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
value = item.get("value") if isinstance(item.get("value"), list) else []
if len(value) < _VALUE_PAIR_LEN:
continue
try:
numeric = float(value[1])
except (TypeError, ValueError):
continue
output.append({"metric": metric, "value": numeric})
return output
def _vm_topk(expr: str, label_key: str) -> dict[str, Any] | None:
result = _vm_vector(expr)
if not result:
return None
metric = result[0].get("metric") if isinstance(result[0], dict) else {}
value = result[0].get("value")
label = metric.get(label_key) if isinstance(metric, dict) else None
return {"label": label or "", "value": value, "metric": metric}
def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in _vm_vector(expr):
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
label = metric.get(label_key)
value = item.get("value")
if isinstance(label, str) and label:
output.append({"node": label, "value": value})
output.sort(key=lambda item: item.get("node") or "")
return output
def _postgres_connections(errors: list[str]) -> dict[str, Any]:
postgres: dict[str, Any] = {}
try:
postgres["used"] = _vm_scalar("sum(pg_stat_activity_count)")
postgres["max"] = _vm_scalar("max(pg_settings_max_connections)")
postgres["hottest_db"] = _vm_topk(
"topk(1, sum by (datname) (pg_stat_activity_count))",
"datname",
)
except Exception as exc:
errors.append(f"postgres: {exc}")
return postgres
def _hottest_nodes(errors: list[str]) -> dict[str, Any]:
hottest: dict[str, Any] = {}
try:
hottest["cpu"] = _vm_topk(
f'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
'* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
"node",
)
hottest["ram"] = _vm_topk(
'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
'/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
"node",
)
hottest["net"] = _vm_topk(
f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) label_replace(node_uname_info{{nodename!=""}}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
"node",
)
hottest["io"] = _vm_topk(
f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
'* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
"node",
)
except Exception as exc:
errors.append(f"hottest: {exc}")
return hottest
def _node_usage(errors: list[str]) -> dict[str, Any]:
usage: dict[str, Any] = {}
try:
usage["cpu"] = _vm_node_metric(
f'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
'* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
"node",
)
usage["ram"] = _vm_node_metric(
'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
'/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
"node",
)
usage["net"] = _vm_node_metric(
f'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) '
'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
"node",
)
usage["io"] = _vm_node_metric(
f'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
'* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
"node",
)
except Exception as exc:
errors.append(f"node_usage: {exc}")
return usage
def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
metrics: dict[str, Any] = {}
try:
metrics["nodes_total"] = _vm_scalar("count(kube_node_info)")
metrics["nodes_ready"] = _vm_scalar(
"count(kube_node_status_condition{condition=\"Ready\",status=\"true\"})"
)
metrics["pods_running"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Running\"})")
metrics["pods_pending"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Pending\"})")
metrics["pods_failed"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Failed\"})")
metrics["pods_succeeded"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Succeeded\"})")
metrics["top_restarts_1h"] = _vm_vector(
f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
)
except Exception as exc:
errors.append(f"vm: {exc}")
metrics["postgres_connections"] = _postgres_connections(errors)
metrics["hottest_nodes"] = _hottest_nodes(errors)
metrics["node_usage"] = _node_usage(errors)
metrics["units"] = {
"cpu": "percent",
"ram": "percent",
"net": "bytes_per_sec",
"io": "bytes_per_sec",
"restarts": "count",
}
metrics["windows"] = {
"rates": _RATE_WINDOW,
"restarts": _RESTARTS_WINDOW,
}
return metrics
def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
errors: list[str] = []
collected_at = datetime.now(timezone.utc)
nodes: dict[str, Any] | None = None
node_details: list[dict[str, Any]] = []
node_summary: dict[str, Any] = {}
try:
payload = get_json("/api/v1/nodes")
nodes = _summarize_nodes(payload)
node_details = _node_details(payload)
node_summary = _summarize_inventory(node_details)
except Exception as exc:
errors.append(f"nodes: {exc}")
kustomizations: dict[str, Any] | None = None
try:
payload = get_json(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
)
kustomizations = _summarize_kustomizations(payload)
except Exception as exc:
errors.append(f"flux: {exc}")
workloads: list[dict[str, Any]] = []
namespace_pods: list[dict[str, Any]] = []
try:
pods_payload = get_json("/api/v1/pods?limit=5000")
workloads = _summarize_workloads(pods_payload)
namespace_pods = _summarize_namespace_pods(pods_payload)
except Exception as exc:
errors.append(f"pods: {exc}")
metrics = _summarize_metrics(errors)
snapshot = {
"collected_at": collected_at.isoformat(),
"nodes": nodes or {},
"nodes_summary": node_summary,
"nodes_detail": node_details,
"flux": kustomizations or {},
"workloads": workloads,
"namespace_pods": namespace_pods,
"metrics": metrics,
"errors": errors,
}
    pods_running_value = metrics.get("pods_running")
    summary = ClusterStateSummary(
        nodes_total=(nodes or {}).get("total"),
        nodes_ready=(nodes or {}).get("ready"),
        pods_running=int(pods_running_value) if isinstance(pods_running_value, (int, float)) else None,
        kustomizations_not_ready=(kustomizations or {}).get("not_ready"),
        errors=len(errors),
    )
set_cluster_state_metrics(
collected_at,
summary.nodes_total,
summary.nodes_ready,
summary.pods_running,
summary.kustomizations_not_ready,
)
return snapshot, summary
def run_cluster_state(storage: Storage) -> ClusterStateSummary:
snapshot, summary = collect_cluster_state()
try:
storage.record_cluster_state(snapshot)
storage.prune_cluster_state(settings.cluster_state_keep)
    except Exception as exc:
        logger.warning(
            "cluster state storage failed",
            extra={"event": "cluster_state", "status": "error", "detail": str(exc)},
        )
return summary
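# Minimal usage sketch (assumes a reachable Kubernetes API, a configured
# VictoriaMetrics endpoint in settings, and an application-provided Storage;
# nothing here runs at import time):
#
#     from ariadne.services.cluster_state import collect_cluster_state
#
#     snapshot, summary = collect_cluster_state()
#     print(f"{summary.nodes_ready}/{summary.nodes_total} nodes ready, "
#           f"{summary.errors} collection errors")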