331 lines
12 KiB
Python
331 lines
12 KiB
Python
from __future__ import annotations
|
|
from .common import *
|
|
from .nodes import *
|
|
|
|
def _summarize_kustomizations(payload: dict[str, Any]) -> dict[str, Any]:
    """Report the items in *payload* that are not ready or are suspended.

    An item is left out of the report only when its ``Ready`` condition (as
    resolved by ``_condition_status``) is exactly ``True`` and ``spec.suspend``
    is falsy.

    Returns:
        A dict with ``total`` (number of items in the payload), ``not_ready``
        (number of reported items) and ``items`` (the reported entries, ordered
        by namespace and then by name).
    """
    entries = _items(payload)
    flagged: list[dict[str, Any]] = []
    for entry in entries:
        # Anything that is not the expected mapping/string is treated as absent.
        metadata = entry.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        spec = entry.get("spec")
        if not isinstance(spec, dict):
            spec = {}
        status = entry.get("status")
        if not isinstance(status, dict):
            status = {}

        is_ready, reason, message = _condition_status(status.get("conditions"), "Ready")
        is_suspended = bool(spec.get("suspend"))
        if is_ready is not True or is_suspended:
            name = metadata.get("name")
            namespace = metadata.get("namespace")
            flagged.append(
                {
                    "name": name if isinstance(name, str) else "",
                    "namespace": namespace if isinstance(namespace, str) else "",
                    "ready": is_ready,
                    "suspended": is_suspended,
                    "reason": reason,
                    "message": message,
                }
            )

    # ``name`` and ``namespace`` are always strings at this point.
    flagged.sort(key=lambda row: (row["namespace"], row["name"]))
    return {"total": len(entries), "not_ready": len(flagged), "items": flagged}
|
|
|
|
|
|
def _namespace_allowed(namespace: str) -> bool:
    """Return whether resources in *namespace* should be summarized.

    An empty namespace is never allowed. A namespace listed in
    ``_WORKLOAD_ALLOWED_NAMESPACES`` is always allowed; any other namespace is
    allowed only when it is absent from ``_SYSTEM_NAMESPACES``.
    """
    if not namespace:
        return False
    return namespace in _WORKLOAD_ALLOWED_NAMESPACES or namespace not in _SYSTEM_NAMESPACES
|
|
|
|
|
|
def _event_timestamp(event: dict[str, Any]) -> str:
|
|
for key in ("eventTime", "lastTimestamp", "firstTimestamp"):
|
|
value = event.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
return ""
|
|
|
|
|
|
def _event_sort_key(timestamp: str) -> float:
|
|
if not timestamp:
|
|
return 0.0
|
|
try:
|
|
return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp()
|
|
except ValueError:
|
|
return 0.0
|
|
|
|
|
|
def _summarize_events(payload: dict[str, Any]) -> dict[str, Any]:
    """Aggregate the warning events in *payload*.

    An event is kept when its namespace passes ``_namespace_allowed`` and its
    ``type`` equals ``_EVENT_WARNING``. A missing or non-integer ``count`` is
    treated as ``1``.

    Returns:
        A dict with:

        * ``warnings_total`` - number of kept events (not the sum of counts).
        * ``warnings_by_reason`` / ``warnings_by_namespace`` - summed ``count``
          per non-empty reason / namespace.
        * ``warnings_recent`` - at most ``_EVENTS_MAX`` kept events, newest
          first according to ``_event_sort_key``.
        * ``warnings_top_reason`` - the reason with the highest summed count
          (ties broken alphabetically), or an empty reason with count ``0``.
        * ``warnings_latest`` - first entry of ``warnings_recent`` or ``None``.
    """
    reason_totals: dict[str, int] = {}
    namespace_totals: dict[str, int] = {}
    collected: list[dict[str, Any]] = []

    for event in _items(payload):
        metadata = event.get("metadata")
        namespace = metadata.get("namespace") if isinstance(metadata, dict) else ""
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue

        event_type = event.get("type")
        if not isinstance(event_type, str):
            event_type = ""
        if event_type != _EVENT_WARNING:
            continue

        reason = event.get("reason")
        if not isinstance(reason, str):
            reason = ""
        message = event.get("message")
        if not isinstance(message, str):
            message = ""
        count = event.get("count")
        if not isinstance(count, int):
            count = 1
        involved = event.get("involvedObject")
        if not isinstance(involved, dict):
            involved = {}

        collected.append(
            {
                "namespace": namespace,
                "reason": reason,
                "message": message,
                "count": count,
                "last_seen": _event_timestamp(event),
                "object_kind": involved.get("kind") or "",
                "object_name": involved.get("name") or "",
            }
        )
        if reason:
            reason_totals[reason] = reason_totals.get(reason, 0) + count
        if namespace:
            namespace_totals[namespace] = namespace_totals.get(namespace, 0) + count

    # ``last_seen`` is always a string, so it can be handed to the key helper as is.
    collected.sort(key=lambda warning: _event_sort_key(warning["last_seen"]), reverse=True)
    recent = collected[:_EVENTS_MAX]

    if reason_totals:
        # Highest count wins; equal counts fall back to alphabetical order.
        top_reason, top_reason_count = min(
            reason_totals.items(), key=lambda pair: (-pair[1], pair[0])
        )
    else:
        top_reason, top_reason_count = "", 0

    return {
        "warnings_total": len(collected),
        "warnings_by_reason": reason_totals,
        "warnings_by_namespace": namespace_totals,
        "warnings_recent": recent,
        "warnings_top_reason": {"reason": top_reason, "count": top_reason_count},
        "warnings_latest": recent[0] if recent else None,
    }
|
|
|
|
|
|
def _workload_from_labels(labels: dict[str, Any]) -> tuple[str, str]:
    """Derive a workload name from *labels*.

    The keys in ``_WORKLOAD_LABEL_KEYS`` are tried in order; the first one
    holding a non-empty string wins.

    Returns:
        ``(value, "label:<key>")`` for the first match, or ``("", "")`` when no
        key matches.
    """
    for label_key in _WORKLOAD_LABEL_KEYS:
        label_value = labels.get(label_key)
        if not isinstance(label_value, str) or not label_value:
            continue
        return label_value, f"label:{label_key}"
    return "", ""
|
|
|
|
|
|
def _owner_reference(metadata: dict[str, Any]) -> tuple[str, str]:
|
|
owners = metadata.get("ownerReferences") if isinstance(metadata.get("ownerReferences"), list) else []
|
|
for owner in owners:
|
|
if not isinstance(owner, dict):
|
|
continue
|
|
name = owner.get("name")
|
|
kind = owner.get("kind")
|
|
if isinstance(name, str) and name:
|
|
return name, f"owner:{kind or 'unknown'}"
|
|
return "", ""
|
|
|
|
|
|
def _pod_workload(meta: dict[str, Any]) -> tuple[str, str]:
    """Resolve the workload a pod belongs to from its metadata.

    Labels are consulted first through ``_workload_from_labels``; when they
    yield no name the result of ``_owner_reference`` is returned instead.

    Returns:
        A ``(workload, source)`` pair; both parts are empty strings when
        neither lookup finds a name.
    """
    labels = meta.get("labels")
    from_labels = _workload_from_labels(labels if isinstance(labels, dict) else {})
    if from_labels[0]:
        return from_labels
    return _owner_reference(meta)
|
|
|
|
|
|
def _summarize_workloads(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Group the pods in *payload* by ``(namespace, workload)``.

    Pods are skipped when their namespace fails ``_namespace_allowed`` or when
    ``_pod_workload`` cannot name a workload for them.

    Returns:
        One dict per workload, ordered by namespace and then workload, holding
        ``namespace``, ``workload``, ``source`` (from the first pod seen),
        ``nodes`` (pod count per node name), ``pods_total``, ``pods_running``
        and ``primary_node`` (the node with the most pods, ties broken
        alphabetically; ``""`` when no pod has a node).
    """
    grouped: dict[tuple[str, str], dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        namespace = metadata.get("namespace")
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue
        workload, source = _pod_workload(metadata)
        if not workload:
            continue

        spec = pod.get("spec")
        node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""

        group_key = (namespace, workload)
        if group_key not in grouped:
            grouped[group_key] = {
                "namespace": namespace,
                "workload": workload,
                "source": source,
                "nodes": {},
                "pods_total": 0,
                "pods_running": 0,
            }
        record = grouped[group_key]
        record["pods_total"] += 1
        if phase == "Running":
            record["pods_running"] += 1
        if isinstance(node_name, str) and node_name:
            placement = record["nodes"]
            placement[node_name] = placement.get(node_name, 0) + 1

    for record in grouped.values():
        placement = record["nodes"]
        if placement:
            # Most pods first; equal counts resolve to the alphabetically first node.
            busiest = min(placement.items(), key=lambda pair: (-pair[1], pair[0]))
            record["primary_node"] = busiest[0]
        else:
            record["primary_node"] = ""

    return sorted(grouped.values(), key=lambda row: (row["namespace"], row["workload"]))
|
|
|
|
|
|
def _summarize_namespace_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Count the pods in *payload* per namespace, broken down by phase.

    Pods whose namespace fails ``_namespace_allowed`` are ignored. A phase
    other than ``Running``, ``Pending``, ``Failed`` or ``Succeeded`` only
    contributes to ``pods_total``.

    Returns:
        One dict per namespace, ordered by descending ``pods_total`` and then
        by namespace name.
    """
    phase_counters = {
        "Running": "pods_running",
        "Pending": "pods_pending",
        "Failed": "pods_failed",
        "Succeeded": "pods_succeeded",
    }
    by_namespace: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata")
        namespace = metadata.get("namespace") if isinstance(metadata, dict) else ""
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue

        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""
        if not isinstance(phase, str):
            # Also keeps unhashable junk away from the lookup below.
            phase = ""

        if namespace not in by_namespace:
            by_namespace[namespace] = {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "pods_pending": 0,
                "pods_failed": 0,
                "pods_succeeded": 0,
            }
        bucket = by_namespace[namespace]
        bucket["pods_total"] += 1
        counter = phase_counters.get(phase)
        if counter is not None:
            bucket[counter] += 1

    return sorted(by_namespace.values(), key=lambda row: (-row["pods_total"], row["namespace"]))
|
|
|
|
|
|
def _summarize_namespace_nodes(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Summarize, per namespace, which nodes the pods in *payload* run on.

    Pods are ignored when their namespace fails ``_namespace_allowed`` or when
    they have no non-empty string ``spec.nodeName``.

    Returns:
        One dict per namespace, ordered by descending ``pods_total`` and then
        by namespace name, holding ``namespace``, ``pods_total``,
        ``pods_running``, ``nodes`` (pod count per node name) and
        ``primary_node`` (the node with the most pods, ties broken
        alphabetically).
    """
    by_namespace: dict[str, dict[str, Any]] = {}
    for pod in _items(payload):
        metadata = pod.get("metadata")
        namespace = metadata.get("namespace") if isinstance(metadata, dict) else ""
        if not isinstance(namespace, str):
            namespace = ""
        if not _namespace_allowed(namespace):
            continue

        spec = pod.get("spec")
        node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
        if not isinstance(node_name, str) or not node_name:
            continue

        status = pod.get("status")
        phase = status.get("phase") if isinstance(status, dict) else ""

        if namespace not in by_namespace:
            by_namespace[namespace] = {
                "namespace": namespace,
                "pods_total": 0,
                "pods_running": 0,
                "nodes": {},
            }
        bucket = by_namespace[namespace]
        bucket["pods_total"] += 1
        if phase == "Running":
            bucket["pods_running"] += 1
        placement = bucket["nodes"]
        placement[node_name] = placement.get(node_name, 0) + 1

    for bucket in by_namespace.values():
        placement = bucket["nodes"]
        if placement:
            # Most pods first; equal counts resolve to the alphabetically first node.
            busiest = min(placement.items(), key=lambda pair: (-pair[1], pair[0]))
            bucket["primary_node"] = busiest[0]
        else:
            bucket["primary_node"] = ""

    return sorted(by_namespace.values(), key=lambda row: (-row["pods_total"], row["namespace"]))
|
|
|
|
|
|
_NODE_PHASE_KEYS = {
|
|
"Running": "pods_running",
|
|
"Pending": "pods_pending",
|
|
"Failed": "pods_failed",
|
|
"Succeeded": "pods_succeeded",
|
|
}
|
|
|
|
|
|
def _summarize_node_pods(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Count the pods in *payload* per node.

    Each pod is reduced to a ``(node, namespace, phase)`` triple by
    ``_node_pod_context``; pods for which that helper returns ``None`` are
    skipped. The remaining triples are accumulated with ``_node_pod_entry`` and
    ``_node_pod_apply`` and the result is produced by ``_node_pod_finalize``.
    """
    per_node: dict[str, dict[str, Any]] = {}
    # ``filter(None, ...)`` drops the ``None`` results of skipped pods.
    for node, namespace, phase in filter(None, map(_node_pod_context, _items(payload))):
        _node_pod_apply(_node_pod_entry(per_node, node), namespace, phase)
    return _node_pod_finalize(per_node)
|
|
|
|
|
|
def _node_pod_context(pod: dict[str, Any]) -> tuple[str, str, str] | None:
    """Extract ``(node, namespace, phase)`` from *pod*.

    Returns:
        The triple, with ``phase`` set to ``""`` when it is missing or not a
        string, or ``None`` when the pod's namespace fails
        ``_namespace_allowed`` or the pod has no non-empty string
        ``spec.nodeName``.
    """
    metadata = pod.get("metadata")
    namespace = metadata.get("namespace") if isinstance(metadata, dict) else ""
    if not isinstance(namespace, str):
        namespace = ""
    if not _namespace_allowed(namespace):
        return None

    spec = pod.get("spec")
    node_name = spec.get("nodeName") if isinstance(spec, dict) else ""
    if not isinstance(node_name, str) or not node_name:
        return None

    status = pod.get("status")
    phase = status.get("phase") if isinstance(status, dict) else ""
    if not isinstance(phase, str):
        phase = ""
    return node_name, namespace, phase
|
|
|
|
|
|
def _node_pod_entry(nodes: dict[str, dict[str, Any]], node: str) -> dict[str, Any]:
|
|
return nodes.setdefault(
|
|
node,
|
|
{
|
|
"node": node,
|
|
"pods_total": 0,
|
|
"pods_running": 0,
|
|
"pods_pending": 0,
|
|
"pods_failed": 0,
|
|
"pods_succeeded": 0,
|
|
"namespaces": {},
|
|
},
|
|
)
|
|
|
|
|
|
def _node_pod_apply(entry: dict[str, Any], namespace: str, phase: str) -> None:
    """Record one pod on the node accumulator *entry* (mutated in place).

    ``pods_total`` always grows by one. The counter named by
    ``_NODE_PHASE_KEYS`` for *phase* grows by one when the phase is listed
    there, and the per-namespace tally grows by one when *namespace* is
    non-empty.
    """
    entry["pods_total"] += 1

    counter_name = _NODE_PHASE_KEYS.get(phase)
    if counter_name:
        entry[counter_name] += 1

    if not namespace:
        return
    tally = entry["namespaces"]
    tally[namespace] = tally.get(namespace, 0) + 1
|
|
|
|
|
|
def _node_pod_finalize(nodes: dict[str, dict[str, Any]]) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for entry in nodes.values():
|
|
namespaces = entry.get("namespaces") or {}
|
|
if isinstance(namespaces, dict):
|
|
entry["namespaces_top"] = sorted(
|
|
namespaces.items(), key=lambda item: (-item[1], item[0])
|
|
)[:3]
|
|
output.append(entry)
|
|
output.sort(key=lambda item: (-item.get("pods_total", 0), item.get("node") or ""))
|
|
return output
|
|
|
|
# Export every module global that is not a dunder, which includes the
# single-underscore helpers above and anything pulled in by the star imports.
__all__ = [symbol for symbol in globals() if not symbol.startswith("__")]
|