atlasbot: refine open-ended reasoning pipeline

This commit is contained in:
Brad Stein 2026-01-27 21:02:20 -03:00
parent 9e06d7afc8
commit 34c91c6d08
2 changed files with 401 additions and 51 deletions

View File

@@ -16,7 +16,7 @@ spec:
       labels:
         app: atlasbot
       annotations:
-        checksum/atlasbot-configmap: manual-atlasbot-69
+        checksum/atlasbot-configmap: manual-atlasbot-70
         vault.hashicorp.com/agent-inject: "true"
         vault.hashicorp.com/role: "comms"
         vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@@ -78,11 +78,11 @@ spec:
             - name: BOT_USER
               value: atlasbot
             - name: BOT_MENTIONS
-              value: atlasbot,aatlasbot
+              value: atlasbot,aatlasbot,atlas_quick,atlas_smart
             - name: OLLAMA_URL
               value: http://ollama.ai.svc.cluster.local:11434
             - name: OLLAMA_MODEL
-              value: qwen2.5:14b-instruct-q4_0
+              value: qwen2.5:14b-instruct
             - name: OLLAMA_TIMEOUT_SEC
               value: "600"
             - name: ATLASBOT_THINKING_INTERVAL_SEC

View File

@@ -333,6 +333,19 @@ def _strip_bot_mention(text: str) -> str:
     return cleaned or text.strip()
 
+
+def _detect_mode_from_body(body: str, *, default: str = "deep") -> str:
+    lower = normalize_query(body or "")
+    if "atlas_quick" in lower or "atlas-quick" in lower:
+        return "fast"
+    if "atlas_smart" in lower or "atlas-smart" in lower:
+        return "deep"
+    if lower.startswith("quick ") or lower.startswith("fast "):
+        return "fast"
+    if lower.startswith("smart ") or lower.startswith("deep "):
+        return "deep"
+    return default
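+# Hedged examples (assuming normalize_query lowercases and trims the body):
+#   _detect_mode_from_body("quick how many nodes are up?")    -> "fast"
+#   _detect_mode_from_body("ping atlas_smart: what changed?") -> "deep"
+#   _detect_mode_from_body("how are things?")                 -> "deep" (default)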
 
 # Matrix HTTP helper.
 def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
     url = (base or BASE) + path
@@ -2420,6 +2433,300 @@ def _append_history_context(context: str, history_lines: list[str]) -> str:
     return combined
 
+
+class ThoughtState:
+    def __init__(self, total_steps: int = 0):
+        self._lock = threading.Lock()
+        self.stage = "starting"
+        self.note = ""
+        self.step = 0
+        self.total_steps = total_steps
+
+    def update(self, stage: str, *, note: str = "", step: int | None = None) -> None:
+        with self._lock:
+            self.stage = stage
+            if note:
+                self.note = note
+            if step is not None:
+                self.step = step
+
+    def status_line(self) -> str:
+        with self._lock:
+            stage = self.stage
+            note = self.note
+            step = self.step
+            total = self.total_steps
+        step_part = f"{step}/{total}" if total else (str(step) if step else "")
+        detail = f"Stage {step_part}: {stage}".strip()
+        if note:
+            return f"Still thinking ({detail}). Latest insight: {note}"
+        return f"Still thinking ({detail})."
+
+def _ollama_json_call(prompt: str, *, context: str, retries: int = 2) -> dict[str, Any]:
+    system = (
+        "System: You are Atlas, a reasoning assistant. "
+        "Return strict JSON only (no code fences, no trailing commentary). "
+        "If you cannot comply, return {}. "
+        "Only use facts from the provided context. "
+        "If you make an inference, label it as 'inference' in the JSON."
+    )
+    for attempt in range(max(1, retries + 1)):
+        try:
+            raw = _ollama_call(
+                ("json", "internal"),
+                prompt,
+                context=context,
+                use_history=False,
+                system_override=system,
+            )
+            cleaned = _strip_code_fence(raw).strip()
+            if cleaned.startswith("{") and cleaned.endswith("}"):
+                return json.loads(cleaned)
+            parsed = json.loads(cleaned)
+            if isinstance(parsed, dict):
+                return parsed
+        except Exception:  # noqa: BLE001
+            time.sleep(min(2, 2 ** attempt))
+    return {}
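+
+# Contract sketch (illustrative): callers always get a dict back, never an
+# exception; failed attempts back off 1s, then 2s (min(2, 2**attempt)) before
+# the function gives up and returns {}.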
+
+def _fact_pack_lines(
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]] | None,
+) -> list[str]:
+    raw = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    lines: list[str] = []
+    for line in raw.splitlines():
+        trimmed = line.strip()
+        if not trimmed or trimmed.lower().startswith("facts"):
+            continue
+        lines.append(trimmed)
+    return lines
+
+def _fact_pack_text(lines: list[str]) -> str:
+    labeled = [f"F{idx + 1}: {line}" for idx, line in enumerate(lines)]
+    return "Fact pack:\n" + "\n".join(labeled)
+
+def _open_ended_system() -> str:
+    return (
+        "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
+        "Use ONLY the provided fact pack and recent chat as your evidence. "
+        "You may draw light inferences if you label them as such. "
+        "Write concise, human sentences, not a list. "
+        "If the question is subjective, share a light opinion grounded in facts. "
+        "If the question is ambiguous, pick a reasonable interpretation and state it briefly. "
+        "Avoid repeating the exact same observation as the last response if possible. "
+        "Do not invent numbers or facts. "
+        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100)."
+    )
+
+def _candidate_note(candidate: dict[str, Any]) -> str:
+    claim = str(candidate.get("claim") or candidate.get("summary") or "")
+    return claim[:160] + ("…" if len(claim) > 160 else "")
+
+def _ensure_scores(answer: str) -> str:
+    text = answer.strip()
+    lines = [line for line in text.splitlines() if line.strip()]
+    has_relevance = any(line.lower().startswith("relevance:") for line in lines)
+    has_satisfaction = any(line.lower().startswith("satisfaction:") for line in lines)
+    has_confidence = any("confidence:" in line.lower() for line in lines)
+    if not has_confidence:
+        lines.append("Confidence: medium")
+    if not has_relevance:
+        lines.append("Relevance: 70")
+    if not has_satisfaction:
+        lines.append("Satisfaction: 70")
+    return "\n".join(lines)
+
+def _open_ended_fast(
+    prompt: str,
+    *,
+    fact_pack: str,
+    history_lines: list[str],
+    state: ThoughtState | None = None,
+) -> str:
+    if state:
+        state.update("synthesizing", step=2)
+    synthesis_prompt = (
+        "You are given a question and a fact pack. "
+        "Answer in 2-4 sentences, using only facts from the pack. "
+        "Pick one or two facts that best fit the question and explain why they matter. "
+        "If the question is subjective, add a light opinion grounded in those facts. "
+        "Do not list raw facts; speak naturally. "
+        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
+        f"Question: {prompt}"
+    )
+    context = _append_history_context(fact_pack, history_lines)
+    reply = _ollama_call(
+        ("fast", "open"),
+        synthesis_prompt,
+        context=context,
+        use_history=False,
+        system_override=_open_ended_system(),
+    )
+    return _ensure_scores(reply)
+
+def _interpret_open_question(
+    prompt: str,
+    *,
+    fact_pack: str,
+    history_lines: list[str],
+) -> dict[str, Any]:
+    prompt_text = (
+        "Analyze the question against the fact pack. "
+        "Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\","
+        "\"notes\":\"...\"}. "
+        "Use only the fact pack."
+    )
+    context = _append_history_context(fact_pack, history_lines)
+    analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
+    if not isinstance(analysis, dict):
+        return {"focus": "cluster snapshot", "preference": "balanced", "notes": ""}
+    preference = analysis.get("preference") or "balanced"
+    if preference not in ("balanced", "novelty", "utilization", "stability", "risk"):
+        preference = "balanced"
+    analysis["preference"] = preference
+    analysis.setdefault("focus", "cluster snapshot")
+    analysis.setdefault("notes", "")
+    return analysis
+
+def _select_insights(
+    prompt: str,
+    *,
+    fact_pack: str,
+    history_lines: list[str],
+    state: ThoughtState,
+) -> list[dict[str, Any]]:
+    insight_prompt = (
+        "From the fact pack, select 3-5 candidate insights that could answer the question. "
+        "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"],"
+        "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\"}]}. "
+        "Use only the fact pack."
+    )
+    state.update("drafting candidates", step=2)
+    context = _append_history_context(fact_pack, history_lines)
+    result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context)
+    insights = result.get("insights") if isinstance(result, dict) else None
+    if not isinstance(insights, list):
+        return []
+    cleaned: list[dict[str, Any]] = []
+    for item in insights:
+        if not isinstance(item, dict):
+            continue
+        if not item.get("summary") or not item.get("fact_ids"):
+            continue
+        cleaned.append(item)
+        state.update("drafting candidates", step=2, note=_candidate_note(item))
+    return cleaned
+
+def _score_insight(insight: dict[str, Any], preference: str) -> float:
+    relevance = insight.get("relevance") if isinstance(insight.get("relevance"), (int, float)) else 0.0
+    novelty = insight.get("novelty") if isinstance(insight.get("novelty"), (int, float)) else 0.0
+    if preference == "novelty":
+        return 0.4 * relevance + 0.6 * novelty
+    if preference in ("utilization", "stability"):
+        return 0.7 * relevance + 0.3 * novelty
+    if preference == "risk":
+        return 0.6 * relevance + 0.4 * novelty
+    return 0.6 * relevance + 0.4 * novelty
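+
+# Worked example (illustrative): with relevance=0.8 and novelty=0.5,
+#   preference="utilization" or "stability" -> 0.7*0.8 + 0.3*0.5 = 0.71
+#   preference="risk" or default "balanced" -> 0.6*0.8 + 0.4*0.5 = 0.68
+#   preference="novelty"                    -> 0.4*0.8 + 0.6*0.5 = 0.62
+# so only "novelty" re-weights toward fresher insights; "risk" currently
+# shares the balanced weights.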
+
+def _open_ended_deep(
+    prompt: str,
+    *,
+    fact_pack: str,
+    fact_ids: set[str],
+    history_lines: list[str],
+    state: ThoughtState | None = None,
+) -> str:
+    state = state or ThoughtState()
+    if not fact_ids:
+        return _ensure_scores("I don't have enough data to answer that.")
+    state.total_steps = 6
+    state.update("planning", step=1)
+    analysis = _interpret_open_question(prompt, fact_pack=fact_pack, history_lines=history_lines)
+    state.update("planning", step=1, note=str(analysis.get("focus") or ""))
+    candidates = _select_insights(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state)
+    state.update("verifying", step=3)
+    filtered: list[dict[str, Any]] = []
+    for cand in candidates:
+        cites = cand.get("fact_ids") if isinstance(cand.get("fact_ids"), list) else []
+        if cites and not all(cite in fact_ids for cite in cites):
+            continue
+        filtered.append(cand)
+    if not filtered:
+        filtered = candidates
+    preference = analysis.get("preference", "balanced")
+    ranked = sorted(filtered, key=lambda item: _score_insight(item, preference), reverse=True)
+    top = ranked[:2]
+    state.update("synthesizing", step=4)
+    synth_prompt = (
+        "Use the question, fact pack, and selected insights to craft a concise answer. "
+        "Write 2-4 sentences. Explain why the selected insights stand out. "
+        "If the question is subjective, include a light opinion grounded in facts. "
+        "Avoid repeating the same observation as the last response if possible. "
+        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
+        f"Question: {prompt}\n"
+        f"Interpretation: {json.dumps(analysis, ensure_ascii=False)}\n"
+        f"Selected: {json.dumps(top, ensure_ascii=False)}"
+    )
+    context = _append_history_context(fact_pack, history_lines)
+    reply = _ollama_call(
+        ("deep", "open"),
+        synth_prompt,
+        context=context,
+        use_history=False,
+        system_override=_open_ended_system(),
+    )
+    state.update("done", step=6)
+    return _ensure_scores(reply)
+
+def open_ended_answer(
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]],
+    history_lines: list[str],
+    mode: str,
+    state: ThoughtState | None = None,
+) -> str:
+    lines = _fact_pack_lines(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    if not lines:
+        return _ensure_scores("I don't have enough data to answer that.")
+    fact_pack = _fact_pack_text(lines)
+    fact_ids = {f"F{i + 1}" for i in range(len(lines))}
+    if mode == "fast":
+        return _open_ended_fast(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state)
+    return _open_ended_deep(prompt, fact_pack=fact_pack, fact_ids=fact_ids, history_lines=history_lines, state=state)
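+
+# Hedged usage sketch, reusing helpers defined elsewhere in this file:
+#
+#   answer = open_ended_answer(
+#       "what stands out about the cluster today?",
+#       inventory=node_inventory_live(),
+#       snapshot=_snapshot_state(),
+#       workloads=[],
+#       history_lines=[],
+#       mode="fast",
+#   )
+#
+# The reply always ends with Confidence/Relevance/Satisfaction lines via
+# _ensure_scores, even on the "not enough data" fallback path.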
+
+def _non_cluster_reply(prompt: str) -> str:
+    return _ensure_scores(
+        "I focus on the Atlas/Othrys cluster and don't have enough data to answer that."
+    )
 
 # Internal HTTP endpoint for cluster answers (website uses this).
 class _AtlasbotHandler(BaseHTTPRequestHandler):
     server_version = "AtlasbotHTTP/1.0"
@@ -2466,6 +2773,9 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
             self._write_json(400, {"error": "missing_prompt"})
             return
         cleaned = _strip_bot_mention(prompt)
+        mode = str(payload.get("mode") or "fast").lower()
+        if mode not in ("fast", "deep"):
+            mode = "fast"
         snapshot = _snapshot_state()
         inventory = _snapshot_inventory(snapshot) or node_inventory_live()
         workloads = _snapshot_workloads(snapshot)
@@ -2491,34 +2801,30 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
             )
         fallback = "I don't have enough data to answer that."
         if cluster_query:
-            facts_answer = cluster_answer(
-                cleaned,
-                inventory=inventory,
-                snapshot=snapshot,
-                workloads=workloads,
-                history_lines=history_lines,
-            )
             open_ended = _is_subjective_query(cleaned) or _knowledge_intent(cleaned)
             if open_ended:
-                llm_context = _append_history_context(context, history_lines)
-                answer = ollama_reply(
-                    ("http", "internal"),
+                answer = open_ended_answer(
                     cleaned,
-                    context=llm_context,
-                    fallback=facts_answer or fallback,
-                    use_history=False,
+                    inventory=inventory,
+                    snapshot=snapshot,
+                    workloads=workloads,
+                    history_lines=history_lines,
+                    mode=mode,
+                    state=None,
                 )
             else:
-                answer = facts_answer or fallback
+                answer = (
+                    cluster_answer(
+                        cleaned,
+                        inventory=inventory,
+                        snapshot=snapshot,
+                        workloads=workloads,
+                        history_lines=history_lines,
+                    )
+                    or fallback
+                )
         else:
-            llm_prompt = cleaned
-            answer = ollama_reply(
-                ("http", "internal"),
-                llm_prompt,
-                context=context,
-                fallback=fallback,
-                use_history=False,
-            )
+            answer = _non_cluster_reply(cleaned)
         self._write_json(200, {"answer": answer})
@@ -2760,8 +3066,15 @@ def knowledge_summary(prompt: str, inventory: list[dict[str, Any]]) -> str:
     summary = "\n".join(parts).strip()
     return _format_confidence(summary, "medium") if summary else ""
 
-def _ollama_call(hist_key, prompt: str, *, context: str, use_history: bool = True) -> str:
-    system = (
+def _ollama_call(
+    hist_key,
+    prompt: str,
+    *,
+    context: str,
+    use_history: bool = True,
+    system_override: str | None = None,
+) -> str:
+    system = system_override or (
         "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
         "Be helpful, direct, and concise. "
         "Use the provided context and facts as your source of truth. "
@@ -2877,6 +3190,47 @@ def ollama_reply_with_thinking(
     thread.join(timeout=1)
     return result["reply"] or fallback or "Model backend is busy. Try again in a moment."
 
+
+def open_ended_with_thinking(
+    token: str,
+    room: str,
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]],
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]],
+    history_lines: list[str],
+    mode: str,
+) -> str:
+    result: dict[str, str] = {"reply": ""}
+    done = threading.Event()
+    total_steps = 2 if mode == "fast" else 6
+    state = ThoughtState(total_steps=total_steps)
+
+    def worker():
+        result["reply"] = open_ended_answer(
+            prompt,
+            inventory=inventory,
+            snapshot=snapshot,
+            workloads=workloads,
+            history_lines=history_lines,
+            mode=mode,
+            state=state,
+        )
+        done.set()
+
+    thread = threading.Thread(target=worker, daemon=True)
+    thread.start()
+    if not done.wait(2.0):
+        send_msg(token, room, "Thinking…")
+        heartbeat = max(10, THINKING_INTERVAL_SEC)
+        next_heartbeat = time.monotonic() + heartbeat
+        while not done.wait(max(0, next_heartbeat - time.monotonic())):
+            send_msg(token, room, state.status_line())
+            next_heartbeat += heartbeat
+    thread.join(timeout=1)
+    return result["reply"] or "Model backend is busy. Try again in a moment."
 
 def sync_loop(token: str, room_id: str):
     since = None
     try:
@@ -2931,6 +3285,7 @@ def sync_loop(token: str, room_id: str):
                 cleaned_body = _strip_bot_mention(body)
                 lower_body = cleaned_body.lower()
+                mode = _detect_mode_from_body(body, default="deep")
 
                 # Only do live cluster introspection in DMs.
                 allow_tools = is_dm
@@ -2984,39 +3339,34 @@ def sync_loop(token: str, room_id: str):
                 fallback = "I don't have enough data to answer that."
                 if cluster_query:
-                    facts_answer = cluster_answer(
-                        cleaned_body,
-                        inventory=inventory,
-                        snapshot=snapshot,
-                        workloads=workloads,
-                        history_lines=history[hist_key],
-                    )
                     open_ended = _is_subjective_query(cleaned_body) or _knowledge_intent(cleaned_body)
                     if open_ended:
-                        llm_context = _append_history_context(context, history[hist_key])
-                        reply = ollama_reply_with_thinking(
+                        reply = open_ended_with_thinking(
                             token,
                             rid,
-                            hist_key,
                             cleaned_body,
-                            context=llm_context,
-                            fallback=facts_answer or fallback,
-                            use_history=False,
+                            inventory=inventory,
+                            snapshot=snapshot,
+                            workloads=workloads,
+                            history_lines=history[hist_key],
+                            mode=mode if mode in ("fast", "deep") else "deep",
                         )
                     else:
-                        reply = facts_answer or fallback
+                        reply = (
+                            cluster_answer(
+                                cleaned_body,
+                                inventory=inventory,
+                                snapshot=snapshot,
+                                workloads=workloads,
+                                history_lines=history[hist_key],
+                            )
+                            or fallback
+                        )
                 else:
-                    llm_prompt = cleaned_body
-                    reply = ollama_reply_with_thinking(
-                        token,
-                        rid,
-                        hist_key,
-                        llm_prompt,
-                        context=context,
-                        fallback=fallback,
-                        use_history=False,
-                    )
+                    reply = _non_cluster_reply(cleaned_body)
                 send_msg(token, rid, reply)
+                history[hist_key].append(f"Atlas: {reply}")
+                history[hist_key] = history[hist_key][-80:]
 
 def login_with_retry():
     last_err = None