import collections
import json
import os
import re
import ssl
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib import error, parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
MAX_CONTEXT_CHARS = int(os.environ.get("ATLASBOT_MAX_CONTEXT_CHARS", "12000"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
"system",
"systems",
"service",
"services",
"app",
"apps",
"platform",
"software",
"tool",
"tools",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
"pod",
"pods",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
CONFIDENCE_RE = re.compile(r"confidence\s*:\s*(high|medium|low)\b", re.IGNORECASE)
OPERATION_HINTS = {
"count": ("how many", "count", "number", "total"),
"list": ("list", "which", "what are", "show", "names"),
"top": ("top", "hottest", "highest", "most", "largest", "max", "maximum"),
"status": ("ready", "not ready", "unready", "down", "missing", "status"),
}
METRIC_HINTS = {
"cpu": ("cpu",),
"ram": ("ram", "memory", "mem"),
"net": ("net", "network", "bandwidth", "throughput"),
"io": ("io", "disk", "storage"),
"connections": ("connections", "conn", "postgres", "database", "db"),
"pods": ("pods", "pod"),
}
CLUSTER_HINT_WORDS = {
"atlas",
"titan",
"cluster",
"k8s",
"kubernetes",
"node",
"nodes",
"worker",
"workers",
"pod",
"pods",
"namespace",
"service",
"deployment",
"daemonset",
"statefulset",
"grafana",
"victoria",
"prometheus",
"ariadne",
"mailu",
"nextcloud",
"vaultwarden",
"firefly",
"wger",
"jellyfin",
"planka",
"budget",
"element",
"synapse",
"mas",
"comms",
"longhorn",
"harbor",
"jenkins",
"gitea",
"flux",
"keycloak",
"postgres",
"database",
"db",
"atlasbot",
"jetson",
"rpi",
"raspberry",
"amd64",
"arm64",
}
_INSIGHT_HINT_WORDS = {
"interesting",
"unconventional",
"surprising",
"weird",
"odd",
"fun",
"cool",
"unique",
"notable",
}
_OVERVIEW_HINT_WORDS = {
"overview",
"summary",
"describe",
"explain",
"tell me about",
"what do you know",
}
_OLLAMA_LOCK = threading.Lock()
HARDWARE_HINTS = {
"amd64": ("amd64", "x86", "x86_64", "x86-64"),
"jetson": ("jetson",),
"rpi4": ("rpi4",),
"rpi5": ("rpi5",),
"rpi": ("rpi", "raspberry"),
"arm64": ("arm64", "aarch64"),
}
def normalize_query(text: str) -> str:
cleaned = (text or "").lower()
for ch in _DASH_CHARS:
cleaned = cleaned.replace(ch, "-")
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
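# Illustrative behavior of normalize_query (input is hypothetical):
#   normalize_query("  CPU   usage \u2013 titan-0a ") -> "cpu usage - titan-0a"
# Every dash variant in _DASH_CHARS collapses to an ASCII hyphen and runs of
# whitespace collapse to single spaces before any matching happens.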
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
def _ensure_confidence(text: str) -> str:
if not text:
return ""
lines = text.strip().splitlines()
for idx, line in enumerate(lines):
match = CONFIDENCE_RE.search(line)
if match:
level = match.group(1).lower()
lines[idx] = CONFIDENCE_RE.sub(f"Confidence: {level}", line)
return "\n".join(lines)
lines.append("Confidence: medium")
return "\n".join(lines)
def _ollama_endpoint() -> str:
url = (OLLAMA_URL or "").strip()
if not url:
return ""
if url.endswith("/api/chat"):
return url
return url.rstrip("/") + "/api/chat"
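# Example: with the default OLLAMA_URL "https://chat.ai.bstein.dev/" this yields
# "https://chat.ai.bstein.dev/api/chat"; a URL already ending in /api/chat is
# returned untouched.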
def _history_to_messages(lines: list[str]) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
for line in lines:
raw = (line or "").strip()
if not raw:
continue
role = "user"
content = raw
lowered = raw.lower()
if lowered.startswith("atlas:"):
role = "assistant"
content = raw.split(":", 1)[1].strip()
elif lowered.startswith("user:"):
role = "user"
content = raw.split(":", 1)[1].strip()
elif ":" in raw:
content = raw.split(":", 1)[1].strip()
if content:
messages.append({"role": role, "content": content})
return messages
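# Sketch of the mapping (speaker names are illustrative):
#   ["atlas: hi", "user: status?", "bob: ping"] becomes
#   [{"role": "assistant", "content": "hi"},
#    {"role": "user", "content": "status?"},
#    {"role": "user", "content": "ping"}]
# Lines without a recognized prefix default to the user role.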
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
if _body_mentions_token(body or ""):
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
def _strip_bot_mention(text: str) -> str:
if not text:
return ""
if not MENTION_LOCALPARTS:
return text.strip()
names = [re.escape(name) for name in MENTION_LOCALPARTS if name]
if not names:
return text.strip()
pattern = r"^(?:\s*@?(?:" + "|".join(names) + r")(?::)?\s+)+"
cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE).strip()
return cleaned or text.strip()
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
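# Minimal usage sketch (token comes from login() below):
#   whoami = req("GET", "/_matrix/client/v3/account/whoami", token)
# Non-2xx responses surface as urllib.error.HTTPError from request.urlopen.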
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
def _score_kb_docs(query: str) -> list[dict[str, Any]]:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return []
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return []
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
        hay = (title + "\n" + " ".join(str(t) for t in tags) + "\n" + " ".join(str(e) for e in entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [d for _, d in scored]
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q:
return ""
scored = _score_kb_docs(q)
picked = scored[:limit]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def kb_retrieve_titles(query: str, *, limit: int = 4) -> str:
scored = _score_kb_docs(query)
picked = scored[:limit]
if not picked:
return ""
parts = ["Relevant runbooks:"]
for doc in picked:
title = doc.get("title") or doc.get("path") or "runbook"
path = doc.get("path") or ""
if path:
parts.append(f"- {title} ({path})")
else:
parts.append(f"- {title}")
return "\n".join(parts)
def _extract_titan_nodes(text: str) -> list[str]:
cleaned = normalize_query(text)
names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(cleaned):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
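# Example (node names are illustrative): "compare titan-0a/0b and titan-1c"
# yields ["titan-0a", "titan-0b", "titan-1c"]; both slash/comma lists and the
# titan-xx/yy range shorthand expand into individual node names.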
def _humanize_rate(value: str, *, unit: str) -> str:
try:
val = float(value)
except (TypeError, ValueError):
return value
if unit == "%":
return f"{val:.1f}%"
if val >= 1024 * 1024:
return f"{val / (1024 * 1024):.2f} MB/s"
if val >= 1024:
return f"{val / 1024:.2f} KB/s"
return f"{val:.2f} B/s"
def _has_any(text: str, phrases: tuple[str, ...]) -> bool:
return any(p in text for p in phrases)
def _detect_operation(q: str) -> str | None:
if _has_any(q, OPERATION_HINTS["top"]):
return "top"
for op, phrases in OPERATION_HINTS.items():
if op == "top":
continue
if _has_any(q, phrases):
return op
return None
def _detect_metric(q: str) -> str | None:
tokens = set(_tokens(q))
for metric, phrases in METRIC_HINTS.items():
for phrase in phrases:
if " " in phrase:
if phrase in q:
return metric
elif phrase in tokens:
return metric
return None
def _detect_hardware_filters(q: str) -> tuple[set[str], set[str]]:
include: set[str] = set()
exclude: set[str] = set()
rpi_specific = "rpi4" in q or "rpi5" in q
for hardware, phrases in HARDWARE_HINTS.items():
if hardware == "rpi" and rpi_specific:
continue
for phrase in phrases:
if f"non {phrase}" in q or f"non-{phrase}" in q or f"not {phrase}" in q:
exclude.add(hardware)
elif phrase in q:
include.add(hardware)
return include, exclude
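# Example: "cpu on non-jetson arm64 nodes" -> include={"arm64"}, exclude={"jetson"}.
# A "non "/"non-"/"not " prefix flips a hardware hint into an exclusion.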
def _detect_role_filters(q: str) -> set[str]:
roles: set[str] = set()
if "control-plane" in q or "control plane" in q:
roles.add("control-plane")
if "master" in q:
roles.add("master")
if "accelerator" in q:
roles.add("accelerator")
return roles
def _detect_entity(q: str) -> str | None:
if "node" in q or "nodes" in q or "worker" in q or "hardware" in q or "architecture" in q or TITAN_NODE_RE.search(q):
return "node"
if "pod" in q or "pods" in q:
return "pod"
if "namespace" in q or "namespaces" in q:
return "namespace"
return None
def _metric_entry_score(entry: dict[str, Any], tokens: list[str], *, metric: str | None, op: str | None) -> int:
hay = _metric_tokens(entry)
score = 0
for t in set(tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if metric:
for phrase in METRIC_HINTS.get(metric, (metric,)):
if phrase in hay:
score += 3
if op == "top" and ("hottest" in hay or "top" in hay):
score += 3
if "node" in hay:
score += 1
return score
def _select_metric_entry(tokens: list[str], *, metric: str | None, op: str | None) -> dict[str, Any] | None:
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
score = _metric_entry_score(entry, tokens, metric=metric, op=op)
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _apply_node_filter(expr: str, node_regex: str | None) -> str:
if not node_regex:
return expr
needle = 'node_uname_info{nodename!=""}'
    replacement = f'node_uname_info{{nodename!="",nodename=~"{node_regex}"}}'
return expr.replace(needle, replacement)
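# Example rewrite (the regex shown is illustrative):
#   node_uname_info{nodename!=""}  with node_regex "titan-0a|titan-0b"
#   -> node_uname_info{nodename!="",nodename=~"titan-0a|titan-0b"}
# Expressions that lack the exact node_uname_info selector pass through unchanged.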
def _metric_expr_uses_percent(entry: dict[str, Any]) -> bool:
exprs = entry.get("exprs")
expr = exprs[0] if isinstance(exprs, list) and exprs else ""
return "* 100" in expr or "*100" in expr
def _format_metric_value(value: str, *, percent: bool, rate: bool = False) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return value
if percent:
return f"{num:.1f}%"
if rate:
return _humanize_rate(value, unit="rate")
if abs(num) >= 1:
return f"{num:.2f}".rstrip("0").rstrip(".")
return f"{num:.4f}".rstrip("0").rstrip(".")
def _format_metric_label(metric: dict[str, Any]) -> str:
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts:
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
return ", ".join(label_parts) if label_parts else "series"
def _primary_series_metric(res: dict | None) -> tuple[str | None, str | None]:
series = _vm_value_series(res or {})
if not series:
return (None, None)
first = series[0]
metric = first.get("metric") if isinstance(first, dict) else {}
value = first.get("value") if isinstance(first, dict) else []
node = metric.get("node") if isinstance(metric, dict) else None
val = value[1] if isinstance(value, list) and len(value) > 1 else None
return (node, val)
def _format_metric_answer(entry: dict[str, Any], res: dict | None) -> str:
series = _vm_value_series(res)
panel = entry.get("panel_title") or "Metric"
if not series:
return ""
percent = _metric_expr_uses_percent(entry)
lines: list[str] = []
for r in series[:5]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
label = _format_metric_label(metric if isinstance(metric, dict) else {})
lines.append(f"{label}: {_format_metric_value(val, percent=percent)}")
if not lines:
return ""
if len(lines) == 1:
return f"{panel}: {lines[0]}."
return f"{panel}:\n" + "\n".join(f"- {line}" for line in lines)
def _inventory_filter(
inventory: list[dict[str, Any]],
*,
include_hw: set[str],
exclude_hw: set[str],
only_workers: bool,
only_ready: bool | None,
nodes_in_query: list[str],
) -> list[dict[str, Any]]:
results = inventory
if nodes_in_query:
results = [node for node in results if node.get("name") in nodes_in_query]
if only_workers:
results = [node for node in results if node.get("is_worker") is True]
if only_ready is True:
results = [node for node in results if node.get("ready") is True]
if only_ready is False:
results = [node for node in results if node.get("ready") is False]
if include_hw:
results = [node for node in results if _hardware_match(node, include_hw)]
if exclude_hw:
results = [node for node in results if not _hardware_match(node, exclude_hw)]
return results
def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool:
hw = node.get("hardware") or ""
arch = node.get("arch") or ""
for f in filters:
if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"):
return True
if f == "arm64" and arch == "arm64":
return True
if hw == f:
return True
if f == "amd64" and arch == "amd64":
return True
return False
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5", "rpi"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
return "amd64"
if arch == "arm64":
return "arm64-unknown"
return "unknown"
def node_inventory_live() -> list[dict[str, Any]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return []
items = data.get("items") or []
inventory: list[dict[str, Any]] = []
for node in items if isinstance(items, list) else []:
meta = node.get("metadata") or {}
labels = meta.get("labels") or {}
name = meta.get("name") or ""
if not name:
continue
inventory.append(
{
"name": name,
"arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": _hardware_class(labels),
"roles": _node_roles(labels),
"is_worker": _node_is_worker(node),
"ready": _node_ready_status(node),
}
)
return sorted(inventory, key=lambda item: item["name"])
def node_inventory() -> list[dict[str, Any]]:
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot)
if inventory:
return inventory
return node_inventory_live()
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[node.get("hardware") or "unknown"].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str:
q = normalize_query(query)
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
return ""
if inventory is None:
inventory = node_inventory()
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = sum(1 for node in inventory if node.get("ready") is True)
not_ready = sum(1 for node in inventory if node.get("ready") is False)
lines: list[str] = [
"Node inventory (live):",
f"- total: {total}, ready: {ready}, not ready: {not_ready}",
]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
if key in groups:
lines.append(f"- {key}: {', '.join(groups[key])}")
non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
if unknowns:
lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
expected_workers = expected_worker_nodes_from_metrics()
if expected_workers:
ready_workers, not_ready_workers = worker_nodes_status()
missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
lines.append(f"- workers_ready: {', '.join(ready_workers)}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if missing:
lines.append(f"- workers_missing (derived): {', '.join(missing)}")
return "\n".join(lines)
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
q = normalize_query(prompt)
if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")):
return node_inventory()
return []
def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[(node.get("arch") or "unknown")].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]:
usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
per_node: dict[str, dict[str, Any]] = {}
for metric_name, entries in usage.items() if isinstance(usage, dict) else []:
if not isinstance(entries, list):
continue
for entry in entries:
if not isinstance(entry, dict):
continue
node = entry.get("node")
if not isinstance(node, str) or not node:
continue
per_node.setdefault(node, {})[metric_name] = entry.get("value")
return [{"node": node, **vals} for node, vals in sorted(per_node.items())]
def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]:
cleaned: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
cleaned.append(
{
"namespace": entry.get("namespace"),
"workload": entry.get("workload"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"primary_node": entry.get("primary_node"),
"nodes": entry.get("nodes"),
}
)
cleaned.sort(
key=lambda item: (
-(item.get("pods_total") or 0),
str(item.get("namespace") or ""),
str(item.get("workload") or ""),
)
)
return cleaned[:limit]
def _workloads_for_prompt(prompt: str, workloads: list[dict[str, Any]], limit: int = 12) -> list[dict[str, Any]]:
tokens = set(_tokens(prompt))
if tokens:
matched: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
entry_tokens = _workload_tokens(entry)
if entry_tokens & tokens:
matched.append(entry)
if matched:
return _workloads_for_facts(matched, limit=limit)
return _workloads_for_facts(workloads, limit=limit)
def facts_context(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
inv = inventory or []
nodes_in_query = _extract_titan_nodes(prompt)
metrics = _snapshot_metrics(snapshot)
    nodes = (snapshot.get("nodes") or {}) if isinstance(snapshot, dict) else {}
    summary = (snapshot.get("nodes_summary") or {}) if isinstance(snapshot, dict) else {}
expected_workers = expected_worker_nodes_from_metrics()
ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], [])
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
not_ready_names = summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names")
by_hardware = _group_nodes(inv) if inv else {}
by_arch = _nodes_by_arch(inv) if inv else {}
control_plane_nodes = [
node["name"]
for node in inv
if any(role in ("control-plane", "master") for role in (node.get("roles") or []))
]
worker_nodes = [node["name"] for node in inv if node.get("is_worker") is True]
lines: list[str] = ["Facts (live snapshot):"]
if total is not None:
lines.append(f"- nodes_total={total}, ready={ready}, not_ready={not_ready}")
if isinstance(summary, dict):
by_arch_counts = summary.get("by_arch")
if isinstance(by_arch_counts, dict) and by_arch_counts:
parts = [f"{arch}={count}" for arch, count in sorted(by_arch_counts.items())]
lines.append(f"- nodes_by_arch: {', '.join(parts)}")
if not_ready_names:
lines.append(f"- nodes_not_ready: {', '.join(not_ready_names)}")
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes_list = by_hardware.get(key) or []
if nodes_list:
lines.append(f"- {key}: {', '.join(nodes_list)}")
non_rpi = sorted(set(by_hardware.get("jetson", [])) | set(by_hardware.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi: {', '.join(non_rpi)}")
for key, nodes_list in sorted(by_arch.items()):
if nodes_list:
lines.append(f"- arch {key}: {', '.join(nodes_list)}")
if control_plane_nodes:
lines.append(f"- control_plane_nodes: {', '.join(control_plane_nodes)}")
if worker_nodes:
lines.append(f"- worker_nodes: {', '.join(worker_nodes)}")
if ready_workers or not_ready_workers:
lines.append(f"- workers_ready: {', '.join(ready_workers) if ready_workers else 'none'}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if expected_workers and any(word in normalize_query(prompt) for word in ("missing", "expected", "should", "not ready", "unready")):
missing = sorted(
set(expected_workers)
- {n.get("name") for n in inv if isinstance(n, dict) and n.get("name")}
)
lines.append(f"- expected_workers: {', '.join(expected_workers)}")
if missing:
lines.append(f"- expected_workers_missing: {', '.join(missing)}")
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
lines.append(f"- hottest_{key}: {node} ({value_fmt})")
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if isinstance(postgres, dict) and postgres:
used = postgres.get("used")
max_conn = postgres.get("max")
if used is not None and max_conn is not None:
lines.append(f"- postgres_connections: {used} used / {max_conn} max")
hottest_db = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
if hottest_db.get("label"):
lines.append(
f"- postgres_hottest_db: {hottest_db.get('label')} ({hottest_db.get('value')})"
)
for key in ("pods_running", "pods_pending", "pods_failed", "pods_succeeded"):
value = metrics.get(key)
if value is not None:
lines.append(f"- {key}: {value}")
usage_table = _node_usage_table(metrics)
if usage_table:
lines.append("- node_usage (cpu/ram/net/io):")
for entry in usage_table:
node = entry.get("node")
if not node:
continue
cpu = _format_metric_value(str(entry.get("cpu")), percent=True) if entry.get("cpu") is not None else ""
ram = _format_metric_value(str(entry.get("ram")), percent=True) if entry.get("ram") is not None else ""
net = (
_format_metric_value(str(entry.get("net")), percent=False, rate=True)
if entry.get("net") is not None
else ""
)
io_val = (
_format_metric_value(str(entry.get("io")), percent=False, rate=True)
if entry.get("io") is not None
else ""
)
lines.append(f" - {node}: cpu={cpu}, ram={ram}, net={net}, io={io_val}")
if nodes_in_query:
lines.append("- node_details:")
for name in nodes_in_query:
detail = next((n for n in inv if n.get("name") == name), None)
if not detail:
lines.append(f" - {name}: not found in snapshot")
continue
roles = ",".join(detail.get("roles") or []) or "none"
lines.append(
f" - {name}: hardware={detail.get('hardware')}, arch={detail.get('arch')}, "
f"ready={detail.get('ready')}, roles={roles}"
)
workload_entries = _workloads_for_prompt(prompt, workloads or [])
if workload_entries:
lines.append("- workloads:")
for entry in workload_entries:
if not isinstance(entry, dict):
continue
ns = entry.get("namespace") or ""
wl = entry.get("workload") or ""
primary = entry.get("primary_node") or ""
pods_total = entry.get("pods_total")
label = f"{ns}/{wl}" if ns and wl else (wl or ns)
if not label:
continue
if primary:
lines.append(f" - {label}: primary_node={primary}, pods_total={pods_total}")
else:
lines.append(f" - {label}: pods_total={pods_total}")
rendered = "\n".join(lines)
return rendered[:MAX_FACTS_CHARS]
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
names = [node["name"] for node in inventory]
ready = [node["name"] for node in inventory if node.get("ready") is True]
not_ready = [node["name"] for node in inventory if node.get("ready") is False]
groups = _group_nodes(inventory)
workers = [node for node in inventory if node.get("is_worker") is True]
worker_names = [node["name"] for node in workers]
worker_ready = [node["name"] for node in workers if node.get("ready") is True]
worker_not_ready = [node["name"] for node in workers if node.get("ready") is False]
expected_workers = expected_worker_nodes_from_metrics()
expected_ready = [n for n in expected_workers if n in ready] if expected_workers else []
expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else []
expected_missing = [n for n in expected_workers if n not in names] if expected_workers else []
return {
"names": sorted(names),
"ready": sorted(ready),
"not_ready": sorted(not_ready),
"groups": groups,
"worker_names": sorted(worker_names),
"worker_ready": sorted(worker_ready),
"worker_not_ready": sorted(worker_not_ready),
"expected_workers": expected_workers,
"expected_ready": sorted(expected_ready),
"expected_not_ready": sorted(expected_not_ready),
"expected_missing": sorted(expected_missing),
}
def _workload_tokens(entry: dict[str, Any]) -> set[str]:
tokens: set[str] = set()
for key in ("workload", "namespace"):
value = entry.get(key)
if isinstance(value, str) and value:
tokens.update(_tokens(value))
return tokens
def _workload_query_target(prompt: str) -> str:
tokens = set(_tokens(prompt))
matches = sorted(tokens & _NAME_INDEX) if _NAME_INDEX else []
return matches[0] if matches else ""
def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None:
q_tokens = set(_tokens(prompt))
if not q_tokens:
return None
scored: list[tuple[int, dict[str, Any]]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
tokens = _workload_tokens(entry)
score = len(tokens & q_tokens)
name = (entry.get("workload") or "").lower()
namespace = (entry.get("namespace") or "").lower()
if name and name in q_tokens:
score += 5
if namespace and namespace in q_tokens:
score += 3
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _format_confidence(answer: str, confidence: str) -> str:
if not answer:
return ""
return f"{answer}\nConfidence: {confidence}."
def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str:
q = normalize_query(prompt)
if not any(word in q for word in ("where", "which", "node", "run", "running", "host", "located")):
return ""
target = _workload_query_target(prompt)
entry = _select_workload(prompt, workloads)
if not entry:
return ""
workload = entry.get("workload") or ""
namespace = entry.get("namespace") or ""
if target:
workload_l = str(workload).lower()
namespace_l = str(namespace).lower()
if workload_l != target and namespace_l == target and "namespace" not in q and "workload" not in q:
return ""
nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
primary = entry.get("primary_node") or ""
if not workload or not nodes:
return ""
parts = []
if primary:
parts.append(f"{primary} (primary)")
for node, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0])):
if node == primary:
continue
parts.append(f"{node} ({count} pod{'s' if count != 1 else ''})")
node_text = ", ".join(parts) if parts else primary
answer = f"{workload} runs in {namespace}. Nodes: {node_text}."
return _format_confidence(answer, "medium")
def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not snapshot:
return {}
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _node_usage_top(
usage: list[dict[str, Any]],
*,
allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
best_node = ""
best_val = None
for item in usage if isinstance(usage, list) else []:
if not isinstance(item, dict):
continue
node = item.get("node") or ""
if allowed_nodes and node not in allowed_nodes:
continue
value = item.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best_val is None or numeric > best_val:
best_val = numeric
best_node = node
if best_node and best_val is not None:
return best_node, best_val
return None
def snapshot_metric_answer(
prompt: str,
*,
snapshot: dict[str, Any] | None,
inventory: list[dict[str, Any]],
) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
if not metrics:
return ""
q = normalize_query(prompt)
metric = _detect_metric(q)
op = _detect_operation(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
allowed_nodes = {node["name"] for node in filtered} if filtered else None
if metric in {"cpu", "ram", "net", "io"} and op in {"top", "status", None}:
        usage = (metrics.get("node_usage") or {}).get(metric, [])
top = _node_usage_top(usage, allowed_nodes=allowed_nodes)
if top:
node, val = top
percent = metric in {"cpu", "ram"}
value = _format_metric_value(str(val), percent=percent, rate=metric in {"net", "io"})
scope = ""
if include_hw:
scope = f" among {' and '.join(sorted(include_hw))}"
answer = f"Hottest node{scope}: {node} ({value})."
if allowed_nodes and len(allowed_nodes) != len(inventory):
overall = _node_usage_top(usage, allowed_nodes=None)
if overall and overall[0] != node:
overall_val = _format_metric_value(
str(overall[1]),
percent=percent,
rate=metric in {"net", "io"},
)
answer += f" Overall hottest: {overall[0]} ({overall_val})."
return _format_confidence(answer, "high")
if metric == "connections" or "postgres" in q:
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).")
if parts:
return _format_confidence(" ".join(parts), "high")
if metric == "pods":
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
if "pending" in q and pending is not None:
return _format_confidence(f"Pending pods: {pending:.0f}.", "high")
if "failed" in q and failed is not None:
return _format_confidence(f"Failed pods: {failed:.0f}.", "high")
if "succeeded" in q or "completed" in q:
if succeeded is not None:
return _format_confidence(f"Succeeded pods: {succeeded:.0f}.", "high")
if "running" in q and running is not None:
return _format_confidence(f"Running pods: {running:.0f}.", "high")
parts = []
if running is not None:
parts.append(f"running {running:.0f}")
if pending is not None:
parts.append(f"pending {pending:.0f}")
if failed is not None:
parts.append(f"failed {failed:.0f}")
if succeeded is not None:
parts.append(f"succeeded {succeeded:.0f}")
if parts:
return _format_confidence(f"Pods: {', '.join(parts)}.", "high")
return ""
def structured_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
metrics_summary: str,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
q = normalize_query(prompt)
if not q:
return ""
if workloads:
workload_resp = workload_answer(prompt, workloads)
if workload_resp:
return workload_resp
snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory)
if snap_resp:
return snap_resp
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
entity = _detect_entity(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
role_filters = _detect_role_filters(q)
only_ready: bool | None = None
if "not ready" in q or "unready" in q or "down" in q or "missing" in q:
only_ready = False
elif "ready" in q:
only_ready = True
if entity == "node" and only_ready is not None and op != "count":
op = "status"
if not op and entity == "node":
op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
if op == "top" and metric is None:
metric = "cpu"
# Metrics-first when a metric or top operation is requested.
if metric or op == "top":
entry = _select_metric_entry(tokens, metric=metric, op=op)
if entry and isinstance(entry.get("exprs"), list) and entry["exprs"]:
expr = entry["exprs"][0]
if inventory:
scoped = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
if scoped:
node_regex = "|".join([n["name"] for n in scoped])
expr = _apply_node_filter(expr, node_regex)
res = vm_query(expr, timeout=20)
answer = ""
if op == "top" or "hottest" in (entry.get("panel_title") or "").lower():
node, val = _primary_series_metric(res)
if node and val is not None:
percent = _metric_expr_uses_percent(entry)
value_fmt = _format_metric_value(val or "", percent=percent)
metric_label = (metric or "").upper()
label = f"{metric_label} node" if metric_label else "node"
answer = f"Hottest {label}: {node} ({value_fmt})."
if not answer:
answer = _format_metric_answer(entry, res)
if answer:
scope_parts: list[str] = []
if include_hw:
scope_parts.append(" and ".join(sorted(include_hw)))
if exclude_hw:
scope_parts.append(f"excluding {' and '.join(sorted(exclude_hw))}")
if only_workers:
scope_parts.append("worker")
if scope_parts:
scope = " ".join(scope_parts)
overall_note = ""
base_expr = entry["exprs"][0]
if inventory:
all_nodes = "|".join([n["name"] for n in inventory])
if all_nodes:
base_expr = _apply_node_filter(base_expr, all_nodes)
base_res = vm_query(base_expr, timeout=20)
base_node, base_val = _primary_series_metric(base_res)
scoped_node, scoped_val = _primary_series_metric(res)
if base_node and scoped_node and base_node != scoped_node:
percent = _metric_expr_uses_percent(entry)
base_val_fmt = _format_metric_value(base_val or "", percent=percent)
overall_note = f" Overall hottest node: {base_node} ({base_val_fmt})."
return _format_confidence(f"Among {scope} nodes, {answer}{overall_note}", "high")
return _format_confidence(answer, "high")
if metrics_summary:
return metrics_summary
if entity != "node" or not inventory:
if any(word in q for word in METRIC_HINT_WORDS) and not metrics_summary:
return "I don't have data to answer that right now."
return ""
expected_workers = expected_worker_nodes_from_metrics()
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=only_ready if op in ("status", "count") else None,
nodes_in_query=nodes_in_query,
)
if role_filters:
filtered = [
node
for node in filtered
if role_filters.intersection(set(node.get("roles") or []))
]
names = [node["name"] for node in filtered]
if op == "status":
if "missing" in q and expected_workers:
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
return _format_confidence(
"Missing nodes: " + (", ".join(missing) if missing else "none") + ".",
"high",
)
if only_ready is False:
return _format_confidence(
"Not ready nodes: " + (", ".join(names) if names else "none") + ".",
"high",
)
if only_ready is True:
return _format_confidence(
f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + ".",
"high",
)
if op == "count":
if expected_workers and ("expected" in q or "should" in q):
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
if missing:
msg += f" Missing: {', '.join(missing)}."
return _format_confidence(msg, "high")
if only_ready is True:
return _format_confidence(f"Ready nodes: {len(names)}.", "high")
if only_ready is False:
return _format_confidence(f"Not ready nodes: {len(names)}.", "high")
if not (include_hw or exclude_hw or nodes_in_query or only_workers or role_filters):
return _format_confidence(f"Atlas has {len(names)} nodes.", "high")
return _format_confidence(f"Matching nodes: {len(names)}.", "high")
if op == "list":
if nodes_in_query:
parts = []
existing = {n["name"] for n in inventory}
for node in nodes_in_query:
parts.append(f"{node}: {'present' if node in existing else 'not present'}")
return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high")
if not names:
return _format_confidence("Matching nodes: none.", "high")
shown = names[:30]
suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high")
return ""
def _nodes_summary_line(inventory: list[dict[str, Any]], snapshot: dict[str, Any] | None) -> str:
    summary = (snapshot.get("nodes_summary") or {}) if isinstance(snapshot, dict) else {}
    nodes = (snapshot.get("nodes") or {}) if isinstance(snapshot, dict) else {}
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
if total is None:
total = len(inventory)
ready = len([n for n in inventory if n.get("ready") is True])
not_ready = len([n for n in inventory if n.get("ready") is False])
if total is None:
return ""
if not_ready:
names = []
summary_names = summary.get("not_ready_names") if isinstance(summary, dict) else []
if isinstance(summary_names, list):
names = [name for name in summary_names if isinstance(name, str)]
if not names and snapshot:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
names = [node.get("name") for node in details if isinstance(node, dict) and node.get("ready") is False]
names = [name for name in names if isinstance(name, str) and name]
suffix = f" (not ready: {', '.join(names)})" if names else ""
return f"Atlas has {total} nodes; {ready} ready, {not_ready} not ready{suffix}."
return f"Atlas has {total} nodes and all are Ready."
def _hardware_mix_line(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
parts: list[str] = []
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"{key}={len(nodes)}")
if not parts:
return ""
return "Hardware mix includes " + ", ".join(parts) + "."
def _os_mix_line(snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
counts: dict[str, int] = collections.Counter()
for node in details:
if not isinstance(node, dict):
continue
os_name = (node.get("os") or "").strip()
if os_name:
counts[os_name] += 1
if not counts or (len(counts) == 1 and "linux" in counts):
return ""
parts = [f"{os_name}={count}" for os_name, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))]
return "OS mix: " + ", ".join(parts[:5]) + "."
def _pods_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
if running is None and pending is None and failed is None and succeeded is None:
return ""
parts: list[str] = []
if running is not None:
parts.append(f"{running:.0f} running")
if pending is not None:
parts.append(f"{pending:.0f} pending")
if failed is not None:
parts.append(f"{failed:.0f} failed")
if succeeded is not None:
parts.append(f"{succeeded:.0f} succeeded")
return "There are " + ", ".join(parts) + " pods."
def _postgres_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if not postgres:
return ""
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
parts.append(f"{used:.0f}/{max_conn:.0f} connections")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"hottest {hottest.get('label')} ({hot_val_str})")
if not parts:
return ""
return "Postgres is at " + ", ".join(parts) + "."
def _hottest_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if not hottest:
return ""
parts: list[str] = []
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
parts.append(f"{key.upper()} {node} ({value_fmt})")
if not parts:
return ""
return "Hot spots: " + "; ".join(parts) + "."
def _is_insight_query(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
if any(word in q for word in _INSIGHT_HINT_WORDS):
return True
if "most" in q and any(word in q for word in ("unusual", "odd", "weird", "unconventional")):
return True
return False
def _is_overview_query(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
return any(word in q for word in _OVERVIEW_HINT_WORDS)
def _doc_intent(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
return any(
phrase in q
for phrase in (
"runbook",
"documentation",
"docs",
"guide",
"how do i",
"how to",
"instructions",
"playbook",
)
)
def _insight_candidates(
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
) -> list[tuple[str, str, str]]:
metrics = _snapshot_metrics(snapshot)
candidates: list[tuple[str, str, str]] = []
nodes_line = _nodes_summary_line(inventory, snapshot)
if nodes_line and "not ready" in nodes_line.lower():
candidates.append(("availability", nodes_line, "high"))
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if hottest:
cpu = hottest.get("cpu") if isinstance(hottest.get("cpu"), dict) else {}
if cpu.get("node") and cpu.get("value") is not None:
value_fmt = _format_metric_value(str(cpu.get("value")), percent=True)
candidates.append(("cpu", f"The busiest CPU right now is {cpu.get('node')} at about {value_fmt}.", "high"))
ram = hottest.get("ram") if isinstance(hottest.get("ram"), dict) else {}
if ram.get("node") and ram.get("value") is not None:
value_fmt = _format_metric_value(str(ram.get("value")), percent=True)
candidates.append(("ram", f"RAM usage peaks on {ram.get('node')} at about {value_fmt}.", "high"))
postgres_line = _postgres_summary_line(metrics)
if postgres_line:
candidates.append(("postgres", postgres_line, "high"))
hardware_line = _hardware_mix_line(inventory)
if hardware_line:
candidates.append(("hardware", hardware_line, "medium"))
pods_line = _pods_summary_line(metrics)
if pods_line:
candidates.append(("pods", pods_line, "high"))
return candidates
def _select_insight(
prompt: str,
candidates: list[tuple[str, str, str]],
) -> tuple[str, str, str] | None:
if not candidates:
return None
q = normalize_query(prompt)
prefer_keys: list[str] = []
if any(word in q for word in ("unconventional", "weird", "odd", "unique", "surprising")):
prefer_keys.extend(["hardware", "availability"])
if any(word in q for word in ("another", "else", "different", "other")) and len(candidates) > 1:
return candidates[1]
if prefer_keys:
for key, text, conf in candidates:
if key in prefer_keys:
return key, text, conf
return candidates[0]
def _format_insight_text(key: str, text: str) -> str:
cleaned = text.strip().rstrip(".")
if not cleaned:
return ""
if key == "hardware":
counts = cleaned.replace("Hardware mix includes ", "")
return f"Atlas mixes Raspberry Pi, Jetson, and AMD64 nodes ({counts})."
if key == "postgres":
detail = cleaned.replace("Postgres is at ", "")
return f"Postgres looks healthy at {detail}."
if key == "pods":
detail = cleaned.replace("There are ", "")
return f"Pods look stable with {detail}."
if key == "availability":
return cleaned + "."
if key in ("cpu", "ram"):
return cleaned + "."
return cleaned + "."
def _insight_prefix(prompt: str) -> str:
q = normalize_query(prompt)
if any(word in q for word in ("another", "else", "different", "other")):
return "Another interesting detail: "
if any(word in q for word in ("unconventional", "weird", "odd", "unique", "surprising")):
return "What stands out is that "
if any(word in q for word in ("interesting", "notable", "fun", "cool")):
return "One notable detail: "
return ""
def cluster_overview_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
) -> str:
if not inventory and not snapshot:
return ""
q = normalize_query(prompt)
metrics = _snapshot_metrics(snapshot)
sentences: list[str] = []
nodes_line = _nodes_summary_line(inventory, snapshot)
if nodes_line:
sentences.append(nodes_line)
wants_overview = _is_overview_query(q) or any(word in q for word in ("atlas", "cluster", "titan", "lab"))
wants_hardware = any(word in q for word in ("hardware", "architecture", "nodes", "node")) or wants_overview
wants_metrics = any(
word in q
for word in (
"status",
"health",
"overview",
"summary",
"pods",
"postgres",
"connections",
"hottest",
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"busy",
"load",
"usage",
"utilization",
)
) or wants_overview
if wants_hardware:
hw_line = _hardware_mix_line(inventory)
if hw_line:
sentences.append(hw_line)
os_line = _os_mix_line(snapshot)
if os_line:
sentences.append(os_line)
if wants_metrics:
pods_line = _pods_summary_line(metrics)
if pods_line:
sentences.append(pods_line)
postgres_line = _postgres_summary_line(metrics)
if postgres_line:
sentences.append(postgres_line)
hottest_line = _hottest_summary_line(metrics)
if hottest_line:
sentences.append(hottest_line)
if not sentences:
return ""
if len(sentences) > 3 and not wants_overview:
sentences = sentences[:3]
return "Based on the latest snapshot, " + " ".join(sentences)
def cluster_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
metrics_summary = snapshot_context(prompt, snapshot)
if _is_insight_query(prompt):
candidates = _insight_candidates(inventory, snapshot)
selected = _select_insight(prompt, candidates)
if selected:
key, raw_text, confidence = selected
formatted = _format_insight_text(key, raw_text)
if not formatted:
formatted = raw_text
prefix = _insight_prefix(prompt)
if prefix:
formatted = prefix + formatted
return _format_confidence(formatted, confidence)
structured = structured_answer(
prompt,
inventory=inventory,
metrics_summary=metrics_summary,
snapshot=snapshot,
workloads=workloads,
)
if structured:
return structured
q = normalize_query(prompt)
workload_target = _workload_query_target(prompt)
if workload_target and any(word in q for word in ("where", "run", "running", "host", "node")):
return _format_confidence(
f"I don't have workload placement data for {workload_target} in the current snapshot.",
"low",
)
overview = cluster_overview_answer(prompt, inventory=inventory, snapshot=snapshot)
if overview:
kb_titles = kb_retrieve_titles(prompt, limit=4) if _doc_intent(prompt) else ""
if kb_titles:
overview = overview + "\n" + kb_titles
return _format_confidence(overview, "medium")
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
return _format_confidence(kb_titles, "low")
if metrics_summary:
return _format_confidence(metrics_summary, "low")
return ""
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
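# Illustrative call (assumes _METRIC_INDEX was populated from the KB at startup):
#   metrics_lookup("postgres connections", limit=2)
# returns up to two index entries ranked by token overlap; a token hit inside
# panel_title scores 2, a hit anywhere else scores 1.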
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", ""
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
return context, ""
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
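# Minimal read-only sketch against the in-cluster API (standard Kubernetes
# paths; relies on the mounted ServiceAccount token and CA above):
#   nodes = k8s_get("/api/v1/nodes?limit=100").get("items") or []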
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0}
def _snapshot_state() -> dict[str, Any] | None:
now = time.monotonic()
cached = _SNAPSHOT_CACHE.get("payload")
ts = _SNAPSHOT_CACHE.get("ts") or 0.0
if cached and now - ts < max(5, SNAPSHOT_TTL_SEC):
return cached
payload = _ariadne_state(timeout=10)
if isinstance(payload, dict) and payload:
_SNAPSHOT_CACHE["payload"] = payload
_SNAPSHOT_CACHE["ts"] = now
return payload
return cached if isinstance(cached, dict) else None
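# Cache note: calls within SNAPSHOT_TTL_SEC (floored at 5s) reuse the last
# payload; if the Ariadne fetch fails, the stale payload is returned instead of
# None so downstream answers degrade gracefully.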
def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
items = snapshot.get("nodes_detail")
if not isinstance(items, list):
return []
inventory: list[dict[str, Any]] = []
for node in items:
if not isinstance(node, dict):
continue
labels = node.get("labels") if isinstance(node.get("labels"), dict) else {}
name = node.get("name") or ""
if not name:
continue
hardware = node.get("hardware") or _hardware_class(labels)
inventory.append(
{
"name": name,
"arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": hardware,
"roles": node.get("roles") or [],
"is_worker": node.get("is_worker") is True,
"ready": node.get("ready") is True,
}
)
return sorted(inventory, key=lambda item: item["name"])
def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
workloads = snapshot.get("workloads")
return workloads if isinstance(workloads, list) else []
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
        # A bare prefix match subsumes the exact-name and "prefix-" checks.
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
parsed = _parse_metric_lines(summary)
panel_l = (panel or "").lower()
if parsed:
items = list(parsed.items())
if len(items) == 1:
label, value = items[0]
return f"{panel}: {label} = {value}."
compact = "; ".join(f"{k}={v}" for k, v in items)
return f"{panel}: {compact}."
if panel_l:
return f"{panel}: {summary}"
return summary
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
if labels.get("node-role.kubernetes.io/worker") is not None:
return True
return True
def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]:
if inventory is None:
inventory = node_inventory()
ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True]
not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False]
return (sorted(ready_nodes), sorted(not_ready_nodes))
def expected_worker_nodes_from_metrics() -> list[str]:
for entry in _METRIC_INDEX:
panel = (entry.get("panel_title") or "").lower()
if "worker nodes ready" not in panel:
continue
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
for expr in exprs:
if not isinstance(expr, str):
continue
match = NODE_REGEX.search(expr)
if not match:
continue
raw = match.group(1)
nodes = [n.strip() for n in raw.split("|") if n.strip()]
return sorted(nodes)
return []
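# Hedged example: a "Worker Nodes Ready" panel expression containing something
# like {node=~"titan-0a|titan-0b|titan-0c"} yields
# ["titan-0a", "titan-0b", "titan-0c"] via NODE_REGEX (defined earlier in this file).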
def _context_fallback(context: str) -> str:
if not context:
return ""
trimmed = context.strip()
if len(trimmed) > MAX_TOOL_CHARS:
trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
return "Here is what I found:\n" + trimmed
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return _ensure_confidence(text)
return _ensure_confidence(text)
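# Hedged normalization examples:
#   _normalize_reply({"message": {"content": "hi"}}) -> "hi" (plus the enforced
#   confidence footer). A code-fenced JSON reply is unwrapped by
#   _strip_code_fence() and parsed before _ensure_confidence() runs.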
# Internal HTTP endpoint for cluster answers (website uses this).
class _AtlasbotHandler(BaseHTTPRequestHandler):
server_version = "AtlasbotHTTP/1.0"
def _write_json(self, status: int, payload: dict[str, Any]):
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _authorized(self) -> bool:
if not ATLASBOT_INTERNAL_TOKEN:
return True
token = self.headers.get("X-Internal-Token", "")
return token == ATLASBOT_INTERNAL_TOKEN
def do_GET(self): # noqa: N802
if self.path == "/health":
self._write_json(200, {"status": "ok"})
return
self._write_json(404, {"error": "not_found"})
def do_POST(self): # noqa: N802
if self.path != "/v1/answer":
self._write_json(404, {"error": "not_found"})
return
if not self._authorized():
self._write_json(401, {"error": "unauthorized"})
return
try:
length = int(self.headers.get("Content-Length", "0"))
except ValueError:
length = 0
raw = self.rfile.read(length) if length > 0 else b""
try:
payload = json.loads(raw.decode("utf-8")) if raw else {}
except json.JSONDecodeError:
self._write_json(400, {"error": "invalid_json"})
return
prompt = str(payload.get("prompt") or payload.get("question") or "").strip()
if not prompt:
self._write_json(400, {"error": "missing_prompt"})
return
cleaned = _strip_bot_mention(prompt)
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
context = ""
if cluster_query:
context = build_context(
cleaned,
allow_tools=False,
targets=[],
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
fallback = "I don't have enough data to answer that."
if cluster_query:
answer = cluster_answer(
cleaned,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if not answer:
answer = fallback
else:
llm_prompt = cleaned
answer = ollama_reply(
("http", "internal"),
llm_prompt,
context=context,
fallback=fallback,
use_history=False,
)
self._write_json(200, {"answer": answer})
def _start_http_server():
server = HTTPServer(("0.0.0.0", ATLASBOT_HTTP_PORT), _AtlasbotHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
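# Hedged usage from the website side (hostname and port here are illustrative;
# see ATLASBOT_HTTP_PORT and ATLASBOT_INTERNAL_TOKEN above):
#   curl -s -X POST http://atlasbot:8090/v1/answer \
#     -H 'X-Internal-Token: ...' -d '{"prompt": "how many nodes are ready?"}'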
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
def build_context(
prompt: str,
*,
allow_tools: bool,
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
parts: list[str] = []
facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
if facts:
parts.append(facts)
snapshot_json = snapshot_compact_context(
prompt,
snapshot,
inventory=inventory,
workloads=workloads,
)
if snapshot_json:
parts.append(snapshot_json)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
kb = kb_retrieve(prompt)
if not kb and _knowledge_intent(prompt):
kb = kb_retrieve_titles(prompt, limit=4)
if kb:
parts.append(kb)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
return "\n\n".join([p for p in parts if p]).strip()
def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
workloads = _snapshot_workloads(snapshot)
q = normalize_query(prompt)
parts: list[str] = []
nodes = snapshot.get("nodes") if isinstance(snapshot.get("nodes"), dict) else {}
if nodes.get("total") is not None:
parts.append(
f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}."
)
if any(word in q for word in ("postgres", "connections", "db")):
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if postgres:
parts.append(f"Snapshot: postgres_connections={postgres}.")
if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")):
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if hottest:
parts.append(f"Snapshot: hottest_nodes={hottest}.")
if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")):
match = _select_workload(prompt, workloads)
if match:
parts.append(f"Snapshot: workload={match}.")
return "\n".join(parts).strip()
def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
output: list[dict[str, Any]] = []
for node in details:
if not isinstance(node, dict):
continue
name = node.get("name")
if not name:
continue
output.append(
{
"name": name,
"ready": node.get("ready"),
"hardware": node.get("hardware"),
"arch": node.get("arch"),
"roles": node.get("roles"),
"is_worker": node.get("is_worker"),
"os": node.get("os"),
"kernel": node.get("kernel"),
"kubelet": node.get("kubelet"),
"container_runtime": node.get("container_runtime"),
}
)
return output
def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {}
return {
"pods_running": metrics.get("pods_running"),
"pods_pending": metrics.get("pods_pending"),
"pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"),
"postgres_connections": metrics.get("postgres_connections"),
"hottest_nodes": metrics.get("hottest_nodes"),
"node_usage": metrics.get("node_usage"),
"top_restarts_1h": metrics.get("top_restarts_1h"),
}
def snapshot_compact_context(
prompt: str,
snapshot: dict[str, Any] | None,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
if not snapshot:
return ""
compact = {
"collected_at": snapshot.get("collected_at"),
"nodes_summary": snapshot.get("nodes_summary"),
"expected_workers": expected_worker_nodes_from_metrics(),
"nodes_detail": _compact_nodes_detail(snapshot),
"workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [],
"metrics": _compact_metrics(snapshot),
"flux": snapshot.get("flux"),
"errors": snapshot.get("errors"),
}
text = json.dumps(compact, ensure_ascii=False)
if len(text) > MAX_FACTS_CHARS:
text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..."
return "Cluster snapshot (JSON):\n" + text
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
return any(
phrase in q
for phrase in (
"what do you know",
"tell me about",
"interesting",
"overview",
"summary",
"describe",
"explain",
"what is",
)
)
def _is_cluster_query(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> bool:
q = normalize_query(prompt)
if not q:
return False
if TITAN_NODE_RE.search(q):
return True
if any(word in q for word in CLUSTER_HINT_WORDS):
return True
for host_match in HOST_RE.finditer(q):
host = host_match.group(1).lower()
if host.endswith("bstein.dev"):
return True
tokens = set(_tokens(q))
if _NAME_INDEX and tokens & _NAME_INDEX:
return True
return False
def _inventory_summary(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = [n for n in inventory if n.get("ready") is True]
not_ready = [n for n in inventory if n.get("ready") is False]
parts = [f"Atlas cluster: {total} nodes ({len(ready)} ready, {len(not_ready)} not ready)."]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"- {key}: {len(nodes)} nodes ({', '.join(nodes)})")
return "\n".join(parts)
def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
parts: list[str] = []
inv = _inventory_summary(inventory)
if inv:
parts.append(inv)
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
parts.append(kb_titles)
summary = "\n".join(parts).strip()
return _format_confidence(summary, "medium") if summary else ""
def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Use the provided context and facts as your source of truth. "
"If the context includes a cluster snapshot, treat the question as about the Atlas/Othrys cluster even if the prompt is ambiguous. "
"When a cluster snapshot is provided, never answer about unrelated meanings of 'Atlas' (maps, mythology, Apache Atlas, etc). "
"Treat 'hottest' as highest utilization (CPU/RAM/NET/IO) rather than temperature. "
"If you infer or synthesize, say 'Based on the snapshot' and keep it brief. "
"Prefer exact repo paths and Kubernetes resource names when relevant. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"Translate metrics into natural language instead of echoing raw label/value pairs. "
"Do not answer by only listing runbooks; if the question is about Atlas/Othrys, summarize the cluster first and mention docs only if useful. "
"If the question is not about Atlas/Othrys and no cluster context is provided, answer using general knowledge and say when you are unsure. "
"If the answer is not grounded in the provided context or tool data, say you do not know. "
"End every response with a line: 'Confidence: high|medium|low'."
)
endpoint = _ollama_endpoint()
if not endpoint:
raise RuntimeError("ollama endpoint missing")
messages: list[dict[str, str]] = [{"role": "system", "content": system}]
if context:
messages.append({"role": "user", "content": "Context (grounded):\n" + context[:MAX_CONTEXT_CHARS]})
if use_history:
messages.extend(_history_to_messages(history[hist_key][-24:]))
messages.append({"role": "user", "content": prompt})
payload = {"model": MODEL, "messages": messages, "stream": False}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(endpoint, data=json.dumps(payload).encode(), headers=headers)
lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None
if lock:
lock.acquire()
try:
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
msg = data.get("message") if isinstance(data, dict) else None
if isinstance(msg, dict):
raw_reply = msg.get("content")
else:
raw_reply = data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
if use_history:
history[hist_key].append(f"Atlas: {reply}")
return reply
finally:
if lock:
lock.release()
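# Wire-format sketch (Ollama-style chat endpoint, non-streaming; the exact URL
# comes from _ollama_endpoint(), defined earlier): the request body is
#   {"model": MODEL, "messages": [...], "stream": false}
# and the reply is read from data["message"]["content"], with fallbacks for
# {"response"}/{"reply"}-shaped payloads.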
def ollama_reply(
hist_key,
prompt: str,
*,
context: str,
fallback: str = "",
use_history: bool = True,
) -> str:
last_error = None
for attempt in range(max(1, OLLAMA_RETRIES + 1)):
try:
return _ollama_call(hist_key, prompt, context=context, use_history=use_history)
except Exception as exc: # noqa: BLE001
last_error = exc
time.sleep(min(4, 2 ** attempt))
if fallback:
if use_history:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "I don't have enough data to answer that."
def ollama_reply_with_thinking(
token: str,
room: str,
hist_key,
prompt: str,
*,
context: str,
fallback: str,
use_history: bool = True,
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(
hist_key,
prompt,
context=context,
fallback=fallback,
use_history=use_history,
)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
        prompt_hint = prompt_hint[:157] + "..."
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
        for rid in res.get("rooms", {}).get("invite", {}):
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
cleaned_body = _strip_bot_mention(body)
lower_body = cleaned_body.lower()
# Only do live cluster introspection in DMs.
allow_tools = is_dm
promql = ""
if allow_tools:
m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(lower_body):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
snapshot = _snapshot_state()
inventory = node_inventory_for_prompt(cleaned_body)
if not inventory:
inventory = _snapshot_inventory(snapshot)
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
context = ""
if cluster_query:
context = build_context(
cleaned_body,
allow_tools=allow_tools,
targets=targets,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
send_msg(token, rid, extra)
continue
fallback = "I don't have enough data to answer that."
if cluster_query:
reply = cluster_answer(
cleaned_body,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if not reply:
reply = fallback
else:
llm_prompt = cleaned_body
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
llm_prompt,
context=context,
fallback=fallback,
use_history=False,
)
send_msg(token, rid, reply)
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
def main():
load_kb()
_start_http_server()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()