# atlasbot/atlasbot/engine/answerer.py
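"""Answer engine for AtlasBot.

Runs a budgeted, multi-stage LLM pipeline: normalize and route the question,
decompose it into sub-questions, retrieve and distill snapshot facts, draft and
synthesize an answer, then repair, critique, score, and map claims to evidence.
"""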

import asyncio
import difflib
import json
import logging
import math
import re
import time
from collections.abc import Awaitable
from dataclasses import dataclass
from typing import Any, Callable
from atlasbot.config import Settings
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore
log = logging.getLogger(__name__)
FOLLOWUP_SHORT_WORDS = 6
TOKEN_MIN_LEN = 3
NS_ENTRY_MIN_LEN = 2
DEDUP_MIN_SENTENCES = 3
RUNBOOK_SIMILARITY_THRESHOLD = 0.4
class LLMLimitReached(RuntimeError):
    """Raised when the per-question LLM call budget is exhausted."""
@dataclass
class AnswerScores:
confidence: int
relevance: int
satisfaction: int
hallucination_risk: str
@dataclass
class AnswerResult:
reply: str
scores: AnswerScores
meta: dict[str, Any]
@dataclass
class EvidenceItem:
path: str
reason: str
value: Any | None = None
value_at_claim: Any | None = None
@dataclass
class ClaimItem:
id: str
claim: str
evidence: list[EvidenceItem]
@dataclass
class ConversationState:
updated_at: float
claims: list[ClaimItem]
snapshot_id: str | None = None
snapshot: dict[str, Any] | None = None
@dataclass
class ModePlan:
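    """Per-mode tuning knobs: models, retrieval sizes, parallelism, retries, and optional pipeline stages."""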
model: str
fast_model: str
max_subquestions: int
chunk_lines: int
chunk_top: int
chunk_group: int
parallelism: int
score_retries: int
use_tool: bool
use_critic: bool
use_gap: bool
use_scores: bool
drafts: int
metric_retries: int
subanswer_retries: int
@dataclass
class ScoreContext:
question: str
sub_questions: list[str]
retries: int
parallelism: int
class AnswerEngine:
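    """Multi-stage answer pipeline over a cluster snapshot and knowledge base.

    Minimal usage sketch (wiring up the Settings, LLMClient, KnowledgeBase, and
    SnapshotProvider dependencies is assumed to happen elsewhere)::

        engine = AnswerEngine(settings, llm, kb, snapshot)
        result = await engine.answer("Which namespace runs the most pods?", mode="smart")
    """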
def __init__(
self,
settings: Settings,
llm: LLMClient,
kb: KnowledgeBase,
snapshot: SnapshotProvider,
) -> None:
self._settings = settings
self._llm = llm
self._kb = kb
self._snapshot = snapshot
self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)
async def answer( # noqa: C901, PLR0912, PLR0913, PLR0915
self,
question: str,
*,
mode: str,
history: list[dict[str, str]] | None = None,
observer: Callable[[str, str], None] | None = None,
conversation_id: str | None = None,
snapshot_pin: bool | None = None,
) -> AnswerResult:
question = (question or "").strip()
if not question:
return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode})
if mode == "stock":
return await self._answer_stock(question)
limitless = "run limitless" in question.lower()
if limitless:
question = re.sub(r"(?i)run limitless", "", question).strip()
plan = _mode_plan(self._settings, mode)
call_limit = _llm_call_limit(self._settings, mode)
call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier)
call_count = 0
limit_hit = False
debug_tags = {
"route",
"decompose",
"chunk_score",
"fact_select",
"synth",
"subanswer",
"tool",
"followup",
"select_claims",
"evidence_fix",
}
def _debug_log(name: str, payload: Any) -> None:
if not self._settings.debug_pipeline:
return
log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})
async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
nonlocal call_count, limit_hit
if not limitless and call_count >= call_cap:
limit_hit = True
raise LLMLimitReached("llm_limit")
call_count += 1
messages = build_messages(system, prompt, context=context)
response = await self._llm.chat(messages, model=model or plan.model)
log.info(
"atlasbot_llm_call",
extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
)
if self._settings.debug_pipeline and tag in debug_tags:
_debug_log(f"llm_raw_{tag}", str(response)[:1200])
return response
state = self._get_state(conversation_id)
pin_snapshot = bool(snapshot_pin) or self._settings.snapshot_pin_enabled
snapshot = self._snapshot.get()
snapshot_used = snapshot
if pin_snapshot and state and state.snapshot:
snapshot_used = state.snapshot
summary = build_summary(snapshot_used)
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
metric_tokens = _metric_key_tokens(summary_lines)
global_facts = _global_facts(summary_lines)
kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=6)
runbook_paths = self._kb.runbook_paths(limit=10)
history_ctx = _format_history(history)
lexicon_ctx = _lexicon_context(summary)
key_facts: list[str] = []
metric_facts: list[str] = []
facts_used: list[str] = []
started = time.monotonic()
reply = ""
scores = _default_scores()
claims: list[ClaimItem] = []
classify: dict[str, Any] = {}
tool_hint: dict[str, Any] | None = None
try:
if observer:
observer("normalize", "normalizing")
normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
normalize_raw = await call_llm(
prompts.NORMALIZE_SYSTEM,
normalize_prompt,
context=lexicon_ctx,
model=plan.fast_model,
tag="normalize",
)
normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
normalized = str(normalize.get("normalized") or question).strip() or question
keywords = normalize.get("keywords") or []
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords)
question_tokens = _extract_question_tokens(normalized)
if observer:
observer("route", "routing")
route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
route_raw = await call_llm(
prompts.ROUTE_SYSTEM,
route_prompt,
context=_join_context([kb_summary, lexicon_ctx]),
model=plan.fast_model,
tag="route",
)
classify = _parse_json_block(route_raw, fallback={})
classify.setdefault("needs_snapshot", True)
classify.setdefault("answer_style", "direct")
classify.setdefault("follow_up", False)
classify.setdefault("focus_entity", "unknown")
classify.setdefault("focus_metric", "unknown")
if metric_tokens and keyword_tokens and any(token in metric_tokens for token in keyword_tokens):
classify["needs_snapshot"] = True
_debug_log("route_parsed", {"classify": classify, "normalized": normalized})
lowered_question = f"{question} {normalized}".lower()
force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", lowered_question))
if any(term in lowered_question for term in ("postgres", "connections", "pvc", "ready")):
force_metric = True
cluster_terms = (
"atlas",
"cluster",
"node",
"nodes",
"namespace",
"pod",
"workload",
"k8s",
"kubernetes",
"postgres",
"database",
"db",
"connections",
"cpu",
"ram",
"memory",
"network",
"io",
"disk",
"pvc",
"storage",
)
has_cluster_terms = any(term in lowered_question for term in cluster_terms)
if has_cluster_terms:
classify["needs_snapshot"] = True
lowered_norm = normalized.lower()
if (
("namespace" in lowered_norm and ("pod" in lowered_norm or "pods" in lowered_norm))
or re.search(r"\bmost\s+pods\b", lowered_norm)
or re.search(r"\bpods\s+running\b", lowered_norm)
):
classify["question_type"] = "metric"
classify["needs_snapshot"] = True
if re.search(r"\b(how many|count|number of|list)\b", lowered_question):
classify["question_type"] = "metric"
if any(term in lowered_question for term in ("postgres", "connections", "db")):
classify["question_type"] = "metric"
classify["needs_snapshot"] = True
if any(term in lowered_question for term in ("pvc", "persistentvolume", "persistent volume", "storage")):
if classify.get("question_type") not in {"metric", "diagnostic"}:
classify["question_type"] = "metric"
classify["needs_snapshot"] = True
if "ready" in lowered_question and classify.get("question_type") not in {"metric", "diagnostic"}:
classify["question_type"] = "diagnostic"
hottest_terms = ("hottest", "highest", "lowest", "most")
metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage", "pod", "pods", "namespace")
if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms):
classify["question_type"] = "metric"
baseline_terms = ("baseline", "delta", "trend", "increase", "decrease", "drop", "spike", "regression", "change")
if any(term in lowered_question for term in baseline_terms) and any(term in lowered_question for term in metric_terms):
classify["question_type"] = "metric"
classify["needs_snapshot"] = True
if not classify.get("follow_up") and state and state.claims:
follow_terms = ("there", "that", "those", "these", "it", "them", "that one", "this", "former", "latter")
is_metric_query = force_metric or classify.get("question_type") in {"metric", "diagnostic"}
if not is_metric_query:
if any(term in lowered_question for term in follow_terms):
classify["follow_up"] = True
elif len(normalized.split()) <= FOLLOWUP_SHORT_WORDS and not has_cluster_terms:
classify["follow_up"] = True
if classify.get("follow_up") and state and state.claims:
if observer:
observer("followup", "answering follow-up")
reply = await self._answer_followup(question, state, summary, classify, plan, call_llm)
scores = await self._score_answer(question, reply, plan, call_llm)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
if observer:
observer("decompose", "decomposing")
decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2)
decompose_raw = await call_llm(
prompts.DECOMPOSE_SYSTEM,
decompose_prompt + "\nQuestion: " + normalized,
context=lexicon_ctx,
model=plan.fast_model if mode == "quick" else plan.model,
tag="decompose",
)
parts = _parse_json_list(decompose_raw)
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords)
focus_entity = str(classify.get("focus_entity") or "unknown").lower()
focus_metric = str(classify.get("focus_metric") or "unknown").lower()
lowered_q = f"{question} {normalized}".lower()
if "node" in lowered_q:
focus_entity = "node"
snapshot_context = ""
signal_tokens: list[str] = []
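            # Retrieval: chunk the snapshot summary, score chunks, and distill key
            # fact lines, with extra metric-focused selection for metric/diagnostic
            # questions.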
if classify.get("needs_snapshot"):
if observer:
observer("retrieve", "scoring chunks")
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
metric_keys: list[str] = []
must_chunk_ids: list[str] = []
metric_task = None
if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and summary_lines:
metric_ctx = {
"question": normalized,
"sub_questions": sub_questions,
"keywords": keywords,
"keyword_tokens": keyword_tokens,
"summary_lines": summary_lines,
}
metric_task = asyncio.create_task(_select_metric_chunks(call_llm, metric_ctx, chunks, plan))
scored_task = asyncio.create_task(_score_chunks(call_llm, chunks, normalized, sub_questions, plan))
if metric_task:
metric_keys, must_chunk_ids = await metric_task
scored = await scored_task
selected = _select_chunks(chunks, scored, plan, keyword_tokens, must_chunk_ids)
fact_candidates = _collect_fact_candidates(selected, limit=plan.max_subquestions * 12)
key_facts = await _select_fact_lines(
call_llm,
normalized,
fact_candidates,
plan,
max_lines=max(4, plan.max_subquestions * 2),
)
metric_facts: list[str] = []
if classify.get("question_type") in {"metric", "diagnostic"} or force_metric:
if observer:
observer("retrieve", "extracting fact types")
fact_types = await _extract_fact_types(
call_llm,
normalized,
keyword_tokens,
plan,
)
if observer:
observer("retrieve", "deriving signals")
signals = await _derive_signals(
call_llm,
normalized,
fact_types,
plan,
)
if isinstance(signals, list):
signal_tokens = [str(item) for item in signals if item]
all_tokens = _merge_tokens(signal_tokens, keyword_tokens, question_tokens)
if observer:
observer("retrieve", "scanning chunks")
candidate_lines: list[str] = []
if signals:
for chunk in selected:
chunk_lines = chunk["text"].splitlines()
if not chunk_lines:
continue
hits = await _scan_chunk_for_signals(
call_llm,
normalized,
signals,
chunk_lines,
plan,
)
if hits:
candidate_lines.extend(hits)
candidate_lines = list(dict.fromkeys(candidate_lines))
if candidate_lines:
if observer:
observer("retrieve", "pruning candidates")
metric_facts = await _prune_metric_candidates(
call_llm,
normalized,
candidate_lines,
plan,
plan.metric_retries,
)
if metric_facts:
key_facts = _merge_fact_lines(metric_facts, key_facts)
if self._settings.debug_pipeline:
_debug_log("metric_facts_selected", {"facts": metric_facts})
if not metric_facts:
if observer:
observer("retrieve", "fallback metric selection")
fallback_candidates = _filter_lines_by_keywords(summary_lines, all_tokens, max_lines=200)
if fallback_candidates:
metric_facts = await _select_fact_lines(
call_llm,
normalized,
fallback_candidates,
plan,
max_lines=max(2, plan.max_subquestions),
)
if not metric_facts and fallback_candidates:
metric_facts = fallback_candidates[: max(2, plan.max_subquestions)]
if metric_keys:
key_lines = _lines_for_metric_keys(summary_lines, metric_keys, max_lines=plan.max_subquestions * 3)
if key_lines:
metric_facts = _merge_fact_lines(key_lines, metric_facts)
if metric_facts:
metric_facts = _ensure_token_coverage(
metric_facts,
all_tokens,
summary_lines,
max_add=plan.max_subquestions,
)
if metric_facts and not _has_keyword_overlap(metric_facts, keyword_tokens):
best_line = _best_keyword_line(summary_lines, keyword_tokens)
if best_line:
metric_facts = _merge_fact_lines([best_line], metric_facts)
if metric_facts:
key_facts = _merge_fact_lines(metric_facts, key_facts)
if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and not metric_facts and key_facts:
metric_facts = key_facts
if key_facts:
key_facts = _ensure_token_coverage(
key_facts,
all_tokens,
summary_lines,
max_add=plan.max_subquestions,
)
if self._settings.debug_pipeline:
scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
key=lambda item: item["score"],
reverse=True,
)[: min(len(chunks), max(plan.chunk_top, 6))]
_debug_log(
"chunk_selected",
{
"selected_ids": [item["id"] for item in selected],
"top_scored": scored_preview,
"metric_keys": metric_keys,
"forced_chunks": must_chunk_ids,
},
)
facts_used = list(dict.fromkeys(key_facts)) if key_facts else list(dict.fromkeys(metric_facts))
snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
combined_facts = key_facts
if global_facts:
combined_facts = _merge_fact_lines(global_facts, key_facts)
if combined_facts:
snapshot_context = "KeyFacts:\n" + "\n".join(combined_facts) + "\n\n" + snapshot_context
context = _join_context(
[kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
)
if plan.use_tool and classify.get("needs_tool"):
if observer:
observer("tool", "suggesting tools")
tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized
tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool")
tool_hint = _parse_json_block(tool_raw, fallback={})
if observer:
observer("subanswers", "drafting subanswers")
subanswers: list[str] = []
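            # Draft one answer per sub-question; when subanswer_retries > 1, sample
            # several candidates and let the model pick the best one.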
async def _subanswer_for(subq: str) -> str:
sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq
if plan.subanswer_retries > 1:
candidates = await _gather_limited(
[
call_llm(
prompts.ANSWER_SYSTEM,
sub_prompt,
context=context,
model=plan.model,
tag="subanswer",
)
for _ in range(plan.subanswer_retries)
],
plan.parallelism,
)
best_idx = await _select_best_candidate(call_llm, subq, candidates, plan, "subanswer_select")
return candidates[best_idx]
return await call_llm(
prompts.ANSWER_SYSTEM,
sub_prompt,
context=context,
model=plan.model,
tag="subanswer",
)
if plan.parallelism > 1 and len(sub_questions) > 1:
subanswers = await _gather_limited(
[_subanswer_for(subq) for subq in sub_questions],
plan.parallelism,
)
else:
for subq in sub_questions:
subanswers.append(await _subanswer_for(subq))
if observer:
observer("synthesize", "synthesizing")
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
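            # Post-synthesis repair: strip unknown nodes/namespaces, enforce valid
            # runbook paths, and re-anchor metric answers to the selected facts.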
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
needs_evidence = _needs_evidence_fix(reply, classify)
hardware_terms = ("rpi", "raspberry", "jetson", "amd64", "arm64", "hardware")
hardware_line = _line_starting_with(summary_lines, "hardware_nodes:")
if any(term in lowered_question for term in hardware_terms) and hardware_line:
needs_evidence = True
if metric_facts and (classify.get("question_type") in {"metric", "diagnostic"} or force_metric):
if not _reply_matches_metric_facts(reply, metric_facts):
needs_evidence = True
if classify.get("question_type") in {"open_ended", "planning"} and metric_facts:
needs_evidence = True
resolved_runbook = None
if runbook_paths and (runbook_fix or runbook_needed):
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if candidate and candidate in runbook_paths:
resolved_runbook = candidate
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
if observer:
observer("evidence_fix", "repairing missing evidence")
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if runbook_paths:
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
if resolved_runbook:
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
if metric_facts:
extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4]))
if hardware_line:
extra_bits.append("HardwareNodes: " + hardware_line)
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context=context,
model=plan.model,
tag="evidence_fix",
)
if metric_facts and not _reply_matches_metric_facts(reply, metric_facts):
enforce_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ "\nMustIncludeFacts: "
+ "; ".join(metric_facts[:6])
+ "\nInstruction: The answer must include all MustIncludeFacts items."
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
enforce_prompt,
context=context,
model=plan.model,
tag="evidence_fix_enforce",
)
if "raspberry" in lowered_question and "not" in lowered_question:
non_rpi = _non_rpi_nodes(summary)
if non_rpi:
reply = _format_hardware_groups(non_rpi, "Non-Raspberry Pi nodes")
if unknown_nodes or unknown_namespaces:
refreshed_nodes = _find_unknown_nodes(reply, allowed_nodes)
refreshed_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
if refreshed_nodes or refreshed_namespaces:
reply = _strip_unknown_entities(reply, refreshed_nodes, refreshed_namespaces)
if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply):
if observer:
observer("runbook_enforce", "enforcing runbook path")
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
if runbook_paths:
invalid = [
token
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)
if token.lower() not in {p.lower() for p in runbook_paths}
]
if invalid:
if observer:
observer("runbook_enforce", "replacing invalid runbook path")
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if not (candidate and candidate in runbook_paths):
candidate = _best_runbook_match(invalid[0], runbook_paths)
if candidate and candidate in runbook_paths:
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces)
if facts_used and _needs_evidence_guard(reply, facts_used):
if observer:
observer("evidence_guard", "tightening unsupported claims")
guard_prompt = (
prompts.EVIDENCE_GUARD_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ "\nFactsUsed:\n"
+ "\n".join(facts_used)
)
reply = await call_llm(
prompts.EVIDENCE_GUARD_SYSTEM,
guard_prompt,
context=context,
model=plan.model,
tag="evidence_guard",
)
if _needs_focus_fix(normalized, reply, classify):
if observer:
observer("focus_fix", "tightening answer")
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply,
context=context,
model=plan.model,
tag="focus_fix",
)
if not metric_facts or not _has_keyword_overlap(metric_facts, keyword_tokens):
best_line = _best_keyword_line(summary_lines, keyword_tokens)
if best_line:
reply = f"From the latest snapshot: {best_line}."
if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and metric_facts:
best_line = None
lowered_keywords = [kw.lower() for kw in keyword_tokens if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
reply = f"From the latest snapshot: {best_line}."
if plan.use_critic:
if observer:
observer("critic", "reviewing")
critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic")
critic = _parse_json_block(critic_raw, fallback={})
if critic.get("issues"):
revise_prompt = (
prompts.REVISION_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ "\nCritique: "
+ json.dumps(critic)
)
reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise")
if plan.use_gap:
if observer:
observer("gap", "checking gaps")
gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap")
gap = _parse_json_block(gap_raw, fallback={})
note = str(gap.get("note") or "").strip()
if note:
reply = f"{reply}\n\n{note}"
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")
scores = await self._score_answer(normalized, reply, plan, call_llm)
claims = await self._extract_claims(normalized, reply, summary, facts_used, call_llm)
except LLMLimitReached:
if not reply:
reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass."
scores = _default_scores()
finally:
elapsed = round(time.monotonic() - started, 2)
log.info(
"atlasbot_answer",
extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}},
)
if conversation_id and claims:
self._store_state(conversation_id, claims, summary, snapshot_used, pin_snapshot)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
async def _answer_stock(self, question: str) -> AnswerResult:
messages = build_messages(prompts.STOCK_SYSTEM, question)
reply = await self._llm.chat(messages, model=self._settings.ollama_model)
return AnswerResult(reply, _default_scores(), {"mode": "stock"})
async def _synthesize_answer( # noqa: PLR0913
self,
question: str,
subanswers: list[str],
context: str,
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
style_hint = _style_hint(classify)
if not subanswers:
prompt = (
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
)
return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")
draft_prompts = []
for idx in range(plan.drafts):
draft_prompts.append(
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
+ "\nSubanswers:\n"
+ "\n".join([f"- {item}" for item in subanswers])
+ f"\nDraftIndex: {idx + 1}"
)
drafts: list[str] = []
if plan.parallelism > 1 and len(draft_prompts) > 1:
drafts = await _gather_limited(
[
call_llm(
prompts.SYNTHESIZE_SYSTEM,
prompt,
context=context,
model=plan.model,
tag="synth",
)
for prompt in draft_prompts
],
plan.parallelism,
)
else:
for prompt in draft_prompts:
drafts.append(
await call_llm(
prompts.SYNTHESIZE_SYSTEM,
prompt,
context=context,
model=plan.model,
tag="synth",
)
)
if len(drafts) == 1:
return drafts[0]
select_prompt = (
prompts.DRAFT_SELECT_PROMPT
+ "\nQuestion: "
+ question
+ "\nDrafts:\n"
+ "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)])
)
select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select")
selection = _parse_json_block(select_raw, fallback={})
idx = int(selection.get("best", 1)) - 1
if 0 <= idx < len(drafts):
return drafts[idx]
return drafts[0]
async def _score_answer(
self,
question: str,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
) -> AnswerScores:
if not plan.use_scores:
return _default_scores()
prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score")
data = _parse_json_block(raw, fallback={})
return _scores_from_json(data)
async def _extract_claims(
self,
question: str,
reply: str,
summary: dict[str, Any],
facts_used: list[str],
call_llm: Callable[..., Any],
) -> list[ClaimItem]:
if not reply or not summary:
return []
summary_json = _json_excerpt(summary)
facts_used = [line.strip() for line in (facts_used or []) if line and line.strip()]
facts_block = ""
if facts_used:
facts_block = "\nFactsUsed:\n" + "\n".join([f"- {line}" for line in facts_used[:12]])
prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply + facts_block
raw = await call_llm(
prompts.CLAIM_SYSTEM,
prompt,
context=f"SnapshotSummaryJson:{summary_json}",
model=self._settings.ollama_model_fast,
tag="claim_map",
)
data = _parse_json_block(raw, fallback={})
claims_raw = data.get("claims") if isinstance(data, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}"
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value = _resolve_path(summary, path)
evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value))
if claim_text and evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return claims
async def _dedup_reply(
self,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
tag: str,
) -> str:
if not _needs_dedup(reply):
return reply
dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)
async def _answer_followup( # noqa: C901, PLR0913
self,
question: str,
state: ConversationState,
summary: dict[str, Any],
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
claim_ids = await self._select_claims(question, state.claims, plan, call_llm)
selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2]
evidence_lines = []
lowered = question.lower()
for claim in selected:
evidence_lines.append(f"Claim: {claim.claim}")
for ev in claim.evidence:
current = _resolve_path(summary, ev.path)
ev.value = current
delta_note = ""
if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim:
delta_note = f" (now {current})"
evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}")
if any(term in lowered for term in ("hotspot", "hot spot", "hottest", "jetson", "rpi", "amd64", "arm64", "hardware", "class")):
hotspot_lines = _hotspot_evidence(summary)
if hotspot_lines:
evidence_lines.append("HotspotSummary:")
evidence_lines.extend(hotspot_lines)
evidence_ctx = "\n".join(evidence_lines)
prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx
reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup")
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
if extra_bits:
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ question
+ "\nDraft: "
+ reply
+ "\n"
+ "\n".join(extra_bits)
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context="Evidence:\n" + evidence_ctx,
model=plan.model,
tag="followup_fix",
)
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup")
reply = _strip_followup_meta(reply)
return reply
async def _select_claims(
self,
question: str,
claims: list[ClaimItem],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> list[str]:
if not claims:
return []
claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims]
prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief)
raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims")
data = _parse_json_block(raw, fallback={})
ids = data.get("claim_ids") if isinstance(data, dict) else []
if isinstance(ids, list):
return [str(item) for item in ids if item]
return []
def _get_state(self, conversation_id: str | None) -> ConversationState | None:
if not conversation_id:
return None
state_payload = self._store.get(conversation_id)
return _state_from_payload(state_payload) if state_payload else None
def _store_state(
self,
conversation_id: str,
claims: list[ClaimItem],
summary: dict[str, Any],
snapshot: dict[str, Any] | None,
pin_snapshot: bool,
) -> None:
snapshot_id = _snapshot_id(summary)
pinned_snapshot = snapshot if pin_snapshot else None
payload = {
"updated_at": time.monotonic(),
"claims": _claims_to_payload(claims),
"snapshot_id": snapshot_id,
"snapshot": pinned_snapshot,
}
self._store.set(conversation_id, payload)
def _cleanup_state(self) -> None:
self._store.cleanup()
def _strip_followup_meta(reply: str) -> str:
cleaned = reply.strip()
if not cleaned:
return cleaned
prefixes = [
"The draft is correct based on the provided context.",
"The draft is correct based on the context.",
"The draft is correct based on the provided evidence.",
"The draft is correct.",
"Based on the provided context,",
"Based on the context,",
"Based on the provided evidence,",
]
for prefix in prefixes:
if cleaned.lower().startswith(prefix.lower()):
cleaned = cleaned[len(prefix) :].lstrip(" .")
break
return cleaned
def _build_meta( # noqa: PLR0913
mode: str,
call_count: int,
call_cap: int,
limit_hit: bool,
classify: dict[str, Any],
tool_hint: dict[str, Any] | None,
started: float,
) -> dict[str, Any]:
return {
"mode": mode,
"llm_calls": call_count,
"llm_limit": call_cap,
"llm_limit_hit": limit_hit,
"classify": classify,
"tool_hint": tool_hint,
"elapsed_sec": round(time.monotonic() - started, 2),
}
def _mode_plan(settings: Settings, mode: str) -> ModePlan:
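    """Return the tuning profile for the requested mode (genius, smart, or the fast default)."""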
if mode == "genius":
return ModePlan(
model=settings.ollama_model_genius,
fast_model=settings.ollama_model_fast,
max_subquestions=6,
chunk_lines=6,
chunk_top=10,
chunk_group=4,
parallelism=4,
score_retries=3,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=2,
metric_retries=3,
subanswer_retries=3,
)
if mode == "smart":
return ModePlan(
model=settings.ollama_model_smart,
fast_model=settings.ollama_model_fast,
max_subquestions=4,
chunk_lines=8,
chunk_top=8,
chunk_group=4,
parallelism=2,
score_retries=2,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=1,
metric_retries=2,
subanswer_retries=2,
)
return ModePlan(
model=settings.ollama_model_fast,
fast_model=settings.ollama_model_fast,
max_subquestions=2,
chunk_lines=12,
chunk_top=5,
chunk_group=5,
parallelism=1,
score_retries=1,
use_tool=False,
use_critic=False,
use_gap=False,
use_scores=False,
drafts=1,
metric_retries=1,
subanswer_retries=1,
)
def _llm_call_limit(settings: Settings, mode: str) -> int:
if mode == "genius":
return settings.genius_llm_calls_max
if mode == "smart":
return settings.smart_llm_calls_max
return settings.fast_llm_calls_max
def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]:
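    """Rank decomposed sub-questions by priority and keep at most ``limit``, falling back to the original question."""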
if not parts:
return [fallback]
ranked = []
for entry in parts:
if not isinstance(entry, dict):
continue
question = str(entry.get("question") or "").strip()
if not question:
continue
priority = entry.get("priority")
try:
weight = float(priority)
except (TypeError, ValueError):
weight = 1.0
ranked.append((weight, question))
ranked.sort(key=lambda item: item[0], reverse=True)
questions = [item[1] for item in ranked][:limit]
return questions or [fallback]
def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]:
chunks: list[dict[str, Any]] = []
if not lines:
return chunks
for idx in range(0, len(lines), lines_per_chunk):
chunk_lines = lines[idx : idx + lines_per_chunk]
text = "\n".join(chunk_lines)
summary = " | ".join(chunk_lines[:4])
chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
return chunks
def _build_chunk_groups(chunks: list[dict[str, Any]], group_size: int) -> list[list[dict[str, Any]]]:
groups: list[list[dict[str, Any]]] = []
group: list[dict[str, Any]] = []
for chunk in chunks:
group.append({"id": chunk["id"], "summary": chunk["summary"]})
if len(group) >= group_size:
groups.append(group)
group = []
if group:
groups.append(group)
return groups
async def _score_chunks(
call_llm: Callable[..., Any],
chunks: list[dict[str, Any]],
question: str,
sub_questions: list[str],
plan: ModePlan,
) -> dict[str, float]:
scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
if not chunks:
return scores
groups = _build_chunk_groups(chunks, plan.chunk_group)
ctx = ScoreContext(
question=question,
sub_questions=sub_questions,
retries=max(1, plan.score_retries),
parallelism=plan.parallelism,
)
if ctx.parallelism <= 1 or len(groups) * ctx.retries <= 1:
return await _score_groups_serial(call_llm, groups, ctx)
return await _score_groups_parallel(call_llm, groups, ctx)
async def _score_groups_serial(
call_llm: Callable[..., Any],
groups: list[list[dict[str, Any]]],
ctx: ScoreContext,
) -> dict[str, float]:
scores: dict[str, float] = {}
for grp in groups:
runs = [await _score_chunk_group(call_llm, grp, ctx.question, ctx.sub_questions) for _ in range(ctx.retries)]
scores.update(_merge_score_runs(runs))
return scores
async def _score_groups_parallel(
call_llm: Callable[..., Any],
groups: list[list[dict[str, Any]]],
ctx: ScoreContext,
) -> dict[str, float]:
coros: list[Awaitable[tuple[int, dict[str, float]]]] = []
for idx, grp in enumerate(groups):
for _ in range(ctx.retries):
coros.append(_score_chunk_group_run(call_llm, idx, grp, ctx.question, ctx.sub_questions))
results = await _gather_limited(coros, ctx.parallelism)
grouped: dict[int, list[dict[str, float]]] = {}
for idx, result in results:
grouped.setdefault(idx, []).append(result)
scores: dict[str, float] = {}
for runs in grouped.values():
scores.update(_merge_score_runs(runs))
return scores
async def _score_chunk_group(
call_llm: Callable[..., Any],
group: list[dict[str, Any]],
question: str,
sub_questions: list[str],
) -> dict[str, float]:
prompt = (
prompts.CHUNK_SCORE_PROMPT
+ "\nQuestion: "
+ question
+ "\nSubQuestions: "
+ json.dumps(sub_questions)
+ "\nChunks: "
+ json.dumps(group)
)
raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score")
data = _parse_json_list(raw)
scored: dict[str, float] = {}
for entry in data:
if not isinstance(entry, dict):
continue
cid = str(entry.get("id") or "").strip()
if not cid:
continue
try:
score = float(entry.get("score") or 0)
except (TypeError, ValueError):
score = 0.0
scored[cid] = score
return scored
async def _score_chunk_group_run(
call_llm: Callable[..., Any],
idx: int,
group: list[dict[str, Any]],
question: str,
sub_questions: list[str],
) -> tuple[int, dict[str, float]]:
return idx, await _score_chunk_group(call_llm, group, question, sub_questions)
def _merge_score_runs(runs: list[dict[str, float]]) -> dict[str, float]:
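    """Average per-chunk scores across repeated scoring runs."""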
if not runs:
return {}
totals: dict[str, float] = {}
counts: dict[str, int] = {}
for run in runs:
for key, value in run.items():
totals[key] = totals.get(key, 0.0) + float(value)
counts[key] = counts.get(key, 0) + 1
return {key: totals[key] / counts[key] for key in totals}
def _keyword_hits(
ranked: list[dict[str, Any]],
head: dict[str, Any],
keywords: list[str] | None,
) -> list[dict[str, Any]]:
if not keywords:
return []
lowered = [kw.lower() for kw in keywords if isinstance(kw, str) and kw.strip()]
if not lowered:
return []
hits: list[dict[str, Any]] = []
for item in ranked:
if item is head:
continue
text = str(item.get("text") or "").lower()
if any(kw in text for kw in lowered):
hits.append(item)
return hits
def _select_chunks(
chunks: list[dict[str, Any]],
scores: dict[str, float],
plan: ModePlan,
keywords: list[str] | None = None,
must_ids: list[str] | None = None,
) -> list[dict[str, Any]]:
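    """Always keep the first summary chunk, then add must-include, keyword-hit, and top-scored chunks up to ``plan.chunk_top``."""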
if not chunks:
return []
ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
selected: list[dict[str, Any]] = [chunks[0]]
if _append_must_chunks(chunks, selected, must_ids, plan.chunk_top):
return selected
if _append_keyword_chunks(ranked, selected, keywords, plan.chunk_top):
return selected
_append_ranked_chunks(ranked, selected, plan.chunk_top)
return selected
def _append_must_chunks(
chunks: list[dict[str, Any]],
selected: list[dict[str, Any]],
must_ids: list[str] | None,
limit: int,
) -> bool:
if not must_ids:
return False
id_map = {item["id"]: item for item in chunks}
for cid in must_ids:
item = id_map.get(cid)
if item and item not in selected:
selected.append(item)
if len(selected) >= limit:
return True
return False
def _append_keyword_chunks(
ranked: list[dict[str, Any]],
selected: list[dict[str, Any]],
keywords: list[str] | None,
limit: int,
) -> bool:
if not ranked:
return False
head = ranked[0]
for item in _keyword_hits(ranked, head, keywords):
if item not in selected:
selected.append(item)
if len(selected) >= limit:
return True
return False
def _append_ranked_chunks(
ranked: list[dict[str, Any]],
selected: list[dict[str, Any]],
limit: int,
) -> None:
for item in ranked:
if len(selected) >= limit:
break
if item not in selected:
selected.append(item)
def _format_runbooks(runbooks: list[str]) -> str:
if not runbooks:
return ""
return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks])
def _join_context(parts: list[str]) -> str:
text = "\n".join([part for part in parts if part])
return text.strip()
def _format_history(history: list[dict[str, str]] | None) -> str:
if not history:
return ""
lines = ["Recent conversation (non-authoritative):"]
for entry in history[-4:]:
if not isinstance(entry, dict):
continue
question = entry.get("q")
answer = entry.get("a")
role = entry.get("role")
content = entry.get("content")
if question:
lines.append(f"Q: {question}")
if answer:
lines.append(f"A: {answer}")
if role and content:
prefix = "Q" if role == "user" else "A"
lines.append(f"{prefix}: {content}")
return "\n".join(lines)
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
text = summary_text(snapshot)
if not text:
return []
return [line for line in text.splitlines() if line.strip()]
async def _select_metric_chunks(
call_llm: Callable[..., Awaitable[str]],
ctx: dict[str, Any],
chunks: list[dict[str, Any]],
plan: ModePlan,
) -> tuple[list[str], list[str]]:
summary_lines, question, sub_questions, keywords, token_set = _metric_ctx_values(ctx)
if not summary_lines or not chunks:
return [], []
keys = _extract_metric_keys(summary_lines)
if not keys:
return [], []
max_keys = max(4, plan.max_subquestions * 2)
candidate_keys = _filter_metric_keys(keys, token_set)
available_keys = candidate_keys or keys
prompt = prompts.METRIC_KEYS_PROMPT.format(available="\n".join(available_keys), max_keys=max_keys)
raw = await call_llm(
prompts.METRIC_KEYS_SYSTEM,
prompt + "\nQuestion: " + str(question) + "\nSubQuestions:\n" + "\n".join([str(item) for item in sub_questions]),
context="Keywords:\n" + ", ".join([str(item) for item in keywords if item]),
model=plan.fast_model,
tag="metric_keys",
)
selected = _parse_key_list(raw, available_keys, max_keys)
if candidate_keys:
selected = _merge_metric_keys(selected, candidate_keys, max_keys)
if selected and candidate_keys and not _metric_key_overlap(selected, token_set):
selected = candidate_keys[:max_keys]
if not selected and candidate_keys:
selected = candidate_keys[:max_keys]
if available_keys:
missing = await _validate_metric_keys(
call_llm,
{
"question": question,
"sub_questions": sub_questions,
"selected": selected,
},
available_keys,
plan,
)
if missing:
selected = _merge_metric_keys(selected, missing, max_keys)
if not selected:
return [], []
ids = _chunk_ids_for_keys(chunks, selected)
return selected, ids
async def _validate_metric_keys(
call_llm: Callable[..., Awaitable[str]],
ctx: dict[str, Any],
available: list[str],
plan: ModePlan,
) -> list[str]:
if not available:
return []
question = str(ctx.get("question") or "")
sub_questions = ctx.get("sub_questions") if isinstance(ctx.get("sub_questions"), list) else []
selected = ctx.get("selected") if isinstance(ctx.get("selected"), list) else []
cap = max(12, plan.max_subquestions * 4)
available_list = available[:cap]
prompt = prompts.METRIC_KEYS_VALIDATE_PROMPT.format(
question=question,
sub_questions=json.dumps(sub_questions),
selected=json.dumps(selected),
available="\n".join(available_list),
)
raw = await call_llm(
prompts.METRIC_KEYS_VALIDATE_SYSTEM,
prompt,
model=plan.fast_model,
tag="metric_keys_validate",
)
parsed = _parse_json_block(raw, fallback={})
items = parsed.get("missing") if isinstance(parsed, dict) else []
if not isinstance(items, list):
return []
available_set = set(available_list)
out: list[str] = []
for item in items:
if isinstance(item, str) and item in available_set and item not in out:
out.append(item)
return out
async def _gather_limited(coros: list[Awaitable[Any]], limit: int) -> list[Any]:
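    """Await the coroutines with at most ``limit`` running concurrently; results keep input order."""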
if not coros:
return []
semaphore = asyncio.Semaphore(max(1, limit))
async def runner(coro: Awaitable[Any]) -> Any:
async with semaphore:
return await coro
return await asyncio.gather(*(runner(coro) for coro in coros))
def _metric_ctx_values(ctx: dict[str, Any]) -> tuple[list[str], str, list[str], list[str], set[str]]:
summary_lines = ctx.get("summary_lines") if isinstance(ctx, dict) else None
if not isinstance(summary_lines, list):
return [], "", [], [], set()
    question = ctx.get("question") or ""
    sub_questions = ctx.get("sub_questions") if isinstance(ctx.get("sub_questions"), list) else []
    keywords = ctx.get("keywords") if isinstance(ctx.get("keywords"), list) else []
    keyword_tokens = ctx.get("keyword_tokens") if isinstance(ctx.get("keyword_tokens"), list) else []
    token_set = {str(token) for token in keyword_tokens if token}
token_set |= set(_extract_keywords(str(question), str(question), sub_questions=sub_questions, keywords=keywords))
return summary_lines, str(question), sub_questions, keywords, token_set
def _extract_metric_keys(lines: list[str]) -> list[str]:
keys: list[str] = []
for line in lines:
if ":" not in line:
continue
key = line.split(":", 1)[0].strip()
if not key or " " in key:
continue
if key not in keys:
keys.append(key)
return keys
def _parse_key_list(raw: str, allowed: list[str], max_keys: int) -> list[str]:
parsed = _parse_json_block(raw, fallback={})
if isinstance(parsed, list):
items = parsed
else:
items = parsed.get("keys") if isinstance(parsed, dict) else []
if not isinstance(items, list):
return []
allowed_set = set(allowed)
out: list[str] = []
for item in items:
if not isinstance(item, str):
continue
if item in allowed_set and item not in out:
out.append(item)
if len(out) >= max_keys:
break
return out
def _chunk_ids_for_keys(chunks: list[dict[str, Any]], keys: list[str]) -> list[str]:
if not keys:
return []
ids: list[str] = []
key_set = {f"{key}:" for key in keys}
for chunk in chunks:
text = str(chunk.get("text") or "")
if not text:
continue
for line in text.splitlines():
for key in key_set:
if line.startswith(key):
cid = chunk.get("id")
if cid and cid not in ids:
ids.append(cid)
break
return ids
def _filter_metric_keys(keys: list[str], tokens: set[str]) -> list[str]:
if not keys or not tokens:
return []
lowered_tokens = {token.lower() for token in tokens if token and len(token) >= TOKEN_MIN_LEN}
ranked: list[tuple[int, str]] = []
for key in keys:
        parts = [part for part in re.split(r"[_\W]+", key.lower()) if part]
if not parts:
continue
hits = len(set(parts) & lowered_tokens)
if hits:
ranked.append((hits, key))
ranked.sort(key=lambda item: (-item[0], item[1]))
return [item[1] for item in ranked]
def _metric_key_overlap(keys: list[str], tokens: set[str]) -> bool:
if not keys or not tokens:
return False
lowered_tokens = {token.lower() for token in tokens if token and len(token) >= TOKEN_MIN_LEN}
for key in keys:
        parts = [part for part in re.split(r"[_\W]+", key.lower()) if part]
if set(parts) & lowered_tokens:
return True
return False
def _lines_for_metric_keys(lines: list[str], keys: list[str], max_lines: int = 0) -> list[str]:
if not lines or not keys:
return []
prefixes = {f"{key}:" for key in keys}
selected: list[str] = []
for line in lines:
for prefix in prefixes:
if line.startswith(prefix):
selected.append(line)
break
if max_lines and len(selected) >= max_lines:
break
return selected
def _merge_metric_keys(current: list[str], candidates: list[str], max_keys: int) -> list[str]:
merged: list[str] = []
seen = set()
for key in current:
if key and key not in seen:
merged.append(key)
seen.add(key)
for key in candidates:
if key and key not in seen:
merged.append(key)
seen.add(key)
if len(merged) >= max_keys:
break
return merged[:max_keys]
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
seen = set()
merged: list[str] = []
for line in primary + fallback:
if line in seen:
continue
seen.add(line)
merged.append(line)
return merged
def _expand_hottest_line(line: str) -> list[str]:
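    """Expand a ``hottest: cpu=node [class] (value); ...`` summary line into per-metric fact lines."""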
if not line:
return []
if not line.lower().startswith("hottest:"):
return []
expanded: list[str] = []
payload = line.split("hottest:", 1)[1]
for part in payload.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
metric, rest = part.split("=", 1)
metric = metric.strip()
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
if not match:
continue
node = match.group("node").strip()
value = match.group("value").strip()
class_match = re.search(r"\[(?P<class>[^\]]+)\]", rest)
node_class = class_match.group("class").strip() if class_match else ""
if node_class:
expanded.append(f"hottest_{metric}_node: {node} [{node_class}] ({value})")
else:
expanded.append(f"hottest_{metric}_node: {node} ({value})")
return expanded
def _has_token(text: str, token: str) -> bool:
if not text or not token:
return False
if token == "io":
return "i/o" in text or re.search(r"\bio\b", text) is not None
return re.search(rf"\b{re.escape(token)}\b", text) is not None
def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
if not hottest:
return []
hardware_by_node = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
node_pods_top = summary.get("node_pods_top") if isinstance(summary.get("node_pods_top"), list) else []
ns_map = {}
for item in node_pods_top:
if not isinstance(item, dict):
continue
node = item.get("node")
namespaces_top = item.get("namespaces_top") if isinstance(item.get("namespaces_top"), list) else []
ns_map[node] = namespaces_top
lines: list[str] = []
for metric, info in hottest.items():
if not isinstance(info, dict):
continue
node = info.get("node")
value = info.get("value")
if not node:
continue
node_class = hardware_by_node.get(node)
ns_parts = []
for entry in ns_map.get(node, [])[:3]:
if isinstance(entry, (list, tuple)) and len(entry) >= NS_ENTRY_MIN_LEN:
ns_parts.append(f"{entry[0]}={entry[1]}")
ns_text = ", ".join(ns_parts)
value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value)
line = f"hotspot.{metric}: node={node} class={node_class or 'unknown'} value={value_text}"
if ns_text:
line += f" namespaces_top={ns_text}"
lines.append(line)
return lines
def _metric_key_tokens(summary_lines: list[str]) -> set[str]:
tokens: set[str] = set()
for line in summary_lines:
if not isinstance(line, str) or ":" not in line:
continue
key = line.split(":", 1)[0].strip().lower()
if not key:
continue
tokens.add(key)
        for part in re.split(r"[_\s]+", key):
if part:
tokens.add(part)
return tokens
async def _select_best_candidate(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
tag: str,
) -> int:
if len(candidates) <= 1:
return 0
prompt = (
prompts.CANDIDATE_SELECT_PROMPT
+ "\nQuestion: "
+ question
+ "\nCandidates:\n"
+ "\n".join([f"{idx+1}) {cand}" for idx, cand in enumerate(candidates)])
)
raw = await call_llm(prompts.CANDIDATE_SELECT_SYSTEM, prompt, model=plan.model, tag=tag)
data = _parse_json_block(raw, fallback={})
best = data.get("best") if isinstance(data, dict) else None
if isinstance(best, int) and 1 <= best <= len(candidates):
return best - 1
return 0
def _dedupe_lines(lines: list[str], limit: int | None = None) -> list[str]:
seen: set[str] = set()
cleaned: list[str] = []
for line in lines:
value = (line or "").strip()
if not value or value in seen:
continue
if value.lower().startswith("lexicon_") or value.lower().startswith("units:"):
continue
cleaned.append(value)
seen.add(value)
if limit and len(cleaned) >= limit:
break
return cleaned
def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list[str]:
lines: list[str] = []
for chunk in selected:
text = chunk.get("text") if isinstance(chunk, dict) else None
if not isinstance(text, str):
continue
lines.extend([line for line in text.splitlines() if line.strip()])
return _dedupe_lines(lines, limit=limit)
async def _select_best_list(
call_llm: Callable[..., Any],
question: str,
candidates: list[list[str]],
plan: ModePlan,
tag: str,
) -> list[str]:
if not candidates:
return []
if len(candidates) == 1:
return candidates[0]
render = ["; ".join(items) for items in candidates]
best_idx = await _select_best_candidate(call_llm, question, render, plan, tag)
chosen = candidates[best_idx] if 0 <= best_idx < len(candidates) else candidates[0]
if not chosen:
merged: list[str] = []
for entry in candidates:
for item in entry:
if item not in merged:
merged.append(item)
chosen = merged
return chosen
async def _extract_fact_types(
call_llm: Callable[..., Any],
question: str,
keywords: list[str],
plan: ModePlan,
) -> list[str]:
prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question
if keywords:
prompt += "\nKeywords: " + ", ".join(keywords)
candidates: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.FACT_TYPES_SYSTEM, prompt, model=plan.fast_model, tag="fact_types")
data = _parse_json_block(raw, fallback={})
items = data.get("fact_types") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=10)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "fact_types_select")
return chosen[:10]
async def _derive_signals(
call_llm: Callable[..., Any],
question: str,
fact_types: list[str],
plan: ModePlan,
) -> list[str]:
if not fact_types:
return []
prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types))
candidates: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.SIGNAL_SYSTEM, prompt, model=plan.fast_model, tag="signals")
data = _parse_json_block(raw, fallback={})
items = data.get("signals") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=12)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "signals_select")
return chosen[:12]
async def _scan_chunk_for_signals(
call_llm: Callable[..., Any],
question: str,
signals: list[str],
chunk_lines: list[str],
plan: ModePlan,
) -> list[str]:
if not signals or not chunk_lines:
return []
prompt = prompts.CHUNK_SCAN_PROMPT.format(
signals="; ".join(signals),
lines="\n".join(chunk_lines),
)
attempts = max(1, min(plan.metric_retries, 2))
candidates: list[list[str]] = []
for _ in range(attempts):
raw = await call_llm(prompts.CHUNK_SCAN_SYSTEM, prompt, model=plan.fast_model, tag="chunk_scan")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in chunk_lines if line in items]
cleaned = _dedupe_lines(cleaned, limit=15)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "chunk_scan_select")
return chosen[:15]
async def _prune_metric_candidates(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
attempts: int,
) -> list[str]:
if not candidates:
return []
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6)
picks: list[list[str]] = []
for _ in range(max(attempts, 1)):
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_prune")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in candidates if line in items]
cleaned = _dedupe_lines(cleaned, limit=6)
if cleaned:
picks.append(cleaned)
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_prune_select")
return chosen[:6]
async def _select_fact_lines(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
max_lines: int,
) -> list[str]:
if not candidates:
return []
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines)
picks: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in candidates if line in items]
cleaned = _dedupe_lines(cleaned, limit=max_lines)
if cleaned:
picks.append(cleaned)
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_select_best")
return chosen[:max_lines]
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
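    """Drop sentences that mention nodes or namespaces not present in the snapshot.

    Illustrative example (assumed inputs): with ``unknown_nodes=["titan-99"]``,
    a reply of ``"titan-99 is down. Everything else is Ready."`` is reduced to
    ``"Everything else is Ready."``. If every sentence would be removed, the
    original reply is returned unchanged.
    """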
if not reply:
return reply
if not unknown_nodes and not unknown_namespaces:
return reply
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
if not sentences:
return reply
lowered_nodes = [node.lower() for node in unknown_nodes]
lowered_namespaces = [ns.lower() for ns in unknown_namespaces]
kept: list[str] = []
for sent in sentences:
lower = sent.lower()
if lowered_nodes and any(node in lower for node in lowered_nodes):
continue
if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces):
continue
kept.append(sent)
cleaned = " ".join(kept).strip()
return cleaned or reply
def _needs_evidence_guard(reply: str, facts: list[str]) -> bool:
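    """Heuristic check for replies that cite entities or themes absent from the facts.

    Returns True when the reply names nodes (``titan-*`` / ``node-*``) missing
    from the fact lines, or mentions pressure or hardware-architecture terms
    while the facts mention none of them.
    """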
if not reply or not facts:
return False
lower_reply = reply.lower()
fact_text = " ".join(facts).lower()
node_pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE)
nodes = {m.group(1).lower() for m in node_pattern.finditer(reply)}
if nodes:
missing = [node for node in nodes if node not in fact_text]
if missing:
return True
pressure_terms = ("pressure", "diskpressure", "memorypressure", "pidpressure", "headroom")
if any(term in lower_reply for term in pressure_terms) and not any(term in fact_text for term in pressure_terms):
return True
arch_terms = ("amd64", "arm64", "rpi", "rpi4", "rpi5", "jetson")
if any(term in lower_reply for term in arch_terms) and not any(term in fact_text for term in arch_terms):
return True
return False
def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]:
if not lines:
return []
tokens = _expand_tokens(keywords)
if not tokens:
return lines[:max_lines]
filtered = [line for line in lines if any(tok in line.lower() for tok in tokens)]
return (filtered or lines)[:max_lines]
def _global_facts(lines: list[str]) -> list[str]:
if not lines:
return []
wanted = ("nodes_total", "nodes_ready", "cluster_name", "cluster", "nodes_not_ready")
facts: list[str] = []
for line in lines:
lower = line.lower()
if any(key in lower for key in wanted):
facts.append(line)
return _dedupe_lines(facts, limit=6)
def _has_keyword_overlap(lines: list[str], keywords: list[str]) -> bool:
if not lines or not keywords:
return False
tokens = _expand_tokens(keywords)
if not tokens:
return False
for line in lines:
lower = line.lower()
if any(tok in lower for tok in tokens):
return True
return False
def _merge_tokens(primary: list[str], secondary: list[str], third: list[str] | None = None) -> list[str]:
merged: list[str] = []
for token in primary + secondary + (third or []):
if not token:
continue
if token not in merged:
merged.append(token)
return merged
def _extract_question_tokens(question: str) -> list[str]:
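    """Tokenize the question into lowercase tokens of length >= TOKEN_MIN_LEN.

    Illustrative example::

        _extract_question_tokens("How many rpi5 nodes are Ready?")
        # -> ["how", "many", "rpi5", "nodes", "are", "ready"]
    """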
if not question:
return []
tokens: list[str] = []
for part in re.split(r"[^a-zA-Z0-9_-]+", question.lower()):
if len(part) < TOKEN_MIN_LEN:
continue
if part not in tokens:
tokens.append(part)
return tokens
def _expand_tokens(tokens: list[str]) -> list[str]:
if not tokens:
return []
expanded: list[str] = []
for token in tokens:
if not isinstance(token, str):
continue
for part in re.split(r"[^a-zA-Z0-9_-]+", token.lower()):
if len(part) < TOKEN_MIN_LEN:
continue
if part not in expanded:
expanded.append(part)
return expanded
def _ensure_token_coverage(
lines: list[str],
tokens: list[str],
summary_lines: list[str],
max_add: int = 4,
) -> list[str]:
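    """Backfill summary lines for question tokens the selected lines never mention.

    For each missing token, the first summary line containing it is collected
    (up to ``max_add`` lines) and combined with the existing lines via
    ``_merge_fact_lines`` (added lines passed first).
    """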
if not lines or not tokens or not summary_lines:
return lines
hay = " ".join(lines).lower()
missing = [tok for tok in tokens if tok and tok.lower() not in hay]
if not missing:
return lines
added: list[str] = []
for token in missing:
token_lower = token.lower()
for line in summary_lines:
if token_lower in line.lower() and line not in lines and line not in added:
added.append(line)
break
if len(added) >= max_add:
break
if not added:
return lines
return _merge_fact_lines(added, lines)
def _best_keyword_line(lines: list[str], keywords: list[str]) -> str | None:
if not lines or not keywords:
return None
tokens = _expand_tokens(keywords)
if not tokens:
return None
best = None
best_score = 0
for line in lines:
lower = line.lower()
score = sum(1 for tok in tokens if tok in lower)
if score > best_score:
best_score = score
best = line
return best if best_score > 0 else None
def _line_starting_with(lines: list[str], prefix: str) -> str | None:
if not lines or not prefix:
return None
lower_prefix = prefix.lower()
for line in lines:
if str(line).lower().startswith(lower_prefix):
return line
return None
def _non_rpi_nodes(summary: dict[str, Any]) -> dict[str, list[str]]:
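    """Group non-Raspberry-Pi nodes by hardware type from ``hardware_by_node``.

    Illustrative example (assumed snapshot shape)::

        _non_rpi_nodes({"hardware_by_node": {"titan-01": "rpi5", "titan-02": "jetson"}})
        # -> {"jetson": ["titan-02"]}
    """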
hardware = summary.get("hardware_by_node") if isinstance(summary, dict) else None
if not isinstance(hardware, dict):
return {}
grouped: dict[str, list[str]] = {}
for node, hw in hardware.items():
if not isinstance(node, str) or not isinstance(hw, str):
continue
if hw.startswith("rpi"):
continue
grouped.setdefault(hw, []).append(node)
for nodes in grouped.values():
nodes.sort()
return grouped
def _format_hardware_groups(groups: dict[str, list[str]], label: str) -> str:
if not groups:
return ""
parts = []
for hw, nodes in sorted(groups.items()):
parts.append(f"{hw} ({', '.join(nodes)})")
return f"{label}: " + "; ".join(parts) + "."
def _lexicon_context(summary: dict[str, Any]) -> str: # noqa: C901
if not isinstance(summary, dict):
return ""
lexicon = summary.get("lexicon")
if not isinstance(lexicon, dict):
return ""
terms = lexicon.get("terms")
aliases = lexicon.get("aliases")
lines: list[str] = []
if isinstance(terms, list):
for entry in terms[:8]:
if not isinstance(entry, dict):
continue
term = entry.get("term")
meaning = entry.get("meaning")
if term and meaning:
lines.append(f"{term}: {meaning}")
if isinstance(aliases, dict):
for key, value in list(aliases.items())[:6]:
if key and value:
lines.append(f"alias {key} -> {value}")
if not lines:
return ""
return "Lexicon:\n" + "\n".join(lines)
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
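    """Extract the ``{...}`` span (first ``{`` to last ``}``) from model output.

    Parses that span as JSON, falling back to parsing the whole string and
    ultimately to ``fallback``, via ``parse_json``.
    """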
raw = text.strip()
match = re.search(r"\{.*\}", raw, flags=re.S)
if match:
return parse_json(match.group(0), fallback=fallback)
return parse_json(raw, fallback=fallback)
def _parse_json_list(text: str) -> list[dict[str, Any]]:
raw = text.strip()
match = re.search(r"\[.*\]", raw, flags=re.S)
data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={})
if isinstance(data, list):
return [entry for entry in data if isinstance(entry, dict)]
return []
def _scores_from_json(data: dict[str, Any]) -> AnswerScores:
return AnswerScores(
confidence=_coerce_int(data.get("confidence"), 60),
relevance=_coerce_int(data.get("relevance"), 60),
satisfaction=_coerce_int(data.get("satisfaction"), 60),
hallucination_risk=str(data.get("hallucination_risk") or "medium"),
)
def _coerce_int(value: Any, default: int) -> int:
try:
return int(float(value))
except (TypeError, ValueError):
return default
def _default_scores() -> AnswerScores:
return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium")
def _style_hint(classify: dict[str, Any]) -> str:
style = (classify.get("answer_style") or "").strip().lower()
qtype = (classify.get("question_type") or "").strip().lower()
if style == "insightful" or qtype in {"open_ended", "planning"}:
return "insightful"
return "direct"
def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
lowered = reply.lower()
missing_markers = (
"don't have",
"do not have",
"don't know",
"cannot",
"can't",
"need to",
"would need",
"does not provide",
"does not mention",
"not mention",
"not provided",
"not in context",
"not referenced",
"missing",
"no specific",
"no information",
)
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
return True
if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply):
return True
return False
def _reply_matches_metric_facts(reply: str, metric_facts: list[str]) -> bool:
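    """Check that at least one number in the reply also appears in the metric facts.

    Returns True when there is nothing to compare (no reply, no facts, or facts
    without numbers) and False when the reply contains no numbers at all.
    """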
if not reply or not metric_facts:
return True
    reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
if not reply_numbers:
return False
    fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not fact_numbers:
return True
return bool(reply_numbers & fact_numbers)
def _needs_dedup(reply: str) -> bool:
if not reply:
return False
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
if len(sentences) < DEDUP_MIN_SENTENCES:
return False
seen = set()
for sent in sentences:
        norm = re.sub(r"\s+", " ", sent.lower())
if norm in seen:
return True
seen.add(norm)
return False
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
q_lower = (question or "").lower()
if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
return False
missing_markers = (
"does not provide",
"does not specify",
"not available",
"not provided",
"cannot determine",
"don't have",
"do not have",
"insufficient",
"no data",
)
if any(marker in reply.lower() for marker in missing_markers):
return True
if reply.count(".") <= 1:
return False
extra_markers = ("for more", "if you need", "additional", "based on")
return any(marker in reply.lower() for marker in extra_markers)
def _extract_keywords(
raw_question: str,
normalized: str,
sub_questions: list[str],
keywords: list[Any] | None,
) -> list[str]:
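    """Collect question keywords for snapshot filtering.

    Tokens come from the raw question, its normalized form, the sub-questions,
    and any model-provided keywords; stopwords and tokens shorter than
    TOKEN_MIN_LEN are skipped, and at most 12 unique tokens are returned.
    """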
stopwords = {
"the",
"and",
"for",
"with",
"that",
"this",
"what",
"which",
"when",
"where",
"who",
"why",
"how",
"tell",
"show",
"list",
"give",
"about",
"right",
"now",
}
tokens: list[str] = []
for source in [raw_question, normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < TOKEN_MIN_LEN or part in stopwords:
continue
tokens.append(part)
if keywords:
for kw in keywords:
if isinstance(kw, str):
part = kw.strip().lower()
if part and part not in stopwords and part not in tokens:
tokens.append(part)
return list(dict.fromkeys(tokens))[:12]
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
    hardware = summary.get("hardware_by_node")
    if isinstance(hardware, dict) and hardware:
        return sorted(node for node in hardware if isinstance(node, str))
    return []
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
namespaces: list[str] = []
for entry in summary.get("namespace_pods") or []:
if isinstance(entry, dict):
name = entry.get("namespace")
if name:
namespaces.append(str(name))
return sorted(set(namespaces))
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
if not reply or not allowed:
return False
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
if not paths:
return False
allowed_set = {p.lower() for p in allowed}
return any(path.lower() not in allowed_set for path in paths)
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
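    """Fuzzy-match a cited runbook path against the allowed paths.

    Uses ``difflib.SequenceMatcher`` and only returns a match whose ratio meets
    RUNBOOK_SIMILARITY_THRESHOLD; e.g. ``"runbooks/node-pressure"`` would map to
    ``"runbooks/node-pressure.md"`` if that path is in ``allowed`` (illustrative
    paths).
    """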
if not candidate or not allowed:
return None
best = None
best_score = 0.0
for path in allowed:
score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
if score > best_score:
best_score = score
best = path
return best if best_score >= RUNBOOK_SIMILARITY_THRESHOLD else None
def _resolve_path(data: Any, path: str) -> Any | None:
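    """Resolve a dotted path with optional list indices against nested snapshot data.

    ``"line:<text>"`` short-circuits to the literal text. Illustrative example::

        _resolve_path({"nodes": [{"name": "titan-01"}]}, "nodes[0].name")
        # -> "titan-01"

    Returns None for any segment that cannot be resolved.
    """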
if path.startswith("line:"):
return path.split("line:", 1)[1].strip()
cursor = data
for part in re.split(r"\.(?![^\[]*\])", path):
if not part:
continue
match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
if not match:
return None
key = match.group(1)
index = match.group(2)
if isinstance(cursor, dict):
cursor = cursor.get(key)
else:
return None
if index is not None:
try:
idx = int(index)
if isinstance(cursor, list) and 0 <= idx < len(cursor):
cursor = cursor[idx]
else:
return None
except ValueError:
return None
return cursor
def _snapshot_id(summary: dict[str, Any]) -> str | None:
if not summary:
return None
for key in ("generated_at", "snapshot_ts", "snapshot_id"):
value = summary.get(key)
if isinstance(value, str) and value:
return value
return None
def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for claim in claims:
evidence = []
for ev in claim.evidence:
evidence.append(
{
"path": ev.path,
"reason": ev.reason,
"value_at_claim": ev.value_at_claim,
}
)
output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
return output
def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
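    """Rebuild a ConversationState from a stored payload dict.

    Claims without an id, claim text, or at least one evidence path are
    dropped; a missing ``updated_at`` falls back to ``time.monotonic()``.
    """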
if not payload:
return None
claims_raw = payload.get("claims") if isinstance(payload, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip()
if not claim_text or not claim_id:
continue
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value_at_claim = ev.get("value_at_claim")
evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim))
if evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return ConversationState(
updated_at=float(payload.get("updated_at") or time.monotonic()),
claims=claims,
snapshot_id=payload.get("snapshot_id"),
snapshot=payload.get("snapshot"),
)
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
raw = json.dumps(summary, ensure_ascii=False)
return raw[:max_chars]