diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index 0f16e44..8d2037f 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -18,6 +18,12 @@ logger = get_logger(__name__) _VALUE_PAIR_LEN = 2 _RATE_WINDOW = "5m" _RESTARTS_WINDOW = "1h" +_BASELINE_WINDOW = "24h" +_NODE_DISK_ALERT = 80.0 +_NODE_CPU_ALERT = 80.0 +_NODE_RAM_ALERT = 80.0 +_NET_SPIKE_MULTIPLIER = 2.0 +_IO_SPIKE_MULTIPLIER = 2.0 _NODE_UNAME_LABEL = 'node_uname_info{nodename!=""}' _WORKLOAD_LABEL_KEYS = ( "app.kubernetes.io/name", @@ -1308,6 +1314,78 @@ def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]: return output +def _vm_baseline_map(expr: str, label_key: str, window: str) -> dict[str, dict[str, float]]: + averages = _vm_vector(f"avg_over_time(({expr})[{window}])") + maximums = _vm_vector(f"max_over_time(({expr})[{window}])") + baseline: dict[str, dict[str, float]] = {} + for item in averages: + metric = item.get("metric") if isinstance(item.get("metric"), dict) else {} + label = metric.get(label_key) + if not isinstance(label, str) or not label: + continue + baseline.setdefault(label, {})["avg"] = float(item.get("value") or 0) + for item in maximums: + metric = item.get("metric") if isinstance(item.get("metric"), dict) else {} + label = metric.get(label_key) + if not isinstance(label, str) or not label: + continue + baseline.setdefault(label, {})["max"] = float(item.get("value") or 0) + return baseline + + +def _baseline_map_to_list( + baseline: dict[str, dict[str, float]], + name_key: str, +) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for name, stats in baseline.items(): + if not isinstance(name, str) or not name: + continue + output.append( + { + name_key: name, + "avg": stats.get("avg"), + "max": stats.get("max"), + } + ) + output.sort(key=lambda item: (-(item.get("avg") or 0), item.get(name_key) or "")) + return output + + +def _node_usage_exprs() -> dict[str, str]: + return { + "cpu": ( + f'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) ' + '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))' + ), + "ram": ( + 'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) ' + '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))' + ), + "net": ( + f'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) ' + f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) ' + 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))' + ), + "io": ( + f'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) ' + '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))' + ), + "disk": ( + 'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} ' + '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) ' + 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))' + ), + } + + +def _namespace_usage_exprs() -> dict[str, str]: + return { + "cpu": f'sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))', + "mem": 'sum by (namespace) (container_memory_working_set_bytes{namespace!=""})', + } + + def _postgres_connections(errors: list[str]) -> dict[str, Any]: postgres: dict[str, Any] = {} try: @@ -1356,33 +1434,12 @@ def _hottest_nodes(errors: list[str]) -> dict[str, Any]: def _node_usage(errors: list[str]) -> dict[str, Any]: usage: dict[str, Any] = {} try: - usage["cpu"] = _vm_node_metric( - f'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) ' - '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', - "node", - ) - usage["ram"] = _vm_node_metric( - 'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) ' - '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', - "node", - ) - usage["net"] = _vm_node_metric( - f'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) ' - f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) ' - 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', - "node", - ) - usage["io"] = _vm_node_metric( - f'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) ' - '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', - "node", - ) - usage["disk"] = _vm_node_metric( - 'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} ' - '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) ' - 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', - "node", - ) + exprs = _node_usage_exprs() + usage["cpu"] = _vm_node_metric(exprs["cpu"], "node") + usage["ram"] = _vm_node_metric(exprs["ram"], "node") + usage["net"] = _vm_node_metric(exprs["net"], "node") + usage["io"] = _vm_node_metric(exprs["io"], "node") + usage["disk"] = _vm_node_metric(exprs["disk"], "node") except Exception as exc: errors.append(f"node_usage: {exc}") return usage @@ -1651,8 +1708,7 @@ def _namespace_capacity_summary(capacity: list[dict[str, Any]]) -> dict[str, Any } -def _summarize_metrics(errors: list[str]) -> dict[str, Any]: - metrics: dict[str, Any] = {} +def _collect_vm_core(metrics: dict[str, Any], errors: list[str]) -> None: try: metrics["nodes_total"] = _vm_scalar("count(kube_node_info)") metrics["nodes_ready"] = _vm_scalar( @@ -1701,6 +1757,9 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: ) except Exception as exc: errors.append(f"vm: {exc}") + + +def _collect_node_metrics(metrics: dict[str, Any], errors: list[str]) -> None: metrics["postgres_connections"] = _postgres_connections(errors) metrics["hottest_nodes"] = _hottest_nodes(errors) metrics["node_usage"] = _node_usage(errors) @@ -1711,6 +1770,20 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: "io": _usage_stats(metrics.get("node_usage", {}).get("io", [])), "disk": _usage_stats(metrics.get("node_usage", {}).get("disk", [])), } + try: + node_exprs = _node_usage_exprs() + node_baseline_map: dict[str, dict[str, dict[str, float]]] = {} + for key, expr in node_exprs.items(): + baseline = _vm_baseline_map(expr, "node", _BASELINE_WINDOW) + metrics.setdefault("node_baseline", {})[key] = _baseline_map_to_list(baseline, "node") + for name, stats in baseline.items(): + node_baseline_map.setdefault(name, {})[key] = stats + metrics["node_baseline_map"] = node_baseline_map + except Exception as exc: + errors.append(f"baseline: {exc}") + + +def _collect_namespace_metrics(metrics: dict[str, Any], errors: list[str]) -> None: try: metrics["namespace_cpu_top"] = _filter_namespace_vector( _vm_vector( @@ -1760,18 +1833,31 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: namespace_mem_usage, namespace_mem_requests, ) + metrics["namespace_totals"] = { + "cpu": _namespace_totals_list(namespace_cpu_usage), + "mem": _namespace_totals_list(namespace_mem_usage), + "cpu_requests": _namespace_totals_list(namespace_cpu_requests), + "mem_requests": _namespace_totals_list(namespace_mem_requests), + } except Exception as exc: errors.append(f"namespace_usage: {exc}") + try: + namespace_exprs = _namespace_usage_exprs() + namespace_baseline_map: dict[str, dict[str, dict[str, float]]] = {} + for key, expr in namespace_exprs.items(): + baseline = _vm_baseline_map(expr, "namespace", _BASELINE_WINDOW) + metrics.setdefault("namespace_baseline", {})[key] = _baseline_map_to_list(baseline, "namespace") + for name, stats in baseline.items(): + namespace_baseline_map.setdefault(name, {})[key] = stats + metrics["namespace_baseline_map"] = namespace_baseline_map + except Exception as exc: + errors.append(f"baseline: {exc}") metrics["namespace_capacity_summary"] = _namespace_capacity_summary( metrics.get("namespace_capacity", []), ) - metrics["namespace_totals"] = { - "cpu": _namespace_totals_list(namespace_cpu_usage), - "mem": _namespace_totals_list(namespace_mem_usage), - "cpu_requests": _namespace_totals_list(namespace_cpu_requests), - "mem_requests": _namespace_totals_list(namespace_mem_requests), - } - metrics["pvc_usage_top"] = _pvc_usage(errors) + + +def _finalize_metrics(metrics: dict[str, Any]) -> None: metrics["units"] = { "cpu": "percent", "ram": "percent", @@ -1802,6 +1888,15 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: "rates": _RATE_WINDOW, "restarts": _RESTARTS_WINDOW, } + + +def _summarize_metrics(errors: list[str]) -> dict[str, Any]: + metrics: dict[str, Any] = {} + _collect_vm_core(metrics, errors) + _collect_node_metrics(metrics, errors) + _collect_namespace_metrics(metrics, errors) + metrics["pvc_usage_top"] = _pvc_usage(errors) + _finalize_metrics(metrics) return metrics @@ -1897,6 +1992,7 @@ def _namespace_context( def _node_context( node_details: list[dict[str, Any]], node_load: list[dict[str, Any]], + node_baseline: dict[str, dict[str, dict[str, float]]], ) -> list[dict[str, Any]]: load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)} output: list[dict[str, Any]] = [] @@ -1907,6 +2003,7 @@ def _node_context( if not isinstance(name, str) or not name: continue load_entry = load_map.get(name, {}) + baseline = node_baseline.get(name, {}) if isinstance(node_baseline, dict) else {} output.append( { "node": name, @@ -1926,6 +2023,7 @@ def _node_context( "net": load_entry.get("net"), "io": load_entry.get("io"), "load_index": load_entry.get("load_index"), + "baseline": baseline, } ) output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or "")) @@ -2114,6 +2212,161 @@ def _health_bullets( return bullets[:4] +def _workload_not_ready_items(workloads_health: dict[str, Any]) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for key in ("deployments", "statefulsets", "daemonsets"): + entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {} + for item in entry.get("items") or []: + if not isinstance(item, dict): + continue + output.append( + { + "kind": key[:-1], + "namespace": item.get("namespace") or "", + "name": item.get("name") or "", + "desired": item.get("desired"), + "ready": item.get("ready"), + } + ) + output.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or "")) + return output + + +def _pod_restarts_top(metrics: dict[str, Any]) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for item in metrics.get("top_restarts_1h") or []: + if not isinstance(item, dict): + continue + metric = item.get("metric") if isinstance(item.get("metric"), dict) else {} + namespace = metric.get("namespace") + pod = metric.get("pod") + if not isinstance(namespace, str) or not isinstance(pod, str): + continue + output.append( + { + "namespace": namespace, + "pod": pod, + "value": item.get("value"), + } + ) + output.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or "")) + return output[:5] + + +def _node_attention_score(node: dict[str, Any]) -> tuple[float, list[str]]: + score = 0.0 + reasons: list[str] = [] + disk = node.get("disk") + if isinstance(disk, (int, float)) and disk >= _NODE_DISK_ALERT: + score += 3 + (disk - _NODE_DISK_ALERT) / 10 + reasons.append(f"disk {disk:.1f}%") + cpu = node.get("cpu") + if isinstance(cpu, (int, float)) and cpu >= _NODE_CPU_ALERT: + score += 2 + (cpu - _NODE_CPU_ALERT) / 20 + reasons.append(f"cpu {cpu:.1f}%") + ram = node.get("ram") + if isinstance(ram, (int, float)) and ram >= _NODE_RAM_ALERT: + score += 2 + (ram - _NODE_RAM_ALERT) / 20 + reasons.append(f"ram {ram:.1f}%") + baseline = node.get("baseline") if isinstance(node.get("baseline"), dict) else {} + for key, label, multiplier in (("net", "net", _NET_SPIKE_MULTIPLIER), ("io", "io", _IO_SPIKE_MULTIPLIER)): + current = node.get(key) + base = baseline.get(key) if isinstance(baseline.get(key), dict) else {} + base_max = base.get("max") + if isinstance(current, (int, float)) and isinstance(base_max, (int, float)) and base_max > 0: + if current > base_max * multiplier: + score += 1.5 + reasons.append(f"{label} {current:.2f} > {multiplier:.1f}x baseline") + pressure = node.get("pressure_flags") if isinstance(node.get("pressure_flags"), list) else [] + if pressure: + score += 2 + reasons.append("pressure flags") + return score, reasons + + +def _node_attention_entries(node_context: list[dict[str, Any]]) -> list[dict[str, Any]]: + entries: list[dict[str, Any]] = [] + for node in node_context: + if not isinstance(node, dict): + continue + name = node.get("node") + if not isinstance(name, str) or not name: + continue + score, reasons = _node_attention_score(node) + if score > 0: + entries.append( + { + "kind": "node", + "target": name, + "score": round(score, 2), + "reasons": reasons, + } + ) + return entries + + +def _pvc_attention_entries(metrics: dict[str, Any]) -> list[dict[str, Any]]: + entries: list[dict[str, Any]] = [] + for item in _pvc_pressure_entries(metrics): + if not isinstance(item, dict): + continue + used = float(item.get("used_percent") or 0) + entries.append( + { + "kind": "pvc", + "target": f"{item.get('namespace')}/{item.get('pvc')}", + "score": round(1 + (used - _PVC_PRESSURE_THRESHOLD) / 10, 2), + "reasons": [f"usage {used:.1f}%"], + } + ) + return entries + + +def _pod_attention_entries(pod_issues: dict[str, Any]) -> list[dict[str, Any]]: + entries: list[dict[str, Any]] = [] + pending = pod_issues.get("pending_over_15m") or 0 + if pending: + entries.append( + { + "kind": "pods", + "target": "pending", + "score": float(pending), + "reasons": [f"{int(pending)} pending >15m"], + } + ) + return entries + + +def _workload_attention_entries(workloads_health: dict[str, Any]) -> list[dict[str, Any]]: + entries: list[dict[str, Any]] = [] + for item in _workload_not_ready_items(workloads_health)[:5]: + entries.append( + { + "kind": "workload", + "target": f"{item.get('namespace')}/{item.get('name')}", + "score": 2.0, + "reasons": [f"{item.get('ready')}/{item.get('desired')} ready"], + } + ) + return entries + + +def _build_attention_ranked( + metrics: dict[str, Any], + node_context: list[dict[str, Any]], + pod_issues: dict[str, Any], + workloads_health: dict[str, Any], +) -> list[dict[str, Any]]: + entries = ( + _node_attention_entries(node_context) + + _pvc_attention_entries(metrics) + + _pod_attention_entries(pod_issues) + + _workload_attention_entries(workloads_health) + ) + entries.sort(key=lambda item: (-(item.get("score") or 0), item.get("kind") or "", item.get("target") or "")) + return entries[:5] + + def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: errors: list[str] = [] collected_at = datetime.now(timezone.utc) @@ -2149,10 +2402,15 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: namespace_nodes, metrics.get("namespace_capacity", []), ) - node_context = _node_context(node_details, metrics.get("node_load", [])) + node_context = _node_context( + node_details, + metrics.get("node_load", []), + metrics.get("node_baseline_map", {}), + ) summary = { "generated_at": collected_at.isoformat(), "windows": metrics.get("windows", {}), + "baseline_window": _BASELINE_WINDOW, "counts": { "nodes_total": metrics.get("nodes_total"), "nodes_ready": metrics.get("nodes_ready"), @@ -2170,7 +2428,10 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "node_hottest": metrics.get("hottest_nodes", {}), "postgres": metrics.get("postgres_connections", {}), "pvc_usage": _pvc_top(metrics.get("pvc_usage_top", [])), + "workload_not_ready": _workload_not_ready_items(workload_health)[:5], + "pod_restarts": _pod_restarts_top(metrics), }, + "attention_ranked": _build_attention_ranked(metrics, node_context, pod_issues, workload_health), "anomalies": anomalies, "health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies), "unknowns": errors, diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py index 8ba5e3f..7699b0d 100644 --- a/tests/test_cluster_state.py +++ b/tests/test_cluster_state.py @@ -147,10 +147,15 @@ def test_collect_cluster_state(monkeypatch) -> None: assert snapshot["summary"]["counts"]["nodes_ready"] == 5.0 assert snapshot["summary"]["counts"]["pods_running"] == 5.0 assert snapshot["summary"]["top"]["namespace_pods"][0]["namespace"] == "media" + assert snapshot["summary"]["baseline_window"] + assert "workload_not_ready" in snapshot["summary"]["top"] + assert "pod_restarts" in snapshot["summary"]["top"] + assert "attention_ranked" in snapshot["summary"] assert snapshot["summary"]["health_bullets"] assert snapshot["summary"]["unknowns"] == [] assert snapshot["context"]["nodes"] assert snapshot["context"]["namespaces"] + assert "baseline" in snapshot["context"]["nodes"][0] assert summary.nodes_total == 2 assert summary.nodes_ready == 1 assert summary.pods_running == 5.0