From 89935a579ab8cdb53175d32ec1a54fdfc63561b7 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 27 Jan 2026 05:41:58 -0300 Subject: [PATCH] atlasbot: use cluster snapshot + model update --- services/ai-llm/deployment.yaml | 4 +- services/comms/atlasbot-deployment.yaml | 6 +- services/comms/scripts/atlasbot/bot.py | 368 +++++++++++++++++++++--- 3 files changed, 334 insertions(+), 44 deletions(-) diff --git a/services/ai-llm/deployment.yaml b/services/ai-llm/deployment.yaml index 4f34d86..43d14c8 100644 --- a/services/ai-llm/deployment.yaml +++ b/services/ai-llm/deployment.yaml @@ -20,7 +20,7 @@ spec: labels: app: ollama annotations: - ai.bstein.dev/model: qwen2.5-coder:7b-instruct-q4_0 + ai.bstein.dev/model: qwen2.5:7b-instruct-q4_0 ai.bstein.dev/gpu: GPU pool (titan-22/24) ai.bstein.dev/restartedAt: "2026-01-26T12:00:00Z" spec: @@ -52,7 +52,7 @@ spec: - name: OLLAMA_MODELS value: /root/.ollama - name: OLLAMA_MODEL - value: qwen2.5-coder:7b-instruct-q4_0 + value: qwen2.5:7b-instruct-q4_0 command: - /bin/sh - -c diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml index e35fa61..0ee86f0 100644 --- a/services/comms/atlasbot-deployment.yaml +++ b/services/comms/atlasbot-deployment.yaml @@ -82,11 +82,13 @@ spec: - name: OLLAMA_URL value: http://chat-ai-gateway.bstein-dev-home.svc.cluster.local/ - name: OLLAMA_MODEL - value: qwen2.5-coder:7b-instruct-q4_0 + value: qwen2.5:7b-instruct-q4_0 - name: OLLAMA_TIMEOUT_SEC - value: "480" + value: "600" - name: ATLASBOT_THINKING_INTERVAL_SEC value: "120" + - name: ATLASBOT_SNAPSHOT_TTL_SEC + value: "30" - name: ATLASBOT_HTTP_PORT value: "8090" ports: diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index 8df1317..9f6c38d 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -21,6 +21,7 @@ API_KEY = os.environ.get("CHAT_API_KEY", "") OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480")) ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090")) ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "") +SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30")) KB_DIR = os.environ.get("KB_DIR", "") VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428") @@ -523,7 +524,7 @@ def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool: hw = node.get("hardware") or "" arch = node.get("arch") or "" for f in filters: - if f == "rpi" and hw in ("rpi4", "rpi5"): + if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"): return True if f == "arm64" and arch == "arm64": return True @@ -546,7 +547,7 @@ def _hardware_class(labels: dict[str, Any]) -> str: if str(labels.get("jetson") or "").lower() == "true": return "jetson" hardware = (labels.get("hardware") or "").strip().lower() - if hardware in ("rpi4", "rpi5"): + if hardware in ("rpi4", "rpi5", "rpi"): return hardware arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "" if arch == "amd64": @@ -580,6 +581,14 @@ def node_inventory_live() -> list[dict[str, Any]]: ) return sorted(inventory, key=lambda item: item["name"]) + +def node_inventory() -> list[dict[str, Any]]: + snapshot = _snapshot_state() + inventory = _snapshot_inventory(snapshot) + if inventory: + return inventory + return node_inventory_live() + def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]: grouped: dict[str, list[str]] = collections.defaultdict(list) for node in inventory: @@ -591,7 +600,7 @@ def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")): return "" if inventory is None: - inventory = node_inventory_live() + inventory = node_inventory() if not inventory: return "" groups = _group_nodes(inventory) @@ -626,7 +635,7 @@ def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]: q = normalize_query(prompt) if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")): - return node_inventory_live() + return node_inventory() return [] def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]: @@ -656,11 +665,177 @@ def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]: "expected_missing": sorted(expected_missing), } -def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_summary: str) -> str: + +def _workload_tokens(entry: dict[str, Any]) -> set[str]: + tokens: set[str] = set() + for key in ("workload", "namespace"): + value = entry.get(key) + if isinstance(value, str) and value: + tokens.update(_tokens(value)) + return tokens + + +def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None: + q_tokens = set(_tokens(prompt)) + if not q_tokens: + return None + scored: list[tuple[int, dict[str, Any]]] = [] + for entry in workloads: + if not isinstance(entry, dict): + continue + tokens = _workload_tokens(entry) + score = len(tokens & q_tokens) + if score: + scored.append((score, entry)) + if not scored: + return None + scored.sort(key=lambda item: item[0], reverse=True) + return scored[0][1] + + +def _format_confidence(answer: str, confidence: str) -> str: + if not answer: + return "" + return f"{answer}\nConfidence: {confidence}." + + +def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str: + q = normalize_query(prompt) + if not any(word in q for word in ("where", "which", "node", "run", "running", "host", "located")): + return "" + entry = _select_workload(prompt, workloads) + if not entry: + return "" + workload = entry.get("workload") or "" + namespace = entry.get("namespace") or "" + nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {} + primary = entry.get("primary_node") or "" + if not workload or not nodes: + return "" + parts = [] + if primary: + parts.append(f"{primary} (primary)") + for node, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0])): + if node == primary: + continue + parts.append(f"{node} ({count} pod{'s' if count != 1 else ''})") + node_text = ", ".join(parts) if parts else primary + answer = f"{workload} runs in {namespace}. Nodes: {node_text}." + return _format_confidence(answer, "medium") + + +def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]: + if not snapshot: + return {} + metrics = snapshot.get("metrics") + return metrics if isinstance(metrics, dict) else {} + + +def _node_usage_top( + usage: list[dict[str, Any]], + *, + allowed_nodes: set[str] | None, +) -> tuple[str, float] | None: + best_node = "" + best_val = None + for item in usage if isinstance(usage, list) else []: + if not isinstance(item, dict): + continue + node = item.get("node") or "" + if allowed_nodes and node not in allowed_nodes: + continue + value = item.get("value") + try: + numeric = float(value) + except (TypeError, ValueError): + continue + if best_val is None or numeric > best_val: + best_val = numeric + best_node = node + if best_node and best_val is not None: + return best_node, best_val + return None + + +def snapshot_metric_answer( + prompt: str, + *, + snapshot: dict[str, Any] | None, + inventory: list[dict[str, Any]], +) -> str: + if not snapshot: + return "" + metrics = _snapshot_metrics(snapshot) + if not metrics: + return "" + q = normalize_query(prompt) + metric = _detect_metric(q) + op = _detect_operation(q) + include_hw, exclude_hw = _detect_hardware_filters(q) + nodes_in_query = _extract_titan_nodes(q) + only_workers = "worker" in q or "workers" in q + + filtered = _inventory_filter( + inventory, + include_hw=include_hw, + exclude_hw=exclude_hw, + only_workers=only_workers, + only_ready=None, + nodes_in_query=nodes_in_query, + ) + allowed_nodes = {node["name"] for node in filtered} if filtered else None + + if metric in {"cpu", "ram", "net", "io"} and op in {"top", "status", None}: + usage = metrics.get("node_usage", {}).get(metric, []) + top = _node_usage_top(usage, allowed_nodes=allowed_nodes) + if top: + node, val = top + percent = metric in {"cpu", "ram"} + value = _format_metric_value(str(val), percent=percent) + scope = "" + if include_hw: + scope = f" among {' and '.join(sorted(include_hw))}" + answer = f"Hottest node{scope}: {node} ({value})." + return _format_confidence(answer, "high") + + if metric == "connections" or "postgres" in q: + postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} + used = postgres.get("used") + max_conn = postgres.get("max") + hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {} + parts: list[str] = [] + if used is not None and max_conn is not None: + parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.") + if hottest.get("label"): + hot_val = hottest.get("value") + hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else "" + parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).") + if parts: + return _format_confidence(" ".join(parts), "high") + + return "" + +def structured_answer( + prompt: str, + *, + inventory: list[dict[str, Any]], + metrics_summary: str, + snapshot: dict[str, Any] | None = None, + workloads: list[dict[str, Any]] | None = None, +) -> str: q = normalize_query(prompt) if not q: return "" + if workloads: + workload_resp = workload_answer(prompt, workloads) + if workload_resp: + return workload_resp + + snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory) + if snap_resp: + return snap_resp + tokens = _tokens(q) op = _detect_operation(q) metric = _detect_metric(q) @@ -749,11 +924,20 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s if op == "status": if "missing" in q and expected_workers: missing = sorted(set(expected_workers) - {n["name"] for n in inventory}) - return "Missing nodes: " + (", ".join(missing) if missing else "none") + "." + return _format_confidence( + "Missing nodes: " + (", ".join(missing) if missing else "none") + ".", + "high", + ) if only_ready is False: - return "Not ready nodes: " + (", ".join(names) if names else "none") + "." + return _format_confidence( + "Not ready nodes: " + (", ".join(names) if names else "none") + ".", + "high", + ) if only_ready is True: - return f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + "." + return _format_confidence( + f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + ".", + "high", + ) if op == "count": if expected_workers and ("expected" in q or "should" in q): @@ -761,10 +945,10 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s msg = f"Grafana inventory expects {len(expected_workers)} worker nodes." if missing: msg += f" Missing: {', '.join(missing)}." - return msg + return _format_confidence(msg, "high") if not (include_hw or exclude_hw or nodes_in_query or only_workers): - return f"Atlas has {len(names)} nodes." - return f"Matching nodes: {len(names)}." + return _format_confidence(f"Atlas has {len(names)} nodes.", "high") + return _format_confidence(f"Matching nodes: {len(names)}.", "high") if op == "list": if nodes_in_query: @@ -772,12 +956,12 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s existing = {n["name"] for n in inventory} for node in nodes_in_query: parts.append(f"{node}: {'present' if node in existing else 'not present'}") - return "Node presence: " + ", ".join(parts) + "." + return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high") if not names: - return "Matching nodes: none." + return _format_confidence("Matching nodes: none.", "high") shown = names[:30] suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else "" - return "Matching nodes: " + ", ".join(shown) + suffix + "." + return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high") return "" @@ -922,6 +1106,58 @@ def _ariadne_state(timeout: int = 5) -> dict | None: except Exception: return None + +_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0} + + +def _snapshot_state() -> dict[str, Any] | None: + now = time.monotonic() + cached = _SNAPSHOT_CACHE.get("payload") + ts = _SNAPSHOT_CACHE.get("ts") or 0.0 + if cached and now - ts < max(5, SNAPSHOT_TTL_SEC): + return cached + payload = _ariadne_state(timeout=10) + if isinstance(payload, dict) and payload: + _SNAPSHOT_CACHE["payload"] = payload + _SNAPSHOT_CACHE["ts"] = now + return payload + return cached if isinstance(cached, dict) else None + + +def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]: + if not snapshot: + return [] + items = snapshot.get("nodes_detail") + if not isinstance(items, list): + return [] + inventory: list[dict[str, Any]] = [] + for node in items: + if not isinstance(node, dict): + continue + labels = node.get("labels") if isinstance(node.get("labels"), dict) else {} + name = node.get("name") or "" + if not name: + continue + hardware = node.get("hardware") or _hardware_class(labels) + inventory.append( + { + "name": name, + "arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "", + "hardware": hardware, + "roles": node.get("roles") or [], + "is_worker": node.get("is_worker") is True, + "ready": node.get("ready") is True, + } + ) + return sorted(inventory, key=lambda item: item["name"]) + + +def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]: + if not snapshot: + return [] + workloads = snapshot.get("workloads") + return workloads if isinstance(workloads, list) else [] + def k8s_pods(namespace: str) -> list[dict]: data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500") items = data.get("items") or [] @@ -1079,25 +1315,11 @@ def _node_is_worker(node: dict) -> bool: return True return True -def worker_nodes_status() -> tuple[list[str], list[str]]: - try: - data = k8s_get("/api/v1/nodes?limit=500") - except Exception: - return ([], []) - items = data.get("items") or [] - ready_nodes: list[str] = [] - not_ready_nodes: list[str] = [] - for node in items if isinstance(items, list) else []: - if not _node_is_worker(node): - continue - name = (node.get("metadata") or {}).get("name") or "" - if not name: - continue - ready = _node_ready_status(node) - if ready is True: - ready_nodes.append(name) - elif ready is False: - not_ready_nodes.append(name) +def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]: + if inventory is None: + inventory = node_inventory() + ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True] + not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False] return (sorted(ready_nodes), sorted(not_ready_nodes)) def expected_worker_nodes_from_metrics() -> list[str]: @@ -1238,13 +1460,29 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): if not prompt: self._write_json(400, {"error": "missing_prompt"}) return - inventory = node_inventory_live() - answer = structured_answer(prompt, inventory=inventory, metrics_summary="") + snapshot = _snapshot_state() + inventory = _snapshot_inventory(snapshot) or node_inventory_live() + workloads = _snapshot_workloads(snapshot) + answer = structured_answer( + prompt, + inventory=inventory, + metrics_summary="", + snapshot=snapshot, + workloads=workloads, + ) if not answer and _knowledge_intent(prompt): answer = knowledge_summary(prompt, inventory) if not answer: kb = kb_retrieve_titles(prompt, limit=4) - answer = kb or "" + context = build_context( + prompt, + allow_tools=False, + targets=[], + inventory=inventory, + snapshot=snapshot, + ) + fallback = kb or "I don't have enough data to answer that." + answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback) self._write_json(200, {"answer": answer}) @@ -1266,6 +1504,7 @@ def build_context( allow_tools: bool, targets: list[tuple[str, str]], inventory: list[dict[str, Any]] | None = None, + snapshot: dict[str, Any] | None = None, ) -> str: parts: list[str] = [] @@ -1281,6 +1520,10 @@ def build_context( if node_ctx: parts.append(node_ctx) + snapshot_ctx = snapshot_context(prompt, snapshot) + if snapshot_ctx: + parts.append(snapshot_ctx) + if allow_tools: # Scope pod summaries to relevant namespaces/workloads when possible. prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set) @@ -1311,6 +1554,33 @@ def build_context( return "\n\n".join([p for p in parts if p]).strip() +def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str: + if not snapshot: + return "" + metrics = _snapshot_metrics(snapshot) + workloads = _snapshot_workloads(snapshot) + q = normalize_query(prompt) + parts: list[str] = [] + nodes = snapshot.get("nodes") if isinstance(snapshot.get("nodes"), dict) else {} + if nodes.get("total") is not None: + parts.append( + f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}." + ) + if any(word in q for word in ("postgres", "connections", "db")): + postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} + if postgres: + parts.append(f"Snapshot: postgres_connections={postgres}.") + if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")): + hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {} + if hottest: + parts.append(f"Snapshot: hottest_nodes={hottest}.") + if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")): + match = _select_workload(prompt, workloads) + if match: + parts.append(f"Snapshot: workload={match}.") + return "\n".join(parts).strip() + + def _knowledge_intent(prompt: str) -> bool: q = normalize_query(prompt) return any( @@ -1350,7 +1620,8 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str: kb_titles = kb_retrieve_titles(prompt, limit=4) if kb_titles: parts.append(kb_titles) - return "\n".join(parts).strip() + summary = "\n".join(parts).strip() + return _format_confidence(summary, "medium") if summary else "" def _ollama_call(hist_key, prompt: str, *, context: str) -> str: system = ( @@ -1360,7 +1631,8 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str: "Never include or request secret values. " "Do not suggest commands unless explicitly asked. " "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. " - "If the answer is not grounded in the provided context or tool data, say you do not know." + "If the answer is not grounded in the provided context or tool data, say you do not know. " + "End every response with a line: 'Confidence: high|medium|low'." ) transcript_parts = [system] if context: @@ -1491,8 +1763,18 @@ def sync_loop(token: str, room_id: str): if isinstance(w, dict) and w.get("name"): targets.append((ns, str(w["name"]))) + snapshot = _snapshot_state() inventory = node_inventory_for_prompt(body) - context = build_context(body, allow_tools=allow_tools, targets=targets, inventory=inventory) + if not inventory: + inventory = _snapshot_inventory(snapshot) + workloads = _snapshot_workloads(snapshot) + context = build_context( + body, + allow_tools=allow_tools, + targets=targets, + inventory=inventory, + snapshot=snapshot, + ) if allow_tools and promql: res = vm_query(promql, timeout=20) rendered = vm_render_result(res, limit=15) or "(no results)" @@ -1506,7 +1788,13 @@ def sync_loop(token: str, room_id: str): if not fallback and context: fallback = _context_fallback(context) - structured = structured_answer(body, inventory=inventory, metrics_summary=metrics_fallback or "") + structured = structured_answer( + body, + inventory=inventory, + metrics_summary=metrics_fallback or "", + snapshot=snapshot, + workloads=workloads, + ) if structured: send_msg(token, rid, structured) continue