diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml
index c96c79c..2c08853 100644
--- a/services/comms/atlasbot-deployment.yaml
+++ b/services/comms/atlasbot-deployment.yaml
@@ -16,7 +16,7 @@ spec:
       labels:
         app: atlasbot
       annotations:
-        checksum/atlasbot-configmap: manual-atlasbot-11
+        checksum/atlasbot-configmap: manual-atlasbot-12
         vault.hashicorp.com/agent-inject: "true"
         vault.hashicorp.com/role: "comms"
         vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
index 18ec611..8edc28d 100644
--- a/services/comms/scripts/atlasbot/bot.py
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -3,6 +3,7 @@ import json
 import os
 import re
 import ssl
+import threading
 import time
 from typing import Any
 from urllib import error, parse, request
@@ -156,6 +157,13 @@ def send_msg(token: str, room: str, text: str):
 KB = {"catalog": {}, "runbooks": []}
 _HOST_INDEX: dict[str, list[dict]] = {}
 _NAME_INDEX: set[str] = set()
+_NODE_CLASS_INDEX: dict[str, list[str]] = {}
+_NODE_CLASS_RPI4: set[str] = set()
+_NODE_CLASS_RPI5: set[str] = set()
+_NODE_CLASS_AMD64: set[str] = set()
+_NODE_CLASS_JETSON: set[str] = set()
+_NODE_CLASS_EXTERNAL: set[str] = set()
+_NODE_CLASS_NON_RPI: set[str] = set()
 
 def _load_json_file(path: str) -> Any | None:
     try:
@@ -166,6 +174,8 @@
 
 def load_kb():
     global KB, _HOST_INDEX, _NAME_INDEX
+    global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON
+    global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI
     if not KB_DIR:
         return
     catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
@@ -188,6 +198,24 @@
             names.add(str(w["name"]).lower())
     _NAME_INDEX = names
 
+    node_classes = _parse_node_classes(runbooks)
+    _NODE_CLASS_INDEX = node_classes
+    _NODE_CLASS_RPI4 = set(node_classes.get("rpi4", []))
+    _NODE_CLASS_RPI5 = set(node_classes.get("rpi5", []))
+    _NODE_CLASS_AMD64 = set(node_classes.get("amd64", []))
+    _NODE_CLASS_JETSON = set(node_classes.get("jetson", []))
+    _NODE_CLASS_EXTERNAL = set(node_classes.get("external", []))
+    _NODE_CLASS_NON_RPI = set(
+        sorted(
+            (
+                set().union(*node_classes.values())
+                - _NODE_CLASS_RPI4
+                - _NODE_CLASS_RPI5
+                - _NODE_CLASS_EXTERNAL
+            )
+        )
+    )
+
 def kb_retrieve(query: str, *, limit: int = 3) -> str:
     q = (query or "").strip()
    if not q or not KB.get("runbooks"):
@@ -237,6 +265,12 @@
 
 def _extract_titan_nodes(text: str) -> list[str]:
     names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
+    for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", text or "", re.IGNORECASE):
+        tail = match.group(1)
+        for part in re.split(r"[/,]", tail):
+            part = part.strip()
+            if part:
+                names.add(f"titan-{part.lower()}")
     for match in TITAN_RANGE_RE.finditer(text or ""):
         left, right = match.groups()
         if left:
@@ -245,6 +279,83 @@
             names.add(f"titan-{left.lower()}")
         if right:
             names.add(f"titan-{right.lower()}")
     return sorted(names)
+
+def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]:
+    classes: dict[str, list[str]] = {}
+    for doc in runbooks:
+        if not isinstance(doc, dict):
+            continue
+        body = str(doc.get("body") or "")
+        for line in body.splitlines():
+            stripped = line.strip()
+            if "titan-" not in stripped.lower():
+                continue
+            label = ""
+            nodes: list[str] = []
+            if stripped.startswith("-") and ":" in stripped:
+                label, rest = stripped.lstrip("-").split(":", 1)
+                nodes = _extract_titan_nodes(rest)
+                label = label.strip().lower()
+            else:
+                nodes = _extract_titan_nodes(stripped)
+            if not nodes:
+                continue
+            if "jetson" in stripped.lower():
+                classes.setdefault("jetson", nodes)
+            if "amd64" in stripped.lower() or "x86" in stripped.lower():
+                classes.setdefault("amd64", nodes)
+            if "rpi4" in stripped.lower():
+                classes.setdefault("rpi4", nodes)
+            if "rpi5" in stripped.lower():
+                classes.setdefault("rpi5", nodes)
+            if "external" in stripped.lower() or "non-cluster" in stripped.lower():
+                classes.setdefault("external", nodes)
+            if label:
+                classes.setdefault(label, nodes)
+    return {k: sorted(set(v)) for k, v in classes.items()}
+
+def node_inventory_answer(cluster_name: str, query: str) -> str:
+    q = (query or "").lower()
+    if "jetson" in q and _NODE_CLASS_JETSON:
+        names = sorted(_NODE_CLASS_JETSON)
+        return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
+    if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
+        names = sorted(_NODE_CLASS_NON_RPI)
+        if names:
+            return f"{cluster_name} non‑Raspberry Pi nodes: {', '.join(names)}."
+    if "raspberry" in q or "rpi" in q:
+        if "rpi4" in q and _NODE_CLASS_RPI4:
+            names = sorted(_NODE_CLASS_RPI4)
+            return f"{cluster_name} rpi4 nodes: {', '.join(names)}."
+        if "rpi5" in q and _NODE_CLASS_RPI5:
+            names = sorted(_NODE_CLASS_RPI5)
+            return f"{cluster_name} rpi5 nodes: {', '.join(names)}."
+        names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5)
+        if names:
+            return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}."
+    if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64:
+        names = sorted(_NODE_CLASS_AMD64)
+        return f"{cluster_name} amd64 nodes: {', '.join(names)}."
+    return ""
+
+def node_inventory_context(query: str) -> str:
+    q = (query or "").lower()
+    if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")):
+        return ""
+    lines: list[str] = ["Node inventory (KB):"]
+    if _NODE_CLASS_RPI5:
+        lines.append(f"- rpi5: {', '.join(sorted(_NODE_CLASS_RPI5))}")
+    if _NODE_CLASS_RPI4:
+        lines.append(f"- rpi4: {', '.join(sorted(_NODE_CLASS_RPI4))}")
+    if _NODE_CLASS_JETSON:
+        lines.append(f"- jetson: {', '.join(sorted(_NODE_CLASS_JETSON))}")
+    if _NODE_CLASS_AMD64:
+        lines.append(f"- amd64: {', '.join(sorted(_NODE_CLASS_AMD64))}")
+    if _NODE_CLASS_EXTERNAL:
+        lines.append(f"- external: {', '.join(sorted(_NODE_CLASS_EXTERNAL))}")
+    if len(lines) == 1:
+        return ""
+    return "\n".join(lines)
+
 def jetson_nodes_from_kb() -> list[str]:
     for doc in KB.get("runbooks", []):
         if not isinstance(doc, dict):
@@ -627,6 +738,10 @@ def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, st
     if endpoints:
         parts.append(endpoints)
 
+    inventory = node_inventory_context(prompt)
+    if inventory:
+        parts.append(inventory)
+
     if allow_tools:
         # Scope pod summaries to relevant namespaces/workloads when possible.
         prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
@@ -656,35 +771,58 @@
 
     return "\n\n".join([p for p in parts if p]).strip()
 
-def ollama_reply(hist_key, prompt: str, *, context: str) -> str:
-    try:
-        system = (
-            "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
-            "Be helpful, direct, and concise. "
-            "Prefer answering with exact repo paths and Kubernetes resource names. "
-            "Never include or request secret values. "
-            "Respond in plain sentences; do not return JSON or code fences unless explicitly asked."
-        )
-        transcript_parts = [system]
-        if context:
-            transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
-        transcript_parts.extend(history[hist_key][-24:])
-        transcript_parts.append(f"User: {prompt}")
-        transcript = "\n".join(transcript_parts)
+def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
+    system = (
+        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
+        "Be helpful, direct, and concise. "
+        "Prefer answering with exact repo paths and Kubernetes resource names. "
+        "Never include or request secret values. "
+        "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
+        "If the answer is not grounded in the provided context or tool data, say you do not know."
+    )
+    transcript_parts = [system]
+    if context:
+        transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
+    transcript_parts.extend(history[hist_key][-24:])
+    transcript_parts.append(f"User: {prompt}")
+    transcript = "\n".join(transcript_parts)
 
-        payload = {"model": MODEL, "message": transcript}
-        headers = {"Content-Type": "application/json"}
-        if API_KEY:
-            headers["x-api-key"] = API_KEY
-        r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
-        with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
-            data = json.loads(resp.read().decode())
-        raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
-        reply = _normalize_reply(raw_reply) or "I'm here to help."
-        history[hist_key].append(f"Atlas: {reply}")
-        return reply
+    payload = {"model": MODEL, "message": transcript}
+    headers = {"Content-Type": "application/json"}
+    if API_KEY:
+        headers["x-api-key"] = API_KEY
+    r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
+    with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
+        data = json.loads(resp.read().decode())
+    raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
+    reply = _normalize_reply(raw_reply) or "I'm here to help."
+    history[hist_key].append(f"Atlas: {reply}")
+    return reply
+
+def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
+    try:
+        return _ollama_call(hist_key, prompt, context=context)
     except Exception:
-        return "I’m here — but I couldn’t reach the model backend."
+        if fallback:
+            history[hist_key].append(f"Atlas: {fallback}")
+            return fallback
+        return "Model backend is busy. Try again in a moment."
+
+def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
+    result: dict[str, str] = {"reply": ""}
+    done = threading.Event()
+
+    def worker():
+        result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
+        done.set()
+
+    thread = threading.Thread(target=worker, daemon=True)
+    thread.start()
+    if not done.wait(2.0):
+        send_msg(token, room, "Thinking…")
+    done.wait()
+    thread.join(timeout=1)
+    return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
 
 def sync_loop(token: str, room_id: str):
     since = None
@@ -747,6 +885,10 @@
                     continue
                 send_msg(token, rid, summary)
                 continue
+            inventory_answer = node_inventory_answer("Atlas", lower_body)
+            if inventory_answer:
+                send_msg(token, rid, inventory_answer)
+                continue
             if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")):
                 if any(word in lower_body for word in ("cluster", "atlas", "titan")):
                     arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64"
@@ -760,14 +902,6 @@
                     continue
                 send_msg(token, rid, summary)
                 continue
-            if "jetson" in lower_body:
-                if any(word in lower_body for word in ("cluster", "atlas", "titan", "node", "nodes")):
-                    summary = jetson_nodes_summary("Atlas")
-                    if summary:
-                        send_msg(token, rid, summary)
-                    else:
-                        send_msg(token, rid, "Jetson inventory is not available in the knowledge base yet.")
-                    continue
             if re.search(r"\bnode names?\b|\bnodes? named\b|\bnaming\b", lower_body):
                 if any(word in lower_body for word in ("cluster", "atlas", "titan")):
                     names_summary = nodes_names_summary("Atlas")
@@ -803,7 +937,17 @@
                 rendered = vm_render_result(res, limit=15) or "(no results)"
                 extra = "VictoriaMetrics (PromQL result):\n" + rendered
                 context = (context + "\n\n" + extra).strip() if context else extra
-            reply = ollama_reply(hist_key, body, context=context)
+            fallback = ""
+            if "node" in lower_body or "cluster" in lower_body:
+                fallback = node_inventory_answer("Atlas", lower_body)
+            reply = ollama_reply_with_thinking(
+                token,
+                rid,
+                hist_key,
+                body,
+                context=context,
+                fallback=fallback,
+            )
             send_msg(token, rid, reply)
 
 def login_with_retry():