import collections
import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
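# Example (illustrative): _tokens("Postgres connection usage, please")
# -> ["postgres", "connection", "usage"]; matches are lowercased first and
# stopwords such as "please" are filtered out.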
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
if _body_mentions_token(body or ""):
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
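# Examples (illustrative, assuming the default BOT_MENTIONS/SERVER_NAME):
#   is_mentioned({}, "atlas: how many nodes?")  -> True (leading-token match)
#   is_mentioned({"m.mentions": {"user_ids": ["@atlas:live.bstein.dev"]}}, "hi")
#   -> True (Matrix rich mention)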
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
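# Usage sketch (token from login() below):
#   whoami = req("GET", "/_matrix/client/v3/account/whoami", token)
# urlopen raises urllib.error.HTTPError on non-2xx; callers catch Exception broadly.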
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
_NODE_CLASS_INDEX: dict[str, list[str]] = {}
_NODE_CLASS_RPI4: set[str] = set()
_NODE_CLASS_RPI5: set[str] = set()
_NODE_CLASS_AMD64: set[str] = set()
_NODE_CLASS_JETSON: set[str] = set()
_NODE_CLASS_EXTERNAL: set[str] = set()
_NODE_CLASS_NON_RPI: set[str] = set()
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON
global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
node_classes = _parse_node_classes(runbooks)
_NODE_CLASS_INDEX = node_classes
_NODE_CLASS_RPI4 = set(node_classes.get("rpi4", []))
_NODE_CLASS_RPI5 = set(node_classes.get("rpi5", []))
_NODE_CLASS_AMD64 = set(node_classes.get("amd64", []))
_NODE_CLASS_JETSON = set(node_classes.get("jetson", []))
_NODE_CLASS_EXTERNAL = set(node_classes.get("external", []))
_NODE_CLASS_NON_RPI = set(
sorted(
(
set().union(*node_classes.values())
- _NODE_CLASS_RPI4
- _NODE_CLASS_RPI5
- _NODE_CLASS_EXTERNAL
)
)
)
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return ""
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return ""
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
picked = [d for _, d in scored[:limit]]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def _extract_titan_nodes(text: str) -> list[str]:
names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", text or "", re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(text or ""):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]:
classes: dict[str, list[str]] = {}
for doc in runbooks:
if not isinstance(doc, dict):
continue
body = str(doc.get("body") or "")
for line in body.splitlines():
stripped = line.strip()
if "titan-" not in stripped.lower():
continue
label = ""
nodes: list[str] = []
if stripped.startswith("-") and ":" in stripped:
label, rest = stripped.lstrip("-").split(":", 1)
nodes = _extract_titan_nodes(rest)
label = label.strip().lower()
else:
nodes = _extract_titan_nodes(stripped)
if not nodes:
continue
if "jetson" in stripped.lower():
classes.setdefault("jetson", nodes)
if "amd64" in stripped.lower() or "x86" in stripped.lower():
classes.setdefault("amd64", nodes)
if "rpi4" in stripped.lower():
classes.setdefault("rpi4", nodes)
if "rpi5" in stripped.lower():
classes.setdefault("rpi5", nodes)
if "external" in stripped.lower() or "non-cluster" in stripped.lower():
classes.setdefault("external", nodes)
if label:
classes.setdefault(label, nodes)
return {k: sorted(set(v)) for k, v in classes.items()}
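# Example (illustrative): a runbook bullet such as
#   "- rpi5: titan-0a, titan-0b, titan-0c"
# yields {"rpi5": ["titan-0a", "titan-0b", "titan-0c"]}. Keyword hits in the
# line ("jetson", "amd64"/"x86", "rpi4", "rpi5", "external"/"non-cluster")
# populate the matching class even without a "label:" prefix.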
def node_inventory_answer(cluster_name: str, query: str) -> str:
q = (query or "").lower()
if "jetson" in q and _NODE_CLASS_JETSON:
names = sorted(_NODE_CLASS_JETSON)
return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
names = sorted(_NODE_CLASS_NON_RPI)
if names:
return f"{cluster_name} nonRaspberry Pi nodes: {', '.join(names)}."
if "raspberry" in q or "rpi" in q:
if "rpi4" in q and _NODE_CLASS_RPI4:
names = sorted(_NODE_CLASS_RPI4)
return f"{cluster_name} rpi4 nodes: {', '.join(names)}."
if "rpi5" in q and _NODE_CLASS_RPI5:
names = sorted(_NODE_CLASS_RPI5)
return f"{cluster_name} rpi5 nodes: {', '.join(names)}."
names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5)
if names:
return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}."
if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64:
names = sorted(_NODE_CLASS_AMD64)
return f"{cluster_name} amd64 nodes: {', '.join(names)}."
return ""
def node_inventory_context(query: str) -> str:
q = (query or "").lower()
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")):
return ""
lines: list[str] = ["Node inventory (KB):"]
if _NODE_CLASS_RPI5:
lines.append(f"- rpi5: {', '.join(sorted(_NODE_CLASS_RPI5))}")
if _NODE_CLASS_RPI4:
lines.append(f"- rpi4: {', '.join(sorted(_NODE_CLASS_RPI4))}")
if _NODE_CLASS_JETSON:
lines.append(f"- jetson: {', '.join(sorted(_NODE_CLASS_JETSON))}")
if _NODE_CLASS_AMD64:
lines.append(f"- amd64: {', '.join(sorted(_NODE_CLASS_AMD64))}")
if _NODE_CLASS_EXTERNAL:
lines.append(f"- external: {', '.join(sorted(_NODE_CLASS_EXTERNAL))}")
if len(lines) == 1:
return ""
return "\n".join(lines)
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data."
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
fallback = _metrics_fallback_summary(panel, summary)
return context, fallback
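# Flow sketch with an illustrative metrics-catalog entry (shape assumed from
# the lookups above; values made up):
#   {"dashboard": "PostgreSQL", "panel_title": "Postgres connections",
#    "exprs": ["sum(pg_stat_activity_count)"], "tags": ["postgres", "db"]}
# A prompt containing a METRIC_HINT_WORDS token selects the best entry, runs up
# to two of its exprs through vm_query(), and returns (context, fallback).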
def jetson_nodes_from_kb() -> list[str]:
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
body = str(doc.get("body") or "")
for line in body.splitlines():
if "jetson" not in line.lower():
continue
names = _extract_titan_nodes(line)
if names:
return names
return []
def jetson_nodes_summary(cluster_name: str) -> str:
names = jetson_nodes_from_kb()
if names:
return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
return ""
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
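# Usage sketch (in-cluster only; uses the mounted ServiceAccount token and CA):
#   pods = k8s_get("/api/v1/namespaces/monitoring/pods?limit=5")
#   names = [p["metadata"]["name"] for p in pods.get("items", [])]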
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
parsed = _parse_metric_lines(summary)
panel_l = (panel or "").lower()
if panel_l.startswith("postgres connections"):
used = parsed.get("conn=used")
maxv = parsed.get("conn=max")
if used and maxv:
try:
used_i = int(float(used))
max_i = int(float(maxv))
except ValueError:
return f"Postgres connections: {summary}"
free = max_i - used_i
return f"Postgres connections: {used_i}/{max_i} used ({free} free)."
if panel_l.startswith("postgres hottest"):
if parsed:
label, value = next(iter(parsed.items()))
return f"Most Postgres connections: {label} = {value}."
return f"{panel}: {summary}"
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
if labels.get("node-role.kubernetes.io/worker") is not None:
return True
return True
def worker_nodes_status() -> tuple[list[str], list[str]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ([], [])
items = data.get("items") or []
ready_nodes: list[str] = []
not_ready_nodes: list[str] = []
for node in items if isinstance(items, list) else []:
if not _node_is_worker(node):
continue
name = (node.get("metadata") or {}).get("name") or ""
if not name:
continue
ready = _node_ready_status(node)
if ready is True:
ready_nodes.append(name)
elif ready is False:
not_ready_nodes.append(name)
return (sorted(ready_nodes), sorted(not_ready_nodes))
def expected_nodes_from_kb() -> set[str]:
if not _NODE_CLASS_INDEX:
return set()
nodes = set().union(*_NODE_CLASS_INDEX.values())
return {n for n in nodes if n and n not in _NODE_CLASS_EXTERNAL}
def expected_worker_nodes_from_metrics() -> list[str]:
for entry in _METRIC_INDEX:
panel = (entry.get("panel_title") or "").lower()
if "worker nodes ready" not in panel:
continue
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
for expr in exprs:
if not isinstance(expr, str):
continue
match = NODE_REGEX.search(expr)
if not match:
continue
raw = match.group(1)
nodes = [n.strip() for n in raw.split("|") if n.strip()]
return sorted(nodes)
return []
def missing_nodes_answer(cluster_name: str) -> str:
expected_workers = expected_worker_nodes_from_metrics()
if expected_workers:
ready_nodes, not_ready_nodes = worker_nodes_status()
current_workers = set(ready_nodes + not_ready_nodes)
missing = sorted(set(expected_workers) - current_workers)
if not missing:
return f"{cluster_name}: no missing worker nodes versus Grafana inventory."
return f"{cluster_name} missing worker nodes versus Grafana inventory: {', '.join(missing)}."
expected = expected_nodes_from_kb()
if not expected:
return ""
current = set()
try:
data = k8s_get("/api/v1/nodes?limit=500")
items = data.get("items") or []
for node in items if isinstance(items, list) else []:
name = (node.get("metadata") or {}).get("name") or ""
if name:
current.add(name)
except Exception:
return ""
missing = sorted(expected - current)
if not missing:
return f"{cluster_name}: no missing nodes versus KB inventory."
return f"{cluster_name} missing nodes versus KB inventory: {', '.join(missing)}."
def _should_short_circuit(prompt: str, fallback: str) -> bool:
if not fallback:
return False
lower = (prompt or "").lower()
for word in ("why", "explain", "architecture", "breakdown", "root cause", "plan"):
if word in lower:
return False
return True
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
def nodes_summary(cluster_name: str) -> str:
state = _ariadne_state()
if state:
nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
total = nodes.get("total")
ready = nodes.get("ready")
not_ready = nodes.get("not_ready")
if isinstance(total, int) and isinstance(ready, int):
not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
if not_ready:
return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
return f"{cluster_name} cluster has {total} nodes, all Ready."
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
total = len(items)
ready = 0
for node in items:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
ready += 1
break
not_ready = max(total - ready, 0)
if not_ready:
return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
return f"{cluster_name} cluster has {total} nodes, all Ready."
def nodes_names_summary(cluster_name: str) -> str:
state = _ariadne_state()
if state:
nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
names = nodes.get("names")
if isinstance(names, list) and names:
cleaned = sorted({str(n) for n in names if n})
if len(cleaned) <= 30:
return f"{cluster_name} node names: {', '.join(cleaned)}."
shown = ", ".join(cleaned[:30])
return f"{cluster_name} node names: {shown}, … (+{len(cleaned) - 30} more)."
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
names = []
for node in items:
name = (node.get("metadata") or {}).get("name") or ""
if name:
names.append(name)
names = sorted(set(names))
if not names:
return ""
if len(names) <= 30:
return f"{cluster_name} node names: {', '.join(names)}."
shown = ", ".join(names[:30])
return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."
def nodes_arch_summary(cluster_name: str, arch: str) -> str:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
normalized = (arch or "").strip().lower()
if normalized in ("aarch64", "arm64"):
arch_label = "arm64"
elif normalized in ("x86_64", "x86-64", "amd64"):
arch_label = "amd64"
else:
arch_label = normalized
total = 0
for node in items:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("kubernetes.io/arch") == arch_label:
total += 1
return f"{cluster_name} cluster has {total} {arch_label} nodes."
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return text
return text
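# Examples (illustrative): all of these normalize to the plain string "hi":
#   '```json\n{"message": {"content": "hi"}}\n```'
#   {"response": "hi"}
#   ["hi"]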
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str:
parts: list[str] = []
kb = kb_retrieve(prompt)
if kb:
parts.append(kb)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
inventory = node_inventory_context(prompt)
if inventory:
parts.append(inventory)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
p_l = (prompt or "").lower()
if any(w in p_l for w in METRIC_HINT_WORDS):
restarts = vm_top_restarts(1)
if restarts:
parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
snap = vm_cluster_snapshot()
if snap:
parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
return "\n\n".join([p for p in parts if p]).strip()
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"If the answer is not grounded in the provided context or tool data, say you do not know."
)
transcript_parts = [system]
if context:
transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
transcript_parts.extend(history[hist_key][-24:])
transcript_parts.append(f"User: {prompt}")
transcript = "\n".join(transcript_parts)
payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
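# Transcript sketch (one flat string; the gateway payload is {"model", "message"}):
#   System: You are Atlas, ...
#   Context (grounded):
#   Atlas KB (retrieved): ...
#   @user:live.bstein.dev: why is postgres slow?
#   User: why is postgres slow?
# History entries are stored pre-prefixed as "sender: body" / "Atlas: reply".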
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
try:
return _ollama_call(hist_key, prompt, context=context)
except Exception:
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "Model backend is busy. Try again in a moment."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
        prompt_hint = prompt_hint[:157] + "..."
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
for rid, data in res.get("rooms", {}).get("invite", {}).items():
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
lower_body = body.lower()
if re.search(r"\bhow many nodes\b|\bnode count\b|\bnumber of nodes\b", lower_body):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
summary = nodes_summary("Atlas")
if not summary:
send_msg(token, rid, "I couldnt reach the cluster API to count nodes. Try again in a moment.")
continue
send_msg(token, rid, summary)
continue
if "worker" in lower_body and "node" in lower_body:
ready_nodes, not_ready_nodes = worker_nodes_status()
total = len(ready_nodes) + len(not_ready_nodes)
if total:
missing_hint = missing_nodes_answer("Atlas")
expected_workers = expected_worker_nodes_from_metrics()
expected_total = len(expected_workers) if expected_workers else 0
if any(word in lower_body for word in ("ready", "not ready", "unready")):
if not_ready_nodes:
send_msg(
token,
rid,
f"Worker nodes not Ready: {', '.join(not_ready_nodes)}.",
)
else:
msg = f"All {len(ready_nodes)} worker nodes are Ready."
if expected_total and len(ready_nodes) != expected_total:
missing = sorted(set(expected_workers) - set(ready_nodes))
if missing:
msg += f" Missing: {', '.join(missing)}."
elif missing_hint and "no missing" not in missing_hint:
msg += f" {missing_hint}"
send_msg(token, rid, msg)
continue
if any(word in lower_body for word in ("how many", "should")):
msg = (
f"Atlas has {total} worker nodes; "
f"{len(ready_nodes)} Ready, {len(not_ready_nodes)} NotReady."
)
if expected_total:
msg += f" Grafana inventory expects {expected_total} workers."
missing = sorted(set(expected_workers) - set(ready_nodes))
if missing:
msg += f" Missing: {', '.join(missing)}."
elif missing_hint and "no missing" not in missing_hint:
msg += f" {missing_hint}"
elif "should" in lower_body:
msg += " I dont have an expected worker inventory in the KB; this is the current cluster state."
send_msg(token, rid, msg)
continue
if "missing" in lower_body and "node" in lower_body:
missing = missing_nodes_answer("Atlas")
if missing:
send_msg(token, rid, missing)
continue
inventory_answer = node_inventory_answer("Atlas", lower_body)
if inventory_answer:
send_msg(token, rid, inventory_answer)
continue
if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64"
summary = nodes_arch_summary("Atlas", arch)
if not summary:
send_msg(
token,
rid,
"I couldnt reach the cluster API to count nodes by architecture. Try again in a moment.",
)
continue
send_msg(token, rid, summary)
continue
if re.search(r"\bnode names?\b|\bnodes?\b.*\bnamed\b|\bnaming\b", lower_body):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
names_summary = nodes_names_summary("Atlas")
if not names_summary:
send_msg(token, rid, "I couldnt reach the cluster API to list node names. Try again in a moment.")
continue
send_msg(token, rid, names_summary)
continue
if re.search(r"\bwhich nodes are ready\b|\bnodes ready\b", lower_body):
ready_nodes, not_ready_nodes = worker_nodes_status()
if ready_nodes:
msg = f"Ready worker nodes ({len(ready_nodes)}): {', '.join(ready_nodes)}."
if not_ready_nodes:
msg += f" Not Ready: {', '.join(not_ready_nodes)}."
send_msg(token, rid, msg)
continue
# Only do live cluster introspection in DMs; metrics can be answered when mentioned.
allow_tools = is_dm
allow_metrics = is_dm or mentioned
promql = ""
if allow_tools:
                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(body.lower()):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
context = build_context(body, allow_tools=allow_tools, targets=targets)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = ""
if "node" in lower_body or "cluster" in lower_body:
fallback = node_inventory_answer("Atlas", lower_body)
if metrics_fallback and not fallback:
fallback = metrics_fallback
if _should_short_circuit(body, fallback):
send_msg(token, rid, fallback)
continue
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
body,
context=context,
fallback=fallback,
)
send_msg(token, rid, reply)
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
def main():
load_kb()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()