import collections
import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import parse, request

BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))

TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
    "the", "and", "for", "with", "this", "that", "from", "into", "what", "how",
    "why", "when", "where", "which", "who", "can", "could", "should", "would",
    "please", "help", "atlas", "othrys",
}
METRIC_HINT_WORDS = {
    "bandwidth", "connections", "cpu", "database", "db", "disk", "health",
    "memory", "network", "node", "nodes", "postgres", "status", "storage",
    "usage", "down", "slow", "error", "unknown_error", "timeout", "crash",
    "crashloop", "restart", "restarts", "pending", "unreachable", "latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)


def _tokens(text: str) -> list[str]:
    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]


# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
# Plain-text mention pattern: a standalone, optionally @-prefixed localpart
# (assumed reconstruction; the original pattern was truncated in transit).
MENTION_RE = re.compile(
    r"(?<![a-z0-9_.@-])@?(?:" + "|".join(re.escape(t) for t in MENTION_LOCALPARTS) + r")(?![a-z0-9_.-])",
    re.IGNORECASE,
)


def normalize_user_id(token: str) -> str:
    t = token.strip()
    if not t:
        return ""
    if t.startswith("@") and ":" in t:
        return t
    t = t.lstrip("@")
    if ":" in t:
        return f"@{t}"
    return f"@{t}:{SERVER_NAME}"


MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}


def _body_mentions_token(body: str) -> bool:
    lower = (body or "").strip().lower()
    if not lower:
        return False
    for token in MENTION_LOCALPARTS:
        for prefix in (token, f"@{token}"):
            if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
                return True
    return False


def is_mentioned(content: dict, body: str) -> bool:
    if MENTION_RE.search(body or "") is not None:
        return True
    if _body_mentions_token(body or ""):
        return True
    mentions = content.get("m.mentions", {})
    user_ids = mentions.get("user_ids", [])
    if not isinstance(user_ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)


# Matrix HTTP helper.
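# A hypothetical call (this endpoint is not used elsewhere in this file):
#   req("GET", "/_matrix/client/v3/account/whoami", token)
# would return the decoded JSON body, e.g. {"user_id": "@atlas:live.bstein.dev"}.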
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    url = (base or BASE) + path
    data = None
    headers = {}
    if body is not None:
        data = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    r = request.Request(url, data=data, headers=headers, method=method)
    with request.urlopen(r, timeout=timeout) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}


def login() -> str:
    login_user = normalize_user_id(USER)
    payload = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": login_user},
        "password": PASSWORD,
    }
    res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
    return res["access_token"]


def resolve_alias(token: str, alias: str) -> str:
    enc = parse.quote(alias)
    res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
    return res["room_id"]


def join_room(token: str, room: str):
    req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})


def send_msg(token: str, room: str, text: str):
    path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", path, token, body={"msgtype": "m.text", "body": text})


# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
_NODE_CLASS_INDEX: dict[str, list[str]] = {}
_NODE_CLASS_RPI4: set[str] = set()
_NODE_CLASS_RPI5: set[str] = set()
_NODE_CLASS_AMD64: set[str] = set()
_NODE_CLASS_JETSON: set[str] = set()
_NODE_CLASS_EXTERNAL: set[str] = set()
_NODE_CLASS_NON_RPI: set[str] = set()


def _load_json_file(path: str) -> Any | None:
    try:
        with open(path, "rb") as f:
            return json.loads(f.read().decode("utf-8"))
    except Exception:
        return None


def load_kb():
    global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
    global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON
    global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI
    if not KB_DIR:
        return
    catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
    runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
    metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
    KB = {"catalog": catalog, "runbooks": runbooks}
    host_index: dict[str, list[dict]] = collections.defaultdict(list)
    for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
        host = (ep.get("host") or "").lower()
        if host:
            host_index[host].append(ep)
    _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
    names: set[str] = set()
    for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
        if isinstance(s, dict) and s.get("name"):
            names.add(str(s["name"]).lower())
    for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
        if isinstance(w, dict) and w.get("name"):
            names.add(str(w["name"]).lower())
    _NAME_INDEX = names
    _METRIC_INDEX = metrics if isinstance(metrics, list) else []
    node_classes = _parse_node_classes(runbooks)
    _NODE_CLASS_INDEX = node_classes
    _NODE_CLASS_RPI4 = set(node_classes.get("rpi4", []))
    _NODE_CLASS_RPI5 = set(node_classes.get("rpi5", []))
    _NODE_CLASS_AMD64 = set(node_classes.get("amd64", []))
    _NODE_CLASS_JETSON = set(node_classes.get("jetson", []))
    _NODE_CLASS_EXTERNAL = set(node_classes.get("external", []))
    _NODE_CLASS_NON_RPI = set(
        sorted(
            set().union(*node_classes.values())
            - _NODE_CLASS_RPI4
            - _NODE_CLASS_RPI5
            - _NODE_CLASS_EXTERNAL
        )
    )


def kb_retrieve(query: str, *, limit: int = 3) -> str:
    q = (query or "").strip()
    if not q or not KB.get("runbooks"):
        return ""
    ql = q.lower()
    q_tokens = _tokens(q)
    if not q_tokens:
        return ""
    scored: list[tuple[int, dict]] = []
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        title = str(doc.get("title") or "")
        body = str(doc.get("body") or "")
        tags = doc.get("tags") or []
        entrypoints = doc.get("entrypoints") or []
        hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
        score = 0
        for t in set(q_tokens):
            if t in hay:
                score += 3 if t in title.lower() else 1
        for h in entrypoints:
            if isinstance(h, str) and h.lower() in ql:
                score += 4
        if score:
            scored.append((score, doc))
    scored.sort(key=lambda x: x[0], reverse=True)
    picked = [d for _, d in scored[:limit]]
    if not picked:
        return ""
    parts: list[str] = ["Atlas KB (retrieved):"]
    used = 0
    for d in picked:
        path = d.get("path") or ""
        title = d.get("title") or path
        body = (d.get("body") or "").strip()
        snippet = body[:900].strip()
        chunk = f"- {title} ({path})\n{snippet}"
        if used + len(chunk) > MAX_KB_CHARS:
            break
        parts.append(chunk)
        used += len(chunk)
    return "\n".join(parts).strip()


def _extract_titan_nodes(text: str) -> list[str]:
    names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
    for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", text or "", re.IGNORECASE):
        tail = match.group(1)
        for part in re.split(r"[/,]", tail):
            part = part.strip()
            if part:
                names.add(f"titan-{part.lower()}")
    for match in TITAN_RANGE_RE.finditer(text or ""):
        left, right = match.groups()
        if left:
            names.add(f"titan-{left.lower()}")
        if right:
            names.add(f"titan-{right.lower()}")
    return sorted(names)


def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]:
    classes: dict[str, list[str]] = {}
    for doc in runbooks:
        if not isinstance(doc, dict):
            continue
        body = str(doc.get("body") or "")
        for line in body.splitlines():
            stripped = line.strip()
            if "titan-" not in stripped.lower():
                continue
            label = ""
            nodes: list[str] = []
            if stripped.startswith("-") and ":" in stripped:
                label, rest = stripped.lstrip("-").split(":", 1)
                nodes = _extract_titan_nodes(rest)
                label = label.strip().lower()
            else:
                nodes = _extract_titan_nodes(stripped)
            if not nodes:
                continue
            if "jetson" in stripped.lower():
                classes.setdefault("jetson", nodes)
            if "amd64" in stripped.lower() or "x86" in stripped.lower():
                classes.setdefault("amd64", nodes)
            if "rpi4" in stripped.lower():
                classes.setdefault("rpi4", nodes)
            if "rpi5" in stripped.lower():
                classes.setdefault("rpi5", nodes)
            if "external" in stripped.lower() or "non-cluster" in stripped.lower():
                classes.setdefault("external", nodes)
            if label:
                classes.setdefault(label, nodes)
    return {k: sorted(set(v)) for k, v in classes.items()}


def node_inventory_answer(cluster_name: str, query: str) -> str:
    q = (query or "").lower()
    if "jetson" in q and _NODE_CLASS_JETSON:
        names = sorted(_NODE_CLASS_JETSON)
        return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
    if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
        names = sorted(_NODE_CLASS_NON_RPI)
        if names:
            return f"{cluster_name} non‑Raspberry Pi nodes: {', '.join(names)}."
    if "raspberry" in q or "rpi" in q:
        if "rpi4" in q and _NODE_CLASS_RPI4:
            names = sorted(_NODE_CLASS_RPI4)
            return f"{cluster_name} rpi4 nodes: {', '.join(names)}."
if "rpi5" in q and _NODE_CLASS_RPI5: names = sorted(_NODE_CLASS_RPI5) return f"{cluster_name} rpi5 nodes: {', '.join(names)}." names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5) if names: return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}." if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64: names = sorted(_NODE_CLASS_AMD64) return f"{cluster_name} amd64 nodes: {', '.join(names)}." return "" def node_inventory_context(query: str) -> str: q = (query or "").lower() if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")): return "" lines: list[str] = ["Node inventory (KB):"] if _NODE_CLASS_RPI5: lines.append(f"- rpi5: {', '.join(sorted(_NODE_CLASS_RPI5))}") if _NODE_CLASS_RPI4: lines.append(f"- rpi4: {', '.join(sorted(_NODE_CLASS_RPI4))}") if _NODE_CLASS_JETSON: lines.append(f"- jetson: {', '.join(sorted(_NODE_CLASS_JETSON))}") if _NODE_CLASS_AMD64: lines.append(f"- amd64: {', '.join(sorted(_NODE_CLASS_AMD64))}") if _NODE_CLASS_EXTERNAL: lines.append(f"- external: {', '.join(sorted(_NODE_CLASS_EXTERNAL))}") if len(lines) == 1: return "" return "\n".join(lines) def _metric_tokens(entry: dict[str, Any]) -> str: parts: list[str] = [] for key in ("panel_title", "dashboard", "description"): val = entry.get(key) if isinstance(val, str) and val: parts.append(val.lower()) tags = entry.get("tags") if isinstance(tags, list): parts.extend(str(t).lower() for t in tags if t) return " ".join(parts) def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]: q_tokens = _tokens(query) if not q_tokens or not _METRIC_INDEX: return [] scored: list[tuple[int, dict[str, Any]]] = [] for entry in _METRIC_INDEX: if not isinstance(entry, dict): continue hay = _metric_tokens(entry) if not hay: continue score = 0 for t in set(q_tokens): if t in hay: score += 2 if t in (entry.get("panel_title") or "").lower() else 1 if score: scored.append((score, entry)) scored.sort(key=lambda item: item[0], reverse=True) return [entry for _, entry in scored[:limit]] def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]: if not allow_tools: return "", "" lower = (prompt or "").lower() if not any(word in lower for word in METRIC_HINT_WORDS): return "", "" matches = metrics_lookup(prompt, limit=1) if not matches: return "", "" entry = matches[0] dashboard = entry.get("dashboard") or "dashboard" panel = entry.get("panel_title") or "panel" exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else [] if not exprs: return "", "" rendered_parts: list[str] = [] for expr in exprs[:2]: res = vm_query(expr, timeout=20) rendered = vm_render_result(res, limit=10) if rendered: rendered_parts.append(rendered) if not rendered_parts: return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data." summary = "\n".join(rendered_parts) context = f"Metrics (from {dashboard} / {panel}):\n{summary}" fallback = _metrics_fallback_summary(panel, summary) return context, fallback def jetson_nodes_from_kb() -> list[str]: for doc in KB.get("runbooks", []): if not isinstance(doc, dict): continue body = str(doc.get("body") or "") for line in body.splitlines(): if "jetson" not in line.lower(): continue names = _extract_titan_nodes(line) if names: return names return [] def jetson_nodes_summary(cluster_name: str) -> str: names = jetson_nodes_from_kb() if names: return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}." 
return "" def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]: q = (query or "").strip() if not q or not KB.get("catalog"): return "", [] ql = q.lower() hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")} # Also match by known workload/service names. for t in _tokens(ql): if t in _NAME_INDEX: hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t} edges: list[tuple[str, str]] = [] lines: list[str] = [] for host in sorted(hosts): for ep in _HOST_INDEX.get(host, []): backend = ep.get("backend") or {} ns = backend.get("namespace") or "" svc = backend.get("service") or "" path = ep.get("path") or "/" if not svc: continue wk = backend.get("workloads") or [] wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown" lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}") for w in wk: if isinstance(w, dict) and w.get("name"): edges.append((ns, str(w["name"]))) if not lines: return "", [] return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges # Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot. _K8S_TOKEN: str | None = None _K8S_CTX: ssl.SSLContext | None = None def _k8s_context() -> ssl.SSLContext: global _K8S_CTX if _K8S_CTX is not None: return _K8S_CTX ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" ctx = ssl.create_default_context(cafile=ca_path) _K8S_CTX = ctx return ctx def _k8s_token() -> str: global _K8S_TOKEN if _K8S_TOKEN: return _K8S_TOKEN token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" with open(token_path, "r", encoding="utf-8") as f: _K8S_TOKEN = f.read().strip() return _K8S_TOKEN def k8s_get(path: str, timeout: int = 8) -> dict: host = os.environ.get("KUBERNETES_SERVICE_HOST") port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443" if not host: raise RuntimeError("k8s host missing") url = f"https://{host}:{port}{path}" headers = {"Authorization": f"Bearer {_k8s_token()}"} r = request.Request(url, headers=headers, method="GET") with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp: raw = resp.read() return json.loads(raw.decode()) if raw else {} def _ariadne_state(timeout: int = 5) -> dict | None: if not ARIADNE_STATE_URL: return None headers = {} if ARIADNE_STATE_TOKEN: headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET") try: with request.urlopen(r, timeout=timeout) as resp: raw = resp.read() payload = json.loads(raw.decode()) if raw else {} return payload if isinstance(payload, dict) else None except Exception: return None def k8s_pods(namespace: str) -> list[dict]: data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500") items = data.get("items") or [] return items if isinstance(items, list) else [] def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str: try: pods = k8s_pods(namespace) except Exception: return "" out: list[str] = [] for p in pods: md = p.get("metadata") or {} st = p.get("status") or {} name = md.get("name") or "" if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes): continue phase = st.get("phase") or "?" 
cs = st.get("containerStatuses") or [] restarts = 0 ready = 0 total = 0 reason = st.get("reason") or "" for c in cs if isinstance(cs, list) else []: if not isinstance(c, dict): continue total += 1 restarts += int(c.get("restartCount") or 0) if c.get("ready"): ready += 1 state = c.get("state") or {} if not reason and isinstance(state, dict): waiting = state.get("waiting") or {} if isinstance(waiting, dict) and waiting.get("reason"): reason = waiting.get("reason") extra = f" ({reason})" if reason else "" out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}") return "\n".join(out[:20]) def flux_not_ready() -> str: try: data = k8s_get( "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200" ) except Exception: return "" items = data.get("items") or [] bad: list[str] = [] for it in items if isinstance(items, list) else []: md = it.get("metadata") or {} st = it.get("status") or {} name = md.get("name") or "" conds = st.get("conditions") or [] ready = None msg = "" for c in conds if isinstance(conds, list) else []: if isinstance(c, dict) and c.get("type") == "Ready": ready = c.get("status") msg = c.get("message") or "" if ready not in ("True", True): bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip()) return "\n".join(bad[:10]) # VictoriaMetrics (PromQL) helpers. def vm_query(query: str, timeout: int = 8) -> dict | None: try: url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query}) with request.urlopen(url, timeout=timeout) as resp: return json.loads(resp.read().decode()) except Exception: return None def _vm_value_series(res: dict) -> list[dict]: if not res or (res.get("status") != "success"): return [] data = res.get("data") or {} result = data.get("result") or [] return result if isinstance(result, list) else [] def vm_render_result(res: dict | None, limit: int = 12) -> str: if not res: return "" series = _vm_value_series(res) if not series: return "" out: list[str] = [] for r in series[:limit]: if not isinstance(r, dict): continue metric = r.get("metric") or {} value = r.get("value") or [] val = value[1] if isinstance(value, list) and len(value) > 1 else "" # Prefer common labels if present. label_parts = [] for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"): if isinstance(metric, dict) and metric.get(k): label_parts.append(f"{k}={metric.get(k)}") if not label_parts and isinstance(metric, dict): for k in sorted(metric.keys()): if k.startswith("__"): continue label_parts.append(f"{k}={metric.get(k)}") if len(label_parts) >= 4: break labels = ", ".join(label_parts) if label_parts else "series" out.append(f"- {labels}: {val}") return "\n".join(out) def _parse_metric_lines(summary: str) -> dict[str, str]: parsed: dict[str, str] = {} for line in (summary or "").splitlines(): line = line.strip() if not line.startswith("-"): continue try: label, value = line.lstrip("-").split(":", 1) except ValueError: continue parsed[label.strip()] = value.strip() return parsed def _metrics_fallback_summary(panel: str, summary: str) -> str: parsed = _parse_metric_lines(summary) panel_l = (panel or "").lower() if panel_l.startswith("postgres connections"): used = parsed.get("conn=used") maxv = parsed.get("conn=max") if used and maxv: try: used_i = int(float(used)) max_i = int(float(maxv)) except ValueError: return f"Postgres connections: {summary}" free = max_i - used_i return f"Postgres connections: {used_i}/{max_i} used ({free} free)." 
if panel_l.startswith("postgres hottest"): if parsed: label, value = next(iter(parsed.items())) return f"Most Postgres connections: {label} = {value}." return f"{panel}: {summary}" def _node_ready_status(node: dict) -> bool | None: conditions = node.get("status", {}).get("conditions") or [] for cond in conditions if isinstance(conditions, list) else []: if cond.get("type") == "Ready": if cond.get("status") == "True": return True if cond.get("status") == "False": return False return None return None def _node_is_worker(node: dict) -> bool: labels = (node.get("metadata") or {}).get("labels") or {} if labels.get("node-role.kubernetes.io/control-plane") is not None: return False if labels.get("node-role.kubernetes.io/master") is not None: return False if labels.get("node-role.kubernetes.io/worker") is not None: return True return True def worker_nodes_status() -> tuple[list[str], list[str]]: try: data = k8s_get("/api/v1/nodes?limit=500") except Exception: return ([], []) items = data.get("items") or [] ready_nodes: list[str] = [] not_ready_nodes: list[str] = [] for node in items if isinstance(items, list) else []: if not _node_is_worker(node): continue name = (node.get("metadata") or {}).get("name") or "" if not name: continue ready = _node_ready_status(node) if ready is True: ready_nodes.append(name) elif ready is False: not_ready_nodes.append(name) return (sorted(ready_nodes), sorted(not_ready_nodes)) def expected_nodes_from_kb() -> set[str]: if not _NODE_CLASS_INDEX: return set() nodes = set().union(*_NODE_CLASS_INDEX.values()) return {n for n in nodes if n and n not in _NODE_CLASS_EXTERNAL} def missing_nodes_answer(cluster_name: str) -> str: expected = expected_nodes_from_kb() if not expected: return "" current = set() try: data = k8s_get("/api/v1/nodes?limit=500") items = data.get("items") or [] for node in items if isinstance(items, list) else []: name = (node.get("metadata") or {}).get("name") or "" if name: current.add(name) except Exception: return "" missing = sorted(expected - current) if not missing: return f"{cluster_name}: no missing nodes versus KB inventory." return f"{cluster_name} missing nodes versus KB inventory: {', '.join(missing)}." def _should_short_circuit(prompt: str, fallback: str) -> bool: if not fallback: return False lower = (prompt or "").lower() for word in ("why", "explain", "architecture", "breakdown", "root cause", "plan"): if word in lower: return False return True def vm_top_restarts(hours: int = 1) -> str: q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))" res = vm_query(q) if not res or (res.get("status") != "success"): return "" out: list[str] = [] for r in (res.get("data") or {}).get("result") or []: if not isinstance(r, dict): continue m = r.get("metric") or {} v = r.get("value") or [] ns = (m.get("namespace") or "").strip() pod = (m.get("pod") or "").strip() val = v[1] if isinstance(v, list) and len(v) > 1 else "" if pod: out.append(f"- restarts({hours}h): {ns}/{pod} = {val}") return "\n".join(out) def vm_cluster_snapshot() -> str: parts: list[str] = [] # Node readiness (kube-state-metrics). 
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            pass
    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()


def nodes_summary(cluster_name: str) -> str:
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        total = nodes.get("total")
        ready = nodes.get("ready")
        not_ready = nodes.get("not_ready")
        if isinstance(total, int) and isinstance(ready, int):
            not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
            if not_ready:
                return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
            return f"{cluster_name} cluster has {total} nodes, all Ready."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    total = len(items)
    ready = 0
    for node in items:
        conditions = node.get("status", {}).get("conditions") or []
        for cond in conditions if isinstance(conditions, list) else []:
            if cond.get("type") == "Ready":
                if cond.get("status") == "True":
                    ready += 1
                break
    not_ready = max(total - ready, 0)
    if not_ready:
        return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
    return f"{cluster_name} cluster has {total} nodes, all Ready."


def nodes_names_summary(cluster_name: str) -> str:
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        names = nodes.get("names")
        if isinstance(names, list) and names:
            cleaned = sorted({str(n) for n in names if n})
            if len(cleaned) <= 30:
                return f"{cluster_name} node names: {', '.join(cleaned)}."
            shown = ", ".join(cleaned[:30])
            return f"{cluster_name} node names: {shown}, … (+{len(cleaned) - 30} more)."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    names = []
    for node in items:
        name = (node.get("metadata") or {}).get("name") or ""
        if name:
            names.append(name)
    names = sorted(set(names))
    if not names:
        return ""
    if len(names) <= 30:
        return f"{cluster_name} node names: {', '.join(names)}."
    shown = ", ".join(names[:30])
    return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."


def nodes_arch_summary(cluster_name: str, arch: str) -> str:
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    normalized = (arch or "").strip().lower()
    if normalized in ("aarch64", "arm64"):
        arch_label = "arm64"
    elif normalized in ("x86_64", "x86-64", "amd64"):
        arch_label = "amd64"
    else:
        arch_label = normalized
    total = 0
    for node in items:
        labels = (node.get("metadata") or {}).get("labels") or {}
        if labels.get("kubernetes.io/arch") == arch_label:
            total += 1
    return f"{cluster_name} cluster has {total} {arch_label} nodes."
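
# Model reply normalization. The chat backend may wrap replies in code fences or
# nested JSON envelopes; the helpers below unwrap both recursively, so e.g. a raw
# reply of '```json {"response": "ok"} ```' normalizes to plain "ok".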
def _strip_code_fence(text: str) -> str:
    cleaned = (text or "").strip()
    match = CODE_FENCE_RE.match(cleaned)
    if match:
        return match.group(1).strip()
    return cleaned


def _normalize_reply(value: Any) -> str:
    if isinstance(value, dict):
        for key in ("content", "response", "reply", "message"):
            if key in value:
                return _normalize_reply(value[key])
        for v in value.values():
            if isinstance(v, (str, dict, list)):
                return _normalize_reply(v)
        return json.dumps(value, ensure_ascii=False)
    if isinstance(value, list):
        parts = [_normalize_reply(item) for item in value]
        return " ".join(p for p in parts if p)
    if value is None:
        return ""
    text = _strip_code_fence(str(value))
    if text.startswith("{") and text.endswith("}"):
        try:
            return _normalize_reply(json.loads(text))
        except Exception:
            return text
    return text


# Conversation state.
history = collections.defaultdict(list)  # (room_id, sender|None) -> list[str] (short transcript)


def key_for(room_id: str, sender: str, is_dm: bool):
    return (room_id, None) if is_dm else (room_id, sender)


def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str:
    parts: list[str] = []
    kb = kb_retrieve(prompt)
    if kb:
        parts.append(kb)
    endpoints, edges = catalog_hints(prompt)
    if endpoints:
        parts.append(endpoints)
    inventory = node_inventory_context(prompt)
    if inventory:
        parts.append(inventory)
    if allow_tools:
        # Scope pod summaries to relevant namespaces/workloads when possible.
        prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
        for ns, name in (targets or []) + (edges or []):
            if ns and name:
                prefixes_by_ns[ns].add(name)
        pod_lines: list[str] = []
        for ns in sorted(prefixes_by_ns.keys()):
            summary = summarize_pods(ns, prefixes_by_ns[ns])
            if summary:
                pod_lines.append(f"Pods (live):\n{summary}")
        if pod_lines:
            parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
        flux_bad = flux_not_ready()
        if flux_bad:
            parts.append("Flux (not ready):\n" + flux_bad)
        p_l = (prompt or "").lower()
        if any(w in p_l for w in METRIC_HINT_WORDS):
            restarts = vm_top_restarts(1)
            if restarts:
                parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
            snap = vm_cluster_snapshot()
            if snap:
                parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
    return "\n\n".join([p for p in parts if p]).strip()


def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
    system = (
        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
        "Be helpful, direct, and concise. "
        "Prefer answering with exact repo paths and Kubernetes resource names. "
        "Never include or request secret values. "
        "Do not suggest commands unless explicitly asked. "
        "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
        "If the answer is not grounded in the provided context or tool data, say you do not know."
    )
    transcript_parts = [system]
    if context:
        transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
    transcript_parts.extend(history[hist_key][-24:])
    transcript_parts.append(f"User: {prompt}")
    transcript = "\n".join(transcript_parts)
    payload = {"model": MODEL, "message": transcript}
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["x-api-key"] = API_KEY
    r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
    with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
        data = json.loads(resp.read().decode())
    raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
    reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}") return reply def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str: try: return _ollama_call(hist_key, prompt, context=context) except Exception: if fallback: history[hist_key].append(f"Atlas: {fallback}") return fallback return "Model backend is busy. Try again in a moment." def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() def worker(): result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback) done.set() thread = threading.Thread(target=worker, daemon=True) thread.start() if not done.wait(2.0): send_msg(token, room, "Thinking…") prompt_hint = " ".join((prompt or "").split()) if len(prompt_hint) > 160: prompt_hint = prompt_hint[:157] + "…" heartbeat = max(10, THINKING_INTERVAL_SEC) next_heartbeat = time.monotonic() + heartbeat while not done.wait(max(0, next_heartbeat - time.monotonic())): if prompt_hint: send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)") else: send_msg(token, room, "Still thinking (gathering context)…") next_heartbeat += heartbeat thread.join(timeout=1) return result["reply"] or fallback or "Model backend is busy. Try again in a moment." def sync_loop(token: str, room_id: str): since = None try: res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10) since = res.get("next_batch") except Exception: pass while True: params = {"timeout": 30000} if since: params["since"] = since query = parse.urlencode(params) try: res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35) except Exception: time.sleep(5) continue since = res.get("next_batch", since) # invites for rid, data in res.get("rooms", {}).get("invite", {}).items(): try: join_room(token, rid) except Exception: pass # messages for rid, data in res.get("rooms", {}).get("join", {}).items(): timeline = data.get("timeline", {}).get("events", []) joined_count = data.get("summary", {}).get("m.joined_member_count") is_dm = joined_count is not None and joined_count <= 2 for ev in timeline: if ev.get("type") != "m.room.message": continue content = ev.get("content", {}) body = (content.get("body", "") or "").strip() if not body: continue sender = ev.get("sender", "") if sender == f"@{USER}:live.bstein.dev": continue mentioned = is_mentioned(content, body) hist_key = key_for(rid, sender, is_dm) history[hist_key].append(f"{sender}: {body}") history[hist_key] = history[hist_key][-80:] if not (is_dm or mentioned): continue lower_body = body.lower() if re.search(r"\bhow many nodes\b|\bnode count\b|\bnumber of nodes\b", lower_body): if any(word in lower_body for word in ("cluster", "atlas", "titan")): summary = nodes_summary("Atlas") if not summary: send_msg(token, rid, "I couldn’t reach the cluster API to count nodes. 
Try again in a moment.") continue send_msg(token, rid, summary) continue if "worker" in lower_body and "node" in lower_body: ready_nodes, not_ready_nodes = worker_nodes_status() total = len(ready_nodes) + len(not_ready_nodes) if total: if any(word in lower_body for word in ("ready", "not ready", "unready")): if not_ready_nodes: send_msg( token, rid, f"Worker nodes not Ready: {', '.join(not_ready_nodes)}.", ) else: send_msg(token, rid, f"All {len(ready_nodes)} worker nodes are Ready.") continue if any(word in lower_body for word in ("how many", "should")): send_msg( token, rid, f"Atlas has {total} worker nodes; {len(ready_nodes)} Ready, {len(not_ready_nodes)} NotReady.", ) continue if "missing" in lower_body and "node" in lower_body: missing = missing_nodes_answer("Atlas") if missing: send_msg(token, rid, missing) continue inventory_answer = node_inventory_answer("Atlas", lower_body) if inventory_answer: send_msg(token, rid, inventory_answer) continue if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")): if any(word in lower_body for word in ("cluster", "atlas", "titan")): arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64" summary = nodes_arch_summary("Atlas", arch) if not summary: send_msg( token, rid, "I couldn’t reach the cluster API to count nodes by architecture. Try again in a moment.", ) continue send_msg(token, rid, summary) continue if re.search(r"\bnode names?\b|\bnodes? named\b|\bnaming\b", lower_body): if any(word in lower_body for word in ("cluster", "atlas", "titan")): names_summary = nodes_names_summary("Atlas") if not names_summary: send_msg(token, rid, "I couldn’t reach the cluster API to list node names. Try again in a moment.") continue send_msg(token, rid, names_summary) continue # Only do live cluster introspection in DMs; metrics can be answered when mentioned. allow_tools = is_dm allow_metrics = is_dm or mentioned promql = "" if allow_tools: m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body) if m: promql = m.group(1).strip() # Attempt to scope tools to the most likely workloads when hostnames are mentioned. 
                targets: list[tuple[str, str]] = []
                for m in HOST_RE.finditer(body.lower()):
                    host = m.group(1).lower()
                    for ep in _HOST_INDEX.get(host, []):
                        backend = ep.get("backend") or {}
                        ns = backend.get("namespace") or ""
                        for w in backend.get("workloads") or []:
                            if isinstance(w, dict) and w.get("name"):
                                targets.append((ns, str(w["name"])))
                context = build_context(body, allow_tools=allow_tools, targets=targets)
                if allow_tools and promql:
                    res = vm_query(promql, timeout=20)
                    rendered = vm_render_result(res, limit=15) or "(no results)"
                    extra = "VictoriaMetrics (PromQL result):\n" + rendered
                    context = (context + "\n\n" + extra).strip() if context else extra
                metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
                if metrics_context:
                    context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
                fallback = ""
                if "node" in lower_body or "cluster" in lower_body:
                    fallback = node_inventory_answer("Atlas", lower_body)
                if metrics_fallback and not fallback:
                    fallback = metrics_fallback
                if _should_short_circuit(body, fallback):
                    send_msg(token, rid, fallback)
                    continue
                reply = ollama_reply_with_thinking(
                    token,
                    rid,
                    hist_key,
                    body,
                    context=context,
                    fallback=fallback,
                )
                send_msg(token, rid, reply)


def login_with_retry():
    last_err = None
    for attempt in range(10):
        try:
            return login()
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            time.sleep(min(30, 2 ** attempt))
    raise last_err


def main():
    load_kb()
    token = login_with_retry()
    try:
        room_id = resolve_alias(token, ROOM_ALIAS)
        join_room(token, room_id)
    except Exception:
        room_id = None
    sync_loop(token, room_id)


if __name__ == "__main__":
    main()