import collections
import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import parse, request

BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))

TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
    "the", "and", "for", "with", "this", "that", "from", "into",
    "what", "how", "why", "when", "where", "which", "who",
    "can", "could", "should", "would", "please", "help",
    "atlas", "othrys",
}
METRIC_HINT_WORDS = {
    "bandwidth", "connections", "cpu", "database", "db", "disk", "health",
    "memory", "network", "node", "nodes", "postgres", "status", "storage",
    "usage", "down", "slow", "error", "unknown_error", "timeout", "crash",
    "crashloop", "restart", "restarts", "pending", "unreachable", "latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"

# topk(1) per node, with instance joined to the node name; used by _vm_hottest.
HOTTEST_QUERIES = {
    "cpu": (
        'label_replace(topk(1, avg by (node) '
        '(((1 - avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m]))) * 100) '
        '* on(instance) group_left(node) '
        'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), '
        '"__name__", "$1", "node", "(.*)")'
    ),
    "ram": (
        'label_replace(topk(1, avg by (node) ((avg by (instance) '
        '((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100)) '
        '* on(instance) group_left(node) '
        'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), '
        '"__name__", "$1", "node", "(.*)")'
    ),
    "net": (
        'label_replace(topk(1, avg by (node) ((sum by (instance) '
        '(rate(node_network_receive_bytes_total{device!~"lo"}[5m]) '
        '+ rate(node_network_transmit_bytes_total{device!~"lo"}[5m]))) '
        '* on(instance) group_left(node) '
        'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), '
        '"__name__", "$1", "node", "(.*)")'
    ),
    "io": (
        'label_replace(topk(1, avg by (node) ((sum by (instance) '
        '(rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) '
        '* on(instance) group_left(node) '
        'label_replace(node_uname_info{nodename!=""}, "node", "$1", "nodename", "(.*)"))), '
        '"__name__", "$1", "node", "(.*)")'
    ),
}


def normalize_query(text: str) -> str:
    cleaned = (text or "").lower()
    for ch in _DASH_CHARS:
        cleaned = cleaned.replace(ch, "-")
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned


def _tokens(text: str) -> list[str]:
    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]


# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
# NOTE: the original lookbehind pattern was corrupted in transit; this is a
# best-effort reconstruction with the same intent: match a bare or @-prefixed
# bot localpart as a standalone word.
MENTION_RE = re.compile(
    r"(?<![\w@])@?(" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")\b",
    re.IGNORECASE,
)


def normalize_user_id(token: str) -> str:
    t = token.strip()
    if not t:
        return ""
    if t.startswith("@") and ":" in t:
        return t
    t = t.lstrip("@")
    if ":" in t:
        return f"@{t}"
    return f"@{t}:{SERVER_NAME}"


MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}


def _body_mentions_token(body: str) -> bool:
    lower = (body or "").strip().lower()
    if not lower:
        return False
    for token in MENTION_LOCALPARTS:
        for prefix in (token, f"@{token}"):
            if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
                return True
    return False


def is_mentioned(content: dict, body: str) -> bool:
    if MENTION_RE.search(body or "") is not None:
        return True
    if _body_mentions_token(body or ""):
        return True
    mentions = content.get("m.mentions", {})
    user_ids = mentions.get("user_ids", [])
    if not isinstance(user_ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)


# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    url = (base or BASE) + path
    data = None
    headers = {}
    if body is not None:
        data = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    r = request.Request(url, data=data, headers=headers, method=method)
    with request.urlopen(r, timeout=timeout) as resp:
        raw = resp.read()
        return json.loads(raw.decode()) if raw else {}


def login() -> str:
    login_user = normalize_user_id(USER)
    payload = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": login_user},
        "password": PASSWORD,
    }
    res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
    return res["access_token"]


def resolve_alias(token: str, alias: str) -> str:
    enc = parse.quote(alias)
    res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
    return res["room_id"]


def join_room(token: str, room: str):
    req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})


def send_msg(token: str, room: str, text: str):
    path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", path, token, body={"msgtype": "m.text", "body": text})


# Atlas KB loader (no external deps; files are pre-rendered JSON via
# scripts/knowledge_render_atlas.py).
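# Expected on-disk shapes, inferred from the loaders below (illustrative,
# not a schema):
#   catalog/atlas.json    -> {"http_endpoints": [{"host", "path", "backend":
#                             {"namespace", "service", "workloads": [...]}}],
#                             "services": [...], "workloads": [...]}
#   catalog/runbooks.json -> [{"title", "body", "tags", "entrypoints", "path"}]
#   catalog/metrics.json  -> [{"dashboard", "panel_title", "description",
#                              "tags", "exprs": ["<PromQL>"]}]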
KB = {"catalog": {}, "runbooks": []} _HOST_INDEX: dict[str, list[dict]] = {} _NAME_INDEX: set[str] = set() _METRIC_INDEX: list[dict[str, Any]] = [] NODE_REGEX = re.compile(r'node=~"([^"]+)"') def _load_json_file(path: str) -> Any | None: try: with open(path, "rb") as f: return json.loads(f.read().decode("utf-8")) except Exception: return None def load_kb(): global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX if not KB_DIR: return catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {} runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or [] metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or [] KB = {"catalog": catalog, "runbooks": runbooks} host_index: dict[str, list[dict]] = collections.defaultdict(list) for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []: host = (ep.get("host") or "").lower() if host: host_index[host].append(ep) _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())} names: set[str] = set() for s in catalog.get("services", []) if isinstance(catalog, dict) else []: if isinstance(s, dict) and s.get("name"): names.add(str(s["name"]).lower()) for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []: if isinstance(w, dict) and w.get("name"): names.add(str(w["name"]).lower()) _NAME_INDEX = names _METRIC_INDEX = metrics if isinstance(metrics, list) else [] def kb_retrieve(query: str, *, limit: int = 3) -> str: q = (query or "").strip() if not q or not KB.get("runbooks"): return "" ql = q.lower() q_tokens = _tokens(q) if not q_tokens: return "" scored: list[tuple[int, dict]] = [] for doc in KB.get("runbooks", []): if not isinstance(doc, dict): continue title = str(doc.get("title") or "") body = str(doc.get("body") or "") tags = doc.get("tags") or [] entrypoints = doc.get("entrypoints") or [] hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower() score = 0 for t in set(q_tokens): if t in hay: score += 3 if t in title.lower() else 1 for h in entrypoints: if isinstance(h, str) and h.lower() in ql: score += 4 if score: scored.append((score, doc)) scored.sort(key=lambda x: x[0], reverse=True) picked = [d for _, d in scored[:limit]] if not picked: return "" parts: list[str] = ["Atlas KB (retrieved):"] used = 0 for d in picked: path = d.get("path") or "" title = d.get("title") or path body = (d.get("body") or "").strip() snippet = body[:900].strip() chunk = f"- {title} ({path})\n{snippet}" if used + len(chunk) > MAX_KB_CHARS: break parts.append(chunk) used += len(chunk) return "\n".join(parts).strip() def _extract_titan_nodes(text: str) -> list[str]: cleaned = normalize_query(text) names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n} for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE): tail = match.group(1) for part in re.split(r"[/,]", tail): part = part.strip() if part: names.add(f"titan-{part.lower()}") for match in TITAN_RANGE_RE.finditer(cleaned): left, right = match.groups() if left: names.add(f"titan-{left.lower()}") if right: names.add(f"titan-{right.lower()}") return sorted(names) def _humanize_rate(value: str, *, unit: str) -> str: try: val = float(value) except (TypeError, ValueError): return value if unit == "%": return f"{val:.1f}%" if val >= 1024 * 1024: return f"{val / (1024 * 1024):.2f} MB/s" if val >= 1024: return f"{val / 1024:.2f} KB/s" return f"{val:.2f} B/s" def _hottest_query(metric: str, node_regex: str | None) -> str: expr = 
HOTTEST_QUERIES[metric] if node_regex: needle = 'node_uname_info{nodename!=""}' replacement = f'node_uname_info{{nodename!=\"\",nodename=~\"{node_regex}\"}}' return expr.replace(needle, replacement) return expr def _vm_hottest(metric: str, node_regex: str | None) -> tuple[str, str] | None: expr = _hottest_query(metric, node_regex) res = vm_query(expr) series = _vm_value_series(res) if not series: return None first = series[0] labels = first.get("metric") or {} value = first.get("value") or [] val = value[1] if isinstance(value, list) and len(value) > 1 else "" node = labels.get("node") or labels.get("__name__") or "" if not node: return None return (str(node), str(val)) def _hottest_answer(q: str, *, nodes: list[str] | None) -> str: metric = None assumed_cpu = False if "cpu" in q: metric = "cpu" elif "ram" in q or "memory" in q: metric = "ram" elif "net" in q or "network" in q: metric = "net" elif "io" in q or "disk" in q or "storage" in q: metric = "io" if metric is None: metric = "cpu" assumed_cpu = True if nodes is not None and not nodes: return "No nodes match the requested hardware class." node_regex = "|".join(nodes) if nodes else None metrics = [metric] lines: list[str] = [] for m in metrics: picked = _vm_hottest(m, node_regex) if not picked: continue node, val = picked unit = "%" if m in ("cpu", "ram") else "B/s" val_str = _humanize_rate(val, unit=unit) label = {"cpu": "CPU", "ram": "RAM", "net": "NET", "io": "I/O"}[m] lines.append(f"{label}: {node} ({val_str})") if not lines: return "" label = metric.upper() suffix = " (defaulting to CPU)" if assumed_cpu else "" return f"Hottest node by {label}: {lines[0].split(': ', 1)[1]}.{suffix}" def _node_roles(labels: dict[str, Any]) -> list[str]: roles: list[str] = [] for key in labels.keys(): if key.startswith("node-role.kubernetes.io/"): role = key.split("/", 1)[-1] if role: roles.append(role) return sorted(set(roles)) def _hardware_class(labels: dict[str, Any]) -> str: if str(labels.get("jetson") or "").lower() == "true": return "jetson" hardware = (labels.get("hardware") or "").strip().lower() if hardware in ("rpi4", "rpi5"): return hardware arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "" if arch == "amd64": return "amd64" if arch == "arm64": return "arm64-unknown" return "unknown" def node_inventory_live() -> list[dict[str, Any]]: try: data = k8s_get("/api/v1/nodes?limit=500") except Exception: return [] items = data.get("items") or [] inventory: list[dict[str, Any]] = [] for node in items if isinstance(items, list) else []: meta = node.get("metadata") or {} labels = meta.get("labels") or {} name = meta.get("name") or "" if not name: continue inventory.append( { "name": name, "arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "", "hardware": _hardware_class(labels), "roles": _node_roles(labels), "is_worker": _node_is_worker(node), "ready": _node_ready_status(node), } ) return sorted(inventory, key=lambda item: item["name"]) def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]: grouped: dict[str, list[str]] = collections.defaultdict(list) for node in inventory: grouped[node.get("hardware") or "unknown"].append(node["name"]) return {k: sorted(v) for k, v in grouped.items()} def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str: q = normalize_query(query) if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")): return "" if inventory is None: 
        inventory = node_inventory_live()
    if not inventory:
        return ""
    groups = _group_nodes(inventory)
    total = len(inventory)
    ready = sum(1 for node in inventory if node.get("ready") is True)
    not_ready = sum(1 for node in inventory if node.get("ready") is False)
    lines: list[str] = [
        "Node inventory (live):",
        f"- total: {total}, ready: {ready}, not ready: {not_ready}",
    ]
    for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
        if key in groups:
            lines.append(f"- {key}: {', '.join(groups[key])}")
    non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
    if non_rpi:
        lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
    unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
    if unknowns:
        lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
    expected_workers = expected_worker_nodes_from_metrics()
    if expected_workers:
        ready_workers, not_ready_workers = worker_nodes_status()
        missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
        lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
        lines.append(f"- workers_ready: {', '.join(ready_workers)}")
        if not_ready_workers:
            lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
        if missing:
            lines.append(f"- workers_missing (derived): {', '.join(missing)}")
    return "\n".join(lines)


def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
    q = normalize_query(prompt)
    if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")):
        return node_inventory_live()
    return []


def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
    names = [node["name"] for node in inventory]
    ready = [node["name"] for node in inventory if node.get("ready") is True]
    not_ready = [node["name"] for node in inventory if node.get("ready") is False]
    groups = _group_nodes(inventory)
    workers = [node for node in inventory if node.get("is_worker") is True]
    worker_names = [node["name"] for node in workers]
    worker_ready = [node["name"] for node in workers if node.get("ready") is True]
    worker_not_ready = [node["name"] for node in workers if node.get("ready") is False]
    expected_workers = expected_worker_nodes_from_metrics()
    expected_ready = [n for n in expected_workers if n in ready] if expected_workers else []
    expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else []
    expected_missing = [n for n in expected_workers if n not in names] if expected_workers else []
    return {
        "names": sorted(names),
        "ready": sorted(ready),
        "not_ready": sorted(not_ready),
        "groups": groups,
        "worker_names": sorted(worker_names),
        "worker_ready": sorted(worker_ready),
        "worker_not_ready": sorted(worker_not_ready),
        "expected_workers": expected_workers,
        "expected_ready": sorted(expected_ready),
        "expected_not_ready": sorted(expected_not_ready),
        "expected_missing": sorted(expected_missing),
    }


def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_summary: str) -> str:
    q = normalize_query(prompt)
    if metrics_summary and any(word in q for word in ("postgres", "connection", "connections", "db")):
        return metrics_summary
    if not inventory:
        return ""
    sets = _inventory_sets(inventory)
    names = sets["names"]
    ready = sets["ready"]
    not_ready = sets["not_ready"]
    groups = sets["groups"]
    worker_names = sets["worker_names"]
    worker_ready = sets["worker_ready"]
    worker_not_ready = sets["worker_not_ready"]
    expected_workers = sets["expected_workers"]
    expected_ready = sets["expected_ready"]
    expected_not_ready = sets["expected_not_ready"]
    expected_missing = sets["expected_missing"]
    total = len(names)
    nodes_in_query = _extract_titan_nodes(q)
    rpi_nodes = set(groups.get("rpi4", [])) | set(groups.get("rpi5", []))
    non_rpi = set(groups.get("jetson", [])) | set(groups.get("amd64", []))
    unknown_hw = set(groups.get("arm64-unknown", [])) | set(groups.get("unknown", []))
    if "hottest" in q or "hot" in q:
        filter_nodes: list[str] | None = None
        if "amd64" in q or "x86" in q:
            filter_nodes = sorted(groups.get("amd64", []))
        elif "jetson" in q:
            filter_nodes = sorted(groups.get("jetson", []))
        elif "raspberry" in q or "rpi" in q:
            filter_nodes = sorted(rpi_nodes)
        elif "arm64" in q:
            filter_nodes = sorted([n for n in names if n not in groups.get("amd64", [])])
        hottest = _hottest_answer(q, nodes=filter_nodes)
        if hottest:
            return hottest
        return "Unable to determine hottest nodes right now (metrics unavailable)."
    if nodes_in_query and ("raspberry" in q or "rpi" in q):
        parts: list[str] = []
        for node in nodes_in_query:
            if node in rpi_nodes:
                parts.append(f"{node} is a Raspberry Pi node.")
            elif node in non_rpi:
                parts.append(f"{node} is not a Raspberry Pi node.")
            elif node in names:
                parts.append(f"{node} is in Atlas but hardware is unknown.")
            else:
                parts.append(f"{node} is not in the Atlas cluster.")
        return " ".join(parts)
    if nodes_in_query and "jetson" in q:
        jets = set(groups.get("jetson", []))
        parts = []
        for node in nodes_in_query:
            if node in jets:
                parts.append(f"{node} is a Jetson node.")
            elif node in names:
                parts.append(f"{node} is not a Jetson node.")
            else:
                parts.append(f"{node} is not in the Atlas cluster.")
        return " ".join(parts)
    if nodes_in_query and ("is" in q or "part of" in q or "in atlas" in q or "in cluster" in q or "present" in q or "exist" in q):
        parts: list[str] = []
        for node in nodes_in_query:
            if node in names:
                parts.append(f"Yes. {node} is in the Atlas cluster.")
            else:
                parts.append(f"No. {node} is not in the Atlas cluster.")
        return " ".join(parts)
    if any(term in q for term in ("non-raspberry", "non raspberry", "not raspberry", "non-rpi", "non rpi")):
        non_rpi_sorted = sorted(non_rpi)
        if any(word in q for word in ("how many", "count", "number")):
            return f"Atlas has {len(non_rpi_sorted)} non‑Raspberry Pi nodes."
        if any(phrase in q for phrase in ("besides jetson", "excluding jetson", "without jetson", "non jetson")):
            amd = sorted(groups.get("amd64", []))
            return f"Non‑Raspberry Pi nodes (excluding Jetson): {', '.join(amd)}." if amd else "No non‑Raspberry Pi nodes outside Jetson."
        return f"Non‑Raspberry Pi nodes: {', '.join(non_rpi_sorted)}." if non_rpi_sorted else "No non‑Raspberry Pi nodes found."
    if "jetson" in q:
        jets = groups.get("jetson", [])
        if any(word in q for word in ("how many", "count", "number")):
            return f"Atlas has {len(jets)} Jetson nodes."
        return f"Jetson nodes: {', '.join(jets)}." if jets else "No Jetson nodes found."
    if "amd64" in q or "x86" in q:
        amd = groups.get("amd64", [])
        if any(word in q for word in ("how many", "count", "number")):
            return f"Atlas has {len(amd)} amd64 nodes."
        return f"amd64 nodes: {', '.join(amd)}." if amd else "No amd64 nodes found."
    if "arm64" in q and "node" in q and any(word in q for word in ("how many", "count", "number")):
        count = sum(1 for node in inventory if node.get("arch") == "arm64")
        return f"Atlas has {count} arm64 nodes."
    if "rpi4" in q:
        rpi4 = groups.get("rpi4", [])
        if any(word in q for word in ("how many", "count", "number")):
            return f"Atlas has {len(rpi4)} rpi4 nodes."
return f"rpi4 nodes: {', '.join(rpi4)}." if rpi4 else "No rpi4 nodes found." if "rpi5" in q: rpi5 = groups.get("rpi5", []) if any(word in q for word in ("how many", "count", "number")): return f"Atlas has {len(rpi5)} rpi5 nodes." return f"rpi5 nodes: {', '.join(rpi5)}." if rpi5 else "No rpi5 nodes found." if "raspberry" in q or "rpi" in q: rpi = sorted(rpi_nodes) if any(word in q for word in ("how many", "count", "number")): return f"Atlas has {len(rpi)} Raspberry Pi nodes." return f"Raspberry Pi nodes: {', '.join(rpi)}." if rpi else "No Raspberry Pi nodes found." if "arm64-unknown" in q or "unknown" in q or "no hardware" in q: unknown = sorted(unknown_hw) return f"Unknown hardware nodes: {', '.join(unknown)}." if unknown else "No unknown hardware labels." if ("notready" in q or "not ready" in q or "unready" in q) and ("node" in q or "nodes" in q): return "Not ready nodes: " + (", ".join(not_ready) if not_ready else "none") + "." if "worker" in q and ("node" in q or "nodes" in q or "workers" in q): not_ready_query = "not ready" in q or "unready" in q or "down" in q or ("not" in q and "ready" in q) if expected_workers: if "missing" in q: return "Missing worker nodes: " + (", ".join(expected_missing) if expected_missing else "none") + "." if "ready" in q and ("not ready" in q or "vs" in q or "versus" in q): return ( f"Expected workers: {len(expected_ready)} ready, " f"{len(expected_not_ready)} not ready (expected {len(expected_workers)})." ) if any(word in q for word in ("how many", "count", "number")) and ("expect" in q or "expected" in q or "should" in q): msg = f"Grafana inventory expects {len(expected_workers)} worker nodes." if expected_missing: msg += f" Missing: {', '.join(expected_missing)}." return msg if not_ready_query: if expected_not_ready or expected_missing: detail = [] if expected_not_ready: detail.append(f"Not ready: {', '.join(expected_not_ready)}") if expected_missing: detail.append(f"Missing: {', '.join(expected_missing)}") return "Worker nodes needing attention. " + " ".join(detail) + "." return "All expected worker nodes are Ready." if any(word in q for word in ("expected", "expect", "should")): msg = f"Grafana inventory expects {len(expected_workers)} worker nodes." if expected_missing: msg += f" Missing: {', '.join(expected_missing)}." return msg if any(word in q for word in ("how many", "count", "number")): return f"Worker nodes: {len(expected_ready)} ready, {len(expected_not_ready)} not ready (expected {len(expected_workers)})." if "ready" in q: return f"Ready worker nodes ({len(expected_ready)}): {', '.join(expected_ready)}." if not_ready_query: return "Worker nodes not ready: " + (", ".join(worker_not_ready) if worker_not_ready else "none") + "." if any(word in q for word in ("how many", "count", "number")): return f"Worker nodes: {len(worker_ready)} ready, {len(worker_not_ready)} not ready." return "Ready worker nodes ({}): {}.".format(len(worker_ready), ", ".join(worker_ready)) if any(word in q for word in ("how many", "count", "number")) and "node" in q: return f"Atlas has {total} nodes; {len(ready)} ready, {len(not_ready)} not ready." if "node names" in q or ("nodes" in q and "named" in q) or "naming" in q: return "Atlas node names: " + ", ".join(names) + "." if "ready" in q and "node" in q: return f"Ready nodes ({len(ready)}): {', '.join(ready)}." 
return "" def _metric_tokens(entry: dict[str, Any]) -> str: parts: list[str] = [] for key in ("panel_title", "dashboard", "description"): val = entry.get(key) if isinstance(val, str) and val: parts.append(val.lower()) tags = entry.get("tags") if isinstance(tags, list): parts.extend(str(t).lower() for t in tags if t) return " ".join(parts) def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]: q_tokens = _tokens(query) if not q_tokens or not _METRIC_INDEX: return [] scored: list[tuple[int, dict[str, Any]]] = [] for entry in _METRIC_INDEX: if not isinstance(entry, dict): continue hay = _metric_tokens(entry) if not hay: continue score = 0 for t in set(q_tokens): if t in hay: score += 2 if t in (entry.get("panel_title") or "").lower() else 1 if score: scored.append((score, entry)) scored.sort(key=lambda item: item[0], reverse=True) return [entry for _, entry in scored[:limit]] def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]: if not allow_tools: return "", "" lower = (prompt or "").lower() if not any(word in lower for word in METRIC_HINT_WORDS): return "", "" matches = metrics_lookup(prompt, limit=1) if not matches: return "", "" entry = matches[0] dashboard = entry.get("dashboard") or "dashboard" panel = entry.get("panel_title") or "panel" exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else [] if not exprs: return "", "" rendered_parts: list[str] = [] for expr in exprs[:2]: res = vm_query(expr, timeout=20) rendered = vm_render_result(res, limit=10) if rendered: rendered_parts.append(rendered) if not rendered_parts: return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data." summary = "\n".join(rendered_parts) context = f"Metrics (from {dashboard} / {panel}):\n{summary}" fallback = _metrics_fallback_summary(panel, summary) return context, fallback def jetson_nodes_from_kb() -> list[str]: for doc in KB.get("runbooks", []): if not isinstance(doc, dict): continue body = str(doc.get("body") or "") for line in body.splitlines(): if "jetson" not in line.lower(): continue names = _extract_titan_nodes(line) if names: return names return [] def jetson_nodes_summary(cluster_name: str) -> str: names = jetson_nodes_from_kb() if names: return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}." return "" def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]: q = (query or "").strip() if not q or not KB.get("catalog"): return "", [] ql = q.lower() hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")} # Also match by known workload/service names. for t in _tokens(ql): if t in _NAME_INDEX: hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t} edges: list[tuple[str, str]] = [] lines: list[str] = [] for host in sorted(hosts): for ep in _HOST_INDEX.get(host, []): backend = ep.get("backend") or {} ns = backend.get("namespace") or "" svc = backend.get("service") or "" path = ep.get("path") or "/" if not svc: continue wk = backend.get("workloads") or [] wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown" lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}") for w in wk: if isinstance(w, dict) and w.get("name"): edges.append((ns, str(w["name"]))) if not lines: return "", [] return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges # Kubernetes API (read-only). 
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None


def _k8s_context() -> ssl.SSLContext:
    global _K8S_CTX
    if _K8S_CTX is not None:
        return _K8S_CTX
    ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    ctx = ssl.create_default_context(cafile=ca_path)
    _K8S_CTX = ctx
    return ctx


def _k8s_token() -> str:
    global _K8S_TOKEN
    if _K8S_TOKEN:
        return _K8S_TOKEN
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    with open(token_path, "r", encoding="utf-8") as f:
        _K8S_TOKEN = f.read().strip()
    return _K8S_TOKEN


def k8s_get(path: str, timeout: int = 8) -> dict:
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
    if not host:
        raise RuntimeError("k8s host missing")
    url = f"https://{host}:{port}{path}"
    headers = {"Authorization": f"Bearer {_k8s_token()}"}
    r = request.Request(url, headers=headers, method="GET")
    with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
        raw = resp.read()
        return json.loads(raw.decode()) if raw else {}


def _ariadne_state(timeout: int = 5) -> dict | None:
    if not ARIADNE_STATE_URL:
        return None
    headers = {}
    if ARIADNE_STATE_TOKEN:
        headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
    r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
    try:
        with request.urlopen(r, timeout=timeout) as resp:
            raw = resp.read()
            payload = json.loads(raw.decode()) if raw else {}
            return payload if isinstance(payload, dict) else None
    except Exception:
        return None


def k8s_pods(namespace: str) -> list[dict]:
    data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
    items = data.get("items") or []
    return items if isinstance(items, list) else []


def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
    try:
        pods = k8s_pods(namespace)
    except Exception:
        return ""
    out: list[str] = []
    for p in pods:
        md = p.get("metadata") or {}
        st = p.get("status") or {}
        name = md.get("name") or ""
        # startswith(pref) subsumes the exact-name and dash-suffixed cases.
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
            continue
        phase = st.get("phase") or "?"
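        # Aggregate per-container readiness and restart counts; surface the
        # first waiting reason (e.g. CrashLoopBackOff) when the pod itself
        # has no top-level reason.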
cs = st.get("containerStatuses") or [] restarts = 0 ready = 0 total = 0 reason = st.get("reason") or "" for c in cs if isinstance(cs, list) else []: if not isinstance(c, dict): continue total += 1 restarts += int(c.get("restartCount") or 0) if c.get("ready"): ready += 1 state = c.get("state") or {} if not reason and isinstance(state, dict): waiting = state.get("waiting") or {} if isinstance(waiting, dict) and waiting.get("reason"): reason = waiting.get("reason") extra = f" ({reason})" if reason else "" out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}") return "\n".join(out[:20]) def flux_not_ready() -> str: try: data = k8s_get( "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200" ) except Exception: return "" items = data.get("items") or [] bad: list[str] = [] for it in items if isinstance(items, list) else []: md = it.get("metadata") or {} st = it.get("status") or {} name = md.get("name") or "" conds = st.get("conditions") or [] ready = None msg = "" for c in conds if isinstance(conds, list) else []: if isinstance(c, dict) and c.get("type") == "Ready": ready = c.get("status") msg = c.get("message") or "" if ready not in ("True", True): bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip()) return "\n".join(bad[:10]) # VictoriaMetrics (PromQL) helpers. def vm_query(query: str, timeout: int = 8) -> dict | None: try: url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query}) with request.urlopen(url, timeout=timeout) as resp: return json.loads(resp.read().decode()) except Exception: return None def _vm_value_series(res: dict) -> list[dict]: if not res or (res.get("status") != "success"): return [] data = res.get("data") or {} result = data.get("result") or [] return result if isinstance(result, list) else [] def vm_render_result(res: dict | None, limit: int = 12) -> str: if not res: return "" series = _vm_value_series(res) if not series: return "" out: list[str] = [] for r in series[:limit]: if not isinstance(r, dict): continue metric = r.get("metric") or {} value = r.get("value") or [] val = value[1] if isinstance(value, list) and len(value) > 1 else "" # Prefer common labels if present. label_parts = [] for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"): if isinstance(metric, dict) and metric.get(k): label_parts.append(f"{k}={metric.get(k)}") if not label_parts and isinstance(metric, dict): for k in sorted(metric.keys()): if k.startswith("__"): continue label_parts.append(f"{k}={metric.get(k)}") if len(label_parts) >= 4: break labels = ", ".join(label_parts) if label_parts else "series" out.append(f"- {labels}: {val}") return "\n".join(out) def _parse_metric_lines(summary: str) -> dict[str, str]: parsed: dict[str, str] = {} for line in (summary or "").splitlines(): line = line.strip() if not line.startswith("-"): continue try: label, value = line.lstrip("-").split(":", 1) except ValueError: continue parsed[label.strip()] = value.strip() return parsed def _metrics_fallback_summary(panel: str, summary: str) -> str: parsed = _parse_metric_lines(summary) panel_l = (panel or "").lower() if panel_l.startswith("postgres connections"): used = parsed.get("conn=used") maxv = parsed.get("conn=max") if used and maxv: try: used_i = int(float(used)) max_i = int(float(maxv)) except ValueError: return f"Postgres connections: {summary}" free = max_i - used_i return f"Postgres connections: {used_i}/{max_i} used ({free} free)." 
if panel_l.startswith("postgres hottest"): if parsed: label, value = next(iter(parsed.items())) return f"Most Postgres connections: {label} = {value}." return f"{panel}: {summary}" def _node_ready_status(node: dict) -> bool | None: conditions = node.get("status", {}).get("conditions") or [] for cond in conditions if isinstance(conditions, list) else []: if cond.get("type") == "Ready": if cond.get("status") == "True": return True if cond.get("status") == "False": return False return None return None def _node_is_worker(node: dict) -> bool: labels = (node.get("metadata") or {}).get("labels") or {} if labels.get("node-role.kubernetes.io/control-plane") is not None: return False if labels.get("node-role.kubernetes.io/master") is not None: return False if labels.get("node-role.kubernetes.io/worker") is not None: return True return True def worker_nodes_status() -> tuple[list[str], list[str]]: try: data = k8s_get("/api/v1/nodes?limit=500") except Exception: return ([], []) items = data.get("items") or [] ready_nodes: list[str] = [] not_ready_nodes: list[str] = [] for node in items if isinstance(items, list) else []: if not _node_is_worker(node): continue name = (node.get("metadata") or {}).get("name") or "" if not name: continue ready = _node_ready_status(node) if ready is True: ready_nodes.append(name) elif ready is False: not_ready_nodes.append(name) return (sorted(ready_nodes), sorted(not_ready_nodes)) def expected_worker_nodes_from_metrics() -> list[str]: for entry in _METRIC_INDEX: panel = (entry.get("panel_title") or "").lower() if "worker nodes ready" not in panel: continue exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else [] for expr in exprs: if not isinstance(expr, str): continue match = NODE_REGEX.search(expr) if not match: continue raw = match.group(1) nodes = [n.strip() for n in raw.split("|") if n.strip()] return sorted(nodes) return [] def _context_fallback(context: str) -> str: if not context: return "" trimmed = context.strip() if len(trimmed) > MAX_TOOL_CHARS: trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..." return "I couldn’t reach the model backend. Here is the data I found:\n" + trimmed def vm_top_restarts(hours: int = 1) -> str: q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))" res = vm_query(q) if not res or (res.get("status") != "success"): return "" out: list[str] = [] for r in (res.get("data") or {}).get("result") or []: if not isinstance(r, dict): continue m = r.get("metric") or {} v = r.get("value") or [] ns = (m.get("namespace") or "").strip() pod = (m.get("pod") or "").strip() val = v[1] if isinstance(v, list) and len(v) > 1 else "" if pod: out.append(f"- restarts({hours}h): {ns}/{pod} = {val}") return "\n".join(out) def vm_cluster_snapshot() -> str: parts: list[str] = [] # Node readiness (kube-state-metrics). 
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            pass
    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()


def nodes_summary(cluster_name: str) -> str:
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        total = nodes.get("total")
        ready = nodes.get("ready")
        not_ready = nodes.get("not_ready")
        if isinstance(total, int) and isinstance(ready, int):
            not_ready = not_ready if isinstance(not_ready, int) else max(total - ready, 0)
            if not_ready:
                return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
            return f"{cluster_name} cluster has {total} nodes, all Ready."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    total = len(items)
    ready = 0
    for node in items:
        conditions = node.get("status", {}).get("conditions") or []
        for cond in conditions if isinstance(conditions, list) else []:
            if cond.get("type") == "Ready":
                if cond.get("status") == "True":
                    ready += 1
                break
    not_ready = max(total - ready, 0)
    if not_ready:
        return f"{cluster_name} cluster has {total} nodes: {ready} Ready, {not_ready} NotReady."
    return f"{cluster_name} cluster has {total} nodes, all Ready."


def nodes_names_summary(cluster_name: str) -> str:
    state = _ariadne_state()
    if state:
        nodes = state.get("nodes") if isinstance(state.get("nodes"), dict) else {}
        names = nodes.get("names")
        if isinstance(names, list) and names:
            cleaned = sorted({str(n) for n in names if n})
            if len(cleaned) <= 30:
                return f"{cluster_name} node names: {', '.join(cleaned)}."
            shown = ", ".join(cleaned[:30])
            return f"{cluster_name} node names: {shown}, … (+{len(cleaned) - 30} more)."
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    names = []
    for node in items:
        name = (node.get("metadata") or {}).get("name") or ""
        if name:
            names.append(name)
    names = sorted(set(names))
    if not names:
        return ""
    if len(names) <= 30:
        return f"{cluster_name} node names: {', '.join(names)}."
    shown = ", ".join(names[:30])
    return f"{cluster_name} node names: {shown}, … (+{len(names) - 30} more)."


def nodes_arch_summary(cluster_name: str, arch: str) -> str:
    try:
        data = k8s_get("/api/v1/nodes?limit=500")
    except Exception:
        return ""
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        return ""
    normalized = (arch or "").strip().lower()
    if normalized in ("aarch64", "arm64"):
        arch_label = "arm64"
    elif normalized in ("x86_64", "x86-64", "amd64"):
        arch_label = "amd64"
    else:
        arch_label = normalized
    total = 0
    for node in items:
        labels = (node.get("metadata") or {}).get("labels") or {}
        if labels.get("kubernetes.io/arch") == arch_label:
            total += 1
    return f"{cluster_name} cluster has {total} {arch_label} nodes."
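# Example (illustrative only): the instant-query JSON shape these helpers
# expect from VictoriaMetrics, and the line vm_render_result produces for it.
# The namespace/pod labels and the value are synthetic.
_EXAMPLE_VM_RESPONSE = {
    "status": "success",
    "data": {
        "result": [
            {"metric": {"namespace": "monitoring", "pod": "grafana-0"}, "value": [1700000000, "3"]},
        ],
    },
}
# vm_render_result(_EXAMPLE_VM_RESPONSE) == "- namespace=monitoring, pod=grafana-0: 3"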
def _strip_code_fence(text: str) -> str:
    cleaned = (text or "").strip()
    match = CODE_FENCE_RE.match(cleaned)
    if match:
        return match.group(1).strip()
    return cleaned


def _normalize_reply(value: Any) -> str:
    if isinstance(value, dict):
        for key in ("content", "response", "reply", "message"):
            if key in value:
                return _normalize_reply(value[key])
        for v in value.values():
            if isinstance(v, (str, dict, list)):
                return _normalize_reply(v)
        return json.dumps(value, ensure_ascii=False)
    if isinstance(value, list):
        parts = [_normalize_reply(item) for item in value]
        return " ".join(p for p in parts if p)
    if value is None:
        return ""
    text = _strip_code_fence(str(value))
    if text.startswith("{") and text.endswith("}"):
        try:
            return _normalize_reply(json.loads(text))
        except Exception:
            return text
    return text


# Conversation state.
history = collections.defaultdict(list)  # (room_id, sender|None) -> list[str] (short transcript)


def key_for(room_id: str, sender: str, is_dm: bool):
    return (room_id, None) if is_dm else (room_id, sender)


def build_context(
    prompt: str,
    *,
    allow_tools: bool,
    targets: list[tuple[str, str]],
    inventory: list[dict[str, Any]] | None = None,
) -> str:
    parts: list[str] = []
    kb = kb_retrieve(prompt)
    if kb:
        parts.append(kb)
    endpoints, edges = catalog_hints(prompt)
    if endpoints:
        parts.append(endpoints)
    node_ctx = node_inventory_context(prompt, inventory)
    if node_ctx:
        parts.append(node_ctx)
    if allow_tools:
        # Scope pod summaries to relevant namespaces/workloads when possible.
        prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
        for ns, name in (targets or []) + (edges or []):
            if ns and name:
                prefixes_by_ns[ns].add(name)
        pod_lines: list[str] = []
        for ns in sorted(prefixes_by_ns.keys()):
            summary = summarize_pods(ns, prefixes_by_ns[ns])
            if summary:
                pod_lines.append(f"Pods (live):\n{summary}")
        if pod_lines:
            parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
        flux_bad = flux_not_ready()
        if flux_bad:
            parts.append("Flux (not ready):\n" + flux_bad)
        p_l = (prompt or "").lower()
        if any(w in p_l for w in METRIC_HINT_WORDS):
            restarts = vm_top_restarts(1)
            if restarts:
                parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
            snap = vm_cluster_snapshot()
            if snap:
                parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
    return "\n\n".join([p for p in parts if p]).strip()


def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
    system = (
        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
        "Be helpful, direct, and concise. "
        "Prefer answering with exact repo paths and Kubernetes resource names. "
        "Never include or request secret values. "
        "Do not suggest commands unless explicitly asked. "
        "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
        "If the answer is not grounded in the provided context or tool data, say you do not know."
    )
    transcript_parts = [system]
    if context:
        transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
    transcript_parts.extend(history[hist_key][-24:])
    transcript_parts.append(f"User: {prompt}")
    transcript = "\n".join(transcript_parts)
    payload = {"model": MODEL, "message": transcript}
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["x-api-key"] = API_KEY
    r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
    with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
        data = json.loads(resp.read().decode())
    raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
    reply = _normalize_reply(raw_reply) or "I'm here to help."
    history[hist_key].append(f"Atlas: {reply}")
    return reply


def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
    try:
        return _ollama_call(hist_key, prompt, context=context)
    except Exception:
        if fallback:
            history[hist_key].append(f"Atlas: {fallback}")
            return fallback
        return "Model backend is busy. Try again in a moment."


def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
    result: dict[str, str] = {"reply": ""}
    done = threading.Event()

    def worker():
        result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
        done.set()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    if not done.wait(2.0):
        send_msg(token, room, "Thinking…")
        prompt_hint = " ".join((prompt or "").split())
        if len(prompt_hint) > 160:
            prompt_hint = prompt_hint[:157] + "…"
        heartbeat = max(10, THINKING_INTERVAL_SEC)
        next_heartbeat = time.monotonic() + heartbeat
        while not done.wait(max(0, next_heartbeat - time.monotonic())):
            if prompt_hint:
                send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
            else:
                send_msg(token, room, "Still thinking (gathering context)…")
            next_heartbeat += heartbeat
    thread.join(timeout=1)
    return result["reply"] or fallback or "Model backend is busy. Try again in a moment."


def sync_loop(token: str, room_id: str):
    since = None
    try:
        res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
        since = res.get("next_batch")
    except Exception:
        pass
    while True:
        params = {"timeout": 30000}
        if since:
            params["since"] = since
        query = parse.urlencode(params)
        try:
            res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
        except Exception:
            time.sleep(5)
            continue
        since = res.get("next_batch", since)
        # Invites: auto-join any room the bot is invited to.
        for rid, data in res.get("rooms", {}).get("invite", {}).items():
            try:
                join_room(token, rid)
            except Exception:
                pass
        # Messages.
        for rid, data in res.get("rooms", {}).get("join", {}).items():
            timeline = data.get("timeline", {}).get("events", [])
            joined_count = data.get("summary", {}).get("m.joined_member_count")
            is_dm = joined_count is not None and joined_count <= 2
            for ev in timeline:
                if ev.get("type") != "m.room.message":
                    continue
                content = ev.get("content", {})
                body = (content.get("body", "") or "").strip()
                if not body:
                    continue
                sender = ev.get("sender", "")
                # Skip the bot's own messages. Use normalize_user_id rather
                # than a hardcoded homeserver so MATRIX_SERVER_NAME overrides
                # keep working.
                if sender.lower() == normalize_user_id(USER).lower():
                    continue
                mentioned = is_mentioned(content, body)
                hist_key = key_for(rid, sender, is_dm)
                history[hist_key].append(f"{sender}: {body}")
                history[hist_key] = history[hist_key][-80:]
                if not (is_dm or mentioned):
                    continue
                lower_body = body.lower()
                # Only do live cluster introspection in DMs; metrics can be
                # answered when mentioned.
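                # Rationale (inferred): k8s introspection and raw PromQL are
                # DM-only so that shared rooms cannot drive arbitrary live
                # queries; dashboard-derived metrics are safe on mention.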
                allow_tools = is_dm
                allow_metrics = is_dm or mentioned
                promql = ""
                if allow_tools:
                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
                    if m:
                        promql = m.group(1).strip()
                # Attempt to scope tools to the most likely workloads when
                # hostnames are mentioned.
                targets: list[tuple[str, str]] = []
                for m in HOST_RE.finditer(lower_body):
                    host = m.group(1).lower()
                    for ep in _HOST_INDEX.get(host, []):
                        backend = ep.get("backend") or {}
                        ns = backend.get("namespace") or ""
                        for w in backend.get("workloads") or []:
                            if isinstance(w, dict) and w.get("name"):
                                targets.append((ns, str(w["name"])))
                inventory = node_inventory_for_prompt(body)
                context = build_context(body, allow_tools=allow_tools, targets=targets, inventory=inventory)
                if allow_tools and promql:
                    res = vm_query(promql, timeout=20)
                    rendered = vm_render_result(res, limit=15) or "(no results)"
                    extra = "VictoriaMetrics (PromQL result):\n" + rendered
                    context = (context + "\n\n" + extra).strip() if context else extra
                metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
                if metrics_context:
                    context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
                fallback = metrics_fallback or ""
                if not fallback and context:
                    fallback = _context_fallback(context)
                structured = structured_answer(body, inventory=inventory, metrics_summary=metrics_fallback or "")
                if structured:
                    send_msg(token, rid, structured)
                    continue
                reply = ollama_reply_with_thinking(
                    token,
                    rid,
                    hist_key,
                    body,
                    context=context,
                    fallback=fallback,
                )
                send_msg(token, rid, reply)


def login_with_retry():
    last_err = None
    for attempt in range(10):
        try:
            return login()
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            time.sleep(min(30, 2 ** attempt))
    raise last_err


def main():
    load_kb()
    token = login_with_retry()
    try:
        room_id = resolve_alias(token, ROOM_ALIAS)
        join_room(token, room_id)
    except Exception:
        room_id = None
    sync_loop(token, room_id)


if __name__ == "__main__":
    main()
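# Local-run notes (derived from the env reads at the top of this module):
# BOT_USER and BOT_PASS are required; everything else has defaults. Commonly
# overridden: MATRIX_BASE / AUTH_BASE (homeserver + auth-service endpoints),
# OLLAMA_URL and OLLAMA_MODEL (chat backend), KB_DIR (pre-rendered knowledge
# base), and VM_URL (VictoriaMetrics). Without KB_DIR the bot still runs, but
# KB and catalog retrieval return empty context.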