Compare commits

...

6 Commits

4 changed files with 293 additions and 4 deletions

View File

@@ -151,9 +151,12 @@ class AnswerEngine:
if self._settings.snapshot_pin_enabled and state and state.snapshot:
snapshot_used = state.snapshot
summary = build_summary(snapshot_used)
allowed_nodes = _allowed_nodes(summary)
allowed_namespaces = _allowed_namespaces(summary)
summary_lines = _summary_lines(snapshot_used)
kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=6)
runbook_paths = self._kb.runbook_paths(limit=10)
history_ctx = _format_history(history)
lexicon_ctx = _lexicon_context(summary)
@@ -178,6 +181,7 @@ class AnswerEngine:
normalized = str(normalize.get("normalized") or question).strip() or question
keywords = normalize.get("keywords") or []
_debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})
keyword_tokens = _extract_keywords(normalized, sub_questions=[], keywords=keywords)
if observer:
observer("route", "routing")
@@ -204,6 +208,18 @@
"workload",
"k8s",
"kubernetes",
"postgres",
"database",
"db",
"connections",
"cpu",
"ram",
"memory",
"network",
"io",
"disk",
"pvc",
"storage",
)
if any(term in normalized.lower() for term in cluster_terms):
classify["needs_snapshot"] = True
@@ -229,6 +245,7 @@
parts = _parse_json_list(decompose_raw)
sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
_debug_log("decompose_parsed", {"sub_questions": sub_questions})
keyword_tokens = _extract_keywords(normalized, sub_questions=sub_questions, keywords=keywords)
snapshot_context = "" snapshot_context = ""
if classify.get("needs_snapshot"): if classify.get("needs_snapshot"):
@ -236,7 +253,7 @@ class AnswerEngine:
observer("retrieve", "scoring chunks") observer("retrieve", "scoring chunks")
chunks = _chunk_lines(summary_lines, plan.chunk_lines) chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan) scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan) selected = _select_chunks(chunks, scored, plan, keyword_tokens)
if self._settings.debug_pipeline: if self._settings.debug_pipeline:
scored_preview = sorted( scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks], [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
@ -275,15 +292,46 @@ class AnswerEngine:
observer("synthesize", "synthesizing") observer("synthesize", "synthesizing")
reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm) reply = await self._synthesize_answer(normalized, subanswers, context, classify, plan, call_llm)
if snapshot_context and _needs_evidence_fix(reply, classify): unknown_nodes = _find_unknown_nodes(reply, allowed_nodes)
unknown_namespaces = _find_unknown_namespaces(reply, allowed_namespaces)
runbook_fix = _needs_runbook_fix(reply, runbook_paths)
runbook_needed = _needs_runbook_reference(normalized, runbook_paths, reply)
needs_evidence = _needs_evidence_fix(reply, classify)
resolved_runbook = None
if runbook_paths and (runbook_fix or runbook_needed):
resolver_prompt = prompts.RUNBOOK_SELECT_PROMPT + "\nQuestion: " + normalized
resolver_raw = await call_llm(
prompts.RUNBOOK_SELECT_SYSTEM,
resolver_prompt,
context="AllowedRunbooks:\n" + "\n".join(runbook_paths),
model=plan.fast_model,
tag="runbook_select",
)
resolver = _parse_json_block(resolver_raw, fallback={})
resolved_runbook = resolver.get("path") if isinstance(resolver.get("path"), str) else None
if (snapshot_context and needs_evidence) or unknown_nodes or unknown_namespaces or runbook_fix or runbook_needed:
if observer:
observer("evidence_fix", "repairing missing evidence")
extra_bits = []
if unknown_nodes:
extra_bits.append("UnknownNodes: " + ", ".join(sorted(unknown_nodes)))
if unknown_namespaces:
extra_bits.append("UnknownNamespaces: " + ", ".join(sorted(unknown_namespaces)))
if runbook_paths:
extra_bits.append("AllowedRunbooks: " + ", ".join(runbook_paths))
if resolved_runbook:
extra_bits.append("ResolvedRunbook: " + resolved_runbook)
if allowed_nodes:
extra_bits.append("AllowedNodes: " + ", ".join(allowed_nodes))
if allowed_namespaces:
extra_bits.append("AllowedNamespaces: " + ", ".join(allowed_namespaces))
fix_prompt = (
prompts.EVIDENCE_FIX_PROMPT
+ "\nQuestion: "
+ normalized
+ "\nDraft: "
+ reply
+ ("\n" + "\n".join(extra_bits) if extra_bits else "")
)
reply = await call_llm(
prompts.EVIDENCE_FIX_SYSTEM,
@@ -667,6 +715,7 @@ def _select_chunks(
chunks: list[dict[str, Any]],
scores: dict[str, float],
plan: ModePlan,
keywords: list[str] | None = None,
) -> list[dict[str, Any]]:
if not chunks:
return []
@@ -674,6 +723,20 @@ def _select_chunks(
selected: list[dict[str, Any]] = []
head = chunks[0]
selected.append(head)
keyword_hits: list[dict[str, Any]] = []
focused = _focused_keywords(keywords or [])
if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked:
text = item.get("text", "").lower()
if any(kw in text for kw in lowered):
keyword_hits.append(item)
for item in keyword_hits:
if len(selected) >= plan.chunk_top:
break
if item in selected:
continue
selected.append(item)
for item in ranked:
if len(selected) >= plan.chunk_top:
break
@@ -797,9 +860,15 @@ def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
"can't",
"need to",
"would need",
"does not provide",
"does not mention",
"not mention",
"not provided", "not provided",
"not in context",
"not referenced",
"missing", "missing",
"no specific", "no specific",
"no information",
)
if classify.get("needs_snapshot") and any(marker in lowered for marker in missing_markers):
return True
@@ -808,6 +877,143 @@ def _needs_evidence_fix(reply: str, classify: dict[str, Any]) -> bool:
return False
def _extract_keywords(normalized: str, sub_questions: list[str], keywords: list[Any] | None) -> list[str]:
stopwords = {
"the",
"and",
"for",
"with",
"that",
"this",
"what",
"which",
"when",
"where",
"who",
"why",
"how",
"tell",
"show",
"list",
"give",
"about",
"right",
"now",
}
tokens: list[str] = []
for source in [normalized, *sub_questions]:
for part in re.split(r"[^a-zA-Z0-9_-]+", source.lower()):
if len(part) < 3 or part in stopwords:
continue
tokens.append(part)
if keywords:
for kw in keywords:
if isinstance(kw, str):
part = kw.strip().lower()
if part and part not in stopwords and part not in tokens:
tokens.append(part)
return list(dict.fromkeys(tokens))[:12]
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
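For reference, a minimal sketch of how these two keyword helpers behave, assuming the engine module imports as answer_engine (module name assumed):

# Sketch only: the module name "answer_engine" is an assumption.
from answer_engine import _extract_keywords, _focused_keywords

question = "how many postgres connections is titan-03 using right now"
tokens = _extract_keywords(question, sub_questions=[], keywords=["postgres"])
# Stopwords ("how", "right", "now") and tokens shorter than 3 characters are dropped,
# leaving something like ["many", "postgres", "connections", "titan-03", "using"].
focused = _focused_keywords(tokens)
# Digits, dashes and length raise the score, so "titan-03" and "connections"
# should rank ahead of generic words such as "many".
print(focused)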
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware:
return sorted([node for node in hardware.keys() if isinstance(node, str)])
return []
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
namespaces: list[str] = []
for entry in summary.get("namespace_pods") or []:
if isinstance(entry, dict):
name = entry.get("namespace")
if name:
namespaces.append(str(name))
return sorted(set(namespaces))
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\b(titan-[0-9a-z]+|node\d+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
if not reply or not allowed:
return []
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
found = {m.group(1) for m in pattern.finditer(reply)}
if not found:
return []
allowed_set = {a.lower() for a in allowed}
return sorted({item for item in found if item.lower() not in allowed_set})
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
if not reply or not allowed:
return False
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
if not paths:
return False
allowed_set = {p.lower() for p in allowed}
return any(path.lower() not in allowed_set for path in paths)
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
if not allowed or not question:
return False
lowered = question.lower()
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
if not any(cue in lowered for cue in cues):
return False
if not reply:
return True
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
if token.lower() in {p.lower() for p in allowed}:
return False
return True
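A rough sketch of how these draft guards flag invented names, with the same module assumption as above and illustrative inputs:

# Sketch only: inputs are illustrative.
from answer_engine import _find_unknown_nodes, _needs_runbook_fix

draft = "CPU pressure is highest on titan-09; see runbooks/cpu-pressure.md for remediation."
allowed_nodes = ["titan-01", "titan-02", "titan-03"]
allowed_runbooks = ["runbooks/node-pressure.md", "runbooks/postgres.md"]

print(_find_unknown_nodes(draft, allowed_nodes))    # ["titan-09"]: not a node in the snapshot
print(_needs_runbook_fix(draft, allowed_runbooks))  # True: cited path is not in the allowed list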
def _resolve_path(data: Any, path: str) -> Any | None:
cursor = data
for part in re.split(r"\.(?![^\[]*\])", path):

View File

@@ -58,3 +58,16 @@ class KnowledgeBase:
if not titles:
return ""
return "Relevant runbooks:\n" + "\n".join(titles[:limit])
def runbook_paths(self, *, limit: int = 10) -> list[str]:
self.load()
if not self._runbooks:
return []
paths: list[str] = []
for entry in self._runbooks:
if not isinstance(entry, dict):
continue
path = entry.get("path")
if path:
paths.append(str(path))
return paths[:limit]
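A small sketch of the expected shape on the engine side, assuming kb is an already-loaded KnowledgeBase:

# Sketch only: "kb" is assumed to be a constructed KnowledgeBase instance.
paths = kb.runbook_paths(limit=10)
# e.g. ["runbooks/node-pressure.md", "runbooks/postgres.md"] when runbooks are
# loaded, or [] when the knowledge base has no runbook entries.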

View File

@@ -99,7 +99,20 @@ EVIDENCE_FIX_PROMPT = (
"Check the draft against the context. "
"If the draft says data is missing but the context includes relevant values, "
"rewrite the answer to include those values. "
"If data is truly missing, keep the draft concise and honest. "
"If AllowedRunbooks are provided, use an exact path from that list when answering "
"documentation or checklist questions and do not invent new paths."
)
RUNBOOK_SELECT_SYSTEM = (
CLUSTER_SYSTEM
+ " Select the single best runbook path from the allowed list. "
+ "Return JSON only."
)
RUNBOOK_SELECT_PROMPT = (
"Pick the best runbook path for the question from the AllowedRunbooks list. "
"Return JSON with field: path. If none apply, return {\"path\": \"\"}."
)
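A short sketch of the text these prompts produce for a documentation question (question and runbook paths illustrative); the engine passes the same strings through call_llm as shown in the first file:

# Sketch only: paths and question are illustrative.
question = "where is the checklist for draining a node?"
allowed = ["runbooks/node-drain.md", "runbooks/postgres.md"]
user_prompt = RUNBOOK_SELECT_PROMPT + "\nQuestion: " + question
llm_context = "AllowedRunbooks:\n" + "\n".join(allowed)
# The model is expected to reply with JSON only, e.g. {"path": "runbooks/node-drain.md"},
# or {"path": ""} when none of the allowed paths apply.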
DRAFT_SELECT_PROMPT = (

View File

@@ -104,6 +104,7 @@ def build_summary(snapshot: dict[str, Any] | None) -> dict[str, Any]:
summary.update(_build_workloads(snapshot))
summary.update(_build_flux(snapshot))
_merge_cluster_summary(snapshot, summary)
_augment_lexicon(summary)
return summary
@@ -131,6 +132,37 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) ->
summary["cross_stats"] = cross_stats
def _augment_lexicon(summary: dict[str, Any]) -> None:
lexicon = summary.get("lexicon")
if not isinstance(lexicon, dict):
lexicon = {"terms": [], "aliases": {}}
terms = list(lexicon.get("terms") or [])
aliases = dict(lexicon.get("aliases") or {})
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
hardware_map = {
"rpi5": "Raspberry Pi 5 nodes",
"rpi4": "Raspberry Pi 4 nodes",
"rpi": "Raspberry Pi nodes",
"jetson": "NVIDIA Jetson nodes",
"amd64": "AMD64 nodes",
}
existing_terms = {entry.get("term") for entry in terms if isinstance(entry, dict)}
for key, meaning in hardware_map.items():
if key not in hardware:
continue
if key not in existing_terms:
terms.append({"term": key, "meaning": meaning})
if key not in aliases:
aliases[key] = meaning
if "raspberry pi 5" not in aliases and "rpi5" in hardware:
aliases["raspberry pi 5"] = "rpi5"
if "raspberry pi 4" not in aliases and "rpi4" in hardware:
aliases["raspberry pi 4"] = "rpi4"
lexicon["terms"] = terms
lexicon["aliases"] = aliases
summary["lexicon"] = lexicon
def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
items = snapshot.get("nodes_detail")
return items if isinstance(items, list) else []
@@ -593,6 +625,21 @@ def _append_hardware(lines: list[str], summary: dict[str, Any]) -> None:
lines.append("hardware: " + "; ".join(sorted(parts)))
def _append_hardware_groups(lines: list[str], summary: dict[str, Any]) -> None:
hardware = summary.get("hardware") if isinstance(summary.get("hardware"), dict) else {}
if not hardware:
return
parts = []
for key, names in hardware.items():
if not isinstance(names, list):
continue
name_list = _format_names([str(name) for name in names if name])
if name_list:
parts.append(f"{key}={name_list}")
if parts:
lines.append("hardware_nodes: " + "; ".join(sorted(parts)))
def _append_node_ages(lines: list[str], summary: dict[str, Any]) -> None:
ages = summary.get("node_ages") if isinstance(summary.get("node_ages"), list) else []
if not ages:
@@ -1276,6 +1323,15 @@ def _append_postgres(lines: list[str], summary: dict[str, Any]) -> None:
hottest=hottest,
)
)
used = postgres.get("used")
max_conn = postgres.get("max")
if used is not None or max_conn is not None:
lines.append(
"postgres_connections_total: used={used}, max={max}".format(
used=_format_float(used),
max=_format_float(max_conn),
)
)
by_db = postgres.get("by_db")
if isinstance(by_db, list) and by_db:
parts = []
@@ -1772,9 +1828,10 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
bits.append(f"version={snapshot_version}")
lines.append("snapshot: " + ", ".join(bits))
_append_nodes(lines, summary)
_append_hardware(lines, summary)
_append_hardware_groups(lines, summary)
_append_lexicon(lines, summary)
_append_pressure(lines, summary)
_append_node_facts(lines, summary)
_append_node_ages(lines, summary)
_append_node_taints(lines, summary)