import json import os import sqlite3 import time from typing import Any class ClaimStore: def __init__(self, path: str, ttl_sec: int) -> None: self._path = path or ":memory:" self._ttl = max(60, ttl_sec) self._ensure_dir() self._init_db() def _ensure_dir(self) -> None: if self._path in {":memory:", ""}: return parent = os.path.dirname(self._path) if parent: os.makedirs(parent, exist_ok=True) def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self._path) conn.row_factory = sqlite3.Row return conn def _init_db(self) -> None: with self._connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS atlasbot_claims ( conversation_id TEXT PRIMARY KEY, updated_at REAL NOT NULL, snapshot_id TEXT, claims_json TEXT, snapshot_json TEXT ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS atlasbot_claims_updated ON atlasbot_claims(updated_at)" ) conn.commit() def get(self, conversation_id: str) -> dict[str, Any] | None: if not conversation_id: return None with self._connect() as conn: self._cleanup(conn) row = conn.execute( "SELECT updated_at, snapshot_id, claims_json, snapshot_json FROM atlasbot_claims WHERE conversation_id = ?", (conversation_id,), ).fetchone() if not row: return None return { "updated_at": row["updated_at"], "snapshot_id": row["snapshot_id"], "claims": _safe_json(row["claims_json"], []), "snapshot": _safe_json(row["snapshot_json"], None), } def set(self, conversation_id: str, payload: dict[str, Any]) -> None: if not conversation_id: return updated_at = float(payload.get("updated_at") or time.monotonic()) snapshot_id = payload.get("snapshot_id") claims_json = json.dumps(payload.get("claims") or []) snapshot_json = json.dumps(payload.get("snapshot")) if payload.get("snapshot") is not None else None with self._connect() as conn: conn.execute( """ INSERT INTO atlasbot_claims (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json) VALUES (?, ?, ?, ?, ?) ON CONFLICT(conversation_id) DO UPDATE SET updated_at=excluded.updated_at, snapshot_id=excluded.snapshot_id, claims_json=excluded.claims_json, snapshot_json=excluded.snapshot_json """, (conversation_id, updated_at, snapshot_id, claims_json, snapshot_json), ) conn.commit() def cleanup(self) -> None: with self._connect() as conn: self._cleanup(conn) def _cleanup(self, conn: sqlite3.Connection) -> None: cutoff = time.monotonic() - self._ttl conn.execute("DELETE FROM atlasbot_claims WHERE updated_at < ?", (cutoff,)) conn.commit() def _safe_json(raw: str | None, fallback: Any) -> Any: if not raw: return fallback try: return json.loads(raw) except Exception: return fallback