atlasbot: add workload health to cluster snapshot

This commit is contained in:
Brad Stein 2026-01-29 02:39:08 -03:00
parent e809f0b8bd
commit 2370aa4e5d
2 changed files with 219 additions and 43 deletions

View File

@ -682,6 +682,185 @@ def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
return {"counts": counts, "items": items[:20]}
def _summarize_deployments(payload: dict[str, Any]) -> dict[str, Any]:
items = _items(payload)
unhealthy: list[dict[str, Any]] = []
for dep in items:
metadata = dep.get("metadata") if isinstance(dep.get("metadata"), dict) else {}
spec = dep.get("spec") if isinstance(dep.get("spec"), dict) else {}
status = dep.get("status") if isinstance(dep.get("status"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
desired = int(spec.get("replicas") or 0)
ready = int(status.get("readyReplicas") or 0)
available = int(status.get("availableReplicas") or 0)
updated = int(status.get("updatedReplicas") or 0)
if desired <= 0:
continue
if ready < desired or available < desired:
unhealthy.append(
{
"name": name,
"namespace": namespace,
"desired": desired,
"ready": ready,
"available": available,
"updated": updated,
}
)
unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
return {
"total": len(items),
"not_ready": len(unhealthy),
"items": unhealthy,
}
def _summarize_statefulsets(payload: dict[str, Any]) -> dict[str, Any]:
items = _items(payload)
unhealthy: list[dict[str, Any]] = []
for st in items:
metadata = st.get("metadata") if isinstance(st.get("metadata"), dict) else {}
spec = st.get("spec") if isinstance(st.get("spec"), dict) else {}
status = st.get("status") if isinstance(st.get("status"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
desired = int(spec.get("replicas") or 0)
ready = int(status.get("readyReplicas") or 0)
current = int(status.get("currentReplicas") or 0)
updated = int(status.get("updatedReplicas") or 0)
if desired <= 0:
continue
if ready < desired:
unhealthy.append(
{
"name": name,
"namespace": namespace,
"desired": desired,
"ready": ready,
"current": current,
"updated": updated,
}
)
unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
return {
"total": len(items),
"not_ready": len(unhealthy),
"items": unhealthy,
}
def _summarize_daemonsets(payload: dict[str, Any]) -> dict[str, Any]:
items = _items(payload)
unhealthy: list[dict[str, Any]] = []
for ds in items:
metadata = ds.get("metadata") if isinstance(ds.get("metadata"), dict) else {}
status = ds.get("status") if isinstance(ds.get("status"), dict) else {}
name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
desired = int(status.get("desiredNumberScheduled") or 0)
ready = int(status.get("numberReady") or 0)
updated = int(status.get("updatedNumberScheduled") or 0)
if desired <= 0:
continue
if ready < desired:
unhealthy.append(
{
"name": name,
"namespace": namespace,
"desired": desired,
"ready": ready,
"updated": updated,
}
)
unhealthy.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
return {
"total": len(items),
"not_ready": len(unhealthy),
"items": unhealthy,
}
def _summarize_workload_health(
deployments: dict[str, Any],
statefulsets: dict[str, Any],
daemonsets: dict[str, Any],
) -> dict[str, Any]:
return {
"deployments": deployments,
"statefulsets": statefulsets,
"daemonsets": daemonsets,
}
def _fetch_nodes(errors: list[str]) -> tuple[dict[str, Any], list[dict[str, Any]], dict[str, Any]]:
nodes: dict[str, Any] = {}
details: list[dict[str, Any]] = []
summary: dict[str, Any] = {}
try:
payload = get_json("/api/v1/nodes")
nodes = _summarize_nodes(payload)
details = _node_details(payload)
summary = _summarize_inventory(details)
except Exception as exc:
errors.append(f"nodes: {exc}")
return nodes, details, summary
def _fetch_flux(errors: list[str]) -> dict[str, Any]:
try:
payload = get_json(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
)
return _summarize_kustomizations(payload)
except Exception as exc:
errors.append(f"flux: {exc}")
return {}
def _fetch_pods(
errors: list[str],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
workloads: list[dict[str, Any]] = []
namespace_pods: list[dict[str, Any]] = []
namespace_nodes: list[dict[str, Any]] = []
node_pods: list[dict[str, Any]] = []
pod_issues: dict[str, Any] = {}
try:
pods_payload = get_json("/api/v1/pods?limit=5000")
workloads = _summarize_workloads(pods_payload)
namespace_pods = _summarize_namespace_pods(pods_payload)
namespace_nodes = _summarize_namespace_nodes(pods_payload)
node_pods = _summarize_node_pods(pods_payload)
pod_issues = _summarize_pod_issues(pods_payload)
except Exception as exc:
errors.append(f"pods: {exc}")
return workloads, namespace_pods, namespace_nodes, node_pods, pod_issues
def _fetch_workload_health(errors: list[str]) -> dict[str, Any]:
try:
deployments_payload = get_json("/apis/apps/v1/deployments?limit=2000")
statefulsets_payload = get_json("/apis/apps/v1/statefulsets?limit=2000")
daemonsets_payload = get_json("/apis/apps/v1/daemonsets?limit=2000")
deployments = _summarize_deployments(deployments_payload)
statefulsets = _summarize_statefulsets(statefulsets_payload)
daemonsets = _summarize_daemonsets(daemonsets_payload)
return _summarize_workload_health(deployments, statefulsets, daemonsets)
except Exception as exc:
errors.append(f"workloads_health: {exc}")
return {}
def _fetch_events(errors: list[str]) -> dict[str, Any]:
try:
events_payload = get_json("/api/v1/events?limit=2000")
return _summarize_events(events_payload)
except Exception as exc:
errors.append(f"events: {exc}")
return {}
def _vm_query(expr: str) -> list[dict[str, Any]] | None:
base = settings.vm_url
if not base:
@ -948,61 +1127,26 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
errors: list[str] = []
collected_at = datetime.now(timezone.utc)
nodes: dict[str, Any] | None = None
node_details: list[dict[str, Any]] = []
node_summary: dict[str, Any] = {}
try:
payload = get_json("/api/v1/nodes")
nodes = _summarize_nodes(payload)
node_details = _node_details(payload)
node_summary = _summarize_inventory(node_details)
except Exception as exc:
errors.append(f"nodes: {exc}")
kustomizations: dict[str, Any] | None = None
try:
payload = get_json(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
)
kustomizations = _summarize_kustomizations(payload)
except Exception as exc:
errors.append(f"flux: {exc}")
workloads: list[dict[str, Any]] = []
namespace_pods: list[dict[str, Any]] = []
namespace_nodes: list[dict[str, Any]] = []
node_pods: list[dict[str, Any]] = []
pod_issues: dict[str, Any] = {}
try:
pods_payload = get_json("/api/v1/pods?limit=5000")
workloads = _summarize_workloads(pods_payload)
namespace_pods = _summarize_namespace_pods(pods_payload)
namespace_nodes = _summarize_namespace_nodes(pods_payload)
node_pods = _summarize_node_pods(pods_payload)
pod_issues = _summarize_pod_issues(pods_payload)
except Exception as exc:
errors.append(f"pods: {exc}")
events: dict[str, Any] = {}
try:
events_payload = get_json("/api/v1/events?limit=2000")
events = _summarize_events(events_payload)
except Exception as exc:
errors.append(f"events: {exc}")
nodes, node_details, node_summary = _fetch_nodes(errors)
kustomizations = _fetch_flux(errors)
workloads, namespace_pods, namespace_nodes, node_pods, pod_issues = _fetch_pods(errors)
workload_health = _fetch_workload_health(errors)
events = _fetch_events(errors)
metrics = _summarize_metrics(errors)
snapshot = {
"collected_at": collected_at.isoformat(),
"nodes": nodes or {},
"nodes": nodes,
"nodes_summary": node_summary,
"nodes_detail": node_details,
"flux": kustomizations or {},
"flux": kustomizations,
"workloads": workloads,
"namespace_pods": namespace_pods,
"namespace_nodes": namespace_nodes,
"node_pods": node_pods,
"pod_issues": pod_issues,
"workloads_health": workload_health,
"events": events,
"metrics": metrics,
"errors": errors,

View File

@ -55,6 +55,35 @@ def test_collect_cluster_state(monkeypatch) -> None:
}
if path.startswith("/api/v1/events"):
return {"items": []}
if path.startswith("/apis/apps/v1/deployments"):
return {
"items": [
{
"metadata": {"name": "api", "namespace": "apps"},
"spec": {"replicas": 2},
"status": {"readyReplicas": 1, "availableReplicas": 1, "updatedReplicas": 1},
}
]
}
if path.startswith("/apis/apps/v1/statefulsets"):
return {
"items": [
{
"metadata": {"name": "db", "namespace": "apps"},
"spec": {"replicas": 1},
"status": {"readyReplicas": 1, "currentReplicas": 1, "updatedReplicas": 1},
}
]
}
if path.startswith("/apis/apps/v1/daemonsets"):
return {
"items": [
{
"metadata": {"name": "agent", "namespace": "apps"},
"status": {"desiredNumberScheduled": 3, "numberReady": 3, "updatedNumberScheduled": 3},
}
]
}
return {
"items": [
{
@ -88,6 +117,9 @@ def test_collect_cluster_state(monkeypatch) -> None:
assert snapshot["namespace_nodes"]
assert snapshot["node_pods"]
assert "pod_issues" in snapshot
assert "workloads_health" in snapshot
assert snapshot["workloads_health"]["deployments"]["total"] == 1
assert snapshot["workloads_health"]["deployments"]["not_ready"] == 1
assert snapshot["events"]["warnings_total"] == 0
assert "node_usage_stats" in snapshot["metrics"]
assert snapshot["metrics"]["namespace_cpu_top"] == []