atlasbot: use cluster snapshot + model update

This commit is contained in:
Brad Stein 2026-01-27 05:41:58 -03:00
parent b1aad04f3e
commit 89935a579a
3 changed files with 334 additions and 44 deletions

View File

@ -20,7 +20,7 @@ spec:
labels:
app: ollama
annotations:
ai.bstein.dev/model: qwen2.5-coder:7b-instruct-q4_0
ai.bstein.dev/model: qwen2.5:7b-instruct-q4_0
ai.bstein.dev/gpu: GPU pool (titan-22/24)
ai.bstein.dev/restartedAt: "2026-01-26T12:00:00Z"
spec:
@ -52,7 +52,7 @@ spec:
- name: OLLAMA_MODELS
value: /root/.ollama
- name: OLLAMA_MODEL
value: qwen2.5-coder:7b-instruct-q4_0
value: qwen2.5:7b-instruct-q4_0
command:
- /bin/sh
- -c

View File

@ -82,11 +82,13 @@ spec:
- name: OLLAMA_URL
value: http://chat-ai-gateway.bstein-dev-home.svc.cluster.local/
- name: OLLAMA_MODEL
value: qwen2.5-coder:7b-instruct-q4_0
value: qwen2.5:7b-instruct-q4_0
- name: OLLAMA_TIMEOUT_SEC
value: "480"
value: "600"
- name: ATLASBOT_THINKING_INTERVAL_SEC
value: "120"
- name: ATLASBOT_SNAPSHOT_TTL_SEC
value: "30"
- name: ATLASBOT_HTTP_PORT
value: "8090"
ports:

View File

@ -21,6 +21,7 @@ API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
@ -523,7 +524,7 @@ def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool:
hw = node.get("hardware") or ""
arch = node.get("arch") or ""
for f in filters:
if f == "rpi" and hw in ("rpi4", "rpi5"):
if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"):
return True
if f == "arm64" and arch == "arm64":
return True
@ -546,7 +547,7 @@ def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5"):
if hardware in ("rpi4", "rpi5", "rpi"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
@ -580,6 +581,14 @@ def node_inventory_live() -> list[dict[str, Any]]:
)
return sorted(inventory, key=lambda item: item["name"])
def node_inventory() -> list[dict[str, Any]]:
    """Return the node inventory, preferring the cached cluster snapshot.

    Falls back to a live Kubernetes query only when the snapshot yields
    no nodes.
    """
    snap = _snapshot_state()
    from_snapshot = _snapshot_inventory(snap)
    return from_snapshot if from_snapshot else node_inventory_live()
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
@ -591,7 +600,7 @@ def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None =
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
return ""
if inventory is None:
inventory = node_inventory_live()
inventory = node_inventory()
if not inventory:
return ""
groups = _group_nodes(inventory)
@ -626,7 +635,7 @@ def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None =
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
    """Fetch the node inventory only when the prompt looks node-related."""
    node_words = (
        "node", "nodes", "raspberry", "rpi", "jetson",
        "amd64", "hardware", "cluster", "worker",
    )
    q = normalize_query(prompt)
    if any(word in q for word in node_words):
        return node_inventory()
    return []
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
@ -656,11 +665,177 @@ def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
"expected_missing": sorted(expected_missing),
}
def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_summary: str) -> str:
def _workload_tokens(entry: dict[str, Any]) -> set[str]:
    """Collect search tokens from a workload entry's name and namespace."""
    collected: set[str] = set()
    for field in ("workload", "namespace"):
        raw = entry.get(field)
        if isinstance(raw, str) and raw:
            collected |= set(_tokens(raw))
    return collected
def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Pick the workload whose tokens best overlap the prompt.

    Ties keep the earliest entry; returns None when nothing overlaps or
    the prompt produces no tokens.
    """
    query_tokens = set(_tokens(prompt))
    if not query_tokens:
        return None
    best: dict[str, Any] | None = None
    best_score = 0
    for candidate in workloads:
        if not isinstance(candidate, dict):
            continue
        overlap = len(_workload_tokens(candidate) & query_tokens)
        # Strictly-greater keeps the first entry on equal scores, matching
        # a stable descending sort.
        if overlap > best_score:
            best_score = overlap
            best = candidate
    return best
def _format_confidence(answer: str, confidence: str) -> str:
if not answer:
return ""
return f"{answer}\nConfidence: {confidence}."
def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str:
    """Answer "where does X run" style questions from snapshot workloads.

    Returns "" unless the prompt asks about placement and a workload with
    node data matches it.
    """
    location_words = ("where", "which", "node", "run", "running", "host", "located")
    q = normalize_query(prompt)
    if not any(word in q for word in location_words):
        return ""
    entry = _select_workload(prompt, workloads)
    if entry is None:
        return ""
    workload = entry.get("workload") or ""
    namespace = entry.get("namespace") or ""
    raw_nodes = entry.get("nodes")
    nodes = raw_nodes if isinstance(raw_nodes, dict) else {}
    primary = entry.get("primary_node") or ""
    if not workload or not nodes:
        return ""
    pieces: list[str] = []
    if primary:
        pieces.append(f"{primary} (primary)")
    # Highest pod count first, name as tie-breaker; primary already listed.
    for name, count in sorted(nodes.items(), key=lambda kv: (-kv[1], kv[0])):
        if name == primary:
            continue
        plural = "s" if count != 1 else ""
        pieces.append(f"{name} ({count} pod{plural})")
    node_text = ", ".join(pieces) if pieces else primary
    answer = f"{workload} runs in {namespace}. Nodes: {node_text}."
    return _format_confidence(answer, "medium")
def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not snapshot:
return {}
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _node_usage_top(
usage: list[dict[str, Any]],
*,
allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
best_node = ""
best_val = None
for item in usage if isinstance(usage, list) else []:
if not isinstance(item, dict):
continue
node = item.get("node") or ""
if allowed_nodes and node not in allowed_nodes:
continue
value = item.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best_val is None or numeric > best_val:
best_val = numeric
best_node = node
if best_node and best_val is not None:
return best_node, best_val
return None
def snapshot_metric_answer(
    prompt: str,
    *,
    snapshot: dict[str, Any] | None,
    inventory: list[dict[str, Any]],
) -> str:
    """Answer metric questions (hottest node, postgres connections) from snapshot.

    Returns "" when the snapshot is missing, has no metrics, or the prompt
    does not map to a supported metric question.
    """
    if not snapshot:
        return ""
    metrics = _snapshot_metrics(snapshot)
    if not metrics:
        return ""
    q = normalize_query(prompt)
    metric = _detect_metric(q)
    op = _detect_operation(q)
    include_hw, exclude_hw = _detect_hardware_filters(q)
    nodes_in_query = _extract_titan_nodes(q)
    only_workers = "worker" in q or "workers" in q
    filtered = _inventory_filter(
        inventory,
        include_hw=include_hw,
        exclude_hw=exclude_hw,
        only_workers=only_workers,
        only_ready=None,
        nodes_in_query=nodes_in_query,
    )
    allowed_nodes = {node["name"] for node in filtered} if filtered else None
    if metric in {"cpu", "ram", "net", "io"} and op in {"top", "status", None}:
        # Fix: guard node_usage with isinstance like every other snapshot
        # access in this file — a malformed snapshot (non-dict node_usage)
        # previously raised AttributeError here.
        node_usage = metrics.get("node_usage")
        usage = node_usage.get(metric, []) if isinstance(node_usage, dict) else []
        top = _node_usage_top(usage, allowed_nodes=allowed_nodes)
        if top:
            node, val = top
            percent = metric in {"cpu", "ram"}
            value = _format_metric_value(str(val), percent=percent)
            scope = ""
            if include_hw:
                scope = f" among {' and '.join(sorted(include_hw))}"
            answer = f"Hottest node{scope}: {node} ({value})."
            return _format_confidence(answer, "high")
    if metric == "connections" or "postgres" in q:
        raw_pg = metrics.get("postgres_connections")
        postgres = raw_pg if isinstance(raw_pg, dict) else {}
        used = postgres.get("used")
        max_conn = postgres.get("max")
        raw_hot = postgres.get("hottest_db")
        hottest = raw_hot if isinstance(raw_hot, dict) else {}
        parts: list[str] = []
        if used is not None and max_conn is not None:
            parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
        if hottest.get("label"):
            hot_val = hottest.get("value")
            hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
            parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).")
        if parts:
            return _format_confidence(" ".join(parts), "high")
    return ""
def structured_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
metrics_summary: str,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
q = normalize_query(prompt)
if not q:
return ""
if workloads:
workload_resp = workload_answer(prompt, workloads)
if workload_resp:
return workload_resp
snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory)
if snap_resp:
return snap_resp
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
@ -749,11 +924,20 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s
if op == "status":
if "missing" in q and expected_workers:
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
return "Missing nodes: " + (", ".join(missing) if missing else "none") + "."
return _format_confidence(
"Missing nodes: " + (", ".join(missing) if missing else "none") + ".",
"high",
)
if only_ready is False:
return "Not ready nodes: " + (", ".join(names) if names else "none") + "."
return _format_confidence(
"Not ready nodes: " + (", ".join(names) if names else "none") + ".",
"high",
)
if only_ready is True:
return f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + "."
return _format_confidence(
f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + ".",
"high",
)
if op == "count":
if expected_workers and ("expected" in q or "should" in q):
@ -761,10 +945,10 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s
msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
if missing:
msg += f" Missing: {', '.join(missing)}."
return msg
return _format_confidence(msg, "high")
if not (include_hw or exclude_hw or nodes_in_query or only_workers):
return f"Atlas has {len(names)} nodes."
return f"Matching nodes: {len(names)}."
return _format_confidence(f"Atlas has {len(names)} nodes.", "high")
return _format_confidence(f"Matching nodes: {len(names)}.", "high")
if op == "list":
if nodes_in_query:
@ -772,12 +956,12 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s
existing = {n["name"] for n in inventory}
for node in nodes_in_query:
parts.append(f"{node}: {'present' if node in existing else 'not present'}")
return "Node presence: " + ", ".join(parts) + "."
return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high")
if not names:
return "Matching nodes: none."
return _format_confidence("Matching nodes: none.", "high")
shown = names[:30]
suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
return "Matching nodes: " + ", ".join(shown) + suffix + "."
return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high")
return ""
@ -922,6 +1106,58 @@ def _ariadne_state(timeout: int = 5) -> dict | None:
except Exception:
return None
# Process-wide cache for the ariadne cluster snapshot.
_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0}


def _snapshot_state() -> dict[str, Any] | None:
    """Return the ariadne cluster snapshot, cached for SNAPSHOT_TTL_SEC.

    Serves the cached payload while fresh (minimum 5s TTL). On a failed
    or empty refresh, falls back to the stale cached payload instead of
    returning nothing.
    """
    now = time.monotonic()
    cached = _SNAPSHOT_CACHE.get("payload")
    stamp = _SNAPSHOT_CACHE.get("ts") or 0.0
    ttl = max(5, SNAPSHOT_TTL_SEC)
    if cached and (now - stamp) < ttl:
        return cached
    fresh = _ariadne_state(timeout=10)
    if isinstance(fresh, dict) and fresh:
        _SNAPSHOT_CACHE["payload"] = fresh
        _SNAPSHOT_CACHE["ts"] = now
        return fresh
    return cached if isinstance(cached, dict) else None
def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
items = snapshot.get("nodes_detail")
if not isinstance(items, list):
return []
inventory: list[dict[str, Any]] = []
for node in items:
if not isinstance(node, dict):
continue
labels = node.get("labels") if isinstance(node.get("labels"), dict) else {}
name = node.get("name") or ""
if not name:
continue
hardware = node.get("hardware") or _hardware_class(labels)
inventory.append(
{
"name": name,
"arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": hardware,
"roles": node.get("roles") or [],
"is_worker": node.get("is_worker") is True,
"ready": node.get("ready") is True,
}
)
return sorted(inventory, key=lambda item: item["name"])
def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
workloads = snapshot.get("workloads")
return workloads if isinstance(workloads, list) else []
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
@ -1079,25 +1315,11 @@ def _node_is_worker(node: dict) -> bool:
return True
return True
def worker_nodes_status() -> tuple[list[str], list[str]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ([], [])
items = data.get("items") or []
ready_nodes: list[str] = []
not_ready_nodes: list[str] = []
for node in items if isinstance(items, list) else []:
if not _node_is_worker(node):
continue
name = (node.get("metadata") or {}).get("name") or ""
if not name:
continue
ready = _node_ready_status(node)
if ready is True:
ready_nodes.append(name)
elif ready is False:
not_ready_nodes.append(name)
def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]:
if inventory is None:
inventory = node_inventory()
ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True]
not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False]
return (sorted(ready_nodes), sorted(not_ready_nodes))
def expected_worker_nodes_from_metrics() -> list[str]:
@ -1238,13 +1460,29 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
if not prompt:
self._write_json(400, {"error": "missing_prompt"})
return
inventory = node_inventory_live()
answer = structured_answer(prompt, inventory=inventory, metrics_summary="")
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
answer = structured_answer(
prompt,
inventory=inventory,
metrics_summary="",
snapshot=snapshot,
workloads=workloads,
)
if not answer and _knowledge_intent(prompt):
answer = knowledge_summary(prompt, inventory)
if not answer:
kb = kb_retrieve_titles(prompt, limit=4)
answer = kb or ""
context = build_context(
prompt,
allow_tools=False,
targets=[],
inventory=inventory,
snapshot=snapshot,
)
fallback = kb or "I don't have enough data to answer that."
answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback)
self._write_json(200, {"answer": answer})
@ -1266,6 +1504,7 @@ def build_context(
allow_tools: bool,
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
snapshot: dict[str, Any] | None = None,
) -> str:
parts: list[str] = []
@ -1281,6 +1520,10 @@ def build_context(
if node_ctx:
parts.append(node_ctx)
snapshot_ctx = snapshot_context(prompt, snapshot)
if snapshot_ctx:
parts.append(snapshot_ctx)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
@ -1311,6 +1554,33 @@ def build_context(
return "\n\n".join([p for p in parts if p]).strip()
def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
    """Render snapshot facts relevant to the prompt as LLM context lines.

    Node totals are always included when present; postgres, hottest-node,
    and workload-placement lines appear only when the prompt mentions them.
    """
    if not snapshot:
        return ""
    metrics = _snapshot_metrics(snapshot)
    workloads = _snapshot_workloads(snapshot)
    q = normalize_query(prompt)
    lines: list[str] = []
    raw_nodes = snapshot.get("nodes")
    nodes = raw_nodes if isinstance(raw_nodes, dict) else {}
    if nodes.get("total") is not None:
        lines.append(
            f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}."
        )
    if any(word in q for word in ("postgres", "connections", "db")):
        raw_pg = metrics.get("postgres_connections")
        postgres = raw_pg if isinstance(raw_pg, dict) else {}
        if postgres:
            lines.append(f"Snapshot: postgres_connections={postgres}.")
    if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")):
        raw_hot = metrics.get("hottest_nodes")
        hottest = raw_hot if isinstance(raw_hot, dict) else {}
        if hottest:
            lines.append(f"Snapshot: hottest_nodes={hottest}.")
    if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")):
        match = _select_workload(prompt, workloads)
        if match:
            lines.append(f"Snapshot: workload={match}.")
    return "\n".join(lines).strip()
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
return any(
@ -1350,7 +1620,8 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
parts.append(kb_titles)
return "\n".join(parts).strip()
summary = "\n".join(parts).strip()
return _format_confidence(summary, "medium") if summary else ""
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system = (
@ -1360,7 +1631,8 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"If the answer is not grounded in the provided context or tool data, say you do not know."
"If the answer is not grounded in the provided context or tool data, say you do not know. "
"End every response with a line: 'Confidence: high|medium|low'."
)
transcript_parts = [system]
if context:
@ -1491,8 +1763,18 @@ def sync_loop(token: str, room_id: str):
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
snapshot = _snapshot_state()
inventory = node_inventory_for_prompt(body)
context = build_context(body, allow_tools=allow_tools, targets=targets, inventory=inventory)
if not inventory:
inventory = _snapshot_inventory(snapshot)
workloads = _snapshot_workloads(snapshot)
context = build_context(
body,
allow_tools=allow_tools,
targets=targets,
inventory=inventory,
snapshot=snapshot,
)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
@ -1506,7 +1788,13 @@ def sync_loop(token: str, room_id: str):
if not fallback and context:
fallback = _context_fallback(context)
structured = structured_answer(body, inventory=inventory, metrics_summary=metrics_fallback or "")
structured = structured_answer(
body,
inventory=inventory,
metrics_summary=metrics_fallback or "",
snapshot=snapshot,
workloads=workloads,
)
if structured:
send_msg(token, rid, structured)
continue