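"""Cluster usage collectors built on the ``_vm_*`` query helpers.

Gathers node, pod, namespace, PVC, and PostgreSQL connection metrics and rolls
them up into per-node load profiles and per-namespace capacity summaries.
"""
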
from __future__ import annotations
from .common import *
from .nodes import *
from .k8s import *
from .pods import *
from .vm import *
from .trends import *


def _pvc_usage_trends() -> dict[str, list[dict[str, Any]]]:
    """Top PVCs by peak fill percentage for each trend window."""
    trends: dict[str, list[dict[str, Any]]] = {}
    expr = "kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100"
    for window in _TREND_WINDOWS:
        entries = _vm_vector(
            f"topk({_TREND_PVC_LIMIT}, max_over_time(({expr})[{window}]))"
        )
        trends[window] = _pvc_top(entries)
    return trends


def _postgres_connections(errors: list[str]) -> dict[str, Any]:
    """Collect PostgreSQL connection usage: total, limit, and busiest databases."""
    postgres: dict[str, Any] = {}
    try:
        postgres["used"] = _vm_scalar("sum(pg_stat_activity_count)")
        postgres["max"] = _vm_scalar("max(pg_settings_max_connections)")
        postgres["by_db"] = _vm_vector(
            "topk(5, sum by (datname) (pg_stat_activity_count))"
        )
        postgres["hottest_db"] = _vm_topk(
            "topk(1, sum by (datname) (pg_stat_activity_count))",
            "datname",
        )
    except Exception as exc:
        errors.append(f"postgres: {exc}")
    return postgres


def _hottest_nodes(errors: list[str]) -> dict[str, Any]:
    """Pick the single hottest node for CPU, RAM, network throughput, and disk I/O."""
    hottest: dict[str, Any] = {}
    try:
        hottest["cpu"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["ram"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            f'/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["net"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        hottest["io"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
    except Exception as exc:
        errors.append(f"hottest: {exc}")
    return hottest


def _node_usage(errors: list[str]) -> dict[str, Any]:
    """Per-node CPU, RAM, network, disk I/O, and disk usage vectors."""
    usage: dict[str, Any] = {}
    try:
        exprs = _node_usage_exprs()
        usage["cpu"] = _vm_node_metric(exprs["cpu"], "node")
        usage["ram"] = _vm_node_metric(exprs["ram"], "node")
        usage["net"] = _vm_node_metric(exprs["net"], "node")
        usage["io"] = _vm_node_metric(exprs["io"], "node")
        usage["disk"] = _vm_node_metric(exprs["disk"], "node")
    except Exception as exc:
        errors.append(f"node_usage: {exc}")
    return usage


def _pvc_usage(errors: list[str]) -> list[dict[str, Any]]:
    """Top PVCs by percentage of capacity currently used."""
    try:
        entries = _vm_vector(
            "topk(5, max by (namespace,persistentvolumeclaim) "
            "(kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100))"
        )
        return _filter_namespace_vector(entries)
    except Exception as exc:
        errors.append(f"pvc_usage: {exc}")
    return []


def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]:
    """Min/max/avg of the numeric ``value`` fields in a metric series."""
    values: list[float] = []
    for entry in series:
        if not isinstance(entry, dict):
            continue
        try:
            values.append(float(entry.get("value")))
        except (TypeError, ValueError):
            continue
    if not values:
        return {}
    return {
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
    }


def _vm_namespace_totals(expr: str) -> dict[str, float]:
    """Map namespace name to value for a vector query grouped by namespace."""
    totals: dict[str, float] = {}
    for item in _vm_vector(expr):
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        namespace = metric.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        try:
            totals[namespace] = float(item.get("value"))
        except (TypeError, ValueError):
            continue
    return totals


def _build_namespace_capacity(
    cpu_usage: dict[str, float],
    cpu_requests: dict[str, float],
    mem_usage: dict[str, float],
    mem_requests: dict[str, float],
) -> list[dict[str, Any]]:
    """Join per-namespace usage and requests into records with usage/request ratios."""
    namespaces = sorted(set(cpu_usage) | set(cpu_requests) | set(mem_usage) | set(mem_requests))
    output: list[dict[str, Any]] = []
    for namespace in namespaces:
        cpu_used = cpu_usage.get(namespace)
        cpu_req = cpu_requests.get(namespace)
        mem_used = mem_usage.get(namespace)
        mem_req = mem_requests.get(namespace)
        cpu_ratio = None
        mem_ratio = None
        if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)) and cpu_req > 0:
            cpu_ratio = cpu_used / cpu_req
        if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)) and mem_req > 0:
            mem_ratio = mem_used / mem_req
        output.append(
            {
                "namespace": namespace,
                "cpu_usage": cpu_used,
                "cpu_requests": cpu_req,
                "cpu_usage_ratio": cpu_ratio,
                "mem_usage": mem_used,
                "mem_requests": mem_req,
                "mem_usage_ratio": mem_ratio,
            }
        )
    output.sort(
        key=lambda item: (
            -(item.get("cpu_requests") or 0),
            -(item.get("mem_requests") or 0),
            item.get("namespace") or "",
        )
    )
    return output


def _node_usage_profile(
    node_usage: dict[str, list[dict[str, Any]]],
    node_details: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge usage, node details, and pod counts into per-node profiles with a normalised ``load_index``."""
    usage: dict[str, dict[str, Any]] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        for item in node_usage.get(key, []) or []:
            node = item.get("node")
            value = item.get("value")
            if not isinstance(node, str) or not node:
                continue
            if not isinstance(value, (int, float)):
                continue
            usage.setdefault(node, {})[key] = float(value)
    max_values: dict[str, float] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        values = [entry.get(key) for entry in usage.values() if isinstance(entry.get(key), (int, float))]
        max_values[key] = max(values) if values else 0.0
    detail_map: dict[str, dict[str, Any]] = {
        entry.get("name"): entry for entry in node_details if isinstance(entry, dict)
    }
    pod_map: dict[str, dict[str, Any]] = {
        entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)
    }
    output: list[dict[str, Any]] = []
    for node, entry in usage.items():
        detail = detail_map.get(node, {})
        pressure = detail.get("pressure") if isinstance(detail.get("pressure"), dict) else {}
        pressure_count = sum(1 for value in pressure.values() if value)
        taints = detail.get("taints") if isinstance(detail.get("taints"), list) else []
        unschedulable = bool(detail.get("unschedulable"))
        pods_total = None
        pod_entry = pod_map.get(node)
        if isinstance(pod_entry, dict):
            pods_total = pod_entry.get("pods_total")
        normalized: dict[str, float] = {}
        for key in ("cpu", "ram", "disk", "net", "io"):
            raw = entry.get(key)
            max_val = max_values.get(key) or 0.0
            if isinstance(raw, (int, float)) and max_val > 0:
                normalized[f"{key}_norm"] = raw / max_val
        norm_values = [v for v in normalized.values() if isinstance(v, (int, float))]
        load_index = sum(norm_values) / len(norm_values) if norm_values else None
        output.append(
            {
                "node": node,
                "cpu": entry.get("cpu"),
                "ram": entry.get("ram"),
                "disk": entry.get("disk"),
                "net": entry.get("net"),
                "io": entry.get("io"),
                **normalized,
                "pressure_flags": pressure,
                "pressure_count": pressure_count,
                "taints": taints,
                "unschedulable": unschedulable,
                "pods_total": pods_total,
                "load_index": load_index,
            }
        )
    output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
    return output


def _percentile(values: list[float], percentile: float) -> float | None:
    """Nearest-rank percentile; ``percentile`` is a fraction between 0 and 1."""
    if not values:
        return None
    ordered = sorted(values)
    idx = int(round((len(ordered) - 1) * percentile))
    idx = min(max(idx, 0), len(ordered) - 1)
    return ordered[idx]


def _node_load_summary(node_load: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise the ``load_index`` distribution: average, p90, extremes, and outliers (>= avg + stddev)."""
    items = [
        entry
        for entry in node_load
        if isinstance(entry, dict) and isinstance(entry.get("load_index"), (int, float))
    ]
    if not items:
        return {}
    values = [float(entry.get("load_index") or 0) for entry in items]
    avg = sum(values) / len(values)
    variance = sum((value - avg) ** 2 for value in values) / len(values)
    stddev = variance**0.5
    top = sorted(items, key=lambda item: -(item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    bottom = sorted(items, key=lambda item: (item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    outliers = [
        item
        for item in items
        if isinstance(item.get("load_index"), (int, float))
        and item.get("load_index") >= avg + stddev
    ]
    outliers.sort(key=lambda item: -(item.get("load_index") or 0))
    return {
        "avg": round(avg, 3),
        "p90": round(_percentile(values, 0.9) or 0.0, 3),
        "min": round(min(values), 3),
        "max": round(max(values), 3),
        "top": top,
        "bottom": bottom,
        "outliers": outliers[:_LOAD_TOP_COUNT],
    }


def _namespace_capacity_summary(capacity: list[dict[str, Any]]) -> dict[str, Any]:
    """Top usage/request ratios, lowest headroom, and overcommitted namespaces."""
    if not capacity:
        return {}
    cpu_ratio = [
        entry
        for entry in capacity
        if isinstance(entry, dict) and isinstance(entry.get("cpu_usage_ratio"), (int, float))
    ]
    mem_ratio = [
        entry
        for entry in capacity
        if isinstance(entry, dict) and isinstance(entry.get("mem_usage_ratio"), (int, float))
    ]
    cpu_ratio.sort(key=lambda item: -(item.get("cpu_usage_ratio") or 0))
    mem_ratio.sort(key=lambda item: -(item.get("mem_usage_ratio") or 0))
    cpu_headroom: list[dict[str, Any]] = []
    mem_headroom: list[dict[str, Any]] = []
    for entry in capacity:
        if not isinstance(entry, dict):
            continue
        cpu_used = entry.get("cpu_usage")
        cpu_req = entry.get("cpu_requests")
        mem_used = entry.get("mem_usage")
        mem_req = entry.get("mem_requests")
        if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)):
            cpu_headroom.append(
                {
                    "namespace": entry.get("namespace"),
                    "headroom": cpu_req - cpu_used,
                    "usage": cpu_used,
                    "requests": cpu_req,
                    "ratio": entry.get("cpu_usage_ratio"),
                }
            )
        if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)):
            mem_headroom.append(
                {
                    "namespace": entry.get("namespace"),
                    "headroom": mem_req - mem_used,
                    "usage": mem_used,
                    "requests": mem_req,
                    "ratio": entry.get("mem_usage_ratio"),
                }
            )
    cpu_headroom.sort(key=lambda item: (item.get("headroom") or 0))
    mem_headroom.sort(key=lambda item: (item.get("headroom") or 0))
    cpu_over_names = [
        entry.get("namespace")
        for entry in cpu_ratio
        if (entry.get("cpu_usage_ratio") or 0) > 1 and entry.get("namespace")
    ]
    mem_over_names = [
        entry.get("namespace")
        for entry in mem_ratio
        if (entry.get("mem_usage_ratio") or 0) > 1 and entry.get("namespace")
    ]
    over_cpu = len(cpu_over_names)
    over_mem = len(mem_over_names)
    return {
        "cpu_ratio_top": cpu_ratio[:_NAMESPACE_TOP_COUNT],
        "mem_ratio_top": mem_ratio[:_NAMESPACE_TOP_COUNT],
        "cpu_headroom_low": cpu_headroom[:_NAMESPACE_TOP_COUNT],
        "mem_headroom_low": mem_headroom[:_NAMESPACE_TOP_COUNT],
        "cpu_overcommitted": over_cpu,
        "mem_overcommitted": over_mem,
        "cpu_overcommitted_names": sorted({name for name in cpu_over_names if isinstance(name, str)}),
        "mem_overcommitted_names": sorted({name for name in mem_over_names if isinstance(name, str)}),
    }


def _collect_vm_core(metrics: dict[str, Any], errors: list[str]) -> None:
    """Fill ``metrics`` with core cluster figures: node counts, capacity, pod phases, restarts, and top consumers."""
    try:
        metrics["nodes_total"] = _vm_scalar("count(kube_node_info)")
        metrics["nodes_ready"] = _vm_scalar(
            'count(kube_node_status_condition{condition="Ready",status="true"})'
        )
        metrics["capacity_cpu"] = _vm_scalar("sum(kube_node_status_capacity_cpu_cores)")
        metrics["allocatable_cpu"] = _vm_scalar("sum(kube_node_status_allocatable_cpu_cores)")
        metrics["capacity_mem_bytes"] = _vm_scalar("sum(kube_node_status_capacity_memory_bytes)")
        metrics["allocatable_mem_bytes"] = _vm_scalar("sum(kube_node_status_allocatable_memory_bytes)")
        metrics["capacity_pods"] = _vm_scalar("sum(kube_node_status_capacity_pods)")
        metrics["allocatable_pods"] = _vm_scalar("sum(kube_node_status_allocatable_pods)")
        metrics["pods_running"] = _vm_scalar('sum(kube_pod_status_phase{phase="Running"})')
        metrics["pods_pending"] = _vm_scalar('sum(kube_pod_status_phase{phase="Pending"})')
        metrics["pods_failed"] = _vm_scalar('sum(kube_pod_status_phase{phase="Failed"})')
        metrics["pods_succeeded"] = _vm_scalar('sum(kube_pod_status_phase{phase="Succeeded"})')
        metrics["top_restarts_1h"] = _vm_vector(
            f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
        )
        metrics["restart_namespace_top"] = _filter_namespace_vector(
            _vm_vector(
                f"topk(5, sum by (namespace) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
            )
        )
        metrics["pod_cpu_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        metrics["pod_cpu_top_node"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (node,namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]) * on (namespace,pod) group_left(node) kube_pod_info))'
            )
        )
        metrics["pod_mem_top"] = _filter_namespace_vector(
            _vm_vector(
                'topk(5, sum by (namespace,pod) (container_memory_working_set_bytes{namespace!=""}))'
            )
        )
        metrics["pod_mem_top_node"] = _filter_namespace_vector(
            _vm_vector(
                'topk(5, sum by (node,namespace,pod) (container_memory_working_set_bytes{namespace!=""} * on (namespace,pod) group_left(node) kube_pod_info))'
            )
        )
        metrics["job_failures_24h"] = _vm_vector(
            "topk(5, sum by (namespace,job_name) (increase(kube_job_status_failed[24h])))"
        )
    except Exception as exc:
        errors.append(f"vm: {exc}")


# Re-export every name that does not start with a double underscore, including
# the single-underscore helpers above, so star-imports of this module see them.
__all__ = [name for name in globals() if not name.startswith("__")]