import collections
import json
import os
import re
import ssl
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib import error, parse, request

# Runtime configuration (environment-driven). BOT_USER / BOT_PASS are required
# and fail fast with KeyError when missing; everything else defaults for the
# Othrys cluster.
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
MAX_CONTEXT_CHARS = int(os.environ.get("ATLASBOT_MAX_CONTEXT_CHARS", "12000"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"

# Tokenization / query-classification tables.
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
# BUG FIX: was r"(?:\\.[a-z0-9-]+)+" — in a raw string "\\." matches a literal
# backslash followed by any character, which never matches a hostname.
# A dot-separated hostname needs an escaped dot only.
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
    "the", "and", "for", "with", "this", "that", "from", "into", "what", "how",
    "why", "when", "where", "which", "who", "can", "could", "should", "would",
    "please", "help", "atlas", "othrys", "system", "systems", "service",
    "services", "app", "apps", "platform", "software", "tool", "tools",
}
METRIC_HINT_WORDS = {
    "bandwidth", "connections", "cpu", "database", "db", "disk", "health",
    "memory", "network", "node", "nodes", "postgres", "status", "storage",
    "usage", "down", "slow", "error", "unknown_error", "timeout", "crash",
    "crashloop", "restart", "restarts", "pending", "unreachable", "latency",
    "pod", "pods",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
# Unicode dash variants normalized to ASCII "-" by normalize_query().
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
CONFIDENCE_RE = re.compile(r"confidence\s*:\s*(high|medium|low)\b", re.IGNORECASE)
OPERATION_HINTS = {
    "count": ("how many", "count", "number", "total"),
    "list": ("list", "which", "what are", "show", "names"),
    "top": ("top", "hottest", "highest", "most", "largest", "max", "maximum"),
    "status": ("ready", "not ready", "unready", "down", "missing", "status"),
}
METRIC_HINTS = {
    "cpu": ("cpu",),
    "ram": ("ram", "memory", "mem"),
    "net": ("net", "network", "bandwidth", "throughput"),
    "io": ("io", "disk", "storage"),
    "connections": ("connections", "conn", "postgres", "database", "db"),
    "pods": ("pods", "pod"),
}
CLUSTER_HINT_WORDS = {
    "atlas", "titan", "cluster", "k8s", "kubernetes", "node", "nodes",
    "worker", "workers", "pod", "pods", "namespace", "service", "deployment",
    "daemonset", "statefulset", "grafana", "victoria", "prometheus", "ariadne",
    "mailu", "nextcloud", "vaultwarden", "firefly", "wger", "jellyfin",
    "planka", "budget", "element", "synapse", "mas", "comms", "longhorn",
    "harbor", "jenkins", "gitea", "flux", "keycloak", "postgres", "database",
    "db", "atlasbot", "jetson", "rpi", "raspberry", "amd64", "arm64",
}
# Serializes Ollama calls when OLLAMA_SERIALIZE is enabled.
_OLLAMA_LOCK = threading.Lock()
HARDWARE_HINTS = {
    "amd64": ("amd64", "x86", "x86_64", "x86-64"),
    "jetson": ("jetson",),
    "rpi4": ("rpi4",),
    "rpi5": ("rpi5",),
    "rpi": ("rpi", "raspberry"),
    "arm64": ("arm64", "aarch64"),
}


def normalize_query(text: str) -> str:
    """Lowercase *text*, fold Unicode dashes to '-', and collapse whitespace."""
    cleaned = (text or "").lower()
    for ch in _DASH_CHARS:
        cleaned = cleaned.replace(ch, "-")
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned


def _tokens(text: str) -> list[str]:
    """Extract lowercase search tokens, dropping stopwords and 1-char tokens."""
    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]


def _ensure_confidence(text: str) -> str:
    """Normalize an existing 'Confidence: <level>' line, or append a medium one."""
    if not text:
        return ""
    lines = text.strip().splitlines()
    for idx, line in enumerate(lines):
        match = CONFIDENCE_RE.search(line)
        if match:
            level = match.group(1).lower()
            lines[idx] = CONFIDENCE_RE.sub(f"Confidence: {level}", line)
            return "\n".join(lines)
    lines.append("Confidence: medium")
    return "\n".join(lines)


def _ollama_endpoint() -> str:
    """Return the Ollama /api/chat URL derived from OLLAMA_URL ('' if unset)."""
    url = (OLLAMA_URL or "").strip()
    if not url:
        return ""
    if url.endswith("/api/chat"):
        return url
    return url.rstrip("/") + "/api/chat"


def _history_to_messages(lines: list[str]) -> list[dict[str, str]]:
    """Convert 'Atlas:/User:' transcript lines into chat-API role messages."""
    messages: list[dict[str, str]] = []
    for line in lines:
        raw = (line or "").strip()
        if not raw:
            continue
        role = "user"
        content = raw
        lowered = raw.lower()
        if lowered.startswith("atlas:"):
            role = "assistant"
            content = raw.split(":", 1)[1].strip()
        elif lowered.startswith("user:"):
            role = "user"
            content = raw.split(":", 1)[1].strip()
        elif ":" in raw:
            # Unknown speaker prefix: keep the text, treat as user.
            content = raw.split(":", 1)[1].strip()
        if content:
            messages.append({"role": role, "content": content})
    return messages


# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
# NOTE(review): the original MENTION_RE pattern was corrupted in extraction
# (only the prefix r"(?" survived). Reconstructed to match an explicit
# "@<localpart>" on a word boundary, which is consistent with
# _body_mentions_token handling the bare-name case — confirm against the
# original source before relying on exact mention semantics.
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + ("|".join(re.escape(n) for n in MENTION_LOCALPARTS) or "atlas") + r")\b",
    re.IGNORECASE,
)


def normalize_user_id(token: str) -> str:
    """Normalize a mention token into a full Matrix user id (@local:server)."""
    t = token.strip()
    if not t:
        return ""
    if t.startswith("@") and ":" in t:
        return t
    t = t.lstrip("@")
    if ":" in t:
        return f"@{t}"
    return f"@{t}:{SERVER_NAME}"


MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}


def _body_mentions_token(body: str) -> bool:
    """True when the message starts with a known bot name ('atlas:', '@atlas ', ...)."""
    lower = (body or "").strip().lower()
    if not lower:
        return False
    for token in MENTION_LOCALPARTS:
        for prefix in (token, f"@{token}"):
            if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
                return True
    return False


def is_mentioned(content: dict, body: str) -> bool:
    """True when the event mentions the bot via text or m.mentions user_ids."""
    if MENTION_RE.search(body or "") is not None:
        return True
    if _body_mentions_token(body or ""):
        return True
    mentions = content.get("m.mentions", {})
    user_ids = mentions.get("user_ids", [])
    if not isinstance(user_ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)


def _strip_bot_mention(text: str) -> str:
    """Remove leading '@atlas'-style mention prefixes from the prompt text."""
    if not text:
        return ""
    if not MENTION_LOCALPARTS:
        return text.strip()
    names = [re.escape(name) for name in MENTION_LOCALPARTS if name]
    if not names:
        return text.strip()
    pattern = r"^(?:\s*@?(?:" + "|".join(names) + r")(?::)?\s+)+"
    cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE).strip()
    return cleaned or text.strip()


# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    """Issue a JSON request to the Matrix homeserver (or MAS) and decode the reply.

    An empty response body decodes to {} rather than raising.
    """
    headers: dict[str, str] = {}
    payload = None
    if body is not None:
        payload = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    http_req = request.Request((base or BASE) + path, data=payload, headers=headers, method=method)
    with request.urlopen(http_req, timeout=timeout) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}


def login() -> str:
    """Password-login the bot against the auth service; return the access token."""
    payload = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": normalize_user_id(USER)},
        "password": PASSWORD,
    }
    return req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)["access_token"]


def resolve_alias(token: str, alias: str) -> str:
    """Resolve a room alias (e.g. '#othrys:...') to its room id."""
    res = req("GET", f"/_matrix/client/v3/directory/room/{parse.quote(alias)}", token)
    return res["room_id"]


def join_room(token: str, room: str):
    """Join the given room on behalf of the bot."""
    req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})


def send_msg(token: str, room: str, text: str):
    """Send a plain m.text message into the room."""
    path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", path, token, body={"msgtype": "m.text", "body": text})


# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []} _HOST_INDEX: dict[str, list[dict]] = {} _NAME_INDEX: set[str] = set() _METRIC_INDEX: list[dict[str, Any]] = [] NODE_REGEX = re.compile(r'node=~"([^"]+)"') def _load_json_file(path: str) -> Any | None: try: with open(path, "rb") as f: return json.loads(f.read().decode("utf-8")) except Exception: return None def load_kb(): global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX if not KB_DIR: return catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {} runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or [] metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or [] KB = {"catalog": catalog, "runbooks": runbooks} host_index: dict[str, list[dict]] = collections.defaultdict(list) for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []: host = (ep.get("host") or "").lower() if host: host_index[host].append(ep) _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())} names: set[str] = set() for s in catalog.get("services", []) if isinstance(catalog, dict) else []: if isinstance(s, dict) and s.get("name"): names.add(str(s["name"]).lower()) for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []: if isinstance(w, dict) and w.get("name"): names.add(str(w["name"]).lower()) _NAME_INDEX = names _METRIC_INDEX = metrics if isinstance(metrics, list) else [] def _score_kb_docs(query: str) -> list[dict[str, Any]]: q = (query or "").strip() if not q or not KB.get("runbooks"): return [] ql = q.lower() q_tokens = _tokens(q) if not q_tokens: return [] scored: list[tuple[int, dict]] = [] for doc in KB.get("runbooks", []): if not isinstance(doc, dict): continue title = str(doc.get("title") or "") body = str(doc.get("body") or "") tags = doc.get("tags") or [] entrypoints = doc.get("entrypoints") or [] hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower() score = 0 for t in set(q_tokens): if t in 
hay: score += 3 if t in title.lower() else 1 for h in entrypoints: if isinstance(h, str) and h.lower() in ql: score += 4 if score: scored.append((score, doc)) scored.sort(key=lambda x: x[0], reverse=True) return [d for _, d in scored] def kb_retrieve(query: str, *, limit: int = 3) -> str: q = (query or "").strip() if not q: return "" scored = _score_kb_docs(q) picked = scored[:limit] if not picked: return "" parts: list[str] = ["Atlas KB (retrieved):"] used = 0 for d in picked: path = d.get("path") or "" title = d.get("title") or path body = (d.get("body") or "").strip() snippet = body[:900].strip() chunk = f"- {title} ({path})\n{snippet}" if used + len(chunk) > MAX_KB_CHARS: break parts.append(chunk) used += len(chunk) return "\n".join(parts).strip() def kb_retrieve_titles(query: str, *, limit: int = 4) -> str: scored = _score_kb_docs(query) picked = scored[:limit] if not picked: return "" parts = ["Relevant runbooks:"] for doc in picked: title = doc.get("title") or doc.get("path") or "runbook" path = doc.get("path") or "" if path: parts.append(f"- {title} ({path})") else: parts.append(f"- {title}") return "\n".join(parts) def _extract_titan_nodes(text: str) -> list[str]: cleaned = normalize_query(text) names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n} for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE): tail = match.group(1) for part in re.split(r"[/,]", tail): part = part.strip() if part: names.add(f"titan-{part.lower()}") for match in TITAN_RANGE_RE.finditer(cleaned): left, right = match.groups() if left: names.add(f"titan-{left.lower()}") if right: names.add(f"titan-{right.lower()}") return sorted(names) def _humanize_rate(value: str, *, unit: str) -> str: try: val = float(value) except (TypeError, ValueError): return value if unit == "%": return f"{val:.1f}%" if val >= 1024 * 1024: return f"{val / (1024 * 1024):.2f} MB/s" if val >= 1024: return f"{val / 1024:.2f} KB/s" return f"{val:.2f} B/s" def 
_has_any(text: str, phrases: tuple[str, ...]) -> bool: return any(p in text for p in phrases) def _detect_operation(q: str) -> str | None: if _has_any(q, OPERATION_HINTS["top"]): return "top" for op, phrases in OPERATION_HINTS.items(): if op == "top": continue if _has_any(q, phrases): return op return None def _detect_metric(q: str) -> str | None: tokens = set(_tokens(q)) for metric, phrases in METRIC_HINTS.items(): for phrase in phrases: if " " in phrase: if phrase in q: return metric elif phrase in tokens: return metric return None def _detect_hardware_filters(q: str) -> tuple[set[str], set[str]]: include: set[str] = set() exclude: set[str] = set() rpi_specific = "rpi4" in q or "rpi5" in q for hardware, phrases in HARDWARE_HINTS.items(): if hardware == "rpi" and rpi_specific: continue for phrase in phrases: if f"non {phrase}" in q or f"non-{phrase}" in q or f"not {phrase}" in q: exclude.add(hardware) elif phrase in q: include.add(hardware) return include, exclude def _detect_role_filters(q: str) -> set[str]: roles: set[str] = set() if "control-plane" in q or "control plane" in q: roles.add("control-plane") if "master" in q: roles.add("master") if "accelerator" in q: roles.add("accelerator") return roles def _detect_entity(q: str) -> str | None: if "node" in q or "nodes" in q or "worker" in q or "hardware" in q or "architecture" in q or TITAN_NODE_RE.search(q): return "node" if "pod" in q or "pods" in q: return "pod" if "namespace" in q or "namespaces" in q: return "namespace" return None def _metric_entry_score(entry: dict[str, Any], tokens: list[str], *, metric: str | None, op: str | None) -> int: hay = _metric_tokens(entry) score = 0 for t in set(tokens): if t in hay: score += 2 if t in (entry.get("panel_title") or "").lower() else 1 if metric: for phrase in METRIC_HINTS.get(metric, (metric,)): if phrase in hay: score += 3 if op == "top" and ("hottest" in hay or "top" in hay): score += 3 if "node" in hay: score += 1 return score def _select_metric_entry(tokens: 
list[str], *, metric: str | None, op: str | None) -> dict[str, Any] | None: scored: list[tuple[int, dict[str, Any]]] = [] for entry in _METRIC_INDEX: if not isinstance(entry, dict): continue score = _metric_entry_score(entry, tokens, metric=metric, op=op) if score: scored.append((score, entry)) if not scored: return None scored.sort(key=lambda item: item[0], reverse=True) return scored[0][1] def _apply_node_filter(expr: str, node_regex: str | None) -> str: if not node_regex: return expr needle = 'node_uname_info{nodename!=""}' replacement = f'node_uname_info{{nodename!=\"\",nodename=~\"{node_regex}\"}}' return expr.replace(needle, replacement) def _metric_expr_uses_percent(entry: dict[str, Any]) -> bool: exprs = entry.get("exprs") expr = exprs[0] if isinstance(exprs, list) and exprs else "" return "* 100" in expr or "*100" in expr def _format_metric_value(value: str, *, percent: bool, rate: bool = False) -> str: try: num = float(value) except (TypeError, ValueError): return value if percent: return f"{num:.1f}%" if rate: return _humanize_rate(value, unit="rate") if abs(num) >= 1: return f"{num:.2f}".rstrip("0").rstrip(".") return f"{num:.4f}".rstrip("0").rstrip(".") def _format_metric_label(metric: dict[str, Any]) -> str: label_parts = [] for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"): if metric.get(k): label_parts.append(f"{k}={metric.get(k)}") if not label_parts: for k in sorted(metric.keys()): if k.startswith("__"): continue label_parts.append(f"{k}={metric.get(k)}") if len(label_parts) >= 4: break return ", ".join(label_parts) if label_parts else "series" def _primary_series_metric(res: dict | None) -> tuple[str | None, str | None]: series = _vm_value_series(res or {}) if not series: return (None, None) first = series[0] metric = first.get("metric") if isinstance(first, dict) else {} value = first.get("value") if isinstance(first, dict) else [] node = metric.get("node") if isinstance(metric, dict) else None val = value[1] if 
isinstance(value, list) and len(value) > 1 else None return (node, val) def _format_metric_answer(entry: dict[str, Any], res: dict | None) -> str: series = _vm_value_series(res) panel = entry.get("panel_title") or "Metric" if not series: return "" percent = _metric_expr_uses_percent(entry) lines: list[str] = [] for r in series[:5]: if not isinstance(r, dict): continue metric = r.get("metric") or {} value = r.get("value") or [] val = value[1] if isinstance(value, list) and len(value) > 1 else "" label = _format_metric_label(metric if isinstance(metric, dict) else {}) lines.append(f"{label}: {_format_metric_value(val, percent=percent)}") if not lines: return "" if len(lines) == 1: return f"{panel}: {lines[0]}." return f"{panel}:\n" + "\n".join(f"- {line}" for line in lines) def _inventory_filter( inventory: list[dict[str, Any]], *, include_hw: set[str], exclude_hw: set[str], only_workers: bool, only_ready: bool | None, nodes_in_query: list[str], ) -> list[dict[str, Any]]: results = inventory if nodes_in_query: results = [node for node in results if node.get("name") in nodes_in_query] if only_workers: results = [node for node in results if node.get("is_worker") is True] if only_ready is True: results = [node for node in results if node.get("ready") is True] if only_ready is False: results = [node for node in results if node.get("ready") is False] if include_hw: results = [node for node in results if _hardware_match(node, include_hw)] if exclude_hw: results = [node for node in results if not _hardware_match(node, exclude_hw)] return results def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool: hw = node.get("hardware") or "" arch = node.get("arch") or "" for f in filters: if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"): return True if f == "arm64" and arch == "arm64": return True if hw == f: return True if f == "amd64" and arch == "amd64": return True return False def _node_roles(labels: dict[str, Any]) -> list[str]: roles: list[str] = [] for key in 
labels.keys(): if key.startswith("node-role.kubernetes.io/"): role = key.split("/", 1)[-1] if role: roles.append(role) return sorted(set(roles)) def _hardware_class(labels: dict[str, Any]) -> str: if str(labels.get("jetson") or "").lower() == "true": return "jetson" hardware = (labels.get("hardware") or "").strip().lower() if hardware in ("rpi4", "rpi5", "rpi"): return hardware arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "" if arch == "amd64": return "amd64" if arch == "arm64": return "arm64-unknown" return "unknown" def node_inventory_live() -> list[dict[str, Any]]: try: data = k8s_get("/api/v1/nodes?limit=500") except Exception: return [] items = data.get("items") or [] inventory: list[dict[str, Any]] = [] for node in items if isinstance(items, list) else []: meta = node.get("metadata") or {} labels = meta.get("labels") or {} name = meta.get("name") or "" if not name: continue inventory.append( { "name": name, "arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "", "hardware": _hardware_class(labels), "roles": _node_roles(labels), "is_worker": _node_is_worker(node), "ready": _node_ready_status(node), } ) return sorted(inventory, key=lambda item: item["name"]) def node_inventory() -> list[dict[str, Any]]: snapshot = _snapshot_state() inventory = _snapshot_inventory(snapshot) if inventory: return inventory return node_inventory_live() def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]: grouped: dict[str, list[str]] = collections.defaultdict(list) for node in inventory: grouped[node.get("hardware") or "unknown"].append(node["name"]) return {k: sorted(v) for k, v in grouped.items()} def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str: q = normalize_query(query) if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")): return "" if inventory is None: inventory = 
node_inventory() if not inventory: return "" groups = _group_nodes(inventory) total = len(inventory) ready = sum(1 for node in inventory if node.get("ready") is True) not_ready = sum(1 for node in inventory if node.get("ready") is False) lines: list[str] = [ "Node inventory (live):", f"- total: {total}, ready: {ready}, not ready: {not_ready}", ] for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): if key in groups: lines.append(f"- {key}: {', '.join(groups[key])}") non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", []))) if non_rpi: lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}") unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", []) if unknowns: lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.") expected_workers = expected_worker_nodes_from_metrics() if expected_workers: ready_workers, not_ready_workers = worker_nodes_status() missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers)) lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}") lines.append(f"- workers_ready: {', '.join(ready_workers)}") if not_ready_workers: lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}") if missing: lines.append(f"- workers_missing (derived): {', '.join(missing)}") return "\n".join(lines) def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]: q = normalize_query(prompt) if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")): return node_inventory() return [] def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]: grouped: dict[str, list[str]] = collections.defaultdict(list) for node in inventory: grouped[(node.get("arch") or "unknown")].append(node["name"]) return {k: sorted(v) for k, v in grouped.items()} def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]: usage = 
metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {} per_node: dict[str, dict[str, Any]] = {} for metric_name, entries in usage.items() if isinstance(usage, dict) else []: if not isinstance(entries, list): continue for entry in entries: if not isinstance(entry, dict): continue node = entry.get("node") if not isinstance(node, str) or not node: continue per_node.setdefault(node, {})[metric_name] = entry.get("value") return [{"node": node, **vals} for node, vals in sorted(per_node.items())] def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]: cleaned: list[dict[str, Any]] = [] for entry in workloads: if not isinstance(entry, dict): continue cleaned.append( { "namespace": entry.get("namespace"), "workload": entry.get("workload"), "pods_total": entry.get("pods_total"), "pods_running": entry.get("pods_running"), "primary_node": entry.get("primary_node"), "nodes": entry.get("nodes"), } ) cleaned.sort( key=lambda item: ( -(item.get("pods_total") or 0), str(item.get("namespace") or ""), str(item.get("workload") or ""), ) ) return cleaned[:limit] def _workloads_for_prompt(prompt: str, workloads: list[dict[str, Any]], limit: int = 12) -> list[dict[str, Any]]: tokens = set(_tokens(prompt)) if tokens: matched: list[dict[str, Any]] = [] for entry in workloads: if not isinstance(entry, dict): continue entry_tokens = _workload_tokens(entry) if entry_tokens & tokens: matched.append(entry) if matched: return _workloads_for_facts(matched, limit=limit) return _workloads_for_facts(workloads, limit=limit) def facts_context( prompt: str, *, inventory: list[dict[str, Any]] | None, snapshot: dict[str, Any] | None, workloads: list[dict[str, Any]] | None, ) -> str: inv = inventory or [] nodes_in_query = _extract_titan_nodes(prompt) metrics = _snapshot_metrics(snapshot) nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {} summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {} 
expected_workers = expected_worker_nodes_from_metrics() ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], []) total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total") ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready") not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready") not_ready_names = summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names") by_hardware = _group_nodes(inv) if inv else {} by_arch = _nodes_by_arch(inv) if inv else {} control_plane_nodes = [ node["name"] for node in inv if any(role in ("control-plane", "master") for role in (node.get("roles") or [])) ] worker_nodes = [node["name"] for node in inv if node.get("is_worker") is True] lines: list[str] = ["Facts (live snapshot):"] if total is not None: lines.append(f"- nodes_total={total}, ready={ready}, not_ready={not_ready}") if isinstance(summary, dict): by_arch_counts = summary.get("by_arch") if isinstance(by_arch_counts, dict) and by_arch_counts: parts = [f"{arch}={count}" for arch, count in sorted(by_arch_counts.items())] lines.append(f"- nodes_by_arch: {', '.join(parts)}") if not_ready_names: lines.append(f"- nodes_not_ready: {', '.join(not_ready_names)}") for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): nodes_list = by_hardware.get(key) or [] if nodes_list: lines.append(f"- {key}: {', '.join(nodes_list)}") non_rpi = sorted(set(by_hardware.get("jetson", [])) | set(by_hardware.get("amd64", []))) if non_rpi: lines.append(f"- non_raspberry_pi: {', '.join(non_rpi)}") for key, nodes_list in sorted(by_arch.items()): if nodes_list: lines.append(f"- arch {key}: {', '.join(nodes_list)}") if control_plane_nodes: lines.append(f"- control_plane_nodes: {', '.join(control_plane_nodes)}") if worker_nodes: 
lines.append(f"- worker_nodes: {', '.join(worker_nodes)}") if ready_workers or not_ready_workers: lines.append(f"- workers_ready: {', '.join(ready_workers) if ready_workers else 'none'}") if not_ready_workers: lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}") if expected_workers and any(word in normalize_query(prompt) for word in ("missing", "expected", "should", "not ready", "unready")): missing = sorted( set(expected_workers) - {n.get("name") for n in inv if isinstance(n, dict) and n.get("name")} ) lines.append(f"- expected_workers: {', '.join(expected_workers)}") if missing: lines.append(f"- expected_workers_missing: {', '.join(missing)}") hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {} for key in ("cpu", "ram", "net", "io"): entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {} node = entry.get("node") value = entry.get("value") if node and value is not None: value_fmt = _format_metric_value( str(value), percent=key in ("cpu", "ram"), rate=key in ("net", "io"), ) lines.append(f"- hottest_{key}: {node} ({value_fmt})") postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} if isinstance(postgres, dict) and postgres: used = postgres.get("used") max_conn = postgres.get("max") if used is not None and max_conn is not None: lines.append(f"- postgres_connections: {used} used / {max_conn} max") hottest_db = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {} if hottest_db.get("label"): lines.append( f"- postgres_hottest_db: {hottest_db.get('label')} ({hottest_db.get('value')})" ) for key in ("pods_running", "pods_pending", "pods_failed", "pods_succeeded"): value = metrics.get(key) if value is not None: lines.append(f"- {key}: {value}") usage_table = _node_usage_table(metrics) if usage_table: lines.append("- node_usage (cpu/ram/net/io):") for entry in usage_table: node = entry.get("node") if not node: 
continue cpu = _format_metric_value(str(entry.get("cpu")), percent=True) if entry.get("cpu") is not None else "" ram = _format_metric_value(str(entry.get("ram")), percent=True) if entry.get("ram") is not None else "" net = ( _format_metric_value(str(entry.get("net")), percent=False, rate=True) if entry.get("net") is not None else "" ) io_val = ( _format_metric_value(str(entry.get("io")), percent=False, rate=True) if entry.get("io") is not None else "" ) lines.append(f" - {node}: cpu={cpu}, ram={ram}, net={net}, io={io_val}") if nodes_in_query: lines.append("- node_details:") for name in nodes_in_query: detail = next((n for n in inv if n.get("name") == name), None) if not detail: lines.append(f" - {name}: not found in snapshot") continue roles = ",".join(detail.get("roles") or []) or "none" lines.append( f" - {name}: hardware={detail.get('hardware')}, arch={detail.get('arch')}, " f"ready={detail.get('ready')}, roles={roles}" ) workload_entries = _workloads_for_prompt(prompt, workloads or []) if workload_entries: lines.append("- workloads:") for entry in workload_entries: if not isinstance(entry, dict): continue ns = entry.get("namespace") or "" wl = entry.get("workload") or "" primary = entry.get("primary_node") or "" pods_total = entry.get("pods_total") label = f"{ns}/{wl}" if ns and wl else (wl or ns) if not label: continue if primary: lines.append(f" - {label}: primary_node={primary}, pods_total={pods_total}") else: lines.append(f" - {label}: pods_total={pods_total}") rendered = "\n".join(lines) return rendered[:MAX_FACTS_CHARS] def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]: names = [node["name"] for node in inventory] ready = [node["name"] for node in inventory if node.get("ready") is True] not_ready = [node["name"] for node in inventory if node.get("ready") is False] groups = _group_nodes(inventory) workers = [node for node in inventory if node.get("is_worker") is True] worker_names = [node["name"] for node in workers] worker_ready = 
[node["name"] for node in workers if node.get("ready") is True] worker_not_ready = [node["name"] for node in workers if node.get("ready") is False] expected_workers = expected_worker_nodes_from_metrics() expected_ready = [n for n in expected_workers if n in ready] if expected_workers else [] expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else [] expected_missing = [n for n in expected_workers if n not in names] if expected_workers else [] return { "names": sorted(names), "ready": sorted(ready), "not_ready": sorted(not_ready), "groups": groups, "worker_names": sorted(worker_names), "worker_ready": sorted(worker_ready), "worker_not_ready": sorted(worker_not_ready), "expected_workers": expected_workers, "expected_ready": sorted(expected_ready), "expected_not_ready": sorted(expected_not_ready), "expected_missing": sorted(expected_missing), } def _workload_tokens(entry: dict[str, Any]) -> set[str]: tokens: set[str] = set() for key in ("workload", "namespace"): value = entry.get(key) if isinstance(value, str) and value: tokens.update(_tokens(value)) return tokens def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None: q_tokens = set(_tokens(prompt)) if not q_tokens: return None scored: list[tuple[int, dict[str, Any]]] = [] for entry in workloads: if not isinstance(entry, dict): continue tokens = _workload_tokens(entry) score = len(tokens & q_tokens) if score: scored.append((score, entry)) if not scored: return None scored.sort(key=lambda item: item[0], reverse=True) return scored[0][1] def _format_confidence(answer: str, confidence: str) -> str: if not answer: return "" return f"{answer}\nConfidence: {confidence}." 
def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str:
    """Answer 'where does X run' style questions from the workload snapshot.

    Returns "" when the prompt does not look like a placement question or no
    matching workload (with node placement data) is found.
    """
    q = normalize_query(prompt)
    if not any(word in q for word in ("where", "which", "node", "run", "running", "host", "located")):
        return ""
    entry = _select_workload(prompt, workloads)
    if not entry:
        return ""
    workload = entry.get("workload") or ""
    namespace = entry.get("namespace") or ""
    nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
    primary = entry.get("primary_node") or ""
    if not workload or not nodes:
        return ""
    parts = []
    if primary:
        parts.append(f"{primary} (primary)")
    # Remaining nodes ordered by pod count (desc), then name.
    for node, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0])):
        if node == primary:
            continue
        parts.append(f"{node} ({count} pod{'s' if count != 1 else ''})")
    node_text = ", ".join(parts) if parts else primary
    answer = f"{workload} runs in {namespace}. Nodes: {node_text}."
    return _format_confidence(answer, "medium")


def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]:
    """Return the snapshot's 'metrics' dict, or {} if absent/malformed."""
    if not snapshot:
        return {}
    metrics = snapshot.get("metrics")
    return metrics if isinstance(metrics, dict) else {}


def _node_usage_top(
    usage: list[dict[str, Any]],
    *,
    allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
    """Return (node, value) with the highest numeric usage, or None.

    Entries outside ``allowed_nodes`` (when given) or with non-numeric values
    are skipped.
    """
    best_node = ""
    best_val = None
    for item in usage if isinstance(usage, list) else []:
        if not isinstance(item, dict):
            continue
        node = item.get("node") or ""
        if allowed_nodes and node not in allowed_nodes:
            continue
        value = item.get("value")
        try:
            numeric = float(value)
        except (TypeError, ValueError):
            continue
        if best_val is None or numeric > best_val:
            best_val = numeric
            best_node = node
    if best_node and best_val is not None:
        return best_node, best_val
    return None


def snapshot_metric_answer(
    prompt: str,
    *,
    snapshot: dict[str, Any] | None,
    inventory: list[dict[str, Any]],
) -> str:
    """Answer metric questions (cpu/ram/net/io, postgres, pods) from snapshot.

    Returns "" when the snapshot carries no metrics or no branch matches.
    """
    if not snapshot:
        return ""
    metrics = _snapshot_metrics(snapshot)
    if not metrics:
        return ""
    q = normalize_query(prompt)
    metric = _detect_metric(q)
    op = _detect_operation(q)
    include_hw, exclude_hw = _detect_hardware_filters(q)
    nodes_in_query = _extract_titan_nodes(q)
    only_workers = "worker" in q or "workers" in q
    filtered = _inventory_filter(
        inventory,
        include_hw=include_hw,
        exclude_hw=exclude_hw,
        only_workers=only_workers,
        only_ready=None,
        nodes_in_query=nodes_in_query,
    )
    allowed_nodes = {node["name"] for node in filtered} if filtered else None
    if metric in {"cpu", "ram", "net", "io"} and op in {"top", "status", None}:
        usage = metrics.get("node_usage", {}).get(metric, [])
        top = _node_usage_top(usage, allowed_nodes=allowed_nodes)
        if top:
            node, val = top
            percent = metric in {"cpu", "ram"}
            value = _format_metric_value(str(val), percent=percent, rate=metric in {"net", "io"})
            scope = ""
            if include_hw:
                scope = f" among {' and '.join(sorted(include_hw))}"
            answer = f"Hottest node{scope}: {node} ({value})."
            # When the question was scoped to a subset, also report the
            # cluster-wide winner if it differs.
            if allowed_nodes and len(allowed_nodes) != len(inventory):
                overall = _node_usage_top(usage, allowed_nodes=None)
                if overall and overall[0] != node:
                    overall_val = _format_metric_value(
                        str(overall[1]),
                        percent=percent,
                        rate=metric in {"net", "io"},
                    )
                    answer += f" Overall hottest: {overall[0]} ({overall_val})."
            return _format_confidence(answer, "high")
    if metric == "connections" or "postgres" in q:
        postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
        used = postgres.get("used")
        max_conn = postgres.get("max")
        hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
        parts: list[str] = []
        if used is not None and max_conn is not None:
            parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
        if hottest.get("label"):
            hot_val = hottest.get("value")
            hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
            parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).")
        if parts:
            return _format_confidence(" ".join(parts), "high")
    if metric == "pods":
        running = metrics.get("pods_running")
        pending = metrics.get("pods_pending")
        failed = metrics.get("pods_failed")
        succeeded = metrics.get("pods_succeeded")
        # Phase-specific questions get a single-number answer.
        if "pending" in q and pending is not None:
            return _format_confidence(f"Pending pods: {pending:.0f}.", "high")
        if "failed" in q and failed is not None:
            return _format_confidence(f"Failed pods: {failed:.0f}.", "high")
        if "succeeded" in q or "completed" in q:
            if succeeded is not None:
                return _format_confidence(f"Succeeded pods: {succeeded:.0f}.", "high")
        if "running" in q and running is not None:
            return _format_confidence(f"Running pods: {running:.0f}.", "high")
        parts = []
        if running is not None:
            parts.append(f"running {running:.0f}")
        if pending is not None:
            parts.append(f"pending {pending:.0f}")
        if failed is not None:
            parts.append(f"failed {failed:.0f}")
        if succeeded is not None:
            parts.append(f"succeeded {succeeded:.0f}")
        if parts:
            return _format_confidence(f"Pods: {', '.join(parts)}.", "high")
    return ""


def structured_answer(
    prompt: str,
    *,
    inventory: list[dict[str, Any]],
    metrics_summary: str,
    snapshot: dict[str, Any] | None = None,
    workloads: list[dict[str, Any]] | None = None,
) -> str:
    """Try to answer a cluster question deterministically (no LLM).

    Order of attempts: workload placement, snapshot metrics, Grafana metric
    index + live VictoriaMetrics query, then node-inventory status/count/list
    branches. Returns "" when nothing matches.
    """
    q = normalize_query(prompt)
    if not q:
        return ""
    if workloads:
        workload_resp = workload_answer(prompt, workloads)
        if workload_resp:
            return workload_resp
    snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory)
    if snap_resp:
        return snap_resp
    tokens = _tokens(q)
    op = _detect_operation(q)
    metric = _detect_metric(q)
    entity = _detect_entity(q)
    include_hw, exclude_hw = _detect_hardware_filters(q)
    nodes_in_query = _extract_titan_nodes(q)
    only_workers = "worker" in q or "workers" in q
    role_filters = _detect_role_filters(q)
    only_ready: bool | None = None
    if "not ready" in q or "unready" in q or "down" in q or "missing" in q:
        only_ready = False
    elif "ready" in q:
        only_ready = True
    # Normalize the detected operation for node-entity questions.
    if entity == "node" and only_ready is not None and op != "count":
        op = "status"
    if not op and entity == "node":
        op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
    if op == "top" and metric is None:
        metric = "cpu"
    # Metrics-first when a metric or top operation is requested.
    if metric or op == "top":
        entry = _select_metric_entry(tokens, metric=metric, op=op)
        if entry and isinstance(entry.get("exprs"), list) and entry["exprs"]:
            expr = entry["exprs"][0]
            if inventory:
                scoped = _inventory_filter(
                    inventory,
                    include_hw=include_hw,
                    exclude_hw=exclude_hw,
                    only_workers=only_workers,
                    only_ready=None,
                    nodes_in_query=nodes_in_query,
                )
                if scoped:
                    node_regex = "|".join([n["name"] for n in scoped])
                    expr = _apply_node_filter(expr, node_regex)
            res = vm_query(expr, timeout=20)
            answer = _format_metric_answer(entry, res)
            if answer:
                scope_parts: list[str] = []
                if include_hw:
                    scope_parts.append(" and ".join(sorted(include_hw)))
                if exclude_hw:
                    scope_parts.append(f"excluding {' and '.join(sorted(exclude_hw))}")
                if only_workers:
                    scope_parts.append("worker")
                if scope_parts:
                    scope = " ".join(scope_parts)
                    overall_note = ""
                    # Re-run the unscoped expression so a scoped answer can
                    # mention the cluster-wide hottest node when it differs.
                    base_expr = entry["exprs"][0]
                    if inventory:
                        all_nodes = "|".join([n["name"] for n in inventory])
                        if all_nodes:
                            base_expr = _apply_node_filter(base_expr, all_nodes)
                    base_res = vm_query(base_expr, timeout=20)
                    base_node, base_val = _primary_series_metric(base_res)
                    # NOTE(review): scoped_val is unused — verify intent.
                    scoped_node, scoped_val = _primary_series_metric(res)
                    if base_node and scoped_node and base_node != scoped_node:
                        percent = _metric_expr_uses_percent(entry)
                        base_val_fmt = _format_metric_value(base_val or "", percent=percent)
                        overall_note = f" Overall hottest node: {base_node} ({base_val_fmt})."
                    return f"Among {scope} nodes, {answer}{overall_note}"
                return answer
        # NOTE(review): nesting reconstructed — this fallback is assumed to
        # live inside the metric branch; confirm against the original layout.
        if metrics_summary:
            return metrics_summary
    if entity != "node" or not inventory:
        if any(word in q for word in METRIC_HINT_WORDS) and not metrics_summary:
            return "I don't have data to answer that right now."
        return ""
    expected_workers = expected_worker_nodes_from_metrics()
    filtered = _inventory_filter(
        inventory,
        include_hw=include_hw,
        exclude_hw=exclude_hw,
        only_workers=only_workers,
        only_ready=only_ready if op in ("status", "count") else None,
        nodes_in_query=nodes_in_query,
    )
    if role_filters:
        filtered = [
            node
            for node in filtered
            if role_filters.intersection(set(node.get("roles") or []))
        ]
    names = [node["name"] for node in filtered]
    if op == "status":
        if "missing" in q and expected_workers:
            missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
            return _format_confidence(
                "Missing nodes: " + (", ".join(missing) if missing else "none") + ".",
                "high",
            )
        if only_ready is False:
            return _format_confidence(
                "Not ready nodes: " + (", ".join(names) if names else "none") + ".",
                "high",
            )
        if only_ready is True:
            return _format_confidence(
                f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + ".",
                "high",
            )
    if op == "count":
        if expected_workers and ("expected" in q or "should" in q):
            missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
            msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
            if missing:
                msg += f" Missing: {', '.join(missing)}."
            return _format_confidence(msg, "high")
        if only_ready is True:
            return _format_confidence(f"Ready nodes: {len(names)}.", "high")
        if only_ready is False:
            return _format_confidence(f"Not ready nodes: {len(names)}.", "high")
        if not (include_hw or exclude_hw or nodes_in_query or only_workers or role_filters):
            return _format_confidence(f"Atlas has {len(names)} nodes.", "high")
        return _format_confidence(f"Matching nodes: {len(names)}.", "high")
    if op == "list":
        if nodes_in_query:
            parts = []
            existing = {n["name"] for n in inventory}
            for node in nodes_in_query:
                parts.append(f"{node}: {'present' if node in existing else 'not present'}")
            return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high")
        if not names:
            return _format_confidence("Matching nodes: none.", "high")
        shown = names[:30]
        suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
        return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high")
    return ""


def _nodes_summary_line(inventory: list[dict[str, Any]], snapshot: dict[str, Any] | None) -> str:
    """One-line node count summary, preferring snapshot over live inventory."""
    summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {}
    nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {}
    # NOTE(review): if the snapshot dict lacks these keys, `summary`/`nodes`
    # are None and the `.get` fallbacks below would raise — confirm upstream
    # always provides dicts.
    total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
    ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
    not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
    if total is None:
        total = len(inventory)
        ready = len([n for n in inventory if n.get("ready") is True])
        not_ready = len([n for n in inventory if n.get("ready") is False])
    # NOTE(review): unreachable — `total` was just assigned above when None.
    if total is None:
        return ""
    return f"Atlas cluster has {total} nodes ({ready} ready, {not_ready} not ready)."
def _hardware_mix_line(inventory: list[dict[str, Any]]) -> str: if not inventory: return "" groups = _group_nodes(inventory) parts: list[str] = [] for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): nodes = groups.get(key) or [] if nodes: parts.append(f"{key}={len(nodes)}") if not parts: return "" return "Hardware mix: " + ", ".join(parts) + "." def _os_mix_line(snapshot: dict[str, Any] | None) -> str: if not snapshot: return "" details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else [] counts: dict[str, int] = collections.Counter() for node in details: if not isinstance(node, dict): continue os_name = (node.get("os") or "").strip() if os_name: counts[os_name] += 1 if not counts: return "" parts = [f"{os_name}={count}" for os_name, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))] return "OS mix: " + ", ".join(parts[:5]) + "." def _pods_summary_line(metrics: dict[str, Any]) -> str: if not metrics: return "" running = metrics.get("pods_running") pending = metrics.get("pods_pending") failed = metrics.get("pods_failed") succeeded = metrics.get("pods_succeeded") parts: list[str] = [] if running is not None: parts.append(f"{running:.0f} running") if pending is not None: parts.append(f"{pending:.0f} pending") if failed is not None: parts.append(f"{failed:.0f} failed") if succeeded is not None: parts.append(f"{succeeded:.0f} succeeded") if not parts: return "" return "Pods: " + ", ".join(parts) + "." 
def _postgres_summary_line(metrics: dict[str, Any]) -> str: if not metrics: return "" postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} if not postgres: return "" used = postgres.get("used") max_conn = postgres.get("max") hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {} parts: list[str] = [] if used is not None and max_conn is not None: parts.append(f"{used:.0f}/{max_conn:.0f} connections") if hottest.get("label"): hot_val = hottest.get("value") hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else "" parts.append(f"hottest {hottest.get('label')} ({hot_val_str})") if not parts: return "" return "Postgres: " + ", ".join(parts) + "." def _hottest_summary_line(metrics: dict[str, Any]) -> str: if not metrics: return "" hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {} if not hottest: return "" parts: list[str] = [] for key in ("cpu", "ram", "net", "io"): entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {} node = entry.get("node") value = entry.get("value") if node and value is not None: value_fmt = _format_metric_value( str(value), percent=key in ("cpu", "ram"), rate=key in ("net", "io"), ) parts.append(f"{key.upper()} {node} ({value_fmt})") if not parts: return "" return "Hottest nodes: " + "; ".join(parts) + "." 
def cluster_overview_answer(
    prompt: str,
    *,
    inventory: list[dict[str, Any]],
    snapshot: dict[str, Any] | None,
) -> str:
    """Compose a multi-line cluster overview gated on prompt keywords.

    Always leads with the node summary; hardware/OS and metrics sections are
    appended only when the prompt hints at them. Returns "" when no line
    could be produced.
    """
    if not inventory and not snapshot:
        return ""
    q = normalize_query(prompt)
    metrics = _snapshot_metrics(snapshot)
    lines: list[str] = []
    nodes_line = _nodes_summary_line(inventory, snapshot)
    if nodes_line:
        lines.append(nodes_line)
    if any(word in q for word in ("hardware", "architecture", "nodes", "node", "cluster", "atlas", "titan", "lab")):
        hw_line = _hardware_mix_line(inventory)
        if hw_line:
            lines.append(hw_line)
        os_line = _os_mix_line(snapshot)
        if os_line:
            lines.append(os_line)
    if any(
        word in q
        for word in (
            "interesting", "status", "health", "overview", "summary", "tell me",
            "what do you know", "about", "pods", "postgres", "connections",
            "hottest", "cpu", "ram", "memory", "net", "network", "io", "disk",
            "busy", "load", "usage", "utilization",
        )
    ):
        pods_line = _pods_summary_line(metrics)
        if pods_line:
            lines.append(pods_line)
        hottest_line = _hottest_summary_line(metrics)
        if hottest_line:
            lines.append(hottest_line)
        postgres_line = _postgres_summary_line(metrics)
        if postgres_line:
            lines.append(postgres_line)
    if not lines:
        return ""
    return "Based on the snapshot, " + "\n".join(lines)


def cluster_answer(
    prompt: str,
    *,
    inventory: list[dict[str, Any]],
    snapshot: dict[str, Any] | None,
    workloads: list[dict[str, Any]] | None,
) -> str:
    """Top-level deterministic answer pipeline for cluster questions.

    Tries, in order: structured answer, overview (optionally enriched with KB
    titles), KB titles alone, then the raw metrics summary. Each fallback is
    tagged with decreasing confidence. Returns "" when everything is empty.
    """
    metrics_summary = snapshot_context(prompt, snapshot)
    structured = structured_answer(
        prompt,
        inventory=inventory,
        metrics_summary=metrics_summary,
        snapshot=snapshot,
        workloads=workloads,
    )
    if structured:
        return structured
    overview = cluster_overview_answer(prompt, inventory=inventory, snapshot=snapshot)
    if overview:
        kb_titles = kb_retrieve_titles(prompt, limit=4) if _knowledge_intent(prompt) else ""
        if kb_titles:
            overview = overview + "\n" + kb_titles
        return _format_confidence(overview, "medium")
    kb_titles = kb_retrieve_titles(prompt, limit=4)
    if kb_titles:
        return _format_confidence(kb_titles, "low")
    if metrics_summary:
        return _format_confidence(metrics_summary, "low")
    return ""


def _metric_tokens(entry: dict[str, Any]) -> str:
    """Flatten a metric-index entry's searchable text into one lowercase string."""
    parts: list[str] = []
    for key in ("panel_title", "dashboard", "description"):
        val = entry.get(key)
        if isinstance(val, str) and val:
            parts.append(val.lower())
    tags = entry.get("tags")
    if isinstance(tags, list):
        parts.extend(str(t).lower() for t in tags if t)
    return " ".join(parts)


def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
    """Rank metric-index entries against the query tokens.

    Panel-title hits score 2, other text hits 1. Returns up to ``limit``
    entries, best first; [] when the query or index is empty.
    """
    q_tokens = _tokens(query)
    if not q_tokens or not _METRIC_INDEX:
        return []
    scored: list[tuple[int, dict[str, Any]]] = []
    for entry in _METRIC_INDEX:
        if not isinstance(entry, dict):
            continue
        hay = _metric_tokens(entry)
        if not hay:
            continue
        score = 0
        for t in set(q_tokens):
            if t in hay:
                score += 2 if t in (entry.get("panel_title") or "").lower() else 1
        if score:
            scored.append((score, entry))
    scored.sort(key=lambda item: item[0], reverse=True)
    return [entry for _, entry in scored[:limit]]


def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
    """Run the best-matching dashboard expressions and render them as context.

    Returns (context, "") on success; ("", "") when tools are disabled, the
    prompt lacks metric hints, or nothing rendered. The second tuple slot is
    always "" here (kept for a wider call convention).
    """
    if not allow_tools:
        return "", ""
    lower = (prompt or "").lower()
    if not any(word in lower for word in METRIC_HINT_WORDS):
        return "", ""
    matches = metrics_lookup(prompt, limit=1)
    if not matches:
        return "", ""
    entry = matches[0]
    dashboard = entry.get("dashboard") or "dashboard"
    panel = entry.get("panel_title") or "panel"
    exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
    if not exprs:
        return "", ""
    rendered_parts: list[str] = []
    for expr in exprs[:2]:
        res = vm_query(expr, timeout=20)
        rendered = vm_render_result(res, limit=10)
        if rendered:
            rendered_parts.append(rendered)
    if not rendered_parts:
        return "", ""
    summary = "\n".join(rendered_parts)
    context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
    return context, ""


def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
    """Map hostnames/service names in the query to GitOps endpoint routes.

    Returns (rendered endpoint lines, [(namespace, workload_name), ...]) or
    ("", []) when nothing matches the catalog.
    """
    q = (query or "").strip()
    if not q or not KB.get("catalog"):
        return "", []
    ql = q.lower()
    hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
    # Also match by known workload/service names.
    for t in _tokens(ql):
        if t in _NAME_INDEX:
            hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
    edges: list[tuple[str, str]] = []
    lines: list[str] = []
    for host in sorted(hosts):
        for ep in _HOST_INDEX.get(host, []):
            backend = ep.get("backend") or {}
            ns = backend.get("namespace") or ""
            svc = backend.get("service") or ""
            path = ep.get("path") or "/"
            if not svc:
                continue
            wk = backend.get("workloads") or []
            wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
            lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}")
            for w in wk:
                if isinstance(w, dict) and w.get("name"):
                    edges.append((ns, str(w["name"])))
    if not lines:
        return "", []
    return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges


# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
# Process-wide caches for the in-cluster ServiceAccount credentials.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None


def _k8s_context() -> ssl.SSLContext:
    """Return (and cache) an SSL context trusting the in-cluster CA."""
    global _K8S_CTX
    if _K8S_CTX is not None:
        return _K8S_CTX
    ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    ctx = ssl.create_default_context(cafile=ca_path)
    _K8S_CTX = ctx
    return ctx


def _k8s_token() -> str:
    """Read (and cache) the mounted ServiceAccount bearer token."""
    global _K8S_TOKEN
    if _K8S_TOKEN:
        return _K8S_TOKEN
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    with open(token_path, "r", encoding="utf-8") as f:
        _K8S_TOKEN = f.read().strip()
    return _K8S_TOKEN


def k8s_get(path: str, timeout: int = 8) -> dict:
    """GET a Kubernetes API path and decode the JSON response.

    Raises RuntimeError when not running in-cluster (service host env unset);
    propagates urllib errors to the caller.
    """
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
    if not host:
        raise RuntimeError("k8s host missing")
    url = f"https://{host}:{port}{path}"
    headers = {"Authorization": f"Bearer {_k8s_token()}"}
    r = request.Request(url, headers=headers, method="GET")
    with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
        raw = resp.read()
    return json.loads(raw.decode()) if raw else {}


def _ariadne_state(timeout: int = 5) -> dict | None:
    """Fetch the Ariadne state snapshot; best-effort, None on any failure."""
    if not ARIADNE_STATE_URL:
        return None
    headers = {}
    if ARIADNE_STATE_TOKEN:
        headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
    r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
    try:
        with request.urlopen(r, timeout=timeout) as resp:
            raw = resp.read()
        payload = json.loads(raw.decode()) if raw else {}
        return payload if isinstance(payload, dict) else None
    except Exception:
        return None


# Single-slot snapshot cache: payload + monotonic fetch timestamp.
_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0}


def _snapshot_state() -> dict[str, Any] | None:
    """Return the cached snapshot, refreshing when older than the TTL.

    On refresh failure, falls back to the stale cached payload (if any).
    """
    now = time.monotonic()
    cached = _SNAPSHOT_CACHE.get("payload")
    ts = _SNAPSHOT_CACHE.get("ts") or 0.0
    if cached and now - ts < max(5, SNAPSHOT_TTL_SEC):
        return cached
    payload = _ariadne_state(timeout=10)
    if isinstance(payload, dict) and payload:
        _SNAPSHOT_CACHE["payload"] = payload
        _SNAPSHOT_CACHE["ts"] = now
        return payload
    return cached if isinstance(cached, dict) else None


def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Normalize snapshot node details into the inventory dict shape."""
    if not snapshot:
        return []
    items = snapshot.get("nodes_detail")
    if not isinstance(items, list):
        return []
    inventory: list[dict[str, Any]] = []
    for node in items:
        if not isinstance(node, dict):
            continue
        labels = node.get("labels") if isinstance(node.get("labels"), dict) else {}
        name = node.get("name") or ""
        if not name:
            continue
        hardware = node.get("hardware") or _hardware_class(labels)
        inventory.append(
            {
                "name": name,
                "arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
                "hardware": hardware,
                "roles": node.get("roles") or [],
                "is_worker": node.get("is_worker") is True,
                "ready": node.get("ready") is True,
            }
        )
    return sorted(inventory, key=lambda item: item["name"])


def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Return the snapshot's workload list, or [] if absent/malformed."""
    if not snapshot:
        return []
    workloads = snapshot.get("workloads")
    return workloads if isinstance(workloads, list) else []


def k8s_pods(namespace: str) -> list[dict]:
    """List up to 500 pods in a namespace via the Kubernetes API."""
    data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
    items = data.get("items") or []
    return items if isinstance(items, list) else []


def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
    """Render one line per pod (phase, ready count, restarts); max 20 lines.

    ``prefixes`` optionally restricts output to pods whose names start with
    one of the given workload names. Best-effort: returns "" on API errors.
    """
    try:
        pods = k8s_pods(namespace)
    except Exception:
        return ""
    out: list[str] = []
    for p in pods:
        md = p.get("metadata") or {}
        st = p.get("status") or {}
        name = md.get("name") or ""
        # NOTE(review): the startswith(pref) term subsumes the first two
        # checks — the prefix filter is effectively a plain startswith.
        if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes):
            continue
        phase = st.get("phase") or "?"
        cs = st.get("containerStatuses") or []
        restarts = 0
        ready = 0
        total = 0
        reason = st.get("reason") or ""
        for c in cs if isinstance(cs, list) else []:
            if not isinstance(c, dict):
                continue
            total += 1
            restarts += int(c.get("restartCount") or 0)
            if c.get("ready"):
                ready += 1
            state = c.get("state") or {}
            # Surface the first waiting reason (e.g. CrashLoopBackOff).
            if not reason and isinstance(state, dict):
                waiting = state.get("waiting") or {}
                if isinstance(waiting, dict) and waiting.get("reason"):
                    reason = waiting.get("reason")
        extra = f" ({reason})" if reason else ""
        out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
    return "\n".join(out[:20])


def flux_not_ready() -> str:
    """List Flux kustomizations whose Ready condition is not True (max 10)."""
    try:
        data = k8s_get(
            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
        )
    except Exception:
        return ""
    items = data.get("items") or []
    bad: list[str] = []
    for it in items if isinstance(items, list) else []:
        md = it.get("metadata") or {}
        st = it.get("status") or {}
        name = md.get("name") or ""
        conds = st.get("conditions") or []
        ready = None
        msg = ""
        for c in conds if isinstance(conds, list) else []:
            if isinstance(c, dict) and c.get("type") == "Ready":
                ready = c.get("status")
                msg = c.get("message") or ""
        if ready not in ("True", True):
            bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
    return "\n".join(bad[:10])


# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
    """Run an instant PromQL query against VictoriaMetrics; None on failure."""
    try:
        url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
        with request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception:
        return None


def _vm_value_series(res: dict) -> list[dict]:
    """Extract the result series list from a successful VM response."""
    if not res or (res.get("status") != "success"):
        return []
    data = res.get("data") or {}
    result = data.get("result") or []
    return result if isinstance(result, list) else []


def vm_render_result(res: dict | None, limit: int = 12) -> str:
    """Render VM series as '- label=value, ...: value' lines (max ``limit``)."""
    if not res:
        return ""
    series = _vm_value_series(res)
    if not series:
        return ""
    out: list[str] = []
    for r in series[:limit]:
        if not isinstance(r, dict):
            continue
        metric = r.get("metric") or {}
        value = r.get("value") or []
        val = value[1] if isinstance(value, list) and len(value) > 1 else ""
        # Prefer common labels if present.
        label_parts = []
        for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
            if isinstance(metric, dict) and metric.get(k):
                label_parts.append(f"{k}={metric.get(k)}")
        if not label_parts and isinstance(metric, dict):
            # Fall back to up to four arbitrary non-internal labels.
            for k in sorted(metric.keys()):
                if k.startswith("__"):
                    continue
                label_parts.append(f"{k}={metric.get(k)}")
                if len(label_parts) >= 4:
                    break
        labels = ", ".join(label_parts) if label_parts else "series"
        out.append(f"- {labels}: {val}")
    return "\n".join(out)


def _parse_metric_lines(summary: str) -> dict[str, str]:
    """Parse '- label: value' lines (as produced above) into a dict."""
    parsed: dict[str, str] = {}
    for line in (summary or "").splitlines():
        line = line.strip()
        if not line.startswith("-"):
            continue
        try:
            label, value = line.lstrip("-").split(":", 1)
        except ValueError:
            continue
        parsed[label.strip()] = value.strip()
    return parsed


def _metrics_fallback_summary(panel: str, summary: str) -> str:
    """Compact a rendered metrics summary into a single sentence."""
    parsed = _parse_metric_lines(summary)
    panel_l = (panel or "").lower()
    if parsed:
        items = list(parsed.items())
        if len(items) == 1:
            label, value = items[0]
            return f"{panel}: {label} = {value}."
        compact = "; ".join(f"{k}={v}" for k, v in items)
        return f"{panel}: {compact}."
    if panel_l:
        return f"{panel}: {summary}"
    return summary


def _node_ready_status(node: dict) -> bool | None:
    """Read the Ready condition from a raw Node object; None when unknown."""
    conditions = node.get("status", {}).get("conditions") or []
    for cond in conditions if isinstance(conditions, list) else []:
        if cond.get("type") == "Ready":
            if cond.get("status") == "True":
                return True
            if cond.get("status") == "False":
                return False
            return None
    return None


def _node_is_worker(node: dict) -> bool:
    """Classify a raw Node object; nodes without role labels count as workers."""
    labels = (node.get("metadata") or {}).get("labels") or {}
    if labels.get("node-role.kubernetes.io/control-plane") is not None:
        return False
    if labels.get("node-role.kubernetes.io/master") is not None:
        return False
    if labels.get("node-role.kubernetes.io/worker") is not None:
        return True
    return True


def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]:
    """Return sorted (ready, not_ready) worker node name lists."""
    if inventory is None:
        inventory = node_inventory()
    ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True]
    not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False]
    return (sorted(ready_nodes), sorted(not_ready_nodes))


def expected_worker_nodes_from_metrics() -> list[str]:
    """Recover the expected worker list from the 'worker nodes ready' panel.

    Parses the node-regex alternation embedded in the panel's PromQL; returns
    [] when no such panel/expression exists.
    """
    for entry in _METRIC_INDEX:
        panel = (entry.get("panel_title") or "").lower()
        if "worker nodes ready" not in panel:
            continue
        exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
        for expr in exprs:
            if not isinstance(expr, str):
                continue
            match = NODE_REGEX.search(expr)
            if not match:
                continue
            raw = match.group(1)
            nodes = [n.strip() for n in raw.split("|") if n.strip()]
            return sorted(nodes)
    return []


def _context_fallback(context: str) -> str:
    """Wrap raw context as a reply, truncated to MAX_TOOL_CHARS."""
    if not context:
        return ""
    trimmed = context.strip()
    if len(trimmed) > MAX_TOOL_CHARS:
        trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
    return "Here is what I found:\n" + trimmed


def vm_top_restarts(hours: int = 1) -> str:
    """Top-5 pods by container restarts over the last ``hours`` hours."""
    q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
    res = vm_query(q)
    if not res or (res.get("status") != "success"):
        return ""
    out: list[str] = []
    for r in (res.get("data") or {}).get("result") or []:
        if not isinstance(r, dict):
            continue
        m = r.get("metric") or {}
        v = r.get("value") or []
        ns = (m.get("namespace") or "").strip()
        pod = (m.get("pod") or "").strip()
        val = v[1] if isinstance(v, list) and len(v) > 1 else ""
        if pod:
            out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
    return "\n".join(out)


def vm_cluster_snapshot() -> str:
    """Quick cluster summary (node readiness + pod phases) from VM queries."""
    parts: list[str] = []
    # Node readiness (kube-state-metrics).
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            # Empty series — skip the readiness line rather than fail.
            pass
    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding ``` / ```json fence from LLM output, if present."""
    cleaned = (text or "").strip()
    match = CODE_FENCE_RE.match(cleaned)
    if match:
        return match.group(1).strip()
    return cleaned


def _normalize_reply(value: Any) -> str:
    """Coerce arbitrary LLM output (dict/list/str/JSON-string) into plain text.

    Recurses into well-known reply keys, joins list items, unwraps fenced or
    embedded JSON, and finally stamps a confidence line via _ensure_confidence.
    """
    if isinstance(value, dict):
        for key in ("content", "response", "reply", "message"):
            if key in value:
                return _normalize_reply(value[key])
        for v in value.values():
            if isinstance(v, (str, dict, list)):
                return _normalize_reply(v)
        return json.dumps(value, ensure_ascii=False)
    if isinstance(value, list):
        parts = [_normalize_reply(item) for item in value]
        return " ".join(p for p in parts if p)
    if value is None:
        return ""
    text = _strip_code_fence(str(value))
    if text.startswith("{") and text.endswith("}"):
        try:
            return _normalize_reply(json.loads(text))
        except Exception:
            return _ensure_confidence(text)
    return _ensure_confidence(text)


# Internal HTTP endpoint for cluster answers (website uses this).
class _AtlasbotHandler(BaseHTTPRequestHandler):
    """Minimal internal API: GET /health and POST /v1/answer (token-gated)."""

    server_version = "AtlasbotHTTP/1.0"

    def _write_json(self, status: int, payload: dict[str, Any]):
        """Serialize ``payload`` and send it with the given HTTP status."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _authorized(self) -> bool:
        # Open access when no internal token is configured.
        if not ATLASBOT_INTERNAL_TOKEN:
            return True
        token = self.headers.get("X-Internal-Token", "")
        return token == ATLASBOT_INTERNAL_TOKEN

    def do_GET(self):  # noqa: N802
        if self.path == "/health":
            self._write_json(200, {"status": "ok"})
            return
        self._write_json(404, {"error": "not_found"})

    def do_POST(self):  # noqa: N802
        if self.path != "/v1/answer":
            self._write_json(404, {"error": "not_found"})
            return
        if not self._authorized():
            self._write_json(401, {"error": "unauthorized"})
            return
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            length = 0
        raw = self.rfile.read(length) if length > 0 else b""
        try:
            payload = json.loads(raw.decode("utf-8")) if raw else {}
        except json.JSONDecodeError:
            self._write_json(400, {"error": "invalid_json"})
            return
        prompt = str(payload.get("prompt") or payload.get("question") or "").strip()
        if not prompt:
            self._write_json(400, {"error": "missing_prompt"})
            return
        cleaned = _strip_bot_mention(prompt)
        snapshot = _snapshot_state()
        inventory = _snapshot_inventory(snapshot) or node_inventory_live()
        workloads = _snapshot_workloads(snapshot)
        cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
        context = ""
        if cluster_query:
            context = build_context(
                cleaned,
                allow_tools=False,
                targets=[],
                inventory=inventory,
                snapshot=snapshot,
                workloads=workloads,
            )
        fallback = "I don't have enough data to answer that."
        if cluster_query:
            # Deterministic path first; only fall back to the canned message.
            answer = cluster_answer(
                cleaned,
                inventory=inventory,
                snapshot=snapshot,
                workloads=workloads,
            )
            if not answer:
                answer = fallback
        else:
            llm_prompt = cleaned
            answer = ollama_reply(
                ("http", "internal"),
                llm_prompt,
                context=context,
                fallback=fallback,
                use_history=False,
            )
        self._write_json(200, {"answer": answer})


def _start_http_server():
    """Serve the internal API on a daemon thread (never blocks the bot)."""
    server = HTTPServer(("0.0.0.0", ATLASBOT_HTTP_PORT), _AtlasbotHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()


# Conversation state.
history = collections.defaultdict(list)  # (room_id, sender|None) -> list[str] (short transcript)


def key_for(room_id: str, sender: str, is_dm: bool):
    """History key: DMs share one transcript per room, others are per-sender."""
    return (room_id, None) if is_dm else (room_id, sender)


def build_context(
    prompt: str,
    *,
    allow_tools: bool,
    targets: list[tuple[str, str]],
    inventory: list[dict[str, Any]] | None = None,
    snapshot: dict[str, Any] | None = None,
    workloads: list[dict[str, Any]] | None = None,
) -> str:
    """Assemble the LLM context: facts, compact snapshot, endpoints, KB,
    and (when tools are allowed) live pod and Flux status sections."""
    parts: list[str] = []
    facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
    if facts:
        parts.append(facts)
    snapshot_json = snapshot_compact_context(
        prompt,
        snapshot,
        inventory=inventory,
        workloads=workloads,
    )
    if snapshot_json:
        parts.append(snapshot_json)
    endpoints, edges = catalog_hints(prompt)
    if endpoints:
        parts.append(endpoints)
    kb = kb_retrieve(prompt)
    if not kb and _knowledge_intent(prompt):
        kb = kb_retrieve_titles(prompt, limit=4)
    if kb:
        parts.append(kb)
    if allow_tools:
        # Scope pod summaries to relevant namespaces/workloads when possible.
        prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
        for ns, name in (targets or []) + (edges or []):
            if ns and name:
                prefixes_by_ns[ns].add(name)
        pod_lines: list[str] = []
        for ns in sorted(prefixes_by_ns.keys()):
            summary = summarize_pods(ns, prefixes_by_ns[ns])
            if summary:
                pod_lines.append(f"Pods (live):\n{summary}")
        if pod_lines:
            parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
        flux_bad = flux_not_ready()
        if flux_bad:
            parts.append("Flux (not ready):\n" + flux_bad)
    return "\n\n".join([p for p in parts if p]).strip()


def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
    """Render keyword-gated snapshot facts as 'Snapshot: ...' lines."""
    if not snapshot:
        return ""
    metrics = _snapshot_metrics(snapshot)
    workloads = _snapshot_workloads(snapshot)
    q = normalize_query(prompt)
    parts: list[str] = []
    nodes = snapshot.get("nodes") if isinstance(snapshot.get("nodes"), dict) else {}
    if nodes.get("total") is not None:
        parts.append(
            f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}."
        )
    if any(word in q for word in ("postgres", "connections", "db")):
        postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
        if postgres:
            parts.append(f"Snapshot: postgres_connections={postgres}.")
    if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")):
        hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
        if hottest:
            parts.append(f"Snapshot: hottest_nodes={hottest}.")
    if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")):
        match = _select_workload(prompt, workloads)
        if match:
            parts.append(f"Snapshot: workload={match}.")
    return "\n".join(parts).strip()


def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
    """Project node details down to the fields the compact context needs."""
    details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
    output: list[dict[str, Any]] = []
    for node in details:
        if not isinstance(node, dict):
            continue
        name = node.get("name")
        if not name:
            continue
        output.append(
            {
                "name": name,
                "ready": node.get("ready"),
                "hardware": node.get("hardware"),
                "arch": node.get("arch"),
                "roles": node.get("roles"),
                "is_worker": node.get("is_worker"),
                "os": node.get("os"),
                "kernel": node.get("kernel"),
                "kubelet": node.get("kubelet"),
                "container_runtime": node.get("container_runtime"),
            }
        )
    return output


def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Project the metrics blob down to the keys the compact context needs."""
    metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {}
    return {
        "pods_running": metrics.get("pods_running"),
        "pods_pending": metrics.get("pods_pending"),
        "pods_failed": metrics.get("pods_failed"),
        "pods_succeeded": metrics.get("pods_succeeded"),
        "postgres_connections": metrics.get("postgres_connections"),
        "hottest_nodes": metrics.get("hottest_nodes"),
        "node_usage": metrics.get("node_usage"),
        "top_restarts_1h": metrics.get("top_restarts_1h"),
    }


def snapshot_compact_context(
    prompt: str,
    snapshot:
dict[str, Any] | None, *, inventory: list[dict[str, Any]] | None, workloads: list[dict[str, Any]] | None, ) -> str: if not snapshot: return "" compact = { "collected_at": snapshot.get("collected_at"), "nodes_summary": snapshot.get("nodes_summary"), "expected_workers": expected_worker_nodes_from_metrics(), "nodes_detail": _compact_nodes_detail(snapshot), "workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [], "metrics": _compact_metrics(snapshot), "flux": snapshot.get("flux"), "errors": snapshot.get("errors"), } text = json.dumps(compact, ensure_ascii=False) if len(text) > MAX_FACTS_CHARS: text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..." return "Cluster snapshot (JSON):\n" + text def _knowledge_intent(prompt: str) -> bool: q = normalize_query(prompt) return any( phrase in q for phrase in ( "what do you know", "tell me about", "interesting", "overview", "summary", "describe", "explain", "what is", ) ) def _is_cluster_query( prompt: str, *, inventory: list[dict[str, Any]] | None, workloads: list[dict[str, Any]] | None, ) -> bool: q = normalize_query(prompt) if not q: return False if TITAN_NODE_RE.search(q): return True if any(word in q for word in CLUSTER_HINT_WORDS): return True for host_match in HOST_RE.finditer(q): host = host_match.group(1).lower() if host.endswith("bstein.dev"): return True tokens = set(_tokens(q)) if _NAME_INDEX and tokens & _NAME_INDEX: return True return False def _inventory_summary(inventory: list[dict[str, Any]]) -> str: if not inventory: return "" groups = _group_nodes(inventory) total = len(inventory) ready = [n for n in inventory if n.get("ready") is True] not_ready = [n for n in inventory if n.get("ready") is False] parts = [f"Atlas cluster: {total} nodes ({len(ready)} ready, {len(not_ready)} not ready)."] for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): nodes = groups.get(key) or [] if nodes: parts.append(f"- {key}: {len(nodes)} nodes ({', '.join(nodes)})") return 
"\n".join(parts) def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str: parts: list[str] = [] inv = _inventory_summary(inventory) if inv: parts.append(inv) kb_titles = kb_retrieve_titles(prompt, limit=4) if kb_titles: parts.append(kb_titles) summary = "\n".join(parts).strip() return _format_confidence(summary, "medium") if summary else "" def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str: system = ( "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. " "Be helpful, direct, and concise. " "Use the provided context and facts as your source of truth. " "If the context includes a cluster snapshot, treat the question as about the Atlas/Othrys cluster even if the prompt is ambiguous. " "When a cluster snapshot is provided, never answer about unrelated meanings of 'Atlas' (maps, mythology, Apache Atlas, etc). " "Treat 'hottest' as highest utilization (CPU/RAM/NET/IO) rather than temperature. " "If you infer or synthesize, say 'Based on the snapshot' and keep it brief. " "Prefer exact repo paths and Kubernetes resource names when relevant. " "Never include or request secret values. " "Do not suggest commands unless explicitly asked. " "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. " "Translate metrics into natural language instead of echoing raw label/value pairs. " "Do not answer by only listing runbooks; if the question is about Atlas/Othrys, summarize the cluster first and mention docs only if useful. " "If the question is not about Atlas/Othrys and no cluster context is provided, answer using general knowledge and say when you are unsure. " "If the answer is not grounded in the provided context or tool data, say you do not know. " "End every response with a line: 'Confidence: high|medium|low'." 
) endpoint = _ollama_endpoint() if not endpoint: raise RuntimeError("ollama endpoint missing") messages: list[dict[str, str]] = [{"role": "system", "content": system}] if context: messages.append({"role": "user", "content": "Context (grounded):\n" + context[:MAX_CONTEXT_CHARS]}) if use_history: messages.extend(_history_to_messages(history[hist_key][-24:])) messages.append({"role": "user", "content": prompt}) payload = {"model": MODEL, "messages": messages, "stream": False} headers = {"Content-Type": "application/json"} if API_KEY: headers["x-api-key"] = API_KEY r = request.Request(endpoint, data=json.dumps(payload).encode(), headers=headers) lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None if lock: lock.acquire() try: with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp: data = json.loads(resp.read().decode()) msg = data.get("message") if isinstance(data, dict) else None if isinstance(msg, dict): raw_reply = msg.get("content") else: raw_reply = data.get("response") or data.get("reply") or data reply = _normalize_reply(raw_reply) or "I'm here to help." if use_history: history[hist_key].append(f"Atlas: {reply}") return reply finally: if lock: lock.release() def ollama_reply( hist_key, prompt: str, *, context: str, fallback: str = "", use_history: bool = True, ) -> str: last_error = None for attempt in range(max(1, OLLAMA_RETRIES + 1)): try: return _ollama_call(hist_key, prompt, context=context, use_history=use_history) except Exception as exc: # noqa: BLE001 last_error = exc time.sleep(min(4, 2 ** attempt)) if fallback: if use_history: history[hist_key].append(f"Atlas: {fallback}") return fallback return "I don't have enough data to answer that." 
def ollama_reply_with_thinking(
    token: str,
    room: str,
    hist_key,
    prompt: str,
    *,
    context: str,
    fallback: str,
    use_history: bool = True,
) -> str:
    """Run ollama_reply in a background thread, posting progress to the room.

    After a 2-second grace period a "Thinking…" message is sent, followed by a
    heartbeat message every THINKING_INTERVAL_SEC (minimum 10s) until the model
    finishes. Returns the model reply, or *fallback*, or a busy notice.
    """
    outcome: dict[str, str] = {"reply": ""}
    finished = threading.Event()

    def _worker():
        outcome["reply"] = ollama_reply(
            hist_key,
            prompt,
            context=context,
            fallback=fallback,
            use_history=use_history,
        )
        finished.set()

    runner = threading.Thread(target=_worker, daemon=True)
    runner.start()

    # Short grace period: only announce "thinking" for slow responses.
    if not finished.wait(2.0):
        send_msg(token, room, "Thinking…")
        hint = " ".join((prompt or "").split())
        if len(hint) > 160:
            hint = hint[:157] + "…"
        interval = max(10, THINKING_INTERVAL_SEC)
        due_at = time.monotonic() + interval
        # Wake up at each heartbeat deadline until the worker signals completion.
        while not finished.wait(max(0, due_at - time.monotonic())):
            message = (
                f"Still thinking about: {hint} (gathering context)"
                if hint
                else "Still thinking (gathering context)…"
            )
            send_msg(token, room, message)
            due_at += interval

    runner.join(timeout=1)
    return outcome["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
    """Long-poll the Matrix /sync endpoint forever, handling invites and messages.

    room_id is accepted for interface compatibility; message handling iterates
    every joined room from the sync response.
    """
    since = None
    # Initial zero-timeout sync just to obtain a since-token (skip backlog).
    try:
        res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
        since = res.get("next_batch")
    except Exception:
        pass
    while True:
        params = {"timeout": 30000}
        if since:
            params["since"] = since
        query = parse.urlencode(params)
        try:
            # Client timeout (35s) exceeds the server long-poll (30s).
            res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
        except Exception:
            time.sleep(5)
            continue
        since = res.get("next_batch", since)
        # invites
        for rid, data in res.get("rooms", {}).get("invite", {}).items():
            try:
                join_room(token, rid)
            except Exception:
                pass
        # messages
        for rid, data in res.get("rooms", {}).get("join", {}).items():
            timeline = data.get("timeline", {}).get("events", [])
            joined_count = data.get("summary", {}).get("m.joined_member_count")
            # Heuristic: a room with <=2 members is treated as a DM.
            is_dm = joined_count is not None and joined_count <= 2
            for ev in timeline:
                if ev.get("type") != "m.room.message":
                    continue
                content = ev.get("content", {})
                body = (content.get("body", "") or "").strip()
                if not body:
                    continue
                sender = ev.get("sender", "")
                # Ignore our own messages.
                # NOTE(review): hardcodes live.bstein.dev even though SERVER_NAME
                # is configurable at the top of the file — confirm intent.
                if sender == f"@{USER}:live.bstein.dev":
                    continue
                mentioned = is_mentioned(content, body)
                hist_key = key_for(rid, sender, is_dm)
                # Record every message in the transcript, capped at 80 lines.
                history[hist_key].append(f"{sender}: {body}")
                history[hist_key] = history[hist_key][-80:]
                # Only respond to DMs or explicit mentions.
                if not (is_dm or mentioned):
                    continue
                cleaned_body = _strip_bot_mention(body)
                lower_body = cleaned_body.lower()
                # Only do live cluster introspection in DMs.
                allow_tools = is_dm
                promql = ""
                if allow_tools:
                    # NOTE(review): the doubled backslashes (\\s, \\:) inside this
                    # raw string match literal backslashes, not whitespace/colon —
                    # looks like an escaping artifact; confirm against a working copy.
                    m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body)
                    if m:
                        promql = m.group(1).strip()
                # Attempt to scope tools to the most likely workloads when hostnames are mentioned.
                targets: list[tuple[str, str]] = []
                for m in HOST_RE.finditer(lower_body):
                    host = m.group(1).lower()
                    for ep in _HOST_INDEX.get(host, []):
                        backend = ep.get("backend") or {}
                        ns = backend.get("namespace") or ""
                        for w in backend.get("workloads") or []:
                            if isinstance(w, dict) and w.get("name"):
                                targets.append((ns, str(w["name"])))
                snapshot = _snapshot_state()
                # Prompt-scoped inventory first; fall back to snapshot inventory.
                inventory = node_inventory_for_prompt(cleaned_body)
                if not inventory:
                    inventory = _snapshot_inventory(snapshot)
                workloads = _snapshot_workloads(snapshot)
                cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
                context = ""
                if cluster_query:
                    context = build_context(
                        cleaned_body,
                        allow_tools=allow_tools,
                        targets=targets,
                        inventory=inventory,
                        snapshot=snapshot,
                        workloads=workloads,
                    )
                if allow_tools and promql:
                    # Raw PromQL passthrough: query VictoriaMetrics and reply directly.
                    # NOTE(review): this rebinds `res` (the sync response); harmless
                    # here since `res` is reassigned next loop iteration, but fragile.
                    res = vm_query(promql, timeout=20)
                    rendered = vm_render_result(res, limit=15) or "(no results)"
                    extra = "VictoriaMetrics (PromQL result):\n" + rendered
                    send_msg(token, rid, extra)
                    continue
                fallback = "I don't have enough data to answer that."
                if cluster_query:
                    # Deterministic cluster answer path (no LLM) when possible.
                    reply = cluster_answer(
                        cleaned_body,
                        inventory=inventory,
                        snapshot=snapshot,
                        workloads=workloads,
                    )
                    if not reply:
                        reply = fallback
                else:
                    llm_prompt = cleaned_body
                    reply = ollama_reply_with_thinking(
                        token,
                        rid,
                        hist_key,
                        llm_prompt,
                        context=context,
                        fallback=fallback,
                        use_history=False,
                    )
                send_msg(token, rid, reply)


def login_with_retry():
    """Attempt login up to 10 times with exponential backoff (capped at 30s)."""
    last_err = None
    for attempt in range(10):
        try:
            return login()
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            time.sleep(min(30, 2 ** attempt))
    # All attempts failed: surface the last error to the caller.
    raise last_err


def main():
    """Entry point: load the KB, start the HTTP API, log in, join, and sync."""
    load_kb()
    _start_http_server()
    token = login_with_retry()
    try:
        room_id = resolve_alias(token, ROOM_ALIAS)
        join_room(token, room_id)
    except Exception:
        # Alias resolution/join is best-effort; sync still handles invites.
        room_id = None
    sync_loop(token, room_id)


if __name__ == "__main__":
    main()