From cd45b7faba60cf8fb045e78308ebd70fc5f7866a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 27 Jan 2026 14:38:05 -0300 Subject: [PATCH] atlasbot: ignore mentions and gate cluster context --- services/comms/scripts/atlasbot/bot.py | 193 +++++++++++++++++++------ 1 file changed, 146 insertions(+), 47 deletions(-) diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index f7cfd82..26fe7ef 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -121,6 +121,49 @@ METRIC_HINTS = { "pods": ("pods", "pod"), } +CLUSTER_HINT_WORDS = { + "atlas", + "titan", + "cluster", + "k8s", + "kubernetes", + "node", + "nodes", + "pod", + "pods", + "namespace", + "service", + "deployment", + "daemonset", + "statefulset", + "grafana", + "victoria", + "prometheus", + "ariadne", + "mailu", + "nextcloud", + "vaultwarden", + "firefly", + "wger", + "jellyfin", + "planka", + "budget", + "element", + "synapse", + "mas", + "comms", + "longhorn", + "harbor", + "jenkins", + "gitea", + "flux", + "keycloak", + "postgres", + "database", + "db", + "atlasbot", +} + _OLLAMA_LOCK = threading.Lock() HARDWARE_HINTS = { @@ -231,6 +274,18 @@ def is_mentioned(content: dict, body: str) -> bool: return False return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids) +def _strip_bot_mention(text: str) -> str: + if not text: + return "" + if not MENTION_LOCALPARTS: + return text.strip() + names = [re.escape(name) for name in MENTION_LOCALPARTS if name] + if not names: + return text.strip() + pattern = r"^(?:\s*@?(?:" + "|".join(names) + r")(?::)?\s+)+" + cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE).strip() + return cleaned or text.strip() + # Matrix HTTP helper. 
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None): @@ -1780,33 +1835,38 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): if not prompt: self._write_json(400, {"error": "missing_prompt"}) return + cleaned = _strip_bot_mention(prompt) snapshot = _snapshot_state() inventory = _snapshot_inventory(snapshot) or node_inventory_live() workloads = _snapshot_workloads(snapshot) - metrics_summary = snapshot_context(prompt, snapshot) - structured = structured_answer( - prompt, - inventory=inventory, - metrics_summary=metrics_summary, - snapshot=snapshot, - workloads=workloads, - ) - if structured: - self._write_json(200, {"answer": structured}) - return - context = build_context( - prompt, - allow_tools=False, - targets=[], - inventory=inventory, - snapshot=snapshot, - workloads=workloads, - ) - metrics_context, _metrics_fallback = metrics_query_context(prompt, allow_tools=True) - if metrics_context: - context = (context + "\n\n" + metrics_context).strip() if context else metrics_context + cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads) + metrics_summary = snapshot_context(cleaned, snapshot) if cluster_query else "" + if cluster_query: + structured = structured_answer( + cleaned, + inventory=inventory, + metrics_summary=metrics_summary, + snapshot=snapshot, + workloads=workloads, + ) + if structured: + self._write_json(200, {"answer": structured}) + return + context = "" + if cluster_query: + context = build_context( + cleaned, + allow_tools=False, + targets=[], + inventory=inventory, + snapshot=snapshot, + workloads=workloads, + ) + metrics_context, _metrics_fallback = metrics_query_context(cleaned, allow_tools=True) + if metrics_context: + context = (context + "\n\n" + metrics_context).strip() if context else metrics_context fallback = "I don't have enough data to answer that." 
def _mentions_cluster_hint(q: str) -> bool:
    """Return True when *q* contains a CLUSTER_HINT_WORDS entry at a word start.

    Hints are anchored so they cannot match in the middle of another word
    ("db" no longer fires on "goodbye"). Hints longer than three characters
    may be followed by more letters, so "services" and "postgresql" still
    match; shorter hints must match a whole word.
    """
    alternatives = [
        re.escape(word) + (r"(?![a-z0-9])" if len(word) <= 3 else "")
        for word in CLUSTER_HINT_WORDS
        if word
    ]
    if not alternatives:
        return False
    pattern = r"(?<![a-z0-9])(?:" + "|".join(alternatives) + r")"
    return re.search(pattern, q, flags=re.IGNORECASE) is not None


def _is_cluster_query(
    prompt: str,
    *,
    inventory: list[dict[str, Any]] | None,
    workloads: list[dict[str, Any]] | None,
) -> bool:
    """Decide whether *prompt* is about the cluster.

    The prompt is passed through ``normalize_query`` and is treated as a
    cluster query when any of the following holds:

    * ``TITAN_NODE_RE`` matches it;
    * it contains a ``CLUSTER_HINT_WORDS`` keyword (see
      ``_mentions_cluster_hint``);
    * ``HOST_RE`` finds a hostname that is ``bstein.dev`` or a subdomain of it;
    * one of its tokens matches a workload's tokens or a node name from
      *inventory*.

    Args:
        prompt: User text, already stripped of bot mentions by the callers.
        inventory: Node entries; only dict entries with a truthy ``"name"``
            are considered. May be ``None`` or empty.
        workloads: Workload entries handed to ``_workload_tokens``; non-dict
            entries are ignored. May be ``None`` or empty.

    Returns:
        ``True`` if the prompt looks cluster-related, otherwise ``False``.
    """
    q = normalize_query(prompt)
    if not q:
        return False
    if TITAN_NODE_RE.search(q):
        return True
    if _mentions_cluster_hint(q):
        return True
    for host_match in HOST_RE.finditer(q):
        host = host_match.group(1).lower()
        # Require the exact domain or a real subdomain; a bare suffix test
        # would also accept look-alikes such as "notbstein.dev".
        if host == "bstein.dev" or host.endswith(".bstein.dev"):
            return True
    tokens = set(_tokens(q))
    if workloads:
        for entry in workloads:
            if not isinstance(entry, dict):
                continue
            if tokens & _workload_tokens(entry):
                return True
    if inventory:
        names = {
            node.get("name")
            for node in inventory
            if isinstance(node, dict) and node.get("name")
        }
        if tokens & names:
            return True
    return False
" "End every response with a line: 'Confidence: high|medium|low'." ) @@ -2087,7 +2179,8 @@ def sync_loop(token: str, room_id: str): if not (is_dm or mentioned): continue - lower_body = body.lower() + cleaned_body = _strip_bot_mention(body) + lower_body = cleaned_body.lower() # Only do live cluster introspection in DMs; metrics can be answered when mentioned. allow_tools = is_dm @@ -2101,7 +2194,7 @@ def sync_loop(token: str, room_id: str): # Attempt to scope tools to the most likely workloads when hostnames are mentioned. targets: list[tuple[str, str]] = [] - for m in HOST_RE.finditer(body.lower()): + for m in HOST_RE.finditer(lower_body): host = m.group(1).lower() for ep in _HOST_INDEX.get(host, []): backend = ep.get("backend") or {} @@ -2111,39 +2204,45 @@ def sync_loop(token: str, room_id: str): targets.append((ns, str(w["name"]))) snapshot = _snapshot_state() - inventory = node_inventory_for_prompt(body) + inventory = node_inventory_for_prompt(cleaned_body) if not inventory: inventory = _snapshot_inventory(snapshot) workloads = _snapshot_workloads(snapshot) - metrics_summary = snapshot_context(body, snapshot) - structured = structured_answer( - body, - inventory=inventory, - metrics_summary=metrics_summary, - snapshot=snapshot, - workloads=workloads, - ) + cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) + metrics_summary = snapshot_context(cleaned_body, snapshot) if cluster_query else "" + structured = "" + if cluster_query: + structured = structured_answer( + cleaned_body, + inventory=inventory, + metrics_summary=metrics_summary, + snapshot=snapshot, + workloads=workloads, + ) if structured: history[hist_key].append(f"Atlas: {structured}") history[hist_key] = history[hist_key][-80:] send_msg(token, rid, structured) continue - context = build_context( - body, - allow_tools=allow_tools, - targets=targets, - inventory=inventory, - snapshot=snapshot, - workloads=workloads, - ) + context = "" + if cluster_query: + context = 
build_context( + cleaned_body, + allow_tools=allow_tools, + targets=targets, + inventory=inventory, + snapshot=snapshot, + workloads=workloads, + ) if allow_tools and promql: res = vm_query(promql, timeout=20) rendered = vm_render_result(res, limit=15) or "(no results)" extra = "VictoriaMetrics (PromQL result):\n" + rendered context = (context + "\n\n" + extra).strip() if context else extra - metrics_context, _metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics) - if metrics_context: - context = (context + "\n\n" + metrics_context).strip() if context else metrics_context + if cluster_query: + metrics_context, _metrics_fallback = metrics_query_context(cleaned_body, allow_tools=allow_metrics) + if metrics_context: + context = (context + "\n\n" + metrics_context).strip() if context else metrics_context fallback = "I don't have enough data to answer that." @@ -2151,7 +2250,7 @@ def sync_loop(token: str, room_id: str): token, rid, hist_key, - body, + cleaned_body, context=context, fallback=fallback, )