diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml index 5e5bc05..17e2cb2 100644 --- a/services/comms/atlasbot-deployment.yaml +++ b/services/comms/atlasbot-deployment.yaml @@ -16,7 +16,7 @@ spec: labels: app: atlasbot annotations: - checksum/atlasbot-configmap: manual-atlasbot-74 + checksum/atlasbot-configmap: manual-atlasbot-75 vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/role: "comms" vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret" diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py index 0176293..0668521 100644 --- a/services/comms/scripts/atlasbot/bot.py +++ b/services/comms/scripts/atlasbot/bot.py @@ -198,6 +198,8 @@ _INSIGHT_HINT_WORDS = { "unique", "notable", "coolest", + "risk", + "risky", "favorite", "favourite", "trivia", @@ -1641,17 +1643,6 @@ def _hottest_summary_line(metrics: dict[str, Any]) -> str: return "Hot spots: " + "; ".join(parts) + "." -def _is_insight_query(query: str) -> bool: - q = normalize_query(query) - if not q: - return False - if any(word in q for word in _INSIGHT_HINT_WORDS): - return True - if "most" in q and any(word in q for word in ("unusual", "odd", "weird", "unconventional")): - return True - return False - - _FOLLOWUP_HINTS = ( "what about", "how about", @@ -1724,198 +1715,6 @@ def _doc_intent(query: str) -> bool: ) -def _insight_candidates( - inventory: list[dict[str, Any]], - snapshot: dict[str, Any] | None, -) -> list[tuple[str, str, str]]: - metrics = _snapshot_metrics(snapshot) - candidates: list[tuple[str, str, str]] = [] - - nodes_line = _nodes_summary_line(inventory, snapshot) - if nodes_line and "not ready" in nodes_line.lower(): - candidates.append(("availability", nodes_line, "high")) - - hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {} - if hottest: - def _hot_node(entry: dict[str, Any]) -> str: - if not isinstance(entry, dict): - return "" - return ( - entry.get("node") - or entry.get("label") - or (entry.get("metric") or {}).get("node") - or "" - ) - - cpu = hottest.get("cpu") if isinstance(hottest.get("cpu"), dict) else {} - cpu_node = _hot_node(cpu) - if cpu_node and cpu.get("value") is not None: - value_fmt = _format_metric_value(str(cpu.get("value")), percent=True) - candidates.append(("cpu", f"The busiest CPU right now is {cpu_node} at about {value_fmt}.", "high")) - ram = hottest.get("ram") if isinstance(hottest.get("ram"), dict) else {} - ram_node = _hot_node(ram) - if ram_node and ram.get("value") is not None: - value_fmt = _format_metric_value(str(ram.get("value")), percent=True) - candidates.append(("ram", f"RAM usage peaks on {ram_node} at about {value_fmt}.", "high")) - - postgres_line = _postgres_summary_line(metrics) - if postgres_line: - candidates.append(("postgres", postgres_line, "high")) - - hardware_insight = _hardware_insight(inventory) - if hardware_insight: - candidates.append(("hardware", hardware_insight, "medium")) - - pods_line = _pods_summary_line(metrics) - if pods_line: - candidates.append(("pods", pods_line, "high")) - - return candidates - - -def _hardware_insight(inventory: list[dict[str, Any]]) -> str: - if not inventory: - return "" - groups = _group_nodes(inventory) - jetsons = groups.get("jetson") or [] - rpi5 = groups.get("rpi5") or [] - rpi4 = groups.get("rpi4") or [] - amd64 = groups.get("amd64") or [] - parts: list[str] = [] - if rpi5: - parts.append(f"rpi5={len(rpi5)}") - if rpi4: - parts.append(f"rpi4={len(rpi4)}") - if jetsons: - jetson_names = ", ".join(jetsons[:2]) - parts.append(f"jetson={len(jetsons)} ({jetson_names})") - if amd64: - parts.append(f"amd64={len(amd64)}") - return ", ".join(parts) - - -def _recent_insight_keys(history_lines: list[str]) -> set[str]: - used: set[str] = set() - for line in history_lines[-10:]: - lower = normalize_query(line) - if not lower: - continue - if "postgres" in lower or "connections" in lower: - used.add("postgres") - if "atlas mixes" in lower or "hardware" in lower or "rpi" in lower or "jetson" in lower: - used.add("hardware") - if "busiest cpu" in lower or "cpu right now" in lower or "cpu " in lower: - used.add("cpu") - if "ram usage" in lower or "memory" in lower: - used.add("ram") - if "pods" in lower: - used.add("pods") - if "not ready" in lower: - used.add("availability") - return used - - -def _select_insight( - prompt: str, - candidates: list[tuple[str, str, str]], - *, - used_keys: set[str] | None = None, -) -> tuple[str, str, str] | None: - if not candidates: - return None - used = used_keys or set() - q = normalize_query(prompt) - prefer_keys: list[str] = [] - if any(word in q for word in ("unconventional", "weird", "odd", "unique", "surprising")): - prefer_keys.extend(["hardware", "availability"]) - if any(word in q for word in ("coolest", "favorite", "favourite", "trivia", "fun")): - prefer_keys.extend(["hardware", "cpu", "ram"]) - if "interesting" in q and "most interesting" not in q: - prefer_keys.extend(["hardware", "postgres", "cpu", "ram"]) - avoid_used = any(word in q for word in ("another", "else", "different", "other")) or "most interesting" in q - if any(word in q for word in ("another", "else", "different", "other")) and len(candidates) > 1: - for candidate in candidates: - if candidate[0] not in used: - return candidate - return candidates[1] - if prefer_keys: - for prefer in prefer_keys: - for key, text, conf in candidates: - if key == prefer and (not avoid_used or key not in used): - return key, text, conf - for prefer in prefer_keys: - for key, text, conf in candidates: - if key == prefer: - return key, text, conf - if used and avoid_used: - for candidate in candidates: - if candidate[0] not in used: - return candidate - return candidates[0] - - -def _format_insight_text(key: str, text: str) -> str: - cleaned = text.strip().rstrip(".") - if not cleaned: - return "" - if key == "hardware": - counts = ( - cleaned.replace("Hardware mix includes ", "") - .replace("Atlas mixes tiny ", "") - .replace("Atlas mixes ", "") - .replace("which is unusual for a homelab cluster", "") - .strip() - .strip(".") - ) - has_jetson = "jetson=" in counts - has_amd64 = "amd64=" in counts - detail = f"mixed hardware stack ({counts})" - if has_jetson and has_amd64: - flavor = "It blends low-power Pis with Jetson accelerators and a couple of AMD64 boxes." - elif has_jetson: - flavor = "It pairs low-power Pis with Jetson accelerators for edge and AI workloads." - elif has_amd64: - flavor = "It mixes low-power Pis with a couple of heavier AMD64 nodes." - else: - flavor = "It is a pretty uniform hardware stack, which is rare for a homelab." - return f"{detail}. {flavor}" - if key == "postgres": - detail = cleaned.replace("Postgres is at ", "") - return f"Postgres is at {detail}; that feels like healthy, steady load rather than strain." - if key == "pods": - detail = cleaned.replace("There are ", "") - return f"Pods look steady ({detail}); nothing looks stuck or unhealthy." - if key == "availability": - return cleaned + " That is the kind of stability I like to see." - if key in ("cpu", "ram"): - suffix = ( - " If you're chasing hotspots, that's the node I'd watch first." - if key == "cpu" - else " That box is carrying the heaviest memory load right now." - ) - return cleaned + "." + suffix - return cleaned + "." - - -def _insight_prefix(prompt: str) -> str: - q = normalize_query(prompt) - if "coolest" in q: - return "If I had to pick the coolest detail, I'd say " - if "favorite" in q or "favourite" in q: - return "My favorite detail is " - if "trivia" in q: - return "A bit of trivia I like: " - if "most interesting" in q: - return "The most interesting detail to me is " - if any(word in q for word in ("another", "else", "different", "other")): - return "Another interesting detail: " - if any(word in q for word in ("unconventional", "weird", "odd", "unique", "surprising")): - return "What stands out to me is that " - if any(word in q for word in ("interesting", "notable", "fun", "cool")): - return "One thing I'd call out is " - return "" - - def cluster_overview_answer( prompt: str, *, @@ -2784,7 +2583,7 @@ def _open_ended_system() -> str: "If the question is ambiguous, pick a reasonable interpretation and state it briefly. " "Avoid repeating the exact same observation as the last response if possible. " "Do not invent numbers or facts. " - "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100)." + "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), HallucinationRisk (low|medium|high)." ) @@ -2809,263 +2608,284 @@ def _ollama_call_safe( def _candidate_note(candidate: dict[str, Any]) -> str: - claim = str(candidate.get("claim") or candidate.get("summary") or "") + claim = str(candidate.get("focus") or candidate.get("answer") or "") return claim[:160] + ("…" if len(claim) > 160 else "") def _ensure_scores(answer: str) -> str: text = answer.strip() lines = [line for line in text.splitlines() if line.strip()] - has_relevance = any(line.lower().startswith("relevance:") for line in lines) - has_satisfaction = any(line.lower().startswith("satisfaction:") for line in lines) - has_confidence = any("confidence:" in line.lower() for line in lines) + has_relevance = any(line.lower().startswith("relevance") for line in lines) + has_satisfaction = any(line.lower().startswith("satisfaction") for line in lines) + has_confidence = any(line.lower().startswith("confidence") for line in lines) + has_risk = any(line.lower().startswith("hallucinationrisk") for line in lines) if not has_confidence: lines.append("Confidence: medium") if not has_relevance: lines.append("Relevance: 70") if not has_satisfaction: lines.append("Satisfaction: 70") + if not has_risk: + lines.append("HallucinationRisk: low") return "\n".join(lines) +def _open_ended_plan( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], + count: int, + state: ThoughtState | None, +) -> list[dict[str, Any]]: + if state: + state.update("planning", step=1, note="mapping angles") + count = max(1, count) + prompt_text = ( + "Analyze the question and propose up to " + f"{count} distinct answer angles that can be supported by the fact pack. " + "Keep them diverse (e.g., metrics, hardware, workload placement, recent changes). " + "If the question is subjective, propose at least one angle that surfaces a standout detail. " + "Return JSON: {\"angles\":[{\"focus\":\"...\",\"reason\":\"...\",\"priority\":1-5}]}." + ) + context = _append_history_context(fact_pack, history_lines) + result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context) + angles = result.get("angles") if isinstance(result, dict) else None + cleaned: list[dict[str, Any]] = [] + seen: set[str] = set() + if isinstance(angles, list): + for item in angles: + if not isinstance(item, dict): + continue + focus = str(item.get("focus") or "").strip() + if not focus or focus.lower() in seen: + continue + seen.add(focus.lower()) + priority = item.get("priority") + if not isinstance(priority, (int, float)): + priority = 3 + cleaned.append( + { + "focus": focus, + "reason": str(item.get("reason") or ""), + "priority": int(max(1, min(5, priority))), + } + ) + if not cleaned: + cleaned = [{"focus": "Direct answer", "reason": "Default fallback", "priority": 3}] + cleaned.sort(key=lambda item: item.get("priority", 3), reverse=True) + if state: + state.update("planning", step=1, note=_candidate_note(cleaned[0])) + return cleaned + + +def _normalize_score(value: Any, *, default: int = 60) -> int: + if isinstance(value, (int, float)): + return int(max(0, min(100, value))) + return default + + +def _confidence_score(value: Any) -> int: + text = str(value or "").strip().lower() + if text.startswith("high"): + return 85 + if text.startswith("low"): + return 35 + return 60 + + +def _risk_penalty(value: Any) -> int: + text = str(value or "").strip().lower() + if text.startswith("high"): + return 20 + if text.startswith("medium"): + return 10 + return 0 + + +def _open_ended_candidate( + prompt: str, + *, + focus: str, + fact_pack: str, + history_lines: list[str], + state: ThoughtState | None, + step: int, +) -> dict[str, Any]: + if state: + state.update("drafting", step=step, note=focus) + prompt_text = ( + "Using ONLY the fact pack, answer the question focusing on this angle: " + f"{focus}. " + "Write 2-4 sentences in plain prose (not a list). " + "If you infer, label it as inference. " + "Return JSON: {\"answer\":\"...\",\"confidence\":\"high|medium|low\"," + "\"relevance\":0-100,\"satisfaction\":0-100,\"risk\":\"low|medium|high\"}." + ) + context = _append_history_context(fact_pack, history_lines) + result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context) + if not isinstance(result, dict): + result = {} + answer = str(result.get("answer") or "").strip() + if not answer: + answer = "I don't have enough data to answer that from the current snapshot." + candidate = { + "focus": focus, + "answer": answer, + "confidence": result.get("confidence", "medium"), + "relevance": _normalize_score(result.get("relevance"), default=60), + "satisfaction": _normalize_score(result.get("satisfaction"), default=60), + "risk": result.get("risk", "medium"), + } + candidate["score"] = _candidate_score(candidate) + return candidate + + +def _candidate_score(candidate: dict[str, Any]) -> float: + relevance = _normalize_score(candidate.get("relevance"), default=60) + satisfaction = _normalize_score(candidate.get("satisfaction"), default=60) + confidence = _confidence_score(candidate.get("confidence")) + score = relevance * 0.45 + satisfaction * 0.35 + confidence * 0.2 + return score - _risk_penalty(candidate.get("risk")) + + +def _select_candidates(candidates: list[dict[str, Any]], *, count: int) -> list[dict[str, Any]]: + if not candidates: + return [] + ranked = sorted(candidates, key=lambda item: item.get("score", 0), reverse=True) + picked: list[dict[str, Any]] = [] + seen_focus: set[str] = set() + for item in ranked: + focus = str(item.get("focus") or "").strip().lower() + if focus and focus in seen_focus: + continue + picked.append(item) + if focus: + seen_focus.add(focus) + if len(picked) >= count: + break + return picked or ranked[:count] + + +def _open_ended_synthesize( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], + candidates: list[dict[str, Any]], + state: ThoughtState | None, + step: int, +) -> str: + if state: + state.update("synthesizing", step=step, note="composing answer") + synth_prompt = ( + "Compose the final answer to the question using the candidate answers below. " + "Select the best 1-2 candidates, blend them if helpful, and keep 2-4 sentences. " + "Use only the fact pack as evidence. " + "If you infer, label it as inference. " + "Avoid repeating the last response if possible. " + "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), " + "HallucinationRisk (low|medium|high).\n" + f"Question: {prompt}\n" + f"Candidates: {json.dumps(candidates, ensure_ascii=False)}" + ) + context = _append_history_context(fact_pack, history_lines) + reply = _ollama_call_safe( + ("open", "synth"), + synth_prompt, + context=context, + fallback="I don't have enough data to answer that.", + system_override=_open_ended_system(), + ) + return _ensure_scores(reply) + + +def _open_ended_multi( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], + mode: str, + state: ThoughtState | None = None, +) -> str: + angle_count = 2 if mode == "fast" else 4 + total_steps = 1 + angle_count + 2 + if state: + state.total_steps = total_steps + angles = _open_ended_plan( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + count=angle_count, + state=state, + ) + candidates: list[dict[str, Any]] = [] + step = 2 + for angle in angles[:angle_count]: + candidates.append( + _open_ended_candidate( + prompt, + focus=str(angle.get("focus") or "Direct answer"), + fact_pack=fact_pack, + history_lines=history_lines, + state=state, + step=step, + ) + ) + step += 1 + if state: + state.update("evaluating", step=step, note="ranking candidates") + selected = _select_candidates(candidates, count=1 if mode == "fast" else 2) + step += 1 + reply = _open_ended_synthesize( + prompt, + fact_pack=fact_pack, + history_lines=history_lines, + candidates=selected or candidates, + state=state, + step=step, + ) + if state: + state.update("done", step=total_steps) + return reply + + +def _open_ended_total_steps(mode: str) -> int: + angle_count = 2 if mode == "fast" else 4 + return 1 + angle_count + 2 + + def _open_ended_fast( prompt: str, *, fact_pack: str, history_lines: list[str], - fact_lines: list[str], - fact_meta: dict[str, dict[str, Any]], - tags_available: set[str], - history_tags: set[str], state: ThoughtState | None = None, ) -> str: - if state: - state.update("planning", step=1) - analysis = _interpret_open_question( + return _open_ended_multi( prompt, fact_pack=fact_pack, history_lines=history_lines, - tags_available=tags_available, - avoid_tags=history_tags, + mode="fast", state=state, ) - candidates = _select_insights( - prompt, - fact_pack=fact_pack, - history_lines=history_lines, - state=state or ThoughtState(), - analysis=analysis, - fact_lines=fact_lines, - fact_meta=fact_meta, - avoid_tags=history_tags, - ) - prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)} - selected = _select_diverse_insights( - candidates, - preference=analysis.get("preference", "balanced"), - prefer_tags=prefer_tags, - avoid_tags=history_tags, - history_tags=history_tags, - fact_meta=fact_meta, - count=2, - ) - if state: - state.update("synthesizing", step=3) - synthesis_prompt = ( - "Use the question, fact pack, and selected insights to answer in 2-4 sentences. " - "Speak naturally, not as a list. " - "If the question is subjective, add a light opinion grounded in facts. " - "Avoid repeating the exact same observation as the most recent response if possible. " - "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n" - f"Question: {prompt}\n" - f"Selected: {json.dumps(selected, ensure_ascii=False)}" - ) - context = _append_history_context(fact_pack, history_lines) - reply = _ollama_call_safe( - ("fast", "open"), - synthesis_prompt, - context=context, - fallback="I don't have enough data to answer that.", - system_override=_open_ended_system(), - ) - return _ensure_scores(reply) - - -def _interpret_open_question( - prompt: str, - *, - fact_pack: str, - history_lines: list[str], - tags_available: set[str], - avoid_tags: set[str], - state: ThoughtState | None = None, -) -> dict[str, Any]: - tags_list = ", ".join(sorted(tags_available)) if tags_available else "none" - avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none" - prompt_text = ( - "Analyze the question against the fact pack. " - "Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\"," - "\"tags\":[\"...\"] ,\"notes\":\"...\"}. " - "If the question implies interesting/unique/unconventional/cool, set preference to novelty " - "and prefer dynamic tags (utilization/pods/database/availability) when possible. " - f"Use only these tags if relevant: {tags_list}. Avoid tags: {avoid_list}. " - "Use only the fact pack." - ) - context = _append_history_context(fact_pack, history_lines) - analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context) - if not isinstance(analysis, dict): - analysis = {"focus": "cluster snapshot", "preference": "balanced", "notes": "", "tags": []} - preference = analysis.get("preference") or "balanced" - if preference not in ("balanced", "novelty", "utilization", "stability", "risk"): - preference = "balanced" - analysis["preference"] = preference - analysis.setdefault("focus", "cluster snapshot") - analysis.setdefault("notes", "") - tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else [] - clean_tags = {t for t in tags if isinstance(t, str)} - analysis["tags"] = sorted(clean_tags & tags_available) - if state: - state.update("planning", step=1, note=str(analysis.get("focus") or "")) - return analysis - - -def _select_insights( - prompt: str, - *, - fact_pack: str, - history_lines: list[str], - state: ThoughtState, - analysis: dict[str, Any], - fact_lines: list[str], - fact_meta: dict[str, dict[str, Any]], - avoid_tags: set[str], -) -> list[dict[str, Any]]: - preferred_tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else [] - prefer_list = ", ".join(sorted({t for t in preferred_tags if isinstance(t, str)})) - avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none" - available_list = ", ".join(sorted({t for t in _ALLOWED_INSIGHT_TAGS})) - insight_prompt = ( - "From the fact pack, select 3-5 candidate insights that could answer the question. " - "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"]," - "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\",\"tags\":[\"...\"]}]}. " - f"Available tags: {available_list}. Prefer tags: {prefer_list or 'none'}. Avoid tags: {avoid_list}. " - "Use only the fact pack and provided tags." - ) - state.update("drafting candidates", step=2) - context = _append_history_context(fact_pack, history_lines) - result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context) - insights = result.get("insights") if isinstance(result, dict) else None - if not isinstance(insights, list): - insights = [] - cleaned: list[dict[str, Any]] = [] - for item in insights: - if not isinstance(item, dict): - continue - if not item.get("summary") or not item.get("fact_ids"): - continue - tags = _insight_tags(item, fact_meta) - item["tags"] = sorted(tags) - cleaned.append(item) - state.update("drafting candidates", step=2, note=_candidate_note(item)) - seeds = _seed_insights(fact_lines, fact_meta) - for seed in seeds: - cleaned.append(seed) - return cleaned - - -def _score_insight(insight: dict[str, Any], preference: str) -> float: - relevance = insight.get("relevance") if isinstance(insight.get("relevance"), (int, float)) else 0.0 - novelty = insight.get("novelty") if isinstance(insight.get("novelty"), (int, float)) else 0.0 - if preference == "novelty": - return 0.4 * relevance + 0.6 * novelty - if preference == "utilization": - return 0.7 * relevance + 0.3 * novelty - if preference == "stability": - return 0.7 * relevance + 0.3 * novelty - if preference == "risk": - return 0.6 * relevance + 0.4 * novelty - return 0.6 * relevance + 0.4 * novelty def _open_ended_deep( prompt: str, *, fact_pack: str, - fact_ids: set[str], history_lines: list[str], - fact_lines: list[str], - fact_meta: dict[str, dict[str, Any]], - tags_available: set[str], - history_tags: set[str], state: ThoughtState | None = None, ) -> str: - state = state or ThoughtState() - if not fact_ids: - return _ensure_scores("I don't have enough data to answer that.") - state.total_steps = 7 - analysis = _interpret_open_question( + return _open_ended_multi( prompt, fact_pack=fact_pack, history_lines=history_lines, - tags_available=tags_available, - avoid_tags=history_tags, + mode="deep", state=state, ) - candidates = _select_insights( - prompt, - fact_pack=fact_pack, - history_lines=history_lines, - state=state, - analysis=analysis, - fact_lines=fact_lines, - fact_meta=fact_meta, - avoid_tags=history_tags, - ) - state.update("verifying", step=3, note="scoring insights") - filtered: list[dict[str, Any]] = [] - for cand in candidates: - cites = cand.get("fact_ids") if isinstance(cand.get("fact_ids"), list) else [] - if cites and not all(cite in fact_ids for cite in cites): - continue - filtered.append(cand) - if not filtered: - filtered = candidates - - preference = analysis.get("preference", "balanced") - prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)} - top = _select_diverse_insights( - filtered, - preference=preference, - prefer_tags=prefer_tags, - avoid_tags=history_tags, - history_tags=history_tags, - fact_meta=fact_meta, - count=2, - ) - state.update("synthesizing", step=4, note="composing response") - synth_prompt = ( - "Use the question, fact pack, and selected insights to craft a concise answer. " - "Write 2-4 sentences. Explain why the selected insights stand out. " - "If the question is subjective, include a light opinion grounded in facts. " - "Avoid repeating the same observation as the last response if possible. " - "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n" - f"Question: {prompt}\n" - f"Interpretation: {json.dumps(analysis, ensure_ascii=False)}\n" - f"Recent tags: {', '.join(sorted(history_tags)) if history_tags else 'none'}\n" - f"Selected: {json.dumps(top, ensure_ascii=False)}" - ) - context = _append_history_context(fact_pack, history_lines) - reply = _ollama_call_safe( - ("deep", "open"), - synth_prompt, - context=context, - fallback="I don't have enough data to answer that.", - system_override=_open_ended_system(), - ) - state.update("done", step=7) - return _ensure_scores(reply) - def open_ended_answer( prompt: str, @@ -3081,30 +2901,17 @@ def open_ended_answer( if not lines: return _ensure_scores("I don't have enough data to answer that.") fact_pack = _fact_pack_text(lines) - fact_ids = {f"F{i+1}" for i in range(len(lines))} - fact_meta = _fact_pack_meta(lines) - tags_available = {tag for entry in fact_meta.values() for tag in entry.get("tags", [])} - history_tags = _history_tags(history_lines) if mode == "fast": return _open_ended_fast( prompt, fact_pack=fact_pack, history_lines=history_lines, - fact_lines=lines, - fact_meta=fact_meta, - tags_available=tags_available, - history_tags=history_tags, state=state, ) return _open_ended_deep( prompt, fact_pack=fact_pack, - fact_ids=fact_ids, history_lines=history_lines, - fact_lines=lines, - fact_meta=fact_meta, - tags_available=tags_available, - history_tags=history_tags, state=state, ) @@ -3175,12 +2982,12 @@ class _AtlasbotHandler(BaseHTTPRequestHandler): workloads=workloads, ) followup = _is_followup_query(cleaned) - cluster_query = ( - _is_cluster_query(cleaned, inventory=inventory, workloads=workloads) - or _knowledge_intent(cleaned) - or _is_subjective_query(cleaned) - or (history_cluster and followup) - ) + cleaned_q = normalize_query(cleaned) + cluster_affinity = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads) + subjective = _is_subjective_query(cleaned) + followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS) + contextual = history_cluster and (followup or followup_affinity) + cluster_query = cluster_affinity or contextual context = "" if cluster_query: context = build_context( @@ -3608,7 +3415,7 @@ def open_ended_with_thinking( ) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() - total_steps = 4 if mode == "fast" else 7 + total_steps = _open_ended_total_steps(mode) state = ThoughtState(total_steps=total_steps) def worker(): @@ -3722,12 +3529,12 @@ def sync_loop(token: str, room_id: str): workloads=workloads, ) followup = _is_followup_query(cleaned_body) - cluster_query = ( - _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) - or _knowledge_intent(cleaned_body) - or _is_subjective_query(cleaned_body) - or (history_cluster and followup) - ) + cleaned_q = normalize_query(cleaned_body) + cluster_affinity = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) + subjective = _is_subjective_query(cleaned_body) + followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS) + contextual = history_cluster and (followup or followup_affinity) + cluster_query = cluster_affinity or contextual context = "" if cluster_query: context = build_context(