atlasbot: refine open-ended reasoning

This commit is contained in:
Brad Stein 2026-01-27 21:52:07 -03:00
parent 38c8d08ab4
commit e97aaafed9
2 changed files with 378 additions and 34 deletions

View File

@ -16,7 +16,7 @@ spec:
labels:
app: atlasbot
annotations:
checksum/atlasbot-configmap: manual-atlasbot-73
checksum/atlasbot-configmap: manual-atlasbot-74
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"

View File

@ -138,6 +138,7 @@ CLUSTER_HINT_WORDS = {
"cluster",
"k8s",
"kubernetes",
"health",
"node",
"nodes",
"hardware",
@ -211,6 +212,7 @@ _OVERVIEW_HINT_WORDS = {
"explain",
"tell me about",
"what do you know",
"health",
}
_OLLAMA_LOCK = threading.Lock()
@ -1220,6 +1222,8 @@ def snapshot_metric_answer(
q = normalize_query(prompt)
metric = _detect_metric(q)
op = _detect_operation(q)
if op == "list" and metric in {"cpu", "ram", "net", "io"}:
op = "top"
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
only_workers = "worker" in q or "workers" in q
@ -1340,6 +1344,8 @@ def structured_answer(
tokens = _tokens(q)
op = _detect_operation(q)
metric = _detect_metric(q)
if op == "list" and metric in {"cpu", "ram", "net", "io"}:
op = "top"
entity = _detect_entity(q)
include_hw, exclude_hw = _detect_hardware_filters(q)
nodes_in_query = _extract_titan_nodes(q)
@ -1646,6 +1652,37 @@ def _is_insight_query(query: str) -> bool:
return False
# Phrases/words that suggest the user is continuing an earlier thread.
# Multi-word entries are matched as phrases; single words are matched as
# whole tokens (see _is_followup_query).
_FOLLOWUP_HINTS = (
    "what about",
    "how about",
    "and what",
    "and how",
    "tell me more",
    "anything else",
    "something else",
    "that one",
    "those",
    "them",
    "it",
    "this",
    "that",
    "else",
    "another",
    "again",
)


def _is_followup_query(query: str) -> bool:
    """Heuristically decide whether *query* is a follow-up to prior chat.

    A query counts as a follow-up when it contains a follow-up hint, or when
    it is very short (<= 3 words) without any insight hint word.

    Fix: single-word hints are matched as whole tokens. The previous plain
    substring test made words like "monitoring" (contains "it") or
    "elsewhere" (contains "else") trigger a false follow-up.
    """
    q = normalize_query(query)
    if not q:
        return False
    tokens = q.split()
    token_set = set(tokens)
    for hint in _FOLLOWUP_HINTS:
        if " " in hint:
            # Multi-word hints still match as phrases anywhere in the query.
            if hint in q:
                return True
        elif hint in token_set:
            return True
    if len(tokens) <= 3 and not any(word in q for word in _INSIGHT_HINT_WORDS):
        return True
    return False
def _is_subjective_query(query: str) -> bool:
q = normalize_query(query)
if not q:
@ -2541,6 +2578,12 @@ def _fact_pack_lines(
if not trimmed or trimmed.lower().startswith("facts"):
continue
lines.append(trimmed)
if _knowledge_intent(prompt) or _doc_intent(prompt) or _is_overview_query(prompt):
kb_titles = kb_retrieve_titles(prompt, limit=4)
if kb_titles:
for kb_line in kb_titles.splitlines():
if kb_line.strip():
lines.append(kb_line.strip())
return lines
@ -2549,12 +2592,194 @@ def _fact_pack_text(lines: list[str]) -> str:
return "Fact pack:\n" + "\n".join(labeled)
_ALLOWED_INSIGHT_TAGS = {
"availability",
"architecture",
"database",
"hardware",
"inventory",
"node_detail",
"os",
"pods",
"utilization",
"workloads",
"workers",
}
_DYNAMIC_TAGS = {"availability", "database", "pods", "utilization", "workloads"}
_INVENTORY_TAGS = {"hardware", "architecture", "inventory", "workers", "node_detail", "os"}
def _fact_line_tags(line: str) -> set[str]:
text = (line or "").lower()
tags: set[str] = set()
if any(key in text for key in ("nodes_total", "ready", "not_ready", "workers_ready", "workers_not_ready")):
tags.add("availability")
if "nodes_by_arch" in text or "arch " in text or "architecture" in text:
tags.add("architecture")
if any(key in text for key in ("rpi", "jetson", "amd64", "arm64", "non_raspberry_pi")):
tags.update({"hardware", "inventory"})
if "control_plane_nodes" in text or "worker_nodes" in text:
tags.add("inventory")
if any(key in text for key in ("hottest_", "node_usage", "cpu=", "ram=", "net=", "io=")):
tags.add("utilization")
if "postgres_" in text or "postgres connections" in text:
tags.add("database")
if "pods_" in text or "pod phases" in text:
tags.add("pods")
if "workloads" in text or "primary_node" in text:
tags.add("workloads")
if "node_details" in text:
tags.add("node_detail")
if "os mix" in text or "os" in text:
tags.add("os")
return tags & _ALLOWED_INSIGHT_TAGS
def _fact_pack_meta(lines: list[str]) -> dict[str, dict[str, Any]]:
    """Build per-fact metadata keyed by fact id ("F1", "F2", ...).

    Each entry carries the sorted insight tags derived from the fact line.
    """
    return {
        f"F{position + 1}": {"tags": sorted(_fact_line_tags(fact_line))}
        for position, fact_line in enumerate(lines)
    }
def _history_tags(history_lines: list[str]) -> set[str]:
    """Collect insight tags mentioned in the most recent history lines.

    Only the last six lines are considered; the result is restricted to
    the allowed insight-tag set.
    """
    seen: set[str] = set()
    for entry in history_lines[-6:]:
        seen |= _fact_line_tags(entry)
    return seen & _ALLOWED_INSIGHT_TAGS
def _seed_insights(
lines: list[str],
fact_meta: dict[str, dict[str, Any]],
*,
limit: int = 6,
) -> list[dict[str, Any]]:
priority = [
"utilization",
"database",
"pods",
"workloads",
"availability",
"hardware",
"architecture",
"inventory",
]
seeds: list[dict[str, Any]] = []
used_tags: set[str] = set()
for tag in priority:
for idx, line in enumerate(lines):
fid = f"F{idx + 1}"
tags = set(fact_meta.get(fid, {}).get("tags") or [])
if tag not in tags or fid in {s["fact_ids"][0] for s in seeds}:
continue
summary = line.lstrip("- ").strip()
seeds.append(
{
"summary": summary,
"fact_ids": [fid],
"relevance": 0.5,
"novelty": 0.5,
"rationale": "seeded from fact pack",
"tags": sorted(tags),
}
)
used_tags.update(tags)
if len(seeds) >= limit:
return seeds
return seeds
def _insight_tags(insight: dict[str, Any], fact_meta: dict[str, dict[str, Any]]) -> set[str]:
    """Union of tags for an insight: from cited facts, its explicit tag
    list, and keywords found in its summary/claim text.

    Malformed fields (non-list fact_ids/tags, non-string summary) are
    ignored. Result is restricted to the allowed insight-tag set.
    """
    collected: set[str] = set()
    fact_ids = insight.get("fact_ids")
    if isinstance(fact_ids, list):
        for fid in fact_ids:
            collected.update(fact_meta.get(fid, {}).get("tags") or [])
    explicit = insight.get("tags")
    if isinstance(explicit, list):
        collected.update(tag for tag in explicit if isinstance(tag, str))
    summary = insight.get("summary") or insight.get("claim") or ""
    if isinstance(summary, str):
        collected |= _fact_line_tags(summary)
    return collected & _ALLOWED_INSIGHT_TAGS
def _insight_score(
    insight: dict[str, Any],
    *,
    preference: str,
    prefer_tags: set[str],
    avoid_tags: set[str],
    history_tags: set[str],
    fact_meta: dict[str, dict[str, Any]],
) -> float:
    """Score an insight: the base model score plus tag-overlap bonuses.

    Preferred-tag overlap adds 0.15 each; avoided and recently-seen tags
    subtract 0.12 and 0.08 each. A "novelty" preference additionally
    boosts dynamic tags and penalizes inventory tags.
    """
    score = _score_insight(insight, preference)
    tags = _insight_tags(insight, fact_meta)
    if tags:
        if prefer_tags:
            score += 0.15 * len(tags & prefer_tags)
        if avoid_tags:
            score -= 0.12 * len(tags & avoid_tags)
        if history_tags:
            score -= 0.08 * len(tags & history_tags)
    if preference == "novelty":
        if tags & _DYNAMIC_TAGS:
            score += 0.12
        if tags & _INVENTORY_TAGS:
            score -= 0.08
    return score
def _select_diverse_insights(
    candidates: list[dict[str, Any]],
    *,
    preference: str,
    prefer_tags: set[str],
    avoid_tags: set[str],
    history_tags: set[str],
    fact_meta: dict[str, dict[str, Any]],
    count: int = 2,
) -> list[dict[str, Any]]:
    """Rank candidates and pick up to *count*, preferring tag diversity.

    First pass skips candidates whose tags are fully covered by earlier
    picks; a second pass backfills with the remaining top scorers if the
    diversity filter left fewer than *count*.

    NOTE: mutates each candidate in place by normalizing its "tags" field.
    """
    ranked: list[tuple[float, dict[str, Any]]] = []
    for candidate in candidates:
        tags = _insight_tags(candidate, fact_meta)
        candidate["tags"] = sorted(tags)
        score = _insight_score(
            candidate,
            preference=preference,
            prefer_tags=prefer_tags,
            avoid_tags=avoid_tags,
            history_tags=history_tags,
            fact_meta=fact_meta,
        )
        ranked.append((score, candidate))
    # Stable sort: ties keep original candidate order.
    ranked.sort(key=lambda entry: entry[0], reverse=True)

    chosen: list[dict[str, Any]] = []
    covered: set[str] = set()
    for _, candidate in ranked:
        tags = set(candidate.get("tags") or [])
        redundant = bool(covered) and bool(tags) and tags <= covered
        if redundant and len(chosen) < count:
            continue
        chosen.append(candidate)
        covered |= tags
        if len(chosen) >= count:
            break
    # Backfill when diversity filtering produced fewer than requested.
    for _, candidate in ranked:
        if len(chosen) >= count:
            break
        if candidate in chosen:
            continue
        chosen.append(candidate)
    return chosen
def _open_ended_system() -> str:
return (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Use ONLY the provided fact pack and recent chat as your evidence. "
"You may draw light inferences if you label them as such. "
"Write concise, human sentences, not a list. "
"Write concise, human sentences with a helpful, calm tone (not a list). "
"If the question is subjective, share a light opinion grounded in facts. "
"If the question is ambiguous, pick a reasonable interpretation and state it briefly. "
"Avoid repeating the exact same observation as the last response if possible. "
@ -2608,18 +2833,52 @@ def _open_ended_fast(
*,
fact_pack: str,
history_lines: list[str],
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
tags_available: set[str],
history_tags: set[str],
state: ThoughtState | None = None,
) -> str:
if state:
state.update("synthesizing", step=2)
state.update("planning", step=1)
analysis = _interpret_open_question(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
tags_available=tags_available,
avoid_tags=history_tags,
state=state,
)
candidates = _select_insights(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
state=state or ThoughtState(),
analysis=analysis,
fact_lines=fact_lines,
fact_meta=fact_meta,
avoid_tags=history_tags,
)
prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)}
selected = _select_diverse_insights(
candidates,
preference=analysis.get("preference", "balanced"),
prefer_tags=prefer_tags,
avoid_tags=history_tags,
history_tags=history_tags,
fact_meta=fact_meta,
count=2,
)
if state:
state.update("synthesizing", step=3)
synthesis_prompt = (
"You are given a question and a fact pack. "
"Answer in 2-4 sentences, using only facts from the pack. "
"Pick one or two facts that best fit the question and explain why they matter. "
"If the question is subjective, add a light opinion grounded in those facts. "
"Do not list raw facts; speak naturally. "
"Use the question, fact pack, and selected insights to answer in 2-4 sentences. "
"Speak naturally, not as a list. "
"If the question is subjective, add a light opinion grounded in facts. "
"Avoid repeating the exact same observation as the most recent response if possible. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
f"Question: {prompt}"
f"Question: {prompt}\n"
f"Selected: {json.dumps(selected, ensure_ascii=False)}"
)
context = _append_history_context(fact_pack, history_lines)
reply = _ollama_call_safe(
@ -2637,23 +2896,36 @@ def _interpret_open_question(
*,
fact_pack: str,
history_lines: list[str],
tags_available: set[str],
avoid_tags: set[str],
state: ThoughtState | None = None,
) -> dict[str, Any]:
tags_list = ", ".join(sorted(tags_available)) if tags_available else "none"
avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none"
prompt_text = (
"Analyze the question against the fact pack. "
"Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\","
"\"notes\":\"...\"}. "
"\"tags\":[\"...\"] ,\"notes\":\"...\"}. "
"If the question implies interesting/unique/unconventional/cool, set preference to novelty "
"and prefer dynamic tags (utilization/pods/database/availability) when possible. "
f"Use only these tags if relevant: {tags_list}. Avoid tags: {avoid_list}. "
"Use only the fact pack."
)
context = _append_history_context(fact_pack, history_lines)
analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
if not isinstance(analysis, dict):
return {"focus": "cluster snapshot", "preference": "balanced", "notes": ""}
analysis = {"focus": "cluster snapshot", "preference": "balanced", "notes": "", "tags": []}
preference = analysis.get("preference") or "balanced"
if preference not in ("balanced", "novelty", "utilization", "stability", "risk"):
preference = "balanced"
analysis["preference"] = preference
analysis.setdefault("focus", "cluster snapshot")
analysis.setdefault("notes", "")
tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else []
clean_tags = {t for t in tags if isinstance(t, str)}
analysis["tags"] = sorted(clean_tags & tags_available)
if state:
state.update("planning", step=1, note=str(analysis.get("focus") or ""))
return analysis
@ -2663,27 +2935,41 @@ def _select_insights(
fact_pack: str,
history_lines: list[str],
state: ThoughtState,
analysis: dict[str, Any],
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
avoid_tags: set[str],
) -> list[dict[str, Any]]:
preferred_tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else []
prefer_list = ", ".join(sorted({t for t in preferred_tags if isinstance(t, str)}))
avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none"
available_list = ", ".join(sorted({t for t in _ALLOWED_INSIGHT_TAGS}))
insight_prompt = (
"From the fact pack, select 3-5 candidate insights that could answer the question. "
"Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"],"
"\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\"}]}. "
"Use only the fact pack."
"\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\",\"tags\":[\"...\"]}]}. "
f"Available tags: {available_list}. Prefer tags: {prefer_list or 'none'}. Avoid tags: {avoid_list}. "
"Use only the fact pack and provided tags."
)
state.update("drafting candidates", step=2)
context = _append_history_context(fact_pack, history_lines)
result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context)
insights = result.get("insights") if isinstance(result, dict) else None
if not isinstance(insights, list):
return []
insights = []
cleaned: list[dict[str, Any]] = []
for item in insights:
if not isinstance(item, dict):
continue
if not item.get("summary") or not item.get("fact_ids"):
continue
tags = _insight_tags(item, fact_meta)
item["tags"] = sorted(tags)
cleaned.append(item)
state.update("drafting candidates", step=2, note=_candidate_note(item))
seeds = _seed_insights(fact_lines, fact_meta)
for seed in seeds:
cleaned.append(seed)
return cleaned
@ -2707,18 +2993,36 @@ def _open_ended_deep(
fact_pack: str,
fact_ids: set[str],
history_lines: list[str],
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
tags_available: set[str],
history_tags: set[str],
state: ThoughtState | None = None,
) -> str:
state = state or ThoughtState()
if not fact_ids:
return _ensure_scores("I don't have enough data to answer that.")
state.total_steps = 6
state.update("planning", step=1)
analysis = _interpret_open_question(prompt, fact_pack=fact_pack, history_lines=history_lines)
state.update("planning", step=1, note=str(analysis.get("focus") or ""))
state.total_steps = 7
analysis = _interpret_open_question(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
tags_available=tags_available,
avoid_tags=history_tags,
state=state,
)
candidates = _select_insights(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state)
state.update("verifying", step=3)
candidates = _select_insights(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
state=state,
analysis=analysis,
fact_lines=fact_lines,
fact_meta=fact_meta,
avoid_tags=history_tags,
)
state.update("verifying", step=3, note="scoring insights")
filtered: list[dict[str, Any]] = []
for cand in candidates:
cites = cand.get("fact_ids") if isinstance(cand.get("fact_ids"), list) else []
@ -2729,9 +3033,17 @@ def _open_ended_deep(
filtered = candidates
preference = analysis.get("preference", "balanced")
ranked = sorted(filtered, key=lambda item: _score_insight(item, preference), reverse=True)
top = ranked[:2]
state.update("synthesizing", step=4)
prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)}
top = _select_diverse_insights(
filtered,
preference=preference,
prefer_tags=prefer_tags,
avoid_tags=history_tags,
history_tags=history_tags,
fact_meta=fact_meta,
count=2,
)
state.update("synthesizing", step=4, note="composing response")
synth_prompt = (
"Use the question, fact pack, and selected insights to craft a concise answer. "
"Write 2-4 sentences. Explain why the selected insights stand out. "
@ -2740,6 +3052,7 @@ def _open_ended_deep(
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
f"Question: {prompt}\n"
f"Interpretation: {json.dumps(analysis, ensure_ascii=False)}\n"
f"Recent tags: {', '.join(sorted(history_tags)) if history_tags else 'none'}\n"
f"Selected: {json.dumps(top, ensure_ascii=False)}"
)
context = _append_history_context(fact_pack, history_lines)
@ -2750,7 +3063,7 @@ def _open_ended_deep(
fallback="I don't have enough data to answer that.",
system_override=_open_ended_system(),
)
state.update("done", step=6)
state.update("done", step=7)
return _ensure_scores(reply)
@ -2769,9 +3082,31 @@ def open_ended_answer(
return _ensure_scores("I don't have enough data to answer that.")
fact_pack = _fact_pack_text(lines)
fact_ids = {f"F{i+1}" for i in range(len(lines))}
fact_meta = _fact_pack_meta(lines)
tags_available = {tag for entry in fact_meta.values() for tag in entry.get("tags", [])}
history_tags = _history_tags(history_lines)
if mode == "fast":
return _open_ended_fast(prompt, fact_pack=fact_pack, history_lines=history_lines, state=state)
return _open_ended_deep(prompt, fact_pack=fact_pack, fact_ids=fact_ids, history_lines=history_lines, state=state)
return _open_ended_fast(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
fact_lines=lines,
fact_meta=fact_meta,
tags_available=tags_available,
history_tags=history_tags,
state=state,
)
return _open_ended_deep(
prompt,
fact_pack=fact_pack,
fact_ids=fact_ids,
history_lines=history_lines,
fact_lines=lines,
fact_meta=fact_meta,
tags_available=tags_available,
history_tags=history_tags,
state=state,
)
def _non_cluster_reply(prompt: str) -> str:
@ -2826,9 +3161,9 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
self._write_json(400, {"error": "missing_prompt"})
return
cleaned = _strip_bot_mention(prompt)
mode = str(payload.get("mode") or "fast").lower()
mode = str(payload.get("mode") or "deep").lower()
if mode not in ("fast", "deep"):
mode = "fast"
mode = "deep"
snapshot = _snapshot_state()
inventory = _snapshot_inventory(snapshot) or node_inventory_live()
workloads = _snapshot_workloads(snapshot)
@ -2839,11 +3174,12 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
inventory=inventory,
workloads=workloads,
)
followup = _is_followup_query(cleaned)
cluster_query = (
_is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
or history_cluster
or _knowledge_intent(cleaned)
or _is_subjective_query(cleaned)
or (history_cluster and followup)
)
context = ""
if cluster_query:
@ -2857,7 +3193,11 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
)
fallback = "I don't have enough data to answer that."
if cluster_query:
open_ended = _is_subjective_query(cleaned) or _knowledge_intent(cleaned)
open_ended = (
_is_subjective_query(cleaned)
or _knowledge_intent(cleaned)
or _is_overview_query(cleaned)
)
if open_ended:
answer = open_ended_answer(
cleaned,
@ -3068,7 +3408,6 @@ def _knowledge_intent(prompt: str) -> bool:
"summary",
"describe",
"explain",
"what is",
)
)
@ -3269,7 +3608,7 @@ def open_ended_with_thinking(
) -> str:
result: dict[str, str] = {"reply": ""}
done = threading.Event()
total_steps = 2 if mode == "fast" else 6
total_steps = 4 if mode == "fast" else 7
state = ThoughtState(total_steps=total_steps)
def worker():
@ -3382,11 +3721,12 @@ def sync_loop(token: str, room_id: str):
inventory=inventory,
workloads=workloads,
)
followup = _is_followup_query(cleaned_body)
cluster_query = (
_is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
or history_cluster
or _knowledge_intent(cleaned_body)
or _is_subjective_query(cleaned_body)
or (history_cluster and followup)
)
context = ""
if cluster_query:
@ -3407,7 +3747,11 @@ def sync_loop(token: str, room_id: str):
fallback = "I don't have enough data to answer that."
if cluster_query:
open_ended = _is_subjective_query(cleaned_body) or _knowledge_intent(cleaned_body)
open_ended = (
_is_subjective_query(cleaned_body)
or _knowledge_intent(cleaned_body)
or _is_overview_query(cleaned_body)
)
if open_ended:
reply = open_ended_with_thinking(
token,