"""Pooled Postgres access and idempotent schema migrations for the portal.

The module lazily creates one process-wide ``psycopg_pool.ConnectionPool``
from ``settings.PORTAL_DATABASE_URL`` and exposes:

* ``configured()``     - whether a database URL is set,
* ``connect()``        - context manager yielding a pooled connection,
* ``run_migrations()`` / ``ensure_schema()`` - apply the schema DDL.
"""

from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Any, Iterator

import psycopg
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

from . import settings

_logger = logging.getLogger(__name__)

# Key passed to pg_try_advisory_lock / pg_advisory_unlock so that only one
# session at a time runs the migrations.
MIGRATION_LOCK_ID = 982731

# Process-wide pool, created on first use by _get_pool().
_pool: ConnectionPool | None = None

# Schema DDL, executed in order by run_migrations(). Every statement uses
# IF NOT EXISTS, so re-running the whole list against an up-to-date database
# changes nothing.
_MIGRATION_STATEMENTS: tuple[str, ...] = (
    """
    CREATE TABLE IF NOT EXISTS access_requests (
      request_code TEXT PRIMARY KEY,
      username TEXT NOT NULL,
      first_name TEXT,
      last_name TEXT,
      contact_email TEXT,
      note TEXT,
      status TEXT NOT NULL,
      email_verification_token_hash TEXT,
      email_verification_sent_at TIMESTAMPTZ,
      email_verified_at TIMESTAMPTZ,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      decided_at TIMESTAMPTZ,
      decided_by TEXT,
      initial_password TEXT,
      initial_password_revealed_at TIMESTAMPTZ,
      provision_attempted_at TIMESTAMPTZ,
      welcome_email_sent_at TIMESTAMPTZ,
      approval_flags TEXT[],
      approval_note TEXT,
      denial_note TEXT
    )
    """,
    # Columns added after the table was first created; harmless when the
    # CREATE TABLE above already produced them.
    """
    ALTER TABLE access_requests
      ADD COLUMN IF NOT EXISTS initial_password TEXT,
      ADD COLUMN IF NOT EXISTS initial_password_revealed_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS provision_attempted_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS email_verification_token_hash TEXT,
      ADD COLUMN IF NOT EXISTS email_verification_sent_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ,
      ADD COLUMN IF NOT EXISTS first_name TEXT,
      ADD COLUMN IF NOT EXISTS last_name TEXT,
      ADD COLUMN IF NOT EXISTS approval_flags TEXT[],
      ADD COLUMN IF NOT EXISTS approval_note TEXT,
      ADD COLUMN IF NOT EXISTS denial_note TEXT
    """,
    """
    CREATE TABLE IF NOT EXISTS access_request_tasks (
      request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
      task TEXT NOT NULL,
      status TEXT NOT NULL,
      detail TEXT,
      updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      PRIMARY KEY (request_code, task)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS access_request_onboarding_steps (
      request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
      step TEXT NOT NULL,
      completed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      PRIMARY KEY (request_code, step)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS access_request_onboarding_artifacts (
      request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
      artifact TEXT NOT NULL,
      value_hash TEXT NOT NULL,
      created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
      PRIMARY KEY (request_code, artifact)
    )
    """,
    """
    CREATE INDEX IF NOT EXISTS access_requests_status_created_at
      ON access_requests (status, created_at)
    """,
    """
    CREATE INDEX IF NOT EXISTS access_request_tasks_request_code
      ON access_request_tasks (request_code)
    """,
    """
    CREATE INDEX IF NOT EXISTS access_request_onboarding_steps_request_code
      ON access_request_onboarding_steps (request_code)
    """,
    """
    CREATE INDEX IF NOT EXISTS access_request_onboarding_artifacts_request_code
      ON access_request_onboarding_artifacts (request_code)
    """,
    # Partial unique index: at most one row per username with status 'pending'.
    """
    CREATE UNIQUE INDEX IF NOT EXISTS access_requests_username_pending
      ON access_requests (username)
      WHERE status = 'pending'
    """,
)


def configured() -> bool:
    """Return True when ``settings.PORTAL_DATABASE_URL`` is set to a truthy value."""
    return bool(settings.PORTAL_DATABASE_URL)


def _pool_kwargs() -> dict[str, Any]:
    """Build the per-connection keyword arguments handed to the pool.

    The lock, statement and idle-in-transaction timeouts from ``settings`` are
    passed as ``-c`` server options so they apply to every pooled connection.
    """
    options = (
        f"-c lock_timeout={settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s "
        f"-c statement_timeout={settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s "
        f"-c idle_in_transaction_session_timeout={settings.PORTAL_DB_IDLE_IN_TX_TIMEOUT_SEC}s"
    )
    return {
        "connect_timeout": settings.PORTAL_DB_CONNECT_TIMEOUT_SEC,
        "application_name": "atlas_portal",
        "options": options,
        "row_factory": dict_row,
    }


def _get_pool() -> ConnectionPool:
    """Return the module-level connection pool, creating it on first call.

    Raises:
        RuntimeError: if the pool must be created but
            ``settings.PORTAL_DATABASE_URL`` is not set.
    """
    global _pool
    if _pool is None:
        if not settings.PORTAL_DATABASE_URL:
            raise RuntimeError("portal database not configured")
        _pool = ConnectionPool(
            conninfo=settings.PORTAL_DATABASE_URL,
            min_size=settings.PORTAL_DB_POOL_MIN,
            max_size=settings.PORTAL_DB_POOL_MAX,
            kwargs=_pool_kwargs(),
        )
    return _pool


@contextmanager
def connect() -> Iterator[psycopg.Connection[Any]]:
    """Yield a pooled connection whose rows are returned as dicts.

    The connection is checked out through the pool's own ``connection()``
    context manager, which handles the end of the transaction and returns
    the connection to the pool when the ``with`` block exits.

    Raises:
        RuntimeError: if ``settings.PORTAL_DATABASE_URL`` is not set.
    """
    if not settings.PORTAL_DATABASE_URL:
        raise RuntimeError("portal database not configured")
    with _get_pool().connection() as conn:
        conn.row_factory = dict_row
        yield conn


def _try_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> bool:
    """Try to take advisory lock ``lock_id`` on ``conn`` without waiting.

    Returns:
        True when ``pg_try_advisory_lock`` reported success, False otherwise.
    """
    row = conn.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,)).fetchone()
    # Rows are dicts under dict_row; fall back to positional access in case
    # the connection uses a tuple-style row factory.
    if isinstance(row, dict):
        return bool(row.get("pg_try_advisory_lock"))
    return bool(row and row[0])


def _rollback_quietly(conn: psycopg.Connection[Any]) -> None:
    """Roll back the current transaction on ``conn``, logging instead of raising.

    Used on error paths, where a failing rollback must not hide the error
    that triggered it.
    """
    try:
        conn.rollback()
    except Exception:
        _logger.warning("rollback failed on portal database connection", exc_info=True)


def _release_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> None:
    """Release advisory lock ``lock_id`` on ``conn`` (best effort, never raises).

    A failure is logged rather than raised because this runs from a
    ``finally`` clause and must not replace an in-flight exception.
    """
    try:
        conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
    except Exception:
        _logger.warning("failed to release advisory lock %s", lock_id, exc_info=True)


def run_migrations() -> None:
    """Apply the portal schema DDL, guarded by an advisory lock.

    Does nothing when ``settings.PORTAL_DATABASE_URL`` or
    ``settings.PORTAL_RUN_MIGRATIONS`` is falsy, or when another session
    already holds ``MIGRATION_LOCK_ID``.

    The DDL is committed *before* the lock is released, so a concurrent
    runner that takes the lock next already sees the finished schema. On
    failure the transaction is rolled back before unlocking: an aborted
    transaction rejects every further statement, so without the rollback the
    unlock itself would fail and the session-level lock would remain held by
    the pooled connection.

    Raises:
        RuntimeError: if the pool cannot be created (see ``connect``).
        Exception: any error raised while executing or committing the DDL is
            re-raised after rollback and unlock.
    """
    if not settings.PORTAL_DATABASE_URL or not settings.PORTAL_RUN_MIGRATIONS:
        return

    with connect() as conn:
        # Best effort: the same timeouts are already requested through the
        # connection options built in _pool_kwargs().
        try:
            conn.execute(f"SET lock_timeout = '{settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s'")
            conn.execute(f"SET statement_timeout = '{settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s'")
        except psycopg.Error:
            _logger.warning("could not apply migration timeouts", exc_info=True)
            # Clear the aborted transaction so the lock query below can run.
            _rollback_quietly(conn)

        if not _try_advisory_lock(conn, MIGRATION_LOCK_ID):
            return

        try:
            for statement in _MIGRATION_STATEMENTS:
                conn.execute(statement)
            conn.commit()
        except Exception:
            _rollback_quietly(conn)
            raise
        finally:
            _release_advisory_lock(conn, MIGRATION_LOCK_ID)


def ensure_schema() -> None:
    """Make sure the portal schema exists; thin alias for ``run_migrations``."""
    run_migrations()