atlasbot: refine keyword focus and runbook references

This commit is contained in:
Brad Stein 2026-02-01 01:38:19 -03:00
parent 89d6ba3a3d
commit 583507b3e5

View File

@ -295,7 +295,14 @@ class AnswerEngine:
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes) unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces) unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths) runbook_fix = _needs_runbook_fix(reply, runbook_paths)
if snapshot_context and (_needs_evidence_fix(reply, classify) or unknown_nodes or unknown_namespaces or runbook_fix): runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
if snapshot_context and (
_needs_evidence_fix(reply, classify)
or unknown_nodes
or unknown_namespaces
or runbook_fix
or runbook_needed
):
if observer: if observer:
observer("evidence_fix", "repairing missing evidence") observer("evidence_fix", "repairing missing evidence")
extra_bits = [] extra_bits = []
@ -708,8 +715,9 @@ def _select_chunks(
head = chunks[0] head = chunks[0]
selected.append(head) selected.append(head)
keyword_hits: list[dict[str, Any]] = [] keyword_hits: list[dict[str, Any]] = []
if keywords: focused = _focused_keywords(keywords or [])
lowered = [kw.lower() for kw in keywords if kw] if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked: for item in ranked:
text = item.get("text", "").lower() text = item.get("text", "").lower()
if any(kw in text for kw in lowered): if any(kw in text for kw in lowered):
@ -895,6 +903,41 @@ def _extract_keywords(normalized: str, sub_questions: list[str], keywords: list[
return list(dict.fromkeys(tokens))[:12] return list(dict.fromkeys(tokens))[:12]
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
def _allowed_nodes(summary: dict[str, Any]) -> list[str]: def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {} hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware: if hardware:
@ -944,6 +987,21 @@ def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
return any(path.lower() not in allowed_set for path in paths) return any(path.lower() not in allowed_set for path in paths)
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
def _resolve_path(data: Any, path: str) -> Any | None: def _resolve_path(data: Any, path: str) -> Any | None:
cursor = data cursor = data
for part in re.split(r"\.(?![^\[]*\])", path): for part in re.split(r"\.(?![^\[]*\])", path):