diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml
index d8ce3ee..cc628dd 100644
--- a/services/comms/atlasbot-deployment.yaml
+++ b/services/comms/atlasbot-deployment.yaml
@@ -16,7 +16,7 @@ spec:
       labels:
         app: atlasbot
       annotations:
-        checksum/atlasbot-configmap: manual-atlasbot-69
+        checksum/atlasbot-configmap: manual-atlasbot-70
         vault.hashicorp.com/agent-inject: "true"
         vault.hashicorp.com/role: "comms"
         vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@@ -78,11 +78,11 @@ spec:
         - name: BOT_USER
          value: atlasbot
        - name: BOT_MENTIONS
-          value: atlasbot,aatlasbot
+          value: atlasbot,aatlasbot,atlas_quick,atlas_smart
        - name: OLLAMA_URL
          value: http://ollama.ai.svc.cluster.local:11434
        - name: OLLAMA_MODEL
-          value: qwen2.5:14b-instruct-q4_0
+          value: qwen2.5:14b-instruct
        - name: OLLAMA_TIMEOUT_SEC
          value: "600"
        - name: ATLASBOT_THINKING_INTERVAL_SEC
diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
index 141b971..aa7e614 100644
--- a/services/comms/scripts/atlasbot/bot.py
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -333,6 +333,19 @@ def _strip_bot_mention(text: str) -> str:
     return cleaned or text.strip()
 
 
+def _detect_mode_from_body(body: str, *, default: str = "deep") -> str:
+    lower = normalize_query(body or "")
+    if "atlas_quick" in lower or "atlas-quick" in lower:
+        return "fast"
+    if "atlas_smart" in lower or "atlas-smart" in lower:
+        return "deep"
+    if lower.startswith("quick ") or lower.startswith("fast "):
+        return "fast"
+    if lower.startswith("smart ") or lower.startswith("deep "):
+        return "deep"
+    return default
+
+
 # Matrix HTTP helper.
 def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
     url = (base or BASE) + path
@@ -2420,6 +2433,300 @@ def _append_history_context(context: str, history_lines: list[str]) -> str:
     return combined
 
 
+class ThoughtState:
+    def __init__(self, total_steps: int = 0):
+        self._lock = threading.Lock()
+        self.stage = "starting"
+        self.note = ""
+        self.step = 0
+        self.total_steps = total_steps
+
+    def update(self, stage: str, *, note: str = "", step: int | None = None) -> None:
+        with self._lock:
+            self.stage = stage
+            if note:
+                self.note = note
+            if step is not None:
+                self.step = step
+
+    def status_line(self) -> str:
+        with self._lock:
+            stage = self.stage
+            note = self.note
+            step = self.step
+            total = self.total_steps
+        step_part = f"{step}/{total}" if total else (str(step) if step else "")
+        detail = f"Stage {step_part}: {stage}" if step_part else f"Stage: {stage}"
+        if note:
+            return f"Still thinking ({detail}). Latest insight: {note}"
+        return f"Still thinking ({detail})."
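+
+
+# Rough sketch of how the two helpers above are meant to behave. The sample
+# inputs and outputs are illustrative assumptions (normalize_query is expected
+# to lowercase the body), not captured test output:
+#
+#   _detect_mode_from_body("atlas_quick gpu load?")      # -> "fast"
+#   _detect_mode_from_body("deep dive on ceph, please")  # -> "deep"
+#   st = ThoughtState(total_steps=6)
+#   st.update("planning", step=1, note="focus: utilization")
+#   st.status_line()
+#   # -> "Still thinking (Stage 1/6: planning). Latest insight: focus: utilization"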
+
+
+def _ollama_json_call(prompt: str, *, context: str, retries: int = 2) -> dict[str, Any]:
+    system = (
+        "System: You are Atlas, a reasoning assistant. "
+        "Return strict JSON only (no code fences, no trailing commentary). "
+        "If you cannot comply, return {}. "
+        "Only use facts from the provided context. "
+        "If you make an inference, label it as 'inference' in the JSON."
+    )
+    for attempt in range(max(1, retries + 1)):
+        try:
+            raw = _ollama_call(
+                ("json", "internal"),
+                prompt,
+                context=context,
+                use_history=False,
+                system_override=system,
+            )
+            cleaned = _strip_code_fence(raw).strip()
+            if cleaned.startswith("{") and cleaned.endswith("}"):
+                return json.loads(cleaned)
+            parsed = json.loads(cleaned)
+            if isinstance(parsed, dict):
+                return parsed
+        except Exception:  # noqa: BLE001
+            if attempt < retries:
+                time.sleep(min(2, 2 ** attempt))
+    return {}
+
+
+def _fact_pack_lines(
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]] | None,
+) -> list[str]:
+    raw = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    lines: list[str] = []
+    for line in raw.splitlines():
+        trimmed = line.strip()
+        if not trimmed or trimmed.lower().startswith("facts"):
+            continue
+        lines.append(trimmed)
+    return lines
+
+
+def _fact_pack_text(lines: list[str]) -> str:
+    labeled = [f"F{idx + 1}: {line}" for idx, line in enumerate(lines)]
+    return "Fact pack:\n" + "\n".join(labeled)
+
+
+def _open_ended_system() -> str:
+    return (
+        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
+        "Use ONLY the provided fact pack and recent chat as your evidence. "
+        "You may draw light inferences if you label them as such. "
+        "Write concise, human sentences, not a list. "
+        "If the question is subjective, share a light opinion grounded in facts. "
+        "If the question is ambiguous, pick a reasonable interpretation and state it briefly. "
+        "Avoid repeating the exact same observation as the last response if possible. "
+        "Do not invent numbers or facts. "
+        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100)."
+    )
+
+
+def _candidate_note(candidate: dict[str, Any]) -> str:
+    claim = str(candidate.get("claim") or candidate.get("summary") or "")
+    return claim[:160] + ("…" if len(claim) > 160 else "")
+
+
+def _ensure_scores(answer: str) -> str:
+    text = answer.strip()
+    lines = [line for line in text.splitlines() if line.strip()]
+    has_relevance = any(line.lower().startswith("relevance:") for line in lines)
+    has_satisfaction = any(line.lower().startswith("satisfaction:") for line in lines)
+    has_confidence = any("confidence:" in line.lower() for line in lines)
+    if not has_confidence:
+        lines.append("Confidence: medium")
+    if not has_relevance:
+        lines.append("Relevance: 70")
+    if not has_satisfaction:
+        lines.append("Satisfaction: 70")
+    return "\n".join(lines)
+
+
+def _open_ended_fast(
+    prompt: str,
+    *,
+    fact_pack: str,
+    history_lines: list[str],
+    state: ThoughtState | None = None,
+) -> str:
+    if state:
+        state.update("synthesizing", step=2)
+    synthesis_prompt = (
+        "You are given a question and a fact pack. "
+        "Answer in 2-4 sentences, using only facts from the pack. "
+        "Pick one or two facts that best fit the question and explain why they matter. "
+        "If the question is subjective, add a light opinion grounded in those facts. "
+        "Do not list raw facts; speak naturally. "
+        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
+        f"Question: {prompt}"
+    )
+    context = _append_history_context(fact_pack, history_lines)
+    reply = _ollama_call(
+        ("fast", "open"),
+        synthesis_prompt,
+        context=context,
+        use_history=False,
+        system_override=_open_ended_system(),
+    )
+    return _ensure_scores(reply)
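+
+
+# Data-flow sketch for the fast path (the fact lines here are made up; real
+# ones come from facts_context):
+#   lines = ["node titan-01 cpu 72%", "ceph reports HEALTH_OK"]
+#   _fact_pack_text(lines)
+#   # -> "Fact pack:\nF1: node titan-01 cpu 72%\nF2: ceph reports HEALTH_OK"
+#   _ensure_scores("Looks calm overall.")
+#   # -> "Looks calm overall.\nConfidence: medium\nRelevance: 70\nSatisfaction: 70"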
" + "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n" + f"Question: {prompt}" + ) + context = _append_history_context(fact_pack, history_lines) + reply = _ollama_call( + ("fast", "open"), + synthesis_prompt, + context=context, + use_history=False, + system_override=_open_ended_system(), + ) + return _ensure_scores(reply) + + +def _interpret_open_question( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], +) -> dict[str, Any]: + prompt_text = ( + "Analyze the question against the fact pack. " + "Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\"," + "\"notes\":\"...\"}. " + "Use only the fact pack." + ) + context = _append_history_context(fact_pack, history_lines) + analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context) + if not isinstance(analysis, dict): + return {"focus": "cluster snapshot", "preference": "balanced", "notes": ""} + preference = analysis.get("preference") or "balanced" + if preference not in ("balanced", "novelty", "utilization", "stability", "risk"): + preference = "balanced" + analysis["preference"] = preference + analysis.setdefault("focus", "cluster snapshot") + analysis.setdefault("notes", "") + return analysis + + +def _select_insights( + prompt: str, + *, + fact_pack: str, + history_lines: list[str], + state: ThoughtState, +) -> list[dict[str, Any]]: + insight_prompt = ( + "From the fact pack, select 3-5 candidate insights that could answer the question. " + "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"]," + "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\"}]}. " + "Use only the fact pack." + ) + state.update("drafting candidates", step=2) + context = _append_history_context(fact_pack, history_lines) + result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context) + insights = result.get("insights") if isinstance(result, dict) else None + if not isinstance(insights, list): + return [] + cleaned: list[dict[str, Any]] = [] + for item in insights: + if not isinstance(item, dict): + continue + if not item.get("summary") or not item.get("fact_ids"): + continue + cleaned.append(item) + state.update("drafting candidates", step=2, note=_candidate_note(item)) + return cleaned + + +def _score_insight(insight: dict[str, Any], preference: str) -> float: + relevance = insight.get("relevance") if isinstance(insight.get("relevance"), (int, float)) else 0.0 + novelty = insight.get("novelty") if isinstance(insight.get("novelty"), (int, float)) else 0.0 + if preference == "novelty": + return 0.4 * relevance + 0.6 * novelty + if preference == "utilization": + return 0.7 * relevance + 0.3 * novelty + if preference == "stability": + return 0.7 * relevance + 0.3 * novelty + if preference == "risk": + return 0.6 * relevance + 0.4 * novelty + return 0.6 * relevance + 0.4 * novelty + + +def _open_ended_deep( + prompt: str, + *, + fact_pack: str, + fact_ids: set[str], + history_lines: list[str], + state: ThoughtState | None = None, +) -> str: + state = state or ThoughtState() + if not fact_ids: + return _ensure_scores("I don't have enough data to answer that.") + state.total_steps = 6 + state.update("planning", step=1) + analysis = _interpret_open_question(prompt, fact_pack=fact_pack, history_lines=history_lines) + state.update("planning", step=1, note=str(analysis.get("focus") or "")) + + candidates = _select_insights(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state) + state.update("verifying", step=3) + 
+
+
+def open_ended_answer(
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]],
+    history_lines: list[str],
+    mode: str,
+    state: ThoughtState | None = None,
+) -> str:
+    lines = _fact_pack_lines(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    if not lines:
+        return _ensure_scores("I don't have enough data to answer that.")
+    fact_pack = _fact_pack_text(lines)
+    fact_ids = {f"F{i + 1}" for i in range(len(lines))}
+    if mode == "fast":
+        return _open_ended_fast(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state)
+    return _open_ended_deep(prompt, fact_pack=fact_pack, fact_ids=fact_ids, history_lines=history_lines, state=state)
+
+
+def _non_cluster_reply(prompt: str) -> str:
+    return _ensure_scores(
+        "I focus on the Atlas/Othrys cluster and don't have enough data to answer that."
+    )
+
+
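+# Dispatch sketch: "fast" does a single synthesis call over the fact pack;
+# "deep" runs plan -> select -> verify -> synthesize. The argument values
+# below are placeholders for the snapshot helpers used by the callers:
+#   open_ended_answer("what stands out today?", inventory=inv, snapshot=snap,
+#                     workloads=wl, history_lines=[], mode="fast")
+
+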
 # Internal HTTP endpoint for cluster answers (website uses this).
 class _AtlasbotHandler(BaseHTTPRequestHandler):
     server_version = "AtlasbotHTTP/1.0"
@@ -2466,6 +2773,9 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
             self._write_json(400, {"error": "missing_prompt"})
             return
         cleaned = _strip_bot_mention(prompt)
+        mode = str(payload.get("mode") or "fast").lower()
+        if mode not in ("fast", "deep"):
+            mode = "fast"
         snapshot = _snapshot_state()
         inventory = _snapshot_inventory(snapshot) or node_inventory_live()
         workloads = _snapshot_workloads(snapshot)
@@ -2491,34 +2801,30 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
             )
         fallback = "I don't have enough data to answer that."
         if cluster_query:
-            facts_answer = cluster_answer(
-                cleaned,
-                inventory=inventory,
-                snapshot=snapshot,
-                workloads=workloads,
-                history_lines=history_lines,
-            )
             open_ended = _is_subjective_query(cleaned) or _knowledge_intent(cleaned)
             if open_ended:
-                llm_context = _append_history_context(context, history_lines)
-                answer = ollama_reply(
-                    ("http", "internal"),
+                answer = open_ended_answer(
                     cleaned,
-                    context=llm_context,
-                    fallback=facts_answer or fallback,
-                    use_history=False,
+                    inventory=inventory,
+                    snapshot=snapshot,
+                    workloads=workloads,
+                    history_lines=history_lines,
+                    mode=mode,
+                    state=None,
                 )
             else:
-                answer = facts_answer or fallback
+                answer = (
+                    cluster_answer(
+                        cleaned,
+                        inventory=inventory,
+                        snapshot=snapshot,
+                        workloads=workloads,
+                        history_lines=history_lines,
+                    )
+                    or fallback
+                )
         else:
-            llm_prompt = cleaned
-            answer = ollama_reply(
-                ("http", "internal"),
-                llm_prompt,
-                context=context,
-                fallback=fallback,
-                use_history=False,
-            )
+            answer = _non_cluster_reply(cleaned)
         self._write_json(200, {"answer": answer})
@@ -2760,8 +3066,15 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
     summary = "\n".join(parts).strip()
     return _format_confidence(summary, "medium") if summary else ""
 
-def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str:
-    system = (
+def _ollama_call(
+    hist_key,
+    prompt: str,
+    *,
+    context: str,
+    use_history: bool = True,
+    system_override: str | None = None,
+) -> str:
+    system = system_override or (
         "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
         "Be helpful, direct, and concise. "
         "Use the provided context and facts as your source of truth. "
@@ -2877,6 +3190,47 @@ def ollama_reply_with_thinking(
     thread.join(timeout=1)
     return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
 
+
+def open_ended_with_thinking(
+    token: str,
+    room: str,
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]],
+    history_lines: list[str],
+    mode: str,
+) -> str:
+    result: dict[str, str] = {"reply": ""}
+    done = threading.Event()
+    total_steps = 2 if mode == "fast" else 6
+    state = ThoughtState(total_steps=total_steps)
+
+    def worker():
+        result["reply"] = open_ended_answer(
+            prompt,
+            inventory=inventory,
+            snapshot=snapshot,
+            workloads=workloads,
+            history_lines=history_lines,
+            mode=mode,
+            state=state,
+        )
+        done.set()
+
+    thread = threading.Thread(target=worker, daemon=True)
+    thread.start()
+    if not done.wait(2.0):
+        send_msg(token, room, "Thinking…")
+        heartbeat = max(10, THINKING_INTERVAL_SEC)
+        next_heartbeat = time.monotonic() + heartbeat
+        while not done.wait(max(0, next_heartbeat - time.monotonic())):
+            send_msg(token, room, state.status_line())
+            next_heartbeat += heartbeat
+    thread.join(timeout=1)
+    return result["reply"] or "Model backend is busy. Try again in a moment."
+
 def sync_loop(token: str, room_id: str):
     since = None
     try:
@@ -2931,6 +3285,7 @@ def sync_loop(token: str, room_id: str):
 
                     cleaned_body = _strip_bot_mention(body)
                     lower_body = cleaned_body.lower()
+                    mode = _detect_mode_from_body(body)
 
                     # Only do live cluster introspection in DMs.
                     allow_tools = is_dm
@@ -2984,39 +3339,34 @@ def sync_loop(token: str, room_id: str):
                     fallback = "I don't have enough data to answer that."
                     if cluster_query:
-                        facts_answer = cluster_answer(
-                            cleaned_body,
-                            inventory=inventory,
-                            snapshot=snapshot,
-                            workloads=workloads,
-                            history_lines=history[hist_key],
-                        )
                         open_ended = _is_subjective_query(cleaned_body) or _knowledge_intent(cleaned_body)
                         if open_ended:
-                            llm_context = _append_history_context(context, history[hist_key])
-                            reply = ollama_reply_with_thinking(
+                            reply = open_ended_with_thinking(
                                 token,
                                 rid,
-                                hist_key,
                                 cleaned_body,
-                                context=llm_context,
-                                fallback=facts_answer or fallback,
-                                use_history=False,
+                                inventory=inventory,
+                                snapshot=snapshot,
+                                workloads=workloads,
+                                history_lines=history[hist_key],
+                                mode=mode,
                             )
                         else:
-                            reply = facts_answer or fallback
+                            reply = (
+                                cluster_answer(
+                                    cleaned_body,
+                                    inventory=inventory,
+                                    snapshot=snapshot,
+                                    workloads=workloads,
+                                    history_lines=history[hist_key],
+                                )
+                                or fallback
+                            )
                     else:
-                        llm_prompt = cleaned_body
-                        reply = ollama_reply_with_thinking(
-                            token,
-                            rid,
-                            hist_key,
-                            llm_prompt,
-                            context=context,
-                            fallback=fallback,
-                            use_history=False,
-                        )
+                        reply = _non_cluster_reply(cleaned_body)
                     send_msg(token, rid, reply)
+                    history[hist_key].append(f"Atlas: {reply}")
+                    history[hist_key] = history[hist_key][-80:]
 
 def login_with_retry():
     last_err = None
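Smoke-test sketch (not part of the patch): the internal endpoint now accepts a
"mode" field and falls back to "fast" on anything else. Host, port, and path
below are placeholders for the service's real listener:

    import json
    import urllib.request

    req = urllib.request.Request(
        "http://atlasbot.comms.svc.cluster.local:8080/",  # placeholder address
        data=json.dumps({"prompt": "what stands out today?", "mode": "deep"}).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=630) as resp:
        print(json.loads(resp.read().decode())["answer"])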