
import collections
import json
import os
import re
import ssl
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib import parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
ARIADNE_STATE_URL = os.environ.get("ARIADNE_STATE_URL", "")
ARIADNE_STATE_TOKEN = os.environ.get("ARIADNE_STATE_TOKEN", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
"unknown_error",
"timeout",
"crash",
"crashloop",
"restart",
"restarts",
"pending",
"unreachable",
"latency",
}
CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
OPERATION_HINTS = {
"count": ("how many", "count", "number", "total"),
"list": ("list", "which", "what are", "show", "names"),
"top": ("top", "hottest", "highest", "most", "largest", "max", "maximum"),
"status": ("ready", "not ready", "unready", "down", "missing", "status"),
}
METRIC_HINTS = {
"cpu": ("cpu",),
"ram": ("ram", "memory", "mem"),
"net": ("net", "network", "bandwidth", "throughput"),
"io": ("io", "disk", "storage"),
"connections": ("connections", "conn", "postgres", "database", "db"),
}
HARDWARE_HINTS = {
"amd64": ("amd64", "x86", "x86_64", "x86-64"),
"jetson": ("jetson",),
"rpi4": ("rpi4",),
"rpi5": ("rpi5",),
"rpi": ("rpi", "raspberry"),
"arm64": ("arm64", "aarch64"),
}
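# Normalize a user query: lowercase, fold Unicode dash variants to "-", and
# collapse whitespace, so a node name typed with a non-ASCII hyphen still
# matches the titan-node regexes above.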
def normalize_query(text: str) -> str:
cleaned = (text or "").lower()
for ch in _DASH_CHARS:
cleaned = cleaned.replace(ch, "-")
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
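# Tokenize for retrieval, dropping stopwords;
# e.g. _tokens("please show postgres connections") -> ["show", "postgres", "connections"].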
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(
    r"(?<!\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?::[^\s]+)?(?!\w)",
    re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def _body_mentions_token(body: str) -> bool:
lower = (body or "").strip().lower()
if not lower:
return False
for token in MENTION_LOCALPARTS:
for prefix in (token, f"@{token}"):
if lower.startswith(prefix + ":") or lower.startswith(prefix + ",") or lower.startswith(prefix + " "):
return True
return False
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
if _body_mentions_token(body or ""):
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def login() -> str:
login_user = normalize_user_id(USER)
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": login_user},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
NODE_REGEX = re.compile(r'node=~"([^"]+)"')
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX, _METRIC_INDEX
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
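# Rank runbooks against the query: +3 per token hit in the title, +1 for hits
# elsewhere in tags/entrypoints/body, and +4 when a known entrypoint appears
# verbatim in the query.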
def _score_kb_docs(query: str) -> list[dict[str, Any]]:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return []
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return []
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
return [d for _, d in scored]
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q:
return ""
scored = _score_kb_docs(q)
picked = scored[:limit]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def kb_retrieve_titles(query: str, *, limit: int = 4) -> str:
scored = _score_kb_docs(query)
picked = scored[:limit]
if not picked:
return ""
parts = ["Relevant runbooks:"]
for doc in picked:
title = doc.get("title") or doc.get("path") or "runbook"
path = doc.get("path") or ""
if path:
parts.append(f"- {title} ({path})")
else:
parts.append(f"- {title}")
return "\n".join(parts)
def _extract_titan_nodes(text: str) -> list[str]:
cleaned = normalize_query(text)
names = {n.lower() for n in TITAN_NODE_RE.findall(cleaned) if n}
for match in re.finditer(r"titan-([0-9a-z]{2}(?:[/,][0-9a-z]{2})+)", cleaned, re.IGNORECASE):
tail = match.group(1)
for part in re.split(r"[/,]", tail):
part = part.strip()
if part:
names.add(f"titan-{part.lower()}")
for match in TITAN_RANGE_RE.finditer(cleaned):
left, right = match.groups()
if left:
names.add(f"titan-{left.lower()}")
if right:
names.add(f"titan-{right.lower()}")
return sorted(names)
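# Illustrative: _humanize_rate("2048", unit="B/s") -> "2.00 KB/s"; a "%" unit
# renders the value as a percentage instead.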
def _humanize_rate(value: str, *, unit: str) -> str:
try:
val = float(value)
except (TypeError, ValueError):
return value
if unit == "%":
return f"{val:.1f}%"
if val >= 1024 * 1024:
return f"{val / (1024 * 1024):.2f} MB/s"
if val >= 1024:
return f"{val / 1024:.2f} KB/s"
return f"{val:.2f} B/s"
def _has_any(text: str, phrases: tuple[str, ...]) -> bool:
return any(p in text for p in phrases)
def _detect_operation(q: str) -> str | None:
if _has_any(q, OPERATION_HINTS["top"]):
return "top"
for op, phrases in OPERATION_HINTS.items():
if op == "top":
continue
if _has_any(q, phrases):
return op
return None
def _detect_metric(q: str) -> str | None:
for metric, phrases in METRIC_HINTS.items():
if _has_any(q, phrases):
return metric
return None
def _detect_hardware_filters(q: str) -> tuple[set[str], set[str]]:
include: set[str] = set()
exclude: set[str] = set()
for hardware, phrases in HARDWARE_HINTS.items():
for phrase in phrases:
if f"non {phrase}" in q or f"non-{phrase}" in q or f"not {phrase}" in q:
exclude.add(hardware)
elif phrase in q:
include.add(hardware)
return include, exclude
def _detect_entity(q: str) -> str | None:
if "node" in q or "nodes" in q or "worker" in q or TITAN_NODE_RE.search(q):
return "node"
if "pod" in q or "pods" in q:
return "pod"
if "namespace" in q or "namespaces" in q:
return "namespace"
return None
def _metric_entry_score(entry: dict[str, Any], tokens: list[str], *, metric: str | None, op: str | None) -> int:
hay = _metric_tokens(entry)
score = 0
for t in set(tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if metric:
for phrase in METRIC_HINTS.get(metric, (metric,)):
if phrase in hay:
score += 3
if op == "top" and ("hottest" in hay or "top" in hay):
score += 3
if "node" in hay:
score += 1
return score
def _select_metric_entry(tokens: list[str], *, metric: str | None, op: str | None) -> dict[str, Any] | None:
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
score = _metric_entry_score(entry, tokens, metric=metric, op=op)
if score:
scored.append((score, entry))
if not scored:
return None
scored.sort(key=lambda item: item[0], reverse=True)
return scored[0][1]
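# Scope a dashboard expression to specific nodes by tightening the shared
# node_uname_info{nodename!=""} selector that the Grafana panels use.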
def _apply_node_filter(expr: str, node_regex: str | None) -> str:
if not node_regex:
return expr
needle = 'node_uname_info{nodename!=""}'
    replacement = f'node_uname_info{{nodename!="",nodename=~"{node_regex}"}}'
return expr.replace(needle, replacement)
def _metric_expr_uses_percent(entry: dict[str, Any]) -> bool:
exprs = entry.get("exprs")
expr = exprs[0] if isinstance(exprs, list) and exprs else ""
return "* 100" in expr or "*100" in expr
def _format_metric_value(value: str, *, percent: bool) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return value
if percent:
return f"{num:.1f}%"
if abs(num) >= 1:
return f"{num:.2f}".rstrip("0").rstrip(".")
return f"{num:.4f}".rstrip("0").rstrip(".")
def _format_metric_label(metric: dict[str, Any]) -> str:
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts:
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
return ", ".join(label_parts) if label_parts else "series"
def _primary_series_metric(res: dict | None) -> tuple[str | None, str | None]:
series = _vm_value_series(res or {})
if not series:
return (None, None)
first = series[0]
metric = first.get("metric") if isinstance(first, dict) else {}
value = first.get("value") if isinstance(first, dict) else []
node = metric.get("node") if isinstance(metric, dict) else None
val = value[1] if isinstance(value, list) and len(value) > 1 else None
return (node, val)
def _format_metric_answer(entry: dict[str, Any], res: dict | None) -> str:
series = _vm_value_series(res)
panel = entry.get("panel_title") or "Metric"
if not series:
return ""
percent = _metric_expr_uses_percent(entry)
lines: list[str] = []
for r in series[:5]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
label = _format_metric_label(metric if isinstance(metric, dict) else {})
lines.append(f"{label}: {_format_metric_value(val, percent=percent)}")
if not lines:
return ""
if len(lines) == 1:
return f"{panel}: {lines[0]}."
return f"{panel}:\n" + "\n".join(f"- {line}" for line in lines)
def _inventory_filter(
inventory: list[dict[str, Any]],
*,
include_hw: set[str],
exclude_hw: set[str],
only_workers: bool,
only_ready: bool | None,
nodes_in_query: list[str],
) -> list[dict[str, Any]]:
results = inventory
if nodes_in_query:
results = [node for node in results if node.get("name") in nodes_in_query]
if only_workers:
results = [node for node in results if node.get("is_worker") is True]
if only_ready is True:
results = [node for node in results if node.get("ready") is True]
if only_ready is False:
results = [node for node in results if node.get("ready") is False]
if include_hw:
results = [node for node in results if _hardware_match(node, include_hw)]
if exclude_hw:
results = [node for node in results if not _hardware_match(node, exclude_hw)]
return results
def _hardware_match(node: dict[str, Any], filters: set[str]) -> bool:
hw = node.get("hardware") or ""
arch = node.get("arch") or ""
for f in filters:
if f == "rpi" and hw in ("rpi4", "rpi5"):
return True
if f == "arm64" and arch == "arm64":
return True
if hw == f:
return True
if f == "amd64" and arch == "amd64":
return True
return False
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
if key.startswith("node-role.kubernetes.io/"):
role = key.split("/", 1)[-1]
if role:
roles.append(role)
return sorted(set(roles))
def _hardware_class(labels: dict[str, Any]) -> str:
if str(labels.get("jetson") or "").lower() == "true":
return "jetson"
hardware = (labels.get("hardware") or "").strip().lower()
if hardware in ("rpi4", "rpi5"):
return hardware
arch = labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or ""
if arch == "amd64":
return "amd64"
if arch == "arm64":
return "arm64-unknown"
return "unknown"
def node_inventory_live() -> list[dict[str, Any]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return []
items = data.get("items") or []
inventory: list[dict[str, Any]] = []
for node in items if isinstance(items, list) else []:
meta = node.get("metadata") or {}
labels = meta.get("labels") or {}
name = meta.get("name") or ""
if not name:
continue
inventory.append(
{
"name": name,
"arch": labels.get("kubernetes.io/arch") or labels.get("beta.kubernetes.io/arch") or "",
"hardware": _hardware_class(labels),
"roles": _node_roles(labels),
"is_worker": _node_is_worker(node),
"ready": _node_ready_status(node),
}
)
return sorted(inventory, key=lambda item: item["name"])
def _group_nodes(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
grouped: dict[str, list[str]] = collections.defaultdict(list)
for node in inventory:
grouped[node.get("hardware") or "unknown"].append(node["name"])
return {k: sorted(v) for k, v in grouped.items()}
def node_inventory_context(query: str, inventory: list[dict[str, Any]] | None = None) -> str:
q = normalize_query(query)
if not any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster")):
return ""
if inventory is None:
inventory = node_inventory_live()
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = sum(1 for node in inventory if node.get("ready") is True)
not_ready = sum(1 for node in inventory if node.get("ready") is False)
lines: list[str] = [
"Node inventory (live):",
f"- total: {total}, ready: {ready}, not ready: {not_ready}",
]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
if key in groups:
lines.append(f"- {key}: {', '.join(groups[key])}")
non_rpi = sorted(set(groups.get("jetson", [])) | set(groups.get("amd64", [])))
if non_rpi:
lines.append(f"- non_raspberry_pi (derived): {', '.join(non_rpi)}")
unknowns = groups.get("arm64-unknown", []) + groups.get("unknown", [])
if unknowns:
lines.append("- note: nodes labeled arm64-unknown/unknown may still be Raspberry Pi unless tagged.")
expected_workers = expected_worker_nodes_from_metrics()
if expected_workers:
ready_workers, not_ready_workers = worker_nodes_status()
missing = sorted(set(expected_workers) - set(ready_workers + not_ready_workers))
lines.append(f"- expected_workers (grafana): {', '.join(expected_workers)}")
lines.append(f"- workers_ready: {', '.join(ready_workers)}")
if not_ready_workers:
lines.append(f"- workers_not_ready: {', '.join(not_ready_workers)}")
if missing:
lines.append(f"- workers_missing (derived): {', '.join(missing)}")
return "\n".join(lines)
def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
q = normalize_query(prompt)
if any(word in q for word in ("node", "nodes", "raspberry", "rpi", "jetson", "amd64", "hardware", "cluster", "worker")):
return node_inventory_live()
return []
def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
names = [node["name"] for node in inventory]
ready = [node["name"] for node in inventory if node.get("ready") is True]
not_ready = [node["name"] for node in inventory if node.get("ready") is False]
groups = _group_nodes(inventory)
workers = [node for node in inventory if node.get("is_worker") is True]
worker_names = [node["name"] for node in workers]
worker_ready = [node["name"] for node in workers if node.get("ready") is True]
worker_not_ready = [node["name"] for node in workers if node.get("ready") is False]
expected_workers = expected_worker_nodes_from_metrics()
expected_ready = [n for n in expected_workers if n in ready] if expected_workers else []
expected_not_ready = [n for n in expected_workers if n in not_ready] if expected_workers else []
expected_missing = [n for n in expected_workers if n not in names] if expected_workers else []
return {
"names": sorted(names),
"ready": sorted(ready),
"not_ready": sorted(not_ready),
"groups": groups,
"worker_names": sorted(worker_names),
"worker_ready": sorted(worker_ready),
"worker_not_ready": sorted(worker_not_ready),
"expected_workers": expected_workers,
"expected_ready": sorted(expected_ready),
"expected_not_ready": sorted(expected_not_ready),
"expected_missing": sorted(expected_missing),
}
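# Deterministic answers for common cluster questions (counts, lists, readiness,
# hottest-node metrics). Returning "" hands the question to the model path.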
def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_summary: str) -> str:
q = normalize_query(prompt)
if not q:
return ""
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
entity = _detect_entity(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
only_ready: bool | None = None
if "not ready" in q or "unready" in q or "down" in q or "missing" in q:
only_ready = False
elif "ready" in q:
only_ready = True
if entity == "node" and only_ready is not None and op != "count":
op = "status"
if not op and entity == "node":
op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
if op == "top" and metric is None:
metric = "cpu"
# Metrics-first when a metric or top operation is requested.
if metric or op == "top":
entry = _select_metric_entry(tokens, metric=metric, op=op)
if entry and isinstance(entry.get("exprs"), list) and entry["exprs"]:
expr = entry["exprs"][0]
if inventory:
scoped = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=None,
nodes_in_query=nodes_in_query,
)
if scoped:
node_regex = "|".join([n["name"] for n in scoped])
expr = _apply_node_filter(expr, node_regex)
res = vm_query(expr, timeout=20)
answer = _format_metric_answer(entry, res)
if answer:
scope_parts: list[str] = []
if include_hw:
scope_parts.append(" and ".join(sorted(include_hw)))
if exclude_hw:
scope_parts.append(f"excluding {' and '.join(sorted(exclude_hw))}")
if only_workers:
scope_parts.append("worker")
if scope_parts:
scope = " ".join(scope_parts)
overall_note = ""
base_res = vm_query(entry["exprs"][0], timeout=20)
base_node, base_val = _primary_series_metric(base_res)
scoped_node, scoped_val = _primary_series_metric(res)
if base_node and scoped_node and base_node != scoped_node:
percent = _metric_expr_uses_percent(entry)
base_val_fmt = _format_metric_value(base_val or "", percent=percent)
overall_note = f" Overall hottest node: {base_node} ({base_val_fmt})."
return f"Among {scope} nodes, {answer}{overall_note}"
return answer
if metrics_summary:
return metrics_summary
if entity != "node" or not inventory:
if any(word in q for word in METRIC_HINT_WORDS) and not metrics_summary:
return "I don't have data to answer that right now."
return ""
expected_workers = expected_worker_nodes_from_metrics()
filtered = _inventory_filter(
inventory,
include_hw=include_hw,
exclude_hw=exclude_hw,
only_workers=only_workers,
only_ready=only_ready if op in ("status", "count") else None,
nodes_in_query=nodes_in_query,
)
names = [node["name"] for node in filtered]
if op == "status":
if "missing" in q and expected_workers:
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
return "Missing nodes: " + (", ".join(missing) if missing else "none") + "."
if only_ready is False:
return "Not ready nodes: " + (", ".join(names) if names else "none") + "."
if only_ready is True:
return f"Ready nodes ({len(names)}): " + (", ".join(names) if names else "none") + "."
if op == "count":
if expected_workers and ("expected" in q or "should" in q):
missing = sorted(set(expected_workers) - {n["name"] for n in inventory})
msg = f"Grafana inventory expects {len(expected_workers)} worker nodes."
if missing:
msg += f" Missing: {', '.join(missing)}."
return msg
if not (include_hw or exclude_hw or nodes_in_query or only_workers):
return f"Atlas has {len(names)} nodes."
return f"Matching nodes: {len(names)}."
if op == "list":
if nodes_in_query:
parts = []
existing = {n["name"] for n in inventory}
for node in nodes_in_query:
parts.append(f"{node}: {'present' if node in existing else 'not present'}")
return "Node presence: " + ", ".join(parts) + "."
if not names:
return "Matching nodes: none."
shown = names[:30]
suffix = f", … (+{len(names) - 30} more)" if len(names) > 30 else ""
return "Matching nodes: " + ", ".join(shown) + suffix + "."
return ""
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
q_tokens = _tokens(query)
if not q_tokens or not _METRIC_INDEX:
return []
scored: list[tuple[int, dict[str, Any]]] = []
for entry in _METRIC_INDEX:
if not isinstance(entry, dict):
continue
hay = _metric_tokens(entry)
if not hay:
continue
score = 0
for t in set(q_tokens):
if t in hay:
score += 2 if t in (entry.get("panel_title") or "").lower() else 1
if score:
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
return [entry for _, entry in scored[:limit]]
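# Map a metrics-flavoured question onto the best dashboard panel, run its first
# expressions against VictoriaMetrics, and return (model context, plain fallback).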
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
if not allow_tools:
return "", ""
lower = (prompt or "").lower()
if not any(word in lower for word in METRIC_HINT_WORDS):
return "", ""
matches = metrics_lookup(prompt, limit=1)
if not matches:
return "", ""
entry = matches[0]
dashboard = entry.get("dashboard") or "dashboard"
panel = entry.get("panel_title") or "panel"
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
if not exprs:
return "", ""
rendered_parts: list[str] = []
for expr in exprs[:2]:
res = vm_query(expr, timeout=20)
rendered = vm_render_result(res, limit=10)
if rendered:
rendered_parts.append(rendered)
if not rendered_parts:
return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data."
summary = "\n".join(rendered_parts)
context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
fallback = _metrics_fallback_summary(panel, summary)
return context, fallback
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path}{ns}/{svc}{wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def _ariadne_state(timeout: int = 5) -> dict | None:
if not ARIADNE_STATE_URL:
return None
headers = {}
if ARIADNE_STATE_TOKEN:
headers["X-Internal-Token"] = ARIADNE_STATE_TOKEN
r = request.Request(ARIADNE_STATE_URL, headers=headers, method="GET")
try:
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return payload if isinstance(payload, dict) else None
except Exception:
return None
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
        if prefixes and not any(name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def _vm_value_series(res: dict | None) -> list[dict]:
if not res or (res.get("status") != "success"):
return []
data = res.get("data") or {}
result = data.get("result") or []
return result if isinstance(result, list) else []
def vm_render_result(res: dict | None, limit: int = 12) -> str:
if not res:
return ""
series = _vm_value_series(res)
if not series:
return ""
out: list[str] = []
for r in series[:limit]:
if not isinstance(r, dict):
continue
metric = r.get("metric") or {}
value = r.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
# Prefer common labels if present.
label_parts = []
for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
if isinstance(metric, dict) and metric.get(k):
label_parts.append(f"{k}={metric.get(k)}")
if not label_parts and isinstance(metric, dict):
for k in sorted(metric.keys()):
if k.startswith("__"):
continue
label_parts.append(f"{k}={metric.get(k)}")
if len(label_parts) >= 4:
break
labels = ", ".join(label_parts) if label_parts else "series"
out.append(f"- {labels}: {val}")
return "\n".join(out)
def _parse_metric_lines(summary: str) -> dict[str, str]:
parsed: dict[str, str] = {}
for line in (summary or "").splitlines():
line = line.strip()
if not line.startswith("-"):
continue
try:
label, value = line.lstrip("-").split(":", 1)
except ValueError:
continue
parsed[label.strip()] = value.strip()
return parsed
def _metrics_fallback_summary(panel: str, summary: str) -> str:
parsed = _parse_metric_lines(summary)
panel_l = (panel or "").lower()
if parsed:
items = list(parsed.items())
if len(items) == 1:
label, value = items[0]
return f"{panel}: {label} = {value}."
compact = "; ".join(f"{k}={v}" for k, v in items)
return f"{panel}: {compact}."
if panel_l:
return f"{panel}: {summary}"
return summary
def _node_ready_status(node: dict) -> bool | None:
conditions = node.get("status", {}).get("conditions") or []
for cond in conditions if isinstance(conditions, list) else []:
if cond.get("type") == "Ready":
if cond.get("status") == "True":
return True
if cond.get("status") == "False":
return False
return None
return None
def _node_is_worker(node: dict) -> bool:
labels = (node.get("metadata") or {}).get("labels") or {}
if labels.get("node-role.kubernetes.io/control-plane") is not None:
return False
if labels.get("node-role.kubernetes.io/master") is not None:
return False
if labels.get("node-role.kubernetes.io/worker") is not None:
return True
return True
def worker_nodes_status() -> tuple[list[str], list[str]]:
try:
data = k8s_get("/api/v1/nodes?limit=500")
except Exception:
return ([], [])
items = data.get("items") or []
ready_nodes: list[str] = []
not_ready_nodes: list[str] = []
for node in items if isinstance(items, list) else []:
if not _node_is_worker(node):
continue
name = (node.get("metadata") or {}).get("name") or ""
if not name:
continue
ready = _node_ready_status(node)
if ready is True:
ready_nodes.append(name)
elif ready is False:
not_ready_nodes.append(name)
return (sorted(ready_nodes), sorted(not_ready_nodes))
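# The Grafana "worker nodes ready" panel pins its node set with a
# node=~"a|b|c" matcher; parse that list back out as the expected workers.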
def expected_worker_nodes_from_metrics() -> list[str]:
    for entry in _METRIC_INDEX:
        if not isinstance(entry, dict):
            continue
        panel = (entry.get("panel_title") or "").lower()
if "worker nodes ready" not in panel:
continue
exprs = entry.get("exprs") if isinstance(entry.get("exprs"), list) else []
for expr in exprs:
if not isinstance(expr, str):
continue
match = NODE_REGEX.search(expr)
if not match:
continue
raw = match.group(1)
nodes = [n.strip() for n in raw.split("|") if n.strip()]
return sorted(nodes)
return []
def _context_fallback(context: str) -> str:
if not context:
return ""
trimmed = context.strip()
if len(trimmed) > MAX_TOOL_CHARS:
trimmed = trimmed[: MAX_TOOL_CHARS - 3].rstrip() + "..."
return "Here is what I found:\n" + trimmed
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
def vm_cluster_snapshot() -> str:
parts: list[str] = []
# Node readiness (kube-state-metrics).
ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
if ready and not_ready:
try:
r = _vm_value_series(ready)[0]["value"][1]
nr = _vm_value_series(not_ready)[0]["value"][1]
parts.append(f"- nodes ready: {r} (not ready: {nr})")
except Exception:
pass
phases = vm_query("sum by (phase) (kube_pod_status_phase)")
pr = vm_render_result(phases, limit=8)
if pr:
parts.append("Pod phases:")
parts.append(pr)
return "\n".join(parts).strip()
def _strip_code_fence(text: str) -> str:
cleaned = (text or "").strip()
match = CODE_FENCE_RE.match(cleaned)
if match:
return match.group(1).strip()
return cleaned
def _normalize_reply(value: Any) -> str:
if isinstance(value, dict):
for key in ("content", "response", "reply", "message"):
if key in value:
return _normalize_reply(value[key])
for v in value.values():
if isinstance(v, (str, dict, list)):
return _normalize_reply(v)
return json.dumps(value, ensure_ascii=False)
if isinstance(value, list):
parts = [_normalize_reply(item) for item in value]
return " ".join(p for p in parts if p)
if value is None:
return ""
text = _strip_code_fence(str(value))
if text.startswith("{") and text.endswith("}"):
try:
return _normalize_reply(json.loads(text))
except Exception:
return text
return text
# Internal HTTP endpoint for cluster answers (website uses this).
class _AtlasbotHandler(BaseHTTPRequestHandler):
server_version = "AtlasbotHTTP/1.0"
def _write_json(self, status: int, payload: dict[str, Any]):
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _authorized(self) -> bool:
if not ATLASBOT_INTERNAL_TOKEN:
return True
token = self.headers.get("X-Internal-Token", "")
return token == ATLASBOT_INTERNAL_TOKEN
def do_GET(self): # noqa: N802
if self.path == "/health":
self._write_json(200, {"status": "ok"})
return
self._write_json(404, {"error": "not_found"})
def do_POST(self): # noqa: N802
if self.path != "/v1/answer":
self._write_json(404, {"error": "not_found"})
return
if not self._authorized():
self._write_json(401, {"error": "unauthorized"})
return
try:
length = int(self.headers.get("Content-Length", "0"))
except ValueError:
length = 0
raw = self.rfile.read(length) if length > 0 else b""
try:
payload = json.loads(raw.decode("utf-8")) if raw else {}
except json.JSONDecodeError:
self._write_json(400, {"error": "invalid_json"})
return
prompt = str(payload.get("prompt") or payload.get("question") or "").strip()
if not prompt:
self._write_json(400, {"error": "missing_prompt"})
return
inventory = node_inventory_live()
answer = structured_answer(prompt, inventory=inventory, metrics_summary="")
if not answer and _knowledge_intent(prompt):
answer = knowledge_summary(prompt, inventory)
if not answer:
kb = kb_retrieve_titles(prompt, limit=4)
answer = kb or ""
self._write_json(200, {"answer": answer})
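# Illustrative call, assuming the default port and a token configured via
# ATLASBOT_INTERNAL_TOKEN:
#   curl -X POST http://atlasbot:8090/v1/answer \
#     -H 'X-Internal-Token: <token>' -d '{"prompt": "how many nodes are ready?"}'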
def _start_http_server():
server = HTTPServer(("0.0.0.0", ATLASBOT_HTTP_PORT), _AtlasbotHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
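# Assemble the grounded context block: KB snippets, GitOps endpoint hints, node
# inventory, then (DM-only) live pod/Flux/VictoriaMetrics summaries.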
def build_context(
prompt: str,
*,
allow_tools: bool,
targets: list[tuple[str, str]],
inventory: list[dict[str, Any]] | None = None,
) -> str:
parts: list[str] = []
kb = kb_retrieve(prompt)
if kb:
parts.append(kb)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
node_ctx = node_inventory_context(prompt, inventory)
if node_ctx:
parts.append(node_ctx)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
p_l = (prompt or "").lower()
if any(w in p_l for w in METRIC_HINT_WORDS):
restarts = vm_top_restarts(1)
if restarts:
parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
snap = vm_cluster_snapshot()
if snap:
parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)
return "\n\n".join([p for p in parts if p]).strip()
def _knowledge_intent(prompt: str) -> bool:
q = normalize_query(prompt)
return any(
phrase in q
for phrase in (
"what do you know",
"tell me about",
"overview",
"summary",
"describe",
"explain",
"what is",
)
)
def _inventory_summary(inventory: list[dict[str, Any]]) -> str:
if not inventory:
return ""
groups = _group_nodes(inventory)
total = len(inventory)
ready = [n for n in inventory if n.get("ready") is True]
not_ready = [n for n in inventory if n.get("ready") is False]
parts = [f"Atlas cluster: {total} nodes ({len(ready)} ready, {len(not_ready)} not ready)."]
for key in ("rpi5", "rpi4", "jetson", "amd64", "arm64-unknown", "unknown"):
nodes = groups.get(key) or []
if nodes:
parts.append(f"- {key}: {len(nodes)} nodes ({', '.join(nodes)})")
return "\n".join(parts)
def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
parts: list[str] = []
inv = _inventory_summary(inventory)
if inv:
parts.append(inv)
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
parts.append(kb_titles)
return "\n".join(parts).strip()
def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"If the answer is not grounded in the provided context or tool data, say you do not know."
)
transcript_parts = [system]
if context:
transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
transcript_parts.extend(history[hist_key][-24:])
transcript_parts.append(f"User: {prompt}")
transcript = "\n".join(transcript_parts)
payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
data = json.loads(resp.read().decode())
raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
reply = _normalize_reply(raw_reply) or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
try:
return _ollama_call(hist_key, prompt, context=context)
except Exception:
if fallback:
history[hist_key].append(f"Atlas: {fallback}")
return fallback
return "Model backend is busy. Try again in a moment."
def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
def worker():
result["reply"] = ollama_reply(hist_key, prompt, context=context, fallback=fallback)
done.set()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
            prompt_hint = prompt_hint[:157] + "..."
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
def sync_loop(token: str, room_id: str | None):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
        for rid in res.get("rooms", {}).get("invite", {}):
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = (content.get("body", "") or "").strip()
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if not (is_dm or mentioned):
continue
lower_body = body.lower()
# Only do live cluster introspection in DMs; metrics can be answered when mentioned.
allow_tools = is_dm
allow_metrics = is_dm or mentioned
promql = ""
if allow_tools:
                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
if m:
promql = m.group(1).strip()
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(body.lower()):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
inventory = node_inventory_for_prompt(body)
context = build_context(body, allow_tools=allow_tools, targets=targets, inventory=inventory)
if allow_tools and promql:
res = vm_query(promql, timeout=20)
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = metrics_fallback or ""
if not fallback and context:
fallback = _context_fallback(context)
structured = structured_answer(body, inventory=inventory, metrics_summary=metrics_fallback or "")
if structured:
send_msg(token, rid, structured)
continue
if _knowledge_intent(body):
summary = knowledge_summary(body, inventory)
if summary:
send_msg(token, rid, summary)
continue
reply = ollama_reply_with_thinking(
token,
rid,
hist_key,
body,
context=context,
fallback=fallback,
)
send_msg(token, rid, reply)
def login_with_retry():
last_err = None
for attempt in range(10):
try:
return login()
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(min(30, 2 ** attempt))
raise last_err
def main():
load_kb()
_start_http_server()
token = login_with_retry()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()