atlasbot: make retrieval fully llm-driven

Brad Stein 2026-02-02 22:18:50 -03:00
parent b86c1097f7
commit 0dff795c98
2 changed files with 253 additions and 716 deletions
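For orientation, the hunks below drop the keyword/regex heuristics for metric, hardware, and hotspot facts and replace them with a chained, LLM-driven flow. A minimal sketch of that chain, reusing the helper names introduced in this diff (their bodies are assumed to behave as shown in the hunks; retrieve_key_facts itself is a hypothetical wrapper, not part of the commit):

from typing import Any, Awaitable, Callable

async def retrieve_key_facts(
    call_llm: Callable[..., Awaitable[str]],
    question: str,
    keywords: list[str],
    selected_chunks: list[dict[str, Any]],
    plan: Any,
) -> list[str]:
    # 1. Ask the model which kinds of facts the question needs.
    fact_types = await _extract_fact_types(call_llm, question, keywords, plan)
    # 2. Turn those fact types into signals likely to appear in snapshot lines.
    signals = await _derive_signals(call_llm, question, fact_types, plan)
    # 3. Scan each selected chunk for exact lines matching the signals.
    candidate_lines: list[str] = []
    for chunk in selected_chunks:
        chunk_lines = chunk["text"].splitlines()
        if not chunk_lines:
            continue
        hits = await _scan_chunk_for_signals(call_llm, question, signals, chunk_lines, plan)
        candidate_lines.extend(hits)
    candidate_lines = list(dict.fromkeys(candidate_lines))  # de-dupe, keep order
    # 4. Prune the candidates to the smallest set that answers the question.
    if not candidate_lines:
        return []
    return await _prune_metric_candidates(
        call_llm, question, candidate_lines, plan, plan.metric_retries
    )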

View File

@@ -283,8 +283,6 @@ class AnswerEngine:
if "node" in lowered_q:
focus_entity = "node"
hotspot_override = False
hotspot_request = False
snapshot_context = ""
if classify.get("needs_snapshot"):
if observer:
@@ -292,131 +290,64 @@ class AnswerEngine:
chunks = _chunk_lines(summary_lines, plan.chunk_lines)
scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
selected = _select_chunks(chunks, scored, plan, keyword_tokens)
key_facts = _key_fact_lines(summary_lines, keyword_tokens)
hottest_facts = _extract_hottest_facts(summary_lines, f"{question} {normalized}")
hardware_facts = _extract_hardware_usage_facts(summary_lines, f"{question} {normalized}")
hotspot_line = next((line for line in summary_lines if line.startswith("hottest:")), None)
forced_metric_lines: list[str] = []
fact_candidates = _collect_fact_candidates(selected, limit=plan.max_subquestions * 12)
key_facts = await _select_fact_lines(
call_llm,
normalized,
fact_candidates,
plan,
max_lines=max(4, plan.max_subquestions * 2),
)
metric_facts: list[str] = []
if classify.get("question_type") in {"metric", "diagnostic"}:
metric_prefixes = _metric_prefixes_from_lines(summary_lines)
selected_prefixes = await _select_metric_prefixes(
if observer:
observer("retrieve", "extracting fact types")
fact_types = await _extract_fact_types(
call_llm,
normalized,
metric_prefixes,
keyword_tokens,
plan,
plan.metric_retries,
)
if selected_prefixes:
forced_metric_lines = _lines_for_prefixes(summary_lines, selected_prefixes)
if not hardware_facts:
hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")
lowered_q = f"{question} {normalized}".lower()
if any(tok in lowered_q for tok in hardware_tokens):
for line in summary_lines:
if "hardware_usage_top:" in line:
hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
break
if not hardware_facts:
for line in summary_lines:
if "hardware_usage_avg:" in line:
hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")]
break
metric_facts = [line for line in key_facts if re.search(r"\d", line)]
hotspot_request = any(term in lowered_q for term in ("hot spot", "hotspot", "hot spots", "hotspots"))
hotspot_override = False
if focus_entity == "node" and hottest_facts:
metric_facts = hottest_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
elif hardware_facts:
metric_facts = hardware_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if classify.get("question_type") in {"metric", "diagnostic"}:
if focus_entity != "node" and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")) and any(
tok in lowered_q for tok in ("average", "avg", "mean", "ram", "memory", "cpu", "load")
):
hw_top = None
for line in summary_lines:
if "hardware_usage_top:" in line:
parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
if parts:
hw_top = parts[0]
break
if hw_top:
metric_facts = [hw_top]
if observer:
observer("retrieve", "deriving signals")
signals = await _derive_signals(
call_llm,
normalized,
fact_types,
plan,
)
if observer:
observer("retrieve", "scanning chunks")
candidate_lines: list[str] = []
if signals:
for chunk in selected:
chunk_lines = chunk["text"].splitlines()
if not chunk_lines:
continue
hits = await _scan_chunk_for_signals(
call_llm,
normalized,
signals,
chunk_lines,
plan,
)
if hits:
candidate_lines.extend(hits)
candidate_lines = list(dict.fromkeys(candidate_lines))
if candidate_lines:
if observer:
observer("retrieve", "pruning candidates")
metric_facts = await _prune_metric_candidates(
call_llm,
normalized,
candidate_lines,
plan,
plan.metric_retries,
)
if metric_facts:
key_facts = _merge_fact_lines(metric_facts, key_facts)
if hottest_facts and not hardware_facts and focus_entity != "class" and not hotspot_request:
metric_facts = hottest_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if "namespace" in lowered_q and any(term in lowered_q for term in ("hotspot", "hot spot", "hottest")):
hotspot_node = None
if hottest_facts:
match = re.search(r"hottest_\w+_node: (?P<node>[^\s\[]+)", hottest_facts[0])
if match:
hotspot_node = match.group("node")
if hotspot_node:
for line in summary_lines:
if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line:
metric_facts = _merge_fact_lines([line], metric_facts)
key_facts = _merge_fact_lines([line], key_facts)
break
if not hotspot_node or not any(line.startswith("node_namespaces_top:") for line in metric_facts):
for line in summary_lines:
if line.startswith("node_pods_top:"):
metric_facts = _merge_fact_lines([line], metric_facts)
key_facts = _merge_fact_lines([line], key_facts)
break
if classify.get("question_type") in {"metric", "diagnostic"} and not hottest_facts and not hardware_facts:
metric_candidates = _metric_candidate_lines(summary_lines, keyword_tokens)
if forced_metric_lines:
metric_candidates = forced_metric_lines + [
line for line in metric_candidates if line not in forced_metric_lines
]
selected_facts = await _select_metric_facts(call_llm, normalized, metric_candidates, plan)
if selected_facts:
metric_facts = selected_facts
key_facts = _merge_fact_lines(metric_facts, key_facts)
if self._settings.debug_pipeline:
_debug_log("metric_facts_selected", {"facts": metric_facts})
if classify.get("question_type") in {"metric", "diagnostic"} and not metric_facts:
lowered_q = f"{question} {normalized}".lower()
if "namespace" in lowered_q and "pod" in lowered_q:
for line in summary_lines:
if line.startswith("namespaces_top:"):
metric_facts = [line]
break
if not metric_facts:
hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")
if any(tok in lowered_q for tok in hardware_tokens):
for line in summary_lines:
if "hardware_usage_top:" in line:
metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
break
if not metric_facts:
for line in summary_lines:
if "hardware_usage_avg:" in line:
metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")]
break
if metric_facts:
key_facts = _merge_fact_lines(metric_facts, key_facts)
if ("pressure" in lowered_q or "diskpressure" in lowered_q or "storagepressure" in lowered_q) and not metric_facts:
pressure_lines = []
for line in summary_lines:
line_lower = line.lower()
if line_lower.startswith("pressure_summary:") or line_lower.startswith("pressure_nodes:"):
pressure_lines.append(line)
if "node_pressure" in line_lower or "pvc_pressure" in line_lower:
pressure_lines.append(line)
if line_lower.startswith("pvc_usage_top:") or line_lower.startswith("root_disk_low_headroom:"):
pressure_lines.append(line)
if pressure_lines:
metric_facts = pressure_lines[:2]
key_facts = _merge_fact_lines(metric_facts, key_facts)
if hotspot_request and hotspot_line:
metric_facts = [hotspot_line]
key_facts = _merge_fact_lines(metric_facts, key_facts)
if forced_metric_lines:
metric_facts = _merge_fact_lines(forced_metric_lines, metric_facts)
key_facts = _merge_fact_lines(forced_metric_lines, key_facts)
if self._settings.debug_pipeline:
_debug_log("metric_facts_selected", {"facts": metric_facts})
if self._settings.debug_pipeline:
scored_preview = sorted(
[{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
@@ -632,278 +563,6 @@ class AnswerEngine:
if note:
reply = f"{reply}\n\n{note}"
if hotspot_request:
namespace_line = None
hotspot_node = None
if hotspot_line:
match = re.search(r"(titan-[a-z0-9]+)", hotspot_line)
if match:
hotspot_node = match.group(1)
if not hotspot_node:
for fact in metric_facts:
match = re.search(r"hottest_\w+_node: (?P<node>[^\s\[]+)", fact)
if match:
hotspot_node = match.group("node")
break
if "namespace" in lowered_q and hotspot_node:
for line in summary_lines:
if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line:
namespace_line = line
break
if not namespace_line:
for line in summary_lines:
if line.startswith("node_pods_top:") and hotspot_node in line:
namespace_line = line
break
if "namespace" in lowered_q and not namespace_line:
for fact in metric_facts:
if fact.startswith("node_namespaces_top:"):
namespace_line = fact
break
if not namespace_line:
for fact in metric_facts:
if fact.startswith("node_pods_top:"):
namespace_line = fact
break
if hotspot_line:
reply = f"Current hotspots: {hotspot_line}."
elif hotspot_node:
reply = f"Hotspot node: {hotspot_node}."
if namespace_line and reply:
reply = f"{reply} {namespace_line}."
if reply:
hotspot_override = True
if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts and not hotspot_override:
reply = _metric_fact_guard(reply, metric_facts, keyword_tokens)
if classify.get("question_type") in {"metric", "diagnostic"}:
lowered_q = f"{question} {normalized}".lower()
if any(tok in lowered_q for tok in ("how many", "count", "number of")) and any(
tok in lowered_q for tok in ("jetson", "rpi4", "rpi5", "amd64", "arm64", "rpi")
):
hw_line = next((line for line in summary_lines if line.startswith("hardware:")), None)
hw_nodes_line = next((line for line in summary_lines if line.startswith("hardware_nodes:")), None)
if hw_line:
def _find_value(key: str, line: str) -> str | None:
match = re.search(rf"{re.escape(key)}=([^;|]+)", line)
return match.group(1).strip() if match else None
target = None
if "jetson" in lowered_q:
target = "jetson"
elif "rpi5" in lowered_q:
target = "rpi5"
elif "rpi4" in lowered_q:
target = "rpi4"
elif "amd64" in lowered_q:
target = "amd64"
elif "arm64" in lowered_q:
target = "arm64"
elif "rpi" in lowered_q:
target = "rpi"
if target:
count = _find_value(target, hw_line)
nodes = _find_value(target, hw_nodes_line or "")
if count:
if nodes and "(" not in count:
reply = f"From the latest snapshot: {target}={count} ({nodes})."
else:
reply = f"From the latest snapshot: {target}={count}."
if any(tok in lowered_q for tok in ("mix", "breakdown", "composition", "hardware mix", "hardware stack")) and any(
tok in lowered_q for tok in ("jetson", "rpi4", "rpi5", "amd64", "arm64", "hardware")
):
hw_line = next((line for line in summary_lines if line.startswith("hardware:")), None)
hw_nodes_line = next((line for line in summary_lines if line.startswith("hardware_nodes:")), None)
if hw_line:
if hw_nodes_line:
reply = f"From the latest snapshot: {hw_line}; {hw_nodes_line}."
else:
reply = f"From the latest snapshot: {hw_line}."
if "node" in lowered_q and any(tok in lowered_q for tok in ("how many", "count", "total")):
nodes_line = next((line for line in summary_lines if line.startswith("nodes:")), None)
if nodes_line:
reply = f"From the latest snapshot: {nodes_line}."
if "pod" in lowered_q and "node" in lowered_q and any(tok in lowered_q for tok in ("most", "highest", "max")):
pods_line = next(
(line for line in summary_lines if line.startswith("node_pods_max:") or line.startswith("node_pods_top:")),
None,
)
if pods_line:
reply = f"From the latest snapshot: {pods_line}."
if "load_index" in lowered_q or "load index" in lowered_q:
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
node_load = summary.get("node_load") if isinstance(summary.get("node_load"), list) else []
best = None
for item in node_load:
if not isinstance(item, dict):
continue
node = item.get("node")
load = item.get("load_index")
try:
numeric = float(load)
except (TypeError, ValueError):
continue
if best is None or numeric > best["load_index"]:
best = {"node": node, "load_index": numeric}
if best and best.get("node") is not None:
reply = (
"From the latest snapshot: hottest_load_index_node: "
f"{best['node']} load_index={best['load_index']}."
)
if "workload" in lowered_q and any(tok in lowered_q for tok in ("not ready", "not-ready", "unready")):
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
health = summary.get("workloads_health") if isinstance(summary.get("workloads_health"), dict) else {}
items: list[dict[str, Any]] = []
for key in ("deployments", "statefulsets", "daemonsets"):
entry = health.get(key) if isinstance(health.get(key), dict) else {}
for item in entry.get("items") or []:
if not isinstance(item, dict):
continue
items.append(
{
"kind": key[:-1],
"namespace": item.get("namespace") or "",
"name": item.get("name") or "",
"desired": item.get("desired"),
"ready": item.get("ready"),
}
)
if items:
items = sorted(items, key=lambda item: (item.get("namespace") or "", item.get("name") or ""))
entries = [
f"{item.get('namespace','?')}/{item.get('name','?')} {item.get('kind','?')} ready={item.get('ready')} desired={item.get('desired')}"
for item in items[:3]
]
if entries:
reply = f"From the latest snapshot: workloads_not_ready: {', '.join(entries)}."
else:
reply = "From the latest snapshot: workloads_not_ready: none."
if "pod" in lowered_q and ("waiting" in lowered_q or "wait" in lowered_q) and "reason" in lowered_q:
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
pod_issues = summary.get("pod_issues") if isinstance(summary.get("pod_issues"), dict) else {}
waiting = pod_issues.get("waiting_reasons") if isinstance(pod_issues.get("waiting_reasons"), dict) else {}
if waiting:
top = sorted(waiting.items(), key=lambda item: (-item[1], item[0]))[:3]
items = [f"{reason}={count}" for reason, count in top]
reply = f"From the latest snapshot: pod_waiting_reasons: {'; '.join(items)}."
else:
reply = "From the latest snapshot: pod_waiting_reasons: none."
if "pvc" in lowered_q and any(tok in lowered_q for tok in ("usage", "used", "percent", "80", "full")):
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
pvc_usage = summary.get("pvc_usage_top") if isinstance(summary.get("pvc_usage_top"), list) else []
over = []
for item in pvc_usage:
if not isinstance(item, dict):
continue
used = item.get("used_percent")
if used is None:
continue
try:
used_val = float(used)
except (TypeError, ValueError):
continue
if used_val >= 80:
name = item.get("name") or item.get("pvc")
if name:
over.append(f"{name}={used_val:.2f}%")
if over:
reply = f"From the latest snapshot: pvc_usage>=80%: {', '.join(over)}."
else:
reply = "From the latest snapshot: pvc_usage>=80%: none."
if "pressure" in lowered_q:
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
pressure_nodes = summary.get("pressure_nodes") if isinstance(summary.get("pressure_nodes"), dict) else {}
entries = [f"{key}={value}" for key, value in pressure_nodes.items() if value]
if entries:
reply = f"From the latest snapshot: pressure_nodes: {', '.join(entries)}."
if _has_token(lowered_q, "io") and ("node" in lowered_q or "nodes" in lowered_q):
io_facts = _extract_hottest_facts(summary_lines, lowered_q)
io_line = next((fact for fact in io_facts if fact.startswith("hottest_io_node")), None)
if io_line:
reply = f"From the latest snapshot: {io_line}."
if "namespace" in lowered_q and "pod" in lowered_q:
ns_line = None
for line in summary_lines:
if line.startswith("namespaces_top:"):
ns_line = line
break
cpu_line = None
if any(tok in lowered_q for tok in ("cpu", "hottest", "highest cpu", "highest")):
cpu_facts = _extract_hottest_facts(summary_lines, lowered_q)
cpu_line = next((fact for fact in cpu_facts if fact.startswith("hottest_cpu_node")), None)
if ns_line:
if cpu_line:
reply = f"From the latest snapshot: {cpu_line}; {ns_line}."
else:
reply = f"From the latest snapshot: {ns_line}."
# do not fall through to other overrides
if ("hotspot" in lowered_q or "hottest" in lowered_q) and "namespace" in lowered_q and "node_namespaces_top:" not in reply:
hotspot_node = None
match = re.search(r"(titan-[a-z0-9]+)", reply)
if match:
hotspot_node = match.group(1)
if not hotspot_node:
for fact in metric_facts:
match = re.search(r"hottest_\w+_node: (?P<node>[^\s\[]+)", fact)
if match:
hotspot_node = match.group("node")
break
if hotspot_node:
node_line = next(
(line for line in summary_lines if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line),
None,
)
if not node_line:
node_line = next(
(line for line in summary_lines if line.startswith("node_pods_top:") and hotspot_node in line),
None,
)
if node_line:
reply = f"Hotspot node: {hotspot_node}. {node_line}."
if "postgres" in lowered_q and any(tok in lowered_q for tok in ("connection", "connections", "db")):
pg = None
if isinstance(snapshot_used, dict):
summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {}
if summary:
top = summary.get("top") if isinstance(summary.get("top"), dict) else {}
pg = top.get("postgres") if isinstance(top.get("postgres"), dict) else None
if not pg:
metrics = snapshot_used.get("metrics") if isinstance(snapshot_used.get("metrics"), dict) else {}
pg = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else None
if isinstance(pg, dict):
used = pg.get("used")
max_conn = pg.get("max")
hottest = pg.get("hottest_db") or pg.get("hottest")
if used is not None and max_conn is not None:
if hottest:
reply = f"Postgres connections: used={used}, max={max_conn}; hottest_db={hottest}."
else:
reply = f"Postgres connections: used={used}, max={max_conn}."
else:
reply = "Postgres connection data is not fully available in the snapshot."
if classify.get("question_type") in {"metric", "diagnostic"}:
lowered_q = f"{question} {normalized}".lower()
if (
focus_entity != "node"
and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64"))
and any(tok in lowered_q for tok in ("average", "avg", "mean", "per hardware", "by hardware", "typical"))
):
hw_top = None
for line in summary_lines:
if "hardware_usage_top:" in line:
parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")]
if parts:
hw_top = parts[0]
break
if hw_top:
reply = f"From the latest snapshot: {hw_top}."
reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup")
@@ -1363,28 +1022,6 @@ def _select_chunks(
selected: list[dict[str, Any]] = []
head = chunks[0]
selected.append(head)
keyword_hits: list[dict[str, Any]] = []
raw_keywords = [kw.lower() for kw in (keywords or []) if kw]
focused = _focused_keywords(keywords or [])
if focused:
lowered = [kw.lower() for kw in focused if kw]
for item in ranked:
text = item.get("text", "").lower()
if any(kw in text for kw in lowered):
keyword_hits.append(item)
if raw_keywords:
for item in ranked:
if len(keyword_hits) >= plan.chunk_top:
break
text = item.get("text", "").lower()
if any(kw in text for kw in raw_keywords):
keyword_hits.append(item)
for item in keyword_hits:
if len(selected) >= plan.chunk_top:
break
if item in selected:
continue
selected.append(item)
for item in ranked:
if len(selected) >= plan.chunk_top:
break
@@ -1433,33 +1070,6 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
return [line for line in text.splitlines() if line.strip()]
def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]:
if not lines or not keywords:
return []
lowered = [kw.lower() for kw in keywords if kw]
if not lowered:
return []
focused = _focused_keywords(lowered)
primary = focused or lowered
matches: list[str] = []
for line in lines:
line_lower = line.lower()
if any(kw in line_lower for kw in primary):
matches.append(line)
if len(matches) >= limit:
break
if len(matches) < limit and focused:
for line in lines:
if len(matches) >= limit:
break
if line in matches:
continue
line_lower = line.lower()
if any(kw in line_lower for kw in lowered):
matches.append(line)
return matches
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
seen = set()
merged: list[str] = []
@@ -1541,172 +1151,6 @@ def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
return lines
def _extract_hottest_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if not any(
term in lowered
for term in ("hottest", "hot", "highest", "lowest", "most", "top", "peak", "loaded", "load", "busy")
):
return []
if "node" not in lowered and "nodes" not in lowered:
return []
line = ""
for item in lines:
if item.lower().startswith("hottest:"):
line = item
break
if " | " in item:
for seg in [seg.strip() for seg in item.split(" | ")]:
if seg.lower().startswith("hottest:"):
line = seg
break
if line:
break
if not line:
return []
facts = _expand_hottest_line(line)
if not facts:
return []
wanted = []
if _has_token(lowered, "io") and ("disk" in lowered or "storage" in lowered):
return [fact for fact in facts if fact.startswith("hottest_io_node")]
if _has_token(lowered, "cpu") or "processor" in lowered:
wanted.append("hottest_cpu_node")
if _has_token(lowered, "ram") or "memory" in lowered:
wanted.append("hottest_ram_node")
if _has_token(lowered, "net") or "network" in lowered or "throughput" in lowered:
wanted.append("hottest_net_node")
if _has_token(lowered, "io"):
wanted.append("hottest_io_node")
if _has_token(lowered, "disk") or "storage" in lowered:
wanted.append("hottest_disk_node")
if not wanted:
return facts
return [fact for fact in facts if any(label in fact for label in wanted)] or facts
def _extract_hardware_usage_facts(lines: list[str], question: str) -> list[str]:
if not lines:
return []
lowered = question.lower()
if "hardware" not in lowered:
return []
if not any(term in lowered for term in ("average", "avg", "mean", "per hardware", "by hardware", "typical")):
return []
avg_line = None
top_line = None
for line in lines:
segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line]
for seg in segments:
if seg.startswith("hardware_usage_avg:"):
avg_line = seg
elif seg.startswith("hardware_usage_top:"):
top_line = seg
if not avg_line and not top_line:
return []
wants_top = any(term in lowered for term in ("highest", "lowest", "most", "least", "top", "worst", "best"))
if wants_top and top_line:
return [top_line]
facts: list[str] = []
if avg_line:
facts.append(avg_line)
if top_line:
facts.append(top_line)
return facts
def _metric_candidate_lines(lines: list[str], keywords: list[str] | None, limit: int = 40) -> list[str]:
if not lines:
return []
lowered = [kw.lower() for kw in (keywords or []) if kw]
prefer_node = any("node" in kw for kw in lowered) or "hottest" in lowered
metric_tokens = {
"cpu",
"ram",
"memory",
"net",
"network",
"io",
"disk",
"load",
"usage",
"utilization",
"hottest",
"p95",
"percent",
"pressure",
"pod",
"pods",
"namespace",
"anomaly",
"anomalies",
"pvc",
"storage",
"pressure_summary",
"pressure_nodes",
"diskpressure",
"storagepressure",
}
candidates: list[str] = []
expanded: list[str] = []
for line in lines:
if " | " in line:
expanded.extend([seg.strip() for seg in line.split(" | ") if seg.strip()])
expanded.append(line)
for line in expanded:
if line.lower().startswith("hottest:"):
candidates.extend(_expand_hottest_line(line))
break
for line in expanded:
line_lower = line.lower()
if line_lower.startswith("lexicon_") or line_lower.startswith("units:"):
continue
if prefer_node and "pod_" in line_lower:
continue
if "hottest:" in line_lower:
candidates.append(line)
continue
if lowered and any(kw in line_lower for kw in lowered):
candidates.append(line)
continue
if any(token in line_lower for token in metric_tokens) and re.search(r"\d", line_lower):
candidates.append(line)
continue
return candidates[:limit]
def _metric_prefixes_from_lines(lines: list[str]) -> list[str]:
if not lines:
return []
prefixes: list[str] = []
for line in lines:
segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line]
for seg in segments:
match = re.match(r"^([a-z0-9_]+):", seg)
if match:
prefix = match.group(1)
if prefix not in prefixes:
prefixes.append(prefix)
return prefixes
def _lines_for_prefixes(lines: list[str], prefixes: list[str]) -> list[str]:
if not lines or not prefixes:
return []
wanted = set(prefixes)
selected: list[str] = []
for line in lines:
segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line]
for seg in segments:
match = re.match(r"^([a-z0-9_]+):", seg)
if match and match.group(1) in wanted:
if seg not in selected:
selected.append(seg)
return selected
async def _select_best_candidate(
call_llm: Callable[..., Any],
question: str,
@@ -1731,42 +1175,45 @@ async def _select_best_candidate(
return 0
async def _select_metric_prefixes(
def _dedupe_lines(lines: list[str], limit: int | None = None) -> list[str]:
seen: set[str] = set()
cleaned: list[str] = []
for line in lines:
value = (line or "").strip()
if not value or value in seen:
continue
if value.lower().startswith("lexicon_") or value.lower().startswith("units:"):
continue
cleaned.append(value)
seen.add(value)
if limit and len(cleaned) >= limit:
break
return cleaned
def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list[str]:
lines: list[str] = []
for chunk in selected:
text = chunk.get("text") if isinstance(chunk, dict) else None
if not isinstance(text, str):
continue
lines.extend([line for line in text.splitlines() if line.strip()])
return _dedupe_lines(lines, limit=limit)
async def _select_best_list(
call_llm: Callable[..., Any],
question: str,
prefixes: list[str],
candidates: list[list[str]],
plan: ModePlan,
attempts: int,
tag: str,
) -> list[str]:
if not prefixes:
return []
prompt = (
prompts.METRIC_PREFIX_PROMPT
+ "\nQuestion: "
+ question
+ "\nAvailablePrefixes:\n"
+ ", ".join(prefixes)
)
candidates: list[list[str]] = []
for _ in range(max(attempts, 1)):
raw = await call_llm(prompts.METRIC_PREFIX_SYSTEM, prompt, model=plan.fast_model, tag="metric_prefix")
data = _parse_json_block(raw, fallback={})
picked = data.get("prefixes") if isinstance(data, dict) else None
if not isinstance(picked, list):
continue
cleaned: list[str] = []
allowed = set(prefixes)
for item in picked:
if isinstance(item, str) and item in allowed and item not in cleaned:
cleaned.append(item)
if cleaned:
candidates.append(cleaned)
if not candidates:
return []
if len(candidates) == 1:
return candidates[0]
render = ["; ".join(items) for items in candidates]
best_idx = await _select_best_candidate(call_llm, question, render, plan, "metric_prefix_select")
best_idx = await _select_best_candidate(call_llm, question, render, plan, tag)
chosen = candidates[best_idx] if 0 <= best_idx < len(candidates) else candidates[0]
if not chosen:
merged: list[str] = []
@@ -1775,56 +1222,135 @@ async def _select_metric_prefixes(
if item not in merged:
merged.append(item)
chosen = merged
return chosen[:8]
return chosen
async def _select_metric_facts(
async def _extract_fact_types(
call_llm: Callable[..., Any],
question: str,
keywords: list[str],
plan: ModePlan,
) -> list[str]:
prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question
if keywords:
prompt += "\nKeywords: " + ", ".join(keywords)
candidates: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.FACT_TYPES_SYSTEM, prompt, model=plan.fast_model, tag="fact_types")
data = _parse_json_block(raw, fallback={})
items = data.get("fact_types") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=10)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "fact_types_select")
return chosen[:10]
async def _derive_signals(
call_llm: Callable[..., Any],
question: str,
fact_types: list[str],
plan: ModePlan,
) -> list[str]:
if not fact_types:
return []
prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types))
candidates: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.SIGNAL_SYSTEM, prompt, model=plan.fast_model, tag="signals")
data = _parse_json_block(raw, fallback={})
items = data.get("signals") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=12)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "signals_select")
return chosen[:12]
async def _scan_chunk_for_signals(
call_llm: Callable[..., Any],
question: str,
signals: list[str],
chunk_lines: list[str],
plan: ModePlan,
) -> list[str]:
if not signals or not chunk_lines:
return []
prompt = prompts.CHUNK_SCAN_PROMPT.format(
signals="; ".join(signals),
lines="\n".join(chunk_lines),
)
attempts = max(1, min(plan.metric_retries, 2))
candidates: list[list[str]] = []
for _ in range(attempts):
raw = await call_llm(prompts.CHUNK_SCAN_SYSTEM, prompt, model=plan.fast_model, tag="chunk_scan")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in chunk_lines if line in items]
cleaned = _dedupe_lines(cleaned, limit=15)
if cleaned:
candidates.append(cleaned)
chosen = await _select_best_list(call_llm, question, candidates, plan, "chunk_scan_select")
return chosen[:15]
async def _prune_metric_candidates(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
max_lines: int = 2,
attempts: int,
) -> list[str]:
if not candidates:
return []
prompt = (
prompts.FACT_SELECT_PROMPT.format(max_lines=max_lines)
+ "\nQuestion: "
+ question
+ "\nCandidates:\n"
+ "\n".join([f"- {line}" for line in candidates])
)
raw = await call_llm(prompts.FACT_SELECT_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
data = _parse_json_block(raw, fallback={})
lines = data.get("lines") if isinstance(data, dict) else None
if not isinstance(lines, list):
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6)
picks: list[list[str]] = []
for _ in range(max(attempts, 1)):
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_prune")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in candidates if line in items]
cleaned = _dedupe_lines(cleaned, limit=6)
if cleaned:
picks.append(cleaned)
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_prune_select")
return chosen[:6]
async def _select_fact_lines(
call_llm: Callable[..., Any],
question: str,
candidates: list[str],
plan: ModePlan,
max_lines: int,
) -> list[str]:
if not candidates:
return []
cleaned = []
allowed = set(candidates)
for line in lines:
if isinstance(line, str) and line in allowed and line not in cleaned:
cleaned.append(line)
if len(cleaned) >= max_lines:
break
return cleaned
def _metric_fact_guard(reply: str, metric_facts: list[str], keywords: list[str]) -> str:
if not metric_facts:
return reply
best_line = None
lowered_keywords = [kw.lower() for kw in keywords if kw]
for line in metric_facts:
line_lower = line.lower()
if any(kw in line_lower for kw in lowered_keywords):
best_line = line
break
best_line = best_line or metric_facts[0]
reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply))
fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts)))
if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)):
return f"From the latest snapshot: {best_line}."
return reply
prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines)
picks: list[list[str]] = []
attempts = max(plan.metric_retries, 1)
for _ in range(attempts):
raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_select")
data = _parse_json_block(raw, fallback={})
items = data.get("lines") if isinstance(data, dict) else None
if not isinstance(items, list):
continue
cleaned = [line for line in candidates if line in items]
cleaned = _dedupe_lines(cleaned, limit=max_lines)
if cleaned:
picks.append(cleaned)
chosen = await _select_best_list(call_llm, question, picks, plan, "fact_select_best")
return chosen[:max_lines]
def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str:
@@ -2019,41 +1545,6 @@ def _extract_keywords(
return list(dict.fromkeys(tokens))[:12]
def _focused_keywords(tokens: list[str]) -> list[str]:
generic = {
"atlas",
"cluster",
"node",
"nodes",
"pod",
"pods",
"namespace",
"namespaces",
"k8s",
"kubernetes",
"service",
"services",
"workload",
"workloads",
}
scored: list[tuple[int, str]] = []
for token in tokens:
if not token or token in generic:
continue
score = 1
if any(ch.isdigit() for ch in token):
score += 2
if "-" in token:
score += 1
if len(token) >= 6:
score += 1
scored.append((score, token))
if not scored:
return [token for token in tokens if token not in generic][:6]
scored.sort(key=lambda item: (-item[0], item[1]))
return [token for _, token in scored][:6]
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
if hardware:

View File

@@ -243,12 +243,58 @@ FACT_SELECT_SYSTEM = (
FACT_SELECT_PROMPT = (
"Pick up to {max_lines} lines from Candidates that best answer the question. "
"If the question asks for highest/hottest and Candidates include a line starting with 'hottest:', you must include that line. "
"If Candidates include hottest_*_node lines, prefer those for node hottest questions. "
"If the question mentions nodes and a 'hottest:' line exists, prefer node-level facts over pod-level lines. "
"Avoid pod_* lines unless the question explicitly mentions pods. "
"Exclude lexicon/definition lines; choose lines with concrete numeric values. "
"Return JSON with field: lines (list of strings). If none apply, return {{\"lines\": []}}."
"Prefer lines with concrete numeric values or explicit identifiers. "
"Avoid purely definitional lines unless the question asks for definitions. "
"Return JSON with field: lines (list of strings). If none apply, return {\"lines\": []}."
)
FACT_TYPES_SYSTEM = (
CLUSTER_SYSTEM
+ " Identify the minimal fact types needed from the snapshot to answer the question. "
+ "Return JSON only."
)
FACT_TYPES_PROMPT = (
"Return JSON with field: fact_types (list of short noun phrases). "
"Keep each entry short and concrete (e.g., \"node pressure flags\", \"hardware class counts\", \"postgres connections\")."
)
SIGNAL_SYSTEM = (
CLUSTER_SYSTEM
+ " Translate fact types into signals or cues likely present in snapshot lines. "
+ "Return JSON only."
)
SIGNAL_PROMPT = (
"Question: {question}\nFactTypes: {fact_types}\n"
"Return JSON with field: signals (list). "
"Signals should be brief phrases or tokens that might appear in snapshot lines."
)
CHUNK_SCAN_SYSTEM = (
CLUSTER_SYSTEM
+ " Select exact lines from the chunk that match the needed signals. "
+ "Return JSON only."
)
CHUNK_SCAN_PROMPT = (
"Signals: {signals}\n"
"Lines:\n{lines}\n"
"Return JSON with field: lines (list of exact lines from Lines)."
)
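The engine keeps only replies that echo chunk lines verbatim (_scan_chunk_for_signals retains an item only if it appears exactly in the chunk), which is why this prompt insists on exact lines. An illustrative exchange, with invented signals and lines:

# Illustrative only: the chunk-scan contract; signals and lines are made up.
prompt = CHUNK_SCAN_PROMPT.format(
    signals="hottest cpu node; load_index",
    lines="\n".join(
        [
            "hottest: cpu=titan-a1 ram=titan-b2",
            "namespaces_top: media pods=18",
        ]
    ),
)
# A useful reply repeats matching lines exactly, e.g.:
# {"lines": ["hottest: cpu=titan-a1 ram=titan-b2"]}
# Paraphrased lines are dropped by the engine's verbatim filter.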
FACT_PRUNE_SYSTEM = (
CLUSTER_SYSTEM
+ " Prune candidate lines to the smallest set that answers the question. "
+ "Return JSON only."
)
FACT_PRUNE_PROMPT = (
"Question: {question}\n"
"Candidates:\n{candidates}\n"
"Return JSON with field: lines (list). "
"Pick up to {max_lines} lines that best answer the question. "
"Return an empty list if none apply."
)
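To make the prune contract concrete, an illustrative exchange (the question, candidate lines, and node names are invented; the field names match the prompt above):

# Illustrative only: example inputs and expected output for the prune step.
prompt = FACT_PRUNE_PROMPT.format(
    question="Which node has the highest CPU usage?",
    candidates="\n".join(
        [
            "hottest_cpu_node: titan-a1 [cpu=91%]",
            "node_pods_top: titan-a1 pods=42",
        ]
    ),
    max_lines=6,
)
# A well-formed reply, as parsed by _parse_json_block, would be:
# {"lines": ["hottest_cpu_node: titan-a1 [cpu=91%]"]}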
SELECT_CLAIMS_PROMPT = (