diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index 7eb6dc7..e070ead 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -184,13 +184,6 @@ KB = {"catalog": {}, "runbooks": []} _HOST_INDEX: dict[str, list[dict]] = {} _NAME_INDEX: set[str] = set() _METRIC_INDEX: list[dict[str, Any]] = [] -_NODE_CLASS_INDEX: dict[str, list[str]] = {} -_NODE_CLASS_RPI4: set[str] = set() -_NODE_CLASS_RPI5: set[str] = set() -_NODE_CLASS_AMD64: set[str] = set() -_NODE_CLASS_JETSON: set[str] = set() -_NODE_CLASS_EXTERNAL: set[str] = set() -_NODE_CLASS_NON_RPI: set[str] = set() NODE_REGEX = re.compile(r'node=~"([^"]+)"') def _load_json_file(path: str) -> Any | None: @@ -202,8 +195,6 @@ def _load_json_file(path: str) -> Any | None: def load_kb(): global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX - global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON - global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI if not KB_DIR: return catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {} @@ -228,24 +219,6 @@ def load_kb(): _NAME_INDEX = names _METRIC_INDEX = metrics if isinstance(metrics, list) else [] - node_classes = _parse_node_classes(runbooks) - _NODE_CLASS_INDEX = node_classes - _NODE_CLASS_RPI4 = set(node_classes.get("rpi4", [])) - _NODE_CLASS_RPI5 = set(node_classes.get("rpi5", [])) - _NODE_CLASS_AMD64 = set(node_classes.get("amd64", [])) - _NODE_CLASS_JETSON = set(node_classes.get("jetson", [])) - _NODE_CLASS_EXTERNAL = set(node_classes.get("external", [])) - _NODE_CLASS_NON_RPI = set( - sorted( - ( - set().union(*node_classes.values()) - - _NODE_CLASS_RPI4 - - _NODE_CLASS_RPI5 - - _NODE_CLASS_EXTERNAL - ) - ) - ) - def kb_retrieve(query: str, *, limit: int = 3) -> str: q = (query or "").strip() if not q or not KB.get("runbooks"): @@ -309,81 +282,92 @@ def _extract_titan_nodes(text: str) -> list[str]: names.add(f"titan-{right.lower()}") return sorted(names) -def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]: - classes: dict[str, list[str]] = {} - for doc in runbooks: - if not isinstance(doc, dict): - continue - body = str(doc.get("body") or "") - for line in body.splitlines(): - stripped = line.strip() - if "titan-" not in stripped.lower(): - continue - label = "" - nodes: list[str] = [] - if stripped.startswith("-") and ":" in stripped: - label, rest = stripped.lstrip("-").split(":", 1) - nodes = _extract_titan_nodes(rest) - label = label.strip().lower() - else: - nodes = _extract_titan_nodes(stripped) - if not nodes: - continue - if "jetson" in stripped.lower(): - classes.setdefault("jetson", nodes) - if "amd64" in stripped.lower() or "x86" in stripped.lower(): - classes.setdefault("amd64", nodes) - if "rpi4" in stripped.lower(): - classes.setdefault("rpi4", nodes) - if "rpi5" in stripped.lower(): - classes.setdefault("rpi5", nodes) - if "external" in stripped.lower() or "non-cluster" in stripped.lower(): - classes.setdefault("external", nodes) - if label: - classes.setdefault(label, nodes) - return {k: sorted(set(v)) for k, v in classes.items()} +def _node_roles(labels: dict[str, Any]) -> list[str]: + roles: list[str] = [] + for key in labels.keys(): + if key.startswith("node-role.kubernetes.io/"): + role = key.split("/", 1)[-1] + if role: + roles.append(role) + return sorted(set(roles)) -def node_inventory_answer(cluster_name: str, query: str) -> str: - q = (query or "").lower() - if "jetson" in q and _NODE_CLASS_JETSON: - names = sorted(_NODE_CLASS_JETSON) - return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}." - if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q: - names = sorted(_NODE_CLASS_NON_RPI) - if names: - return f"{cluster_name} non‑Raspberry Pi nodes: {', '.join(names)}." - if "raspberry" in q or "rpi" in q: - if "rpi4" in q and _NODE_CLASS_RPI4: - names = sorted(_NODE_CLASS_RPI4) - return f"{cluster_name} rpi4 nodes: {', '.join(names)}." - if "rpi5" in q and _NODE_CLASS_RPI5: - names = sorted(_NODE_CLASS_RPI5) - return f"{cluster_name} rpi5 nodes: {', '.join(names)}." - names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5) - if names: - return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}." - if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64: - names = sorted(_NODE_CLASS_AMD64) - return f"{cluster_name} amd64 nodes: {', '.join(names)}." - return "" +def _hardware_class(labels: dict[str, Any]) -> str: + if str(labels.get("jetson") or "").lower() == "true": + return "jetson" + hardware = (labels.get("hardware") or "").strip().lower() + if hardware in ("rpi4", "rpi5"): + return hardware + arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "" + if arch == "amd64": + return "amd64" + if arch == "arm64": + return "arm64-unknown" + return "unknown" + +def node_inventory_live() -> list[dict[str, Any]]: + try: + data = k8s_get("/api/v1/nodes?limit=500") + except Exception: + return [] + items = data.get("items") or [] + inventory: list[dict[str, Any]] = [] + for node in items if isinstance(items, list) else []: + meta = node.get("metadata") or {} + labels = meta.get("labels") or {} + name = meta.get("name") or "" + if not name: + continue + inventory.append( + { + "name": name, + "arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "", + "hardware": _hardware_class(labels), + "roles": _node_roles(labels), + "ready": _node_ready_status(node), + } + ) + return sorted(inventory, key=lambda item: item["name"]) + +def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]: + grouped: dict[str, list[str]] = collections.defaultdict(list) + for node in inventory: + grouped[node.get("hardware") or "unknown"].append(node["name"]) + return {k: sorted(v) for k, v in grouped.items()} def node_inventory_context(query: str) -> str: q = (query or "").lower() - if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")): + if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")): return "" - lines: list[str] = ["Node inventory (KB):"] - if _NODE_CLASS_RPI5: - lines.append(f"- rpi5: {', '.join(sorted(_NODE_CLASS_RPI5))}") - if _NODE_CLASS_RPI4: - lines.append(f"- rpi4: {', '.join(sorted(_NODE_CLASS_RPI4))}") - if _NODE_CLASS_JETSON: - lines.append(f"- jetson: {', '.join(sorted(_NODE_CLASS_JETSON))}") - if _NODE_CLASS_AMD64: - lines.append(f"- amd64: {', '.join(sorted(_NODE_CLASS_AMD64))}") - if _NODE_CLASS_EXTERNAL: - lines.append(f"- external: {', '.join(sorted(_NODE_CLASS_EXTERNAL))}") - if len(lines) == 1: + inventory = node_inventory_live() + if not inventory: return "" + groups = _group_nodes(inventory) + total = len(inventory) + ready = sum(1 for node in inventory if node.get("ready") is True) + not_ready = sum(1 for node in inventory if node.get("ready") is False) + lines: list[str] = [ + "Node inventory (live):", + f"- total: {total}, ready: {ready}, not ready: {not_ready}", + ] + for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"): + if key in groups: + lines.append(f"- {key}: {', '.join(groups[key])}") + non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", []))) + if non_rpi: + lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}") + unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", []) + if unknowns: + lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.") + expected_workers = expected_worker_nodes_from_metrics() + if expected_workers: + ready_workers, not_ready_workers = worker_nodes_status() + missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers)) + lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}") + lines.append(f"- workers_ready: {', '.join(ready_workers)}") + if not_ready_workers: + lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}") + if missing: + lines.append(f"- workers_missing (derived): {', '.join(missing)}") return "\n".join(lines) def _metric_tokens(entry: dict[str, Any]) -> str: @@ -730,12 +714,6 @@ def worker_nodes_status() -> tuple[list[str], list[str]]: not_ready_nodes.append(name) return (sorted(ready_nodes), sorted(not_ready_nodes)) -def expected_nodes_from_kb() -> set[str]: - if not _NODE_CLASS_INDEX: - return set() - nodes = set().union(*_NODE_CLASS_INDEX.values()) - return {n for n in nodes if n and n not in _NODE_CLASS_EXTERNAL} - def expected_worker_nodes_from_metrics() -> list[str]: for entry in _METRIC_INDEX: panel = (entry.get("panel_title") or "").lower() @@ -753,42 +731,13 @@ def expected_worker_nodes_from_metrics() -> list[str]: return sorted(nodes) return [] -def missing_nodes_answer(cluster_name: str) -> str: - expected_workers = expected_worker_nodes_from_metrics() - if expected_workers: - ready_nodes, not_ready_nodes = worker_nodes_status() - current_workers = set(ready_nodes + not_ready_nodes) - missing = sorted(set(expected_workers) - current_workers) - if not missing: - return f"{cluster_name}: no missing worker nodes versus Grafana inventory." - return f"{cluster_name} missing worker nodes versus Grafana inventory: {', '.join(missing)}." - - expected = expected_nodes_from_kb() - if not expected: +def _context_fallback(context: str) -> str: + if not context: return "" - current = set() - try: - data = k8s_get("/api/v1/nodes?limit=500") - items = data.get("items") or [] - for node in items if isinstance(items, list) else []: - name = (node.get("metadata") or {}).get("name") or "" - if name: - current.add(name) - except Exception: - return "" - missing = sorted(expected - current) - if not missing: - return f"{cluster_name}: no missing nodes versus KB inventory." - return f"{cluster_name} missing nodes versus KB inventory: {', '.join(missing)}." - -def _should_short_circuit(prompt: str, fallback: str) -> bool: - if not fallback: - return False - lower = (prompt or "").lower() - for word in ("why", "explain", "architecture", "breakdown", "root cause", "plan"): - if word in lower: - return False - return True + trimmed = context.strip() + if len(trimmed) > MAX_TOOL_CHARS: + trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..." + return "I couldn’t reach the model backend. Here is the data I found:\n" + trimmed def vm_top_restarts(hours: int = 1) -> str: q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))" @@ -1112,92 +1061,6 @@ def sync_loop(token: str, room_id: str): continue lower_body = body.lower() - if re.search(r"\bhow many nodes\b|\bnode count\b|\bnumber of nodes\b", lower_body): - if any(word in lower_body for word in ("cluster", "atlas", "titan")): - summary = nodes_summary("Atlas") - if not summary: - send_msg(token, rid, "I couldn’t reach the cluster API to count nodes. Try again in a moment.") - continue - send_msg(token, rid, summary) - continue - if "worker" in lower_body and "node" in lower_body: - ready_nodes, not_ready_nodes = worker_nodes_status() - total = len(ready_nodes) + len(not_ready_nodes) - if total: - missing_hint = missing_nodes_answer("Atlas") - expected_workers = expected_worker_nodes_from_metrics() - expected_total = len(expected_workers) if expected_workers else 0 - if any(word in lower_body for word in ("ready", "not ready", "unready")): - if not_ready_nodes: - send_msg( - token, - rid, - f"Worker nodes not Ready: {', '.join(not_ready_nodes)}.", - ) - else: - msg = f"All {len(ready_nodes)} worker nodes are Ready." - if expected_total and len(ready_nodes) != expected_total: - missing = sorted(set(expected_workers) - set(ready_nodes)) - if missing: - msg += f" Missing: {', '.join(missing)}." - elif missing_hint and "no missing" not in missing_hint: - msg += f" {missing_hint}" - send_msg(token, rid, msg) - continue - if any(word in lower_body for word in ("how many", "should")): - msg = ( - f"Atlas has {total} worker nodes; " - f"{len(ready_nodes)} Ready, {len(not_ready_nodes)} NotReady." - ) - if expected_total: - msg += f" Grafana inventory expects {expected_total} workers." - missing = sorted(set(expected_workers) - set(ready_nodes)) - if missing: - msg += f" Missing: {', '.join(missing)}." - elif missing_hint and "no missing" not in missing_hint: - msg += f" {missing_hint}" - elif "should" in lower_body: - msg += " I don’t have an expected worker inventory in the KB; this is the current cluster state." - send_msg(token, rid, msg) - continue - if "missing" in lower_body and "node" in lower_body: - missing = missing_nodes_answer("Atlas") - if missing: - send_msg(token, rid, missing) - continue - inventory_answer = node_inventory_answer("Atlas", lower_body) - if inventory_answer: - send_msg(token, rid, inventory_answer) - continue - if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")): - if any(word in lower_body for word in ("cluster", "atlas", "titan")): - arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64" - summary = nodes_arch_summary("Atlas", arch) - if not summary: - send_msg( - token, - rid, - "I couldn’t reach the cluster API to count nodes by architecture. Try again in a moment.", - ) - continue - send_msg(token, rid, summary) - continue - if re.search(r"\bnode names?\b|\bnodes?\b.*\bnamed\b|\bnaming\b", lower_body): - if any(word in lower_body for word in ("cluster", "atlas", "titan")): - names_summary = nodes_names_summary("Atlas") - if not names_summary: - send_msg(token, rid, "I couldn’t reach the cluster API to list node names. Try again in a moment.") - continue - send_msg(token, rid, names_summary) - continue - if re.search(r"\bwhich nodes are ready\b|\bnodes ready\b", lower_body): - ready_nodes, not_ready_nodes = worker_nodes_status() - if ready_nodes: - msg = f"Ready worker nodes ({len(ready_nodes)}): {', '.join(ready_nodes)}." - if not_ready_nodes: - msg += f" Not Ready: {', '.join(not_ready_nodes)}." - send_msg(token, rid, msg) - continue # Only do live cluster introspection in DMs; metrics can be answered when mentioned. allow_tools = is_dm @@ -1230,14 +1093,9 @@ def sync_loop(token: str, room_id: str): if metrics_context: context = (context + "\n\n" + metrics_context).strip() if context else metrics_context - fallback = "" - if "node" in lower_body or "cluster" in lower_body: - fallback = node_inventory_answer("Atlas", lower_body) - if metrics_fallback and not fallback: - fallback = metrics_fallback - if _should_short_circuit(body, fallback): - send_msg(token, rid, fallback) - continue + fallback = metrics_fallback or "" + if not fallback and context: + fallback = _context_fallback(context) reply = ollama_reply_with_thinking( token, rid,