refactor(ariadne): split cluster state domains

This commit is contained in:
codex 2026-04-21 02:01:10 -03:00
parent 0fa6138612
commit 152c19665e
18 changed files with 3666 additions and 3509 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,188 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
from .cluster_state_relationships import *
def _severity_rank(value: Any) -> int:
if value == "critical":
return 0
if value == "warning":
return 1
return 2
def _pvc_pressure_signals(metrics: dict[str, Any]) -> list[dict[str, Any]]:
    """Build signal records for PVCs at or above the pressure threshold.

    Severity escalates from "warning" to "critical" once usage crosses
    the critical threshold.
    """
    signals: list[dict[str, Any]] = []
    for row in _pvc_top(metrics.get("pvc_usage_top", [])):
        used = row.get("used_percent")
        if isinstance(used, (int, float)) and used >= _PVC_PRESSURE_THRESHOLD:
            signals.append(
                {
                    "scope": "pvc",
                    "target": f"{row.get('namespace')}/{row.get('pvc')}",
                    "metric": "used_percent",
                    "current": used,
                    "severity": "critical" if used >= _PVC_CRITICAL_THRESHOLD else "warning",
                }
            )
    return signals
def _build_anomalies(
    metrics: dict[str, Any],
    nodes_summary: dict[str, Any],
    workloads_health: dict[str, Any],
    kustomizations: dict[str, Any],
    events: dict[str, Any],
) -> list[dict[str, Any]]:
    """Collect anomaly records from every signal domain.

    The appenders run in a fixed order so the resulting list order is
    deterministic.
    """
    collected: list[dict[str, Any]] = []
    for appender, payload in (
        (_append_pod_anomalies, metrics),
        (_append_workload_anomalies, workloads_health),
        (_append_flux_anomalies, kustomizations),
        (_append_job_failure_anomalies, metrics),
        (_append_pvc_anomalies, metrics),
        (_append_node_anomalies, nodes_summary),
        (_append_event_anomalies, events),
    ):
        appender(collected, payload)
    return collected
def _append_pod_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
pods_pending = metrics.get("pods_pending") or 0
pods_failed = metrics.get("pods_failed") or 0
if pods_pending:
anomalies.append(
{
"kind": "pods_pending",
"severity": "warning",
"summary": f"{int(pods_pending)} pods pending",
}
)
if pods_failed:
anomalies.append(
{
"kind": "pods_failed",
"severity": "critical",
"summary": f"{int(pods_failed)} pods failed",
}
)
def _append_workload_anomalies(
anomalies: list[dict[str, Any]], workloads_health: dict[str, Any]
) -> None:
for key in ("deployments", "statefulsets", "daemonsets"):
entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {}
not_ready = entry.get("not_ready") or 0
if not_ready:
anomalies.append(
{
"kind": f"{key}_not_ready",
"severity": "warning",
"summary": f"{int(not_ready)} {key} not ready",
"items": entry.get("items"),
}
)
def _append_flux_anomalies(anomalies: list[dict[str, Any]], kustomizations: dict[str, Any]) -> None:
flux_not_ready = (kustomizations or {}).get("not_ready") or 0
if flux_not_ready:
anomalies.append(
{
"kind": "flux_not_ready",
"severity": "warning",
"summary": f"{int(flux_not_ready)} Flux kustomizations not ready",
"items": (kustomizations or {}).get("items"),
}
)
def _append_job_failure_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
job_failures = metrics.get("job_failures_24h") or []
job_failures = [
entry for entry in job_failures if isinstance(entry, dict) and (entry.get("value") or 0) > 0
]
if job_failures:
anomalies.append(
{
"kind": "job_failures_24h",
"severity": "warning",
"summary": "Job failures in last 24h",
"items": job_failures[:5],
}
)
def _append_pvc_anomalies(anomalies: list[dict[str, Any]], metrics: dict[str, Any]) -> None:
    """Append a warning anomaly for PVCs above the pressure threshold."""
    pressured = _pvc_pressure_entries(metrics)
    if not pressured:
        return
    anomalies.append(
        {
            "kind": "pvc_pressure",
            "severity": "warning",
            "summary": f"PVCs above {_PVC_PRESSURE_THRESHOLD:.0f}% usage",
            "items": pressured[:5],  # cap the payload size
        }
    )
def _pvc_pressure_entries(metrics: dict[str, Any]) -> list[dict[str, Any]]:
    """Return PVC usage rows whose used_percent is at/above the pressure threshold."""
    pressured: list[dict[str, Any]] = []
    for row in _pvc_top(metrics.get("pvc_usage_top") or []):
        if not isinstance(row, dict):
            continue
        used = row.get("used_percent")
        if isinstance(used, (int, float)) and float(used or 0) >= _PVC_PRESSURE_THRESHOLD:
            pressured.append(row)
    return pressured
def _append_node_anomalies(anomalies: list[dict[str, Any]], nodes_summary: dict[str, Any]) -> None:
if not nodes_summary:
return
pressure_nodes = nodes_summary.get("pressure_nodes") or {}
flagged = [
name for names in pressure_nodes.values() if isinstance(names, list) for name in names if name
]
if flagged:
anomalies.append(
{
"kind": "node_pressure",
"severity": "warning",
"summary": f"{len(flagged)} nodes report pressure",
"items": sorted(set(flagged)),
}
)
unschedulable = nodes_summary.get("unschedulable_nodes") or []
if unschedulable:
anomalies.append(
{
"kind": "unschedulable_nodes",
"severity": "info",
"summary": f"{len(unschedulable)} nodes unschedulable",
"items": unschedulable,
}
)
def _append_event_anomalies(anomalies: list[dict[str, Any]], events: dict[str, Any]) -> None:
if not events:
return
warnings = events.get("warnings_total") or 0
if warnings:
anomalies.append(
{
"kind": "event_warnings",
"severity": "info",
"summary": f"{int(warnings)} warning events",
"items": events.get("warnings") or [],
}
)
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,122 @@
from __future__ import annotations
from typing import Any
from .cluster_state_anomalies import *
from .cluster_state_contract import *
from .cluster_state_health import *
def _node_attention_score(node: dict[str, Any]) -> tuple[float, list[str]]:
    """Score how much a node deserves operator attention.

    Returns ``(score, reasons)`` where each triggered signal adds a weighted
    amount to the score and a human-readable reason string.
    """
    score = 0.0
    reasons: list[str] = []

    def _gauge(key: str, threshold: float, weight: float, divisor: float) -> None:
        # Percent gauges add their weight plus a scaled overshoot above the threshold.
        nonlocal score
        value = node.get(key)
        if isinstance(value, (int, float)) and value >= threshold:
            score += weight + (value - threshold) / divisor
            reasons.append(f"{key} {value:.1f}%")

    _gauge("disk", _NODE_DISK_ALERT, 3, 10)
    _gauge("cpu", _NODE_CPU_ALERT, 2, 20)
    _gauge("ram", _NODE_RAM_ALERT, 2, 20)
    baseline = node.get("baseline") if isinstance(node.get("baseline"), dict) else {}
    for key, label, multiplier in (
        ("net", "net", _NET_SPIKE_MULTIPLIER),
        ("io", "io", _IO_SPIKE_MULTIPLIER),
    ):
        current = node.get(key)
        stats = baseline.get(key) if isinstance(baseline.get(key), dict) else {}
        peak = stats.get("max")
        if (
            isinstance(current, (int, float))
            and isinstance(peak, (int, float))
            and peak > 0
            and current > peak * multiplier
        ):
            score += 1.5
            reasons.append(f"{label} {current:.2f} > {multiplier:.1f}x baseline")
    flags = node.get("pressure_flags") if isinstance(node.get("pressure_flags"), list) else []
    if flags:
        score += 2
        reasons.append("pressure flags")
    return score, reasons
def _node_attention_entries(node_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build attention entries for named nodes with a positive attention score."""
    output: list[dict[str, Any]] = []
    for record in node_context:
        if not isinstance(record, dict):
            continue
        target = record.get("node")
        if not (isinstance(target, str) and target):
            continue
        score, reasons = _node_attention_score(record)
        if score <= 0:
            continue
        output.append(
            {
                "kind": "node",
                "target": target,
                "score": round(score, 2),
                "reasons": reasons,
            }
        )
    return output
def _pvc_attention_entries(metrics: dict[str, Any]) -> list[dict[str, Any]]:
    """Build attention entries for PVCs already flagged as under pressure."""
    output: list[dict[str, Any]] = []
    for row in _pvc_pressure_entries(metrics):
        if not isinstance(row, dict):
            continue
        usage = float(row.get("used_percent") or 0)
        output.append(
            {
                "kind": "pvc",
                "target": f"{row.get('namespace')}/{row.get('pvc')}",
                # Score grows with overshoot above the pressure threshold.
                "score": round(1 + (usage - _PVC_PRESSURE_THRESHOLD) / 10, 2),
                "reasons": [f"usage {usage:.1f}%"],
            }
        )
    return output
def _pod_attention_entries(pod_issues: dict[str, Any]) -> list[dict[str, Any]]:
entries: list[dict[str, Any]] = []
pending = pod_issues.get("pending_over_15m") or 0
if pending:
entries.append(
{
"kind": "pods",
"target": "pending",
"score": float(pending),
"reasons": [f"{int(pending)} pending >15m"],
}
)
return entries
def _workload_attention_entries(workloads_health: dict[str, Any]) -> list[dict[str, Any]]:
    """Build fixed-score attention entries for up to five not-ready workloads."""
    return [
        {
            "kind": "workload",
            "target": f"{record.get('namespace')}/{record.get('name')}",
            "score": 2.0,
            "reasons": [f"{record.get('ready')}/{record.get('desired')} ready"],
        }
        for record in _workload_not_ready_items(workloads_health)[:5]
    ]
def _build_attention_ranked(
    metrics: dict[str, Any],
    node_context: list[dict[str, Any]],
    pod_issues: dict[str, Any],
    workloads_health: dict[str, Any],
) -> list[dict[str, Any]]:
    """Merge attention entries from every domain and return the top five."""
    merged: list[dict[str, Any]] = []
    merged.extend(_node_attention_entries(node_context))
    merged.extend(_pvc_attention_entries(metrics))
    merged.extend(_pod_attention_entries(pod_issues))
    merged.extend(_workload_attention_entries(workloads_health))
    # Highest score first; ties broken deterministically by kind, then target.
    merged.sort(
        key=lambda entry: (
            -(entry.get("score") or 0),
            entry.get("kind") or "",
            entry.get("target") or "",
        )
    )
    return merged[:5]
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,121 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
# Expected length of a metrics value pair (e.g. [timestamp, value]).
_VALUE_PAIR_LEN = 2
# Query windows shared by the metric collectors.
_RATE_WINDOW = "5m"
_RESTARTS_WINDOW = "1h"
_BASELINE_WINDOW = "24h"
_TREND_WINDOWS = ("1h", "6h", "24h")
# Per-scope caps on how many trend series are kept.
_TREND_NODE_LIMIT = 30
_TREND_NAMESPACE_LIMIT = 20
_TREND_PVC_LIMIT = 10
_TREND_JOB_LIMIT = 10
_TREND_POD_LIMIT = 15
# Node usage thresholds (percent) above which a node draws attention.
_NODE_DISK_ALERT = 80.0
_NODE_CPU_ALERT = 80.0
_NODE_RAM_ALERT = 80.0
# A net/io reading above baseline max times this multiplier counts as a spike.
_NET_SPIKE_MULTIPLIER = 2.0
_IO_SPIKE_MULTIPLIER = 2.0
# Series selector for node_uname_info with a non-empty nodename label
# (presumably used to resolve node names — confirm in the vm modules).
_NODE_UNAME_LABEL = 'node_uname_info{nodename!=""}'
# Pod labels checked, in order, when attributing a pod to a workload.
_WORKLOAD_LABEL_KEYS = (
    "app.kubernetes.io/name",
    "app",
    "k8s-app",
    "app.kubernetes.io/instance",
    "release",
)
# Namespaces treated as system/infrastructure (filtered by _namespace_allowed).
_SYSTEM_NAMESPACES = {
    "kube-system",
    "kube-public",
    "kube-node-lease",
    "flux-system",
    "monitoring",
    "logging",
    "traefik",
    "cert-manager",
    "maintenance",
    "postgres",
    "vault",
}
# System namespaces explicitly re-allowed for workload reporting.
_WORKLOAD_ALLOWED_NAMESPACES = {
    "maintenance",
}
# Percent deltas from baseline that escalate severity.
_BASELINE_DELTA_WARN = 50.0
_BASELINE_DELTA_CRIT = 100.0
# Assorted output-size limits for summaries and indexes.
_SIGNAL_LIMIT = 15
_PROFILE_LIMIT = 6
_WORKLOAD_INDEX_LIMIT = 20
_NODE_WORKLOAD_LIMIT = 12
_NODE_WORKLOAD_TOP = 3
_EVENTS_SUMMARY_LIMIT = 5
# PVC usage percent considered critical (pressure threshold defined below).
_PVC_CRITICAL_THRESHOLD = 90.0
# Node capacity resource names surfaced in summaries.
_CAPACITY_KEYS = {
    "cpu",
    "memory",
    "pods",
    "ephemeral-storage",
}
# Node condition types that indicate resource pressure.
_PRESSURE_TYPES = {
    "MemoryPressure",
    "DiskPressure",
    "PIDPressure",
    "NetworkUnavailable",
}
# Event handling: cap on retained warnings and the Kubernetes event type name.
_EVENTS_MAX = 20
_EVENT_WARNING = "Warning"
# Pod phase ranking: higher means worse.
_PHASE_SEVERITY = {
    "Failed": 3,
    "Pending": 2,
    "Unknown": 1,
}
# Fifteen minutes expressed in hours, for pending-pod age checks.
_PENDING_15M_HOURS = 0.25
_LOAD_TOP_COUNT = 5
_NAMESPACE_TOP_COUNT = 5
# PVC usage percent at which pressure warnings start.
_PVC_PRESSURE_THRESHOLD = 80.0
_ALERT_TOP_LIMIT = 10
_POD_REASON_LIMIT = 10
_POD_REASON_TREND_LIMIT = 10
_NAMESPACE_ISSUE_LIMIT = 8
# Cross-correlation top-N sizes.
_CROSS_NODE_TOP = 3
_CROSS_NAMESPACE_TOP = 3
_CROSS_PVC_TOP = 3
# Friendly keys mapped to kube container terminated/waiting reason labels.
_POD_TERMINATED_REASONS = {
    "oom_killed": "OOMKilled",
    "error": "Error",
}
_POD_WAITING_REASONS = {
    "crash_loop": "CrashLoopBackOff",
    "image_pull_backoff": "ImagePullBackOff",
    "err_image_pull": "ErrImagePull",
    "create_config_error": "CreateContainerConfigError",
}
_DELTA_TOP_LIMIT = 6
_REASON_TOP_LIMIT = 5
@dataclass(frozen=True)
class ClusterStateSummary:
    """Immutable headline counts for a cluster-state snapshot.

    Count fields may be ``None`` when the value is unavailable (presumably a
    failed backing query — confirm against the collectors); ``errors`` is the
    number of collection errors recorded while building the snapshot.
    """

    nodes_total: int | None
    nodes_ready: int | None
    pods_running: int | None
    kustomizations_not_ready: int | None
    errors: int
@dataclass(frozen=True)
class SignalContext:
    """Immutable bundle of collected inputs passed between signal-building helpers."""

    # Raw metric values keyed by metric name.
    metrics: dict[str, Any]
    # Per-node usage/context records.
    node_context: list[dict[str, Any]]
    # Per-namespace usage/context records.
    namespace_context: list[dict[str, Any]]
    # Aggregated health for deployments/statefulsets/daemonsets.
    workloads_health: dict[str, Any]
    # Pod-level issue counters (pending, crash-looping, etc.).
    pod_issues: dict[str, Any]
    # Flux kustomization readiness summary.
    kustomizations: dict[str, Any]
def _items(payload: dict[str, Any]) -> list[dict[str, Any]]:
items = payload.get("items") if isinstance(payload.get("items"), list) else []
return [item for item in items if isinstance(item, dict)]
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,104 @@
from __future__ import annotations
import sys
from typing import Any
from ..k8s.client import get_json as _default_get_json
from .cluster_state_flux_events import *
from .cluster_state_nodes import *
from .cluster_state_pods import *
from .cluster_state_workloads import *
def _get_json(path: str) -> dict[str, Any]:
    """Fetch JSON via the facade module's ``get_json`` when available.

    Falls back to the default k8s client getter when the facade module is not
    imported or does not expose ``get_json`` (this indirection presumably
    exists so the facade can be patched in tests — confirm with callers).
    """
    facade = sys.modules.get("ariadne.services.cluster_state")
    if facade is None:
        return _default_get_json(path)
    return getattr(facade, "get_json", _default_get_json)(path)
def _fetch_nodes(errors: list[str]) -> tuple[dict[str, Any], list[dict[str, Any]], dict[str, Any]]:
    """Fetch /api/v1/nodes and derive (summary, per-node details, inventory).

    Best-effort: on failure the error is recorded in ``errors`` and whatever
    was computed before the failure is returned; later values stay empty.
    """
    nodes: dict[str, Any] = {}
    details: list[dict[str, Any]] = []
    summary: dict[str, Any] = {}
    try:
        payload = _get_json("/api/v1/nodes")
        nodes = _summarize_nodes(payload)
        details = _node_details(payload)
        summary = _summarize_inventory(details)
    except Exception as exc:
        # Broad catch is deliberate: a failed fetch degrades the snapshot
        # instead of aborting it.
        errors.append(f"nodes: {exc}")
    return nodes, details, summary
def _fetch_flux(errors: list[str]) -> dict[str, Any]:
    """Summarize Flux kustomizations in flux-system; {} and an error note on failure."""
    path = "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations"
    try:
        return _summarize_kustomizations(_get_json(path))
    except Exception as exc:
        errors.append(f"flux: {exc}")
        return {}
def _fetch_pods(
    errors: list[str],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
    """Fetch all pods (capped at 5000) and derive the per-domain pod summaries.

    Returns ``(workloads, namespace_pods, namespace_nodes, node_pods,
    pod_issues)``. Best-effort: on failure the error is recorded in ``errors``
    and the summaries computed before the failure are returned; the rest stay
    empty.
    """
    workloads: list[dict[str, Any]] = []
    namespace_pods: list[dict[str, Any]] = []
    namespace_nodes: list[dict[str, Any]] = []
    node_pods: list[dict[str, Any]] = []
    pod_issues: dict[str, Any] = {}
    try:
        pods_payload = _get_json("/api/v1/pods?limit=5000")
        workloads = _summarize_workloads(pods_payload)
        namespace_pods = _summarize_namespace_pods(pods_payload)
        namespace_nodes = _summarize_namespace_nodes(pods_payload)
        node_pods = _summarize_node_pods(pods_payload)
        pod_issues = _summarize_pod_issues(pods_payload)
    except Exception as exc:
        # Broad catch is deliberate: degrade the snapshot, don't abort it.
        errors.append(f"pods: {exc}")
    return workloads, namespace_pods, namespace_nodes, node_pods, pod_issues
def _fetch_jobs(errors: list[str]) -> dict[str, Any]:
    """Summarize batch jobs; returns {} and records the error on failure."""
    try:
        return _summarize_jobs(_get_json("/apis/batch/v1/jobs?limit=2000"))
    except Exception as exc:
        errors.append(f"jobs: {exc}")
        return {}
def _fetch_longhorn(errors: list[str]) -> dict[str, Any]:
    """Summarize Longhorn volumes; returns {} and records the error on failure."""
    path = "/apis/longhorn.io/v1beta2/namespaces/longhorn-system/volumes"
    try:
        return _summarize_longhorn_volumes(_get_json(path))
    except Exception as exc:
        errors.append(f"longhorn: {exc}")
        return {}
def _fetch_workload_health(errors: list[str]) -> dict[str, Any]:
    """Aggregate health across deployments, statefulsets and daemonsets.

    Returns {} and records the error if any of the three fetches fails.
    """
    try:
        # Fetch order matters only for error attribution; keep it stable.
        summaries = (
            _summarize_deployments(_get_json("/apis/apps/v1/deployments?limit=2000")),
            _summarize_statefulsets(_get_json("/apis/apps/v1/statefulsets?limit=2000")),
            _summarize_daemonsets(_get_json("/apis/apps/v1/daemonsets?limit=2000")),
        )
        return _summarize_workload_health(*summaries)
    except Exception as exc:
        errors.append(f"workloads_health: {exc}")
        return {}
def _fetch_events(errors: list[str]) -> dict[str, Any]:
    """Summarize cluster events; returns {} and records the error on failure."""
    try:
        return _summarize_events(_get_json("/api/v1/events?limit=2000"))
    except Exception as exc:
        errors.append(f"events: {exc}")
        return {}
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,117 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from .cluster_state_contract import *
from .cluster_state_nodes import *
def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize Flux Kustomization readiness from a Kubernetes list payload.

    Returns the total count plus a sorted list of entries that are suspended
    or not Ready, each carrying the Ready condition's reason/message.
    """
    items = _items(payload)  # parse once; the original re-parsed it for the total
    not_ready: list[dict[str, Any]] = []
    for item in items:
        metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
        spec = item.get("spec") if isinstance(item.get("spec"), dict) else {}
        status = item.get("status") if isinstance(item.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        ready, reason, message = _condition_status(status.get("conditions"), "Ready")
        suspended = bool(spec.get("suspend"))
        if ready is True and not suspended:
            continue  # healthy and active: nothing to report
        not_ready.append(
            {
                "name": name,
                "namespace": namespace,
                "ready": ready,
                "suspended": suspended,
                "reason": reason,
                "message": message,
            }
        )
    # Deterministic ordering for stable output.
    not_ready.sort(key=lambda entry: (entry.get("namespace") or "", entry.get("name") or ""))
    return {
        "total": len(items),
        "not_ready": len(not_ready),
        "items": not_ready,
    }
def _namespace_allowed(namespace: str) -> bool:
    """Whether events/workloads from this namespace should be surfaced."""
    if not namespace:
        return False
    # Allow-list wins over the system-namespace block-list.
    return namespace in _WORKLOAD_ALLOWED_NAMESPACES or namespace not in _SYSTEM_NAMESPACES
def _event_timestamp(event: dict[str, Any]) -> str:
for key in ("eventTime", "lastTimestamp", "firstTimestamp"):
value = event.get(key)
if isinstance(value, str) and value:
return value
return ""
def _event_sort_key(timestamp: str) -> float:
if not timestamp:
return 0.0
try:
return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp()
except ValueError:
return 0.0
def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]:
    """Aggregate Warning events from allowed namespaces.

    Produces totals, per-reason and per-namespace counts, the most recent
    warnings (capped), the dominant reason, and the single latest warning.
    """
    warnings: list[dict[str, Any]] = []
    by_reason: dict[str, int] = {}
    by_namespace: dict[str, int] = {}
    for event in _items(payload):
        metadata = event.get("metadata") if isinstance(event.get("metadata"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        event_type = event.get("type") if isinstance(event.get("type"), str) else ""
        if event_type != _EVENT_WARNING:
            continue
        reason = event.get("reason") if isinstance(event.get("reason"), str) else ""
        message = event.get("message") if isinstance(event.get("message"), str) else ""
        # Missing/odd count fields default to one occurrence.
        count = event.get("count") if isinstance(event.get("count"), int) else 1
        involved = event.get("involvedObject") if isinstance(event.get("involvedObject"), dict) else {}
        warnings.append(
            {
                "namespace": namespace,
                "reason": reason,
                "message": message,
                "count": count,
                "last_seen": _event_timestamp(event),
                "object_kind": involved.get("kind") or "",
                "object_name": involved.get("name") or "",
            }
        )
        if reason:
            by_reason[reason] = by_reason.get(reason, 0) + count
        if namespace:
            by_namespace[namespace] = by_namespace.get(namespace, 0) + count
    # Newest first; unparsable timestamps sink to the end.
    warnings.sort(key=lambda entry: _event_sort_key(entry.get("last_seen") or ""), reverse=True)
    recent = warnings[:_EVENTS_MAX]
    top_reason, top_reason_count = "", 0
    if by_reason:
        # Highest count wins; alphabetical reason breaks ties.
        top_reason, top_reason_count = min(by_reason.items(), key=lambda kv: (-kv[1], kv[0]))
    return {
        "warnings_total": len(warnings),
        "warnings_by_reason": by_reason,
        "warnings_by_namespace": by_namespace,
        "warnings_recent": recent,
        "warnings_top_reason": {"reason": top_reason, "count": top_reason_count},
        "warnings_latest": recent[0] if recent else None,
    }
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,77 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
def _health_bullets(
metrics: dict[str, Any],
nodes_summary: dict[str, Any],
workloads_health: dict[str, Any],
anomalies: list[dict[str, Any]],
) -> list[str]:
bullets: list[str] = []
nodes_total = metrics.get("nodes_total")
nodes_ready = metrics.get("nodes_ready")
if nodes_total is not None and nodes_ready is not None:
bullets.append(f"Nodes ready: {int(nodes_ready)}/{int(nodes_total)}")
pods_running = metrics.get("pods_running") or 0
pods_pending = metrics.get("pods_pending") or 0
pods_failed = metrics.get("pods_failed") or 0
bullets.append(f"Pods: {int(pods_running)} running, {int(pods_pending)} pending, {int(pods_failed)} failed")
not_ready = 0
for key in ("deployments", "statefulsets", "daemonsets"):
entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {}
not_ready += int(entry.get("not_ready") or 0)
if not_ready:
bullets.append(f"Workloads not ready: {not_ready}")
else:
bullets.append("Workloads: all ready")
if anomalies:
top = anomalies[0].get("summary") if isinstance(anomalies[0], dict) else None
if isinstance(top, str) and top:
bullets.append(f"Top concern: {top}")
return bullets[:4]
def _workload_not_ready_items(workloads_health: dict[str, Any]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for key in ("deployments", "statefulsets", "daemonsets"):
entry = workloads_health.get(key) if isinstance(workloads_health.get(key), dict) else {}
for item in entry.get("items") or []:
if not isinstance(item, dict):
continue
output.append(
{
"kind": key[:-1],
"namespace": item.get("namespace") or "",
"name": item.get("name") or "",
"desired": item.get("desired"),
"ready": item.get("ready"),
}
)
output.sort(key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
return output
def _pod_restarts_top(metrics: dict[str, Any]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in metrics.get("top_restarts_1h") or []:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
namespace = metric.get("namespace")
pod = metric.get("pod")
if not isinstance(namespace, str) or not isinstance(pod, str):
continue
output.append(
{
"namespace": namespace,
"pod": pod,
"value": item.get("value"),
}
)
output.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or ""))
return output[:5]
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,361 @@
from __future__ import annotations
from typing import Any
from .cluster_state_anomalies import *
from .cluster_state_contract import *
from .cluster_state_flux_events import *
from .cluster_state_health import *
from .cluster_state_vm_client import *
from .cluster_state_vm_trends import *
from .cluster_state_vm_usage import *
def _collect_vm_core(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate cluster-wide counts, capacity totals and top-k pod metrics.

    All values come from VictoriaMetrics queries; any failure records a single
    "vm: ..." entry in ``errors`` and leaves the remaining keys unset.
    """
    try:
        # Node counts and capacity/allocatable totals.
        metrics["nodes_total"] = _vm_scalar("count(kube_node_info)")
        metrics["nodes_ready"] = _vm_scalar(
            "count(kube_node_status_condition{condition=\"Ready\",status=\"true\"})"
        )
        metrics["capacity_cpu"] = _vm_scalar("sum(kube_node_status_capacity_cpu_cores)")
        metrics["allocatable_cpu"] = _vm_scalar("sum(kube_node_status_allocatable_cpu_cores)")
        metrics["capacity_mem_bytes"] = _vm_scalar("sum(kube_node_status_capacity_memory_bytes)")
        metrics["allocatable_mem_bytes"] = _vm_scalar("sum(kube_node_status_allocatable_memory_bytes)")
        metrics["capacity_pods"] = _vm_scalar("sum(kube_node_status_capacity_pods)")
        metrics["allocatable_pods"] = _vm_scalar("sum(kube_node_status_allocatable_pods)")
        # Pod phase totals.
        metrics["pods_running"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Running\"})")
        metrics["pods_pending"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Pending\"})")
        metrics["pods_failed"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Failed\"})")
        metrics["pods_succeeded"] = _vm_scalar("sum(kube_pod_status_phase{phase=\"Succeeded\"})")
        # Top-k offenders; namespace-filtered variants drop system namespaces.
        metrics["top_restarts_1h"] = _vm_vector(
            f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
        )
        metrics["restart_namespace_top"] = _filter_namespace_vector(
            _vm_vector(
                f"topk(5, sum by (namespace) (increase(kube_pod_container_status_restarts_total[{_RESTARTS_WINDOW}])))"
            )
        )
        metrics["pod_cpu_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        metrics["pod_cpu_top_node"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (node,namespace,pod) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]) * on (namespace,pod) group_left(node) kube_pod_info))'
            )
        )
        metrics["pod_mem_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace,pod) (container_memory_working_set_bytes{namespace!=\"\"}))"
            )
        )
        metrics["pod_mem_top_node"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (node,namespace,pod) (container_memory_working_set_bytes{namespace!=\"\"} * on (namespace,pod) group_left(node) kube_pod_info))"
            )
        )
        metrics["job_failures_24h"] = _vm_vector(
            "topk(5, sum by (namespace,job_name) (increase(kube_job_status_failed[24h])))"
        )
    except Exception as exc:
        errors.append(f"vm: {exc}")
def _collect_node_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate node usage metrics, per-series summary stats and 24h baselines."""
    metrics["postgres_connections"] = _postgres_connections(errors)
    metrics["hottest_nodes"] = _hottest_nodes(errors)
    usage = _node_usage(errors)
    metrics["node_usage"] = usage
    metrics["node_usage_stats"] = {
        series: _usage_stats(usage.get(series, []))
        for series in ("cpu", "ram", "net", "io", "disk")
    }
    try:
        per_node: dict[str, dict[str, dict[str, float]]] = {}
        for key, expr in _node_usage_exprs().items():
            baseline = _vm_baseline_map(expr, "node", _BASELINE_WINDOW)
            metrics.setdefault("node_baseline", {})[key] = _baseline_map_to_list(baseline, "node")
            # Also index the same stats by node name for per-node lookups.
            for name, stats in baseline.items():
                per_node.setdefault(name, {})[key] = stats
        metrics["node_baseline_map"] = per_node
    except Exception as exc:
        errors.append(f"baseline: {exc}")
def _collect_trend_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate windowed trend series for nodes, namespaces, restarts and pods.

    Any failure records a single "trends: ..." entry in ``errors`` and leaves
    the remaining trend keys unset.
    """
    try:
        # Node and namespace usage/request trends over the standard windows.
        metrics["node_trends"] = _build_metric_trends(
            _node_usage_exprs(),
            "node",
            "node",
            _TREND_WINDOWS,
            _TREND_NODE_LIMIT,
        )
        metrics["namespace_trends"] = _build_metric_trends(
            _namespace_usage_exprs(),
            "namespace",
            "namespace",
            _TREND_WINDOWS,
            _TREND_NAMESPACE_LIMIT,
        )
        metrics["namespace_request_trends"] = _build_metric_trends(
            _namespace_request_exprs(),
            "namespace",
            "namespace",
            _TREND_WINDOWS,
            _TREND_NAMESPACE_LIMIT,
        )
        # Per-window restart and job-failure trends.
        metrics["restart_trends"] = {
            window: _restart_namespace_trend(window) for window in _TREND_WINDOWS
        }
        metrics["job_failure_trends"] = {
            window: _job_failure_trend(window) for window in _TREND_WINDOWS
        }
        # Pod phase, PVC usage, and container waiting/terminated state trends.
        metrics["pods_phase_trends"] = _pods_phase_trends()
        metrics["pvc_usage_trends"] = _pvc_usage_trends()
        metrics["pod_waiting_now"] = _pod_waiting_now()
        metrics["pod_waiting_trends"] = _pod_waiting_trends()
        metrics["pod_terminated_now"] = _pod_terminated_now()
        metrics["pod_terminated_trends"] = _pod_terminated_trends()
        metrics["cluster_trends"] = _cluster_trends()
        metrics["node_condition_trends"] = _node_condition_trends()
        # Totals for the curated waiting/terminated reason sets.
        metrics["pod_reason_totals"] = {
            "waiting": _pod_reason_totals(
                _POD_WAITING_REASONS,
                "kube_pod_container_status_waiting_reason",
            ),
            "terminated": _pod_reason_totals(
                _POD_TERMINATED_REASONS,
                "kube_pod_container_status_terminated_reason",
            ),
        }
    except Exception as exc:
        errors.append(f"trends: {exc}")
def _collect_issue_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate per-namespace counts for common container failure reasons."""
    try:
        waiting = "kube_pod_container_status_waiting_reason"
        terminated = "kube_pod_container_status_terminated_reason"
        # (output key, source series, kube reason label) — order fixes key order.
        reason_specs = (
            ("crash_loop", waiting, "CrashLoopBackOff"),
            ("image_pull", waiting, "ImagePullBackOff"),
            ("err_image_pull", waiting, "ErrImagePull"),
            ("config_error", waiting, "CreateContainerConfigError"),
            ("oom_killed", terminated, "OOMKilled"),
            ("terminated_error", terminated, "Error"),
        )
        metrics["namespace_issue_top"] = {
            key: _namespace_reason_entries(
                f'{series}{{reason="{reason}"}}',
                _NAMESPACE_ISSUE_LIMIT,
            )
            for key, series, reason in reason_specs
        }
    except Exception as exc:
        errors.append(f"issues: {exc}")
def _collect_alert_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate active alert state from VictoriaMetrics and Alertmanager."""
    try:
        # Keep the fetch order: VM now, VM trends, then Alertmanager.
        active = _vm_alerts_now()
        trend_by_window = {window: _vm_alerts_trend(window) for window in _TREND_WINDOWS}
        am_alerts = _alertmanager_alerts(errors)
        metrics["alerts"] = {
            "vm": {
                "active": active,
                "active_total": len(active),
            },
            "alertmanager": _summarize_alerts(am_alerts) if am_alerts else {},
            "trends": trend_by_window,
        }
    except Exception as exc:
        errors.append(f"alerts: {exc}")
def _collect_namespace_metrics(metrics: dict[str, Any], errors: list[str]) -> None:
    """Populate namespace usage/request top-k lists, capacity and baselines.

    Each of the three stages (usage queries, baselines, capacity summary) is
    independently best-effort; failures append to ``errors``.
    """
    try:
        # Top-k usage and request vectors, filtered of system namespaces.
        metrics["namespace_cpu_top"] = _filter_namespace_vector(
            _vm_vector(
                f'topk(5, sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}])))'
            )
        )
        metrics["namespace_mem_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace) (container_memory_working_set_bytes{namespace!=\"\"}))"
            )
        )
        metrics["namespace_cpu_requests_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace) (kube_pod_container_resource_requests_cpu_cores))"
            )
        )
        metrics["namespace_mem_requests_top"] = _filter_namespace_vector(
            _vm_vector(
                "topk(5, sum by (namespace) (kube_pod_container_resource_requests_memory_bytes))"
            )
        )
        metrics["namespace_net_top"] = _filter_namespace_vector(
            _vm_vector(
                f"topk(5, sum by (namespace) (rate(container_network_receive_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}]) + rate(container_network_transmit_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}])))"
            )
        )
        metrics["namespace_io_top"] = _filter_namespace_vector(
            _vm_vector(
                f"topk(5, sum by (namespace) (rate(container_fs_reads_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}]) + rate(container_fs_writes_bytes_total{{namespace!=\"\"}}[{_RATE_WINDOW}])))"
            )
        )
        # Full per-namespace totals feed the capacity view (usage vs requests).
        namespace_cpu_usage = _vm_namespace_totals(
            f'sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))'
        )
        namespace_cpu_requests = _vm_namespace_totals(
            "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)"
        )
        namespace_mem_usage = _vm_namespace_totals(
            'sum by (namespace) (container_memory_working_set_bytes{namespace!=""})'
        )
        namespace_mem_requests = _vm_namespace_totals(
            "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)"
        )
        metrics["namespace_capacity"] = _build_namespace_capacity(
            namespace_cpu_usage,
            namespace_cpu_requests,
            namespace_mem_usage,
            namespace_mem_requests,
        )
        metrics["namespace_totals"] = {
            "cpu": _namespace_totals_list(namespace_cpu_usage),
            "mem": _namespace_totals_list(namespace_mem_usage),
            "cpu_requests": _namespace_totals_list(namespace_cpu_requests),
            "mem_requests": _namespace_totals_list(namespace_mem_requests),
        }
    except Exception as exc:
        errors.append(f"namespace_usage: {exc}")
    try:
        # 24h baselines, stored both as per-metric lists and a per-namespace map.
        namespace_exprs = _namespace_usage_exprs()
        namespace_baseline_map: dict[str, dict[str, dict[str, float]]] = {}
        for key, expr in namespace_exprs.items():
            baseline = _vm_baseline_map(expr, "namespace", _BASELINE_WINDOW)
            metrics.setdefault("namespace_baseline", {})[key] = _baseline_map_to_list(baseline, "namespace")
            for name, stats in baseline.items():
                namespace_baseline_map.setdefault(name, {})[key] = stats
        metrics["namespace_baseline_map"] = namespace_baseline_map
    except Exception as exc:
        errors.append(f"baseline: {exc}")
    # Runs even if earlier stages failed; falls back to an empty capacity list.
    metrics["namespace_capacity_summary"] = _namespace_capacity_summary(
        metrics.get("namespace_capacity", []),
    )
def _finalize_metrics(metrics: dict[str, Any]) -> None:
    """Attach unit and query-window metadata describing the collected keys."""
    # Unit labels per metric family, for consumers rendering the values.
    metrics["units"] = {
        "cpu": "percent",
        "ram": "percent",
        "net": "bytes_per_sec",
        "io": "bytes_per_sec",
        "disk": "percent",
        "restarts": "count",
        "pod_cpu": "cores",
        "pod_mem": "bytes",
        "pod_cpu_top_node": "cores",
        "pod_mem_top_node": "bytes",
        "job_failures_24h": "count",
        "namespace_cpu": "cores",
        "namespace_mem": "bytes",
        "namespace_cpu_requests": "cores",
        "namespace_mem_requests": "bytes",
        "namespace_net": "bytes_per_sec",
        "namespace_io": "bytes_per_sec",
        "pvc_used_percent": "percent",
        "capacity_cpu": "cores",
        "allocatable_cpu": "cores",
        "capacity_mem_bytes": "bytes",
        "allocatable_mem_bytes": "bytes",
        "capacity_pods": "count",
        "allocatable_pods": "count",
    }
    # The query windows the collectors used, so outputs are self-describing.
    metrics["windows"] = {
        "rates": _RATE_WINDOW,
        "restarts": _RESTARTS_WINDOW,
        "trend": _TREND_WINDOWS,
    }
def _summarize_metrics(errors: list[str]) -> dict[str, Any]:
    """Collect every metrics domain into one map, appending failures to *errors*.

    Each ``_collect_*`` helper mutates *metrics* in place; the call order
    matters because the derived steps at the end (trend summary, finalize)
    read keys written by the earlier collectors.
    """
    metrics: dict[str, Any] = {}
    _collect_vm_core(metrics, errors)
    _collect_node_metrics(metrics, errors)
    _collect_trend_metrics(metrics, errors)
    _collect_alert_metrics(metrics, errors)
    _collect_namespace_metrics(metrics, errors)
    _collect_issue_metrics(metrics, errors)
    # PVC usage is queried directly rather than via a collector helper.
    metrics["pvc_usage_top"] = _pvc_usage(errors)
    # Derived view: computed last so all trend inputs are present.
    metrics["trend_summary"] = _trend_summary(metrics)
    _finalize_metrics(metrics)
    return metrics
def _trend_summary(metrics: dict[str, Any]) -> dict[str, Any]:
    """Condense the per-window trend maps into top-5 entry lists per window.

    Reads the node/namespace/restart/job-failure trend blocks out of
    *metrics*, defaulting each to ``{}`` when absent or mis-typed, and keeps
    only the first five "avg" entries per configured trend window.
    """
    node_trends = metrics.get("node_trends", {}) if isinstance(metrics.get("node_trends"), dict) else {}
    namespace_trends = (
        metrics.get("namespace_trends", {}) if isinstance(metrics.get("namespace_trends"), dict) else {}
    )
    restarts = metrics.get("restart_trends", {}) if isinstance(metrics.get("restart_trends"), dict) else {}
    job_failures = (
        metrics.get("job_failure_trends", {}) if isinstance(metrics.get("job_failure_trends"), dict) else {}
    )
    summary: dict[str, Any] = {}
    # Node trends are keyed "cpu"/"ram" and shaped {window: {"avg": [...]}}.
    for metric_key, target in (("cpu", "node_cpu"), ("ram", "node_ram")):
        metric_block = node_trends.get(metric_key, {}) if isinstance(node_trends.get(metric_key), dict) else {}
        summary[target] = {
            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
            for window in _TREND_WINDOWS
        }
    # Namespace trends use "mem" (not "ram") for the memory block.
    for metric_key, target in (("cpu", "namespace_cpu"), ("mem", "namespace_mem")):
        metric_block = namespace_trends.get(metric_key, {}) if isinstance(namespace_trends.get(metric_key), dict) else {}
        summary[target] = {
            window: _limit_entries((metric_block.get(window) or {}).get("avg", []), 5)
            for window in _TREND_WINDOWS
        }
    # Restart/job-failure trends are already flat per-window entry lists.
    summary["restarts"] = {window: _limit_entries(entries or [], 5) for window, entries in restarts.items()}
    summary["job_failures"] = {
        window: _limit_entries(entries or [], 5) for window, entries in job_failures.items()
    }
    return summary
def _build_offenders(metrics: dict[str, Any]) -> dict[str, Any]:
    """Collect the top-offender views derived from the raw metrics map."""
    return {
        "pod_restarts_1h": _pod_restarts_top(metrics),
        "pod_waiting_now": metrics.get("pod_waiting_now") or {},
        "pod_terminated_now": metrics.get("pod_terminated_now") or {},
        "job_failures_24h": metrics.get("job_failures_24h") or [],
        "pvc_pressure": _pvc_pressure_entries(metrics),
        "namespace_issues": metrics.get("namespace_issue_top") or {},
    }
def _namespace_totals_list(totals: dict[str, float]) -> list[dict[str, Any]]:
entries = [
{"namespace": name, "value": value}
for name, value in totals.items()
if isinstance(name, str) and name
]
entries.sort(key=lambda item: (-(item.get("value") or 0), item.get("namespace") or ""))
return entries
# Explicit __all__ so star-imports re-export the underscore-prefixed helpers
# (Python normally skips them); the public contract names are kept too.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,401 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from .cluster_state_contract import *
def _node_usage_by_hardware(node_load: list[dict[str, Any]], node_details: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Average per-node load metrics into per-hardware-class buckets."""
    if not node_load or not node_details:
        return []
    hardware_of = _hardware_map(node_details)
    buckets: dict[str, dict[str, list[float]]] = {}
    for row in node_load:
        if not isinstance(row, dict):
            continue
        name = row.get("node")
        if isinstance(name, str) and name:
            # Nodes missing from the detail list land in the "unknown" bucket.
            _append_hardware_usage(buckets, str(hardware_of.get(name, "unknown")), row)
    return _finalize_hardware_usage(buckets)
def _hardware_map(node_details: list[dict[str, Any]]) -> dict[str, str]:
mapping: dict[str, str] = {}
for node in node_details:
if not isinstance(node, dict):
continue
name = node.get("name")
if isinstance(name, str) and name:
mapping[name] = str(node.get("hardware") or "unknown")
return mapping
def _append_hardware_usage(buckets: dict[str, dict[str, list[float]]], hardware: str, entry: dict[str, Any]) -> None:
bucket = buckets.setdefault(hardware, {"load_index": [], "cpu": [], "ram": [], "net": [], "io": []})
for key in ("load_index", "cpu", "ram", "net", "io"):
value = entry.get(key)
if isinstance(value, (int, float)):
bucket[key].append(float(value))
def _finalize_hardware_usage(buckets: dict[str, dict[str, list[float]]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for hardware, metrics in buckets.items():
row: dict[str, Any] = {"hardware": hardware}
for key, values in metrics.items():
if values:
row[key] = sum(values) / len(values)
output.append(row)
output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("hardware") or ""))
return output
def _node_ready(conditions: Any) -> bool:
if not isinstance(conditions, list):
return False
for condition in conditions:
if not isinstance(condition, dict):
continue
if condition.get("type") == "Ready":
return condition.get("status") == "True"
return False
def _summarize_nodes(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize readiness counts and sorted name lists for all nodes."""
    names: list[str] = []
    not_ready: list[str] = []
    for node in _items(payload):
        meta = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
        node_status = node.get("status") if isinstance(node.get("status"), dict) else {}
        node_name = meta.get("name") if isinstance(meta.get("name"), str) else ""
        if not node_name:
            # Nameless entries cannot be reported meaningfully.
            continue
        names.append(node_name)
        if not _node_ready(node_status.get("conditions")):
            not_ready.append(node_name)
    names.sort()
    not_ready.sort()
    return {
        "total": len(names),
        "ready": len(names) - len(not_ready),
        "not_ready": len(not_ready),
        "names": names,
        "not_ready_names": not_ready,
    }
def _node_labels(labels: dict[str, Any]) -> dict[str, Any]:
if not isinstance(labels, dict):
return {}
keep: dict[str, Any] = {}
for key, value in labels.items():
if key.startswith("node-role.kubernetes.io/"):
keep[key] = value
if key in {
"kubernetes.io/arch",
"kubernetes.io/hostname",
"beta.kubernetes.io/arch",
"hardware",
"jetson",
}:
keep[key] = value
return keep
def _node_addresses(status: dict[str, Any]) -> dict[str, str]:
addresses = status.get("addresses") if isinstance(status.get("addresses"), list) else []
output: dict[str, str] = {}
for addr in addresses:
if not isinstance(addr, dict):
continue
addr_type = addr.get("type")
addr_value = addr.get("address")
if isinstance(addr_type, str) and isinstance(addr_value, str):
output[addr_type] = addr_value
return output
def _node_details(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten a Kubernetes node-list payload into per-node detail rows.

    Each row carries readiness, roles, a worker flag, filtered labels, a
    hardware classification, version/runtime info, addresses, age, taints,
    capacity/allocatable maps, and pressure-condition flags. Nodes without
    a name are skipped; rows are sorted by name.
    """
    details: list[dict[str, Any]] = []
    for node in _items(payload):
        # Defensive re-typing: malformed sections degrade to {} instead of raising.
        metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {}
        spec = node.get("spec") if isinstance(node.get("spec"), dict) else {}
        status = node.get("status") if isinstance(node.get("status"), dict) else {}
        node_info = status.get("nodeInfo") if isinstance(status.get("nodeInfo"), dict) else {}
        labels = metadata.get("labels") if isinstance(metadata.get("labels"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        if not name:
            continue
        roles = _node_roles(labels)
        conditions = _node_pressure_conditions(status.get("conditions"))
        created_at = metadata.get("creationTimestamp") if isinstance(metadata.get("creationTimestamp"), str) else ""
        taints = _node_taints(spec.get("taints"))
        details.append(
            {
                "name": name,
                "ready": _node_ready(status.get("conditions")),
                "roles": roles,
                "is_worker": _node_is_worker(labels),
                "labels": _node_labels(labels),
                "hardware": _hardware_hint(labels, node_info),
                "arch": node_info.get("architecture") or "",
                "os": node_info.get("operatingSystem") or "",
                "kernel": node_info.get("kernelVersion") or "",
                "kubelet": node_info.get("kubeletVersion") or "",
                "container_runtime": node_info.get("containerRuntimeVersion") or "",
                "addresses": _node_addresses(status),
                "created_at": created_at,
                # None when creationTimestamp is missing or unparseable.
                "age_hours": _age_hours(created_at),
                "taints": taints,
                "unschedulable": bool(spec.get("unschedulable")),
                "capacity": _node_capacity(status.get("capacity")),
                "allocatable": _node_capacity(status.get("allocatable")),
                "pressure": conditions,
            }
        )
    details.sort(key=lambda item: item.get("name") or "")
    return details
def _age_hours(timestamp: str) -> float | None:
if not timestamp:
return None
try:
parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
except ValueError:
return None
return round((datetime.now(timezone.utc) - parsed).total_seconds() / 3600, 1)
def _node_age_stats(details: list[dict[str, Any]]) -> dict[str, Any]:
ages: list[tuple[str, float]] = []
for node in details:
name = node.get("name") if isinstance(node, dict) else ""
age = node.get("age_hours")
if isinstance(name, str) and name and isinstance(age, (int, float)):
ages.append((name, float(age)))
if not ages:
return {}
ages.sort(key=lambda item: item[1])
values = [age for _, age in ages]
return {
"min": round(min(values), 1),
"max": round(max(values), 1),
"avg": round(sum(values) / len(values), 1),
"youngest": [{"name": name, "age_hours": age} for name, age in ages[:5]],
"oldest": [{"name": name, "age_hours": age} for name, age in ages[-5:]],
}
def _node_flagged(details: list[dict[str, Any]], key: str) -> list[str]:
names: list[str] = []
for node in details:
name = node.get("name") if isinstance(node, dict) else ""
if not isinstance(name, str) or not name:
continue
if node.get(key):
names.append(name)
names.sort()
return names
def _node_taints(raw: Any) -> list[dict[str, str]]:
if not isinstance(raw, list):
return []
taints: list[dict[str, str]] = []
for entry in raw:
if not isinstance(entry, dict):
continue
key = entry.get("key")
effect = entry.get("effect")
value = entry.get("value")
if isinstance(key, str) and isinstance(effect, str):
taints.append(
{
"key": key,
"value": value if isinstance(value, str) else "",
"effect": effect,
}
)
return taints
def _summarize_inventory(details: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate node detail rows into the cluster-wide inventory summary."""
    summary = {
        "total": 0,
        "ready": 0,
        "workers": {"total": 0, "ready": 0},
        "by_hardware": {},
        "by_arch": {},
        "by_role": {},
        "not_ready_names": [],
        # Pre-seed one bucket per tracked pressure condition so downstream
        # consumers always see every key, even when empty.
        "pressure_nodes": {key: [] for key in _PRESSURE_TYPES},
        "age_stats": {},
        "tainted_nodes": [],
        "unschedulable_nodes": [],
    }
    not_ready: list[str] = []
    for node in details:
        # _apply_node_summary returns "" for rows without a usable name.
        name = _apply_node_summary(summary, node)
        if name and not node.get("ready"):
            not_ready.append(name)
    not_ready.sort()
    summary["not_ready_names"] = not_ready
    for cond_type in summary["pressure_nodes"]:
        summary["pressure_nodes"][cond_type].sort()
    summary["age_stats"] = _node_age_stats(details)
    summary["tainted_nodes"] = _node_flagged(details, "taints")
    summary["unschedulable_nodes"] = _node_flagged(details, "unschedulable")
    return summary
def _hardware_groups(details: list[dict[str, Any]]) -> list[dict[str, Any]]:
groups: dict[str, list[str]] = {}
for node in details:
if not isinstance(node, dict):
continue
name = node.get("name")
if not isinstance(name, str) or not name:
continue
hardware = str(node.get("hardware") or "unknown")
groups.setdefault(hardware, []).append(name)
output: list[dict[str, Any]] = []
for hardware, nodes in groups.items():
nodes.sort()
output.append({"hardware": hardware, "count": len(nodes), "nodes": nodes})
output.sort(key=lambda item: (-(item.get("count") or 0), item.get("hardware") or ""))
return output
def _pressure_summary(nodes_summary: dict[str, Any]) -> dict[str, Any]:
pressure_nodes = nodes_summary.get("pressure_nodes") if isinstance(nodes_summary, dict) else {}
summary: dict[str, Any] = {"by_type": {}, "total": 0}
if isinstance(pressure_nodes, dict):
for cond, names in pressure_nodes.items():
count = len(names) if isinstance(names, list) else 0
summary["by_type"][cond] = count
summary["total"] += count
unschedulable = nodes_summary.get("unschedulable_nodes") or []
summary["unschedulable"] = len(unschedulable) if isinstance(unschedulable, list) else 0
return summary
def _apply_node_summary(summary: dict[str, Any], node: dict[str, Any]) -> str:
    """Fold one node's detail row into the inventory *summary*; return its name.

    Returns "" (and counts nothing) when the row has no usable name.
    """
    name = node.get("name") if isinstance(node, dict) else ""
    if not isinstance(name, str) or not name:
        return ""
    summary["total"] += 1
    is_ready = bool(node.get("ready"))
    if is_ready:
        summary["ready"] += 1
    if node.get("is_worker"):
        workers = summary["workers"]
        workers["total"] += 1
        if is_ready:
            workers["ready"] += 1
    # Tally hardware class and CPU architecture, defaulting to "unknown".
    for bucket_key, value in (
        ("by_hardware", node.get("hardware") or "unknown"),
        ("by_arch", node.get("arch") or "unknown"),
    ):
        bucket = summary[bucket_key]
        bucket[value] = bucket.get(value, 0) + 1
    role_counts = summary["by_role"]
    for role in node.get("roles") or []:
        role_counts[role] = role_counts.get(role, 0) + 1
    _apply_pressure(summary, node, name)
    return name
def _apply_pressure(summary: dict[str, Any], node: dict[str, Any], name: str) -> None:
pressure = node.get("pressure") or {}
if not isinstance(pressure, dict):
return
for cond_type, active in pressure.items():
if active and cond_type in summary["pressure_nodes"]:
summary["pressure_nodes"][cond_type].append(name)
def _node_capacity(raw: Any) -> dict[str, str]:
    """Project the tracked capacity keys out of a capacity/allocatable map."""
    if not isinstance(raw, dict):
        return {}
    # Values are normalised to strings; empty strings and non-scalar values
    # are dropped.
    return {
        key: str(raw[key])
        for key in _CAPACITY_KEYS
        if isinstance(raw.get(key), (str, int, float)) and raw.get(key) != ""
    }
def _node_pressure_conditions(conditions: Any) -> dict[str, bool]:
    """Map each tracked pressure condition type to whether it reports active."""
    if not isinstance(conditions, list):
        return {}
    return {
        cond["type"]: cond.get("status") == "True"
        for cond in conditions
        if isinstance(cond, dict) and cond.get("type") in _PRESSURE_TYPES
    }
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _node_is_worker(labels: dict[str, Any]) -> bool:
if "node-role.kubernetes.io/control-plane" in labels:
return False
if "node-role.kubernetes.io/master" in labels:
return False
if "node-role.kubernetes.io/worker" in labels:
return True
return True
def _hardware_hint(labels: dict[str, Any], node_info: dict[str, Any]) -> str:
result = "unknown"
if str(labels.get("jetson") or "").lower() == "true":
result = "jetson"
else:
hardware = (labels.get("hardware") or "").strip().lower()
if hardware:
result = hardware
else:
kernel = str(node_info.get("kernelVersion") or "").lower()
os_image = str(node_info.get("osImage") or "").lower()
if "tegra" in kernel or "jetson" in os_image:
result = "jetson"
elif "raspi" in kernel or "bcm2711" in kernel:
result = "rpi"
else:
arch = str(node_info.get("architecture") or "").lower()
if arch == "amd64":
result = "amd64"
elif arch == "arm64":
result = "arm64-unknown"
return result
def _condition_status(conditions: Any, cond_type: str) -> tuple[bool | None, str, str]:
if not isinstance(conditions, list):
return None, "", ""
for condition in conditions:
if not isinstance(condition, dict):
continue
if condition.get("type") != cond_type:
continue
status = condition.get("status")
if status == "True":
return True, condition.get("reason") or "", condition.get("message") or ""
if status == "False":
return False, condition.get("reason") or "", condition.get("message") or ""
return None, condition.get("reason") or "", condition.get("message") or ""
return None, "", ""
# Explicit __all__ so star-imports re-export the underscore-prefixed helpers
# (Python normally skips them); the public contract names are kept too.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,346 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
from .cluster_state_flux_events import *
from .cluster_state_nodes import *
def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    """Resolve (workload, source) from the first matching well-known label.

    Returns ("", "") when no label key yields a non-empty string value.
    """
    for label_key in _WORKLOAD_LABEL_KEYS:
        candidate = labels.get(label_key)
        if isinstance(candidate, str) and candidate:
            return candidate, f"label:{label_key}"
    return "", ""
def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
for owner in owners:
if not isinstance(owner, dict):
continue
name = owner.get("name")
kind = owner.get("kind")
if isinstance(name, str) and name:
return name, f"owner:{kind or 'unknown'}"
return "", ""
def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
    """Workload identity for a pod: labels first, ownerReferences as fallback."""
    pod_labels = meta.get("labels") if isinstance(meta.get("labels"), dict) else {}
    resolved = _workload_from_labels(pod_labels)
    return resolved if resolved[0] else _owner_reference(meta)
def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Group pods by (namespace, workload) and summarize their placement.

    Workload identity comes from _pod_workload (well-known labels first,
    ownerReferences as fallback). Each group records pod totals, running
    count, per-node pod counts, and the node hosting the most pods as
    "primary_node". Output is sorted by namespace, then workload name.
    """
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        workload, source = _pod_workload(metadata)
        if not workload:
            # Pods with no label/owner-derived identity cannot be attributed.
            continue
        node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        key = (namespace, workload)
        entry = workloads.setdefault(
            key,
            {
                "namespace": namespace,
                "workload": workload,
                "source": source,
                "nodes": {},
                "pods_total": 0,
                "pods_running": 0,
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        if node:
            nodes = entry["nodes"]
            nodes[node] = nodes.get(node, 0) + 1
    output: list[dict[str, Any]] = []
    for entry in workloads.values():
        nodes = entry.get("nodes") or {}
        primary = ""
        if isinstance(nodes, dict) and nodes:
            # Node with the most pods wins; ties break alphabetically.
            primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
        entry["primary_node"] = primary
        output.append(entry)
    output.sort(key=lambda item: (item.get("namespace") or "", item.get("workload") or ""))
    return output
def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Count pods per namespace, broken down by phase.

    Namespaces rejected by _namespace_allowed are skipped. Output rows are
    sorted by pod count descending, then namespace name.
    """
    namespaces: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        entry = namespaces.setdefault(
            namespace,
            {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "pods_pending": 0,
                "pods_failed": 0,
                "pods_succeeded": 0,
            },
        )
        entry["pods_total"] += 1
        # Phases outside these four (e.g. "Unknown") only count in the total.
        if phase == "Running":
            entry["pods_running"] += 1
        elif phase == "Pending":
            entry["pods_pending"] += 1
        elif phase == "Failed":
            entry["pods_failed"] += 1
        elif phase == "Succeeded":
            entry["pods_succeeded"] += 1
    output = list(namespaces.values())
    output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or ""))
    return output
def _summarize_namespace_nodes(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Count scheduled pods per namespace and track which nodes host them.

    Unscheduled pods (no nodeName) are ignored. Each row records per-node
    pod counts plus the dominant node as "primary_node"; output is sorted
    by pod count descending, then namespace name.
    """
    namespaces: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
        spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
        status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        if not _namespace_allowed(namespace):
            continue
        node = spec.get("nodeName") if isinstance(spec.get("nodeName"), str) else ""
        if not node:
            continue
        phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
        entry = namespaces.setdefault(
            namespace,
            {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "nodes": {},
            },
        )
        entry["pods_total"] += 1
        if phase == "Running":
            entry["pods_running"] += 1
        nodes = entry["nodes"]
        nodes[node] = nodes.get(node, 0) + 1
    output: list[dict[str, Any]] = []
    for entry in namespaces.values():
        nodes = entry.get("nodes") or {}
        primary = ""
        if isinstance(nodes, dict) and nodes:
            # Node with the most pods wins; ties break alphabetically.
            primary = sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[0][0]
        entry["primary_node"] = primary
        output.append(entry)
    output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("namespace") or ""))
    return output
# Maps a pod phase to the per-node counter it increments; phases outside
# this map (e.g. "Unknown" or "") only count toward pods_total.
_NODE_PHASE_KEYS = {
    "Running": "pods_running",
    "Pending": "pods_pending",
    "Failed": "pods_failed",
    "Succeeded": "pods_succeeded",
}
def _summarize_node_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Aggregate pod phase counts and namespace spread per node."""
    per_node: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        ctx = _node_pod_context(pod)
        if ctx is None:
            continue
        node, namespace, phase = ctx
        _node_pod_apply(_node_pod_entry(per_node, node), namespace, phase)
    return _node_pod_finalize(per_node)
def _node_pod_context(pod: dict[str, Any]) -> tuple[str, str, str] | None:
    """Extract (node, namespace, phase) for a pod, or None when not usable."""
    meta = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
    ns = meta.get("namespace") if isinstance(meta.get("namespace"), str) else ""
    if not _namespace_allowed(ns):
        return None
    pod_spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
    node_name = pod_spec.get("nodeName") if isinstance(pod_spec.get("nodeName"), str) else ""
    if not node_name:
        # Unscheduled pods have no node to attribute to.
        return None
    pod_status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
    phase = pod_status.get("phase") if isinstance(pod_status.get("phase"), str) else ""
    return node_name, ns, phase
def _node_pod_entry(nodes: dict[str, dict[str, Any]], node: str) -> dict[str, Any]:
return nodes.setdefault(
node,
{
"node": node,
"pods_total": 0,
"pods_running": 0,
"pods_pending": 0,
"pods_failed": 0,
"pods_succeeded": 0,
"namespaces": {},
},
)
def _node_pod_apply(entry: dict[str, Any], namespace: str, phase: str) -> None:
    """Count one pod (and its namespace) into a node accumulator row."""
    entry["pods_total"] += 1
    counter = _NODE_PHASE_KEYS.get(phase)
    # Phases outside the map only count toward pods_total.
    if counter:
        entry[counter] += 1
    if namespace:
        ns_counts = entry["namespaces"]
        ns_counts[namespace] = ns_counts.get(namespace, 0) + 1
def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for entry in nodes.values():
namespaces = entry.get("namespaces") or {}
if isinstance(namespaces, dict):
entry["namespaces_top"] = sorted(
namespaces.items(), key=lambda item: (-item[1], item[0])
)[:3]
output.append(entry)
output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("node") or ""))
return output
def _node_pods_top(node_pods: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for entry in node_pods[:limit]:
if not isinstance(entry, dict):
continue
output.append(
{
"node": entry.get("node"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"namespaces_top": entry.get("namespaces_top") or [],
}
)
return output
def _record_pending_pod(
    pending_oldest: list[dict[str, Any]],
    info: dict[str, Any],
) -> bool:
    """Track a pending pod; return True when it has been pending >= 15 minutes.

    Pods with an unknown age are neither tracked nor counted.
    """
    age = info.get("age_hours")
    if age is None:
        return False
    pending_oldest.append(info)
    return age >= _PENDING_15M_HOURS
def _update_pod_issue(
    pod: dict[str, Any],
    acc: dict[str, Any],
) -> None:
    """Fold one pod into the issue accumulator *acc* (mutated in place).

    Tracks phase counts, waiting/phase reason histograms, a flat item list
    of problem pods (severity-ranked phase or any restarts), the set of
    pending pods with known ages, and how many pods have been pending for
    at least 15 minutes.
    """
    metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
    status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
    spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}
    namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
    name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
    created_at = (
        metadata.get("creationTimestamp")
        if isinstance(metadata.get("creationTimestamp"), str)
        else ""
    )
    age_hours = _age_hours(created_at)
    # Pods without both a name and namespace cannot be reported.
    if not name or not namespace:
        return
    phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
    restarts = 0
    waiting_reasons: list[str] = []
    for container in status.get("containerStatuses") or []:
        if not isinstance(container, dict):
            continue
        restarts += int(container.get("restartCount") or 0)
        state = container.get("state") if isinstance(container.get("state"), dict) else {}
        waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {}
        reason = waiting.get("reason")
        if isinstance(reason, str) and reason:
            # Cluster-wide histogram; the per-pod list is deduped below.
            waiting_reasons.append(reason)
            acc["waiting_reasons"][reason] = acc["waiting_reasons"].get(reason, 0) + 1
    phase_reason = status.get("reason")
    if isinstance(phase_reason, str) and phase_reason:
        acc["phase_reasons"][phase_reason] = acc["phase_reasons"].get(phase_reason, 0) + 1
    if phase in acc["counts"]:
        acc["counts"][phase] += 1
    # A pod is listed as an issue item when its phase is severity-ranked or
    # any of its containers has restarted.
    if phase in _PHASE_SEVERITY or restarts > 0:
        acc["items"].append(
            {
                "namespace": namespace,
                "pod": name,
                "node": spec.get("nodeName") or "",
                "phase": phase,
                "reason": status.get("reason") or "",
                "restarts": restarts,
                "waiting_reasons": sorted(set(waiting_reasons)),
                "created_at": created_at,
                "age_hours": age_hours,
            }
        )
    if phase == "Pending":
        info = {
            "namespace": namespace,
            "pod": name,
            "node": spec.get("nodeName") or "",
            "age_hours": age_hours,
            "reason": status.get("reason") or "",
        }
        if _record_pending_pod(acc["pending_oldest"], info):
            acc["pending_over_15m"] += 1
def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
    """Scan all pods and summarize problem pods, reasons, and pending backlog.

    Items are ordered worst-first (phase severity, then restart count, then
    namespace/pod name) and truncated to 20; pending pods are ordered
    oldest-first and truncated to 10.
    """
    acc = {
        "items": [],
        "counts": {key: 0 for key in _PHASE_SEVERITY},
        "pending_oldest": [],
        "pending_over_15m": 0,
        "waiting_reasons": {},
        "phase_reasons": {},
    }
    for pod in _items(payload):
        if isinstance(pod, dict):
            _update_pod_issue(pod, acc)
    items = acc["items"]
    items.sort(
        key=lambda item: (
            -_PHASE_SEVERITY.get(item.get("phase") or "", 0),
            -(item.get("restarts") or 0),
            item.get("namespace") or "",
            item.get("pod") or "",
        )
    )
    pending_oldest = acc["pending_oldest"]
    pending_oldest.sort(key=lambda item: -(item.get("age_hours") or 0.0))
    return {
        "counts": acc["counts"],
        "items": items[:20],
        "pending_oldest": pending_oldest[:10],
        "pending_over_15m": acc["pending_over_15m"],
        "waiting_reasons": acc["waiting_reasons"],
        "phase_reasons": acc["phase_reasons"],
    }
# Explicit __all__ so star-imports re-export the underscore-prefixed helpers
# (Python normally skips them); the public contract names are kept too.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,110 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
def _node_profiles(
    node_context: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
    node_workloads: dict[str, dict[str, int]],
) -> list[dict[str, Any]]:
    """Join per-node load, pod counts, and workload counts into profile rows.

    Rows are sorted by load_index descending (name breaking ties) and
    truncated to the profile limit. Missing pod/workload data degrades to
    empty fields rather than dropping the node.
    """
    # Index the pod summaries by node name for O(1) joins below.
    pod_map = {entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)}
    workload_map = node_workloads or {}
    profiles: list[dict[str, Any]] = []
    for entry in node_context:
        if not isinstance(entry, dict):
            continue
        node = entry.get("node")
        if not isinstance(node, str) or not node:
            continue
        pods = pod_map.get(node, {})
        workloads = workload_map.get(node, {})
        # Heaviest workloads first; ties break alphabetically.
        workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
        profiles.append(
            {
                "node": node,
                "ready": entry.get("ready"),
                "hardware": entry.get("hardware"),
                "arch": entry.get("arch"),
                "roles": entry.get("roles"),
                "pods_total": pods.get("pods_total"),
                "pods_running": pods.get("pods_running"),
                "namespaces_top": pods.get("namespaces_top") or [],
                "workloads_top": workloads_top,
                "load_index": entry.get("load_index"),
                "cpu": entry.get("cpu"),
                "ram": entry.get("ram"),
                "net": entry.get("net"),
                "io": entry.get("io"),
                "disk": entry.get("disk"),
                "baseline_delta": entry.get("baseline_delta") or {},
            }
        )
    profiles.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
    return profiles[:_PROFILE_LIMIT]
def _namespace_profiles(namespace_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Project the busiest namespaces into compact profile rows."""
    rows = [row for row in namespace_context if isinstance(row, dict)]
    # Largest pod count first; names break ties.
    rows.sort(key=lambda row: (-(row.get("pods_total") or 0), row.get("namespace") or ""))
    return [
        {
            "namespace": row.get("namespace"),
            "pods_total": row.get("pods_total"),
            "pods_running": row.get("pods_running"),
            "primary_node": row.get("primary_node"),
            "nodes_top": row.get("nodes_top") or [],
            "cpu_usage": row.get("cpu_usage"),
            "mem_usage": row.get("mem_usage"),
            "cpu_ratio": row.get("cpu_ratio"),
            "mem_ratio": row.get("mem_ratio"),
            "baseline_delta": row.get("baseline_delta") or {},
        }
        for row in rows[:_PROFILE_LIMIT]
    ]
def _workload_profiles(workloads: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Project the largest workloads into compact profile rows."""
    rows = [row for row in workloads if isinstance(row, dict)]
    rows.sort(
        key=lambda row: (-(row.get("pods_total") or 0), row.get("namespace") or "", row.get("workload") or ""),
    )
    profiles: list[dict[str, Any]] = []
    for row in rows[:_PROFILE_LIMIT]:
        node_counts = row.get("nodes") if isinstance(row.get("nodes"), dict) else {}
        # Top three nodes by pod count; names break ties.
        top_nodes = sorted(node_counts.items(), key=lambda kv: (-kv[1], kv[0]))[:3]
        profiles.append(
            {
                "namespace": row.get("namespace"),
                "workload": row.get("workload"),
                "source": row.get("source"),
                "pods_total": row.get("pods_total"),
                "pods_running": row.get("pods_running"),
                "primary_node": row.get("primary_node"),
                "nodes_top": top_nodes,
            }
        )
    return profiles
def _build_profiles(
    node_context: list[dict[str, Any]],
    namespace_context: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
    workloads: list[dict[str, Any]],
    node_workloads: dict[str, dict[str, int]],
) -> dict[str, Any]:
    """Assemble the node/namespace/workload profile sections of the summary."""
    return {
        "nodes": _node_profiles(node_context, node_pods, node_workloads),
        "namespaces": _namespace_profiles(namespace_context),
        "workloads": _workload_profiles(workloads),
    }
# Explicit __all__ so star-imports re-export the underscore-prefixed helpers
# (Python normally skips them); the public contract names are kept too.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,455 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
def _vector_to_named(entries: list[dict[str, Any]], label_key: str, name_key: str) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
value = item.get("value")
label = metric.get(label_key) if isinstance(metric, dict) else None
if not isinstance(label, str) or not label:
continue
output.append({name_key: label, "value": value, "metric": metric})
output.sort(key=lambda item: (-(item.get("value") or 0), item.get(name_key) or ""))
return output
def _pvc_top(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
namespace = metric.get("namespace")
pvc = metric.get("persistentvolumeclaim")
if not isinstance(namespace, str) or not isinstance(pvc, str):
continue
output.append(
{
"namespace": namespace,
"pvc": pvc,
"used_percent": item.get("value"),
}
)
output.sort(key=lambda item: (-(item.get("used_percent") or 0), item.get("namespace") or ""))
return output
def _namespace_context(
    namespace_pods: list[dict[str, Any]],
    namespace_nodes: list[dict[str, Any]],
    namespace_capacity: list[dict[str, Any]],
    namespace_baseline: dict[str, dict[str, dict[str, float]]],
) -> list[dict[str, Any]]:
    """Join pod counts, node placement, capacity, and baselines per namespace.

    *namespace_pods* drives the iteration; placement and capacity data are
    joined by namespace name and degrade to empty values when absent.
    Rows are sorted by pod count descending, then namespace name.
    """
    # Index the join sides by namespace for O(1) lookups.
    node_map = {entry.get("namespace"): entry for entry in namespace_nodes if isinstance(entry, dict)}
    cap_map = {entry.get("namespace"): entry for entry in namespace_capacity if isinstance(entry, dict)}
    output: list[dict[str, Any]] = []
    for entry in namespace_pods:
        if not isinstance(entry, dict):
            continue
        namespace = entry.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        nodes_entry = node_map.get(namespace, {})
        cap_entry = cap_map.get(namespace, {})
        nodes = nodes_entry.get("nodes") if isinstance(nodes_entry.get("nodes"), dict) else {}
        top_nodes: list[dict[str, Any]] = []
        if isinstance(nodes, dict):
            # Busiest nodes first; ties break alphabetically.
            top_nodes = [
                {"node": name, "pods": count}
                for name, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0]))[:3]
            ]
        baseline = namespace_baseline.get(namespace, {}) if isinstance(namespace_baseline, dict) else {}
        delta_cpu = _baseline_delta(cap_entry.get("cpu_usage"), baseline.get("cpu", {}))
        delta_mem = _baseline_delta(cap_entry.get("mem_usage"), baseline.get("mem", {}))
        # Only deltas that could actually be computed are reported.
        baseline_delta = {k: v for k, v in (("cpu", delta_cpu), ("mem", delta_mem)) if v is not None}
        output.append(
            {
                "namespace": namespace,
                "pods_total": entry.get("pods_total"),
                "pods_running": entry.get("pods_running"),
                "pods_pending": entry.get("pods_pending"),
                "pods_failed": entry.get("pods_failed"),
                "pods_succeeded": entry.get("pods_succeeded"),
                "primary_node": nodes_entry.get("primary_node"),
                "nodes_top": top_nodes,
                "cpu_usage": cap_entry.get("cpu_usage"),
                "cpu_requests": cap_entry.get("cpu_requests"),
                "cpu_ratio": cap_entry.get("cpu_usage_ratio"),
                "mem_usage": cap_entry.get("mem_usage"),
                "mem_requests": cap_entry.get("mem_requests"),
                "mem_ratio": cap_entry.get("mem_usage_ratio"),
                "baseline_delta": baseline_delta,
            }
        )
    output.sort(key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or ""))
    return output
def _namespace_nodes_top(namespace_context: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for entry in namespace_context[:limit]:
if not isinstance(entry, dict):
continue
output.append(
{
"namespace": entry.get("namespace"),
"pods_total": entry.get("pods_total"),
"primary_node": entry.get("primary_node"),
"nodes_top": entry.get("nodes_top") or [],
}
)
return output
def _workload_nodes_top(workloads: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
entries = [w for w in workloads if isinstance(w, dict)]
entries.sort(
key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or "", item.get("workload") or ""),
)
for entry in entries[:limit]:
output.append(
{
"namespace": entry.get("namespace"),
"workload": entry.get("workload"),
"source": entry.get("source"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"primary_node": entry.get("primary_node"),
}
)
return output
def _node_workload_map(workloads: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
mapping: dict[str, dict[str, int]] = {}
for entry in workloads:
if not isinstance(entry, dict):
continue
namespace = entry.get("namespace")
workload = entry.get("workload")
if not isinstance(workload, str) or not workload:
continue
nodes = entry.get("nodes")
if not isinstance(nodes, dict):
continue
key = f"{namespace}/{workload}" if isinstance(namespace, str) and namespace else workload
for node, count in nodes.items():
if not isinstance(node, str) or not node:
continue
if not isinstance(count, int):
try:
count = int(count)
except (TypeError, ValueError):
continue
if count <= 0:
continue
mapping.setdefault(node, {})[key] = mapping.setdefault(node, {}).get(key, 0) + count
return mapping
def _node_workloads_top(
    workload_map: dict[str, dict[str, int]],
    limit_nodes: int = _NODE_WORKLOAD_LIMIT,
    limit_workloads: int = _NODE_WORKLOAD_TOP,
) -> list[dict[str, Any]]:
    """Rank nodes by hosted pod count, attaching each node's busiest workloads."""
    ranked: list[dict[str, Any]] = []
    for node_name, per_workload in workload_map.items():
        if not (isinstance(node_name, str) and node_name and isinstance(per_workload, dict)):
            continue
        pod_total = sum(count for count in per_workload.values() if isinstance(count, int))
        busiest = sorted(per_workload.items(), key=lambda pair: (-pair[1], pair[0]))[:limit_workloads]
        ranked.append({"node": node_name, "pods_total": pod_total, "workloads_top": busiest})
    ranked.sort(key=lambda rec: (-(rec.get("pods_total") or 0), rec.get("node") or ""))
    return ranked[:limit_nodes]
def _workload_index(workloads: list[dict[str, Any]], limit: int = _WORKLOAD_INDEX_LIMIT) -> list[dict[str, Any]]:
    """Compact index of the busiest workloads with their node placement."""
    candidates = [item for item in workloads if isinstance(item, dict)]
    candidates.sort(
        key=lambda item: (-(item.get("pods_total") or 0), item.get("namespace") or "", item.get("workload") or ""),
    )
    index: list[dict[str, Any]] = []
    for item in candidates[:limit]:
        raw_nodes = item.get("nodes")
        nodes = raw_nodes if isinstance(raw_nodes, dict) else {}
        nodes_top = sorted(nodes.items(), key=lambda pair: (-pair[1], pair[0]))[:_NODE_WORKLOAD_TOP]
        index.append(
            {
                "namespace": item.get("namespace"),
                "workload": item.get("workload"),
                "pods_total": item.get("pods_total"),
                "pods_running": item.get("pods_running"),
                "primary_node": item.get("primary_node"),
                "nodes_top": nodes_top,
            }
        )
    return index
def _events_summary(events: dict[str, Any]) -> dict[str, Any]:
if not isinstance(events, dict):
return {}
by_namespace = events.get("warnings_by_namespace") if isinstance(events.get("warnings_by_namespace"), dict) else {}
top_namespace = ""
top_namespace_count = 0
if by_namespace:
top_namespace, top_namespace_count = sorted(
by_namespace.items(), key=lambda item: (-item[1], item[0])
)[0]
return {
"warnings_total": events.get("warnings_total"),
"top_reason": events.get("warnings_top_reason"),
"top_namespace": {"namespace": top_namespace, "count": top_namespace_count},
"latest": events.get("warnings_latest"),
"recent": (events.get("warnings_recent") or [])[:_EVENTS_SUMMARY_LIMIT],
}
def _build_lexicon() -> dict[str, Any]:
terms = [
{
"term": "hottest",
"meaning": "highest utilization for a metric (cpu, ram, net, io, load_index).",
},
{
"term": "pressure",
"meaning": "node condition flags (MemoryPressure, DiskPressure, PIDPressure, NetworkUnavailable).",
},
{
"term": "load_index",
"meaning": "composite load score derived from cpu, ram, net, io.",
},
{"term": "top", "meaning": "highest values within a category."},
{"term": "pods", "meaning": "running workload instances on a node or namespace."},
{"term": "workload", "meaning": "deployment/statefulset/daemonset grouping."},
]
aliases = {
"hot node": "node with highest load_index",
"hottest by cpu": "node with highest cpu utilization",
"hottest by ram": "node with highest ram utilization",
"pressure node": "node with pressure condition flags",
}
return {"terms": terms, "aliases": aliases}
def _top_named_entries(
entries: list[dict[str, Any]],
name_key: str,
limit: int,
) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for entry in entries or []:
if not isinstance(entry, dict):
continue
name = entry.get(name_key)
if not isinstance(name, str) or not name:
continue
value = entry.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
numeric = 0.0
output.append({"name": name, "value": numeric})
output.sort(key=lambda item: -(item.get("value") or 0))
return output[:limit]
def _cross_node_metric_top(metrics: dict[str, Any], node_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
node_map = {entry.get("node"): entry for entry in node_context if isinstance(entry, dict)}
output: list[dict[str, Any]] = []
for metric in ("cpu", "ram", "net", "io", "disk"):
series = usage.get(metric)
if not isinstance(series, list):
continue
for top in _top_named_entries(series, "node", _CROSS_NODE_TOP):
node = top.get("name")
if not node:
continue
context = node_map.get(node, {})
output.append(
{
"metric": metric,
"node": node,
"value": top.get("value"),
"cpu": context.get("cpu"),
"ram": context.get("ram"),
"net": context.get("net"),
"io": context.get("io"),
"disk": context.get("disk"),
"load_index": context.get("load_index"),
"pods_total": context.get("pods_total"),
"hardware": context.get("hardware"),
"roles": context.get("roles"),
"pressure_flags": context.get("pressure_flags"),
}
)
return output
def _cross_namespace_metric_top(
metrics: dict[str, Any],
namespace_context: list[dict[str, Any]],
) -> list[dict[str, Any]]:
top = metrics.get("namespace_top") if isinstance(metrics.get("namespace_top"), dict) else {}
namespace_map = {
entry.get("namespace"): entry
for entry in namespace_context
if isinstance(entry, dict) and entry.get("namespace")
}
output: list[dict[str, Any]] = []
for metric in ("cpu", "mem", "net", "io", "restarts"):
series = top.get(metric)
if not isinstance(series, list):
continue
for entry in _top_named_entries(series, "namespace", _CROSS_NAMESPACE_TOP):
namespace = entry.get("name")
if not namespace:
continue
context = namespace_map.get(namespace, {})
output.append(
{
"metric": metric,
"namespace": namespace,
"value": entry.get("value"),
"pods_total": context.get("pods_total"),
"pods_running": context.get("pods_running"),
"cpu_ratio": context.get("cpu_ratio"),
"mem_ratio": context.get("mem_ratio"),
"primary_node": context.get("primary_node"),
"nodes_top": context.get("nodes_top") or [],
}
)
return output
def _build_cross_stats(
    metrics: dict[str, Any],
    node_context: list[dict[str, Any]],
    namespace_context: list[dict[str, Any]],
    workloads: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the cross-cutting 'top' views over nodes, namespaces, PVCs and workloads."""
    pvc_leaders = _pvc_top(metrics.get("pvc_usage_top", []))[:_CROSS_PVC_TOP]
    return {
        "node_metric_top": _cross_node_metric_top(metrics, node_context),
        "namespace_metric_top": _cross_namespace_metric_top(metrics, namespace_context),
        "pvc_top": pvc_leaders,
        "workload_top": _workload_nodes_top(workloads, _CROSS_NAMESPACE_TOP),
    }
def _node_context(
node_details: list[dict[str, Any]],
node_load: list[dict[str, Any]],
node_baseline: dict[str, dict[str, dict[str, float]]],
node_workloads: dict[str, dict[str, int]],
) -> list[dict[str, Any]]:
load_map = {entry.get("node"): entry for entry in node_load if isinstance(entry, dict)}
output: list[dict[str, Any]] = []
for entry in node_details:
if not isinstance(entry, dict):
continue
name = entry.get("name")
if not isinstance(name, str) or not name:
continue
load_entry = load_map.get(name, {})
baseline = node_baseline.get(name, {}) if isinstance(node_baseline, dict) else {}
deltas: dict[str, float] = {}
for key in ("cpu", "ram", "net", "io", "disk"):
current = load_entry.get(key)
stats = baseline.get(key, {}) if isinstance(baseline, dict) else {}
delta = _baseline_delta(current, stats)
if delta is not None:
deltas[key] = delta
workloads = node_workloads.get(name, {}) if isinstance(node_workloads, dict) else {}
workloads_top = sorted(workloads.items(), key=lambda item: (-item[1], item[0]))[:_NODE_WORKLOAD_TOP]
output.append(
{
"node": name,
"ready": entry.get("ready"),
"roles": entry.get("roles"),
"is_worker": entry.get("is_worker"),
"hardware": entry.get("hardware"),
"arch": entry.get("arch"),
"os": entry.get("os"),
"taints": entry.get("taints"),
"unschedulable": entry.get("unschedulable"),
"pressure_flags": entry.get("pressure"),
"pods_total": load_entry.get("pods_total"),
"cpu": load_entry.get("cpu"),
"ram": load_entry.get("ram"),
"disk": load_entry.get("disk"),
"net": load_entry.get("net"),
"io": load_entry.get("io"),
"load_index": load_entry.get("load_index"),
"baseline": baseline,
"baseline_delta": deltas,
"workloads_top": workloads_top,
}
)
output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
return output
def _baseline_delta(current: Any, stats: dict[str, Any]) -> float | None:
if not isinstance(current, (int, float)):
return None
avg = stats.get("avg")
if not isinstance(avg, (int, float)) or avg == 0:
return None
return round(((float(current) - float(avg)) / float(avg)) * 100, 2)
def _delta_severity(delta: float) -> str:
    """Map an absolute baseline deviation (percent) onto info/warning/critical."""
    magnitude = abs(delta)
    if magnitude >= _BASELINE_DELTA_CRIT:
        return "critical"
    return "warning" if magnitude >= _BASELINE_DELTA_WARN else "info"
def _delta_entry_label(entry: dict[str, Any]) -> tuple[str, str]:
if "node" in entry:
return ("node", str(entry.get("node") or ""))
return ("namespace", str(entry.get("namespace") or ""))
def _delta_top(entries: list[dict[str, Any]], key: str, limit: int = _DELTA_TOP_LIMIT) -> list[dict[str, Any]]:
    """Largest absolute baseline deltas for *key* across node/namespace entries."""
    hits: list[dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        raw_deltas = entry.get("baseline_delta")
        delta = (raw_deltas if isinstance(raw_deltas, dict) else {}).get(key)
        if not isinstance(delta, (int, float)):
            continue
        label_key, label_value = _delta_entry_label(entry)
        hits.append(
            {
                label_key: label_value,
                "metric": key,
                "delta": delta,
                "severity": _delta_severity(float(delta)),
            }
        )
    hits.sort(key=lambda rec: (-(abs(rec.get("delta") or 0)), rec.get("metric") or ""))
    return hits[:limit]
def _reason_top(counts: dict[str, Any], limit: int = _REASON_TOP_LIMIT) -> list[dict[str, Any]]:
    """Most frequent reasons from a reason -> count mapping."""
    pairs = counts.items() if isinstance(counts, dict) else []
    ranked = [
        {"reason": reason, "count": int(value)}
        for reason, value in pairs
        if isinstance(reason, str) and reason and isinstance(value, (int, float))
    ]
    ranked.sort(key=lambda rec: (-rec.get("count", 0), rec.get("reason") or ""))
    return ranked[:limit]
# Export private (single-underscore) helpers plus the shared contract types so
# sibling modules and the cluster_state facade can pull them in via star-import.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,160 @@
from __future__ import annotations
from typing import Any
from .cluster_state_anomalies import *
from .cluster_state_contract import *
from .cluster_state_health import *
from .cluster_state_relationships import *
def _pod_issue_summary(pod_issues: dict[str, Any], metrics: dict[str, Any]) -> dict[str, Any]:
    """Top waiting/phase reasons plus the namespace issue leaderboard."""
    if isinstance(pod_issues, dict):
        waiting = pod_issues.get("waiting_reasons")
        phase = pod_issues.get("phase_reasons")
    else:
        waiting = phase = {}
    return {
        "waiting_reasons_top": _reason_top(waiting),
        "phase_reasons_top": _reason_top(phase),
        "namespace_issue_top": metrics.get("namespace_issue_top") or {},
    }
def _delta_hit(delta: Any) -> bool:
if not isinstance(delta, (int, float)):
return False
return abs(float(delta)) >= _BASELINE_DELTA_WARN
def _node_delta_signals(node_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
signals: list[dict[str, Any]] = []
for entry in node_context:
if not isinstance(entry, dict):
continue
node = entry.get("node")
deltas = entry.get("baseline_delta") if isinstance(entry.get("baseline_delta"), dict) else {}
baseline = entry.get("baseline") if isinstance(entry.get("baseline"), dict) else {}
if not isinstance(node, str) or not node:
continue
for metric in ("cpu", "ram", "net", "io", "disk"):
delta = deltas.get(metric)
if not _delta_hit(delta):
continue
avg = baseline.get(metric, {}).get("avg") if isinstance(baseline.get(metric), dict) else None
signals.append(
{
"scope": "node",
"target": node,
"metric": metric,
"current": entry.get(metric),
"baseline_avg": avg,
"delta_pct": delta,
"severity": _delta_severity(float(delta)),
}
)
return signals
def _namespace_delta_signals(namespace_context: list[dict[str, Any]]) -> list[dict[str, Any]]:
signals: list[dict[str, Any]] = []
for entry in namespace_context:
if not isinstance(entry, dict):
continue
namespace = entry.get("namespace")
deltas = entry.get("baseline_delta") if isinstance(entry.get("baseline_delta"), dict) else {}
baseline = entry.get("baseline") if isinstance(entry.get("baseline"), dict) else {}
if not isinstance(namespace, str) or not namespace:
continue
for metric, current_key in (("cpu", "cpu_usage"), ("mem", "mem_usage")):
delta = deltas.get(metric)
if not _delta_hit(delta):
continue
avg = baseline.get(metric, {}).get("avg") if isinstance(baseline.get(metric), dict) else None
signals.append(
{
"scope": "namespace",
"target": namespace,
"metric": metric,
"current": entry.get(current_key),
"baseline_avg": avg,
"delta_pct": delta,
"severity": _delta_severity(float(delta)),
}
)
return signals
def _kustomization_signals(kustomizations: dict[str, Any]) -> list[dict[str, Any]]:
count = int(kustomizations.get("not_ready") or 0) if isinstance(kustomizations, dict) else 0
if count <= 0:
return []
return [
{
"scope": "flux",
"target": "kustomizations",
"metric": "not_ready",
"current": count,
"severity": "warning",
}
]
def _pod_issue_signals(pod_issues: dict[str, Any]) -> list[dict[str, Any]]:
if not isinstance(pod_issues, dict):
return []
signals: list[dict[str, Any]] = []
pending_over = int(pod_issues.get("pending_over_15m") or 0)
if pending_over > 0:
signals.append(
{
"scope": "pods",
"target": "pending_over_15m",
"metric": "count",
"current": pending_over,
"severity": "warning",
}
)
counts = pod_issues.get("counts") if isinstance(pod_issues.get("counts"), dict) else {}
failed = int(counts.get("Failed") or 0) if isinstance(counts, dict) else 0
if failed > 0:
signals.append(
{
"scope": "pods",
"target": "failed",
"metric": "count",
"current": failed,
"severity": "critical",
}
)
return signals
def _workload_health_signals(workloads_health: dict[str, Any]) -> list[dict[str, Any]]:
    """Warning signals for up to five workloads below their desired replica count."""
    degraded = _workload_not_ready_items(workloads_health)
    signals: list[dict[str, Any]] = []
    for item in degraded[:5]:
        signals.append(
            {
                "scope": "workload",
                "target": f"{item.get('namespace')}/{item.get('workload')}",
                "metric": "not_ready",
                "current": item.get("ready") or 0,
                "desired": item.get("desired") or 0,
                "severity": "warning",
            }
        )
    return signals
def _build_signals(context: SignalContext) -> list[dict[str, Any]]:
    """Collect all signal families, order by severity then scope, and cap the list."""
    families = (
        _node_delta_signals(context.node_context),
        _namespace_delta_signals(context.namespace_context),
        _workload_health_signals(context.workloads_health),
        _pod_issue_signals(context.pod_issues),
        _kustomization_signals(context.kustomizations),
        _pvc_pressure_signals(context.metrics),
    )
    merged = [signal for family in families for signal in family]
    # Stable sort keeps the family concatenation order within equal keys.
    merged.sort(key=lambda sig: (_severity_rank(sig.get("severity")), sig.get("scope") or ""))
    return merged[:_SIGNAL_LIMIT]
# Export private (single-underscore) helpers plus the shared contract types so
# sibling modules and the cluster_state facade can pull them in via star-import.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,323 @@
from __future__ import annotations
import sys
from typing import Any, Callable
import httpx
from ..settings import settings
from .cluster_state_contract import *
from .cluster_state_flux_events import *
from .cluster_state_relationships import *
def _facade_override(name: str, original: Callable[..., Any]) -> Callable[..., Any] | None:
facade = sys.modules.get("ariadne.services.cluster_state")
candidate = getattr(facade, name, None) if facade is not None else None
if candidate is not None and candidate is not original:
return candidate
return None
def _vm_query(expr: str) -> list[dict[str, Any]] | None:
    """Run an instant PromQL query against VictoriaMetrics; None when unavailable."""
    base = settings.vm_url
    if not base:
        return None
    endpoint = f"{base.rstrip('/')}/api/v1/query"
    with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
        response = client.get(endpoint, params={"query": expr})
        response.raise_for_status()
        payload = response.json()
    if payload.get("status") != "success":
        return None
    raw_data = payload.get("data")
    data = raw_data if isinstance(raw_data, dict) else {}
    result = data.get("result")
    return result if isinstance(result, list) else None
def _vm_scalar(expr: str) -> float | None:
    """Evaluate *expr* and return the first sample as a float, or None."""
    override = _facade_override("_vm_scalar", _vm_scalar)
    if override is not None:
        return override(expr)
    result = _vm_query(expr)
    if not result:
        return None
    first = result[0]
    value = first.get("value") if isinstance(first, dict) else None
    if not isinstance(value, list) or len(value) < _VALUE_PAIR_LEN:
        return None
    try:
        return float(value[1])
    except (TypeError, ValueError):
        return None
def _vm_vector(expr: str) -> list[dict[str, Any]]:
    """Evaluate *expr* and return [{metric, value: float}] samples, best effort."""
    override = _facade_override("_vm_vector", _vm_vector)
    if override is not None:
        return override(expr)
    samples: list[dict[str, Any]] = []
    for row in _vm_query(expr) or []:
        if not isinstance(row, dict):
            continue
        raw_metric = row.get("metric")
        metric = raw_metric if isinstance(raw_metric, dict) else {}
        pair = row.get("value") if isinstance(row.get("value"), list) else []
        if len(pair) < _VALUE_PAIR_LEN:
            continue
        try:
            numeric = float(pair[1])
        except (TypeError, ValueError):
            continue
        samples.append({"metric": metric, "value": numeric})
    return samples
def _alert_entries(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
value = item.get("value")
name = metric.get("alertname")
if not isinstance(name, str) or not name:
continue
severity = metric.get("severity") if isinstance(metric.get("severity"), str) else ""
output.append(
{
"alert": name,
"severity": severity,
"value": value,
}
)
output.sort(key=lambda item: (-(item.get("value") or 0), item.get("alert") or ""))
return output
def _vm_alerts_now() -> list[dict[str, Any]]:
    """Currently firing alerts grouped by name/severity, capped to the top N."""
    samples = _vm_vector('sum by (alertname,severity) (ALERTS{alertstate="firing"})')
    return _alert_entries(samples)[:_ALERT_TOP_LIMIT]
def _vm_alerts_trend(window: str) -> list[dict[str, Any]]:
    """Alerts that fired most often over *window*, grouped by name/severity."""
    expr = (
        f"topk({_ALERT_TOP_LIMIT}, sum by (alertname,severity) "
        f"(count_over_time(ALERTS{{alertstate=\"firing\"}}[{window}])))"
    )
    return _alert_entries(_vm_vector(expr))
def _alertmanager_alerts(errors: list[str]) -> list[dict[str, Any]]:
    """Fetch active alerts from Alertmanager; failures are recorded, not raised."""
    base = settings.alertmanager_url
    if not base:
        return []
    endpoint = f"{base.rstrip('/')}/api/v2/alerts"
    try:
        with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
            response = client.get(endpoint)
            response.raise_for_status()
            payload = response.json()
    except Exception as exc:  # deliberate best-effort: report via *errors* and continue
        errors.append(f"alertmanager: {exc}")
        return []
    if isinstance(payload, list):
        return [row for row in payload if isinstance(row, dict)]
    return []
def _summarize_alerts(alerts: list[dict[str, Any]]) -> dict[str, Any]:
    """Count alerts by severity and list the first few, sorted for stable output."""
    entries: list[dict[str, Any]] = []
    severity_counts: dict[str, int] = {}
    for alert in alerts:
        raw_labels = alert.get("labels")
        labels = raw_labels if isinstance(raw_labels, dict) else {}
        name = labels.get("alertname")
        if not isinstance(name, str) or not name:
            continue
        raw_severity = labels.get("severity")
        severity = raw_severity if isinstance(raw_severity, str) else ""
        entries.append({"alert": name, "severity": severity})
        if severity:
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
    entries.sort(key=lambda rec: (rec.get("severity") or "", rec.get("alert") or ""))
    return {
        "total": len(entries),
        "by_severity": severity_counts,
        "items": entries[:_ALERT_TOP_LIMIT],
    }
def _filter_namespace_vector(entries: list[dict[str, Any]]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in entries:
if not isinstance(item, dict):
continue
metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
namespace = metric.get("namespace")
if not isinstance(namespace, str) or not namespace:
continue
if namespace in _SYSTEM_NAMESPACES:
continue
output.append(item)
return output
def _vm_topk(expr: str, label_key: str) -> dict[str, Any] | None:
    """First sample of *expr* reported as {label, value, metric}, or None."""
    samples = _vm_vector(expr)
    if not samples:
        return None
    head = samples[0]
    metric = head.get("metric") if isinstance(head, dict) else {}
    label = metric.get(label_key) if isinstance(metric, dict) else None
    return {"label": label or "", "value": head.get("value"), "metric": metric}
def _vm_node_metric(expr: str, label_key: str) -> list[dict[str, Any]]:
    """Per-node samples of *expr* keyed by *label_key*, sorted by node name."""
    rows: list[dict[str, Any]] = []
    for sample in _vm_vector(expr):
        raw_metric = sample.get("metric")
        metric = raw_metric if isinstance(raw_metric, dict) else {}
        node = metric.get(label_key)
        if isinstance(node, str) and node:
            rows.append({"node": node, "value": sample.get("value")})
    rows.sort(key=lambda rec: rec.get("node") or "")
    return rows
def _vm_baseline_map(expr: str, label_key: str, window: str) -> dict[str, dict[str, float]]:
    """Per-label avg/max of *expr* over *window* via avg_over_time / max_over_time."""
    baseline: dict[str, dict[str, float]] = {}

    def _fold(samples: list[dict[str, Any]], stat: str) -> None:
        # Accumulate one statistic per label into the shared baseline map.
        for sample in samples:
            raw_metric = sample.get("metric")
            metric = raw_metric if isinstance(raw_metric, dict) else {}
            label = metric.get(label_key)
            if isinstance(label, str) and label:
                baseline.setdefault(label, {})[stat] = float(sample.get("value") or 0)

    _fold(_vm_vector(f"avg_over_time(({expr})[{window}])"), "avg")
    _fold(_vm_vector(f"max_over_time(({expr})[{window}])"), "max")
    return baseline
def _baseline_map_to_list(
baseline: dict[str, dict[str, float]],
name_key: str,
) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for name, stats in baseline.items():
if not isinstance(name, str) or not name:
continue
output.append(
{
name_key: name,
"avg": stats.get("avg"),
"max": stats.get("max"),
}
)
output.sort(key=lambda item: (-(item.get("avg") or 0), item.get(name_key) or ""))
return output
def _limit_entries(entries: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]:
if limit <= 0:
return []
return entries[:limit]
def _vm_window_series(
    expr: str,
    label_key: str,
    name_key: str,
    window: str,
) -> dict[str, list[dict[str, Any]]]:
    """avg/max/p95 of *expr* over *window*, each as a named, sorted series."""
    range_queries = {
        "avg": f"avg_over_time(({expr})[{window}])",
        "max": f"max_over_time(({expr})[{window}])",
        "p95": f"quantile_over_time(0.95, ({expr})[{window}])",
    }
    return {
        stat: _vector_to_named(_vm_vector(query), label_key, name_key)
        for stat, query in range_queries.items()
    }
def _trim_window_series(series: dict[str, list[dict[str, Any]]], limit: int) -> dict[str, list[dict[str, Any]]]:
return {key: _limit_entries(entries, limit) for key, entries in series.items()}
def _build_metric_trends(
exprs: dict[str, str],
label_key: str,
name_key: str,
windows: tuple[str, ...],
limit: int,
) -> dict[str, dict[str, dict[str, list[dict[str, Any]]]]]:
trends: dict[str, dict[str, dict[str, list[dict[str, Any]]]]] = {}
for metric, expr in exprs.items():
metric_trends: dict[str, dict[str, list[dict[str, Any]]]] = {}
for window in windows:
series = _vm_window_series(expr, label_key, name_key, window)
metric_trends[window] = _trim_window_series(series, limit)
trends[metric] = metric_trends
return trends
def _vm_scalar_window(expr: str, window: str, fn: str) -> float | None:
    """Scalar value of the range function *fn* applied to *expr* over *window*."""
    query = f"{fn}(({expr})[{window}])"
    return _vm_scalar(query)
def _scalar_trends(expr: str, windows: tuple[str, ...]) -> dict[str, dict[str, float | None]]:
return {
window: {
"avg": _vm_scalar_window(expr, window, "avg_over_time"),
"min": _vm_scalar_window(expr, window, "min_over_time"),
"max": _vm_scalar_window(expr, window, "max_over_time"),
}
for window in windows
}
def _cluster_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Cluster-wide avg/min/max trend stats for core health counters.

    Each PromQL expression below is evaluated through ``_scalar_trends`` over
    every window in ``_TREND_WINDOWS``; the result maps
    counter name -> window -> {"avg", "min", "max"}.
    """
    exprs = {
        "nodes_ready": 'sum(kube_node_status_condition{condition="Ready",status="true"})',
        "nodes_not_ready": 'sum(kube_node_status_condition{condition="Ready",status="false"})',
        "pods_running": 'sum(kube_pod_status_phase{phase="Running"})',
        "pods_pending": 'sum(kube_pod_status_phase{phase="Pending"})',
        "pods_failed": 'sum(kube_pod_status_phase{phase="Failed"})',
        "pods_succeeded": 'sum(kube_pod_status_phase{phase="Succeeded"})',
        "alerts_firing": 'sum(ALERTS{alertstate="firing"})',
        # Rate-based counters use the module-wide _RATE_WINDOW lookback.
        "cpu_usage": f'sum(rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))',
        "mem_usage": 'sum(container_memory_working_set_bytes{namespace!=""})',
        # Combined receive + transmit network throughput across all pods.
        "net_io": (
            f'sum(rate(container_network_receive_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]) '
            f'+ rate(container_network_transmit_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]))'
        ),
        # Combined filesystem read + write throughput across all pods.
        "fs_io": (
            f'sum(rate(container_fs_reads_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]) '
            f'+ rate(container_fs_writes_bytes_total{{namespace!=""}}[{_RATE_WINDOW}]))'
        ),
    }
    return {key: _scalar_trends(expr, _TREND_WINDOWS) for key, expr in exprs.items()}
def _node_condition_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Trend stats for node readiness, schedulability and pressure conditions."""
    conditions: dict[str, str] = {
        "ready": 'sum(kube_node_status_condition{condition="Ready",status="true"})',
        "not_ready": 'sum(kube_node_status_condition{condition="Ready",status="false"})',
        "unschedulable": "sum(kube_node_spec_unschedulable)",
    }
    conditions.update(
        {
            cond.lower(): f'sum(kube_node_status_condition{{condition="{cond}",status="true"}})'
            for cond in _PRESSURE_TYPES
        }
    )
    return {key: _scalar_trends(expr, _TREND_WINDOWS) for key, expr in conditions.items()}
# Export private (single-underscore) helpers plus the shared contract types so
# sibling modules and the cluster_state facade can pull them in via star-import.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,187 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
from .cluster_state_relationships import *
from .cluster_state_vm_client import *
def _pod_reason_totals(
reasons: dict[str, str],
series: str,
) -> dict[str, dict[str, dict[str, float | None]]]:
totals: dict[str, dict[str, dict[str, float | None]]] = {}
for key, reason in reasons.items():
expr = f'sum({series}{{reason="{reason}"}})'
totals[key] = _scalar_trends(expr, _TREND_WINDOWS)
return totals
def _node_usage_exprs() -> dict[str, str]:
    """PromQL expressions for per-node cpu/ram/net/io/disk utilization.

    node_exporter series are keyed by ``instance``; each expression joins
    through ``node_uname_info`` via ``label_replace`` to re-key the result by
    the Kubernetes ``node`` name.
    """
    return {
        # CPU: percent busy = 100 * (1 - idle fraction).
        "cpu": (
            f'avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))'
        ),
        # RAM: percent used = (total - available) / total.
        "ram": (
            'avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            '/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))'
        ),
        # NET: bytes/s in + out, loopback interface excluded.
        "net": (
            f'avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) '
            'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))'
        ),
        # IO: disk read + write bytes/s.
        "io": (
            f'avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
            '* on(instance) group_left(node) label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))'
        ),
        # DISK: percent used on the root filesystem (tmpfs/overlay excluded).
        "disk": (
            'avg by (node) (((1 - avg by (instance) (node_filesystem_avail_bytes{mountpoint="/",fstype!~"tmpfs|overlay"} '
            '/ node_filesystem_size_bytes{mountpoint="/",fstype!~"tmpfs|overlay"})) * 100) * on(instance) group_left(node) '
            'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))'
        ),
    }
def _namespace_usage_exprs() -> dict[str, str]:
    """Per-namespace CPU rate and working-set memory expressions."""
    cpu_expr = f'sum by (namespace) (rate(container_cpu_usage_seconds_total{{namespace!=""}}[{_RATE_WINDOW}]))'
    mem_expr = 'sum by (namespace) (container_memory_working_set_bytes{namespace!=""})'
    return {"cpu": cpu_expr, "mem": mem_expr}
def _namespace_request_exprs() -> dict[str, str]:
return {
"cpu_requests": "sum by (namespace) (kube_pod_container_resource_requests_cpu_cores)",
"mem_requests": "sum by (namespace) (kube_pod_container_resource_requests_memory_bytes)",
}
def _restart_namespace_trend(window: str) -> list[dict[str, Any]]:
    """Namespaces with the most container restarts over *window* (system ns excluded)."""
    query = (
        f"topk({_TREND_NAMESPACE_LIMIT}, sum by (namespace) "
        f"(increase(kube_pod_container_status_restarts_total[{window}])))"
    )
    filtered = _filter_namespace_vector(_vm_vector(query))
    return _vector_to_named(filtered, "namespace", "namespace")
def _job_failure_trend(window: str) -> list[dict[str, Any]]:
    """Jobs with the most failures over *window*, sorted by count then identity."""
    query = (
        f"topk({_TREND_JOB_LIMIT}, sum by (namespace,job_name) "
        f"(increase(kube_job_status_failed[{window}])))"
    )
    failures: list[dict[str, Any]] = []
    for sample in _vm_vector(query):
        if not isinstance(sample, dict):
            continue
        raw_metric = sample.get("metric")
        metric = raw_metric if isinstance(raw_metric, dict) else {}
        namespace = metric.get("namespace")
        job = metric.get("job_name")
        if isinstance(namespace, str) and isinstance(job, str):
            failures.append({"namespace": namespace, "job": job, "value": sample.get("value")})
    failures.sort(key=lambda rec: (-(rec.get("value") or 0), rec.get("namespace") or "", rec.get("job") or ""))
    return failures
def _pod_reason_entries(expr: str, limit: int) -> list[dict[str, Any]]:
    """Top pods (namespace/pod) matching *expr*, sorted by value then identity."""
    pods: list[dict[str, Any]] = []
    for sample in _vm_vector(f"topk({limit}, sum by (namespace,pod) ({expr}))"):
        if not isinstance(sample, dict):
            continue
        raw_metric = sample.get("metric")
        metric = raw_metric if isinstance(raw_metric, dict) else {}
        namespace = metric.get("namespace")
        pod = metric.get("pod")
        if isinstance(namespace, str) and isinstance(pod, str):
            pods.append({"namespace": namespace, "pod": pod, "value": sample.get("value")})
    pods.sort(key=lambda rec: (-(rec.get("value") or 0), rec.get("namespace") or "", rec.get("pod") or ""))
    return pods
def _namespace_reason_entries(expr: str, limit: int) -> list[dict[str, Any]]:
    """Evaluate *expr* grouped by namespace and return the top *limit* entries
    as named records, with system namespaces filtered out."""
    raw = _vm_vector(f"topk({limit}, sum by (namespace) ({expr}))")
    return _vector_to_named(_filter_namespace_vector(raw), "namespace", "namespace")
def _pod_waiting_now() -> dict[str, list[dict[str, Any]]]:
    """Current per-pod counts for each tracked container waiting reason."""
    return {
        key: _pod_reason_entries(
            f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}',
            _POD_REASON_LIMIT,
        )
        for key, reason in _POD_WAITING_REASONS.items()
    }
def _pod_waiting_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Windowed maxima of per-pod waiting-reason counts, keyed by reason then window."""
    result: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for key, reason in _POD_WAITING_REASONS.items():
        base = f'kube_pod_container_status_waiting_reason{{reason="{reason}"}}'
        per_window: dict[str, list[dict[str, Any]]] = {}
        for window in _TREND_WINDOWS:
            per_window[window] = _pod_reason_entries(
                f"max_over_time(({base})[{window}])", _POD_REASON_TREND_LIMIT
            )
        result[key] = per_window
    return result
def _pod_terminated_now() -> dict[str, list[dict[str, Any]]]:
    """Current per-pod counts for each tracked container terminated reason."""
    return {
        key: _pod_reason_entries(
            f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}',
            _POD_REASON_LIMIT,
        )
        for key, reason in _POD_TERMINATED_REASONS.items()
    }
def _pod_terminated_trends() -> dict[str, dict[str, list[dict[str, Any]]]]:
    """Windowed maxima of per-pod terminated-reason counts, keyed by reason then window."""
    result: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for key, reason in _POD_TERMINATED_REASONS.items():
        base = f'kube_pod_container_status_terminated_reason{{reason="{reason}"}}'
        per_window: dict[str, list[dict[str, Any]]] = {}
        for window in _TREND_WINDOWS:
            per_window[window] = _pod_reason_entries(
                f"max_over_time(({base})[{window}])", _POD_REASON_TREND_LIMIT
            )
        result[key] = per_window
    return result
def _pods_phase_trends() -> dict[str, dict[str, dict[str, float | None]]]:
    """Average and max pod counts per phase (running/pending/failed) for each
    trend window; scalars are None when the query returns no data."""
    phase_exprs = {
        "running": 'sum(kube_pod_status_phase{phase="Running"})',
        "pending": 'sum(kube_pod_status_phase{phase="Pending"})',
        "failed": 'sum(kube_pod_status_phase{phase="Failed"})',
    }
    return {
        window: {
            name: {
                "avg": _vm_scalar_window(expr, window, "avg_over_time"),
                "max": _vm_scalar_window(expr, window, "max_over_time"),
            }
            for name, expr in phase_exprs.items()
        }
        for window in _TREND_WINDOWS
    }
def _pvc_usage_trends() -> dict[str, list[dict[str, Any]]]:
    """Top PVCs by maximum used-percentage over each trend window."""
    ratio_expr = "kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100"
    result: dict[str, list[dict[str, Any]]] = {}
    for window in _TREND_WINDOWS:
        query = f"topk({_TREND_PVC_LIMIT}, max_over_time(({ratio_expr})[{window}]))"
        result[window] = _pvc_top(_vm_vector(query))
    return result
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,330 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
from .cluster_state_vm_client import *
from .cluster_state_vm_trends import *
def _postgres_connections(errors: list[str]) -> dict[str, Any]:
    """Collect PostgreSQL connection metrics from VictoriaMetrics.

    Best-effort: on any failure the error is appended to *errors* and
    whatever was gathered so far is returned.
    """
    stats: dict[str, Any] = {}
    try:
        stats["used"] = _vm_scalar("sum(pg_stat_activity_count)")
        stats["max"] = _vm_scalar("max(pg_settings_max_connections)")
        stats["by_db"] = _vm_vector("topk(5, sum by (datname) (pg_stat_activity_count))")
        stats["hottest_db"] = _vm_topk(
            "topk(1, sum by (datname) (pg_stat_activity_count))",
            "datname",
        )
    except Exception as exc:  # deliberate best-effort boundary
        errors.append(f"postgres: {exc}")
    return stats
def _hottest_nodes(errors: list[str]) -> dict[str, Any]:
    """Return the single busiest node per resource (cpu/ram/net/io).

    Each query computes an instance-level rate or ratio, joins it onto the
    node name via node_uname_info, takes topk(1), and relabels so _vm_topk
    can read the winner from the "node" label. Best-effort: on failure the
    error is appended to *errors* and partial results are returned.
    """
    hottest: dict[str, Any] = {}
    try:
        # Busiest CPU: (1 - idle fraction) * 100, averaged per node.
        hottest["cpu"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{{mode="idle"}}[{_RATE_WINDOW}]))) * 100) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # Busiest RAM: (total - available) / total as a percentage.
        hottest["ram"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) '
            f'/ node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # Busiest network: receive + transmit byte rate, loopback excluded.
        hottest["net"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]) '
            f'+ rate(node_network_transmit_bytes_total{{device!~"lo"}}[{_RATE_WINDOW}]))) * on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
        # Busiest disk I/O: read + write byte rate.
        hottest["io"] = _vm_topk(
            f'label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[{_RATE_WINDOW}]) + rate(node_disk_written_bytes_total[{_RATE_WINDOW}]))) '
            f'* on(instance) group_left(node) label_replace({_NODE_UNAME_LABEL}, "node", "$1", "nodename", "(.*)"))), "__name__", "$1", "node", "(.*)")',
            "node",
        )
    except Exception as exc:
        errors.append(f"hottest: {exc}")
    return hottest
def _node_usage(errors: list[str]) -> dict[str, Any]:
    """Collect per-node usage series for cpu/ram/net/io/disk.

    Best-effort: on any failure the error is appended to *errors* and
    whatever was gathered so far is returned.
    """
    usage: dict[str, Any] = {}
    try:
        exprs = _node_usage_exprs()
        for key in ("cpu", "ram", "net", "io", "disk"):
            usage[key] = _vm_node_metric(exprs[key], "node")
    except Exception as exc:  # deliberate best-effort boundary
        errors.append(f"node_usage: {exc}")
    return usage
def _pvc_usage(errors: list[str]) -> list[dict[str, Any]]:
    """Top-5 PVCs by current used percentage, system namespaces filtered out.

    Best-effort: on failure the error is appended to *errors* and an empty
    list is returned.
    """
    query = (
        "topk(5, max by (namespace,persistentvolumeclaim) "
        "(kubelet_volume_stats_used_bytes / kubelet_volume_stats_capacity_bytes * 100))"
    )
    try:
        return _filter_namespace_vector(_vm_vector(query))
    except Exception as exc:  # deliberate best-effort boundary
        errors.append(f"pvc_usage: {exc}")
        return []
def _usage_stats(series: list[dict[str, Any]]) -> dict[str, float]:
values: list[float] = []
for entry in series:
if not isinstance(entry, dict):
continue
try:
values.append(float(entry.get("value")))
except (TypeError, ValueError):
continue
if not values:
return {}
return {
"min": min(values),
"max": max(values),
"avg": sum(values) / len(values),
}
def _vm_namespace_totals(expr: str) -> dict[str, float]:
    """Evaluate *expr* and return {namespace: float(value)} per sample.

    Samples that are not dicts, lack a non-empty namespace label, or carry a
    non-numeric value are skipped silently, matching the other vector parsers
    in this module.
    """
    totals: dict[str, float] = {}
    for item in _vm_vector(expr):
        # Guard against malformed (non-dict) samples before calling .get();
        # the sibling parsers do the same, this one previously crashed here.
        if not isinstance(item, dict):
            continue
        metric = item.get("metric") if isinstance(item.get("metric"), dict) else {}
        namespace = metric.get("namespace")
        if not isinstance(namespace, str) or not namespace:
            continue
        try:
            totals[namespace] = float(item.get("value"))
        except (TypeError, ValueError):
            continue
    return totals
def _build_namespace_capacity(
cpu_usage: dict[str, float],
cpu_requests: dict[str, float],
mem_usage: dict[str, float],
mem_requests: dict[str, float],
) -> list[dict[str, Any]]:
namespaces = sorted(set(cpu_usage) | set(cpu_requests) | set(mem_usage) | set(mem_requests))
output: list[dict[str, Any]] = []
for namespace in namespaces:
cpu_used = cpu_usage.get(namespace)
cpu_req = cpu_requests.get(namespace)
mem_used = mem_usage.get(namespace)
mem_req = mem_requests.get(namespace)
cpu_ratio = None
mem_ratio = None
if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)) and cpu_req > 0:
cpu_ratio = cpu_used / cpu_req
if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)) and mem_req > 0:
mem_ratio = mem_used / mem_req
output.append(
{
"namespace": namespace,
"cpu_usage": cpu_used,
"cpu_requests": cpu_req,
"cpu_usage_ratio": cpu_ratio,
"mem_usage": mem_used,
"mem_requests": mem_req,
"mem_usage_ratio": mem_ratio,
}
)
output.sort(
key=lambda item: (
-(item.get("cpu_requests") or 0),
-(item.get("mem_requests") or 0),
item.get("namespace") or "",
)
)
return output
def _node_usage_profile(
    node_usage: dict[str, list[dict[str, Any]]],
    node_details: list[dict[str, Any]],
    node_pods: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Combine per-metric node usage with node details and pod counts.

    Produces one row per node with raw cpu/ram/disk/net/io values, values
    normalized against the cluster-wide max per metric (``*_norm`` keys in
    [0, 1]), pressure flags/taints/schedulability from *node_details*, pod
    totals from *node_pods*, and a ``load_index`` (mean of the available
    normalized metrics, None when none are available). Rows are sorted by
    descending load_index, then node name.
    """
    # Pivot {metric: [{node, value}, ...]} into {node: {metric: value}},
    # keeping only well-formed numeric samples.
    usage: dict[str, dict[str, Any]] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        for item in node_usage.get(key, []) or []:
            node = item.get("node")
            value = item.get("value")
            if not isinstance(node, str) or not node:
                continue
            if not isinstance(value, (int, float)):
                continue
            usage.setdefault(node, {})[key] = float(value)
    # Cluster-wide maximum per metric, used as the normalization denominator.
    max_values: dict[str, float] = {}
    for key in ("cpu", "ram", "disk", "net", "io"):
        values = [entry.get(key) for entry in usage.values() if isinstance(entry.get(key), (int, float))]
        max_values[key] = max(values) if values else 0.0
    # Index the auxiliary lists by node name for O(1) lookup per row.
    detail_map: dict[str, dict[str, Any]] = {
        entry.get("name"): entry for entry in node_details if isinstance(entry, dict)
    }
    pod_map: dict[str, dict[str, Any]] = {
        entry.get("node"): entry for entry in node_pods if isinstance(entry, dict)
    }
    output: list[dict[str, Any]] = []
    for node, entry in usage.items():
        detail = detail_map.get(node, {})
        pressure = detail.get("pressure") if isinstance(detail.get("pressure"), dict) else {}
        # Count only truthy pressure flags.
        pressure_count = sum(1 for value in pressure.values() if value)
        taints = detail.get("taints") if isinstance(detail.get("taints"), list) else []
        unschedulable = bool(detail.get("unschedulable"))
        pods_total = None
        pod_entry = pod_map.get(node)
        if isinstance(pod_entry, dict):
            pods_total = pod_entry.get("pods_total")
        # Normalize each present metric against the cluster max (skip when
        # the max is 0 to avoid division by zero).
        normalized: dict[str, float] = {}
        for key in ("cpu", "ram", "disk", "net", "io"):
            raw = entry.get(key)
            max_val = max_values.get(key) or 0.0
            if isinstance(raw, (int, float)) and max_val > 0:
                normalized[f"{key}_norm"] = raw / max_val
        norm_values = [v for v in normalized.values() if isinstance(v, (int, float))]
        load_index = sum(norm_values) / len(norm_values) if norm_values else None
        output.append(
            {
                "node": node,
                "cpu": entry.get("cpu"),
                "ram": entry.get("ram"),
                "disk": entry.get("disk"),
                "net": entry.get("net"),
                "io": entry.get("io"),
                **normalized,
                "pressure_flags": pressure,
                "pressure_count": pressure_count,
                "taints": taints,
                "unschedulable": unschedulable,
                "pods_total": pods_total,
                "load_index": load_index,
            }
        )
    output.sort(key=lambda item: (-(item.get("load_index") or 0), item.get("node") or ""))
    return output
def _percentile(values: list[float], percentile: float) -> float | None:
if not values:
return None
ordered = sorted(values)
idx = int(round((len(ordered) - 1) * percentile))
idx = min(max(idx, 0), len(ordered) - 1)
return ordered[idx]
def _node_load_summary(node_load: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate per-node load_index rows into summary statistics.

    Returns avg/p90/min/max of load_index (rounded to 3 places) plus the
    top, bottom, and outlier node rows; {} when no row carries a numeric
    load_index.
    """
    items = [
        entry
        for entry in node_load
        if isinstance(entry, dict) and isinstance(entry.get("load_index"), (int, float))
    ]
    if not items:
        return {}
    values = [float(entry.get("load_index") or 0) for entry in items]
    avg = sum(values) / len(values)
    # Population variance (divide by n, not n-1).
    variance = sum((value - avg) ** 2 for value in values) / len(values)
    stddev = variance**0.5
    top = sorted(items, key=lambda item: -(item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    bottom = sorted(items, key=lambda item: (item.get("load_index") or 0))[:_LOAD_TOP_COUNT]
    # Outliers: nodes at least one standard deviation above the mean.
    outliers = [
        item
        for item in items
        if isinstance(item.get("load_index"), (int, float))
        and item.get("load_index") >= avg + stddev
    ]
    outliers.sort(key=lambda item: -(item.get("load_index") or 0))
    return {
        "avg": round(avg, 3),
        "p90": round(_percentile(values, 0.9) or 0.0, 3),
        "min": round(min(values), 3),
        "max": round(max(values), 3),
        "top": top,
        "bottom": bottom,
        "outliers": outliers[:_LOAD_TOP_COUNT],
    }
def _namespace_capacity_summary(capacity: list[dict[str, Any]]) -> dict[str, Any]:
    """Condense _build_namespace_capacity rows into a capacity summary.

    Reports the namespaces with the highest usage/request ratios, the lowest
    headroom (requests - usage), and the count and sorted names of namespaces
    whose usage exceeds requests (overcommitted), for CPU and memory
    separately. Returns {} for an empty input.
    """
    if not capacity:
        return {}
    # Rows that have a numeric ratio, sorted most-overcommitted first.
    cpu_ratio = [
        entry
        for entry in capacity
        if isinstance(entry, dict) and isinstance(entry.get("cpu_usage_ratio"), (int, float))
    ]
    mem_ratio = [
        entry
        for entry in capacity
        if isinstance(entry, dict) and isinstance(entry.get("mem_usage_ratio"), (int, float))
    ]
    cpu_ratio.sort(key=lambda item: -(item.get("cpu_usage_ratio") or 0))
    mem_ratio.sort(key=lambda item: -(item.get("mem_usage_ratio") or 0))
    # Headroom = requests - usage; negative means usage exceeds requests.
    cpu_headroom: list[dict[str, Any]] = []
    mem_headroom: list[dict[str, Any]] = []
    for entry in capacity:
        if not isinstance(entry, dict):
            continue
        cpu_used = entry.get("cpu_usage")
        cpu_req = entry.get("cpu_requests")
        mem_used = entry.get("mem_usage")
        mem_req = entry.get("mem_requests")
        if isinstance(cpu_used, (int, float)) and isinstance(cpu_req, (int, float)):
            cpu_headroom.append(
                {
                    "namespace": entry.get("namespace"),
                    "headroom": cpu_req - cpu_used,
                    "usage": cpu_used,
                    "requests": cpu_req,
                    "ratio": entry.get("cpu_usage_ratio"),
                }
            )
        if isinstance(mem_used, (int, float)) and isinstance(mem_req, (int, float)):
            mem_headroom.append(
                {
                    "namespace": entry.get("namespace"),
                    "headroom": mem_req - mem_used,
                    "usage": mem_used,
                    "requests": mem_req,
                    "ratio": entry.get("mem_usage_ratio"),
                }
            )
    # Ascending headroom: the tightest namespaces come first.
    cpu_headroom.sort(key=lambda item: (item.get("headroom") or 0))
    mem_headroom.sort(key=lambda item: (item.get("headroom") or 0))
    # Overcommitted = ratio strictly above 1 with a truthy namespace name.
    cpu_over_names = [
        entry.get("namespace")
        for entry in cpu_ratio
        if (entry.get("cpu_usage_ratio") or 0) > 1 and entry.get("namespace")
    ]
    mem_over_names = [
        entry.get("namespace")
        for entry in mem_ratio
        if (entry.get("mem_usage_ratio") or 0) > 1 and entry.get("namespace")
    ]
    over_cpu = len(cpu_over_names)
    over_mem = len(mem_over_names)
    return {
        "cpu_ratio_top": cpu_ratio[:_NAMESPACE_TOP_COUNT],
        "mem_ratio_top": mem_ratio[:_NAMESPACE_TOP_COUNT],
        "cpu_headroom_low": cpu_headroom[:_NAMESPACE_TOP_COUNT],
        "mem_headroom_low": mem_headroom[:_NAMESPACE_TOP_COUNT],
        "cpu_overcommitted": over_cpu,
        "mem_overcommitted": over_mem,
        "cpu_overcommitted_names": sorted({name for name in cpu_over_names if isinstance(name, str)}),
        "mem_overcommitted_names": sorted({name for name in mem_over_names if isinstance(name, str)}),
    }
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -0,0 +1,249 @@
from __future__ import annotations
from typing import Any
from .cluster_state_contract import *
from .cluster_state_nodes import *
def _summarize_jobs(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize a Kubernetes Job list payload.

    Returns cluster-wide totals, per-namespace counts, jobs with failed pods,
    and the oldest still-active jobs (each list capped at 20 rows).
    """
    totals = {"total": 0, "active": 0, "failed": 0, "succeeded": 0}
    by_namespace: dict[str, dict[str, int]] = {}
    failing: list[dict[str, Any]] = []
    active_oldest: list[dict[str, Any]] = []
    for job in _items(payload):
        metadata = job.get("metadata") if isinstance(job.get("metadata"), dict) else {}
        status = job.get("status") if isinstance(job.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        created_at = (
            metadata.get("creationTimestamp")
            if isinstance(metadata.get("creationTimestamp"), str)
            else ""
        )
        # Jobs without a name or namespace cannot be attributed; skip them.
        if not name or not namespace:
            continue
        active = int(status.get("active") or 0)
        failed = int(status.get("failed") or 0)
        succeeded = int(status.get("succeeded") or 0)
        totals["total"] += 1
        totals["active"] += active
        totals["failed"] += failed
        totals["succeeded"] += succeeded
        entry = by_namespace.setdefault(namespace, {"active": 0, "failed": 0, "succeeded": 0})
        entry["active"] += active
        entry["failed"] += failed
        entry["succeeded"] += succeeded
        age_hours = _age_hours(created_at)
        if failed > 0:
            failing.append(
                {
                    "namespace": namespace,
                    "job": name,
                    "failed": failed,
                    "age_hours": age_hours,
                }
            )
        # Track long-running active jobs only when an age can be computed.
        if active > 0 and age_hours is not None:
            active_oldest.append(
                {
                    "namespace": namespace,
                    "job": name,
                    "active": active,
                    "age_hours": age_hours,
                }
            )
    # Most failures first, ties broken by age then namespace/job name.
    failing.sort(
        key=lambda item: (
            -(item.get("failed") or 0),
            -(item.get("age_hours") or 0.0),
            item.get("namespace") or "",
            item.get("job") or "",
        )
    )
    active_oldest.sort(key=lambda item: -(item.get("age_hours") or 0.0))
    namespace_summary = [
        {
            "namespace": ns,
            "active": stats.get("active", 0),
            "failed": stats.get("failed", 0),
            "succeeded": stats.get("succeeded", 0),
        }
        for ns, stats in by_namespace.items()
    ]
    namespace_summary.sort(
        key=lambda item: (
            -(item.get("active") or 0),
            -(item.get("failed") or 0),
            item.get("namespace") or "",
        )
    )
    return {
        "totals": totals,
        "by_namespace": namespace_summary[:20],
        "failing": failing[:20],
        "active_oldest": active_oldest[:20],
    }
def _summarize_deployments(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize deployments: total count plus rows for those not fully ready.

    Scaled-to-zero deployments (desired <= 0) are treated as intentionally
    idle and excluded.
    """
    deployments = _items(payload)
    lagging: list[dict[str, Any]] = []
    for dep in deployments:
        metadata = dep.get("metadata") if isinstance(dep.get("metadata"), dict) else {}
        spec = dep.get("spec") if isinstance(dep.get("spec"), dict) else {}
        status = dep.get("status") if isinstance(dep.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(spec.get("replicas") or 0)
        ready = int(status.get("readyReplicas") or 0)
        available = int(status.get("availableReplicas") or 0)
        updated = int(status.get("updatedReplicas") or 0)
        if desired <= 0 or (ready >= desired and available >= desired):
            continue
        lagging.append(
            {
                "name": name,
                "namespace": namespace,
                "desired": desired,
                "ready": ready,
                "available": available,
                "updated": updated,
            }
        )
    lagging.sort(key=lambda row: (row.get("namespace") or "", row.get("name") or ""))
    return {"total": len(deployments), "not_ready": len(lagging), "items": lagging}
def _summarize_statefulsets(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize statefulsets: total count plus rows for those not fully ready.

    Scaled-to-zero statefulsets (desired <= 0) are treated as intentionally
    idle and excluded.
    """
    statefulsets = _items(payload)
    lagging: list[dict[str, Any]] = []
    for sts in statefulsets:
        metadata = sts.get("metadata") if isinstance(sts.get("metadata"), dict) else {}
        spec = sts.get("spec") if isinstance(sts.get("spec"), dict) else {}
        status = sts.get("status") if isinstance(sts.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(spec.get("replicas") or 0)
        ready = int(status.get("readyReplicas") or 0)
        current = int(status.get("currentReplicas") or 0)
        updated = int(status.get("updatedReplicas") or 0)
        if desired <= 0 or ready >= desired:
            continue
        lagging.append(
            {
                "name": name,
                "namespace": namespace,
                "desired": desired,
                "ready": ready,
                "current": current,
                "updated": updated,
            }
        )
    lagging.sort(key=lambda row: (row.get("namespace") or "", row.get("name") or ""))
    return {"total": len(statefulsets), "not_ready": len(lagging), "items": lagging}
def _summarize_daemonsets(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize daemonsets: total count plus rows for those not fully ready.

    Daemonsets with no scheduled pods desired (desired <= 0) are excluded.
    """
    daemonsets = _items(payload)
    lagging: list[dict[str, Any]] = []
    for ds in daemonsets:
        metadata = ds.get("metadata") if isinstance(ds.get("metadata"), dict) else {}
        status = ds.get("status") if isinstance(ds.get("status"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
        desired = int(status.get("desiredNumberScheduled") or 0)
        ready = int(status.get("numberReady") or 0)
        updated = int(status.get("updatedNumberScheduled") or 0)
        if desired <= 0 or ready >= desired:
            continue
        lagging.append(
            {
                "name": name,
                "namespace": namespace,
                "desired": desired,
                "ready": ready,
                "updated": updated,
            }
        )
    lagging.sort(key=lambda row: (row.get("namespace") or "", row.get("name") or ""))
    return {"total": len(daemonsets), "not_ready": len(lagging), "items": lagging}
def _summarize_workload_health(
deployments: dict[str, Any],
statefulsets: dict[str, Any],
daemonsets: dict[str, Any],
) -> dict[str, Any]:
return {
"deployments": deployments,
"statefulsets": statefulsets,
"daemonsets": daemonsets,
}
def _summarize_longhorn_volumes(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarize a Longhorn volume list payload.

    Returns counts by state and robustness, attached/detached totals, and the
    list (plus count) of degraded or faulted volumes. Returns {} when the
    payload has no items.
    """
    items = _items(payload)
    if not items:
        return {}
    by_state: dict[str, int] = {}
    by_robustness: dict[str, int] = {}
    degraded: list[dict[str, Any]] = []
    attached_count = 0
    detached_count = 0
    degraded_count = 0
    for volume in items:
        metadata = volume.get("metadata") if isinstance(volume.get("metadata"), dict) else {}
        status = volume.get("status") if isinstance(volume.get("status"), dict) else {}
        spec = volume.get("spec") if isinstance(volume.get("spec"), dict) else {}
        name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
        # Unnamed volumes cannot be attributed; skip them.
        if not name:
            continue
        state = status.get("state") if isinstance(status.get("state"), str) else "unknown"
        robustness = (
            status.get("robustness") if isinstance(status.get("robustness"), str) else "unknown"
        )
        # Counters key on the original casing; comparisons are case-insensitive.
        state_lower = state.lower()
        robustness_lower = robustness.lower()
        by_state[state] = by_state.get(state, 0) + 1
        by_robustness[robustness] = by_robustness.get(robustness, 0) + 1
        if state_lower == "attached":
            attached_count += 1
        elif state_lower == "detached":
            detached_count += 1
        if robustness_lower in {"degraded", "faulted"}:
            degraded_count += 1
            degraded.append(
                {
                    "name": name,
                    "state": state,
                    "robustness": robustness,
                    "size": spec.get("size"),
                    "actual_size": status.get("actualSize"),
                }
            )
    degraded.sort(key=lambda item: item.get("name") or "")
    return {
        "total": len(items),
        "by_state": by_state,
        "by_robustness": by_robustness,
        "attached_count": attached_count,
        "detached_count": detached_count,
        "degraded": degraded,
        "degraded_count": degraded_count,
    }
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]

View File

@ -1,5 +1,4 @@
# path reason
ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
tests/test_app.py test module split planned; API coverage retained meanwhile

1 # path reason
ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
2 tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
3 tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
4 tests/test_app.py test module split planned; API coverage retained meanwhile