# Summary/context helpers for the cluster-state snapshot
# (ariadne/services/cluster_state.py).
#
# Call sites -- the enclosing functions are only partially visible in the
# change these helpers come from, so they are recorded here as notes rather
# than reproduced as code:
#
# * _summarize_metrics() stores
#       metrics["namespace_totals"] = {
#           "cpu": _namespace_totals_list(namespace_cpu_usage),
#           "mem": _namespace_totals_list(namespace_mem_usage),
#           "cpu_requests": _namespace_totals_list(namespace_cpu_requests),
#           "mem_requests": _namespace_totals_list(namespace_mem_requests),
#       }
# * collect_cluster_state() stores metrics["namespace_top"] with the keys
#   "cpu", "mem", "net", "io" and "restarts", each built by
#   _vector_to_named(metrics.get(<source>, []), "namespace", "namespace") from
#   namespace_cpu_top / namespace_mem_top / namespace_net_top /
#   namespace_io_top / restart_namespace_top.  It then builds
#       anomalies = _build_anomalies(metrics, node_summary, workload_health,
#                                    kustomizations, events)
#       namespace_context = _namespace_context(
#           namespace_pods, namespace_nodes, metrics.get("namespace_capacity", []))
#       node_context = _node_context(node_details, metrics.get("node_load", []))
#   and a `summary` dict with the keys "generated_at", "windows", "counts",
#   "top", "anomalies", "health_bullets" and "unknowns" (the errors list).
#   The snapshot gains "snapshot_version": "v2", "summary": summary and
#   "context": {"nodes": node_context, "namespaces": namespace_context}.
# * tests/test_cluster_state.py asserts summary counts of 5.0 for nodes_total,
#   nodes_ready and pods_running, that the first summary["top"]["namespace_pods"]
#   entry is namespace "media", that health_bullets and both context lists are
#   non-empty, and that summary["unknowns"] == [].

import math
from typing import Any

# Usage percentage at or above which a PVC is reported as under pressure.
_PVC_PRESSURE_PERCENT = 80

# Lower rank means more severe; unknown severities rank after all of these.
_SEVERITY_RANK = {"critical": 0, "warning": 1, "info": 2}


def _as_number(value: Any) -> float:
    """Return *value* as a float for sorting/thresholds; 0.0 if it is not a usable number.

    ``None``, strings, containers and NaN all map to 0.0 so that a single bad
    sample cannot raise ``TypeError`` inside a sort key or make the ordering
    unstable.  Infinite values are kept as they are.
    """
    if isinstance(value, (int, float)) and not math.isnan(value):
        return float(value)
    return 0.0


def _as_count(value: Any) -> int:
    """Return *value* truncated to an int; 0 for non-numeric, NaN or infinite input."""
    number = _as_number(value)
    return int(number) if math.isfinite(number) else 0


def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
    """Turn a ``{namespace: value}`` mapping into a list sorted by value, largest first.

    Keys that are not non-empty strings are dropped.  Ties are ordered by
    namespace name.  A falsy *totals* yields an empty list.
    """
    entries = [
        {"namespace": name, "value": value}
        for name, value in (totals or {}).items()
        if isinstance(name, str) and name
    ]
    entries.sort(key=lambda item: (-_as_number(item["value"]), item["namespace"]))
    return entries


def _vector_to_named(entries: list[dict[str, Any]], label_key: str, name_key: str) -> list[dict[str, Any]]:
    """Flatten ``{"metric": {...}, "value": ...}`` entries into named rows.

    Each output row is ``{name_key: <label>, "value": ..., "metric": {...}}``
    where ``<label>`` is ``metric[label_key]``.  Entries that are not dicts or
    whose label is not a non-empty string are skipped.  Rows are sorted by
    value (largest first), then by name.
    """
    output: list[dict[str, Any]] = []
    for item in entries or []:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric")
        if not isinstance(metric, dict):
            # Without a metric dict there is no label to name the row by.
            continue
        label = metric.get(label_key)
        if not isinstance(label, str) or not label:
            continue
        output.append({name_key: label, "value": item.get("value"), "metric": metric})
    output.sort(key=lambda item: (-_as_number(item.get("value")), item.get(name_key) or ""))
    return output


def _pvc_top(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Flatten PVC usage entries into ``{"namespace", "pvc", "used_percent"}`` rows.

    Entries that are not dicts, or whose metric lacks a string ``namespace`` or
    ``persistentvolumeclaim`` label, are skipped.  Rows are sorted by
    ``used_percent`` (largest first), then by namespace.
    """
    output: list[dict[str, Any]] = []
    for item in entries or []:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric")
        if not isinstance(metric, dict):
            continue
        namespace = metric.get("namespace")
        pvc = metric.get("persistentvolumeclaim")
        if not isinstance(namespace, str) or not isinstance(pvc, str):
            continue
        output.append(
            {
                "namespace": namespace,
                "pvc": pvc,
                "used_percent": item.get("value"),
            }
        )
    output.sort(key=lambda item: (-_as_number(item.get("used_percent")), item.get("namespace") or ""))
    return output


def _namespace_context(
    namespace_pods: list[dict[str, Any]],
    namespace_nodes: list[dict[str, Any]],
    namespace_capacity: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Join per-namespace pod counts, node placement and capacity into one row per namespace.

    *namespace_pods* drives the output: one row per dict entry with a
    non-empty string ``namespace``.  Matching entries from *namespace_nodes*
    and *namespace_capacity* are looked up by namespace; missing ones
    contribute ``None`` fields.  ``nodes_top`` holds at most three
    ``{"node", "pods"}`` pairs taken from the node entry's ``nodes`` mapping,
    highest count first.  Rows are sorted by ``pods_total`` (largest first),
    then by namespace.
    """
    # Only string keys can ever match the lookup below, so other keys are dropped.
    node_map = {
        entry["namespace"]: entry
        for entry in namespace_nodes or []
        if isinstance(entry, dict) and isinstance(entry.get("namespace"), str)
    }
    cap_map = {
        entry["namespace"]: entry
        for entry in namespace_capacity or []
        if isinstance(entry, dict) and isinstance(entry.get("namespace"), str)
    }
    output: list[dict[str, Any]] = []
    for entry in namespace_pods or []:
        if not isinstance(entry, dict):
            continue
        namespace = entry.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        nodes_entry = node_map.get(namespace, {})
        cap_entry = cap_map.get(namespace, {})
        nodes = nodes_entry.get("nodes")
        top_nodes: list[dict[str, Any]] = []
        if isinstance(nodes, dict):
            ranked = sorted(nodes.items(), key=lambda pair: (-_as_number(pair[1]), str(pair[0])))
            top_nodes = [{"node": name, "pods": count} for name, count in ranked[:3]]
        output.append(
            {
                "namespace": namespace,
                "pods_total": entry.get("pods_total"),
                "pods_running": entry.get("pods_running"),
                "pods_pending": entry.get("pods_pending"),
                "pods_failed": entry.get("pods_failed"),
                "pods_succeeded": entry.get("pods_succeeded"),
                "primary_node": nodes_entry.get("primary_node"),
                "nodes_top": top_nodes,
                "cpu_usage": cap_entry.get("cpu_usage"),
                "cpu_requests": cap_entry.get("cpu_requests"),
                "cpu_ratio": cap_entry.get("cpu_usage_ratio"),
                "mem_usage": cap_entry.get("mem_usage"),
                "mem_requests": cap_entry.get("mem_requests"),
                "mem_ratio": cap_entry.get("mem_usage_ratio"),
            }
        )
    output.sort(key=lambda item: (-_as_number(item.get("pods_total")), item.get("namespace") or ""))
    return output


def _node_context(
    node_details: list[dict[str, Any]],
    node_load: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Join node details with their load entry into one row per node.

    *node_details* drives the output: one row per dict entry with a non-empty
    string ``name``.  The matching *node_load* entry is looked up by its
    ``node`` key; when absent, the load fields are ``None``.  Rows are sorted
    by ``load_index`` (largest first), then by node name.
    """
    # Only string keys can ever match the lookup below, so other keys are dropped.
    load_map = {
        entry["node"]: entry
        for entry in node_load or []
        if isinstance(entry, dict) and isinstance(entry.get("node"), str)
    }
    output: list[dict[str, Any]] = []
    for entry in node_details or []:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name")
        if not isinstance(name, str) or not name:
            continue
        load_entry = load_map.get(name, {})
        output.append(
            {
                "node": name,
                "ready": entry.get("ready"),
                "roles": entry.get("roles"),
                "is_worker": entry.get("is_worker"),
                "hardware": entry.get("hardware"),
                "arch": entry.get("arch"),
                "os": entry.get("os"),
                "taints": entry.get("taints"),
                "unschedulable": entry.get("unschedulable"),
                "pressure_flags": entry.get("pressure"),
                "pods_total": load_entry.get("pods_total"),
                "cpu": load_entry.get("cpu"),
                "ram": load_entry.get("ram"),
                "disk": load_entry.get("disk"),
                "net": load_entry.get("net"),
                "io": load_entry.get("io"),
                "load_index": load_entry.get("load_index"),
            }
        )
    output.sort(key=lambda item: (-_as_number(item.get("load_index")), item.get("node") or ""))
    return output


def _build_anomalies(
    metrics: dict[str, Any],
    nodes_summary: dict[str, Any],
    workloads_health: dict[str, Any],
    kustomizations: dict[str, Any],
    events: dict[str, Any],
) -> list[dict[str, Any]]:
    """Collect anomaly records from the already-gathered cluster data.

    Every record has ``kind``, ``severity`` and ``summary``; most also carry
    ``items``.  Records are appended in a fixed order: pending pods, failed
    pods, not-ready workloads (deployments, statefulsets, daemonsets), Flux
    kustomizations, job failures, PVC pressure, node pressure, unschedulable
    nodes, warning events.  Any argument may be falsy, in which case the
    checks that depend on it are skipped.
    """
    metrics = metrics or {}
    nodes_summary = nodes_summary or {}
    workloads_health = workloads_health or {}
    kustomizations = kustomizations or {}
    events = events or {}
    anomalies: list[dict[str, Any]] = []

    pods_pending = _as_count(metrics.get("pods_pending"))
    if pods_pending:
        anomalies.append(
            {
                "kind": "pods_pending",
                "severity": "warning",
                "summary": f"{pods_pending} pods pending",
            }
        )
    pods_failed = _as_count(metrics.get("pods_failed"))
    if pods_failed:
        anomalies.append(
            {
                "kind": "pods_failed",
                "severity": "critical",
                "summary": f"{pods_failed} pods failed",
            }
        )

    for key in ("deployments", "statefulsets", "daemonsets"):
        entry = workloads_health.get(key)
        if not isinstance(entry, dict):
            continue
        not_ready = _as_count(entry.get("not_ready"))
        if not_ready:
            anomalies.append(
                {
                    "kind": f"{key}_not_ready",
                    "severity": "warning",
                    "summary": f"{not_ready} {key} not ready",
                    "items": entry.get("items"),
                }
            )

    flux_not_ready = _as_count(kustomizations.get("not_ready"))
    if flux_not_ready:
        anomalies.append(
            {
                "kind": "flux_not_ready",
                "severity": "warning",
                "summary": f"{flux_not_ready} Flux kustomizations not ready",
                "items": kustomizations.get("items"),
            }
        )

    job_failures = [
        entry
        for entry in metrics.get("job_failures_24h") or []
        if isinstance(entry, dict) and _as_number(entry.get("value")) > 0
    ]
    if job_failures:
        anomalies.append(
            {
                "kind": "job_failures_24h",
                "severity": "warning",
                "summary": "Job failures in last 24h",
                "items": job_failures[:5],
            }
        )

    pvc_pressure = [
        entry
        for entry in _pvc_top(metrics.get("pvc_usage_top") or [])
        if _as_number(entry.get("used_percent")) >= _PVC_PRESSURE_PERCENT
    ]
    if pvc_pressure:
        anomalies.append(
            {
                "kind": "pvc_pressure",
                "severity": "warning",
                "summary": f"PVCs above {_PVC_PRESSURE_PERCENT}% usage",
                "items": pvc_pressure[:5],
            }
        )

    pressure_nodes = nodes_summary.get("pressure_nodes")
    if isinstance(pressure_nodes, dict):
        # A node can appear under several pressure types; de-duplicate before
        # counting so the summary agrees with the item list.
        flagged = sorted(
            {
                name
                for names in pressure_nodes.values()
                if isinstance(names, list)
                for name in names
                if isinstance(name, str) and name
            }
        )
        if flagged:
            anomalies.append(
                {
                    "kind": "node_pressure",
                    "severity": "warning",
                    "summary": f"{len(flagged)} nodes report pressure",
                    "items": flagged,
                }
            )

    unschedulable = nodes_summary.get("unschedulable_nodes") or []
    if unschedulable:
        anomalies.append(
            {
                "kind": "unschedulable_nodes",
                "severity": "info",
                "summary": f"{len(unschedulable)} nodes unschedulable",
                "items": unschedulable,
            }
        )

    warnings = _as_count(events.get("warnings_total"))
    if warnings:
        anomalies.append(
            {
                "kind": "event_warnings",
                "severity": "info",
                "summary": f"{warnings} warning events",
                "items": events.get("warnings") or [],
            }
        )
    return anomalies


def _health_bullets(
    metrics: dict[str, Any],
    nodes_summary: dict[str, Any],
    workloads_health: dict[str, Any],
    anomalies: list[dict[str, Any]],
) -> list[str]:
    """Build at most four one-line health statements for the summary.

    In order: node readiness (only when both node counts are present), pod
    phase counts, workload readiness, and the top concern.  The top concern is
    the summary of the most severe anomaly (critical, then warning, then info;
    the earliest wins among equals).  *nodes_summary* is accepted for
    signature compatibility and is not read.
    """
    metrics = metrics or {}
    workloads_health = workloads_health or {}
    bullets: list[str] = []

    nodes_total = metrics.get("nodes_total")
    nodes_ready = metrics.get("nodes_ready")
    if nodes_total is not None and nodes_ready is not None:
        bullets.append(f"Nodes ready: {_as_count(nodes_ready)}/{_as_count(nodes_total)}")

    pods_running = _as_count(metrics.get("pods_running"))
    pods_pending = _as_count(metrics.get("pods_pending"))
    pods_failed = _as_count(metrics.get("pods_failed"))
    bullets.append(f"Pods: {pods_running} running, {pods_pending} pending, {pods_failed} failed")

    not_ready = 0
    for key in ("deployments", "statefulsets", "daemonsets"):
        entry = workloads_health.get(key)
        if isinstance(entry, dict):
            not_ready += _as_count(entry.get("not_ready"))
    if not_ready:
        bullets.append(f"Workloads not ready: {not_ready}")
    else:
        bullets.append("Workloads: all ready")

    candidates = [
        item
        for item in anomalies or []
        if isinstance(item, dict) and isinstance(item.get("summary"), str) and item.get("summary")
    ]
    if candidates:
        # min() returns the first minimal element, so ties keep insertion order.
        top = min(candidates, key=lambda item: _SEVERITY_RANK.get(item.get("severity"), len(_SEVERITY_RANK)))
        bullets.append(f"Top concern: {top['summary']}")
    return bullets[:4]