atlasbot: prune context for grounding

This commit is contained in:
Brad Stein 2026-01-31 07:11:16 -03:00
parent 57fdf2e135
commit 1eec102e66
4 changed files with 117 additions and 7 deletions

View File

@ -90,19 +90,38 @@ class AnswerEngine:
summary = build_summary(snapshot_used) summary = build_summary(snapshot_used)
kb_summary = self._kb.summary() kb_summary = self._kb.summary()
runbooks = self._kb.runbook_titles(limit=4) runbooks = self._kb.runbook_titles(limit=4)
snapshot_ctx = summary_text(snapshot_used) summary_lines = _summary_lines(snapshot_used)
history_ctx = _format_history(history) core_context = _build_context(summary_lines, question, {"answer_style": "direct"}, max_lines=40)
base_context = _join_context([ base_context = _join_context([
kb_summary, kb_summary,
runbooks, runbooks,
f"ClusterSnapshot:{snapshot_ctx}" if snapshot_ctx else "", f"ClusterSnapshot:{core_context}" if core_context else "",
history_ctx,
]) ])
started = time.monotonic() started = time.monotonic()
if observer: if observer:
observer("classify", "classifying intent") observer("classify", "classifying intent")
classify = await self._classify(question, base_context) classify = await self._classify(question, base_context)
history_ctx = _format_history(history)
context_lines = _build_context(summary_lines, question, classify, max_lines=120)
base_context = _join_context([
kb_summary,
runbooks,
f"ClusterSnapshot:{context_lines}" if context_lines else "",
])
if history_ctx and classify.get("follow_up"):
history_ctx = "ConversationHistory (non-authoritative, use only for phrasing):\n" + history_ctx
base_context = _join_context([base_context, history_ctx])
log.info(
"atlasbot_context",
extra={
"extra": {
"mode": mode,
"lines": len(context_lines.splitlines()) if context_lines else 0,
"chars": len(context_lines) if context_lines else 0,
}
},
)
log.info( log.info(
"atlasbot_classify", "atlasbot_classify",
extra={"extra": {"mode": mode, "elapsed_sec": round(time.monotonic() - started, 2), "classify": classify}}, extra={"extra": {"mode": mode, "elapsed_sec": round(time.monotonic() - started, 2), "classify": classify}},
@ -513,6 +532,85 @@ def _snapshot_id(summary: dict[str, Any]) -> str | None:
return None return None
def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
text = summary_text(snapshot)
if not text:
return []
return [line for line in text.splitlines() if line.strip()]
def _build_context(
summary_lines: list[str],
question: str,
classify: dict[str, Any],
*,
max_lines: int,
) -> str:
if not summary_lines:
return ""
lower = (question or "").lower()
prefixes: set[str] = set()
core_prefixes = {
"nodes",
"archs",
"roles",
"hardware",
"node_arch",
"node_os",
"pods",
"namespaces_top",
"namespace_pods",
"namespace_nodes",
"hottest",
"postgres",
"signals",
"profiles",
"watchlist",
"snapshot",
}
prefixes.update(core_prefixes)
def _want(words: tuple[str, ...]) -> bool:
return any(word in lower for word in words)
if _want(("cpu", "load", "ram", "memory", "io", "disk", "net", "network")):
prefixes.update(
{
"node_usage",
"node_load",
"node_load_summary",
"node_usage_top",
"root_disk",
"pvc_usage",
"namespace_usage",
"namespace_io_net",
}
)
if _want(("namespace", "quota", "overcommit", "capacity")):
prefixes.update({"namespace_capacity", "namespace_capacity_summary", "namespace_usage", "namespace_requests"})
if _want(("pod", "pending", "crash", "image", "pull", "restart", "fail")):
prefixes.update({"pod_issues", "pod_restarts", "pod_usage", "pod_events", "events", "event_summary"})
if _want(("alert", "alerting", "incident", "error")):
prefixes.update({"signals", "events", "event_summary", "pod_issues", "watchlist"})
if _want(("flux", "reconcile", "gitops")):
prefixes.update({"flux"})
if _want(("longhorn", "volume", "pvc", "storage")):
prefixes.update({"longhorn", "pvc_usage", "root_disk"})
if _want(("workload", "deployment", "stateful", "daemon")):
prefixes.update({"workloads", "workloads_by_namespace", "workload_health"})
if classify.get("answer_style") == "insightful" or classify.get("question_type") == "open_ended":
prefixes.update({"signals", "profiles", "watchlist", "hottest"})
selected: list[str] = []
for line in summary_lines:
prefix = line.split(":", 1)[0].strip().lower()
if prefix in prefixes or any(prefix.startswith(pfx) for pfx in prefixes):
selected.append(line)
if len(selected) >= max_lines:
break
return "\n".join(selected)
def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str: def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
raw = json.dumps(summary, ensure_ascii=False) raw = json.dumps(summary, ensure_ascii=False)
return raw[:max_chars] return raw[:max_chars]

View File

@ -60,10 +60,9 @@ class LLMClient:
def build_messages(system: str, prompt: str, *, context: str | None = None) -> list[dict[str, str]]: def build_messages(system: str, prompt: str, *, context: str | None = None) -> list[dict[str, str]]:
system_content = system messages: list[dict[str, str]] = [{"role": "system", "content": system}]
if context: if context:
system_content = system_content + "\n\nContext (grounded facts):\n" + context messages.append({"role": "user", "content": "Context (grounded facts):\n" + context})
messages: list[dict[str, str]] = [{"role": "system", "content": system_content}]
messages.append({"role": "user", "content": prompt}) messages.append({"role": "user", "content": prompt})
return messages return messages

View File

@ -1,8 +1,11 @@
CLUSTER_SYSTEM = ( CLUSTER_SYSTEM = (
"You are Atlas, the Titan Lab assistant for the Atlas cluster. " "You are Atlas, the Titan Lab assistant for the Atlas cluster. "
"Use the provided context as your source of truth. " "Use the provided context as your source of truth. "
"Context is authoritative; do not ignore it. "
"If Context is present, you must base numbers and facts on it. "
"If a fact or number is not present in the context, say you do not know. " "If a fact or number is not present in the context, say you do not know. "
"Do not invent metrics or capacities. " "Do not invent metrics or capacities. "
"If history conflicts with the snapshot, trust the snapshot. "
"If the question is about Atlas, respond in short paragraphs. " "If the question is about Atlas, respond in short paragraphs. "
"Avoid commands unless explicitly asked. " "Avoid commands unless explicitly asked. "
"If information is missing, say so clearly and avoid guessing. " "If information is missing, say so clearly and avoid guessing. "
@ -28,6 +31,7 @@ ANGLE_PROMPT = (
CANDIDATE_PROMPT = ( CANDIDATE_PROMPT = (
"Answer this angle using the provided context. " "Answer this angle using the provided context. "
"Context facts override any prior or remembered statements. "
"Keep it concise, 2-4 sentences. " "Keep it concise, 2-4 sentences. "
"If the question is open-ended, include one grounded interpretation or implication. " "If the question is open-ended, include one grounded interpretation or implication. "
"Avoid dumping raw metrics unless asked; prefer what the numbers imply. " "Avoid dumping raw metrics unless asked; prefer what the numbers imply. "

View File

@ -1669,6 +1669,15 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
if not summary: if not summary:
return "" return ""
lines: list[str] = [] lines: list[str] = []
collected_at = snapshot.get("collected_at") if isinstance(snapshot, dict) else None
snapshot_version = snapshot.get("snapshot_version") if isinstance(snapshot, dict) else None
if collected_at or snapshot_version:
bits = []
if collected_at:
bits.append(f"collected_at={collected_at}")
if snapshot_version:
bits.append(f"version={snapshot_version}")
lines.append("snapshot: " + ", ".join(bits))
_append_nodes(lines, summary) _append_nodes(lines, summary)
_append_pressure(lines, summary) _append_pressure(lines, summary)
_append_hardware(lines, summary) _append_hardware(lines, summary)