import asyncio
import json
import logging
import math
import re
import time
import difflib
from dataclasses import dataclass
from typing import Any, Callable

from atlasbot.config import Settings
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore

log = logging.getLogger(__name__)

FOLLOWUP_SHORT_WORDS = 6
TOKEN_MIN_LEN = 3
NS_ENTRY_MIN_LEN = 2
DEDUP_MIN_SENTENCES = 3
RUNBOOK_SIMILARITY_THRESHOLD = 0.4


class LLMLimitReached(RuntimeError):
    pass
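# Raised by the per-request call budget guard inside AnswerEngine.answer();
# the handler keeps whatever partial reply exists (or a "hit my reasoning
# limit" message) rather than failing the request.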


@dataclass
class AnswerScores:
    confidence: int
    relevance: int
    satisfaction: int
    hallucination_risk: str


@dataclass
class AnswerResult:
    reply: str
    scores: AnswerScores
    meta: dict[str, Any]


@dataclass
class EvidenceItem:
    path: str
    reason: str
    value: Any | None = None
    value_at_claim: Any | None = None


@dataclass
class ClaimItem:
    id: str
    claim: str
    evidence: list[EvidenceItem]


@dataclass
class ConversationState:
    updated_at: float
    claims: list[ClaimItem]
    snapshot_id: str | None = None
    snapshot: dict[str, Any] | None = None


@dataclass
class ModePlan:
    model: str
    fast_model: str
    max_subquestions: int
    chunk_lines: int
    chunk_top: int
    chunk_group: int
    use_tool: bool
    use_critic: bool
    use_gap: bool
    use_scores: bool
    drafts: int
    metric_retries: int
    subanswer_retries: int
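# ModePlan is built by _mode_plan() further down: "genius" and "smart" modes
# enable the tool/critic/gap/scoring passes and larger sub-question budgets,
# while any other mode falls back to a single-pass plan on the fast model.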


class AnswerEngine:
    def __init__(
        self,
        settings: Settings,
        llm: LLMClient,
        kb: KnowledgeBase,
        snapshot: SnapshotProvider,
    ) -> None:
        self._settings = settings
        self._llm = llm
        self._kb = kb
        self._snapshot = snapshot
        self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)
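    # answer() runs the full pipeline; the observer callback reports each stage:
    # normalize -> route -> (follow-up shortcut) -> decompose -> retrieve ->
    # tool hint -> subanswers -> synthesize -> evidence/runbook/focus fixes ->
    # critic and gap passes (smart/genius only) -> dedup -> scoring -> claim
    # extraction for follow-up state.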
async def answer( # noqa: C901, PLR0912, PLR0913, PLR0915
|
|
self,
|
|
question: str,
|
|
*,
|
|
mode: str,
|
|
history: list[dict[str, str]] | None = None,
|
|
observer: Callable[[str, str], None] | None = None,
|
|
conversation_id: str | None = None,
|
|
snapshot_pin: bool | None = None,
|
|
) -> AnswerResult:
|
|
question = (question or "").strip()
|
|
if not question:
|
|
return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode})
|
|
if mode == "stock":
|
|
return await self._answer_stock(question)
|
|
|
|
limitless = "run limitless" in question.lower()
|
|
if limitless:
|
|
question = re.sub(r"(?i)run limitless", "", question).strip()
|
|
plan = _mode_plan(self._settings, mode)
|
|
call_limit = _llm_call_limit(self._settings, mode)
|
|
call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier)
|
|
call_count = 0
|
|
limit_hit = False
|
|
|
|
debug_tags = {
|
|
"route",
|
|
"decompose",
|
|
"chunk_score",
|
|
"fact_select",
|
|
"synth",
|
|
"subanswer",
|
|
"tool",
|
|
"followup",
|
|
"select_claims",
|
|
"evidence_fix",
|
|
}
|
|
|
|
def _debug_log(name: str, payload: Any) -> None:
|
|
if not self._settings.debug_pipeline:
|
|
return
|
|
log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})
|
|
|
|
async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
|
|
nonlocal call_count, limit_hit
|
|
if not limitless and call_count >= call_cap:
|
|
limit_hit = True
|
|
raise LLMLimitReached("llm_limit")
|
|
call_count += 1
|
|
messages = build_messages(system, prompt, context=context)
|
|
response = await self._llm.chat(messages, model=model or plan.model)
|
|
log.info(
|
|
"atlasbot_llm_call",
|
|
extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
|
|
)
|
|
if self._settings.debug_pipeline and tag in debug_tags:
|
|
_debug_log(f"llm_raw_{tag}", str(response)[:1200])
|
|
return response
|
|
|
|
state = self._get_state(conversation_id)
|
|
pin_snapshot = bool(snapshot_pin) or self._settings.snapshot_pin_enabled
|
|
snapshot = self._snapshot.get()
|
|
snapshot_used = snapshot
|
|
if pin_snapshot and state and state.snapshot:
|
|
snapshot_used = state.snapshot
|
|
summary = build_summary(snapshot_used)
|
|
allowed_nodes = _allowed_nodes(summary)
|
|
allowed_namespaces = _allowed_namespaces(summary)
|
|
summary_lines = _summary_lines(snapshot_used)
|
|
global_facts = _global_facts(summary_lines)
|
|
kb_summary = self._kb.summary()
|
|
runbooks = self._kb.runbook_titles(limit=6)
|
|
runbook_paths = self._kb.runbook_paths(limit=10)
|
|
history_ctx = _format_history(history)
|
|
lexicon_ctx = _lexicon_context(summary)
|
|
key_facts: list[str] = []
|
|
metric_facts: list[str] = []
|
|
facts_used: list[str] = []
|
|
|
|
started = time.monotonic()
|
|
reply = ""
|
|
scores = _default_scores()
|
|
claims: list[ClaimItem] = []
|
|
classify: dict[str, Any] = {}
|
|
tool_hint: dict[str, Any] | None = None
|
|
try:
|
|
if observer:
|
|
observer("normalize", "normalizing")
|
|
normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
|
|
normalize_raw = await call_llm(
|
|
prompts.NORMALIZE_SYSTEM,
|
|
normalize_prompt,
|
|
context=lexicon_ctx,
|
|
model=plan.fast_model,
|
|
tag="normalize",
|
|
)
|
|
normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
|
|
normalized = str(normalize.get("normalized") or question).strip() or question
|
|
keywords = normalize.get("keywords") or []
|
|
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
|
|
keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords)
|
|
question_tokens = _extract_question_tokens(normalized)
|
|
|
|
if observer:
|
|
observer("route", "routing")
|
|
route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
|
|
route_raw = await call_llm(
|
|
prompts.ROUTE_SYSTEM,
|
|
route_prompt,
|
|
context=_join_context([kb_summary, lexicon_ctx]),
|
|
model=plan.fast_model,
|
|
tag="route",
|
|
)
|
|
classify = _parse_json_block(route_raw, fallback={})
|
|
classify.setdefault("needs_snapshot", True)
|
|
classify.setdefault("answer_style", "direct")
|
|
classify.setdefault("follow_up", False)
|
|
classify.setdefault("focus_entity", "unknown")
|
|
classify.setdefault("focus_metric", "unknown")
|
|
_debug_log("route_parsed", {"classify": classify, "normalized": normalized})
|
|
lowered_question = f"{question} {normalized}".lower()
|
|
force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", lowered_question))
|
|
if any(term in lowered_question for term in ("postgres", "connections", "pvc", "ready")):
|
|
force_metric = True
|
|
cluster_terms = (
|
|
"atlas",
|
|
"cluster",
|
|
"node",
|
|
"nodes",
|
|
"namespace",
|
|
"pod",
|
|
"workload",
|
|
"k8s",
|
|
"kubernetes",
|
|
"postgres",
|
|
"database",
|
|
"db",
|
|
"connections",
|
|
"cpu",
|
|
"ram",
|
|
"memory",
|
|
"network",
|
|
"io",
|
|
"disk",
|
|
"pvc",
|
|
"storage",
|
|
)
|
|
if any(term in lowered_question for term in cluster_terms):
|
|
classify["needs_snapshot"] = True
|
|
lowered_norm = normalized.lower()
|
|
if (
|
|
("namespace" in lowered_norm and ("pod" in lowered_norm or "pods" in lowered_norm))
|
|
or re.search(r"\bmost\s+pods\b", lowered_norm)
|
|
or re.search(r"\bpods\s+running\b", lowered_norm)
|
|
):
|
|
classify["question_type"] = "metric"
|
|
classify["needs_snapshot"] = True
|
|
if re.search(r"\b(how many|count|number of|list)\b", lowered_question):
|
|
classify["question_type"] = "metric"
|
|
if any(term in lowered_question for term in ("postgres", "connections", "db")):
|
|
classify["question_type"] = "metric"
|
|
classify["needs_snapshot"] = True
|
|
if any(term in lowered_question for term in ("pvc", "persistentvolume", "persistent volume", "storage")):
|
|
if classify.get("question_type") not in {"metric", "diagnostic"}:
|
|
classify["question_type"] = "metric"
|
|
classify["needs_snapshot"] = True
|
|
if "ready" in lowered_question and classify.get("question_type") not in {"metric", "diagnostic"}:
|
|
classify["question_type"] = "diagnostic"
|
|
hottest_terms = ("hottest", "highest", "lowest", "most")
|
|
metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage", "pod", "pods", "namespace")
|
|
if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms):
|
|
classify["question_type"] = "metric"
|
|
|
|
if not classify.get("follow_up") and state and state.claims:
|
|
follow_terms = ("there", "that", "those", "these", "it", "them", "that one", "this", "former", "latter")
|
|
if any(term in lowered_question for term in follow_terms) or len(normalized.split()) <= FOLLOWUP_SHORT_WORDS:
|
|
classify["follow_up"] = True
|
|
|
|
if classify.get("follow_up") and state and state.claims:
|
|
if observer:
|
|
observer("followup", "answering follow-up")
|
|
reply = await self._answer_followup(question, state, summary, classify, plan, call_llm)
|
|
scores = await self._score_answer(question, reply, plan, call_llm)
|
|
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
|
|
return AnswerResult(reply, scores, meta)
|
|
|
|
if observer:
|
|
observer("decompose", "decomposing")
|
|
decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2)
|
|
decompose_raw = await call_llm(
|
|
prompts.DECOMPOSE_SYSTEM,
|
|
decompose_prompt + "\nQuestion: " + normalized,
|
|
context=lexicon_ctx,
|
|
model=plan.fast_model if mode == "quick" else plan.model,
|
|
tag="decompose",
|
|
)
|
|
parts = _parse_json_list(decompose_raw)
|
|
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
|
|
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
|
|
keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords)
|
|
focus_entity = str(classify.get("focus_entity") or "unknown").lower()
|
|
focus_metric = str(classify.get("focus_metric") or "unknown").lower()
|
|
lowered_q = f"{question} {normalized}".lower()
|
|
if "node" in lowered_q:
|
|
focus_entity = "node"
|
|
|
|
snapshot_context = ""
|
|
signal_tokens: list[str] = []
|
|
if classify.get("needs_snapshot"):
|
|
if observer:
|
|
observer("retrieve", "scoring chunks")
|
|
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
|
|
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
|
|
selected = _select_chunks(chunks, scored, plan, keyword_tokens)
|
|
fact_candidates = _collect_fact_candidates(selected, limit=plan.max_subquestions * 12)
|
|
key_facts = await _select_fact_lines(
|
|
call_llm,
|
|
normalized,
|
|
fact_candidates,
|
|
plan,
|
|
max_lines=max(4, plan.max_subquestions * 2),
|
|
)
|
|
metric_facts: list[str] = []
|
|
if classify.get("question_type") in {"metric", "diagnostic"} or force_metric:
|
|
if observer:
|
|
observer("retrieve", "extracting fact types")
|
|
fact_types = await _extract_fact_types(
|
|
call_llm,
|
|
normalized,
|
|
keyword_tokens,
|
|
plan,
|
|
)
|
|
if observer:
|
|
observer("retrieve", "deriving signals")
|
|
signals = await _derive_signals(
|
|
call_llm,
|
|
normalized,
|
|
fact_types,
|
|
plan,
|
|
)
|
|
if isinstance(signals, list):
|
|
signal_tokens = [str(item) for item in signals if item]
|
|
all_tokens = _merge_tokens(signal_tokens, keyword_tokens, question_tokens)
|
|
if observer:
|
|
observer("retrieve", "scanning chunks")
|
|
candidate_lines: list[str] = []
|
|
if signals:
|
|
for chunk in selected:
|
|
chunk_lines = chunk["text"].splitlines()
|
|
if not chunk_lines:
|
|
continue
|
|
hits = await _scan_chunk_for_signals(
|
|
call_llm,
|
|
normalized,
|
|
signals,
|
|
chunk_lines,
|
|
plan,
|
|
)
|
|
if hits:
|
|
candidate_lines.extend(hits)
|
|
candidate_lines = list(dict.fromkeys(candidate_lines))
|
|
if candidate_lines:
|
|
if observer:
|
|
observer("retrieve", "pruning candidates")
|
|
metric_facts = await _prune_metric_candidates(
|
|
call_llm,
|
|
normalized,
|
|
candidate_lines,
|
|
plan,
|
|
plan.metric_retries,
|
|
)
|
|
if metric_facts:
|
|
key_facts = _merge_fact_lines(metric_facts, key_facts)
|
|
if self._settings.debug_pipeline:
|
|
_debug_log("metric_facts_selected", {"facts": metric_facts})
|
|
if not metric_facts:
|
|
if observer:
|
|
observer("retrieve", "fallback metric selection")
|
|
fallback_candidates = _filter_lines_by_keywords(summary_lines, all_tokens, max_lines=200)
|
|
if fallback_candidates:
|
|
metric_facts = await _select_fact_lines(
|
|
call_llm,
|
|
normalized,
|
|
fallback_candidates,
|
|
plan,
|
|
max_lines=max(2, plan.max_subquestions),
|
|
)
|
|
if not metric_facts and fallback_candidates:
|
|
metric_facts = fallback_candidates[: max(2, plan.max_subquestions)]
|
|
if metric_facts:
|
|
metric_facts = _ensure_token_coverage(
|
|
metric_facts,
|
|
all_tokens,
|
|
summary_lines,
|
|
max_add=plan.max_subquestions,
|
|
)
|
|
if metric_facts and not _has_keyword_overlap(metric_facts, keyword_tokens):
|
|
best_line = _best_keyword_line(summary_lines, keyword_tokens)
|
|
if best_line:
|
|
metric_facts = _merge_fact_lines([best_line], metric_facts)
|
|
if metric_facts:
|
|
key_facts = _merge_fact_lines(metric_facts, key_facts)
|
|
if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and not metric_facts and key_facts:
|
|
metric_facts = key_facts
|
|
if key_facts:
|
|
key_facts = _ensure_token_coverage(
|
|
key_facts,
|
|
all_tokens,
|
|
summary_lines,
|
|
max_add=plan.max_subquestions,
|
|
)
|
|
if self._settings.debug_pipeline:
|
|
scored_preview = sorted(
|
|
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
|
|
key=lambda item: item["score"],
|
|
reverse=True,
|
|
)[: min(len(chunks), max(plan.chunk_top, 6))]
|
|
_debug_log(
|
|
"chunk_selected",
|
|
{
|
|
"selected_ids": [item["id"] for item in selected],
|
|
"top_scored": scored_preview,
|
|
},
|
|
)
|
|
facts_used = list(dict.fromkeys(key_facts)) if key_facts else list(dict.fromkeys(metric_facts))
|
|
snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
|
|
combined_facts = key_facts
|
|
if global_facts:
|
|
combined_facts = _merge_fact_lines(global_facts, key_facts)
|
|
if combined_facts:
|
|
snapshot_context = "KeyFacts:\n" + "\n".join(combined_facts) + "\n\n" + snapshot_context
|
|
|
|
context = _join_context(
|
|
[kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
|
|
)
|
|
|
|
if plan.use_tool and classify.get("needs_tool"):
|
|
if observer:
|
|
observer("tool", "suggesting tools")
|
|
tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized
|
|
tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool")
|
|
tool_hint = _parse_json_block(tool_raw, fallback={})
|
|
|
|
if observer:
|
|
observer("subanswers", "drafting subanswers")
|
|
subanswers: list[str] = []
|
|
for subq in sub_questions:
|
|
sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq
|
|
if plan.subanswer_retries > 1:
|
|
candidates: list[str] = []
|
|
for _ in range(plan.subanswer_retries):
|
|
candidate = await call_llm(
|
|
prompts.ANSWER_SYSTEM,
|
|
sub_prompt,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="subanswer",
|
|
)
|
|
candidates.append(candidate)
|
|
best_idx = await _select_best_candidate(call_llm, subq, candidates, plan, "subanswer_select")
|
|
subanswers.append(candidates[best_idx])
|
|
else:
|
|
sub_answer = await call_llm(
|
|
prompts.ANSWER_SYSTEM,
|
|
sub_prompt,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="subanswer",
|
|
)
|
|
subanswers.append(sub_answer)
|
|
|
|
if observer:
|
|
observer("synthesize", "synthesizing")
|
|
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
|
|
|
|
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
|
|
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
|
|
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
|
|
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
|
|
needs_evidence = _needs_evidence_fix(reply, classify)
|
|
hardware_terms = ("rpi", "raspberry", "jetson", "amd64", "arm64", "hardware")
|
|
hardware_line = _line_starting_with(summary_lines, "hardware_nodes:")
|
|
if any(term in lowered_question for term in hardware_terms) and hardware_line:
|
|
needs_evidence = True
|
|
if classify.get("question_type") in {"open_ended", "planning"} and metric_facts:
|
|
needs_evidence = True
|
|
resolved_runbook = None
|
|
if runbook_paths and (runbook_fix or runbook_needed):
|
|
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
|
|
resolver_raw = await call_llm(
|
|
prompts.RUNBOOK_SELECT_SYSTEM,
|
|
resolver_prompt,
|
|
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
|
|
model=plan.fast_model,
|
|
tag="runbook_select",
|
|
)
|
|
resolver = _parse_json_block(resolver_raw, fallback={})
|
|
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
|
|
if candidate and candidate in runbook_paths:
|
|
resolved_runbook = candidate
|
|
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
|
|
if observer:
|
|
observer("evidence_fix", "repairing missing evidence")
|
|
extra_bits = []
|
|
if unknown_nodes:
|
|
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
|
|
if unknown_namespaces:
|
|
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
|
|
if runbook_paths:
|
|
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
|
|
if resolved_runbook:
|
|
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
|
|
if metric_facts:
|
|
extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4]))
|
|
if hardware_line:
|
|
extra_bits.append("HardwareNodes: " + hardware_line)
|
|
if allowed_nodes:
|
|
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
|
|
if allowed_namespaces:
|
|
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
|
|
fix_prompt = (
|
|
prompts.EVIDENCE_FIX_PROMPT
|
|
+ "\nQuestion: "
|
|
+ normalized
|
|
+ "\nDraft: "
|
|
+ reply
|
|
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
|
|
)
|
|
reply = await call_llm(
|
|
prompts.EVIDENCE_FIX_SYSTEM,
|
|
fix_prompt,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="evidence_fix",
|
|
)
|
|
if unknown_nodes or unknown_namespaces:
|
|
refreshed_nodes = _find_unknown_nodes(reply, allowed_nodes)
|
|
refreshed_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
|
|
if refreshed_nodes or refreshed_namespaces:
|
|
reply = _strip_unknown_entities(reply, refreshed_nodes, refreshed_namespaces)
|
|
if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply):
|
|
if observer:
|
|
observer("runbook_enforce", "enforcing runbook path")
|
|
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook)
|
|
reply = await call_llm(
|
|
prompts.RUNBOOK_ENFORCE_SYSTEM,
|
|
enforce_prompt + "\nAnswer: " + reply,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="runbook_enforce",
|
|
)
|
|
if runbook_paths:
|
|
invalid = [
|
|
token
|
|
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)
|
|
if token.lower() not in {p.lower() for p in runbook_paths}
|
|
]
|
|
if invalid:
|
|
if observer:
|
|
observer("runbook_enforce", "replacing invalid runbook path")
|
|
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
|
|
resolver_raw = await call_llm(
|
|
prompts.RUNBOOK_SELECT_SYSTEM,
|
|
resolver_prompt,
|
|
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
|
|
model=plan.fast_model,
|
|
tag="runbook_select",
|
|
)
|
|
resolver = _parse_json_block(resolver_raw, fallback={})
|
|
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
|
|
if not (candidate and candidate in runbook_paths):
|
|
candidate = _best_runbook_match(invalid[0], runbook_paths)
|
|
if candidate and candidate in runbook_paths:
|
|
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
|
|
reply = await call_llm(
|
|
prompts.RUNBOOK_ENFORCE_SYSTEM,
|
|
enforce_prompt + "\nAnswer: " + reply,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="runbook_enforce",
|
|
)
|
|
reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces)
|
|
|
|
if facts_used and _needs_evidence_guard(reply, facts_used):
|
|
if observer:
|
|
observer("evidence_guard", "tightening unsupported claims")
|
|
guard_prompt = (
|
|
prompts.EVIDENCE_GUARD_PROMPT
|
|
+ "\nQuestion: "
|
|
+ normalized
|
|
+ "\nDraft: "
|
|
+ reply
|
|
+ "\nFactsUsed:\n"
|
|
+ "\n".join(facts_used)
|
|
)
|
|
reply = await call_llm(
|
|
prompts.EVIDENCE_GUARD_SYSTEM,
|
|
guard_prompt,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="evidence_guard",
|
|
)
|
|
|
|
if _needs_focus_fix(normalized, reply, classify):
|
|
if observer:
|
|
observer("focus_fix", "tightening answer")
|
|
reply = await call_llm(
|
|
prompts.EVIDENCE_FIX_SYSTEM,
|
|
prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply,
|
|
context=context,
|
|
model=plan.model,
|
|
tag="focus_fix",
|
|
)
|
|
if not metric_facts:
|
|
best_line = _best_keyword_line(summary_lines, keyword_tokens)
|
|
if best_line:
|
|
reply = f"From the latest snapshot: {best_line}."
|
|
if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and metric_facts:
|
|
best_line = None
|
|
lowered_keywords = [kw.lower() for kw in keyword_tokens if kw]
|
|
for line in metric_facts:
|
|
line_lower = line.lower()
|
|
if any(kw in line_lower for kw in lowered_keywords):
|
|
best_line = line
|
|
break
|
|
best_line = best_line or metric_facts[0]
|
|
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
|
|
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
|
|
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
|
|
reply = f"From the latest snapshot: {best_line}."
|
|
|
|
if plan.use_critic:
|
|
if observer:
|
|
observer("critic", "reviewing")
|
|
critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
|
|
critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic")
|
|
critic = _parse_json_block(critic_raw, fallback={})
|
|
if critic.get("issues"):
|
|
revise_prompt = (
|
|
prompts.REVISION_PROMPT
|
|
+ "\nQuestion: "
|
|
+ normalized
|
|
+ "\nDraft: "
|
|
+ reply
|
|
+ "\nCritique: "
|
|
+ json.dumps(critic)
|
|
)
|
|
reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise")
|
|
|
|
if plan.use_gap:
|
|
if observer:
|
|
observer("gap", "checking gaps")
|
|
gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
|
|
gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap")
|
|
gap = _parse_json_block(gap_raw, fallback={})
|
|
note = str(gap.get("note") or "").strip()
|
|
if note:
|
|
reply = f"{reply}\n\n{note}"
|
|
|
|
|
|
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")
|
|
|
|
scores = await self._score_answer(normalized, reply, plan, call_llm)
|
|
claims = await self._extract_claims(normalized, reply, summary, facts_used, call_llm)
|
|
except LLMLimitReached:
|
|
if not reply:
|
|
reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass."
|
|
scores = _default_scores()
|
|
finally:
|
|
elapsed = round(time.monotonic() - started, 2)
|
|
log.info(
|
|
"atlasbot_answer",
|
|
extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}},
|
|
)
|
|
|
|
if conversation_id and claims:
|
|
self._store_state(conversation_id, claims, summary, snapshot_used, pin_snapshot)
|
|
|
|
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
|
|
return AnswerResult(reply, scores, meta)
|
|
|
|
async def _answer_stock(self, question: str) -> AnswerResult:
|
|
messages = build_messages(prompts.STOCK_SYSTEM, question)
|
|
reply = await self._llm.chat(messages, model=self._settings.ollama_model)
|
|
return AnswerResult(reply, _default_scores(), {"mode": "stock"})
|
|
|
|
async def _synthesize_answer( # noqa: PLR0913
|
|
self,
|
|
question: str,
|
|
subanswers: list[str],
|
|
context: str,
|
|
classify: dict[str, Any],
|
|
plan: ModePlan,
|
|
call_llm: Callable[..., Any],
|
|
) -> str:
|
|
style_hint = _style_hint(classify)
|
|
if not subanswers:
|
|
prompt = (
|
|
prompts.SYNTHESIZE_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nStyle: "
|
|
+ style_hint
|
|
+ "\nQuestionType: "
|
|
+ (classify.get("question_type") or "unknown")
|
|
)
|
|
return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")
|
|
draft_prompts = []
|
|
for idx in range(plan.drafts):
|
|
draft_prompts.append(
|
|
prompts.SYNTHESIZE_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nStyle: "
|
|
+ style_hint
|
|
+ "\nQuestionType: "
|
|
+ (classify.get("question_type") or "unknown")
|
|
+ "\nSubanswers:\n"
|
|
+ "\n".join([f"- {item}" for item in subanswers])
|
|
+ f"\nDraftIndex: {idx + 1}"
|
|
)
|
|
drafts: list[str] = []
|
|
for prompt in draft_prompts:
|
|
drafts.append(await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth"))
|
|
if len(drafts) == 1:
|
|
return drafts[0]
|
|
select_prompt = (
|
|
prompts.DRAFT_SELECT_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nDrafts:\n"
|
|
+ "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)])
|
|
)
|
|
select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select")
|
|
selection = _parse_json_block(select_raw, fallback={})
|
|
idx = int(selection.get("best", 1)) - 1
|
|
if 0 <= idx < len(drafts):
|
|
return drafts[idx]
|
|
return drafts[0]
|
|
|
|
async def _score_answer(
|
|
self,
|
|
question: str,
|
|
reply: str,
|
|
plan: ModePlan,
|
|
call_llm: Callable[..., Any],
|
|
) -> AnswerScores:
|
|
if not plan.use_scores:
|
|
return _default_scores()
|
|
prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
|
|
raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score")
|
|
data = _parse_json_block(raw, fallback={})
|
|
return _scores_from_json(data)
|
|
|
|
async def _extract_claims(
|
|
self,
|
|
question: str,
|
|
reply: str,
|
|
summary: dict[str, Any],
|
|
facts_used: list[str],
|
|
call_llm: Callable[..., Any],
|
|
) -> list[ClaimItem]:
|
|
if not reply or not summary:
|
|
return []
|
|
summary_json = _json_excerpt(summary)
|
|
facts_used = [line.strip() for line in (facts_used or []) if line and line.strip()]
|
|
facts_block = ""
|
|
if facts_used:
|
|
facts_block = "\nFactsUsed:\n" + "\n".join([f"- {line}" for line in facts_used[:12]])
|
|
prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply + facts_block
|
|
raw = await call_llm(
|
|
prompts.CLAIM_SYSTEM,
|
|
prompt,
|
|
context=f"SnapshotSummaryJson:{summary_json}",
|
|
model=self._settings.ollama_model_fast,
|
|
tag="claim_map",
|
|
)
|
|
data = _parse_json_block(raw, fallback={})
|
|
claims_raw = data.get("claims") if isinstance(data, dict) else None
|
|
claims: list[ClaimItem] = []
|
|
if isinstance(claims_raw, list):
|
|
for entry in claims_raw:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
claim_text = str(entry.get("claim") or "").strip()
|
|
claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}"
|
|
evidence_items: list[EvidenceItem] = []
|
|
for ev in entry.get("evidence") or []:
|
|
if not isinstance(ev, dict):
|
|
continue
|
|
path = str(ev.get("path") or "").strip()
|
|
if not path:
|
|
continue
|
|
reason = str(ev.get("reason") or "").strip()
|
|
value = _resolve_path(summary, path)
|
|
evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value))
|
|
if claim_text and evidence_items:
|
|
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
|
|
return claims
|
|
|
|
async def _dedup_reply(
|
|
self,
|
|
reply: str,
|
|
plan: ModePlan,
|
|
call_llm: Callable[..., Any],
|
|
tag: str,
|
|
) -> str:
|
|
if not _needs_dedup(reply):
|
|
return reply
|
|
dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
|
|
return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)
|
|
|
|
async def _answer_followup( # noqa: C901, PLR0913
|
|
self,
|
|
question: str,
|
|
state: ConversationState,
|
|
summary: dict[str, Any],
|
|
classify: dict[str, Any],
|
|
plan: ModePlan,
|
|
call_llm: Callable[..., Any],
|
|
) -> str:
|
|
claim_ids = await self._select_claims(question, state.claims, plan, call_llm)
|
|
selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2]
|
|
evidence_lines = []
|
|
lowered = question.lower()
|
|
for claim in selected:
|
|
evidence_lines.append(f"Claim: {claim.claim}")
|
|
for ev in claim.evidence:
|
|
current = _resolve_path(summary, ev.path)
|
|
ev.value = current
|
|
delta_note = ""
|
|
if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim:
|
|
delta_note = f" (now {current})"
|
|
evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}")
|
|
if any(term in lowered for term in ("hotspot", "hot spot", "hottest", "jetson", "rpi", "amd64", "arm64", "hardware", "class")):
|
|
hotspot_lines = _hotspot_evidence(summary)
|
|
if hotspot_lines:
|
|
evidence_lines.append("HotspotSummary:")
|
|
evidence_lines.extend(hotspot_lines)
|
|
evidence_ctx = "\n".join(evidence_lines)
|
|
prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx
|
|
reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup")
|
|
allowed_nodes = _allowed_nodes(summary)
|
|
allowed_namespaces = _allowed_namespaces(summary)
|
|
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
|
|
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
|
|
extra_bits = []
|
|
if unknown_nodes:
|
|
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
|
|
if unknown_namespaces:
|
|
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
|
|
if allowed_nodes:
|
|
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
|
|
if allowed_namespaces:
|
|
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
|
|
if extra_bits:
|
|
fix_prompt = (
|
|
prompts.EVIDENCE_FIX_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nDraft: "
|
|
+ reply
|
|
+ "\n"
|
|
+ "\n".join(extra_bits)
|
|
)
|
|
reply = await call_llm(
|
|
prompts.EVIDENCE_FIX_SYSTEM,
|
|
fix_prompt,
|
|
context="Evidence:\n" + evidence_ctx,
|
|
model=plan.model,
|
|
tag="followup_fix",
|
|
)
|
|
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup")
|
|
reply = _strip_followup_meta(reply)
|
|
return reply
|
|
|
|
async def _select_claims(
|
|
self,
|
|
question: str,
|
|
claims: list[ClaimItem],
|
|
plan: ModePlan,
|
|
call_llm: Callable[..., Any],
|
|
) -> list[str]:
|
|
if not claims:
|
|
return []
|
|
claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims]
|
|
prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief)
|
|
raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims")
|
|
data = _parse_json_block(raw, fallback={})
|
|
ids = data.get("claim_ids") if isinstance(data, dict) else []
|
|
if isinstance(ids, list):
|
|
return [str(item) for item in ids if item]
|
|
return []
|
|
|
|
def _get_state(self, conversation_id: str | None) -> ConversationState | None:
|
|
if not conversation_id:
|
|
return None
|
|
state_payload = self._store.get(conversation_id)
|
|
return _state_from_payload(state_payload) if state_payload else None
|
|
|
|
def _store_state(
|
|
self,
|
|
conversation_id: str,
|
|
claims: list[ClaimItem],
|
|
summary: dict[str, Any],
|
|
snapshot: dict[str, Any] | None,
|
|
pin_snapshot: bool,
|
|
) -> None:
|
|
snapshot_id = _snapshot_id(summary)
|
|
pinned_snapshot = snapshot if pin_snapshot else None
|
|
payload = {
|
|
"updated_at": time.monotonic(),
|
|
"claims": _claims_to_payload(claims),
|
|
"snapshot_id": snapshot_id,
|
|
"snapshot": pinned_snapshot,
|
|
}
|
|
self._store.set(conversation_id, payload)
|
|
|
|
def _cleanup_state(self) -> None:
|
|
self._store.cleanup()
|
|
|
|
|
|
def _strip_followup_meta(reply: str) -> str:
|
|
cleaned = reply.strip()
|
|
if not cleaned:
|
|
return cleaned
|
|
prefixes = [
|
|
"The draft is correct based on the provided context.",
|
|
"The draft is correct based on the context.",
|
|
"The draft is correct based on the provided evidence.",
|
|
"The draft is correct.",
|
|
"Based on the provided context,",
|
|
"Based on the context,",
|
|
"Based on the provided evidence,",
|
|
]
|
|
for prefix in prefixes:
|
|
if cleaned.lower().startswith(prefix.lower()):
|
|
cleaned = cleaned[len(prefix) :].lstrip(" .")
|
|
break
|
|
return cleaned
|
|
|
|
|
|
def _build_meta( # noqa: PLR0913
|
|
mode: str,
|
|
call_count: int,
|
|
call_cap: int,
|
|
limit_hit: bool,
|
|
classify: dict[str, Any],
|
|
tool_hint: dict[str, Any] | None,
|
|
started: float,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"mode": mode,
|
|
"llm_calls": call_count,
|
|
"llm_limit": call_cap,
|
|
"llm_limit_hit": limit_hit,
|
|
"classify": classify,
|
|
"tool_hint": tool_hint,
|
|
"elapsed_sec": round(time.monotonic() - started, 2),
|
|
}
|
|
|
|
|
|
def _mode_plan(settings: Settings, mode: str) -> ModePlan:
|
|
if mode == "genius":
|
|
return ModePlan(
|
|
model=settings.ollama_model_genius,
|
|
fast_model=settings.ollama_model_fast,
|
|
max_subquestions=6,
|
|
chunk_lines=6,
|
|
chunk_top=10,
|
|
chunk_group=4,
|
|
use_tool=True,
|
|
use_critic=True,
|
|
use_gap=True,
|
|
use_scores=True,
|
|
drafts=2,
|
|
metric_retries=3,
|
|
subanswer_retries=3,
|
|
)
|
|
if mode == "smart":
|
|
return ModePlan(
|
|
model=settings.ollama_model_smart,
|
|
fast_model=settings.ollama_model_fast,
|
|
max_subquestions=4,
|
|
chunk_lines=8,
|
|
chunk_top=8,
|
|
chunk_group=4,
|
|
use_tool=True,
|
|
use_critic=True,
|
|
use_gap=True,
|
|
use_scores=True,
|
|
drafts=1,
|
|
metric_retries=2,
|
|
subanswer_retries=2,
|
|
)
|
|
return ModePlan(
|
|
model=settings.ollama_model_fast,
|
|
fast_model=settings.ollama_model_fast,
|
|
max_subquestions=2,
|
|
chunk_lines=12,
|
|
chunk_top=5,
|
|
chunk_group=5,
|
|
use_tool=False,
|
|
use_critic=False,
|
|
use_gap=False,
|
|
use_scores=False,
|
|
drafts=1,
|
|
metric_retries=1,
|
|
subanswer_retries=1,
|
|
)
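# Any mode other than "genius" or "smart" (e.g. "quick") falls through to the
# fast plan above: fast model for every step, no tool/critic/gap/scoring
# passes, a single draft, and no retries.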
|
|
|
|
|
|
def _llm_call_limit(settings: Settings, mode: str) -> int:
|
|
if mode == "genius":
|
|
return settings.genius_llm_calls_max
|
|
if mode == "smart":
|
|
return settings.smart_llm_calls_max
|
|
return settings.fast_llm_calls_max
|
|
|
|
|
|
def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]:
|
|
if not parts:
|
|
return [fallback]
|
|
ranked = []
|
|
for entry in parts:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
question = str(entry.get("question") or "").strip()
|
|
if not question:
|
|
continue
|
|
priority = entry.get("priority")
|
|
try:
|
|
weight = float(priority)
|
|
except (TypeError, ValueError):
|
|
weight = 1.0
|
|
ranked.append((weight, question))
|
|
ranked.sort(key=lambda item: item[0], reverse=True)
|
|
questions = [item[1] for item in ranked][:limit]
|
|
return questions or [fallback]
|
|
|
|
|
|
def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]:
|
|
chunks: list[dict[str, Any]] = []
|
|
if not lines:
|
|
return chunks
|
|
for idx in range(0, len(lines), lines_per_chunk):
|
|
chunk_lines = lines[idx : idx + lines_per_chunk]
|
|
text = "\n".join(chunk_lines)
|
|
summary = " | ".join(chunk_lines[:4])
|
|
chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
|
|
return chunks
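# Illustrative: with lines_per_chunk=12, lines[0:12] become chunk "c0" and
# lines[12:24] become "c1"; each chunk's summary is its first four lines
# joined with " | ".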
|
|
|
|
|
|
async def _score_chunks(
|
|
call_llm: Callable[..., Any],
|
|
chunks: list[dict[str, Any]],
|
|
question: str,
|
|
sub_questions: list[str],
|
|
plan: ModePlan,
|
|
) -> dict[str, float]:
|
|
scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
|
|
if not chunks:
|
|
return scores
|
|
group: list[dict[str, Any]] = []
|
|
for chunk in chunks:
|
|
group.append({"id": chunk["id"], "summary": chunk["summary"]})
|
|
if len(group) >= plan.chunk_group:
|
|
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
|
|
group = []
|
|
if group:
|
|
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
|
|
return scores
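# Summaries are scored in batches of plan.chunk_group per LLM call, so the
# number of retrieval-scoring calls grows with len(chunks) / chunk_group
# rather than with the number of chunks.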
|
|
|
|
|
|
async def _score_chunk_group(
|
|
call_llm: Callable[..., Any],
|
|
group: list[dict[str, Any]],
|
|
question: str,
|
|
sub_questions: list[str],
|
|
) -> dict[str, float]:
|
|
prompt = (
|
|
prompts.CHUNK_SCORE_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nSubQuestions: "
|
|
+ json.dumps(sub_questions)
|
|
+ "\nChunks: "
|
|
+ json.dumps(group)
|
|
)
|
|
raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score")
|
|
data = _parse_json_list(raw)
|
|
scored: dict[str, float] = {}
|
|
for entry in data:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
cid = str(entry.get("id") or "").strip()
|
|
if not cid:
|
|
continue
|
|
try:
|
|
score = float(entry.get("score") or 0)
|
|
except (TypeError, ValueError):
|
|
score = 0.0
|
|
scored[cid] = score
|
|
return scored
|
|
|
|
|
|
def _keyword_hits(
|
|
ranked: list[dict[str, Any]],
|
|
head: dict[str, Any],
|
|
keywords: list[str] | None,
|
|
) -> list[dict[str, Any]]:
|
|
if not keywords:
|
|
return []
|
|
lowered = [kw.lower() for kw in keywords if isinstance(kw, str) and kw.strip()]
|
|
if not lowered:
|
|
return []
|
|
hits: list[dict[str, Any]] = []
|
|
for item in ranked:
|
|
if item is head:
|
|
continue
|
|
text = str(item.get("text") or "").lower()
|
|
if any(kw in text for kw in lowered):
|
|
hits.append(item)
|
|
return hits
|
|
|
|
|
|
def _select_chunks(
|
|
chunks: list[dict[str, Any]],
|
|
scores: dict[str, float],
|
|
plan: ModePlan,
|
|
keywords: list[str] | None = None,
|
|
) -> list[dict[str, Any]]:
|
|
if not chunks:
|
|
return []
|
|
ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
|
|
selected: list[dict[str, Any]] = []
|
|
head = chunks[0]
|
|
selected.append(head)
|
|
|
|
for item in _keyword_hits(ranked, head, keywords):
|
|
if len(selected) >= plan.chunk_top:
|
|
return selected
|
|
if item not in selected:
|
|
selected.append(item)
|
|
|
|
for item in ranked:
|
|
if len(selected) >= plan.chunk_top:
|
|
break
|
|
if item not in selected:
|
|
selected.append(item)
|
|
return selected
|
|
|
|
|
|
def _format_runbooks(runbooks: list[str]) -> str:
|
|
if not runbooks:
|
|
return ""
|
|
return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks])
|
|
|
|
|
|
def _join_context(parts: list[str]) -> str:
|
|
text = "\n".join([part for part in parts if part])
|
|
return text.strip()
|
|
|
|
|
|
def _format_history(history: list[dict[str, str]] | None) -> str:
|
|
if not history:
|
|
return ""
|
|
lines = ["Recent conversation (non-authoritative):"]
|
|
for entry in history[-4:]:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
question = entry.get("q")
|
|
answer = entry.get("a")
|
|
role = entry.get("role")
|
|
content = entry.get("content")
|
|
if question:
|
|
lines.append(f"Q: {question}")
|
|
if answer:
|
|
lines.append(f"A: {answer}")
|
|
if role and content:
|
|
prefix = "Q" if role == "user" else "A"
|
|
lines.append(f"{prefix}: {content}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
|
|
text = summary_text(snapshot)
|
|
if not text:
|
|
return []
|
|
return [line for line in text.splitlines() if line.strip()]
|
|
|
|
|
|
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
|
|
seen = set()
|
|
merged: list[str] = []
|
|
for line in primary + fallback:
|
|
if line in seen:
|
|
continue
|
|
seen.add(line)
|
|
merged.append(line)
|
|
return merged
|
|
|
|
|
|
def _expand_hottest_line(line: str) -> list[str]:
|
|
if not line:
|
|
return []
|
|
if not line.lower().startswith("hottest:"):
|
|
return []
|
|
expanded: list[str] = []
|
|
payload = line.split("hottest:", 1)[1]
|
|
for part in payload.split(";"):
|
|
part = part.strip()
|
|
if not part or "=" not in part:
|
|
continue
|
|
metric, rest = part.split("=", 1)
|
|
metric = metric.strip()
|
|
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
|
|
if not match:
|
|
continue
|
|
node = match.group("node").strip()
|
|
value = match.group("value").strip()
|
|
class_match = re.search(r"\[(?P<class>[^\]]+)\]", rest)
|
|
node_class = class_match.group("class").strip() if class_match else ""
|
|
if node_class:
|
|
expanded.append(f"hottest_{metric}_node: {node} [{node_class}] ({value})")
|
|
else:
|
|
expanded.append(f"hottest_{metric}_node: {node} ({value})")
|
|
return expanded
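# Illustrative (hypothetical values): a summary line such as
#   "hottest: cpu=titan-03 [rpi5] (0.91); ram=titan-01 [amd64] (0.77)"
# expands to
#   "hottest_cpu_node: titan-03 [rpi5] (0.91)"
#   "hottest_ram_node: titan-01 [amd64] (0.77)"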
|
|
|
|
|
|
def _has_token(text: str, token: str) -> bool:
|
|
if not text or not token:
|
|
return False
|
|
if token == "io":
|
|
return "i/o" in text or re.search(r"\bio\b", text) is not None
|
|
return re.search(rf"\b{re.escape(token)}\b", text) is not None
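# "io" is special-cased because the summary text may render it as "i/o",
# which a plain word-boundary match would not catch.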
|
|
|
|
|
|
def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
|
|
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
|
|
if not hottest:
|
|
return []
|
|
hardware_by_node = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
|
|
node_pods_top = summary.get("node_pods_top") if isinstance(summary.get("node_pods_top"), list) else []
|
|
ns_map = {}
|
|
for item in node_pods_top:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
node = item.get("node")
|
|
namespaces_top = item.get("namespaces_top") if isinstance(item.get("namespaces_top"), list) else []
|
|
ns_map[node] = namespaces_top
|
|
lines: list[str] = []
|
|
for metric, info in hottest.items():
|
|
if not isinstance(info, dict):
|
|
continue
|
|
node = info.get("node")
|
|
value = info.get("value")
|
|
if not node:
|
|
continue
|
|
node_class = hardware_by_node.get(node)
|
|
ns_parts = []
|
|
for entry in ns_map.get(node, [])[:3]:
|
|
if isinstance(entry, (list, tuple)) and len(entry) >= NS_ENTRY_MIN_LEN:
|
|
ns_parts.append(f"{entry[0]}={entry[1]}")
|
|
ns_text = ", ".join(ns_parts)
|
|
value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value)
|
|
line = f"hotspot.{metric}: node={node} class={node_class or 'unknown'} value={value_text}"
|
|
if ns_text:
|
|
line += f" namespaces_top={ns_text}"
|
|
lines.append(line)
|
|
return lines
|
|
|
|
|
|
async def _select_best_candidate(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
candidates: list[str],
|
|
plan: ModePlan,
|
|
tag: str,
|
|
) -> int:
|
|
if len(candidates) <= 1:
|
|
return 0
|
|
prompt = (
|
|
prompts.CANDIDATE_SELECT_PROMPT
|
|
+ "\nQuestion: "
|
|
+ question
|
|
+ "\nCandidates:\n"
|
|
+ "\n".join([f"{idx+1}) {cand}" for idx, cand in enumerate(candidates)])
|
|
)
|
|
raw = await call_llm(prompts.CANDIDATE_SELECT_SYSTEM, prompt, model=plan.model, tag=tag)
|
|
data = _parse_json_block(raw, fallback={})
|
|
best = data.get("best") if isinstance(data, dict) else None
|
|
if isinstance(best, int) and 1 <= best <= len(candidates):
|
|
return best - 1
|
|
return 0
|
|
|
|
|
|
def _dedupe_lines(lines: list[str], limit: int | None = None) -> list[str]:
|
|
seen: set[str] = set()
|
|
cleaned: list[str] = []
|
|
for line in lines:
|
|
value = (line or "").strip()
|
|
if not value or value in seen:
|
|
continue
|
|
if value.lower().startswith("lexicon_") or value.lower().startswith("units:"):
|
|
continue
|
|
cleaned.append(value)
|
|
seen.add(value)
|
|
if limit and len(cleaned) >= limit:
|
|
break
|
|
return cleaned
|
|
|
|
|
|
def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list[str]:
|
|
lines: list[str] = []
|
|
for chunk in selected:
|
|
text = chunk.get("text") if isinstance(chunk, dict) else None
|
|
if not isinstance(text, str):
|
|
continue
|
|
lines.extend([line for line in text.splitlines() if line.strip()])
|
|
return _dedupe_lines(lines, limit=limit)
|
|
|
|
|
|
async def _select_best_list(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
candidates: list[list[str]],
|
|
plan: ModePlan,
|
|
tag: str,
|
|
) -> list[str]:
|
|
if not candidates:
|
|
return []
|
|
if len(candidates) == 1:
|
|
return candidates[0]
|
|
render = ["; ".join(items) for items in candidates]
|
|
best_idx = await _select_best_candidate(call_llm, question, render, plan, tag)
|
|
chosen = candidates[best_idx] if 0 <= best_idx < len(candidates) else candidates[0]
|
|
if not chosen:
|
|
merged: list[str] = []
|
|
for entry in candidates:
|
|
for item in entry:
|
|
if item not in merged:
|
|
merged.append(item)
|
|
chosen = merged
|
|
return chosen
|
|
|
|
|
|
async def _extract_fact_types(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
keywords: list[str],
|
|
plan: ModePlan,
|
|
) -> list[str]:
|
|
prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question
|
|
if keywords:
|
|
prompt += "\nKeywords: " + ", ".join(keywords)
|
|
candidates: list[list[str]] = []
|
|
attempts = max(plan.metric_retries, 1)
|
|
for _ in range(attempts):
|
|
raw = await call_llm(prompts.FACT_TYPES_SYSTEM, prompt, model=plan.fast_model, tag="fact_types")
|
|
data = _parse_json_block(raw, fallback={})
|
|
items = data.get("fact_types") if isinstance(data, dict) else None
|
|
if not isinstance(items, list):
|
|
continue
|
|
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=10)
|
|
if cleaned:
|
|
candidates.append(cleaned)
|
|
chosen = await _select_best_list(call_llm, question, candidates, plan, "fact_types_select")
|
|
return chosen[:10]
|
|
|
|
|
|
async def _derive_signals(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
fact_types: list[str],
|
|
plan: ModePlan,
|
|
) -> list[str]:
|
|
if not fact_types:
|
|
return []
|
|
prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types))
|
|
candidates: list[list[str]] = []
|
|
attempts = max(plan.metric_retries, 1)
|
|
for _ in range(attempts):
|
|
raw = await call_llm(prompts.SIGNAL_SYSTEM, prompt, model=plan.fast_model, tag="signals")
|
|
data = _parse_json_block(raw, fallback={})
|
|
items = data.get("signals") if isinstance(data, dict) else None
|
|
if not isinstance(items, list):
|
|
continue
|
|
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=12)
|
|
if cleaned:
|
|
candidates.append(cleaned)
|
|
chosen = await _select_best_list(call_llm, question, candidates, plan, "signals_select")
|
|
return chosen[:12]
|
|
|
|
|
|
async def _scan_chunk_for_signals(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
signals: list[str],
|
|
chunk_lines: list[str],
|
|
plan: ModePlan,
|
|
) -> list[str]:
|
|
if not signals or not chunk_lines:
|
|
return []
|
|
prompt = prompts.CHUNK_SCAN_PROMPT.format(
|
|
signals="; ".join(signals),
|
|
lines="\n".join(chunk_lines),
|
|
)
|
|
attempts = max(1, min(plan.metric_retries, 2))
|
|
candidates: list[list[str]] = []
|
|
for _ in range(attempts):
|
|
raw = await call_llm(prompts.CHUNK_SCAN_SYSTEM, prompt, model=plan.fast_model, tag="chunk_scan")
|
|
data = _parse_json_block(raw, fallback={})
|
|
items = data.get("lines") if isinstance(data, dict) else None
|
|
if not isinstance(items, list):
|
|
continue
|
|
cleaned = [line for line in chunk_lines if line in items]
|
|
cleaned = _dedupe_lines(cleaned, limit=15)
|
|
if cleaned:
|
|
candidates.append(cleaned)
|
|
chosen = await _select_best_list(call_llm, question, candidates, plan, "chunk_scan_select")
|
|
return chosen[:15]
|
|
|
|
|
|
async def _prune_metric_candidates(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
candidates: list[str],
|
|
plan: ModePlan,
|
|
attempts: int,
|
|
) -> list[str]:
|
|
if not candidates:
|
|
return []
|
|
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6)
|
|
picks: list[list[str]] = []
|
|
for _ in range(max(attempts, 1)):
|
|
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_prune")
|
|
data = _parse_json_block(raw, fallback={})
|
|
items = data.get("lines") if isinstance(data, dict) else None
|
|
if not isinstance(items, list):
|
|
continue
|
|
cleaned = [line for line in candidates if line in items]
|
|
cleaned = _dedupe_lines(cleaned, limit=6)
|
|
if cleaned:
|
|
picks.append(cleaned)
|
|
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_prune_select")
|
|
return chosen[:6]
|
|
|
|
|
|
async def _select_fact_lines(
|
|
call_llm: Callable[..., Any],
|
|
question: str,
|
|
candidates: list[str],
|
|
plan: ModePlan,
|
|
max_lines: int,
|
|
) -> list[str]:
|
|
if not candidates:
|
|
return []
|
|
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines)
|
|
picks: list[list[str]] = []
|
|
attempts = max(plan.metric_retries, 1)
|
|
for _ in range(attempts):
|
|
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
|
|
data = _parse_json_block(raw, fallback={})
|
|
items = data.get("lines") if isinstance(data, dict) else None
|
|
if not isinstance(items, list):
|
|
continue
|
|
cleaned = [line for line in candidates if line in items]
|
|
cleaned = _dedupe_lines(cleaned, limit=max_lines)
|
|
if cleaned:
|
|
picks.append(cleaned)
|
|
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_select_best")
|
|
return chosen[:max_lines]
|
|
|
|
|
|
# Drop sentences that mention nodes or namespaces absent from the snapshot summary.
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
    if not reply:
        return reply
    if not unknown_nodes and not unknown_namespaces:
        return reply
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
    if not sentences:
        return reply
    lowered_nodes = [node.lower() for node in unknown_nodes]
    lowered_namespaces = [ns.lower() for ns in unknown_namespaces]
    kept: list[str] = []
    for sent in sentences:
        lower = sent.lower()
        if lowered_nodes and any(node in lower for node in lowered_nodes):
            continue
        if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces):
            continue
        kept.append(sent)
    cleaned = " ".join(kept).strip()
    return cleaned or reply
|
|
|
|
|
|
def _needs_evidence_guard(reply: str, facts: list[str]) -> bool:
|
|
if not reply or not facts:
|
|
return False
|
|
lower_reply = reply.lower()
|
|
fact_text = " ".join(facts).lower()
|
|
node_pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE)
|
|
nodes = {m.group(1).lower() for m in node_pattern.finditer(reply)}
|
|
if nodes:
|
|
missing = [node for node in nodes if node not in fact_text]
|
|
if missing:
|
|
return True
|
|
pressure_terms = ("pressure", "diskpressure", "memorypressure", "pidpressure", "headroom")
|
|
if any(term in lower_reply for term in pressure_terms) and not any(term in fact_text for term in pressure_terms):
|
|
return True
|
|
arch_terms = ("amd64", "arm64", "rpi", "rpi4", "rpi5", "jetson")
|
|
if any(term in lower_reply for term in arch_terms) and not any(term in fact_text for term in arch_terms):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]:
|
|
if not lines:
|
|
return []
|
|
tokens = _expand_tokens(keywords)
|
|
if not tokens:
|
|
return lines[:max_lines]
|
|
filtered = [line for line in lines if any(tok in line.lower() for tok in tokens)]
|
|
return (filtered or lines)[:max_lines]
|
|
|
|
|
|
def _global_facts(lines: list[str]) -> list[str]:
|
|
if not lines:
|
|
return []
|
|
wanted = ("nodes_total", "nodes_ready", "cluster_name", "cluster", "nodes_not_ready")
|
|
facts: list[str] = []
|
|
for line in lines:
|
|
lower = line.lower()
|
|
if any(key in lower for key in wanted):
|
|
facts.append(line)
|
|
return _dedupe_lines(facts, limit=6)
|
|
|
|
|
|
def _has_keyword_overlap(lines: list[str], keywords: list[str]) -> bool:
|
|
if not lines or not keywords:
|
|
return False
|
|
tokens = _expand_tokens(keywords)
|
|
if not tokens:
|
|
return False
|
|
for line in lines:
|
|
lower = line.lower()
|
|
if any(tok in lower for tok in tokens):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _merge_tokens(primary: list[str], secondary: list[str], third: list[str] | None = None) -> list[str]:
|
|
merged: list[str] = []
|
|
for token in primary + secondary + (third or []):
|
|
if not token:
|
|
continue
|
|
if token not in merged:
|
|
merged.append(token)
|
|
return merged
|
|
|
|
|
|
def _extract_question_tokens(question: str) -> list[str]:
|
|
if not question:
|
|
return []
|
|
tokens: list[str] = []
|
|
for part in re.split(r"[^a-zA-Z0-9_-]+", question.lower()):
|
|
if len(part) < TOKEN_MIN_LEN:
|
|
continue
|
|
if part not in tokens:
|
|
tokens.append(part)
|
|
return tokens
|
|
|
|
|
|
def _expand_tokens(tokens: list[str]) -> list[str]:
|
|
if not tokens:
|
|
return []
|
|
expanded: list[str] = []
|
|
for token in tokens:
|
|
if not isinstance(token, str):
|
|
continue
|
|
for part in re.split(r"[^a-zA-Z0-9_-]+", token.lower()):
|
|
if len(part) < TOKEN_MIN_LEN:
|
|
continue
|
|
if part not in expanded:
|
|
expanded.append(part)
|
|
return expanded
|
|
|
|
|
|
def _ensure_token_coverage(
|
|
lines: list[str],
|
|
tokens: list[str],
|
|
summary_lines: list[str],
|
|
max_add: int = 4,
|
|
) -> list[str]:
|
|
if not lines or not tokens or not summary_lines:
|
|
return lines
|
|
hay = " ".join(lines).lower()
|
|
missing = [tok for tok in tokens if tok and tok.lower() not in hay]
|
|
if not missing:
|
|
return lines
|
|
added: list[str] = []
|
|
for token in missing:
|
|
token_lower = token.lower()
|
|
for line in summary_lines:
|
|
if token_lower in line.lower() and line not in lines and line not in added:
|
|
added.append(line)
|
|
break
|
|
if len(added) >= max_add:
|
|
break
|
|
if not added:
|
|
return lines
|
|
return _merge_fact_lines(added, lines)
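# When the chosen fact lines never mention some of the query tokens, one
# matching summary line per missing token (up to max_add) is prepended so the
# synthesizer still sees evidence for those tokens.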
|
|
|
|
|
|
def _best_keyword_line(lines: list[str], keywords: list[str]) -> str | None:
|
|
if not lines or not keywords:
|
|
return None
|
|
tokens = _expand_tokens(keywords)
|
|
if not tokens:
|
|
return None
|
|
best = None
|
|
best_score = 0
|
|
for line in lines:
|
|
lower = line.lower()
|
|
score = sum(1 for tok in tokens if tok in lower)
|
|
if score > best_score:
|
|
best_score = score
|
|
best = line
|
|
return best if best_score > 0 else None
|
|
|
|
|
|
def _line_starting_with(lines: list[str], prefix: str) -> str | None:
|
|
if not lines or not prefix:
|
|
return None
|
|
lower_prefix = prefix.lower()
|
|
for line in lines:
|
|
if str(line).lower().startswith(lower_prefix):
|
|
return line
|
|
return None
|
|
|
|
|
|
def _lexicon_context(summary: dict[str, Any]) -> str: # noqa: C901
|
|
if not isinstance(summary, dict):
|
|
return ""
|
|
lexicon = summary.get("lexicon")
|
|
if not isinstance(lexicon, dict):
|
|
return ""
|
|
terms = lexicon.get("terms")
|
|
aliases = lexicon.get("aliases")
|
|
lines: list[str] = []
|
|
if isinstance(terms, list):
|
|
for entry in terms[:8]:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
term = entry.get("term")
|
|
meaning = entry.get("meaning")
|
|
if term and meaning:
|
|
lines.append(f"{term}: {meaning}")
|
|
if isinstance(aliases, dict):
|
|
for key, value in list(aliases.items())[:6]:
|
|
if key and value:
|
|
lines.append(f"alias {key} -> {value}")
|
|
if not lines:
|
|
return ""
|
|
return "Lexicon:\n" + "\n".join(lines)
|
|
|
|
|
|
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
|
|
raw = text.strip()
|
|
match = re.search(r"\{.*\}", raw, flags=re.S)
|
|
if match:
|
|
return parse_json(match.group(0), fallback=fallback)
|
|
return parse_json(raw, fallback=fallback)
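# Models often wrap JSON in prose or code fences; grabbing the outermost
# {...} span first lets e.g. '```json {"best": 2} ```' parse as {"best": 2}
# instead of dropping to the fallback.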
|
|
|
|
|
|
def _parse_json_list(text: str) -> list[dict[str, Any]]:
    raw = text.strip()
    match = re.search(r"\[.*\]", raw, flags=re.S)
    data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={})
    if isinstance(data, list):
        return [entry for entry in data if isinstance(entry, dict)]
    return []


def _scores_from_json(data: dict[str, Any]) -> AnswerScores:
    return AnswerScores(
        confidence=_coerce_int(data.get("confidence"), 60),
        relevance=_coerce_int(data.get("relevance"), 60),
        satisfaction=_coerce_int(data.get("satisfaction"), 60),
        hallucination_risk=str(data.get("hallucination_risk") or "medium"),
    )


def _coerce_int(value: Any, default: int) -> int:
    try:
        return int(float(value))
    except (TypeError, ValueError):
        return default


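# Illustrative sketch only:
#
#   _coerce_int("85", 60)   -> 85
#   _coerce_int(91.7, 60)   -> 91
#   _coerce_int("high", 60) -> 60  # falls back on unparsable input

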
def _default_scores() -> AnswerScores:
    return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium")


def _style_hint(classify: dict[str, Any]) -> str:
    style = (classify.get("answer_style") or "").strip().lower()
    qtype = (classify.get("question_type") or "").strip().lower()
    if style == "insightful" or qtype in {"open_ended", "planning"}:
        return "insightful"
    return "direct"


def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
    if not reply:
        return False
    lowered = reply.lower()
    missing_markers = (
        "don't have",
        "do not have",
        "don't know",
        "cannot",
        "can't",
        "need to",
        "would need",
        "does not provide",
        "does not mention",
        "not mention",
        "not provided",
        "not in context",
        "not referenced",
        "missing",
        "no specific",
        "no information",
    )
    if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
        return True
    if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply):
        return True
    return False


def _needs_dedup(reply: str) -> bool:
    if not reply:
        return False
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
    if len(sentences) < DEDUP_MIN_SENTENCES:
        return False
    seen = set()
    for sent in sentences:
        norm = re.sub(r"\s+", " ", sent.lower())
        if norm in seen:
            return True
        seen.add(norm)
    return False


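# Illustrative sketch only (the reply is hypothetical): at least
# DEDUP_MIN_SENTENCES sentences are required before a repeated
# (whitespace-normalized, lowercased) sentence flags the reply for dedup.
#
#   _needs_dedup("GPU usage is high. Check titan-a1. GPU usage is high.")
#   -> True

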
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
    if not reply:
        return False
    q_lower = (question or "").lower()
    if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
        return False
    missing_markers = (
        "does not provide",
        "does not specify",
        "not available",
        "not provided",
        "cannot determine",
        "don't have",
        "do not have",
        "insufficient",
        "no data",
    )
    if any(marker in reply.lower() for marker in missing_markers):
        return True
    if reply.count(".") <= 1:
        return False
    extra_markers = ("for more", "if you need", "additional", "based on")
    return any(marker in reply.lower() for marker in extra_markers)


def _extract_keywords(
    raw_question: str,
    normalized: str,
    sub_questions: list[str],
    keywords: list[Any] | None,
) -> list[str]:
    stopwords = {
        "the",
        "and",
        "for",
        "with",
        "that",
        "this",
        "what",
        "which",
        "when",
        "where",
        "who",
        "why",
        "how",
        "tell",
        "show",
        "list",
        "give",
        "about",
        "right",
        "now",
    }
    tokens: list[str] = []
    for source in [raw_question, normalized, *sub_questions]:
        for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
            if len(part) < TOKEN_MIN_LEN or part in stopwords:
                continue
            tokens.append(part)
    if keywords:
        for kw in keywords:
            if isinstance(kw, str):
                part = kw.strip().lower()
                if part and part not in stopwords and part not in tokens:
                    tokens.append(part)
    return list(dict.fromkeys(tokens))[:12]


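# Illustrative sketch only (question text is hypothetical): stopwords and short
# tokens are dropped, duplicates collapse in first-seen order, and the result
# is capped at 12 keywords.
#
#   _extract_keywords(
#       "How many pods are pending in monitoring?",
#       "count pending pods in the monitoring namespace",
#       ["which namespace has pending pods"],
#       ["monitoring"],
#   )
#   -> ["many", "pods", "are", "pending", "monitoring", "count", "namespace", "has"]

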
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
    hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
    if hardware:
        return sorted([node for node in hardware.keys() if isinstance(node, str)])
    return []


def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
    namespaces: list[str] = []
    for entry in summary.get("namespace_pods") or []:
        if isinstance(entry, dict):
            name = entry.get("namespace")
            if name:
                namespaces.append(str(name))
    return sorted(set(namespaces))


def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
    if not reply or not allowed:
        return []
    pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE)
    found = {m.group(1) for m in pattern.finditer(reply)}
    if not found:
        return []
    allowed_set = {a.lower() for a in allowed}
    return sorted({item for item in found if item.lower() not in allowed_set})


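# Illustrative sketch only (node names are hypothetical): node-like tokens in
# the reply that are not in the snapshot's allowed list are surfaced so they
# can be challenged as potential hallucinations.
#
#   _find_unknown_nodes("Drain titan-a9 and titan-b2 first.", ["titan-b2", "titan-c3"])
#   -> ["titan-a9"]

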
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
    if not reply or not allowed:
        return []
    pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
    found = {m.group(1) for m in pattern.finditer(reply)}
    if not found:
        return []
    allowed_set = {a.lower() for a in allowed}
    return sorted({item for item in found if item.lower() not in allowed_set})


def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
    if not reply or not allowed:
        return False
    paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
    if not paths:
        return False
    allowed_set = {p.lower() for p in allowed}
    return any(path.lower() not in allowed_set for path in paths)


def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
    if not allowed or not question:
        return False
    lowered = question.lower()
    cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
    if not any(cue in lowered for cue in cues):
        return False
    if not reply:
        return True
    for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
        if token.lower() in {p.lower() for p in allowed}:
            return False
    return True


def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
    if not candidate or not allowed:
        return None
    best = None
    best_score = 0.0
    for path in allowed:
        score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
        if score > best_score:
            best_score = score
            best = path
    return best if best_score >= RUNBOOK_SIMILARITY_THRESHOLD else None


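# Illustrative sketch only (runbook paths are hypothetical): the closest allowed
# path by difflib ratio is returned, provided it clears
# RUNBOOK_SIMILARITY_THRESHOLD.
#
#   _best_runbook_match("runbooks/gpu_pressure", ["runbooks/gpu-pressure.md", "runbooks/etcd-backup.md"])
#   -> "runbooks/gpu-pressure.md"

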
def _resolve_path(data: Any, path: str) -> Any | None:
    if path.startswith("line:"):
        return path.split("line:", 1)[1].strip()
    cursor = data
    for part in re.split(r"\.(?![^\[]*\])", path):
        if not part:
            continue
        match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
        if not match:
            return None
        key = match.group(1)
        index = match.group(2)
        if isinstance(cursor, dict):
            cursor = cursor.get(key)
        else:
            return None
        if index is not None:
            try:
                idx = int(index)
                if isinstance(cursor, list) and 0 <= idx < len(cursor):
                    cursor = cursor[idx]
                else:
                    return None
            except ValueError:
                return None
    return cursor


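# Illustrative sketch only (the snapshot is hypothetical): dotted paths with
# optional list indices walk the summary, and "line:" paths return the quoted
# text verbatim.
#
#   summary = {"namespace_pods": [{"namespace": "monitoring", "pods": 12}]}
#   _resolve_path(summary, "namespace_pods[0].namespace")  -> "monitoring"
#   _resolve_path(summary, "line: pods running: 42")       -> "pods running: 42"

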
def _snapshot_id(summary: dict[str, Any]) -> str | None:
    if not summary:
        return None
    for key in ("generated_at", "snapshot_ts", "snapshot_id"):
        value = summary.get(key)
        if isinstance(value, str) and value:
            return value
    return None


def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for claim in claims:
        evidence = []
        for ev in claim.evidence:
            evidence.append(
                {
                    "path": ev.path,
                    "reason": ev.reason,
                    "value_at_claim": ev.value_at_claim,
                }
            )
        output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
    return output


def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
    if not payload:
        return None
    claims_raw = payload.get("claims") if isinstance(payload, dict) else None
    claims: list[ClaimItem] = []
    if isinstance(claims_raw, list):
        for entry in claims_raw:
            if not isinstance(entry, dict):
                continue
            claim_text = str(entry.get("claim") or "").strip()
            claim_id = str(entry.get("id") or "").strip()
            if not claim_text or not claim_id:
                continue
            evidence_items: list[EvidenceItem] = []
            for ev in entry.get("evidence") or []:
                if not isinstance(ev, dict):
                    continue
                path = str(ev.get("path") or "").strip()
                if not path:
                    continue
                reason = str(ev.get("reason") or "").strip()
                value_at_claim = ev.get("value_at_claim")
                evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim))
            if evidence_items:
                claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
    return ConversationState(
        updated_at=float(payload.get("updated_at") or time.monotonic()),
        claims=claims,
        snapshot_id=payload.get("snapshot_id"),
        snapshot=payload.get("snapshot"),
    )


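# Illustrative round trip (the claim values are hypothetical): a payload built
# by _claims_to_payload() loads back through _state_from_payload(); claims
# without at least one evidence path are dropped on load.
#
#   payload = {"claims": _claims_to_payload([
#       ClaimItem(id="c1", claim="2 pods pending",
#                 evidence=[EvidenceItem(path="pods.pending", reason="count")]),
#   ])}
#   _state_from_payload(payload).claims[0].claim  -> "2 pods pending"

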
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
    raw = json.dumps(summary, ensure_ascii=False)
    return raw[:max_chars]