atlasbot: add metrics kb and long timeout

This commit is contained in:
Brad Stein 2026-01-26 14:08:11 -03:00
parent fff00dbe95
commit 33b5e2b678
9 changed files with 3934 additions and 6 deletions

File diff suppressed because it is too large Load Diff

View File

@ -26,6 +26,7 @@ from typing import Any, Iterable
import yaml
REPO_ROOT = Path(__file__).resolve().parents[1]
DASHBOARD_DIR = REPO_ROOT / "services" / "monitoring" / "dashboards"
CLUSTER_SCOPED_KINDS = {
"Namespace",
@ -67,6 +68,64 @@ def _sync_tree(source: Path, dest: Path) -> None:
shutil.copytree(source, dest)
def _iter_dashboard_panels(dashboard: dict[str, Any]) -> Iterable[dict[str, Any]]:
panels = dashboard.get("panels") if isinstance(dashboard.get("panels"), list) else []
for panel in panels:
if not isinstance(panel, dict):
continue
if panel.get("type") == "row" and isinstance(panel.get("panels"), list):
yield from _iter_dashboard_panels({"panels": panel.get("panels")})
continue
yield panel
def _extract_metrics_index(dashboard_dir: Path) -> list[dict[str, Any]]:
    """Build a flat index of PromQL expressions found in dashboard JSON files.

    Scans ``*.json`` under *dashboard_dir* (sorted for deterministic output)
    and records, per panel, the owning dashboard title/tags, panel metadata,
    datasource uid/type, and the non-empty ``expr`` strings of its targets.
    Malformed JSON and non-dict documents are silently skipped (best effort).
    """
    entries: list[dict[str, Any]] = []
    for path in sorted(dashboard_dir.glob("*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            continue  # ignore unparsable dashboards
        if not isinstance(data, dict):
            continue
        title = data.get("title") or path.stem
        tags = data.get("tags") or []
        for panel in _iter_dashboard_panels(data):
            targets = panel.get("targets")
            if not isinstance(targets, list):
                continue
            exprs = [
                tgt["expr"].strip()
                for tgt in targets
                if isinstance(tgt, dict)
                and isinstance(tgt.get("expr"), str)
                and tgt["expr"].strip()
            ]
            if not exprs:
                continue  # only index panels that actually query something
            ds = panel.get("datasource") or {}
            if isinstance(ds, dict):
                ds_uid, ds_type = ds.get("uid"), ds.get("type")
            else:
                ds_uid = ds_type = None
            entries.append(
                {
                    "dashboard": title,
                    "panel_title": panel.get("title") or "",
                    "panel_id": panel.get("id"),
                    "panel_type": panel.get("type"),
                    "description": panel.get("description") or "",
                    "tags": tags,
                    "datasource_uid": ds_uid,
                    "datasource_type": ds_type,
                    "exprs": exprs,
                }
            )
    return entries
def kustomize_build(path: Path) -> str:
rel = path.relative_to(REPO_ROOT)
try:
@ -516,6 +575,7 @@ def main() -> int:
summary_path = out_dir / "catalog" / "atlas-summary.json"
diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
runbooks_json_path = out_dir / "catalog" / "runbooks.json"
metrics_json_path = out_dir / "catalog" / "metrics.json"
catalog_rel = catalog_path.relative_to(REPO_ROOT).as_posix()
catalog_path.write_text(
@ -560,12 +620,17 @@ def main() -> int:
}
)
runbooks_json_path.write_text(json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8")
metrics_index = _extract_metrics_index(DASHBOARD_DIR)
metrics_json_path.write_text(
json.dumps(metrics_index, indent=2, sort_keys=False) + "\n", encoding="utf-8"
)
print(f"Wrote {catalog_path.relative_to(REPO_ROOT)}")
print(f"Wrote {catalog_json_path.relative_to(REPO_ROOT)}")
print(f"Wrote {summary_path.relative_to(REPO_ROOT)}")
print(f"Wrote {diagram_path.relative_to(REPO_ROOT)}")
print(f"Wrote {runbooks_json_path.relative_to(REPO_ROOT)}")
print(f"Wrote {metrics_json_path.relative_to(REPO_ROOT)}")
if args.sync_comms:
comms_dir = REPO_ROOT / "services" / "comms" / "knowledge"

View File

@ -58,14 +58,14 @@ spec:
args:
- >-
. /vault/secrets/portal-env.sh
&& exec gunicorn -b 0.0.0.0:8080 --workers 2 --timeout 180 app:app
&& exec gunicorn -b 0.0.0.0:8080 --workers 2 --timeout 600 app:app
env:
- name: AI_CHAT_API
value: http://ollama.ai.svc.cluster.local:11434
- name: AI_CHAT_MODEL
value: qwen2.5-coder:7b-instruct-q4_0
- name: AI_CHAT_TIMEOUT_SEC
value: "60"
value: "480"
- name: AI_NODE_NAME
valueFrom:
fieldRef:

View File

@ -47,6 +47,8 @@ spec:
env:
- name: UPSTREAM_URL
value: http://bstein-dev-home-backend/api/chat
- name: UPSTREAM_TIMEOUT_SEC
value: "600"
ports:
- name: http
containerPort: 8080

View File

@ -6,6 +6,7 @@ from urllib import request, error
UPSTREAM = os.environ.get("UPSTREAM_URL", "http://bstein-dev-home-backend/api/chat")
KEY_MATRIX = os.environ.get("CHAT_KEY_MATRIX", "")
KEY_HOMEPAGE = os.environ.get("CHAT_KEY_HOMEPAGE", "")
UPSTREAM_TIMEOUT_SEC = float(os.environ.get("UPSTREAM_TIMEOUT_SEC", "90"))
ALLOWED = {k for k in (KEY_MATRIX, KEY_HOMEPAGE) if k}
@ -41,7 +42,7 @@ class Handler(BaseHTTPRequestHandler):
headers={"Content-Type": "application/json"},
method="POST",
)
with request.urlopen(upstream_req, timeout=90) as resp:
with request.urlopen(upstream_req, timeout=UPSTREAM_TIMEOUT_SEC) as resp:
data = resp.read()
self.send_response(resp.status)
for k, v in resp.headers.items():

View File

@ -16,7 +16,7 @@ spec:
labels:
app: atlasbot
annotations:
checksum/atlasbot-configmap: manual-atlasbot-12
checksum/atlasbot-configmap: manual-atlasbot-13
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@ -83,6 +83,10 @@ spec:
value: http://chat-ai-gateway.bstein-dev-home.svc.cluster.local/
- name: OLLAMA_MODEL
value: qwen2.5-coder:7b-instruct-q4_0
- name: OLLAMA_TIMEOUT_SEC
value: "480"
- name: ATLASBOT_THINKING_INTERVAL_SEC
value: "120"
resources:
requests:
cpu: 100m
@ -114,6 +118,8 @@ spec:
path: catalog/atlas.json
- key: atlas-summary.json
path: catalog/atlas-summary.json
- key: metrics.json
path: catalog/metrics.json
- key: runbooks.json
path: catalog/runbooks.json
- key: atlas-http.mmd

File diff suppressed because it is too large Load Diff

View File

@ -73,5 +73,6 @@ configMapGenerator:
- INDEX.md=knowledge/INDEX.md
- atlas.json=knowledge/catalog/atlas.json
- atlas-summary.json=knowledge/catalog/atlas-summary.json
- metrics.json=knowledge/catalog/metrics.json
- runbooks.json=knowledge/catalog/runbooks.json
- atlas-http.mmd=knowledge/diagrams/atlas-http.mmd

View File

@ -17,7 +17,7 @@ ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "90"))
OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
@ -29,6 +29,7 @@ SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\\.[a-z0-9-]+)+)")
@ -59,8 +60,21 @@ STOPWORDS = {
}
METRIC_HINT_WORDS = {
"bandwidth",
"connections",
"cpu",
"database",
"db",
"disk",
"health",
"memory",
"network",
"node",
"nodes",
"postgres",
"status",
"storage",
"usage",
"down",
"slow",
"error",
@ -157,6 +171,7 @@ def send_msg(token: str, room: str, text: str):
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
_METRIC_INDEX: list[dict[str, Any]] = []
_NODE_CLASS_INDEX: dict[str, list[str]] = {}
_NODE_CLASS_RPI4: set[str] = set()
_NODE_CLASS_RPI5: set[str] = set()
@ -180,6 +195,7 @@ def load_kb():
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
metrics = _load_json_file(os.path.join(KB_DIR, "catalog", "metrics.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
@ -197,6 +213,7 @@ def load_kb():
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
_METRIC_INDEX = metrics if isinstance(metrics, list) else []
node_classes = _parse_node_classes(runbooks)
_NODE_CLASS_INDEX = node_classes
@ -356,6 +373,65 @@ def node_inventory_context(query: str) -> str:
return ""
return "\n".join(lines)
def _metric_tokens(entry: dict[str, Any]) -> str:
parts: list[str] = []
for key in ("panel_title", "dashboard", "description"):
val = entry.get(key)
if isinstance(val, str) and val:
parts.append(val.lower())
tags = entry.get("tags")
if isinstance(tags, list):
parts.extend(str(t).lower() for t in tags if t)
return " ".join(parts)
def metrics_lookup(query: str, limit: int = 3) -> list[dict[str, Any]]:
    """Rank entries of the module metric index against *query*.

    Each unique query token scores 1 for a substring hit anywhere in the
    entry's haystack, or 2 when it also appears in the panel title.  Returns
    up to *limit* entries, best score first (ties keep index order).
    """
    tokens = _tokens(query)
    if not tokens or not _METRIC_INDEX:
        return []
    unique_tokens = set(tokens)
    ranked: list[tuple[int, dict[str, Any]]] = []
    for entry in _METRIC_INDEX:
        if not isinstance(entry, dict):
            continue
        haystack = _metric_tokens(entry)
        if not haystack:
            continue
        title = (entry.get("panel_title") or "").lower()
        points = sum(
            (2 if tok in title else 1)
            for tok in unique_tokens
            if tok in haystack
        )
        if points:
            ranked.append((points, entry))
    # Stable sort on score only so equally-scored entries keep file order.
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [e for _, e in ranked[:limit]]
def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]:
    """Return ``(context, fallback)`` strings built from live metric queries.

    Fires only when tool use is allowed and *prompt* contains a metric hint
    word; picks the single best-matching indexed panel and evaluates up to
    two of its PromQL expressions against VictoriaMetrics.  Both strings are
    empty whenever any step yields nothing usable.
    """
    nothing = ("", "")
    if not allow_tools:
        return nothing
    text = (prompt or "").lower()
    if not any(hint in text for hint in METRIC_HINT_WORDS):
        return nothing
    hits = metrics_lookup(prompt, limit=1)
    if not hits:
        return nothing
    best = hits[0]
    raw_exprs = best.get("exprs")
    exprs = raw_exprs if isinstance(raw_exprs, list) else []
    if not exprs:
        return nothing
    pieces: list[str] = []
    for promql in exprs[:2]:  # cap live queries at two expressions
        result = vm_query(promql, timeout=20)
        rendered = vm_render_result(result, limit=10)
        if rendered:
            pieces.append(rendered)
    if not pieces:
        return nothing
    dashboard = best.get("dashboard") or "dashboard"
    panel = best.get("panel_title") or "panel"
    summary = "\n".join(pieces)
    context = f"Metrics (from {dashboard} / {panel}):\n{summary}"
    fallback = f"{panel}: {summary}"
    return context, fallback
def jetson_nodes_from_kb() -> list[str]:
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
@ -777,6 +853,7 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Never include or request secret values. "
"Do not suggest commands unless explicitly asked. "
"Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
"If the answer is not grounded in the provided context or tool data, say you do not know."
)
@ -820,7 +897,17 @@ def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *,
thread.start()
if not done.wait(2.0):
send_msg(token, room, "Thinking…")
done.wait()
prompt_hint = " ".join((prompt or "").split())
if len(prompt_hint) > 160:
prompt_hint = prompt_hint[:157] + ""
heartbeat = max(10, THINKING_INTERVAL_SEC)
next_heartbeat = time.monotonic() + heartbeat
while not done.wait(max(0, next_heartbeat - time.monotonic())):
if prompt_hint:
send_msg(token, room, f"Still thinking about: {prompt_hint} (gathering context)")
else:
send_msg(token, room, "Still thinking (gathering context)…")
next_heartbeat += heartbeat
thread.join(timeout=1)
return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
@ -937,9 +1024,15 @@ def sync_loop(token: str, room_id: str):
rendered = vm_render_result(res, limit=15) or "(no results)"
extra = "VictoriaMetrics (PromQL result):\n" + rendered
context = (context + "\n\n" + extra).strip() if context else extra
metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_tools)
if metrics_context:
context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
fallback = ""
if "node" in lower_body or "cluster" in lower_body:
fallback = node_inventory_answer("Atlas", lower_body)
if metrics_fallback and not fallback:
fallback = metrics_fallback
reply = ollama_reply_with_thinking(
token,
rid,