From c6b25c27c132905568af0c22fd6f2d4db77d36eb Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 5 Feb 2026 12:55:33 -0300 Subject: [PATCH] atlasbot: refactor spine and intent routing --- atlasbot/engine/answerer.py | 171 +++++++++++++++++++------------ atlasbot/engine/intent_router.py | 51 ++++----- 2 files changed, 134 insertions(+), 88 deletions(-) diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py index 48e78db..ebd1310 100644 --- a/atlasbot/engine/answerer.py +++ b/atlasbot/engine/answerer.py @@ -58,6 +58,15 @@ class InsightGuardInput: facts: list[str] +@dataclass +class ContradictionContext: + call_llm: Callable[..., Awaitable[str]] + question: str + reply: str + facts: list[str] + plan: "ModePlan" + + @dataclass class EvidenceItem: path: str @@ -768,11 +777,7 @@ class AnswerEngine: use_guard = True if mode in {"smart", "genius"}: decision = await _contradiction_decision( - call_llm, - normalized, - reply, - facts_used, - plan, + ContradictionContext(call_llm, normalized, reply, facts_used, plan), attempts=3 if mode == "genius" else 1, ) use_guard = decision.get("use_facts", True) @@ -1616,46 +1621,68 @@ def _line_starting_with(lines: list[str], prefix: str) -> str | None: def _spine_lines(lines: list[str]) -> dict[str, str]: spine: dict[str, str] = {} + _spine_nodes(lines, spine) + _spine_hardware(lines, spine) + _spine_hottest(lines, spine) + _spine_postgres(lines, spine) + _spine_namespaces(lines, spine) + _spine_pressure(lines, spine) + return spine + + +def _spine_nodes(lines: list[str], spine: dict[str, str]) -> None: nodes_line = _line_starting_with(lines, "nodes:") if nodes_line: spine["nodes_count"] = nodes_line spine["nodes_ready"] = nodes_line - else: - nodes_total = _line_starting_with(lines, "nodes_total:") - nodes_ready = _line_starting_with(lines, "nodes_ready:") - if nodes_total: - spine["nodes_count"] = nodes_total - if nodes_ready: - spine["nodes_ready"] = nodes_ready + return + nodes_total = _line_starting_with(lines, "nodes_total:") + nodes_ready = _line_starting_with(lines, "nodes_ready:") + if nodes_total: + spine["nodes_count"] = nodes_total + if nodes_ready: + spine["nodes_ready"] = nodes_ready + + +def _spine_hardware(lines: list[str], spine: dict[str, str]) -> None: hardware_line = _line_starting_with(lines, "hardware_nodes:") if not hardware_line: hardware_line = _line_starting_with(lines, "hardware:") if hardware_line: spine["nodes_non_rpi"] = hardware_line + + +def _spine_hottest(lines: list[str], spine: dict[str, str]) -> None: hottest_line = _line_starting_with(lines, "hottest:") - if hottest_line: - spine["hottest_cpu"] = hottest_line - spine["hottest_ram"] = hottest_line - spine["hottest_net"] = hottest_line - spine["hottest_io"] = hottest_line - spine["hottest_disk"] = hottest_line + if not hottest_line: + return + for key in ("hottest_cpu", "hottest_ram", "hottest_net", "hottest_io", "hottest_disk"): + spine[key] = hottest_line + + +def _spine_postgres(lines: list[str], spine: dict[str, str]) -> None: postgres_total = _line_starting_with(lines, "postgres_connections_total:") if postgres_total: spine["postgres_connections"] = postgres_total postgres_line = _line_starting_with(lines, "postgres:") if postgres_line: spine["postgres_hottest"] = postgres_line + + +def _spine_namespaces(lines: list[str], spine: dict[str, str]) -> None: namespaces_top = _line_starting_with(lines, "namespaces_top:") if namespaces_top: spine["namespace_most_pods"] = namespaces_top + + +def _spine_pressure(lines: list[str], spine: dict[str, str]) -> None: pressure_line = _line_starting_with(lines, "pressure_nodes:") if pressure_line: spine["pressure_summary"] = pressure_line - else: - load_line = _line_starting_with(lines, "node_load_top:") - if load_line: - spine["pressure_summary"] = load_line - return spine + return + load_line = _line_starting_with(lines, "node_load_top:") + if load_line: + spine["pressure_summary"] = load_line def _parse_group_line(line: str) -> dict[str, list[str]]: @@ -1694,42 +1721,64 @@ def _parse_hottest(line: str, metric: str) -> str | None: def _spine_answer(intent: IntentMatch, spine_line: str | None) -> str | None: if not spine_line: return None + handlers = { + "nodes_count": _spine_nodes_answer, + "nodes_ready": _spine_nodes_answer, + "nodes_non_rpi": _spine_non_rpi_answer, + "postgres_connections": _spine_postgres_answer, + "postgres_hottest": _spine_postgres_answer, + "namespace_most_pods": _spine_namespace_answer, + "pressure_summary": _spine_pressure_answer, + } kind = intent.kind - if kind == "nodes_count": - return f"From the latest snapshot: {spine_line}." - if kind == "nodes_ready": - return f"From the latest snapshot: {spine_line}." - if kind == "nodes_non_rpi": - groups = _parse_group_line(spine_line) - non_rpi = [] - for key, nodes in groups.items(): - if key.lower().startswith("rpi"): - continue - non_rpi.extend(nodes) - if non_rpi: - return "Non‑Raspberry Pi nodes: " + ", ".join(non_rpi) + "." - return f"From the latest snapshot: {spine_line}." if kind.startswith("hottest_"): - metric = kind.split("_", 1)[1] - hottest = _parse_hottest(spine_line, metric) - if hottest: - return f"From the latest snapshot: {hottest}." - return f"From the latest snapshot: {spine_line}." - if kind == "postgres_connections": - return f"From the latest snapshot: {spine_line}." - if kind == "postgres_hottest": - return f"From the latest snapshot: {spine_line}." - if kind == "namespace_most_pods": - payload = spine_line.split(":", 1)[1] if ":" in spine_line else spine_line - top = payload.split(";")[0].strip() - if top: - return f"Namespace with most pods: {top}." - return f"From the latest snapshot: {spine_line}." - if kind == "pressure_summary": - return f"From the latest snapshot: {spine_line}." + return _spine_hottest_answer(kind, spine_line) + handler = handlers.get(kind) + if handler: + return handler(spine_line) return f"From the latest snapshot: {spine_line}." +def _spine_nodes_answer(line: str) -> str: + return f"From the latest snapshot: {line}." + + +def _spine_non_rpi_answer(line: str) -> str: + groups = _parse_group_line(line) + non_rpi: list[str] = [] + for key, nodes in groups.items(): + if key.lower().startswith("rpi"): + continue + non_rpi.extend(nodes) + if non_rpi: + return "Non‑Raspberry Pi nodes: " + ", ".join(non_rpi) + "." + return f"From the latest snapshot: {line}." + + +def _spine_hottest_answer(kind: str, line: str) -> str: + metric = kind.split("_", 1)[1] + hottest = _parse_hottest(line, metric) + if hottest: + return f"From the latest snapshot: {hottest}." + return f"From the latest snapshot: {line}." + + +def _spine_postgres_answer(line: str) -> str: + return f"From the latest snapshot: {line}." + + +def _spine_namespace_answer(line: str) -> str: + payload = line.split(":", 1)[1] if ":" in line else line + top = payload.split(";")[0].strip() + if top: + return f"Namespace with most pods: {top}." + return f"From the latest snapshot: {line}." + + +def _spine_pressure_answer(line: str) -> str: + return f"From the latest snapshot: {line}." + + async def _select_metric_chunks( call_llm: Callable[..., Awaitable[str]], ctx: dict[str, Any], @@ -2309,25 +2358,21 @@ def _needs_evidence_guard(reply: str, facts: list[str]) -> bool: async def _contradiction_decision( - call_llm: Callable[..., Awaitable[str]], - question: str, - draft: str, - facts_used: list[str], - plan: "ModePlan", + ctx: ContradictionContext, attempts: int = 1, ) -> dict[str, Any]: best = {"use_facts": True, "confidence": 50} - facts_block = "\n".join(facts_used[:12]) + facts_block = "\n".join(ctx.facts[:12]) for idx in range(max(1, attempts)): variant = f"Variant: {idx + 1}" if attempts > 1 else "" prompt = ( - prompts.CONTRADICTION_PROMPT.format(question=question, draft=draft, facts=facts_block) + prompts.CONTRADICTION_PROMPT.format(question=ctx.question, draft=ctx.reply, facts=facts_block) + ("\n" + variant if variant else "") ) - raw = await call_llm( + raw = await ctx.call_llm( prompts.CONTRADICTION_SYSTEM, prompt, - model=plan.fast_model, + model=ctx.plan.fast_model, tag="contradiction", ) data = _parse_json_block(raw, fallback={}) diff --git a/atlasbot/engine/intent_router.py b/atlasbot/engine/intent_router.py index 50175e2..feae734 100644 --- a/atlasbot/engine/intent_router.py +++ b/atlasbot/engine/intent_router.py @@ -33,33 +33,34 @@ def route_intent(question: str) -> IntentMatch | None: if not text: return None - if re.search(_COUNT_TERMS, text) and (re.search(_NODE_TERMS, text) or "cluster" in text): - return IntentMatch("nodes_count", 90) - if re.search(_READY_TERMS, text) and (re.search(_NODE_TERMS, text) or "cluster" in text or "workers" in text): - return IntentMatch("nodes_ready", 85) - if re.search(_NON_RPI_TERMS, text) and (re.search(_NODE_TERMS, text) or "cluster" in text): - return IntentMatch("nodes_non_rpi", 80) + def _has(pattern: str) -> bool: + return bool(re.search(pattern, text)) - if re.search(_HOTTEST_TERMS, text) and re.search(_CPU_TERMS, text): - return IntentMatch("hottest_cpu", 80) - if re.search(_HOTTEST_TERMS, text) and re.search(_RAM_TERMS, text): - return IntentMatch("hottest_ram", 80) - if re.search(_HOTTEST_TERMS, text) and re.search(_NET_TERMS, text): - return IntentMatch("hottest_net", 80) - if re.search(_HOTTEST_TERMS, text) and re.search(_IO_TERMS, text): - return IntentMatch("hottest_io", 80) - if re.search(_HOTTEST_TERMS, text) and re.search(_DISK_TERMS, text): - return IntentMatch("hottest_disk", 80) + def _all(*patterns: str) -> bool: + return all(_has(pat) for pat in patterns) - if re.search(_PG_TERMS, text) and re.search(_CONN_TERMS, text): - return IntentMatch("postgres_connections", 80) - if re.search(_PG_TERMS, text) and re.search(_DB_HOT_TERMS, text): - return IntentMatch("postgres_hottest", 75) + def _any(*patterns: str) -> bool: + return any(_has(pat) for pat in patterns) - if re.search(_NAMESPACE_TERMS, text) and re.search(_PODS_TERMS, text): - return IntentMatch("namespace_most_pods", 75) - - if re.search(_PRESSURE_TERMS, text) and re.search(_NODE_TERMS, text): - return IntentMatch("pressure_summary", 70) + intents = [ + (lambda: _all(_COUNT_TERMS) and (_has(_NODE_TERMS) or "cluster" in text), IntentMatch("nodes_count", 90)), + ( + lambda: _all(_READY_TERMS) and (_any(_NODE_TERMS) or "cluster" in text or "workers" in text), + IntentMatch("nodes_ready", 85), + ), + (lambda: _all(_NON_RPI_TERMS) and (_any(_NODE_TERMS) or "cluster" in text), IntentMatch("nodes_non_rpi", 80)), + (lambda: _all(_HOTTEST_TERMS, _CPU_TERMS), IntentMatch("hottest_cpu", 80)), + (lambda: _all(_HOTTEST_TERMS, _RAM_TERMS), IntentMatch("hottest_ram", 80)), + (lambda: _all(_HOTTEST_TERMS, _NET_TERMS), IntentMatch("hottest_net", 80)), + (lambda: _all(_HOTTEST_TERMS, _IO_TERMS), IntentMatch("hottest_io", 80)), + (lambda: _all(_HOTTEST_TERMS, _DISK_TERMS), IntentMatch("hottest_disk", 80)), + (lambda: _all(_PG_TERMS, _CONN_TERMS), IntentMatch("postgres_connections", 80)), + (lambda: _all(_PG_TERMS, _DB_HOT_TERMS), IntentMatch("postgres_hottest", 75)), + (lambda: _all(_NAMESPACE_TERMS, _PODS_TERMS), IntentMatch("namespace_most_pods", 75)), + (lambda: _all(_PRESSURE_TERMS, _NODE_TERMS), IntentMatch("pressure_summary", 70)), + ] + for predicate, result in intents: + if predicate(): + return result return None