atlasbot: overhaul open-ended reasoning

This commit is contained in:
Brad Stein 2026-01-27 22:22:50 -03:00
parent 029e4d4ca6
commit 868075426c
2 changed files with 253 additions and 446 deletions

View File

@ -16,7 +16,7 @@ spec:
labels: labels:
app: atlasbot app: atlasbot
annotations: annotations:
checksum/atlasbot-configmap: manual-atlasbot-74 checksum/atlasbot-configmap: manual-atlasbot-75
vault.hashicorp.com/agent-inject: "true" vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "comms" vault.hashicorp.com/role: "comms"
vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret" vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"

View File

@ -198,6 +198,8 @@ _INSIGHT_HINT_WORDS = {
"unique", "unique",
"notable", "notable",
"coolest", "coolest",
"risk",
"risky",
"favorite", "favorite",
"favourite", "favourite",
"trivia", "trivia",
@ -1641,17 +1643,6 @@ def _hottest_summary_line(metrics: dict[str, Any]) -> str:
return "Hot spots: " + "; ".join(parts) + "." return "Hot spots: " + "; ".join(parts) + "."
def _is_insight_query(query: str) -> bool:
    """Return True when *query* is asking for an interesting/notable detail.

    Matches either a direct hint word (``_INSIGHT_HINT_WORDS``) or the
    "most <unusual/odd/weird/unconventional>" superlative pattern.
    """
    normalized = normalize_query(query)
    if not normalized:
        return False
    if any(hint in normalized for hint in _INSIGHT_HINT_WORDS):
        return True
    superlatives = ("unusual", "odd", "weird", "unconventional")
    return "most" in normalized and any(word in normalized for word in superlatives)
_FOLLOWUP_HINTS = ( _FOLLOWUP_HINTS = (
"what about", "what about",
"how about", "how about",
@ -1724,198 +1715,6 @@ def _doc_intent(query: str) -> bool:
) )
def _insight_candidates(
    inventory: list[dict[str, Any]],
    snapshot: dict[str, Any] | None,
) -> list[tuple[str, str, str]]:
    """Collect (key, text, confidence) insight candidates from live data.

    Keys are "availability", "cpu", "ram", "postgres", "hardware", "pods";
    confidence is "high" or "medium". Candidates are only appended when the
    underlying summary/metric is actually present, so the list may be empty.
    """
    metrics = _snapshot_metrics(snapshot)
    candidates: list[tuple[str, str, str]] = []
    nodes_line = _nodes_summary_line(inventory, snapshot)
    # Availability is only interesting when something is actually NotReady.
    if nodes_line and "not ready" in nodes_line.lower():
        candidates.append(("availability", nodes_line, "high"))
    hottest = metrics.get("hottest_nodes") if isinstance(metrics.get("hottest_nodes"), dict) else {}
    if hottest:
        def _hot_node(entry: dict[str, Any]) -> str:
            # Tolerate multiple metric payload shapes: top-level "node"/"label"
            # or a Prometheus-style nested {"metric": {"node": ...}}.
            if not isinstance(entry, dict):
                return ""
            return (
                entry.get("node")
                or entry.get("label")
                or (entry.get("metric") or {}).get("node")
                or ""
            )
        cpu = hottest.get("cpu") if isinstance(hottest.get("cpu"), dict) else {}
        cpu_node = _hot_node(cpu)
        if cpu_node and cpu.get("value") is not None:
            value_fmt = _format_metric_value(str(cpu.get("value")), percent=True)
            candidates.append(("cpu", f"The busiest CPU right now is {cpu_node} at about {value_fmt}.", "high"))
        ram = hottest.get("ram") if isinstance(hottest.get("ram"), dict) else {}
        ram_node = _hot_node(ram)
        if ram_node and ram.get("value") is not None:
            value_fmt = _format_metric_value(str(ram.get("value")), percent=True)
            candidates.append(("ram", f"RAM usage peaks on {ram_node} at about {value_fmt}.", "high"))
    postgres_line = _postgres_summary_line(metrics)
    if postgres_line:
        candidates.append(("postgres", postgres_line, "high"))
    hardware_insight = _hardware_insight(inventory)
    if hardware_insight:
        # Static hardware facts are less newsworthy than live metrics.
        candidates.append(("hardware", hardware_insight, "medium"))
    pods_line = _pods_summary_line(metrics)
    if pods_line:
        candidates.append(("pods", pods_line, "high"))
    return candidates
def _hardware_insight(inventory: list[dict[str, Any]]) -> str:
    """Describe the node hardware mix as comma-joined "family=count" fragments.

    Jetson entries also list up to two node names in parentheses. Returns an
    empty string for an empty inventory.
    """
    if not inventory:
        return ""
    groups = _group_nodes(inventory)
    fragments: list[str] = []
    for family in ("rpi5", "rpi4"):
        members = groups.get(family) or []
        if members:
            fragments.append(f"{family}={len(members)}")
    jetsons = groups.get("jetson") or []
    if jetsons:
        sample_names = ", ".join(jetsons[:2])
        fragments.append(f"jetson={len(jetsons)} ({sample_names})")
    heavy_nodes = groups.get("amd64") or []
    if heavy_nodes:
        fragments.append(f"amd64={len(heavy_nodes)}")
    return ", ".join(fragments)
def _recent_insight_keys(history_lines: list[str]) -> set[str]:
    """Return insight keys already surfaced in the last 10 history lines.

    Each key is inferred from characteristic substrings of previous replies
    so freshly selected insights can avoid repeating them.
    """
    # (key, substrings that indicate the key was already mentioned)
    markers = (
        ("postgres", ("postgres", "connections")),
        ("hardware", ("atlas mixes", "hardware", "rpi", "jetson")),
        ("cpu", ("busiest cpu", "cpu right now", "cpu ")),
        ("ram", ("ram usage", "memory")),
        ("pods", ("pods",)),
        ("availability", ("not ready",)),
    )
    used: set[str] = set()
    for line in history_lines[-10:]:
        normalized = normalize_query(line)
        if not normalized:
            continue
        for key, hints in markers:
            if any(hint in normalized for hint in hints):
                used.add(key)
    return used
def _select_insight(
    prompt: str,
    candidates: list[tuple[str, str, str]],
    *,
    used_keys: set[str] | None = None,
) -> tuple[str, str, str] | None:
    """Choose one (key, text, confidence) candidate that best fits *prompt*.

    Preference order: explicit "give me another" phrasing picks the first
    unused candidate; otherwise wording-derived preferred keys are tried
    (unused first), then any unused candidate, then the first candidate.
    Returns None only when *candidates* is empty.
    """
    if not candidates:
        return None
    used = used_keys or set()
    q = normalize_query(prompt)
    prefer_keys: list[str] = []
    # Map the question's flavor words to preferred insight keys (order matters).
    if any(word in q for word in ("unconventional", "weird", "odd", "unique", "surprising")):
        prefer_keys.extend(["hardware", "availability"])
    if any(word in q for word in ("coolest", "favorite", "favourite", "trivia", "fun")):
        prefer_keys.extend(["hardware", "cpu", "ram"])
    if "interesting" in q and "most interesting" not in q:
        prefer_keys.extend(["hardware", "postgres", "cpu", "ram"])
    # "another/else/different/other" or "most interesting" means: avoid repeats.
    avoid_used = any(word in q for word in ("another", "else", "different", "other")) or "most interesting" in q
    if any(word in q for word in ("another", "else", "different", "other")) and len(candidates) > 1:
        for candidate in candidates:
            if candidate[0] not in used:
                return candidate
        # Everything was already used; at least vary from the first pick.
        return candidates[1]
    if prefer_keys:
        # First pass honors the avoid-used constraint; second pass relaxes it.
        for prefer in prefer_keys:
            for key, text, conf in candidates:
                if key == prefer and (not avoid_used or key not in used):
                    return key, text, conf
        for prefer in prefer_keys:
            for key, text, conf in candidates:
                if key == prefer:
                    return key, text, conf
    if used and avoid_used:
        for candidate in candidates:
            if candidate[0] not in used:
                return candidate
    return candidates[0]
def _format_insight_text(key: str, text: str) -> str:
cleaned = text.strip().rstrip(".")
if not cleaned:
return ""
if key == "hardware":
counts = (
cleaned.replace("Hardware mix includes ", "")
.replace("Atlas mixes tiny ", "")
.replace("Atlas mixes ", "")
.replace("which is unusual for a homelab cluster", "")
.strip()
.strip(".")
)
has_jetson = "jetson=" in counts
has_amd64 = "amd64=" in counts
detail = f"mixed hardware stack ({counts})"
if has_jetson and has_amd64:
flavor = "It blends low-power Pis with Jetson accelerators and a couple of AMD64 boxes."
elif has_jetson:
flavor = "It pairs low-power Pis with Jetson accelerators for edge and AI workloads."
elif has_amd64:
flavor = "It mixes low-power Pis with a couple of heavier AMD64 nodes."
else:
flavor = "It is a pretty uniform hardware stack, which is rare for a homelab."
return f"{detail}. {flavor}"
if key == "postgres":
detail = cleaned.replace("Postgres is at ", "")
return f"Postgres is at {detail}; that feels like healthy, steady load rather than strain."
if key == "pods":
detail = cleaned.replace("There are ", "")
return f"Pods look steady ({detail}); nothing looks stuck or unhealthy."
if key == "availability":
return cleaned + " That is the kind of stability I like to see."
if key in ("cpu", "ram"):
suffix = (
" If you're chasing hotspots, that's the node I'd watch first."
if key == "cpu"
else " That box is carrying the heaviest memory load right now."
)
return cleaned + "." + suffix
return cleaned + "."
def _insight_prefix(prompt: str) -> str:
    """Pick a conversational lead-in matching the tone of *prompt*, or ""."""
    q = normalize_query(prompt)
    # Ordered rules: the first rule whose needle appears in the query wins.
    rules: tuple[tuple[tuple[str, ...], str], ...] = (
        (("coolest",), "If I had to pick the coolest detail, I'd say "),
        (("favorite", "favourite"), "My favorite detail is "),
        (("trivia",), "A bit of trivia I like: "),
        (("most interesting",), "The most interesting detail to me is "),
        (("another", "else", "different", "other"), "Another interesting detail: "),
        (("unconventional", "weird", "odd", "unique", "surprising"), "What stands out to me is that "),
        (("interesting", "notable", "fun", "cool"), "One thing I'd call out is "),
    )
    for needles, prefix in rules:
        if any(needle in q for needle in needles):
            return prefix
    return ""
def cluster_overview_answer( def cluster_overview_answer(
prompt: str, prompt: str,
*, *,
@ -2784,7 +2583,7 @@ def _open_ended_system() -> str:
"If the question is ambiguous, pick a reasonable interpretation and state it briefly. " "If the question is ambiguous, pick a reasonable interpretation and state it briefly. "
"Avoid repeating the exact same observation as the last response if possible. " "Avoid repeating the exact same observation as the last response if possible. "
"Do not invent numbers or facts. " "Do not invent numbers or facts. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100)." "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), HallucinationRisk (low|medium|high)."
) )
@ -2809,263 +2608,284 @@ def _ollama_call_safe(
def _candidate_note(candidate: dict[str, Any]) -> str: def _candidate_note(candidate: dict[str, Any]) -> str:
claim = str(candidate.get("claim") or candidate.get("summary") or "") claim = str(candidate.get("focus") or candidate.get("answer") or "")
return claim[:160] + ("" if len(claim) > 160 else "") return claim[:160] + ("" if len(claim) > 160 else "")
def _ensure_scores(answer: str) -> str: def _ensure_scores(answer: str) -> str:
text = answer.strip() text = answer.strip()
lines = [line for line in text.splitlines() if line.strip()] lines = [line for line in text.splitlines() if line.strip()]
has_relevance = any(line.lower().startswith("relevance:") for line in lines) has_relevance = any(line.lower().startswith("relevance") for line in lines)
has_satisfaction = any(line.lower().startswith("satisfaction:") for line in lines) has_satisfaction = any(line.lower().startswith("satisfaction") for line in lines)
has_confidence = any("confidence:" in line.lower() for line in lines) has_confidence = any(line.lower().startswith("confidence") for line in lines)
has_risk = any(line.lower().startswith("hallucinationrisk") for line in lines)
if not has_confidence: if not has_confidence:
lines.append("Confidence: medium") lines.append("Confidence: medium")
if not has_relevance: if not has_relevance:
lines.append("Relevance: 70") lines.append("Relevance: 70")
if not has_satisfaction: if not has_satisfaction:
lines.append("Satisfaction: 70") lines.append("Satisfaction: 70")
if not has_risk:
lines.append("HallucinationRisk: low")
return "\n".join(lines) return "\n".join(lines)
def _open_ended_plan(
    prompt: str,
    *,
    fact_pack: str,
    history_lines: list[str],
    count: int,
    state: ThoughtState | None,
) -> list[dict[str, Any]]:
    """Ask the model for up to *count* answer angles for *prompt*.

    Returns a list of {"focus", "reason", "priority"} dicts sorted by
    priority (descending, clamped to 1-5), deduplicated case-insensitively
    on focus. Falls back to a single "Direct answer" angle when the model
    returns nothing usable, so the result is never empty.
    """
    if state:
        state.update("planning", step=1, note="mapping angles")
    count = max(1, count)
    prompt_text = (
        "Analyze the question and propose up to "
        f"{count} distinct answer angles that can be supported by the fact pack. "
        "Keep them diverse (e.g., metrics, hardware, workload placement, recent changes). "
        "If the question is subjective, propose at least one angle that surfaces a standout detail. "
        "Return JSON: {\"angles\":[{\"focus\":\"...\",\"reason\":\"...\",\"priority\":1-5}]}."
    )
    context = _append_history_context(fact_pack, history_lines)
    result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
    angles = result.get("angles") if isinstance(result, dict) else None
    cleaned: list[dict[str, Any]] = []
    seen: set[str] = set()
    if isinstance(angles, list):
        for item in angles:
            # Defensive parsing: the model may return malformed entries.
            if not isinstance(item, dict):
                continue
            focus = str(item.get("focus") or "").strip()
            if not focus or focus.lower() in seen:
                continue
            seen.add(focus.lower())
            priority = item.get("priority")
            if not isinstance(priority, (int, float)):
                priority = 3
            cleaned.append(
                {
                    "focus": focus,
                    "reason": str(item.get("reason") or ""),
                    # Clamp to the documented 1-5 priority range.
                    "priority": int(max(1, min(5, priority))),
                }
            )
    if not cleaned:
        cleaned = [{"focus": "Direct answer", "reason": "Default fallback", "priority": 3}]
    cleaned.sort(key=lambda item: item.get("priority", 3), reverse=True)
    if state:
        state.update("planning", step=1, note=_candidate_note(cleaned[0]))
    return cleaned
def _normalize_score(value: Any, *, default: int = 60) -> int:
if isinstance(value, (int, float)):
return int(max(0, min(100, value)))
return default
def _confidence_score(value: Any) -> int:
text = str(value or "").strip().lower()
if text.startswith("high"):
return 85
if text.startswith("low"):
return 35
return 60
def _risk_penalty(value: Any) -> int:
text = str(value or "").strip().lower()
if text.startswith("high"):
return 20
if text.startswith("medium"):
return 10
return 0
def _open_ended_candidate(
    prompt: str,
    *,
    focus: str,
    fact_pack: str,
    history_lines: list[str],
    state: ThoughtState | None,
    step: int,
) -> dict[str, Any]:
    """Draft one candidate answer for *prompt* along the given *focus* angle.

    Returns a dict with "focus", "answer", "confidence", "relevance",
    "satisfaction", "risk", and a computed "score". Self-reported scores
    are normalized; missing/malformed model output falls back to a
    can't-answer message with medium confidence/risk.
    """
    if state:
        state.update("drafting", step=step, note=focus)
    prompt_text = (
        "Using ONLY the fact pack, answer the question focusing on this angle: "
        f"{focus}. "
        "Write 2-4 sentences in plain prose (not a list). "
        "If you infer, label it as inference. "
        "Return JSON: {\"answer\":\"...\",\"confidence\":\"high|medium|low\","
        "\"relevance\":0-100,\"satisfaction\":0-100,\"risk\":\"low|medium|high\"}."
    )
    context = _append_history_context(fact_pack, history_lines)
    result = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
    if not isinstance(result, dict):
        result = {}
    answer = str(result.get("answer") or "").strip()
    if not answer:
        answer = "I don't have enough data to answer that from the current snapshot."
    candidate = {
        "focus": focus,
        "answer": answer,
        "confidence": result.get("confidence", "medium"),
        "relevance": _normalize_score(result.get("relevance"), default=60),
        "satisfaction": _normalize_score(result.get("satisfaction"), default=60),
        "risk": result.get("risk", "medium"),
    }
    # Score after normalization so ranking uses clamped values.
    candidate["score"] = _candidate_score(candidate)
    return candidate
def _candidate_score(candidate: dict[str, Any]) -> float:
    """Weighted quality score for a candidate answer, minus its risk penalty.

    Weights: relevance 0.45, satisfaction 0.35, confidence 0.2.
    """
    relevance = _normalize_score(candidate.get("relevance"), default=60)
    satisfaction = _normalize_score(candidate.get("satisfaction"), default=60)
    confidence = _confidence_score(candidate.get("confidence"))
    weighted = relevance * 0.45 + satisfaction * 0.35 + confidence * 0.2
    return weighted - _risk_penalty(candidate.get("risk"))
def _select_candidates(candidates: list[dict[str, Any]], *, count: int) -> list[dict[str, Any]]:
if not candidates:
return []
ranked = sorted(candidates, key=lambda item: item.get("score", 0), reverse=True)
picked: list[dict[str, Any]] = []
seen_focus: set[str] = set()
for item in ranked:
focus = str(item.get("focus") or "").strip().lower()
if focus and focus in seen_focus:
continue
picked.append(item)
if focus:
seen_focus.add(focus)
if len(picked) >= count:
break
return picked or ranked[:count]
def _open_ended_synthesize(
    prompt: str,
    *,
    fact_pack: str,
    history_lines: list[str],
    candidates: list[dict[str, Any]],
    state: ThoughtState | None,
    step: int,
) -> str:
    """Compose the final answer from drafted *candidates* via the LLM.

    The candidates are serialized into the synthesis prompt; the reply is
    passed through _ensure_scores so the trailing Confidence/Relevance/
    Satisfaction/HallucinationRisk lines are always present.
    """
    if state:
        state.update("synthesizing", step=step, note="composing answer")
    synth_prompt = (
        "Compose the final answer to the question using the candidate answers below. "
        "Select the best 1-2 candidates, blend them if helpful, and keep 2-4 sentences. "
        "Use only the fact pack as evidence. "
        "If you infer, label it as inference. "
        "Avoid repeating the last response if possible. "
        "End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), "
        "HallucinationRisk (low|medium|high).\n"
        f"Question: {prompt}\n"
        f"Candidates: {json.dumps(candidates, ensure_ascii=False)}"
    )
    context = _append_history_context(fact_pack, history_lines)
    reply = _ollama_call_safe(
        ("open", "synth"),
        synth_prompt,
        context=context,
        fallback="I don't have enough data to answer that.",
        system_override=_open_ended_system(),
    )
    return _ensure_scores(reply)
def _open_ended_multi(
    prompt: str,
    *,
    fact_pack: str,
    history_lines: list[str],
    mode: str,
    state: ThoughtState | None = None,
) -> str:
    """Run the plan -> draft -> rank -> synthesize pipeline for *prompt*.

    "fast" mode drafts 2 angles and keeps 1 candidate; any other mode
    drafts 4 and keeps 2. Progress steps are reported through *state*
    (1 planning step, one per drafted angle, then rank + synthesis —
    keep in sync with _open_ended_total_steps).
    """
    angle_count = 2 if mode == "fast" else 4
    total_steps = 1 + angle_count + 2
    if state:
        state.total_steps = total_steps
    angles = _open_ended_plan(
        prompt,
        fact_pack=fact_pack,
        history_lines=history_lines,
        count=angle_count,
        state=state,
    )
    candidates: list[dict[str, Any]] = []
    step = 2
    # Draft one candidate answer per planned angle (step numbers advance
    # so the UI can show drafting progress).
    for angle in angles[:angle_count]:
        candidates.append(
            _open_ended_candidate(
                prompt,
                focus=str(angle.get("focus") or "Direct answer"),
                fact_pack=fact_pack,
                history_lines=history_lines,
                state=state,
                step=step,
            )
        )
        step += 1
    if state:
        state.update("evaluating", step=step, note="ranking candidates")
    selected = _select_candidates(candidates, count=1 if mode == "fast" else 2)
    step += 1
    reply = _open_ended_synthesize(
        prompt,
        fact_pack=fact_pack,
        history_lines=history_lines,
        # Fall back to all candidates if selection somehow returned none.
        candidates=selected or candidates,
        state=state,
        step=step,
    )
    if state:
        state.update("done", step=total_steps)
    return reply
def _open_ended_total_steps(mode: str) -> int:
angle_count = 2 if mode == "fast" else 4
return 1 + angle_count + 2
def _open_ended_fast( def _open_ended_fast(
prompt: str, prompt: str,
*, *,
fact_pack: str, fact_pack: str,
history_lines: list[str], history_lines: list[str],
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
tags_available: set[str],
history_tags: set[str],
state: ThoughtState | None = None, state: ThoughtState | None = None,
) -> str: ) -> str:
if state: return _open_ended_multi(
state.update("planning", step=1)
analysis = _interpret_open_question(
prompt, prompt,
fact_pack=fact_pack, fact_pack=fact_pack,
history_lines=history_lines, history_lines=history_lines,
tags_available=tags_available, mode="fast",
avoid_tags=history_tags,
state=state, state=state,
) )
candidates = _select_insights(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
state=state or ThoughtState(),
analysis=analysis,
fact_lines=fact_lines,
fact_meta=fact_meta,
avoid_tags=history_tags,
)
prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)}
selected = _select_diverse_insights(
candidates,
preference=analysis.get("preference", "balanced"),
prefer_tags=prefer_tags,
avoid_tags=history_tags,
history_tags=history_tags,
fact_meta=fact_meta,
count=2,
)
if state:
state.update("synthesizing", step=3)
synthesis_prompt = (
"Use the question, fact pack, and selected insights to answer in 2-4 sentences. "
"Speak naturally, not as a list. "
"If the question is subjective, add a light opinion grounded in facts. "
"Avoid repeating the exact same observation as the most recent response if possible. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
f"Question: {prompt}\n"
f"Selected: {json.dumps(selected, ensure_ascii=False)}"
)
context = _append_history_context(fact_pack, history_lines)
reply = _ollama_call_safe(
("fast", "open"),
synthesis_prompt,
context=context,
fallback="I don't have enough data to answer that.",
system_override=_open_ended_system(),
)
return _ensure_scores(reply)
def _interpret_open_question(
    prompt: str,
    *,
    fact_pack: str,
    history_lines: list[str],
    tags_available: set[str],
    avoid_tags: set[str],
    state: ThoughtState | None = None,
) -> dict[str, Any]:
    """Classify *prompt* into a focus/preference/tags analysis via the LLM.

    The returned dict always has "focus", "preference" (one of balanced/
    novelty/utilization/stability/risk), "notes", and "tags" (sorted,
    restricted to *tags_available*). Malformed model output falls back to
    a balanced "cluster snapshot" analysis.
    """
    tags_list = ", ".join(sorted(tags_available)) if tags_available else "none"
    avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none"
    prompt_text = (
        "Analyze the question against the fact pack. "
        "Return JSON: {\"focus\":\"...\",\"preference\":\"balanced|novelty|utilization|stability|risk\","
        "\"tags\":[\"...\"] ,\"notes\":\"...\"}. "
        "If the question implies interesting/unique/unconventional/cool, set preference to novelty "
        "and prefer dynamic tags (utilization/pods/database/availability) when possible. "
        f"Use only these tags if relevant: {tags_list}. Avoid tags: {avoid_list}. "
        "Use only the fact pack."
    )
    context = _append_history_context(fact_pack, history_lines)
    analysis = _ollama_json_call(prompt_text + f" Question: {prompt}", context=context)
    if not isinstance(analysis, dict):
        analysis = {"focus": "cluster snapshot", "preference": "balanced", "notes": "", "tags": []}
    # Coerce preference to the closed set of supported values.
    preference = analysis.get("preference") or "balanced"
    if preference not in ("balanced", "novelty", "utilization", "stability", "risk"):
        preference = "balanced"
    analysis["preference"] = preference
    analysis.setdefault("focus", "cluster snapshot")
    analysis.setdefault("notes", "")
    # Keep only string tags that actually exist in the fact pack.
    tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else []
    clean_tags = {t for t in tags if isinstance(t, str)}
    analysis["tags"] = sorted(clean_tags & tags_available)
    if state:
        state.update("planning", step=1, note=str(analysis.get("focus") or ""))
    return analysis
def _select_insights(
    prompt: str,
    *,
    fact_pack: str,
    history_lines: list[str],
    state: ThoughtState,
    analysis: dict[str, Any],
    fact_lines: list[str],
    fact_meta: dict[str, dict[str, Any]],
    avoid_tags: set[str],
) -> list[dict[str, Any]]:
    """Gather candidate insights for *prompt* from the LLM plus seeded facts.

    Model-proposed insights are kept only when they carry both a "summary"
    and "fact_ids"; each gets normalized sorted "tags". Deterministic seed
    insights from _seed_insights are always appended so the result is not
    solely dependent on model output.
    """
    preferred_tags = analysis.get("tags") if isinstance(analysis.get("tags"), list) else []
    prefer_list = ", ".join(sorted({t for t in preferred_tags if isinstance(t, str)}))
    avoid_list = ", ".join(sorted(avoid_tags)) if avoid_tags else "none"
    available_list = ", ".join(sorted({t for t in _ALLOWED_INSIGHT_TAGS}))
    insight_prompt = (
        "From the fact pack, select 3-5 candidate insights that could answer the question. "
        "Return JSON: {\"insights\":[{\"summary\":\"...\",\"fact_ids\":[\"F1\"],"
        "\"relevance\":0-1,\"novelty\":0-1,\"rationale\":\"...\",\"tags\":[\"...\"]}]}. "
        f"Available tags: {available_list}. Prefer tags: {prefer_list or 'none'}. Avoid tags: {avoid_list}. "
        "Use only the fact pack and provided tags."
    )
    state.update("drafting candidates", step=2)
    context = _append_history_context(fact_pack, history_lines)
    result = _ollama_json_call(insight_prompt + f" Question: {prompt}", context=context)
    insights = result.get("insights") if isinstance(result, dict) else None
    if not isinstance(insights, list):
        insights = []
    cleaned: list[dict[str, Any]] = []
    for item in insights:
        if not isinstance(item, dict):
            continue
        # Require a summary plus at least one supporting fact citation.
        if not item.get("summary") or not item.get("fact_ids"):
            continue
        tags = _insight_tags(item, fact_meta)
        item["tags"] = sorted(tags)
        cleaned.append(item)
        state.update("drafting candidates", step=2, note=_candidate_note(item))
    seeds = _seed_insights(fact_lines, fact_meta)
    for seed in seeds:
        cleaned.append(seed)
    return cleaned
def _score_insight(insight: dict[str, Any], preference: str) -> float:
relevance = insight.get("relevance") if isinstance(insight.get("relevance"), (int, float)) else 0.0
novelty = insight.get("novelty") if isinstance(insight.get("novelty"), (int, float)) else 0.0
if preference == "novelty":
return 0.4 * relevance + 0.6 * novelty
if preference == "utilization":
return 0.7 * relevance + 0.3 * novelty
if preference == "stability":
return 0.7 * relevance + 0.3 * novelty
if preference == "risk":
return 0.6 * relevance + 0.4 * novelty
return 0.6 * relevance + 0.4 * novelty
def _open_ended_deep( def _open_ended_deep(
prompt: str, prompt: str,
*, *,
fact_pack: str, fact_pack: str,
fact_ids: set[str],
history_lines: list[str], history_lines: list[str],
fact_lines: list[str],
fact_meta: dict[str, dict[str, Any]],
tags_available: set[str],
history_tags: set[str],
state: ThoughtState | None = None, state: ThoughtState | None = None,
) -> str: ) -> str:
state = state or ThoughtState() return _open_ended_multi(
if not fact_ids:
return _ensure_scores("I don't have enough data to answer that.")
state.total_steps = 7
analysis = _interpret_open_question(
prompt, prompt,
fact_pack=fact_pack, fact_pack=fact_pack,
history_lines=history_lines, history_lines=history_lines,
tags_available=tags_available, mode="deep",
avoid_tags=history_tags,
state=state, state=state,
) )
candidates = _select_insights(
prompt,
fact_pack=fact_pack,
history_lines=history_lines,
state=state,
analysis=analysis,
fact_lines=fact_lines,
fact_meta=fact_meta,
avoid_tags=history_tags,
)
state.update("verifying", step=3, note="scoring insights")
filtered: list[dict[str, Any]] = []
for cand in candidates:
cites = cand.get("fact_ids") if isinstance(cand.get("fact_ids"), list) else []
if cites and not all(cite in fact_ids for cite in cites):
continue
filtered.append(cand)
if not filtered:
filtered = candidates
preference = analysis.get("preference", "balanced")
prefer_tags = {t for t in analysis.get("tags", []) if isinstance(t, str)}
top = _select_diverse_insights(
filtered,
preference=preference,
prefer_tags=prefer_tags,
avoid_tags=history_tags,
history_tags=history_tags,
fact_meta=fact_meta,
count=2,
)
state.update("synthesizing", step=4, note="composing response")
synth_prompt = (
"Use the question, fact pack, and selected insights to craft a concise answer. "
"Write 2-4 sentences. Explain why the selected insights stand out. "
"If the question is subjective, include a light opinion grounded in facts. "
"Avoid repeating the same observation as the last response if possible. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100).\n"
f"Question: {prompt}\n"
f"Interpretation: {json.dumps(analysis, ensure_ascii=False)}\n"
f"Recent tags: {', '.join(sorted(history_tags)) if history_tags else 'none'}\n"
f"Selected: {json.dumps(top, ensure_ascii=False)}"
)
context = _append_history_context(fact_pack, history_lines)
reply = _ollama_call_safe(
("deep", "open"),
synth_prompt,
context=context,
fallback="I don't have enough data to answer that.",
system_override=_open_ended_system(),
)
state.update("done", step=7)
return _ensure_scores(reply)
def open_ended_answer( def open_ended_answer(
prompt: str, prompt: str,
@ -3081,30 +2901,17 @@ def open_ended_answer(
if not lines: if not lines:
return _ensure_scores("I don't have enough data to answer that.") return _ensure_scores("I don't have enough data to answer that.")
fact_pack = _fact_pack_text(lines) fact_pack = _fact_pack_text(lines)
fact_ids = {f"F{i+1}" for i in range(len(lines))}
fact_meta = _fact_pack_meta(lines)
tags_available = {tag for entry in fact_meta.values() for tag in entry.get("tags", [])}
history_tags = _history_tags(history_lines)
if mode == "fast": if mode == "fast":
return _open_ended_fast( return _open_ended_fast(
prompt, prompt,
fact_pack=fact_pack, fact_pack=fact_pack,
history_lines=history_lines, history_lines=history_lines,
fact_lines=lines,
fact_meta=fact_meta,
tags_available=tags_available,
history_tags=history_tags,
state=state, state=state,
) )
return _open_ended_deep( return _open_ended_deep(
prompt, prompt,
fact_pack=fact_pack, fact_pack=fact_pack,
fact_ids=fact_ids,
history_lines=history_lines, history_lines=history_lines,
fact_lines=lines,
fact_meta=fact_meta,
tags_available=tags_available,
history_tags=history_tags,
state=state, state=state,
) )
@ -3175,12 +2982,12 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
workloads=workloads, workloads=workloads,
) )
followup = _is_followup_query(cleaned) followup = _is_followup_query(cleaned)
cluster_query = ( cleaned_q = normalize_query(cleaned)
_is_cluster_query(cleaned, inventory=inventory, workloads=workloads) cluster_affinity = _is_cluster_query(cleaned, inventory=inventory, workloads=workloads)
or _knowledge_intent(cleaned) subjective = _is_subjective_query(cleaned)
or _is_subjective_query(cleaned) followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS)
or (history_cluster and followup) contextual = history_cluster and (followup or followup_affinity)
) cluster_query = cluster_affinity or contextual
context = "" context = ""
if cluster_query: if cluster_query:
context = build_context( context = build_context(
@ -3608,7 +3415,7 @@ def open_ended_with_thinking(
) -> str: ) -> str:
result: dict[str, str] = {"reply": ""} result: dict[str, str] = {"reply": ""}
done = threading.Event() done = threading.Event()
total_steps = 4 if mode == "fast" else 7 total_steps = _open_ended_total_steps(mode)
state = ThoughtState(total_steps=total_steps) state = ThoughtState(total_steps=total_steps)
def worker(): def worker():
@ -3722,12 +3529,12 @@ def sync_loop(token: str, room_id: str):
workloads=workloads, workloads=workloads,
) )
followup = _is_followup_query(cleaned_body) followup = _is_followup_query(cleaned_body)
cluster_query = ( cleaned_q = normalize_query(cleaned_body)
_is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads) cluster_affinity = _is_cluster_query(cleaned_body, inventory=inventory, workloads=workloads)
or _knowledge_intent(cleaned_body) subjective = _is_subjective_query(cleaned_body)
or _is_subjective_query(cleaned_body) followup_affinity = subjective or any(word in cleaned_q for word in METRIC_HINT_WORDS)
or (history_cluster and followup) contextual = history_cluster and (followup or followup_affinity)
) cluster_query = cluster_affinity or contextual
context = "" context = ""
if cluster_query: if cluster_query:
context = build_context( context = build_context(