atlasbot: ground node inventory and soften LLM failures

This commit is contained in:
Brad Stein 2026-01-26 12:36:51 -03:00
parent 53e4b4036b
commit fff00dbe95
2 changed files with 181 additions and 37 deletions

View File

@ -16,7 +16,7 @@ spec:
labels:
app: atlasbot
annotations:
checksum/atlasbot-configmap: manual-atlasbot-11
checksum/atlasbot-configmap: manual-atlasbot-12
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"

View File

@ -3,6 +3,7 @@ import json
import os
import re
import ssl
import threading
import time
from typing import Any
from urllib import error, parse, request
@ -156,6 +157,13 @@ def send_msg(token: str, room: str, text: str):
# In-memory knowledge base; repopulated by load_kb() from files under KB_DIR.
KB = {"catalog": {}, "runbooks": []}
# hostname -> list of matching records (built elsewhere — presumably from the
# catalog; verify against load_kb's full body).
_HOST_INDEX: dict[str, list[dict]] = {}
# Lower-cased workload names, for quick membership checks.
_NAME_INDEX: set[str] = set()
# Node-class label -> sorted node names, parsed from runbooks by _parse_node_classes().
_NODE_CLASS_INDEX: dict[str, list[str]] = {}
# Per-hardware-class node-name sets, derived from _NODE_CLASS_INDEX in load_kb().
_NODE_CLASS_RPI4: set[str] = set()
_NODE_CLASS_RPI5: set[str] = set()
_NODE_CLASS_AMD64: set[str] = set()
_NODE_CLASS_JETSON: set[str] = set()
_NODE_CLASS_EXTERNAL: set[str] = set()
# All classified nodes minus rpi4, rpi5, and external (see load_kb()).
_NODE_CLASS_NON_RPI: set[str] = set()
def _load_json_file(path: str) -> Any | None:
try:
@ -166,6 +174,8 @@ def _load_json_file(path: str) -> Any | None:
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX
global _NODE_CLASS_INDEX, _NODE_CLASS_RPI4, _NODE_CLASS_RPI5, _NODE_CLASS_AMD64, _NODE_CLASS_JETSON
global _NODE_CLASS_EXTERNAL, _NODE_CLASS_NON_RPI
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
@ -188,6 +198,24 @@ def load_kb():
names.add(str(w["name"]).lower())
_NAME_INDEX = names
node_classes = _parse_node_classes(runbooks)
_NODE_CLASS_INDEX = node_classes
_NODE_CLASS_RPI4 = set(node_classes.get("rpi4", []))
_NODE_CLASS_RPI5 = set(node_classes.get("rpi5", []))
_NODE_CLASS_AMD64 = set(node_classes.get("amd64", []))
_NODE_CLASS_JETSON = set(node_classes.get("jetson", []))
_NODE_CLASS_EXTERNAL = set(node_classes.get("external", []))
_NODE_CLASS_NON_RPI = set(
sorted(
(
set().union(*node_classes.values())
- _NODE_CLASS_RPI4
- _NODE_CLASS_RPI5
- _NODE_CLASS_EXTERNAL
)
)
)
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
@ -237,6 +265,12 @@ def kb_retrieve(query: str, *, limit: int = 3) -> str:
def _extract_titan_nodes(text: str) -> list[str]:
names = {n.lower() for n in TITAN_NODE_RE.findall(text or "") if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", text or "", re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(text or ""):
left, right = match.groups()
if left:
@ -245,6 +279,83 @@ def _extract_titan_nodes(text: str) -> list[str]:
names.add(f"titan-{right.lower()}")
return sorted(names)
def _parse_node_classes(runbooks: list[dict[str, Any]]) -> dict[str, list[str]]:
classes: dict[str, list[str]] = {}
for doc in runbooks:
if not isinstance(doc, dict):
continue
body = str(doc.get("body") or "")
for line in body.splitlines():
stripped = line.strip()
if "titan-" not in stripped.lower():
continue
label = ""
nodes: list[str] = []
if stripped.startswith("-") and ":" in stripped:
label, rest = stripped.lstrip("-").split(":", 1)
nodes = _extract_titan_nodes(rest)
label = label.strip().lower()
else:
nodes = _extract_titan_nodes(stripped)
if not nodes:
continue
if "jetson" in stripped.lower():
classes.setdefault("jetson", nodes)
if "amd64" in stripped.lower() or "x86" in stripped.lower():
classes.setdefault("amd64", nodes)
if "rpi4" in stripped.lower():
classes.setdefault("rpi4", nodes)
if "rpi5" in stripped.lower():
classes.setdefault("rpi5", nodes)
if "external" in stripped.lower() or "non-cluster" in stripped.lower():
classes.setdefault("external", nodes)
if label:
classes.setdefault(label, nodes)
return {k: sorted(set(v)) for k, v in classes.items()}
def node_inventory_answer(cluster_name: str, query: str) -> str:
    """Answer a node-inventory question directly from the KB-derived class sets.

    Parameters:
        cluster_name: display name used in the reply (e.g. "Atlas").
        query: the user's message text (may be None/empty).

    Returns a one-sentence answer, or "" when the query does not match a
    populated class — callers fall through to the LLM path on "".
    """
    q = (query or "").lower()
    if "jetson" in q and _NODE_CLASS_JETSON:
        names = sorted(_NODE_CLASS_JETSON)
        return f"{cluster_name} has {len(names)} Jetson nodes: {', '.join(names)}."
    if "non-raspberry" in q or "non raspberry" in q or "not raspberry" in q:
        names = sorted(_NODE_CLASS_NON_RPI)
        if names:
            # Fix: garbled "nonRaspberry" (lost hyphen) -> "non-Raspberry".
            return f"{cluster_name} non-Raspberry Pi nodes: {', '.join(names)}."
        # When the non-RPi set is empty, control falls through to the generic
        # "raspberry" branch below, which this query also matches — looks
        # intentional as a best-effort answer, but confirm.
    if "raspberry" in q or "rpi" in q:
        if "rpi4" in q and _NODE_CLASS_RPI4:
            names = sorted(_NODE_CLASS_RPI4)
            return f"{cluster_name} rpi4 nodes: {', '.join(names)}."
        if "rpi5" in q and _NODE_CLASS_RPI5:
            names = sorted(_NODE_CLASS_RPI5)
            return f"{cluster_name} rpi5 nodes: {', '.join(names)}."
        names = sorted(_NODE_CLASS_RPI4 | _NODE_CLASS_RPI5)
        if names:
            return f"{cluster_name} Raspberry Pi nodes: {', '.join(names)}."
    if ("amd64" in q or "x86" in q) and _NODE_CLASS_AMD64:
        names = sorted(_NODE_CLASS_AMD64)
        return f"{cluster_name} amd64 nodes: {', '.join(names)}."
    return ""
def node_inventory_context(query: str) -> str:
    """Render the KB node-class sets as grounded context lines for the prompt.

    Returns "" when the query is not node-related or no class set is populated.
    """
    lowered = (query or "").lower()
    triggers = ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "x86", "cluster")
    if not any(word in lowered for word in triggers):
        return ""
    # Order matters only for display; each populated class gets one bullet.
    sections = (
        ("rpi5", _NODE_CLASS_RPI5),
        ("rpi4", _NODE_CLASS_RPI4),
        ("jetson", _NODE_CLASS_JETSON),
        ("amd64", _NODE_CLASS_AMD64),
        ("external", _NODE_CLASS_EXTERNAL),
    )
    bullets = [
        f"- {label}: {', '.join(sorted(nodes))}"
        for label, nodes in sections
        if nodes
    ]
    if not bullets:
        return ""
    return "\n".join(["Node inventory (KB):"] + bullets)
def jetson_nodes_from_kb() -> list[str]:
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
@ -627,6 +738,10 @@ def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, st
if endpoints:
parts.append(endpoints)
inventory = node_inventory_context(prompt)
if inventory:
parts.append(inventory)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
@ -656,35 +771,58 @@ def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, st
return "\n\n".join([p for p in parts if p]).strip()
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
    """Send one prompt (with grounded context and recent history) to the model.

    Builds a flat transcript, POSTs it to OLLAMA_URL, normalizes the reply,
    appends it to the per-room history, and returns it. Raises on any network
    or decode failure — callers (ollama_reply) handle fallback messaging.
    Note: the diff left a stale duplicate of the request/parse tail (the old
    ollama_reply body); this is the single de-duplicated implementation.
    """
    system = (
        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
        "Be helpful, direct, and concise. "
        "Prefer answering with exact repo paths and Kubernetes resource names. "
        "Never include or request secret values. "
        "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
        "If the answer is not grounded in the provided context or tool data, say you do not know."
    )
    transcript_parts = [system]
    if context:
        # Cap grounded context so the transcript stays within model limits.
        transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
    transcript_parts.extend(history[hist_key][-24:])
    transcript_parts.append(f"User: {prompt}")
    transcript = "\n".join(transcript_parts)
    payload = {"model": MODEL, "message": transcript}
    headers = {"Content-Type": "application/json"}
    if API_KEY:
        headers["x-api-key"] = API_KEY
    r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
    with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
        data = json.loads(resp.read().decode())
    # Different backends name the reply field differently; fall back to raw data.
    raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
    reply = _normalize_reply(raw_reply) or "I'm here to help."
    history[hist_key].append(f"Atlas: {reply}")
    return reply
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
    """Call the model; on any failure return a grounded fallback instead of raising.

    Parameters:
        fallback: a pre-computed grounded answer (e.g. node inventory); when
            non-empty it is recorded in history and returned on failure.
    """
    try:
        return _ollama_call(hist_key, prompt, context=context)
    # Broad catch is deliberate: this is the chat boundary and must never raise.
    except Exception:
        # Fix: a stale `return "Im here..."` line left here by the diff made the
        # fallback handling below unreachable; it is removed.
        if fallback:
            history[hist_key].append(f"Atlas: {fallback}")
            return fallback
        return "Model backend is busy. Try again in a moment."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
    """Run ollama_reply in a background thread, posting "Thinking…" if it is slow.

    Sends the notice only when the model takes longer than 2 seconds, then
    waits for completion. Returns the model reply, else the fallback, else a
    generic busy message.
    """
    result: dict[str, str] = {"reply": ""}
    done = threading.Event()

    def worker():
        # Fix: set the event in a finally so a raising ollama_reply can never
        # leave the caller blocked forever on the unbounded done.wait() below.
        try:
            result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
        finally:
            done.set()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    if not done.wait(2.0):
        send_msg(token, room, "Thinking…")
        done.wait()
    thread.join(timeout=1)
    return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str):
since = None
@ -747,6 +885,10 @@ def sync_loop(token: str, room_id: str):
continue
send_msg(token, rid, summary)
continue
inventory_answer = node_inventory_answer("Atlas", lower_body)
if inventory_answer:
send_msg(token, rid, inventory_answer)
continue
if "node" in lower_body and any(word in lower_body for word in ("arm64", "aarch64", "amd64", "x86_64", "x86-64")):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
arch = "arm64" if "arm64" in lower_body or "aarch64" in lower_body else "amd64"
@ -760,14 +902,6 @@ def sync_loop(token: str, room_id: str):
continue
send_msg(token, rid, summary)
continue
if "jetson" in lower_body:
if any(word in lower_body for word in ("cluster", "atlas", "titan", "node", "nodes")):
summary = jetson_nodes_summary("Atlas")
if summary:
send_msg(token, rid, summary)
else:
send_msg(token, rid, "Jetson inventory is not available in the knowledge base yet.")
continue
if re.search(r"\bnode names?\b|\bnodes? named\b|\bnaming\b", lower_body):
if any(word in lower_body for word in ("cluster", "atlas", "titan")):
names_summary = nodes_names_summary("Atlas")
@ -803,7 +937,17 @@ def sync_loop(token: str, room_id: str):
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
reply = ollama_reply(hist_key, body, context=context)
fallback = ""
if "node" in lower_body or "cluster" in lower_body:
fallback = node_inventory_answer("Atlas", lower_body)
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
body,
context=context,
fallback=fallback,
)
send_msg(token, rid, reply)
def login_with_retry():