import collections
import json
import os
import re
import ssl
import time
from typing import Any
from urllib import error, parse, request

# --- Configuration (environment-driven; BOT_USER/BOT_PASS are required) -----
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "90"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get(
    "VM_URL",
    "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
)
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))

# NOTE: several of these patterns previously used doubled backslashes inside
# raw strings (e.g. r"\\."), which matches a literal backslash character —
# fixed to single escapes so the intended metacharacters apply.
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
    "the", "and", "for", "with", "this", "that", "from", "into", "what",
    "how", "why", "when", "where", "which", "who", "can", "could", "should",
    "would", "please", "help", "atlas", "othrys",
}
METRIC_HINT_WORDS = {
    "health", "status", "down", "slow", "error", "unknown_error", "timeout",
    "crash", "crashloop", "restart", "restarts", "pending", "unreachable",
    "latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)


def _tokens(text: str) -> list[str]:
    """Lowercase word-ish tokens from *text*, minus stopwords and 1-char tokens."""
    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]


# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]

# NOTE(review): the original MENTION_RE pattern was lost to text mangling in
# this copy of the file. Reconstructed to match "@name" or a bare "name" for
# any configured localpart — confirm against the deployed source.
if MENTION_LOCALPARTS:
    MENTION_RE = re.compile(
        r"(?<![\w@])@?(?:"
        + "|".join(re.escape(m) for m in MENTION_LOCALPARTS)
        + r")(?![\w-])",
        re.IGNORECASE,
    )
else:
    # No mention tokens configured: a pattern that can never match.
    MENTION_RE = re.compile(r"(?!)")


def normalize_user_id(token: str) -> str:
    """Normalize a mention token into a full Matrix user id (@local:server)."""
    t = token.strip()
    if not t:
        return ""
    if t.startswith("@") and ":" in t:
        return t
    t = t.lstrip("@")
    if ":" in t:
        return f"@{t}"
    return f"@{t}:{SERVER_NAME}"


MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}


def is_mentioned(content: dict, body: str) -> bool:
    """True if the message *body* or the event's m.mentions targets this bot."""
    if MENTION_RE.search(body or "") is not None:
        return True
    mentions = content.get("m.mentions", {})
    if not isinstance(mentions, dict):  # guard malformed events
        return False
    user_ids = mentions.get("user_ids", [])
    if not isinstance(user_ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)


# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    """Issue a JSON HTTP request against the Matrix API and decode the reply.

    Returns {} for an empty response body; raises urllib errors on failure.
    """
    url = (base or BASE) + path
    data = None
    headers = {}
    if body is not None:
        data = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    r = request.Request(url, data=data, headers=headers, method=method)
    with request.urlopen(r, timeout=timeout) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}


def login() -> str:
    """Password-login against the auth service; return the access token."""
    login_user = normalize_user_id(USER)
    payload = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": login_user},
        "password": PASSWORD,
    }
    res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
    return res["access_token"]


def resolve_alias(token: str, alias: str) -> str:
    """Resolve a room alias (#room:server) to its room id."""
    enc = parse.quote(alias)
    res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
    return res["room_id"]


def join_room(token: str, room: str):
    """Join a room by id or alias."""
    req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})


def send_msg(token: str, room: str, text: str):
    """Send a plain m.text message to *room*."""
    path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", path, token, body={"msgtype": "m.text", "body": text})


# Atlas KB loader (no external deps; files are pre-rendered JSON via
# scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()


def _load_json_file(path: str) -> Any | None:
    """Best-effort JSON loader; returns None on any read/parse failure."""
    try:
        with open(path, "rb") as f:
            return json.loads(f.read().decode("utf-8"))
    except Exception:
        return None


def load_kb():
    """Load the rendered KB from KB_DIR and build host/name lookup indexes."""
    global KB, _HOST_INDEX, _NAME_INDEX
    if not KB_DIR:
        return
    catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
    runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
    KB = {"catalog": catalog, "runbooks": runbooks}
    host_index: dict[str, list[dict]] = collections.defaultdict(list)
    for ep in (catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []):
        host = (ep.get("host") or "").lower()
        if host:
            host_index[host].append(ep)
    _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
    names: set[str] = set()
    for s in (catalog.get("services", []) if isinstance(catalog, dict) else []):
        if isinstance(s, dict) and s.get("name"):
            names.add(str(s["name"]).lower())
    for w in (catalog.get("workloads", []) if isinstance(catalog, dict) else []):
        if isinstance(w, dict) and w.get("name"):
            names.add(str(w["name"]).lower())
    _NAME_INDEX = names


def kb_retrieve(query: str, *, limit: int = 3) -> str:
    """Rank runbooks against *query* and return up to MAX_KB_CHARS of snippets.

    Scoring: +3 for a query token in the title, +1 elsewhere in the doc,
    +4 for an entrypoint substring appearing verbatim in the query.
    """
    q = (query or "").strip()
    if not q or not KB.get("runbooks"):
        return ""
    ql = q.lower()
    q_tokens = _tokens(q)
    if not q_tokens:
        return ""
    scored: list[tuple[int, dict]] = []
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        title = str(doc.get("title") or "")
        body = str(doc.get("body") or "")
        tags = doc.get("tags") or []
        entrypoints = doc.get("entrypoints") or []
        hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
        title_l = title.lower()
        score = 0
        for t in set(q_tokens):
            if t in hay:
                score += 3 if t in title_l else 1
        for h in entrypoints:
            if isinstance(h, str) and h.lower() in ql:
                score += 4
        if score:
            scored.append((score, doc))
    scored.sort(key=lambda x: x[0], reverse=True)
    picked = [d for _, d in scored[:limit]]
    if not picked:
        return ""
    parts: list[str] = ["Atlas KB (retrieved):"]
    used = 0
    for d in picked:
        path = d.get("path") or ""
        title = d.get("title") or path
        body = (d.get("body") or "").strip()
        snippet = body[:900].strip()
        chunk = f"- {title} ({path})\n{snippet}"
        if used + len(chunk) > MAX_KB_CHARS:
            break
        parts.append(chunk)
        used += len(chunk)
    return "\n".join(parts).strip()


def _extract_titan_nodes(text: str) -> list[str]:
    """Collect titan-XX node names from *text*, expanding titan-aa/bb pairs."""
    names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
    for match in TITAN_RANGE_RE.finditer(text or ""):
        left, right = match.groups()
        if left:
            names.add(f"titan-{left.lower()}")
        if right:
            names.add(f"titan-{right.lower()}")
    return sorted(names)


def jetson_nodes_from_kb() -> list[str]:
    """Return titan node names from the first runbook line mentioning 'jetson'."""
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        body = str(doc.get("body") or "")
        for line in body.splitlines():
            if "jetson" not in line.lower():
                continue
            names = _extract_titan_nodes(line)
            if names:
                return names
    return []


def jetson_nodes_summary(cluster_name: str) -> str:
    """Human sentence about Jetson node inventory, or "" when unknown."""
    names = jetson_nodes_from_kb()
    if names:
        return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
    return ""


def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
    """Map hostnames/workload names in *query* to ingress → service → workload lines.

    Returns (rendered hint text, [(namespace, workload_name), ...] edges).
    """
    q = (query or "").strip()
    if not q or not KB.get("catalog"):
        return "", []
    ql = q.lower()
    hosts = {
        m.group(1).lower()
        for m in HOST_RE.finditer(ql)
        if m.group(1).lower().endswith("bstein.dev")
    }
    # Also match by known workload/service names.
    for t in _tokens(ql):
        if t in _NAME_INDEX:
            hosts |= {
                ep["host"].lower()
                for ep in KB["catalog"].get("http_endpoints", [])
                if isinstance(ep, dict)
                and ep.get("host")  # guard: skip endpoints without a host key
                and ep.get("backend", {}).get("service") == t
            }
    edges: list[tuple[str, str]] = []
    lines: list[str] = []
    for host in sorted(hosts):
        for ep in _HOST_INDEX.get(host, []):
            backend = ep.get("backend") or {}
            ns = backend.get("namespace") or ""
            svc = backend.get("service") or ""
            path = ep.get("path") or "/"
            if not svc:
                continue
            wk = backend.get("workloads") or []
            wk_str = ", ".join(
                f"{w.get('kind')}:{w.get('name')}"
                for w in wk
                if isinstance(w, dict) and w.get("name")
            ) or "unknown"
            lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}")
            for w in wk:
                if isinstance(w, dict) and w.get("name"):
                    edges.append((ns, str(w["name"])))
    if not lines:
        return "", []
    return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges


# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None


def _k8s_context() -> ssl.SSLContext:
    """Lazily build (and cache) an SSL context using the in-cluster CA."""
    global _K8S_CTX
    if _K8S_CTX is not None:
        return _K8S_CTX
    ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    ctx = ssl.create_default_context(cafile=ca_path)
    _K8S_CTX = ctx
    return ctx


def _k8s_token() -> str:
    """Read (and cache) the in-cluster ServiceAccount bearer token."""
    global _K8S_TOKEN
    if _K8S_TOKEN:
        return _K8S_TOKEN
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    with open(token_path, "r", encoding="utf-8") as f:
        _K8S_TOKEN = f.read().strip()
    return _K8S_TOKEN


def k8s_get(path: str, timeout: int = 8) -> dict:
    """GET *path* from the in-cluster Kubernetes API server; return parsed JSON."""
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = (
        os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS")
        or os.environ.get("KUBERNETES_SERVICE_PORT")
        or "443"
    )
    if not host:
        raise RuntimeError("k8s host missing")
    url = f"https://{host}:{port}{path}"
    headers = {"Authorization": f"Bearer {_k8s_token()}"}
    r = request.Request(url, headers=headers, method="GET")
    with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}


def _ariadne_state(timeout: int = 5) -> dict | None:
    """Fetch the optional Ariadne state snapshot; None if unset or unreachable."""
    if not ARIADNE_STATE_URL:
        return None
    headers = {}
    if ARIADNE_STATE_TOKEN:
        headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
    r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
    try:
        with request.urlopen(r, timeout=timeout) as resp:
            raw = resp.read()
        payload = json.loads(raw.decode()) if raw else {}
        return payload if isinstance(payload, dict) else None
    except Exception:
        return None


def k8s_pods(namespace: str) -> list[dict]:
    """List pods (up to 500) in *namespace*."""
    data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
    items = data.get("items") or []
    return items if isinstance(items, list) else []


def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
    """One-line-per-pod summary (phase, ready count, restarts, waiting reason).

    When *prefixes* is given, only pods whose name starts with one of them are
    included. Returns "" on any API failure (best-effort tool).
    """
    try:
        pods = k8s_pods(namespace)
    except Exception:
        return ""
    out: list[str] = []
    for p in pods:
        md = p.get("metadata") or {}
        st = p.get("status") or {}
        name = md.get("name") or ""
        # startswith(pref) subsumes the old "pref-"/exact-match checks.
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
            continue
        phase = st.get("phase") or "?"
        cs = st.get("containerStatuses") or []
        restarts = 0
        ready = 0
        total = 0
        reason = st.get("reason") or ""
        for c in cs if isinstance(cs, list) else []:
            if not isinstance(c, dict):
                continue
            total += 1
            restarts += int(c.get("restartCount") or 0)
            if c.get("ready"):
                ready += 1
            state = c.get("state") or {}
            if not reason and isinstance(state, dict):
                waiting = state.get("waiting") or {}
                if isinstance(waiting, dict) and waiting.get("reason"):
                    reason = waiting.get("reason")
        extra = f" ({reason})" if reason else ""
        out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
    return "\n".join(out[:20])


def flux_not_ready() -> str:
    """List Flux kustomizations whose Ready condition is not True ("" if none/err)."""
    try:
        data = k8s_get(
            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
        )
    except Exception:
        return ""
    items = data.get("items") or []
    bad: list[str] = []
    for it in items if isinstance(items, list) else []:
        md = it.get("metadata") or {}
        st = it.get("status") or {}
        name = md.get("name") or ""
        conds = st.get("conditions") or []
        ready = None
        msg = ""
        for c in conds if isinstance(conds, list) else []:
            if isinstance(c, dict) and c.get("type") == "Ready":
                ready = c.get("status")
                msg = c.get("message") or ""
        if ready not in ("True", True):
            bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
    return "\n".join(bad[:10])


# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
    """Run an instant PromQL query against VictoriaMetrics; None on failure."""
    try:
        url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
        with request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception:
        return None


def _vm_value_series(res: dict) -> list[dict]:
    """Extract data.result from a successful query response, else []."""
    if not res or (res.get("status") != "success"):
        return []
    data = res.get("data") or {}
    result = data.get("result") or []
    return result if isinstance(result, list) else []


def vm_render_result(res: dict | None, limit: int = 12) -> str:
    """Render query series as "- label=..., ...: value" lines for the LLM context."""
    if not res:
        return ""
    series = _vm_value_series(res)
    if not series:
        return ""
    out: list[str] = []
    for r in series[:limit]:
        if not isinstance(r, dict):
            continue
        metric = r.get("metric") or {}
        value = r.get("value") or []
        val = value[1] if isinstance(value, list) and len(value) > 1 else ""
        # Prefer common labels if present.
        label_parts = []
        for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
            if isinstance(metric, dict) and metric.get(k):
                label_parts.append(f"{k}={metric.get(k)}")
        if not label_parts and isinstance(metric, dict):
            for k in sorted(metric.keys()):
                if k.startswith("__"):
                    continue
                label_parts.append(f"{k}={metric.get(k)}")
                if len(label_parts) >= 4:
                    break
        labels = ", ".join(label_parts) if label_parts else "series"
        out.append(f"- {labels}: {val}")
    return "\n".join(out)


def vm_top_restarts(hours: int = 1) -> str:
    """Top-5 pods by container restarts over the last *hours* hours ("" on failure)."""
    q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
    res = vm_query(q)
    if not res or (res.get("status") != "success"):
        return ""
    out: list[str] = []
    for r in (res.get("data") or {}).get("result") or []:
        if not isinstance(r, dict):
            continue
        m = r.get("metric") or {}
        v = r.get("value") or []
        ns = (m.get("namespace") or "").strip()
        pod = (m.get("pod") or "").strip()
        val = v[1] if isinstance(v, list) and len(v) > 1 else ""
        if pod:
            out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
    return "\n".join(out)


def vm_cluster_snapshot() -> str:
    """Small cluster overview (node readiness + pod phase counts) from metrics."""
    parts: list[str] = []
    # Node readiness (kube-state-metrics).
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            pass
    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()


def nodes_summary(cluster_name: str) -> str:
    """Node-count sentence, preferring Ariadne state over the Kubernetes API.

    Returns "" when neither source is reachable.
    """
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        total = nodes.get("total")
        ready = nodes.get("ready")
        not_ready = nodes.get("not_ready")
        if isinstance(total, int) and isinstance(ready, int):
            not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
            if not_ready:
                return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
            return f"{cluster_name} cluster has {total} nodes, all Ready."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    total = len(items)
    ready = 0
    for node in items:
        conditions = node.get("status", {}).get("conditions") or []
        for cond in conditions if isinstance(conditions, list) else []:
            if cond.get("type") == "Ready":
                if cond.get("status") == "True":
                    ready += 1
                break
    not_ready = max(total - ready, 0)
    if not_ready:
        return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
    return f"{cluster_name} cluster has {total} nodes, all Ready."
def _format_node_names(cluster_name: str, names: list[str]) -> str:
    """Render a sorted node-name list, truncating after 30 entries."""
    if len(names) <= 30:
        return f"{cluster_name} node names: {', '.join(names)}."
    shown = ", ".join(names[:30])
    return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."


def nodes_names_summary(cluster_name: str) -> str:
    """Sentence listing node names, preferring Ariadne state over the k8s API.

    Returns "" when neither source yields any names.
    """
    state = _ariadne_state()
    if state:
        node_info = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        reported = node_info.get("names")
        if isinstance(reported, list) and reported:
            deduped = sorted({str(n) for n in reported if n})
            return _format_node_names(cluster_name, deduped)
    try:
        listing = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    nodes = listing.get("items") or []
    if not isinstance(nodes, list) or not nodes:
        return ""
    found = sorted({
        name
        for node in nodes
        if (name := (node.get("metadata") or {}).get("name") or "")
    })
    if not found:
        return ""
    return _format_node_names(cluster_name, found)


def nodes_arch_summary(cluster_name: str, arch: str) -> str:
    """Count nodes whose kubernetes.io/arch label matches the requested arch.

    Accepts common aliases (aarch64 → arm64, x86_64/x86-64 → amd64).
    Returns "" when the cluster API is unreachable or lists no nodes.
    """
    try:
        listing = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    nodes = listing.get("items") or []
    if not isinstance(nodes, list) or not nodes:
        return ""
    wanted = (arch or "").strip().lower()
    arch_label = {
        "aarch64": "arm64",
        "arm64": "arm64",
        "x86_64": "amd64",
        "x86-64": "amd64",
        "amd64": "amd64",
    }.get(wanted, wanted)
    total = sum(
        1
        for node in nodes
        if ((node.get("metadata") or {}).get("labels") or {}).get("kubernetes.io/arch") == arch_label
    )
    return f"{cluster_name} cluster has {total} {arch_label} nodes."
def _strip_code_fence(text: str) -> str: cleaned = (text or "").strip() match = CODE_FENCE_RE.match(cleaned) if match: return match.group(1).strip() return cleaned def _normalize_reply(value: Any) -> str: if isinstance(value, dict): for key in ("content", "response", "reply", "message"): if key in value: return _normalize_reply(value[key]) for v in value.values(): if isinstance(v, (str, dict, list)): return _normalize_reply(v) return json.dumps(value, ensure_ascii=False) if isinstance(value, list): parts = [_normalize_reply(item) for item in value] return " ".join(p for p in parts if p) if value is None: return "" text = _strip_code_fence(str(value)) if text.startswith("{") and text.endswith("}"): try: return _normalize_reply(json.loads(text)) except Exception: return text return text # Conversation state. history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript) def key_for(room_id: str, sender: str, is_dm: bool): return (room_id, None) if is_dm else (room_id, sender) def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str: parts: list[str] = [] kb = kb_retrieve(prompt) if kb: parts.append(kb) endpoints, edges = catalog_hints(prompt) if endpoints: parts.append(endpoints) if allow_tools: # Scope pod summaries to relevant namespaces/workloads when possible. 
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set) for ns, name in (targets or []) + (edges or []): if ns and name: prefixes_by_ns[ns].add(name) pod_lines: list[str] = [] for ns in sorted(prefixes_by_ns.keys()): summary = summarize_pods(ns, prefixes_by_ns[ns]) if summary: pod_lines.append(f"Pods (live):\n{summary}") if pod_lines: parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS]) flux_bad = flux_not_ready() if flux_bad: parts.append("Flux (not ready):\n" + flux_bad) p_l = (prompt or "").lower() if any(w in p_l for w in METRIC_HINT_WORDS): restarts = vm_top_restarts(1) if restarts: parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts) snap = vm_cluster_snapshot() if snap: parts.append("VictoriaMetrics (cluster snapshot):\n" + snap) return "\n\n".join([p for p in parts if p]).strip() def ollama_reply(hist_key, prompt: str, *, context: str) -> str: try: system = ( "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. " "Be helpful, direct, and concise. " "Prefer answering with exact repo paths and Kubernetes resource names. " "Never include or request secret values. " "Respond in plain sentences; do not return JSON or code fences unless explicitly asked." ) transcript_parts = [system] if context: transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS]) transcript_parts.extend(history[hist_key][-24:]) transcript_parts.append(f"User: {prompt}") transcript = "\n".join(transcript_parts) payload = {"model": MODEL, "message": transcript} headers = {"Content-Type": "application/json"} if API_KEY: headers["x-api-key"] = API_KEY r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers) with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp: data = json.loads(resp.read().decode()) raw_reply = data.get("message") or data.get("response") or data.get("reply") or data reply = _normalize_reply(raw_reply) or "I'm here to help." 
history[hist_key].append(f"Atlas: {reply}") return reply except Exception: return "I’m here — but I couldn’t reach the model backend." def sync_loop(token: str, room_id: str): since = None try: res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10) since = res.get("next_batch") except Exception: pass while True: params = {"timeout": 30000} if since: params["since"] = since query = parse.urlencode(params) try: res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35) except Exception: time.sleep(5) continue since = res.get("next_batch", since) # invites for rid, data in res.get("rooms", {}).get("invite", {}).items(): try: join_room(token, rid) except Exception: pass # messages for rid, data in res.get("rooms", {}).get("join", {}).items(): timeline = data.get("timeline", {}).get("events", []) joined_count = data.get("summary", {}).get("m.joined_member_count") is_dm = joined_count is not None and joined_count <= 2 for ev in timeline: if ev.get("type") != "m.room.message": continue content = ev.get("content", {}) body = (content.get("body", "") or "").strip() if not body: continue sender = ev.get("sender", "") if sender == f"@{USER}:live.bstein.dev": continue mentioned = is_mentioned(content, body) hist_key = key_for(rid, sender, is_dm) history[hist_key].append(f"{sender}: {body}") history[hist_key] = history[hist_key][-80:] if not (is_dm or mentioned): continue lower_body = body.lower() if re.search(r"\bhow many nodes\b|\bnode count\b|\bnumber of nodes\b", lower_body): if any(word in lower_body for word in ("cluster", "atlas", "titan")): summary = nodes_summary("Atlas") if not summary: send_msg(token, rid, "I couldn’t reach the cluster API to count nodes. 
Try again in a moment.") continue send_msg(token, rid, summary) continue if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")): if any(word in lower_body for word in ("cluster", "atlas", "titan")): arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64" summary = nodes_arch_summary("Atlas", arch) if not summary: send_msg( token, rid, "I couldn’t reach the cluster API to count nodes by architecture. Try again in a moment.", ) continue send_msg(token, rid, summary) continue if "jetson" in lower_body: if any(word in lower_body for word in ("cluster", "atlas", "titan", "node", "nodes")): summary = jetson_nodes_summary("Atlas") if summary: send_msg(token, rid, summary) else: send_msg(token, rid, "Jetson inventory is not available in the knowledge base yet.") continue if re.search(r"\bnode names?\b|\bnodes? named\b|\bnaming\b", lower_body): if any(word in lower_body for word in ("cluster", "atlas", "titan")): names_summary = nodes_names_summary("Atlas") if not names_summary: send_msg(token, rid, "I couldn’t reach the cluster API to list node names. Try again in a moment.") continue send_msg(token, rid, names_summary) continue # Only do live cluster/metrics introspection in DMs. allow_tools = is_dm promql = "" if allow_tools: m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body) if m: promql = m.group(1).strip() # Attempt to scope tools to the most likely workloads when hostnames are mentioned. 
targets: list[tuple[str, str]] = [] for m in HOST_RE.finditer(body.lower()): host = m.group(1).lower() for ep in _HOST_INDEX.get(host, []): backend = ep.get("backend") or {} ns = backend.get("namespace") or "" for w in backend.get("workloads") or []: if isinstance(w, dict) and w.get("name"): targets.append((ns, str(w["name"]))) context = build_context(body, allow_tools=allow_tools, targets=targets) if allow_tools and promql: res = vm_query(promql, timeout=20) rendered = vm_render_result(res, limit=15) or "(no results)" extra = "VictoriaMetrics (PromQL result):\n" + rendered context = (context + "\n\n" + extra).strip() if context else extra reply = ollama_reply(hist_key, body, context=context) send_msg(token, rid, reply) def login_with_retry(): last_err = None for attempt in range(10): try: return login() except Exception as exc: # noqa: BLE001 last_err = exc time.sleep(min(30, 2 ** attempt)) raise last_err def main(): load_kb() token = login_with_retry() try: room_id = resolve_alias(token, ROOM_ALIAS) join_room(token, room_id) except Exception: room_id = None sync_loop(token, room_id) if __name__ == "__main__": main()