atlasbot: lexicon context and claim store

parent 99ea10d78c
commit c1f1ef23a6
@@ -55,6 +55,7 @@ class Settings:
    thinking_interval_sec: int
    conversation_ttl_sec: int
    snapshot_pin_enabled: bool
    state_db_path: str

    queue_enabled: bool
    nats_url: str
@@ -153,6 +154,7 @@ def load_settings() -> Settings:
        thinking_interval_sec=_env_int("ATLASBOT_THINKING_INTERVAL_SEC", "30"),
        conversation_ttl_sec=_env_int("ATLASBOT_CONVERSATION_TTL_SEC", "900"),
        snapshot_pin_enabled=_env_bool("ATLASBOT_SNAPSHOT_PIN_ENABLED", "false"),
        state_db_path=os.getenv("ATLASBOT_STATE_DB", "/data/atlasbot_state.db"),
        queue_enabled=_env_bool("ATLASBOT_QUEUE_ENABLED", "false"),
        nats_url=os.getenv("ATLASBOT_NATS_URL", "nats://nats.nats.svc.cluster.local:4222"),
        nats_stream=os.getenv("ATLASBOT_NATS_STREAM", "atlasbot"),
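
Note: load_settings leans on small env-parsing helpers that sit outside this hunk. A minimal sketch of what _env_int and _env_bool plausibly look like (an assumption for orientation, not part of the change):

    import os

    def _env_int(name: str, default: str) -> int:
        # Read the variable (or the default string) and parse it as an integer.
        return int(os.getenv(name, default))

    def _env_bool(name: str, default: str) -> bool:
        # Treat common truthy spellings as True; anything else is False.
        return os.getenv(name, default).strip().lower() in {"1", "true", "yes", "on"}
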
@@ -12,6 +12,7 @@ from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient, build_messages, parse_json
from atlasbot.llm import prompts
from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text
from atlasbot.state.store import ClaimStore

log = logging.getLogger(__name__)

@@ -85,7 +86,7 @@ class AnswerEngine:
        self._llm = llm
        self._kb = kb
        self._snapshot = snapshot
        self._state: dict[str, ConversationState] = {}
        self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec)

    async def answer(
        self,
@@ -111,6 +112,22 @@ class AnswerEngine:
        call_count = 0
        limit_hit = False

        debug_tags = {
            "route",
            "decompose",
            "chunk_score",
            "synth",
            "subanswer",
            "tool",
            "followup",
            "select_claims",
        }

        def _debug_log(name: str, payload: Any) -> None:
            if not self._settings.debug_pipeline:
                return
            log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}})

        async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str:
            nonlocal call_count, limit_hit
            if not limitless and call_count >= call_cap:
@@ -123,6 +140,8 @@ class AnswerEngine:
                "atlasbot_llm_call",
                extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}},
            )
            if self._settings.debug_pipeline and tag in debug_tags:
                _debug_log(f"llm_raw_{tag}", str(response)[:1200])
            return response

        state = self._get_state(conversation_id)
@@ -135,6 +154,7 @@ class AnswerEngine:
        kb_summary = self._kb.summary()
        runbooks = self._kb.runbook_titles(limit=6)
        history_ctx = _format_history(history)
        lexicon_ctx = _lexicon_context(summary)

        started = time.monotonic()
        reply = ""
@@ -146,19 +166,33 @@ class AnswerEngine:
        if observer:
            observer("normalize", "normalizing")
        normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question
        normalize_raw = await call_llm(prompts.NORMALIZE_SYSTEM, normalize_prompt, model=plan.fast_model, tag="normalize")
        normalize_raw = await call_llm(
            prompts.NORMALIZE_SYSTEM,
            normalize_prompt,
            context=lexicon_ctx,
            model=plan.fast_model,
            tag="normalize",
        )
        normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []})
        normalized = str(normalize.get("normalized") or question).strip() or question
        keywords = normalize.get("keywords") or []
        _debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords})

        if observer:
            observer("route", "routing")
        route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords)
        route_raw = await call_llm(prompts.ROUTE_SYSTEM, route_prompt, context=kb_summary, model=plan.fast_model, tag="route")
        route_raw = await call_llm(
            prompts.ROUTE_SYSTEM,
            route_prompt,
            context=_join_context([kb_summary, lexicon_ctx]),
            model=plan.fast_model,
            tag="route",
        )
        classify = _parse_json_block(route_raw, fallback={})
        classify.setdefault("needs_snapshot", True)
        classify.setdefault("answer_style", "direct")
        classify.setdefault("follow_up", False)
        _debug_log("route_parsed", {"classify": classify, "normalized": normalized})
        cluster_terms = (
            "atlas",
            "cluster",
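
Note: the route call now receives context=_join_context([kb_summary, lexicon_ctx]) instead of the knowledge-base summary alone. _join_context is not shown in this diff; a plausible minimal implementation, for orientation only:

    def _join_context(parts: list[str | None]) -> str:
        # Keep only non-empty pieces and separate them with blank lines,
        # so callers can pass [kb_summary, lexicon_ctx] without None checks.
        return "\n\n".join(part.strip() for part in parts if part and part.strip())
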
@@ -187,11 +221,13 @@ class AnswerEngine:
        decompose_raw = await call_llm(
            prompts.DECOMPOSE_SYSTEM,
            decompose_prompt + "\nQuestion: " + normalized,
            context=lexicon_ctx,
            model=plan.fast_model if mode == "quick" else plan.model,
            tag="decompose",
        )
        parts = _parse_json_list(decompose_raw)
        sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions)
        _debug_log("decompose_parsed", {"sub_questions": sub_questions})

        snapshot_context = ""
        if classify.get("needs_snapshot"):
@@ -200,6 +236,19 @@ class AnswerEngine:
            chunks = _chunk_lines(summary_lines, plan.chunk_lines)
            scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan)
            selected = _select_chunks(chunks, scored, plan)
            if self._settings.debug_pipeline:
                scored_preview = sorted(
                    [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks],
                    key=lambda item: item["score"],
                    reverse=True,
                )[: min(len(chunks), max(plan.chunk_top, 6))]
                _debug_log(
                    "chunk_selected",
                    {
                        "selected_ids": [item["id"] for item in selected],
                        "top_scored": scored_preview,
                    },
                )
            snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected])

        context = _join_context(
@@ -413,8 +462,8 @@ class AnswerEngine:
    def _get_state(self, conversation_id: str | None) -> ConversationState | None:
        if not conversation_id:
            return None
        self._cleanup_state()
        return self._state.get(conversation_id)
        state_payload = self._store.get(conversation_id)
        return _state_from_payload(state_payload) if state_payload else None

    def _store_state(
        self,
@@ -425,20 +474,16 @@ class AnswerEngine:
    ) -> None:
        snapshot_id = _snapshot_id(summary)
        pinned_snapshot = snapshot if self._settings.snapshot_pin_enabled else None
        self._state[conversation_id] = ConversationState(
            updated_at=time.monotonic(),
            claims=claims,
            snapshot_id=snapshot_id,
            snapshot=pinned_snapshot,
        )
        self._cleanup_state()
        payload = {
            "updated_at": time.monotonic(),
            "claims": _claims_to_payload(claims),
            "snapshot_id": snapshot_id,
            "snapshot": pinned_snapshot,
        }
        self._store.set(conversation_id, payload)

    def _cleanup_state(self) -> None:
        ttl = max(60, self._settings.conversation_ttl_sec)
        now = time.monotonic()
        expired = [key for key, state in self._state.items() if now - state.updated_at > ttl]
        for key in expired:
            self._state.pop(key, None)
        self._store.cleanup()


def _build_meta(
@@ -529,7 +574,7 @@ def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int)
        except (TypeError, ValueError):
            weight = 1.0
        ranked.append((weight, question))
    ranked.sort(key=lambda item: item[0])
    ranked.sort(key=lambda item: item[0], reverse=True)
    questions = [item[1] for item in ranked][:limit]
    return questions or [fallback]

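Note: the sort change is the whole fix here. The old ascending sort put the lowest-weight sub-questions first, so the [:limit] slice kept the least important ones; reverse=True keeps the highest-weight questions instead. Illustrative values:

    ranked = [(0.4, "which pods restart most?"), (0.9, "is any node NotReady?"), (0.7, "how loaded is the cluster?")]
    ranked.sort(key=lambda item: item[0], reverse=True)
    print([question for _, question in ranked][:2])
    # ['is any node NotReady?', 'how loaded is the cluster?']
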
@@ -541,7 +586,7 @@ def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]]
    for idx in range(0, len(lines), lines_per_chunk):
        chunk_lines = lines[idx : idx + lines_per_chunk]
        text = "\n".join(chunk_lines)
        summary = " | ".join(chunk_lines[:2])
        summary = " | ".join(chunk_lines[:4])
        chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary})
    return chunks

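Note: widening the chunk summary from the first two lines to the first four gives the chunk scorer more text to judge relevance from. With illustrative input, the chunks produced look like this:

    lines = [f"metric {i}" for i in range(1, 9)]
    # _chunk_lines(lines, lines_per_chunk=4) would yield (values shown for orientation):
    # [{"id": "c0", "text": "metric 1\nmetric 2\nmetric 3\nmetric 4", "summary": "metric 1 | metric 2 | metric 3 | metric 4"},
    #  {"id": "c1", "text": "metric 5\nmetric 6\nmetric 7\nmetric 8", "summary": "metric 5 | metric 6 | metric 7 | metric 8"}]
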
@@ -658,6 +703,32 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]:
    return [line for line in text.splitlines() if line.strip()]


def _lexicon_context(summary: dict[str, Any]) -> str:
    if not isinstance(summary, dict):
        return ""
    lexicon = summary.get("lexicon")
    if not isinstance(lexicon, dict):
        return ""
    terms = lexicon.get("terms")
    aliases = lexicon.get("aliases")
    lines: list[str] = []
    if isinstance(terms, list):
        for entry in terms[:8]:
            if not isinstance(entry, dict):
                continue
            term = entry.get("term")
            meaning = entry.get("meaning")
            if term and meaning:
                lines.append(f"{term}: {meaning}")
    if isinstance(aliases, dict):
        for key, value in list(aliases.items())[:6]:
            if key and value:
                lines.append(f"alias {key} -> {value}")
    if not lines:
        return ""
    return "Lexicon:\n" + "\n".join(lines)


def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
    raw = text.strip()
    match = re.search(r"\{.*\}", raw, flags=re.S)
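
Note: _lexicon_context turns the snapshot lexicon into a small prompt block. For a summary shaped like the one below (values invented for illustration), it returns the quoted string:

    summary = {
        "lexicon": {
            "terms": [{"term": "atlas", "meaning": "the Titan Lab Kubernetes cluster"}],
            "aliases": {"cp1": "control-plane-1"},
        }
    }
    # _lexicon_context(summary)
    # -> "Lexicon:\natlas: the Titan Lab Kubernetes cluster\nalias cp1 -> control-plane-1"
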
@@ -731,6 +802,55 @@ def _snapshot_id(summary: dict[str, Any]) -> str | None:
    return None


def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
    output: list[dict[str, Any]] = []
    for claim in claims:
        evidence = []
        for ev in claim.evidence:
            evidence.append(
                {
                    "path": ev.path,
                    "reason": ev.reason,
                    "value_at_claim": ev.value_at_claim,
                }
            )
        output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
    return output


def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
    if not payload:
        return None
    claims_raw = payload.get("claims") if isinstance(payload, dict) else None
    claims: list[ClaimItem] = []
    if isinstance(claims_raw, list):
        for entry in claims_raw:
            if not isinstance(entry, dict):
                continue
            claim_text = str(entry.get("claim") or "").strip()
            claim_id = str(entry.get("id") or "").strip()
            if not claim_text or not claim_id:
                continue
            evidence_items: list[EvidenceItem] = []
            for ev in entry.get("evidence") or []:
                if not isinstance(ev, dict):
                    continue
                path = str(ev.get("path") or "").strip()
                if not path:
                    continue
                reason = str(ev.get("reason") or "").strip()
                value_at_claim = ev.get("value_at_claim")
                evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim))
            if evidence_items:
                claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items))
    return ConversationState(
        updated_at=float(payload.get("updated_at") or time.monotonic()),
        claims=claims,
        snapshot_id=payload.get("snapshot_id"),
        snapshot=payload.get("snapshot"),
    )


def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str:
    raw = json.dumps(summary, ensure_ascii=False)
    return raw[:max_chars]

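Note: _claims_to_payload and _state_from_payload are the serialization pair around ClaimStore; claims go out as plain dicts and come back as ClaimItem/EvidenceItem objects, and claims without usable evidence are dropped on the way back in. A hypothetical round trip, assuming ClaimItem and EvidenceItem (defined elsewhere in this module) accept these keyword fields:

    claims = [
        ClaimItem(
            id="c1",
            claim="worker-3 is NotReady",
            evidence=[EvidenceItem(path="nodes_summary.not_ready_names", reason="node listed as not ready", value_at_claim=["worker-3"])],
        )
    ]
    payload = _claims_to_payload(claims)                  # JSON-serializable list for ClaimStore
    restored = _state_from_payload({"claims": payload})   # ConversationState carrying the same claim
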
@@ -9,8 +9,9 @@ CLUSTER_SYSTEM = (
)

NORMALIZE_SYSTEM = (
    "Normalize user questions for reasoning. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Normalize user questions for reasoning. "
    + "Return JSON only."
)

NORMALIZE_PROMPT = (
@@ -19,10 +20,11 @@ NORMALIZE_PROMPT = (
)

ROUTE_SYSTEM = (
    "Route the question to the best sources and answer style. "
    "Assume questions are about the Titan Lab Atlas Kubernetes cluster unless the user explicitly asks about something else. "
    "Prefer snapshot evidence when available. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Route the question to the best sources and answer style. "
    + "Assume questions are about the Titan Lab Atlas Kubernetes cluster unless the user explicitly asks about something else. "
    + "Prefer snapshot evidence when available. "
    + "Return JSON only."
)

ROUTE_PROMPT = (
@@ -31,8 +33,9 @@ ROUTE_PROMPT = (
)

DECOMPOSE_SYSTEM = (
    "Break complex questions into smaller, answerable sub-questions. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Break complex questions into smaller, answerable sub-questions. "
    + "Return JSON only."
)

DECOMPOSE_PROMPT = (
@@ -41,8 +44,9 @@ DECOMPOSE_PROMPT = (
)

RETRIEVER_SYSTEM = (
    "Score relevance of chunk summaries to the question and sub-questions. "
    "Return JSON list only."
    CLUSTER_SYSTEM
    + " Score relevance of chunk summaries to the question and sub-questions. "
    + "Return JSON list only."
)

CHUNK_SCORE_PROMPT = (
@@ -51,8 +55,9 @@ CHUNK_SCORE_PROMPT = (
)

TOOL_SYSTEM = (
    "Suggest a safe, read-only command that could refine the answer. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Suggest a safe, read-only command that could refine the answer. "
    + "Return JSON only."
)

TOOL_PROMPT = (
@@ -61,8 +66,9 @@ TOOL_PROMPT = (
)

ANSWER_SYSTEM = (
    "Answer a focused sub-question using the provided context. "
    "Be concise and grounded."
    CLUSTER_SYSTEM
    + " Answer a focused sub-question using the provided context. "
    + "Be concise and grounded."
)

SUBANSWER_PROMPT = (
@@ -71,8 +77,9 @@ SUBANSWER_PROMPT = (
)

SYNTHESIZE_SYSTEM = (
    "Synthesize a final answer from sub-answers. "
    "Keep it conversational and grounded."
    CLUSTER_SYSTEM
    + " Synthesize a final answer from sub-answers. "
    + "Keep it conversational and grounded."
)

SYNTHESIZE_PROMPT = (
@@ -86,8 +93,9 @@ DRAFT_SELECT_PROMPT = (
)

CRITIC_SYSTEM = (
    "Critique answers for unsupported claims or missing context. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Critique answers for unsupported claims or missing context. "
    + "Return JSON only."
)

CRITIC_PROMPT = (
@@ -95,8 +103,9 @@ CRITIC_PROMPT = (
)

REVISION_SYSTEM = (
    "Revise the answer based on critique. "
    "Keep the response grounded and concise."
    CLUSTER_SYSTEM
    + " Revise the answer based on critique. "
    + "Keep the response grounded and concise."
)

REVISION_PROMPT = (
@@ -105,8 +114,9 @@ REVISION_PROMPT = (
)

GAP_SYSTEM = (
    "Identify missing data that would improve the answer. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Identify missing data that would improve the answer. "
    + "Return JSON only."
)

EVIDENCE_GAP_PROMPT = (
@@ -115,8 +125,9 @@ EVIDENCE_GAP_PROMPT = (
)

CLAIM_SYSTEM = (
    "Extract claim-evidence mappings from the answer. "
    "Return JSON only."
    CLUSTER_SYSTEM
    + " Extract claim-evidence mappings from the answer. "
    + "Return JSON only."
)

CLAIM_MAP_PROMPT = (
@@ -124,8 +135,9 @@ CLAIM_MAP_PROMPT = (
)

FOLLOWUP_SYSTEM = (
    "Answer follow-ups using prior claim evidence only. "
    "Return JSON only when asked to select claims."
    CLUSTER_SYSTEM
    + " Answer follow-ups using prior claim evidence only. "
    + "Return JSON only when asked to select claims."
)

FOLLOWUP_PROMPT = (

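Note: the pattern across this file is uniform: every *_SYSTEM prompt now starts from CLUSTER_SYSTEM so each pipeline stage shares the same cluster preamble. A sketch of how the concatenation resolves (CLUSTER_SYSTEM's actual text sits above this hunk, so a placeholder is used):

    CLUSTER_SYSTEM = "<shared Titan Lab Atlas preamble>"  # placeholder for the real text defined earlier in the file
    DECOMPOSE_SYSTEM = (
        CLUSTER_SYSTEM
        + " Break complex questions into smaller, answerable sub-questions. "
        + "Return JSON only."
    )
    # DECOMPOSE_SYSTEM == "<shared Titan Lab Atlas preamble> Break complex questions into smaller, answerable sub-questions. Return JSON only."
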
@@ -115,6 +115,8 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) ->
    profiles = cluster_summary.get("profiles")
    inventory = cluster_summary.get("inventory")
    topology = cluster_summary.get("topology")
    lexicon = cluster_summary.get("lexicon")
    cross_stats = cluster_summary.get("cross_stats")
    if isinstance(signals, list):
        summary["signals"] = signals
    if isinstance(profiles, dict):
@@ -123,6 +125,10 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) ->
        summary["inventory"] = inventory
    if isinstance(topology, dict):
        summary["topology"] = topology
    if isinstance(lexicon, dict):
        summary["lexicon"] = lexicon
    if isinstance(cross_stats, dict):
        summary["cross_stats"] = cross_stats


def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]:
@@ -538,14 +544,25 @@ def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None:
    workers_str = ""
    if workers_total is not None and workers_ready is not None:
        workers_str = f", workers_ready={workers_ready}/{workers_total}"
    total = nodes.get("total")
    ready = nodes.get("ready")
    not_ready = nodes.get("not_ready")
    if not_ready is None:
        not_ready = 0
    lines.append(
        "nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format(
            total=nodes.get("total"),
            ready=nodes.get("ready"),
            not_ready=nodes.get("not_ready"),
            total=total,
            ready=ready,
            not_ready=not_ready,
            workers=workers_str,
        )
    )
    if total is not None:
        lines.append(f"nodes_total: {total}")
    if ready is not None:
        lines.append(f"nodes_ready: {ready}")
    if not_ready is not None:
        lines.append(f"nodes_not_ready_count: {not_ready}")
    if not isinstance(summary.get("nodes_summary"), dict):
        return
    not_ready_names = summary["nodes_summary"].get("not_ready_names") or []
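
Note: besides defaulting not_ready to 0, the rewrite emits the node counters as standalone facts in addition to the combined line, which makes them easier to pick up during chunk scoring. For nodes = {"total": 5, "ready": 4, "not_ready": 1} with workers_ready=3/4 (illustrative numbers), the appended lines would be:

    nodes: total=5, ready=4, not_ready=1, workers_ready=3/4
    nodes_total: 5
    nodes_ready: 4
    nodes_not_ready_count: 1
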
@@ -1664,6 +1681,81 @@ def _append_workloads_by_namespace(lines: list[str], summary: dict[str, Any]) ->
        lines.append(f"workloads_top_{ns}: " + "; ".join(parts))


def _append_lexicon(lines: list[str], summary: dict[str, Any]) -> None:
    lexicon = summary.get("lexicon")
    if not isinstance(lexicon, dict):
        return
    terms = lexicon.get("terms") if isinstance(lexicon.get("terms"), list) else []
    aliases = lexicon.get("aliases") if isinstance(lexicon.get("aliases"), dict) else {}
    for entry in terms[:8]:
        if not isinstance(entry, dict):
            continue
        term = entry.get("term")
        meaning = entry.get("meaning")
        if term and meaning:
            lines.append(f"lexicon_term: {term} => {meaning}")
    for key, value in list(aliases.items())[:6]:
        if key and value:
            lines.append(f"lexicon_alias: {key} => {value}")


def _append_cross_stats(lines: list[str], summary: dict[str, Any]) -> None:
    cross_stats = summary.get("cross_stats")
    if not isinstance(cross_stats, dict):
        return
    node_entries = cross_stats.get("node_metric_top") if isinstance(cross_stats.get("node_metric_top"), list) else []
    for entry in node_entries[:10]:
        if not isinstance(entry, dict):
            continue
        metric = entry.get("metric")
        node = entry.get("node")
        value = entry.get("value")
        cpu = entry.get("cpu")
        ram = entry.get("ram")
        net = entry.get("net")
        io = entry.get("io")
        pods = entry.get("pods_total")
        if metric and node:
            parts = [
                f"value={_format_float(value)}",
                f"cpu={_format_float(cpu)}",
                f"ram={_format_float(ram)}",
                f"net={_format_float(net)}",
                f"io={_format_float(io)}",
            ]
            if pods is not None:
                parts.append(f"pods={pods}")
            lines.append(f"cross_node_{metric}: {node} " + " ".join(parts))
    ns_entries = cross_stats.get("namespace_metric_top") if isinstance(cross_stats.get("namespace_metric_top"), list) else []
    for entry in ns_entries[:10]:
        if not isinstance(entry, dict):
            continue
        metric = entry.get("metric")
        namespace = entry.get("namespace")
        value = entry.get("value")
        pods = entry.get("pods_total")
        cpu_ratio = entry.get("cpu_ratio")
        mem_ratio = entry.get("mem_ratio")
        if metric and namespace:
            parts = [
                f"value={_format_float(value)}",
                f"cpu_ratio={_format_float(cpu_ratio)}",
                f"mem_ratio={_format_float(mem_ratio)}",
            ]
            if pods is not None:
                parts.append(f"pods={pods}")
            lines.append(f"cross_namespace_{metric}: {namespace} " + " ".join(parts))
    pvc_entries = cross_stats.get("pvc_top") if isinstance(cross_stats.get("pvc_top"), list) else []
    for entry in pvc_entries[:5]:
        if not isinstance(entry, dict):
            continue
        namespace = entry.get("namespace")
        pvc = entry.get("pvc")
        used = entry.get("used_percent")
        if namespace and pvc:
            lines.append(f"cross_pvc_usage: {namespace}/{pvc} used={_format_float(used)}")


def summary_text(snapshot: dict[str, Any] | None) -> str:
    summary = build_summary(snapshot)
    if not summary:
@@ -1680,6 +1772,7 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
        bits.append(f"version={snapshot_version}")
    lines.append("snapshot: " + ", ".join(bits))
    _append_nodes(lines, summary)
    _append_lexicon(lines, summary)
    _append_pressure(lines, summary)
    _append_hardware(lines, summary)
    _append_node_facts(lines, summary)
@@ -1713,6 +1806,7 @@ def summary_text(snapshot: dict[str, Any] | None) -> str:
    _append_node_load_summary(lines, summary)
    _append_cluster_watchlist(lines, summary)
    _append_hardware_usage(lines, summary)
    _append_cross_stats(lines, summary)
    _append_flux(lines, summary)
    _append_signals(lines, summary)
    _append_profiles(lines, summary)

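Note: with _append_lexicon and _append_cross_stats wired into summary_text, the flattened snapshot gains lines like the following (values invented; the exact float formatting depends on _format_float, which is not part of this diff):

    lexicon_term: atlas => the Titan Lab Kubernetes cluster
    lexicon_alias: cp1 => control-plane-1
    cross_node_cpu: worker-2 value=0.91 cpu=0.91 ram=0.62 net=0.10 io=0.05 pods=34
    cross_namespace_cpu: monitoring value=0.44 cpu_ratio=0.44 mem_ratio=0.31 pods=18
    cross_pvc_usage: monitoring/prometheus-data used=82.5
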
atlasbot/state/store.py (new file, 101 lines)
@@ -0,0 +1,101 @@
import json
import os
import sqlite3
import time
from typing import Any


class ClaimStore:
    def __init__(self, path: str, ttl_sec: int) -> None:
        self._path = path or ":memory:"
        self._ttl = max(60, ttl_sec)
        self._ensure_dir()
        self._init_db()

    def _ensure_dir(self) -> None:
        if self._path in {":memory:", ""}:
            return
        parent = os.path.dirname(self._path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _connect(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self._path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init_db(self) -> None:
        with self._connect() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS atlasbot_claims (
                    conversation_id TEXT PRIMARY KEY,
                    updated_at REAL NOT NULL,
                    snapshot_id TEXT,
                    claims_json TEXT,
                    snapshot_json TEXT
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS atlasbot_claims_updated ON atlasbot_claims(updated_at)"
            )
            conn.commit()

    def get(self, conversation_id: str) -> dict[str, Any] | None:
        if not conversation_id:
            return None
        with self._connect() as conn:
            self._cleanup(conn)
            row = conn.execute(
                "SELECT updated_at, snapshot_id, claims_json, snapshot_json FROM atlasbot_claims WHERE conversation_id = ?",
                (conversation_id,),
            ).fetchone()
            if not row:
                return None
            return {
                "updated_at": row["updated_at"],
                "snapshot_id": row["snapshot_id"],
                "claims": _safe_json(row["claims_json"], []),
                "snapshot": _safe_json(row["snapshot_json"], None),
            }

    def set(self, conversation_id: str, payload: dict[str, Any]) -> None:
        if not conversation_id:
            return
        updated_at = float(payload.get("updated_at") or time.monotonic())
        snapshot_id = payload.get("snapshot_id")
        claims_json = json.dumps(payload.get("claims") or [])
        snapshot_json = json.dumps(payload.get("snapshot")) if payload.get("snapshot") is not None else None
        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO atlasbot_claims (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(conversation_id) DO UPDATE SET
                    updated_at=excluded.updated_at,
                    snapshot_id=excluded.snapshot_id,
                    claims_json=excluded.claims_json,
                    snapshot_json=excluded.snapshot_json
                """,
                (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json),
            )
            conn.commit()

    def cleanup(self) -> None:
        with self._connect() as conn:
            self._cleanup(conn)

    def _cleanup(self, conn: sqlite3.Connection) -> None:
        cutoff = time.monotonic() - self._ttl
        conn.execute("DELETE FROM atlasbot_claims WHERE updated_at < ?", (cutoff,))
        conn.commit()


def _safe_json(raw: str | None, fallback: Any) -> Any:
    if not raw:
        return fallback
    try:
        return json.loads(raw)
    except Exception:
        return fallback
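
Note: a minimal usage sketch for ClaimStore as defined above (path, conversation ID, and claim values are illustrative):

    store = ClaimStore("/tmp/atlasbot_state.db", ttl_sec=900)
    store.set("conv-123", {"snapshot_id": "snap-1", "claims": [{"id": "c1", "claim": "worker-3 is NotReady", "evidence": []}]})
    state = store.get("conv-123")  # {"updated_at": ..., "snapshot_id": "snap-1", "claims": [...], "snapshot": None}
    store.cleanup()                # drops rows whose updated_at is older than the TTL (timestamps come from time.monotonic())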