1240 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import collections
import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import error, parse, request
# --- Runtime configuration (environment-driven) ---
# Synapse client-server API base URL.
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
# Matrix Authentication Service base URL (password login goes here).
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
# Bot credentials: intentionally required — a missing variable raises KeyError at import.
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
# Default room alias the bot operates in.
ROOM_ALIAS = "#othrys:live.bstein.dev"
# LLM backend (Ollama-compatible) endpoint, model, API key, and call timeout.
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
# Optional directory of pre-rendered knowledge-base JSON (consumed by load_kb()).
KB_DIR = os.environ.get("KB_DIR", "")
# VictoriaMetrics endpoint for PromQL instant queries.
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
# Optional Ariadne cluster-state service; token is sent as X-Internal-Token.
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
# Comma-separated aliases the bot answers to (localparts or full user ids).
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
# Character budgets for KB/tool context, and the "thinking" keepalive cadence.
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
# Keyword tokens (2+ chars) pulled from prompts; lowercased downstream in _tokens().
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
# Dotted hostnames such as "chat.ai.bstein.dev".
# Fix: the pattern used a doubled backslash (r"\\."), which matches a literal
# backslash followed by any character, so no real hostname could ever match.
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
# Generic filler words dropped during prompt tokenization (see _tokens()).
STOPWORDS = {
    "the",
    "and",
    "for",
    "with",
    "this",
    "that",
    "from",
    "into",
    "what",
    "how",
    "why",
    "when",
    "where",
    "which",
    "who",
    "can",
    "could",
    "should",
    "would",
    "please",
    "help",
    "atlas",
    "othrys",
}
# Words that suggest the prompt is asking about cluster health/metrics; their
# presence gates the VictoriaMetrics tool paths (see build_context() and
# metrics_query_context()).
METRIC_HINT_WORDS = {
    "bandwidth",
    "connections",
    "cpu",
    "database",
    "db",
    "disk",
    "health",
    "memory",
    "network",
    "node",
    "nodes",
    "postgres",
    "status",
    "storage",
    "usage",
    "down",
    "slow",
    "error",
    "unknown_error",
    "timeout",
    "crash",
    "crashloop",
    "restart",
    "restarts",
    "pending",
    "unreachable",
    "latency",
}
# Strips an optional ``` / ```json fence wrapping an entire model reply.
# Fix: doubled backslashes (r"\\s") matched literal backslashes rather than
# whitespace, so fenced replies were never unwrapped.
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
# Titan node names, e.g. "titan-0a"; \b variants previously matched a literal
# backslash because of the same doubled-escaping bug.
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
# Two-node shorthand such as "titan-0a/0b" (both halves captured).
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
def _tokens(text: str) -> list[str]:
    """Lowercased keyword tokens from *text*, minus STOPWORDS and 1-char tokens."""
    lowered = (match.lower() for match in TOKEN_RE.findall(text or ""))
    return [tok for tok in lowered if len(tok) >= 2 and tok not in STOPWORDS]
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
# Matches "@atlas" or "@atlas:server" as a standalone word in message text.
# Fix: doubled backslashes (r"\\w", r"\\s") matched literal backslashes, so
# this regex could never match a real mention.
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
    """Expand *token* into a full Matrix user id ("@local:server").

    Already-qualified ids ("@x:y") pass through unchanged; a bare localpart
    gets "@" and the configured SERVER_NAME appended; empty input yields "".
    """
    raw = token.strip()
    if not raw:
        return ""
    if raw.startswith("@") and ":" in raw:
        return raw
    localish = raw.lstrip("@")
    return f"@{localish}" if ":" in localish else f"@{localish}:{SERVER_NAME}"
# Fully-qualified bot user ids, lowercased for case-insensitive comparison
# against m.mentions payloads in is_mentioned().
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
    """True when the event mentions the bot via text or m.mentions metadata."""
    text = body or ""
    if MENTION_RE.search(text):
        return True
    if _body_mentions_token(text):
        return True
    mention_block = content.get("m.mentions", {})
    ids = mention_block.get("user_ids", [])
    if not isinstance(ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in ids)
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    """Issue a JSON HTTP request against the Matrix API and decode the reply.

    *base* overrides the default homeserver BASE; a bearer *token* is attached
    when given. An empty response body decodes to {}.
    """
    payload = None
    headers: dict[str, str] = {}
    if body is not None:
        payload = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    http_req = request.Request((base or BASE) + path, data=payload, headers=headers, method=method)
    with request.urlopen(http_req, timeout=timeout) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}
def login() -> str:
    """Password-login through the auth service; returns the access token."""
    credentials = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": normalize_user_id(USER)},
        "password": PASSWORD,
    }
    response = req("POST", "/_matrix/client/v3/login", body=credentials, base=AUTH_BASE)
    return response["access_token"]
def resolve_alias(token: str, alias: str) -> str:
    """Resolve a room alias (e.g. "#othrys:...") to its room id."""
    encoded = parse.quote(alias)
    return req("GET", f"/_matrix/client/v3/directory/room/{encoded}", token)["room_id"]
def join_room(token: str, room: str):
    """Join *room* (id or alias) as the bot user."""
    endpoint = f"/_matrix/client/v3/rooms/{parse.quote(room)}/join"
    req("POST", endpoint, token, body={})
def send_msg(token: str, room: str, text: str):
    """Send a plain m.text message to *room*."""
    endpoint = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", endpoint, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
# In-memory knowledge base; populated by load_kb().
KB = {"catalog": {}, "runbooks": []}
# hostname -> list of HTTP endpoint dicts from the catalog (sorted keys).
_HOST_INDEX: dict[str, list[dict]] = {}
# Known service/workload names (lowercased) used for name matching.
_NAME_INDEX: set[str] = set()
# Flattened dashboard-panel metric entries loaded from metrics.json.
_METRIC_INDEX: list[dict[str, Any]] = []
# Extracts the node=~"a|b|c" selector from a PromQL expression.
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
    """Load pre-rendered KB JSON from KB_DIR into the module-level indexes.

    Populates KB (catalog + runbooks), _HOST_INDEX (host -> endpoints),
    _NAME_INDEX (service/workload names), and _METRIC_INDEX. No-op when
    KB_DIR is unset; missing/unreadable files fall back to empty values.
    """
    global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
    if not KB_DIR:
        return
    catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
    runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
    metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
    KB = {"catalog": catalog, "runbooks": runbooks}
    # Index endpoints by lowercase hostname for catalog_hints().
    host_index: dict[str, list[dict]] = collections.defaultdict(list)
    for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
        host = (ep.get("host") or "").lower()
        if host:
            host_index[host].append(ep)
    _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
    # Collect all known service and workload names for token matching.
    names: set[str] = set()
    for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
        if isinstance(s, dict) and s.get("name"):
            names.add(str(s["name"]).lower())
    for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
        if isinstance(w, dict) and w.get("name"):
            names.add(str(w["name"]).lower())
    _NAME_INDEX = names
    _METRIC_INDEX = metrics if isinstance(metrics, list) else []
def kb_retrieve(query: str, *, limit: int = 3) -> str:
    """Return up to *limit* runbook snippets relevant to *query*.

    Scoring: +3 per query token found in a doc title, +1 per token found
    anywhere else in the doc, +4 per entrypoint hostname appearing verbatim
    in the query. Output is capped at MAX_KB_CHARS. Returns "" when the KB
    is empty or nothing matches.
    """
    q = (query or "").strip()
    if not q or not KB.get("runbooks"):
        return ""
    ql = q.lower()
    q_tokens = _tokens(q)
    if not q_tokens:
        return ""
    scored: list[tuple[int, dict]] = []
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        title = str(doc.get("title") or "")
        body = str(doc.get("body") or "")
        tags = doc.get("tags") or []
        entrypoints = doc.get("entrypoints") or []
        # One lowercase haystack covering title, tags, entrypoints and body.
        hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
        score = 0
        for t in set(q_tokens):
            if t in hay:
                score += 3 if t in title.lower() else 1
        for h in entrypoints:
            if isinstance(h, str) and h.lower() in ql:
                score += 4
        if score:
            scored.append((score, doc))
    scored.sort(key=lambda x: x[0], reverse=True)
    picked = [d for _, d in scored[:limit]]
    if not picked:
        return ""
    parts: list[str] = ["Atlas KB (retrieved):"]
    used = 0
    for d in picked:
        path = d.get("path") or ""
        title = d.get("title") or path
        body = (d.get("body") or "").strip()
        # Quote only the first ~900 chars of each runbook body.
        snippet = body[:900].strip()
        chunk = f"- {title} ({path})\n{snippet}"
        # Stop before exceeding the overall KB character budget.
        if used + len(chunk) > MAX_KB_CHARS:
            break
        parts.append(chunk)
        used += len(chunk)
    return "\n".join(parts).strip()
def _extract_titan_nodes(text: str) -> list[str]:
    """Collect canonical "titan-XX" node names mentioned in *text*.

    Handles plain names ("titan-0a"), slash/comma lists ("titan-0a,0b"), and
    the two-node shorthand matched by TITAN_RANGE_RE. Sorted and deduped.
    """
    source = text or ""
    found: set[str] = {hit.lower() for hit in TITAN_NODE_RE.findall(source) if hit}
    list_pattern = re.compile(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", re.IGNORECASE)
    for m in list_pattern.finditer(source):
        for piece in re.split(r"[/,]", m.group(1)):
            piece = piece.strip()
            if piece:
                found.add(f"titan-{piece.lower()}")
    for m in TITAN_RANGE_RE.finditer(source):
        for half in m.groups():
            if half:
                found.add(f"titan-{half.lower()}")
    return sorted(found)
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
return "amd64"
if arch == "arm64":
return "arm64-unknown"
return "unknown"
def node_inventory_live() -> list[dict[str, Any]]:
    """Fetch nodes from the k8s API; summarize name/arch/hardware/roles/readiness.

    Returns [] when the API is unreachable. Sorted by node name.
    """
    try:
        payload = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return []
    raw_items = payload.get("items") or []
    if not isinstance(raw_items, list):
        raw_items = []
    summary: list[dict[str, Any]] = []
    for entry in raw_items:
        meta = entry.get("metadata") or {}
        labels = meta.get("labels") or {}
        node_name = meta.get("name") or ""
        if not node_name:
            continue
        summary.append(
            {
                "name": node_name,
                "arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
                "hardware": _hardware_class(labels),
                "roles": _node_roles(labels),
                "ready": _node_ready_status(entry),
            }
        )
    summary.sort(key=lambda row: row["name"])
    return summary
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[node.get("hardware") or "unknown"].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str:
    """Render a "Node inventory (live)" context block for node-related queries.

    Returns "" unless *query* contains a node-related keyword. Fetches live
    inventory when none is supplied. Includes hardware groupings, derived
    non-Pi nodes, and expected-vs-actual worker comparison when a Grafana
    expectation list is available.
    """
    q = (query or "").lower()
    if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
        return ""
    if inventory is None:
        inventory = node_inventory_live()
    if not inventory:
        return ""
    groups = _group_nodes(inventory)
    total = len(inventory)
    ready = sum(1 for node in inventory if node.get("ready") is True)
    not_ready = sum(1 for node in inventory if node.get("ready") is False)
    lines: list[str] = [
        "Node inventory (live):",
        f"- total: {total}, ready: {ready}, not ready: {not_ready}",
    ]
    # Fixed, human-friendly ordering of hardware classes.
    for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
        if key in groups:
            lines.append(f"- {key}: {', '.join(groups[key])}")
    non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
    if non_rpi:
        lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
    unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
    if unknowns:
        lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
    expected_workers = expected_worker_nodes_from_metrics()
    if expected_workers:
        ready_workers, not_ready_workers = worker_nodes_status()
        # Workers listed in the Grafana inventory but absent from the live cluster.
        missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
        lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
        lines.append(f"- workers_ready: {', '.join(ready_workers)}")
        if not_ready_workers:
            lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
        if missing:
            lines.append(f"- workers_missing (derived): {', '.join(missing)}")
    return "\n".join(lines)
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
    """Fetch live node inventory only when the prompt looks node-related."""
    triggers = ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")
    lowered = (prompt or "").lower()
    if not any(word in lowered for word in triggers):
        return []
    return node_inventory_live()
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
    """Precompute sorted name lists (all/ready/not-ready) plus hardware groups."""
    all_names: list[str] = []
    ready_names: list[str] = []
    down_names: list[str] = []
    for entry in inventory:
        all_names.append(entry["name"])
        if entry.get("ready") is True:
            ready_names.append(entry["name"])
        elif entry.get("ready") is False:
            down_names.append(entry["name"])
    return {
        "names": sorted(all_names),
        "ready": sorted(ready_names),
        "not_ready": sorted(down_names),
        "groups": _group_nodes(inventory),
    }
def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_summary: str) -> str:
    """Answer common node/metric questions directly, without the LLM.

    Returns "" when no deterministic answer applies, letting the caller fall
    through to the model. Postgres/connection questions short-circuit to
    *metrics_summary* when one was computed.
    """
    q = (prompt or "").lower()
    if metrics_summary and any(word in q for word in ("postgres", "connection", "connections", "db")):
        return metrics_summary
    if not inventory:
        return ""
    sets = _inventory_sets(inventory)
    names = sets["names"]
    ready = sets["ready"]
    not_ready = sets["not_ready"]
    groups = sets["groups"]
    total = len(names)
    # Membership questions about specific titan-XX nodes.
    for node in _extract_titan_nodes(q):
        if node and ("is" in q or "part of" in q or "in atlas" in q or "in cluster" in q):
            if node in names:
                return f"Yes. {node} is in the Atlas cluster."
            return f"No. {node} is not in the Atlas cluster."
    if any(word in q for word in ("how many", "count", "number")) and "node" in q and "worker" not in q:
        return f"Atlas has {total} nodes; {len(ready)} ready, {len(not_ready)} not ready."
    if "node names" in q or ("nodes" in q and "named" in q) or "naming" in q:
        return "Atlas node names: " + ", ".join(names) + "."
    if "ready" in q and "node" in q and "worker" in q:
        if "not ready" in q or "unready" in q or "down" in q:
            return "Worker nodes not ready: " + (", ".join(not_ready) if not_ready else "none") + "."
        return "Ready worker nodes ({}): {}.".format(len(ready), ", ".join(ready))
    if "worker" in q and any(word in q for word in ("missing", "expected", "should")):
        expected_workers = expected_worker_nodes_from_metrics()
        missing = sorted(set(expected_workers) - set(ready + not_ready)) if expected_workers else []
        if "missing" in q and missing:
            return "Missing worker nodes: " + ", ".join(missing) + "."
        if expected_workers:
            msg = f"Grafana inventory expects {len(expected_workers)} workers."
            if missing:
                msg += f" Missing: {', '.join(missing)}."
            return msg
        return "No expected worker inventory found; using live cluster state."
    if "worker" in q and "node" in q and "ready" not in q and "missing" not in q:
        return f"Worker nodes: {len(ready)} ready, {len(not_ready)} not ready."
    if "jetson" in q:
        jets = groups.get("jetson", [])
        return f"Jetson nodes: {', '.join(jets)}." if jets else "No Jetson nodes found."
    if "amd64" in q or "x86" in q:
        amd = groups.get("amd64", [])
        return f"amd64 nodes: {', '.join(amd)}." if amd else "No amd64 nodes found."
    if "rpi4" in q:
        rpi4 = groups.get("rpi4", [])
        return f"rpi4 nodes: {', '.join(rpi4)}." if rpi4 else "No rpi4 nodes found."
    if "rpi5" in q:
        rpi5 = groups.get("rpi5", [])
        return f"rpi5 nodes: {', '.join(rpi5)}." if rpi5 else "No rpi5 nodes found."
    if "raspberry" in q or "rpi" in q:
        rpi = sorted(set(groups.get("rpi4", [])) | set(groups.get("rpi5", [])))
        return f"Raspberry Pi nodes: {', '.join(rpi)}." if rpi else "No Raspberry Pi nodes found."
    if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
        non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
        # Fix: replies previously read "NonRaspberry"/"nonRaspberry" — the
        # hyphen had been lost (mojibake) in the user-facing strings.
        return f"Non-Raspberry Pi nodes: {', '.join(non_rpi)}." if non_rpi else "No non-Raspberry Pi nodes found."
    if "arm64-unknown" in q or "unknown" in q:
        unknown = sorted(set(groups.get("arm64-unknown", [])) | set(groups.get("unknown", [])))
        return f"Unknown hardware nodes: {', '.join(unknown)}." if unknown else "No unknown hardware labels."
    return ""
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
    """Rank _METRIC_INDEX entries against query tokens; return the top *limit*.

    Panel-title token hits score 2; hits elsewhere in the entry score 1.
    """
    wanted = set(_tokens(query))
    if not wanted or not _METRIC_INDEX:
        return []
    ranked: list[tuple[int, dict[str, Any]]] = []
    for entry in _METRIC_INDEX:
        if not isinstance(entry, dict):
            continue
        haystack = _metric_tokens(entry)
        if not haystack:
            continue
        title = (entry.get("panel_title") or "").lower()
        points = sum((2 if tok in title else 1) for tok in wanted if tok in haystack)
        if points:
            ranked.append((points, entry))
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in ranked[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
    """Resolve *prompt* to a dashboard panel and evaluate its PromQL live.

    Returns (context, fallback): *context* is grounded metric data for the
    LLM prompt; *fallback* is a short human-readable answer used when the
    LLM is unreachable. Both are "" unless tools are allowed, the prompt
    contains a metric hint word, and a panel with expressions matches.
    """
    if not allow_tools:
        return "", ""
    lower = (prompt or "").lower()
    if not any(word in lower for word in METRIC_HINT_WORDS):
        return "", ""
    matches = metrics_lookup(prompt, limit=1)
    if not matches:
        return "", ""
    entry = matches[0]
    dashboard = entry.get("dashboard") or "dashboard"
    panel = entry.get("panel_title") or "panel"
    exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
    if not exprs:
        return "", ""
    rendered_parts: list[str] = []
    # Evaluate at most two expressions to bound query latency.
    for expr in exprs[:2]:
        res = vm_query(expr, timeout=20)
        rendered = vm_render_result(res, limit=10)
        if rendered:
            rendered_parts.append(rendered)
    if not rendered_parts:
        return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data."
    summary = "\n".join(rendered_parts)
    context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
    fallback = _metrics_fallback_summary(panel, summary)
    return context, fallback
def jetson_nodes_from_kb() -> list[str]:
    """Titan node names from the first runbook line that mentions "jetson"."""
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        for line in str(doc.get("body") or "").splitlines():
            if "jetson" not in line.lower():
                continue
            hits = _extract_titan_nodes(line)
            if hits:
                return hits
    return []
def jetson_nodes_summary(cluster_name: str) -> str:
    """One-sentence Jetson node summary from the KB, or "" when none found."""
    jets = jetson_nodes_from_kb()
    if not jets:
        return ""
    return f"{cluster_name} has {len(jets)} Jetson nodes: {', '.join(jets)}."
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
    """Map hostnames/service names in *query* to GitOps endpoint hints.

    Returns (context_text, edges) where edges are (namespace, workload_name)
    pairs used to scope live pod summaries. Both are empty when the catalog
    is not loaded or nothing matches.
    """
    q = (query or "").strip()
    if not q or not KB.get("catalog"):
        return "", []
    ql = q.lower()
    hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
    # Also match by known workload/service names.
    for t in _tokens(ql):
        if t in _NAME_INDEX:
            hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
    edges: list[tuple[str, str]] = []
    lines: list[str] = []
    for host in sorted(hosts):
        for ep in _HOST_INDEX.get(host, []):
            backend = ep.get("backend") or {}
            ns = backend.get("namespace") or ""
            svc = backend.get("service") or ""
            path = ep.get("path") or "/"
            if not svc:
                continue
            wk = backend.get("workloads") or []
            wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
            # Fix: the separators between host/path, namespace/service, and
            # workloads had been lost (stripped characters), producing one
            # unreadable concatenated token per endpoint.
            lines.append(f"- {host}{path} -> {ns}/{svc} -> {wk_str}")
            for w in wk:
                if isinstance(w, dict) and w.get("name"):
                    edges.append((ns, str(w["name"])))
    if not lines:
        return "", []
    return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
# Cached ServiceAccount bearer token (read once from the pod filesystem).
_K8S_TOKEN: str | None = None
# Cached TLS context trusting the in-cluster CA certificate.
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
    """Lazily build (and cache) an SSL context trusting the in-cluster CA."""
    global _K8S_CTX
    if _K8S_CTX is None:
        _K8S_CTX = ssl.create_default_context(
            cafile="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
        )
    return _K8S_CTX
def _k8s_token() -> str:
    """Read (once) and cache the pod's ServiceAccount bearer token."""
    global _K8S_TOKEN
    if not _K8S_TOKEN:
        sa_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
        with open(sa_path, "r", encoding="utf-8") as fh:
            _K8S_TOKEN = fh.read().strip()
    return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
    """GET *path* from the in-cluster Kubernetes API and decode the JSON reply.

    Raises RuntimeError when the in-cluster service host env var is absent.
    """
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    if not host:
        raise RuntimeError("k8s host missing")
    port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
    api_req = request.Request(
        f"https://{host}:{port}{path}",
        headers={"Authorization": f"Bearer {_k8s_token()}"},
        method="GET",
    )
    with request.urlopen(api_req, timeout=timeout, context=_k8s_context()) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}
def _ariadne_state(timeout: int = 5) -> dict | None:
    """Fetch the Ariadne state document; None when unconfigured, unreachable, or non-dict."""
    if not ARIADNE_STATE_URL:
        return None
    headers = {"X-Internal-Token": ARIADNE_STATE_TOKEN} if ARIADNE_STATE_TOKEN else {}
    state_req = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
    try:
        with request.urlopen(state_req, timeout=timeout) as resp:
            raw = resp.read()
        payload = json.loads(raw.decode()) if raw else {}
    except Exception:
        return None
    return payload if isinstance(payload, dict) else None
def k8s_pods(namespace: str) -> list[dict]:
    """List pods in *namespace* (up to 500) via the Kubernetes API."""
    payload = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
    pods = payload.get("items") or []
    return pods if isinstance(pods, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
    """List Flux kustomizations whose Ready condition is not True.

    Returns up to 10 "- flux kustomization/<name>: Ready=<status> <msg>"
    lines, or "" when everything is ready or the API call fails.
    """
    try:
        data = k8s_get(
            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
        )
    except Exception:
        return ""
    items = data.get("items") or []
    bad: list[str] = []
    for it in items if isinstance(items, list) else []:
        md = it.get("metadata") or {}
        st = it.get("status") or {}
        name = md.get("name") or ""
        conds = st.get("conditions") or []
        ready = None
        msg = ""
        # Take the last Ready condition seen; a missing condition (ready=None)
        # counts as not ready below.
        for c in conds if isinstance(conds, list) else []:
            if isinstance(c, dict) and c.get("type") == "Ready":
                ready = c.get("status")
                msg = c.get("message") or ""
        if ready not in ("True", True):
            bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
    return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
    """Render a VM instant-query result as "- label=..., ...: value" bullets.

    Prefers well-known Kubernetes labels; otherwise falls back to the first
    few non-internal labels. At most *limit* series are rendered; returns ""
    for empty/failed results.
    """
    if not res:
        return ""
    series = _vm_value_series(res)
    if not series:
        return ""
    out: list[str] = []
    for r in series[:limit]:
        if not isinstance(r, dict):
            continue
        metric = r.get("metric") or {}
        value = r.get("value") or []
        # Instant vectors are [timestamp, value]; keep only the value.
        val = value[1] if isinstance(value, list) and len(value) > 1 else ""
        # Prefer common labels if present.
        label_parts = []
        for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
            if isinstance(metric, dict) and metric.get(k):
                label_parts.append(f"{k}={metric.get(k)}")
        if not label_parts and isinstance(metric, dict):
            # Fallback: up to four arbitrary labels, skipping __internal__ ones.
            for k in sorted(metric.keys()):
                if k.startswith("__"):
                    continue
                label_parts.append(f"{k}={metric.get(k)}")
                if len(label_parts) >= 4:
                    break
        labels = ", ".join(label_parts) if label_parts else "series"
        out.append(f"- {labels}: {val}")
    return "\n".join(out)
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
    """Compact human-readable line for well-known Postgres panels.

    Unrecognized panels (or unparseable values) fall back to "{panel}: {summary}".
    """
    parsed = _parse_metric_lines(summary)
    name = (panel or "").lower()
    if name.startswith("postgres connections"):
        used = parsed.get("conn=used")
        cap = parsed.get("conn=max")
        if used and cap:
            try:
                used_n = int(float(used))
                cap_n = int(float(cap))
            except ValueError:
                return f"Postgres connections: {summary}"
            return f"Postgres connections: {used_n}/{cap_n} used ({cap_n - used_n} free)."
    if name.startswith("postgres hottest") and parsed:
        label, value = next(iter(parsed.items()))
        return f"Most Postgres connections: {label} = {value}."
    return f"{panel}: {summary}"
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
if labels.get("node-role.kubernetes.io/worker") is not None:
return True
return True
def worker_nodes_status() -> tuple[list[str], list[str]]:
    """(ready, not_ready) sorted worker-node name lists; ([], []) when the API fails."""
    try:
        payload = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ([], [])
    nodes = payload.get("items") or []
    if not isinstance(nodes, list):
        nodes = []
    ready_list: list[str] = []
    down_list: list[str] = []
    for node in nodes:
        if not _node_is_worker(node):
            continue
        name = (node.get("metadata") or {}).get("name") or ""
        if not name:
            continue
        state = _node_ready_status(node)
        if state is True:
            ready_list.append(name)
        elif state is False:
            down_list.append(name)
    return (sorted(ready_list), sorted(down_list))
def expected_worker_nodes_from_metrics() -> list[str]:
    """Expected worker names parsed from the "worker nodes ready" dashboard panel.

    Finds the first panel whose title contains "worker nodes ready" and pulls
    the node=~"a|b|c" selector out of its PromQL expressions. Returns [] when
    no such panel or selector exists.
    """
    for entry in _METRIC_INDEX:
        # Fix: guard against non-dict entries, matching metrics_lookup();
        # previously a stray non-dict item raised AttributeError here.
        if not isinstance(entry, dict):
            continue
        panel = (entry.get("panel_title") or "").lower()
        if "worker nodes ready" not in panel:
            continue
        exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
        for expr in exprs:
            if not isinstance(expr, str):
                continue
            match = NODE_REGEX.search(expr)
            if not match:
                continue
            nodes = [n.strip() for n in match.group(1).split("|") if n.strip()]
            return sorted(nodes)
    return []
def _context_fallback(context: str) -> str:
if not context:
return ""
trimmed = context.strip()
if len(trimmed) > MAX_TOOL_CHARS:
trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
return "I couldnt reach the model backend. Here is the data I found:\n" + trimmed
def vm_top_restarts(hours: int = 1) -> str:
    """Bullet list of the 5 pods with the most container restarts over *hours*h."""
    promql = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
    res = vm_query(promql)
    if not res or res.get("status") != "success":
        return ""
    lines: list[str] = []
    for row in (res.get("data") or {}).get("result") or []:
        if not isinstance(row, dict):
            continue
        labels = row.get("metric") or {}
        value = row.get("value") or []
        ns = (labels.get("namespace") or "").strip()
        pod = (labels.get("pod") or "").strip()
        count = value[1] if isinstance(value, list) and len(value) > 1 else ""
        if pod:
            lines.append(f"- restarts({hours}h): {ns}/{pod} = {count}")
    return "\n".join(lines)
def vm_cluster_snapshot() -> str:
    """Small cluster overview from kube-state-metrics: node readiness + pod phases."""
    parts: list[str] = []
    # Node readiness (kube-state-metrics).
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            # Best-effort: skip the readiness line when either series is empty/odd.
            pass
    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()
def nodes_summary(cluster_name: str) -> str:
    """One-sentence node-count/readiness summary for *cluster_name*.

    Prefers the Ariadne state service; falls back to the Kubernetes API.
    Returns "" when neither source is usable.
    """
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        total = nodes.get("total")
        ready = nodes.get("ready")
        not_ready = nodes.get("not_ready")
        if isinstance(total, int) and isinstance(ready, int):
            # Derive not_ready when the state document omits it.
            not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
            if not_ready:
                return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
            return f"{cluster_name} cluster has {total} nodes, all Ready."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    total = len(items)
    ready = 0
    for node in items:
        conditions = node.get("status", {}).get("conditions") or []
        for cond in conditions if isinstance(conditions, list) else []:
            if cond.get("type") == "Ready":
                if cond.get("status") == "True":
                    ready += 1
                break
    not_ready = max(total - ready, 0)
    if not_ready:
        return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
    return f"{cluster_name} cluster has {total} nodes, all Ready."
def nodes_names_summary(cluster_name: str) -> str:
    """Sorted node-name listing, truncated after 30 names.

    Prefers the Ariadne state service; falls back to the Kubernetes API.
    Returns "" when neither source yields names.
    """
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        names = nodes.get("names")
        if isinstance(names, list) and names:
            cleaned = sorted({str(n) for n in names if n})
            if len(cleaned) <= 30:
                return f"{cluster_name} node names: {', '.join(cleaned)}."
            shown = ", ".join(cleaned[:30])
            return f"{cluster_name} node names: {shown}, … (+{len(cleaned) - 30} more)."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    names = []
    for node in items:
        name = (node.get("metadata") or {}).get("name") or ""
        if name:
            names.append(name)
    names = sorted(set(names))
    if not names:
        return ""
    if len(names) <= 30:
        return f"{cluster_name} node names: {', '.join(names)}."
    shown = ", ".join(names[:30])
    return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."
def nodes_arch_summary(cluster_name: str, arch: str) -> str:
    """Count live nodes whose kubernetes.io/arch label matches *arch*.

    Accepts common aliases (aarch64 -> arm64; x86_64/x86-64 -> amd64).
    Returns "" when the node list cannot be fetched or is empty.
    """
    try:
        payload = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    nodes = payload.get("items") or []
    if not isinstance(nodes, list) or not nodes:
        return ""
    alias = (arch or "").strip().lower()
    aliases = {"aarch64": "arm64", "arm64": "arm64", "x86_64": "amd64", "x86-64": "amd64", "amd64": "amd64"}
    arch_label = aliases.get(alias, alias)
    total = sum(
        1
        for node in nodes
        if ((node.get("metadata") or {}).get("labels") or {}).get("kubernetes.io/arch") == arch_label
    )
    return f"{cluster_name} cluster has {total} {arch_label} nodes."
def _strip_code_fence(text: str) -> str:
    """Remove a surrounding ``` / ```json fence from *text*, when present."""
    cleaned = (text or "").strip()
    fenced = CODE_FENCE_RE.match(cleaned)
    return fenced.group(1).strip() if fenced else cleaned
def _normalize_reply(value: Any) -> str:
    """Coerce an arbitrary model reply (str/dict/list/None) to plain text.

    Dicts prefer well-known content keys, then recurse into the first nested
    str/dict/list value, then fall back to JSON. Strings that look like JSON
    objects are parsed and normalized recursively.
    """
    if isinstance(value, dict):
        for key in ("content", "response", "reply", "message"):
            if key in value:
                return _normalize_reply(value[key])
        # No known key: recurse into the first usable nested value.
        for v in value.values():
            if isinstance(v, (str, dict, list)):
                return _normalize_reply(v)
        return json.dumps(value, ensure_ascii=False)
    if isinstance(value, list):
        parts = [_normalize_reply(item) for item in value]
        return " ".join(p for p in parts if p)
    if value is None:
        return ""
    text = _strip_code_fence(str(value))
    # A JSON object may remain after unfencing; unwrap it too.
    if text.startswith("{") and text.endswith("}"):
        try:
            return _normalize_reply(json.loads(text))
        except Exception:
            return text
    return text
# Conversation state.
# Rolling per-conversation transcripts, keyed per key_for() (DMs collapse the sender).
history = collections.defaultdict(list)  # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
    """History key: DMs share one thread per room; group rooms track per sender."""
    if is_dm:
        return (room_id, None)
    return (room_id, sender)
def build_context(
    prompt: str,
    *,
    allow_tools: bool,
    targets: list[tuple[str, str]],
    inventory: list[dict[str, Any]] | None = None,
) -> str:
    """Assemble the grounded context string for a model call.

    Always includes KB retrieval, endpoint-catalog hints, and node-inventory
    facts when available. With ``allow_tools`` it additionally pulls live pod
    summaries (scoped to ``targets`` plus catalog edges), Flux readiness, and
    — for metric-flavoured prompts — VictoriaMetrics snapshots.
    """
    sections: list[str] = []
    kb = kb_retrieve(prompt)
    if kb:
        sections.append(kb)
    endpoints, edges = catalog_hints(prompt)
    if endpoints:
        sections.append(endpoints)
    node_ctx = node_inventory_context(prompt, inventory)
    if node_ctx:
        sections.append(node_ctx)
    if allow_tools:
        # Scope pod summaries to relevant namespaces/workloads when possible.
        scoped: dict[str, set[str]] = collections.defaultdict(set)
        for ns, name in (targets or []) + (edges or []):
            if ns and name:
                scoped[ns].add(name)
        pod_sections: list[str] = []
        for ns in sorted(scoped):
            summary = summarize_pods(ns, scoped[ns])
            if summary:
                pod_sections.append(f"Pods (live):\n{summary}")
        if pod_sections:
            sections.append("\n".join(pod_sections)[:MAX_TOOL_CHARS])
        flux_bad = flux_not_ready()
        if flux_bad:
            sections.append("Flux (not ready):\n" + flux_bad)
        lowered = (prompt or "").lower()
        if any(word in lowered for word in METRIC_HINT_WORDS):
            restarts = vm_top_restarts(1)
            if restarts:
                sections.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
            snap = vm_cluster_snapshot()
            if snap:
                sections.append("VictoriaMetrics (cluster snapshot):\n" + snap)
    return "\n\n".join(s for s in sections if s).strip()
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
    """POST one chat turn to the model backend and record the reply in history.

    Builds a flat transcript (system prompt, truncated context, recent
    history, user turn) and sends it as a single ``message`` field. Raises on
    network/HTTP/JSON failures; callers decide the fallback.
    """
    system = (
        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
        "Be helpful, direct, and concise. "
        "Prefer answering with exact repo paths and Kubernetes resource names. "
        "Never include or request secret values. "
        "Do not suggest commands unless explicitly asked. "
        "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
        "If the answer is not grounded in the provided context or tool data, say you do not know."
    )
    lines = [system]
    if context:
        lines.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
    # Only the most recent turns are included so the transcript stays bounded.
    lines.extend(history[hist_key][-24:])
    lines.append(f"User: {prompt}")
    payload = {"model": MODEL, "message": "\n".join(lines)}
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["x-api-key"] = API_KEY
    req_obj = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
    with request.urlopen(req_obj, timeout=OLLAMA_TIMEOUT_SEC) as resp:
        data = json.loads(resp.read().decode())
    raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
    reply = _normalize_reply(raw_reply) or "I'm here to help."
    history[hist_key].append(f"Atlas: {reply}")
    return reply
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
    """Call the model, degrading to *fallback* (or a busy notice) on any error."""
    try:
        return _ollama_call(hist_key, prompt, context=context)
    except Exception:
        pass
    if fallback:
        # Record the fallback so later turns see what was actually sent.
        history[hist_key].append(f"Atlas: {fallback}")
        return fallback
    return "Model backend is busy. Try again in a moment."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
    """Run ollama_reply on a worker thread, posting progress while it runs.

    Sends a "Thinking…" notice if no reply arrives within two seconds, then a
    heartbeat message every THINKING_INTERVAL_SEC (minimum 10s) until the
    worker finishes. Returns the model reply, else *fallback*, else a busy
    notice.
    """
    result: dict[str, str] = {"reply": ""}
    done = threading.Event()

    def worker():
        result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
        done.set()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    if not done.wait(2.0):
        send_msg(token, room, "Thinking…")
        prompt_hint = " ".join((prompt or "").split())
        if len(prompt_hint) > 160:
            # Bug fix: the truncated hint previously appended an empty string,
            # leaving no visible sign that the prompt was cut off.
            prompt_hint = prompt_hint[:157] + "…"
        heartbeat = max(10, THINKING_INTERVAL_SEC)
        next_heartbeat = time.monotonic() + heartbeat
        # Absolute deadlines keep heartbeats evenly spaced even if send_msg
        # itself is slow.
        while not done.wait(max(0, next_heartbeat - time.monotonic())):
            if prompt_hint:
                send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
            else:
                send_msg(token, room, "Still thinking (gathering context)…")
            next_heartbeat += heartbeat
    thread.join(timeout=1)
    return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
    """Long-poll /sync forever: accept invites and answer eligible messages.

    ``room_id`` is kept for interface compatibility; the loop actually reacts
    to every joined room reported by /sync. Replies happen in DMs (rooms with
    at most two joined members) or when the bot is mentioned.
    """
    since = None
    try:
        # Zero-timeout initial sync just to learn next_batch, so the room
        # backlog is not replayed on startup.
        res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
        since = res.get("next_batch")
    except Exception:
        pass
    while True:
        params = {"timeout": 30000}
        if since:
            params["since"] = since
        query = parse.urlencode(params)
        try:
            res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
        except Exception:
            time.sleep(5)
            continue
        since = res.get("next_batch", since)
        # Invites: join best-effort; only the room id is needed.
        for rid in res.get("rooms", {}).get("invite", {}):
            try:
                join_room(token, rid)
            except Exception:
                pass
        # Messages in joined rooms.
        for rid, data in res.get("rooms", {}).get("join", {}).items():
            timeline = data.get("timeline", {}).get("events", [])
            joined_count = data.get("summary", {}).get("m.joined_member_count")
            # Heuristic: a room with <= 2 joined members behaves as a DM.
            is_dm = joined_count is not None and joined_count <= 2
            for ev in timeline:
                if ev.get("type") != "m.room.message":
                    continue
                content = ev.get("content", {})
                body = (content.get("body", "") or "").strip()
                if not body:
                    continue
                sender = ev.get("sender", "")
                # Skip our own messages. Fix: use SERVER_NAME instead of a
                # hard-coded homeserver so the MATRIX_SERVER_NAME override
                # actually applies (default is unchanged).
                if sender == f"@{USER}:{SERVER_NAME}":
                    continue
                mentioned = is_mentioned(content, body)
                hist_key = key_for(rid, sender, is_dm)
                history[hist_key].append(f"{sender}: {body}")
                history[hist_key] = history[hist_key][-80:]
                if not (is_dm or mentioned):
                    continue
                # Only do live cluster introspection in DMs; metrics can be
                # answered when mentioned.
                allow_tools = is_dm
                allow_metrics = is_dm or mentioned
                promql = ""
                if allow_tools:
                    # Fix: single-backslash escapes so \s is a whitespace
                    # class; the doubled backslashes matched a literal
                    # backslash, making "promql: <expr>" undetectable.
                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
                    if m:
                        promql = m.group(1).strip()
                # Attempt to scope tools to the most likely workloads when
                # hostnames are mentioned.
                targets: list[tuple[str, str]] = []
                for m in HOST_RE.finditer(body.lower()):
                    host = m.group(1).lower()
                    for ep in _HOST_INDEX.get(host, []):
                        backend = ep.get("backend") or {}
                        ns = backend.get("namespace") or ""
                        for w in backend.get("workloads") or []:
                            if isinstance(w, dict) and w.get("name"):
                                targets.append((ns, str(w["name"])))
                inventory = node_inventory_for_prompt(body)
                context = build_context(body, allow_tools=allow_tools, targets=targets, inventory=inventory)
                if allow_tools and promql:
                    # Renamed from ``res`` to avoid shadowing the sync response.
                    vm_res = vm_query(promql, timeout=20)
                    rendered = vm_render_result(vm_res, limit=15) or "(no results)"
                    extra = "VictoriaMetrics (PromQL result):\n" + rendered
                    context = (context + "\n\n" + extra).strip() if context else extra
                metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
                if metrics_context:
                    context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
                fallback = metrics_fallback or ""
                if not fallback and context:
                    fallback = _context_fallback(context)
                # Deterministic answers (inventory/metrics) short-circuit the
                # model entirely.
                structured = structured_answer(body, inventory=inventory, metrics_summary=metrics_fallback or "")
                if structured:
                    send_msg(token, rid, structured)
                    continue
                reply = ollama_reply_with_thinking(
                    token,
                    rid,
                    hist_key,
                    body,
                    context=context,
                    fallback=fallback,
                )
                send_msg(token, rid, reply)
def login_with_retry():
    """Attempt login() up to 10 times with capped exponential backoff.

    Sleeps min(30, 2**attempt) seconds between attempts. Fix: no longer
    sleeps after the final failed attempt — it re-raises immediately.
    """
    last_err = None
    for attempt in range(10):
        try:
            return login()
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            if attempt < 9:
                time.sleep(min(30, 2 ** attempt))
    raise last_err
def main():
    """Entry point: load the KB, authenticate, join the lobby room, and sync."""
    load_kb()
    token = login_with_retry()
    room_id = None
    try:
        room_id = resolve_alias(token, ROOM_ALIAS)
        join_room(token, room_id)
    except Exception:
        # Lobby resolution/join is best effort; sync_loop handles all rooms.
        room_id = None
    sync_loop(token, room_id)
# Standard entry guard: run the bot only when executed as a script.
if __name__ == "__main__":
    main()