atlasbot: emphasize key facts and validate runbooks

This commit is contained in:
Brad Stein 2026-02-01 04:31:19 -03:00
parent eea9003d69
commit 59f17403ab

View File

@ -4,6 +4,7 @@ import logging
import math import math
import re import re
import time import time
import difflib
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Callable from typing import Any, Callable
@ -254,6 +255,7 @@ class AnswerEngine:
chunks = _chunk_lines(summary_lines, plan.chunk_lines) chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan) scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan, keyword_tokens) selected = _select_chunks(chunks, scored, plan, keyword_tokens)
key_facts = _key_fact_lines(summary_lines, keyword_tokens)
if self._settings.debug_pipeline: if self._settings.debug_pipeline:
scored_preview = sorted( scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks], [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
@ -268,6 +270,8 @@ class AnswerEngine:
}, },
) )
snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected]) snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])
if key_facts:
snapshot_context = "KeyFacts:\n" + "\n".join(key_facts) + "\n\n" + snapshot_context
context = _join_context( context = _join_context(
[kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""] [kb_summary, _format_runbooks(runbooks), snapshot_context, history_ctx if classify.get("follow_up") else ""]
@ -372,6 +376,8 @@ class AnswerEngine:
) )
resolver = _parse_json_block(resolver_raw, fallback={}) resolver = _parse_json_block(resolver_raw, fallback={})
candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None candidate = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if not (candidate and candidate in runbook_paths):
candidate = _best_runbook_match(invalid[0], runbook_paths)
if candidate and candidate in runbook_paths: if candidate and candidate in runbook_paths:
enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate) enforce_prompt = prompts.RUNBOOK_ENFORCE_PROMPT.format(path=candidate)
reply = await call_llm( reply = await call_llm(
@ -845,6 +851,22 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
return [line for line in text.splitlines() if line.strip()] return [line for line in text.splitlines() if line.strip()]
def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]:
if not lines or not keywords:
return []
lowered = [kw.lower() for kw in keywords if kw]
if not lowered:
return []
matches: list[str] = []
for line in lines:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered):
matches.append(line)
if len(matches) >= limit:
break
return matches
def _lexicon_context(summary: dict[str, Any]) -> str: def _lexicon_context(summary: dict[str, Any]) -> str:
if not isinstance(summary, dict): if not isinstance(summary, dict):
return "" return ""
@ -1086,6 +1108,19 @@ def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> b
return True return True
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
if not candidate or not allowed:
return None
best = None
best_score = 0.0
for path in allowed:
score = difflib.SequenceMatcher(a=candidate.lower(), b=path.lower()).ratio()
if score > best_score:
best_score = score
best = path
return best if best_score >= 0.4 else None
def _resolve_path(data: Any, path: str) -> Any | None: def _resolve_path(data: Any, path: str) -> Any | None:
cursor = data cursor = data
for part in re.split(r"\.(?![^\[]*\])", path): for part in re.split(r"\.(?![^\[]*\])", path):