From a6062be60e614806e2bc24cd1d35dfdf7c91600d Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 29 Jan 2026 07:43:37 -0300 Subject: [PATCH] snapshot: add longhorn volume summary --- ariadne/services/cluster_state.py | 215 +++++++++++++++++++++--------- 1 file changed, 151 insertions(+), 64 deletions(-) diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index a0920e9..51666ad 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -61,6 +61,7 @@ _PHASE_SEVERITY = { "Pending": 2, "Unknown": 1, } +_PENDING_15M_HOURS = 0.25 @dataclass(frozen=True) @@ -711,68 +712,91 @@ def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]] return output -def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]: - items: list[dict[str, Any]] = [] - counts: dict[str, int] = {key: 0 for key in _PHASE_SEVERITY} - pending_oldest: list[dict[str, Any]] = [] - waiting_reason_counts: dict[str, int] = {} - phase_reason_counts: dict[str, int] = {} - for pod in _items(payload): - metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} - status = pod.get("status") if isinstance(pod.get("status"), dict) else {} - spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} - namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" - name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" - created_at = ( - metadata.get("creationTimestamp") - if isinstance(metadata.get("creationTimestamp"), str) - else "" - ) - age_hours = _age_hours(created_at) - if not name or not namespace: +def _record_pending_pod( + pending_oldest: list[dict[str, Any]], + info: dict[str, Any], +) -> bool: + age_hours = info.get("age_hours") + if age_hours is None: + return False + pending_oldest.append(info) + return age_hours >= _PENDING_15M_HOURS + + +def _update_pod_issue( + pod: dict[str, Any], + acc: dict[str, Any], +) -> None: + metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} + status = pod.get("status") if isinstance(pod.get("status"), dict) else {} + spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" + created_at = ( + metadata.get("creationTimestamp") + if isinstance(metadata.get("creationTimestamp"), str) + else "" + ) + age_hours = _age_hours(created_at) + if not name or not namespace: + return + phase = status.get("phase") if isinstance(status.get("phase"), str) else "" + restarts = 0 + waiting_reasons: list[str] = [] + for container in status.get("containerStatuses") or []: + if not isinstance(container, dict): continue - phase = status.get("phase") if isinstance(status.get("phase"), str) else "" - restarts = 0 - waiting_reasons: list[str] = [] - for container in status.get("containerStatuses") or []: - if not isinstance(container, dict): - continue - restarts += int(container.get("restartCount") or 0) - state = container.get("state") if isinstance(container.get("state"), dict) else {} - waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {} - reason = waiting.get("reason") - if isinstance(reason, str) and reason: - waiting_reasons.append(reason) - waiting_reason_counts[reason] = waiting_reason_counts.get(reason, 0) + 1 - phase_reason = status.get("reason") - if isinstance(phase_reason, str) and phase_reason: - phase_reason_counts[phase_reason] = phase_reason_counts.get(phase_reason, 0) + 1 - if phase in counts: - counts[phase] += 1 - if phase in _PHASE_SEVERITY or restarts > 0: - items.append( - { - "namespace": namespace, - "pod": name, - "node": spec.get("nodeName") or "", - "phase": phase, - "reason": status.get("reason") or "", - "restarts": restarts, - "waiting_reasons": sorted(set(waiting_reasons)), - "created_at": created_at, - "age_hours": age_hours, - } - ) - if phase == "Pending" and age_hours is not None: - pending_oldest.append( - { - "namespace": namespace, - "pod": name, - "node": spec.get("nodeName") or "", - "age_hours": age_hours, - "reason": status.get("reason") or "", - } - ) + restarts += int(container.get("restartCount") or 0) + state = container.get("state") if isinstance(container.get("state"), dict) else {} + waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {} + reason = waiting.get("reason") + if isinstance(reason, str) and reason: + waiting_reasons.append(reason) + acc["waiting_reasons"][reason] = acc["waiting_reasons"].get(reason, 0) + 1 + phase_reason = status.get("reason") + if isinstance(phase_reason, str) and phase_reason: + acc["phase_reasons"][phase_reason] = acc["phase_reasons"].get(phase_reason, 0) + 1 + if phase in acc["counts"]: + acc["counts"][phase] += 1 + if phase in _PHASE_SEVERITY or restarts > 0: + acc["items"].append( + { + "namespace": namespace, + "pod": name, + "node": spec.get("nodeName") or "", + "phase": phase, + "reason": status.get("reason") or "", + "restarts": restarts, + "waiting_reasons": sorted(set(waiting_reasons)), + "created_at": created_at, + "age_hours": age_hours, + } + ) + if phase == "Pending": + info = { + "namespace": namespace, + "pod": name, + "node": spec.get("nodeName") or "", + "age_hours": age_hours, + "reason": status.get("reason") or "", + } + if _record_pending_pod(acc["pending_oldest"], info): + acc["pending_over_15m"] += 1 + + +def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]: + acc = { + "items": [], + "counts": {key: 0 for key in _PHASE_SEVERITY}, + "pending_oldest": [], + "pending_over_15m": 0, + "waiting_reasons": {}, + "phase_reasons": {}, + } + for pod in _items(payload): + if isinstance(pod, dict): + _update_pod_issue(pod, acc) + items = acc["items"] items.sort( key=lambda item: ( -_PHASE_SEVERITY.get(item.get("phase") or "", 0), @@ -781,13 +805,15 @@ def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]: item.get("pod") or "", ) ) + pending_oldest = acc["pending_oldest"] pending_oldest.sort(key=lambda item: -(item.get("age_hours") or 0.0)) return { - "counts": counts, + "counts": acc["counts"], "items": items[:20], "pending_oldest": pending_oldest[:10], - "waiting_reasons": waiting_reason_counts, - "phase_reasons": phase_reason_counts, + "pending_over_15m": acc["pending_over_15m"], + "waiting_reasons": acc["waiting_reasons"], + "phase_reasons": acc["phase_reasons"], } @@ -1036,6 +1062,57 @@ def _fetch_jobs(errors: list[str]) -> dict[str, Any]: return {} +def _summarize_longhorn_volumes(payload: dict[str, Any]) -> dict[str, Any]: + items = _items(payload) + if not items: + return {} + by_state: dict[str, int] = {} + by_robustness: dict[str, int] = {} + unhealthy: list[dict[str, Any]] = [] + for volume in items: + metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {} + status = volume.get("status") if isinstance(volume.get("status"), dict) else {} + spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {} + name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" + if not name: + continue + state = status.get("state") if isinstance(status.get("state"), str) else "unknown" + robustness = ( + status.get("robustness") if isinstance(status.get("robustness"), str) else "unknown" + ) + by_state[state] = by_state.get(state, 0) + 1 + by_robustness[robustness] = by_robustness.get(robustness, 0) + 1 + if state.lower() != "attached" or robustness.lower() != "healthy": + unhealthy.append( + { + "name": name, + "state": state, + "robustness": robustness, + "size": spec.get("size"), + "actual_size": status.get("actualSize"), + } + ) + unhealthy.sort(key=lambda item: item.get("name") or "") + return { + "total": len(items), + "by_state": by_state, + "by_robustness": by_robustness, + "unhealthy": unhealthy, + "unhealthy_count": len(unhealthy), + } + + +def _fetch_longhorn(errors: list[str]) -> dict[str, Any]: + try: + payload = get_json( + "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes" + ) + return _summarize_longhorn_volumes(payload) + except Exception as exc: + errors.append(f"longhorn: {exc}") + return {} + + def _fetch_workload_health(errors: list[str]) -> dict[str, Any]: try: deployments_payload = get_json("/apis/apps/v1/deployments?limit=2000") @@ -1149,6 +1226,9 @@ def _postgres_connections(errors: list[str]) -> dict[str, Any]: try: postgres["used"] = _vm_scalar("sum(pg_stat_activity_count)") postgres["max"] = _vm_scalar("max(pg_settings_max_connections)") + postgres["by_db"] = _vm_vector( + "topk(5, sum by (datname) (pg_stat_activity_count))" + ) postgres["hottest_db"] = _vm_topk( "topk(1, sum by (datname) (pg_stat_activity_count))", "datname", @@ -1271,6 +1351,11 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: metrics["top_restarts_1h"] = _vm_vector( f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))" ) + metrics["restart_namespace_top"] = _filter_namespace_vector( + _vm_vector( + f"topk(5, sum by (namespace) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))" + ) + ) metrics["pod_cpu_top"] = _filter_namespace_vector( _vm_vector( f'topk(5, sum by (namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))' @@ -1369,6 +1454,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: kustomizations = _fetch_flux(errors) workloads, namespace_pods, namespace_nodes, node_pods, pod_issues = _fetch_pods(errors) jobs = _fetch_jobs(errors) + longhorn = _fetch_longhorn(errors) workload_health = _fetch_workload_health(errors) events = _fetch_events(errors) @@ -1386,6 +1472,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "node_pods": node_pods, "pod_issues": pod_issues, "jobs": jobs, + "longhorn": longhorn, "workloads_health": workload_health, "events": events, "metrics": metrics,