From e809f0b8bd3ab8821e60be79e6710ba4f481674f Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 29 Jan 2026 02:31:30 -0300 Subject: [PATCH] atlasbot: expand cluster snapshot events and storage --- ariadne/services/cluster_state.py | 93 +++++++++++++++++++++++++++++++ tests/test_cluster_state.py | 4 ++ 2 files changed, 97 insertions(+) diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py index e3db568..e2b3735 100644 --- a/ariadne/services/cluster_state.py +++ b/ariadne/services/cluster_state.py @@ -53,6 +53,8 @@ _PRESSURE_TYPES = { "PIDPressure", "NetworkUnavailable", } +_EVENTS_MAX = 20 +_EVENT_WARNING = "Warning" _PHASE_SEVERITY = { "Failed": 3, "Pending": 2, @@ -354,6 +356,67 @@ def _namespace_allowed(namespace: str) -> bool: return namespace not in _SYSTEM_NAMESPACES +def _event_timestamp(event: dict[str, Any]) -> str: + for key in ("eventTime", "lastTimestamp", "firstTimestamp"): + value = event.get(key) + if isinstance(value, str) and value: + return value + return "" + + +def _event_sort_key(timestamp: str) -> float: + if not timestamp: + return 0.0 + try: + return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp() + except ValueError: + return 0.0 + + +def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]: + warnings: list[dict[str, Any]] = [] + by_reason: dict[str, int] = {} + by_namespace: dict[str, int] = {} + for event in _items(payload): + metadata = event.get("metadata") if isinstance(event.get("metadata"), dict) else {} + namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else "" + if not _namespace_allowed(namespace): + continue + event_type = event.get("type") if isinstance(event.get("type"), str) else "" + if event_type != _EVENT_WARNING: + continue + reason = event.get("reason") if isinstance(event.get("reason"), str) else "" + message = event.get("message") if isinstance(event.get("message"), str) else "" + count = event.get("count") if isinstance(event.get("count"), int) else 1 + involved = ( + event.get("involvedObject") if isinstance(event.get("involvedObject"), dict) else {} + ) + timestamp = _event_timestamp(event) + warnings.append( + { + "namespace": namespace, + "reason": reason, + "message": message, + "count": count, + "last_seen": timestamp, + "object_kind": involved.get("kind") or "", + "object_name": involved.get("name") or "", + } + ) + if reason: + by_reason[reason] = by_reason.get(reason, 0) + count + if namespace: + by_namespace[namespace] = by_namespace.get(namespace, 0) + count + warnings.sort(key=lambda item: _event_sort_key(item.get("last_seen") or ""), reverse=True) + top = warnings[:_EVENTS_MAX] + return { + "warnings_total": len(warnings), + "warnings_by_reason": by_reason, + "warnings_by_namespace": by_namespace, + "warnings_recent": top, + } + + def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]: for key in _WORKLOAD_LABEL_KEYS: value = labels.get(key) @@ -770,11 +833,29 @@ def _node_usage(errors: list[str]) -> dict[str, Any]: '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', "node", ) + usage["disk"] = _vm_node_metric( + 'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} ' + '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) ' + 'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))', + "node", + ) except Exception as exc: errors.append(f"node_usage: {exc}") return usage +def _pvc_usage(errors: list[str]) -> list[dict[str, Any]]: + try: + entries = _vm_vector( + "topk(5, max by (namespace,persistentvolumeclaim) " + "(kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100))" + ) + return _filter_namespace_vector(entries) + except Exception as exc: + errors.append(f"pvc_usage: {exc}") + return [] + + def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]: values: list[float] = [] for entry in series: @@ -823,6 +904,7 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: "ram": _usage_stats(metrics.get("node_usage", {}).get("ram", [])), "net": _usage_stats(metrics.get("node_usage", {}).get("net", [])), "io": _usage_stats(metrics.get("node_usage", {}).get("io", [])), + "disk": _usage_stats(metrics.get("node_usage", {}).get("disk", [])), } try: metrics["namespace_cpu_top"] = _filter_namespace_vector( @@ -837,14 +919,17 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]: ) except Exception as exc: errors.append(f"namespace_usage: {exc}") + metrics["pvc_usage_top"] = _pvc_usage(errors) metrics["units"] = { "cpu": "percent", "ram": "percent", "net": "bytes_per_sec", "io": "bytes_per_sec", + "disk": "percent", "restarts": "count", "namespace_cpu": "cores", "namespace_mem": "bytes", + "pvc_used_percent": "percent", "capacity_cpu": "cores", "allocatable_cpu": "cores", "capacity_mem_bytes": "bytes", @@ -898,6 +983,13 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: except Exception as exc: errors.append(f"pods: {exc}") + events: dict[str, Any] = {} + try: + events_payload = get_json("/api/v1/events?limit=2000") + events = _summarize_events(events_payload) + except Exception as exc: + errors.append(f"events: {exc}") + metrics = _summarize_metrics(errors) snapshot = { @@ -911,6 +1003,7 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]: "namespace_nodes": namespace_nodes, "node_pods": node_pods, "pod_issues": pod_issues, + "events": events, "metrics": metrics, "errors": errors, } diff --git a/tests/test_cluster_state.py b/tests/test_cluster_state.py index db5e358..75dc0e9 100644 --- a/tests/test_cluster_state.py +++ b/tests/test_cluster_state.py @@ -53,6 +53,8 @@ def test_collect_cluster_state(monkeypatch) -> None: } ] } + if path.startswith("/api/v1/events"): + return {"items": []} return { "items": [ { @@ -86,9 +88,11 @@ def test_collect_cluster_state(monkeypatch) -> None: assert snapshot["namespace_nodes"] assert snapshot["node_pods"] assert "pod_issues" in snapshot + assert snapshot["events"]["warnings_total"] == 0 assert "node_usage_stats" in snapshot["metrics"] assert snapshot["metrics"]["namespace_cpu_top"] == [] assert snapshot["metrics"]["namespace_mem_top"] == [] + assert snapshot["metrics"]["pvc_usage_top"] == [] assert summary.nodes_total == 2 assert summary.nodes_ready == 1 assert summary.pods_running == 5.0