# NOTE: file-size banner from the source viewer ("1567 lines, 58 KiB, Python")
# converted to a comment so the module remains importable.
import asyncio
|
|
import json
|
|
import logging
|
|
import math
|
|
import re
|
|
import time
|
|
import difflib
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable
|
|
|
|
from atlasbot.config import Settings
|
|
from atlasbot.knowledge.loader import KnowledgeBase
|
|
from atlasbot.llm.client import LLMClient, build_messages, parse_json
|
|
from atlasbot.llm import prompts
|
|
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
|
|
from atlasbot.state.store import ClaimStore
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class LLMLimitReached(RuntimeError):
    """Raised when the per-question LLM call budget (call_cap) is exhausted.

    Caught by AnswerEngine.answer to degrade gracefully instead of failing.
    """
|
|
|
|
|
|
@dataclass
class AnswerScores:
    """Self-evaluation scores the model assigns to a finished answer."""

    # Integer gauges parsed from the scoring prompt's JSON reply
    # (exact range not visible here — presumably 0-100; confirm in prompts).
    confidence: int
    relevance: int
    satisfaction: int
    # Qualitative risk bucket as returned by the model (free-form string).
    hallucination_risk: str
|
|
|
|
|
|
@dataclass
class AnswerResult:
    """Final product of AnswerEngine.answer: reply text plus diagnostics."""

    # Answer text shown to the user.
    reply: str
    # Self-evaluation scores (defaults when scoring is disabled or skipped).
    scores: AnswerScores
    # Pipeline diagnostics (mode, llm call counts, classify, timings).
    meta: dict[str, Any]
|
|
|
|
|
|
@dataclass
class EvidenceItem:
    """A snapshot-summary path cited as evidence for one claim."""

    # Path into the snapshot summary (resolved via _resolve_path).
    path: str
    # Model-supplied justification for citing this path.
    reason: str
    # Value currently resolved from the live summary (refreshed on follow-up).
    value: Any | None = None
    # Value captured when the claim was first extracted; kept so follow-ups
    # can report drift ("now X") against the original claim.
    value_at_claim: Any | None = None
|
|
|
|
|
|
@dataclass
class ClaimItem:
    """One factual claim extracted from an answer, with its evidence paths."""

    # Stable identifier used by follow-up claim selection (e.g. "c1").
    id: str
    # The claim sentence itself.
    claim: str
    # Snapshot paths backing the claim; empty-evidence claims are dropped.
    evidence: list[EvidenceItem]
|
|
|
|
|
|
@dataclass
class ConversationState:
    """Per-conversation state persisted in ClaimStore between turns."""

    # Timestamp of the last store. NOTE(review): _store_state writes
    # time.monotonic(), which is not comparable across processes/restarts —
    # confirm ClaimStore's TTL handling expects a monotonic value.
    updated_at: float
    # Claims extracted from the previous answer (used for follow-ups).
    claims: list[ClaimItem]
    # Identifier of the snapshot the claims were made against.
    snapshot_id: str | None = None
    # Full pinned snapshot (only kept when snapshot_pin_enabled).
    snapshot: dict[str, Any] | None = None
|
|
|
|
|
|
@dataclass
class ModePlan:
    """Execution plan derived from the answer mode (quick/smart/genius)."""

    # Primary model for reasoning steps.
    model: str
    # Cheaper model for routing/normalizing/scoring steps.
    fast_model: str
    # Cap on decomposed sub-questions actually answered.
    max_subquestions: int
    # Snapshot summary lines per retrieval chunk.
    chunk_lines: int
    # Maximum chunks fed into the answer context.
    chunk_top: int
    # Chunks scored per relevance-scoring LLM call.
    chunk_group: int
    # Feature switches for optional pipeline stages.
    use_tool: bool
    use_critic: bool
    use_gap: bool
    use_scores: bool
    # Number of synthesis drafts to generate (best one is selected).
    drafts: int
|
|
|
|
|
|
class AnswerEngine:
    """Multi-stage LLM answer pipeline over a cluster snapshot + knowledge base.

    Pipeline (in ``answer``): normalize -> route -> decompose -> retrieve
    snapshot chunks -> sub-answers -> synthesize -> evidence/runbook repair ->
    critic/gap passes -> dedup -> score -> claim extraction. Every LLM call is
    budgeted; exceeding the budget raises LLMLimitReached and degrades to a
    partial reply.
    """

    def __init__(
        self,
        settings: Settings,
        llm: LLMClient,
        kb: KnowledgeBase,
        snapshot: SnapshotProvider,
    ) -> None:
        """Wire up configuration, LLM client, knowledge base and snapshot source."""
        self._settings = settings
        self._llm = llm
        self._kb = kb
        self._snapshot = snapshot
        # Per-conversation claim persistence with TTL-based expiry.
        self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)

    async def answer(
        self,
        question: str,
        *,
        mode: str,
        history: list[dict[str, str]] | None = None,
        observer: Callable[[str, str], None] | None = None,
        conversation_id: str | None = None,
    ) -> AnswerResult:
        """Answer *question* using the staged pipeline for *mode*.

        Args:
            question: user question; "run limitless" inside it lifts the call cap.
            mode: "stock" bypasses the pipeline; otherwise selects a ModePlan.
            history: recent Q/A turns, used as non-authoritative context.
            observer: optional callback(stage, message) for progress reporting.
            conversation_id: enables claim storage / follow-up handling.
        """
        question = (question or "").strip()
        if not question:
            return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode})
        if mode == "stock":
            return await self._answer_stock(question)

        # "run limitless" is an escape hatch that disables the LLM call cap.
        limitless = "run limitless" in question.lower()
        if limitless:
            question = re.sub(r"(?i)run limitless", "", question).strip()
        plan = _mode_plan(self._settings, mode)
        call_limit = _llm_call_limit(self._settings, mode)
        call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier)
        call_count = 0
        limit_hit = False

        # Tags whose raw LLM responses are echoed when debug_pipeline is on.
        debug_tags = {
            "route",
            "decompose",
            "chunk_score",
            "fact_select",
            "synth",
            "subanswer",
            "tool",
            "followup",
            "select_claims",
            "evidence_fix",
        }

        def _debug_log(name: str, payload: Any) -> None:
            # No-op unless debug_pipeline is enabled in settings.
            if not self._settings.debug_pipeline:
                return
            log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})

        async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
            # Budgeted LLM call shared by every stage; raises once the cap is hit.
            nonlocal call_count, limit_hit
            if not limitless and call_count >= call_cap:
                limit_hit = True
                raise LLMLimitReached("llm_limit")
            call_count += 1
            messages = build_messages(system, prompt, context=context)
            response = await self._llm.chat(messages, model=model or plan.model)
            log.info(
                "atlasbot_llm_call",
                extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
            )
            if self._settings.debug_pipeline and tag in debug_tags:
                _debug_log(f"llm_raw_{tag}", str(response)[:1200])
            return response

        # Resolve conversation state and (optionally) pin the snapshot the
        # conversation's claims were made against.
        state = self._get_state(conversation_id)
        snapshot = self._snapshot.get()
        snapshot_used = snapshot
        if self._settings.snapshot_pin_enabled and state and state.snapshot:
            snapshot_used = state.snapshot
        summary = build_summary(snapshot_used)
        allowed_nodes = _allowed_nodes(summary)
        allowed_namespaces = _allowed_namespaces(summary)
        summary_lines = _summary_lines(snapshot_used)
        kb_summary = self._kb.summary()
        runbooks = self._kb.runbook_titles(limit=6)
        runbook_paths = self._kb.runbook_paths(limit=10)
        history_ctx = _format_history(history)
        lexicon_ctx = _lexicon_context(summary)
        key_facts: list[str] = []
        metric_facts: list[str] = []

        started = time.monotonic()
        reply = ""
        scores = _default_scores()
        claims: list[ClaimItem] = []
        classify: dict[str, Any] = {}
        tool_hint: dict[str, Any] | None = None
        try:
            # --- Stage 1: normalize the question into canonical wording/keywords.
            if observer:
                observer("normalize", "normalizing")
            normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
            normalize_raw = await call_llm(
                prompts.NORMALIZE_SYSTEM,
                normalize_prompt,
                context=lexicon_ctx,
                model=plan.fast_model,
                tag="normalize",
            )
            normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
            normalized = str(normalize.get("normalized") or question).strip() or question
            keywords = normalize.get("keywords") or []
            _debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
            keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords)

            # --- Stage 2: classify the question (snapshot needed? follow-up? type).
            if observer:
                observer("route", "routing")
            route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
            route_raw = await call_llm(
                prompts.ROUTE_SYSTEM,
                route_prompt,
                context=_join_context([kb_summary, lexicon_ctx]),
                model=plan.fast_model,
                tag="route",
            )
            classify = _parse_json_block(route_raw, fallback={})
            classify.setdefault("needs_snapshot", True)
            classify.setdefault("answer_style", "direct")
            classify.setdefault("follow_up", False)
            _debug_log("route_parsed", {"classify": classify, "normalized": normalized})
            # Heuristic overrides of the model's routing decision.
            cluster_terms = (
                "atlas",
                "cluster",
                "node",
                "nodes",
                "namespace",
                "pod",
                "workload",
                "k8s",
                "kubernetes",
                "postgres",
                "database",
                "db",
                "connections",
                "cpu",
                "ram",
                "memory",
                "network",
                "io",
                "disk",
                "pvc",
                "storage",
            )
            if any(term in normalized.lower() for term in cluster_terms):
                classify["needs_snapshot"] = True
            if re.search(r"\b(how many|count|number of|list)\b", normalized.lower()):
                classify["question_type"] = "metric"
            hottest_terms = ("hottest", "highest", "lowest", "most")
            metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage")
            lowered_question = normalized.lower()
            if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms):
                classify["question_type"] = "metric"

            # Follow-ups reuse stored claims instead of rerunning the pipeline.
            if classify.get("follow_up") and state and state.claims:
                if observer:
                    observer("followup", "answering follow-up")
                reply = await self._answer_followup(question, state, summary, classify, plan, call_llm)
                scores = await self._score_answer(question, reply, plan, call_llm)
                meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
                return AnswerResult(reply, scores, meta)

            # --- Stage 3: decompose into prioritized sub-questions.
            if observer:
                observer("decompose", "decomposing")
            decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2)
            decompose_raw = await call_llm(
                prompts.DECOMPOSE_SYSTEM,
                decompose_prompt + "\nQuestion: " + normalized,
                context=lexicon_ctx,
                model=plan.fast_model if mode == "quick" else plan.model,
                tag="decompose",
            )
            parts = _parse_json_list(decompose_raw)
            sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
            _debug_log("decompose_parsed", {"sub_questions": sub_questions})
            keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords)

            # --- Stage 4: retrieve relevant snapshot chunks and key facts.
            snapshot_context = ""
            if classify.get("needs_snapshot"):
                if observer:
                    observer("retrieve", "scoring chunks")
                chunks = _chunk_lines(summary_lines, plan.chunk_lines)
                scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
                selected = _select_chunks(chunks, scored, plan, keyword_tokens)
                key_facts = _key_fact_lines(summary_lines, keyword_tokens)
                hottest_facts = _extract_hottest_facts(summary_lines, f"{question} {normalized}")
                hardware_facts = _extract_hardware_usage_facts(summary_lines, f"{question} {normalized}")
                # Numeric lines become candidate "must use" facts; specialized
                # extractors take precedence over the generic keyword match.
                metric_facts = [line for line in key_facts if re.search(r"\d", line)]
                if hardware_facts:
                    metric_facts = hardware_facts
                    key_facts = _merge_fact_lines(metric_facts, key_facts)
                if hottest_facts and not hardware_facts:
                    metric_facts = hottest_facts
                    key_facts = _merge_fact_lines(metric_facts, key_facts)
                if classify.get("question_type") in {"metric", "diagnostic"} and not hottest_facts and not hardware_facts:
                    metric_candidates = _metric_candidate_lines(summary_lines, keyword_tokens)
                    selected_facts = await _select_metric_facts(call_llm, normalized, metric_candidates, plan)
                    if selected_facts:
                        metric_facts = selected_facts
                        key_facts = _merge_fact_lines(metric_facts, key_facts)
                        if self._settings.debug_pipeline:
                            _debug_log("metric_facts_selected", {"facts": metric_facts})
                if self._settings.debug_pipeline:
                    scored_preview = sorted(
                        [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
                        key=lambda item: item["score"],
                        reverse=True,
                    )[: min(len(chunks), max(plan.chunk_top, 6))]
                    _debug_log(
                        "chunk_selected",
                        {
                            "selected_ids": [item["id"] for item in selected],
                            "top_scored": scored_preview,
                        },
                    )
                snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
                if key_facts:
                    snapshot_context = "KeyFacts:\n" + "\n".join(key_facts) + "\n\n" + snapshot_context

            context = _join_context(
                [kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
            )

            # --- Optional: tool suggestion.
            if plan.use_tool and classify.get("needs_tool"):
                if observer:
                    observer("tool", "suggesting tools")
                tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized
                tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool")
                tool_hint = _parse_json_block(tool_raw, fallback={})

            # --- Stage 5: answer each sub-question, then synthesize.
            if observer:
                observer("subanswers", "drafting subanswers")
            subanswers: list[str] = []
            for subq in sub_questions:
                sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq
                sub_answer = await call_llm(prompts.ANSWER_SYSTEM, sub_prompt, context=context, model=plan.model, tag="subanswer")
                subanswers.append(sub_answer)

            if observer:
                observer("synthesize", "synthesizing")
            reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)

            # --- Stage 6: repair hallucinated entities / missing evidence /
            # runbook references against the allow-lists from the snapshot.
            unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
            unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
            runbook_fix = _needs_runbook_fix(reply, runbook_paths)
            runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
            needs_evidence = _needs_evidence_fix(reply, classify)
            resolved_runbook = None
            if runbook_paths and (runbook_fix or runbook_needed):
                resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
                resolver_raw = await call_llm(
                    prompts.RUNBOOK_SELECT_SYSTEM,
                    resolver_prompt,
                    context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
                    model=plan.fast_model,
                    tag="runbook_select",
                )
                resolver = _parse_json_block(resolver_raw, fallback={})
                candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
                if candidate and candidate in runbook_paths:
                    resolved_runbook = candidate
            if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
                if observer:
                    observer("evidence_fix", "repairing missing evidence")
                extra_bits = []
                if unknown_nodes:
                    extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
                if unknown_namespaces:
                    extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
                if runbook_paths:
                    extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
                if resolved_runbook:
                    extra_bits.append("ResolvedRunbook: " + resolved_runbook)
                if metric_facts:
                    extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4]))
                if allowed_nodes:
                    extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
                if allowed_namespaces:
                    extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
                fix_prompt = (
                    prompts.EVIDENCE_FIX_PROMPT
                    + "\nQuestion: "
                    + normalized
                    + "\nDraft: "
                    + reply
                    + ("\n" + "\n".join(extra_bits) if extra_bits else "")
                )
                reply = await call_llm(
                    prompts.EVIDENCE_FIX_SYSTEM,
                    fix_prompt,
                    context=context,
                    model=plan.model,
                    tag="evidence_fix",
                )
            if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply):
                if observer:
                    observer("runbook_enforce", "enforcing runbook path")
                enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook)
                reply = await call_llm(
                    prompts.RUNBOOK_ENFORCE_SYSTEM,
                    enforce_prompt + "\nAnswer: " + reply,
                    context=context,
                    model=plan.model,
                    tag="runbook_enforce",
                )
            if runbook_paths:
                # Replace any runbook path the model invented with a real one.
                invalid = [
                    token
                    for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)
                    if token.lower() not in {p.lower() for p in runbook_paths}
                ]
                if invalid:
                    if observer:
                        observer("runbook_enforce", "replacing invalid runbook path")
                    resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
                    resolver_raw = await call_llm(
                        prompts.RUNBOOK_SELECT_SYSTEM,
                        resolver_prompt,
                        context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
                        model=plan.fast_model,
                        tag="runbook_select",
                    )
                    resolver = _parse_json_block(resolver_raw, fallback={})
                    candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
                    if not (candidate and candidate in runbook_paths):
                        candidate = _best_runbook_match(invalid[0], runbook_paths)
                    if candidate and candidate in runbook_paths:
                        enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
                        reply = await call_llm(
                            prompts.RUNBOOK_ENFORCE_SYSTEM,
                            enforce_prompt + "\nAnswer: " + reply,
                            context=context,
                            model=plan.model,
                            tag="runbook_enforce",
                        )
            reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces)

            # --- Stage 7: focus / metric-grounding guards.
            if _needs_focus_fix(normalized, reply, classify):
                if observer:
                    observer("focus_fix", "tightening answer")
                reply = await call_llm(
                    prompts.EVIDENCE_FIX_SYSTEM,
                    prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply,
                    context=context,
                    model=plan.model,
                    tag="focus_fix",
                )
            if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts:
                # If the reply's numbers don't overlap the snapshot facts,
                # fall back to quoting the most relevant fact verbatim.
                best_line = None
                lowered_keywords = [kw.lower() for kw in keyword_tokens if kw]
                for line in metric_facts:
                    line_lower = line.lower()
                    if any(kw in line_lower for kw in lowered_keywords):
                        best_line = line
                        break
                best_line = best_line or metric_facts[0]
                reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
                fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
                if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
                    reply = f"From the latest snapshot: {best_line}."

            # --- Stage 8: optional critic review and revision.
            if plan.use_critic:
                if observer:
                    observer("critic", "reviewing")
                critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
                critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic")
                critic = _parse_json_block(critic_raw, fallback={})
                if critic.get("issues"):
                    revise_prompt = (
                        prompts.REVISION_PROMPT
                        + "\nQuestion: "
                        + normalized
                        + "\nDraft: "
                        + reply
                        + "\nCritique: "
                        + json.dumps(critic)
                    )
                    reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise")

            # --- Stage 9: optional evidence-gap note appended to the reply.
            if plan.use_gap:
                if observer:
                    observer("gap", "checking gaps")
                gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
                gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap")
                gap = _parse_json_block(gap_raw, fallback={})
                note = str(gap.get("note") or "").strip()
                if note:
                    reply = f"{reply}\n\n{note}"

            if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts:
                reply = _metric_fact_guard(reply, metric_facts, keyword_tokens)

            reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")

            # --- Stage 10: self-score and extract claims for follow-ups.
            scores = await self._score_answer(normalized, reply, plan, call_llm)
            claims = await self._extract_claims(normalized, reply, summary, call_llm)
        except LLMLimitReached:
            # Budget exhausted: keep whatever partial reply exists, otherwise
            # explain the limit and how to bypass it.
            if not reply:
                reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass."
                scores = _default_scores()
        finally:
            elapsed = round(time.monotonic() - started, 2)
            log.info(
                "atlasbot_answer",
                extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}},
            )

        if conversation_id and claims:
            self._store_state(conversation_id, claims, summary, snapshot_used)

        meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
        return AnswerResult(reply, scores, meta)

    async def _answer_stock(self, question: str) -> AnswerResult:
        """Single un-augmented LLM call (no snapshot, no pipeline)."""
        messages = build_messages(prompts.STOCK_SYSTEM, question)
        reply = await self._llm.chat(messages, model=self._settings.ollama_model)
        return AnswerResult(reply, _default_scores(), {"mode": "stock"})

    async def _synthesize_answer(
        self,
        question: str,
        subanswers: list[str],
        context: str,
        classify: dict[str, Any],
        plan: ModePlan,
        call_llm: Callable[..., Any],
    ) -> str:
        """Merge sub-answers into one reply; with plan.drafts > 1, generate
        multiple drafts and have the critic model pick the best."""
        style_hint = _style_hint(classify)
        if not subanswers:
            # No decomposition happened: synthesize directly from context.
            prompt = (
                prompts.SYNTHESIZE_PROMPT
                + "\nQuestion: "
                + question
                + "\nStyle: "
                + style_hint
                + "\nQuestionType: "
                + (classify.get("question_type") or "unknown")
            )
            return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")
        draft_prompts = []
        for idx in range(plan.drafts):
            draft_prompts.append(
                prompts.SYNTHESIZE_PROMPT
                + "\nQuestion: "
                + question
                + "\nStyle: "
                + style_hint
                + "\nQuestionType: "
                + (classify.get("question_type") or "unknown")
                + "\nSubanswers:\n"
                + "\n".join([f"- {item}" for item in subanswers])
                + f"\nDraftIndex: {idx + 1}"
            )
        drafts: list[str] = []
        for prompt in draft_prompts:
            drafts.append(await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth"))
        if len(drafts) == 1:
            return drafts[0]
        select_prompt = (
            prompts.DRAFT_SELECT_PROMPT
            + "\nQuestion: "
            + question
            + "\nDrafts:\n"
            + "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)])
        )
        select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select")
        selection = _parse_json_block(select_raw, fallback={})
        # "best" is 1-based in the model's reply; fall back to the first draft.
        idx = int(selection.get("best", 1)) - 1
        if 0 <= idx < len(drafts):
            return drafts[idx]
        return drafts[0]

    async def _score_answer(
        self,
        question: str,
        reply: str,
        plan: ModePlan,
        call_llm: Callable[..., Any],
    ) -> AnswerScores:
        """Self-score the reply via the fast model (defaults when disabled)."""
        if not plan.use_scores:
            return _default_scores()
        prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
        raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score")
        data = _parse_json_block(raw, fallback={})
        return _scores_from_json(data)

    async def _extract_claims(
        self,
        question: str,
        reply: str,
        summary: dict[str, Any],
        call_llm: Callable[..., Any],
    ) -> list[ClaimItem]:
        """Map the reply's claims to snapshot-summary paths for follow-ups.

        Claims without at least one resolvable evidence path are dropped.
        """
        if not reply or not summary:
            return []
        summary_json = _json_excerpt(summary)
        prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
        raw = await call_llm(prompts.CLAIM_SYSTEM, prompt, context=f"SnapshotSummaryJson:{summary_json}", model=self._settings.ollama_model_fast, tag="claim_map")
        data = _parse_json_block(raw, fallback={})
        claims_raw = data.get("claims") if isinstance(data, dict) else None
        claims: list[ClaimItem] = []
        if isinstance(claims_raw, list):
            for entry in claims_raw:
                if not isinstance(entry, dict):
                    continue
                claim_text = str(entry.get("claim") or "").strip()
                claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}"
                evidence_items: list[EvidenceItem] = []
                for ev in entry.get("evidence") or []:
                    if not isinstance(ev, dict):
                        continue
                    path = str(ev.get("path") or "").strip()
                    if not path:
                        continue
                    reason = str(ev.get("reason") or "").strip()
                    # Capture the value now so follow-ups can detect drift.
                    value = _resolve_path(summary, path)
                    evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value))
                if claim_text and evidence_items:
                    claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
        return claims

    async def _dedup_reply(
        self,
        reply: str,
        plan: ModePlan,
        call_llm: Callable[..., Any],
        tag: str,
    ) -> str:
        """Run the dedup prompt only when the reply looks repetitive."""
        if not _needs_dedup(reply):
            return reply
        dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
        return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)

    async def _answer_followup(
        self,
        question: str,
        state: ConversationState,
        summary: dict[str, Any],
        classify: dict[str, Any],
        plan: ModePlan,
        call_llm: Callable[..., Any],
    ) -> str:
        """Answer a follow-up from stored claims plus their current values.

        Note: *classify* is currently unused here but kept for signature
        symmetry with the main pipeline.
        """
        claim_ids = await self._select_claims(question, state.claims, plan, call_llm)
        # Fall back to the first two claims when selection returns nothing.
        selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2]
        evidence_lines = []
        for claim in selected:
            evidence_lines.append(f"Claim: {claim.claim}")
            for ev in claim.evidence:
                # Refresh each evidence value and annotate drift since the claim.
                current = _resolve_path(summary, ev.path)
                ev.value = current
                delta_note = ""
                if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim:
                    delta_note = f" (now {current})"
                evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}")
        evidence_ctx = "\n".join(evidence_lines)
        prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx
        reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup")
        # Same entity-hallucination repair as the main pipeline.
        allowed_nodes = _allowed_nodes(summary)
        allowed_namespaces = _allowed_namespaces(summary)
        unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
        unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
        extra_bits = []
        if unknown_nodes:
            extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
        if unknown_namespaces:
            extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
        if allowed_nodes:
            extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
        if allowed_namespaces:
            extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
        if extra_bits:
            fix_prompt = (
                prompts.EVIDENCE_FIX_PROMPT
                + "\nQuestion: "
                + question
                + "\nDraft: "
                + reply
                + "\n"
                + "\n".join(extra_bits)
            )
            reply = await call_llm(
                prompts.EVIDENCE_FIX_SYSTEM,
                fix_prompt,
                context="Evidence:\n" + evidence_ctx,
                model=plan.model,
                tag="followup_fix",
            )
        reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup")
        return reply

    async def _select_claims(
        self,
        question: str,
        claims: list[ClaimItem],
        plan: ModePlan,
        call_llm: Callable[..., Any],
    ) -> list[str]:
        """Ask the fast model which stored claim ids a follow-up refers to."""
        if not claims:
            return []
        claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims]
        prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief)
        raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims")
        data = _parse_json_block(raw, fallback={})
        ids = data.get("claim_ids") if isinstance(data, dict) else []
        if isinstance(ids, list):
            return [str(item) for item in ids if item]
        return []

    def _get_state(self, conversation_id: str | None) -> ConversationState | None:
        """Load persisted conversation state, or None when absent/disabled."""
        if not conversation_id:
            return None
        state_payload = self._store.get(conversation_id)
        return _state_from_payload(state_payload) if state_payload else None

    def _store_state(
        self,
        conversation_id: str,
        claims: list[ClaimItem],
        summary: dict[str, Any],
        snapshot: dict[str, Any] | None,
    ) -> None:
        """Persist claims (and optionally the pinned snapshot) for follow-ups."""
        snapshot_id = _snapshot_id(summary)
        pinned_snapshot = snapshot if self._settings.snapshot_pin_enabled else None
        payload = {
            # NOTE(review): time.monotonic() is process-local; verify ClaimStore's
            # TTL logic (conversation_ttl_sec) compares against the same clock.
            "updated_at": time.monotonic(),
            "claims": _claims_to_payload(claims),
            "snapshot_id": snapshot_id,
            "snapshot": pinned_snapshot,
        }
        self._store.set(conversation_id, payload)

    def _cleanup_state(self) -> None:
        """Purge expired conversation entries from the claim store."""
        self._store.cleanup()
|
|
|
|
|
|
def _build_meta(
|
|
mode: str,
|
|
call_count: int,
|
|
call_cap: int,
|
|
limit_hit: bool,
|
|
classify: dict[str, Any],
|
|
tool_hint: dict[str, Any] | None,
|
|
started: float,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"mode": mode,
|
|
"llm_calls": call_count,
|
|
"llm_limit": call_cap,
|
|
"llm_limit_hit": limit_hit,
|
|
"classify": classify,
|
|
"tool_hint": tool_hint,
|
|
"elapsed_sec": round(time.monotonic() - started, 2),
|
|
}
|
|
|
|
|
|
def _mode_plan(settings: Settings, mode: str) -> ModePlan:
    """Build the per-mode execution plan; unknown modes get the fast plan."""
    shared = {"fast_model": settings.ollama_model_fast}
    if mode == "genius":
        return ModePlan(
            model=settings.ollama_model_genius,
            max_subquestions=6,
            chunk_lines=6,
            chunk_top=10,
            chunk_group=4,
            use_tool=True,
            use_critic=True,
            use_gap=True,
            use_scores=True,
            drafts=2,
            **shared,
        )
    if mode == "smart":
        return ModePlan(
            model=settings.ollama_model_smart,
            max_subquestions=4,
            chunk_lines=8,
            chunk_top=8,
            chunk_group=4,
            use_tool=True,
            use_critic=True,
            use_gap=True,
            use_scores=True,
            drafts=1,
            **shared,
        )
    # Default ("quick" and anything unrecognized): cheapest configuration.
    return ModePlan(
        model=settings.ollama_model_fast,
        max_subquestions=2,
        chunk_lines=12,
        chunk_top=5,
        chunk_group=5,
        use_tool=False,
        use_critic=False,
        use_gap=False,
        use_scores=False,
        drafts=1,
        **shared,
    )
|
|
|
|
|
|
def _llm_call_limit(settings: Settings, mode: str) -> int:
    """Return the configured LLM call budget for *mode* (fast by default)."""
    attr = {
        "genius": "genius_llm_calls_max",
        "smart": "smart_llm_calls_max",
    }.get(mode, "fast_llm_calls_max")
    return getattr(settings, attr)
|
|
|
|
|
|
def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]:
|
|
if not parts:
|
|
return [fallback]
|
|
ranked = []
|
|
for entry in parts:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
question = str(entry.get("question") or "").strip()
|
|
if not question:
|
|
continue
|
|
priority = entry.get("priority")
|
|
try:
|
|
weight = float(priority)
|
|
except (TypeError, ValueError):
|
|
weight = 1.0
|
|
ranked.append((weight, question))
|
|
ranked.sort(key=lambda item: item[0], reverse=True)
|
|
questions = [item[1] for item in ranked][:limit]
|
|
return questions or [fallback]
|
|
|
|
|
|
def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]:
|
|
chunks: list[dict[str, Any]] = []
|
|
if not lines:
|
|
return chunks
|
|
for idx in range(0, len(lines), lines_per_chunk):
|
|
chunk_lines = lines[idx : idx + lines_per_chunk]
|
|
text = "\n".join(chunk_lines)
|
|
summary = " | ".join(chunk_lines[:4])
|
|
chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
|
|
return chunks
|
|
|
|
|
|
async def _score_chunks(
    call_llm: Callable[..., Any],
    chunks: list[dict[str, Any]],
    question: str,
    sub_questions: list[str],
    plan: ModePlan,
) -> dict[str, float]:
    """Score every chunk's relevance, in batches of plan.chunk_group.

    All ids start at 0.0 so chunks the model never scores still have entries.
    """
    scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
    if not chunks:
        return scores
    briefs = [{"id": chunk["id"], "summary": chunk["summary"]} for chunk in chunks]
    for start in range(0, len(briefs), plan.chunk_group):
        batch = briefs[start : start + plan.chunk_group]
        scores.update(await _score_chunk_group(call_llm, batch, question, sub_questions))
    return scores
|
|
|
|
|
|
async def _score_chunk_group(
    call_llm: Callable[..., Any],
    group: list[dict[str, Any]],
    question: str,
    sub_questions: list[str],
) -> dict[str, float]:
    """Ask the retriever model to score one batch of chunk summaries.

    Malformed entries are skipped; unparsable scores coerce to 0.0.
    """
    prompt = "".join(
        [
            prompts.CHUNK_SCORE_PROMPT,
            "\nQuestion: ",
            question,
            "\nSubQuestions: ",
            json.dumps(sub_questions),
            "\nChunks: ",
            json.dumps(group),
        ]
    )
    raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score")
    results: dict[str, float] = {}
    for entry in _parse_json_list(raw):
        if not isinstance(entry, dict):
            continue
        chunk_id = str(entry.get("id") or "").strip()
        if not chunk_id:
            continue
        try:
            value = float(entry.get("score") or 0)
        except (TypeError, ValueError):
            value = 0.0
        results[chunk_id] = value
    return results
|
|
|
|
|
|
def _select_chunks(
    chunks: list[dict[str, Any]],
    scores: dict[str, float],
    plan: ModePlan,
    keywords: list[str] | None = None,
) -> list[dict[str, Any]]:
    """Choose up to plan.chunk_top chunks to feed into the answer context.

    Selection order: the first (head) chunk is always kept (it carries the
    top-of-summary overview), then chunks whose text matches the question
    keywords, then the remaining chunks by descending relevance score.

    Args:
        chunks: chunk dicts from _chunk_lines (keys: id, text, summary).
        scores: relevance score per chunk id; missing ids count as 0.0.
        plan: mode plan supplying the chunk_top cap.
        keywords: optional keyword tokens used to force-include matching chunks.

    Returns:
        Selected chunk dicts, without duplicates, at most plan.chunk_top.
    """
    if not chunks:
        return []
    ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
    selected: list[dict[str, Any]] = []
    head = chunks[0]
    selected.append(head)
    keyword_hits: list[dict[str, Any]] = []
    raw_keywords = [kw.lower() for kw in (keywords or []) if kw]
    focused = _focused_keywords(keywords or [])
    if focused:
        lowered = [kw.lower() for kw in focused if kw]
        for item in ranked:
            text = item.get("text", "").lower()
            if any(kw in text for kw in lowered):
                keyword_hits.append(item)
    if raw_keywords:
        for item in ranked:
            if len(keyword_hits) >= plan.chunk_top:
                break
            text = item.get("text", "").lower()
            if any(kw in text for kw in raw_keywords):
                keyword_hits.append(item)
    for item in keyword_hits:
        if len(selected) >= plan.chunk_top:
            break
        if item in selected:
            continue
        selected.append(item)
    # Fill remaining slots with the highest-scored chunks.
    for item in ranked:
        if len(selected) >= plan.chunk_top:
            break
        # Bug fix: skip anything already selected (head OR keyword hits).
        # Previously only the head was skipped, so a chunk picked via keyword
        # matching could be appended a second time, duplicating context and
        # crowding out other high-scoring chunks.
        if item in selected:
            continue
        selected.append(item)
    return selected
|
|
|
|
|
|
def _format_runbooks(runbooks: list[str]) -> str:
|
|
if not runbooks:
|
|
return ""
|
|
return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks])
|
|
|
|
|
|
def _join_context(parts: list[str]) -> str:
|
|
text = "\n".join([part for part in parts if part])
|
|
return text.strip()
|
|
|
|
|
|
def _format_history(history: list[dict[str, str]] | None) -> str:
|
|
if not history:
|
|
return ""
|
|
lines = ["Recent conversation (non-authoritative):"]
|
|
for entry in history[-4:]:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
question = entry.get("q")
|
|
answer = entry.get("a")
|
|
role = entry.get("role")
|
|
content = entry.get("content")
|
|
if question:
|
|
lines.append(f"Q: {question}")
|
|
if answer:
|
|
lines.append(f"A: {answer}")
|
|
if role and content:
|
|
prefix = "Q" if role == "user" else "A"
|
|
lines.append(f"{prefix}: {content}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
    """Split the rendered snapshot summary into its non-blank lines."""
    rendered = summary_text(snapshot)
    if not rendered:
        return []
    return [line for line in rendered.splitlines() if line.strip()]
|
|
|
|
|
|
def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]:
    """Return up to ``limit`` summary lines mentioning any keyword.

    Focused (specific) keywords are matched first; if that yields fewer than
    ``limit`` lines and a focused set existed, remaining slots are filled by
    matching the full keyword set.
    """
    if not lines or not keywords:
        return []
    lowered = [kw.lower() for kw in keywords if kw]
    if not lowered:
        return []
    focused = _focused_keywords(lowered)
    primary = focused or lowered
    picked: list[str] = []
    for candidate in lines:
        if any(kw in candidate.lower() for kw in primary):
            picked.append(candidate)
            if len(picked) >= limit:
                break
    if focused and len(picked) < limit:
        for candidate in lines:
            if len(picked) >= limit:
                break
            if candidate in picked:
                continue
            if any(kw in candidate.lower() for kw in lowered):
                picked.append(candidate)
    return picked
|
|
|
|
|
|
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
|
|
seen = set()
|
|
merged: list[str] = []
|
|
for line in primary + fallback:
|
|
if line in seen:
|
|
continue
|
|
seen.add(line)
|
|
merged.append(line)
|
|
return merged
|
|
|
|
|
|
def _expand_hottest_line(line: str) -> list[str]:
|
|
if not line:
|
|
return []
|
|
if not line.lower().startswith("hottest:"):
|
|
return []
|
|
expanded: list[str] = []
|
|
payload = line.split("hottest:", 1)[1]
|
|
for part in payload.split(";"):
|
|
part = part.strip()
|
|
if not part or "=" not in part:
|
|
continue
|
|
metric, rest = part.split("=", 1)
|
|
metric = metric.strip()
|
|
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
|
|
if not match:
|
|
continue
|
|
node = match.group("node").strip()
|
|
value = match.group("value").strip()
|
|
expanded.append(f"hottest_{metric}_node: {node} ({value})")
|
|
return expanded
|
|
|
|
|
|
def _extract_hottest_facts(lines: list[str], question: str) -> list[str]:
    """Return hottest-node facts when the question asks about node load.

    Questions about hardware/classes are excluded; the question must contain
    a load-related term and mention nodes. When the question names specific
    metrics (cpu/ram/net/io/disk), only those facts are returned; an empty
    filter result falls back to all expanded facts.
    """
    if not lines:
        return []
    q = question.lower()
    if "hardware" in q or "class" in q:
        return []
    load_terms = ("hottest", "hot", "highest", "lowest", "most", "top", "peak", "loaded", "load", "busy")
    if not any(term in q for term in load_terms):
        return []
    if "node" not in q and "nodes" not in q:
        return []
    hottest_line = next((item for item in lines if item.lower().startswith("hottest:")), "")
    if not hottest_line:
        return []
    facts = _expand_hottest_line(hottest_line)
    if not facts:
        return []
    wanted: list[str] = []
    if "cpu" in q or "processor" in q:
        wanted.append("hottest_cpu_node")
    if "ram" in q or "memory" in q:
        wanted.append("hottest_ram_node")
    if "net" in q or "network" in q or "throughput" in q:
        wanted.append("hottest_net_node")
    if "io" in q or "i/o" in q:
        wanted.append("hottest_io_node")
    if "disk" in q or "storage" in q:
        wanted.append("hottest_disk_node")
    if not wanted:
        return facts
    return [fact for fact in facts if any(label in fact for label in wanted)] or facts
|
|
|
|
|
|
def _extract_hardware_usage_facts(lines: list[str], question: str) -> list[str]:
|
|
if not lines:
|
|
return []
|
|
lowered = question.lower()
|
|
if "hardware" not in lowered:
|
|
return []
|
|
if not any(term in lowered for term in ("average", "avg", "mean", "load", "cpu", "ram", "memory")):
|
|
return []
|
|
avg_line = None
|
|
top_line = None
|
|
for line in lines:
|
|
if line.startswith("hardware_usage_avg:"):
|
|
avg_line = line
|
|
elif line.startswith("hardware_usage_top:"):
|
|
top_line = line
|
|
if not avg_line and not top_line:
|
|
return []
|
|
wants_top = any(term in lowered for term in ("highest", "lowest", "most", "least", "top", "worst", "best"))
|
|
if wants_top and top_line:
|
|
return [top_line]
|
|
facts: list[str] = []
|
|
if avg_line:
|
|
facts.append(avg_line)
|
|
if top_line:
|
|
facts.append(top_line)
|
|
return facts
|
|
|
|
|
|
def _metric_candidate_lines(lines: list[str], keywords: list[str] | None, limit: int = 40) -> list[str]:
    """Collect summary lines that look like metric facts for the question.

    The first 'hottest:' line is expanded into per-metric facts up front.
    Remaining lines qualify by containing 'hottest:', matching a keyword, or
    containing a metric token plus a digit. Lexicon/units lines are always
    skipped, and pod-level lines are skipped when keywords point at nodes.
    """
    if not lines:
        return []
    lowered = [kw.lower() for kw in (keywords or []) if kw]
    prefer_node = any("node" in kw for kw in lowered) or "hottest" in lowered
    metric_tokens = {
        "cpu",
        "ram",
        "memory",
        "net",
        "network",
        "io",
        "disk",
        "load",
        "usage",
        "utilization",
        "hottest",
        "p95",
        "percent",
        "pressure",
    }
    candidates: list[str] = []
    for line in lines:
        if line.lower().startswith("hottest:"):
            candidates.extend(_expand_hottest_line(line))
            break
    for line in lines:
        text = line.lower()
        if text.startswith(("lexicon_", "units:")):
            continue
        if prefer_node and "pod_" in text:
            continue
        if "hottest:" in text:
            candidates.append(line)
        elif lowered and any(kw in text for kw in lowered):
            candidates.append(line)
        elif any(token in text for token in metric_tokens) and re.search(r"\d", text):
            candidates.append(line)
    return candidates[:limit]
|
|
|
|
|
|
async def _select_metric_facts(
    call_llm: Callable[..., Any],
    question: str,
    candidates: list[str],
    plan: ModePlan,
    max_lines: int = 2,
) -> list[str]:
    """Ask the fast model to choose up to ``max_lines`` fact lines.

    Only lines echoed back verbatim from ``candidates`` are kept, so the
    model cannot inject facts of its own.
    """
    if not candidates:
        return []
    bullet_list = "\n".join(f"- {line}" for line in candidates)
    prompt = (
        prompts.FACT_SELECT_PROMPT.format(max_lines=max_lines)
        + "\nQuestion: "
        + question
        + "\nCandidates:\n"
        + bullet_list
    )
    raw = await call_llm(prompts.FACT_SELECT_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
    data = _parse_json_block(raw, fallback={})
    lines = data.get("lines") if isinstance(data, dict) else None
    if not isinstance(lines, list):
        return []
    allowed = set(candidates)
    chosen: list[str] = []
    for line in lines:
        if isinstance(line, str) and line in allowed and line not in chosen:
            chosen.append(line)
            if len(chosen) >= max_lines:
                break
    return chosen
|
|
|
|
|
|
def _metric_fact_guard(reply: str, metric_facts: list[str], keywords: list[str]) -> str:
|
|
if not metric_facts:
|
|
return reply
|
|
best_line = None
|
|
lowered_keywords = [kw.lower() for kw in keywords if kw]
|
|
for line in metric_facts:
|
|
line_lower = line.lower()
|
|
if any(kw in line_lower for kw in lowered_keywords):
|
|
best_line = line
|
|
break
|
|
best_line = best_line or metric_facts[0]
|
|
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
|
|
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
|
|
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
|
|
return f"From the latest snapshot: {best_line}."
|
|
return reply
|
|
|
|
|
|
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
|
|
if not reply:
|
|
return reply
|
|
if not unknown_nodes and not unknown_namespaces:
|
|
return reply
|
|
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()]
|
|
if not sentences:
|
|
return reply
|
|
lowered_nodes = [node.lower() for node in unknown_nodes]
|
|
lowered_namespaces = [ns.lower() for ns in unknown_namespaces]
|
|
kept: list[str] = []
|
|
for sent in sentences:
|
|
lower = sent.lower()
|
|
if lowered_nodes and any(node in lower for node in lowered_nodes):
|
|
continue
|
|
if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces):
|
|
continue
|
|
kept.append(sent)
|
|
cleaned = " ".join(kept).strip()
|
|
return cleaned or reply
|
|
|
|
|
|
def _lexicon_context(summary: dict[str, Any]) -> str:
|
|
if not isinstance(summary, dict):
|
|
return ""
|
|
lexicon = summary.get("lexicon")
|
|
if not isinstance(lexicon, dict):
|
|
return ""
|
|
terms = lexicon.get("terms")
|
|
aliases = lexicon.get("aliases")
|
|
lines: list[str] = []
|
|
if isinstance(terms, list):
|
|
for entry in terms[:8]:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
term = entry.get("term")
|
|
meaning = entry.get("meaning")
|
|
if term and meaning:
|
|
lines.append(f"{term}: {meaning}")
|
|
if isinstance(aliases, dict):
|
|
for key, value in list(aliases.items())[:6]:
|
|
if key and value:
|
|
lines.append(f"alias {key} -> {value}")
|
|
if not lines:
|
|
return ""
|
|
return "Lexicon:\n" + "\n".join(lines)
|
|
|
|
|
|
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
    """Parse the first {...} span in ``text`` (or the whole text) as JSON."""
    stripped = text.strip()
    found = re.search(r"\{.*\}", stripped, flags=re.S)
    payload = found.group(0) if found else stripped
    return parse_json(payload, fallback=fallback)
|
|
|
|
|
|
def _parse_json_list(text: str) -> list[dict[str, Any]]:
    """Parse the first [...] span in ``text`` as a list, keeping only dicts."""
    stripped = text.strip()
    found = re.search(r"\[.*\]", stripped, flags=re.S)
    payload = found.group(0) if found else stripped
    parsed = parse_json(payload, fallback={})
    if not isinstance(parsed, list):
        return []
    return [entry for entry in parsed if isinstance(entry, dict)]
|
|
|
|
|
|
def _scores_from_json(data: dict[str, Any]) -> AnswerScores:
    """Build AnswerScores from an LLM JSON payload, defaulting missing fields."""
    risk = str(data.get("hallucination_risk") or "medium")
    return AnswerScores(
        confidence=_coerce_int(data.get("confidence"), 60),
        relevance=_coerce_int(data.get("relevance"), 60),
        satisfaction=_coerce_int(data.get("satisfaction"), 60),
        hallucination_risk=risk,
    )
|
|
|
|
|
|
def _coerce_int(value: Any, default: int) -> int:
|
|
try:
|
|
return int(float(value))
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _default_scores() -> AnswerScores:
    """Neutral fallback scores used when no usable scoring is available."""
    return AnswerScores(
        confidence=60,
        relevance=60,
        satisfaction=60,
        hallucination_risk="medium",
    )
|
|
|
|
|
|
def _style_hint(classify: dict[str, Any]) -> str:
|
|
style = (classify.get("answer_style") or "").strip().lower()
|
|
qtype = (classify.get("question_type") or "").strip().lower()
|
|
if style == "insightful" or qtype in {"open_ended", "planning"}:
|
|
return "insightful"
|
|
return "direct"
|
|
|
|
|
|
def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
|
|
if not reply:
|
|
return False
|
|
lowered = reply.lower()
|
|
missing_markers = (
|
|
"don't have",
|
|
"do not have",
|
|
"don't know",
|
|
"cannot",
|
|
"can't",
|
|
"need to",
|
|
"would need",
|
|
"does not provide",
|
|
"does not mention",
|
|
"not mention",
|
|
"not provided",
|
|
"not in context",
|
|
"not referenced",
|
|
"missing",
|
|
"no specific",
|
|
"no information",
|
|
)
|
|
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
|
|
return True
|
|
if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _needs_dedup(reply: str) -> bool:
|
|
if not reply:
|
|
return False
|
|
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()]
|
|
if len(sentences) < 3:
|
|
return False
|
|
seen = set()
|
|
for sent in sentences:
|
|
norm = re.sub(r"\\s+", " ", sent.lower())
|
|
if norm in seen:
|
|
return True
|
|
seen.add(norm)
|
|
return False
|
|
|
|
|
|
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
|
|
if not reply:
|
|
return False
|
|
q_lower = (question or "").lower()
|
|
if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
|
|
return False
|
|
if reply.count(".") <= 1:
|
|
return False
|
|
extra_markers = ("for more", "if you need", "additional", "based on")
|
|
return any(marker in reply.lower() for marker in extra_markers)
|
|
|
|
|
|
def _extract_keywords(
|
|
raw_question: str,
|
|
normalized: str,
|
|
sub_questions: list[str],
|
|
keywords: list[Any] | None,
|
|
) -> list[str]:
|
|
stopwords = {
|
|
"the",
|
|
"and",
|
|
"for",
|
|
"with",
|
|
"that",
|
|
"this",
|
|
"what",
|
|
"which",
|
|
"when",
|
|
"where",
|
|
"who",
|
|
"why",
|
|
"how",
|
|
"tell",
|
|
"show",
|
|
"list",
|
|
"give",
|
|
"about",
|
|
"right",
|
|
"now",
|
|
}
|
|
tokens: list[str] = []
|
|
for source in [raw_question, normalized, *sub_questions]:
|
|
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
|
|
if len(part) < 3 or part in stopwords:
|
|
continue
|
|
tokens.append(part)
|
|
if keywords:
|
|
for kw in keywords:
|
|
if isinstance(kw, str):
|
|
part = kw.strip().lower()
|
|
if part and part not in stopwords and part not in tokens:
|
|
tokens.append(part)
|
|
return list(dict.fromkeys(tokens))[:12]
|
|
|
|
|
|
def _focused_keywords(tokens: list[str]) -> list[str]:
|
|
generic = {
|
|
"atlas",
|
|
"cluster",
|
|
"node",
|
|
"nodes",
|
|
"pod",
|
|
"pods",
|
|
"namespace",
|
|
"namespaces",
|
|
"k8s",
|
|
"kubernetes",
|
|
"service",
|
|
"services",
|
|
"workload",
|
|
"workloads",
|
|
}
|
|
scored: list[tuple[int, str]] = []
|
|
for token in tokens:
|
|
if not token or token in generic:
|
|
continue
|
|
score = 1
|
|
if any(ch.isdigit() for ch in token):
|
|
score += 2
|
|
if "-" in token:
|
|
score += 1
|
|
if len(token) >= 6:
|
|
score += 1
|
|
scored.append((score, token))
|
|
if not scored:
|
|
return [token for token in tokens if token not in generic][:6]
|
|
scored.sort(key=lambda item: (-item[0], item[1]))
|
|
return [token for _, token in scored][:6]
|
|
|
|
|
|
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
|
|
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
|
|
if hardware:
|
|
return sorted([node for node in hardware.keys() if isinstance(node, str)])
|
|
return []
|
|
|
|
|
|
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
|
|
namespaces: list[str] = []
|
|
for entry in summary.get("namespace_pods") or []:
|
|
if isinstance(entry, dict):
|
|
name = entry.get("namespace")
|
|
if name:
|
|
namespaces.append(str(name))
|
|
return sorted(set(namespaces))
|
|
|
|
|
|
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
|
|
if not reply or not allowed:
|
|
return []
|
|
pattern = re.compile(r"\b(titan-[0-9a-z]+|node\d+)\b", re.IGNORECASE)
|
|
found = {m.group(1) for m in pattern.finditer(reply)}
|
|
if not found:
|
|
return []
|
|
allowed_set = {a.lower() for a in allowed}
|
|
return sorted({item for item in found if item.lower() not in allowed_set})
|
|
|
|
|
|
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
|
|
if not reply or not allowed:
|
|
return []
|
|
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
|
|
found = {m.group(1) for m in pattern.finditer(reply)}
|
|
if not found:
|
|
return []
|
|
allowed_set = {a.lower() for a in allowed}
|
|
return sorted({item for item in found if item.lower() not in allowed_set})
|
|
|
|
|
|
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
|
|
if not reply or not allowed:
|
|
return False
|
|
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
|
|
if not paths:
|
|
return False
|
|
allowed_set = {p.lower() for p in allowed}
|
|
return any(path.lower() not in allowed_set for path in paths)
|
|
|
|
|
|
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
|
|
if not allowed or not question:
|
|
return False
|
|
lowered = question.lower()
|
|
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
|
|
if not any(cue in lowered for cue in cues):
|
|
return False
|
|
if not reply:
|
|
return True
|
|
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
|
|
if token.lower() in {p.lower() for p in allowed}:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
|
|
if not candidate or not allowed:
|
|
return None
|
|
best = None
|
|
best_score = 0.0
|
|
for path in allowed:
|
|
score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
|
|
if score > best_score:
|
|
best_score = score
|
|
best = path
|
|
return best if best_score >= 0.4 else None
|
|
|
|
|
|
def _resolve_path(data: Any, path: str) -> Any | None:
    """Resolve a dotted path like ``a.b[0].c`` against nested dicts/lists.

    Returns None when any segment is missing, the cursor is not a dict at a
    key step, or a list index is absent or out of range.
    """
    cursor = data
    # Split on '.' except when the lookahead sees a ']' before the next '[' —
    # presumably to avoid splitting dots inside a [...] index segment; confirm
    # against the evidence-path producers before changing.
    for part in re.split(r"\.(?![^\[]*\])", path):
        if not part:
            continue
        # Each segment must be "key" or "key[index]"; anything else aborts.
        match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
        if not match:
            return None
        key = match.group(1)
        index = match.group(2)
        if isinstance(cursor, dict):
            cursor = cursor.get(key)
        else:
            # Key lookups are only supported on dicts.
            return None
        if index is not None:
            try:
                idx = int(index)
                if isinstance(cursor, list) and 0 <= idx < len(cursor):
                    cursor = cursor[idx]
                else:
                    return None
            except ValueError:
                # Defensive: unreachable in practice, the regex guarantees digits.
                return None
    return cursor
|
|
|
|
|
|
def _snapshot_id(summary: dict[str, Any]) -> str | None:
|
|
if not summary:
|
|
return None
|
|
for key in ("generated_at", "snapshot_ts", "snapshot_id"):
|
|
value = summary.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
return None
|
|
|
|
|
|
def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
    """Serialize claim items into JSON-ready dicts for persistence."""
    return [
        {
            "id": claim.id,
            "claim": claim.claim,
            "evidence": [
                {
                    "path": ev.path,
                    "reason": ev.reason,
                    "value_at_claim": ev.value_at_claim,
                }
                for ev in claim.evidence
            ],
        }
        for claim in claims
    ]
|
|
|
|
|
|
def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
    """Rehydrate a ConversationState from a persisted dict payload.

    Claims missing an id, claim text, or all usable evidence are dropped;
    evidence entries without a path are skipped.
    """
    if not payload:
        return None
    claims: list[ClaimItem] = []
    raw_claims = payload.get("claims") if isinstance(payload, dict) else None
    for entry in raw_claims if isinstance(raw_claims, list) else []:
        if not isinstance(entry, dict):
            continue
        claim_text = str(entry.get("claim") or "").strip()
        claim_id = str(entry.get("id") or "").strip()
        if not claim_text or not claim_id:
            continue
        evidence_items: list[EvidenceItem] = []
        for ev in entry.get("evidence") or []:
            if not isinstance(ev, dict):
                continue
            ev_path = str(ev.get("path") or "").strip()
            if not ev_path:
                continue
            evidence_items.append(
                EvidenceItem(
                    path=ev_path,
                    reason=str(ev.get("reason") or "").strip(),
                    value_at_claim=ev.get("value_at_claim"),
                )
            )
        if evidence_items:
            claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
    # NOTE(review): time.monotonic() as the updated_at fallback mixes a
    # monotonic clock with whatever clock produced stored values — confirm
    # this matches how ClaimStore interprets updated_at.
    return ConversationState(
        updated_at=float(payload.get("updated_at") or time.monotonic()),
        claims=claims,
        snapshot_id=payload.get("snapshot_id"),
        snapshot=payload.get("snapshot"),
    )
|
|
|
|
|
|
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
|
|
raw = json.dumps(summary, ensure_ascii=False)
|
|
return raw[:max_chars]
|