1069 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import collections
import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import error, parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\\s*(.*?)\\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\\btitan-[0-9a-z]{2}\\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\\btitan-([0-9a-z]{2})/([0-9a-z]{2})\\b", re.IGNORECASE)
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
r"(?<!\\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?:\\:[^\\s]+)?(?!\\w)",
re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
_NODE_CLASS_INDEX: dict[str, list[str]] = {}
_NODE_CLASS_RPI4: set[str] = set()
_NODE_CLASS_RPI5: set[str] = set()
_NODE_CLASS_AMD64: set[str] = set()
_NODE_CLASS_JETSON: set[str] = set()
_NODE_CLASS_EXTERNAL: set[str] = set()
_NODE_CLASS_NON_RPI: set[str] = set()
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON
global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
node_classes = _parse_node_classes(runbooks)
_NODE_CLASS_INDEX = node_classes
_NODE_CLASS_RPI4 = set(node_classes.get("rpi4", []))
_NODE_CLASS_RPI5 = set(node_classes.get("rpi5", []))
_NODE_CLASS_AMD64 = set(node_classes.get("amd64", []))
_NODE_CLASS_JETSON = set(node_classes.get("jetson", []))
_NODE_CLASS_EXTERNAL = set(node_classes.get("external", []))
_NODE_CLASS_NON_RPI = set(
sorted(
(
set().union(*node_classes.values())
- _NODE_CLASS_RPI4
- _NODE_CLASS_RPI5
- _NODE_CLASS_EXTERNAL
)
)
)
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return ""
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return ""
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
picked = [d for _, d in scored[:limit]]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def _extract_titan_nodes(text: str) -> list[str]:
names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", text or "", re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(text or ""):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]:
classes: dict[str, list[str]] = {}
for doc in runbooks:
if not isinstance(doc, dict):
continue
body = str(doc.get("body") or "")
for line in body.splitlines():
stripped = line.strip()
if "titan-" not in stripped.lower():
continue
label = ""
nodes: list[str] = []
if stripped.startswith("-") and ":" in stripped:
label, rest = stripped.lstrip("-").split(":", 1)
nodes = _extract_titan_nodes(rest)
label = label.strip().lower()
else:
nodes = _extract_titan_nodes(stripped)
if not nodes:
continue
if "jetson" in stripped.lower():
classes.setdefault("jetson", nodes)
if "amd64" in stripped.lower() or "x86" in stripped.lower():
classes.setdefault("amd64", nodes)
if "rpi4" in stripped.lower():
classes.setdefault("rpi4", nodes)
if "rpi5" in stripped.lower():
classes.setdefault("rpi5", nodes)
if "external" in stripped.lower() or "non-cluster" in stripped.lower():
classes.setdefault("external", nodes)
if label:
classes.setdefault(label, nodes)
return {k: sorted(set(v)) for k, v in classes.items()}
def node_inventory_answer(cluster_name: str, query: str) -> str:
q = (query or "").lower()
if "jetson" in q and _NODE_CLASS_JETSON:
names = sorted(_NODE_CLASS_JETSON)
return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
names = sorted(_NODE_CLASS_NON_RPI)
if names:
return f"{cluster_name} nonRaspberry Pi nodes: {', '.join(names)}."
if "raspberry" in q or "rpi" in q:
if "rpi4" in q and _NODE_CLASS_RPI4:
names = sorted(_NODE_CLASS_RPI4)
return f"{cluster_name} rpi4 nodes: {', '.join(names)}."
if "rpi5" in q and _NODE_CLASS_RPI5:
names = sorted(_NODE_CLASS_RPI5)
return f"{cluster_name} rpi5 nodes: {', '.join(names)}."
names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5)
if names:
return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}."
if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64:
names = sorted(_NODE_CLASS_AMD64)
return f"{cluster_name} amd64 nodes: {', '.join(names)}."
return ""
def node_inventory_context(query: str) -> str:
q = (query or "").lower()
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")):
return ""
lines: list[str] = ["Node inventory (KB):"]
if _NODE_CLASS_RPI5:
lines.append(f"- rpi5: {', '.join(sorted(_NODE_CLASS_RPI5))}")
if _NODE_CLASS_RPI4:
lines.append(f"- rpi4: {', '.join(sorted(_NODE_CLASS_RPI4))}")
if _NODE_CLASS_JETSON:
lines.append(f"- jetson: {', '.join(sorted(_NODE_CLASS_JETSON))}")
if _NODE_CLASS_AMD64:
lines.append(f"- amd64: {', '.join(sorted(_NODE_CLASS_AMD64))}")
if _NODE_CLASS_EXTERNAL:
lines.append(f"- external: {', '.join(sorted(_NODE_CLASS_EXTERNAL))}")
if len(lines) == 1:
return ""
return "\n".join(lines)
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data."
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
fallback = f"{panel}: {summary}"
return context, fallback
def jetson_nodes_from_kb() -> list[str]:
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
body = str(doc.get("body") or "")
for line in body.splitlines():
if "jetson" not in line.lower():
continue
names = _extract_titan_nodes(line)
if names:
return names
return []
def jetson_nodes_summary(cluster_name: str) -> str:
names = jetson_nodes_from_kb()
if names:
return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
return ""
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
def nodes_summary(cluster_name: str) -> str:
state = _ariadne_state()
if state:
nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
total = nodes.get("total")
ready = nodes.get("ready")
not_ready = nodes.get("not_ready")
if isinstance(total, int) and isinstance(ready, int):
not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
if not_ready:
return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
return f"{cluster_name} cluster has {total} nodes, all Ready."
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
total = len(items)
ready = 0
for node in items:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
ready += 1
break
not_ready = max(total - ready, 0)
if not_ready:
return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
return f"{cluster_name} cluster has {total} nodes, all Ready."
def nodes_names_summary(cluster_name: str) -> str:
state = _ariadne_state()
if state:
nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
names = nodes.get("names")
if isinstance(names, list) and names:
cleaned = sorted({str(n) for n in names if n})
if len(cleaned) <= 30:
return f"{cluster_name} node names: {', '.join(cleaned)}."
shown = ", ".join(cleaned[:30])
return f"{cluster_name} node names: {shown}, … (+{len(cleaned) - 30} more)."
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
names = []
for node in items:
name = (node.get("metadata") or {}).get("name") or ""
if name:
names.append(name)
names = sorted(set(names))
if not names:
return ""
if len(names) <= 30:
return f"{cluster_name} node names: {', '.join(names)}."
shown = ", ".join(names[:30])
return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."
def nodes_arch_summary(cluster_name: str, arch: str) -> str:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ""
items = data.get("items") or []
if not isinstance(items, list) or not items:
return ""
normalized = (arch or "").strip().lower()
if normalized in ("aarch64", "arm64"):
arch_label = "arm64"
elif normalized in ("x86_64", "x86-64", "amd64"):
arch_label = "amd64"
else:
arch_label = normalized
total = 0
for node in items:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("kubernetes.io/arch") == arch_label:
total += 1
return f"{cluster_name} cluster has {total} {arch_label} nodes."
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return text
return text
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str:
parts: list[str] = []
kb = kb_retrieve(prompt)
if kb:
parts.append(kb)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
inventory = node_inventory_context(prompt)
if inventory:
parts.append(inventory)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
p_l = (prompt or "").lower()
if any(w in p_l for w in METRIC_HINT_WORDS):
restarts = vm_top_restarts(1)
if restarts:
parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
snap = vm_cluster_snapshot()
if snap:
parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
return "\n\n".join([p for p in parts if p]).strip()
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"If the answer is not grounded in the provided context or tool data, say you do not know."
)
transcript_parts = [system]
if context:
transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
transcript_parts.extend(history[hist_key][-24:])
transcript_parts.append(f"User: {prompt}")
transcript = "\n".join(transcript_parts)
payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
try:
return _ollama_call(hist_key, prompt, context=context)
except Exception:
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "Model backend is busy. Try again in a moment."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
prompt_hint = prompt_hint[:157] + ""
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
for rid, data in res.get("rooms", {}).get("invite", {}).items():
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
lower_body = body.lower()
if re.search(r"\bhow many nodes\b|\bnode count\b|\bnumber of nodes\b", lower_body):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
summary = nodes_summary("Atlas")
if not summary:
send_msg(token, rid, "I couldnt reach the cluster API to count nodes. Try again in a moment.")
continue
send_msg(token, rid, summary)
continue
inventory_answer = node_inventory_answer("Atlas", lower_body)
if inventory_answer:
send_msg(token, rid, inventory_answer)
continue
if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64"
summary = nodes_arch_summary("Atlas", arch)
if not summary:
send_msg(
token,
rid,
"I couldnt reach the cluster API to count nodes by architecture. Try again in a moment.",
)
continue
send_msg(token, rid, summary)
continue
if re.search(r"\bnode names?\b|\bnodes? named\b|\bnaming\b", lower_body):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
names_summary = nodes_names_summary("Atlas")
if not names_summary:
send_msg(token, rid, "I couldnt reach the cluster API to list node names. Try again in a moment.")
continue
send_msg(token, rid, names_summary)
continue
# Only do live cluster introspection in DMs; metrics can be answered when mentioned.
allow_tools = is_dm
allow_metrics = is_dm or mentioned
promql = ""
if allow_tools:
m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(body.lower()):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
context = build_context(body, allow_tools=allow_tools, targets=targets)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = ""
if "node" in lower_body or "cluster" in lower_body:
fallback = node_inventory_answer("Atlas", lower_body)
if metrics_fallback and not fallback:
fallback = metrics_fallback
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
body,
context=context,
fallback=fallback,
)
send_msg(token, rid, reply)
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
def main():
load_kb()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()