From 0ad5f2afae9d55d087fa46ebf150e926725d04bc Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Fri, 30 Jan 2026 16:51:31 -0300
Subject: [PATCH] cluster_state: add signals and profiles

---
 ariadne/services/cluster_state.py | 297 ++++++++++++++++++++++++++++++
 1 file changed, 297 insertions(+)

diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index 32607ee..5ac52d0 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -48,6 +48,10 @@ _SYSTEM_NAMESPACES = {
 _WORKLOAD_ALLOWED_NAMESPACES = {
     "maintenance",
 }
+_BASELINE_DELTA_WARN = 50.0
+_BASELINE_DELTA_CRIT = 100.0
+_SIGNAL_LIMIT = 15
+_PROFILE_LIMIT = 6
 _CAPACITY_KEYS = {
     "cpu",
     "memory",
@@ -2089,6 +2093,287 @@ def _baseline_delta(current: Any, stats: dict[str, Any]) -> float | None:
     return round(((float(current) - float(avg)) / float(avg)) * 100, 2)
+
+
+def _delta_severity(delta: float) -> str:
+    magnitude = abs(delta)
+    if magnitude >= _BASELINE_DELTA_CRIT:
+        return "critical"
+    if magnitude >= _BASELINE_DELTA_WARN:
+        return "warning"
+    return "info"
+
+
+def _delta_hit(delta: Any) -> bool:
+    if not isinstance(delta, (int, float)):
+        return False
+    return abs(float(delta)) >= _BASELINE_DELTA_WARN
+
+
+def _node_delta_signals(node_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    signals: list[dict[str, Any]] = []
+    for entry in node_context:
+        if not isinstance(entry, dict):
+            continue
+        node = entry.get("node")
+        deltas = entry.get("baseline_delta") if isinstance(entry.get("baseline_delta"), dict) else {}
+        baseline = entry.get("baseline") if isinstance(entry.get("baseline"), dict) else {}
+        if not isinstance(node, str) or not node:
+            continue
+        for metric in ("cpu", "ram", "net", "io", "disk"):
+            delta = deltas.get(metric)
+            if not _delta_hit(delta):
+                continue
+            avg = baseline.get(metric, {}).get("avg") if isinstance(baseline.get(metric), dict) else None
+            signals.append(
+                {
+                    "scope": "node",
+                    "target": node,
+                    "metric": metric,
+                    "current": entry.get(metric),
+                    "baseline_avg": avg,
+                    "delta_pct": delta,
+                    "severity": _delta_severity(float(delta)),
+                }
+            )
+    return signals
+
+
+def _namespace_delta_signals(namespace_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    signals: list[dict[str, Any]] = []
+    for entry in namespace_context:
+        if not isinstance(entry, dict):
+            continue
+        namespace = entry.get("namespace")
+        deltas = entry.get("baseline_delta") if isinstance(entry.get("baseline_delta"), dict) else {}
+        baseline = entry.get("baseline") if isinstance(entry.get("baseline"), dict) else {}
+        if not isinstance(namespace, str) or not namespace:
+            continue
+        for metric, current_key in (("cpu", "cpu_usage"), ("mem", "mem_usage")):
+            delta = deltas.get(metric)
+            if not _delta_hit(delta):
+                continue
+            avg = baseline.get(metric, {}).get("avg") if isinstance(baseline.get(metric), dict) else None
+            signals.append(
+                {
+                    "scope": "namespace",
+                    "target": namespace,
+                    "metric": metric,
+                    "current": entry.get(current_key),
+                    "baseline_avg": avg,
+                    "delta_pct": delta,
+                    "severity": _delta_severity(float(delta)),
+                }
+            )
+    return signals
+
+
+def _kustomization_signals(kustomizations: dict[str, Any]) -> list[dict[str, Any]]:
+    count = int(kustomizations.get("not_ready") or 0) if isinstance(kustomizations, dict) else 0
+    if count <= 0:
+        return []
+    return [
+        {
+            "scope": "flux",
+            "target": "kustomizations",
+            "metric": "not_ready",
+            "current": count,
+            "severity": "warning",
+        }
+    ]
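+
+
+# Illustrative only: a sketch of how the delta helpers classify a node entry.
+# The input shape below is an assumption for review purposes, not a fixture:
+#
+#   _node_delta_signals([{
+#       "node": "worker-1",
+#       "cpu": 82.0,
+#       "baseline": {"cpu": {"avg": 40.0}},
+#       "baseline_delta": {"cpu": 105.0},
+#   }])
+#   # -> [{"scope": "node", "target": "worker-1", "metric": "cpu",
+#   #      "current": 82.0, "baseline_avg": 40.0, "delta_pct": 105.0,
+#   #      "severity": "critical"}]   # |105| >= _BASELINE_DELTA_CRIT (100)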
+
+
+def _pod_issue_signals(pod_issues: dict[str, Any]) -> list[dict[str, Any]]:
+    if not isinstance(pod_issues, dict):
+        return []
+    signals: list[dict[str, Any]] = []
+    pending_over = int(pod_issues.get("pending_over_15m") or 0)
+    if pending_over > 0:
+        signals.append(
+            {
+                "scope": "pods",
+                "target": "pending_over_15m",
+                "metric": "count",
+                "current": pending_over,
+                "severity": "warning",
+            }
+        )
+    counts = pod_issues.get("counts") if isinstance(pod_issues.get("counts"), dict) else {}
+    failed = int(counts.get("Failed") or 0) if isinstance(counts, dict) else 0
+    if failed > 0:
+        signals.append(
+            {
+                "scope": "pods",
+                "target": "failed",
+                "metric": "count",
+                "current": failed,
+                "severity": "critical",
+            }
+        )
+    return signals
+
+
+def _workload_health_signals(workloads_health: dict[str, Any]) -> list[dict[str, Any]]:
+    not_ready = _workload_not_ready_items(workloads_health)
+    if not not_ready:
+        return []
+    output: list[dict[str, Any]] = []
+    for entry in not_ready[:5]:
+        output.append(
+            {
+                "scope": "workload",
+                "target": f"{entry.get('namespace')}/{entry.get('workload')}",
+                "metric": "not_ready",
+                "current": entry.get("ready") or 0,
+                "desired": entry.get("desired") or 0,
+                "severity": "warning",
+            }
+        )
+    return output
+
+
+def _build_signals(
+    metrics: dict[str, Any],
+    node_context: list[dict[str, Any]],
+    namespace_context: list[dict[str, Any]],
+    workloads_health: dict[str, Any],
+    pod_issues: dict[str, Any],
+    kustomizations: dict[str, Any],
+) -> list[dict[str, Any]]:
+    signals = (
+        _node_delta_signals(node_context)
+        + _namespace_delta_signals(namespace_context)
+        + _workload_health_signals(workloads_health)
+        + _pod_issue_signals(pod_issues)
+        + _kustomization_signals(kustomizations)
+        + _pvc_pressure_signals(metrics)
+    )
+    signals.sort(key=lambda item: (_severity_rank(item.get("severity")), item.get("scope") or ""))
+    return signals[:_SIGNAL_LIMIT]
+
+
+def _node_profiles(node_context: list[dict[str, Any]], node_pods: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    pod_map = {entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)}
+    profiles: list[dict[str, Any]] = []
+    for entry in node_context:
+        if not isinstance(entry, dict):
+            continue
+        node = entry.get("node")
+        if not isinstance(node, str) or not node:
+            continue
+        pods = pod_map.get(node, {})
+        profiles.append(
+            {
+                "node": node,
+                "ready": entry.get("ready"),
+                "hardware": entry.get("hardware"),
+                "arch": entry.get("arch"),
+                "roles": entry.get("roles"),
+                "pods_total": pods.get("pods_total"),
+                "pods_running": pods.get("pods_running"),
+                "namespaces_top": pods.get("namespaces_top") or [],
+                "load_index": entry.get("load_index"),
+                "cpu": entry.get("cpu"),
+                "ram": entry.get("ram"),
+                "net": entry.get("net"),
+                "io": entry.get("io"),
+                "disk": entry.get("disk"),
+                "baseline_delta": entry.get("baseline_delta") or {},
+            }
+        )
+    profiles.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
+    return profiles[:_PROFILE_LIMIT]
+
+
+def _namespace_profiles(namespace_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    entries = [entry for entry in namespace_context if isinstance(entry, dict)]
+    entries.sort(key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or ""))
+    output: list[dict[str, Any]] = []
+    for entry in entries[:_PROFILE_LIMIT]:
+        output.append(
+            {
+                "namespace": entry.get("namespace"),
+                "pods_total": entry.get("pods_total"),
+                "pods_running": entry.get("pods_running"),
+                "primary_node": entry.get("primary_node"),
+                "nodes_top": entry.get("nodes_top") or [],
+                "cpu_usage": entry.get("cpu_usage"),
+                "mem_usage": entry.get("mem_usage"),
+                "cpu_ratio": entry.get("cpu_ratio"),
+                "mem_ratio": entry.get("mem_ratio"),
+                "baseline_delta": entry.get("baseline_delta") or {},
+            }
+        )
+    return output
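+
+
+# Illustrative only (made-up entries, not fixtures): namespaces rank by pod
+# count, busiest first, with name as tiebreak, capped at _PROFILE_LIMIT (6):
+#
+#   _namespace_profiles([
+#       {"namespace": "media", "pods_total": 12},
+#       {"namespace": "monitoring", "pods_total": 30},
+#   ])
+#   # -> "monitoring" first (30 pods), then "media"; usage fields missing
+#   #    from an entry come back as None, and baseline_delta as {}.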
"mem_usage": entry.get("mem_usage"), + "cpu_ratio": entry.get("cpu_ratio"), + "mem_ratio": entry.get("mem_ratio"), + "baseline_delta": entry.get("baseline_delta") or {}, + } + ) + return output + + +def _workload_profiles(workloads: list[dict[str, Any]]) -> list[dict[str, Any]]: + entries = [entry for entry in workloads if isinstance(entry, dict)] + entries.sort( + key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or "", item.get("workload") or ""), + ) + output: list[dict[str, Any]] = [] + for entry in entries[:_PROFILE_LIMIT]: + nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {} + nodes_top = ( + sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[:3] + if isinstance(nodes, dict) + else [] + ) + output.append( + { + "namespace": entry.get("namespace"), + "workload": entry.get("workload"), + "source": entry.get("source"), + "pods_total": entry.get("pods_total"), + "pods_running": entry.get("pods_running"), + "primary_node": entry.get("primary_node"), + "nodes_top": nodes_top, + } + ) + return output + + +def _build_profiles( + node_context: list[dict[str, Any]], + namespace_context: list[dict[str, Any]], + node_pods: list[dict[str, Any]], + workloads: list[dict[str, Any]], +) -> dict[str, Any]: + return { + "nodes": _node_profiles(node_context, node_pods), + "namespaces": _namespace_profiles(namespace_context), + "workloads": _workload_profiles(workloads), + } + + +def _severity_rank(value: Any) -> int: + if value == "critical": + return 0 + if value == "warning": + return 1 + return 2 + + +def _pvc_pressure_signals(metrics: dict[str, Any]) -> list[dict[str, Any]]: + pvc_top = _pvc_top(metrics.get("pvc_usage_top", [])) + if not pvc_top: + return [] + output: list[dict[str, Any]] = [] + for entry in pvc_top: + used = entry.get("used_percent") + if not isinstance(used, (int, float)) or used < _PVC_PRESSURE_THRESHOLD: + continue + output.append( + { + "scope": "pvc", + "target": f"{entry.get('namespace')}/{entry.get('pvc')}", + "metric": "used_percent", + "current": used, + "severity": "warning" if used < 90 else "critical", + } + ) + return output + + def _build_anomalies( metrics: dict[str, Any], nodes_summary: dict[str, Any], @@ -2467,10 +2752,20 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: metrics.get("node_load", []), metrics.get("node_baseline_map", {}), ) + signals = _build_signals( + metrics, + node_context, + namespace_context, + workload_health, + pod_issues, + kustomizations, + ) + profiles = _build_profiles(node_context, namespace_context, node_pods, workloads) summary = { "generated_at": collected_at.isoformat(), "windows": metrics.get("windows", {}), "baseline_window": _BASELINE_WINDOW, + "inventory": node_summary, "counts": { "nodes_total": metrics.get("nodes_total"), "nodes_ready": metrics.get("nodes_ready"), @@ -2497,6 +2792,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "workload_nodes": _workload_nodes_top(workloads, 5), }, "attention_ranked": _build_attention_ranked(metrics, node_context, pod_issues, workload_health), + "signals": signals, + "profiles": profiles, "anomalies": anomalies, "health_bullets": _health_bullets(metrics, node_summary, workload_health, anomalies), "unknowns": errors,