import collections
import json
import os
import re
import ssl
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib import error, parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
MODEL_FAST = os.environ.get("ATLASBOT_MODEL_FAST", "")
MODEL_DEEP = os.environ.get("ATLASBOT_MODEL_DEEP", "")
FALLBACK_MODEL = os.environ.get("OLLAMA_FALLBACK_MODEL", "")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
SNAPSHOT_TTL_SEC = int(os.environ.get("ATLASBOT_SNAPSHOT_TTL_SEC", "30"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
MAX_CONTEXT_CHARS = int(os.environ.get("ATLASBOT_MAX_CONTEXT_CHARS", "12000"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
"system",
"systems",
"service",
"services",
"app",
"apps",
"platform",
"software",
"tool",
"tools",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
"pod",
"pods",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
CONFIDENCE_RE = re.compile(r"confidence\s*:\s*(high|medium|low)\b", re.IGNORECASE)
OPERATION_HINTS = {
"count": ("how many", "count", "number", "total"),
"list": ("list", "which", "what are", "show", "names"),
"top": ("top", "hottest", "highest", "most", "largest", "max", "maximum", "busiest", "busy"),
"bottom": ("lowest", "least", "minimum", "min", "smallest"),
"status": ("ready", "not ready", "unready", "down", "missing", "status"),
}
METRIC_HINTS = {
"cpu": ("cpu",),
"ram": ("ram", "memory", "mem"),
"net": ("net", "network", "bandwidth", "throughput"),
"io": ("io", "disk", "storage"),
"connections": ("connections", "conn", "postgres", "database", "db"),
"pods": ("pods", "pod"),
}
CLUSTER_HINT_WORDS = {
"atlas",
"titan",
"cluster",
"k8s",
"kubernetes",
"health",
"node",
"nodes",
"hardware",
"architecture",
"worker",
"workers",
"pod",
"pods",
"namespace",
"service",
"deployment",
"daemonset",
"statefulset",
"snapshot",
"anomaly",
"anomalies",
"monitor",
"monitoring",
"runbook",
"runbooks",
"documentation",
"docs",
"playbook",
"utilization",
"usage",
"grafana",
"victoria",
"prometheus",
"ariadne",
"mailu",
"nextcloud",
"vaultwarden",
"firefly",
"wger",
"jellyfin",
"planka",
"budget",
"element",
"synapse",
"mas",
"comms",
"longhorn",
"harbor",
"jenkins",
"gitea",
"flux",
"keycloak",
"postgres",
"database",
"db",
"atlasbot",
"jetson",
"rpi",
"raspberry",
"amd64",
"arm64",
}
_INSIGHT_HINT_WORDS = {
"interesting",
"unconventional",
"surprising",
"weird",
"odd",
"unusual",
"outlier",
"fun",
"cool",
"unique",
"notable",
"coolest",
"risk",
"risky",
"favorite",
"favourite",
"trivia",
"anomaly",
"anomalies",
"monitor",
"monitoring",
"alert",
"alerts",
"stand out",
"stands out",
}
_OVERVIEW_HINT_WORDS = {
"overview",
"summary",
"describe",
"explain",
"tell me about",
"what do you know",
"health",
}
_OLLAMA_LOCK = threading.Lock()
HARDWARE_HINTS = {
"amd64": ("amd64", "x86", "x86_64", "x86-64"),
"jetson": ("jetson",),
"rpi4": ("rpi4", "raspberry pi 4", "raspberry pi-4"),
"rpi5": ("rpi5", "raspberry pi 5", "raspberry pi-5"),
"rpi": ("rpi", "raspberry"),
"arm64": ("arm64", "aarch64"),
}
def normalize_query(text: str) -> str:
cleaned = (text or "").lower()
for ch in _DASH_CHARS:
cleaned = cleaned.replace(ch, "-")
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
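# Illustrative: unicode dashes fold to "-", case and whitespace are normalized.
#   normalize_query("  Titan–01  Status ") -> "titan-01 status"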
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
def _ensure_confidence(text: str) -> str:
if not text:
return ""
lines = text.strip().splitlines()
for idx, line in enumerate(lines):
match = CONFIDENCE_RE.search(line)
if match:
level = match.group(1).lower()
lines[idx] = CONFIDENCE_RE.sub(f"Confidence: {level}", line)
return "\n".join(lines)
lines.append("Confidence: medium")
return "\n".join(lines)
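# Illustrative: a missing trailer is appended, an existing one is case-normalized.
#   _ensure_confidence("All nodes ready")      -> "All nodes ready\nConfidence: medium"
#   _ensure_confidence("ok\nconfidence: HIGH") -> "ok\nConfidence: high"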
def _ollama_endpoint() -> str:
url = (OLLAMA_URL or "").strip()
if not url:
return ""
if url.endswith("/api/chat"):
return url
return url.rstrip("/") + "/api/chat"
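# Illustrative: the default OLLAMA_URL "https://chat.ai.bstein.dev/" becomes
# "https://chat.ai.bstein.dev/api/chat"; a URL already ending in /api/chat is
# returned unchanged.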
def _history_to_messages(lines: list[str]) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
for line in lines:
raw = (line or "").strip()
if not raw:
continue
role = "user"
content = raw
lowered = raw.lower()
if lowered.startswith("atlas:"):
role = "assistant"
content = raw.split(":", 1)[1].strip()
elif lowered.startswith("user:"):
role = "user"
content = raw.split(":", 1)[1].strip()
elif ":" in raw:
content = raw.split(":", 1)[1].strip()
if content:
messages.append({"role": role, "content": content})
return messages
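# Illustrative: "atlas:" prefixes become assistant turns, everything else is user input.
#   _history_to_messages(["user: hi", "Atlas: hello"])
#     -> [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]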
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
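# Illustrative (assuming the default SERVER_NAME "live.bstein.dev"):
#   normalize_user_id("atlas")                  -> "@atlas:live.bstein.dev"
#   normalize_user_id("@atlas:live.bstein.dev") -> "@atlas:live.bstein.dev" (unchanged)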
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
if _body_mentions_token(body or ""):
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
def _strip_bot_mention(text: str) -> str:
if not text:
return ""
if not MENTION_LOCALPARTS:
return text.strip()
names = [re.escape(name) for name in MENTION_LOCALPARTS if name]
if not names:
return text.strip()
pattern = r"^(?:\s*@?(?:" + "|".join(names) + r")(?::)?\s+)+"
cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE).strip()
return cleaned or text.strip()
def _detect_mode_from_body(body: str, *, default: str = "deep") -> str:
lower = normalize_query(body or "")
if "atlas_quick" in lower or "atlas-quick" in lower:
return "fast"
if "atlas_smart" in lower or "atlas-smart" in lower:
return "deep"
if lower.startswith("quick ") or lower.startswith("fast "):
return "fast"
if lower.startswith("smart ") or lower.startswith("deep "):
return "deep"
return default
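# Illustrative: explicit aliases and prefix words pick a mode; anything else
# falls through to the default ("deep").
#   _detect_mode_from_body("quick how many nodes?") -> "fast"
#   _detect_mode_from_body("atlas_smart summarize") -> "deep"
#   _detect_mode_from_body("how many nodes?")       -> "deep"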
def _model_for_mode(mode: str) -> str:
if mode == "fast" and MODEL_FAST:
return MODEL_FAST
if mode == "deep" and MODEL_DEEP:
return MODEL_DEEP
return MODEL
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
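# Illustrative call (endpoint from the Matrix client-server spec):
#   whoami = req("GET", "/_matrix/client/v3/account/whoami", token)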
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
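# Runbook retrieval is a plain keyword scorer (no embeddings): each unique query
# token found in a doc scores 1, or 3 if it appears in the title, and each
# entrypoint host that appears verbatim in the query adds 4. Zero-score docs
# are dropped.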
def _score_kb_docs(query: str) -> list[dict[str, Any]]:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return []
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return []
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [d for _, d in scored]
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q:
return ""
scored = _score_kb_docs(q)
picked = scored[:limit]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def kb_retrieve_titles(query: str, *, limit: int = 4) -> str:
scored = _score_kb_docs(query)
picked = scored[:limit]
if not picked:
return ""
parts = ["Relevant runbooks:"]
for doc in picked:
title = doc.get("title") or doc.get("path") or "runbook"
path = doc.get("path") or ""
if path:
parts.append(f"- {title} ({path})")
else:
parts.append(f"- {title}")
return "\n".join(parts)
def _extract_titan_nodes(text: str) -> list[str]:
cleaned = normalize_query(text)
names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(cleaned):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
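# Illustrative: shorthand like "titan-0a/0b" expands to both node names.
#   _extract_titan_nodes("compare titan-0a/0b please") -> ["titan-0a", "titan-0b"]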
def _humanize_rate(value: str, *, unit: str) -> str:
try:
val = float(value)
except (TypeError, ValueError):
return value
if unit == "%":
return f"{val:.1f}%"
if val >= 1024 * 1024:
return f"{val / (1024 * 1024):.2f} MB/s"
if val >= 1024:
return f"{val / 1024:.2f} KB/s"
return f"{val:.2f} B/s"
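# Illustrative:
#   _humanize_rate("73.4", unit="%")    -> "73.4%"
#   _humanize_rate("2048", unit="rate") -> "2.00 KB/s"
#   _humanize_rate("n/a", unit="rate")  -> "n/a" (non-numeric values pass through)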
def _has_any(text: str, phrases: tuple[str, ...]) -> bool:
for phrase in phrases:
if " " in phrase:
if phrase in text:
return True
else:
if re.search(rf"\b{re.escape(phrase)}\b", text):
return True
return False
def _detect_operation(q: str) -> str | None:
if _has_any(q, OPERATION_HINTS["top"]):
return "top"
if _has_any(q, OPERATION_HINTS["bottom"]):
return "bottom"
for op, phrases in OPERATION_HINTS.items():
if op in ("top", "bottom"):
continue
if _has_any(q, phrases):
return op
return None
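# Note: "top"/"bottom" are checked before the generic hints so that e.g.
# "which node is busiest" resolves to "top" rather than "list" via "which".
#   _detect_operation("which node is busiest") -> "top"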
def _detect_metric(q: str) -> str | None:
q = normalize_query(q)
if _has_any(q, ("disk", "storage")):
return "io"
if _has_any(q, ("io",)) and not _has_any(q, METRIC_HINTS["net"]):
return "io"
for metric, phrases in METRIC_HINTS.items():
if _has_any(q, phrases):
return metric
tokens = set(_tokens(q))
expanded: set[str] = set(tokens)
for token in list(tokens):
for part in re.split(r"[-_]", token):
part = part.strip()
if len(part) >= 2:
expanded.add(part)
if part.endswith("s") and len(part) >= 4:
expanded.add(part[:-1])
tokens = expanded
for metric, phrases in METRIC_HINTS.items():
for phrase in phrases:
if " " in phrase:
if phrase in q:
return metric
elif phrase in tokens:
return metric
return None
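# Illustrative: disk/storage wording maps to "io" before the generic hints run.
#   _detect_metric("which node uses the most memory") -> "ram"
#   _detect_metric("disk pressure on workers")        -> "io"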
def _detect_hardware_filters(q: str) -> tuple[set[str], set[str]]:
include: set[str] = set()
exclude: set[str] = set()
if any(term in q for term in ("gpu", "gpus", "accelerator", "accelerators", "cuda", "nvidia")):
include.add("jetson")
rpi_specific = any(
phrase in q
for phrase in (
"rpi4",
"rpi5",
"raspberry pi 4",
"raspberry pi 5",
"raspberry pi-4",
"raspberry pi-5",
)
)
for hardware, phrases in HARDWARE_HINTS.items():
if hardware == "rpi" and rpi_specific:
continue
for phrase in phrases:
if f"non {phrase}" in q or f"non-{phrase}" in q or f"not {phrase}" in q:
exclude.add(hardware)
elif phrase in q:
include.add(hardware)
return include, exclude
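# Illustrative: GPU wording implies jetson; "non "/"non-"/"not " prefixes exclude.
#   _detect_hardware_filters("hottest jetson node")  -> ({"jetson"}, set())
#   _detect_hardware_filters("cpu on non-rpi nodes") -> (set(), {"rpi"})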
def _detect_role_filters(q: str) -> set[str]:
roles: set[str] = set()
if "control-plane" in q or "control plane" in q:
roles.add("control-plane")
if "master" in q:
roles.add("master")
if "accelerator" in q:
roles.add("accelerator")
return roles
def _detect_entity(q: str) -> str | None:
if (
"node" in q
or "nodes" in q
or "worker" in q
or "hardware" in q
or "architecture" in q
or "machine" in q
or "machines" in q
or "host" in q
or "hosts" in q
or "hostname" in q
or "hostnames" in q
or TITAN_NODE_RE.search(q)
):
return "node"
if "pod" in q or "pods" in q:
return "pod"
if "namespace" in q or "namespaces" in q:
return "namespace"
return None
def _metric_entry_score(entry: dict[str, Any], tokens: list[str], *, metric: str | None, op: str | None) -> int:
hay = _metric_tokens(entry)
score = 0
for t in set(tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if metric:
for phrase in METRIC_HINTS.get(metric, (metric,)):
if phrase in hay:
score += 3
if op == "top" and ("hottest" in hay or "top" in hay):
score += 3
if "node" in hay:
score += 1
return score
def _select_metric_entry(tokens: list[str], *, metric: str | None, op: str | None) -> dict[str, Any] | None:
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
score = _metric_entry_score(entry, tokens, metric=metric, op=op)
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _apply_node_filter(expr: str, node_regex: str | None) -> str:
if not node_regex:
return expr
    needle = 'node_uname_info{nodename!=""}'
    replacement = f'node_uname_info{{nodename!="",nodename=~"{node_regex}"}}'
return expr.replace(needle, replacement)
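# Illustrative: scopes a rendered expression to a node subset. With node_regex
# "titan-0a|titan-0b", the selector node_uname_info{nodename!=""} becomes
# node_uname_info{nodename!="",nodename=~"titan-0a|titan-0b"}.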
def _metric_expr_uses_percent(entry: dict[str, Any]) -> bool:
exprs = entry.get("exprs")
expr = exprs[0] if isinstance(exprs, list) and exprs else ""
return "* 100" in expr or "*100" in expr
def _format_metric_value(value: str, *, percent: bool, rate: bool = False) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return value
if percent:
return f"{num:.1f}%"
if rate:
return _humanize_rate(value, unit="rate")
if abs(num) >= 1:
return f"{num:.2f}".rstrip("0").rstrip(".")
return f"{num:.4f}".rstrip("0").rstrip(".")
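# Illustrative:
#   _format_metric_value("73.456", percent=True)   -> "73.5%"
#   _format_metric_value("12.30", percent=False)   -> "12.3" (trailing zeros trimmed)
#   _format_metric_value("0.00456", percent=False) -> "0.0046"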
def _format_metric_label(metric: dict[str, Any]) -> str:
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts:
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
return ", ".join(label_parts) if label_parts else "series"
def _primary_series_metric(res: dict | None) -> tuple[str | None, str | None]:
series = _vm_value_series(res or {})
if not series:
return (None, None)
first = series[0]
metric = first.get("metric") if isinstance(first, dict) else {}
value = first.get("value") if isinstance(first, dict) else []
node = metric.get("node") if isinstance(metric, dict) else None
val = value[1] if isinstance(value, list) and len(value) > 1 else None
return (node, val)
def _format_metric_answer(entry: dict[str, Any], res: dict | None) -> str:
series = _vm_value_series(res)
panel = entry.get("panel_title") or "Metric"
if not series:
return ""
percent = _metric_expr_uses_percent(entry)
lines: list[str] = []
for r in series[:5]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
label = _format_metric_label(metric if isinstance(metric, dict) else {})
lines.append(f"{label}: {_format_metric_value(val, percent=percent)}")
if not lines:
return ""
if len(lines) == 1:
return f"{panel}: {lines[0]}."
return f"{panel}:\n" + "\n".join(f"- {line}" for line in lines)
def _inventory_filter(
inventory: list[dict[str, Any]],
*,
include_hw: set[str],
exclude_hw: set[str],
only_workers: bool,
only_ready: bool | None,
nodes_in_query: list[str],
) -> list[dict[str, Any]]:
results = inventory
if nodes_in_query:
results = [node for node in results if node.get("name") in nodes_in_query]
if only_workers:
results = [node for node in results if node.get("is_worker") is True]
if only_ready is True:
results = [node for node in results if node.get("ready") is True]
if only_ready is False:
results = [node for node in results if node.get("ready") is False]
if include_hw:
results = [node for node in results if _hardware_match(node, include_hw)]
if exclude_hw:
results = [node for node in results if not _hardware_match(node, exclude_hw)]
return results
def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool:
hw = node.get("hardware") or ""
arch = node.get("arch") or ""
for f in filters:
if f == "rpi" and hw in ("rpi4", "rpi5", "rpi"):
return True
if f == "arm64" and arch == "arm64":
return True
if hw == f:
return True
if f == "amd64" and arch == "amd64":
return True
return False
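# Illustrative: "rpi" is a family filter, while arch filters match either the
# hardware class or the node's reported arch.
#   _hardware_match({"hardware": "rpi5", "arch": "arm64"}, {"rpi"})     -> True
#   _hardware_match({"hardware": "jetson", "arch": "arm64"}, {"amd64"}) -> False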
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5", "rpi"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
return "amd64"
if arch == "arm64":
return "arm64-unknown"
return "unknown"
def node_inventory_live() -> list[dict[str, Any]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return []
items = data.get("items") or []
inventory: list[dict[str, Any]] = []
for node in items if isinstance(items, list) else []:
meta = node.get("metadata") or {}
labels = meta.get("labels") or {}
name = meta.get("name") or ""
if not name:
continue
inventory.append(
{
"name": name,
"arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": _hardware_class(labels),
"roles": _node_roles(labels),
"is_worker": _node_is_worker(node),
"ready": _node_ready_status(node),
}
)
return sorted(inventory, key=lambda item: item["name"])
def node_inventory() -> list[dict[str, Any]]:
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot)
if inventory:
return inventory
return node_inventory_live()
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[node.get("hardware") or "unknown"].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str:
q = normalize_query(query)
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
return ""
if inventory is None:
inventory = node_inventory()
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = sum(1 for node in inventory if node.get("ready") is True)
not_ready = sum(1 for node in inventory if node.get("ready") is False)
lines: list[str] = [
"Node inventory (live):",
f"- total: {total}, ready: {ready}, not ready: {not_ready}",
]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
if key in groups:
lines.append(f"- {key}: {', '.join(groups[key])}")
non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
if unknowns:
lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
expected_workers = expected_worker_nodes_from_metrics()
if expected_workers:
ready_workers, not_ready_workers = worker_nodes_status()
missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
lines.append(f"- workers_ready: {', '.join(ready_workers)}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if missing:
lines.append(f"- workers_missing (derived): {', '.join(missing)}")
return "\n".join(lines)
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
q = normalize_query(prompt)
if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")):
return node_inventory()
return []
def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[(node.get("arch") or "unknown")].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]:
usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
per_node: dict[str, dict[str, Any]] = {}
for metric_name, entries in usage.items() if isinstance(usage, dict) else []:
if not isinstance(entries, list):
continue
for entry in entries:
if not isinstance(entry, dict):
continue
node = entry.get("node")
if not isinstance(node, str) or not node:
continue
per_node.setdefault(node, {})[metric_name] = entry.get("value")
return [{"node": node, **vals} for node, vals in sorted(per_node.items())]
def _usage_extremes(usage_table: list[dict[str, Any]]) -> dict[str, tuple[str, float]]:
extremes: dict[str, tuple[str, float]] = {}
for metric in ("cpu", "ram", "net", "io"):
values: list[tuple[str, float]] = []
for entry in usage_table:
node = entry.get("node")
raw = entry.get(metric)
if not node or raw is None:
continue
try:
value = float(raw)
except (TypeError, ValueError):
continue
values.append((node, value))
if not values:
continue
lowest = min(values, key=lambda item: item[1])
highest = max(values, key=lambda item: item[1])
extremes[f"min_{metric}"] = lowest
extremes[f"max_{metric}"] = highest
return extremes
def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]:
cleaned: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
cleaned.append(
{
"namespace": entry.get("namespace"),
"workload": entry.get("workload"),
"pods_total": entry.get("pods_total"),
"pods_running": entry.get("pods_running"),
"primary_node": entry.get("primary_node"),
"nodes": entry.get("nodes"),
}
)
cleaned.sort(
key=lambda item: (
-(item.get("pods_total") or 0),
str(item.get("namespace") or ""),
str(item.get("workload") or ""),
)
)
return cleaned[:limit]
def _workloads_for_prompt(prompt: str, workloads: list[dict[str, Any]], limit: int = 12) -> list[dict[str, Any]]:
tokens = set(_tokens(prompt))
if tokens:
matched: list[dict[str, Any]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
entry_tokens = _workload_tokens(entry)
if entry_tokens & tokens:
matched.append(entry)
if matched:
return _workloads_for_facts(matched, limit=limit)
return _workloads_for_facts(workloads, limit=limit)
def facts_context(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
inv = inventory or []
nodes_in_query = _extract_titan_nodes(prompt)
metrics = _snapshot_metrics(snapshot)
    # Guard against snapshots where the keys exist but hold null values.
    nodes = (snapshot.get("nodes") or {}) if isinstance(snapshot, dict) else {}
    summary = (snapshot.get("nodes_summary") or {}) if isinstance(snapshot, dict) else {}
expected_workers = expected_worker_nodes_from_metrics()
ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], [])
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
not_ready_names = summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names")
by_hardware = _group_nodes(inv) if inv else {}
by_arch = _nodes_by_arch(inv) if inv else {}
control_plane_nodes = [
node["name"]
for node in inv
if any(role in ("control-plane", "master") for role in (node.get("roles") or []))
]
worker_nodes = [node["name"] for node in inv if node.get("is_worker") is True]
lines: list[str] = ["Facts (live snapshot):"]
if total is not None:
lines.append(f"- nodes_total={total}, ready={ready}, not_ready={not_ready}")
if isinstance(summary, dict):
by_arch_counts = summary.get("by_arch")
if isinstance(by_arch_counts, dict) and by_arch_counts:
parts = [f"{arch}={count}" for arch, count in sorted(by_arch_counts.items())]
lines.append(f"- nodes_by_arch: {', '.join(parts)}")
if not_ready_names:
lines.append(f"- nodes_not_ready: {', '.join(not_ready_names)}")
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes_list = by_hardware.get(key) or []
if nodes_list:
lines.append(f"- {key}: {', '.join(nodes_list)}")
non_rpi = sorted(set(by_hardware.get("jetson", [])) | set(by_hardware.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi: {', '.join(non_rpi)}")
for key, nodes_list in sorted(by_arch.items()):
if nodes_list:
lines.append(f"- arch {key}: {', '.join(nodes_list)}")
if control_plane_nodes:
lines.append(f"- control_plane_nodes: {', '.join(control_plane_nodes)}")
control_plane_by_hw: dict[str, list[str]] = collections.defaultdict(list)
for node in inv:
if node.get("name") in control_plane_nodes:
control_plane_by_hw[node.get("hardware") or "unknown"].append(node["name"])
parts = [f"{hw}={', '.join(sorted(nodes))}" for hw, nodes in sorted(control_plane_by_hw.items())]
if parts:
lines.append(f"- control_plane_by_hardware: {', '.join(parts)}")
if worker_nodes:
lines.append(f"- worker_nodes: {', '.join(worker_nodes)}")
if ready_workers or not_ready_workers:
lines.append(f"- workers_ready: {', '.join(ready_workers) if ready_workers else 'none'}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if expected_workers and any(word in normalize_query(prompt) for word in ("missing", "expected", "should", "not ready", "unready")):
missing = sorted(
set(expected_workers)
- {n.get("name") for n in inv if isinstance(n, dict) and n.get("name")}
)
lines.append(f"- expected_workers: {', '.join(expected_workers)}")
if missing:
lines.append(f"- expected_workers_missing: {', '.join(missing)}")
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
lines.append(f"- hottest_{key}: {node} ({value_fmt})")
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if isinstance(postgres, dict) and postgres:
used = postgres.get("used")
max_conn = postgres.get("max")
if used is not None and max_conn is not None:
lines.append(f"- postgres_connections: {used} used / {max_conn} max")
hottest_db = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
if hottest_db.get("label"):
lines.append(
f"- postgres_hottest_db: {hottest_db.get('label')} ({hottest_db.get('value')})"
)
for key in ("pods_running", "pods_pending", "pods_failed", "pods_succeeded"):
value = metrics.get(key)
if value is not None:
lines.append(f"- {key}: {value}")
top_restarts = metrics.get("top_restarts_1h") if isinstance(metrics.get("top_restarts_1h"), list) else []
if top_restarts:
items = []
for entry in top_restarts[:5]:
if not isinstance(entry, dict):
continue
metric = entry.get("metric") or {}
pod = metric.get("pod") or metric.get("name") or ""
ns = metric.get("namespace") or ""
value = entry.get("value")
label = f"{ns}/{pod}".strip("/")
if label and value is not None:
items.append(f"{label}={value}")
if items:
lines.append(f"- top_restarts_1h: {', '.join(items)}")
usage_table = _node_usage_table(metrics)
if usage_table:
lines.append("- node_usage (cpu/ram/net/io):")
for entry in usage_table:
node = entry.get("node")
if not node:
continue
cpu = _format_metric_value(str(entry.get("cpu")), percent=True) if entry.get("cpu") is not None else ""
ram = _format_metric_value(str(entry.get("ram")), percent=True) if entry.get("ram") is not None else ""
net = (
_format_metric_value(str(entry.get("net")), percent=False, rate=True)
if entry.get("net") is not None
else ""
)
io_val = (
_format_metric_value(str(entry.get("io")), percent=False, rate=True)
if entry.get("io") is not None
else ""
)
lines.append(f" - {node}: cpu={cpu}, ram={ram}, net={net}, io={io_val}")
extremes = _usage_extremes(usage_table)
for metric in ("cpu", "ram", "net", "io"):
min_key = f"min_{metric}"
if min_key not in extremes:
continue
node, value = extremes[min_key]
value_fmt = _format_metric_value(
str(value),
percent=metric in ("cpu", "ram"),
rate=metric in ("net", "io"),
)
lines.append(f"- lowest_{metric}: {node} ({value_fmt})")
for metric in ("cpu", "ram"):
hottest_parts: list[str] = []
lowest_parts: list[str] = []
for hw, nodes_list in sorted(by_hardware.items()):
entries = []
for entry in usage_table:
node = entry.get("node")
if node in nodes_list and entry.get(metric) is not None:
try:
value = float(entry.get(metric))
except (TypeError, ValueError):
continue
entries.append((node, value))
if not entries:
continue
max_node, max_val = max(entries, key=lambda item: item[1])
min_node, min_val = min(entries, key=lambda item: item[1])
hottest_parts.append(
f"{hw}={max_node} ({_format_metric_value(str(max_val), percent=True)})"
)
lowest_parts.append(
f"{hw}={min_node} ({_format_metric_value(str(min_val), percent=True)})"
)
if hottest_parts:
lines.append(f"- hottest_{metric}_by_hardware: {', '.join(hottest_parts)}")
if lowest_parts:
lines.append(f"- lowest_{metric}_by_hardware: {', '.join(lowest_parts)}")
if nodes_in_query:
lines.append("- node_details:")
for name in nodes_in_query:
detail = next((n for n in inv if n.get("name") == name), None)
if not detail:
lines.append(f" - {name}: not found in snapshot")
continue
roles = ",".join(detail.get("roles") or []) or "none"
lines.append(
f" - {name}: hardware={detail.get('hardware')}, arch={detail.get('arch')}, "
f"ready={detail.get('ready')}, roles={roles}"
)
workload_entries = _workloads_for_prompt(prompt, workloads or [])
if workload_entries:
lines.append("- workloads:")
for entry in workload_entries:
if not isinstance(entry, dict):
continue
ns = entry.get("namespace") or ""
wl = entry.get("workload") or ""
primary = entry.get("primary_node") or ""
pods_total = entry.get("pods_total")
pods_running = entry.get("pods_running")
label = f"{ns}/{wl}" if ns and wl else (wl or ns)
if not label:
continue
if primary:
lines.append(
f" - {label}: primary_node={primary}, pods_total={pods_total}, pods_running={pods_running}"
)
else:
lines.append(f" - {label}: pods_total={pods_total}, pods_running={pods_running}")
top = max(
(entry for entry in workload_entries if isinstance(entry.get("pods_total"), (int, float))),
key=lambda item: item.get("pods_total", 0),
default=None,
)
if isinstance(top, dict) and top.get("pods_total") is not None:
label = f"{top.get('namespace')}/{top.get('workload')}".strip("/")
lines.append(f"- workload_most_pods: {label} ({top.get('pods_total')})")
zero_running = [
entry
for entry in workload_entries
if isinstance(entry.get("pods_running"), (int, float)) and entry.get("pods_running") == 0
]
if zero_running:
labels = []
for entry in zero_running:
label = f"{entry.get('namespace')}/{entry.get('workload')}".strip("/")
if label:
labels.append(label)
if labels:
lines.append(f"- workloads_zero_running: {', '.join(labels)}")
rendered = "\n".join(lines)
return rendered[:MAX_FACTS_CHARS]
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
names = [node["name"] for node in inventory]
ready = [node["name"] for node in inventory if node.get("ready") is True]
not_ready = [node["name"] for node in inventory if node.get("ready") is False]
groups = _group_nodes(inventory)
workers = [node for node in inventory if node.get("is_worker") is True]
worker_names = [node["name"] for node in workers]
worker_ready = [node["name"] for node in workers if node.get("ready") is True]
worker_not_ready = [node["name"] for node in workers if node.get("ready") is False]
expected_workers = expected_worker_nodes_from_metrics()
expected_ready = [n for n in expected_workers if n in ready] if expected_workers else []
expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else []
expected_missing = [n for n in expected_workers if n not in names] if expected_workers else []
return {
"names": sorted(names),
"ready": sorted(ready),
"not_ready": sorted(not_ready),
"groups": groups,
"worker_names": sorted(worker_names),
"worker_ready": sorted(worker_ready),
"worker_not_ready": sorted(worker_not_ready),
"expected_workers": expected_workers,
"expected_ready": sorted(expected_ready),
"expected_not_ready": sorted(expected_not_ready),
"expected_missing": sorted(expected_missing),
}
def _workload_tokens(entry: dict[str, Any]) -> set[str]:
tokens: set[str] = set()
for key in ("workload", "namespace"):
value = entry.get(key)
if isinstance(value, str) and value:
tokens.update(_tokens(value))
return tokens
def _workload_query_target(prompt: str) -> str:
tokens = set(_tokens(prompt))
matches = sorted(tokens & _NAME_INDEX) if _NAME_INDEX else []
return matches[0] if matches else ""
def _select_workload(prompt: str, workloads: list[dict[str, Any]]) -> dict[str, Any] | None:
q_tokens = set(_tokens(prompt))
if not q_tokens:
return None
scored: list[tuple[int, dict[str, Any]]] = []
for entry in workloads:
if not isinstance(entry, dict):
continue
tokens = _workload_tokens(entry)
score = len(tokens & q_tokens)
name = (entry.get("workload") or "").lower()
namespace = (entry.get("namespace") or "").lower()
if name and name in q_tokens:
score += 5
if namespace and namespace in q_tokens:
score += 3
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
def _format_confidence(answer: str, confidence: str) -> str:
if not answer:
return ""
return f"{answer}\nConfidence: {confidence}."
def workload_answer(prompt: str, workloads: list[dict[str, Any]]) -> str:
q = normalize_query(prompt)
if not any(word in q for word in ("where", "which", "node", "run", "running", "host", "located")):
return ""
target = _workload_query_target(prompt)
entry = _select_workload(prompt, workloads)
if not entry:
return ""
workload = entry.get("workload") or ""
namespace = entry.get("namespace") or ""
if target:
workload_l = str(workload).lower()
namespace_l = str(namespace).lower()
if workload_l != target and namespace_l == target and "namespace" not in q and "workload" not in q:
return ""
nodes = entry.get("nodes") if isinstance(entry.get("nodes"), dict) else {}
primary = entry.get("primary_node") or ""
if not workload or not nodes:
return ""
parts = []
if primary:
parts.append(f"{primary} (primary)")
for node, count in sorted(nodes.items(), key=lambda item: (-item[1], item[0])):
if node == primary:
continue
parts.append(f"{node} ({count} pod{'s' if count != 1 else ''})")
node_text = ", ".join(parts) if parts else primary
answer = f"{workload} runs in {namespace}. Nodes: {node_text}."
return _format_confidence(answer, "medium")
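# Illustrative shape of the reply (workload/node names here are hypothetical):
#   "synapse runs in comms. Nodes: titan-0a (primary), titan-0b (2 pods).
#    Confidence: medium."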
def _snapshot_metrics(snapshot: dict[str, Any] | None) -> dict[str, Any]:
if not snapshot:
return {}
metrics = snapshot.get("metrics")
return metrics if isinstance(metrics, dict) else {}
def _node_usage_top(
usage: list[dict[str, Any]],
*,
allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
best_node = ""
best_val = None
for item in usage if isinstance(usage, list) else []:
if not isinstance(item, dict):
continue
node = item.get("node") or ""
if allowed_nodes and node not in allowed_nodes:
continue
value = item.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best_val is None or numeric > best_val:
best_val = numeric
best_node = node
if best_node and best_val is not None:
return best_node, best_val
return None
def _node_usage_bottom(
usage: list[dict[str, Any]],
*,
allowed_nodes: set[str] | None,
) -> tuple[str, float] | None:
best_node: str | None = None
best_val: float | None = None
    for item in usage if isinstance(usage, list) else []:
if not isinstance(item, dict):
continue
node = item.get("node")
if not node or not isinstance(node, str):
continue
if allowed_nodes and node not in allowed_nodes:
continue
value = item.get("value")
try:
numeric = float(value)
except (TypeError, ValueError):
continue
if best_val is None or numeric < best_val:
best_val = numeric
best_node = node
if best_node and best_val is not None:
return best_node, best_val
return None
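# Illustrative: both helpers scan snapshot usage rows, optionally restricted to
# allowed_nodes, and skip rows with non-numeric values.
#   _node_usage_top([{"node": "a", "value": "5"}, {"node": "b", "value": "9"}],
#                   allowed_nodes=None) -> ("b", 9.0)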
def snapshot_metric_answer(
prompt: str,
*,
snapshot: dict[str, Any] | None,
inventory: list[dict[str, Any]],
) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
if not metrics:
return ""
q = normalize_query(prompt)
metric = _detect_metric(q)
op = _detect_operation(q)
if op == "list" and metric in {"cpu", "ram", "net", "io"}:
op = "top"
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
allowed_nodes = {node["name"] for node in filtered} if filtered else None
if metric in {"cpu", "ram", "net", "io"} and op in {"top", "bottom", "status", None}:
usage = metrics.get("node_usage", {}).get(metric, [])
pick = _node_usage_bottom if op == "bottom" else _node_usage_top
chosen = pick(usage, allowed_nodes=allowed_nodes)
if chosen:
node, val = chosen
percent = metric in {"cpu", "ram"}
value = _format_metric_value(str(val), percent=percent, rate=metric in {"net", "io"})
scope = ""
if include_hw:
scope = f" among {' and '.join(sorted(include_hw))}"
label = "Lowest" if op == "bottom" else "Hottest"
answer = f"{label} node{scope}: {node} ({value})."
if allowed_nodes and len(allowed_nodes) != len(inventory) and op != "bottom":
overall = _node_usage_top(usage, allowed_nodes=None)
if overall and overall[0] != node:
overall_val = _format_metric_value(
str(overall[1]),
percent=percent,
rate=metric in {"net", "io"},
)
answer += f" Overall hottest: {overall[0]} ({overall_val})."
return _format_confidence(answer, "high")
if metric == "connections" or "postgres" in q:
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
free = max_conn - used
if any(word in q for word in ("free", "available", "remaining", "remain", "left")):
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max ({free:.0f} free).")
else:
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"Hottest DB: {hottest.get('label')} ({hot_val_str}).")
if parts:
return _format_confidence(" ".join(parts), "high")
if metric == "pods":
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
status_terms = ("running", "pending", "failed", "succeeded", "completed")
if ("most pods" in q or ("most" in q and "pod" in q and "node" in q)) and not nodes_in_query:
return _format_confidence(
"I don't have per-node pod counts in the snapshot.",
"medium",
)
if "total" in q or "sum" in q:
values = [v for v in (running, pending, failed, succeeded) if isinstance(v, (int, float))]
if values:
return _format_confidence(f"Total pods: {sum(values):.0f}.", "high")
if "not running" in q or "not in running" in q or "non running" in q:
parts = [v for v in (pending, failed, succeeded) if isinstance(v, (int, float))]
if parts:
return _format_confidence(f"Pods not running: {sum(parts):.0f}.", "high")
if sum(1 for term in status_terms if term in q) > 1:
parts = []
if "running" in q and running is not None:
parts.append(f"running {running:.0f}")
if "pending" in q and pending is not None:
parts.append(f"pending {pending:.0f}")
if "failed" in q and failed is not None:
parts.append(f"failed {failed:.0f}")
if ("succeeded" in q or "completed" in q) and succeeded is not None:
parts.append(f"succeeded {succeeded:.0f}")
if parts:
return _format_confidence(f"Pods: {', '.join(parts)}.", "high")
if "pending" in q and pending is not None:
return _format_confidence(f"Pending pods: {pending:.0f}.", "high")
if "failed" in q and failed is not None:
return _format_confidence(f"Failed pods: {failed:.0f}.", "high")
if "succeeded" in q or "completed" in q:
if succeeded is not None:
return _format_confidence(f"Succeeded pods: {succeeded:.0f}.", "high")
if "running" in q and running is not None:
return _format_confidence(f"Running pods: {running:.0f}.", "high")
parts = []
if running is not None:
parts.append(f"running {running:.0f}")
if pending is not None:
parts.append(f"pending {pending:.0f}")
if failed is not None:
parts.append(f"failed {failed:.0f}")
if succeeded is not None:
parts.append(f"succeeded {succeeded:.0f}")
if parts:
return _format_confidence(f"Pods: {', '.join(parts)}.", "high")
return ""
def structured_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
metrics_summary: str,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
q = normalize_query(prompt)
if not q:
return ""
if workloads:
workload_resp = workload_answer(prompt, workloads)
if workload_resp:
return workload_resp
snap_resp = snapshot_metric_answer(prompt, snapshot=snapshot, inventory=inventory)
if snap_resp:
return snap_resp
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
if op == "list" and metric in {"cpu", "ram", "net", "io"}:
op = "top"
entity = _detect_entity(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
if entity is None and (include_hw or exclude_hw):
entity = "node"
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
role_filters = _detect_role_filters(q)
only_ready: bool | None = None
if (
"not ready" in q
or "notready" in q
or "not-ready" in q
or "unready" in q
or "down" in q
or "missing" in q
):
only_ready = False
elif "ready" in q:
only_ready = True
if entity == "node" and only_ready is not None and op != "count":
op = "status"
if entity == "node" and only_ready is not None and op == "count":
if not any(term in q for term in ("how many", "count", "number")):
op = "status"
if not op and entity == "node":
op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
if entity == "node" and "total" in q and "ready" in q:
summary = _nodes_summary_line(inventory, snapshot)
if summary:
return _format_confidence(summary, "high")
if entity == "node" and ("hardware mix" in q or "architecture" in q):
hw_line = _hardware_mix_line(inventory)
if hw_line:
return _format_confidence(hw_line, "high")
if (
entity == "node"
and op == "status"
and metric is None
and not (include_hw or exclude_hw or nodes_in_query or only_workers or role_filters)
):
summary = _nodes_summary_line(inventory, snapshot)
if summary:
return _format_confidence(summary, "high")
if entity == "node" and metric is None and any(word in q for word in ("hardware", "architecture", "class", "mix")):
hw_line = _hardware_mix_line(inventory)
if hw_line:
return _format_confidence(hw_line, "medium")
if (
entity == "node"
and any(term in q for term in ("arm64", "amd64"))
and any(term in q for term in ("mostly", "majority", "more"))
):
arm64_count = len([n for n in inventory if n.get("arch") == "arm64"])
amd64_count = len([n for n in inventory if n.get("arch") == "amd64"])
if arm64_count or amd64_count:
majority = "arm64" if arm64_count >= amd64_count else "amd64"
return _format_confidence(
f"arm64 nodes: {arm64_count}, amd64 nodes: {amd64_count}. Mostly {majority}.",
"high",
)
if op == "top" and metric is None and not any(word in q for word in ("hardware", "architecture", "class")):
metric = "cpu"
# Metrics-first when a metric or top operation is requested.
if metric or op == "top":
entry = _select_metric_entry(tokens, metric=metric, op=op)
if entry and isinstance(entry.get("exprs"), list) and entry["exprs"]:
expr = entry["exprs"][0]
if inventory:
scoped = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
if scoped:
node_regex = "|".join([n["name"] for n in scoped])
expr = _apply_node_filter(expr, node_regex)
res = vm_query(expr, timeout=20)
answer = ""
if op == "top" or "hottest" in (entry.get("panel_title") or "").lower():
node, val = _primary_series_metric(res)
if node and val is not None:
percent = _metric_expr_uses_percent(entry)
rate = metric in {"net", "io"}
value_fmt = _format_metric_value(val or "", percent=percent, rate=rate)
metric_label = (metric or "").upper()
label = f"{metric_label} node" if metric_label else "node"
answer = f"Hottest {label}: {node} ({value_fmt})."
if not answer:
answer = _format_metric_answer(entry, res)
if answer:
scope_parts: list[str] = []
if include_hw:
scope_parts.append(" and ".join(sorted(include_hw)))
if exclude_hw:
scope_parts.append(f"excluding {' and '.join(sorted(exclude_hw))}")
if only_workers:
scope_parts.append("worker")
if scope_parts:
scope = " ".join(scope_parts)
overall_note = ""
base_expr = entry["exprs"][0]
if inventory:
all_nodes = "|".join([n["name"] for n in inventory])
if all_nodes:
base_expr = _apply_node_filter(base_expr, all_nodes)
base_res = vm_query(base_expr, timeout=20)
base_node, base_val = _primary_series_metric(base_res)
scoped_node, scoped_val = _primary_series_metric(res)
if base_node and scoped_node and base_node != scoped_node:
percent = _metric_expr_uses_percent(entry)
rate = metric in {"net", "io"}
base_val_fmt = _format_metric_value(base_val or "", percent=percent, rate=rate)
overall_note = f" Overall hottest node: {base_node} ({base_val_fmt})."
return _format_confidence(f"Among {scope} nodes, {answer}{overall_note}", "high")
return _format_confidence(answer, "high")
if metrics_summary:
return metrics_summary
if entity != "node" or not inventory:
if any(word in q for word in METRIC_HINT_WORDS) and not metrics_summary:
return "I don't have data to answer that right now."
return ""
expected_workers = expected_worker_nodes_from_metrics()
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=only_ready if op in ("status", "count") else None,
nodes_in_query=nodes_in_query,
)
if role_filters:
filtered = [
node
for node in filtered
if role_filters.intersection(set(node.get("roles") or []))
]
names = [node["name"] for node in filtered]
if op == "status":
scope_label = "nodes"
if include_hw:
scope_label = f"{' and '.join(sorted(include_hw))} nodes"
elif only_workers:
scope_label = "worker nodes"
if "missing" in q and ("ready" in q or "readiness" in q):
return _format_confidence(
f"Not ready {scope_label}: " + (", ".join(names) if names else "none") + ".",
"high",
)
if "missing" in q and expected_workers:
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
return _format_confidence(
"Missing nodes: " + (", ".join(missing) if missing else "none") + ".",
"high",
)
if only_ready is False:
return _format_confidence(
f"Not ready {scope_label}: " + (", ".join(names) if names else "none") + ".",
"high",
)
if only_ready is True:
return _format_confidence(
f"Ready {scope_label} ({len(names)}): " + (", ".join(names) if names else "none") + ".",
"high",
)
if op == "count":
scope_label = "nodes"
if include_hw:
scope_label = f"{' and '.join(sorted(include_hw))} nodes"
elif only_workers:
scope_label = "worker nodes"
if only_workers and "ready" in q and ("total" in q or "vs" in q or "versus" in q):
total_workers = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=True,
only_ready=None,
nodes_in_query=nodes_in_query,
)
ready_workers = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=True,
only_ready=True,
nodes_in_query=nodes_in_query,
)
return _format_confidence(
f"Worker nodes ready: {len(ready_workers)} / {len(total_workers)} total.",
"high",
)
if expected_workers and ("expected" in q or "should" in q):
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
if missing:
msg += f" Missing: {', '.join(missing)}."
return _format_confidence(msg, "high")
if only_ready is True:
return _format_confidence(f"Ready {scope_label}: {len(names)}.", "high")
if only_ready is False:
return _format_confidence(f"Not ready {scope_label}: {len(names)}.", "high")
if not (include_hw or exclude_hw or nodes_in_query or only_workers or role_filters):
return _format_confidence(f"Atlas has {len(names)} nodes.", "high")
return _format_confidence(f"Matching nodes: {len(names)}.", "high")
if op == "list":
if nodes_in_query:
parts = []
existing = {n["name"] for n in inventory}
for node in nodes_in_query:
parts.append(f"{node}: {'present' if node in existing else 'not present'}")
return _format_confidence("Node presence: " + ", ".join(parts) + ".", "high")
if not names:
return _format_confidence("Matching nodes: none.", "high")
shown = names[:30]
suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
return _format_confidence("Matching nodes: " + ", ".join(shown) + suffix + ".", "high")
return ""
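# Illustrative routing: "how many rpi5 nodes are ready" detects entity=node,
# op=count, include_hw={"rpi5"}, only_ready=True, and answers along the lines of
# "Ready rpi5 nodes: 6." with a "Confidence: high." line appended.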
def _nodes_summary_line(inventory: list[dict[str, Any]], snapshot: dict[str, Any] | None) -> str:
    # Guard against snapshots where the keys exist but hold null values.
    summary = (snapshot.get("nodes_summary") or {}) if isinstance(snapshot, dict) else {}
    nodes = (snapshot.get("nodes") or {}) if isinstance(snapshot, dict) else {}
total = summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total")
ready = summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready")
not_ready = summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready")
if total is None:
total = len(inventory)
ready = len([n for n in inventory if n.get("ready") is True])
not_ready = len([n for n in inventory if n.get("ready") is False])
if total is None:
return ""
if not_ready:
names = []
summary_names = summary.get("not_ready_names") if isinstance(summary, dict) else []
if isinstance(summary_names, list):
names = [name for name in summary_names if isinstance(name, str)]
if not names and snapshot:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
names = [node.get("name") for node in details if isinstance(node, dict) and node.get("ready") is False]
names = [name for name in names if isinstance(name, str) and name]
suffix = f" (not ready: {', '.join(names)})" if names else ""
return f"Atlas has {total} nodes; {ready} ready, {not_ready} not ready{suffix}."
return f"Atlas has {total} nodes and all are Ready."
def _hardware_mix_line(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
parts: list[str] = []
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"{key}={len(nodes)}")
if not parts:
return ""
return "Hardware mix includes " + ", ".join(parts) + "."
def _os_mix_line(snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
counts: dict[str, int] = collections.Counter()
for node in details:
if not isinstance(node, dict):
continue
os_name = (node.get("os") or "").strip()
if os_name:
counts[os_name] += 1
if not counts or (len(counts) == 1 and "linux" in counts):
return ""
parts = [f"{os_name}={count}" for os_name, count in sorted(counts.items(), key=lambda item: (-item[1], item[0]))]
return "OS mix: " + ", ".join(parts[:5]) + "."
def _pods_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
running = metrics.get("pods_running")
pending = metrics.get("pods_pending")
failed = metrics.get("pods_failed")
succeeded = metrics.get("pods_succeeded")
if running is None and pending is None and failed is None and succeeded is None:
return ""
parts: list[str] = []
if running is not None:
parts.append(f"{running:.0f} running")
if pending is not None:
parts.append(f"{pending:.0f} pending")
if failed is not None:
parts.append(f"{failed:.0f} failed")
if succeeded is not None:
parts.append(f"{succeeded:.0f} succeeded")
return "There are " + ", ".join(parts) + " pods."
def _postgres_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if not postgres:
return ""
used = postgres.get("used")
max_conn = postgres.get("max")
hottest = postgres.get("hottest_db") if isinstance(postgres.get("hottest_db"), dict) else {}
parts: list[str] = []
if used is not None and max_conn is not None:
parts.append(f"{used:.0f}/{max_conn:.0f} connections")
if hottest.get("label"):
hot_val = hottest.get("value")
hot_val_str = _format_metric_value(str(hot_val), percent=False) if hot_val is not None else ""
parts.append(f"hottest {hottest.get('label')} ({hot_val_str})")
if not parts:
return ""
return "Postgres is at " + ", ".join(parts) + "."
def _hottest_summary_line(metrics: dict[str, Any]) -> str:
if not metrics:
return ""
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if not hottest:
return ""
parts: list[str] = []
for key in ("cpu", "ram", "net", "io"):
entry = hottest.get(key) if isinstance(hottest.get(key), dict) else {}
node = entry.get("node")
value = entry.get("value")
if node and value is not None:
value_fmt = _format_metric_value(
str(value),
percent=key in ("cpu", "ram"),
rate=key in ("net", "io"),
)
parts.append(f"{key.upper()} {node} ({value_fmt})")
if not parts:
return ""
return "Hot spots: " + "; ".join(parts) + "."
_FOLLOWUP_HINTS = (
"what about",
"how about",
"and what",
"and how",
"tell me more",
"anything else",
"something else",
"that one",
"those",
"them",
"it",
"this",
"that",
"else",
"another",
"again",
)
def _is_followup_query(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
if any(hint in q for hint in _FOLLOWUP_HINTS):
return True
if len(q.split()) <= 3 and not any(word in q for word in _INSIGHT_HINT_WORDS):
return True
return False
def _is_subjective_query(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
return any(word in q for word in _INSIGHT_HINT_WORDS) or any(
phrase in q
for phrase in (
"what do you think",
"your favorite",
"your favourite",
"your opinion",
)
)
def _is_overview_query(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
return any(word in q for word in _OVERVIEW_HINT_WORDS)
def _doc_intent(query: str) -> bool:
q = normalize_query(query)
if not q:
return False
return any(
phrase in q
for phrase in (
"runbook",
"documentation",
"docs",
"guide",
"how do i",
"how to",
"instructions",
"playbook",
"next step",
"next steps",
"what should",
"what do i",
"what to do",
"troubleshoot",
"triage",
"recover",
"remediate",
)
)
def cluster_overview_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
) -> str:
if not inventory and not snapshot:
return ""
q = normalize_query(prompt)
metrics = _snapshot_metrics(snapshot)
sentences: list[str] = []
nodes_line = _nodes_summary_line(inventory, snapshot)
if nodes_line:
sentences.append(nodes_line)
wants_overview = _is_overview_query(q) or any(word in q for word in ("atlas", "cluster", "titan", "lab"))
wants_hardware = any(word in q for word in ("hardware", "architecture", "nodes", "node")) or wants_overview
wants_metrics = any(
word in q
for word in (
"status",
"health",
"overview",
"summary",
"pods",
"postgres",
"connections",
"hottest",
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"busy",
"load",
"usage",
"utilization",
)
) or wants_overview
if wants_hardware:
hw_line = _hardware_mix_line(inventory)
if hw_line:
sentences.append(hw_line)
os_line = _os_mix_line(snapshot)
if os_line:
sentences.append(os_line)
if wants_metrics:
pods_line = _pods_summary_line(metrics)
if pods_line:
sentences.append(pods_line)
postgres_line = _postgres_summary_line(metrics)
if postgres_line:
sentences.append(postgres_line)
hottest_line = _hottest_summary_line(metrics)
if hottest_line:
sentences.append(hottest_line)
if not sentences:
return ""
if len(sentences) > 3 and not wants_overview:
sentences = sentences[:3]
return "Based on the latest snapshot, " + " ".join(sentences)
def cluster_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
history_lines: list[str] | None = None,
) -> str:
metrics_summary = snapshot_context(prompt, snapshot)
structured = structured_answer(
prompt,
inventory=inventory,
metrics_summary=metrics_summary,
snapshot=snapshot,
workloads=workloads,
)
if structured:
return structured
q = normalize_query(prompt)
workload_target = _workload_query_target(prompt)
if workload_target and any(word in q for word in ("where", "run", "running", "host", "node")):
return _format_confidence(
f"I don't have workload placement data for {workload_target} in the current snapshot.",
"low",
)
overview = cluster_overview_answer(prompt, inventory=inventory, snapshot=snapshot)
if overview:
kb_titles = kb_retrieve_titles(prompt, limit=4) if _doc_intent(prompt) else ""
if kb_titles:
overview = overview + "\n" + kb_titles
return _format_confidence(overview, "medium")
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
return _format_confidence(kb_titles, "low")
if metrics_summary:
return _format_confidence(metrics_summary, "low")
return ""
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", ""
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
return context, ""
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
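# Usage sketch for the read-only API helper (the path is an example; any GET
# the atlasbot ServiceAccount is allowed to read works the same way):
#   nodes = k8s_get("/api/v1/nodes?limit=500")
#   names = [n["metadata"]["name"] for n in nodes.get("items", [])]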
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
_SNAPSHOT_CACHE: dict[str, Any] = {"payload": None, "ts": 0.0}
def _snapshot_state() -> dict[str, Any] | None:
now = time.monotonic()
cached = _SNAPSHOT_CACHE.get("payload")
ts = _SNAPSHOT_CACHE.get("ts") or 0.0
if cached and now - ts < max(5, SNAPSHOT_TTL_SEC):
return cached
payload = _ariadne_state(timeout=10)
if isinstance(payload, dict) and payload:
_SNAPSHOT_CACHE["payload"] = payload
_SNAPSHOT_CACHE["ts"] = now
return payload
return cached if isinstance(cached, dict) else None
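# _snapshot_state() is a read-through cache: Ariadne is polled at most once
# per SNAPSHOT_TTL_SEC (floored at 5s), and the last good payload keeps being
# served when a refresh fails, so a transient Ariadne outage degrades to
# slightly stale answers instead of empty ones.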
def _snapshot_inventory(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
items = snapshot.get("nodes_detail")
if not isinstance(items, list):
return []
inventory: list[dict[str, Any]] = []
for node in items:
if not isinstance(node, dict):
continue
labels = node.get("labels") if isinstance(node.get("labels"), dict) else {}
name = node.get("name") or ""
if not name:
continue
hardware = node.get("hardware") or _hardware_class(labels)
inventory.append(
{
"name": name,
"arch": node.get("arch") or labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": hardware,
"roles": node.get("roles") or [],
"is_worker": node.get("is_worker") is True,
"ready": node.get("ready") is True,
}
)
return sorted(inventory, key=lambda item: item["name"])
def _snapshot_workloads(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
if not snapshot:
return []
workloads = snapshot.get("workloads")
return workloads if isinstance(workloads, list) else []
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
parsed = _parse_metric_lines(summary)
panel_l = (panel or "").lower()
if parsed:
items = list(parsed.items())
if len(items) == 1:
label, value = items[0]
return f"{panel}: {label} = {value}."
compact = "; ".join(f"{k}={v}" for k, v in items)
return f"{panel}: {compact}."
if panel_l:
return f"{panel}: {summary}"
return summary
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
if labels.get("node-role.kubernetes.io/worker") is not None:
return True
return True
def worker_nodes_status(inventory: list[dict[str, Any]] | None = None) -> tuple[list[str], list[str]]:
if inventory is None:
inventory = node_inventory()
ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is True]
not_ready_nodes = [n["name"] for n in inventory if n.get("is_worker") and n.get("ready") is False]
return (sorted(ready_nodes), sorted(not_ready_nodes))
def expected_worker_nodes_from_metrics() -> list[str]:
for entry in _METRIC_INDEX:
panel = (entry.get("panel_title") or "").lower()
if "worker nodes ready" not in panel:
continue
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
for expr in exprs:
if not isinstance(expr, str):
continue
match = NODE_REGEX.search(expr)
if not match:
continue
raw = match.group(1)
nodes = [n.strip() for n in raw.split("|") if n.strip()]
return sorted(nodes)
return []
def _context_fallback(context: str) -> str:
if not context:
return ""
trimmed = context.strip()
if len(trimmed) > MAX_TOOL_CHARS:
trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
return "Here is what I found:\n" + trimmed
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
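# Example: only an optional ```json language tag is recognized by the fence
# regex, so:
#   _strip_code_fence('```json\n{"a": 1}\n```')  -> '{"a": 1}'
#   _strip_code_fence('plain text')              -> 'plain text'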
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return _ensure_confidence(text)
return _ensure_confidence(text)
def _history_payload_lines(history_payload: list[Any]) -> list[str]:
lines: list[str] = []
if not isinstance(history_payload, list):
return lines
for item in history_payload[-12:]:
if isinstance(item, dict):
for key in ("content", "message", "text", "prompt", "question", "body", "answer", "reply", "response"):
val = item.get(key)
if isinstance(val, str) and val.strip():
lines.append(val.strip())
elif isinstance(item, str) and item.strip():
lines.append(item.strip())
return [line for line in lines if line]
def _append_history_context(context: str, history_lines: list[str]) -> str:
lines = [line.strip() for line in history_lines if isinstance(line, str) and line.strip()]
if not lines:
return context
snippet = "\n".join(lines[-6:])
combined = context + "\nRecent chat:\n" + snippet if context else "Recent chat:\n" + snippet
if len(combined) > MAX_CONTEXT_CHARS:
combined = combined[: MAX_CONTEXT_CHARS - 3].rstrip() + "..."
return combined
class ThoughtState:
def __init__(self, total_steps: int = 0):
self._lock = threading.Lock()
self.stage = "starting"
self.note = ""
self.step = 0
self.total_steps = total_steps
def update(self, stage: str, *, note: str = "", step: int | None = None) -> None:
with self._lock:
self.stage = stage
if note:
self.note = note
if step is not None:
self.step = step
def status_line(self) -> str:
with self._lock:
stage = self.stage
note = self.note
step = self.step
total = self.total_steps
step_part = f"{step}/{total}" if total else str(step) if step else ""
detail = f"Stage {step_part}: {stage}".strip()
if note:
return f"Still thinking ({detail}). Latest insight: {note}"
return f"Still thinking ({detail})."
def _ollama_json_call(
prompt: str,
*,
context: str,
retries: int = 2,
model: str | None = None,
) -> dict[str, Any]:
system = (
"System: You are Atlas, a reasoning assistant. "
"Return strict JSON only (no code fences, no trailing commentary). "
"If you cannot comply, return {}. "
"Only use facts from the provided context. "
"If you make an inference, label it as 'inference' in the JSON."
)
for attempt in range(max(1, retries + 1)):
try:
raw = _ollama_call(
("json", "internal"),
prompt,
context=context,
use_history=False,
system_override=system,
model=model,
)
            cleaned = _strip_code_fence(raw).strip()
            if cleaned.startswith("{") and cleaned.endswith("}"):
                return json.loads(cleaned)
            parsed = json.loads(cleaned)
            if isinstance(parsed, dict):
                return parsed
        except Exception:  # noqa: BLE001
            time.sleep(min(2, 2 ** attempt))
    return {}
def _fact_pack_lines(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]] | None,
) -> list[str]:
raw = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
lines: list[str] = []
for line in raw.splitlines():
trimmed = line.strip()
if not trimmed or trimmed.lower().startswith("facts"):
continue
lines.append(trimmed)
if _knowledge_intent(prompt) or _doc_intent(prompt) or _is_overview_query(prompt):
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
for kb_line in kb_titles.splitlines():
if kb_line.strip():
lines.append(kb_line.strip())
return lines
def _fact_pack_text(lines: list[str], fact_meta: dict[str, dict[str, Any]]) -> str:
labeled: list[str] = []
for idx, line in enumerate(lines):
fid = f"F{idx + 1}"
tags = fact_meta.get(fid, {}).get("tags") or []
tag_text = f" [tags: {', '.join(tags)}]" if tags else ""
labeled.append(f"{fid}{tag_text}: {line}")
return "Fact pack:\n" + "\n".join(labeled)
def _tool_fact_lines(prompt: str, *, allow_tools: bool) -> list[str]:
if not allow_tools:
return []
metrics_context, _ = metrics_query_context(prompt, allow_tools=True)
lines: list[str] = []
if metrics_context:
for line in metrics_context.splitlines():
trimmed = line.strip()
if trimmed:
lines.append(f"tool_metrics: {trimmed}")
return lines
_ALLOWED_INSIGHT_TAGS = {
"availability",
"architecture",
"database",
"hardware",
"inventory",
"node_detail",
"os",
"pods",
"utilization",
"workloads",
"workers",
}
_DYNAMIC_TAGS = {"availability", "database", "pods", "utilization", "workloads"}
_INVENTORY_TAGS = {"hardware", "architecture", "inventory", "workers", "node_detail", "os"}
def _fact_line_tags(line: str) -> set[str]:
text = (line or "").lower()
tags: set[str] = set()
if any(key in text for key in ("nodes_total", "ready", "not_ready", "workers_ready", "workers_not_ready")):
tags.add("availability")
if "nodes_by_arch" in text or "arch " in text or "architecture" in text:
tags.add("architecture")
if any(key in text for key in ("rpi", "jetson", "amd64", "arm64", "non_raspberry_pi")):
tags.update({"hardware", "inventory"})
if "control_plane_nodes" in text or "control_plane_by_hardware" in text or "worker_nodes" in text:
tags.add("inventory")
if any(key in text for key in ("hottest_", "lowest_", "node_usage", "cpu=", "ram=", "net=", "io=")):
tags.add("utilization")
if "postgres_" in text or "postgres connections" in text:
tags.add("database")
if "pods_" in text or "pod phases" in text or "restarts" in text:
tags.add("pods")
if "workloads" in text or "primary_node" in text or "workload_" in text:
tags.add("workloads")
if "node_details" in text:
tags.add("node_detail")
if "os mix" in text or "os" in text:
tags.add("os")
return tags & _ALLOWED_INSIGHT_TAGS
def _fact_pack_meta(lines: list[str]) -> dict[str, dict[str, Any]]:
meta: dict[str, dict[str, Any]] = {}
for idx, line in enumerate(lines):
fid = f"F{idx + 1}"
tags = sorted(_fact_line_tags(line))
meta[fid] = {"tags": tags}
return meta
def _history_tags(history_lines: list[str]) -> set[str]:
tags: set[str] = set()
for line in history_lines[-6:]:
tags.update(_fact_line_tags(line))
return tags & _ALLOWED_INSIGHT_TAGS
def _normalize_fraction(value: Any, *, default: float = 0.5) -> float:
if isinstance(value, (int, float)):
score = float(value)
if score > 1:
score = score / 100.0
return max(0.0, min(1.0, score))
return default
def _seed_insights(
lines: list[str],
fact_meta: dict[str, dict[str, Any]],
*,
limit: int = 6,
) -> list[dict[str, Any]]:
priority = [
"utilization",
"database",
"pods",
"workloads",
"availability",
"hardware",
"architecture",
"inventory",
]
    seeds: list[dict[str, Any]] = []
for tag in priority:
for idx, line in enumerate(lines):
fid = f"F{idx + 1}"
tags = set(fact_meta.get(fid, {}).get("tags") or [])
if tag not in tags or fid in {s["fact_ids"][0] for s in seeds}:
continue
summary = line.lstrip("- ").strip()
seeds.append(
{
"summary": summary,
"fact_ids": [fid],
"relevance": 0.5,
"novelty": 0.5,
"rationale": "seeded from fact pack",
"tags": sorted(tags),
}
)
if len(seeds) >= limit:
return seeds
return seeds
def _insight_tags(insight: dict[str, Any], fact_meta: dict[str, dict[str, Any]]) -> set[str]:
tags: set[str] = set()
for fid in insight.get("fact_ids") if isinstance(insight.get("fact_ids"), list) else []:
tags.update(fact_meta.get(fid, {}).get("tags") or [])
raw_tags = insight.get("tags") if isinstance(insight.get("tags"), list) else []
tags.update(t for t in raw_tags if isinstance(t, str))
summary = insight.get("summary") or insight.get("claim") or ""
if isinstance(summary, str):
tags.update(_fact_line_tags(summary))
return tags & _ALLOWED_INSIGHT_TAGS
def _insight_score(
insight: dict[str, Any],
*,
preference: str,
prefer_tags: set[str],
avoid_tags: set[str],
history_tags: set[str],
fact_meta: dict[str, dict[str, Any]],
) -> float:
base = _score_insight(insight, preference)
tags = _insight_tags(insight, fact_meta)
if prefer_tags and tags:
base += 0.15 * len(tags & prefer_tags)
if avoid_tags and tags:
base -= 0.12 * len(tags & avoid_tags)
if history_tags and tags:
base -= 0.08 * len(tags & history_tags)
if preference == "novelty":
if tags & _DYNAMIC_TAGS:
base += 0.12
if tags & _INVENTORY_TAGS:
base -= 0.08
return base
def _score_insight(insight: dict[str, Any], preference: str) -> float:
relevance = _normalize_fraction(insight.get("relevance"), default=0.6)
novelty = _normalize_fraction(insight.get("novelty"), default=0.5)
if preference == "novelty":
return novelty * 0.6 + relevance * 0.4
return relevance * 0.6 + novelty * 0.4
def _select_diverse_insights(
candidates: list[dict[str, Any]],
*,
preference: str,
prefer_tags: set[str],
avoid_tags: set[str],
history_tags: set[str],
fact_meta: dict[str, dict[str, Any]],
count: int = 2,
) -> list[dict[str, Any]]:
scored: list[tuple[float, dict[str, Any]]] = []
for item in candidates:
tags = _insight_tags(item, fact_meta)
item["tags"] = sorted(tags)
score = _insight_score(
item,
preference=preference,
prefer_tags=prefer_tags,
avoid_tags=avoid_tags,
history_tags=history_tags,
fact_meta=fact_meta,
)
scored.append((score, item))
scored.sort(key=lambda pair: pair[0], reverse=True)
picked: list[dict[str, Any]] = []
used_tags: set[str] = set()
for _, item in scored:
tags = set(item.get("tags") or [])
if used_tags and tags and tags <= used_tags and len(picked) < count:
continue
picked.append(item)
used_tags.update(tags)
if len(picked) >= count:
break
if len(picked) < count:
for _, item in scored:
if item in picked:
continue
picked.append(item)
if len(picked) >= count:
break
return picked
def _open_ended_system() -> str:
return (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Use ONLY the provided fact pack and recent chat as your evidence. "
"You may draw light inferences if you label them as such. "
"Write concise, human sentences with a helpful, calm tone (not a list). "
"If the question is subjective (cool/interesting/unconventional), pick a standout fact and explain why it stands out. "
"If the question asks for a list, embed the list inline in a sentence (comma-separated). "
"If the question is ambiguous, pick a reasonable interpretation and state it briefly. "
"Avoid repeating the exact same observation as the last response if possible; vary across metrics, workload, or hardware details. "
"Do not invent numbers or facts. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), HallucinationRisk (low|medium|high)."
)
def _ollama_call_safe(
hist_key,
prompt: str,
*,
context: str,
fallback: str,
system_override: str | None = None,
model: str | None = None,
) -> str:
try:
return _ollama_call(
hist_key,
prompt,
context=context,
use_history=False,
system_override=system_override,
model=model,
)
except Exception:
return fallback
def _candidate_note(candidate: dict[str, Any]) -> str:
claim = str(candidate.get("focus") or candidate.get("answer") or "")
    return claim[:160] + ("..." if len(claim) > 160 else "")
def _ensure_scores(answer: str) -> str:
text = answer.strip()
lines = [line.strip() for line in text.splitlines() if line.strip()]
score_map: dict[str, str] = {}
body_lines: list[str] = []
def _score_key(line: str) -> str:
cleaned = line.strip().lstrip("-•* ").strip()
return cleaned.lower()
def _extract_value(line: str) -> str:
cleaned = line.strip().lstrip("-•* ").strip()
if ":" in cleaned:
return cleaned.split(":", 1)[1].strip()
parts = cleaned.split()
return parts[1] if len(parts) > 1 else ""
def _record_score(key: str, value: str):
if not value:
return
score_map.setdefault(key, value)
for line in lines:
cleaned = line.strip().lstrip("-•* ").strip()
lowered = cleaned.lower()
if lowered.startswith("confidence,") or (
"confidence" in lowered and "relevance" in lowered and "satisfaction" in lowered
):
for key in ("confidence", "relevance", "satisfaction"):
match = re.search(rf"{key}\\s*[:=]?\\s*(\\d{{1,3}}|high|medium|low)", lowered)
if match:
_record_score(key, match.group(1))
risk_match = re.search(r"hallucination\\s*risk\\s*[:=]?\\s*(low|medium|high)", lowered)
if risk_match:
_record_score("hallucinationrisk", risk_match.group(1))
continue
if lowered.startswith("confidence"):
_record_score("confidence", _extract_value(cleaned))
continue
if lowered.startswith("relevance"):
_record_score("relevance", _extract_value(cleaned))
continue
if lowered.startswith("satisfaction"):
_record_score("satisfaction", _extract_value(cleaned))
continue
if lowered.replace(" ", "").startswith("hallucinationrisk") or lowered.startswith(
"hallucination risk"
):
_record_score("hallucinationrisk", _extract_value(cleaned))
continue
body_lines.append(line)
confidence = score_map.get("confidence") or "medium"
relevance = score_map.get("relevance") or "70"
satisfaction = score_map.get("satisfaction") or "70"
risk = score_map.get("hallucinationrisk") or "low"
final_lines = body_lines + [
f"Confidence: {confidence}",
f"Relevance: {relevance}",
f"Satisfaction: {satisfaction}",
f"HallucinationRisk: {risk}",
]
return "\n".join(final_lines)
def _open_ended_plan(
prompt: str,
*,
fact_pack: str,
history_lines: list[str],
count: int,
state: ThoughtState | None,
model: str | None,
) -> list[dict[str, Any]]:
if state:
state.update("planning", step=1, note="mapping angles")
count = max(1, count)
prompt_text = (
"Analyze the question and propose up to "
f"{count} distinct answer angles that can be supported by the fact pack. "
"Keep them diverse (e.g., metrics, hardware, workload placement, recent changes). "
"If the question is subjective, propose at least one angle that surfaces a standout detail. "
"Avoid repeating the same angle as the most recent response if possible. "
"Return JSON: {\"angles\":[{\"focus\":\"...\",\"reason\":\"...\",\"priority\":1-5}]}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
prompt_text + f" Question: {prompt}",
context=context,
model=model,
)
angles = result.get("angles") if isinstance(result, dict) else None
cleaned: list[dict[str, Any]] = []
seen: set[str] = set()
if isinstance(angles, list):
for item in angles:
if not isinstance(item, dict):
continue
focus = str(item.get("focus") or "").strip()
if not focus or focus.lower() in seen:
continue
seen.add(focus.lower())
priority = item.get("priority")
if not isinstance(priority, (int, float)):
priority = 3
cleaned.append(
{
"focus": focus,
"reason": str(item.get("reason") or ""),
"priority": int(max(1, min(5, priority))),
}
)
if not cleaned:
cleaned = [{"focus": "Direct answer", "reason": "Default fallback", "priority": 3}]
cleaned.sort(key=lambda item: item.get("priority", 3), reverse=True)
if state:
state.update("planning", step=1, note=_candidate_note(cleaned[0]))
return cleaned
def _sanitize_focus_tags(raw_tags: list[Any]) -> list[str]:
tags: list[str] = []
for tag in raw_tags:
if not isinstance(tag, str):
continue
tag = tag.strip()
if tag in _ALLOWED_INSIGHT_TAGS and tag not in tags:
tags.append(tag)
return tags
def _open_ended_interpret(
prompt: str,
*,
fact_pack: str,
history_lines: list[str],
state: ThoughtState | None,
model: str | None,
) -> dict[str, Any]:
if state:
state.update("interpreting", step=1, note="reading question")
allowed_tags = ", ".join(sorted(_ALLOWED_INSIGHT_TAGS))
prompt_text = (
"Classify how to answer the question using only the fact pack. "
"Return JSON: {\"style\":\"objective|subjective\","
"\"tone\":\"neutral|curious|enthusiastic\","
"\"focus_tags\":[\"tag\"],"
"\"focus_label\":\"short phrase\","
"\"allow_list\":true|false}. "
"Use allow_list=true only if the question explicitly asks for names or lists. "
f"Only use tags from: {allowed_tags}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
prompt_text + f" Question: {prompt}",
context=context,
model=model,
)
if not isinstance(result, dict):
result = {}
style = str(result.get("style") or "").strip().lower()
if style not in ("objective", "subjective"):
style = "subjective" if _is_subjective_query(prompt) else "objective"
tone = str(result.get("tone") or "neutral").strip().lower()
if tone not in ("neutral", "curious", "enthusiastic"):
tone = "neutral"
focus_tags = _sanitize_focus_tags(result.get("focus_tags") or [])
focus_label = str(result.get("focus_label") or "").strip()
allow_list = result.get("allow_list")
if not isinstance(allow_list, bool):
q = normalize_query(prompt)
allow_list = any(phrase in q for phrase in ("list", "which", "what are", "names"))
return {
"style": style,
"tone": tone,
"focus_tags": focus_tags,
"focus_label": focus_label,
"allow_list": allow_list,
}
def _preferred_tags_for_prompt(prompt: str) -> set[str]:
    q = normalize_query(prompt)
    words = set(q.split())
    tags: set[str] = set()
    # Match whole words, not substrings, so short tokens like "io", "db", and
    # "net" don't fire on words such as "opinion" or "feedback".
    if words & {"cpu", "ram", "memory", "net", "network", "io", "disk", "hottest", "busy", "usage", "utilization", "load"}:
        tags.add("utilization")
    if words & {"postgres", "database", "db", "connections"}:
        tags.add("database")
    if words & {"pod", "pods", "deployment", "job", "cronjob"}:
        tags.add("pods")
    if words & {"workload", "service", "namespace"}:
        tags.add("workloads")
    if "not ready" in q or words & {"ready", "down", "unreachable", "availability"}:
        tags.add("availability")
    if words & {"node", "nodes", "hardware", "arch", "architecture", "rpi", "jetson", "amd64", "arm64", "worker", "control-plane"}:
        tags.update({"hardware", "inventory", "architecture"})
    return tags & _ALLOWED_INSIGHT_TAGS
def _open_ended_insights(
prompt: str,
*,
fact_pack: str,
fact_meta: dict[str, dict[str, Any]],
history_lines: list[str],
count: int,
state: ThoughtState | None,
model: str | None,
) -> list[dict[str, Any]]:
if state:
state.update("analyzing", note="scouting insights")
count = max(1, count)
allowed_tags = ", ".join(sorted(_ALLOWED_INSIGHT_TAGS))
prompt_text = (
"Review the fact pack and propose up to "
f"{count} insights that could answer the question. "
"Each insight should be grounded in the facts. "
"Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"],"
"\"relevance\":0-1,\"novelty\":0-1,\"tags\":[\"tag\"],\"rationale\":\"...\"}]}. "
f"Only use tags from: {allowed_tags}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
prompt_text + f" Question: {prompt}",
context=context,
model=model,
)
insights = result.get("insights") if isinstance(result, dict) else None
cleaned: list[dict[str, Any]] = []
valid_ids = set(fact_meta.keys())
if isinstance(insights, list):
for item in insights:
if not isinstance(item, dict):
continue
summary = str(item.get("summary") or item.get("claim") or "").strip()
if not summary:
continue
raw_ids = item.get("fact_ids") if isinstance(item.get("fact_ids"), list) else []
fact_ids = [fid for fid in raw_ids if isinstance(fid, str) and fid in valid_ids]
if not fact_ids:
continue
cleaned.append(
{
"summary": summary,
"fact_ids": fact_ids,
"relevance": _normalize_fraction(item.get("relevance"), default=0.6),
"novelty": _normalize_fraction(item.get("novelty"), default=0.5),
"rationale": str(item.get("rationale") or ""),
"tags": [t for t in (item.get("tags") or []) if isinstance(t, str)],
}
)
if cleaned and state:
state.update("analyzing", note=_candidate_note(cleaned[0]))
return cleaned
def _fallback_fact_ids(
fact_meta: dict[str, dict[str, Any]],
*,
focus_tags: set[str],
count: int,
) -> list[str]:
if not fact_meta:
return []
if focus_tags:
tagged = [
fid
for fid, meta in fact_meta.items()
if focus_tags & set(meta.get("tags") or [])
]
if tagged:
return tagged[:count]
return list(fact_meta.keys())[:count]
def _open_ended_select_facts(
prompt: str,
*,
fact_pack: str,
fact_meta: dict[str, dict[str, Any]],
history_lines: list[str],
focus_tags: set[str],
avoid_fact_ids: list[str],
count: int,
subjective: bool,
state: ThoughtState | None,
step: int,
model: str | None,
) -> list[str]:
if state:
state.update("selecting facts", step=step, note="picking evidence")
focus_hint = ", ".join(sorted(focus_tags)) if focus_tags else "any"
avoid_hint = ", ".join(avoid_fact_ids) if avoid_fact_ids else "none"
prompt_text = (
"Select the fact IDs that best answer the question. "
f"Pick up to {count} fact IDs. "
f"Focus tags: {focus_hint}. "
f"Avoid these fact IDs: {avoid_hint}. "
"If the question is subjective, pick standout or unusual facts; "
"if objective, pick the minimal facts needed. "
"Return JSON: {\"fact_ids\":[\"F1\"...],\"note\":\"...\"}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
prompt_text + f" Question: {prompt}",
context=context,
model=model,
)
fact_ids = result.get("fact_ids") if isinstance(result, dict) else None
selected: list[str] = []
if isinstance(fact_ids, list):
for fid in fact_ids:
if isinstance(fid, str) and fid in fact_meta and fid not in selected:
selected.append(fid)
if len(selected) >= count:
break
seed = _fallback_fact_ids(fact_meta, focus_tags=focus_tags, count=count)
if selected:
for fid in seed:
if fid not in selected:
selected.append(fid)
if len(selected) >= count:
break
else:
selected = seed
return selected
def _normalize_score(value: Any, *, default: int = 60) -> int:
if isinstance(value, (int, float)):
return int(max(0, min(100, value)))
return default
def _confidence_score(value: Any) -> int:
text = str(value or "").strip().lower()
if text.startswith("high"):
return 85
if text.startswith("low"):
return 35
return 60
def _risk_penalty(value: Any) -> int:
text = str(value or "").strip().lower()
if text.startswith("high"):
return 20
if text.startswith("medium"):
return 10
return 0
def _open_ended_candidate(
prompt: str,
*,
focus: str,
fact_pack: str,
history_lines: list[str],
subjective: bool,
tone: str,
allow_list: bool,
state: ThoughtState | None,
step: int,
fact_hints: list[str] | None = None,
model: str | None = None,
) -> dict[str, Any]:
if state:
state.update("drafting", step=step, note=focus)
hint_text = ""
if fact_hints:
hint_text = " Prioritize these fact IDs if relevant: " + ", ".join(fact_hints) + "."
style_hint = (
"Offer a brief opinion grounded in facts and explain why it stands out. "
if subjective
else "Answer directly and succinctly. "
)
list_hint = (
"If a list is requested, embed it inline in a sentence (comma-separated). "
if allow_list
else "Avoid bullet lists. "
)
prompt_text = (
"Using ONLY the fact pack, answer the question focusing on this angle: "
f"{focus}. "
f"Tone: {tone}. "
+ style_hint
+ list_hint
+ "Write 2-4 sentences in plain prose."
+ hint_text
+ " "
"If you infer, label it as inference. "
"List which fact pack IDs you used. "
"Return JSON: {\"answer\":\"...\",\"facts_used\":[\"F1\"],\"confidence\":\"high|medium|low\","
"\"relevance\":0-100,\"satisfaction\":0-100,\"risk\":\"low|medium|high\"}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
prompt_text + f" Question: {prompt}",
context=context,
model=model,
)
if not isinstance(result, dict):
result = {}
answer = str(result.get("answer") or "").strip()
if not answer:
answer = "I don't have enough data to answer that from the current snapshot."
facts_used = result.get("facts_used")
if not isinstance(facts_used, list):
facts_used = []
candidate = {
"focus": focus,
"answer": answer,
"facts_used": facts_used,
"confidence": result.get("confidence", "medium"),
"relevance": _normalize_score(result.get("relevance"), default=60),
"satisfaction": _normalize_score(result.get("satisfaction"), default=60),
"risk": result.get("risk", "medium"),
}
candidate["score"] = _candidate_score(candidate)
return candidate
def _candidate_score(candidate: dict[str, Any]) -> float:
relevance = _normalize_score(candidate.get("relevance"), default=60)
satisfaction = _normalize_score(candidate.get("satisfaction"), default=60)
confidence = _confidence_score(candidate.get("confidence"))
score = relevance * 0.45 + satisfaction * 0.35 + confidence * 0.2
if not candidate.get("facts_used"):
score -= 5
return score - _risk_penalty(candidate.get("risk"))
def _select_candidates(candidates: list[dict[str, Any]], *, count: int) -> list[dict[str, Any]]:
if not candidates:
return []
ranked = sorted(candidates, key=lambda item: item.get("score", 0), reverse=True)
picked: list[dict[str, Any]] = []
seen_focus: set[str] = set()
for item in ranked:
focus = str(item.get("focus") or "").strip().lower()
if focus and focus in seen_focus:
continue
picked.append(item)
if focus:
seen_focus.add(focus)
if len(picked) >= count:
break
return picked or ranked[:count]
def _open_ended_synthesize(
prompt: str,
*,
fact_pack: str,
history_lines: list[str],
candidates: list[dict[str, Any]],
subjective: bool,
tone: str,
allow_list: bool,
state: ThoughtState | None,
step: int,
model: str | None,
critique: str | None = None,
) -> str:
if state:
state.update("synthesizing", step=step, note="composing answer")
critique_block = f"\nCritique guidance: {critique}\n" if critique else "\n"
style_hint = (
"If the question is subjective, share a light opinion grounded in facts and explain why it stands out. "
if subjective
else "Answer directly without extra caveats. "
)
list_hint = (
"If a list is requested, embed it inline in a sentence (comma-separated). "
if allow_list
else "Avoid bullet lists. "
)
synth_prompt = (
"Compose the final answer to the question using the candidate answers below. "
"Select the best 1-2 candidates, blend them if helpful, and keep 2-4 sentences. "
"Use only the fact pack as evidence. "
"If you infer, label it as inference. "
"Do not claim nodes are missing or not ready unless the fact pack explicitly lists "
"nodes_not_ready or expected_workers_missing. "
f"Tone: {tone}. "
+ style_hint
+ list_hint
+ "Keep the tone conversational and answer the user's intent directly. "
"Avoid repeating the last response if possible. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), "
"HallucinationRisk (low|medium|high).\n"
f"Question: {prompt}\n"
f"{critique_block}"
f"Candidates: {json.dumps(candidates, ensure_ascii=False)}"
)
context = _append_history_context(fact_pack, history_lines)
reply = _ollama_call_safe(
("open", "synth"),
synth_prompt,
context=context,
fallback="I don't have enough data to answer that.",
system_override=_open_ended_system(),
model=model,
)
return _ensure_scores(reply)
def _open_ended_critique(
prompt: str,
*,
fact_pack: str,
history_lines: list[str],
candidates: list[dict[str, Any]],
state: ThoughtState | None,
step: int,
model: str | None,
) -> str:
if state:
state.update("reviewing", step=step, note="quality check")
critique_prompt = (
"Review the candidate answers against the fact pack. "
"Identify any missing important detail or risky inference and give one sentence of guidance. "
"Return JSON: {\"guidance\":\"...\",\"risk\":\"low|medium|high\"}."
)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(
critique_prompt + f" Question: {prompt} Candidates: {json.dumps(candidates, ensure_ascii=False)}",
context=context,
model=model,
)
if isinstance(result, dict):
guidance = str(result.get("guidance") or "").strip()
if guidance:
return guidance
return ""
def _open_ended_multi(
prompt: str,
*,
fact_pack: str,
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
history_lines: list[str],
mode: str,
state: ThoughtState | None = None,
) -> str:
model = _model_for_mode(mode)
if mode == "fast":
total_steps = 4
else:
total_steps = 7
if state:
state.total_steps = total_steps
interpretation = _open_ended_interpret(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
state=state,
model=model,
)
style = interpretation.get("style") or "objective"
subjective = style == "subjective" or _is_subjective_query(prompt)
tone = str(interpretation.get("tone") or "").strip().lower()
if tone not in ("neutral", "curious", "enthusiastic"):
tone = "curious" if subjective else "neutral"
allow_list = bool(interpretation.get("allow_list"))
focus_tags = set(interpretation.get("focus_tags") or []) or _preferred_tags_for_prompt(prompt)
if not focus_tags and subjective:
focus_tags = set(_ALLOWED_INSIGHT_TAGS)
primary_ids = _open_ended_select_facts(
prompt,
fact_pack=fact_pack,
fact_meta=fact_meta,
history_lines=history_lines,
focus_tags=focus_tags,
avoid_fact_ids=[],
count=4 if mode == "deep" else 3,
subjective=subjective,
state=state,
step=2,
model=model,
)
alternate_ids: list[str] = []
if mode == "deep":
alternate_ids = _open_ended_select_facts(
prompt,
fact_pack=fact_pack,
fact_meta=fact_meta,
history_lines=history_lines,
focus_tags=focus_tags,
avoid_fact_ids=primary_ids,
count=4,
subjective=subjective,
state=state,
step=3,
model=model,
)
candidates: list[dict[str, Any]] = []
focus_label = interpretation.get("focus_label") or "primary angle"
step = 3 if mode == "fast" else 4
candidates.append(
_open_ended_candidate(
prompt,
focus=str(focus_label),
fact_pack=fact_pack,
history_lines=history_lines,
subjective=subjective,
tone=str(tone),
allow_list=allow_list,
state=state,
step=step,
fact_hints=primary_ids,
model=model,
)
)
step += 1
if mode == "deep" and alternate_ids:
candidates.append(
_open_ended_candidate(
prompt,
focus="alternate angle",
fact_pack=fact_pack,
history_lines=history_lines,
subjective=subjective,
tone=str(tone),
allow_list=allow_list,
state=state,
step=step,
fact_hints=alternate_ids,
model=model,
)
)
step += 1
if state:
state.update("evaluating", step=step, note="ranking candidates")
selected = _select_candidates(candidates, count=1 if mode == "fast" else 2)
step += 1
critique = ""
if mode == "deep":
critique = _open_ended_critique(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
candidates=selected or candidates,
state=state,
step=step,
model=model,
)
step += 1
reply = _open_ended_synthesize(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
candidates=selected or candidates,
subjective=subjective,
tone=str(tone),
allow_list=allow_list,
state=state,
step=step,
model=model,
critique=critique,
)
if state:
state.update("done", step=total_steps)
return reply
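# Rough pipeline shape: fast mode runs interpret -> select facts -> one draft
# candidate -> synthesize, while deep mode adds an alternate fact selection,
# a second candidate, and a critique pass before synthesis, hence the larger
# step budget reported to ThoughtState.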
def _open_ended_total_steps(mode: str) -> int:
if mode == "fast":
return 4
return 7
def _open_ended_fast(
prompt: str,
*,
fact_pack: str,
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
history_lines: list[str],
state: ThoughtState | None = None,
) -> str:
return _open_ended_multi(
prompt,
fact_pack=fact_pack,
fact_lines=fact_lines,
fact_meta=fact_meta,
history_lines=history_lines,
mode="fast",
state=state,
)
def _open_ended_deep(
prompt: str,
*,
fact_pack: str,
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
history_lines: list[str],
state: ThoughtState | None = None,
) -> str:
return _open_ended_multi(
prompt,
fact_pack=fact_pack,
fact_lines=fact_lines,
fact_meta=fact_meta,
history_lines=history_lines,
mode="deep",
state=state,
)
def open_ended_answer(
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]],
history_lines: list[str],
mode: str,
allow_tools: bool,
state: ThoughtState | None = None,
) -> str:
lines = _fact_pack_lines(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
if _knowledge_intent(prompt) or _doc_intent(prompt):
kb_detail = kb_retrieve(prompt)
if kb_detail:
for line in kb_detail.splitlines():
if line.strip():
lines.append(line.strip())
tool_lines = _tool_fact_lines(prompt, allow_tools=allow_tools)
if tool_lines:
lines.extend(tool_lines)
if not lines:
return _ensure_scores("I don't have enough data to answer that.")
fact_meta = _fact_pack_meta(lines)
fact_pack = _fact_pack_text(lines, fact_meta)
if mode == "fast":
return _open_ended_fast(
prompt,
fact_pack=fact_pack,
fact_lines=lines,
fact_meta=fact_meta,
history_lines=history_lines,
state=state,
)
return _open_ended_deep(
prompt,
fact_pack=fact_pack,
fact_lines=lines,
fact_meta=fact_meta,
history_lines=history_lines,
state=state,
)
def _non_cluster_reply(prompt: str, *, history_lines: list[str], mode: str) -> str:
system = (
"System: You are Atlas, a helpful general assistant. "
"Answer using common knowledge when possible, and say when you're unsure. "
"Be concise and avoid unnecessary caveats. "
"Respond in plain sentences (no lists unless asked). "
"End every response with a line: 'Confidence: high|medium|low'."
)
model = _model_for_mode(mode)
context = _append_history_context("", history_lines) if history_lines else ""
reply = _ollama_call(
("general", "reply"),
prompt,
context=context,
use_history=False,
system_override=system,
model=model,
)
return _ensure_scores(reply)
# Internal HTTP endpoint for cluster answers (website uses this).
class _AtlasbotHandler(BaseHTTPRequestHandler):
server_version = "AtlasbotHTTP/1.0"
def _write_json(self, status: int, payload: dict[str, Any]):
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _authorized(self) -> bool:
if not ATLASBOT_INTERNAL_TOKEN:
return True
token = self.headers.get("X-Internal-Token", "")
return token == ATLASBOT_INTERNAL_TOKEN
def do_GET(self): # noqa: N802
if self.path == "/health":
self._write_json(200, {"status": "ok"})
return
self._write_json(404, {"error": "not_found"})
def do_POST(self): # noqa: N802
if self.path != "/v1/answer":
self._write_json(404, {"error": "not_found"})
return
if not self._authorized():
self._write_json(401, {"error": "unauthorized"})
return
try:
length = int(self.headers.get("Content-Length", "0"))
except ValueError:
length = 0
raw = self.rfile.read(length) if length > 0 else b""
try:
payload = json.loads(raw.decode("utf-8")) if raw else {}
except json.JSONDecodeError:
self._write_json(400, {"error": "invalid_json"})
return
prompt = str(payload.get("prompt") or payload.get("question") or "").strip()
if not prompt:
self._write_json(400, {"error": "missing_prompt"})
return
cleaned = _strip_bot_mention(prompt)
mode = str(payload.get("mode") or "deep").lower()
if mode in ("quick", "fast"):
mode = "fast"
elif mode in ("smart", "deep"):
mode = "deep"
else:
mode = "deep"
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
history_payload = payload.get("history") or []
history_lines = _history_payload_lines(history_payload)
history_cluster = _history_mentions_cluster(
history_lines,
inventory=inventory,
workloads=workloads,
)
followup = _is_followup_query(cleaned)
cleaned_q = normalize_query(cleaned)
cluster_affinity = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
subjective = _is_subjective_query(cleaned)
followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS)
contextual = history_cluster and (followup or followup_affinity)
cluster_query = cluster_affinity or contextual
context = ""
if cluster_query:
context = build_context(
cleaned,
allow_tools=False,
targets=[],
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if cluster_query:
answer = open_ended_answer(
cleaned,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
history_lines=history_lines,
mode=mode,
allow_tools=False,
state=None,
)
else:
answer = _non_cluster_reply(cleaned, history_lines=history_lines, mode=mode)
self._write_json(200, {"answer": answer})
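# Example request against the internal endpoint (token and prompt are
# hypothetical; /health answers GETs without authentication):
#   curl -s -X POST http://127.0.0.1:8090/v1/answer \
#     -H 'X-Internal-Token: <token>' \
#     -H 'Content-Type: application/json' \
#     -d '{"prompt": "which nodes are not ready?", "mode": "fast"}'
# The response body is JSON of the form {"answer": "..."}.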
def _start_http_server():
server = HTTPServer(("0.0.0.0", ATLASBOT_HTTP_PORT), _AtlasbotHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
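# History keying: DMs collapse to one transcript per room ((room_id, None)),
# while multi-user rooms keep one transcript per sender so that parallel
# conversations in the same room do not bleed into each other.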
def _history_mentions_cluster(
history_lines: list[str],
*,
inventory: list[dict[str, Any]] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> bool:
recent = [line for line in history_lines[-8:] if isinstance(line, str)]
for line in recent:
cleaned = normalize_query(line)
if not cleaned:
continue
if _is_cluster_query(cleaned, inventory=inventory, workloads=workloads):
return True
return False
def build_context(
prompt: str,
*,
allow_tools: bool,
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
snapshot: dict[str, Any] | None = None,
workloads: list[dict[str, Any]] | None = None,
) -> str:
parts: list[str] = []
facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
if facts:
parts.append(facts)
snapshot_json = snapshot_compact_context(
prompt,
snapshot,
inventory=inventory,
workloads=workloads,
)
if snapshot_json:
parts.append(snapshot_json)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
kb = kb_retrieve(prompt)
if not kb and _knowledge_intent(prompt):
kb = kb_retrieve_titles(prompt, limit=4)
if kb:
parts.append(kb)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
return "\n\n".join([p for p in parts if p]).strip()
def snapshot_context(prompt: str, snapshot: dict[str, Any] | None) -> str:
if not snapshot:
return ""
metrics = _snapshot_metrics(snapshot)
workloads = _snapshot_workloads(snapshot)
q = normalize_query(prompt)
parts: list[str] = []
nodes = snapshot.get("nodes") if isinstance(snapshot.get("nodes"), dict) else {}
if nodes.get("total") is not None:
parts.append(
f"Snapshot: nodes_total={nodes.get('total')}, ready={nodes.get('ready')}, not_ready={nodes.get('not_ready')}."
)
if any(word in q for word in ("postgres", "connections", "db")):
postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {}
if postgres:
parts.append(f"Snapshot: postgres_connections={postgres}.")
if any(word in q for word in ("hottest", "cpu", "ram", "memory", "net", "network", "io", "disk")):
hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
if hottest:
parts.append(f"Snapshot: hottest_nodes={hottest}.")
if workloads and any(word in q for word in ("run", "running", "host", "node", "where", "which")):
match = _select_workload(prompt, workloads)
if match:
parts.append(f"Snapshot: workload={match}.")
return "\n".join(parts).strip()
def _compact_nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
details = snapshot.get("nodes_detail") if isinstance(snapshot.get("nodes_detail"), list) else []
output: list[dict[str, Any]] = []
for node in details:
if not isinstance(node, dict):
continue
name = node.get("name")
if not name:
continue
output.append(
{
"name": name,
"ready": node.get("ready"),
"hardware": node.get("hardware"),
"arch": node.get("arch"),
"roles": node.get("roles"),
"is_worker": node.get("is_worker"),
"os": node.get("os"),
"kernel": node.get("kernel"),
"kubelet": node.get("kubelet"),
"container_runtime": node.get("container_runtime"),
}
)
return output
def _compact_metrics(snapshot: dict[str, Any]) -> dict[str, Any]:
metrics = snapshot.get("metrics") if isinstance(snapshot.get("metrics"), dict) else {}
return {
"pods_running": metrics.get("pods_running"),
"pods_pending": metrics.get("pods_pending"),
"pods_failed": metrics.get("pods_failed"),
"pods_succeeded": metrics.get("pods_succeeded"),
"postgres_connections": metrics.get("postgres_connections"),
"hottest_nodes": metrics.get("hottest_nodes"),
"node_usage": metrics.get("node_usage"),
"top_restarts_1h": metrics.get("top_restarts_1h"),
}
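# The compact snapshot is the model's main grounding payload: one JSON object,
# truncated to MAX_FACTS_CHARS so it cannot crowd the prompt and history out of
# the context window.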
def snapshot_compact_context(
prompt: str,
snapshot: dict[str, Any] | None,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> str:
if not snapshot:
return ""
compact = {
"collected_at": snapshot.get("collected_at"),
"nodes_summary": snapshot.get("nodes_summary"),
"expected_workers": expected_worker_nodes_from_metrics(),
"nodes_detail": _compact_nodes_detail(snapshot),
"workloads": _workloads_for_prompt(prompt, workloads or [], limit=40) if workloads else [],
"metrics": _compact_metrics(snapshot),
"flux": snapshot.get("flux"),
"errors": snapshot.get("errors"),
}
text = json.dumps(compact, ensure_ascii=False)
if len(text) > MAX_FACTS_CHARS:
text = text[: MAX_FACTS_CHARS - 3].rstrip() + "..."
return "Cluster snapshot (JSON):\n" + text
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
return any(
phrase in q
for phrase in (
"what do you know",
"tell me about",
"interesting",
"overview",
"summary",
"describe",
"explain",
)
)
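# Cluster detection is purely heuristic: explicit titan-XX node names, cluster
# hint words, *.bstein.dev hostnames, or any token found in the node/workload
# name index all mark the prompt as cluster-scoped.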
def _is_cluster_query(
prompt: str,
*,
inventory: list[dict[str, Any]] | None,
workloads: list[dict[str, Any]] | None,
) -> bool:
q = normalize_query(prompt)
if not q:
return False
if TITAN_NODE_RE.search(q):
return True
if any(word in q for word in CLUSTER_HINT_WORDS):
return True
for host_match in HOST_RE.finditer(q):
host = host_match.group(1).lower()
if host.endswith("bstein.dev"):
return True
tokens = set(_tokens(q))
if _NAME_INDEX and tokens & _NAME_INDEX:
return True
return False
def _inventory_summary(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = [n for n in inventory if n.get("ready") is True]
not_ready = [n for n in inventory if n.get("ready") is False]
parts = [f"Atlas cluster: {total} nodes ({len(ready)} ready, {len(not_ready)} not ready)."]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"- {key}: {len(nodes)} nodes ({', '.join(nodes)})")
return "\n".join(parts)
def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
parts: list[str] = []
inv = _inventory_summary(inventory)
if inv:
parts.append(inv)
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
parts.append(kb_titles)
summary = "\n".join(parts).strip()
return _format_confidence(summary, "medium") if summary else ""
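# Single round trip to the Ollama-compatible chat endpoint. Message order:
# system prompt, grounded context as a user turn (capped at MAX_CONTEXT_CHARS),
# up to the last 24 transcript lines, then the prompt itself. Requests are
# serialized behind _OLLAMA_LOCK by default so a small backend is not asked for
# concurrent generations; a 404 falls back to FALLBACK_MODEL once.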
def _ollama_call(
hist_key,
prompt: str,
*,
context: str,
use_history: bool = True,
system_override: str | None = None,
model: str | None = None,
) -> str:
system = system_override or (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Use the provided context and facts as your source of truth. "
"If the context includes a cluster snapshot, treat the question as about the Atlas/Othrys cluster even if the prompt is ambiguous. "
"When a cluster snapshot is provided, never answer about unrelated meanings of 'Atlas' (maps, mythology, Apache Atlas, etc). "
"Treat 'hottest' as highest utilization (CPU/RAM/NET/IO) rather than temperature. "
"If you infer or synthesize, say 'Based on the snapshot' and keep it brief. "
"For subjective prompts (interesting, favorite, unconventional), pick one or two observations from the context, explain why they stand out in 1-2 sentences, and avoid repeating the same observation as the last response if you can. "
"Prefer exact repo paths and Kubernetes resource names when relevant. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"Translate metrics into natural language instead of echoing raw label/value pairs. "
"Avoid bare lists unless the user asked for a list; weave numbers into sentences. "
"Do not answer by only listing runbooks; if the question is about Atlas/Othrys, summarize the cluster first and mention docs only if useful. "
"If the question is not about Atlas/Othrys and no cluster context is provided, answer using general knowledge and say when you are unsure. "
"If the answer is not grounded in the provided context or tool data, say you do not know. "
"End every response with a line: 'Confidence: high|medium|low'."
)
endpoint = _ollama_endpoint()
if not endpoint:
raise RuntimeError("ollama endpoint missing")
messages: list[dict[str, str]] = [{"role": "system", "content": system}]
if context:
messages.append({"role": "user", "content": "Context (grounded):\n" + context[:MAX_CONTEXT_CHARS]})
if use_history:
messages.extend(_history_to_messages(history[hist_key][-24:]))
messages.append({"role": "user", "content": prompt})
model_name = model or MODEL
payload = {"model": model_name, "messages": messages, "stream": False}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(endpoint, data=json.dumps(payload).encode(), headers=headers)
lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None
if lock:
lock.acquire()
try:
try:
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
except error.HTTPError as exc:
if exc.code == 404 and FALLBACK_MODEL and FALLBACK_MODEL != payload["model"]:
payload["model"] = FALLBACK_MODEL
r = request.Request(endpoint, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
else:
raise
msg = data.get("message") if isinstance(data, dict) else None
if isinstance(msg, dict):
raw_reply = msg.get("content")
else:
raw_reply = data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
if use_history:
history[hist_key].append(f"Atlas: {reply}")
return reply
finally:
if lock:
lock.release()
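# Retry wrapper around _ollama_call: exponential backoff capped at 4s, then the
# caller-supplied fallback (recorded in history so the transcript stays coherent).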
def ollama_reply(
hist_key,
prompt: str,
*,
context: str,
fallback: str = "",
use_history: bool = True,
model: str | None = None,
) -> str:
    attempts = max(1, OLLAMA_RETRIES + 1)
    for attempt in range(attempts):
        try:
            return _ollama_call(
                hist_key,
                prompt,
                context=context,
                use_history=use_history,
                model=model,
            )
        except Exception:  # noqa: BLE001
            if attempt + 1 < attempts:  # back off, but skip the pointless final sleep
                time.sleep(min(4, 2 ** attempt))
if fallback:
if use_history:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "I don't have enough data to answer that."
def ollama_reply_with_thinking(
token: str,
room: str,
hist_key,
prompt: str,
*,
context: str,
fallback: str,
use_history: bool = True,
model: str | None = None,
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(
hist_key,
prompt,
context=context,
fallback=fallback,
use_history=use_history,
model=model,
)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
prompt_hint = prompt_hint[:157] + ""
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def open_ended_with_thinking(
token: str,
room: str,
prompt: str,
*,
inventory: list[dict[str, Any]],
snapshot: dict[str, Any] | None,
workloads: list[dict[str, Any]],
history_lines: list[str],
mode: str,
allow_tools: bool,
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
total_steps = _open_ended_total_steps(mode)
state = ThoughtState(total_steps=total_steps)
def worker():
result["reply"] = open_ended_answer(
prompt,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
history_lines=history_lines,
mode=mode,
allow_tools=allow_tools,
state=state,
)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
send_msg(token, room, state.status_line())
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str | None):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
for rid, data in res.get("rooms", {}).get("invite", {}).items():
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
cleaned_body = _strip_bot_mention(body)
lower_body = cleaned_body.lower()
                mode = _detect_mode_from_body(body, default="deep")
# Only do live cluster introspection in DMs.
allow_tools = is_dm
promql = ""
if allow_tools:
m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(lower_body):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
snapshot = _snapshot_state()
inventory = node_inventory_for_prompt(cleaned_body)
if not inventory:
inventory = _snapshot_inventory(snapshot)
workloads = _snapshot_workloads(snapshot)
history_cluster = _history_mentions_cluster(
history[hist_key],
inventory=inventory,
workloads=workloads,
)
followup = _is_followup_query(cleaned_body)
cleaned_q = normalize_query(cleaned_body)
cluster_affinity = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
subjective = _is_subjective_query(cleaned_body)
followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS)
contextual = history_cluster and (followup or followup_affinity)
cluster_query = cluster_affinity or contextual
context = ""
if cluster_query:
context = build_context(
cleaned_body,
allow_tools=allow_tools,
targets=targets,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
send_msg(token, rid, extra)
continue
if cluster_query:
reply = open_ended_with_thinking(
token,
rid,
cleaned_body,
inventory=inventory,
snapshot=snapshot,
workloads=workloads,
history_lines=history[hist_key],
mode=mode if mode in ("fast", "deep") else "deep",
allow_tools=allow_tools,
)
else:
reply = _non_cluster_reply(
cleaned_body,
history_lines=history[hist_key],
mode=mode if mode in ("fast", "deep") else "deep",
)
send_msg(token, rid, reply)
history[hist_key].append(f"Atlas: {reply}")
history[hist_key] = history[hist_key][-80:]
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
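# Startup order: load the KB so HTTP requests can be grounded immediately, start
# the internal HTTP server, log in to Matrix with retry, best-effort join the
# default room, then block in the sync loop.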
def main():
load_kb()
_start_http_server()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()