atlasbot: simplify cluster gating and context

Brad Stein 2026-01-27 14:54:09 -03:00
parent cb7141dfb6
commit 92f4137e9c
2 changed files with 133 additions and 66 deletions


@@ -16,7 +16,7 @@ spec:
labels:
app: atlasbot
annotations:
checksum/atlasbot-configmap: manual-atlasbot-41
checksum/atlasbot-configmap: manual-atlasbot-42
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"


@@ -65,6 +65,16 @@ STOPWORDS = {
"help",
"atlas",
"othrys",
"system",
"systems",
"service",
"services",
"app",
"apps",
"platform",
"software",
"tool",
"tools",
}
METRIC_HINT_WORDS = {
@@ -129,6 +139,8 @@ CLUSTER_HINT_WORDS = {
"kubernetes",
"node",
"nodes",
"worker",
"workers",
"pod",
"pods",
"namespace",
@@ -162,6 +174,11 @@ CLUSTER_HINT_WORDS = {
"database",
"db",
"atlasbot",
"jetson",
"rpi",
"raspberry",
"amd64",
"arm64",
}
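Both word sets feed the cluster gate further down: STOPWORDS keeps generic filler ("system", "tools") from counting as a signal, while CLUSTER_HINT_WORDS, now extended with hardware names, marks a prompt as cluster-related. A minimal sketch of that gating, with a hypothetical _prompt_tokens standing in for the module's real _tokens helper:

import re

def _prompt_tokens(text: str) -> set[str]:
    # Hypothetical stand-in: lowercase word tokens, minus generic filler.
    words = set(re.findall(r"[a-z0-9]+", (text or "").lower()))
    return words - STOPWORDS

def _looks_cluster_related(prompt: str) -> bool:
    # A prompt counts as cluster-related when any surviving token is a
    # known cluster hint (k8s nouns, node hardware, service names).
    return bool(_prompt_tokens(prompt) & CLUSTER_HINT_WORDS)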
_OLLAMA_LOCK = threading.Lock()
@@ -1840,18 +1857,6 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
metrics_summary = snapshot_context(cleaned, snapshot) if cluster_query else ""
if cluster_query:
structured = structured_answer(
cleaned,
inventory=inventory,
metrics_summary=metrics_summary,
snapshot=snapshot,
workloads=workloads,
)
if structured:
self._write_json(200, {"answer": structured})
return
context = ""
if cluster_query:
context = build_context(
@@ -1862,11 +1867,14 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
snapshot=snapshot,
workloads=workloads,
)
metrics_context, _metrics_fallback = metrics_query_context(cleaned, allow_tools=True)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = "I don't have enough data to answer that."
answer = ollama_reply(("http", "internal"), cleaned, context=context, fallback=fallback)
answer = ollama_reply(
("http", "internal"),
cleaned,
context=context,
fallback=fallback,
use_history=False,
)
self._write_json(200, {"answer": answer})
@@ -1897,6 +1905,15 @@ def build_context(
if facts:
parts.append(facts)
snapshot_json = snapshot_compact_context(
prompt,
snapshot,
inventory=inventory,
workloads=workloads,
)
if snapshot_json:
parts.append(snapshot_json)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
@@ -1925,15 +1942,6 @@ def build_context(
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
p_l = (prompt or "").lower()
if any(w in p_l for w in METRIC_HINT_WORDS):
restarts = vm_top_restarts(1)
if restarts:
parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
snap = vm_cluster_snapshot()
if snap:
parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
return "\n\n".join([p for p in parts if p]).strip()
@@ -1963,6 +1971,68 @@ def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
parts.append(f"Snapshot: workload={match}.")
return "\n".join(parts).strip()
def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
output: list[dict[str, Any]] = []
for node in details:
if not isinstance(node, dict):
continue
name = node.get("name")
if not name:
continue
output.append(
{
"name": name,
"ready": node.get("ready"),
"hardware": node.get("hardware"),
"arch": node.get("arch"),
"roles": node.get("roles"),
"is_worker": node.get("is_worker"),
"os": node.get("os"),
"kernel": node.get("kernel"),
"kubelet": node.get("kubelet"),
"container_runtime": node.get("container_runtime"),
}
)
return output
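_compact_nodes_detail is deliberately defensive: entries that are not dicts, or that lack a name, are skipped rather than raised on, and missing fields come back as None via dict.get. A quick illustration with a made-up snapshot (node name invented):

nodes = _compact_nodes_detail({
    "nodes_detail": [
        {"name": "worker-a", "ready": True, "arch": "arm64", "hardware": "jetson"},
        "garbage",         # not a dict: skipped
        {"ready": False},  # no name: skipped
    ]
})
# nodes is a one-element list for worker-a, holding only the
# whitelisted keys; absent ones (roles, os, kernel, ...) are None.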
def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {}
return {
"pods_running": metrics.get("pods_running"),
"pods_pending": metrics.get("pods_pending"),
"pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"),
"postgres_connections": metrics.get("postgres_connections"),
"hottest_nodes": metrics.get("hottest_nodes"),
"node_usage": metrics.get("node_usage"),
"top_restarts_1h": metrics.get("top_restarts_1h"),
}
def snapshot_compact_context(
prompt: str,
snapshot: dict[str, Any] | None,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
if not snapshot:
return ""
compact = {
"collected_at": snapshot.get("collected_at"),
"nodes_summary": snapshot.get("nodes_summary"),
"expected_workers": expected_worker_nodes_from_metrics(),
"nodes_detail": _compact_nodes_detail(snapshot),
"workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [],
"metrics": _compact_metrics(snapshot),
"flux": snapshot.get("flux"),
"errors": snapshot.get("errors"),
}
text = json.dumps(compact, ensure_ascii=False)
if len(text) > MAX_FACTS_CHARS:
text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..."
return "Cluster snapshot (JSON):\n" + text
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
@@ -1998,16 +2068,8 @@ def _is_cluster_query(
if host.endswith("bstein.dev"):
return True
tokens = set(_tokens(q))
if workloads:
for entry in workloads:
if not isinstance(entry, dict):
continue
if tokens & _workload_tokens(entry):
return True
if inventory:
names = {node.get("name") for node in inventory if isinstance(node, dict)}
if tokens & {n for n in names if n}:
return True
if _NAME_INDEX and tokens & _NAME_INDEX:
return True
return False
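The gate now relies on a precomputed _NAME_INDEX instead of scanning workloads and inventory on every request. How that index is populated is outside this hunk; a plausible sketch, with the refresh helper named hypothetically:

_NAME_INDEX: set[str] = set()

def _refresh_name_index(inventory, workloads) -> None:
    # Hypothetical: rebuild the token set once per snapshot refresh so
    # _is_cluster_query only pays for a set intersection per question.
    names: set[str] = set()
    for node in inventory or []:
        if isinstance(node, dict) and node.get("name"):
            names.add(str(node["name"]).lower())
    for entry in workloads or []:
        if isinstance(entry, dict):
            names |= _workload_tokens(entry)
    _NAME_INDEX.clear()
    _NAME_INDEX.update(names)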
@@ -2037,7 +2099,7 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
summary = "\n".join(parts).strip()
return _format_confidence(summary, "medium") if summary else ""
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
@@ -2062,7 +2124,8 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system_content += "\n\nContext (grounded):\n" + context[:MAX_CONTEXT_CHARS]
messages: list[dict[str, str]] = [{"role": "system", "content": system_content}]
messages.extend(_history_to_messages(history[hist_key][-24:]))
if use_history:
messages.extend(_history_to_messages(history[hist_key][-24:]))
messages.append({"role": "user", "content": prompt})
payload = {"model": MODEL, "messages": messages, "stream": False}
@@ -2082,31 +2145,55 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
else:
raw_reply = data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
if use_history:
history[hist_key].append(f"Atlas: {reply}")
return reply
finally:
if lock:
lock.release()
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
def ollama_reply(
hist_key,
prompt: str,
*,
context: str,
fallback: str = "",
use_history: bool = True,
) -> str:
last_error = None
for attempt in range(max(1, OLLAMA_RETRIES + 1)):
try:
return _ollama_call(hist_key, prompt, context=context)
return _ollama_call(hist_key, prompt, context=context, use_history=use_history)
except Exception as exc: # noqa: BLE001
last_error = exc
time.sleep(min(4, 2 ** attempt))
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
if use_history:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "I don't have enough data to answer that."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
def ollama_reply_with_thinking(
token: str,
room: str,
hist_key,
prompt: str,
*,
context: str,
fallback: str,
use_history: bool = True,
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
result["reply"] = ollama_reply(
hist_key,
prompt,
context=context,
fallback=fallback,
use_history=use_history,
)
done.set()
thread = threading.Thread(target=worker, daemon=True)
@@ -2182,9 +2269,8 @@ def sync_loop(token: str, room_id: str):
cleaned_body = _strip_bot_mention(body)
lower_body = cleaned_body.lower()
# Only do live cluster introspection in DMs; metrics can be answered when mentioned.
# Only do live cluster introspection in DMs.
allow_tools = is_dm
allow_metrics = is_dm or mentioned
promql = ""
if allow_tools:
@@ -2209,21 +2295,6 @@ def sync_loop(token: str, room_id: str):
inventory = _snapshot_inventory(snapshot)
workloads = _snapshot_workloads(snapshot)
cluster_query = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
metrics_summary = snapshot_context(cleaned_body, snapshot) if cluster_query else ""
structured = ""
if cluster_query:
structured = structured_answer(
cleaned_body,
inventory=inventory,
metrics_summary=metrics_summary,
snapshot=snapshot,
workloads=workloads,
)
if structured:
history[hist_key].append(f"Atlas: {structured}")
history[hist_key] = history[hist_key][-80:]
send_msg(token, rid, structured)
continue
context = ""
if cluster_query:
context = build_context(
@@ -2239,11 +2310,6 @@ def sync_loop(token: str, room_id: str):
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
if cluster_query:
metrics_context, _metrics_fallback = metrics_query_context(cleaned_body, allow_tools=allow_metrics)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = "I don't have enough data to answer that."
reply = ollama_reply_with_thinking(
@@ -2253,6 +2319,7 @@ def sync_loop(token: str, room_id: str):
cleaned_body,
context=context,
fallback=fallback,
use_history=cluster_query,
)
send_msg(token, rid, reply)