From e97aaafed94dbd7bb951cfe5faae06cf43d4211f Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 27 Jan 2026 21:52:07 -0300 Subject: [PATCH] atlasbot: refine open-ended reasoning --- services/comms/atlasbot-deployment.yaml | 2 +- services/comms/scripts/atlasbot/bot.py | 410 ++++++++++++++++++++++-- 2 files changed, 378 insertions(+), 34 deletions(-) diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml index 4e27b5a..5e5bc05 100644 --- a/services/comms/atlasbot-deployment.yaml +++ b/services/comms/atlasbot-deployment.yaml @@ -16,7 +16,7 @@ spec: labels: app: atlasbot annotations: - checksum/atlasbot-configmap: manual-atlasbot-73 + checksum/atlasbot-configmap: manual-atlasbot-74 vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "comms" vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret" diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index b9bc0e6..0176293 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -138,6 +138,7 @@ CLUSTER_HINT_WORDS = { "cluster", "k8s", "kubernetes", + "health", "node", "nodes", "hardware", @@ -211,6 +212,7 @@ _OVERVIEW_HINT_WORDS = { "explain", "tell me about", "what do you know", + "health", } _OLLAMA_LOCK = threading.Lock() @@ -1220,6 +1222,8 @@ def snapshot_metric_answer( q = normalize_query(prompt) metric = _detect_metric(q) op = _detect_operation(q) + if op == "list" and metric in {"cpu", "ram", "net", "io"}: + op = "top" include_hw, exclude_hw = _detect_hardware_filters(q) nodes_in_query = _extract_titan_nodes(q) only_workers = "worker" in q or "workers" in q @@ -1340,6 +1344,8 @@ def structured_answer( tokens = _tokens(q) op = _detect_operation(q) metric = _detect_metric(q) + if op == "list" and metric in {"cpu", "ram", "net", "io"}: + op = "top" entity = _detect_entity(q) include_hw, exclude_hw = _detect_hardware_filters(q) nodes_in_query = _extract_titan_nodes(q) @@ -1646,6 +1652,37 @@ def _is_insight_query(query: str) -> bool: return False +_FOLLOWUP_HINTS = ( + "what about", + "how about", + "and what", + "and how", + "tell me more", + "anything else", + "something else", + "that one", + "those", + "them", + "it", + "this", + "that", + "else", + "another", + "again", +) + + +def _is_followup_query(query: str) -> bool: + q = normalize_query(query) + if not q: + return False + if any(hint in q for hint in _FOLLOWUP_HINTS): + return True + if len(q.split()) <= 3 and not any(word in q for word in _INSIGHT_HINT_WORDS): + return True + return False + + def _is_subjective_query(query: str) -> bool: q = normalize_query(query) if not q: @@ -2541,6 +2578,12 @@ def _fact_pack_lines( if not trimmed or trimmed.lower().startswith("facts"): continue lines.append(trimmed) + if _knowledge_intent(prompt) or _doc_intent(prompt) or _is_overview_query(prompt): + kb_titles = kb_retrieve_titles(prompt, limit=4) + if kb_titles: + for kb_line in kb_titles.splitlines(): + if kb_line.strip(): + lines.append(kb_line.strip()) return lines @@ -2549,12 +2592,194 @@ def _fact_pack_text(lines: list[str]) -> str: return "Fact pack:\n" + "\n".join(labeled) +_ALLOWED_INSIGHT_TAGS = { + "availability", + "architecture", + "database", + "hardware", + "inventory", + "node_detail", + "os", + "pods", + "utilization", + "workloads", + "workers", +} + +_DYNAMIC_TAGS = {"availability", "database", "pods", "utilization", "workloads"} +_INVENTORY_TAGS = {"hardware", "architecture", "inventory", "workers", "node_detail", "os"} + + +def _fact_line_tags(line: str) -> set[str]: + text = (line or "").lower() + tags: set[str] = set() + if any(key in text for key in ("nodes_total", "ready", "not_ready", "workers_ready", "workers_not_ready")): + tags.add("availability") + if "nodes_by_arch" in text or "arch " in text or "architecture" in text: + tags.add("architecture") + if any(key in text for key in ("rpi", "jetson", "amd64", "arm64", "non_raspberry_pi")): + tags.update({"hardware", "inventory"}) + if "control_plane_nodes" in text or "worker_nodes" in text: + tags.add("inventory") + if any(key in text for key in ("hottest_", "node_usage", "cpu=", "ram=", "net=", "io=")): + tags.add("utilization") + if "postgres_" in text or "postgres connections" in text: + tags.add("database") + if "pods_" in text or "pod phases" in text: + tags.add("pods") + if "workloads" in text or "primary_node" in text: + tags.add("workloads") + if "node_details" in text: + tags.add("node_detail") + if "os mix" in text or "os" in text: + tags.add("os") + return tags & _ALLOWED_INSIGHT_TAGS + + +def _fact_pack_meta(lines: list[str]) -> dict[str, dict[str, Any]]: + meta: dict[str, dict[str, Any]] = {} + for idx, line in enumerate(lines): + fid = f"F{idx + 1}" + tags = sorted(_fact_line_tags(line)) + meta[fid] = {"tags": tags} + return meta + + +def _history_tags(history_lines: list[str]) -> set[str]: + tags: set[str] = set() + for line in history_lines[-6:]: + tags.update(_fact_line_tags(line)) + return tags & _ALLOWED_INSIGHT_TAGS + + +def _seed_insights( + lines: list[str], + fact_meta: dict[str, dict[str, Any]], + *, + limit: int = 6, +) -> list[dict[str, Any]]: + priority = [ + "utilization", + "database", + "pods", + "workloads", + "availability", + "hardware", + "architecture", + "inventory", + ] + seeds: list[dict[str, Any]] = [] + used_tags: set[str] = set() + for tag in priority: + for idx, line in enumerate(lines): + fid = f"F{idx + 1}" + tags = set(fact_meta.get(fid, {}).get("tags") or []) + if tag not in tags or fid in {s["fact_ids"][0] for s in seeds}: + continue + summary = line.lstrip("- ").strip() + seeds.append( + { + "summary": summary, + "fact_ids": [fid], + "relevance": 0.5, + "novelty": 0.5, + "rationale": "seeded from fact pack", + "tags": sorted(tags), + } + ) + used_tags.update(tags) + if len(seeds) >= limit: + return seeds + return seeds + + +def _insight_tags(insight: dict[str, Any], fact_meta: dict[str, dict[str, Any]]) -> set[str]: + tags: set[str] = set() + for fid in insight.get("fact_ids") if isinstance(insight.get("fact_ids"), list) else []: + tags.update(fact_meta.get(fid, {}).get("tags") or []) + raw_tags = insight.get("tags") if isinstance(insight.get("tags"), list) else [] + tags.update(t for t in raw_tags if isinstance(t, str)) + summary = insight.get("summary") or insight.get("claim") or "" + if isinstance(summary, str): + tags.update(_fact_line_tags(summary)) + return tags & _ALLOWED_INSIGHT_TAGS + + +def _insight_score( + insight: dict[str, Any], + *, + preference: str, + prefer_tags: set[str], + avoid_tags: set[str], + history_tags: set[str], + fact_meta: dict[str, dict[str, Any]], +) -> float: + base = _score_insight(insight, preference) + tags = _insight_tags(insight, fact_meta) + if prefer_tags and tags: + base += 0.15 * len(tags & prefer_tags) + if avoid_tags and tags: + base -= 0.12 * len(tags & avoid_tags) + if history_tags and tags: + base -= 0.08 * len(tags & history_tags) + if preference == "novelty": + if tags & _DYNAMIC_TAGS: + base += 0.12 + if tags & _INVENTORY_TAGS: + base -= 0.08 + return base + + +def _select_diverse_insights( + candidates: list[dict[str, Any]], + *, + preference: str, + prefer_tags: set[str], + avoid_tags: set[str], + history_tags: set[str], + fact_meta: dict[str, dict[str, Any]], + count: int = 2, +) -> list[dict[str, Any]]: + scored: list[tuple[float, dict[str, Any]]] = [] + for item in candidates: + tags = _insight_tags(item, fact_meta) + item["tags"] = sorted(tags) + score = _insight_score( + item, + preference=preference, + prefer_tags=prefer_tags, + avoid_tags=avoid_tags, + history_tags=history_tags, + fact_meta=fact_meta, + ) + scored.append((score, item)) + scored.sort(key=lambda pair: pair[0], reverse=True) + picked: list[dict[str, Any]] = [] + used_tags: set[str] = set() + for _, item in scored: + tags = set(item.get("tags") or []) + if used_tags and tags and tags <= used_tags and len(picked) < count: + continue + picked.append(item) + used_tags.update(tags) + if len(picked) >= count: + break + if len(picked) < count: + for _, item in scored: + if item in picked: + continue + picked.append(item) + if len(picked) >= count: + break + return picked + + def _open_ended_system() -> str: return ( "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. " "Use ONLY the provided fact pack and recent chat as your evidence. " "You may draw light inferences if you label them as such. " - "Write concise, human sentences, not a list. " + "Write concise, human sentences with a helpful, calm tone (not a list). " "If the question is subjective, share a light opinion grounded in facts. " "If the question is ambiguous, pick a reasonable interpretation and state it briefly. " "Avoid repeating the exact same observation as the last response if possible. " @@ -2608,18 +2833,52 @@ def _open_ended_fast( *, fact_pack: str, history_lines: list[str], + fact_lines: list[str], + fact_meta: dict[str, dict[str, Any]], + tags_available: set[str], + history_tags: set[str], state: ThoughtState | None = None, ) -> str: if state: - state.update("synthesizing", step=2) + state.update("planning", step=1) + analysis = _interpret_open_question( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + tags_available=tags_available, + avoid_tags=history_tags, + state=state, + ) + candidates = _select_insights( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + state=state or ThoughtState(), + analysis=analysis, + fact_lines=fact_lines, + fact_meta=fact_meta, + avoid_tags=history_tags, + ) + prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)} + selected = _select_diverse_insights( + candidates, + preference=analysis.get("preference", "balanced"), + prefer_tags=prefer_tags, + avoid_tags=history_tags, + history_tags=history_tags, + fact_meta=fact_meta, + count=2, + ) + if state: + state.update("synthesizing", step=3) synthesis_prompt = ( - "You are given a question and a fact pack. " - "Answer in 2-4 sentences, using only facts from the pack. " - "Pick one or two facts that best fit the question and explain why they matter. " - "If the question is subjective, add a light opinion grounded in those facts. " - "Do not list raw facts; speak naturally. " + "Use the question, fact pack, and selected insights to answer in 2-4 sentences. " + "Speak naturally, not as a list. " + "If the question is subjective, add a light opinion grounded in facts. " + "Avoid repeating the exact same observation as the most recent response if possible. " "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n" - f"Question: {prompt}" + f"Question: {prompt}\n" + f"Selected: {json.dumps(selected, ensure_ascii=False)}" ) context = _append_history_context(fact_pack, history_lines) reply = _ollama_call_safe( @@ -2637,23 +2896,36 @@ def _interpret_open_question( *, fact_pack: str, history_lines: list[str], + tags_available: set[str], + avoid_tags: set[str], + state: ThoughtState | None = None, ) -> dict[str, Any]: + tags_list = ", ".join(sorted(tags_available)) if tags_available else "none" + avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none" prompt_text = ( "Analyze the question against the fact pack. " "Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\"," - "\"notes\":\"...\"}. " + "\"tags\":[\"...\"] ,\"notes\":\"...\"}. " + "If the question implies interesting/unique/unconventional/cool, set preference to novelty " + "and prefer dynamic tags (utilization/pods/database/availability) when possible. " + f"Use only these tags if relevant: {tags_list}. Avoid tags: {avoid_list}. " "Use only the fact pack." ) context = _append_history_context(fact_pack, history_lines) analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context) if not isinstance(analysis, dict): - return {"focus": "cluster snapshot", "preference": "balanced", "notes": ""} + analysis = {"focus": "cluster snapshot", "preference": "balanced", "notes": "", "tags": []} preference = analysis.get("preference") or "balanced" if preference not in ("balanced", "novelty", "utilization", "stability", "risk"): preference = "balanced" analysis["preference"] = preference analysis.setdefault("focus", "cluster snapshot") analysis.setdefault("notes", "") + tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else [] + clean_tags = {t for t in tags if isinstance(t, str)} + analysis["tags"] = sorted(clean_tags & tags_available) + if state: + state.update("planning", step=1, note=str(analysis.get("focus") or "")) return analysis @@ -2663,27 +2935,41 @@ def _select_insights( fact_pack: str, history_lines: list[str], state: ThoughtState, + analysis: dict[str, Any], + fact_lines: list[str], + fact_meta: dict[str, dict[str, Any]], + avoid_tags: set[str], ) -> list[dict[str, Any]]: + preferred_tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else [] + prefer_list = ", ".join(sorted({t for t in preferred_tags if isinstance(t, str)})) + avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none" + available_list = ", ".join(sorted({t for t in _ALLOWED_INSIGHT_TAGS})) insight_prompt = ( "From the fact pack, select 3-5 candidate insights that could answer the question. " "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"]," - "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\"}]}. " - "Use only the fact pack." + "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\",\"tags\":[\"...\"]}]}. " + f"Available tags: {available_list}. Prefer tags: {prefer_list or 'none'}. Avoid tags: {avoid_list}. " + "Use only the fact pack and provided tags." ) state.update("drafting candidates", step=2) context = _append_history_context(fact_pack, history_lines) result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context) insights = result.get("insights") if isinstance(result, dict) else None if not isinstance(insights, list): - return [] + insights = [] cleaned: list[dict[str, Any]] = [] for item in insights: if not isinstance(item, dict): continue if not item.get("summary") or not item.get("fact_ids"): continue + tags = _insight_tags(item, fact_meta) + item["tags"] = sorted(tags) cleaned.append(item) state.update("drafting candidates", step=2, note=_candidate_note(item)) + seeds = _seed_insights(fact_lines, fact_meta) + for seed in seeds: + cleaned.append(seed) return cleaned @@ -2707,18 +2993,36 @@ def _open_ended_deep( fact_pack: str, fact_ids: set[str], history_lines: list[str], + fact_lines: list[str], + fact_meta: dict[str, dict[str, Any]], + tags_available: set[str], + history_tags: set[str], state: ThoughtState | None = None, ) -> str: state = state or ThoughtState() if not fact_ids: return _ensure_scores("I don't have enough data to answer that.") - state.total_steps = 6 - state.update("planning", step=1) - analysis = _interpret_open_question(prompt, fact_pack=fact_pack, history_lines=history_lines) - state.update("planning", step=1, note=str(analysis.get("focus") or "")) + state.total_steps = 7 + analysis = _interpret_open_question( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + tags_available=tags_available, + avoid_tags=history_tags, + state=state, + ) - candidates = _select_insights(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state) - state.update("verifying", step=3) + candidates = _select_insights( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + state=state, + analysis=analysis, + fact_lines=fact_lines, + fact_meta=fact_meta, + avoid_tags=history_tags, + ) + state.update("verifying", step=3, note="scoring insights") filtered: list[dict[str, Any]] = [] for cand in candidates: cites = cand.get("fact_ids") if isinstance(cand.get("fact_ids"), list) else [] @@ -2729,9 +3033,17 @@ def _open_ended_deep( filtered = candidates preference = analysis.get("preference", "balanced") - ranked = sorted(filtered, key=lambda item: _score_insight(item, preference), reverse=True) - top = ranked[:2] - state.update("synthesizing", step=4) + prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)} + top = _select_diverse_insights( + filtered, + preference=preference, + prefer_tags=prefer_tags, + avoid_tags=history_tags, + history_tags=history_tags, + fact_meta=fact_meta, + count=2, + ) + state.update("synthesizing", step=4, note="composing response") synth_prompt = ( "Use the question, fact pack, and selected insights to craft a concise answer. " "Write 2-4 sentences. Explain why the selected insights stand out. " @@ -2740,6 +3052,7 @@ def _open_ended_deep( "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n" f"Question: {prompt}\n" f"Interpretation: {json.dumps(analysis, ensure_ascii=False)}\n" + f"Recent tags: {', '.join(sorted(history_tags)) if history_tags else 'none'}\n" f"Selected: {json.dumps(top, ensure_ascii=False)}" ) context = _append_history_context(fact_pack, history_lines) @@ -2750,7 +3063,7 @@ def _open_ended_deep( fallback="I don't have enough data to answer that.", system_override=_open_ended_system(), ) - state.update("done", step=6) + state.update("done", step=7) return _ensure_scores(reply) @@ -2769,9 +3082,31 @@ def open_ended_answer( return _ensure_scores("I don't have enough data to answer that.") fact_pack = _fact_pack_text(lines) fact_ids = {f"F{i+1}" for i in range(len(lines))} + fact_meta = _fact_pack_meta(lines) + tags_available = {tag for entry in fact_meta.values() for tag in entry.get("tags", [])} + history_tags = _history_tags(history_lines) if mode == "fast": - return _open_ended_fast(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state) - return _open_ended_deep(prompt, fact_pack=fact_pack, fact_ids=fact_ids, history_lines=history_lines, state=state) + return _open_ended_fast( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + fact_lines=lines, + fact_meta=fact_meta, + tags_available=tags_available, + history_tags=history_tags, + state=state, + ) + return _open_ended_deep( + prompt, + fact_pack=fact_pack, + fact_ids=fact_ids, + history_lines=history_lines, + fact_lines=lines, + fact_meta=fact_meta, + tags_available=tags_available, + history_tags=history_tags, + state=state, + ) def _non_cluster_reply(prompt: str) -> str: @@ -2826,9 +3161,9 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): self._write_json(400, {"error": "missing_prompt"}) return cleaned = _strip_bot_mention(prompt) - mode = str(payload.get("mode") or "fast").lower() + mode = str(payload.get("mode") or "deep").lower() if mode not in ("fast", "deep"): - mode = "fast" + mode = "deep" snapshot = _snapshot_state() inventory = _snapshot_inventory(snapshot) or node_inventory_live() workloads = _snapshot_workloads(snapshot) @@ -2839,11 +3174,12 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): inventory=inventory, workloads=workloads, ) + followup = _is_followup_query(cleaned) cluster_query = ( _is_cluster_query(cleaned, inventory=inventory, workloads=workloads) - or history_cluster or _knowledge_intent(cleaned) or _is_subjective_query(cleaned) + or (history_cluster and followup) ) context = "" if cluster_query: @@ -2857,7 +3193,11 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): ) fallback = "I don't have enough data to answer that." if cluster_query: - open_ended = _is_subjective_query(cleaned) or _knowledge_intent(cleaned) + open_ended = ( + _is_subjective_query(cleaned) + or _knowledge_intent(cleaned) + or _is_overview_query(cleaned) + ) if open_ended: answer = open_ended_answer( cleaned, @@ -3068,7 +3408,6 @@ def _knowledge_intent(prompt: str) -> bool: "summary", "describe", "explain", - "what is", ) ) @@ -3269,7 +3608,7 @@ def open_ended_with_thinking( ) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() - total_steps = 2 if mode == "fast" else 6 + total_steps = 4 if mode == "fast" else 7 state = ThoughtState(total_steps=total_steps) def worker(): @@ -3382,11 +3721,12 @@ def sync_loop(token: str, room_id: str): inventory=inventory, workloads=workloads, ) + followup = _is_followup_query(cleaned_body) cluster_query = ( _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) - or history_cluster or _knowledge_intent(cleaned_body) or _is_subjective_query(cleaned_body) + or (history_cluster and followup) ) context = "" if cluster_query: @@ -3407,7 +3747,11 @@ def sync_loop(token: str, room_id: str): fallback = "I don't have enough data to answer that." if cluster_query: - open_ended = _is_subjective_query(cleaned_body) or _knowledge_intent(cleaned_body) + open_ended = ( + _is_subjective_query(cleaned_body) + or _knowledge_intent(cleaned_body) + or _is_overview_query(cleaned_body) + ) if open_ended: reply = open_ended_with_thinking( token,