import asyncio import json import logging import math import re import time import difflib from dataclasses import dataclass from typing import Any, Callable from atlasbot.config import Settings from atlasbot.knowledge.loader import KnowledgeBase from atlasbot.llm.client import LLMClient, build_messages, parse_json from atlasbot.llm import prompts from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text from atlasbot.state.store import ClaimStore log = logging.getLogger(__name__) FOLLOWUP_SHORT_WORDS = 6 NS_ENTRY_MIN_LEN = 2 DEDUP_MIN_SENTENCES = 3 TOKEN_MIN_LEN = 3 RUNBOOK_SIMILARITY_THRESHOLD = 0.4 class LLMLimitReached(RuntimeError): pass @dataclass class AnswerScores: confidence: int relevance: int satisfaction: int hallucination_risk: str @dataclass class AnswerResult: reply: str scores: AnswerScores meta: dict[str, Any] @dataclass class EvidenceItem: path: str reason: str value: Any | None = None value_at_claim: Any | None = None @dataclass class ClaimItem: id: str claim: str evidence: list[EvidenceItem] @dataclass class ConversationState: updated_at: float claims: list[ClaimItem] snapshot_id: str | None = None snapshot: dict[str, Any] | None = None @dataclass class ModePlan: model: str fast_model: str max_subquestions: int chunk_lines: int chunk_top: int chunk_group: int use_tool: bool use_critic: bool use_gap: bool use_scores: bool drafts: int metric_retries: int subanswer_retries: int class AnswerEngine: def __init__( self, settings: Settings, llm: LLMClient, kb: KnowledgeBase, snapshot: SnapshotProvider, ) -> None: self._settings = settings self._llm = llm self._kb = kb self._snapshot = snapshot self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec) async def answer( # noqa: C901, PLR0912, PLR0913, PLR0915 self, question: str, *, mode: str, history: list[dict[str, str]] | None = None, observer: Callable[[str, str], None] | None = None, conversation_id: str | None = None, snapshot_pin: bool | None = None, ) -> AnswerResult: question = (question or "").strip() if not question: return AnswerResult("I need a question to answer.", _default_scores(), {"mode": mode}) if mode == "stock": return await self._answer_stock(question) limitless = "run limitless" in question.lower() if limitless: question = re.sub(r"(?i)run limitless", "", question).strip() plan = _mode_plan(self._settings, mode) call_limit = _llm_call_limit(self._settings, mode) call_cap = math.ceil(call_limit * self._settings.llm_limit_multiplier) call_count = 0 limit_hit = False debug_tags = { "route", "decompose", "chunk_score", "fact_select", "synth", "subanswer", "tool", "followup", "select_claims", "evidence_fix", } def _debug_log(name: str, payload: Any) -> None: if not self._settings.debug_pipeline: return log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}}) async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str: nonlocal call_count, limit_hit if not limitless and call_count >= call_cap: limit_hit = True raise LLMLimitReached("llm_limit") call_count += 1 messages = build_messages(system, prompt, context=context) response = await self._llm.chat(messages, model=model or plan.model) log.info( "atlasbot_llm_call", extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}}, ) if self._settings.debug_pipeline and tag in debug_tags: _debug_log(f"llm_raw_{tag}", str(response)[:1200]) return response state = self._get_state(conversation_id) pin_snapshot = bool(snapshot_pin) or self._settings.snapshot_pin_enabled snapshot = self._snapshot.get() snapshot_used = snapshot if pin_snapshot and state and state.snapshot: snapshot_used = state.snapshot summary = build_summary(snapshot_used) allowed_nodes = _allowed_nodes(summary) allowed_namespaces = _allowed_namespaces(summary) summary_lines = _summary_lines(snapshot_used) global_facts = _global_facts(summary_lines) kb_summary = self._kb.summary() runbooks = self._kb.runbook_titles(limit=6) runbook_paths = self._kb.runbook_paths(limit=10) history_ctx = _format_history(history) lexicon_ctx = _lexicon_context(summary) key_facts: list[str] = [] metric_facts: list[str] = [] facts_used: list[str] = [] started = time.monotonic() reply = "" scores = _default_scores() claims: list[ClaimItem] = [] classify: dict[str, Any] = {} tool_hint: dict[str, Any] | None = None try: if observer: observer("normalize", "normalizing") normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question normalize_raw = await call_llm( prompts.NORMALIZE_SYSTEM, normalize_prompt, context=lexicon_ctx, model=plan.fast_model, tag="normalize", ) normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []}) normalized = str(normalize.get("normalized") or question).strip() or question keywords = normalize.get("keywords") or [] _debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords}) keyword_tokens = _extract_keywords(question, normalized, sub_questions=[], keywords=keywords) if observer: observer("route", "routing") route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords) route_raw = await call_llm( prompts.ROUTE_SYSTEM, route_prompt, context=_join_context([kb_summary, lexicon_ctx]), model=plan.fast_model, tag="route", ) classify = _parse_json_block(route_raw, fallback={}) classify.setdefault("needs_snapshot", True) classify.setdefault("answer_style", "direct") classify.setdefault("follow_up", False) classify.setdefault("focus_entity", "unknown") classify.setdefault("focus_metric", "unknown") _debug_log("route_parsed", {"classify": classify, "normalized": normalized}) force_metric = bool(re.search(r"\bhow many\b|\bcount\b|\btotal\b", normalized.lower())) cluster_terms = ( "atlas", "cluster", "node", "nodes", "namespace", "pod", "workload", "k8s", "kubernetes", "postgres", "database", "db", "connections", "cpu", "ram", "memory", "network", "io", "disk", "pvc", "storage", ) if any(term in normalized.lower() for term in cluster_terms): classify["needs_snapshot"] = True lowered_norm = normalized.lower() if ( ("namespace" in lowered_norm and ("pod" in lowered_norm or "pods" in lowered_norm)) or re.search(r"\bmost\s+pods\b", lowered_norm) or re.search(r"\bpods\s+running\b", lowered_norm) ): classify["question_type"] = "metric" classify["needs_snapshot"] = True if re.search(r"\b(how many|count|number of|list)\b", normalized.lower()): classify["question_type"] = "metric" hottest_terms = ("hottest", "highest", "lowest", "most") metric_terms = ("cpu", "ram", "memory", "net", "network", "io", "disk", "load", "usage", "pod", "pods", "namespace") lowered_question = f"{question} {normalized}".lower() if any(term in lowered_question for term in hottest_terms) and any(term in lowered_question for term in metric_terms): classify["question_type"] = "metric" if not classify.get("follow_up") and state and state.claims: follow_terms = ("there", "that", "those", "these", "it", "them", "that one", "this", "former", "latter") if any(term in lowered_question for term in follow_terms) or len(normalized.split()) <= FOLLOWUP_SHORT_WORDS: classify["follow_up"] = True if classify.get("follow_up") and state and state.claims: if observer: observer("followup", "answering follow-up") reply = await self._answer_followup(question, state, summary, classify, plan, call_llm) scores = await self._score_answer(question, reply, plan, call_llm) meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started) return AnswerResult(reply, scores, meta) if observer: observer("decompose", "decomposing") decompose_prompt = prompts.DECOMPOSE_PROMPT.format(max_parts=plan.max_subquestions * 2) decompose_raw = await call_llm( prompts.DECOMPOSE_SYSTEM, decompose_prompt + "\nQuestion: " + normalized, context=lexicon_ctx, model=plan.fast_model if mode == "quick" else plan.model, tag="decompose", ) parts = _parse_json_list(decompose_raw) sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions) _debug_log("decompose_parsed", {"sub_questions": sub_questions}) keyword_tokens = _extract_keywords(question, normalized, sub_questions=sub_questions, keywords=keywords) focus_entity = str(classify.get("focus_entity") or "unknown").lower() focus_metric = str(classify.get("focus_metric") or "unknown").lower() lowered_q = f"{question} {normalized}".lower() if "node" in lowered_q: focus_entity = "node" snapshot_context = "" signal_tokens: list[str] = [] if classify.get("needs_snapshot"): if observer: observer("retrieve", "scoring chunks") chunks = _chunk_lines(summary_lines, plan.chunk_lines) scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan) selected = _select_chunks(chunks, scored, plan, keyword_tokens) fact_candidates = _collect_fact_candidates(selected, limit=plan.max_subquestions * 12) key_facts = await _select_fact_lines( call_llm, normalized, fact_candidates, plan, max_lines=max(4, plan.max_subquestions * 2), ) metric_facts: list[str] = [] if classify.get("question_type") in {"metric", "diagnostic"} or force_metric: if observer: observer("retrieve", "extracting fact types") fact_types = await _extract_fact_types( call_llm, normalized, keyword_tokens, plan, ) if observer: observer("retrieve", "deriving signals") signals = await _derive_signals( call_llm, normalized, fact_types, plan, ) if isinstance(signals, list): signal_tokens = [str(item) for item in signals if item] if observer: observer("retrieve", "scanning chunks") candidate_lines: list[str] = [] if signals: for chunk in selected: chunk_lines = chunk["text"].splitlines() if not chunk_lines: continue hits = await _scan_chunk_for_signals( call_llm, normalized, signals, chunk_lines, plan, ) if hits: candidate_lines.extend(hits) candidate_lines = list(dict.fromkeys(candidate_lines)) if candidate_lines: if observer: observer("retrieve", "pruning candidates") metric_facts = await _prune_metric_candidates( call_llm, normalized, candidate_lines, plan, plan.metric_retries, ) if metric_facts: key_facts = _merge_fact_lines(metric_facts, key_facts) if self._settings.debug_pipeline: _debug_log("metric_facts_selected", {"facts": metric_facts}) if not metric_facts: if observer: observer("retrieve", "fallback metric selection") fallback_candidates = _filter_lines_by_keywords( summary_lines, signal_tokens or keyword_tokens, max_lines=200, ) if fallback_candidates: metric_facts = await _select_fact_lines( call_llm, normalized, fallback_candidates, plan, max_lines=max(2, plan.max_subquestions), ) if not metric_facts and fallback_candidates: metric_facts = fallback_candidates[: max(2, plan.max_subquestions)] if metric_facts: key_facts = _merge_fact_lines(metric_facts, key_facts) if self._settings.debug_pipeline: scored_preview = sorted( [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks], key=lambda item: item["score"], reverse=True, )[: min(len(chunks), max(plan.chunk_top, 6))] _debug_log( "chunk_selected", { "selected_ids": [item["id"] for item in selected], "top_scored": scored_preview, }, ) facts_used = list(dict.fromkeys(key_facts)) if key_facts else list(dict.fromkeys(metric_facts)) snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected]) combined_facts = key_facts if global_facts: combined_facts = _merge_fact_lines(global_facts, key_facts) if combined_facts: snapshot_context = "KeyFacts:\n" + "\n".join(combined_facts) + "\n\n" + snapshot_context context = _join_context( [kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""] ) if plan.use_tool and classify.get("needs_tool"): if observer: observer("tool", "suggesting tools") tool_prompt = prompts.TOOL_PROMPT + "\nQuestion: " + normalized tool_raw = await call_llm(prompts.TOOL_SYSTEM, tool_prompt, context=context, model=plan.fast_model, tag="tool") tool_hint = _parse_json_block(tool_raw, fallback={}) if observer: observer("subanswers", "drafting subanswers") subanswers: list[str] = [] for subq in sub_questions: sub_prompt = prompts.SUBANSWER_PROMPT + "\nQuestion: " + subq if plan.subanswer_retries > 1: candidates: list[str] = [] for _ in range(plan.subanswer_retries): candidate = await call_llm( prompts.ANSWER_SYSTEM, sub_prompt, context=context, model=plan.model, tag="subanswer", ) candidates.append(candidate) best_idx = await _select_best_candidate(call_llm, subq, candidates, plan, "subanswer_select") subanswers.append(candidates[best_idx]) else: sub_answer = await call_llm( prompts.ANSWER_SYSTEM, sub_prompt, context=context, model=plan.model, tag="subanswer", ) subanswers.append(sub_answer) if observer: observer("synthesize", "synthesizing") reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm) unknown_nodes = _find_unknown_nodes(reply, allowed_nodes) unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces) runbook_fix = _needs_runbook_fix(reply, runbook_paths) runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply) needs_evidence = _needs_evidence_fix(reply, classify) if classify.get("question_type") in {"open_ended", "planning"} and metric_facts: needs_evidence = True resolved_runbook = None if runbook_paths and (runbook_fix or runbook_needed): resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized resolver_raw = await call_llm( prompts.RUNBOOK_SELECT_SYSTEM, resolver_prompt, context="AllowedRunbooks:\n" + "\n".join(runbook_paths), model=plan.fast_model, tag="runbook_select", ) resolver = _parse_json_block(resolver_raw, fallback={}) candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None if candidate and candidate in runbook_paths: resolved_runbook = candidate if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed: if observer: observer("evidence_fix", "repairing missing evidence") extra_bits = [] if unknown_nodes: extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes))) if unknown_namespaces: extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces))) if runbook_paths: extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths)) if resolved_runbook: extra_bits.append("ResolvedRunbook: " + resolved_runbook) if metric_facts: extra_bits.append("MustUseFacts: " + "; ".join(metric_facts[:4])) if allowed_nodes: extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes)) if allowed_namespaces: extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces)) fix_prompt = ( prompts.EVIDENCE_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply + ("\n" + "\n".join(extra_bits) if extra_bits else "") ) reply = await call_llm( prompts.EVIDENCE_FIX_SYSTEM, fix_prompt, context=context, model=plan.model, tag="evidence_fix", ) if unknown_nodes or unknown_namespaces: refreshed_nodes = _find_unknown_nodes(reply, allowed_nodes) refreshed_namespaces = _find_unknown_namespaces(reply, allowed_namespaces) if refreshed_nodes or refreshed_namespaces: reply = _strip_unknown_entities(reply, refreshed_nodes, refreshed_namespaces) if runbook_paths and resolved_runbook and _needs_runbook_reference(normalized, runbook_paths, reply): if observer: observer("runbook_enforce", "enforcing runbook path") enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=resolved_runbook) reply = await call_llm( prompts.RUNBOOK_ENFORCE_SYSTEM, enforce_prompt + "\nAnswer: " + reply, context=context, model=plan.model, tag="runbook_enforce", ) if runbook_paths: invalid = [ token for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply) if token.lower() not in {p.lower() for p in runbook_paths} ] if invalid: if observer: observer("runbook_enforce", "replacing invalid runbook path") resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized resolver_raw = await call_llm( prompts.RUNBOOK_SELECT_SYSTEM, resolver_prompt, context="AllowedRunbooks:\n" + "\n".join(runbook_paths), model=plan.fast_model, tag="runbook_select", ) resolver = _parse_json_block(resolver_raw, fallback={}) candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None if not (candidate and candidate in runbook_paths): candidate = _best_runbook_match(invalid[0], runbook_paths) if candidate and candidate in runbook_paths: enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate) reply = await call_llm( prompts.RUNBOOK_ENFORCE_SYSTEM, enforce_prompt + "\nAnswer: " + reply, context=context, model=plan.model, tag="runbook_enforce", ) reply = _strip_unknown_entities(reply, unknown_nodes, unknown_namespaces) if facts_used and _needs_evidence_guard(reply, facts_used): if observer: observer("evidence_guard", "tightening unsupported claims") guard_prompt = ( prompts.EVIDENCE_GUARD_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply + "\nFactsUsed:\n" + "\n".join(facts_used) ) reply = await call_llm( prompts.EVIDENCE_GUARD_SYSTEM, guard_prompt, context=context, model=plan.model, tag="evidence_guard", ) if _needs_focus_fix(normalized, reply, classify): if observer: observer("focus_fix", "tightening answer") reply = await call_llm( prompts.EVIDENCE_FIX_SYSTEM, prompts.FOCUS_FIX_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply, context=context, model=plan.model, tag="focus_fix", ) if (classify.get("question_type") in {"metric", "diagnostic"} or force_metric) and metric_facts: best_line = None lowered_keywords = [kw.lower() for kw in keyword_tokens if kw] for line in metric_facts: line_lower = line.lower() if any(kw in line_lower for kw in lowered_keywords): best_line = line break best_line = best_line or metric_facts[0] reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply)) fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts))) if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)): reply = f"From the latest snapshot: {best_line}." if plan.use_critic: if observer: observer("critic", "reviewing") critic_prompt = prompts.CRITIC_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply critic_raw = await call_llm(prompts.CRITIC_SYSTEM, critic_prompt, context=context, model=plan.model, tag="critic") critic = _parse_json_block(critic_raw, fallback={}) if critic.get("issues"): revise_prompt = ( prompts.REVISION_PROMPT + "\nQuestion: " + normalized + "\nDraft: " + reply + "\nCritique: " + json.dumps(critic) ) reply = await call_llm(prompts.REVISION_SYSTEM, revise_prompt, context=context, model=plan.model, tag="revise") if plan.use_gap: if observer: observer("gap", "checking gaps") gap_prompt = prompts.EVIDENCE_GAP_PROMPT + "\nQuestion: " + normalized + "\nAnswer: " + reply gap_raw = await call_llm(prompts.GAP_SYSTEM, gap_prompt, context=context, model=plan.fast_model, tag="gap") gap = _parse_json_block(gap_raw, fallback={}) note = str(gap.get("note") or "").strip() if note: reply = f"{reply}\n\n{note}" reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup") scores = await self._score_answer(normalized, reply, plan, call_llm) claims = await self._extract_claims(normalized, reply, summary, facts_used, call_llm) except LLMLimitReached: if not reply: reply = "I started working on this but hit my reasoning limit. Ask again with 'Run limitless' for a deeper pass." scores = _default_scores() finally: elapsed = round(time.monotonic() - started, 2) log.info( "atlasbot_answer", extra={"extra": {"mode": mode, "seconds": elapsed, "llm_calls": call_count, "limit": call_cap, "limit_hit": limit_hit}}, ) if conversation_id and claims: self._store_state(conversation_id, claims, summary, snapshot_used, pin_snapshot) meta = _build_meta(mode, call_count, call_cap, limit_hit, classify, tool_hint, started) return AnswerResult(reply, scores, meta) async def _answer_stock(self, question: str) -> AnswerResult: messages = build_messages(prompts.STOCK_SYSTEM, question) reply = await self._llm.chat(messages, model=self._settings.ollama_model) return AnswerResult(reply, _default_scores(), {"mode": "stock"}) async def _synthesize_answer( # noqa: PLR0913 self, question: str, subanswers: list[str], context: str, classify: dict[str, Any], plan: ModePlan, call_llm: Callable[..., Any], ) -> str: style_hint = _style_hint(classify) if not subanswers: prompt = ( prompts.SYNTHESIZE_PROMPT + "\nQuestion: " + question + "\nStyle: " + style_hint + "\nQuestionType: " + (classify.get("question_type") or "unknown") ) return await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth") draft_prompts = [] for idx in range(plan.drafts): draft_prompts.append( prompts.SYNTHESIZE_PROMPT + "\nQuestion: " + question + "\nStyle: " + style_hint + "\nQuestionType: " + (classify.get("question_type") or "unknown") + "\nSubanswers:\n" + "\n".join([f"- {item}" for item in subanswers]) + f"\nDraftIndex: {idx + 1}" ) drafts: list[str] = [] for prompt in draft_prompts: drafts.append(await call_llm(prompts.SYNTHESIZE_SYSTEM, prompt, context=context, model=plan.model, tag="synth")) if len(drafts) == 1: return drafts[0] select_prompt = ( prompts.DRAFT_SELECT_PROMPT + "\nQuestion: " + question + "\nDrafts:\n" + "\n\n".join([f"Draft {idx + 1}: {text}" for idx, text in enumerate(drafts)]) ) select_raw = await call_llm(prompts.CRITIC_SYSTEM, select_prompt, context=context, model=plan.fast_model, tag="draft_select") selection = _parse_json_block(select_raw, fallback={}) idx = int(selection.get("best", 1)) - 1 if 0 <= idx < len(drafts): return drafts[idx] return drafts[0] async def _score_answer( self, question: str, reply: str, plan: ModePlan, call_llm: Callable[..., Any], ) -> AnswerScores: if not plan.use_scores: return _default_scores() prompt = prompts.SCORE_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply raw = await call_llm(prompts.SCORE_SYSTEM, prompt, model=plan.fast_model, tag="score") data = _parse_json_block(raw, fallback={}) return _scores_from_json(data) async def _extract_claims( self, question: str, reply: str, summary: dict[str, Any], facts_used: list[str], call_llm: Callable[..., Any], ) -> list[ClaimItem]: if not reply or not summary: return [] summary_json = _json_excerpt(summary) facts_used = [line.strip() for line in (facts_used or []) if line and line.strip()] facts_block = "" if facts_used: facts_block = "\nFactsUsed:\n" + "\n".join([f"- {line}" for line in facts_used[:12]]) prompt = prompts.CLAIM_MAP_PROMPT + "\nQuestion: " + question + "\nAnswer: " + reply + facts_block raw = await call_llm( prompts.CLAIM_SYSTEM, prompt, context=f"SnapshotSummaryJson:{summary_json}", model=self._settings.ollama_model_fast, tag="claim_map", ) data = _parse_json_block(raw, fallback={}) claims_raw = data.get("claims") if isinstance(data, dict) else None claims: list[ClaimItem] = [] if isinstance(claims_raw, list): for entry in claims_raw: if not isinstance(entry, dict): continue claim_text = str(entry.get("claim") or "").strip() claim_id = str(entry.get("id") or "").strip() or f"c{len(claims)+1}" evidence_items: list[EvidenceItem] = [] for ev in entry.get("evidence") or []: if not isinstance(ev, dict): continue path = str(ev.get("path") or "").strip() if not path: continue reason = str(ev.get("reason") or "").strip() value = _resolve_path(summary, path) evidence_items.append(EvidenceItem(path=path, reason=reason, value=value, value_at_claim=value)) if claim_text and evidence_items: claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items)) return claims async def _dedup_reply( self, reply: str, plan: ModePlan, call_llm: Callable[..., Any], tag: str, ) -> str: if not _needs_dedup(reply): return reply dedup_prompt = prompts.DEDUP_PROMPT + "\nDraft: " + reply return await call_llm(prompts.DEDUP_SYSTEM, dedup_prompt, model=plan.fast_model, tag=tag) async def _answer_followup( # noqa: C901, PLR0913 self, question: str, state: ConversationState, summary: dict[str, Any], classify: dict[str, Any], plan: ModePlan, call_llm: Callable[..., Any], ) -> str: claim_ids = await self._select_claims(question, state.claims, plan, call_llm) selected = [claim for claim in state.claims if claim.id in claim_ids] if claim_ids else state.claims[:2] evidence_lines = [] lowered = question.lower() for claim in selected: evidence_lines.append(f"Claim: {claim.claim}") for ev in claim.evidence: current = _resolve_path(summary, ev.path) ev.value = current delta_note = "" if ev.value_at_claim is not None and current is not None and current != ev.value_at_claim: delta_note = f" (now {current})" evidence_lines.append(f"- {ev.path}: {ev.value_at_claim}{delta_note}") if any(term in lowered for term in ("hotspot", "hot spot", "hottest", "jetson", "rpi", "amd64", "arm64", "hardware", "class")): hotspot_lines = _hotspot_evidence(summary) if hotspot_lines: evidence_lines.append("HotspotSummary:") evidence_lines.extend(hotspot_lines) evidence_ctx = "\n".join(evidence_lines) prompt = prompts.FOLLOWUP_PROMPT + "\nFollow-up: " + question + "\nEvidence:\n" + evidence_ctx reply = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.model, tag="followup") allowed_nodes = _allowed_nodes(summary) allowed_namespaces = _allowed_namespaces(summary) unknown_nodes = _find_unknown_nodes(reply, allowed_nodes) unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces) extra_bits = [] if unknown_nodes: extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes))) if unknown_namespaces: extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces))) if allowed_nodes: extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes)) if allowed_namespaces: extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces)) if extra_bits: fix_prompt = ( prompts.EVIDENCE_FIX_PROMPT + "\nQuestion: " + question + "\nDraft: " + reply + "\n" + "\n".join(extra_bits) ) reply = await call_llm( prompts.EVIDENCE_FIX_SYSTEM, fix_prompt, context="Evidence:\n" + evidence_ctx, model=plan.model, tag="followup_fix", ) reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup_followup") reply = _strip_followup_meta(reply) return reply async def _select_claims( self, question: str, claims: list[ClaimItem], plan: ModePlan, call_llm: Callable[..., Any], ) -> list[str]: if not claims: return [] claims_brief = [{"id": claim.id, "claim": claim.claim} for claim in claims] prompt = prompts.SELECT_CLAIMS_PROMPT + "\nFollow-up: " + question + "\nClaims: " + json.dumps(claims_brief) raw = await call_llm(prompts.FOLLOWUP_SYSTEM, prompt, model=plan.fast_model, tag="select_claims") data = _parse_json_block(raw, fallback={}) ids = data.get("claim_ids") if isinstance(data, dict) else [] if isinstance(ids, list): return [str(item) for item in ids if item] return [] def _get_state(self, conversation_id: str | None) -> ConversationState | None: if not conversation_id: return None state_payload = self._store.get(conversation_id) return _state_from_payload(state_payload) if state_payload else None def _store_state( self, conversation_id: str, claims: list[ClaimItem], summary: dict[str, Any], snapshot: dict[str, Any] | None, pin_snapshot: bool, ) -> None: snapshot_id = _snapshot_id(summary) pinned_snapshot = snapshot if pin_snapshot else None payload = { "updated_at": time.monotonic(), "claims": _claims_to_payload(claims), "snapshot_id": snapshot_id, "snapshot": pinned_snapshot, } self._store.set(conversation_id, payload) def _cleanup_state(self) -> None: self._store.cleanup() def _strip_followup_meta(reply: str) -> str: cleaned = reply.strip() if not cleaned: return cleaned prefixes = [ "The draft is correct based on the provided context.", "The draft is correct based on the context.", "The draft is correct based on the provided evidence.", "The draft is correct.", "Based on the provided context,", "Based on the context,", "Based on the provided evidence,", ] for prefix in prefixes: if cleaned.lower().startswith(prefix.lower()): cleaned = cleaned[len(prefix) :].lstrip(" .") break return cleaned def _build_meta( # noqa: PLR0913 mode: str, call_count: int, call_cap: int, limit_hit: bool, classify: dict[str, Any], tool_hint: dict[str, Any] | None, started: float, ) -> dict[str, Any]: return { "mode": mode, "llm_calls": call_count, "llm_limit": call_cap, "llm_limit_hit": limit_hit, "classify": classify, "tool_hint": tool_hint, "elapsed_sec": round(time.monotonic() - started, 2), } def _mode_plan(settings: Settings, mode: str) -> ModePlan: if mode == "genius": return ModePlan( model=settings.ollama_model_genius, fast_model=settings.ollama_model_fast, max_subquestions=6, chunk_lines=6, chunk_top=10, chunk_group=4, use_tool=True, use_critic=True, use_gap=True, use_scores=True, drafts=2, metric_retries=3, subanswer_retries=3, ) if mode == "smart": return ModePlan( model=settings.ollama_model_smart, fast_model=settings.ollama_model_fast, max_subquestions=4, chunk_lines=8, chunk_top=8, chunk_group=4, use_tool=True, use_critic=True, use_gap=True, use_scores=True, drafts=1, metric_retries=2, subanswer_retries=2, ) return ModePlan( model=settings.ollama_model_fast, fast_model=settings.ollama_model_fast, max_subquestions=2, chunk_lines=12, chunk_top=5, chunk_group=5, use_tool=False, use_critic=False, use_gap=False, use_scores=False, drafts=1, metric_retries=1, subanswer_retries=1, ) def _llm_call_limit(settings: Settings, mode: str) -> int: if mode == "genius": return settings.genius_llm_calls_max if mode == "smart": return settings.smart_llm_calls_max return settings.fast_llm_calls_max def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) -> list[str]: if not parts: return [fallback] ranked = [] for entry in parts: if not isinstance(entry, dict): continue question = str(entry.get("question") or "").strip() if not question: continue priority = entry.get("priority") try: weight = float(priority) except (TypeError, ValueError): weight = 1.0 ranked.append((weight, question)) ranked.sort(key=lambda item: item[0], reverse=True) questions = [item[1] for item in ranked][:limit] return questions or [fallback] def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]: chunks: list[dict[str, Any]] = [] if not lines: return chunks for idx in range(0, len(lines), lines_per_chunk): chunk_lines = lines[idx : idx + lines_per_chunk] text = "\n".join(chunk_lines) summary = " | ".join(chunk_lines[:4]) chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary}) return chunks async def _score_chunks( call_llm: Callable[..., Any], chunks: list[dict[str, Any]], question: str, sub_questions: list[str], plan: ModePlan, ) -> dict[str, float]: scores: dict[str, float] = {chunk["id"]: 0.0 for chunk in chunks} if not chunks: return scores group: list[dict[str, Any]] = [] for chunk in chunks: group.append({"id": chunk["id"], "summary": chunk["summary"]}) if len(group) >= plan.chunk_group: scores.update(await _score_chunk_group(call_llm, group, question, sub_questions)) group = [] if group: scores.update(await _score_chunk_group(call_llm, group, question, sub_questions)) return scores async def _score_chunk_group( call_llm: Callable[..., Any], group: list[dict[str, Any]], question: str, sub_questions: list[str], ) -> dict[str, float]: prompt = ( prompts.CHUNK_SCORE_PROMPT + "\nQuestion: " + question + "\nSubQuestions: " + json.dumps(sub_questions) + "\nChunks: " + json.dumps(group) ) raw = await call_llm(prompts.RETRIEVER_SYSTEM, prompt, model=None, tag="chunk_score") data = _parse_json_list(raw) scored: dict[str, float] = {} for entry in data: if not isinstance(entry, dict): continue cid = str(entry.get("id") or "").strip() if not cid: continue try: score = float(entry.get("score") or 0) except (TypeError, ValueError): score = 0.0 scored[cid] = score return scored def _select_chunks( chunks: list[dict[str, Any]], scores: dict[str, float], plan: ModePlan, keywords: list[str] | None = None, ) -> list[dict[str, Any]]: if not chunks: return [] ranked = sorted(chunks, key=lambda item: scores.get(item["id"], 0.0), reverse=True) selected: list[dict[str, Any]] = [] head = chunks[0] selected.append(head) for item in ranked: if len(selected) >= plan.chunk_top: break if item is head: continue selected.append(item) return selected def _format_runbooks(runbooks: list[str]) -> str: if not runbooks: return "" return "Relevant runbooks:\n" + "\n".join([f"- {item}" for item in runbooks]) def _join_context(parts: list[str]) -> str: text = "\n".join([part for part in parts if part]) return text.strip() def _format_history(history: list[dict[str, str]] | None) -> str: if not history: return "" lines = ["Recent conversation (non-authoritative):"] for entry in history[-4:]: if not isinstance(entry, dict): continue question = entry.get("q") answer = entry.get("a") role = entry.get("role") content = entry.get("content") if question: lines.append(f"Q: {question}") if answer: lines.append(f"A: {answer}") if role and content: prefix = "Q" if role == "user" else "A" lines.append(f"{prefix}: {content}") return "\n".join(lines) def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]: text = summary_text(snapshot) if not text: return [] return [line for line in text.splitlines() if line.strip()] def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]: seen = set() merged: list[str] = [] for line in primary + fallback: if line in seen: continue seen.add(line) merged.append(line) return merged def _expand_hottest_line(line: str) -> list[str]: if not line: return [] if not line.lower().startswith("hottest:"): return [] expanded: list[str] = [] payload = line.split("hottest:", 1)[1] for part in payload.split(";"): part = part.strip() if not part or "=" not in part: continue metric, rest = part.split("=", 1) metric = metric.strip() match = re.search(r"(?P[^\s\[]+).*\((?P[^)]+)\)", rest) if not match: continue node = match.group("node").strip() value = match.group("value").strip() class_match = re.search(r"\[(?P[^\]]+)\]", rest) node_class = class_match.group("class").strip() if class_match else "" if node_class: expanded.append(f"hottest_{metric}_node: {node} [{node_class}] ({value})") else: expanded.append(f"hottest_{metric}_node: {node} ({value})") return expanded def _has_token(text: str, token: str) -> bool: if not text or not token: return False if token == "io": return "i/o" in text or re.search(r"\bio\b", text) is not None return re.search(rf"\b{re.escape(token)}\b", text) is not None def _hotspot_evidence(summary: dict[str, Any]) -> list[str]: hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {} if not hottest: return [] hardware_by_node = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {} node_pods_top = summary.get("node_pods_top") if isinstance(summary.get("node_pods_top"), list) else [] ns_map = {} for item in node_pods_top: if not isinstance(item, dict): continue node = item.get("node") namespaces_top = item.get("namespaces_top") if isinstance(item.get("namespaces_top"), list) else [] ns_map[node] = namespaces_top lines: list[str] = [] for metric, info in hottest.items(): if not isinstance(info, dict): continue node = info.get("node") value = info.get("value") if not node: continue node_class = hardware_by_node.get(node) ns_parts = [] for entry in ns_map.get(node, [])[:3]: if isinstance(entry, (list, tuple)) and len(entry) >= NS_ENTRY_MIN_LEN: ns_parts.append(f"{entry[0]}={entry[1]}") ns_text = ", ".join(ns_parts) value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value) line = f"hotspot.{metric}: node={node} class={node_class or 'unknown'} value={value_text}" if ns_text: line += f" namespaces_top={ns_text}" lines.append(line) return lines async def _select_best_candidate( call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, tag: str, ) -> int: if len(candidates) <= 1: return 0 prompt = ( prompts.CANDIDATE_SELECT_PROMPT + "\nQuestion: " + question + "\nCandidates:\n" + "\n".join([f"{idx+1}) {cand}" for idx, cand in enumerate(candidates)]) ) raw = await call_llm(prompts.CANDIDATE_SELECT_SYSTEM, prompt, model=plan.model, tag=tag) data = _parse_json_block(raw, fallback={}) best = data.get("best") if isinstance(data, dict) else None if isinstance(best, int) and 1 <= best <= len(candidates): return best - 1 return 0 def _dedupe_lines(lines: list[str], limit: int | None = None) -> list[str]: seen: set[str] = set() cleaned: list[str] = [] for line in lines: value = (line or "").strip() if not value or value in seen: continue if value.lower().startswith("lexicon_") or value.lower().startswith("units:"): continue cleaned.append(value) seen.add(value) if limit and len(cleaned) >= limit: break return cleaned def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list[str]: lines: list[str] = [] for chunk in selected: text = chunk.get("text") if isinstance(chunk, dict) else None if not isinstance(text, str): continue lines.extend([line for line in text.splitlines() if line.strip()]) return _dedupe_lines(lines, limit=limit) async def _select_best_list( call_llm: Callable[..., Any], question: str, candidates: list[list[str]], plan: ModePlan, tag: str, ) -> list[str]: if not candidates: return [] if len(candidates) == 1: return candidates[0] render = ["; ".join(items) for items in candidates] best_idx = await _select_best_candidate(call_llm, question, render, plan, tag) chosen = candidates[best_idx] if 0 <= best_idx < len(candidates) else candidates[0] if not chosen: merged: list[str] = [] for entry in candidates: for item in entry: if item not in merged: merged.append(item) chosen = merged return chosen async def _extract_fact_types( call_llm: Callable[..., Any], question: str, keywords: list[str], plan: ModePlan, ) -> list[str]: prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question if keywords: prompt += "\nKeywords: " + ", ".join(keywords) candidates: list[list[str]] = [] attempts = max(plan.metric_retries, 1) for _ in range(attempts): raw = await call_llm(prompts.FACT_TYPES_SYSTEM, prompt, model=plan.fast_model, tag="fact_types") data = _parse_json_block(raw, fallback={}) items = data.get("fact_types") if isinstance(data, dict) else None if not isinstance(items, list): continue cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=10) if cleaned: candidates.append(cleaned) chosen = await _select_best_list(call_llm, question, candidates, plan, "fact_types_select") return chosen[:10] async def _derive_signals( call_llm: Callable[..., Any], question: str, fact_types: list[str], plan: ModePlan, ) -> list[str]: if not fact_types: return [] prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types)) candidates: list[list[str]] = [] attempts = max(plan.metric_retries, 1) for _ in range(attempts): raw = await call_llm(prompts.SIGNAL_SYSTEM, prompt, model=plan.fast_model, tag="signals") data = _parse_json_block(raw, fallback={}) items = data.get("signals") if isinstance(data, dict) else None if not isinstance(items, list): continue cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=12) if cleaned: candidates.append(cleaned) chosen = await _select_best_list(call_llm, question, candidates, plan, "signals_select") return chosen[:12] async def _scan_chunk_for_signals( call_llm: Callable[..., Any], question: str, signals: list[str], chunk_lines: list[str], plan: ModePlan, ) -> list[str]: if not signals or not chunk_lines: return [] prompt = prompts.CHUNK_SCAN_PROMPT.format( signals="; ".join(signals), lines="\n".join(chunk_lines), ) attempts = max(1, min(plan.metric_retries, 2)) candidates: list[list[str]] = [] for _ in range(attempts): raw = await call_llm(prompts.CHUNK_SCAN_SYSTEM, prompt, model=plan.fast_model, tag="chunk_scan") data = _parse_json_block(raw, fallback={}) items = data.get("lines") if isinstance(data, dict) else None if not isinstance(items, list): continue cleaned = [line for line in chunk_lines if line in items] cleaned = _dedupe_lines(cleaned, limit=15) if cleaned: candidates.append(cleaned) chosen = await _select_best_list(call_llm, question, candidates, plan, "chunk_scan_select") return chosen[:15] async def _prune_metric_candidates( call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, attempts: int, ) -> list[str]: if not candidates: return [] prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6) picks: list[list[str]] = [] for _ in range(max(attempts, 1)): raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_prune") data = _parse_json_block(raw, fallback={}) items = data.get("lines") if isinstance(data, dict) else None if not isinstance(items, list): continue cleaned = [line for line in candidates if line in items] cleaned = _dedupe_lines(cleaned, limit=6) if cleaned: picks.append(cleaned) chosen = await _select_best_list(call_llm, question, picks, plan, "fact_prune_select") return chosen[:6] async def _select_fact_lines( call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, max_lines: int, ) -> list[str]: if not candidates: return [] prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines) picks: list[list[str]] = [] attempts = max(plan.metric_retries, 1) for _ in range(attempts): raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_select") data = _parse_json_block(raw, fallback={}) items = data.get("lines") if isinstance(data, dict) else None if not isinstance(items, list): continue cleaned = [line for line in candidates if line in items] cleaned = _dedupe_lines(cleaned, limit=max_lines) if cleaned: picks.append(cleaned) chosen = await _select_best_list(call_llm, question, picks, plan, "fact_select_best") return chosen[:max_lines] def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str: if not reply: return reply if not unknown_nodes and not unknown_namespaces: return reply sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()] if not sentences: return reply lowered_nodes = [node.lower() for node in unknown_nodes] lowered_namespaces = [ns.lower() for ns in unknown_namespaces] kept: list[str] = [] for sent in sentences: lower = sent.lower() if lowered_nodes and any(node in lower for node in lowered_nodes): continue if lowered_namespaces and any(f"namespace {ns}" in lower for ns in lowered_namespaces): continue kept.append(sent) cleaned = " ".join(kept).strip() return cleaned or reply def _needs_evidence_guard(reply: str, facts: list[str]) -> bool: if not reply or not facts: return False lower_reply = reply.lower() fact_text = " ".join(facts).lower() node_pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE) nodes = {m.group(1).lower() for m in node_pattern.finditer(reply)} if nodes: missing = [node for node in nodes if node not in fact_text] if missing: return True pressure_terms = ("pressure", "diskpressure", "memorypressure", "pidpressure", "headroom") if any(term in lower_reply for term in pressure_terms) and not any(term in fact_text for term in pressure_terms): return True return False def _filter_lines_by_keywords(lines: list[str], keywords: list[str], max_lines: int) -> list[str]: if not lines: return [] tokens = [kw.lower() for kw in keywords if isinstance(kw, str) and kw.strip()] if not tokens: return lines[:max_lines] filtered = [line for line in lines if any(tok in line.lower() for tok in tokens)] return (filtered or lines)[:max_lines] def _global_facts(lines: list[str]) -> list[str]: if not lines: return [] wanted = ("nodes_total", "nodes_ready", "cluster_name", "cluster", "nodes_not_ready") facts: list[str] = [] for line in lines: lower = line.lower() if any(key in lower for key in wanted): facts.append(line) return _dedupe_lines(facts, limit=6) def _lexicon_context(summary: dict[str, Any]) -> str: # noqa: C901 if not isinstance(summary, dict): return "" lexicon = summary.get("lexicon") if not isinstance(lexicon, dict): return "" terms = lexicon.get("terms") aliases = lexicon.get("aliases") lines: list[str] = [] if isinstance(terms, list): for entry in terms[:8]: if not isinstance(entry, dict): continue term = entry.get("term") meaning = entry.get("meaning") if term and meaning: lines.append(f"{term}: {meaning}") if isinstance(aliases, dict): for key, value in list(aliases.items())[:6]: if key and value: lines.append(f"alias {key} -> {value}") if not lines: return "" return "Lexicon:\n" + "\n".join(lines) def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]: raw = text.strip() match = re.search(r"\{.*\}", raw, flags=re.S) if match: return parse_json(match.group(0), fallback=fallback) return parse_json(raw, fallback=fallback) def _parse_json_list(text: str) -> list[dict[str, Any]]: raw = text.strip() match = re.search(r"\[.*\]", raw, flags=re.S) data = parse_json(match.group(0), fallback={}) if match else parse_json(raw, fallback={}) if isinstance(data, list): return [entry for entry in data if isinstance(entry, dict)] return [] def _scores_from_json(data: dict[str, Any]) -> AnswerScores: return AnswerScores( confidence=_coerce_int(data.get("confidence"), 60), relevance=_coerce_int(data.get("relevance"), 60), satisfaction=_coerce_int(data.get("satisfaction"), 60), hallucination_risk=str(data.get("hallucination_risk") or "medium"), ) def _coerce_int(value: Any, default: int) -> int: try: return int(float(value)) except (TypeError, ValueError): return default def _default_scores() -> AnswerScores: return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium") def _style_hint(classify: dict[str, Any]) -> str: style = (classify.get("answer_style") or "").strip().lower() qtype = (classify.get("question_type") or "").strip().lower() if style == "insightful" or qtype in {"open_ended", "planning"}: return "insightful" return "direct" def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool: if not reply: return False lowered = reply.lower() missing_markers = ( "don't have", "do not have", "don't know", "cannot", "can't", "need to", "would need", "does not provide", "does not mention", "not mention", "not provided", "not in context", "not referenced", "missing", "no specific", "no information", ) if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers): return True if classify.get("question_type") in {"metric", "diagnostic"} and not re.search(r"\d", reply): return True return False def _needs_dedup(reply: str) -> bool: if not reply: return False sentences = [s.strip() for s in re.split(r"(?<=[.!?])\\s+", reply) if s.strip()] if len(sentences) < DEDUP_MIN_SENTENCES: return False seen = set() for sent in sentences: norm = re.sub(r"\\s+", " ", sent.lower()) if norm in seen: return True seen.add(norm) return False def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool: if not reply: return False q_lower = (question or "").lower() if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower): return False if reply.count(".") <= 1: return False extra_markers = ("for more", "if you need", "additional", "based on") return any(marker in reply.lower() for marker in extra_markers) def _extract_keywords( raw_question: str, normalized: str, sub_questions: list[str], keywords: list[Any] | None, ) -> list[str]: stopwords = { "the", "and", "for", "with", "that", "this", "what", "which", "when", "where", "who", "why", "how", "tell", "show", "list", "give", "about", "right", "now", } tokens: list[str] = [] for source in [raw_question, normalized, *sub_questions]: for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()): if len(part) < TOKEN_MIN_LEN or part in stopwords: continue tokens.append(part) if keywords: for kw in keywords: if isinstance(kw, str): part = kw.strip().lower() if part and part not in stopwords and part not in tokens: tokens.append(part) return list(dict.fromkeys(tokens))[:12] def _allowed_nodes(summary: dict[str, Any]) -> list[str]: hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {} if hardware: return sorted([node for node in hardware.keys() if isinstance(node, str)]) return [] def _allowed_namespaces(summary: dict[str, Any]) -> list[str]: namespaces: list[str] = [] for entry in summary.get("namespace_pods") or []: if isinstance(entry, dict): name = entry.get("namespace") if name: namespaces.append(str(name)) return sorted(set(namespaces)) def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]: if not reply or not allowed: return [] pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE) found = {m.group(1) for m in pattern.finditer(reply)} if not found: return [] allowed_set = {a.lower() for a in allowed} return sorted({item for item in found if item.lower() not in allowed_set}) def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]: if not reply or not allowed: return [] pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE) found = {m.group(1) for m in pattern.finditer(reply)} if not found: return [] allowed_set = {a.lower() for a in allowed} return sorted({item for item in found if item.lower() not in allowed_set}) def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool: if not reply or not allowed: return False paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply)) if not paths: return False allowed_set = {p.lower() for p in allowed} return any(path.lower() not in allowed_set for path in paths) def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool: if not allowed or not question: return False lowered = question.lower() cues = ("runbook", "checklist", "documented", "documentation", "where", "guide") if not any(cue in lowered for cue in cues): return False if not reply: return True for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply): if token.lower() in {p.lower() for p in allowed}: return False return True def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None: if not candidate or not allowed: return None best = None best_score = 0.0 for path in allowed: score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio() if score > best_score: best_score = score best = path return best if best_score >= RUNBOOK_SIMILARITY_THRESHOLD else None def _resolve_path(data: Any, path: str) -> Any | None: if path.startswith("line:"): return path.split("line:", 1)[1].strip() cursor = data for part in re.split(r"\.(?![^\[]*\])", path): if not part: continue match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part) if not match: return None key = match.group(1) index = match.group(2) if isinstance(cursor, dict): cursor = cursor.get(key) else: return None if index is not None: try: idx = int(index) if isinstance(cursor, list) and 0 <= idx < len(cursor): cursor = cursor[idx] else: return None except ValueError: return None return cursor def _snapshot_id(summary: dict[str, Any]) -> str | None: if not summary: return None for key in ("generated_at", "snapshot_ts", "snapshot_id"): value = summary.get(key) if isinstance(value, str) and value: return value return None def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]: output: list[dict[str, Any]] = [] for claim in claims: evidence = [] for ev in claim.evidence: evidence.append( { "path": ev.path, "reason": ev.reason, "value_at_claim": ev.value_at_claim, } ) output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence}) return output def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None: if not payload: return None claims_raw = payload.get("claims") if isinstance(payload, dict) else None claims: list[ClaimItem] = [] if isinstance(claims_raw, list): for entry in claims_raw: if not isinstance(entry, dict): continue claim_text = str(entry.get("claim") or "").strip() claim_id = str(entry.get("id") or "").strip() if not claim_text or not claim_id: continue evidence_items: list[EvidenceItem] = [] for ev in entry.get("evidence") or []: if not isinstance(ev, dict): continue path = str(ev.get("path") or "").strip() if not path: continue reason = str(ev.get("reason") or "").strip() value_at_claim = ev.get("value_at_claim") evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim)) if evidence_items: claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items)) return ConversationState( updated_at=float(payload.get("updated_at") or time.monotonic()), claims=claims, snapshot_id=payload.get("snapshot_id"), snapshot=payload.get("snapshot"), ) def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str: raw = json.dumps(summary, ensure_ascii=False) return raw[:max_chars]