atlasbot: answer hottest node queries via metrics

This commit is contained in:
Brad Stein 2026-01-26 22:13:04 -03:00
parent 72bd22e912
commit 6432472be7

View File

@ -18,6 +18,8 @@ OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
ATLASBOT_HTTP_PORT = int(os.environ.get("ATLASBOT_HTTP_PORT", "8090"))
ATLASBOT_INTERNAL_TOKEN = os.environ.get("ATLASBOT_INTERNAL_TOKEN") or os.environ.get("CHAT_API_HOMEPAGE", "")
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
@ -93,6 +95,12 @@ CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
_DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
HOTTEST_QUERIES = {
"cpu": "label_replace(topk(1, avg by (node) (((1 - avg by (instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m]))) * 100) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=\"\"}, \"node\", \"$1\", \"nodename\", \"(.*)\"))), \"__name__\", \"$1\", \"node\", \"(.*)\")",
"ram": "label_replace(topk(1, avg by (node) ((avg by (instance) ((node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes * 100)) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=\"\"}, \"node\", \"$1\", \"nodename\", \"(.*)\"))), \"__name__\", \"$1\", \"node\", \"(.*)\")",
"net": "label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_network_receive_bytes_total{device!~\"lo\"}[5m]) + rate(node_network_transmit_bytes_total{device!~\"lo\"}[5m]))) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=\"\"}, \"node\", \"$1\", \"nodename\", \"(.*)\"))), \"__name__\", \"$1\", \"node\", \"(.*)\")",
"io": "label_replace(topk(1, avg by (node) ((sum by (instance) (rate(node_disk_read_bytes_total[5m]) + rate(node_disk_written_bytes_total[5m]))) * on(instance) group_left(node) label_replace(node_uname_info{nodename!=\"\"}, \"node\", \"$1\", \"nodename\", \"(.*)\"))), \"__name__\", \"$1\", \"node\", \"(.*)\")",
}
def normalize_query(text: str) -> str:
cleaned = (text or "").lower()
@ -291,6 +299,77 @@ def _extract_titan_nodes(text: str) -> list[str]:
names.add(f"titan-{right.lower()}")
return sorted(names)
def _humanize_rate(value: str, *, unit: str) -> str:
try:
val = float(value)
except (TypeError, ValueError):
return value
if unit == "%":
return f"{val:.1f}%"
if val >= 1024 * 1024:
return f"{val / (1024 * 1024):.2f} MB/s"
if val >= 1024:
return f"{val / 1024:.2f} KB/s"
return f"{val:.2f} B/s"
def _hottest_query(metric: str, node_regex: str | None) -> str:
expr = HOTTEST_QUERIES[metric]
if node_regex:
needle = 'node_uname_info{nodename!=""}'
replacement = f'node_uname_info{{nodename!=\"\",nodename=~\"{node_regex}\"}}'
return expr.replace(needle, replacement)
return expr
def _vm_hottest(metric: str, node_regex: str | None) -> tuple[str, str] | None:
expr = _hottest_query(metric, node_regex)
res = vm_query(expr)
series = _vm_value_series(res)
if not series:
return None
first = series[0]
labels = first.get("metric") or {}
value = first.get("value") or []
val = value[1] if isinstance(value, list) and len(value) > 1 else ""
node = labels.get("node") or labels.get("__name__") or ""
if not node:
return None
return (str(node), str(val))
def _hottest_answer(q: str, *, nodes: list[str] | None) -> str:
metric = None
assumed_cpu = False
if "cpu" in q:
metric = "cpu"
elif "ram" in q or "memory" in q:
metric = "ram"
elif "net" in q or "network" in q:
metric = "net"
elif "io" in q or "disk" in q or "storage" in q:
metric = "io"
if metric is None:
metric = "cpu"
assumed_cpu = True
if nodes is not None and not nodes:
return "No nodes match the requested hardware class."
node_regex = "|".join(nodes) if nodes else None
metrics = [metric]
lines: list[str] = []
for m in metrics:
picked = _vm_hottest(m, node_regex)
if not picked:
continue
node, val = picked
unit = "%" if m in ("cpu", "ram") else "B/s"
val_str = _humanize_rate(val, unit=unit)
label = {"cpu": "CPU", "ram": "RAM", "net": "NET", "io": "I/O"}[m]
lines.append(f"{label}: {node} ({val_str})")
if not lines:
return ""
label = metric.upper()
suffix = " (defaulting to CPU)" if assumed_cpu else ""
return f"Hottest node by {label}: {lines[0].split(': ', 1)[1]}.{suffix}"
def _node_roles(labels: dict[str, Any]) -> list[str]:
roles: list[str] = []
for key in labels.keys():
@ -440,6 +519,21 @@ def structured_answer(prompt: str, *, inventory: list[dict[str, Any]], metrics_s
non_rpi = set(groups.get("jetson", [])) | set(groups.get("amd64", []))
unknown_hw = set(groups.get("arm64-unknown", [])) | set(groups.get("unknown", []))
if "hottest" in q or "hot" in q:
filter_nodes: list[str] | None = None
if "amd64" in q or "x86" in q:
filter_nodes = sorted(groups.get("amd64", []))
elif "jetson" in q:
filter_nodes = sorted(groups.get("jetson", []))
elif "raspberry" in q or "rpi" in q:
filter_nodes = sorted(rpi_nodes)
elif "arm64" in q:
filter_nodes = sorted([n for n in names if n not in groups.get("amd64", [])])
hottest = _hottest_answer(q, nodes=filter_nodes)
if hottest:
return hottest
return "Unable to determine hottest nodes right now (metrics unavailable)."
if nodes_in_query and ("raspberry" in q or "rpi" in q):
parts: list[str] = []
for node in nodes_in_query: