atlasbot: derive spine facts from summary

This commit is contained in:
Brad Stein 2026-02-05 14:25:09 -03:00
parent c6b25c27c1
commit 526095ab64

View File

@ -207,7 +207,7 @@ class AnswerEngine:
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
spine = _spine_lines(summary_lines)
spine = _spine_from_summary(summary) or _spine_lines(summary_lines)
metric_tokens = _metric_key_tokens(summary_lines)
global_facts = _global_facts(summary_lines)
kb_summary = self._kb.summary()
@ -272,7 +272,9 @@ class AnswerEngine:
force_metric = True
if intent:
spine_line = spine.get(intent.kind)
spine_line = spine.get(intent.kind) if isinstance(spine, dict) else None
if not spine_line:
spine_line = _spine_fallback(intent, summary_lines)
spine_answer = _spine_answer(intent, spine_line)
if spine_line:
key_facts = _merge_fact_lines([spine_line], key_facts)
@ -1736,11 +1738,11 @@ def _spine_answer(intent: IntentMatch, spine_line: str | None) -> str | None:
handler = handlers.get(kind)
if handler:
return handler(spine_line)
return f"From the latest snapshot: {spine_line}."
return spine_line
def _spine_nodes_answer(line: str) -> str:
return f"From the latest snapshot: {line}."
return line
def _spine_non_rpi_answer(line: str) -> str:
@ -1752,19 +1754,19 @@ def _spine_non_rpi_answer(line: str) -> str:
non_rpi.extend(nodes)
if non_rpi:
return "NonRaspberry Pi nodes: " + ", ".join(non_rpi) + "."
return f"From the latest snapshot: {line}."
return line
def _spine_hottest_answer(kind: str, line: str) -> str:
metric = kind.split("_", 1)[1]
hottest = _parse_hottest(line, metric)
if hottest:
return f"From the latest snapshot: {hottest}."
return f"From the latest snapshot: {line}."
return hottest
return line
def _spine_postgres_answer(line: str) -> str:
return f"From the latest snapshot: {line}."
return line
def _spine_namespace_answer(line: str) -> str:
@ -1772,11 +1774,184 @@ def _spine_namespace_answer(line: str) -> str:
top = payload.split(";")[0].strip()
if top:
return f"Namespace with most pods: {top}."
return f"From the latest snapshot: {line}."
return line
def _spine_pressure_answer(line: str) -> str:
return f"From the latest snapshot: {line}."
return line
def _spine_from_summary(summary: dict[str, Any]) -> dict[str, str]:
if not isinstance(summary, dict) or not summary:
return {}
spine: dict[str, str] = {}
spine.update(_spine_from_counts(summary))
spine.update(_spine_from_hardware(summary))
spine.update(_spine_from_hottest(summary))
spine.update(_spine_from_postgres(summary))
spine.update(_spine_from_namespace_pods(summary))
spine.update(_spine_from_pressure(summary))
return spine
def _spine_from_counts(summary: dict[str, Any]) -> dict[str, str]:
counts = summary.get("counts") if isinstance(summary.get("counts"), dict) else {}
inventory = summary.get("inventory") if isinstance(summary.get("inventory"), dict) else {}
workers = inventory.get("workers") if isinstance(inventory.get("workers"), dict) else {}
total = counts.get("nodes_total")
ready = counts.get("nodes_ready")
not_ready = None
if isinstance(inventory.get("not_ready_names"), list):
not_ready = len(inventory.get("not_ready_names") or [])
workers_ready = workers.get("ready")
workers_total = workers.get("total")
if total is None and ready is None:
return {}
parts = []
if total is not None:
parts.append(f"total={int(total)}")
if ready is not None:
parts.append(f"ready={int(ready)}")
if not_ready is not None:
parts.append(f"not_ready={int(not_ready)}")
if workers_total is not None and workers_ready is not None:
parts.append(f"workers_ready={int(workers_ready)}/{int(workers_total)}")
line = "nodes: " + ", ".join(parts)
return {"nodes_count": line, "nodes_ready": line}
def _spine_from_hardware(summary: dict[str, Any]) -> dict[str, str]:
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
if not hardware:
return {}
parts = []
for key, nodes in hardware.items():
if not isinstance(nodes, list):
continue
node_list = ", ".join(str(n) for n in nodes if n)
if node_list:
parts.append(f"{key} ({node_list})")
if not parts:
return {}
return {"nodes_non_rpi": "hardware: " + "; ".join(parts)}
def _spine_from_hottest(summary: dict[str, Any]) -> dict[str, str]:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
top = summary.get("top") if isinstance(summary.get("top"), dict) else {}
top_hottest = top.get("node_hottest") if isinstance(top.get("node_hottest"), dict) else {}
if not hottest and top_hottest:
hottest = top_hottest
elif top_hottest:
for key, value in top_hottest.items():
if key not in hottest and value is not None:
hottest[key] = value
if not hottest:
return {}
mapping = {}
for key in ("cpu", "ram", "net", "io", "disk"):
entry = hottest.get(key)
if not isinstance(entry, dict):
continue
node = entry.get("node") or entry.get("label") or ""
value = entry.get("value")
if node:
mapping[f"hottest_{key}"] = f"{key}={node} ({_format_metric_value(value)})"
if not mapping:
return {}
return mapping
def _spine_from_postgres(summary: dict[str, Any]) -> dict[str, str]:
postgres = summary.get("postgres") if isinstance(summary.get("postgres"), dict) else {}
if not postgres:
top = summary.get("top") if isinstance(summary.get("top"), dict) else {}
postgres = top.get("postgres") if isinstance(top.get("postgres"), dict) else {}
if not postgres:
return {}
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
hottest_label = hottest.get("label") or ""
facts: dict[str, str] = {}
if used is not None and max_conn is not None:
facts["postgres_connections"] = f"postgres_connections_total: used={int(used)}, max={int(max_conn)}"
if hottest_label:
facts["postgres_hottest"] = f"postgres_hottest_db: {hottest_label}"
return facts
def _spine_from_namespace_pods(summary: dict[str, Any]) -> dict[str, str]:
pods = summary.get("namespace_pods") if isinstance(summary.get("namespace_pods"), list) else []
if not pods:
top = summary.get("top") if isinstance(summary.get("top"), dict) else {}
pods = top.get("namespace_pods") if isinstance(top.get("namespace_pods"), list) else []
if not pods:
return {}
best_name = ""
best_value = None
for entry in pods:
if not isinstance(entry, dict):
continue
name = entry.get("namespace") or entry.get("name") or entry.get("label") or ""
value = entry.get("pods") or entry.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
numeric = None
if name and numeric is not None and (best_value is None or numeric > best_value):
best_name = name
best_value = numeric
if best_name:
return {"namespace_most_pods": f"namespace_most_pods: {best_name} ({int(best_value or 0)} pods)"}
return {}
def _spine_from_pressure(summary: dict[str, Any]) -> dict[str, str]:
pressure = summary.get("pressure_summary") if isinstance(summary.get("pressure_summary"), dict) else {}
if not pressure:
return {}
total = pressure.get("total")
unsched = pressure.get("unschedulable")
parts = []
if total is not None:
parts.append(f"total={int(total)}")
if unsched is not None:
parts.append(f"unschedulable={int(unsched)}")
if parts:
return {"pressure_summary": "pressure_nodes: " + ", ".join(parts)}
return {}
def _spine_fallback(intent: IntentMatch, lines: list[str]) -> str | None:
if not lines:
return None
keywords = {
"postgres_hottest": ("postgres_hottest", "hottest_db", "postgres"),
"namespace_most_pods": ("namespace", "pods", "namespaces_top"),
"pressure_summary": ("pressure", "node_load_top"),
}
for token in keywords.get(intent.kind, ("",)):
if not token:
continue
for line in lines:
if token in line:
return line
return None
def _format_metric_value(value: Any) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return str(value)
if num >= 1024 * 1024:
return f"{num / (1024 * 1024):.2f} MB/s"
if num >= 1024:
return f"{num / 1024:.2f} KB/s"
if num >= 1:
return f"{num:.2f}"
return f"{num:.4f}"
async def _select_metric_chunks(