From fbfa701d422f4a014f5468a449227fbfbac8b708 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 29 Jan 2026 05:54:00 -0300 Subject: [PATCH] feat(cluster-state): add job and node age summaries --- ariadne/services/cluster_state.py | 155 +++++++++++++++++++++++++++++- 1 file changed, 154 insertions(+), 1 deletion(-) diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index 8a990ee..9194ddb 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -195,6 +195,38 @@ def _age_hours(timestamp: str) -> float | None: return round((datetime.now(timezone.utc) - parsed).total_seconds() / 3600, 1) +def _node_age_stats(details: list[dict[str, Any]]) -> dict[str, Any]: + ages: list[tuple[str, float]] = [] + for node in details: + name = node.get("name") if isinstance(node, dict) else "" + age = node.get("age_hours") + if isinstance(name, str) and name and isinstance(age, (int, float)): + ages.append((name, float(age))) + if not ages: + return {} + ages.sort(key=lambda item: item[1]) + values = [age for _, age in ages] + return { + "min": round(min(values), 1), + "max": round(max(values), 1), + "avg": round(sum(values) / len(values), 1), + "youngest": [{"name": name, "age_hours": age} for name, age in ages[:5]], + "oldest": [{"name": name, "age_hours": age} for name, age in ages[-5:]], + } + + +def _node_flagged(details: list[dict[str, Any]], key: str) -> list[str]: + names: list[str] = [] + for node in details: + name = node.get("name") if isinstance(node, dict) else "" + if not isinstance(name, str) or not name: + continue + if node.get(key): + names.append(name) + names.sort() + return names + + def _node_taints(raw: Any) -> list[dict[str, str]]: if not isinstance(raw, list): return [] @@ -226,6 +258,9 @@ def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]: "by_role": {}, "not_ready_names": [], "pressure_nodes": {key: [] for key in _PRESSURE_TYPES}, + "age_stats": {}, + "tainted_nodes": [], 
+ "unschedulable_nodes": [], } not_ready: list[str] = [] for node in details: @@ -236,6 +271,9 @@ def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]: summary["not_ready_names"] = not_ready for cond_type in summary["pressure_nodes"]: summary["pressure_nodes"][cond_type].sort() + summary["age_stats"] = _node_age_stats(details) + summary["tainted_nodes"] = _node_flagged(details, "taints") + summary["unschedulable_nodes"] = _node_flagged(details, "unschedulable") return summary @@ -675,12 +713,19 @@ def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]] def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]: items: list[dict[str, Any]] = [] counts: dict[str, int] = {key: 0 for key in _PHASE_SEVERITY} + pending_oldest: list[dict[str, Any]] = [] for pod in _items(payload): metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {} status = pod.get("status") if isinstance(pod.get("status"), dict) else {} spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {} namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" name = metadata.get("name") if isinstance(metadata.get("name"), str) else "" + created_at = ( + metadata.get("creationTimestamp") + if isinstance(metadata.get("creationTimestamp"), str) + else "" + ) + age_hours = _age_hours(created_at) if not name or not namespace: continue phase = status.get("phase") if isinstance(status.get("phase"), str) else "" @@ -707,6 +752,18 @@ def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]: "reason": status.get("reason") or "", "restarts": restarts, "waiting_reasons": sorted(set(waiting_reasons)), + "created_at": created_at, + "age_hours": age_hours, + } + ) + if phase == "Pending" and age_hours is not None: + pending_oldest.append( + { + "namespace": namespace, + "pod": name, + "node": spec.get("nodeName") or "", + "age_hours": age_hours, + "reason": status.get("reason") or "", } ) 
def _summarize_jobs(payload: dict[str, Any]) -> dict[str, Any]:
    """Aggregate a Job list payload into totals, per-namespace and top-N views.

    Jobs lacking a string name or namespace are ignored. ``totals["total"]``
    counts jobs; the ``active``/``failed``/``succeeded`` figures are sums of
    the corresponding ``status`` counters.

    Returns:
        A dict with ``totals``, ``by_namespace`` (at most 20 rows, most
        active first), ``failing`` (at most 20 jobs with a non-zero failed
        counter, most failures first) and ``active_oldest`` (at most 20
        active jobs with a known age, oldest first).
    """
    counters = ("active", "failed", "succeeded")
    totals = {"total": 0, "active": 0, "failed": 0, "succeeded": 0}
    per_namespace: dict[str, dict[str, int]] = {}
    failing: list[dict[str, Any]] = []
    active_oldest: list[dict[str, Any]] = []

    for job in _items(payload):
        raw_meta = job.get("metadata")
        meta = raw_meta if isinstance(raw_meta, dict) else {}
        raw_status = job.get("status")
        status = raw_status if isinstance(raw_status, dict) else {}

        raw_name = meta.get("name")
        job_name = raw_name if isinstance(raw_name, str) else ""
        raw_ns = meta.get("namespace")
        ns = raw_ns if isinstance(raw_ns, str) else ""
        raw_created = meta.get("creationTimestamp")
        created = raw_created if isinstance(raw_created, str) else ""
        if not (job_name and ns):
            continue

        # Missing or null counters are treated as zero.
        observed = {field: int(status.get(field) or 0) for field in counters}

        totals["total"] += 1
        bucket = per_namespace.setdefault(
            ns, {"active": 0, "failed": 0, "succeeded": 0}
        )
        for field in counters:
            totals[field] += observed[field]
            bucket[field] += observed[field]

        age = _age_hours(created)
        if observed["failed"] > 0:
            failing.append(
                {
                    "namespace": ns,
                    "job": job_name,
                    "failed": observed["failed"],
                    "age_hours": age,
                }
            )
        if observed["active"] > 0 and age is not None:
            active_oldest.append(
                {
                    "namespace": ns,
                    "job": job_name,
                    "active": observed["active"],
                    "age_hours": age,
                }
            )

    def _failing_rank(row: dict[str, Any]) -> tuple[Any, ...]:
        # Most failures first, then oldest, then alphabetical for stability.
        return (
            -(row.get("failed") or 0),
            -(row.get("age_hours") or 0.0),
            row.get("namespace") or "",
            row.get("job") or "",
        )

    def _namespace_rank(row: dict[str, Any]) -> tuple[Any, ...]:
        return (
            -(row.get("active") or 0),
            -(row.get("failed") or 0),
            row.get("namespace") or "",
        )

    failing = sorted(failing, key=_failing_rank)
    active_oldest = sorted(
        active_oldest, key=lambda row: -(row.get("age_hours") or 0.0)
    )
    namespace_rows = sorted(
        (
            {
                "namespace": ns,
                "active": stats.get("active", 0),
                "failed": stats.get("failed", 0),
                "succeeded": stats.get("succeeded", 0),
            }
            for ns, stats in per_namespace.items()
        ),
        key=_namespace_rank,
    )
    return {
        "totals": totals,
        "by_namespace": namespace_rows[:20],
        "failing": failing[:20],
        "active_oldest": active_oldest[:20],
    }


def _fetch_jobs(errors: list[str]) -> dict[str, Any]:
    """Fetch the Job list and summarize it.

    Any exception from the fetch or the summary is recorded in ``errors`` as
    ``"jobs: <message>"`` and an empty dict is returned instead.
    """
    try:
        payload = get_json("/apis/batch/v1/jobs?limit=2000")
        summary = _summarize_jobs(payload)
    except Exception as exc:
        errors.append(f"jobs: {exc}")
        return {}
    return summary