From c356abdec0d2e4717acb276fe97ee1aef02eae32 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 5 Feb 2026 12:27:11 -0300 Subject: [PATCH] atlasbot: add spine intent routing --- atlasbot/engine/answerer.py | 203 ++++++++++++++++++++++++++++--- atlasbot/engine/intent_router.py | 65 ++++++++++ atlasbot/llm/prompts.py | 13 ++ 3 files changed, 265 insertions(+), 16 deletions(-) create mode 100644 atlasbot/engine/intent_router.py diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py index 088d4a3..b05d60f 100644 --- a/atlasbot/engine/answerer.py +++ b/atlasbot/engine/answerer.py @@ -16,6 +16,7 @@ from atlasbot.llm.client import LLMClient, build_messages, parse_json from atlasbot.llm import prompts from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text from atlasbot.state.store import ClaimStore +from atlasbot.engine.intent_router import IntentMatch, route_intent log = logging.getLogger(__name__) @@ -194,6 +195,7 @@ class AnswerEngine: allowed_nodes = _allowed_nodes(summary) allowed_namespaces = _allowed_namespaces(summary) summary_lines = _summary_lines(snapshot_used) + spine = _spine_lines(summary_lines) metric_tokens = _metric_key_tokens(summary_lines) global_facts = _global_facts(summary_lines) kb_summary = self._kb.summary() @@ -247,11 +249,26 @@ class AnswerEngine: classify.setdefault("focus_metric", "unknown") if metric_tokens and keyword_tokens and any(token in metric_tokens for token in keyword_tokens): classify["needs_snapshot"] = True + intent = route_intent(normalized) + if intent: + classify["needs_snapshot"] = True + classify["question_type"] = "metric" _debug_log("route_parsed", {"classify": classify, "normalized": normalized}) lowered_question = f"{question} {normalized}".lower() force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", lowered_question)) if any(term in lowered_question for term in ("postgres", "connections", "pvc", "ready")): force_metric = True + + if intent: + spine_line = 
spine.get(intent.kind) + spine_answer = _spine_answer(intent, spine_line) + if spine_line: + key_facts = _merge_fact_lines([spine_line], key_facts) + metric_facts = _merge_fact_lines([spine_line], metric_facts) + if spine_answer and mode == "fast": + scores = _default_scores() + meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started) + return AnswerResult(spine_answer, scores, meta) cluster_terms = ( "atlas", "cluster", @@ -745,22 +762,34 @@ class AnswerEngine: if facts_used and _needs_evidence_guard(reply, facts_used): if observer: observer("evidence_guard", "tightening unsupported claims") - guard_prompt = ( - prompts.EVIDENCE_GUARD_PROMPT - + "\nQuestion: " - + normalized - + "\nDraft: " - + reply - + "\nFactsUsed:\n" - + "\n".join(facts_used) - ) - reply = await call_llm( - prompts.EVIDENCE_GUARD_SYSTEM, - guard_prompt, - context=context, - model=plan.model, - tag="evidence_guard", - ) + use_guard = True + if mode in {"smart", "genius"}: + decision = await _contradiction_decision( + call_llm, + normalized, + reply, + facts_used, + plan, + attempts=3 if mode == "genius" else 1, + ) + use_guard = decision.get("use_facts", True) + if use_guard: + guard_prompt = ( + prompts.EVIDENCE_GUARD_PROMPT + + "\nQuestion: " + + normalized + + "\nDraft: " + + reply + + "\nFactsUsed:\n" + + "\n".join(facts_used) + ) + reply = await call_llm( + prompts.EVIDENCE_GUARD_SYSTEM, + guard_prompt, + context=context, + model=plan.model, + tag="evidence_guard", + ) if _needs_focus_fix(normalized, reply, classify): if observer: @@ -1530,6 +1559,115 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]: return [line for line in text.splitlines() if line.strip()] +def _line_starting_with(lines: list[str], prefix: str) -> str | None: + if not lines: + return None + for line in lines: + if line.lower().startswith(prefix.lower()): + return line + return None + + +def _spine_lines(lines: list[str]) -> dict[str, str]: + spine: dict[str, str] = {} + 
nodes_line = _line_starting_with(lines, "nodes:") + if nodes_line: + spine["nodes_count"] = nodes_line + spine["nodes_ready"] = nodes_line + hardware_line = _line_starting_with(lines, "hardware_nodes:") + if hardware_line: + spine["nodes_non_rpi"] = hardware_line + hottest_line = _line_starting_with(lines, "hottest:") + if hottest_line: + spine["hottest_cpu"] = hottest_line + spine["hottest_ram"] = hottest_line + spine["hottest_net"] = hottest_line + spine["hottest_io"] = hottest_line + spine["hottest_disk"] = hottest_line + postgres_total = _line_starting_with(lines, "postgres_connections_total:") + if postgres_total: + spine["postgres_connections"] = postgres_total + postgres_line = _line_starting_with(lines, "postgres:") + if postgres_line: + spine["postgres_hottest"] = postgres_line + namespaces_top = _line_starting_with(lines, "namespaces_top:") + if namespaces_top: + spine["namespace_most_pods"] = namespaces_top + pressure_line = _line_starting_with(lines, "pressure_nodes:") + if pressure_line: + spine["pressure_summary"] = pressure_line + else: + load_line = _line_starting_with(lines, "node_load_top:") + if load_line: + spine["pressure_summary"] = load_line + return spine + + +def _parse_group_line(line: str) -> dict[str, list[str]]: + groups: dict[str, list[str]] = {} + if not line: + return groups + payload = line.split(":", 1)[1] if ":" in line else line + for part in payload.split(";"): + part = part.strip() + if not part or "=" not in part: + continue + key, value = part.split("=", 1) + nodes = [item.strip() for item in value.split(",") if item.strip()] + groups[key.strip()] = nodes + return groups + + +def _parse_hottest(line: str, metric: str) -> str | None: + if not line: + return None + payload = line.split(":", 1)[1] if ":" in line else line + for part in payload.split(";"): + part = part.strip() + if part.startswith(f"{metric}="): + return part + return None + + +def _spine_answer(intent: IntentMatch, spine_line: str | None) -> str | None: + if 
not spine_line: + return None + kind = intent.kind + if kind == "nodes_count": + return f"From the latest snapshot: {spine_line}." + if kind == "nodes_ready": + return f"From the latest snapshot: {spine_line}." + if kind == "nodes_non_rpi": + groups = _parse_group_line(spine_line) + non_rpi = [] + for key, nodes in groups.items(): + if key.lower().startswith("rpi"): + continue + non_rpi.extend(nodes) + if non_rpi: + return "Non‑Raspberry Pi nodes: " + ", ".join(non_rpi) + "." + return f"From the latest snapshot: {spine_line}." + if kind.startswith("hottest_"): + metric = kind.split("_", 1)[1] + hottest = _parse_hottest(spine_line, metric) + if hottest: + return f"From the latest snapshot: {hottest}." + return f"From the latest snapshot: {spine_line}." + if kind == "postgres_connections": + return f"From the latest snapshot: {spine_line}." + if kind == "postgres_hottest": + return f"From the latest snapshot: {spine_line}." + if kind == "namespace_most_pods": + payload = spine_line.split(":", 1)[1] if ":" in spine_line else spine_line + top = payload.split(";")[0].strip() + if top: + return f"Namespace with most pods: {top}." + return f"From the latest snapshot: {spine_line}." + if kind == "pressure_summary": + return f"From the latest snapshot: {spine_line}." + return f"From the latest snapshot: {spine_line}." 
+ + async def _select_metric_chunks( call_llm: Callable[..., Awaitable[str]], ctx: dict[str, Any], @@ -2108,6 +2246,39 @@ def _needs_evidence_guard(reply: str, facts: list[str]) -> bool: return False +async def _contradiction_decision( + call_llm: Callable[..., Awaitable[str]], + question: str, + draft: str, + facts_used: list[str], + plan: "ModePlan", + attempts: int = 1, +) -> dict[str, Any]: + best = {"use_facts": True, "confidence": 50} + facts_block = "\n".join(facts_used[:12]) + for idx in range(max(1, attempts)): + variant = f"Variant: {idx + 1}" if attempts > 1 else "" + prompt = ( + prompts.CONTRADICTION_PROMPT.format(question=question, draft=draft, facts=facts_block) + + ("\n" + variant if variant else "") + ) + raw = await call_llm( + prompts.CONTRADICTION_SYSTEM, + prompt, + model=plan.fast_model, + tag="contradiction", + ) + data = _parse_json_block(raw, fallback={}) + try: + confidence = int(data.get("confidence", 50)) + except Exception: + confidence = 50 + use_facts = bool(data.get("use_facts", True)) + if confidence >= best.get("confidence", 0): + best = {"use_facts": use_facts, "confidence": confidence} + return best + + def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]: if not lines: return [] diff --git a/atlasbot/engine/intent_router.py b/atlasbot/engine/intent_router.py new file mode 100644 index 0000000..c343b08 --- /dev/null +++ b/atlasbot/engine/intent_router.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from dataclasses import dataclass +import re + + +@dataclass(frozen=True) +class IntentMatch: + kind: str + score: int + + +_COUNT_TERMS = r"(how many|count|number of|total|totals|tally|amount of|quantity|sum of|overall|in total|all up)" +_NODE_TERMS = r"(nodes?|workers?|worker nodes?|cluster nodes?|machines?|hosts?|members?|instances?)" +_READY_TERMS = r"(ready|unready|not ready|down|offline|not responding|missing)" +_HOTTEST_TERMS = 
r"(hottest|hot|highest|max(?:imum)?|peak|top|most|worst|spikiest|heaviest|largest)"
+_CPU_TERMS = r"(cpu|processor|compute|core|cores|load|load avg|load average)"
+_RAM_TERMS = r"(ram|memory|mem|heap)"
+_NET_TERMS = r"(net|network|bandwidth|throughput|traffic|rx|tx|ingress|egress|bits|bytes)"
+_IO_TERMS = r"(io|i/o|disk io|disk activity|read/write|read write|storage io|iops)"
+_DISK_TERMS = r"(disk|storage|volume|pvc|filesystem|fs|capacity|space)"
+_PG_TERMS = r"(postgres|postgresql|pg\b|database|db|sql)"
+_CONN_TERMS = r"(connections?|conn|pool|sessions?|clients?)"
+_DB_HOT_TERMS = r"(hottest|busiest|most|largest|top|heaviest)"
+_NAMESPACE_TERMS = r"(namespace|namespaces|ns\b)"
+_PODS_TERMS = r"(pods?|workloads?|tasks?|containers?)"
+_NON_RPI_TERMS = r"(non[-\s]?raspberry|not\s+raspberry|non[-\s]?rpi|not\s+rpi|amd64|x86|x86_64|jetson)"
+_PRESSURE_TERMS = r"(pressure|overload|hotspot|bottleneck|saturation|headroom|strain|stress)"
+
+
+def route_intent(question: str) -> IntentMatch | None:
+    text = (question or "").lower()
+    if not text:
+        return None
+
+    if re.search(_COUNT_TERMS, text) and re.search(_NODE_TERMS, text):
+        return IntentMatch("nodes_count", 90)
+    if re.search(_READY_TERMS, text) and re.search(_NODE_TERMS, text):
+        return IntentMatch("nodes_ready", 85)
+    if re.search(_NON_RPI_TERMS, text) and re.search(_NODE_TERMS, text):
+        return IntentMatch("nodes_non_rpi", 80)
+
+    if re.search(_HOTTEST_TERMS, text) and re.search(_CPU_TERMS, text):
+        return IntentMatch("hottest_cpu", 80)
+    if re.search(_HOTTEST_TERMS, text) and re.search(_RAM_TERMS, text):
+        return IntentMatch("hottest_ram", 80)
+    if re.search(_HOTTEST_TERMS, text) and re.search(_NET_TERMS, text):
+        return IntentMatch("hottest_net", 80)
+    if re.search(_HOTTEST_TERMS, text) and re.search(_IO_TERMS, text):
+        return IntentMatch("hottest_io", 80)
+    if re.search(_HOTTEST_TERMS, text) and re.search(_DISK_TERMS, text):
+        return IntentMatch("hottest_disk", 80)
+
+    if re.search(_PG_TERMS, text) and 
re.search(_CONN_TERMS, text):
+        return IntentMatch("postgres_connections", 80)
+    if re.search(_PG_TERMS, text) and re.search(_DB_HOT_TERMS, text):
+        return IntentMatch("postgres_hottest", 75)
+
+    if re.search(_NAMESPACE_TERMS, text) and re.search(_PODS_TERMS, text):
+        return IntentMatch("namespace_most_pods", 75)
+
+    if re.search(_PRESSURE_TERMS, text) and re.search(_NODE_TERMS, text):
+        return IntentMatch("pressure_summary", 70)
+
+    return None
diff --git a/atlasbot/llm/prompts.py b/atlasbot/llm/prompts.py
index 844071e..d827719 100644
--- a/atlasbot/llm/prompts.py
+++ b/atlasbot/llm/prompts.py
@@ -231,6 +231,19 @@ DRAFT_SELECT_PROMPT = (
     "Return JSON with field: best (1-based index)."
 )
 
+CONTRADICTION_SYSTEM = (
+    CLUSTER_SYSTEM
+    + " Judge whether Draft or FactsUsed is more correct. "
+    + "Return JSON only."
+)
+
+CONTRADICTION_PROMPT = (
+    "Question: {question}\n"
+    "Draft: {draft}\n"
+    "FactsUsed:\n{facts}\n\n"
+    "Return JSON: {{\"use_facts\": true|false, \"confidence\": 0-100, \"reason\": \"...\"}}"
+)
+
 CANDIDATE_SELECT_SYSTEM = (
     CLUSTER_SYSTEM
     + " Pick the best candidate for accuracy and evidence use. "