From 0dff795c98491285dacfdff949060d71da23fa62 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 2 Feb 2026 22:18:50 -0300 Subject: [PATCH] atlasbot: make retrieval fully llm-driven --- atlasbot/engine/answerer.py | 911 ++++++++---------------------------- atlasbot/llm/prompts.py | 58 ++- 2 files changed, 253 insertions(+), 716 deletions(-) diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py index 8f0d57f..6f9f304 100644 --- a/atlasbot/engine/answerer.py +++ b/atlasbot/engine/answerer.py @@ -283,8 +283,6 @@ class AnswerEngine: if "node" in lowered_q: focus_entity = "node" - hotspot_override = False - hotspot_request = False snapshot_context = "" if classify.get("needs_snapshot"): if observer: @@ -292,131 +290,64 @@ class AnswerEngine: chunks = _chunk_lines(summary_lines, plan.chunk_lines) scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan) selected = _select_chunks(chunks, scored, plan, keyword_tokens) - key_facts = _key_fact_lines(summary_lines, keyword_tokens) - hottest_facts = _extract_hottest_facts(summary_lines, f"{question} {normalized}") - hardware_facts = _extract_hardware_usage_facts(summary_lines, f"{question} {normalized}") - hotspot_line = next((line for line in summary_lines if line.startswith("hottest:")), None) - forced_metric_lines: list[str] = [] + fact_candidates = _collect_fact_candidates(selected, limit=plan.max_subquestions * 12) + key_facts = await _select_fact_lines( + call_llm, + normalized, + fact_candidates, + plan, + max_lines=max(4, plan.max_subquestions * 2), + ) + metric_facts: list[str] = [] if classify.get("question_type") in {"metric", "diagnostic"}: - metric_prefixes = _metric_prefixes_from_lines(summary_lines) - selected_prefixes = await _select_metric_prefixes( + if observer: + observer("retrieve", "extracting fact types") + fact_types = await _extract_fact_types( call_llm, normalized, - metric_prefixes, + keyword_tokens, plan, - plan.metric_retries, ) - if selected_prefixes: - forced_metric_lines = _lines_for_prefixes(summary_lines, selected_prefixes) - if not hardware_facts: - hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64") - lowered_q = f"{question} {normalized}".lower() - if any(tok in lowered_q for tok in hardware_tokens): - for line in summary_lines: - if "hardware_usage_top:" in line: - hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")] - break - if not hardware_facts: - for line in summary_lines: - if "hardware_usage_avg:" in line: - hardware_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")] - break - metric_facts = [line for line in key_facts if re.search(r"\d", line)] - hotspot_request = any(term in lowered_q for term in ("hot spot", "hotspot", "hot spots", "hotspots")) - hotspot_override = False - if focus_entity == "node" and hottest_facts: - metric_facts = hottest_facts - key_facts = _merge_fact_lines(metric_facts, key_facts) - elif hardware_facts: - metric_facts = hardware_facts - key_facts = _merge_fact_lines(metric_facts, key_facts) - if classify.get("question_type") in {"metric", "diagnostic"}: - if focus_entity != "node" and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")) and any( - tok in lowered_q for tok in ("average", "avg", "mean", "ram", "memory", "cpu", "load") - ): - hw_top = None - for line in summary_lines: - if "hardware_usage_top:" in line: - parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")] - if parts: - hw_top = parts[0] - break - if hw_top: - metric_facts = [hw_top] + if observer: + observer("retrieve", "deriving signals") + signals = await _derive_signals( + call_llm, + normalized, + fact_types, + plan, + ) + if observer: + observer("retrieve", "scanning chunks") + candidate_lines: list[str] = [] + if signals: + for chunk in selected: + chunk_lines = chunk["text"].splitlines() + if not chunk_lines: + continue + hits = await _scan_chunk_for_signals( + call_llm, + normalized, + signals, + chunk_lines, + plan, + ) + if hits: + candidate_lines.extend(hits) + candidate_lines = list(dict.fromkeys(candidate_lines)) + if candidate_lines: + if observer: + observer("retrieve", "pruning candidates") + metric_facts = await _prune_metric_candidates( + call_llm, + normalized, + candidate_lines, + plan, + plan.metric_retries, + ) + if metric_facts: key_facts = _merge_fact_lines(metric_facts, key_facts) - if hottest_facts and not hardware_facts and focus_entity != "class" and not hotspot_request: - metric_facts = hottest_facts - key_facts = _merge_fact_lines(metric_facts, key_facts) - if "namespace" in lowered_q and any(term in lowered_q for term in ("hotspot", "hot spot", "hottest")): - hotspot_node = None - if hottest_facts: - match = re.search(r"hottest_\w+_node: (?P[^\s\[]+)", hottest_facts[0]) - if match: - hotspot_node = match.group("node") - if hotspot_node: - for line in summary_lines: - if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line: - metric_facts = _merge_fact_lines([line], metric_facts) - key_facts = _merge_fact_lines([line], key_facts) - break - if not hotspot_node or not any(line.startswith("node_namespaces_top:") for line in metric_facts): - for line in summary_lines: - if line.startswith("node_pods_top:"): - metric_facts = _merge_fact_lines([line], metric_facts) - key_facts = _merge_fact_lines([line], key_facts) - break - if classify.get("question_type") in {"metric", "diagnostic"} and not hottest_facts and not hardware_facts: - metric_candidates = _metric_candidate_lines(summary_lines, keyword_tokens) - if forced_metric_lines: - metric_candidates = forced_metric_lines + [ - line for line in metric_candidates if line not in forced_metric_lines - ] - selected_facts = await _select_metric_facts(call_llm, normalized, metric_candidates, plan) - if selected_facts: - metric_facts = selected_facts - key_facts = _merge_fact_lines(metric_facts, key_facts) - if self._settings.debug_pipeline: - _debug_log("metric_facts_selected", {"facts": metric_facts}) - if classify.get("question_type") in {"metric", "diagnostic"} and not metric_facts: - lowered_q = f"{question} {normalized}".lower() - if "namespace" in lowered_q and "pod" in lowered_q: - for line in summary_lines: - if line.startswith("namespaces_top:"): - metric_facts = [line] - break - if not metric_facts: - hardware_tokens = ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64") - if any(tok in lowered_q for tok in hardware_tokens): - for line in summary_lines: - if "hardware_usage_top:" in line: - metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")] - break - if not metric_facts: - for line in summary_lines: - if "hardware_usage_avg:" in line: - metric_facts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_avg:")] - break - if metric_facts: - key_facts = _merge_fact_lines(metric_facts, key_facts) - if ("pressure" in lowered_q or "diskpressure" in lowered_q or "storagepressure" in lowered_q) and not metric_facts: - pressure_lines = [] - for line in summary_lines: - line_lower = line.lower() - if line_lower.startswith("pressure_summary:") or line_lower.startswith("pressure_nodes:"): - pressure_lines.append(line) - if "node_pressure" in line_lower or "pvc_pressure" in line_lower: - pressure_lines.append(line) - if line_lower.startswith("pvc_usage_top:") or line_lower.startswith("root_disk_low_headroom:"): - pressure_lines.append(line) - if pressure_lines: - metric_facts = pressure_lines[:2] - key_facts = _merge_fact_lines(metric_facts, key_facts) - if hotspot_request and hotspot_line: - metric_facts = [hotspot_line] - key_facts = _merge_fact_lines(metric_facts, key_facts) - if forced_metric_lines: - metric_facts = _merge_fact_lines(forced_metric_lines, metric_facts) - key_facts = _merge_fact_lines(forced_metric_lines, key_facts) + if self._settings.debug_pipeline: + _debug_log("metric_facts_selected", {"facts": metric_facts}) if self._settings.debug_pipeline: scored_preview = sorted( [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks], @@ -632,278 +563,6 @@ class AnswerEngine: if note: reply = f"{reply}\n\n{note}" - if hotspot_request: - namespace_line = None - hotspot_node = None - if hotspot_line: - match = re.search(r"(titan-[a-z0-9]+)", hotspot_line) - if match: - hotspot_node = match.group(1) - if not hotspot_node: - for fact in metric_facts: - match = re.search(r"hottest_\w+_node: (?P[^\s\[]+)", fact) - if match: - hotspot_node = match.group("node") - break - if "namespace" in lowered_q and hotspot_node: - for line in summary_lines: - if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line: - namespace_line = line - break - if not namespace_line: - for line in summary_lines: - if line.startswith("node_pods_top:") and hotspot_node in line: - namespace_line = line - break - if "namespace" in lowered_q and not namespace_line: - for fact in metric_facts: - if fact.startswith("node_namespaces_top:"): - namespace_line = fact - break - if not namespace_line: - for fact in metric_facts: - if fact.startswith("node_pods_top:"): - namespace_line = fact - break - if hotspot_line: - reply = f"Current hotspots: {hotspot_line}." - elif hotspot_node: - reply = f"Hotspot node: {hotspot_node}." - if namespace_line and reply: - reply = f"{reply} {namespace_line}." - if reply: - hotspot_override = True - - if classify.get("question_type") in {"metric", "diagnostic"} and metric_facts and not hotspot_override: - reply = _metric_fact_guard(reply, metric_facts, keyword_tokens) - - if classify.get("question_type") in {"metric", "diagnostic"}: - lowered_q = f"{question} {normalized}".lower() - if any(tok in lowered_q for tok in ("how many", "count", "number of")) and any( - tok in lowered_q for tok in ("jetson", "rpi4", "rpi5", "amd64", "arm64", "rpi") - ): - hw_line = next((line for line in summary_lines if line.startswith("hardware:")), None) - hw_nodes_line = next((line for line in summary_lines if line.startswith("hardware_nodes:")), None) - if hw_line: - def _find_value(key: str, line: str) -> str | None: - match = re.search(rf"{re.escape(key)}=([^;|]+)", line) - return match.group(1).strip() if match else None - - target = None - if "jetson" in lowered_q: - target = "jetson" - elif "rpi5" in lowered_q: - target = "rpi5" - elif "rpi4" in lowered_q: - target = "rpi4" - elif "amd64" in lowered_q: - target = "amd64" - elif "arm64" in lowered_q: - target = "arm64" - elif "rpi" in lowered_q: - target = "rpi" - if target: - count = _find_value(target, hw_line) - nodes = _find_value(target, hw_nodes_line or "") - if count: - if nodes and "(" not in count: - reply = f"From the latest snapshot: {target}={count} ({nodes})." - else: - reply = f"From the latest snapshot: {target}={count}." - if any(tok in lowered_q for tok in ("mix", "breakdown", "composition", "hardware mix", "hardware stack")) and any( - tok in lowered_q for tok in ("jetson", "rpi4", "rpi5", "amd64", "arm64", "hardware") - ): - hw_line = next((line for line in summary_lines if line.startswith("hardware:")), None) - hw_nodes_line = next((line for line in summary_lines if line.startswith("hardware_nodes:")), None) - if hw_line: - if hw_nodes_line: - reply = f"From the latest snapshot: {hw_line}; {hw_nodes_line}." - else: - reply = f"From the latest snapshot: {hw_line}." - if "node" in lowered_q and any(tok in lowered_q for tok in ("how many", "count", "total")): - nodes_line = next((line for line in summary_lines if line.startswith("nodes:")), None) - if nodes_line: - reply = f"From the latest snapshot: {nodes_line}." - if "pod" in lowered_q and "node" in lowered_q and any(tok in lowered_q for tok in ("most", "highest", "max")): - pods_line = next( - (line for line in summary_lines if line.startswith("node_pods_max:") or line.startswith("node_pods_top:")), - None, - ) - if pods_line: - reply = f"From the latest snapshot: {pods_line}." - if "load_index" in lowered_q or "load index" in lowered_q: - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - node_load = summary.get("node_load") if isinstance(summary.get("node_load"), list) else [] - best = None - for item in node_load: - if not isinstance(item, dict): - continue - node = item.get("node") - load = item.get("load_index") - try: - numeric = float(load) - except (TypeError, ValueError): - continue - if best is None or numeric > best["load_index"]: - best = {"node": node, "load_index": numeric} - if best and best.get("node") is not None: - reply = ( - "From the latest snapshot: hottest_load_index_node: " - f"{best['node']} load_index={best['load_index']}." - ) - if "workload" in lowered_q and any(tok in lowered_q for tok in ("not ready", "not-ready", "unready")): - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - health = summary.get("workloads_health") if isinstance(summary.get("workloads_health"), dict) else {} - items: list[dict[str, Any]] = [] - for key in ("deployments", "statefulsets", "daemonsets"): - entry = health.get(key) if isinstance(health.get(key), dict) else {} - for item in entry.get("items") or []: - if not isinstance(item, dict): - continue - items.append( - { - "kind": key[:-1], - "namespace": item.get("namespace") or "", - "name": item.get("name") or "", - "desired": item.get("desired"), - "ready": item.get("ready"), - } - ) - if items: - items = sorted(items, key=lambda item: (item.get("namespace") or "", item.get("name") or "")) - entries = [ - f"{item.get('namespace','?')}/{item.get('name','?')} {item.get('kind','?')} ready={item.get('ready')} desired={item.get('desired')}" - for item in items[:3] - ] - if entries: - reply = f"From the latest snapshot: workloads_not_ready: {', '.join(entries)}." - else: - reply = "From the latest snapshot: workloads_not_ready: none." - if "pod" in lowered_q and ("waiting" in lowered_q or "wait" in lowered_q) and "reason" in lowered_q: - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - pod_issues = summary.get("pod_issues") if isinstance(summary.get("pod_issues"), dict) else {} - waiting = pod_issues.get("waiting_reasons") if isinstance(pod_issues.get("waiting_reasons"), dict) else {} - if waiting: - top = sorted(waiting.items(), key=lambda item: (-item[1], item[0]))[:3] - items = [f"{reason}={count}" for reason, count in top] - reply = f"From the latest snapshot: pod_waiting_reasons: {'; '.join(items)}." - else: - reply = "From the latest snapshot: pod_waiting_reasons: none." - if "pvc" in lowered_q and any(tok in lowered_q for tok in ("usage", "used", "percent", "80", "full")): - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - pvc_usage = summary.get("pvc_usage_top") if isinstance(summary.get("pvc_usage_top"), list) else [] - over = [] - for item in pvc_usage: - if not isinstance(item, dict): - continue - used = item.get("used_percent") - if used is None: - continue - try: - used_val = float(used) - except (TypeError, ValueError): - continue - if used_val >= 80: - name = item.get("name") or item.get("pvc") - if name: - over.append(f"{name}={used_val:.2f}%") - if over: - reply = f"From the latest snapshot: pvc_usage>=80%: {', '.join(over)}." - else: - reply = "From the latest snapshot: pvc_usage>=80%: none." - if "pressure" in lowered_q: - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - pressure_nodes = summary.get("pressure_nodes") if isinstance(summary.get("pressure_nodes"), dict) else {} - entries = [f"{key}={value}" for key, value in pressure_nodes.items() if value] - if entries: - reply = f"From the latest snapshot: pressure_nodes: {', '.join(entries)}." - if _has_token(lowered_q, "io") and ("node" in lowered_q or "nodes" in lowered_q): - io_facts = _extract_hottest_facts(summary_lines, lowered_q) - io_line = next((fact for fact in io_facts if fact.startswith("hottest_io_node")), None) - if io_line: - reply = f"From the latest snapshot: {io_line}." - if "namespace" in lowered_q and "pod" in lowered_q: - ns_line = None - for line in summary_lines: - if line.startswith("namespaces_top:"): - ns_line = line - break - cpu_line = None - if any(tok in lowered_q for tok in ("cpu", "hottest", "highest cpu", "highest")): - cpu_facts = _extract_hottest_facts(summary_lines, lowered_q) - cpu_line = next((fact for fact in cpu_facts if fact.startswith("hottest_cpu_node")), None) - if ns_line: - if cpu_line: - reply = f"From the latest snapshot: {cpu_line}; {ns_line}." - else: - reply = f"From the latest snapshot: {ns_line}." - # do not fall through to other overrides - if ("hotspot" in lowered_q or "hottest" in lowered_q) and "namespace" in lowered_q and "node_namespaces_top:" not in reply: - hotspot_node = None - match = re.search(r"(titan-[a-z0-9]+)", reply) - if match: - hotspot_node = match.group(1) - if not hotspot_node: - for fact in metric_facts: - match = re.search(r"hottest_\w+_node: (?P[^\s\[]+)", fact) - if match: - hotspot_node = match.group("node") - break - if hotspot_node: - node_line = next( - (line for line in summary_lines if line.startswith("node_namespaces_top:") and f"{hotspot_node} " in line), - None, - ) - if not node_line: - node_line = next( - (line for line in summary_lines if line.startswith("node_pods_top:") and hotspot_node in line), - None, - ) - if node_line: - reply = f"Hotspot node: {hotspot_node}. {node_line}." - if "postgres" in lowered_q and any(tok in lowered_q for tok in ("connection", "connections", "db")): - pg = None - if isinstance(snapshot_used, dict): - summary = snapshot_used.get("summary") if isinstance(snapshot_used.get("summary"), dict) else {} - if summary: - top = summary.get("top") if isinstance(summary.get("top"), dict) else {} - pg = top.get("postgres") if isinstance(top.get("postgres"), dict) else None - if not pg: - metrics = snapshot_used.get("metrics") if isinstance(snapshot_used.get("metrics"), dict) else {} - pg = metrics.get("postgres_connections") if isinstance(metrics.get("postgres_connections"), dict) else None - if isinstance(pg, dict): - used = pg.get("used") - max_conn = pg.get("max") - hottest = pg.get("hottest_db") or pg.get("hottest") - if used is not None and max_conn is not None: - if hottest: - reply = f"Postgres connections: used={used}, max={max_conn}; hottest_db={hottest}." - else: - reply = f"Postgres connections: used={used}, max={max_conn}." - else: - reply = "Postgres connection data is not fully available in the snapshot." - - if classify.get("question_type") in {"metric", "diagnostic"}: - lowered_q = f"{question} {normalized}".lower() - if ( - focus_entity != "node" - and any(tok in lowered_q for tok in ("hardware", "class", "type", "rpi", "jetson", "amd64", "arm64")) - and any(tok in lowered_q for tok in ("average", "avg", "mean", "per hardware", "by hardware", "typical")) - ): - hw_top = None - for line in summary_lines: - if "hardware_usage_top:" in line: - parts = [seg.strip() for seg in line.split(" | ") if seg.strip().startswith("hardware_usage_top:")] - if parts: - hw_top = parts[0] - break - if hw_top: - reply = f"From the latest snapshot: {hw_top}." reply = await self._dedup_reply(reply, plan, call_llm, tag="dedup") @@ -1363,28 +1022,6 @@ def _select_chunks( selected: list[dict[str, Any]] = [] head = chunks[0] selected.append(head) - keyword_hits: list[dict[str, Any]] = [] - raw_keywords = [kw.lower() for kw in (keywords or []) if kw] - focused = _focused_keywords(keywords or []) - if focused: - lowered = [kw.lower() for kw in focused if kw] - for item in ranked: - text = item.get("text", "").lower() - if any(kw in text for kw in lowered): - keyword_hits.append(item) - if raw_keywords: - for item in ranked: - if len(keyword_hits) >= plan.chunk_top: - break - text = item.get("text", "").lower() - if any(kw in text for kw in raw_keywords): - keyword_hits.append(item) - for item in keyword_hits: - if len(selected) >= plan.chunk_top: - break - if item in selected: - continue - selected.append(item) for item in ranked: if len(selected) >= plan.chunk_top: break @@ -1433,33 +1070,6 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]: return [line for line in text.splitlines() if line.strip()] -def _key_fact_lines(lines: list[str], keywords: list[str] | None, limit: int = 6) -> list[str]: - if not lines or not keywords: - return [] - lowered = [kw.lower() for kw in keywords if kw] - if not lowered: - return [] - focused = _focused_keywords(lowered) - primary = focused or lowered - matches: list[str] = [] - for line in lines: - line_lower = line.lower() - if any(kw in line_lower for kw in primary): - matches.append(line) - if len(matches) >= limit: - break - if len(matches) < limit and focused: - for line in lines: - if len(matches) >= limit: - break - if line in matches: - continue - line_lower = line.lower() - if any(kw in line_lower for kw in lowered): - matches.append(line) - return matches - - def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]: seen = set() merged: list[str] = [] @@ -1541,172 +1151,6 @@ def _hotspot_evidence(summary: dict[str, Any]) -> list[str]: return lines -def _extract_hottest_facts(lines: list[str], question: str) -> list[str]: - if not lines: - return [] - lowered = question.lower() - if not any( - term in lowered - for term in ("hottest", "hot", "highest", "lowest", "most", "top", "peak", "loaded", "load", "busy") - ): - return [] - if "node" not in lowered and "nodes" not in lowered: - return [] - line = "" - for item in lines: - if item.lower().startswith("hottest:"): - line = item - break - if " | " in item: - for seg in [seg.strip() for seg in item.split(" | ")]: - if seg.lower().startswith("hottest:"): - line = seg - break - if line: - break - if not line: - return [] - facts = _expand_hottest_line(line) - if not facts: - return [] - wanted = [] - if _has_token(lowered, "io") and ("disk" in lowered or "storage" in lowered): - return [fact for fact in facts if fact.startswith("hottest_io_node")] - if _has_token(lowered, "cpu") or "processor" in lowered: - wanted.append("hottest_cpu_node") - if _has_token(lowered, "ram") or "memory" in lowered: - wanted.append("hottest_ram_node") - if _has_token(lowered, "net") or "network" in lowered or "throughput" in lowered: - wanted.append("hottest_net_node") - if _has_token(lowered, "io"): - wanted.append("hottest_io_node") - if _has_token(lowered, "disk") or "storage" in lowered: - wanted.append("hottest_disk_node") - if not wanted: - return facts - return [fact for fact in facts if any(label in fact for label in wanted)] or facts - - -def _extract_hardware_usage_facts(lines: list[str], question: str) -> list[str]: - if not lines: - return [] - lowered = question.lower() - if "hardware" not in lowered: - return [] - if not any(term in lowered for term in ("average", "avg", "mean", "per hardware", "by hardware", "typical")): - return [] - avg_line = None - top_line = None - for line in lines: - segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line] - for seg in segments: - if seg.startswith("hardware_usage_avg:"): - avg_line = seg - elif seg.startswith("hardware_usage_top:"): - top_line = seg - if not avg_line and not top_line: - return [] - wants_top = any(term in lowered for term in ("highest", "lowest", "most", "least", "top", "worst", "best")) - if wants_top and top_line: - return [top_line] - facts: list[str] = [] - if avg_line: - facts.append(avg_line) - if top_line: - facts.append(top_line) - return facts - - -def _metric_candidate_lines(lines: list[str], keywords: list[str] | None, limit: int = 40) -> list[str]: - if not lines: - return [] - lowered = [kw.lower() for kw in (keywords or []) if kw] - prefer_node = any("node" in kw for kw in lowered) or "hottest" in lowered - metric_tokens = { - "cpu", - "ram", - "memory", - "net", - "network", - "io", - "disk", - "load", - "usage", - "utilization", - "hottest", - "p95", - "percent", - "pressure", - "pod", - "pods", - "namespace", - "anomaly", - "anomalies", - "pvc", - "storage", - "pressure_summary", - "pressure_nodes", - "diskpressure", - "storagepressure", - } - candidates: list[str] = [] - expanded: list[str] = [] - for line in lines: - if " | " in line: - expanded.extend([seg.strip() for seg in line.split(" | ") if seg.strip()]) - expanded.append(line) - for line in expanded: - if line.lower().startswith("hottest:"): - candidates.extend(_expand_hottest_line(line)) - break - for line in expanded: - line_lower = line.lower() - if line_lower.startswith("lexicon_") or line_lower.startswith("units:"): - continue - if prefer_node and "pod_" in line_lower: - continue - if "hottest:" in line_lower: - candidates.append(line) - continue - if lowered and any(kw in line_lower for kw in lowered): - candidates.append(line) - continue - if any(token in line_lower for token in metric_tokens) and re.search(r"\d", line_lower): - candidates.append(line) - continue - return candidates[:limit] - - -def _metric_prefixes_from_lines(lines: list[str]) -> list[str]: - if not lines: - return [] - prefixes: list[str] = [] - for line in lines: - segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line] - for seg in segments: - match = re.match(r"^([a-z0-9_]+):", seg) - if match: - prefix = match.group(1) - if prefix not in prefixes: - prefixes.append(prefix) - return prefixes - - -def _lines_for_prefixes(lines: list[str], prefixes: list[str]) -> list[str]: - if not lines or not prefixes: - return [] - wanted = set(prefixes) - selected: list[str] = [] - for line in lines: - segments = [seg.strip() for seg in line.split(" | ")] if " | " in line else [line] - for seg in segments: - match = re.match(r"^([a-z0-9_]+):", seg) - if match and match.group(1) in wanted: - if seg not in selected: - selected.append(seg) - return selected - - async def _select_best_candidate( call_llm: Callable[..., Any], question: str, @@ -1731,42 +1175,45 @@ async def _select_best_candidate( return 0 -async def _select_metric_prefixes( +def _dedupe_lines(lines: list[str], limit: int | None = None) -> list[str]: + seen: set[str] = set() + cleaned: list[str] = [] + for line in lines: + value = (line or "").strip() + if not value or value in seen: + continue + if value.lower().startswith("lexicon_") or value.lower().startswith("units:"): + continue + cleaned.append(value) + seen.add(value) + if limit and len(cleaned) >= limit: + break + return cleaned + + +def _collect_fact_candidates(selected: list[dict[str, Any]], limit: int) -> list[str]: + lines: list[str] = [] + for chunk in selected: + text = chunk.get("text") if isinstance(chunk, dict) else None + if not isinstance(text, str): + continue + lines.extend([line for line in text.splitlines() if line.strip()]) + return _dedupe_lines(lines, limit=limit) + + +async def _select_best_list( call_llm: Callable[..., Any], question: str, - prefixes: list[str], + candidates: list[list[str]], plan: ModePlan, - attempts: int, + tag: str, ) -> list[str]: - if not prefixes: - return [] - prompt = ( - prompts.METRIC_PREFIX_PROMPT - + "\nQuestion: " - + question - + "\nAvailablePrefixes:\n" - + ", ".join(prefixes) - ) - candidates: list[list[str]] = [] - for _ in range(max(attempts, 1)): - raw = await call_llm(prompts.METRIC_PREFIX_SYSTEM, prompt, model=plan.fast_model, tag="metric_prefix") - data = _parse_json_block(raw, fallback={}) - picked = data.get("prefixes") if isinstance(data, dict) else None - if not isinstance(picked, list): - continue - cleaned: list[str] = [] - allowed = set(prefixes) - for item in picked: - if isinstance(item, str) and item in allowed and item not in cleaned: - cleaned.append(item) - if cleaned: - candidates.append(cleaned) if not candidates: return [] if len(candidates) == 1: return candidates[0] render = ["; ".join(items) for items in candidates] - best_idx = await _select_best_candidate(call_llm, question, render, plan, "metric_prefix_select") + best_idx = await _select_best_candidate(call_llm, question, render, plan, tag) chosen = candidates[best_idx] if 0 <= best_idx < len(candidates) else candidates[0] if not chosen: merged: list[str] = [] @@ -1775,56 +1222,135 @@ async def _select_metric_prefixes( if item not in merged: merged.append(item) chosen = merged - return chosen[:8] + return chosen -async def _select_metric_facts( +async def _extract_fact_types( + call_llm: Callable[..., Any], + question: str, + keywords: list[str], + plan: ModePlan, +) -> list[str]: + prompt = prompts.FACT_TYPES_PROMPT + "\nQuestion: " + question + if keywords: + prompt += "\nKeywords: " + ", ".join(keywords) + candidates: list[list[str]] = [] + attempts = max(plan.metric_retries, 1) + for _ in range(attempts): + raw = await call_llm(prompts.FACT_TYPES_SYSTEM, prompt, model=plan.fast_model, tag="fact_types") + data = _parse_json_block(raw, fallback={}) + items = data.get("fact_types") if isinstance(data, dict) else None + if not isinstance(items, list): + continue + cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=10) + if cleaned: + candidates.append(cleaned) + chosen = await _select_best_list(call_llm, question, candidates, plan, "fact_types_select") + return chosen[:10] + + +async def _derive_signals( + call_llm: Callable[..., Any], + question: str, + fact_types: list[str], + plan: ModePlan, +) -> list[str]: + if not fact_types: + return [] + prompt = prompts.SIGNAL_PROMPT.format(question=question, fact_types="; ".join(fact_types)) + candidates: list[list[str]] = [] + attempts = max(plan.metric_retries, 1) + for _ in range(attempts): + raw = await call_llm(prompts.SIGNAL_SYSTEM, prompt, model=plan.fast_model, tag="signals") + data = _parse_json_block(raw, fallback={}) + items = data.get("signals") if isinstance(data, dict) else None + if not isinstance(items, list): + continue + cleaned = _dedupe_lines([str(item) for item in items if isinstance(item, (str, int, float))], limit=12) + if cleaned: + candidates.append(cleaned) + chosen = await _select_best_list(call_llm, question, candidates, plan, "signals_select") + return chosen[:12] + + +async def _scan_chunk_for_signals( + call_llm: Callable[..., Any], + question: str, + signals: list[str], + chunk_lines: list[str], + plan: ModePlan, +) -> list[str]: + if not signals or not chunk_lines: + return [] + prompt = prompts.CHUNK_SCAN_PROMPT.format( + signals="; ".join(signals), + lines="\n".join(chunk_lines), + ) + attempts = max(1, min(plan.metric_retries, 2)) + candidates: list[list[str]] = [] + for _ in range(attempts): + raw = await call_llm(prompts.CHUNK_SCAN_SYSTEM, prompt, model=plan.fast_model, tag="chunk_scan") + data = _parse_json_block(raw, fallback={}) + items = data.get("lines") if isinstance(data, dict) else None + if not isinstance(items, list): + continue + cleaned = [line for line in chunk_lines if line in items] + cleaned = _dedupe_lines(cleaned, limit=15) + if cleaned: + candidates.append(cleaned) + chosen = await _select_best_list(call_llm, question, candidates, plan, "chunk_scan_select") + return chosen[:15] + + +async def _prune_metric_candidates( call_llm: Callable[..., Any], question: str, candidates: list[str], plan: ModePlan, - max_lines: int = 2, + attempts: int, ) -> list[str]: if not candidates: return [] - prompt = ( - prompts.FACT_SELECT_PROMPT.format(max_lines=max_lines) - + "\nQuestion: " - + question - + "\nCandidates:\n" - + "\n".join([f"- {line}" for line in candidates]) - ) - raw = await call_llm(prompts.FACT_SELECT_SYSTEM, prompt, model=plan.fast_model, tag="fact_select") - data = _parse_json_block(raw, fallback={}) - lines = data.get("lines") if isinstance(data, dict) else None - if not isinstance(lines, list): + prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=6) + picks: list[list[str]] = [] + for _ in range(max(attempts, 1)): + raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_prune") + data = _parse_json_block(raw, fallback={}) + items = data.get("lines") if isinstance(data, dict) else None + if not isinstance(items, list): + continue + cleaned = [line for line in candidates if line in items] + cleaned = _dedupe_lines(cleaned, limit=6) + if cleaned: + picks.append(cleaned) + chosen = await _select_best_list(call_llm, question, picks, plan, "fact_prune_select") + return chosen[:6] + + +async def _select_fact_lines( + call_llm: Callable[..., Any], + question: str, + candidates: list[str], + plan: ModePlan, + max_lines: int, +) -> list[str]: + if not candidates: return [] - cleaned = [] - allowed = set(candidates) - for line in lines: - if isinstance(line, str) and line in allowed and line not in cleaned: - cleaned.append(line) - if len(cleaned) >= max_lines: - break - return cleaned - - -def _metric_fact_guard(reply: str, metric_facts: list[str], keywords: list[str]) -> str: - if not metric_facts: - return reply - best_line = None - lowered_keywords = [kw.lower() for kw in keywords if kw] - for line in metric_facts: - line_lower = line.lower() - if any(kw in line_lower for kw in lowered_keywords): - best_line = line - break - best_line = best_line or metric_facts[0] - reply_numbers = set(re.findall(r"\d+(?:\.\d+)?", reply)) - fact_numbers = set(re.findall(r"\d+(?:\.\d+)?", " ".join(metric_facts))) - if not reply_numbers or (fact_numbers and not (reply_numbers & fact_numbers)): - return f"From the latest snapshot: {best_line}." - return reply + prompt = prompts.FACT_PRUNE_PROMPT.format(question=question, candidates="\n".join(candidates), max_lines=max_lines) + picks: list[list[str]] = [] + attempts = max(plan.metric_retries, 1) + for _ in range(attempts): + raw = await call_llm(prompts.FACT_PRUNE_SYSTEM, prompt, model=plan.fast_model, tag="fact_select") + data = _parse_json_block(raw, fallback={}) + items = data.get("lines") if isinstance(data, dict) else None + if not isinstance(items, list): + continue + cleaned = [line for line in candidates if line in items] + cleaned = _dedupe_lines(cleaned, limit=max_lines) + if cleaned: + picks.append(cleaned) + chosen = await _select_best_list(call_llm, question, picks, plan, "fact_select_best") + return chosen[:max_lines] def _strip_unknown_entities(reply: str, unknown_nodes: list[str], unknown_namespaces: list[str]) -> str: @@ -2019,41 +1545,6 @@ def _extract_keywords( return list(dict.fromkeys(tokens))[:12] -def _focused_keywords(tokens: list[str]) -> list[str]: - generic = { - "atlas", - "cluster", - "node", - "nodes", - "pod", - "pods", - "namespace", - "namespaces", - "k8s", - "kubernetes", - "service", - "services", - "workload", - "workloads", - } - scored: list[tuple[int, str]] = [] - for token in tokens: - if not token or token in generic: - continue - score = 1 - if any(ch.isdigit() for ch in token): - score += 2 - if "-" in token: - score += 1 - if len(token) >= 6: - score += 1 - scored.append((score, token)) - if not scored: - return [token for token in tokens if token not in generic][:6] - scored.sort(key=lambda item: (-item[0], item[1])) - return [token for _, token in scored][:6] - - def _allowed_nodes(summary: dict[str, Any]) -> list[str]: hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {} if hardware: diff --git a/atlasbot/llm/prompts.py b/atlasbot/llm/prompts.py index 211df68..383f21a 100644 --- a/atlasbot/llm/prompts.py +++ b/atlasbot/llm/prompts.py @@ -243,12 +243,58 @@ FACT_SELECT_SYSTEM = ( FACT_SELECT_PROMPT = ( "Pick up to {max_lines} lines from Candidates that best answer the question. " - "If the question asks for highest/hottest and Candidates include a line starting with 'hottest:', you must include that line. " - "If Candidates include hottest_*_node lines, prefer those for node hottest questions. " - "If the question mentions nodes and a 'hottest:' line exists, prefer node-level facts over pod-level lines. " - "Avoid pod_* lines unless the question explicitly mentions pods. " - "Exclude lexicon/definition lines; choose lines with concrete numeric values. " - "Return JSON with field: lines (list of strings). If none apply, return {{\"lines\": []}}." + "Prefer lines with concrete numeric values or explicit identifiers. " + "Avoid purely definitional lines unless the question asks for definitions. " + "Return JSON with field: lines (list of strings). If none apply, return {\"lines\": []}." +) + +FACT_TYPES_SYSTEM = ( + CLUSTER_SYSTEM + + " Identify the minimal fact types needed from the snapshot to answer the question. " + + "Return JSON only." +) + +FACT_TYPES_PROMPT = ( + "Return JSON with field: fact_types (list of short noun phrases). " + "Keep each entry short and concrete (e.g., \"node pressure flags\", \"hardware class counts\", \"postgres connections\")." +) + +SIGNAL_SYSTEM = ( + CLUSTER_SYSTEM + + " Translate fact types into signals or cues likely present in snapshot lines. " + + "Return JSON only." +) + +SIGNAL_PROMPT = ( + "Question: {question}\nFactTypes: {fact_types}\n" + "Return JSON with field: signals (list). " + "Signals should be brief phrases or tokens that might appear in snapshot lines." +) + +CHUNK_SCAN_SYSTEM = ( + CLUSTER_SYSTEM + + " Select exact lines from the chunk that match the needed signals. " + + "Return JSON only." +) + +CHUNK_SCAN_PROMPT = ( + "Signals: {signals}\n" + "Lines:\n{lines}\n" + "Return JSON with field: lines (list of exact lines from Lines)." +) + +FACT_PRUNE_SYSTEM = ( + CLUSTER_SYSTEM + + " Prune candidate lines to the smallest set that answers the question. " + + "Return JSON only." +) + +FACT_PRUNE_PROMPT = ( + "Question: {question}\n" + "Candidates:\n{candidates}\n" + "Return JSON with field: lines (list). " + "Pick up to {max_lines} lines that best answer the question. " + "Return an empty list if none apply." ) SELECT_CLAIMS_PROMPT = (