diff --git a/ariadne/services/cluster_state.py b/ariadne/services/cluster_state.py
index 967e242..eac6c60 100644
--- a/ariadne/services/cluster_state.py
+++ b/ariadne/services/cluster_state.py
@@ -86,6 +86,19 @@ _PENDING_15M_HOURS = 0.25
 _LOAD_TOP_COUNT = 5
 _NAMESPACE_TOP_COUNT = 5
 _PVC_PRESSURE_THRESHOLD = 80.0
+_ALERT_TOP_LIMIT = 10
+_POD_REASON_LIMIT = 10
+_POD_REASON_TREND_LIMIT = 10
+_POD_TERMINATED_REASONS = {
+    "oom_killed": "OOMKilled",
+    "error": "Error",
+}
+_POD_WAITING_REASONS = {
+    "crash_loop": "CrashLoopBackOff",
+    "image_pull_backoff": "ImagePullBackOff",
+    "err_image_pull": "ErrImagePull",
+    "create_config_error": "CreateContainerConfigError",
+}
 
 
 def _node_usage_by_hardware(node_load: list[dict[str, Any]], node_details: list[dict[str, Any]]) -> list[dict[str, Any]]:
@@ -1333,6 +1346,77 @@ def _vm_vector(expr: str) -> list[dict[str, Any]]:
     return output
 
 
+def _alert_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
+    output: list[dict[str, Any]] = []
+    for item in entries:
+        if not isinstance(item, dict):
+            continue
+        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
+        value = item.get("value")
+        name = metric.get("alertname")
+        if not isinstance(name, str) or not name:
+            continue
+        severity = metric.get("severity") if isinstance(metric.get("severity"), str) else ""
+        output.append(
+            {
+                "alert": name,
+                "severity": severity,
+                "value": value,
+            }
+        )
+    output.sort(key=lambda item: (-(item.get("value") or 0), item.get("alert") or ""))
+    return output
+
+
+def _vm_alerts_now() -> list[dict[str, Any]]:
+    entries = _vm_vector('sum by (alertname,severity) (ALERTS{alertstate="firing"})')
+    return _alert_entries(entries)[:_ALERT_TOP_LIMIT]
+
+
+def _vm_alerts_trend(window: str) -> list[dict[str, Any]]:
+    entries = _vm_vector(
+        f"topk({_ALERT_TOP_LIMIT}, sum by (alertname,severity) (count_over_time(ALERTS{{alertstate=\"firing\"}}[{window}])))"
+    )
+    return _alert_entries(entries)
+
+
+def _alertmanager_alerts(errors: list[str]) -> list[dict[str, Any]]:
+    base = settings.alertmanager_url
+    if not base:
+        return []
+    url = f"{base.rstrip('/')}/api/v2/alerts"
+    try:
+        with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
+            resp = client.get(url)
+            resp.raise_for_status()
+            payload = resp.json()
+            if isinstance(payload, list):
+                return [item for item in payload if isinstance(item, dict)]
+    except Exception as exc:
+        errors.append(f"alertmanager: {exc}")
+    return []
+
+
+def _summarize_alerts(alerts: list[dict[str, Any]]) -> dict[str, Any]:
+    items: list[dict[str, Any]] = []
+    by_severity: dict[str, int] = {}
+    for alert in alerts:
+        labels = alert.get("labels") if isinstance(alert.get("labels"), dict) else {}
+        alertname = labels.get("alertname")
+        if not isinstance(alertname, str) or not alertname:
+            continue
+        severity = labels.get("severity") if isinstance(labels.get("severity"), str) else ""
+        items.append({"alert": alertname, "severity": severity})
+        if severity:
+            by_severity[severity] = by_severity.get(severity, 0) + 1
+    items.sort(key=lambda item: (item.get("severity") or "", item.get("alert") or ""))
+    return {
+        "total": len(items),
+        "by_severity": by_severity,
+        "items": items[:_ALERT_TOP_LIMIT],
+    }
+
+
 def _filter_namespace_vector(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
     output: list[dict[str, Any]] = []
     for item in entries:
@@ -1536,6 +1620,66 @@ def _job_failure_trend(window: str) -> list[dict[str, Any]]:
     return output
 
 
+def _pod_reason_entries(expr: str, limit: int) -> list[dict[str, Any]]:
+    entries = _vm_vector(f"topk({limit}, sum by (namespace,pod) ({expr}))")
+    output: list[dict[str, Any]] = []
+    for item in entries:
+        if not isinstance(item, dict):
+            continue
+        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
+        namespace = metric.get("namespace")
+        pod = metric.get("pod")
+        if not isinstance(namespace, str) or not isinstance(pod, str):
+            continue
+        output.append(
+            {
+                "namespace": namespace,
+                "pod": pod,
+                "value": item.get("value"),
+            }
+        )
+    output.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or "", item.get("pod") or ""))
+    return output
+
+
+def _pod_waiting_now() -> dict[str, list[dict[str, Any]]]:
+    output: dict[str, list[dict[str, Any]]] = {}
+    for key, reason in _POD_WAITING_REASONS.items():
+        expr = f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}'
+        output[key] = _pod_reason_entries(expr, _POD_REASON_LIMIT)
+    return output
+
+
+def _pod_waiting_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
+    trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
+    for key, reason in _POD_WAITING_REASONS.items():
+        expr = f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}'
+        trends[key] = {
+            window: _pod_reason_entries(f"max_over_time(({expr})[{window}])", _POD_REASON_TREND_LIMIT)
+            for window in _TREND_WINDOWS
+        }
+    return trends
+
+
+def _pod_terminated_now() -> dict[str, list[dict[str, Any]]]:
+    output: dict[str, list[dict[str, Any]]] = {}
+    for key, reason in _POD_TERMINATED_REASONS.items():
+        expr = f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}'
+        output[key] = _pod_reason_entries(expr, _POD_REASON_LIMIT)
+    return output
+
+
+def _pod_terminated_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
+    trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
+    for key, reason in _POD_TERMINATED_REASONS.items():
+        expr = f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}'
+        trends[key] = {
+            window: _pod_reason_entries(f"max_over_time(({expr})[{window}])", _POD_REASON_TREND_LIMIT)
+            for window in _TREND_WINDOWS
+        }
+    return trends
+
+
 def _pods_phase_trends() -> dict[str, dict[str, dict[str, float | None]]]:
     phases = {
         "running": "sum(kube_pod_status_phase{phase=\"Running\"})",
@@ -1993,10 +2137,31 @@ def _collect_trend_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
         }
         metrics["pods_phase_trends"] = _pods_phase_trends()
         metrics["pvc_usage_trends"] = _pvc_usage_trends()
+        metrics["pod_waiting_now"] = _pod_waiting_now()
+        metrics["pod_waiting_trends"] = _pod_waiting_trends()
+        metrics["pod_terminated_now"] = _pod_terminated_now()
+        metrics["pod_terminated_trends"] = _pod_terminated_trends()
     except Exception as exc:
         errors.append(f"trends: {exc}")
 
 
+def _collect_alert_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
+    try:
+        vm_now = _vm_alerts_now()
+        vm_trends = {window: _vm_alerts_trend(window) for window in _TREND_WINDOWS}
+        alertmanager_alerts = _alertmanager_alerts(errors)
+        metrics["alerts"] = {
+            "vm": {
+                "active": vm_now,
+                "active_total": len(vm_now),
+            },
+            "alertmanager": _summarize_alerts(alertmanager_alerts) if alertmanager_alerts else {},
+            "trends": vm_trends,
+        }
+    except Exception as exc:
+        errors.append(f"alerts: {exc}")
+
+
 def _collect_namespace_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
     try:
         metrics["namespace_cpu_top"] = _filter_namespace_vector(
@@ -2110,6 +2275,7 @@ def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
     _collect_vm_core(metrics, errors)
     _collect_node_metrics(metrics, errors)
     _collect_trend_metrics(metrics, errors)
+    _collect_alert_metrics(metrics, errors)
     _collect_namespace_metrics(metrics, errors)
     metrics["pvc_usage_top"] = _pvc_usage(errors)
     metrics["trend_summary"] = _trend_summary(metrics)
@@ -2146,6 +2312,16 @@ def _trend_summary(metrics: dict[str, Any]) -> dict[str, Any]:
     return summary
 
 
+def _build_offenders(metrics: dict[str, Any]) -> dict[str, Any]:
+    offenders: dict[str, Any] = {}
+    offenders["pod_restarts_1h"] = _pod_restarts_top(metrics)
+    offenders["pod_waiting_now"] = metrics.get("pod_waiting_now") or {}
+    offenders["pod_terminated_now"] = metrics.get("pod_terminated_now") or {}
+    offenders["job_failures_24h"] = metrics.get("job_failures_24h") or []
+    offenders["pvc_pressure"] = _pvc_pressure_entries(metrics)
+    return offenders
+
+
 def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
     entries = [
         {"namespace": name, "value": value}
@@ -3122,6 +3298,8 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
         },
         "pressure_summary": pressure_summary,
         "trend_summary": metrics.get("trend_summary"),
+        "offenders": _build_offenders(metrics),
+        "alerts": metrics.get("alerts", {}),
         "top": {
             "namespace_cpu": (metrics.get("namespace_totals", {}) or {}).get("cpu", [])[:5],
             "namespace_mem": (metrics.get("namespace_totals", {}) or {}).get("mem", [])[:5],
diff --git a/ariadne/settings.py b/ariadne/settings.py
index f663db2..aef518e 100644
--- a/ariadne/settings.py
+++ b/ariadne/settings.py
@@ -189,6 +189,7 @@ class Settings:
     k8s_api_timeout_sec: float
     vm_url: str
     cluster_state_vm_timeout_sec: float
+    alertmanager_url: str
 
     mailu_sync_cron: str
     nextcloud_sync_cron: str
@@ -469,6 +470,7 @@ class Settings:
                 "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
             ).rstrip("/"),
             "cluster_state_vm_timeout_sec": _env_float("ARIADNE_CLUSTER_STATE_VM_TIMEOUT_SEC", 5.0),
+            "alertmanager_url": _env("ARIADNE_ALERTMANAGER_URL", "").rstrip("/"),
            "cluster_state_cron": _env("ARIADNE_SCHEDULE_CLUSTER_STATE", "*/15 * * * *"),
             "cluster_state_keep": _env_int("ARIADNE_CLUSTER_STATE_KEEP", 168),
         }