atlasbot: expand cluster snapshot events and storage

Brad Stein 2026-01-29 02:31:30 -03:00
parent ef756ff1fa
commit e809f0b8bd
2 changed files with 97 additions and 0 deletions


@@ -53,6 +53,8 @@ _PRESSURE_TYPES = {
    "PIDPressure",
    "NetworkUnavailable",
}
_EVENTS_MAX = 20
_EVENT_WARNING = "Warning"
_PHASE_SEVERITY = {
    "Failed": 3,
    "Pending": 2,
@@ -354,6 +356,67 @@ def _namespace_allowed(namespace: str) -> bool:
    return namespace not in _SYSTEM_NAMESPACES
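

# Prefer eventTime (events.k8s.io), then lastTimestamp, then firstTimestamp;
# events may populate any subset of the three depending on how they were created.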
def _event_timestamp(event: dict[str, Any]) -> str:
    for key in ("eventTime", "lastTimestamp", "firstTimestamp"):
        value = event.get(key)
        if isinstance(value, str) and value:
            return value
    return ""
def _event_sort_key(timestamp: str) -> float:
    if not timestamp:
        return 0.0
    try:
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp()
    except ValueError:
        return 0.0
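

# Condense the raw event list into Warning-only rollups: totals per reason and
# per namespace, plus the _EVENTS_MAX most recent entries for the report.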
def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]:
    warnings: list[dict[str, Any]] = []
    by_reason: dict[str, int] = {}
    by_namespace: dict[str, int] = {}
    for event in _items(payload):
        metadata = event.get("metadata") if isinstance(event.get("metadata"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        event_type = event.get("type") if isinstance(event.get("type"), str) else ""
        if event_type != _EVENT_WARNING:
            continue
        reason = event.get("reason") if isinstance(event.get("reason"), str) else ""
        message = event.get("message") if isinstance(event.get("message"), str) else ""
        count = event.get("count") if isinstance(event.get("count"), int) else 1
        involved = (
            event.get("involvedObject") if isinstance(event.get("involvedObject"), dict) else {}
        )
        timestamp = _event_timestamp(event)
        warnings.append(
            {
                "namespace": namespace,
                "reason": reason,
                "message": message,
                "count": count,
                "last_seen": timestamp,
                "object_kind": involved.get("kind") or "",
                "object_name": involved.get("name") or "",
            }
        )
        if reason:
            by_reason[reason] = by_reason.get(reason, 0) + count
        if namespace:
            by_namespace[namespace] = by_namespace.get(namespace, 0) + count
    warnings.sort(key=lambda item: _event_sort_key(item.get("last_seen") or ""), reverse=True)
    top = warnings[:_EVENTS_MAX]
    return {
        "warnings_total": len(warnings),
        "warnings_by_reason": by_reason,
        "warnings_by_namespace": by_namespace,
        "warnings_recent": top,
    }


def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    for key in _WORKLOAD_LABEL_KEYS:
        value = labels.get(key)
@@ -770,11 +833,29 @@ def _node_usage(errors: list[str]) -> dict[str, Any]:
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
            "node",
        )
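        # Root filesystem fullness per node: (1 - avail/size) * 100, joined to
        # Kubernetes node names via node_uname_info's nodename label.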
usage["disk"] = _vm_node_metric(
'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} '
'/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) '
'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))',
"node",
)
except Exception as exc:
errors.append(f"node_usage: {exc}")
return usage
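

# Top five PVCs by used/capacity percent, from kubelet volume stats, filtered
# to non-system namespaces like the other namespaced vectors.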
def _pvc_usage(errors: list[str]) -> list[dict[str, Any]]:
    try:
        entries = _vm_vector(
            "topk(5, max by (namespace,persistentvolumeclaim) "
            "(kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100))"
        )
        return _filter_namespace_vector(entries)
    except Exception as exc:
        errors.append(f"pvc_usage: {exc}")
        return []


def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]:
    values: list[float] = []
    for entry in series:
@@ -823,6 +904,7 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
        "ram": _usage_stats(metrics.get("node_usage", {}).get("ram", [])),
        "net": _usage_stats(metrics.get("node_usage", {}).get("net", [])),
        "io": _usage_stats(metrics.get("node_usage", {}).get("io", [])),
        "disk": _usage_stats(metrics.get("node_usage", {}).get("disk", [])),
    }
    try:
        metrics["namespace_cpu_top"] = _filter_namespace_vector(
@@ -837,14 +919,17 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
        )
    except Exception as exc:
        errors.append(f"namespace_usage: {exc}")
    metrics["pvc_usage_top"] = _pvc_usage(errors)
metrics["units"] = {
"cpu": "percent",
"ram": "percent",
"net": "bytes_per_sec",
"io": "bytes_per_sec",
"disk": "percent",
"restarts": "count",
"namespace_cpu": "cores",
"namespace_mem": "bytes",
"pvc_used_percent": "percent",
"capacity_cpu": "cores",
"allocatable_cpu": "cores",
"capacity_mem_bytes": "bytes",
@@ -898,6 +983,13 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
    except Exception as exc:
        errors.append(f"pods: {exc}")
    events: dict[str, Any] = {}
    try:
        events_payload = get_json("/api/v1/events?limit=2000")
        events = _summarize_events(events_payload)
    except Exception as exc:
        errors.append(f"events: {exc}")

    metrics = _summarize_metrics(errors)
    snapshot = {
@@ -911,6 +1003,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
        "namespace_nodes": namespace_nodes,
        "node_pods": node_pods,
        "pod_issues": pod_issues,
        "events": events,
        "metrics": metrics,
        "errors": errors,
    }


@@ -53,6 +53,8 @@ def test_collect_cluster_state(monkeypatch) -> None:
                }
            ]
        }
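        # Stub the events endpoint with an empty list so the summary should
        # report zero warnings below.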
        if path.startswith("/api/v1/events"):
            return {"items": []}
        return {
            "items": [
                {
@@ -86,9 +88,11 @@ def test_collect_cluster_state(monkeypatch) -> None:
    assert snapshot["namespace_nodes"]
    assert snapshot["node_pods"]
    assert "pod_issues" in snapshot
    assert snapshot["events"]["warnings_total"] == 0
    assert "node_usage_stats" in snapshot["metrics"]
    assert snapshot["metrics"]["namespace_cpu_top"] == []
    assert snapshot["metrics"]["namespace_mem_top"] == []
    assert snapshot["metrics"]["pvc_usage_top"] == []
    assert summary.nodes_total == 2
    assert summary.nodes_ready == 1
    assert summary.pods_running == 5.0