diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index e2b3735..4223cce 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -682,6 +682,185 @@ def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
     return {"counts": counts, "items": items[:20]}
 
 
+def _summarize_deployments(payload: dict[str, Any]) -> dict[str, Any]:
+    items = _items(payload)
+    unhealthy: list[dict[str, Any]] = []
+    for dep in items:
+        metadata = dep.get("metadata") if isinstance(dep.get("metadata"), dict) else {}
+        spec = dep.get("spec") if isinstance(dep.get("spec"), dict) else {}
+        status = dep.get("status") if isinstance(dep.get("status"), dict) else {}
+        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
+        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
+        desired = int(spec.get("replicas") or 0)
+        ready = int(status.get("readyReplicas") or 0)
+        available = int(status.get("availableReplicas") or 0)
+        updated = int(status.get("updatedReplicas") or 0)
+        if desired <= 0:
+            continue
+        if ready < desired or available < desired:
+            unhealthy.append(
+                {
+                    "name": name,
+                    "namespace": namespace,
+                    "desired": desired,
+                    "ready": ready,
+                    "available": available,
+                    "updated": updated,
+                }
+            )
+    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
+    return {
+        "total": len(items),
+        "not_ready": len(unhealthy),
+        "items": unhealthy,
+    }
+
+
+def _summarize_statefulsets(payload: dict[str, Any]) -> dict[str, Any]:
+    items = _items(payload)
+    unhealthy: list[dict[str, Any]] = []
+    for st in items:
+        metadata = st.get("metadata") if isinstance(st.get("metadata"), dict) else {}
+        spec = st.get("spec") if isinstance(st.get("spec"), dict) else {}
+        status = st.get("status") if isinstance(st.get("status"), dict) else {}
+        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
+        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
+        desired = int(spec.get("replicas") or 0)
+        ready = int(status.get("readyReplicas") or 0)
+        current = int(status.get("currentReplicas") or 0)
+        updated = int(status.get("updatedReplicas") or 0)
+        if desired <= 0:
+            continue
+        if ready < desired:
+            unhealthy.append(
+                {
+                    "name": name,
+                    "namespace": namespace,
+                    "desired": desired,
+                    "ready": ready,
+                    "current": current,
+                    "updated": updated,
+                }
+            )
+    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
+    return {
+        "total": len(items),
+        "not_ready": len(unhealthy),
+        "items": unhealthy,
+    }
+
+
+def _summarize_daemonsets(payload: dict[str, Any]) -> dict[str, Any]:
+    items = _items(payload)
+    unhealthy: list[dict[str, Any]] = []
+    for ds in items:
+        metadata = ds.get("metadata") if isinstance(ds.get("metadata"), dict) else {}
+        status = ds.get("status") if isinstance(ds.get("status"), dict) else {}
+        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
+        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
+        desired = int(status.get("desiredNumberScheduled") or 0)
+        ready = int(status.get("numberReady") or 0)
+        updated = int(status.get("updatedNumberScheduled") or 0)
+        if desired <= 0:
+            continue
+        if ready < desired:
+            unhealthy.append(
+                {
+                    "name": name,
+                    "namespace": namespace,
+                    "desired": desired,
+                    "ready": ready,
+                    "updated": updated,
+                }
+            )
+    unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
+    return {
+        "total": len(items),
+        "not_ready": len(unhealthy),
+        "items": unhealthy,
+    }
+
+
+def _summarize_workload_health(
+    deployments: dict[str, Any],
+    statefulsets: dict[str, Any],
+    daemonsets: dict[str, Any],
+) -> dict[str, Any]:
+    return {
+        "deployments": deployments,
+        "statefulsets": statefulsets,
+        "daemonsets": daemonsets,
+    }
+
+
+def _fetch_nodes(errors: list[str]) -> tuple[dict[str, Any], list[dict[str, Any]], dict[str, Any]]:
+    nodes: dict[str, Any] = {}
+    details: list[dict[str, Any]] = []
+    summary: dict[str, Any] = {}
+    try:
+        payload = get_json("/api/v1/nodes")
+        nodes = _summarize_nodes(payload)
+        details = _node_details(payload)
+        summary = _summarize_inventory(details)
+    except Exception as exc:
+        errors.append(f"nodes: {exc}")
+    return nodes, details, summary
+
+
+def _fetch_flux(errors: list[str]) -> dict[str, Any]:
+    try:
+        payload = get_json(
+            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
+        )
+        return _summarize_kustomizations(payload)
+    except Exception as exc:
+        errors.append(f"flux: {exc}")
+    return {}
+
+
+def _fetch_pods(
+    errors: list[str],
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
+    workloads: list[dict[str, Any]] = []
+    namespace_pods: list[dict[str, Any]] = []
+    namespace_nodes: list[dict[str, Any]] = []
+    node_pods: list[dict[str, Any]] = []
+    pod_issues: dict[str, Any] = {}
+    try:
+        pods_payload = get_json("/api/v1/pods?limit=5000")
+        workloads = _summarize_workloads(pods_payload)
+        namespace_pods = _summarize_namespace_pods(pods_payload)
+        namespace_nodes = _summarize_namespace_nodes(pods_payload)
+        node_pods = _summarize_node_pods(pods_payload)
+        pod_issues = _summarize_pod_issues(pods_payload)
+    except Exception as exc:
+        errors.append(f"pods: {exc}")
+    return workloads, namespace_pods, namespace_nodes, node_pods, pod_issues
+
+
+def _fetch_workload_health(errors: list[str]) -> dict[str, Any]:
+    try:
+        deployments_payload = get_json("/apis/apps/v1/deployments?limit=2000")
+        statefulsets_payload = get_json("/apis/apps/v1/statefulsets?limit=2000")
+        daemonsets_payload = get_json("/apis/apps/v1/daemonsets?limit=2000")
+        deployments = _summarize_deployments(deployments_payload)
+        statefulsets = _summarize_statefulsets(statefulsets_payload)
+        daemonsets = _summarize_daemonsets(daemonsets_payload)
+        return _summarize_workload_health(deployments, statefulsets, daemonsets)
+    except Exception as exc:
+        errors.append(f"workloads_health: {exc}")
+    return {}
+
+
+def _fetch_events(errors: list[str]) -> dict[str, Any]:
+    try:
+        events_payload = get_json("/api/v1/events?limit=2000")
+        return _summarize_events(events_payload)
+    except Exception as exc:
+        errors.append(f"events: {exc}")
+    return {}
+
+
 def _vm_query(expr: str) -> list[dict[str, Any]] | None:
     base = settings.vm_url
     if not base:
@@ -948,61 +1127,26 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
     errors: list[str] = []
     collected_at = datetime.now(timezone.utc)
 
-    nodes: dict[str, Any] | None = None
-    node_details: list[dict[str, Any]] = []
-    node_summary: dict[str, Any] = {}
-    try:
-        payload = get_json("/api/v1/nodes")
-        nodes = _summarize_nodes(payload)
-        node_details = _node_details(payload)
-        node_summary = _summarize_inventory(node_details)
-    except Exception as exc:
-        errors.append(f"nodes: {exc}")
-
-    kustomizations: dict[str, Any] | None = None
-    try:
-        payload = get_json(
-            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
-        )
-        kustomizations = _summarize_kustomizations(payload)
-    except Exception as exc:
-        errors.append(f"flux: {exc}")
-
-    workloads: list[dict[str, Any]] = []
-    namespace_pods: list[dict[str, Any]] = []
-    namespace_nodes: list[dict[str, Any]] = []
-    node_pods: list[dict[str, Any]] = []
-    pod_issues: dict[str, Any] = {}
-    try:
-        pods_payload = get_json("/api/v1/pods?limit=5000")
-        workloads = _summarize_workloads(pods_payload)
-        namespace_pods = _summarize_namespace_pods(pods_payload)
-        namespace_nodes = _summarize_namespace_nodes(pods_payload)
-        node_pods = _summarize_node_pods(pods_payload)
-        pod_issues = _summarize_pod_issues(pods_payload)
-    except Exception as exc:
-        errors.append(f"pods: {exc}")
-
-    events: dict[str, Any] = {}
-    try:
-        events_payload = get_json("/api/v1/events?limit=2000")
-        events = _summarize_events(events_payload)
-    except Exception as exc:
-        errors.append(f"events: {exc}")
+    nodes, node_details, node_summary = _fetch_nodes(errors)
+    kustomizations = _fetch_flux(errors)
+    workloads, namespace_pods, namespace_nodes, node_pods, pod_issues = _fetch_pods(errors)
+    workload_health = _fetch_workload_health(errors)
+    events = _fetch_events(errors)
 
     metrics = _summarize_metrics(errors)
 
     snapshot = {
         "collected_at": collected_at.isoformat(),
-        "nodes": nodes or {},
+        "nodes": nodes,
         "nodes_summary": node_summary,
         "nodes_detail": node_details,
-        "flux": kustomizations or {},
+        "flux": kustomizations,
         "workloads": workloads,
         "namespace_pods": namespace_pods,
         "namespace_nodes": namespace_nodes,
         "node_pods": node_pods,
         "pod_issues": pod_issues,
+        "workloads_health": workload_health,
         "events": events,
         "metrics": metrics,
         "errors": errors,
diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py
index 75dc0e9..b0e0780 100644
--- a/tests/test_cluster_state.py
+++ b/tests/test_cluster_state.py
@@ -55,6 +55,35 @@ def test_collect_cluster_state(monkeypatch) -> None:
         }
         if path.startswith("/api/v1/events"):
             return {"items": []}
+        if path.startswith("/apis/apps/v1/deployments"):
+            return {
+                "items": [
+                    {
+                        "metadata": {"name": "api", "namespace": "apps"},
+                        "spec": {"replicas": 2},
+                        "status": {"readyReplicas": 1, "availableReplicas": 1, "updatedReplicas": 1},
+                    }
+                ]
+            }
+        if path.startswith("/apis/apps/v1/statefulsets"):
+            return {
+                "items": [
+                    {
+                        "metadata": {"name": "db", "namespace": "apps"},
+                        "spec": {"replicas": 1},
+                        "status": {"readyReplicas": 1, "currentReplicas": 1, "updatedReplicas": 1},
+                    }
+                ]
+            }
+        if path.startswith("/apis/apps/v1/daemonsets"):
+            return {
+                "items": [
+                    {
+                        "metadata": {"name": "agent", "namespace": "apps"},
+                        "status": {"desiredNumberScheduled": 3, "numberReady": 3, "updatedNumberScheduled": 3},
+                    }
+                ]
+            }
         return {
             "items": [
                 {
@@ -88,6 +117,9 @@ def test_collect_cluster_state(monkeypatch) -> None:
     assert snapshot["namespace_nodes"]
     assert snapshot["node_pods"]
     assert "pod_issues" in snapshot
+    assert "workloads_health" in snapshot
+    assert snapshot["workloads_health"]["deployments"]["total"] == 1
+    assert snapshot["workloads_health"]["deployments"]["not_ready"] == 1
     assert snapshot["events"]["warnings_total"] == 0
     assert "node_usage_stats" in snapshot["metrics"]
     assert snapshot["metrics"]["namespace_cpu_top"] == []