diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml
index 6761287..b08f20d 100644
--- a/services/comms/atlasbot-deployment.yaml
+++ b/services/comms/atlasbot-deployment.yaml
@@ -16,7 +16,7 @@ spec:
       labels:
         app: atlasbot
       annotations:
-        checksum/atlasbot-configmap: manual-atlasbot-81
+        checksum/atlasbot-configmap: manual-atlasbot-82
         vault.hashicorp.com/agent-inject: "true"
         vault.hashicorp.com/role: "comms"
         vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@@ -83,6 +83,10 @@ spec:
           value: http://ollama.ai.svc.cluster.local:11434
         - name: OLLAMA_MODEL
          value: qwen2.5:14b-instruct
+        - name: ATLASBOT_MODEL_FAST
+          value: qwen2.5:14b-instruct
+        - name: ATLASBOT_MODEL_DEEP
+          value: qwen2.5:14b-instruct
         - name: OLLAMA_FALLBACK_MODEL
           value: qwen2.5:14b-instruct-q4_0
         - name: OLLAMA_TIMEOUT_SEC
diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
index 7f22ad5..7e6341e 100644
--- a/services/comms/scripts/atlasbot/bot.py
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -17,6 +17,8 @@
 ROOM_ALIAS = "#othrys:live.bstein.dev"
 OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
 MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
+MODEL_FAST = os.environ.get("ATLASBOT_MODEL_FAST", "")
+MODEL_DEEP = os.environ.get("ATLASBOT_MODEL_DEEP", "")
 FALLBACK_MODEL = os.environ.get("OLLAMA_FALLBACK_MODEL", "")
 API_KEY = os.environ.get("CHAT_API_KEY", "")
 OLLAMA_TIMEOUT_SEC = float(os.environ.get("OLLAMA_TIMEOUT_SEC", "480"))
@@ -372,6 +374,14 @@ def _detect_mode_from_body(body: str, *, default: str = "deep") -> str:
     return default
 
 
+def _model_for_mode(mode: str) -> str:
+    if mode == "fast" and MODEL_FAST:
+        return MODEL_FAST
+    if mode == "deep" and MODEL_DEEP:
+        return MODEL_DEEP
+    return MODEL
+
+
 # Matrix HTTP helper.
 def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
     url = (base or BASE) + path
@@ -2487,7 +2497,13 @@ class ThoughtState:
         return f"Still thinking ({detail})."
 
 
-def _ollama_json_call(prompt: str, *, context: str, retries: int = 2) -> dict[str, Any]:
+def _ollama_json_call(
+    prompt: str,
+    *,
+    context: str,
+    retries: int = 2,
+    model: str | None = None,
+) -> dict[str, Any]:
     system = (
         "System: You are Atlas, a reasoning assistant. "
         "Return strict JSON only (no code fences, no trailing commentary). "
" @@ -2504,6 +2520,7 @@ def _ollama_json_call(prompt: str, *, context: str, retries: int = 2) -> dict[st context=context, use_history=False, system_override=system, + model=model, ) cleaned = _strip_code_fence(raw).strip() if cleaned.startswith("{") and cleaned.endswith("}"): @@ -2547,6 +2564,19 @@ def _fact_pack_text(lines: list[str]) -> str: return "Fact pack:\n" + "\n".join(labeled) +def _tool_fact_lines(prompt: str, *, allow_tools: bool) -> list[str]: + if not allow_tools: + return [] + metrics_context, _ = metrics_query_context(prompt, allow_tools=True) + lines: list[str] = [] + if metrics_context: + for line in metrics_context.splitlines(): + trimmed = line.strip() + if trimmed: + lines.append(f"tool_metrics: {trimmed}") + return lines + + _ALLOWED_INSIGHT_TAGS = { "availability", "architecture", @@ -2607,6 +2637,15 @@ def _history_tags(history_lines: list[str]) -> set[str]: return tags & _ALLOWED_INSIGHT_TAGS +def _normalize_fraction(value: Any, *, default: float = 0.5) -> float: + if isinstance(value, (int, float)): + score = float(value) + if score > 1: + score = score / 100.0 + return max(0.0, min(1.0, score)) + return default + + def _seed_insights( lines: list[str], fact_meta: dict[str, dict[str, Any]], @@ -2735,9 +2774,9 @@ def _open_ended_system() -> str: "Use ONLY the provided fact pack and recent chat as your evidence. " "You may draw light inferences if you label them as such. " "Write concise, human sentences with a helpful, calm tone (not a list). " - "If the question is subjective, share a light opinion grounded in facts. " + "If the question is subjective, share a light opinion grounded in facts and explain why it stands out. " "If the question is ambiguous, pick a reasonable interpretation and state it briefly. " - "Avoid repeating the exact same observation as the last response if possible. " + "Avoid repeating the exact same observation as the last response if possible; vary across metrics, workload, or hardware details. " "Do not invent numbers or facts. " "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), HallucinationRisk (low|medium|high)." ) @@ -2750,6 +2789,7 @@ def _ollama_call_safe( context: str, fallback: str, system_override: str | None = None, + model: str | None = None, ) -> str: try: return _ollama_call( @@ -2758,6 +2798,7 @@ def _ollama_call_safe( context=context, use_history=False, system_override=system_override, + model=model, ) except Exception: return fallback @@ -2841,6 +2882,7 @@ def _open_ended_plan( history_lines: list[str], count: int, state: ThoughtState | None, + model: str | None, ) -> list[dict[str, Any]]: if state: state.update("planning", step=1, note="mapping angles") @@ -2850,10 +2892,15 @@ def _open_ended_plan( f"{count} distinct answer angles that can be supported by the fact pack. " "Keep them diverse (e.g., metrics, hardware, workload placement, recent changes). " "If the question is subjective, propose at least one angle that surfaces a standout detail. " + "Avoid repeating the same angle as the most recent response if possible. " "Return JSON: {\"angles\":[{\"focus\":\"...\",\"reason\":\"...\",\"priority\":1-5}]}." 
     )
     context = _append_history_context(fact_pack, history_lines)
-    result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
+    result = _ollama_json_call(
+        prompt_text + f" Question: {prompt}",
+        context=context,
+        model=model,
+    )
     angles = result.get("angles") if isinstance(result, dict) else None
     cleaned: list[dict[str, Any]] = []
     seen: set[str] = set()
@@ -2883,6 +2930,81 @@ def _open_ended_plan(
     return cleaned
 
 
+def _preferred_tags_for_prompt(prompt: str) -> set[str]:
+    q = normalize_query(prompt)
+    tags: set[str] = set()
+    if any(word in q for word in ("cpu", "ram", "memory", "net", "network", "io", "disk", "hottest", "busy", "usage", "utilization", "load")):
+        tags.add("utilization")
+    if any(word in q for word in ("postgres", "database", "db", "connections")):
+        tags.add("database")
+    if any(word in q for word in ("pod", "pods", "deployment", "job", "cronjob")):
+        tags.add("pods")
+    if any(word in q for word in ("workload", "service", "namespace")):
+        tags.add("workloads")
+    if any(word in q for word in ("ready", "not ready", "down", "unreachable", "availability")):
+        tags.add("availability")
+    if any(word in q for word in ("node", "nodes", "hardware", "arch", "architecture", "rpi", "jetson", "amd64", "arm64", "worker", "control-plane")):
+        tags.update({"hardware", "inventory", "architecture"})
+    return tags & _ALLOWED_INSIGHT_TAGS
+
+
+def _open_ended_insights(
+    prompt: str,
+    *,
+    fact_pack: str,
+    fact_meta: dict[str, dict[str, Any]],
+    history_lines: list[str],
+    count: int,
+    state: ThoughtState | None,
+    model: str | None,
+) -> list[dict[str, Any]]:
+    if state:
+        state.update("analyzing", note="scouting insights")
+    count = max(1, count)
+    allowed_tags = ", ".join(sorted(_ALLOWED_INSIGHT_TAGS))
+    prompt_text = (
+        "Review the fact pack and propose up to "
+        f"{count} insights that could answer the question. "
+        "Each insight should be grounded in the facts. "
+        "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"],"
+        "\"relevance\":0-1,\"novelty\":0-1,\"tags\":[\"tag\"],\"rationale\":\"...\"}]}. "
+        f"Only use tags from: {allowed_tags}."
+    )
+    context = _append_history_context(fact_pack, history_lines)
+    result = _ollama_json_call(
+        prompt_text + f" Question: {prompt}",
+        context=context,
+        model=model,
+    )
+    insights = result.get("insights") if isinstance(result, dict) else None
+    cleaned: list[dict[str, Any]] = []
+    valid_ids = set(fact_meta.keys())
+    if isinstance(insights, list):
+        for item in insights:
+            if not isinstance(item, dict):
+                continue
+            summary = str(item.get("summary") or item.get("claim") or "").strip()
+            if not summary:
+                continue
+            raw_ids = item.get("fact_ids") if isinstance(item.get("fact_ids"), list) else []
+            fact_ids = [fid for fid in raw_ids if isinstance(fid, str) and fid in valid_ids]
+            if not fact_ids:
+                continue
+            cleaned.append(
+                {
+                    "summary": summary,
+                    "fact_ids": fact_ids,
+                    "relevance": _normalize_fraction(item.get("relevance"), default=0.6),
+                    "novelty": _normalize_fraction(item.get("novelty"), default=0.5),
+                    "rationale": str(item.get("rationale") or ""),
+                    "tags": [t for t in (item.get("tags") or []) if isinstance(t, str)],
+                }
+            )
+    if cleaned and state:
+        state.update("analyzing", note=_candidate_note(cleaned[0]))
+    return cleaned
+
+
 def _normalize_score(value: Any, *, default: int = 60) -> int:
     if isinstance(value, (int, float)):
         return int(max(0, min(100, value)))
@@ -2915,20 +3037,31 @@ def _open_ended_candidate(
     history_lines: list[str],
     state: ThoughtState | None,
     step: int,
+    fact_hints: list[str] | None = None,
+    model: str | None = None,
 ) -> dict[str, Any]:
     if state:
         state.update("drafting", step=step, note=focus)
+    hint_text = ""
+    if fact_hints:
+        hint_text = " Prioritize these fact IDs if relevant: " + ", ".join(fact_hints) + "."
     prompt_text = (
         "Using ONLY the fact pack, answer the question focusing on this angle: "
         f"{focus}. "
-        "Write 2-4 sentences in plain prose (not a list). "
+        "Write 2-4 sentences in plain prose (not a list)."
+        + hint_text
+        + " "
         "If you infer, label it as inference. "
         "List which fact pack IDs you used. "
         "Return JSON: {\"answer\":\"...\",\"facts_used\":[\"F1\"],\"confidence\":\"high|medium|low\","
         "\"relevance\":0-100,\"satisfaction\":0-100,\"risk\":\"low|medium|high\"}."
     )
     context = _append_history_context(fact_pack, history_lines)
-    result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
+    result = _ollama_json_call(
+        prompt_text + f" Question: {prompt}",
+        context=context,
+        model=model,
+    )
     if not isinstance(result, dict):
         result = {}
     answer = str(result.get("answer") or "").strip()
@@ -2986,9 +3119,12 @@ def _open_ended_synthesize(
     candidates: list[dict[str, Any]],
     state: ThoughtState | None,
     step: int,
+    model: str | None,
+    critique: str | None = None,
 ) -> str:
     if state:
         state.update("synthesizing", step=step, note="composing answer")
+    critique_block = f"\nCritique guidance: {critique}\n" if critique else "\n"
     synth_prompt = (
         "Compose the final answer to the question using the candidate answers below. "
         "Select the best 1-2 candidates, blend them if helpful, and keep 2-4 sentences. "
" @@ -3001,6 +3137,7 @@ def _open_ended_synthesize( "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), " "HallucinationRisk (low|medium|high).\n" f"Question: {prompt}\n" + f"{critique_block}" f"Candidates: {json.dumps(candidates, ensure_ascii=False)}" ) context = _append_history_context(fact_pack, history_lines) @@ -3010,20 +3147,55 @@ def _open_ended_synthesize( context=context, fallback="I don't have enough data to answer that.", system_override=_open_ended_system(), + model=model, ) return _ensure_scores(reply) +def _open_ended_critique( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], + candidates: list[dict[str, Any]], + state: ThoughtState | None, + step: int, + model: str | None, +) -> str: + if state: + state.update("reviewing", step=step, note="quality check") + critique_prompt = ( + "Review the candidate answers against the fact pack. " + "Identify any missing important detail or risky inference and give one sentence of guidance. " + "Return JSON: {\"guidance\":\"...\",\"risk\":\"low|medium|high\"}." + ) + context = _append_history_context(fact_pack, history_lines) + result = _ollama_json_call( + critique_prompt + f" Question: {prompt} Candidates: {json.dumps(candidates, ensure_ascii=False)}", + context=context, + model=model, + ) + if isinstance(result, dict): + guidance = str(result.get("guidance") or "").strip() + if guidance: + return guidance + return "" + + def _open_ended_multi( prompt: str, *, fact_pack: str, + fact_lines: list[str], + fact_meta: dict[str, dict[str, Any]], history_lines: list[str], mode: str, state: ThoughtState | None = None, ) -> str: + model = _model_for_mode(mode) angle_count = 2 if mode == "fast" else 4 - total_steps = 1 + angle_count + 2 + insight_count = 2 if mode == "fast" else 4 + total_steps = 2 + angle_count + 2 + (1 if mode == "deep" else 0) if state: state.total_steps = total_steps angles = _open_ended_plan( @@ -3032,10 +3204,57 @@ def _open_ended_multi( history_lines=history_lines, count=angle_count, state=state, + model=model, ) + insights = _open_ended_insights( + prompt, + fact_pack=fact_pack, + fact_meta=fact_meta, + history_lines=history_lines, + count=insight_count, + state=state, + model=model, + ) + seeds = _seed_insights(fact_lines, fact_meta, limit=max(4, insight_count)) + insight_candidates = insights + seeds + subjective = _is_subjective_query(prompt) + prefer_tags = _preferred_tags_for_prompt(prompt) + history_tags = _history_tags(history_lines) + avoid_tags = history_tags if subjective else set() + preference = "novelty" if subjective else "relevance" + selected_insights = _select_diverse_insights( + insight_candidates, + preference=preference, + prefer_tags=prefer_tags, + avoid_tags=avoid_tags, + history_tags=history_tags, + fact_meta=fact_meta, + count=1 if mode == "fast" else 2, + ) + if state and selected_insights: + state.update("analyzing", note=_candidate_note(selected_insights[0])) + + angle_inputs: list[dict[str, Any]] = [] + for insight in selected_insights: + angle_inputs.append( + { + "focus": str(insight.get("summary") or "Direct answer"), + "fact_ids": insight.get("fact_ids") or [], + } + ) + for angle in angles: + if len(angle_inputs) >= angle_count: + break + angle_inputs.append( + { + "focus": str(angle.get("focus") or "Direct answer"), + "fact_ids": [], + } + ) + candidates: list[dict[str, Any]] = [] - step = 2 - for angle in angles[:angle_count]: + step = 3 + for angle in angle_inputs[:angle_count]: candidates.append( _open_ended_candidate( prompt, @@ -3044,6 +3263,8 @@ def 
                 history_lines=history_lines,
                 state=state,
                 step=step,
+                fact_hints=angle.get("fact_ids") if isinstance(angle.get("fact_ids"), list) else None,
+                model=model,
             )
         )
         step += 1
@@ -3051,6 +3272,18 @@ def _open_ended_multi(
         state.update("evaluating", step=step, note="ranking candidates")
     selected = _select_candidates(candidates, count=1 if mode == "fast" else 2)
     step += 1
+    critique = ""
+    if mode == "deep":
+        critique = _open_ended_critique(
+            prompt,
+            fact_pack=fact_pack,
+            history_lines=history_lines,
+            candidates=selected or candidates,
+            state=state,
+            step=step,
+            model=model,
+        )
+        step += 1
     reply = _open_ended_synthesize(
         prompt,
         fact_pack=fact_pack,
@@ -3058,6 +3291,8 @@ def _open_ended_multi(
         candidates=selected or candidates,
         state=state,
         step=step,
+        model=model,
+        critique=critique,
     )
     if state:
         state.update("done", step=total_steps)
@@ -3066,19 +3301,23 @@ def _open_ended_multi(
 
 def _open_ended_total_steps(mode: str) -> int:
     angle_count = 2 if mode == "fast" else 4
-    return 1 + angle_count + 2
+    return 2 + angle_count + 2 + (1 if mode == "deep" else 0)
 
 
 def _open_ended_fast(
     prompt: str,
     *,
     fact_pack: str,
+    fact_lines: list[str],
+    fact_meta: dict[str, dict[str, Any]],
     history_lines: list[str],
     state: ThoughtState | None = None,
 ) -> str:
     return _open_ended_multi(
         prompt,
         fact_pack=fact_pack,
+        fact_lines=fact_lines,
+        fact_meta=fact_meta,
         history_lines=history_lines,
         mode="fast",
         state=state,
@@ -3089,12 +3328,16 @@ def _open_ended_deep(
     prompt: str,
     *,
     fact_pack: str,
+    fact_lines: list[str],
+    fact_meta: dict[str, dict[str, Any]],
     history_lines: list[str],
     state: ThoughtState | None = None,
 ) -> str:
     return _open_ended_multi(
         prompt,
         fact_pack=fact_pack,
+        fact_lines=fact_lines,
+        fact_meta=fact_meta,
         history_lines=history_lines,
         mode="deep",
         state=state,
@@ -3109,31 +3352,61 @@ def open_ended_answer(
     workloads: list[dict[str, Any]],
     history_lines: list[str],
     mode: str,
+    allow_tools: bool,
     state: ThoughtState | None = None,
 ) -> str:
     lines = _fact_pack_lines(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    if _knowledge_intent(prompt) or _doc_intent(prompt):
+        kb_detail = kb_retrieve(prompt)
+        if kb_detail:
+            for line in kb_detail.splitlines():
+                if line.strip():
+                    lines.append(line.strip())
+    tool_lines = _tool_fact_lines(prompt, allow_tools=allow_tools)
+    if tool_lines:
+        lines.extend(tool_lines)
     if not lines:
         return _ensure_scores("I don't have enough data to answer that.")
     fact_pack = _fact_pack_text(lines)
+    fact_meta = _fact_pack_meta(lines)
     if mode == "fast":
         return _open_ended_fast(
             prompt,
             fact_pack=fact_pack,
+            fact_lines=lines,
+            fact_meta=fact_meta,
             history_lines=history_lines,
             state=state,
         )
     return _open_ended_deep(
         prompt,
         fact_pack=fact_pack,
+        fact_lines=lines,
+        fact_meta=fact_meta,
         history_lines=history_lines,
         state=state,
     )
 
 
-def _non_cluster_reply(prompt: str) -> str:
-    return _ensure_scores(
-        "I focus on the Atlas/Othrys cluster and don't have enough data to answer that."
+def _non_cluster_reply(prompt: str, *, history_lines: list[str], mode: str) -> str:
+    system = (
+        "System: You are Atlas, a helpful general assistant. "
+        "Answer using common knowledge when possible, and say when you're unsure. "
+        "Be concise and avoid unnecessary caveats. "
+        "Respond in plain sentences (no lists unless asked). "
+        "End every response with a line: 'Confidence: high|medium|low'."
     )
+    model = _model_for_mode(mode)
+    context = _append_history_context("", history_lines) if history_lines else ""
+    reply = _ollama_call(
+        ("general", "reply"),
+        prompt,
+        context=context,
+        use_history=False,
+        system_override=system,
+        model=model,
+    )
+    return _ensure_scores(reply)
 
 
 # Internal HTTP endpoint for cluster answers (website uses this).
@@ -3183,7 +3456,11 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
                 return
             cleaned = _strip_bot_mention(prompt)
             mode = str(payload.get("mode") or "deep").lower()
-            if mode not in ("fast", "deep"):
+            if mode in ("quick", "fast"):
+                mode = "fast"
+            elif mode in ("smart", "deep"):
+                mode = "deep"
+            else:
                 mode = "deep"
             snapshot = _snapshot_state()
             inventory = _snapshot_inventory(snapshot) or node_inventory_live()
@@ -3212,37 +3489,19 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
                 snapshot=snapshot,
                 workloads=workloads,
             )
-            fallback = "I don't have enough data to answer that."
             if cluster_query:
-                open_ended = (
-                    _is_subjective_query(cleaned)
-                    or _knowledge_intent(cleaned)
-                    or _is_overview_query(cleaned)
-                    or _doc_intent(cleaned)
+                answer = open_ended_answer(
+                    cleaned,
+                    inventory=inventory,
+                    snapshot=snapshot,
+                    workloads=workloads,
+                    history_lines=history_lines,
+                    mode=mode,
+                    allow_tools=False,
+                    state=None,
                 )
-                if open_ended:
-                    answer = open_ended_answer(
-                        cleaned,
-                        inventory=inventory,
-                        snapshot=snapshot,
-                        workloads=workloads,
-                        history_lines=history_lines,
-                        mode=mode,
-                        state=None,
-                    )
-                else:
-                    answer = (
-                        cluster_answer(
-                            cleaned,
-                            inventory=inventory,
-                            snapshot=snapshot,
-                            workloads=workloads,
-                            history_lines=history_lines,
-                        )
-                        or fallback
-                    )
             else:
-                answer = _non_cluster_reply(cleaned)
+                answer = _non_cluster_reply(cleaned, history_lines=history_lines, mode=mode)
             self._write_json(200, {"answer": answer})
@@ -3490,6 +3749,7 @@ def _ollama_call(
     context: str,
     use_history: bool = True,
     system_override: str | None = None,
+    model: str | None = None,
 ) -> str:
     system = system_override or (
         "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
" @@ -3521,7 +3781,8 @@ def _ollama_call( messages.extend(_history_to_messages(history[hist_key][-24:])) messages.append({"role": "user", "content": prompt}) - payload = {"model": MODEL, "messages": messages, "stream": False} + model_name = model or MODEL + payload = {"model": model_name, "messages": messages, "stream": False} headers = {"Content-Type": "application/json"} if API_KEY: headers["x-api-key"] = API_KEY @@ -3561,11 +3822,18 @@ def ollama_reply( context: str, fallback: str = "", use_history: bool = True, + model: str | None = None, ) -> str: last_error = None for attempt in range(max(1, OLLAMA_RETRIES + 1)): try: - return _ollama_call(hist_key, prompt, context=context, use_history=use_history) + return _ollama_call( + hist_key, + prompt, + context=context, + use_history=use_history, + model=model, + ) except Exception as exc: # noqa: BLE001 last_error = exc time.sleep(min(4, 2 ** attempt)) @@ -3584,6 +3852,7 @@ def ollama_reply_with_thinking( context: str, fallback: str, use_history: bool = True, + model: str | None = None, ) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() @@ -3595,6 +3864,7 @@ def ollama_reply_with_thinking( context=context, fallback=fallback, use_history=use_history, + model=model, ) done.set() @@ -3627,6 +3897,7 @@ def open_ended_with_thinking( workloads: list[dict[str, Any]], history_lines: list[str], mode: str, + allow_tools: bool, ) -> str: result: dict[str, str] = {"reply": ""} done = threading.Event() @@ -3641,6 +3912,7 @@ def open_ended_with_thinking( workloads=workloads, history_lines=history_lines, mode=mode, + allow_tools=allow_tools, state=state, ) done.set() @@ -3766,39 +4038,24 @@ def sync_loop(token: str, room_id: str): extra = "VictoriaMetrics (PromQL result):\n" + rendered send_msg(token, rid, extra) continue - fallback = "I don't have enough data to answer that." - if cluster_query: - open_ended = ( - _is_subjective_query(cleaned_body) - or _knowledge_intent(cleaned_body) - or _is_overview_query(cleaned_body) - or _doc_intent(cleaned_body) + reply = open_ended_with_thinking( + token, + rid, + cleaned_body, + inventory=inventory, + snapshot=snapshot, + workloads=workloads, + history_lines=history[hist_key], + mode=mode if mode in ("fast", "deep") else "deep", + allow_tools=allow_tools, ) - if open_ended: - reply = open_ended_with_thinking( - token, - rid, - cleaned_body, - inventory=inventory, - snapshot=snapshot, - workloads=workloads, - history_lines=history[hist_key], - mode=mode if mode in ("fast", "deep") else "deep", - ) - else: - reply = ( - cluster_answer( - cleaned_body, - inventory=inventory, - snapshot=snapshot, - workloads=workloads, - history_lines=history[hist_key], - ) - or fallback - ) else: - reply = _non_cluster_reply(cleaned_body) + reply = _non_cluster_reply( + cleaned_body, + history_lines=history[hist_key], + mode=mode if mode in ("fast", "deep") else "deep", + ) send_msg(token, rid, reply) history[hist_key].append(f"Atlas: {reply}") history[hist_key] = history[hist_key][-80:]