atlasbot: add spine intent routing

This commit is contained in:
Brad Stein 2026-02-05 12:27:11 -03:00
parent 7174f42895
commit c356abdec0
3 changed files with 265 additions and 16 deletions

View File

@ -16,6 +16,7 @@ from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore from atlasbot.state.store import ClaimStore
from atlasbot.engine.intent_router import IntentMatch, route_intent
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -194,6 +195,7 @@ class AnswerEngine:
allowed_nodes = _allowed_nodes(summary) allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary) allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used) summary_lines = _summary_lines(snapshot_used)
spine = _spine_lines(summary_lines)
metric_tokens = _metric_key_tokens(summary_lines) metric_tokens = _metric_key_tokens(summary_lines)
global_facts = _global_facts(summary_lines) global_facts = _global_facts(summary_lines)
kb_summary = self._kb.summary() kb_summary = self._kb.summary()
@ -247,11 +249,26 @@ class AnswerEngine:
classify.setdefault("focus_metric", "unknown") classify.setdefault("focus_metric", "unknown")
if metric_tokens and keyword_tokens and any(token in metric_tokens for token in keyword_tokens): if metric_tokens and keyword_tokens and any(token in metric_tokens for token in keyword_tokens):
classify["needs_snapshot"] = True classify["needs_snapshot"] = True
intent = route_intent(normalized)
if intent:
classify["needs_snapshot"] = True
classify["question_type"] = "metric"
_debug_log("route_parsed", {"classify": classify, "normalized": normalized}) _debug_log("route_parsed", {"classify": classify, "normalized": normalized})
lowered_question = f"{question} {normalized}".lower() lowered_question = f"{question} {normalized}".lower()
force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", lowered_question)) force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", lowered_question))
if any(term in lowered_question for term in ("postgres", "connections", "pvc", "ready")): if any(term in lowered_question for term in ("postgres", "connections", "pvc", "ready")):
force_metric = True force_metric = True
if intent:
spine_line = spine.get(intent.kind)
spine_answer = _spine_answer(intent, spine_line)
if spine_line:
key_facts = _merge_fact_lines([spine_line], key_facts)
metric_facts = _merge_fact_lines([spine_line], metric_facts)
if spine_answer and mode == "fast":
scores = _default_scores()
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(spine_answer, scores, meta)
cluster_terms = ( cluster_terms = (
"atlas", "atlas",
"cluster", "cluster",
@ -745,22 +762,34 @@ class AnswerEngine:
if facts_used and _needs_evidence_guard(reply, facts_used): if facts_used and _needs_evidence_guard(reply, facts_used):
if observer: if observer:
observer("evidence_guard", "tightening unsupported claims") observer("evidence_guard", "tightening unsupported claims")
guard_prompt = ( use_guard = True
prompts.EVIDENCE_GUARD_PROMPT if mode in {"smart", "genius"}:
+ "\nQuestion: " decision = await _contradiction_decision(
+ normalized call_llm,
+ "\nDraft: " normalized,
+ reply reply,
+ "\nFactsUsed:\n" facts_used,
+ "\n".join(facts_used) plan,
) attempts=3 if mode == "genius" else 1,
reply = await call_llm( )
prompts.EVIDENCE_GUARD_SYSTEM, use_guard = decision.get("use_facts", True)
guard_prompt, if use_guard:
context=context, guard_prompt = (
model=plan.model, prompts.EVIDENCE_GUARD_PROMPT
tag="evidence_guard", + "\nQuestion: "
) + normalized
+ "\nDraft: "
+ reply
+ "\nFactsUsed:\n"
+ "\n".join(facts_used)
)
reply = await call_llm(
prompts.EVIDENCE_GUARD_SYSTEM,
guard_prompt,
context=context,
model=plan.model,
tag="evidence_guard",
)
if _needs_focus_fix(normalized, reply, classify): if _needs_focus_fix(normalized, reply, classify):
if observer: if observer:
@ -1530,6 +1559,115 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
return [line for line in text.splitlines() if line.strip()] return [line for line in text.splitlines() if line.strip()]
def _line_starting_with(lines: list[str], prefix: str) -> str | None:
if not lines:
return None
for line in lines:
if line.lower().startswith(prefix.lower()):
return line
return None
def _spine_lines(lines: list[str]) -> dict[str, str]:
    """Index snapshot summary lines by the intent kinds they can answer.

    Each known summary prefix feeds one or more intent kinds; the same line
    may back several kinds (e.g. "hottest:" answers every hottest_* intent).
    """
    prefix_to_kinds = (
        ("nodes:", ("nodes_count", "nodes_ready")),
        ("hardware_nodes:", ("nodes_non_rpi",)),
        ("hottest:", ("hottest_cpu", "hottest_ram", "hottest_net", "hottest_io", "hottest_disk")),
        ("postgres_connections_total:", ("postgres_connections",)),
        ("postgres:", ("postgres_hottest",)),
        ("namespaces_top:", ("namespace_most_pods",)),
    )
    spine: dict[str, str] = {}
    for prefix, kinds in prefix_to_kinds:
        matched = _line_starting_with(lines, prefix)
        if matched:
            for kind in kinds:
                spine[kind] = matched
    # Prefer the explicit pressure line; fall back to top node load when absent.
    pressure = _line_starting_with(lines, "pressure_nodes:") or _line_starting_with(lines, "node_load_top:")
    if pressure:
        spine["pressure_summary"] = pressure
    return spine
def _parse_group_line(line: str) -> dict[str, list[str]]:
groups: dict[str, list[str]] = {}
if not line:
return groups
payload = line.split(":", 1)[1] if ":" in line else line
for part in payload.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
key, value = part.split("=", 1)
nodes = [item.strip() for item in value.split(",") if item.strip()]
groups[key.strip()] = nodes
return groups
def _parse_hottest(line: str, metric: str) -> str | None:
if not line:
return None
payload = line.split(":", 1)[1] if ":" in line else line
for part in payload.split(";"):
part = part.strip()
if part.startswith(f"{metric}="):
return part
return None
def _spine_answer(intent: IntentMatch, spine_line: str | None) -> str | None:
    """Compose a direct one-line answer for *intent* from its spine line.

    Returns None when there is no spine line. Most kinds just echo the line;
    nodes_non_rpi, hottest_* and namespace_most_pods extract the relevant
    fragment first and fall back to echoing when extraction yields nothing.
    """
    if not spine_line:
        return None
    fallback = f"From the latest snapshot: {spine_line}."
    kind = intent.kind
    if kind == "nodes_non_rpi":
        grouped = _parse_group_line(spine_line)
        others = [
            node
            for group, nodes in grouped.items()
            if not group.lower().startswith("rpi")
            for node in nodes
        ]
        if others:
            return "NonRaspberry Pi nodes: " + ", ".join(others) + "."
        return fallback
    if kind.startswith("hottest_"):
        segment = _parse_hottest(spine_line, kind.split("_", 1)[1])
        return f"From the latest snapshot: {segment}." if segment else fallback
    if kind == "namespace_most_pods":
        body = spine_line.split(":", 1)[1] if ":" in spine_line else spine_line
        leader = body.split(";")[0].strip()
        return f"Namespace with most pods: {leader}." if leader else fallback
    # nodes_count, nodes_ready, postgres_*, pressure_summary and any future
    # kind all echo the snapshot line verbatim.
    return fallback
async def _select_metric_chunks( async def _select_metric_chunks(
call_llm: Callable[..., Awaitable[str]], call_llm: Callable[..., Awaitable[str]],
ctx: dict[str, Any], ctx: dict[str, Any],
@ -2108,6 +2246,39 @@ def _needs_evidence_guard(reply: str, facts: list[str]) -> bool:
return False return False
async def _contradiction_decision(
    call_llm: Callable[..., Awaitable[str]],
    question: str,
    draft: str,
    facts_used: list[str],
    plan: "ModePlan",
    attempts: int = 1,
) -> dict[str, Any]:
    """Ask the fast model whether the draft answer or the facts should win.

    Samples up to *attempts* prompt variants and keeps the verdict with the
    highest self-reported confidence. Returns a dict with "use_facts" (bool)
    and "confidence" (int); defaults to trusting the facts at confidence 50
    when the model output cannot be parsed.
    """
    # Default verdict: prefer the facts at middling confidence.
    best = {"use_facts": True, "confidence": 50}
    # Cap prompt size: only the first 12 fact lines are shown to the judge.
    facts_block = "\n".join(facts_used[:12])
    for idx in range(max(1, attempts)):
        # Label each sample only when taking multiple attempts, so a
        # single-shot prompt stays byte-stable.
        variant = f"Variant: {idx + 1}" if attempts > 1 else ""
        # NOTE(review): CONTRADICTION_PROMPT contains literal JSON braces;
        # .format will raise KeyError unless those braces are escaped
        # ({{ }}) in the prompt template — confirm against prompts module.
        prompt = (
            prompts.CONTRADICTION_PROMPT.format(question=question, draft=draft, facts=facts_block)
            + ("\n" + variant if variant else "")
        )
        raw = await call_llm(
            prompts.CONTRADICTION_SYSTEM,
            prompt,
            model=plan.fast_model,
            tag="contradiction",
        )
        data = _parse_json_block(raw, fallback={})
        try:
            confidence = int(data.get("confidence", 50))
        except Exception:
            # Non-numeric confidence from the model: treat as neutral.
            confidence = 50
        use_facts = bool(data.get("use_facts", True))
        # NOTE(review): a verdict must meet or beat the initial confidence
        # of 50 to displace the default "trust facts" answer — confirm this
        # threshold is intended rather than an artifact of seeding `best`.
        if confidence >= best.get("confidence", 0):
            best = {"use_facts": use_facts, "confidence": confidence}
    return best
def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]: def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]:
if not lines: if not lines:
return [] return []

View File

@ -0,0 +1,65 @@
from __future__ import annotations

import re
from dataclasses import dataclass


@dataclass(frozen=True)
class IntentMatch:
    """A routed question intent.

    kind:  name of the spine fact that answers the question.
    score: heuristic confidence for the match (higher = more specific rule).
    """

    kind: str
    score: int


# Term groups as compiled alternations, searched against the lowercased
# question. BUG FIX: the originals used "\\b" / "\\s" inside raw strings,
# which matches a literal backslash — alternatives like pg\b, ns\b,
# non[-\s]?rpi and not\s+rpi could never fire on real text.
_COUNT_TERMS = re.compile(r"(how many|count|number of|total|totals|tally|amount of|quantity|sum of|overall|in total|all up)")
_NODE_TERMS = re.compile(r"(nodes?|workers?|worker nodes?|cluster nodes?|machines?|hosts?|members?|instances?)")
_READY_TERMS = re.compile(r"(ready|unready|not ready|down|offline|not responding|missing)")
_HOTTEST_TERMS = re.compile(r"(hottest|hot|highest|max(?:imum)?|peak|top|most|worst|spikiest|heaviest|largest)")
_CPU_TERMS = re.compile(r"(cpu|processor|compute|core|cores|load|load avg|load average)")
_RAM_TERMS = re.compile(r"(ram|memory|mem|heap)")
_NET_TERMS = re.compile(r"(net|network|bandwidth|throughput|traffic|rx|tx|ingress|egress|bits|bytes)")
_IO_TERMS = re.compile(r"(io|i/o|disk io|disk activity|read/write|read write|storage io|iops)")
_DISK_TERMS = re.compile(r"(disk|storage|volume|pvc|filesystem|fs|capacity|space)")
_PG_TERMS = re.compile(r"(postgres|postgresql|pg\b|database|db|sql)")
_CONN_TERMS = re.compile(r"(connections?|conn|pool|sessions?|clients?)")
_DB_HOT_TERMS = re.compile(r"(hottest|busiest|most|largest|top|heaviest)")
_NAMESPACE_TERMS = re.compile(r"(namespace|namespaces|ns\b)")
_PODS_TERMS = re.compile(r"(pods?|workloads?|tasks?|containers?)")
_NON_RPI_TERMS = re.compile(r"(non[-\s]?raspberry|not\s+raspberry|non[-\s]?rpi|not\s+rpi|amd64|x86|x86_64|jetson)")
_PRESSURE_TERMS = re.compile(r"(pressure|overload|hotspot|bottleneck|saturation|headroom|strain|stress)")

# Ordered rules: (term group A, term group B, intent kind, score). The first
# rule whose two groups both match wins, so more specific rules come first.
_RULES = (
    (_COUNT_TERMS, _NODE_TERMS, "nodes_count", 90),
    (_READY_TERMS, _NODE_TERMS, "nodes_ready", 85),
    (_NON_RPI_TERMS, _NODE_TERMS, "nodes_non_rpi", 80),
    (_HOTTEST_TERMS, _CPU_TERMS, "hottest_cpu", 80),
    (_HOTTEST_TERMS, _RAM_TERMS, "hottest_ram", 80),
    (_HOTTEST_TERMS, _NET_TERMS, "hottest_net", 80),
    (_HOTTEST_TERMS, _IO_TERMS, "hottest_io", 80),
    (_HOTTEST_TERMS, _DISK_TERMS, "hottest_disk", 80),
    (_PG_TERMS, _CONN_TERMS, "postgres_connections", 80),
    (_PG_TERMS, _DB_HOT_TERMS, "postgres_hottest", 75),
    (_NAMESPACE_TERMS, _PODS_TERMS, "namespace_most_pods", 75),
    (_PRESSURE_TERMS, _NODE_TERMS, "pressure_summary", 70),
)


def route_intent(question: str) -> IntentMatch | None:
    """Map a normalized question to a spine intent.

    Returns the first matching IntentMatch from the ordered rule table, or
    None when the question is empty or no rule fires.
    """
    text = (question or "").lower()
    if not text:
        return None
    for first, second, kind, score in _RULES:
        if first.search(text) and second.search(text):
            return IntentMatch(kind, score)
    return None

View File

@ -231,6 +231,19 @@ DRAFT_SELECT_PROMPT = (
"Return JSON with field: best (1-based index)." "Return JSON with field: best (1-based index)."
) )
# System prompt for the contradiction judge: decides whether a drafted
# answer ("Draft") or the extracted snapshot facts ("FactsUsed") should win
# when they disagree. Extends the shared CLUSTER_SYSTEM persona.
CONTRADICTION_SYSTEM = (
    CLUSTER_SYSTEM
    + " Judge whether Draft or FactsUsed is more correct. "
    + "Return JSON only."
)
# User prompt template for the contradiction judge. Filled via
# .format(question=..., draft=..., facts=...). BUG FIX: the literal JSON
# braces in the last line must be doubled ({{ }}) — with single braces,
# str.format raises KeyError: '"use_facts"' on every call.
CONTRADICTION_PROMPT = (
    "Question: {question}\n"
    "Draft: {draft}\n"
    "FactsUsed:\n{facts}\n\n"
    "Return JSON: {{\"use_facts\": true|false, \"confidence\": 0-100, \"reason\": \"...\"}}"
)
CANDIDATE_SELECT_SYSTEM = ( CANDIDATE_SELECT_SYSTEM = (
CLUSTER_SYSTEM CLUSTER_SYSTEM
+ " Pick the best candidate for accuracy and evidence use. " + " Pick the best candidate for accuracy and evidence use. "