db: move migrations to job and cap pools

This commit is contained in:
Brad Stein 2026-01-22 14:11:44 -03:00
parent e444a52b3d
commit bd8fa1fca5
5 changed files with 185 additions and 100 deletions

View File

@ -7,7 +7,6 @@ from flask import Flask, jsonify, send_from_directory
from flask_cors import CORS from flask_cors import CORS
from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
from .db import ensure_schema
from .routes import access_requests, account, admin_access, ai, auth_config, health, lab, monero from .routes import access_requests, account, admin_access, ai, auth_config, health, lab, monero
@ -16,8 +15,6 @@ def create_app() -> Flask:
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
CORS(app, resources={r"/api/*": {"origins": "*"}}) CORS(app, resources={r"/api/*": {"origins": "*"}})
ensure_schema()
health.register(app) health.register(app)
auth_config.register(app) auth_config.register(app)
account.register(app) account.register(app)

View File

@ -5,26 +5,80 @@ from typing import Any, Iterator
import psycopg import psycopg
from psycopg.rows import dict_row from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool
from . import settings from . import settings
MIGRATION_LOCK_ID = 982731
_pool: ConnectionPool | None = None
def configured() -> bool: def configured() -> bool:
return bool(settings.PORTAL_DATABASE_URL) return bool(settings.PORTAL_DATABASE_URL)
def _pool_kwargs() -> dict[str, Any]:
options = (
f"-c lock_timeout={settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s "
f"-c statement_timeout={settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s "
f"-c idle_in_transaction_session_timeout={settings.PORTAL_DB_IDLE_IN_TX_TIMEOUT_SEC}s"
)
return {
"connect_timeout": settings.PORTAL_DB_CONNECT_TIMEOUT_SEC,
"application_name": "atlas_portal",
"options": options,
"row_factory": dict_row,
}
def _get_pool() -> ConnectionPool:
global _pool
if _pool is None:
if not settings.PORTAL_DATABASE_URL:
raise RuntimeError("portal database not configured")
_pool = ConnectionPool(
conninfo=settings.PORTAL_DATABASE_URL,
min_size=settings.PORTAL_DB_POOL_MIN,
max_size=settings.PORTAL_DB_POOL_MAX,
kwargs=_pool_kwargs(),
)
return _pool
@contextmanager @contextmanager
def connect() -> Iterator[psycopg.Connection[Any]]: def connect() -> Iterator[psycopg.Connection[Any]]:
if not settings.PORTAL_DATABASE_URL: if not settings.PORTAL_DATABASE_URL:
raise RuntimeError("portal database not configured") raise RuntimeError("portal database not configured")
with psycopg.connect(settings.PORTAL_DATABASE_URL, row_factory=dict_row) as conn: with _get_pool().connection() as conn:
conn.row_factory = dict_row
yield conn yield conn
def ensure_schema() -> None: def _try_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> bool:
if not settings.PORTAL_DATABASE_URL: row = conn.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,)).fetchone()
return bool(row and row[0])
def _release_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> None:
try:
conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
except Exception:
pass
def run_migrations() -> None:
if not settings.PORTAL_DATABASE_URL or not settings.PORTAL_RUN_MIGRATIONS:
return return
with connect() as conn: with connect() as conn:
try:
conn.execute(f"SET lock_timeout = '{settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s'")
conn.execute(f"SET statement_timeout = '{settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s'")
except Exception:
pass
if not _try_advisory_lock(conn, MIGRATION_LOCK_ID):
return
try:
conn.execute( conn.execute(
""" """
CREATE TABLE IF NOT EXISTS access_requests ( CREATE TABLE IF NOT EXISTS access_requests (
@ -43,22 +97,31 @@ def ensure_schema() -> None:
decided_by TEXT, decided_by TEXT,
initial_password TEXT, initial_password TEXT,
initial_password_revealed_at TIMESTAMPTZ, initial_password_revealed_at TIMESTAMPTZ,
provision_attempted_at TIMESTAMPTZ provision_attempted_at TIMESTAMPTZ,
welcome_email_sent_at TIMESTAMPTZ,
approval_flags TEXT[],
approval_note TEXT,
denial_note TEXT
) )
""" """
) )
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS initial_password TEXT") conn.execute(
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS initial_password_revealed_at TIMESTAMPTZ") """
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS provision_attempted_at TIMESTAMPTZ") ALTER TABLE access_requests
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS email_verification_token_hash TEXT") ADD COLUMN IF NOT EXISTS initial_password TEXT,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS email_verification_sent_at TIMESTAMPTZ") ADD COLUMN IF NOT EXISTS initial_password_revealed_at TIMESTAMPTZ,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ") ADD COLUMN IF NOT EXISTS provision_attempted_at TIMESTAMPTZ,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ") ADD COLUMN IF NOT EXISTS email_verification_token_hash TEXT,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS first_name TEXT") ADD COLUMN IF NOT EXISTS email_verification_sent_at TIMESTAMPTZ,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS last_name TEXT") ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_flags TEXT[]") ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_note TEXT") ADD COLUMN IF NOT EXISTS first_name TEXT,
conn.execute("ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS denial_note TEXT") ADD COLUMN IF NOT EXISTS last_name TEXT,
ADD COLUMN IF NOT EXISTS approval_flags TEXT[],
ADD COLUMN IF NOT EXISTS approval_note TEXT,
ADD COLUMN IF NOT EXISTS denial_note TEXT
"""
)
conn.execute( conn.execute(
""" """
CREATE TABLE IF NOT EXISTS access_request_tasks ( CREATE TABLE IF NOT EXISTS access_request_tasks (
@ -123,3 +186,9 @@ def ensure_schema() -> None:
WHERE status = 'pending' WHERE status = 'pending'
""" """
) )
finally:
_release_advisory_lock(conn, MIGRATION_LOCK_ID)
def ensure_schema() -> None:
run_migrations()

View File

@ -0,0 +1,11 @@
from __future__ import annotations
from .db import run_migrations
def main() -> None:
run_migrations()
if __name__ == "__main__":
main()

View File

@ -60,6 +60,13 @@ ACCOUNT_ALLOWED_GROUPS = [
] ]
PORTAL_DATABASE_URL = os.getenv("PORTAL_DATABASE_URL", "").strip() PORTAL_DATABASE_URL = os.getenv("PORTAL_DATABASE_URL", "").strip()
PORTAL_DB_POOL_MIN = int(os.getenv("PORTAL_DB_POOL_MIN", "0"))
PORTAL_DB_POOL_MAX = int(os.getenv("PORTAL_DB_POOL_MAX", "5"))
PORTAL_DB_CONNECT_TIMEOUT_SEC = int(os.getenv("PORTAL_DB_CONNECT_TIMEOUT_SEC", "5"))
PORTAL_DB_LOCK_TIMEOUT_SEC = int(os.getenv("PORTAL_DB_LOCK_TIMEOUT_SEC", "5"))
PORTAL_DB_STATEMENT_TIMEOUT_SEC = int(os.getenv("PORTAL_DB_STATEMENT_TIMEOUT_SEC", "30"))
PORTAL_DB_IDLE_IN_TX_TIMEOUT_SEC = int(os.getenv("PORTAL_DB_IDLE_IN_TX_TIMEOUT_SEC", "10"))
PORTAL_RUN_MIGRATIONS = _env_bool("PORTAL_RUN_MIGRATIONS", "false")
PORTAL_ADMIN_USERS = [u.strip() for u in os.getenv("PORTAL_ADMIN_USERS", "bstein").split(",") if u.strip()] PORTAL_ADMIN_USERS = [u.strip() for u in os.getenv("PORTAL_ADMIN_USERS", "bstein").split(",") if u.strip()]
PORTAL_ADMIN_GROUPS = [g.strip() for g in os.getenv("PORTAL_ADMIN_GROUPS", "admin").split(",") if g.strip()] PORTAL_ADMIN_GROUPS = [g.strip() for g in os.getenv("PORTAL_ADMIN_GROUPS", "admin").split(",") if g.strip()]

View File

@ -4,3 +4,4 @@ gunicorn==21.2.0
httpx==0.27.2 httpx==0.27.2
PyJWT[crypto]==2.10.1 PyJWT[crypto]==2.10.1
psycopg[binary]==3.2.6 psycopg[binary]==3.2.6
psycopg-pool==3.2.6