atlasbot/atlasbot/engine/answerer.py

1911 lines
76 KiB
Python

import asyncio
import json
import logging
import math
import re
import time
import difflib
from dataclasses import dataclass
from typing import Any, Callable
from atlasbot.config import Settings
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore
log = logging.getLogger(__name__)
class LLMLimitReached(RuntimeError):
pass
@dataclass
class AnswerScores:
confidence: int
relevance: int
satisfaction: int
hallucination_risk: str
@dataclass
class AnswerResult:
reply: str
scores: AnswerScores
meta: dict[str, Any]
@dataclass
class EvidenceItem:
path: str
reason: str
value: Any | None = None
value_at_claim: Any | None = None
@dataclass
class ClaimItem:
id: str
claim: str
evidence: list[EvidenceItem]
@dataclass
class ConversationState:
updated_at: float
claims: list[ClaimItem]
snapshot_id: str | None = None
snapshot: dict[str, Any] | None = None
@dataclass
class ModePlan:
model: str
fast_model: str
max_subquestions: int
chunk_lines: int
chunk_top: int
chunk_group: int
use_tool: bool
use_critic: bool
use_gap: bool
use_scores: bool
drafts: int
class AnswerEngine:
def __init__(
self,
settings: Settings,
llm: LLMClient,
kb: KnowledgeBase,
snapshot: SnapshotProvider,
) -> None:
self._settings = settings
self._llm = llm
self._kb = kb
self._snapshot = snapshot
self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)
async def answer(
self,
question: str,
*,
mode: str,
history: list[dict[str, str]] | None = None,
observer: Callable[[str, str], None] | None = None,
conversation_id: str | None = None,
) -> AnswerResult:
question = (question or "").strip()
if not question:
return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode})
if mode == "stock":
return await self._answer_stock(question)
limitless = "run limitless" in question.lower()
if limitless:
question = re.sub(r"(?i)run limitless", "", question).strip()
plan = _mode_plan(self._settings, mode)
call_limit = _llm_call_limit(self._settings, mode)
call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier)
call_count = 0
limit_hit = False
debug_tags = {
"route",
"decompose",
"chunk_score",
"fact_select",
"synth",
"subanswer",
"tool",
"followup",
"select_claims",
"evidence_fix",
}
def _debug_log(name: str, payload: Any) -> None:
if not self._settings.debug_pipeline:
return
log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})
async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
nonlocal call_count, limit_hit
if not limitless and call_count >= call_cap:
limit_hit = True
raise LLMLimitReached("llm_limit")
call_count += 1
messages = build_messages(system, prompt, context=context)
response = await self._llm.chat(messages, model=model or plan.model)
log.info(
"atlasbot_llm_call",
extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
)
if self._settings.debug_pipeline and tag in debug_tags:
_debug_log(f"llm_raw_{tag}", str(response)[:1200])
return response
state = self._get_state(conversation_id)
snapshot = self._snapshot.get()
snapshot_used = snapshot
if self._settings.snapshot_pin_enabled and state and state.snapshot:
snapshot_used = state.snapshot
summary = build_summary(snapshot_used)
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=6)
runbook_paths = self._kb.runbook_paths(limit=10)
history_ctx = _format_history(history)
lexicon_ctx = _lexicon_context(summary)
key_facts: list[str] = []
metric_facts: list[str] = []
facts_used: list[str] = []
started = time.monotonic()
reply = ""
scores = _default_scores()
claims: list[ClaimItem] = []
classify: dict[str, Any] = {}
tool_hint: dict[str, Any] | None = None
try:
if observer:
observer("normalize", "normalizing")
normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
normalize_raw = await call_llm(
prompts.NORMALIZE_SYSTEM,
normalize_prompt,
context=lexicon_ctx,
model=plan.fast_model,
tag="normalize",
)
normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
normalized = str(normalize.get("normalized") or question).strip() or question
keywords = normalize.get("keywords") or []
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords)
if observer:
observer("route", "routing")
route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
route_raw = await call_llm(
prompts.ROUTE_SYSTEM,
route_prompt,
context=_join_context([kb_summary, lexicon_ctx]),
model=plan.fast_model,
tag="route",
)
classify = _parse_json_block(route_raw, fallback={})
classify.setdefault("needs_snapshot", True)
classify.setdefault("answer_style", "direct")
classify.setdefault("follow_up", False)
classify.setdefault("focus_entity", "unknown")
classify.setdefault("focus_metric", "unknown")
_debug_log("route_parsed", {"classify": classify, "normalized": normalized})
cluster_terms = (
"atlas",
"cluster",
"node",
"nodes",
"namespace",
"pod",
"workload",
"k8s",
"kubernetes",
"postgres",
"database",
"db",
"connections",
"cpu",
"ram",
"memory",
"network",
"io",
"disk",
"pvc",
"storage",
)
if any(term in normalized.lower() for term in cluster_terms):
classify["needs_snapshot"] = True
lowered_norm = normalized.lower()
if (
("namespace" in lowered_norm and ("pod" in lowered_norm or "pods" in lowered_norm))
or re.search(r"\bmost\s+pods\b", lowered_norm)
or re.search(r"\bpods\s+running\b", lowered_norm)
):
classify["question_type"] = "metric"
classify["needs_snapshot"] = True
if re.search(r"\b(how many|count|number of|list)\b", normalized.lower()):
classify["question_type"] = "metric"
hottest_terms = ("hottest", "highest", "lowest", "most")
metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage", "pod", "pods", "namespace")
lowered_question = f"{question} {normalized}".lower()
if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms):
classify["question_type"] = "metric"
if not classify.get("follow_up") and state and state.claims:
follow_terms = ("there", "that", "those", "these", "it", "them", "that one", "this", "former", "latter")
if any(term in lowered_question for term in follow_terms) or len(normalized.split()) <= 6:
classify["follow_up"] = True
if classify.get("follow_up") and state and state.claims:
if observer:
observer("followup", "answering follow-up")
reply = await self._answer_followup(question, state, summary, classify, plan, call_llm)
scores = await self._score_answer(question, reply, plan, call_llm)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
if observer:
observer("decompose", "decomposing")
decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2)
decompose_raw = await call_llm(
prompts.DECOMPOSE_SYSTEM,
decompose_prompt + "\nQuestion: " + normalized,
context=lexicon_ctx,
model=plan.fast_model if mode == "quick" else plan.model,
tag="decompose",
)
parts = _parse_json_list(decompose_raw)
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords)
focus_entity = str(classify.get("focus_entity") or "unknown").lower()
focus_metric = str(classify.get("focus_metric") or "unknown").lower()
lowered_q = f"{question} {normalized}".lower()
if "node" in lowered_q:
focus_entity = "node"
hotspot_override = False
snapshot_context = ""
if classify.get("needs_snapshot"):
if observer:
observer("retrieve", "scoring chunks")
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan, keyword_tokens)
key_facts = _key_fact_lines(summary_lines, keyword_tokens)
hottest_facts = _extract_hottest_facts(summary_lines, f"{question} {normalized}")
hardware_facts = _extract_hardware_usage_facts(summary_lines, f"{question} {normalized}")
hotspot_line = next((line for line in summary_lines if line.startswith("hottest:")), None)
if not hardware_facts:
hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")
lowered_q = f"{question} {normalized}".lower()
if any(tok in lowered_q for tok in hardware_tokens):
for line in summary_lines:
if "hardware_usage_top:" in line:
hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
break
if not hardware_facts:
for line in summary_lines:
if "hardware_usage_avg:" in line:
hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")]
break
metric_facts = [line for line in key_facts if re.search(r"\d", line)]
hotspot_request = any(term in lowered_q for term in ("hot spot", "hotspot", "hot spots", "hotspots"))
hotspot_override = False
if focus_entity == "node" and hottest_facts:
metric_facts = hottest_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
elif hardware_facts:
metric_facts = hardware_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if classify.get("question_type") in {"metric", "diagnostic"}:
if focus_entity != "node" and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")) and any(
tok in lowered_q for tok in ("average", "avg", "mean", "ram", "memory", "cpu", "load")
):
hw_top = None
for line in summary_lines:
if "hardware_usage_top:" in line:
parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
if parts:
hw_top = parts[0]
break
if hw_top:
metric_facts = [hw_top]
key_facts = _merge_fact_lines(metric_facts, key_facts)
if hottest_facts and not hardware_facts and focus_entity != "class" and not hotspot_request:
metric_facts = hottest_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if "namespace" in lowered_q and any(term in lowered_q for term in ("hotspot", "hot spot", "hottest")):
hotspot_node = None
if hottest_facts:
match = re.search(r"hottest_\w+_node: (?P<node>[^\s\[]+)", hottest_facts[0])
if match:
hotspot_node = match.group("node")
if hotspot_node:
for line in summary_lines:
if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line:
metric_facts = _merge_fact_lines([line], metric_facts)
key_facts = _merge_fact_lines([line], key_facts)
break
if not hotspot_node or not any(line.startswith("node_namespaces_top:") for line in metric_facts):
for line in summary_lines:
if line.startswith("node_pods_top:"):
metric_facts = _merge_fact_lines([line], metric_facts)
key_facts = _merge_fact_lines([line], key_facts)
break
if classify.get("question_type") in {"metric", "diagnostic"} and not hottest_facts and not hardware_facts:
metric_candidates = _metric_candidate_lines(summary_lines, keyword_tokens)
selected_facts = await _select_metric_facts(call_llm, normalized, metric_candidates, plan)
if selected_facts:
metric_facts = selected_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if self._settings.debug_pipeline:
_debug_log("metric_facts_selected", {"facts": metric_facts})
if classify.get("question_type") in {"metric", "diagnostic"} and not metric_facts:
lowered_q = f"{question} {normalized}".lower()
if "namespace" in lowered_q and "pod" in lowered_q:
for line in summary_lines:
if line.startswith("namespaces_top:"):
metric_facts = [line]
break
if not metric_facts:
hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")
if any(tok in lowered_q for tok in hardware_tokens):
for line in summary_lines:
if "hardware_usage_top:" in line:
metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
break
if not metric_facts:
for line in summary_lines:
if "hardware_usage_avg:" in line:
metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")]
break
if metric_facts:
key_facts = _merge_fact_lines(metric_facts, key_facts)
if ("pressure" in lowered_q or "diskpressure" in lowered_q or "storagepressure" in lowered_q) and not metric_facts:
pressure_lines = []
for line in summary_lines:
line_lower = line.lower()
if line_lower.startswith("pressure_summary:") or line_lower.startswith("pressure_nodes:"):
pressure_lines.append(line)
if "node_pressure" in line_lower or "pvc_pressure" in line_lower:
pressure_lines.append(line)
if line_lower.startswith("pvc_usage_top:") or line_lower.startswith("root_disk_low_headroom:"):
pressure_lines.append(line)
if pressure_lines:
metric_facts = pressure_lines[:2]
key_facts = _merge_fact_lines(metric_facts, key_facts)
if hotspot_request and hotspot_line:
metric_facts = [hotspot_line]
key_facts = _merge_fact_lines(metric_facts, key_facts)
if self._settings.debug_pipeline:
scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
key=lambda item: item["score"],
reverse=True,
)[: min(len(chunks), max(plan.chunk_top, 6))]
_debug_log(
"chunk_selected",
{
"selected_ids": [item["id"] for item in selected],
"top_scored": scored_preview,
},
)
facts_used = list(dict.fromkeys(key_facts)) if key_facts else list(dict.fromkeys(metric_facts))
snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
if key_facts:
snapshot_context = "KeyFacts:\n" + "\n".join(key_facts) + "\n\n" + snapshot_context
context = _join_context(
[kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
)
if plan.use_tool and classify.get("needs_tool"):
if observer:
observer("tool", "suggesting tools")
tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized
tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool")
tool_hint = _parse_json_block(tool_raw, fallback={})
if observer:
observer("subanswers", "drafting subanswers")
subanswers: list[str] = []
for subq in sub_questions:
sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq
sub_answer = await call_llm(prompts.ANSWER_SYSTEM, sub_prompt, context=context, model=plan.model, tag="subanswer")
subanswers.append(sub_answer)
if observer:
observer("synthesize", "synthesizing")
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
needs_evidence = _needs_evidence_fix(reply, classify)
if classify.get("question_type") in {"open_ended", "planning"} and metric_facts:
needs_evidence = True
resolved_runbook = None
if runbook_paths and (runbook_fix or runbook_needed):
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if candidate and candidate in runbook_paths:
resolved_runbook = candidate
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
if observer:
observer("evidence_fix", "repairing missing evidence")
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if runbook_paths:
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
if resolved_runbook:
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
if metric_facts:
extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4]))
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context=context,
model=plan.model,
tag="evidence_fix",
)
if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply):
if observer:
observer("runbook_enforce", "enforcing runbook path")
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
if runbook_paths:
invalid = [
token
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)
if token.lower() not in {p.lower() for p in runbook_paths}
]
if invalid:
if observer:
observer("runbook_enforce", "replacing invalid runbook path")
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if not (candidate and candidate in runbook_paths):
candidate = _best_runbook_match(invalid[0], runbook_paths)
if candidate and candidate in runbook_paths:
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces)
if _needs_focus_fix(normalized, reply, classify):
if observer:
observer("focus_fix", "tightening answer")
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply,
context=context,
model=plan.model,
tag="focus_fix",
)
if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts:
best_line = None
lowered_keywords = [kw.lower() for kw in keyword_tokens if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
reply = f"From the latest snapshot: {best_line}."
if plan.use_critic:
if observer:
observer("critic", "reviewing")
critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic")
critic = _parse_json_block(critic_raw, fallback={})
if critic.get("issues"):
revise_prompt = (
prompts.REVISION_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ "\nCritique: "
+ json.dumps(critic)
)
reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise")
if plan.use_gap:
if observer:
observer("gap", "checking gaps")
gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap")
gap = _parse_json_block(gap_raw, fallback={})
note = str(gap.get("note") or "").strip()
if note:
reply = f"{reply}\n\n{note}"
if hotspot_request:
namespace_line = None
hotspot_node = None
if hotspot_line:
match = re.search(r"(titan-[a-z0-9]+)", hotspot_line)
if match:
hotspot_node = match.group(1)
if not hotspot_node:
for fact in metric_facts:
match = re.search(r"hottest_\w+_node: (?P<node>[^\s\[]+)", fact)
if match:
hotspot_node = match.group("node")
break
if "namespace" in lowered_q and hotspot_node:
for line in summary_lines:
if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line:
namespace_line = line
break
if not namespace_line:
for line in summary_lines:
if line.startswith("node_pods_top:") and hotspot_node in line:
namespace_line = line
break
if "namespace" in lowered_q and not namespace_line:
for fact in metric_facts:
if fact.startswith("node_namespaces_top:"):
namespace_line = fact
break
if not namespace_line:
for fact in metric_facts:
if fact.startswith("node_pods_top:"):
namespace_line = fact
break
if hotspot_line:
reply = f"Current hotspots: {hotspot_line}."
elif hotspot_node:
reply = f"Hotspot node: {hotspot_node}."
if namespace_line and reply:
reply = f"{reply} {namespace_line}."
if reply:
hotspot_override = True
if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts and not hotspot_override:
reply = _metric_fact_guard(reply, metric_facts, keyword_tokens)
if classify.get("question_type") in {"metric", "diagnostic"}:
lowered_q = f"{question} {normalized}".lower()
if any(tok in lowered_q for tok in ("how many", "count", "number of")) and any(
tok in lowered_q for tok in ("jetson", "rpi4", "rpi5", "amd64", "arm64", "rpi")
):
hw_line = next((line for line in summary_lines if line.startswith("hardware:")), None)
hw_nodes_line = next((line for line in summary_lines if line.startswith("hardware_nodes:")), None)
if hw_line:
def _find_value(key: str, line: str) -> str | None:
match = re.search(rf"{re.escape(key)}=([^;|]+)", line)
return match.group(1).strip() if match else None
target = None
if "jetson" in lowered_q:
target = "jetson"
elif "rpi5" in lowered_q:
target = "rpi5"
elif "rpi4" in lowered_q:
target = "rpi4"
elif "amd64" in lowered_q:
target = "amd64"
elif "arm64" in lowered_q:
target = "arm64"
elif "rpi" in lowered_q:
target = "rpi"
if target:
count = _find_value(target, hw_line)
nodes = _find_value(target, hw_nodes_line or "")
if count:
if nodes and "(" not in count:
reply = f"From the latest snapshot: {target}={count} ({nodes})."
else:
reply = f"From the latest snapshot: {target}={count}."
if _has_token(lowered_q, "io") and ("node" in lowered_q or "nodes" in lowered_q):
io_facts = _extract_hottest_facts(summary_lines, lowered_q)
io_line = next((fact for fact in io_facts if fact.startswith("hottest_io_node")), None)
if io_line:
reply = f"From the latest snapshot: {io_line}."
if "namespace" in lowered_q and "pod" in lowered_q:
ns_line = None
for line in summary_lines:
if line.startswith("namespaces_top:"):
ns_line = line
break
cpu_line = None
if any(tok in lowered_q for tok in ("cpu", "hottest", "highest cpu", "highest")):
cpu_facts = _extract_hottest_facts(summary_lines, lowered_q)
cpu_line = next((fact for fact in cpu_facts if fact.startswith("hottest_cpu_node")), None)
if ns_line:
if cpu_line:
reply = f"From the latest snapshot: {cpu_line}; {ns_line}."
else:
reply = f"From the latest snapshot: {ns_line}."
# do not fall through to other overrides
if classify.get("question_type") in {"metric", "diagnostic"}:
lowered_q = f"{question} {normalized}".lower()
if (
focus_entity != "node"
and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64"))
and any(tok in lowered_q for tok in ("average", "avg", "mean", "per hardware", "by hardware", "typical"))
):
hw_top = None
for line in summary_lines:
if "hardware_usage_top:" in line:
parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
if parts:
hw_top = parts[0]
break
if hw_top:
reply = f"From the latest snapshot: {hw_top}."
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")
scores = await self._score_answer(normalized, reply, plan, call_llm)
claims = await self._extract_claims(normalized, reply, summary, facts_used, call_llm)
except LLMLimitReached:
if not reply:
reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass."
scores = _default_scores()
finally:
elapsed = round(time.monotonic() - started, 2)
log.info(
"atlasbot_answer",
extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}},
)
if conversation_id and claims:
self._store_state(conversation_id, claims, summary, snapshot_used)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
async def _answer_stock(self, question: str) -> AnswerResult:
messages = build_messages(prompts.STOCK_SYSTEM, question)
reply = await self._llm.chat(messages, model=self._settings.ollama_model)
return AnswerResult(reply, _default_scores(), {"mode": "stock"})
async def _synthesize_answer(
self,
question: str,
subanswers: list[str],
context: str,
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
style_hint = _style_hint(classify)
if not subanswers:
prompt = (
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
)
return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")
draft_prompts = []
for idx in range(plan.drafts):
draft_prompts.append(
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
+ "\nSubanswers:\n"
+ "\n".join([f"- {item}" for item in subanswers])
+ f"\nDraftIndex: {idx + 1}"
)
drafts: list[str] = []
for prompt in draft_prompts:
drafts.append(await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth"))
if len(drafts) == 1:
return drafts[0]
select_prompt = (
prompts.DRAFT_SELECT_PROMPT
+ "\nQuestion: "
+ question
+ "\nDrafts:\n"
+ "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)])
)
select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select")
selection = _parse_json_block(select_raw, fallback={})
idx = int(selection.get("best", 1)) - 1
if 0 <= idx < len(drafts):
return drafts[idx]
return drafts[0]
async def _score_answer(
self,
question: str,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
) -> AnswerScores:
if not plan.use_scores:
return _default_scores()
prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score")
data = _parse_json_block(raw, fallback={})
return _scores_from_json(data)
async def _extract_claims(
self,
question: str,
reply: str,
summary: dict[str, Any],
facts_used: list[str],
call_llm: Callable[..., Any],
) -> list[ClaimItem]:
if not reply or not summary:
return []
summary_json = _json_excerpt(summary)
facts_used = [line.strip() for line in (facts_used or []) if line and line.strip()]
facts_block = ""
if facts_used:
facts_block = "\nFactsUsed:\n" + "\n".join([f"- {line}" for line in facts_used[:12]])
prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply + facts_block
raw = await call_llm(
prompts.CLAIM_SYSTEM,
prompt,
context=f"SnapshotSummaryJson:{summary_json}",
model=self._settings.ollama_model_fast,
tag="claim_map",
)
data = _parse_json_block(raw, fallback={})
claims_raw = data.get("claims") if isinstance(data, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}"
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value = _resolve_path(summary, path)
evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value))
if claim_text and evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return claims
async def _dedup_reply(
self,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
tag: str,
) -> str:
if not _needs_dedup(reply):
return reply
dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)
async def _answer_followup(
self,
question: str,
state: ConversationState,
summary: dict[str, Any],
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
claim_ids = await self._select_claims(question, state.claims, plan, call_llm)
selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2]
evidence_lines = []
lowered = question.lower()
for claim in selected:
evidence_lines.append(f"Claim: {claim.claim}")
for ev in claim.evidence:
current = _resolve_path(summary, ev.path)
ev.value = current
delta_note = ""
if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim:
delta_note = f" (now {current})"
evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}")
if any(term in lowered for term in ("hotspot", "hot spot", "hottest", "jetson", "rpi", "amd64", "arm64", "hardware", "class")):
hotspot_lines = _hotspot_evidence(summary)
if hotspot_lines:
evidence_lines.append("HotspotSummary:")
evidence_lines.extend(hotspot_lines)
evidence_ctx = "\n".join(evidence_lines)
prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx
reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup")
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
if extra_bits:
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ question
+ "\nDraft: "
+ reply
+ "\n"
+ "\n".join(extra_bits)
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context="Evidence:\n" + evidence_ctx,
model=plan.model,
tag="followup_fix",
)
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup")
reply = _strip_followup_meta(reply)
return reply
async def _select_claims(
self,
question: str,
claims: list[ClaimItem],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> list[str]:
if not claims:
return []
claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims]
prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief)
raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims")
data = _parse_json_block(raw, fallback={})
ids = data.get("claim_ids") if isinstance(data, dict) else []
if isinstance(ids, list):
return [str(item) for item in ids if item]
return []
def _get_state(self, conversation_id: str | None) -> ConversationState | None:
if not conversation_id:
return None
state_payload = self._store.get(conversation_id)
return _state_from_payload(state_payload) if state_payload else None
def _store_state(
self,
conversation_id: str,
claims: list[ClaimItem],
summary: dict[str, Any],
snapshot: dict[str, Any] | None,
) -> None:
snapshot_id = _snapshot_id(summary)
pinned_snapshot = snapshot if self._settings.snapshot_pin_enabled else None
payload = {
"updated_at": time.monotonic(),
"claims": _claims_to_payload(claims),
"snapshot_id": snapshot_id,
"snapshot": pinned_snapshot,
}
self._store.set(conversation_id, payload)
def _cleanup_state(self) -> None:
self._store.cleanup()
def _strip_followup_meta(reply: str) -> str:
cleaned = reply.strip()
if not cleaned:
return cleaned
prefixes = [
"The draft is correct based on the provided context.",
"The draft is correct based on the context.",
"The draft is correct based on the provided evidence.",
"The draft is correct.",
"Based on the provided context,",
"Based on the context,",
"Based on the provided evidence,",
]
for prefix in prefixes:
if cleaned.lower().startswith(prefix.lower()):
cleaned = cleaned[len(prefix) :].lstrip(" .")
break
return cleaned
def _build_meta(
mode: str,
call_count: int,
call_cap: int,
limit_hit: bool,
classify: dict[str, Any],
tool_hint: dict[str, Any] | None,
started: float,
) -> dict[str, Any]:
return {
"mode": mode,
"llm_calls": call_count,
"llm_limit": call_cap,
"llm_limit_hit": limit_hit,
"classify": classify,
"tool_hint": tool_hint,
"elapsed_sec": round(time.monotonic() - started, 2),
}
def _mode_plan(settings: Settings, mode: str) -> ModePlan:
if mode == "genius":
return ModePlan(
model=settings.ollama_model_genius,
fast_model=settings.ollama_model_fast,
max_subquestions=6,
chunk_lines=6,
chunk_top=10,
chunk_group=4,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=2,
)
if mode == "smart":
return ModePlan(
model=settings.ollama_model_smart,
fast_model=settings.ollama_model_fast,
max_subquestions=4,
chunk_lines=8,
chunk_top=8,
chunk_group=4,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=1,
)
return ModePlan(
model=settings.ollama_model_fast,
fast_model=settings.ollama_model_fast,
max_subquestions=2,
chunk_lines=12,
chunk_top=5,
chunk_group=5,
use_tool=False,
use_critic=False,
use_gap=False,
use_scores=False,
drafts=1,
)
def _llm_call_limit(settings: Settings, mode: str) -> int:
if mode == "genius":
return settings.genius_llm_calls_max
if mode == "smart":
return settings.smart_llm_calls_max
return settings.fast_llm_calls_max
def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]:
if not parts:
return [fallback]
ranked = []
for entry in parts:
if not isinstance(entry, dict):
continue
question = str(entry.get("question") or "").strip()
if not question:
continue
priority = entry.get("priority")
try:
weight = float(priority)
except (TypeError, ValueError):
weight = 1.0
ranked.append((weight, question))
ranked.sort(key=lambda item: item[0], reverse=True)
questions = [item[1] for item in ranked][:limit]
return questions or [fallback]
def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]:
chunks: list[dict[str, Any]] = []
if not lines:
return chunks
for idx in range(0, len(lines), lines_per_chunk):
chunk_lines = lines[idx : idx + lines_per_chunk]
text = "\n".join(chunk_lines)
summary = " | ".join(chunk_lines[:4])
chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
return chunks
async def _score_chunks(
call_llm: Callable[..., Any],
chunks: list[dict[str, Any]],
question: str,
sub_questions: list[str],
plan: ModePlan,
) -> dict[str, float]:
scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
if not chunks:
return scores
group: list[dict[str, Any]] = []
for chunk in chunks:
group.append({"id": chunk["id"], "summary": chunk["summary"]})
if len(group) >= plan.chunk_group:
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
group = []
if group:
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
return scores
async def _score_chunk_group(
call_llm: Callable[..., Any],
group: list[dict[str, Any]],
question: str,
sub_questions: list[str],
) -> dict[str, float]:
prompt = (
prompts.CHUNK_SCORE_PROMPT
+ "\nQuestion: "
+ question
+ "\nSubQuestions: "
+ json.dumps(sub_questions)
+ "\nChunks: "
+ json.dumps(group)
)
raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score")
data = _parse_json_list(raw)
scored: dict[str, float] = {}
for entry in data:
if not isinstance(entry, dict):
continue
cid = str(entry.get("id") or "").strip()
if not cid:
continue
try:
score = float(entry.get("score") or 0)
except (TypeError, ValueError):
score = 0.0
scored[cid] = score
return scored
def _select_chunks(
chunks: list[dict[str, Any]],
scores: dict[str, float],
plan: ModePlan,
keywords: list[str] | None = None,
) -> list[dict[str, Any]]:
if not chunks:
return []
ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
selected: list[dict[str, Any]] = []
head = chunks[0]
selected.append(head)
keyword_hits: list[dict[str, Any]] = []
raw_keywords = [kw.lower() for kw in (keywords or []) if kw]
focused = _focused_keywords(keywords or [])
if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked:
text = item.get("text", "").lower()
if any(kw in text for kw in lowered):
keyword_hits.append(item)
if raw_keywords:
for item in ranked:
if len(keyword_hits) >= plan.chunk_top:
break
text = item.get("text", "").lower()
if any(kw in text for kw in raw_keywords):
keyword_hits.append(item)
for item in keyword_hits:
if len(selected) >= plan.chunk_top:
break
if item in selected:
continue
selected.append(item)
for item in ranked:
if len(selected) >= plan.chunk_top:
break
if item is head:
continue
selected.append(item)
return selected
def _format_runbooks(runbooks: list[str]) -> str:
if not runbooks:
return ""
return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks])
def _join_context(parts: list[str]) -> str:
text = "\n".join([part for part in parts if part])
return text.strip()
def _format_history(history: list[dict[str, str]] | None) -> str:
if not history:
return ""
lines = ["Recent conversation (non-authoritative):"]
for entry in history[-4:]:
if not isinstance(entry, dict):
continue
question = entry.get("q")
answer = entry.get("a")
role = entry.get("role")
content = entry.get("content")
if question:
lines.append(f"Q: {question}")
if answer:
lines.append(f"A: {answer}")
if role and content:
prefix = "Q" if role == "user" else "A"
lines.append(f"{prefix}: {content}")
return "\n".join(lines)
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
text = summary_text(snapshot)
if not text:
return []
return [line for line in text.splitlines() if line.strip()]
def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]:
if not lines or not keywords:
return []
lowered = [kw.lower() for kw in keywords if kw]
if not lowered:
return []
focused = _focused_keywords(lowered)
primary = focused or lowered
matches: list[str] = []
for line in lines:
line_lower = line.lower()
if any(kw in line_lower for kw in primary):
matches.append(line)
if len(matches) >= limit:
break
if len(matches) < limit and focused:
for line in lines:
if len(matches) >= limit:
break
if line in matches:
continue
line_lower = line.lower()
if any(kw in line_lower for kw in lowered):
matches.append(line)
return matches
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
seen = set()
merged: list[str] = []
for line in primary + fallback:
if line in seen:
continue
seen.add(line)
merged.append(line)
return merged
def _expand_hottest_line(line: str) -> list[str]:
if not line:
return []
if not line.lower().startswith("hottest:"):
return []
expanded: list[str] = []
payload = line.split("hottest:", 1)[1]
for part in payload.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
metric, rest = part.split("=", 1)
metric = metric.strip()
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
if not match:
continue
node = match.group("node").strip()
value = match.group("value").strip()
class_match = re.search(r"\[(?P<class>[^\]]+)\]", rest)
node_class = class_match.group("class").strip() if class_match else ""
if node_class:
expanded.append(f"hottest_{metric}_node: {node} [{node_class}] ({value})")
else:
expanded.append(f"hottest_{metric}_node: {node} ({value})")
return expanded
def _has_token(text: str, token: str) -> bool:
if not text or not token:
return False
if token == "io":
return "i/o" in text or re.search(r"\bio\b", text) is not None
return re.search(rf"\b{re.escape(token)}\b", text) is not None
def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
if not hottest:
return []
hardware_by_node = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
node_pods_top = summary.get("node_pods_top") if isinstance(summary.get("node_pods_top"), list) else []
ns_map = {}
for item in node_pods_top:
if not isinstance(item, dict):
continue
node = item.get("node")
namespaces_top = item.get("namespaces_top") if isinstance(item.get("namespaces_top"), list) else []
ns_map[node] = namespaces_top
lines: list[str] = []
for metric, info in hottest.items():
if not isinstance(info, dict):
continue
node = info.get("node")
value = info.get("value")
if not node:
continue
node_class = hardware_by_node.get(node)
ns_parts = []
for entry in ns_map.get(node, [])[:3]:
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
ns_parts.append(f"{entry[0]}={entry[1]}")
ns_text = ", ".join(ns_parts)
value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value)
line = f"hotspot.{metric}: node={node} class={node_class or 'unknown'} value={value_text}"
if ns_text:
line += f" namespaces_top={ns_text}"
lines.append(line)
return lines
def _extract_hottest_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if not any(
term in lowered
for term in ("hottest", "hot", "highest", "lowest", "most", "top", "peak", "loaded", "load", "busy")
):
return []
if "node" not in lowered and "nodes" not in lowered:
return []
line = ""
for item in lines:
if item.lower().startswith("hottest:"):
line = item
break
if " | " in item:
for seg in [seg.strip() for seg in item.split(" | ")]:
if seg.lower().startswith("hottest:"):
line = seg
break
if line:
break
if not line:
return []
facts = _expand_hottest_line(line)
if not facts:
return []
wanted = []
if _has_token(lowered, "io") and ("disk" in lowered or "storage" in lowered):
return [fact for fact in facts if fact.startswith("hottest_io_node")]
if _has_token(lowered, "cpu") or "processor" in lowered:
wanted.append("hottest_cpu_node")
if _has_token(lowered, "ram") or "memory" in lowered:
wanted.append("hottest_ram_node")
if _has_token(lowered, "net") or "network" in lowered or "throughput" in lowered:
wanted.append("hottest_net_node")
if _has_token(lowered, "io"):
wanted.append("hottest_io_node")
if _has_token(lowered, "disk") or "storage" in lowered:
wanted.append("hottest_disk_node")
if not wanted:
return facts
return [fact for fact in facts if any(label in fact for label in wanted)] or facts
def _extract_hardware_usage_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if "hardware" not in lowered:
return []
if not any(term in lowered for term in ("average", "avg", "mean", "per hardware", "by hardware", "typical")):
return []
avg_line = None
top_line = None
for line in lines:
segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line]
for seg in segments:
if seg.startswith("hardware_usage_avg:"):
avg_line = seg
elif seg.startswith("hardware_usage_top:"):
top_line = seg
if not avg_line and not top_line:
return []
wants_top = any(term in lowered for term in ("highest", "lowest", "most", "least", "top", "worst", "best"))
if wants_top and top_line:
return [top_line]
facts: list[str] = []
if avg_line:
facts.append(avg_line)
if top_line:
facts.append(top_line)
return facts
def _metric_candidate_lines(lines: list[str], keywords: list[str] | None, limit: int = 40) -> list[str]:
if not lines:
return []
lowered = [kw.lower() for kw in (keywords or []) if kw]
prefer_node = any("node" in kw for kw in lowered) or "hottest" in lowered
metric_tokens = {
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"load",
"usage",
"utilization",
"hottest",
"p95",
"percent",
"pressure",
"pod",
"pods",
"namespace",
"anomaly",
"anomalies",
"pvc",
"storage",
"pressure_summary",
"pressure_nodes",
"diskpressure",
"storagepressure",
}
candidates: list[str] = []
expanded: list[str] = []
for line in lines:
if " | " in line:
expanded.extend([seg.strip() for seg in line.split(" | ") if seg.strip()])
expanded.append(line)
for line in expanded:
if line.lower().startswith("hottest:"):
candidates.extend(_expand_hottest_line(line))
break
for line in expanded:
line_lower = line.lower()
if line_lower.startswith("lexicon_") or line_lower.startswith("units:"):
continue
if prefer_node and "pod_" in line_lower:
continue
if "hottest:" in line_lower:
candidates.append(line)
continue
if lowered and any(kw in line_lower for kw in lowered):
candidates.append(line)
continue
if any(token in line_lower for token in metric_tokens) and re.search(r"\d", line_lower):
candidates.append(line)
continue
return candidates[:limit]
async def _select_metric_facts(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
max_lines: int = 2,
) -> list[str]:
if not candidates:
return []
prompt = (
prompts.FACT_SELECT_PROMPT.format(max_lines=max_lines)
+ "\nQuestion: "
+ question
+ "\nCandidates:\n"
+ "\n".join([f"- {line}" for line in candidates])
)
raw = await call_llm(prompts.FACT_SELECT_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
data = _parse_json_block(raw, fallback={})
lines = data.get("lines") if isinstance(data, dict) else None
if not isinstance(lines, list):
return []
cleaned = []
allowed = set(candidates)
for line in lines:
if isinstance(line, str) and line in allowed and line not in cleaned:
cleaned.append(line)
if len(cleaned) >= max_lines:
break
return cleaned
def _metric_fact_guard(reply: str, metric_facts: list[str], keywords: list[str]) -> str:
if not metric_facts:
return reply
best_line = None
lowered_keywords = [kw.lower() for kw in keywords if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
return f"From the latest snapshot: {best_line}."
return reply
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
if not reply:
return reply
if not unknown_nodes and not unknown_namespaces:
return reply
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()]
if not sentences:
return reply
lowered_nodes = [node.lower() for node in unknown_nodes]
lowered_namespaces = [ns.lower() for ns in unknown_namespaces]
kept: list[str] = []
for sent in sentences:
lower = sent.lower()
if lowered_nodes and any(node in lower for node in lowered_nodes):
continue
if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces):
continue
kept.append(sent)
cleaned = " ".join(kept).strip()
return cleaned or reply
def _lexicon_context(summary: dict[str, Any]) -> str:
if not isinstance(summary, dict):
return ""
lexicon = summary.get("lexicon")
if not isinstance(lexicon, dict):
return ""
terms = lexicon.get("terms")
aliases = lexicon.get("aliases")
lines: list[str] = []
if isinstance(terms, list):
for entry in terms[:8]:
if not isinstance(entry, dict):
continue
term = entry.get("term")
meaning = entry.get("meaning")
if term and meaning:
lines.append(f"{term}: {meaning}")
if isinstance(aliases, dict):
for key, value in list(aliases.items())[:6]:
if key and value:
lines.append(f"alias {key} -> {value}")
if not lines:
return ""
return "Lexicon:\n" + "\n".join(lines)
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
raw = text.strip()
match = re.search(r"\{.*\}", raw, flags=re.S)
if match:
return parse_json(match.group(0), fallback=fallback)
return parse_json(raw, fallback=fallback)
def _parse_json_list(text: str) -> list[dict[str, Any]]:
raw = text.strip()
match = re.search(r"\[.*\]", raw, flags=re.S)
data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={})
if isinstance(data, list):
return [entry for entry in data if isinstance(entry, dict)]
return []
def _scores_from_json(data: dict[str, Any]) -> AnswerScores:
return AnswerScores(
confidence=_coerce_int(data.get("confidence"), 60),
relevance=_coerce_int(data.get("relevance"), 60),
satisfaction=_coerce_int(data.get("satisfaction"), 60),
hallucination_risk=str(data.get("hallucination_risk") or "medium"),
)
def _coerce_int(value: Any, default: int) -> int:
try:
return int(float(value))
except (TypeError, ValueError):
return default
def _default_scores() -> AnswerScores:
return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium")
def _style_hint(classify: dict[str, Any]) -> str:
style = (classify.get("answer_style") or "").strip().lower()
qtype = (classify.get("question_type") or "").strip().lower()
if style == "insightful" or qtype in {"open_ended", "planning"}:
return "insightful"
return "direct"
def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
lowered = reply.lower()
missing_markers = (
"don't have",
"do not have",
"don't know",
"cannot",
"can't",
"need to",
"would need",
"does not provide",
"does not mention",
"not mention",
"not provided",
"not in context",
"not referenced",
"missing",
"no specific",
"no information",
)
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
return True
if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply):
return True
return False
def _needs_dedup(reply: str) -> bool:
if not reply:
return False
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()]
if len(sentences) < 3:
return False
seen = set()
for sent in sentences:
norm = re.sub(r"\\s+", " ", sent.lower())
if norm in seen:
return True
seen.add(norm)
return False
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
q_lower = (question or "").lower()
if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
return False
if reply.count(".") <= 1:
return False
extra_markers = ("for more", "if you need", "additional", "based on")
return any(marker in reply.lower() for marker in extra_markers)
def _extract_keywords(
raw_question: str,
normalized: str,
sub_questions: list[str],
keywords: list[Any] | None,
) -> list[str]:
stopwords = {
"the",
"and",
"for",
"with",
"that",
"this",
"what",
"which",
"when",
"where",
"who",
"why",
"how",
"tell",
"show",
"list",
"give",
"about",
"right",
"now",
}
tokens: list[str] = []
for source in [raw_question, normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < 3 or part in stopwords:
continue
tokens.append(part)
if keywords:
for kw in keywords:
if isinstance(kw, str):
part = kw.strip().lower()
if part and part not in stopwords and part not in tokens:
tokens.append(part)
return list(dict.fromkeys(tokens))[:12]
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware:
return sorted([node for node in hardware.keys() if isinstance(node, str)])
return []
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
namespaces: list[str] = []
for entry in summary.get("namespace_pods") or []:
if isinstance(entry, dict):
name = entry.get("namespace")
if name:
namespaces.append(str(name))
return sorted(set(namespaces))
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\b(titan-[0-9a-z]+|node\d+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
if not reply or not allowed:
return False
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
if not paths:
return False
allowed_set = {p.lower() for p in allowed}
return any(path.lower() not in allowed_set for path in paths)
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
if not candidate or not allowed:
return None
best = None
best_score = 0.0
for path in allowed:
score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
if score > best_score:
best_score = score
best = path
return best if best_score >= 0.4 else None
def _resolve_path(data: Any, path: str) -> Any | None:
if path.startswith("line:"):
return path.split("line:", 1)[1].strip()
cursor = data
for part in re.split(r"\.(?![^\[]*\])", path):
if not part:
continue
match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
if not match:
return None
key = match.group(1)
index = match.group(2)
if isinstance(cursor, dict):
cursor = cursor.get(key)
else:
return None
if index is not None:
try:
idx = int(index)
if isinstance(cursor, list) and 0 <= idx < len(cursor):
cursor = cursor[idx]
else:
return None
except ValueError:
return None
return cursor
def _snapshot_id(summary: dict[str, Any]) -> str | None:
if not summary:
return None
for key in ("generated_at", "snapshot_ts", "snapshot_id"):
value = summary.get(key)
if isinstance(value, str) and value:
return value
return None
def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for claim in claims:
evidence = []
for ev in claim.evidence:
evidence.append(
{
"path": ev.path,
"reason": ev.reason,
"value_at_claim": ev.value_at_claim,
}
)
output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
return output
def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
if not payload:
return None
claims_raw = payload.get("claims") if isinstance(payload, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip()
if not claim_text or not claim_id:
continue
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value_at_claim = ev.get("value_at_claim")
evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim))
if evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return ConversationState(
updated_at=float(payload.get("updated_at") or time.monotonic()),
claims=claims,
snapshot_id=payload.get("snapshot_id"),
snapshot=payload.get("snapshot"),
)
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
raw = json.dumps(summary, ensure_ascii=False)
return raw[:max_chars]