From 92f4137e9c161bf5725469f595f771dd0eba78a3 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 27 Jan 2026 14:54:09 -0300 Subject: [PATCH] atlasbot: simplify cluster gating and context --- services/comms/atlasbot-deployment.yaml | 2 +- services/comms/scripts/atlasbot/bot.py | 197 ++++++++++++++++-------- 2 files changed, 133 insertions(+), 66 deletions(-) diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml index aa91fdf..a2b0a3c 100644 --- a/services/comms/atlasbot-deployment.yaml +++ b/services/comms/atlasbot-deployment.yaml @@ -16,7 +16,7 @@ spec: labels: app: atlasbot annotations: - checksum/atlasbot-configmap: manual-atlasbot-41 + checksum/atlasbot-configmap: manual-atlasbot-42 vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "comms" vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret" diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index 26fe7ef..64097da 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -65,6 +65,16 @@ STOPWORDS = { "help", "atlas", "othrys", + "system", + "systems", + "service", + "services", + "app", + "apps", + "platform", + "software", + "tool", + "tools", } METRIC_HINT_WORDS = { @@ -129,6 +139,8 @@ CLUSTER_HINT_WORDS = { "kubernetes", "node", "nodes", + "worker", + "workers", "pod", "pods", "namespace", @@ -162,6 +174,11 @@ CLUSTER_HINT_WORDS = { "database", "db", "atlasbot", + "jetson", + "rpi", + "raspberry", + "amd64", + "arm64", } _OLLAMA_LOCK = threading.Lock() @@ -1840,18 +1857,6 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): inventory = _snapshot_inventory(snapshot) or node_inventory_live() workloads = _snapshot_workloads(snapshot) cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads) - metrics_summary = snapshot_context(cleaned, snapshot) if cluster_query else "" - if cluster_query: - structured = structured_answer( - cleaned, - inventory=inventory, - metrics_summary=metrics_summary, - snapshot=snapshot, - workloads=workloads, - ) - if structured: - self._write_json(200, {"answer": structured}) - return context = "" if cluster_query: context = build_context( @@ -1862,11 +1867,14 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): snapshot=snapshot, workloads=workloads, ) - metrics_context, _metrics_fallback = metrics_query_context(cleaned, allow_tools=True) - if metrics_context: - context = (context + "\n\n" + metrics_context).strip() if context else metrics_context fallback = "I don't have enough data to answer that." - answer = ollama_reply(("http", "internal"), cleaned, context=context, fallback=fallback) + answer = ollama_reply( + ("http", "internal"), + cleaned, + context=context, + fallback=fallback, + use_history=False, + ) self._write_json(200, {"answer": answer}) @@ -1897,6 +1905,15 @@ def build_context( if facts: parts.append(facts) + snapshot_json = snapshot_compact_context( + prompt, + snapshot, + inventory=inventory, + workloads=workloads, + ) + if snapshot_json: + parts.append(snapshot_json) + endpoints, edges = catalog_hints(prompt) if endpoints: parts.append(endpoints) @@ -1925,15 +1942,6 @@ def build_context( if flux_bad: parts.append("Flux (not ready):\n" + flux_bad) - p_l = (prompt or "").lower() - if any(w in p_l for w in METRIC_HINT_WORDS): - restarts = vm_top_restarts(1) - if restarts: - parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts) - snap = vm_cluster_snapshot() - if snap: - parts.append("VictoriaMetrics (cluster snapshot):\n" + snap) - return "\n\n".join([p for p in parts if p]).strip() @@ -1963,6 +1971,68 @@ def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str: parts.append(f"Snapshot: workload={match}.") return "\n".join(parts).strip() +def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]: + details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else [] + output: list[dict[str, Any]] = [] + for node in details: + if not isinstance(node, dict): + continue + name = node.get("name") + if not name: + continue + output.append( + { + "name": name, + "ready": node.get("ready"), + "hardware": node.get("hardware"), + "arch": node.get("arch"), + "roles": node.get("roles"), + "is_worker": node.get("is_worker"), + "os": node.get("os"), + "kernel": node.get("kernel"), + "kubelet": node.get("kubelet"), + "container_runtime": node.get("container_runtime"), + } + ) + return output + +def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]: + metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {} + return { + "pods_running": metrics.get("pods_running"), + "pods_pending": metrics.get("pods_pending"), + "pods_failed": metrics.get("pods_failed"), + "pods_succeeded": metrics.get("pods_succeeded"), + "postgres_connections": metrics.get("postgres_connections"), + "hottest_nodes": metrics.get("hottest_nodes"), + "node_usage": metrics.get("node_usage"), + "top_restarts_1h": metrics.get("top_restarts_1h"), + } + +def snapshot_compact_context( + prompt: str, + snapshot: dict[str, Any] | None, + *, + inventory: list[dict[str, Any]] | None, + workloads: list[dict[str, Any]] | None, +) -> str: + if not snapshot: + return "" + compact = { + "collected_at": snapshot.get("collected_at"), + "nodes_summary": snapshot.get("nodes_summary"), + "expected_workers": expected_worker_nodes_from_metrics(), + "nodes_detail": _compact_nodes_detail(snapshot), + "workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [], + "metrics": _compact_metrics(snapshot), + "flux": snapshot.get("flux"), + "errors": snapshot.get("errors"), + } + text = json.dumps(compact, ensure_ascii=False) + if len(text) > MAX_FACTS_CHARS: + text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..." + return "Cluster snapshot (JSON):\n" + text + def _knowledge_intent(prompt: str) -> bool: q = normalize_query(prompt) @@ -1998,16 +2068,8 @@ def _is_cluster_query( if host.endswith("bstein.dev"): return True tokens = set(_tokens(q)) - if workloads: - for entry in workloads: - if not isinstance(entry, dict): - continue - if tokens & _workload_tokens(entry): - return True - if inventory: - names = {node.get("name") for node in inventory if isinstance(node, dict)} - if tokens & {n for n in names if n}: - return True + if _NAME_INDEX and tokens & _NAME_INDEX: + return True return False @@ -2037,7 +2099,7 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str: summary = "\n".join(parts).strip() return _format_confidence(summary, "medium") if summary else "" -def _ollama_call(hist_key, prompt: str, *, context: str) -> str: +def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str: system = ( "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. " "Be helpful, direct, and concise. " @@ -2062,7 +2124,8 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str: system_content += "\n\nContext (grounded):\n" + context[:MAX_CONTEXT_CHARS] messages: list[dict[str, str]] = [{"role": "system", "content": system_content}] - messages.extend(_history_to_messages(history[hist_key][-24:])) + if use_history: + messages.extend(_history_to_messages(history[hist_key][-24:])) messages.append({"role": "user", "content": prompt}) payload = {"model": MODEL, "messages": messages, "stream": False} @@ -2082,31 +2145,55 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str: else: raw_reply = data.get("response") or data.get("reply") or data reply = _normalize_reply(raw_reply) or "I'm here to help." - history[hist_key].append(f"Atlas: {reply}") + if use_history: + history[hist_key].append(f"Atlas: {reply}") return reply finally: if lock: lock.release() -def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str: +def ollama_reply( + hist_key, + prompt: str, + *, + context: str, + fallback: str = "", + use_history: bool = True, +) -> str: last_error = None for attempt in range(max(1, OLLAMA_RETRIES + 1)): try: - return _ollama_call(hist_key, prompt, context=context) + return _ollama_call(hist_key, prompt, context=context, use_history=use_history) except Exception as exc: # noqa: BLE001 last_error = exc time.sleep(min(4, 2 ** attempt)) if fallback: - history[hist_key].append(f"Atlas: {fallback}") + if use_history: + history[hist_key].append(f"Atlas: {fallback}") return fallback return "I don't have enough data to answer that." -def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str: +def ollama_reply_with_thinking( + token: str, + room: str, + hist_key, + prompt: str, + *, + context: str, + fallback: str, + use_history: bool = True, +) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() def worker(): - result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback) + result["reply"] = ollama_reply( + hist_key, + prompt, + context=context, + fallback=fallback, + use_history=use_history, + ) done.set() thread = threading.Thread(target=worker, daemon=True) @@ -2182,9 +2269,8 @@ def sync_loop(token: str, room_id: str): cleaned_body = _strip_bot_mention(body) lower_body = cleaned_body.lower() - # Only do live cluster introspection in DMs; metrics can be answered when mentioned. + # Only do live cluster introspection in DMs. allow_tools = is_dm - allow_metrics = is_dm or mentioned promql = "" if allow_tools: @@ -2209,21 +2295,6 @@ def sync_loop(token: str, room_id: str): inventory = _snapshot_inventory(snapshot) workloads = _snapshot_workloads(snapshot) cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) - metrics_summary = snapshot_context(cleaned_body, snapshot) if cluster_query else "" - structured = "" - if cluster_query: - structured = structured_answer( - cleaned_body, - inventory=inventory, - metrics_summary=metrics_summary, - snapshot=snapshot, - workloads=workloads, - ) - if structured: - history[hist_key].append(f"Atlas: {structured}") - history[hist_key] = history[hist_key][-80:] - send_msg(token, rid, structured) - continue context = "" if cluster_query: context = build_context( @@ -2239,11 +2310,6 @@ def sync_loop(token: str, room_id: str): rendered = vm_render_result(res, limit=15) or "(no results)" extra = "VictoriaMetrics (PromQL result):\n" + rendered context = (context + "\n\n" + extra).strip() if context else extra - if cluster_query: - metrics_context, _metrics_fallback = metrics_query_context(cleaned_body, allow_tools=allow_metrics) - if metrics_context: - context = (context + "\n\n" + metrics_context).strip() if context else metrics_context - fallback = "I don't have enough data to answer that." reply = ollama_reply_with_thinking( @@ -2253,6 +2319,7 @@ def sync_loop(token: str, room_id: str): cleaned_body, context=context, fallback=fallback, + use_history=cluster_query, ) send_msg(token, rid, reply)