From a442ea6d5d9abae9b8397bad6b3e7db8e6151881 Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Tue, 27 Jan 2026 11:03:55 -0300
Subject: [PATCH] atlasbot: strengthen facts context and replies

---
 services/comms/scripts/atlasbot/bot.py | 91 +++++++++++++++++++-------
 1 file changed, 68 insertions(+), 23 deletions(-)

diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
index 9e8e0dd..e0056f8 100644
--- a/services/comms/scripts/atlasbot/bot.py
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -34,6 +34,7 @@ SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
 MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
 MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
 MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
+MAX_CONTEXT_CHARS = int(os.environ.get("ATLASBOT_MAX_CONTEXT_CHARS", "12000"))
 THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
 OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
 OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
@@ -100,6 +101,7 @@ CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*(.*?)\s*```$", re.DOTALL)
 TITAN_NODE_RE = re.compile(r"\btitan-[0-9a-z]{2}\b", re.IGNORECASE)
 TITAN_RANGE_RE = re.compile(r"\btitan-([0-9a-z]{2})/([0-9a-z]{2})\b", re.IGNORECASE)
 _DASH_CHARS = "\u2010\u2011\u2012\u2013\u2014\u2015\u2212\uFE63\uFF0D"
+CONFIDENCE_RE = re.compile(r"confidence\s*:\s*(high|medium|low)\b", re.IGNORECASE)
 
 OPERATION_HINTS = {
     "count": ("how many", "count", "number", "total"),
@@ -139,6 +141,20 @@ def _tokens(text: str) -> list[str]:
     return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
 
 
+def _ensure_confidence(text: str) -> str:
+    if not text:
+        return ""
+    lines = text.strip().splitlines()
+    for idx, line in enumerate(lines):
+        match = CONFIDENCE_RE.search(line)
+        if match:
+            level = match.group(1).lower()
+            lines[idx] = f"Confidence: {level}"
+            return "\n".join(lines)
+    lines.append("Confidence: medium")
+    return "\n".join(lines)
+
+
 # Mention detection (Matrix rich mentions + plain @atlas).
 MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
 MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
@@ -710,6 +726,7 @@ def facts_context(
     workloads: list[dict[str, Any]] | None,
 ) -> str:
     inv = inventory or []
+    nodes_in_query = _extract_titan_nodes(prompt)
     metrics = _snapshot_metrics(snapshot)
     nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {}
     summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {}
@@ -721,6 +738,12 @@ def facts_context(
     not_ready_names = summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names")
     by_hardware = _group_nodes(inv) if inv else {}
     by_arch = _nodes_by_arch(inv) if inv else {}
+    control_plane_nodes = [
+        node["name"]
+        for node in inv
+        if any(role in ("control-plane", "master") for role in (node.get("roles") or []))
+    ]
+    worker_nodes = [node["name"] for node in inv if node.get("is_worker") is True]
 
     lines: list[str] = ["Facts (live snapshot):"]
     if total is not None:
@@ -731,9 +754,16 @@
         nodes_list = by_hardware.get(key) or []
         if nodes_list:
             lines.append(f"- {key}: {', '.join(nodes_list)}")
+    non_rpi = sorted(set(by_hardware.get("jetson", [])) | set(by_hardware.get("amd64", [])))
+    if non_rpi:
+        lines.append(f"- non_raspberry_pi: {', '.join(non_rpi)}")
     for key, nodes_list in sorted(by_arch.items()):
         if nodes_list:
             lines.append(f"- arch {key}: {', '.join(nodes_list)}")
+    if control_plane_nodes:
+        lines.append(f"- control_plane_nodes: {', '.join(control_plane_nodes)}")
+    if worker_nodes:
+        lines.append(f"- worker_nodes: {', '.join(worker_nodes)}")
     if ready_workers or not_ready_workers:
         lines.append(f"- workers_ready: {', '.join(ready_workers) if ready_workers else 'none'}")
     if not_ready_workers:
@@ -753,7 +783,8 @@
node = entry.get("node") value = entry.get("value") if node and value is not None: - lines.append(f"- hottest_{key}: {node} ({value})") + value_fmt = _format_metric_value(str(value), percent=key in ("cpu", "ram")) + lines.append(f"- hottest_{key}: {node} ({value_fmt})") postgres = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else {} if isinstance(postgres, dict) and postgres: @@ -774,12 +805,25 @@ def facts_context( node = entry.get("node") if not node: continue - cpu = entry.get("cpu") - ram = entry.get("ram") - net = entry.get("net") - io_val = entry.get("io") + cpu = _format_metric_value(str(entry.get("cpu")), percent=True) if entry.get("cpu") is not None else "" + ram = _format_metric_value(str(entry.get("ram")), percent=True) if entry.get("ram") is not None else "" + net = _format_metric_value(str(entry.get("net")), percent=False) if entry.get("net") is not None else "" + io_val = _format_metric_value(str(entry.get("io")), percent=False) if entry.get("io") is not None else "" lines.append(f" - {node}: cpu={cpu}, ram={ram}, net={net}, io={io_val}") + if nodes_in_query: + lines.append("- node_details:") + for name in nodes_in_query: + detail = next((n for n in inv if n.get("name") == name), None) + if not detail: + lines.append(f" - {name}: not found in snapshot") + continue + roles = ",".join(detail.get("roles") or []) or "none" + lines.append( + f" - {name}: hardware={detail.get('hardware')}, arch={detail.get('arch')}, " + f"ready={detail.get('ready')}, roles={roles}" + ) + workload_entries = _workloads_for_prompt(prompt, workloads or []) if workload_entries: lines.append("- workloads:") @@ -1181,11 +1225,10 @@ def metrics_query_context(prompt: str, *, allow_tools: bool) -> tuple[str, str]: if rendered: rendered_parts.append(rendered) if not rendered_parts: - return "", f"{panel}: matched dashboard panel but VictoriaMetrics did not return data." 
+ return "", "" summary = "\n".join(rendered_parts) context = f"Metrics (from {dashboard} / {panel}):\n{summary}" - fallback = _metrics_fallback_summary(panel, summary) - return context, fallback + return context, "" def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]: q = (query or "").strip() @@ -1574,8 +1617,8 @@ def _normalize_reply(value: Any) -> str: try: return _normalize_reply(json.loads(text)) except Exception: - return text - return text + return _ensure_confidence(text) + return _ensure_confidence(text) # Internal HTTP endpoint for cluster answers (website uses this). @@ -1634,10 +1677,10 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): snapshot=snapshot, workloads=workloads, ) - metrics_context, metrics_fallback = metrics_query_context(prompt, allow_tools=True) + metrics_context, _metrics_fallback = metrics_query_context(prompt, allow_tools=True) if metrics_context: context = (context + "\n\n" + metrics_context).strip() if context else metrics_context - fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that." + fallback = "I don't have enough data to answer that." 
answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback) self._write_json(200, {"answer": answer}) @@ -1665,19 +1708,19 @@ def build_context( ) -> str: parts: list[str] = [] - kb = kb_retrieve(prompt) - if not kb and _knowledge_intent(prompt): - kb = kb_retrieve_titles(prompt, limit=4) - if kb: - parts.append(kb) + facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads) + if facts: + parts.append(facts) endpoints, edges = catalog_hints(prompt) if endpoints: parts.append(endpoints) - facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads) - if facts: - parts.append(facts) + kb = kb_retrieve(prompt) + if not kb and _knowledge_intent(prompt): + kb = kb_retrieve_titles(prompt, limit=4) + if kb: + parts.append(kb) if allow_tools: # Scope pod summaries to relevant namespaces/workloads when possible. @@ -1789,12 +1832,14 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str: "Never include or request secret values. " "Do not suggest commands unless explicitly asked. " "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. " + "Translate metrics into natural language instead of echoing raw label/value pairs. " + "Do not answer by only listing runbooks; summarize the cluster first and mention docs only if useful. " "If the answer is not grounded in the provided context or tool data, say you do not know. " "End every response with a line: 'Confidence: high|medium|low'." 
) transcript_parts = [system] if context: - transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS]) + transcript_parts.append("Context (grounded):\n" + context[:MAX_CONTEXT_CHARS]) transcript_parts.extend(history[hist_key][-24:]) transcript_parts.append(f"User: {prompt}") transcript = "\n".join(transcript_parts) @@ -1950,11 +1995,11 @@ def sync_loop(token: str, room_id: str): rendered = vm_render_result(res, limit=15) or "(no results)" extra = "VictoriaMetrics (PromQL result):\n" + rendered context = (context + "\n\n" + extra).strip() if context else extra - metrics_context, metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics) + metrics_context, _metrics_fallback = metrics_query_context(body, allow_tools=allow_metrics) if metrics_context: context = (context + "\n\n" + metrics_context).strip() if context else metrics_context - fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that." + fallback = "I don't have enough data to answer that." reply = ollama_reply_with_thinking( token,