Compare commits


6 Commits

4 changed files with 293 additions and 4 deletions

View File

@@ -151,9 +151,12 @@ class AnswerEngine:
if self._settings.snapshot_pin_enabled and state and state.snapshot:
snapshot_used = state.snapshot
summary = build_summary(snapshot_used)
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=6)
runbook_paths = self._kb.runbook_paths(limit=10)
history_ctx = _format_history(history)
lexicon_ctx = _lexicon_context(summary)
@@ -178,6 +181,7 @@ class AnswerEngine:
normalized = str(normalize.get("normalized") or question).strip() or question
keywords = normalize.get("keywords") or []
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
keyword_tokens = _extract_keywords(normalized, sub_questions=[], keywords=keywords)
if observer:
observer("route", "routing")
@@ -204,6 +208,18 @@ class AnswerEngine:
"workload",
"k8s",
"kubernetes",
"postgres",
"database",
"db",
"connections",
"cpu",
"ram",
"memory",
"network",
"io",
"disk",
"pvc",
"storage",
)
if any(term in normalized.lower() for term in cluster_terms):
classify["needs_snapshot"] = True
@@ -229,6 +245,7 @@ class AnswerEngine:
parts = _parse_json_list(decompose_raw)
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
keyword_tokens = _extract_keywords(normalized, sub_questions=sub_questions, keywords=keywords)
snapshot_context = ""
if classify.get("needs_snapshot"):
@@ -236,7 +253,7 @@ class AnswerEngine:
observer("retrieve", "scoring chunks")
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan)
selected = _select_chunks(chunks, scored, plan, keyword_tokens)
if self._settings.debug_pipeline:
scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
@@ -275,15 +292,46 @@ class AnswerEngine:
observer("synthesize", "synthesizing")
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
if snapshot_context and _needs_evidence_fix(reply, classify):
unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
needs_evidence = _needs_evidence_fix(reply, classify)
resolved_runbook = None
if runbook_paths and (runbook_fix or runbook_needed):
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
resolved_runbook = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
if observer:
observer("evidence_fix", "repairing missing evidence")
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if runbook_paths:
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
if resolved_runbook:
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
)
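# Illustrative sketch (hypothetical values, not from the change): with one
# hallucinated node and a resolved runbook, the repair prompt assembled above
# would look roughly like:
#   EVIDENCE_FIX_PROMPT
#   Question: which node runs postgres
#   Draft: postgres runs on titan-9 ...
#   UnknownNodes: titan-9
#   AllowedRunbooks: runbooks/postgres-failover.md
#   ResolvedRunbook: runbooks/postgres-failover.md
#   AllowedNodes: titan-1, titan-2, titan-3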
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
@@ -667,6 +715,7 @@ def _select_chunks(
chunks: list[dict[str, Any]],
scores: dict[str, float],
plan: ModePlan,
keywords: list[str] | None = None,
) -> list[dict[str, Any]]:
if not chunks:
return []
@@ -674,6 +723,20 @@ def _select_chunks(
selected: list[dict[str, Any]] = []
head = chunks[0]
selected.append(head)
keyword_hits: list[dict[str, Any]] = []
focused = _focused_keywords(keywords or [])
if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked:
text = item.get("text", "").lower()
if any(kw in text for kw in lowered):
keyword_hits.append(item)
for item in keyword_hits:
if len(selected) >= plan.chunk_top:
break
if item in selected:
continue
selected.append(item)
for item in ranked:
if len(selected) >= plan.chunk_top:
break
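# Illustrative sketch (hypothetical chunks and plan, not from the change): with
# chunk_top=2 and focused keyword "postgres", a low-scored chunk whose text
# mentions "postgres" is pulled in right after the head chunk, ahead of
# higher-scored but unrelated chunks:
#   _select_chunks(
#       [{"id": "c1", "text": "nodes overview"}, {"id": "c2", "text": "postgres_connections ..."}],
#       {"c1": 0.9, "c2": 0.1}, plan, ["postgres"],
#   )  # -> [head chunk c1, keyword hit c2]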
@@ -797,9 +860,15 @@ def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
"can't",
"need to",
"would need",
"does not provide",
"does not mention",
"not mention",
"not provided",
"not in context",
"not referenced",
"missing",
"no specific",
"no information",
)
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
return True
@@ -808,6 +877,143 @@ def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
return False
def _extract_keywords(normalized: str, sub_questions: list[str], keywords: list[Any] | None) -> list[str]:
stopwords = {
"the",
"and",
"for",
"with",
"that",
"this",
"what",
"which",
"when",
"where",
"who",
"why",
"how",
"tell",
"show",
"list",
"give",
"about",
"right",
"now",
}
tokens: list[str] = []
for source in [normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < 3 or part in stopwords:
continue
tokens.append(part)
if keywords:
for kw in keywords:
if isinstance(kw, str):
part = kw.strip().lower()
if part and part not in stopwords and part not in tokens:
tokens.append(part)
return list(dict.fromkeys(tokens))[:12]
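# Illustrative example (traced against the helper above): stopwords and short
# tokens are dropped, hyphenated identifiers survive the split, and explicit
# keywords are only appended once:
#   _extract_keywords("how many connections does postgres use on titan-3", [], ["postgres"])
#   # -> ["many", "connections", "does", "postgres", "use", "titan-3"]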
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
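# Illustrative example (traced against the helper above): generic cluster words
# are dropped, and tokens with digits, hyphens, or extra length get boosted, so
# specific identifiers float to the front:
#   _focused_keywords(["cluster", "postgres", "connections", "titan-3"])
#   # -> ["titan-3", "connections", "postgres"]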
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware:
return sorted([node for node in hardware.keys() if isinstance(node, str)])
return []
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
namespaces: list[str] = []
for entry in summary.get("namespace_pods") or []:
if isinstance(entry, dict):
name = entry.get("namespace")
if name:
namespaces.append(str(name))
return sorted(set(namespaces))
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\b(titan-[0-9a-z]+|node\d+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
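# Illustrative example (hypothetical names): the regex assumes the cluster's node
# naming scheme ("titan-*" or "nodeN"), so a drafted node that is not in the
# snapshot is flagged for the evidence-fix pass:
#   _find_unknown_nodes("postgres runs on titan-9", ["titan-1", "titan-2"])
#   # -> ["titan-9"]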
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
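# Illustrative example (hypothetical names): only "namespace <name>" phrasings are
# checked, so a namespace the snapshot does not know about is surfaced:
#   _find_unknown_namespaces("pods in namespace observability are failing", ["default", "flux-system"])
#   # -> ["observability"]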
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
if not reply or not allowed:
return False
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
if not paths:
return False
allowed_set = {p.lower() for p in allowed}
return any(path.lower() not in allowed_set for path in paths)
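# Illustrative example (hypothetical paths): any "runbooks/..." path in the draft
# that is not in the allowed list marks the draft for repair:
#   _needs_runbook_fix("see runbooks/postgres-restore.md", ["runbooks/db-failover.md"])
#   # -> True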
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
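# Illustrative example (hypothetical paths): documentation-style questions must
# cite a path from the allowed list, otherwise the draft is sent back for repair:
#   _needs_runbook_reference("where is the backup checklist documented?", ["runbooks/backup.md"], "Run the steps manually.")
#   # -> True (cue words present, no allowed runbook cited)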
def _resolve_path(data: Any, path: str) -> Any | None:
cursor = data
for part in re.split(r"\.(?![^\[]*\])", path):

View File

@@ -58,3 +58,16 @@ class KnowledgeBase:
if not titles:
return ""
return "Relevant runbooks:\n" + "\n".join(titles[:limit])
def runbook_paths(self, *, limit: int = 10) -> list[str]:
self.load()
if not self._runbooks:
return []
paths: list[str] = []
for entry in self._runbooks:
if not isinstance(entry, dict):
continue
path = entry.get("path")
if path:
paths.append(str(path))
return paths[:limit]
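# Illustrative usage (hypothetical entries): the engine asks for a bounded list of
# exact paths to feed the runbook resolver and the evidence-fix prompt:
#   kb.runbook_paths(limit=2)
#   # -> e.g. ["runbooks/backup.md", "runbooks/node-drain.md"] if those entries exist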

View File

@@ -99,7 +99,20 @@ EVIDENCE_FIX_PROMPT = (
"Check the draft against the context. "
"If the draft says data is missing but the context includes relevant values, "
"rewrite the answer to include those values. "
"If data is truly missing, keep the draft concise and honest."
"If data is truly missing, keep the draft concise and honest. "
"If AllowedRunbooks are provided, use an exact path from that list when answering "
"documentation or checklist questions and do not invent new paths."
)
RUNBOOK_SELECT_SYSTEM = (
CLUSTER_SYSTEM
+ " Select the single best runbook path from the allowed list. "
+ "Return JSON only."
)
RUNBOOK_SELECT_PROMPT = (
"Pick the best runbook path for the question from the AllowedRunbooks list. "
"Return JSON with field: path. If none apply, return {\"path\": \"\"}."
)
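# Illustrative note (expected resolver output, hypothetical path): the engine
# parses the reply with _parse_json_block, so the model is expected to return
# something like {"path": "runbooks/backup.md"}, or {"path": ""} when nothing fits.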
DRAFT_SELECT_PROMPT = (

View File

@@ -104,6 +104,7 @@ def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]:
summary.update(_build_workloads(snapshot))
summary.update(_build_flux(snapshot))
_merge_cluster_summary(snapshot, summary)
_augment_lexicon(summary)
return summary
@@ -131,6 +132,37 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) ->
summary["cross_stats"] = cross_stats
def _augment_lexicon(summary: dict[str, Any]) -> None:
lexicon = summary.get("lexicon")
if not isinstance(lexicon, dict):
lexicon = {"terms": [], "aliases": {}}
terms = list(lexicon.get("terms") or [])
aliases = dict(lexicon.get("aliases") or {})
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
hardware_map = {
"rpi5": "Raspberry Pi 5 nodes",
"rpi4": "Raspberry Pi 4 nodes",
"rpi": "Raspberry Pi nodes",
"jetson": "NVIDIA Jetson nodes",
"amd64": "AMD64 nodes",
}
existing_terms = {entry.get("term") for entry in terms if isinstance(entry, dict)}
for key, meaning in hardware_map.items():
if key not in hardware:
continue
if key not in existing_terms:
terms.append({"term": key, "meaning": meaning})
if key not in aliases:
aliases[key] = meaning
if "raspberry pi 5" not in aliases and "rpi5" in hardware:
aliases["raspberry pi 5"] = "rpi5"
if "raspberry pi 4" not in aliases and "rpi4" in hardware:
aliases["raspberry pi 4"] = "rpi4"
lexicon["terms"] = terms
lexicon["aliases"] = aliases
summary["lexicon"] = lexicon
def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
items = snapshot.get("nodes_detail")
return items if isinstance(items, list) else []
@@ -593,6 +625,21 @@ def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None:
lines.append("hardware: " + "; ".join(sorted(parts)))
def _append_hardware_groups(lines: list[str], summary: dict[str, Any]) -> None:
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
if not hardware:
return
parts = []
for key, names in hardware.items():
if not isinstance(names, list):
continue
name_list = _format_names([str(name) for name in names if name])
if name_list:
parts.append(f"{key}={name_list}")
if parts:
lines.append("hardware_nodes: " + "; ".join(sorted(parts)))
def _append_node_ages(lines: list[str], summary: dict[str, Any]) -> None:
ages = summary.get("node_ages") if isinstance(summary.get("node_ages"), list) else []
if not ages:
@@ -1276,6 +1323,15 @@ def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None:
hottest=hottest,
)
)
used = postgres.get("used")
max_conn = postgres.get("max")
if used is not None or max_conn is not None:
lines.append(
"postgres_connections_total: used={used}, max={max}".format(
used=_format_float(used),
max=_format_float(max_conn),
)
)
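# Illustrative output (hypothetical values; exact rendering depends on _format_float):
# postgres totals of used=42 and max=200 would add a summary line roughly like:
#   postgres_connections_total: used=42, max=200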
by_db = postgres.get("by_db")
if isinstance(by_db, list) and by_db:
parts = []
@@ -1772,9 +1828,10 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
bits.append(f"version={snapshot_version}")
lines.append("snapshot: " + ", ".join(bits))
_append_nodes(lines, summary)
_append_hardware(lines, summary)
_append_hardware_groups(lines, summary)
_append_lexicon(lines, summary)
_append_pressure(lines, summary)
_append_hardware(lines, summary)
_append_node_facts(lines, summary)
_append_node_ages(lines, summary)
_append_node_taints(lines, summary)