# File-viewer metadata captured with this source: 330 lines, 13 KiB, Python.

from __future__ import annotations
from .common import *
from .nodes import *
from .k8s import *
from .pods import *
from .vm import *
def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]:
    """Evaluate ``expr`` through ``_vm_vector`` and return one row per labelled sample.

    Args:
        expr: Query expression passed to ``_vm_vector`` unchanged.
        label_key: Name of the metric label whose value becomes the row's
            ``"node"`` entry.

    Returns:
        A list of ``{"node": <label value>, "value": <sample value>}`` dicts
        sorted by node name. Samples that are not dicts, or that lack a
        non-empty string value for ``label_key``, are dropped.
    """
    output: list[dict[str, Any]] = []
    for item in _vm_vector(expr):
        # Skip malformed samples instead of raising AttributeError; this is the
        # same guard the job/pod vector readers in this module apply.
        if not isinstance(item, dict):
            continue
        metric = item.get("metric")
        if not isinstance(metric, dict):
            continue
        label = metric.get(label_key)
        if isinstance(label, str) and label:
            output.append({"node": label, "value": item.get("value")})
    # Every row carries a non-empty string node, so it can be the key directly.
    output.sort(key=lambda row: row["node"])
    return output
def _vm_baseline_map(expr: str, label_key: str, window: str) -> dict[str, dict[str, float]]:
    """Collect per-label ``avg`` and ``max`` of ``expr`` over ``window``.

    Two queries are issued through ``_vm_vector``:
    ``avg_over_time((expr)[window])`` and ``max_over_time((expr)[window])``.

    Args:
        expr: Inner query expression wrapped by the ``*_over_time`` functions.
        label_key: Metric label whose value keys the returned mapping.
        window: Range selector text placed inside ``[...]``.

    Returns:
        ``{label: {"avg": float, "max": float}}``. A label present in only one
        of the two results carries only that statistic. Samples that are not
        dicts or have no non-empty string label are ignored.

    Raises:
        ValueError, TypeError: If a sample's truthy ``"value"`` cannot be
            converted with ``float()``.
    """
    # Both queries run up front, averages first, as a single (stat, samples)
    # table so the per-sample handling is written once.
    samples_by_stat = (
        ("avg", _vm_vector(f"avg_over_time(({expr})[{window}])")),
        ("max", _vm_vector(f"max_over_time(({expr})[{window}])")),
    )
    baseline: dict[str, dict[str, float]] = {}
    for stat, samples in samples_by_stat:
        for item in samples:
            # Skip malformed samples instead of raising AttributeError; this is
            # the same guard the job/pod vector readers in this module apply.
            if not isinstance(item, dict):
                continue
            metric = item.get("metric")
            if not isinstance(metric, dict):
                continue
            label = metric.get(label_key)
            if not isinstance(label, str) or not label:
                continue
            # A missing, None or zero value is recorded as 0.0.
            baseline.setdefault(label, {})[stat] = float(item.get("value") or 0)
    return baseline
def _baseline_map_to_list(
baseline: dict[str, dict[str, float]],
name_key: str,
) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for name, stats in baseline.items():
if not isinstance(name, str) or not name:
continue
output.append(
{
name_key: name,
"avg": stats.get("avg"),
"max": stats.get("max"),
}
)
output.sort(key=lambda item: (-(item.get("avg") or 0), item.get(name_key) or ""))
return output
def _limit_entries(entries: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]:
if limit <= 0:
return []
return entries[:limit]
def _vm_window_series(
    expr: str,
    label_key: str,
    name_key: str,
    window: str,
) -> dict[str, list[dict[str, Any]]]:
    """Query avg, max and 95th-percentile of ``expr`` over ``window``.

    Each statistic is fetched with ``_vm_vector`` and converted with
    ``_vector_to_named(..., label_key, name_key)``.

    Returns:
        ``{"avg": [...], "max": [...], "p95": [...]}``, queried in that order.
    """
    ranged = f"({expr})[{window}]"
    queries = (
        ("avg", f"avg_over_time({ranged})"),
        ("max", f"max_over_time({ranged})"),
        ("p95", f"quantile_over_time(0.95, {ranged})"),
    )
    series: dict[str, list[dict[str, Any]]] = {}
    for stat, query in queries:
        series[stat] = _vector_to_named(_vm_vector(query), label_key, name_key)
    return series
def _trim_window_series(series: dict[str, list[dict[str, Any]]], limit: int) -> dict[str, list[dict[str, Any]]]:
    """Apply ``_limit_entries(entries, limit)`` to every list in ``series``.

    Returns a new dict with the same keys in the same order.
    """
    trimmed: dict[str, list[dict[str, Any]]] = {}
    for stat, entries in series.items():
        trimmed[stat] = _limit_entries(entries, limit)
    return trimmed
def _build_metric_trends(
    exprs: dict[str, str],
    label_key: str,
    name_key: str,
    windows: tuple[str, ...],
    limit: int,
) -> dict[str, dict[str, dict[str, list[dict[str, Any]]]]]:
    """Build trimmed window series for every metric expression and window.

    Args:
        exprs: Mapping of metric name to query expression.
        label_key: Label key forwarded to ``_vm_window_series``.
        name_key: Name key forwarded to ``_vm_window_series``.
        windows: Range windows to evaluate for each metric.
        limit: Maximum entries kept per statistic (via ``_trim_window_series``).

    Returns:
        ``{metric: {window: {stat: [entries...]}}}``.
    """
    return {
        metric: {
            window: _trim_window_series(
                _vm_window_series(expr, label_key, name_key, window),
                limit,
            )
            for window in windows
        }
        for metric, expr in exprs.items()
    }
def _vm_scalar_window(expr: str, window: str, fn: str) -> float | None:
    """Evaluate ``fn((expr)[window])`` through ``_vm_scalar`` and return its result.

    Args:
        expr: Inner query expression.
        window: Range selector text placed inside ``[...]``.
        fn: Name of the range function to wrap ``expr`` with.
    """
    query = f"{fn}(({expr})[{window}])"
    return _vm_scalar(query)
def _scalar_trends(expr: str, windows: tuple[str, ...]) -> dict[str, dict[str, float | None]]:
    """Return avg/min/max of ``expr`` for each window.

    Returns:
        ``{window: {"avg": ..., "min": ..., "max": ...}}`` where each value is
        whatever ``_vm_scalar_window`` returns for the matching
        ``*_over_time`` function.
    """
    # (result key, range function) pairs, evaluated in this order per window.
    stats = (
        ("avg", "avg_over_time"),
        ("min", "min_over_time"),
        ("max", "max_over_time"),
    )
    trends: dict[str, dict[str, float | None]] = {}
    for window in windows:
        trends[window] = {
            stat: _vm_scalar_window(expr, window, fn) for stat, fn in stats
        }
    return trends
def _cluster_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Compute scalar trends for a fixed set of cluster-wide queries.

    Each query is evaluated with ``_scalar_trends`` over ``_TREND_WINDOWS``.

    Returns:
        ``{metric_key: {window: {"avg": ..., "min": ..., "max": ...}}}``.
    """

    def _ns_rate(series: str) -> str:
        # rate() of ``series`` restricted to non-empty namespaces.
        return f'rate({series}{{namespace!=""}}[{_RATE_WINDOW}])'

    cpu = _ns_rate("container_cpu_usage_seconds_total")
    net_rx = _ns_rate("container_network_receive_bytes_total")
    net_tx = _ns_rate("container_network_transmit_bytes_total")
    fs_reads = _ns_rate("container_fs_reads_bytes_total")
    fs_writes = _ns_rate("container_fs_writes_bytes_total")

    queries: dict[str, str] = {
        "nodes_ready": 'sum(kube_node_status_condition{condition="Ready",status="true"})',
        "nodes_not_ready": 'sum(kube_node_status_condition{condition="Ready",status="false"})',
        "pods_running": 'sum(kube_pod_status_phase{phase="Running"})',
        "pods_pending": 'sum(kube_pod_status_phase{phase="Pending"})',
        "pods_failed": 'sum(kube_pod_status_phase{phase="Failed"})',
        "pods_succeeded": 'sum(kube_pod_status_phase{phase="Succeeded"})',
        "alerts_firing": 'sum(ALERTS{alertstate="firing"})',
        "cpu_usage": f"sum({cpu})",
        "mem_usage": 'sum(container_memory_working_set_bytes{namespace!=""})',
        "net_io": f"sum({net_rx} + {net_tx})",
        "fs_io": f"sum({fs_reads} + {fs_writes})",
    }

    trends: dict[str, dict[str, dict[str, float | None]]] = {}
    for key, query in queries.items():
        trends[key] = _scalar_trends(query, _TREND_WINDOWS)
    return trends
def _node_condition_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Compute scalar trends for node condition counts.

    Covers ready / not-ready / unschedulable plus one entry per name in
    ``_PRESSURE_TYPES`` (keyed by the lower-cased name). Each query is
    evaluated with ``_scalar_trends`` over ``_TREND_WINDOWS``.
    """

    def _condition_sum(condition: str, status: str) -> str:
        # Count of nodes reporting ``condition`` with the given status.
        return (
            f'sum(kube_node_status_condition{{condition="{condition}",status="{status}"}})'
        )

    queries: dict[str, str] = {
        "ready": _condition_sum("Ready", "true"),
        "not_ready": _condition_sum("Ready", "false"),
        "unschedulable": "sum(kube_node_spec_unschedulable)",
    }
    queries.update(
        {name.lower(): _condition_sum(name, "true") for name in _PRESSURE_TYPES}
    )

    trends: dict[str, dict[str, dict[str, float | None]]] = {}
    for key, query in queries.items():
        trends[key] = _scalar_trends(query, _TREND_WINDOWS)
    return trends
def _pod_reason_totals(
    reasons: dict[str, str],
    series: str,
) -> dict[str, dict[str, dict[str, float | None]]]:
    """Compute scalar trends of ``sum(series{reason="..."})`` per reason.

    Args:
        reasons: Mapping of output key to the ``reason`` label value to match.
        series: Metric name the ``reason`` selector is applied to.

    Returns:
        ``{key: _scalar_trends(<query>, _TREND_WINDOWS)}`` for every entry in
        ``reasons``.
    """
    return {
        key: _scalar_trends(f'sum({series}{{reason="{reason}"}})', _TREND_WINDOWS)
        for key, reason in reasons.items()
    }
def _node_usage_exprs() -> dict[str, str]:
    """Return the per-node usage query strings keyed by resource.

    Keys are ``"cpu"``, ``"ram"``, ``"net"``, ``"io"`` and ``"disk"``. Every
    query has the shape ``avg by (node) (<per-instance expr> * on(instance)
    group_left(node) label_replace(node_uname_info...))``; rate-based ones
    use ``_RATE_WINDOW`` as their range.
    """
    window = _RATE_WINDOW
    # Join shared by all five queries: attaches a ``node`` label copied from
    # node_uname_info's ``nodename`` to each per-instance value.
    node_join = (
        " * on(instance) group_left(node) "
        'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)")'
    )

    def _per_node(per_instance: str) -> str:
        # Wrap a per-instance expression with the node join and outer average.
        return f"avg by (node) ({per_instance}{node_join})"

    cpu = (
        f'((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{window}]))) * 100)'
    )
    ram = (
        "(avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) "
        "/ node_memory_MemTotal_bytes * 100))"
    )
    net = (
        f'(sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{window}]) '
        f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{window}])))'
    )
    io = (
        f"(sum by (instance) (rate(node_disk_read_bytes_total[{window}]) "
        f"+ rate(node_disk_written_bytes_total[{window}])))"
    )
    disk = (
        '((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} '
        '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100)'
    )

    return {
        "cpu": _per_node(cpu),
        "ram": _per_node(ram),
        "net": _per_node(net),
        "io": _per_node(io),
        "disk": _per_node(disk),
    }
def _namespace_usage_exprs() -> dict[str, str]:
    """Return per-namespace usage query strings under ``"cpu"`` and ``"mem"``.

    The CPU query is a ``rate()`` over ``_RATE_WINDOW``; both queries select
    only series with a non-empty ``namespace`` label.
    """
    selector = '{namespace!=""}'
    cpu = f"sum by (namespace) (rate(container_cpu_usage_seconds_total{selector}[{_RATE_WINDOW}]))"
    mem = f"sum by (namespace) (container_memory_working_set_bytes{selector})"
    return {"cpu": cpu, "mem": mem}
def _namespace_request_exprs() -> dict[str, str]:
return {
"cpu_requests": "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)",
"mem_requests": "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)",
}
def _restart_namespace_trend(window: str) -> list[dict[str, Any]]:
    """Return the top namespaces by container restart increase over ``window``.

    Queries ``topk(_TREND_NAMESPACE_LIMIT, ...)`` through ``_vm_vector``,
    passes the result through ``_filter_namespace_vector``, then converts it
    with ``_vector_to_named(..., "namespace", "namespace")``.
    """
    query = (
        f"topk({_TREND_NAMESPACE_LIMIT}, "
        f"sum by (namespace) (increase(kube_pod_container_status_restarts_total[{window}])))"
    )
    samples = _filter_namespace_vector(_vm_vector(query))
    return _vector_to_named(samples, "namespace", "namespace")
def _job_failure_trend(window: str) -> list[dict[str, Any]]:
    """Return the top jobs by ``kube_job_status_failed`` increase over ``window``.

    Queries ``topk(_TREND_JOB_LIMIT, ...)`` grouped by namespace and job name.

    Returns:
        Rows of ``{"namespace", "job", "value"}`` sorted by descending value
        (``None`` counts as 0), then namespace, then job. Samples that are
        not dicts or lack string ``namespace`` / ``job_name`` labels are
        skipped.
    """
    query = (
        f"topk({_TREND_JOB_LIMIT}, "
        f"sum by (namespace,job_name) (increase(kube_job_status_failed[{window}])))"
    )
    samples = _vm_vector(query)

    rows: list[dict[str, Any]] = []
    for sample in samples:
        if not isinstance(sample, dict):
            continue
        labels = sample.get("metric")
        if not isinstance(labels, dict):
            labels = {}
        namespace = labels.get("namespace")
        job = labels.get("job_name")
        if isinstance(namespace, str) and isinstance(job, str):
            rows.append(
                {"namespace": namespace, "job": job, "value": sample.get("value")}
            )

    def _rank(row: dict[str, Any]) -> tuple[Any, str, str]:
        # namespace/job are always strings here, so they sort directly.
        return (-(row["value"] or 0), row["namespace"], row["job"])

    return sorted(rows, key=_rank)
def _pod_reason_entries(expr: str, limit: int) -> list[dict[str, Any]]:
    """Return the top ``limit`` pods for ``expr``, summed by namespace and pod.

    Args:
        expr: Query expression wrapped in ``sum by (namespace,pod) (...)``.
        limit: ``k`` passed to ``topk``.

    Returns:
        Rows of ``{"namespace", "pod", "value"}`` sorted by descending value
        (``None`` counts as 0), then namespace, then pod. Samples that are
        not dicts or lack string ``namespace`` / ``pod`` labels are skipped.
    """
    samples = _vm_vector(f"topk({limit}, sum by (namespace,pod) ({expr}))")

    rows: list[dict[str, Any]] = []
    for sample in samples:
        if not isinstance(sample, dict):
            continue
        labels = sample.get("metric")
        if not isinstance(labels, dict):
            labels = {}
        namespace = labels.get("namespace")
        pod = labels.get("pod")
        if isinstance(namespace, str) and isinstance(pod, str):
            rows.append(
                {"namespace": namespace, "pod": pod, "value": sample.get("value")}
            )

    def _rank(row: dict[str, Any]) -> tuple[Any, str, str]:
        # namespace/pod are always strings here, so they sort directly.
        return (-(row["value"] or 0), row["namespace"], row["pod"])

    return sorted(rows, key=_rank)
def _namespace_reason_entries(expr: str, limit: int) -> list[dict[str, Any]]:
    """Return the top ``limit`` namespaces for ``expr``, summed by namespace.

    The ``_vm_vector`` result is passed through ``_filter_namespace_vector``
    and converted with ``_vector_to_named(..., "namespace", "namespace")``.
    """
    query = f"topk({limit}, sum by (namespace) ({expr}))"
    samples = _filter_namespace_vector(_vm_vector(query))
    return _vector_to_named(samples, "namespace", "namespace")
def _pod_waiting_now() -> dict[str, list[dict[str, Any]]]:
    """Return current top pods for each waiting reason.

    For every ``key -> reason`` pair in ``_POD_WAITING_REASONS`` the matching
    ``kube_pod_container_status_waiting_reason`` selector is passed to
    ``_pod_reason_entries`` with ``_POD_REASON_LIMIT``.
    """
    return {
        key: _pod_reason_entries(
            f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}',
            _POD_REASON_LIMIT,
        )
        for key, reason in _POD_WAITING_REASONS.items()
    }
def _pod_waiting_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Return top pods per waiting reason for each trend window.

    For every reason in ``_POD_WAITING_REASONS`` and window in
    ``_TREND_WINDOWS``, ``max_over_time`` of the waiting-reason selector is
    passed to ``_pod_reason_entries`` with ``_POD_REASON_TREND_LIMIT``.

    Returns:
        ``{reason_key: {window: [rows...]}}``.
    """
    trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for key, reason in _POD_WAITING_REASONS.items():
        selector = f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}'
        per_window: dict[str, list[dict[str, Any]]] = {}
        for window in _TREND_WINDOWS:
            query = f"max_over_time(({selector})[{window}])"
            per_window[window] = _pod_reason_entries(query, _POD_REASON_TREND_LIMIT)
        trends[key] = per_window
    return trends
def _pod_terminated_now() -> dict[str, list[dict[str, Any]]]:
    """Return current top pods for each terminated reason.

    For every ``key -> reason`` pair in ``_POD_TERMINATED_REASONS`` the
    matching ``kube_pod_container_status_terminated_reason`` selector is
    passed to ``_pod_reason_entries`` with ``_POD_REASON_LIMIT``.
    """
    return {
        key: _pod_reason_entries(
            f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}',
            _POD_REASON_LIMIT,
        )
        for key, reason in _POD_TERMINATED_REASONS.items()
    }
def _pod_terminated_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Return top pods per terminated reason for each trend window.

    For every reason in ``_POD_TERMINATED_REASONS`` and window in
    ``_TREND_WINDOWS``, ``max_over_time`` of the terminated-reason selector
    is passed to ``_pod_reason_entries`` with ``_POD_REASON_TREND_LIMIT``.

    Returns:
        ``{reason_key: {window: [rows...]}}``.
    """
    trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for key, reason in _POD_TERMINATED_REASONS.items():
        selector = f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}'
        per_window: dict[str, list[dict[str, Any]]] = {}
        for window in _TREND_WINDOWS:
            query = f"max_over_time(({selector})[{window}])"
            per_window[window] = _pod_reason_entries(query, _POD_REASON_TREND_LIMIT)
        trends[key] = per_window
    return trends
def _pods_phase_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Return avg/max pod counts per phase for each trend window.

    Covers the Running, Pending and Failed phases (keyed lower-case), each
    evaluated with ``_vm_scalar_window`` over every window in
    ``_TREND_WINDOWS``.

    Returns:
        ``{window: {phase: {"avg": ..., "max": ...}}}``.
    """
    phase_queries = tuple(
        (phase.lower(), f'sum(kube_pod_status_phase{{phase="{phase}"}})')
        for phase in ("Running", "Pending", "Failed")
    )
    return {
        window: {
            name: {
                "avg": _vm_scalar_window(query, window, "avg_over_time"),
                "max": _vm_scalar_window(query, window, "max_over_time"),
            }
            for name, query in phase_queries
        }
        for window in _TREND_WINDOWS
    }
# Export every module-global name that does not start with a double
# underscore. This includes the single-underscore helpers defined above and
# any names brought in by the star imports at the top of the file, so that
# `from <this module> import *` re-exports them. Only names bound before this
# line runs are captured.
__all__ = [name for name in globals() if not name.startswith("__")]