341 lines
13 KiB
Python
341 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from .cluster_state_contract import *
|
|
from .cluster_state_flux_events import *
|
|
from .cluster_state_nodes import *
|
|
|
|
def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    """Resolve a workload name from a pod's labels.

    The keys in ``_WORKLOAD_LABEL_KEYS`` are tried in order; the first one
    whose value is a non-empty string wins.

    Returns:
        ``(value, "label:<key>")`` for the first matching key, or
        ``("", "")`` when no key yields a usable value.
    """
    matches = (
        (candidate, f"label:{label_key}")
        for label_key in _WORKLOAD_LABEL_KEYS
        for candidate in (labels.get(label_key),)
        if isinstance(candidate, str) and candidate
    )
    return next(matches, ("", ""))
|
|
|
|
|
|
def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
|
|
owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
|
|
for owner in owners:
|
|
if not isinstance(owner, dict):
|
|
continue
|
|
name = owner.get("name")
|
|
kind = owner.get("kind")
|
|
if isinstance(name, str) and name:
|
|
return name, f"owner:{kind or 'unknown'}"
|
|
return "", ""
|
|
|
|
|
|
def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
    """Resolve the workload a pod belongs to from its metadata.

    Labels take precedence (via ``_workload_from_labels``); when they yield
    no name, the pod's owner references are consulted instead.

    Returns:
        ``(workload_name, source)``; both are empty strings when neither
        labels nor owner references identify a workload.
    """
    raw_labels = meta.get("labels")
    label_map = raw_labels if isinstance(raw_labels, dict) else {}
    workload, origin = _workload_from_labels(label_map)
    return (workload, origin) if workload else _owner_reference(meta)
|
|
|
|
|
|
def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Aggregate pods into per-workload placement summaries.

    Pods are grouped by ``(namespace, workload)``. A pod is skipped when its
    namespace is rejected by ``_namespace_allowed`` or when ``_pod_workload``
    finds no workload name for it. The ``source`` recorded for a group is the
    one reported for the first pod seen in that group.

    Returns:
        One dict per workload with ``namespace``, ``workload``, ``source``,
        ``nodes`` (pod count per node name), ``pods_total``,
        ``pods_running`` and ``primary_node`` (the node with the most pods,
        ties broken by node name; ``""`` when no pod has a node). The list
        is sorted by namespace, then workload.
    """
    grouped: dict[tuple[str, str], dict[str, Any]] = {}
    for pod in _items(payload):
        meta = pod.get("metadata")
        if not isinstance(meta, dict):
            meta = {}
        namespace = meta.get("namespace")
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue
        workload, source = _pod_workload(meta)
        if not workload:
            continue

        spec = pod.get("spec")
        node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""

        group_key = (namespace, workload)
        if group_key not in grouped:
            grouped[group_key] = {
                "namespace": namespace,
                "workload": workload,
                "source": source,
                "nodes": {},
                "pods_total": 0,
                "pods_running": 0,
            }
        summary = grouped[group_key]
        summary["pods_total"] += 1
        if isinstance(phase, str) and phase == "Running":
            summary["pods_running"] += 1
        if isinstance(node_name, str) and node_name:
            per_node = summary["nodes"]
            per_node[node_name] = per_node.get(node_name, 0) + 1

    for summary in grouped.values():
        per_node = summary["nodes"]
        # Highest pod count wins; the node name breaks ties deterministically.
        summary["primary_node"] = (
            min(per_node.items(), key=lambda pair: (-pair[1], pair[0]))[0]
            if per_node
            else ""
        )

    return sorted(
        grouped.values(),
        key=lambda summary: (summary["namespace"], summary["workload"]),
    )
|
|
|
|
|
|
def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Count pods per namespace, broken down by phase.

    Pods whose namespace is rejected by ``_namespace_allowed`` are skipped.
    Pods in a phase other than Running/Pending/Failed/Succeeded only count
    toward ``pods_total``.

    Returns:
        One dict per namespace with ``namespace``, ``pods_total``,
        ``pods_running``, ``pods_pending``, ``pods_failed`` and
        ``pods_succeeded``, sorted by descending ``pods_total`` and then by
        namespace name.
    """
    phase_counters = {
        "Running": "pods_running",
        "Pending": "pods_pending",
        "Failed": "pods_failed",
        "Succeeded": "pods_succeeded",
    }
    by_namespace: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        meta = pod.get("metadata")
        namespace = meta.get("namespace") if isinstance(meta, dict) else ""
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue

        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""

        if namespace not in by_namespace:
            by_namespace[namespace] = {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "pods_pending": 0,
                "pods_failed": 0,
                "pods_succeeded": 0,
            }
        tally = by_namespace[namespace]
        tally["pods_total"] += 1
        # Only string phases can match; anything else has no dedicated counter.
        counter = phase_counters.get(phase) if isinstance(phase, str) else None
        if counter is not None:
            tally[counter] += 1

    return sorted(
        by_namespace.values(),
        key=lambda tally: (-tally["pods_total"], tally["namespace"]),
    )
|
|
|
|
|
|
def _summarize_namespace_nodes(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Summarise, per namespace, which nodes its scheduled pods run on.

    Pods are skipped when their namespace is rejected by
    ``_namespace_allowed`` or when they have no non-empty string
    ``spec.nodeName``, so the totals here cover scheduled pods only.

    Returns:
        One dict per namespace with ``namespace``, ``pods_total``,
        ``pods_running``, ``nodes`` (pod count per node name) and
        ``primary_node`` (the node with the most pods, ties broken by node
        name). The list is sorted by descending ``pods_total`` and then by
        namespace name.
    """
    by_namespace: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        meta = pod.get("metadata")
        namespace = meta.get("namespace") if isinstance(meta, dict) else ""
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue

        spec = pod.get("spec")
        node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
        if not (isinstance(node_name, str) and node_name):
            continue

        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""

        if namespace not in by_namespace:
            by_namespace[namespace] = {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "nodes": {},
            }
        tally = by_namespace[namespace]
        tally["pods_total"] += 1
        if isinstance(phase, str) and phase == "Running":
            tally["pods_running"] += 1
        placement = tally["nodes"]
        placement[node_name] = placement.get(node_name, 0) + 1

    for tally in by_namespace.values():
        placement = tally["nodes"]
        # Highest pod count wins; the node name breaks ties deterministically.
        tally["primary_node"] = (
            min(placement.items(), key=lambda pair: (-pair[1], pair[0]))[0]
            if placement
            else ""
        )

    return sorted(
        by_namespace.values(),
        key=lambda tally: (-tally["pods_total"], tally["namespace"]),
    )
|
|
|
|
|
|
_NODE_PHASE_KEYS = {
|
|
"Running": "pods_running",
|
|
"Pending": "pods_pending",
|
|
"Failed": "pods_failed",
|
|
"Succeeded": "pods_succeeded",
|
|
}
|
|
|
|
|
|
def _summarize_node_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Build per-node pod summaries from a pod list payload.

    Each pod is reduced to a ``(node, namespace, phase)`` context by
    ``_node_pod_context``; pods for which that returns nothing are skipped.
    The remaining pods are accumulated per node and handed to
    ``_node_pod_finalize`` for ranking and ordering.
    """
    per_node: dict[str, dict[str, Any]] = {}
    contexts = (_node_pod_context(pod) for pod in _items(payload))
    # filter(None, ...) drops the pods that produced no context.
    for node_name, namespace, phase in filter(None, contexts):
        _node_pod_apply(_node_pod_entry(per_node, node_name), namespace, phase)
    return _node_pod_finalize(per_node)
|
|
|
|
|
|
def _node_pod_context(pod: dict[str, Any]) -> tuple[str, str, str] | None:
    """Extract ``(node, namespace, phase)`` from a pod, or ``None`` to skip it.

    A pod is skipped when its namespace is rejected by ``_namespace_allowed``
    or when it has no non-empty string ``spec.nodeName``. A missing or
    non-string namespace or phase is normalised to ``""``.
    """
    meta = pod.get("metadata")
    namespace = meta.get("namespace") if isinstance(meta, dict) else ""
    if not isinstance(namespace, str):
        namespace = ""
    if not _namespace_allowed(namespace):
        return None

    spec = pod.get("spec")
    node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
    if not (isinstance(node_name, str) and node_name):
        return None

    status = pod.get("status")
    phase = status.get("phase") if isinstance(status, dict) else ""
    return node_name, namespace, (phase if isinstance(phase, str) else "")
|
|
|
|
|
|
def _node_pod_entry(nodes: dict[str, dict[str, Any]], node: str) -> dict[str, Any]:
|
|
return nodes.setdefault(
|
|
node,
|
|
{
|
|
"node": node,
|
|
"pods_total": 0,
|
|
"pods_running": 0,
|
|
"pods_pending": 0,
|
|
"pods_failed": 0,
|
|
"pods_succeeded": 0,
|
|
"namespaces": {},
|
|
},
|
|
)
|
|
|
|
|
|
def _node_pod_apply(entry: dict[str, Any], namespace: str, phase: str) -> None:
    """Count one pod against a node's accumulator record.

    Always increments ``pods_total``. Also increments the phase-specific
    counter when ``_NODE_PHASE_KEYS`` maps ``phase`` to one, and the
    per-namespace count when ``namespace`` is non-empty.
    """
    entry["pods_total"] += 1

    counter_key = _NODE_PHASE_KEYS.get(phase)
    if counter_key:
        entry[counter_key] += 1

    if not namespace:
        return
    per_namespace = entry["namespaces"]
    per_namespace[namespace] = per_namespace.get(namespace, 0) + 1
|
|
|
|
|
|
def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for entry in nodes.values():
|
|
namespaces = entry.get("namespaces") or {}
|
|
if isinstance(namespaces, dict):
|
|
entry["namespaces_top"] = sorted(
|
|
namespaces.items(), key=lambda item: (-item[1], item[0])
|
|
)[:3]
|
|
output.append(entry)
|
|
output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("node") or ""))
|
|
return output
|
|
|
|
|
|
def _node_pods_top(node_pods: list[dict[str, Any]], limit: int = 5) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for entry in node_pods[:limit]:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
output.append(
|
|
{
|
|
"node": entry.get("node"),
|
|
"pods_total": entry.get("pods_total"),
|
|
"pods_running": entry.get("pods_running"),
|
|
"namespaces_top": entry.get("namespaces_top") or [],
|
|
}
|
|
)
|
|
return output
|
|
|
|
|
|
def _record_pending_pod(pending_oldest: list[dict[str, Any]], info: dict[str, Any]) -> bool:
    """Record a pending pod and report whether it exceeds the age threshold.

    Pods whose ``age_hours`` is ``None`` are not recorded. Otherwise ``info``
    is appended to ``pending_oldest``.

    Returns:
        ``True`` when the pod was recorded and its age is at least
        ``_PENDING_15M_HOURS``; ``False`` otherwise.
    """
    pod_age = info.get("age_hours")
    if pod_age is not None:
        pending_oldest.append(info)
        return pod_age >= _PENDING_15M_HOURS
    return False
|
|
|
|
|
|
def _update_pod_issue(pod: dict[str, Any], acc: dict[str, Any]) -> None:
    """Fold a single pod into the pod-issue accumulator.

    Pods without a non-empty string ``metadata.name`` and
    ``metadata.namespace`` are ignored. For every other pod this:

    * sums ``restartCount`` over ``status.containerStatuses`` and tallies each
      container waiting reason in ``acc["waiting_reasons"]``;
    * tallies a non-empty string ``status.reason`` in ``acc["phase_reasons"]``;
    * increments ``acc["counts"][phase]`` when the phase is already a key
      there;
    * appends a detail record to ``acc["items"]`` when the phase is in
      ``_PHASE_SEVERITY`` or the pod has restarted at least once;
    * for ``Pending`` pods, hands a record to ``_record_pending_pod`` and
      increments ``acc["pending_over_15m"]`` when that returns true.

    Args:
        pod: One pod object from the pod list payload.
        acc: Accumulator mutated in place; it must already hold the keys
            ``items``, ``counts``, ``pending_oldest``, ``pending_over_15m``,
            ``waiting_reasons`` and ``phase_reasons``.
    """
    metadata = pod.get("metadata") if isinstance(pod.get("metadata"), dict) else {}
    status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
    spec = pod.get("spec") if isinstance(pod.get("spec"), dict) else {}

    namespace = metadata.get("namespace") if isinstance(metadata.get("namespace"), str) else ""
    name = metadata.get("name") if isinstance(metadata.get("name"), str) else ""
    if not name or not namespace:
        return

    # Age is only needed for pods that pass the identity guard above.
    created_at = (
        metadata.get("creationTimestamp")
        if isinstance(metadata.get("creationTimestamp"), str)
        else ""
    )
    age_hours = _age_hours(created_at)
    phase = status.get("phase") if isinstance(status.get("phase"), str) else ""
    node = spec.get("nodeName") or ""
    phase_reason = status.get("reason")

    # Treat a malformed (non-sequence) containerStatuses as "no containers"
    # rather than letting one bad pod abort the whole summary.
    container_statuses = status.get("containerStatuses")
    if not isinstance(container_statuses, (list, tuple)):
        container_statuses = []

    restarts = 0
    waiting_reasons: list[str] = []
    for container in container_statuses:
        if not isinstance(container, dict):
            continue
        try:
            restarts += int(container.get("restartCount") or 0)
        except (TypeError, ValueError):
            # Unparseable restart count: count it as zero restarts.
            pass
        state = container.get("state") if isinstance(container.get("state"), dict) else {}
        waiting = state.get("waiting") if isinstance(state.get("waiting"), dict) else {}
        reason = waiting.get("reason")
        if isinstance(reason, str) and reason:
            waiting_reasons.append(reason)
            acc["waiting_reasons"][reason] = acc["waiting_reasons"].get(reason, 0) + 1

    if isinstance(phase_reason, str) and phase_reason:
        acc["phase_reasons"][phase_reason] = acc["phase_reasons"].get(phase_reason, 0) + 1

    if phase in acc["counts"]:
        acc["counts"][phase] += 1

    if phase in _PHASE_SEVERITY or restarts > 0:
        acc["items"].append(
            {
                "namespace": namespace,
                "pod": name,
                "node": node,
                "phase": phase,
                "reason": phase_reason or "",
                "restarts": restarts,
                "waiting_reasons": sorted(set(waiting_reasons)),
                "created_at": created_at,
                "age_hours": age_hours,
            }
        )

    if phase == "Pending":
        info = {
            "namespace": namespace,
            "pod": name,
            "node": node,
            "age_hours": age_hours,
            "reason": phase_reason or "",
        }
        if _record_pending_pod(acc["pending_oldest"], info):
            acc["pending_over_15m"] += 1
|
|
|
|
|
|
def _summarize_pod_issues(payload: dict[str, Any]) -> dict[str, Any]:
    """Summarise problematic pods found in a pod list payload.

    Every dict item of the payload is folded into an accumulator by
    ``_update_pod_issue``.

    Returns:
        A dict with ``counts`` (pods per phase listed in ``_PHASE_SEVERITY``),
        ``items`` (up to 20 issue records ordered by descending phase
        severity, then descending restarts, then namespace and pod name),
        ``pending_oldest`` (up to 10 pending pods, oldest first),
        ``pending_over_15m``, ``waiting_reasons`` and ``phase_reasons``.
    """
    acc: dict[str, Any] = {
        "items": [],
        "counts": dict.fromkeys(_PHASE_SEVERITY, 0),
        "pending_oldest": [],
        "pending_over_15m": 0,
        "waiting_reasons": {},
        "phase_reasons": {},
    }
    for pod in _items(payload):
        if isinstance(pod, dict):
            _update_pod_issue(pod, acc)

    def _issue_rank(item: dict[str, Any]) -> tuple[Any, ...]:
        return (
            -_PHASE_SEVERITY.get(item.get("phase") or "", 0),
            -(item.get("restarts") or 0),
            item.get("namespace") or "",
            item.get("pod") or "",
        )

    ranked_items = sorted(acc["items"], key=_issue_rank)
    oldest_pending = sorted(
        acc["pending_oldest"],
        key=lambda item: -(item.get("age_hours") or 0.0),
    )
    return {
        "counts": acc["counts"],
        "items": ranked_items[:20],
        "pending_oldest": oldest_pending[:10],
        "pending_over_15m": acc["pending_over_15m"],
        "waiting_reasons": acc["waiting_reasons"],
        "phase_reasons": acc["phase_reasons"],
    }
|
|
|
|
# Export every module global whose name starts with a single underscore
# (dunder names excluded), plus ``ClusterStateSummary`` and ``SignalContext``
# when they are present in ``globals()``. This must stay a comprehension: a
# plain ``for`` loop at module scope would bind its loop variable into
# ``globals()`` while iterating over it.
# NOTE(review): presumably this re-exports the names pulled in by the star
# imports at the top of the file so other modules can star-import them in
# turn — confirm against callers.
__all__ = [name for name in globals() if (name.startswith("_") and not name.startswith("__")) or name in {"ClusterStateSummary", "SignalContext"}]
|