diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index 27b7d39..967e242 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -19,6 +19,12 @@ _VALUE_PAIR_LEN = 2
 _RATE_WINDOW = "5m"
 _RESTARTS_WINDOW = "1h"
 _BASELINE_WINDOW = "24h"
+_TREND_WINDOWS = ("1h", "6h", "24h")
+_TREND_NODE_LIMIT = 30
+_TREND_NAMESPACE_LIMIT = 20
+_TREND_PVC_LIMIT = 10
+_TREND_JOB_LIMIT = 10
+_TREND_POD_LIMIT = 15
 _NODE_DISK_ALERT = 80.0
 _NODE_CPU_ALERT = 80.0
 _NODE_RAM_ALERT = 80.0
@@ -354,6 +360,37 @@ def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
     return summary
 
 
+def _hardware_groups(details: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    groups: dict[str, list[str]] = {}
+    for node in details:
+        if not isinstance(node, dict):
+            continue
+        name = node.get("name")
+        if not isinstance(name, str) or not name:
+            continue
+        hardware = str(node.get("hardware") or "unknown")
+        groups.setdefault(hardware, []).append(name)
+    output: list[dict[str, Any]] = []
+    for hardware, nodes in groups.items():
+        nodes.sort()
+        output.append({"hardware": hardware, "count": len(nodes), "nodes": nodes})
+    output.sort(key=lambda item: (-(item.get("count") or 0), item.get("hardware") or ""))
+    return output
+
+
+def _pressure_summary(nodes_summary: dict[str, Any]) -> dict[str, Any]:
+    pressure_nodes = nodes_summary.get("pressure_nodes") if isinstance(nodes_summary, dict) else {}
+    summary: dict[str, Any] = {"by_type": {}, "total": 0}
+    if isinstance(pressure_nodes, dict):
+        for cond, names in pressure_nodes.items():
+            count = len(names) if isinstance(names, list) else 0
+            summary["by_type"][cond] = count
+            summary["total"] += count
+    unschedulable = nodes_summary.get("unschedulable_nodes") or []
+    summary["unschedulable"] = len(unschedulable) if isinstance(unschedulable, list) else 0
+    return summary
+
+
 def _apply_node_summary(summary: dict[str, Any], node: dict[str, Any]) -> str:
     name = node.get("name") if isinstance(node, dict) else ""
     if not isinstance(name, str) or not name:
@@ -1371,6 +1408,65 @@ def _baseline_map_to_list(
     return output
 
 
+def _limit_entries(entries: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]:
+    if limit <= 0:
+        return []
+    return entries[:limit]
+
+
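+# NOTE: the `fn((expr)[window])` strings below rely on MetricsQL subquery
+# syntax, which tolerates a missing step; this assumes _vm_vector talks to
+# VictoriaMetrics, as the _vm_* naming suggests. Stock Prometheus would need
+# an explicit subquery form such as `avg_over_time((expr)[1h:])`.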
+def _vm_window_series(
+    expr: str,
+    label_key: str,
+    name_key: str,
+    window: str,
+) -> dict[str, list[dict[str, Any]]]:
+    avg = _vector_to_named(
+        _vm_vector(f"avg_over_time(({expr})[{window}])"),
+        label_key,
+        name_key,
+    )
+    max_values = _vector_to_named(
+        _vm_vector(f"max_over_time(({expr})[{window}])"),
+        label_key,
+        name_key,
+    )
+    p95 = _vector_to_named(
+        _vm_vector(f"quantile_over_time(0.95, ({expr})[{window}])"),
+        label_key,
+        name_key,
+    )
+    return {"avg": avg, "max": max_values, "p95": p95}
+
+
+def _trim_window_series(series: dict[str, list[dict[str, Any]]], limit: int) -> dict[str, list[dict[str, Any]]]:
+    return {key: _limit_entries(entries, limit) for key, entries in series.items()}
+
+
+def _build_metric_trends(
+    exprs: dict[str, str],
+    label_key: str,
+    name_key: str,
+    windows: tuple[str, ...],
+    limit: int,
+) -> dict[str, dict[str, dict[str, list[dict[str, Any]]]]]:
+    trends: dict[str, dict[str, dict[str, list[dict[str, Any]]]]] = {}
+    for metric, expr in exprs.items():
+        metric_trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
+        for window in windows:
+            series = _vm_window_series(expr, label_key, name_key, window)
+            metric_trends[window] = _trim_window_series(series, limit)
+        trends[metric] = metric_trends
+    return trends
+
+
+def _vm_scalar_window(expr: str, window: str, fn: str) -> float | None:
+    return _vm_scalar(f"{fn}(({expr})[{window}])")
+
+
 def _node_usage_exprs() -> dict[str, str]:
     return {
         "cpu": (
@@ -1405,6 +1501,76 @@ def _namespace_usage_exprs() -> dict[str, str]:
     }
 
 
+def _namespace_request_exprs() -> dict[str, str]:
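+    # Assumption: these are the kube-state-metrics v1.x metric names; KSM v2+
+    # folds them into kube_pod_container_resource_requests{resource="cpu"|"memory"}.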
metrics["job_failure_trends"] = { + window: _job_failure_trend(window) for window in _TREND_WINDOWS + } + metrics["pods_phase_trends"] = _pods_phase_trends() + metrics["pvc_usage_trends"] = _pvc_usage_trends() + except Exception as exc: + errors.append(f"trends: {exc}") + + def _collect_namespace_metrics(metrics: dict[str, Any], errors: list[str]) -> None: try: metrics["namespace_cpu_top"] = _filter_namespace_vector( @@ -1906,6 +2101,7 @@ def _finalize_metrics(metrics: dict[str, Any]) -> None: metrics["windows"] = { "rates": _RATE_WINDOW, "restarts": _RESTARTS_WINDOW, + "trend": _TREND_WINDOWS, } @@ -1913,12 +2109,43 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: metrics: dict[str, Any] = {} _collect_vm_core(metrics, errors) _collect_node_metrics(metrics, errors) + _collect_trend_metrics(metrics, errors) _collect_namespace_metrics(metrics, errors) metrics["pvc_usage_top"] = _pvc_usage(errors) + metrics["trend_summary"] = _trend_summary(metrics) _finalize_metrics(metrics) return metrics +def _trend_summary(metrics: dict[str, Any]) -> dict[str, Any]: + node_trends = metrics.get("node_trends", {}) if isinstance(metrics.get("node_trends"), dict) else {} + namespace_trends = ( + metrics.get("namespace_trends", {}) if isinstance(metrics.get("namespace_trends"), dict) else {} + ) + restarts = metrics.get("restart_trends", {}) if isinstance(metrics.get("restart_trends"), dict) else {} + job_failures = ( + metrics.get("job_failure_trends", {}) if isinstance(metrics.get("job_failure_trends"), dict) else {} + ) + summary: dict[str, Any] = {} + for metric_key, target in (("cpu", "node_cpu"), ("ram", "node_ram")): + metric_block = node_trends.get(metric_key, {}) if isinstance(node_trends.get(metric_key), dict) else {} + summary[target] = { + window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5) + for window in _TREND_WINDOWS + } + for metric_key, target in (("cpu", "namespace_cpu"), ("mem", "namespace_mem")): + metric_block = namespace_trends.get(metric_key, {}) if isinstance(namespace_trends.get(metric_key), dict) else {} + summary[target] = { + window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5) + for window in _TREND_WINDOWS + } + summary["restarts"] = {window: _limit_entries(entries or [], 5) for window, entries in restarts.items()} + summary["job_failures"] = { + window: _limit_entries(entries or [], 5) for window, entries in job_failures.items() + } + return summary + + def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]: entries = [ {"namespace": name, "value": value} @@ -2849,6 +3076,9 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "restarts": _vector_to_named(metrics.get("restart_namespace_top", []), "namespace", "namespace"), } + hardware_groups = _hardware_groups(node_details) + pressure_summary = _pressure_summary(node_summary) + anomalies = _build_anomalies(metrics, node_summary, workload_health, kustomizations, events) namespace_context = _namespace_context( namespace_pods, @@ -2878,7 +3108,10 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "generated_at": collected_at.isoformat(), "windows": metrics.get("windows", {}), "baseline_window": _BASELINE_WINDOW, - "inventory": node_summary, + "inventory": { + **(node_summary or {}), + "hardware_groups": hardware_groups, + }, "counts": { "nodes_total": metrics.get("nodes_total"), "nodes_ready": metrics.get("nodes_ready"), @@ -2887,6 +3120,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], 
+def _trend_summary(metrics: dict[str, Any]) -> dict[str, Any]:
+    node_trends = metrics.get("node_trends", {}) if isinstance(metrics.get("node_trends"), dict) else {}
+    namespace_trends = (
+        metrics.get("namespace_trends", {}) if isinstance(metrics.get("namespace_trends"), dict) else {}
+    )
+    restarts = metrics.get("restart_trends", {}) if isinstance(metrics.get("restart_trends"), dict) else {}
+    job_failures = (
+        metrics.get("job_failure_trends", {}) if isinstance(metrics.get("job_failure_trends"), dict) else {}
+    )
+    summary: dict[str, Any] = {}
+    for metric_key, target in (("cpu", "node_cpu"), ("ram", "node_ram")):
+        metric_block = node_trends.get(metric_key, {}) if isinstance(node_trends.get(metric_key), dict) else {}
+        summary[target] = {
+            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
+            for window in _TREND_WINDOWS
+        }
+    for metric_key, target in (("cpu", "namespace_cpu"), ("mem", "namespace_mem")):
+        metric_block = namespace_trends.get(metric_key, {}) if isinstance(namespace_trends.get(metric_key), dict) else {}
+        summary[target] = {
+            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
+            for window in _TREND_WINDOWS
+        }
+    summary["restarts"] = {window: _limit_entries(entries or [], 5) for window, entries in restarts.items()}
+    summary["job_failures"] = {
+        window: _limit_entries(entries or [], 5) for window, entries in job_failures.items()
+    }
+    return summary
+
+
 def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
     entries = [
         {"namespace": name, "value": value}
@@ -2849,6 +3086,9 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         "restarts": _vector_to_named(metrics.get("restart_namespace_top", []), "namespace", "namespace"),
     }
 
+    hardware_groups = _hardware_groups(node_details)
+    pressure_summary = _pressure_summary(node_summary)
+
     anomalies = _build_anomalies(metrics, node_summary, workload_health, kustomizations, events)
     namespace_context = _namespace_context(
         namespace_pods,
@@ -2878,7 +3118,10 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         "generated_at": collected_at.isoformat(),
         "windows": metrics.get("windows", {}),
         "baseline_window": _BASELINE_WINDOW,
-        "inventory": node_summary,
+        "inventory": {
+            **(node_summary or {}),
+            "hardware_groups": hardware_groups,
+        },
         "counts": {
             "nodes_total": metrics.get("nodes_total"),
             "nodes_ready": metrics.get("nodes_ready"),
@@ -2887,6 +3130,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
             "pods_failed": metrics.get("pods_failed"),
             "pods_succeeded": metrics.get("pods_succeeded"),
         },
+        "pressure_summary": pressure_summary,
+        "trend_summary": metrics.get("trend_summary"),
         "top": {
             "namespace_cpu": (metrics.get("namespace_totals", {}) or {}).get("cpu", [])[:5],
             "namespace_mem": (metrics.get("namespace_totals", {}) or {}).get("mem", [])[:5],