atlasbot: shift to facts context and upgrade model

This commit is contained in:
Brad Stein 2026-01-27 06:28:03 -03:00
parent 4fcecc4707
commit a2f4c51e1d
3 changed files with 151 additions and 60 deletions

View File

@ -20,7 +20,7 @@ spec:
labels:
app: ollama
annotations:
ai.bstein.dev/model: qwen2.5:7b-instruct-q4_0
ai.bstein.dev/model: qwen2.5:14b-instruct-q4_0
ai.bstein.dev/gpu: GPU pool (titan-22/24)
ai.bstein.dev/restartedAt: "2026-01-26T12:00:00Z"
spec:
@ -52,7 +52,7 @@ spec:
- name: OLLAMA_MODELS
value: /root/.ollama
- name: OLLAMA_MODEL
value: qwen2.5:7b-instruct-q4_0
value: qwen2.5:14b-instruct-q4_0
command:
- /bin/sh
- -c

View File

@ -16,7 +16,7 @@ spec:
labels:
app: atlasbot
annotations:
checksum/atlasbot-configmap: manual-atlasbot-29
checksum/atlasbot-configmap: manual-atlasbot-30
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@ -82,7 +82,7 @@ spec:
- name: OLLAMA_URL
value: http://chat-ai-gateway.bstein-dev-home.svc.cluster.local/
- name: OLLAMA_MODEL
value: qwen2.5:7b-instruct-q4_0
value: qwen2.5:14b-instruct-q4_0
- name: OLLAMA_TIMEOUT_SEC
value: "600"
- name: ATLASBOT_THINKING_INTERVAL_SEC

View File

@ -33,7 +33,10 @@ SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\\.[a-z0-9-]+)+)")
@ -113,6 +116,8 @@ METRIC_HINTS = {
"connections": ("connections", "conn", "postgres", "database", "db"),
}
_OLLAMA_LOCK = threading.Lock()
HARDWARE_HINTS = {
"amd64": ("amd64", "x86", "x86_64", "x86-64"),
"jetson": ("jetson",),
@ -638,6 +643,105 @@ def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
return node_inventory()
return []
def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[(node.get("arch") or "unknown")].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]:
usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
per_node: dict[str, dict[str, Any]] = {}
for metric_name, entries in usage.items() if isinstance(usage, dict) else []:
if not isinstance(entries, list):
continue
for entry in entries:
if not isinstance(entry, dict):
continue
node = entry.get("node")
if not isinstance(node, str) or not node:
continue
per_node.setdefault(node, {})[metric_name] = entry.get("value")
return [{"node": node, **vals} for node, vals in sorted(per_node.items())]
def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]:
cleaned: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
cleaned.append(
{
"namespace": entry.get("namespace"),
"workload": entry.get("workload"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"primary_node": entry.get("primary_node"),
"nodes": entry.get("nodes"),
}
)
cleaned.sort(
key=lambda item: (
-(item.get("pods_total") or 0),
str(item.get("namespace") or ""),
str(item.get("workload") or ""),
)
)
return cleaned[:limit]
def facts_context(
    prompt: str,
    *,
    inventory: list[dict[str, Any]] | None,
    snapshot: dict[str, Any] | None,
    workloads: list[dict[str, Any]] | None,
) -> str:
    """Render a JSON "facts" block (nodes, metrics, workloads) for the LLM prompt.

    The block is trimmed progressively — first dropping workloads, then the
    per-node usage table — so the rendered JSON fits in MAX_FACTS_CHARS;
    as a last resort the JSON is hard-truncated (which may cut it mid-token).
    ``prompt`` is currently unused but kept for interface parity with the
    other *_context helpers.
    """
    inv = inventory or []
    metrics = _snapshot_metrics(snapshot)
    raw_nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else None
    raw_summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else None
    # Bug fix: the original left these as None when the snapshot dict lacked
    # the keys, so nodes.get(...) below raised AttributeError.
    nodes = raw_nodes if isinstance(raw_nodes, dict) else {}
    summary = raw_summary if isinstance(raw_summary, dict) else {}
    expected_workers = expected_worker_nodes_from_metrics()
    ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], [])

    def _node_stat(key: str) -> Any:
        # Prefer the precomputed summary value; fall back to the raw nodes block.
        value = summary.get(key)
        return value if value is not None else nodes.get(key)

    facts: dict[str, Any] = {
        "generated_at": snapshot.get("generated_at") if isinstance(snapshot, dict) else None,
        "nodes": {
            "total": _node_stat("total"),
            "ready": _node_stat("ready"),
            "not_ready": _node_stat("not_ready"),
            # Preserves original semantics: summary wins outright when it was
            # a dict, even if its value is None.
            "not_ready_names": summary.get("not_ready_names") if isinstance(raw_summary, dict) else nodes.get("not_ready_names"),
            "by_hardware": _group_nodes(inv) if inv else {},
            "by_arch": _nodes_by_arch(inv) if inv else {},
            "workers_ready": ready_workers,
            "workers_not_ready": not_ready_workers,
            "expected_workers": expected_workers,
        },
        "metrics": {
            "hottest_nodes": metrics.get("hottest_nodes") if isinstance(metrics, dict) else {},
            "postgres_connections": metrics.get("postgres_connections") if isinstance(metrics, dict) else {},
            "node_usage": _node_usage_table(metrics) if isinstance(metrics, dict) else [],
        },
        "workloads": _workloads_for_facts(workloads or []),
    }
    rendered = json.dumps(facts, ensure_ascii=False)
    if len(rendered) > MAX_FACTS_CHARS:
        # Workloads are the bulkiest section; drop them first.
        facts.pop("workloads", None)
        rendered = json.dumps(facts, ensure_ascii=False)
    if len(rendered) > MAX_FACTS_CHARS:
        trimmed_metrics = dict(facts.get("metrics") or {})
        trimmed_metrics.pop("node_usage", None)
        facts["metrics"] = trimmed_metrics
        rendered = json.dumps(facts, ensure_ascii=False)
    # The slice is a no-op when the JSON already fits.
    return "Facts (live snapshot):\n" + rendered[:MAX_FACTS_CHARS]
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
names = [node["name"] for node in inventory]
ready = [node["name"] for node in inventory if node.get("ready") is True]
@ -1463,26 +1567,19 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
answer = structured_answer(
context = build_context(
prompt,
allow_tools=False,
targets=[],
inventory=inventory,
metrics_summary="",
snapshot=snapshot,
workloads=workloads,
)
if not answer and _knowledge_intent(prompt):
answer = knowledge_summary(prompt, inventory)
if not answer:
kb = kb_retrieve_titles(prompt, limit=4)
context = build_context(
prompt,
allow_tools=False,
targets=[],
inventory=inventory,
snapshot=snapshot,
)
fallback = kb or "I don't have enough data to answer that."
answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback)
metrics_context, metrics_fallback = metrics_query_context(prompt, allow_tools=True)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that."
answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback)
self._write_json(200, {"answer": answer})
@ -1505,10 +1602,13 @@ def build_context(
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
parts: list[str] = []
kb = kb_retrieve(prompt)
if not kb and _knowledge_intent(prompt):
kb = kb_retrieve_titles(prompt, limit=4)
if kb:
parts.append(kb)
@ -1516,13 +1616,9 @@ def build_context(
if endpoints:
parts.append(endpoints)
node_ctx = node_inventory_context(prompt, inventory)
if node_ctx:
parts.append(node_ctx)
snapshot_ctx = snapshot_context(prompt, snapshot)
if snapshot_ctx:
parts.append(snapshot_ctx)
facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
if facts:
parts.append(facts)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
@ -1627,7 +1723,9 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Use the provided context and facts as your source of truth. "
"If you infer or synthesize, say 'Based on the snapshot' and keep it brief. "
"Prefer exact repo paths and Kubernetes resource names when relevant. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
@ -1646,21 +1744,32 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None
if lock:
lock.acquire()
try:
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
finally:
if lock:
lock.release()
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
try:
return _ollama_call(hist_key, prompt, context=context)
except Exception:
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "Model backend is busy. Try again in a moment."
last_error = None
for attempt in range(max(1, OLLAMA_RETRIES + 1)):
try:
return _ollama_call(hist_key, prompt, context=context)
except Exception as exc: # noqa: BLE001
last_error = exc
time.sleep(min(4, 2 ** attempt))
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "I don't have enough data to answer that."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
result: dict[str, str] = {"reply": ""}
@ -1774,6 +1883,7 @@ def sync_loop(token: str, room_id: str):
targets=targets,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
@ -1784,26 +1894,7 @@ def sync_loop(token: str, room_id: str):
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = metrics_fallback or ""
if not fallback and context:
fallback = _context_fallback(context)
structured = structured_answer(
body,
inventory=inventory,
metrics_summary=metrics_fallback or "",
snapshot=snapshot,
workloads=workloads,
)
if structured:
send_msg(token, rid, structured)
continue
if _knowledge_intent(body):
summary = knowledge_summary(body, inventory)
if summary:
send_msg(token, rid, summary)
continue
fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that."
reply = ollama_reply_with_thinking(
token,