diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml index 47d0992..69b30e4 100644 --- a/services/comms/atlasbot-deployment.yaml +++ b/services/comms/atlasbot-deployment.yaml @@ -16,7 +16,7 @@ spec: labels: app: atlasbot annotations: - checksum/atlasbot-configmap: manual-atlasbot-47 + checksum/atlasbot-configmap: manual-atlasbot-48 vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "comms" vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret" diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index 351bb40..f0bf008 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -532,7 +532,7 @@ def _detect_role_filters(q: str) -> set[str]: return roles def _detect_entity(q: str) -> str | None: - if "node" in q or "nodes" in q or "worker" in q or TITAN_NODE_RE.search(q): + if "node" in q or "nodes" in q or "worker" in q or "hardware" in q or "architecture" in q or TITAN_NODE_RE.search(q): return "node" if "pod" in q or "pods" in q: return "pod" @@ -1152,6 +1152,15 @@ def snapshot_metric_answer( if include_hw: scope = f" among {' and '.join(sorted(include_hw))}" answer = f"Hottest node{scope}: {node} ({value})." + if allowed_nodes and len(allowed_nodes) != len(inventory): + overall = _node_usage_top(usage, allowed_nodes=None) + if overall and overall[0] != node: + overall_val = _format_metric_value( + str(overall[1]), + percent=percent, + rate=metric in {"net", "io"}, + ) + answer += f" Overall hottest: {overall[0]} ({overall_val})." return _format_confidence(answer, "high") if metric == "connections" or "postgres" in q: @@ -1358,6 +1367,219 @@ def structured_answer( return "" + +def _nodes_summary_line(inventory: list[dict[str, Any]], snapshot: dict[str, Any] | None) -> str: + summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {} + nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {} + total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total") + ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready") + not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready") + if total is None: + total = len(inventory) + ready = len([n for n in inventory if n.get("ready") is True]) + not_ready = len([n for n in inventory if n.get("ready") is False]) + if total is None: + return "" + return f"Atlas cluster has {total} nodes ({ready} ready, {not_ready} not ready)." + + +def _hardware_mix_line(inventory: list[dict[str, Any]]) -> str: + if not inventory: + return "" + groups = _group_nodes(inventory) + parts: list[str] = [] + for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): + nodes = groups.get(key) or [] + if nodes: + parts.append(f"{key}={len(nodes)}") + if not parts: + return "" + return "Hardware mix: " + ", ".join(parts) + "." + + +def _os_mix_line(snapshot: dict[str, Any] | None) -> str: + if not snapshot: + return "" + details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else [] + counts: dict[str, int] = collections.Counter() + for node in details: + if not isinstance(node, dict): + continue + os_name = (node.get("os") or "").strip() + if os_name: + counts[os_name] += 1 + if not counts: + return "" + parts = [f"{os_name}={count}" for os_name, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))] + return "OS mix: " + ", ".join(parts[:5]) + "." + + +def _pods_summary_line(metrics: dict[str, Any]) -> str: + if not metrics: + return "" + running = metrics.get("pods_running") + pending = metrics.get("pods_pending") + failed = metrics.get("pods_failed") + succeeded = metrics.get("pods_succeeded") + parts: list[str] = [] + if running is not None: + parts.append(f"{running:.0f} running") + if pending is not None: + parts.append(f"{pending:.0f} pending") + if failed is not None: + parts.append(f"{failed:.0f} failed") + if succeeded is not None: + parts.append(f"{succeeded:.0f} succeeded") + if not parts: + return "" + return "Pods: " + ", ".join(parts) + "." + + +def _postgres_summary_line(metrics: dict[str, Any]) -> str: + if not metrics: + return "" + postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} + if not postgres: + return "" + used = postgres.get("used") + max_conn = postgres.get("max") + hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {} + parts: list[str] = [] + if used is not None and max_conn is not None: + parts.append(f"{used:.0f}/{max_conn:.0f} connections") + if hottest.get("label"): + hot_val = hottest.get("value") + hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else "" + parts.append(f"hottest {hottest.get('label')} ({hot_val_str})") + if not parts: + return "" + return "Postgres: " + ", ".join(parts) + "." + + +def _hottest_summary_line(metrics: dict[str, Any]) -> str: + if not metrics: + return "" + hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {} + if not hottest: + return "" + parts: list[str] = [] + for key in ("cpu", "ram", "net", "io"): + entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {} + node = entry.get("node") + value = entry.get("value") + if node and value is not None: + value_fmt = _format_metric_value( + str(value), + percent=key in ("cpu", "ram"), + rate=key in ("net", "io"), + ) + parts.append(f"{key.upper()} {node} ({value_fmt})") + if not parts: + return "" + return "Hottest nodes: " + "; ".join(parts) + "." + + +def cluster_overview_answer( + prompt: str, + *, + inventory: list[dict[str, Any]], + snapshot: dict[str, Any] | None, +) -> str: + if not inventory and not snapshot: + return "" + q = normalize_query(prompt) + metrics = _snapshot_metrics(snapshot) + lines: list[str] = [] + + nodes_line = _nodes_summary_line(inventory, snapshot) + if nodes_line: + lines.append(nodes_line) + + if any(word in q for word in ("hardware", "architecture", "nodes", "node", "cluster", "atlas", "titan", "lab")): + hw_line = _hardware_mix_line(inventory) + if hw_line: + lines.append(hw_line) + os_line = _os_mix_line(snapshot) + if os_line: + lines.append(os_line) + + if any( + word in q + for word in ( + "interesting", + "status", + "health", + "overview", + "summary", + "tell me", + "what do you know", + "about", + "pods", + "postgres", + "connections", + "hottest", + "cpu", + "ram", + "memory", + "net", + "network", + "io", + "disk", + "busy", + "load", + "usage", + "utilization", + ) + ): + pods_line = _pods_summary_line(metrics) + if pods_line: + lines.append(pods_line) + hottest_line = _hottest_summary_line(metrics) + if hottest_line: + lines.append(hottest_line) + postgres_line = _postgres_summary_line(metrics) + if postgres_line: + lines.append(postgres_line) + + if not lines: + return "" + return "Based on the snapshot, " + "\n".join(lines) + + +def cluster_answer( + prompt: str, + *, + inventory: list[dict[str, Any]], + snapshot: dict[str, Any] | None, + workloads: list[dict[str, Any]] | None, +) -> str: + metrics_summary = snapshot_context(prompt, snapshot) + structured = structured_answer( + prompt, + inventory=inventory, + metrics_summary=metrics_summary, + snapshot=snapshot, + workloads=workloads, + ) + if structured: + return structured + + overview = cluster_overview_answer(prompt, inventory=inventory, snapshot=snapshot) + if overview: + kb_titles = kb_retrieve_titles(prompt, limit=4) if _knowledge_intent(prompt) else "" + if kb_titles: + overview = overview + "\n" + kb_titles + return _format_confidence(overview, "medium") + + kb_titles = kb_retrieve_titles(prompt, limit=4) + if kb_titles: + return _format_confidence(kb_titles, "low") + + if metrics_summary: + return _format_confidence(metrics_summary, "low") + + return "" + def _metric_tokens(entry: dict[str, Any]) -> str: parts: list[str] = [] for key in ("panel_title", "dashboard", "description"): @@ -1868,16 +2090,24 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): workloads=workloads, ) fallback = "I don't have enough data to answer that." - llm_prompt = cleaned if cluster_query: - llm_prompt = f"Atlas cluster question (use the cluster snapshot context): {cleaned}" - answer = ollama_reply( - ("http", "internal"), - llm_prompt, - context=context, - fallback=fallback, - use_history=False, - ) + answer = cluster_answer( + cleaned, + inventory=inventory, + snapshot=snapshot, + workloads=workloads, + ) + if not answer: + answer = fallback + else: + llm_prompt = cleaned + answer = ollama_reply( + ("http", "internal"), + llm_prompt, + context=context, + fallback=fallback, + use_history=False, + ) self._write_json(200, {"answer": answer}) @@ -2044,6 +2274,7 @@ def _knowledge_intent(prompt: str) -> bool: for phrase in ( "what do you know", "tell me about", + "interesting", "overview", "summary", "describe", @@ -2312,21 +2543,30 @@ def sync_loop(token: str, room_id: str): res = vm_query(promql, timeout=20) rendered = vm_render_result(res, limit=15) or "(no results)" extra = "VictoriaMetrics (PromQL result):\n" + rendered - context = (context + "\n\n" + extra).strip() if context else extra + send_msg(token, rid, extra) + continue fallback = "I don't have enough data to answer that." - llm_prompt = cleaned_body if cluster_query: - llm_prompt = f"Atlas cluster question (use the cluster snapshot context): {cleaned_body}" - reply = ollama_reply_with_thinking( - token, - rid, - hist_key, - llm_prompt, - context=context, - fallback=fallback, - use_history=cluster_query, - ) + reply = cluster_answer( + cleaned_body, + inventory=inventory, + snapshot=snapshot, + workloads=workloads, + ) + if not reply: + reply = fallback + else: + llm_prompt = cleaned_body + reply = ollama_reply_with_thinking( + token, + rid, + hist_key, + llm_prompt, + context=context, + fallback=fallback, + use_history=False, + ) send_msg(token, rid, reply) def login_with_retry():