atlasbot/atlasbot/engine/answerer.py
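"""Answer engine for atlasbot: normalizes and routes questions, retrieves snapshot
evidence, synthesizes replies, and stores claims for follow-up questions."""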

import asyncio
import json
import logging
import math
import re
import time
import difflib
from dataclasses import dataclass
from typing import Any, Callable
from atlasbot.config import Settings
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore
log = logging.getLogger(__name__)
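# Raised by the call_llm wrapper in answer() once the per-question LLM call cap is
# reached; the pipeline catches it and returns whatever partial reply exists.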
class LLMLimitReached(RuntimeError):
pass
@dataclass
class AnswerScores:
confidence: int
relevance: int
satisfaction: int
hallucination_risk: str
@dataclass
class AnswerResult:
reply: str
scores: AnswerScores
meta: dict[str, Any]
@dataclass
class EvidenceItem:
path: str
reason: str
value: Any | None = None
value_at_claim: Any | None = None
@dataclass
class ClaimItem:
id: str
claim: str
evidence: list[EvidenceItem]
@dataclass
class ConversationState:
updated_at: float
claims: list[ClaimItem]
snapshot_id: str | None = None
snapshot: dict[str, Any] | None = None
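# ModePlan holds the per-mode knobs resolved by _mode_plan(): which models to use,
# how many sub-questions and chunks to consider, and which optional passes
# (tool hints, critic, gap check, scoring, multiple drafts) are enabled.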
@dataclass
class ModePlan:
model: str
fast_model: str
max_subquestions: int
chunk_lines: int
chunk_top: int
chunk_group: int
use_tool: bool
use_critic: bool
use_gap: bool
use_scores: bool
drafts: int
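# AnswerEngine drives the full pipeline: normalize -> route -> decompose ->
# retrieve snapshot chunks -> draft subanswers -> synthesize -> repair/guard
# passes -> score, then extract claims so follow-ups can reuse the evidence.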
class AnswerEngine:
def __init__(
self,
settings: Settings,
llm: LLMClient,
kb: KnowledgeBase,
snapshot: SnapshotProvider,
) -> None:
self._settings = settings
self._llm = llm
self._kb = kb
self._snapshot = snapshot
self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)
async def answer(
self,
question: str,
*,
mode: str,
history: list[dict[str, str]] | None = None,
observer: Callable[[str, str], None] | None = None,
conversation_id: str | None = None,
) -> AnswerResult:
question = (question or "").strip()
if not question:
return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode})
if mode == "stock":
return await self._answer_stock(question)
limitless = "run limitless" in question.lower()
if limitless:
question = re.sub(r"(?i)run limitless", "", question).strip()
plan = _mode_plan(self._settings, mode)
call_limit = _llm_call_limit(self._settings, mode)
call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier)
call_count = 0
limit_hit = False
debug_tags = {
"route",
"decompose",
"chunk_score",
"fact_select",
"synth",
"subanswer",
"tool",
"followup",
"select_claims",
"evidence_fix",
}
def _debug_log(name: str, payload: Any) -> None:
if not self._settings.debug_pipeline:
return
log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})
async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
nonlocal call_count, limit_hit
if not limitless and call_count >= call_cap:
limit_hit = True
raise LLMLimitReached("llm_limit")
call_count += 1
messages = build_messages(system, prompt, context=context)
response = await self._llm.chat(messages, model=model or plan.model)
log.info(
"atlasbot_llm_call",
extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
)
if self._settings.debug_pipeline and tag in debug_tags:
_debug_log(f"llm_raw_{tag}", str(response)[:1200])
return response
state = self._get_state(conversation_id)
snapshot = self._snapshot.get()
snapshot_used = snapshot
if self._settings.snapshot_pin_enabled and state and state.snapshot:
snapshot_used = state.snapshot
summary = build_summary(snapshot_used)
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=6)
runbook_paths = self._kb.runbook_paths(limit=10)
history_ctx = _format_history(history)
lexicon_ctx = _lexicon_context(summary)
key_facts: list[str] = []
metric_facts: list[str] = []
started = time.monotonic()
reply = ""
scores = _default_scores()
claims: list[ClaimItem] = []
classify: dict[str, Any] = {}
tool_hint: dict[str, Any] | None = None
try:
if observer:
observer("normalize", "normalizing")
normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
normalize_raw = await call_llm(
prompts.NORMALIZE_SYSTEM,
normalize_prompt,
context=lexicon_ctx,
model=plan.fast_model,
tag="normalize",
)
normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
normalized = str(normalize.get("normalized") or question).strip() or question
keywords = normalize.get("keywords") or []
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords)
if observer:
observer("route", "routing")
route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
route_raw = await call_llm(
prompts.ROUTE_SYSTEM,
route_prompt,
context=_join_context([kb_summary, lexicon_ctx]),
model=plan.fast_model,
tag="route",
)
classify = _parse_json_block(route_raw, fallback={})
classify.setdefault("needs_snapshot", True)
classify.setdefault("answer_style", "direct")
classify.setdefault("follow_up", False)
_debug_log("route_parsed", {"classify": classify, "normalized": normalized})
cluster_terms = (
"atlas",
"cluster",
"node",
"nodes",
"namespace",
"pod",
"workload",
"k8s",
"kubernetes",
"postgres",
"database",
"db",
"connections",
"cpu",
"ram",
"memory",
"network",
"io",
"disk",
"pvc",
"storage",
)
if any(term in normalized.lower() for term in cluster_terms):
classify["needs_snapshot"] = True
if re.search(r"\b(how many|count|number of|list)\b", normalized.lower()):
classify["question_type"] = "metric"
hottest_terms = ("hottest", "highest", "lowest", "most")
metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage")
lowered_question = normalized.lower()
if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms):
classify["question_type"] = "metric"
if classify.get("follow_up") and state and state.claims:
if observer:
observer("followup", "answering follow-up")
reply = await self._answer_followup(question, state, summary, classify, plan, call_llm)
scores = await self._score_answer(question, reply, plan, call_llm)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
if observer:
observer("decompose", "decomposing")
decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2)
decompose_raw = await call_llm(
prompts.DECOMPOSE_SYSTEM,
decompose_prompt + "\nQuestion: " + normalized,
context=lexicon_ctx,
model=plan.fast_model if mode == "quick" else plan.model,
tag="decompose",
)
parts = _parse_json_list(decompose_raw)
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords)
snapshot_context = ""
if classify.get("needs_snapshot"):
if observer:
observer("retrieve", "scoring chunks")
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan, keyword_tokens)
key_facts = _key_fact_lines(summary_lines, keyword_tokens)
hottest_facts = _extract_hottest_facts(summary_lines, f"{question} {normalized}")
hardware_facts = _extract_hardware_usage_facts(summary_lines, f"{question} {normalized}")
metric_facts = [line for line in key_facts if re.search(r"\d", line)]
if hardware_facts:
metric_facts = hardware_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if hottest_facts and not hardware_facts:
metric_facts = hottest_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if classify.get("question_type") in {"metric", "diagnostic"} and not hottest_facts and not hardware_facts:
metric_candidates = _metric_candidate_lines(summary_lines, keyword_tokens)
selected_facts = await _select_metric_facts(call_llm, normalized, metric_candidates, plan)
if selected_facts:
metric_facts = selected_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if self._settings.debug_pipeline:
_debug_log("metric_facts_selected", {"facts": metric_facts})
if self._settings.debug_pipeline:
scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
key=lambda item: item["score"],
reverse=True,
)[: min(len(chunks), max(plan.chunk_top, 6))]
_debug_log(
"chunk_selected",
{
"selected_ids": [item["id"] for item in selected],
"top_scored": scored_preview,
},
)
snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
if key_facts:
snapshot_context = "KeyFacts:\n" + "\n".join(key_facts) + "\n\n" + snapshot_context
context = _join_context(
[kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
)
if plan.use_tool and classify.get("needs_tool"):
if observer:
observer("tool", "suggesting tools")
tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized
tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool")
tool_hint = _parse_json_block(tool_raw, fallback={})
if observer:
observer("subanswers", "drafting subanswers")
subanswers: list[str] = []
for subq in sub_questions:
sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq
sub_answer = await call_llm(prompts.ANSWER_SYSTEM, sub_prompt, context=context, model=plan.model, tag="subanswer")
subanswers.append(sub_answer)
if observer:
observer("synthesize", "synthesizing")
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
needs_evidence = _needs_evidence_fix(reply, classify)
resolved_runbook = None
if runbook_paths and (runbook_fix or runbook_needed):
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if candidate and candidate in runbook_paths:
resolved_runbook = candidate
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
if observer:
observer("evidence_fix", "repairing missing evidence")
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if runbook_paths:
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
if resolved_runbook:
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
if metric_facts:
extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4]))
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context=context,
model=plan.model,
tag="evidence_fix",
)
if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply):
if observer:
observer("runbook_enforce", "enforcing runbook path")
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
if runbook_paths:
invalid = [
token
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)
if token.lower() not in {p.lower() for p in runbook_paths}
]
if invalid:
if observer:
observer("runbook_enforce", "replacing invalid runbook path")
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if not (candidate and candidate in runbook_paths):
candidate = _best_runbook_match(invalid[0], runbook_paths)
if candidate and candidate in runbook_paths:
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
reply = await call_llm(
prompts.RUNBOOK_ENFORCE_SYSTEM,
enforce_prompt + "\nAnswer: " + reply,
context=context,
model=plan.model,
tag="runbook_enforce",
)
reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces)
if _needs_focus_fix(normalized, reply, classify):
if observer:
observer("focus_fix", "tightening answer")
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply,
context=context,
model=plan.model,
tag="focus_fix",
)
if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts:
best_line = None
lowered_keywords = [kw.lower() for kw in keyword_tokens if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
reply = f"From the latest snapshot: {best_line}."
if plan.use_critic:
if observer:
observer("critic", "reviewing")
critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic")
critic = _parse_json_block(critic_raw, fallback={})
if critic.get("issues"):
revise_prompt = (
prompts.REVISION_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ "\nCritique: "
+ json.dumps(critic)
)
reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise")
if plan.use_gap:
if observer:
observer("gap", "checking gaps")
gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply
gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap")
gap = _parse_json_block(gap_raw, fallback={})
note = str(gap.get("note") or "").strip()
if note:
reply = f"{reply}\n\n{note}"
if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts:
reply = _metric_fact_guard(reply, metric_facts, keyword_tokens)
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")
scores = await self._score_answer(normalized, reply, plan, call_llm)
claims = await self._extract_claims(normalized, reply, summary, call_llm)
except LLMLimitReached:
if not reply:
reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass."
scores = _default_scores()
finally:
elapsed = round(time.monotonic() - started, 2)
log.info(
"atlasbot_answer",
extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}},
)
if conversation_id and claims:
self._store_state(conversation_id, claims, summary, snapshot_used)
meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started)
return AnswerResult(reply, scores, meta)
async def _answer_stock(self, question: str) -> AnswerResult:
messages = build_messages(prompts.STOCK_SYSTEM, question)
reply = await self._llm.chat(messages, model=self._settings.ollama_model)
return AnswerResult(reply, _default_scores(), {"mode": "stock"})
async def _synthesize_answer(
self,
question: str,
subanswers: list[str],
context: str,
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
style_hint = _style_hint(classify)
if not subanswers:
prompt = (
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
)
return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")
draft_prompts = []
for idx in range(plan.drafts):
draft_prompts.append(
prompts.SYNTHESIZE_PROMPT
+ "\nQuestion: "
+ question
+ "\nStyle: "
+ style_hint
+ "\nQuestionType: "
+ (classify.get("question_type") or "unknown")
+ "\nSubanswers:\n"
+ "\n".join([f"- {item}" for item in subanswers])
+ f"\nDraftIndex: {idx + 1}"
)
drafts: list[str] = []
for prompt in draft_prompts:
drafts.append(await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth"))
if len(drafts) == 1:
return drafts[0]
select_prompt = (
prompts.DRAFT_SELECT_PROMPT
+ "\nQuestion: "
+ question
+ "\nDrafts:\n"
+ "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)])
)
select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select")
selection = _parse_json_block(select_raw, fallback={})
idx = _coerce_int(selection.get("best"), 1) - 1
if 0 <= idx < len(drafts):
return drafts[idx]
return drafts[0]
async def _score_answer(
self,
question: str,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
) -> AnswerScores:
if not plan.use_scores:
return _default_scores()
prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score")
data = _parse_json_block(raw, fallback={})
return _scores_from_json(data)
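# Claims and their supporting snapshot paths are extracted after each answer so a
# later follow-up can be grounded in pinned evidence instead of a fresh pass.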
async def _extract_claims(
self,
question: str,
reply: str,
summary: dict[str, Any],
call_llm: Callable[..., Any],
) -> list[ClaimItem]:
if not reply or not summary:
return []
summary_json = _json_excerpt(summary)
prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply
raw = await call_llm(prompts.CLAIM_SYSTEM, prompt, context=f"SnapshotSummaryJson:{summary_json}", model=self._settings.ollama_model_fast, tag="claim_map")
data = _parse_json_block(raw, fallback={})
claims_raw = data.get("claims") if isinstance(data, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}"
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value = _resolve_path(summary, path)
evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value))
if claim_text and evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return claims
async def _dedup_reply(
self,
reply: str,
plan: ModePlan,
call_llm: Callable[..., Any],
tag: str,
) -> str:
if not _needs_dedup(reply):
return reply
dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply
return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag)
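# Follow-ups reuse stored claims: relevant claims are selected, their evidence
# paths are re-resolved against the current summary, and changed values are
# annotated with "(now ...)" before drafting and repairing the reply.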
async def _answer_followup(
self,
question: str,
state: ConversationState,
summary: dict[str, Any],
classify: dict[str, Any],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> str:
claim_ids = await self._select_claims(question, state.claims, plan, call_llm)
selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2]
evidence_lines = []
for claim in selected:
evidence_lines.append(f"Claim: {claim.claim}")
for ev in claim.evidence:
current = _resolve_path(summary, ev.path)
ev.value = current
delta_note = ""
if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim:
delta_note = f" (now {current})"
evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}")
evidence_ctx = "\n".join(evidence_lines)
prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx
reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup")
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
if extra_bits:
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ question
+ "\nDraft: "
+ reply
+ "\n"
+ "\n".join(extra_bits)
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
fix_prompt,
context="Evidence:\n" + evidence_ctx,
model=plan.model,
tag="followup_fix",
)
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup")
return reply
async def _select_claims(
self,
question: str,
claims: list[ClaimItem],
plan: ModePlan,
call_llm: Callable[..., Any],
) -> list[str]:
if not claims:
return []
claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims]
prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief)
raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims")
data = _parse_json_block(raw, fallback={})
ids = data.get("claim_ids") if isinstance(data, dict) else []
if isinstance(ids, list):
return [str(item) for item in ids if item]
return []
def _get_state(self, conversation_id: str | None) -> ConversationState | None:
if not conversation_id:
return None
state_payload = self._store.get(conversation_id)
return _state_from_payload(state_payload) if state_payload else None
def _store_state(
self,
conversation_id: str,
claims: list[ClaimItem],
summary: dict[str, Any],
snapshot: dict[str, Any] | None,
) -> None:
snapshot_id = _snapshot_id(summary)
pinned_snapshot = snapshot if self._settings.snapshot_pin_enabled else None
payload = {
"updated_at": time.monotonic(),
"claims": _claims_to_payload(claims),
"snapshot_id": snapshot_id,
"snapshot": pinned_snapshot,
}
self._store.set(conversation_id, payload)
def _cleanup_state(self) -> None:
self._store.cleanup()
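# Typical wiring (sketch only; the surrounding app is assumed to construct the
# dependencies and run this inside an event loop):
#   engine = AnswerEngine(settings, llm, kb, snapshot_provider)
#   result = await engine.answer("which node is hottest?", mode="smart",
#                                conversation_id="chat-123")
#   print(result.reply, result.scores.confidence)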
def _build_meta(
mode: str,
call_count: int,
call_cap: int,
limit_hit: bool,
classify: dict[str, Any],
tool_hint: dict[str, Any] | None,
started: float,
) -> dict[str, Any]:
return {
"mode": mode,
"llm_calls": call_count,
"llm_limit": call_cap,
"llm_limit_hit": limit_hit,
"classify": classify,
"tool_hint": tool_hint,
"elapsed_sec": round(time.monotonic() - started, 2),
}
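# Mode plans: "genius" pairs the largest model with more sub-questions and two
# synthesis drafts, "smart" is the balanced default, and any other mode falls back
# to the fast single-pass plan with tool/critic/gap/scoring disabled.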
def _mode_plan(settings: Settings, mode: str) -> ModePlan:
if mode == "genius":
return ModePlan(
model=settings.ollama_model_genius,
fast_model=settings.ollama_model_fast,
max_subquestions=6,
chunk_lines=6,
chunk_top=10,
chunk_group=4,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=2,
)
if mode == "smart":
return ModePlan(
model=settings.ollama_model_smart,
fast_model=settings.ollama_model_fast,
max_subquestions=4,
chunk_lines=8,
chunk_top=8,
chunk_group=4,
use_tool=True,
use_critic=True,
use_gap=True,
use_scores=True,
drafts=1,
)
return ModePlan(
model=settings.ollama_model_fast,
fast_model=settings.ollama_model_fast,
max_subquestions=2,
chunk_lines=12,
chunk_top=5,
chunk_group=5,
use_tool=False,
use_critic=False,
use_gap=False,
use_scores=False,
drafts=1,
)
def _llm_call_limit(settings: Settings, mode: str) -> int:
if mode == "genius":
return settings.genius_llm_calls_max
if mode == "smart":
return settings.smart_llm_calls_max
return settings.fast_llm_calls_max
def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]:
if not parts:
return [fallback]
ranked = []
for entry in parts:
if not isinstance(entry, dict):
continue
question = str(entry.get("question") or "").strip()
if not question:
continue
priority = entry.get("priority")
try:
weight = float(priority)
except (TypeError, ValueError):
weight = 1.0
ranked.append((weight, question))
ranked.sort(key=lambda item: item[0], reverse=True)
questions = [item[1] for item in ranked][:limit]
return questions or [fallback]
def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]:
chunks: list[dict[str, Any]] = []
if not lines:
return chunks
for idx in range(0, len(lines), lines_per_chunk):
chunk_lines = lines[idx : idx + lines_per_chunk]
text = "\n".join(chunk_lines)
summary = " | ".join(chunk_lines[:4])
chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
return chunks
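# Snapshot chunks are scored in small batches (plan.chunk_group summaries per LLM
# call) to keep prompt size and call count bounded.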
async def _score_chunks(
call_llm: Callable[..., Any],
chunks: list[dict[str, Any]],
question: str,
sub_questions: list[str],
plan: ModePlan,
) -> dict[str, float]:
scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks}
if not chunks:
return scores
group: list[dict[str, Any]] = []
for chunk in chunks:
group.append({"id": chunk["id"], "summary": chunk["summary"]})
if len(group) >= plan.chunk_group:
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
group = []
if group:
scores.update(await _score_chunk_group(call_llm, group, question, sub_questions))
return scores
async def _score_chunk_group(
call_llm: Callable[..., Any],
group: list[dict[str, Any]],
question: str,
sub_questions: list[str],
) -> dict[str, float]:
prompt = (
prompts.CHUNK_SCORE_PROMPT
+ "\nQuestion: "
+ question
+ "\nSubQuestions: "
+ json.dumps(sub_questions)
+ "\nChunks: "
+ json.dumps(group)
)
raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score")
data = _parse_json_list(raw)
scored: dict[str, float] = {}
for entry in data:
if not isinstance(entry, dict):
continue
cid = str(entry.get("id") or "").strip()
if not cid:
continue
try:
score = float(entry.get("score") or 0)
except (TypeError, ValueError):
score = 0.0
scored[cid] = score
return scored
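# Chunk selection always keeps the first summary chunk, then prefers chunks that
# match focused or raw keywords, and finally fills the remaining slots with the
# highest-scored chunks up to plan.chunk_top.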
def _select_chunks(
chunks: list[dict[str, Any]],
scores: dict[str, float],
plan: ModePlan,
keywords: list[str] | None = None,
) -> list[dict[str, Any]]:
if not chunks:
return []
ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True)
selected: list[dict[str, Any]] = []
head = chunks[0]
selected.append(head)
keyword_hits: list[dict[str, Any]] = []
raw_keywords = [kw.lower() for kw in (keywords or []) if kw]
focused = _focused_keywords(keywords or [])
if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked:
text = item.get("text", "").lower()
if any(kw in text for kw in lowered):
keyword_hits.append(item)
if raw_keywords:
for item in ranked:
if len(keyword_hits) >= plan.chunk_top:
break
text = item.get("text", "").lower()
if any(kw in text for kw in raw_keywords):
keyword_hits.append(item)
for item in keyword_hits:
if len(selected) >= plan.chunk_top:
break
if item in selected:
continue
selected.append(item)
for item in ranked:
if len(selected) >= plan.chunk_top:
break
if item is head:
continue
selected.append(item)
return selected
def _format_runbooks(runbooks: list[str]) -> str:
if not runbooks:
return ""
return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks])
def _join_context(parts: list[str]) -> str:
text = "\n".join([part for part in parts if part])
return text.strip()
def _format_history(history: list[dict[str, str]] | None) -> str:
if not history:
return ""
lines = ["Recent conversation (non-authoritative):"]
for entry in history[-4:]:
if not isinstance(entry, dict):
continue
question = entry.get("q")
answer = entry.get("a")
role = entry.get("role")
content = entry.get("content")
if question:
lines.append(f"Q: {question}")
if answer:
lines.append(f"A: {answer}")
if role and content:
prefix = "Q" if role == "user" else "A"
lines.append(f"{prefix}: {content}")
return "\n".join(lines)
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
text = summary_text(snapshot)
if not text:
return []
return [line for line in text.splitlines() if line.strip()]
def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]:
if not lines or not keywords:
return []
lowered = [kw.lower() for kw in keywords if kw]
if not lowered:
return []
focused = _focused_keywords(lowered)
primary = focused or lowered
matches: list[str] = []
for line in lines:
line_lower = line.lower()
if any(kw in line_lower for kw in primary):
matches.append(line)
if len(matches) >= limit:
break
if len(matches) < limit and focused:
for line in lines:
if len(matches) >= limit:
break
if line in matches:
continue
line_lower = line.lower()
if any(kw in line_lower for kw in lowered):
matches.append(line)
return matches
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
seen = set()
merged: list[str] = []
for line in primary + fallback:
if line in seen:
continue
seen.add(line)
merged.append(line)
return merged
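# "hottest:" summary lines pack several metrics into one line (illustrative shape:
# "hottest: cpu=node-a (82%); ram=node-b (71%)"); each entry is expanded into a
# separate "hottest_<metric>_node: <node> (<value>)" fact line.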
def _expand_hottest_line(line: str) -> list[str]:
if not line:
return []
if not line.lower().startswith("hottest:"):
return []
expanded: list[str] = []
payload = line.split("hottest:", 1)[1]
for part in payload.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
metric, rest = part.split("=", 1)
metric = metric.strip()
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
if not match:
continue
node = match.group("node").strip()
value = match.group("value").strip()
expanded.append(f"hottest_{metric}_node: {node} ({value})")
return expanded
def _extract_hottest_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if "hardware" in lowered or "class" in lowered:
return []
if not any(
term in lowered
for term in ("hottest", "hot", "highest", "lowest", "most", "top", "peak", "loaded", "load", "busy")
):
return []
if "node" not in lowered and "nodes" not in lowered:
return []
line = next((item for item in lines if item.lower().startswith("hottest:")), "")
if not line:
return []
facts = _expand_hottest_line(line)
if not facts:
return []
wanted = []
if any(term in lowered for term in ("cpu", "processor")):
wanted.append("hottest_cpu_node")
if any(term in lowered for term in ("ram", "memory")):
wanted.append("hottest_ram_node")
if any(term in lowered for term in ("net", "network", "throughput")):
wanted.append("hottest_net_node")
if "io" in lowered or "i/o" in lowered:
wanted.append("hottest_io_node")
if "disk" in lowered or "storage" in lowered:
wanted.append("hottest_disk_node")
if not wanted:
return facts
return [fact for fact in facts if any(label in fact for label in wanted)] or facts
def _extract_hardware_usage_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if "hardware" not in lowered:
return []
if not any(term in lowered for term in ("average", "avg", "mean", "load", "cpu", "ram", "memory")):
return []
for line in lines:
if line.startswith("hardware_usage_avg:"):
return [line]
return []
def _metric_candidate_lines(lines: list[str], keywords: list[str] | None, limit: int = 40) -> list[str]:
if not lines:
return []
lowered = [kw.lower() for kw in (keywords or []) if kw]
prefer_node = any("node" in kw for kw in lowered) or "hottest" in lowered
metric_tokens = {
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"load",
"usage",
"utilization",
"hottest",
"p95",
"percent",
"pressure",
}
candidates: list[str] = []
for line in lines:
if line.lower().startswith("hottest:"):
candidates.extend(_expand_hottest_line(line))
break
for line in lines:
line_lower = line.lower()
if line_lower.startswith("lexicon_") or line_lower.startswith("units:"):
continue
if prefer_node and "pod_" in line_lower:
continue
if "hottest:" in line_lower:
candidates.append(line)
continue
if lowered and any(kw in line_lower for kw in lowered):
candidates.append(line)
continue
if any(token in line_lower for token in metric_tokens) and re.search(r"\d", line_lower):
candidates.append(line)
continue
return candidates[:limit]
async def _select_metric_facts(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
max_lines: int = 2,
) -> list[str]:
if not candidates:
return []
prompt = (
prompts.FACT_SELECT_PROMPT.format(max_lines=max_lines)
+ "\nQuestion: "
+ question
+ "\nCandidates:\n"
+ "\n".join([f"- {line}" for line in candidates])
)
raw = await call_llm(prompts.FACT_SELECT_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
data = _parse_json_block(raw, fallback={})
lines = data.get("lines") if isinstance(data, dict) else None
if not isinstance(lines, list):
return []
cleaned = []
allowed = set(candidates)
for line in lines:
if isinstance(line, str) and line in allowed and line not in cleaned:
cleaned.append(line)
if len(cleaned) >= max_lines:
break
return cleaned
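# Guard for metric/diagnostic answers: if the reply cites no number that also
# appears in the selected metric facts, it is replaced by a single sentence
# quoting the best-matching fact from the snapshot.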
def _metric_fact_guard(reply: str, metric_facts: list[str], keywords: list[str]) -> str:
if not metric_facts:
return reply
best_line = None
lowered_keywords = [kw.lower() for kw in keywords if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
return f"From the latest snapshot: {best_line}."
return reply
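# Sentences that mention nodes or namespaces absent from the current snapshot are
# dropped; if that would empty the reply, the original text is kept.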
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
if not reply:
return reply
if not unknown_nodes and not unknown_namespaces:
return reply
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
if not sentences:
return reply
lowered_nodes = [node.lower() for node in unknown_nodes]
lowered_namespaces = [ns.lower() for ns in unknown_namespaces]
kept: list[str] = []
for sent in sentences:
lower = sent.lower()
if lowered_nodes and any(node in lower for node in lowered_nodes):
continue
if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces):
continue
kept.append(sent)
cleaned = " ".join(kept).strip()
return cleaned or reply
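# The snapshot lexicon (term definitions and aliases) is surfaced to the
# normalize, route, and decompose prompts so domain shorthand resolves consistently.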
def _lexicon_context(summary: dict[str, Any]) -> str:
if not isinstance(summary, dict):
return ""
lexicon = summary.get("lexicon")
if not isinstance(lexicon, dict):
return ""
terms = lexicon.get("terms")
aliases = lexicon.get("aliases")
lines: list[str] = []
if isinstance(terms, list):
for entry in terms[:8]:
if not isinstance(entry, dict):
continue
term = entry.get("term")
meaning = entry.get("meaning")
if term and meaning:
lines.append(f"{term}: {meaning}")
if isinstance(aliases, dict):
for key, value in list(aliases.items())[:6]:
if key and value:
lines.append(f"alias {key} -> {value}")
if not lines:
return ""
return "Lexicon:\n" + "\n".join(lines)
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
raw = text.strip()
match = re.search(r"\{.*\}", raw, flags=re.S)
if match:
return parse_json(match.group(0), fallback=fallback)
return parse_json(raw, fallback=fallback)
def _parse_json_list(text: str) -> list[dict[str, Any]]:
raw = text.strip()
match = re.search(r"\[.*\]", raw, flags=re.S)
data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={})
if isinstance(data, list):
return [entry for entry in data if isinstance(entry, dict)]
return []
def _scores_from_json(data: dict[str, Any]) -> AnswerScores:
return AnswerScores(
confidence=_coerce_int(data.get("confidence"), 60),
relevance=_coerce_int(data.get("relevance"), 60),
satisfaction=_coerce_int(data.get("satisfaction"), 60),
hallucination_risk=str(data.get("hallucination_risk") or "medium"),
)
def _coerce_int(value: Any, default: int) -> int:
try:
return int(float(value))
except (TypeError, ValueError):
return default
def _default_scores() -> AnswerScores:
return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium")
def _style_hint(classify: dict[str, Any]) -> str:
style = (classify.get("answer_style") or "").strip().lower()
qtype = (classify.get("question_type") or "").strip().lower()
if style == "insightful" or qtype in {"open_ended", "planning"}:
return "insightful"
return "direct"
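# An evidence-fix pass triggers when a snapshot-backed draft hedges with
# "don't have" / "not provided" style markers, or when a metric/diagnostic answer
# contains no digits at all.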
def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
lowered = reply.lower()
missing_markers = (
"don't have",
"do not have",
"don't know",
"cannot",
"can't",
"need to",
"would need",
"does not provide",
"does not mention",
"not mention",
"not provided",
"not in context",
"not referenced",
"missing",
"no specific",
"no information",
)
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
return True
if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply):
return True
return False
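# Dedup only runs when a reply of three or more sentences repeats a sentence
# verbatim after whitespace normalization.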
def _needs_dedup(reply: str) -> bool:
if not reply:
return False
sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", reply) if s.strip()]
if len(sentences) < 3:
return False
seen = set()
for sent in sentences:
norm = re.sub(r"\s+", " ", sent.lower())
if norm in seen:
return True
seen.add(norm)
return False
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
if not reply:
return False
q_lower = (question or "").lower()
if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
return False
if reply.count(".") <= 1:
return False
extra_markers = ("for more", "if you need", "additional", "based on")
return any(marker in reply.lower() for marker in extra_markers)
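# Keyword extraction merges tokens from the raw question, normalized question,
# sub-questions, and LLM-suggested keywords, drops stopwords and short tokens, and
# caps the result at 12 unique entries.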
def _extract_keywords(
raw_question: str,
normalized: str,
sub_questions: list[str],
keywords: list[Any] | None,
) -> list[str]:
stopwords = {
"the",
"and",
"for",
"with",
"that",
"this",
"what",
"which",
"when",
"where",
"who",
"why",
"how",
"tell",
"show",
"list",
"give",
"about",
"right",
"now",
}
tokens: list[str] = []
for source in [raw_question, normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < 3 or part in stopwords:
continue
tokens.append(part)
if keywords:
for kw in keywords:
if isinstance(kw, str):
part = kw.strip().lower()
if part and part not in stopwords and part not in tokens:
tokens.append(part)
return list(dict.fromkeys(tokens))[:12]
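# Focused keywords drop generic cluster vocabulary and score the rest, preferring
# tokens with digits, hyphens, or longer length so retrieval targets named entities.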
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware:
return sorted([node for node in hardware.keys() if isinstance(node, str)])
return []
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
namespaces: list[str] = []
for entry in summary.get("namespace_pods") or []:
if isinstance(entry, dict):
name = entry.get("namespace")
if name:
namespaces.append(str(name))
return sorted(set(namespaces))
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\b(titan-[0-9a-z]+|node\d+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
if not reply or not allowed:
return False
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
if not paths:
return False
allowed_set = {p.lower() for p in allowed}
return any(path.lower() not in allowed_set for path in paths)
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
if not candidate or not allowed:
return None
best = None
best_score = 0.0
for path in allowed:
score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
if score > best_score:
best_score = score
best = path
return best if best_score >= 0.4 else None
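# Resolves a dotted evidence path (e.g. "namespace_pods[0].namespace") against the
# snapshot summary; dots inside [...] are not treated as separators and each
# segment may carry an optional numeric index.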
def _resolve_path(data: Any, path: str) -> Any | None:
cursor = data
for part in re.split(r"\.(?![^\[]*\])", path):
if not part:
continue
match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
if not match:
return None
key = match.group(1)
index = match.group(2)
if isinstance(cursor, dict):
cursor = cursor.get(key)
else:
return None
if index is not None:
try:
idx = int(index)
if isinstance(cursor, list) and 0 <= idx < len(cursor):
cursor = cursor[idx]
else:
return None
except ValueError:
return None
return cursor
def _snapshot_id(summary: dict[str, Any]) -> str | None:
if not summary:
return None
for key in ("generated_at", "snapshot_ts", "snapshot_id"):
value = summary.get(key)
if isinstance(value, str) and value:
return value
return None
def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for claim in claims:
evidence = []
for ev in claim.evidence:
evidence.append(
{
"path": ev.path,
"reason": ev.reason,
"value_at_claim": ev.value_at_claim,
}
)
output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
return output
def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
if not payload:
return None
claims_raw = payload.get("claims") if isinstance(payload, dict) else None
claims: list[ClaimItem] = []
if isinstance(claims_raw, list):
for entry in claims_raw:
if not isinstance(entry, dict):
continue
claim_text = str(entry.get("claim") or "").strip()
claim_id = str(entry.get("id") or "").strip()
if not claim_text or not claim_id:
continue
evidence_items: list[EvidenceItem] = []
for ev in entry.get("evidence") or []:
if not isinstance(ev, dict):
continue
path = str(ev.get("path") or "").strip()
if not path:
continue
reason = str(ev.get("reason") or "").strip()
value_at_claim = ev.get("value_at_claim")
evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim))
if evidence_items:
claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
return ConversationState(
updated_at=float(payload.get("updated_at") or time.monotonic()),
claims=claims,
snapshot_id=payload.get("snapshot_id"),
snapshot=payload.get("snapshot"),
)
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
raw = json.dumps(summary, ensure_ascii=False)
return raw[:max_chars]