diff --git a/atlasbot/config.py b/atlasbot/config.py index f74dec5..565353f 100644 --- a/atlasbot/config.py +++ b/atlasbot/config.py @@ -55,6 +55,7 @@ class Settings: thinking_interval_sec: int conversation_ttl_sec: int snapshot_pin_enabled: bool + state_db_path: str queue_enabled: bool nats_url: str @@ -153,6 +154,7 @@ def load_settings() -> Settings: thinking_interval_sec=_env_int("ATLASBOT_THINKING_INTERVAL_SEC", "30"), conversation_ttl_sec=_env_int("ATLASBOT_CONVERSATION_TTL_SEC", "900"), snapshot_pin_enabled=_env_bool("ATLASBOT_SNAPSHOT_PIN_ENABLED", "false"), + state_db_path=os.getenv("ATLASBOT_STATE_DB", "/data/atlasbot_state.db"), queue_enabled=_env_bool("ATLASBOT_QUEUE_ENABLED", "false"), nats_url=os.getenv("ATLASBOT_NATS_URL", "nats://nats.nats.svc.cluster.local:4222"), nats_stream=os.getenv("ATLASBOT_NATS_STREAM", "atlasbot"), diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py index 2ec6bf5..c00e2ab 100644 --- a/atlasbot/engine/answerer.py +++ b/atlasbot/engine/answerer.py @@ -12,6 +12,7 @@ from atlasbot.knowledge.loader import KnowledgeBase from atlasbot.llm.client import LLMClient, build_messages, parse_json from atlasbot.llm import prompts from atlasbot.snapshot.builder import SnapshotProvider, build_summary, summary_text +from atlasbot.state.store import ClaimStore log = logging.getLogger(__name__) @@ -85,7 +86,7 @@ class AnswerEngine: self._llm = llm self._kb = kb self._snapshot = snapshot - self._state: dict[str, ConversationState] = {} + self._store = ClaimStore(settings.state_db_path, settings.conversation_ttl_sec) async def answer( self, @@ -111,6 +112,22 @@ class AnswerEngine: call_count = 0 limit_hit = False + debug_tags = { + "route", + "decompose", + "chunk_score", + "synth", + "subanswer", + "tool", + "followup", + "select_claims", + } + + def _debug_log(name: str, payload: Any) -> None: + if not self._settings.debug_pipeline: + return + log.info("atlasbot_debug", extra={"extra": {"name": name, "payload": payload}}) + async def call_llm(system: str, prompt: str, *, context: str | None = None, model: str | None = None, tag: str = "") -> str: nonlocal call_count, limit_hit if not limitless and call_count >= call_cap: @@ -123,6 +140,8 @@ class AnswerEngine: "atlasbot_llm_call", extra={"extra": {"mode": mode, "tag": tag, "call": call_count, "limit": call_cap}}, ) + if self._settings.debug_pipeline and tag in debug_tags: + _debug_log(f"llm_raw_{tag}", str(response)[:1200]) return response state = self._get_state(conversation_id) @@ -135,6 +154,7 @@ class AnswerEngine: kb_summary = self._kb.summary() runbooks = self._kb.runbook_titles(limit=6) history_ctx = _format_history(history) + lexicon_ctx = _lexicon_context(summary) started = time.monotonic() reply = "" @@ -146,19 +166,33 @@ class AnswerEngine: if observer: observer("normalize", "normalizing") normalize_prompt = prompts.NORMALIZE_PROMPT + "\nQuestion: " + question - normalize_raw = await call_llm(prompts.NORMALIZE_SYSTEM, normalize_prompt, model=plan.fast_model, tag="normalize") + normalize_raw = await call_llm( + prompts.NORMALIZE_SYSTEM, + normalize_prompt, + context=lexicon_ctx, + model=plan.fast_model, + tag="normalize", + ) normalize = _parse_json_block(normalize_raw, fallback={"normalized": question, "keywords": []}) normalized = str(normalize.get("normalized") or question).strip() or question keywords = normalize.get("keywords") or [] + _debug_log("normalize_parsed", {"normalized": normalized, "keywords": keywords}) if observer: observer("route", "routing") route_prompt = prompts.ROUTE_PROMPT + "\nQuestion: " + normalized + "\nKeywords: " + json.dumps(keywords) - route_raw = await call_llm(prompts.ROUTE_SYSTEM, route_prompt, context=kb_summary, model=plan.fast_model, tag="route") + route_raw = await call_llm( + prompts.ROUTE_SYSTEM, + route_prompt, + context=_join_context([kb_summary, lexicon_ctx]), + model=plan.fast_model, + tag="route", + ) classify = _parse_json_block(route_raw, fallback={}) classify.setdefault("needs_snapshot", True) classify.setdefault("answer_style", "direct") classify.setdefault("follow_up", False) + _debug_log("route_parsed", {"classify": classify, "normalized": normalized}) cluster_terms = ( "atlas", "cluster", @@ -187,11 +221,13 @@ class AnswerEngine: decompose_raw = await call_llm( prompts.DECOMPOSE_SYSTEM, decompose_prompt + "\nQuestion: " + normalized, + context=lexicon_ctx, model=plan.fast_model if mode == "quick" else plan.model, tag="decompose", ) parts = _parse_json_list(decompose_raw) sub_questions = _select_subquestions(parts, normalized, plan.max_subquestions) + _debug_log("decompose_parsed", {"sub_questions": sub_questions}) snapshot_context = "" if classify.get("needs_snapshot"): @@ -200,6 +236,19 @@ class AnswerEngine: chunks = _chunk_lines(summary_lines, plan.chunk_lines) scored = await _score_chunks(call_llm, chunks, normalized, sub_questions, plan) selected = _select_chunks(chunks, scored, plan) + if self._settings.debug_pipeline: + scored_preview = sorted( + [{"id": c["id"], "score": scored.get(c["id"], 0.0), "summary": c["summary"]} for c in chunks], + key=lambda item: item["score"], + reverse=True, + )[: min(len(chunks), max(plan.chunk_top, 6))] + _debug_log( + "chunk_selected", + { + "selected_ids": [item["id"] for item in selected], + "top_scored": scored_preview, + }, + ) snapshot_context = "ClusterSnapshot:\n" + "\n".join([chunk["text"] for chunk in selected]) context = _join_context( @@ -413,8 +462,8 @@ class AnswerEngine: def _get_state(self, conversation_id: str | None) -> ConversationState | None: if not conversation_id: return None - self._cleanup_state() - return self._state.get(conversation_id) + state_payload = self._store.get(conversation_id) + return _state_from_payload(state_payload) if state_payload else None def _store_state( self, @@ -425,20 +474,16 @@ class AnswerEngine: ) -> None: snapshot_id = _snapshot_id(summary) pinned_snapshot = snapshot if self._settings.snapshot_pin_enabled else None - self._state[conversation_id] = ConversationState( - updated_at=time.monotonic(), - claims=claims, - snapshot_id=snapshot_id, - snapshot=pinned_snapshot, - ) - self._cleanup_state() + payload = { + "updated_at": time.monotonic(), + "claims": _claims_to_payload(claims), + "snapshot_id": snapshot_id, + "snapshot": pinned_snapshot, + } + self._store.set(conversation_id, payload) def _cleanup_state(self) -> None: - ttl = max(60, self._settings.conversation_ttl_sec) - now = time.monotonic() - expired = [key for key, state in self._state.items() if now - state.updated_at > ttl] - for key in expired: - self._state.pop(key, None) + self._store.cleanup() def _build_meta( @@ -529,7 +574,7 @@ def _select_subquestions(parts: list[dict[str, Any]], fallback: str, limit: int) except (TypeError, ValueError): weight = 1.0 ranked.append((weight, question)) - ranked.sort(key=lambda item: item[0]) + ranked.sort(key=lambda item: item[0], reverse=True) questions = [item[1] for item in ranked][:limit] return questions or [fallback] @@ -541,7 +586,7 @@ def _chunk_lines(lines: list[str], lines_per_chunk: int) -> list[dict[str, Any]] for idx in range(0, len(lines), lines_per_chunk): chunk_lines = lines[idx : idx + lines_per_chunk] text = "\n".join(chunk_lines) - summary = " | ".join(chunk_lines[:2]) + summary = " | ".join(chunk_lines[:4]) chunks.append({"id": f"c{idx//lines_per_chunk}", "text": text, "summary": summary}) return chunks @@ -658,6 +703,32 @@ def _summary_lines(snapshot: dict[str, Any] | None) -> list[str]: return [line for line in text.splitlines() if line.strip()] +def _lexicon_context(summary: dict[str, Any]) -> str: + if not isinstance(summary, dict): + return "" + lexicon = summary.get("lexicon") + if not isinstance(lexicon, dict): + return "" + terms = lexicon.get("terms") + aliases = lexicon.get("aliases") + lines: list[str] = [] + if isinstance(terms, list): + for entry in terms[:8]: + if not isinstance(entry, dict): + continue + term = entry.get("term") + meaning = entry.get("meaning") + if term and meaning: + lines.append(f"{term}: {meaning}") + if isinstance(aliases, dict): + for key, value in list(aliases.items())[:6]: + if key and value: + lines.append(f"alias {key} -> {value}") + if not lines: + return "" + return "Lexicon:\n" + "\n".join(lines) + + def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]: raw = text.strip() match = re.search(r"\{.*\}", raw, flags=re.S) @@ -731,6 +802,55 @@ def _snapshot_id(summary: dict[str, Any]) -> str | None: return None +def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]: + output: list[dict[str, Any]] = [] + for claim in claims: + evidence = [] + for ev in claim.evidence: + evidence.append( + { + "path": ev.path, + "reason": ev.reason, + "value_at_claim": ev.value_at_claim, + } + ) + output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence}) + return output + + +def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None: + if not payload: + return None + claims_raw = payload.get("claims") if isinstance(payload, dict) else None + claims: list[ClaimItem] = [] + if isinstance(claims_raw, list): + for entry in claims_raw: + if not isinstance(entry, dict): + continue + claim_text = str(entry.get("claim") or "").strip() + claim_id = str(entry.get("id") or "").strip() + if not claim_text or not claim_id: + continue + evidence_items: list[EvidenceItem] = [] + for ev in entry.get("evidence") or []: + if not isinstance(ev, dict): + continue + path = str(ev.get("path") or "").strip() + if not path: + continue + reason = str(ev.get("reason") or "").strip() + value_at_claim = ev.get("value_at_claim") + evidence_items.append(EvidenceItem(path=path, reason=reason, value_at_claim=value_at_claim)) + if evidence_items: + claims.append(ClaimItem(id=claim_id, claim=claim_text, evidence=evidence_items)) + return ConversationState( + updated_at=float(payload.get("updated_at") or time.monotonic()), + claims=claims, + snapshot_id=payload.get("snapshot_id"), + snapshot=payload.get("snapshot"), + ) + + def _json_excerpt(summary: dict[str, Any], max_chars: int = 12000) -> str: raw = json.dumps(summary, ensure_ascii=False) return raw[:max_chars] diff --git a/atlasbot/llm/prompts.py b/atlasbot/llm/prompts.py index cca10f9..005b115 100644 --- a/atlasbot/llm/prompts.py +++ b/atlasbot/llm/prompts.py @@ -9,8 +9,9 @@ CLUSTER_SYSTEM = ( ) NORMALIZE_SYSTEM = ( - "Normalize user questions for reasoning. " - "Return JSON only." + CLUSTER_SYSTEM + + " Normalize user questions for reasoning. " + + "Return JSON only." ) NORMALIZE_PROMPT = ( @@ -19,10 +20,11 @@ NORMALIZE_PROMPT = ( ) ROUTE_SYSTEM = ( - "Route the question to the best sources and answer style. " - "Assume questions are about the Titan Lab Atlas Kubernetes cluster unless the user explicitly asks about something else. " - "Prefer snapshot evidence when available. " - "Return JSON only." + CLUSTER_SYSTEM + + " Route the question to the best sources and answer style. " + + "Assume questions are about the Titan Lab Atlas Kubernetes cluster unless the user explicitly asks about something else. " + + "Prefer snapshot evidence when available. " + + "Return JSON only." ) ROUTE_PROMPT = ( @@ -31,8 +33,9 @@ ROUTE_PROMPT = ( ) DECOMPOSE_SYSTEM = ( - "Break complex questions into smaller, answerable sub-questions. " - "Return JSON only." + CLUSTER_SYSTEM + + " Break complex questions into smaller, answerable sub-questions. " + + "Return JSON only." ) DECOMPOSE_PROMPT = ( @@ -41,8 +44,9 @@ DECOMPOSE_PROMPT = ( ) RETRIEVER_SYSTEM = ( - "Score relevance of chunk summaries to the question and sub-questions. " - "Return JSON list only." + CLUSTER_SYSTEM + + " Score relevance of chunk summaries to the question and sub-questions. " + + "Return JSON list only." ) CHUNK_SCORE_PROMPT = ( @@ -51,8 +55,9 @@ CHUNK_SCORE_PROMPT = ( ) TOOL_SYSTEM = ( - "Suggest a safe, read-only command that could refine the answer. " - "Return JSON only." + CLUSTER_SYSTEM + + " Suggest a safe, read-only command that could refine the answer. " + + "Return JSON only." ) TOOL_PROMPT = ( @@ -61,8 +66,9 @@ TOOL_PROMPT = ( ) ANSWER_SYSTEM = ( - "Answer a focused sub-question using the provided context. " - "Be concise and grounded." + CLUSTER_SYSTEM + + " Answer a focused sub-question using the provided context. " + + "Be concise and grounded." ) SUBANSWER_PROMPT = ( @@ -71,8 +77,9 @@ SUBANSWER_PROMPT = ( ) SYNTHESIZE_SYSTEM = ( - "Synthesize a final answer from sub-answers. " - "Keep it conversational and grounded." + CLUSTER_SYSTEM + + " Synthesize a final answer from sub-answers. " + + "Keep it conversational and grounded." ) SYNTHESIZE_PROMPT = ( @@ -86,8 +93,9 @@ DRAFT_SELECT_PROMPT = ( ) CRITIC_SYSTEM = ( - "Critique answers for unsupported claims or missing context. " - "Return JSON only." + CLUSTER_SYSTEM + + " Critique answers for unsupported claims or missing context. " + + "Return JSON only." ) CRITIC_PROMPT = ( @@ -95,8 +103,9 @@ CRITIC_PROMPT = ( ) REVISION_SYSTEM = ( - "Revise the answer based on critique. " - "Keep the response grounded and concise." + CLUSTER_SYSTEM + + " Revise the answer based on critique. " + + "Keep the response grounded and concise." ) REVISION_PROMPT = ( @@ -105,8 +114,9 @@ REVISION_PROMPT = ( ) GAP_SYSTEM = ( - "Identify missing data that would improve the answer. " - "Return JSON only." + CLUSTER_SYSTEM + + " Identify missing data that would improve the answer. " + + "Return JSON only." ) EVIDENCE_GAP_PROMPT = ( @@ -115,8 +125,9 @@ EVIDENCE_GAP_PROMPT = ( ) CLAIM_SYSTEM = ( - "Extract claim-evidence mappings from the answer. " - "Return JSON only." + CLUSTER_SYSTEM + + " Extract claim-evidence mappings from the answer. " + + "Return JSON only." ) CLAIM_MAP_PROMPT = ( @@ -124,8 +135,9 @@ CLAIM_MAP_PROMPT = ( ) FOLLOWUP_SYSTEM = ( - "Answer follow-ups using prior claim evidence only. " - "Return JSON only when asked to select claims." + CLUSTER_SYSTEM + + " Answer follow-ups using prior claim evidence only. " + + "Return JSON only when asked to select claims." ) FOLLOWUP_PROMPT = ( diff --git a/atlasbot/snapshot/builder.py b/atlasbot/snapshot/builder.py index 708828a..0a8460b 100644 --- a/atlasbot/snapshot/builder.py +++ b/atlasbot/snapshot/builder.py @@ -115,6 +115,8 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) -> profiles = cluster_summary.get("profiles") inventory = cluster_summary.get("inventory") topology = cluster_summary.get("topology") + lexicon = cluster_summary.get("lexicon") + cross_stats = cluster_summary.get("cross_stats") if isinstance(signals, list): summary["signals"] = signals if isinstance(profiles, dict): @@ -123,6 +125,10 @@ def _merge_cluster_summary(snapshot: dict[str, Any], summary: dict[str, Any]) -> summary["inventory"] = inventory if isinstance(topology, dict): summary["topology"] = topology + if isinstance(lexicon, dict): + summary["lexicon"] = lexicon + if isinstance(cross_stats, dict): + summary["cross_stats"] = cross_stats def _nodes_detail(snapshot: dict[str, Any]) -> list[dict[str, Any]]: @@ -538,14 +544,25 @@ def _append_nodes(lines: list[str], summary: dict[str, Any]) -> None: workers_str = "" if workers_total is not None and workers_ready is not None: workers_str = f", workers_ready={workers_ready}/{workers_total}" + total = nodes.get("total") + ready = nodes.get("ready") + not_ready = nodes.get("not_ready") + if not_ready is None: + not_ready = 0 lines.append( "nodes: total={total}, ready={ready}, not_ready={not_ready}{workers}".format( - total=nodes.get("total"), - ready=nodes.get("ready"), - not_ready=nodes.get("not_ready"), + total=total, + ready=ready, + not_ready=not_ready, workers=workers_str, ) ) + if total is not None: + lines.append(f"nodes_total: {total}") + if ready is not None: + lines.append(f"nodes_ready: {ready}") + if not_ready is not None: + lines.append(f"nodes_not_ready_count: {not_ready}") if not isinstance(summary.get("nodes_summary"), dict): return not_ready_names = summary["nodes_summary"].get("not_ready_names") or [] @@ -1664,6 +1681,81 @@ def _append_workloads_by_namespace(lines: list[str], summary: dict[str, Any]) -> lines.append(f"workloads_top_{ns}: " + "; ".join(parts)) +def _append_lexicon(lines: list[str], summary: dict[str, Any]) -> None: + lexicon = summary.get("lexicon") + if not isinstance(lexicon, dict): + return + terms = lexicon.get("terms") if isinstance(lexicon.get("terms"), list) else [] + aliases = lexicon.get("aliases") if isinstance(lexicon.get("aliases"), dict) else {} + for entry in terms[:8]: + if not isinstance(entry, dict): + continue + term = entry.get("term") + meaning = entry.get("meaning") + if term and meaning: + lines.append(f"lexicon_term: {term} => {meaning}") + for key, value in list(aliases.items())[:6]: + if key and value: + lines.append(f"lexicon_alias: {key} => {value}") + + +def _append_cross_stats(lines: list[str], summary: dict[str, Any]) -> None: + cross_stats = summary.get("cross_stats") + if not isinstance(cross_stats, dict): + return + node_entries = cross_stats.get("node_metric_top") if isinstance(cross_stats.get("node_metric_top"), list) else [] + for entry in node_entries[:10]: + if not isinstance(entry, dict): + continue + metric = entry.get("metric") + node = entry.get("node") + value = entry.get("value") + cpu = entry.get("cpu") + ram = entry.get("ram") + net = entry.get("net") + io = entry.get("io") + pods = entry.get("pods_total") + if metric and node: + parts = [ + f"value={_format_float(value)}", + f"cpu={_format_float(cpu)}", + f"ram={_format_float(ram)}", + f"net={_format_float(net)}", + f"io={_format_float(io)}", + ] + if pods is not None: + parts.append(f"pods={pods}") + lines.append(f"cross_node_{metric}: {node} " + " ".join(parts)) + ns_entries = cross_stats.get("namespace_metric_top") if isinstance(cross_stats.get("namespace_metric_top"), list) else [] + for entry in ns_entries[:10]: + if not isinstance(entry, dict): + continue + metric = entry.get("metric") + namespace = entry.get("namespace") + value = entry.get("value") + pods = entry.get("pods_total") + cpu_ratio = entry.get("cpu_ratio") + mem_ratio = entry.get("mem_ratio") + if metric and namespace: + parts = [ + f"value={_format_float(value)}", + f"cpu_ratio={_format_float(cpu_ratio)}", + f"mem_ratio={_format_float(mem_ratio)}", + ] + if pods is not None: + parts.append(f"pods={pods}") + lines.append(f"cross_namespace_{metric}: {namespace} " + " ".join(parts)) + pvc_entries = cross_stats.get("pvc_top") if isinstance(cross_stats.get("pvc_top"), list) else [] + for entry in pvc_entries[:5]: + if not isinstance(entry, dict): + continue + namespace = entry.get("namespace") + pvc = entry.get("pvc") + used = entry.get("used_percent") + if namespace and pvc: + lines.append(f"cross_pvc_usage: {namespace}/{pvc} used={_format_float(used)}") + + def summary_text(snapshot: dict[str, Any] | None) -> str: summary = build_summary(snapshot) if not summary: @@ -1680,6 +1772,7 @@ def summary_text(snapshot: dict[str, Any] | None) -> str: bits.append(f"version={snapshot_version}") lines.append("snapshot: " + ", ".join(bits)) _append_nodes(lines, summary) + _append_lexicon(lines, summary) _append_pressure(lines, summary) _append_hardware(lines, summary) _append_node_facts(lines, summary) @@ -1713,6 +1806,7 @@ def summary_text(snapshot: dict[str, Any] | None) -> str: _append_node_load_summary(lines, summary) _append_cluster_watchlist(lines, summary) _append_hardware_usage(lines, summary) + _append_cross_stats(lines, summary) _append_flux(lines, summary) _append_signals(lines, summary) _append_profiles(lines, summary) diff --git a/atlasbot/state/store.py b/atlasbot/state/store.py new file mode 100644 index 0000000..6c9c408 --- /dev/null +++ b/atlasbot/state/store.py @@ -0,0 +1,101 @@ +import json +import os +import sqlite3 +import time +from typing import Any + + +class ClaimStore: + def __init__(self, path: str, ttl_sec: int) -> None: + self._path = path or ":memory:" + self._ttl = max(60, ttl_sec) + self._ensure_dir() + self._init_db() + + def _ensure_dir(self) -> None: + if self._path in {":memory:", ""}: + return + parent = os.path.dirname(self._path) + if parent: + os.makedirs(parent, exist_ok=True) + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self._path) + conn.row_factory = sqlite3.Row + return conn + + def _init_db(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS atlasbot_claims ( + conversation_id TEXT PRIMARY KEY, + updated_at REAL NOT NULL, + snapshot_id TEXT, + claims_json TEXT, + snapshot_json TEXT + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS atlasbot_claims_updated ON atlasbot_claims(updated_at)" + ) + conn.commit() + + def get(self, conversation_id: str) -> dict[str, Any] | None: + if not conversation_id: + return None + with self._connect() as conn: + self._cleanup(conn) + row = conn.execute( + "SELECT updated_at, snapshot_id, claims_json, snapshot_json FROM atlasbot_claims WHERE conversation_id = ?", + (conversation_id,), + ).fetchone() + if not row: + return None + return { + "updated_at": row["updated_at"], + "snapshot_id": row["snapshot_id"], + "claims": _safe_json(row["claims_json"], []), + "snapshot": _safe_json(row["snapshot_json"], None), + } + + def set(self, conversation_id: str, payload: dict[str, Any]) -> None: + if not conversation_id: + return + updated_at = float(payload.get("updated_at") or time.monotonic()) + snapshot_id = payload.get("snapshot_id") + claims_json = json.dumps(payload.get("claims") or []) + snapshot_json = json.dumps(payload.get("snapshot")) if payload.get("snapshot") is not None else None + with self._connect() as conn: + conn.execute( + """ + INSERT INTO atlasbot_claims (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(conversation_id) DO UPDATE SET + updated_at=excluded.updated_at, + snapshot_id=excluded.snapshot_id, + claims_json=excluded.claims_json, + snapshot_json=excluded.snapshot_json + """, + (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json), + ) + conn.commit() + + def cleanup(self) -> None: + with self._connect() as conn: + self._cleanup(conn) + + def _cleanup(self, conn: sqlite3.Connection) -> None: + cutoff = time.monotonic() - self._ttl + conn.execute("DELETE FROM atlasbot_claims WHERE updated_at < ?", (cutoff,)) + conn.commit() + + +def _safe_json(raw: str | None, fallback: Any) -> Any: + if not raw: + return fallback + try: + return json.loads(raw) + except Exception: + return fallback