
import collections
import json
import os
import re
import ssl
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib import error, parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
MAX_CONTEXT_CHARS = int(os.environ.get("ATLASBOT_MAX_CONTEXT_CHARS", "12000"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
"system",
"systems",
"service",
"services",
"app",
"apps",
"platform",
"software",
"tool",
"tools",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
"pod",
"pods",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
CONFIDENCE_RE = re.compile(r"confidence\s*:\s*(high|medium|low)\b", re.IGNORECASE)
OPERATION_HINTS = {
"count": ("how many", "count", "number", "total"),
"list": ("list", "which", "what are", "show", "names"),
"top": ("top", "hottest", "highest", "most", "largest", "max", "maximum"),
"status": ("ready", "not ready", "unready", "down", "missing", "status"),
}
METRIC_HINTS = {
"cpu": ("cpu",),
"ram": ("ram", "memory", "mem"),
"net": ("net", "network", "bandwidth", "throughput"),
"io": ("io", "disk", "storage"),
"connections": ("connections", "conn", "postgres", "database", "db"),
"pods": ("pods", "pod"),
}
CLUSTER_HINT_WORDS = {
"atlas",
"titan",
"cluster",
"k8s",
"kubernetes",
"node",
"nodes",
"worker",
"workers",
"pod",
"pods",
"namespace",
"service",
"deployment",
"daemonset",
"statefulset",
"grafana",
"victoria",
"prometheus",
"ariadne",
"mailu",
"nextcloud",
"vaultwarden",
"firefly",
"wger",
"jellyfin",
"planka",
"budget",
"element",
"synapse",
"mas",
"comms",
"longhorn",
"harbor",
"jenkins",
"gitea",
"flux",
"keycloak",
"postgres",
"database",
"db",
"atlasbot",
"jetson",
"rpi",
"raspberry",
"amd64",
"arm64",
}
_OLLAMA_LOCK = threading.Lock()
HARDWARE_HINTS = {
"amd64": ("amd64", "x86", "x86_64", "x86-64"),
"jetson": ("jetson",),
"rpi4": ("rpi4",),
"rpi5": ("rpi5",),
"rpi": ("rpi", "raspberry"),
"arm64": ("arm64", "aarch64"),
}
def normalize_query(text: str) -> str:
cleaned = (text or "").lower()
for ch in _DASH_CHARS:
cleaned = cleaned.replace(ch, "-")
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
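# Illustrative: an em dash in "Titan—01  CPU" is mapped to "-", so the query
# normalizes to "titan-01 cpu" (lowercased, whitespace squeezed).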
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
def _ensure_confidence(text: str) -> str:
if not text:
return ""
lines = text.strip().splitlines()
for idx, line in enumerate(lines):
match = CONFIDENCE_RE.search(line)
if match:
level = match.group(1).lower()
lines[idx] = CONFIDENCE_RE.sub(f"Confidence: {level}", line)
return "\n".join(lines)
lines.append("Confidence: medium")
return "\n".join(lines)
def _ollama_endpoint() -> str:
url = (OLLAMA_URL or "").strip()
if not url:
return ""
if url.endswith("/api/chat"):
return url
return url.rstrip("/") + "/api/chat"
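# With the default OLLAMA_URL this resolves to "https://chat.ai.bstein.dev/api/chat";
# a URL already ending in /api/chat passes through unchanged.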
def _history_to_messages(lines: list[str]) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
for line in lines:
raw = (line or "").strip()
if not raw:
continue
role = "user"
content = raw
lowered = raw.lower()
if lowered.startswith("atlas:"):
role = "assistant"
content = raw.split(":", 1)[1].strip()
elif lowered.startswith("user:"):
role = "user"
content = raw.split(":", 1)[1].strip()
elif ":" in raw:
content = raw.split(":", 1)[1].strip()
if content:
messages.append({"role": role, "content": content})
return messages
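# Illustrative: ["User: hi", "Atlas: hello"] becomes
# [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}].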
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
if _body_mentions_token(body or ""):
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
def _strip_bot_mention(text: str) -> str:
if not text:
return ""
if not MENTION_LOCALPARTS:
return text.strip()
names = [re.escape(name) for name in MENTION_LOCALPARTS if name]
if not names:
return text.strip()
pattern = r"^(?:\s*@?(?:" + "|".join(names) + r")(?::)?\s+)+"
cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE).strip()
return cleaned or text.strip()
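# e.g. _strip_bot_mention("@atlas: how many nodes?") -> "how many nodes?"; repeated
# leading mentions are stripped, and the original text is kept if nothing remains.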
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
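# Minimal usage sketch (the path follows the Matrix client-server API):
#   req("GET", "/_matrix/client/v3/account/whoami", token)
# Non-2xx responses surface as urllib.error.HTTPError from urlopen.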
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
def _score_kb_docs(query: str) -> list[dict[str, Any]]:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return []
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return []
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [d for _, d in scored]
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q:
return ""
scored = _score_kb_docs(q)
picked = scored[:limit]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def kb_retrieve_titles(query: str, *, limit: int = 4) -> str:
scored = _score_kb_docs(query)
picked = scored[:limit]
if not picked:
return ""
parts = ["Relevant runbooks:"]
for doc in picked:
title = doc.get("title") or doc.get("path") or "runbook"
path = doc.get("path") or ""
if path:
parts.append(f"- {title} ({path})")
else:
parts.append(f"- {title}")
return "\n".join(parts)
def _extract_titan_nodes(text: str) -> list[str]:
cleaned = normalize_query(text)
names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(cleaned):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
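# e.g. _extract_titan_nodes("compare titan-0a/0b") -> ["titan-0a", "titan-0b"]
# (names illustrative); slash/comma shorthand expands into individual node names.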
def _humanize_rate(value: str, *, unit: str) -> str:
try:
val = float(value)
except (TypeError, ValueError):
return value
if unit == "%":
return f"{val:.1f}%"
if val >= 1024 * 1024:
return f"{val / (1024 * 1024):.2f} MB/s"
if val >= 1024:
return f"{val / 1024:.2f} KB/s"
return f"{val:.2f} B/s"
def _has_any(text: str, phrases: tuple[str, ...]) -> bool:
return any(p in text for p in phrases)
def _detect_operation(q: str) -> str | None:
if _has_any(q, OPERATION_HINTS["top"]):
return "top"
for op, phrases in OPERATION_HINTS.items():
if op == "top":
continue
if _has_any(q, phrases):
return op
return None
def _detect_metric(q: str) -> str | None:
tokens = set(_tokens(q))
for metric, phrases in METRIC_HINTS.items():
for phrase in phrases:
if " " in phrase:
if phrase in q:
return metric
elif phrase in tokens:
return metric
return None
def _detect_hardware_filters(q: str) -> tuple[set[str], set[str]]:
include: set[str] = set()
exclude: set[str] = set()
rpi_specific = "rpi4" in q or "rpi5" in q
for hardware, phrases in HARDWARE_HINTS.items():
if hardware == "rpi" and rpi_specific:
continue
for phrase in phrases:
if f"non {phrase}" in q or f"non-{phrase}" in q or f"not {phrase}" in q:
exclude.add(hardware)
elif phrase in q:
include.add(hardware)
return include, exclude
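# e.g. "non-rpi nodes" -> (set(), {"rpi"}) and "jetson cpu" -> ({"jetson"}, set());
# a query naming "rpi4"/"rpi5" suppresses the generic "rpi" bucket.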
def _detect_role_filters(q: str) -> set[str]:
roles: set[str] = set()
if "control-plane" in q or "control plane" in q:
roles.add("control-plane")
if "master" in q:
roles.add("master")
if "accelerator" in q:
roles.add("accelerator")
return roles
def _detect_entity(q: str) -> str | None:
if "node" in q or "nodes" in q or "worker" in q or "hardware" in q or "architecture" in q or TITAN_NODE_RE.search(q):
return "node"
if "pod" in q or "pods" in q:
return "pod"
if "namespace" in q or "namespaces" in q:
return "namespace"
return None
def _metric_entry_score(entry: dict[str, Any], tokens: list[str], *, metric: str | None, op: str | None) -> int:
hay = _metric_tokens(entry)
score = 0
for t in set(tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if metric:
for phrase in METRIC_HINTS.get(metric, (metric,)):
if phrase in hay:
score += 3
if op == "top" and ("hottest" in hay or "top" in hay):
score += 3
if "node" in hay:
score += 1
return score
def _select_metric_entry(tokens: list[str], *, metric: str | None, op: str | None) -> dict[str, Any] | None:
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
score = _metric_entry_score(entry, tokens, metric=metric, op=op)
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _apply_node_filter(expr: str, node_regex: str | None) -> str:
if not node_regex:
return expr
needle = 'node_uname_info{nodename!=""}'
    replacement = f'node_uname_info{{nodename!="",nodename=~"{node_regex}"}}'
return expr.replace(needle, replacement)
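# e.g. _apply_node_filter('sum(node_uname_info{nodename!=""})', "titan-0a|titan-0b")
# -> 'sum(node_uname_info{nodename!="",nodename=~"titan-0a|titan-0b"})' (names illustrative).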
def _metric_expr_uses_percent(entry: dict[str, Any]) -> bool:
exprs = entry.get("exprs")
expr = exprs[0] if isinstance(exprs, list) and exprs else ""
return "* 100" in expr or "*100" in expr
def _format_metric_value(value: str, *, percent: bool, rate: bool = False) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return value
if percent:
return f"{num:.1f}%"
if rate:
return _humanize_rate(value, unit="rate")
if abs(num) >= 1:
return f"{num:.2f}".rstrip("0").rstrip(".")
return f"{num:.4f}".rstrip("0").rstrip(".")
def _format_metric_label(metric: dict[str, Any]) -> str:
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts:
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
return ", ".join(label_parts) if label_parts else "series"
def _primary_series_metric(res: dict | None) -> tuple[str | None, str | None]:
series = _vm_value_series(res or {})
if not series:
return (None, None)
first = series[0]
metric = first.get("metric") if isinstance(first, dict) else {}
value = first.get("value") if isinstance(first, dict) else []
node = metric.get("node") if isinstance(metric, dict) else None
val = value[1] if isinstance(value, list) and len(value) > 1 else None
return (node, val)
def _format_metric_answer(entry: dict[str, Any], res: dict | None) -> str:
series = _vm_value_series(res)
panel = entry.get("panel_title") or "Metric"
if not series:
return ""
percent = _metric_expr_uses_percent(entry)
lines: list[str] = []
for r in series[:5]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
label = _format_metric_label(metric if isinstance(metric, dict) else {})
lines.append(f"{label}: {_format_metric_value(val, percent=percent)}")
if not lines:
return ""
if len(lines) == 1:
return f"{panel}: {lines[0]}."
return f"{panel}:\n" + "\n".join(f"- {line}" for line in lines)
def _inventory_filter(
inventory: list[dict[str, Any]],
*,
include_hw: set[str],
exclude_hw: set[str],
only_workers: bool,
only_ready: bool | None,
nodes_in_query: list[str],
) -> list[dict[str, Any]]:
results = inventory
if nodes_in_query:
results = [node for node in results if node.get("name") in nodes_in_query]
if only_workers:
results = [node for node in results if node.get("is_worker") is True]
if only_ready is True:
results = [node for node in results if node.get("ready") is True]
if only_ready is False:
results = [node for node in results if node.get("ready") is False]
if include_hw:
results = [node for node in results if _hardware_match(node, include_hw)]
if exclude_hw:
results = [node for node in results if not _hardware_match(node, exclude_hw)]
return results
def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool:
hw = node.get("hardware") or ""
arch = node.get("arch") or ""
for f in filters:
if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"):
return True
if f == "arm64" and arch == "arm64":
return True
if hw == f:
return True
if f == "amd64" and arch == "amd64":
return True
return False
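# e.g. a node with hardware="rpi5" matches the broader "rpi" filter, and an
# arch="arm64" node matches "arm64" regardless of its hardware class.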
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5", "rpi"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
return "amd64"
if arch == "arm64":
return "arm64-unknown"
return "unknown"
def node_inventory_live() -> list[dict[str, Any]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return []
items = data.get("items") or []
inventory: list[dict[str, Any]] = []
for node in items if isinstance(items, list) else []:
meta = node.get("metadata") or {}
labels = meta.get("labels") or {}
name = meta.get("name") or ""
if not name:
continue
inventory.append(
{
"name": name,
"arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": _hardware_class(labels),
"roles": _node_roles(labels),
"is_worker": _node_is_worker(node),
"ready": _node_ready_status(node),
}
)
return sorted(inventory, key=lambda item: item["name"])
def node_inventory() -> list[dict[str, Any]]:
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot)
if inventory:
return inventory
return node_inventory_live()
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[node.get("hardware") or "unknown"].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str:
q = normalize_query(query)
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
return ""
if inventory is None:
inventory = node_inventory()
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = sum(1 for node in inventory if node.get("ready") is True)
not_ready = sum(1 for node in inventory if node.get("ready") is False)
lines: list[str] = [
"Node inventory (live):",
f"- total: {total}, ready: {ready}, not ready: {not_ready}",
]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
if key in groups:
lines.append(f"- {key}: {', '.join(groups[key])}")
non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
if unknowns:
lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
expected_workers = expected_worker_nodes_from_metrics()
if expected_workers:
ready_workers, not_ready_workers = worker_nodes_status()
missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
lines.append(f"- workers_ready: {', '.join(ready_workers)}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if missing:
lines.append(f"- workers_missing (derived): {', '.join(missing)}")
return "\n".join(lines)
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
q = normalize_query(prompt)
if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")):
return node_inventory()
return []
def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[(node.get("arch") or "unknown")].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]:
usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
per_node: dict[str, dict[str, Any]] = {}
for metric_name, entries in usage.items() if isinstance(usage, dict) else []:
if not isinstance(entries, list):
continue
for entry in entries:
if not isinstance(entry, dict):
continue
node = entry.get("node")
if not isinstance(node, str) or not node:
continue
per_node.setdefault(node, {})[metric_name] = entry.get("value")
return [{"node": node, **vals} for node, vals in sorted(per_node.items())]
def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]:
cleaned: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
cleaned.append(
{
"namespace": entry.get("namespace"),
"workload": entry.get("workload"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"primary_node": entry.get("primary_node"),
"nodes": entry.get("nodes"),
}
)
cleaned.sort(
key=lambda item: (
-(item.get("pods_total") or 0),
str(item.get("namespace") or ""),
str(item.get("workload") or ""),
)
)
return cleaned[:limit]
def _workloads_for_prompt(prompt: str, workloads: list[dict[str, Any]], limit: int = 12) -> list[dict[str, Any]]:
tokens = set(_tokens(prompt))
if tokens:
matched: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
entry_tokens = _workload_tokens(entry)
if entry_tokens & tokens:
matched.append(entry)
if matched:
return _workloads_for_facts(matched, limit=limit)
return _workloads_for_facts(workloads, limit=limit)
def facts_context(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
inv = inventory or []
nodes_in_query = _extract_titan_nodes(prompt)
metrics = _snapshot_metrics(snapshot)
nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {}
summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {}
expected_workers = expected_worker_nodes_from_metrics()
ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], [])
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
not_ready_names = summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names")
by_hardware = _group_nodes(inv) if inv else {}
by_arch = _nodes_by_arch(inv) if inv else {}
control_plane_nodes = [
node["name"]
for node in inv
if any(role in ("control-plane", "master") for role in (node.get("roles") or []))
]
worker_nodes = [node["name"] for node in inv if node.get("is_worker") is True]
lines: list[str] = ["Facts (live snapshot):"]
if total is not None:
lines.append(f"- nodes_total={total}, ready={ready}, not_ready={not_ready}")
if isinstance(summary, dict):
by_arch_counts = summary.get("by_arch")
if isinstance(by_arch_counts, dict) and by_arch_counts:
parts = [f"{arch}={count}" for arch, count in sorted(by_arch_counts.items())]
lines.append(f"- nodes_by_arch: {', '.join(parts)}")
if not_ready_names:
lines.append(f"- nodes_not_ready: {', '.join(not_ready_names)}")
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes_list = by_hardware.get(key) or []
if nodes_list:
lines.append(f"- {key}: {', '.join(nodes_list)}")
non_rpi = sorted(set(by_hardware.get("jetson", [])) | set(by_hardware.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi: {', '.join(non_rpi)}")
for key, nodes_list in sorted(by_arch.items()):
if nodes_list:
lines.append(f"- arch {key}: {', '.join(nodes_list)}")
if control_plane_nodes:
lines.append(f"- control_plane_nodes: {', '.join(control_plane_nodes)}")
if worker_nodes:
lines.append(f"- worker_nodes: {', '.join(worker_nodes)}")
if ready_workers or not_ready_workers:
lines.append(f"- workers_ready: {', '.join(ready_workers) if ready_workers else 'none'}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if expected_workers and any(word in normalize_query(prompt) for word in ("missing", "expected", "should", "not ready", "unready")):
missing = sorted(
set(expected_workers)
- {n.get("name") for n in inv if isinstance(n, dict) and n.get("name")}
)
lines.append(f"- expected_workers: {', '.join(expected_workers)}")
if missing:
lines.append(f"- expected_workers_missing: {', '.join(missing)}")
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
lines.append(f"- hottest_{key}: {node} ({value_fmt})")
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if isinstance(postgres, dict) and postgres:
used = postgres.get("used")
max_conn = postgres.get("max")
if used is not None and max_conn is not None:
lines.append(f"- postgres_connections: {used} used / {max_conn} max")
hottest_db = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
if hottest_db.get("label"):
lines.append(
f"- postgres_hottest_db: {hottest_db.get('label')} ({hottest_db.get('value')})"
)
for key in ("pods_running", "pods_pending", "pods_failed", "pods_succeeded"):
value = metrics.get(key)
if value is not None:
lines.append(f"- {key}: {value}")
usage_table = _node_usage_table(metrics)
if usage_table:
lines.append("- node_usage (cpu/ram/net/io):")
for entry in usage_table:
node = entry.get("node")
if not node:
continue
cpu = _format_metric_value(str(entry.get("cpu")), percent=True) if entry.get("cpu") is not None else ""
ram = _format_metric_value(str(entry.get("ram")), percent=True) if entry.get("ram") is not None else ""
net = (
_format_metric_value(str(entry.get("net")), percent=False, rate=True)
if entry.get("net") is not None
else ""
)
io_val = (
_format_metric_value(str(entry.get("io")), percent=False, rate=True)
if entry.get("io") is not None
else ""
)
lines.append(f" - {node}: cpu={cpu}, ram={ram}, net={net}, io={io_val}")
if nodes_in_query:
lines.append("- node_details:")
for name in nodes_in_query:
detail = next((n for n in inv if n.get("name") == name), None)
if not detail:
lines.append(f" - {name}: not found in snapshot")
continue
roles = ",".join(detail.get("roles") or []) or "none"
lines.append(
f" - {name}: hardware={detail.get('hardware')}, arch={detail.get('arch')}, "
f"ready={detail.get('ready')}, roles={roles}"
)
workload_entries = _workloads_for_prompt(prompt, workloads or [])
if workload_entries:
lines.append("- workloads:")
for entry in workload_entries:
if not isinstance(entry, dict):
continue
ns = entry.get("namespace") or ""
wl = entry.get("workload") or ""
primary = entry.get("primary_node") or ""
pods_total = entry.get("pods_total")
label = f"{ns}/{wl}" if ns and wl else (wl or ns)
if not label:
continue
if primary:
lines.append(f" - {label}: primary_node={primary}, pods_total={pods_total}")
else:
lines.append(f" - {label}: pods_total={pods_total}")
rendered = "\n".join(lines)
return rendered[:MAX_FACTS_CHARS]
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
names = [node["name"] for node in inventory]
ready = [node["name"] for node in inventory if node.get("ready") is True]
not_ready = [node["name"] for node in inventory if node.get("ready") is False]
groups = _group_nodes(inventory)
workers = [node for node in inventory if node.get("is_worker") is True]
worker_names = [node["name"] for node in workers]
worker_ready = [node["name"] for node in workers if node.get("ready") is True]
worker_not_ready = [node["name"] for node in workers if node.get("ready") is False]
expected_workers = expected_worker_nodes_from_metrics()
expected_ready = [n for n in expected_workers if n in ready] if expected_workers else []
expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else []
expected_missing = [n for n in expected_workers if n not in names] if expected_workers else []
return {
"names": sorted(names),
"ready": sorted(ready),
"not_ready": sorted(not_ready),
"groups": groups,
"worker_names": sorted(worker_names),
"worker_ready": sorted(worker_ready),
"worker_not_ready": sorted(worker_not_ready),
"expected_workers": expected_workers,
"expected_ready": sorted(expected_ready),
"expected_not_ready": sorted(expected_not_ready),
"expected_missing": sorted(expected_missing),
}
def _workload_tokens(entry: dict[str, Any]) -> set[str]:
tokens: set[str] = set()
for key in ("workload", "namespace"):
value = entry.get(key)
if isinstance(value, str) and value:
tokens.update(_tokens(value))
return tokens
def _workload_query_target(prompt: str) -> str:
tokens = set(_tokens(prompt))
matches = sorted(tokens & _NAME_INDEX) if _NAME_INDEX else []
return matches[0] if matches else ""
def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None:
q_tokens = set(_tokens(prompt))
if not q_tokens:
return None
scored: list[tuple[int, dict[str, Any]]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
tokens = _workload_tokens(entry)
score = len(tokens & q_tokens)
name = (entry.get("workload") or "").lower()
namespace = (entry.get("namespace") or "").lower()
if name and name in q_tokens:
score += 5
if namespace and namespace in q_tokens:
score += 3
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _format_confidence(answer: str, confidence: str) -> str:
if not answer:
return ""
return f"{answer}\nConfidence: {confidence}."
def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str:
q = normalize_query(prompt)
if not any(word in q for word in ("where", "which", "node", "run", "running", "host", "located")):
return ""
target = _workload_query_target(prompt)
entry = _select_workload(prompt, workloads)
if not entry:
return ""
workload = entry.get("workload") or ""
namespace = entry.get("namespace") or ""
if target:
workload_l = str(workload).lower()
namespace_l = str(namespace).lower()
if workload_l != target and namespace_l == target and "namespace" not in q and "workload" not in q:
return ""
nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
primary = entry.get("primary_node") or ""
if not workload or not nodes:
return ""
parts = []
if primary:
parts.append(f"{primary} (primary)")
for node, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0])):
if node == primary:
continue
parts.append(f"{node} ({count} pod{'s' if count != 1 else ''})")
node_text = ", ".join(parts) if parts else primary
answer = f"{workload} runs in {namespace}. Nodes: {node_text}."
return _format_confidence(answer, "medium")
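# Illustrative answer shape (workload and node names hypothetical):
#   "grafana runs in monitoring. Nodes: titan-0a (primary), titan-0b (2 pods).\nConfidence: medium."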
def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not snapshot:
return {}
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _node_usage_top(
usage: list[dict[str, Any]],
*,
allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
best_node = ""
best_val = None
for item in usage if isinstance(usage, list) else []:
if not isinstance(item, dict):
continue
node = item.get("node") or ""
if allowed_nodes and node not in allowed_nodes:
continue
value = item.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best_val is None or numeric > best_val:
best_val = numeric
best_node = node
if best_node and best_val is not None:
return best_node, best_val
return None
def snapshot_metric_answer(
prompt: str,
*,
snapshot: dict[str, Any] | None,
inventory: list[dict[str, Any]],
) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
if not metrics:
return ""
q = normalize_query(prompt)
metric = _detect_metric(q)
op = _detect_operation(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
allowed_nodes = {node["name"] for node in filtered} if filtered else None
if metric in {"cpu", "ram", "net", "io"} and op in {"top", "status", None}:
usage = metrics.get("node_usage", {}).get(metric, [])
top = _node_usage_top(usage, allowed_nodes=allowed_nodes)
if top:
node, val = top
percent = metric in {"cpu", "ram"}
value = _format_metric_value(str(val), percent=percent, rate=metric in {"net", "io"})
scope = ""
if include_hw:
scope = f" among {' and '.join(sorted(include_hw))}"
answer = f"Hottest node{scope}: {node} ({value})."
if allowed_nodes and len(allowed_nodes) != len(inventory):
overall = _node_usage_top(usage, allowed_nodes=None)
if overall and overall[0] != node:
overall_val = _format_metric_value(
str(overall[1]),
percent=percent,
rate=metric in {"net", "io"},
)
answer += f" Overall hottest: {overall[0]} ({overall_val})."
return _format_confidence(answer, "high")
if metric == "connections" or "postgres" in q:
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).")
if parts:
return _format_confidence(" ".join(parts), "high")
if metric == "pods":
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
if "pending" in q and pending is not None:
return _format_confidence(f"Pending pods: {pending:.0f}.", "high")
if "failed" in q and failed is not None:
return _format_confidence(f"Failed pods: {failed:.0f}.", "high")
if "succeeded" in q or "completed" in q:
if succeeded is not None:
return _format_confidence(f"Succeeded pods: {succeeded:.0f}.", "high")
if "running" in q and running is not None:
return _format_confidence(f"Running pods: {running:.0f}.", "high")
parts = []
if running is not None:
parts.append(f"running {running:.0f}")
if pending is not None:
parts.append(f"pending {pending:.0f}")
if failed is not None:
parts.append(f"failed {failed:.0f}")
if succeeded is not None:
parts.append(f"succeeded {succeeded:.0f}")
if parts:
return _format_confidence(f"Pods: {', '.join(parts)}.", "high")
return ""
def structured_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
metrics_summary: str,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
q = normalize_query(prompt)
if not q:
return ""
if workloads:
workload_resp = workload_answer(prompt, workloads)
if workload_resp:
return workload_resp
snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory)
if snap_resp:
return snap_resp
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
entity = _detect_entity(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
role_filters = _detect_role_filters(q)
only_ready: bool | None = None
if "not ready" in q or "unready" in q or "down" in q or "missing" in q:
only_ready = False
elif "ready" in q:
only_ready = True
if entity == "node" and only_ready is not None and op != "count":
op = "status"
if not op and entity == "node":
op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
if op == "top" and metric is None:
metric = "cpu"
# Metrics-first when a metric or top operation is requested.
if metric or op == "top":
entry = _select_metric_entry(tokens, metric=metric, op=op)
if entry and isinstance(entry.get("exprs"), list) and entry["exprs"]:
expr = entry["exprs"][0]
if inventory:
scoped = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
if scoped:
node_regex = "|".join([n["name"] for n in scoped])
expr = _apply_node_filter(expr, node_regex)
res = vm_query(expr, timeout=20)
answer = ""
if op == "top" or "hottest" in (entry.get("panel_title") or "").lower():
node, val = _primary_series_metric(res)
if node and val is not None:
percent = _metric_expr_uses_percent(entry)
value_fmt = _format_metric_value(val or "", percent=percent)
metric_label = (metric or "").upper()
label = f"{metric_label} node" if metric_label else "node"
answer = f"Hottest {label}: {node} ({value_fmt})."
if not answer:
answer = _format_metric_answer(entry, res)
if answer:
scope_parts: list[str] = []
if include_hw:
scope_parts.append(" and ".join(sorted(include_hw)))
if exclude_hw:
scope_parts.append(f"excluding {' and '.join(sorted(exclude_hw))}")
if only_workers:
scope_parts.append("worker")
if scope_parts:
scope = " ".join(scope_parts)
overall_note = ""
base_expr = entry["exprs"][0]
if inventory:
all_nodes = "|".join([n["name"] for n in inventory])
if all_nodes:
base_expr = _apply_node_filter(base_expr, all_nodes)
base_res = vm_query(base_expr, timeout=20)
base_node, base_val = _primary_series_metric(base_res)
scoped_node, scoped_val = _primary_series_metric(res)
if base_node and scoped_node and base_node != scoped_node:
percent = _metric_expr_uses_percent(entry)
base_val_fmt = _format_metric_value(base_val or "", percent=percent)
overall_note = f" Overall hottest node: {base_node} ({base_val_fmt})."
return _format_confidence(f"Among {scope} nodes, {answer}{overall_note}", "high")
return _format_confidence(answer, "high")
if metrics_summary:
return metrics_summary
if entity != "node" or not inventory:
if any(word in q for word in METRIC_HINT_WORDS) and not metrics_summary:
return "I don't have data to answer that right now."
return ""
expected_workers = expected_worker_nodes_from_metrics()
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=only_ready if op in ("status", "count") else None,
nodes_in_query=nodes_in_query,
)
if role_filters:
filtered = [
node
for node in filtered
if role_filters.intersection(set(node.get("roles") or []))
]
names = [node["name"] for node in filtered]
if op == "status":
if "missing" in q and expected_workers:
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
return _format_confidence(
"Missing nodes: " + (", ".join(missing) if missing else "none") + ".",
"high",
)
if only_ready is False:
return _format_confidence(
"Not ready nodes: " + (", ".join(names) if names else "none") + ".",
"high",
)
if only_ready is True:
return _format_confidence(
f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + ".",
"high",
)
if op == "count":
if expected_workers and ("expected" in q or "should" in q):
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
if missing:
msg += f" Missing: {', '.join(missing)}."
return _format_confidence(msg, "high")
if only_ready is True:
return _format_confidence(f"Ready nodes: {len(names)}.", "high")
if only_ready is False:
return _format_confidence(f"Not ready nodes: {len(names)}.", "high")
if not (include_hw or exclude_hw or nodes_in_query or only_workers or role_filters):
return _format_confidence(f"Atlas has {len(names)} nodes.", "high")
return _format_confidence(f"Matching nodes: {len(names)}.", "high")
if op == "list":
if nodes_in_query:
parts = []
existing = {n["name"] for n in inventory}
for node in nodes_in_query:
parts.append(f"{node}: {'present' if node in existing else 'not present'}")
return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high")
if not names:
return _format_confidence("Matching nodes: none.", "high")
shown = names[:30]
suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high")
return ""
def _nodes_summary_line(inventory: list[dict[str, Any]], snapshot: dict[str, Any] | None) -> str:
summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {}
nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {}
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
if total is None:
total = len(inventory)
ready = len([n for n in inventory if n.get("ready") is True])
not_ready = len([n for n in inventory if n.get("ready") is False])
if total is None:
return ""
return f"Atlas cluster has {total} nodes ({ready} ready, {not_ready} not ready)."
def _hardware_mix_line(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
parts: list[str] = []
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"{key}={len(nodes)}")
if not parts:
return ""
return "Hardware mix: " + ", ".join(parts) + "."
def _os_mix_line(snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
counts: dict[str, int] = collections.Counter()
for node in details:
if not isinstance(node, dict):
continue
os_name = (node.get("os") or "").strip()
if os_name:
counts[os_name] += 1
if not counts or (len(counts) == 1 and "linux" in counts):
return ""
parts = [f"{os_name}={count}" for os_name, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))]
return "OS mix: " + ", ".join(parts[:5]) + "."
def _pods_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
parts: list[str] = []
if running is not None:
parts.append(f"{running:.0f} running")
if pending is not None:
parts.append(f"{pending:.0f} pending")
if failed is not None:
parts.append(f"{failed:.0f} failed")
if succeeded is not None:
parts.append(f"{succeeded:.0f} succeeded")
if not parts:
return ""
return "Pods: " + ", ".join(parts) + "."
def _postgres_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if not postgres:
return ""
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
parts.append(f"{used:.0f}/{max_conn:.0f} connections")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"hottest {hottest.get('label')} ({hot_val_str})")
if not parts:
return ""
return "Postgres: " + ", ".join(parts) + "."
def _hottest_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if not hottest:
return ""
parts: list[str] = []
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
parts.append(f"{key.upper()} {node} ({value_fmt})")
if not parts:
return ""
return "Hottest nodes: " + "; ".join(parts) + "."
def cluster_overview_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
) -> str:
if not inventory and not snapshot:
return ""
q = normalize_query(prompt)
metrics = _snapshot_metrics(snapshot)
lines: list[str] = []
nodes_line = _nodes_summary_line(inventory, snapshot)
if nodes_line:
lines.append(nodes_line)
if any(word in q for word in ("hardware", "architecture", "nodes", "node", "cluster", "atlas", "titan", "lab")):
hw_line = _hardware_mix_line(inventory)
if hw_line:
lines.append(hw_line)
os_line = _os_mix_line(snapshot)
if os_line:
lines.append(os_line)
if any(
word in q
for word in (
"interesting",
"status",
"health",
"overview",
"summary",
"tell me",
"what do you know",
"about",
"pods",
"postgres",
"connections",
"hottest",
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"busy",
"load",
"usage",
"utilization",
)
):
pods_line = _pods_summary_line(metrics)
if pods_line:
lines.append(pods_line)
hottest_line = _hottest_summary_line(metrics)
if hottest_line:
lines.append(hottest_line)
postgres_line = _postgres_summary_line(metrics)
if postgres_line:
lines.append(postgres_line)
if not lines:
return ""
return "Based on the snapshot, " + "\n".join(lines)
def cluster_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
metrics_summary = snapshot_context(prompt, snapshot)
structured = structured_answer(
prompt,
inventory=inventory,
metrics_summary=metrics_summary,
snapshot=snapshot,
workloads=workloads,
)
if structured:
return structured
q = normalize_query(prompt)
workload_target = _workload_query_target(prompt)
if workload_target and any(word in q for word in ("where", "run", "running", "host", "node")):
return _format_confidence(
f"I don't have workload placement data for {workload_target} in the current snapshot.",
"low",
)
overview = cluster_overview_answer(prompt, inventory=inventory, snapshot=snapshot)
if overview:
kb_titles = kb_retrieve_titles(prompt, limit=4) if _knowledge_intent(prompt) else ""
if kb_titles:
overview = overview + "\n" + kb_titles
return _format_confidence(overview, "medium")
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
return _format_confidence(kb_titles, "low")
if metrics_summary:
return _format_confidence(metrics_summary, "low")
return ""
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", ""
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
return context, ""
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
    for t in _tokens(ql):
        if t in _NAME_INDEX:
            hosts |= {
                ep["host"].lower()
                for ep in KB["catalog"].get("http_endpoints", [])
                if isinstance(ep, dict) and ep.get("host") and ep.get("backend", {}).get("service") == t
            }
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
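# e.g. k8s_get("/api/v1/nodes?limit=500") returns the decoded NodeList dict;
# errors propagate, so callers below wrap it in try/except.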
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0}
def _snapshot_state() -> dict[str, Any] | None:
now = time.monotonic()
cached = _SNAPSHOT_CACHE.get("payload")
ts = _SNAPSHOT_CACHE.get("ts") or 0.0
if cached and now - ts < max(5, SNAPSHOT_TTL_SEC):
return cached
payload = _ariadne_state(timeout=10)
if isinstance(payload, dict) and payload:
_SNAPSHOT_CACHE["payload"] = payload
_SNAPSHOT_CACHE["ts"] = now
return payload
return cached if isinstance(cached, dict) else None
def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
items = snapshot.get("nodes_detail")
if not isinstance(items, list):
return []
inventory: list[dict[str, Any]] = []
for node in items:
if not isinstance(node, dict):
continue
labels = node.get("labels") if isinstance(node.get("labels"), dict) else {}
name = node.get("name") or ""
if not name:
continue
hardware = node.get("hardware") or _hardware_class(labels)
inventory.append(
{
"name": name,
"arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": hardware,
"roles": node.get("roles") or [],
"is_worker": node.get("is_worker") is True,
"ready": node.get("ready") is True,
}
)
return sorted(inventory, key=lambda item: item["name"])
def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
workloads = snapshot.get("workloads")
return workloads if isinstance(workloads, list) else []
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
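# Example output lines (hypothetical pod names):
#   - matrix/atlasbot-7d4b9-abcde: Running 1/1 restarts=0
#   - matrix/atlasbot-7d4b9-fghij: Pending 0/1 restarts=3 (ImagePullBackOff)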
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
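# vm_query returns the standard Prometheus/VictoriaMetrics instant-query
# envelope, e.g. (values hypothetical):
#   {"status": "success",
#    "data": {"resultType": "vector",
#             "result": [{"metric": {"node": "titan-aa"},
#                         "value": [1700000000, "42"]}]}}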
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
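# For the envelope sketched above vm_query, this renders "- node=titan-aa: 42";
# label sets without the preferred keys fall back to the first few labels not
# starting with "__", or to the literal "series".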
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
parsed = _parse_metric_lines(summary)
panel_l = (panel or "").lower()
if parsed:
items = list(parsed.items())
if len(items) == 1:
label, value = items[0]
return f"{panel}: {label} = {value}."
compact = "; ".join(f"{k}={v}" for k, v in items)
return f"{panel}: {compact}."
if panel_l:
return f"{panel}: {summary}"
return summary
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
    # Anything not labeled control-plane/master is treated as a worker,
    # including nodes without an explicit worker role label.
    return True
def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]:
if inventory is None:
inventory = node_inventory()
ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True]
not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False]
return (sorted(ready_nodes), sorted(not_ready_nodes))
def expected_worker_nodes_from_metrics() -> list[str]:
for entry in _METRIC_INDEX:
panel = (entry.get("panel_title") or "").lower()
if "worker nodes ready" not in panel:
continue
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
for expr in exprs:
if not isinstance(expr, str):
continue
match = NODE_REGEX.search(expr)
if not match:
continue
raw = match.group(1)
nodes = [n.strip() for n in raw.split("|") if n.strip()]
return sorted(nodes)
return []
def _context_fallback(context: str) -> str:
if not context:
return ""
trimmed = context.strip()
if len(trimmed) > MAX_TOOL_CHARS:
trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
return "Here is what I found:\n" + trimmed
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
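# Example output line (hypothetical workload): "- restarts(1h): media/jellyfin-0 = 4"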
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
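# Example snapshot (hypothetical counts):
#   - nodes ready: 12 (not ready: 1)
#   Pod phases:
#   - phase=Running: 180
#   - phase=Pending: 2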
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return _ensure_confidence(text)
return _ensure_confidence(text)
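# Normalization sketch: a backend reply of '{"content": "All nodes ready"}',
# fenced or not, collapses to the plain string "All nodes ready"; string
# replies then pass through _ensure_confidence, which appends the trailing
# "Confidence: ..." line when the model omitted it.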
# Internal HTTP endpoint for cluster answers (website uses this).
class _AtlasbotHandler(BaseHTTPRequestHandler):
server_version = "AtlasbotHTTP/1.0"
def _write_json(self, status: int, payload: dict[str, Any]):
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _authorized(self) -> bool:
if not ATLASBOT_INTERNAL_TOKEN:
return True
token = self.headers.get("X-Internal-Token", "")
return token == ATLASBOT_INTERNAL_TOKEN
def do_GET(self): # noqa: N802
if self.path == "/health":
self._write_json(200, {"status": "ok"})
return
self._write_json(404, {"error": "not_found"})
def do_POST(self): # noqa: N802
if self.path != "/v1/answer":
self._write_json(404, {"error": "not_found"})
return
if not self._authorized():
self._write_json(401, {"error": "unauthorized"})
return
try:
length = int(self.headers.get("Content-Length", "0"))
except ValueError:
length = 0
raw = self.rfile.read(length) if length > 0 else b""
try:
payload = json.loads(raw.decode("utf-8")) if raw else {}
except json.JSONDecodeError:
self._write_json(400, {"error": "invalid_json"})
return
prompt = str(payload.get("prompt") or payload.get("question") or "").strip()
if not prompt:
self._write_json(400, {"error": "missing_prompt"})
return
cleaned = _strip_bot_mention(prompt)
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
context = ""
if cluster_query:
context = build_context(
cleaned,
allow_tools=False,
targets=[],
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
fallback = "I don't have enough data to answer that."
if cluster_query:
answer = cluster_answer(
cleaned,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if not answer:
answer = fallback
else:
llm_prompt = cleaned
answer = ollama_reply(
("http", "internal"),
llm_prompt,
context=context,
fallback=fallback,
use_history=False,
)
self._write_json(200, {"answer": answer})
def _start_http_server():
server = HTTPServer(("0.0.0.0", ATLASBOT_HTTP_PORT), _AtlasbotHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
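# Example request against the internal endpoint (token value hypothetical):
#   curl -s -X POST http://127.0.0.1:8090/v1/answer \
#     -H 'X-Internal-Token: <token>' \
#     -H 'Content-Type: application/json' \
#     -d '{"prompt": "which titan nodes are not ready?"}'
# -> {"answer": "..."} on success; 401 without a matching token, 400 on bad JSON.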
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
def build_context(
prompt: str,
*,
allow_tools: bool,
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
parts: list[str] = []
facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
if facts:
parts.append(facts)
snapshot_json = snapshot_compact_context(
prompt,
snapshot,
inventory=inventory,
workloads=workloads,
)
if snapshot_json:
parts.append(snapshot_json)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
kb = kb_retrieve(prompt)
if not kb and _knowledge_intent(prompt):
kb = kb_retrieve_titles(prompt, limit=4)
if kb:
parts.append(kb)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
return "\n\n".join([p for p in parts if p]).strip()
def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
workloads = _snapshot_workloads(snapshot)
q = normalize_query(prompt)
parts: list[str] = []
nodes = snapshot.get("nodes") if isinstance(snapshot.get("nodes"), dict) else {}
if nodes.get("total") is not None:
parts.append(
f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}."
)
if any(word in q for word in ("postgres", "connections", "db")):
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if postgres:
parts.append(f"Snapshot: postgres_connections={postgres}.")
if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")):
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if hottest:
parts.append(f"Snapshot: hottest_nodes={hottest}.")
if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")):
match = _select_workload(prompt, workloads)
if match:
parts.append(f"Snapshot: workload={match}.")
return "\n".join(parts).strip()
def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
output: list[dict[str, Any]] = []
for node in details:
if not isinstance(node, dict):
continue
name = node.get("name")
if not name:
continue
output.append(
{
"name": name,
"ready": node.get("ready"),
"hardware": node.get("hardware"),
"arch": node.get("arch"),
"roles": node.get("roles"),
"is_worker": node.get("is_worker"),
"os": node.get("os"),
"kernel": node.get("kernel"),
"kubelet": node.get("kubelet"),
"container_runtime": node.get("container_runtime"),
}
)
return output
def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {}
return {
"pods_running": metrics.get("pods_running"),
"pods_pending": metrics.get("pods_pending"),
"pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"),
"postgres_connections": metrics.get("postgres_connections"),
"hottest_nodes": metrics.get("hottest_nodes"),
"node_usage": metrics.get("node_usage"),
"top_restarts_1h": metrics.get("top_restarts_1h"),
}
def snapshot_compact_context(
prompt: str,
snapshot: dict[str, Any] | None,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
if not snapshot:
return ""
compact = {
"collected_at": snapshot.get("collected_at"),
"nodes_summary": snapshot.get("nodes_summary"),
"expected_workers": expected_worker_nodes_from_metrics(),
"nodes_detail": _compact_nodes_detail(snapshot),
"workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [],
"metrics": _compact_metrics(snapshot),
"flux": snapshot.get("flux"),
"errors": snapshot.get("errors"),
}
text = json.dumps(compact, ensure_ascii=False)
if len(text) > MAX_FACTS_CHARS:
text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..."
return "Cluster snapshot (JSON):\n" + text
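# Shape sketch of the compact snapshot JSON (values hypothetical):
#   {"collected_at": "...", "nodes_summary": {...},
#    "expected_workers": ["titan-aa", ...],
#    "nodes_detail": [{"name": "titan-aa", "ready": true, ...}],
#    "workloads": [...], "metrics": {...}, "flux": {...}, "errors": [...]}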
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
return any(
phrase in q
for phrase in (
"what do you know",
"tell me about",
"interesting",
"overview",
"summary",
"describe",
"explain",
"what is",
)
)
def _is_cluster_query(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> bool:
q = normalize_query(prompt)
if not q:
return False
if TITAN_NODE_RE.search(q):
return True
if any(word in q for word in CLUSTER_HINT_WORDS):
return True
for host_match in HOST_RE.finditer(q):
host = host_match.group(1).lower()
if host.endswith("bstein.dev"):
return True
tokens = set(_tokens(q))
if _NAME_INDEX and tokens & _NAME_INDEX:
return True
return False
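# Classification examples (hypothetical prompts): "is titan-ab ready?" matches
# TITAN_NODE_RE; a CLUSTER_HINT_WORDS hit or a known node/workload token from
# _NAME_INDEX also routes to the grounded cluster path; a bare "hello" matches
# nothing and falls through to the general LLM path.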
def _inventory_summary(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = [n for n in inventory if n.get("ready") is True]
not_ready = [n for n in inventory if n.get("ready") is False]
parts = [f"Atlas cluster: {total} nodes ({len(ready)} ready, {len(not_ready)} not ready)."]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"- {key}: {len(nodes)} nodes ({', '.join(nodes)})")
return "\n".join(parts)
def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
parts: list[str] = []
inv = _inventory_summary(inventory)
if inv:
parts.append(inv)
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
parts.append(kb_titles)
summary = "\n".join(parts).strip()
return _format_confidence(summary, "medium") if summary else ""
def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Use the provided context and facts as your source of truth. "
"If the context includes a cluster snapshot, treat the question as about the Atlas/Othrys cluster even if the prompt is ambiguous. "
"When a cluster snapshot is provided, never answer about unrelated meanings of 'Atlas' (maps, mythology, Apache Atlas, etc). "
"Treat 'hottest' as highest utilization (CPU/RAM/NET/IO) rather than temperature. "
"If you infer or synthesize, say 'Based on the snapshot' and keep it brief. "
"Prefer exact repo paths and Kubernetes resource names when relevant. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"Translate metrics into natural language instead of echoing raw label/value pairs. "
"Do not answer by only listing runbooks; if the question is about Atlas/Othrys, summarize the cluster first and mention docs only if useful. "
"If the question is not about Atlas/Othrys and no cluster context is provided, answer using general knowledge and say when you are unsure. "
"If the answer is not grounded in the provided context or tool data, say you do not know. "
"End every response with a line: 'Confidence: high|medium|low'."
)
endpoint = _ollama_endpoint()
if not endpoint:
raise RuntimeError("ollama endpoint missing")
messages: list[dict[str, str]] = [{"role": "system", "content": system}]
if context:
messages.append({"role": "user", "content": "Context (grounded):\n" + context[:MAX_CONTEXT_CHARS]})
if use_history:
messages.extend(_history_to_messages(history[hist_key][-24:]))
messages.append({"role": "user", "content": prompt})
payload = {"model": MODEL, "messages": messages, "stream": False}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(endpoint, data=json.dumps(payload).encode(), headers=headers)
lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None
if lock:
lock.acquire()
try:
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
msg = data.get("message") if isinstance(data, dict) else None
            if isinstance(msg, dict):
                raw_reply = msg.get("content")
            elif isinstance(data, dict):
                raw_reply = data.get("response") or data.get("reply") or data
            else:
                raw_reply = data
reply = _normalize_reply(raw_reply) or "I'm here to help."
if use_history:
history[hist_key].append(f"Atlas: {reply}")
return reply
finally:
if lock:
lock.release()
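# The request follows the Ollama-style /api/chat contract; a non-streaming
# response is expected to carry the reply at data["message"]["content"]
# (hypothetical shape: {"message": {"role": "assistant", "content": "..."}}),
# which is why that key is preferred before the "response"/"reply" fallbacks.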
def ollama_reply(
hist_key,
prompt: str,
*,
context: str,
fallback: str = "",
use_history: bool = True,
) -> str:
    for attempt in range(max(1, OLLAMA_RETRIES + 1)):
        try:
            return _ollama_call(hist_key, prompt, context=context, use_history=use_history)
        except Exception:  # noqa: BLE001
            # Back off only between attempts; the final failure falls through
            # to the fallback below without an extra sleep.
            if attempt < OLLAMA_RETRIES:
                time.sleep(min(4, 2 ** attempt))
if fallback:
if use_history:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "I don't have enough data to answer that."
def ollama_reply_with_thinking(
token: str,
room: str,
hist_key,
prompt: str,
*,
context: str,
fallback: str,
use_history: bool = True,
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(
hist_key,
prompt,
context=context,
fallback=fallback,
use_history=use_history,
)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
        prompt_hint = prompt_hint[:157] + "..."
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
        # Auto-accept room invites.
        for rid in res.get("rooms", {}).get("invite", {}):
try:
join_room(token, rid)
except Exception:
pass
        # Handle message events in joined rooms.
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
                if sender == f"@{USER}:{SERVER_NAME}":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
cleaned_body = _strip_bot_mention(body)
lower_body = cleaned_body.lower()
# Only do live cluster introspection in DMs.
allow_tools = is_dm
promql = ""
if allow_tools:
                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(lower_body):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
snapshot = _snapshot_state()
inventory = node_inventory_for_prompt(cleaned_body)
if not inventory:
inventory = _snapshot_inventory(snapshot)
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
context = ""
if cluster_query:
context = build_context(
cleaned_body,
allow_tools=allow_tools,
targets=targets,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
send_msg(token, rid, extra)
continue
fallback = "I don't have enough data to answer that."
if cluster_query:
reply = cluster_answer(
cleaned_body,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if not reply:
reply = fallback
else:
llm_prompt = cleaned_body
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
llm_prompt,
context=context,
fallback=fallback,
use_history=False,
)
send_msg(token, rid, reply)
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
def main():
load_kb()
_start_http_server()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()