197 lines
7.1 KiB
Python
197 lines
7.1 KiB
Python
from __future__ import annotations
|
|
|
|
from contextlib import contextmanager
|
|
from typing import Any, Iterator
|
|
|
|
import psycopg
|
|
from psycopg.rows import dict_row
|
|
from psycopg_pool import ConnectionPool
|
|
|
|
from . import settings
|
|
|
|
|
|
MIGRATION_LOCK_ID = 982731
|
|
_pool: ConnectionPool | None = None
|
|
|
|
|
|
def configured() -> bool:
|
|
return bool(settings.PORTAL_DATABASE_URL)
|
|
|
|
|
|
def _pool_kwargs() -> dict[str, Any]:
|
|
options = (
|
|
f"-c lock_timeout={settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s "
|
|
f"-c statement_timeout={settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s "
|
|
f"-c idle_in_transaction_session_timeout={settings.PORTAL_DB_IDLE_IN_TX_TIMEOUT_SEC}s"
|
|
)
|
|
return {
|
|
"connect_timeout": settings.PORTAL_DB_CONNECT_TIMEOUT_SEC,
|
|
"application_name": "atlas_portal",
|
|
"options": options,
|
|
"row_factory": dict_row,
|
|
}
|
|
|
|
|
|
def _get_pool() -> ConnectionPool:
|
|
global _pool
|
|
if _pool is None:
|
|
if not settings.PORTAL_DATABASE_URL:
|
|
raise RuntimeError("portal database not configured")
|
|
_pool = ConnectionPool(
|
|
conninfo=settings.PORTAL_DATABASE_URL,
|
|
min_size=settings.PORTAL_DB_POOL_MIN,
|
|
max_size=settings.PORTAL_DB_POOL_MAX,
|
|
kwargs=_pool_kwargs(),
|
|
)
|
|
return _pool
|
|
|
|
|
|
@contextmanager
|
|
def connect() -> Iterator[psycopg.Connection[Any]]:
|
|
if not settings.PORTAL_DATABASE_URL:
|
|
raise RuntimeError("portal database not configured")
|
|
with _get_pool().connection() as conn:
|
|
conn.row_factory = dict_row
|
|
yield conn
|
|
|
|
|
|
def _try_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> bool:
|
|
row = conn.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,)).fetchone()
|
|
if isinstance(row, dict):
|
|
return bool(row.get("pg_try_advisory_lock"))
|
|
return bool(row and row[0])
|
|
|
|
|
|
def _release_advisory_lock(conn: psycopg.Connection[Any], lock_id: int) -> None:
|
|
try:
|
|
conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def run_migrations() -> None:
|
|
if not settings.PORTAL_DATABASE_URL or not settings.PORTAL_RUN_MIGRATIONS:
|
|
return
|
|
with connect() as conn:
|
|
try:
|
|
conn.execute(f"SET lock_timeout = '{settings.PORTAL_DB_LOCK_TIMEOUT_SEC}s'")
|
|
conn.execute(f"SET statement_timeout = '{settings.PORTAL_DB_STATEMENT_TIMEOUT_SEC}s'")
|
|
except Exception:
|
|
pass
|
|
if not _try_advisory_lock(conn, MIGRATION_LOCK_ID):
|
|
return
|
|
try:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS access_requests (
|
|
request_code TEXT PRIMARY KEY,
|
|
username TEXT NOT NULL,
|
|
first_name TEXT,
|
|
last_name TEXT,
|
|
contact_email TEXT,
|
|
note TEXT,
|
|
status TEXT NOT NULL,
|
|
email_verification_token_hash TEXT,
|
|
email_verification_sent_at TIMESTAMPTZ,
|
|
email_verified_at TIMESTAMPTZ,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
decided_at TIMESTAMPTZ,
|
|
decided_by TEXT,
|
|
initial_password TEXT,
|
|
initial_password_revealed_at TIMESTAMPTZ,
|
|
provision_attempted_at TIMESTAMPTZ,
|
|
welcome_email_sent_at TIMESTAMPTZ,
|
|
approval_flags TEXT[],
|
|
approval_note TEXT,
|
|
denial_note TEXT
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
ALTER TABLE access_requests
|
|
ADD COLUMN IF NOT EXISTS initial_password TEXT,
|
|
ADD COLUMN IF NOT EXISTS initial_password_revealed_at TIMESTAMPTZ,
|
|
ADD COLUMN IF NOT EXISTS provision_attempted_at TIMESTAMPTZ,
|
|
ADD COLUMN IF NOT EXISTS email_verification_token_hash TEXT,
|
|
ADD COLUMN IF NOT EXISTS email_verification_sent_at TIMESTAMPTZ,
|
|
ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ,
|
|
ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ,
|
|
ADD COLUMN IF NOT EXISTS first_name TEXT,
|
|
ADD COLUMN IF NOT EXISTS last_name TEXT,
|
|
ADD COLUMN IF NOT EXISTS approval_flags TEXT[],
|
|
ADD COLUMN IF NOT EXISTS approval_note TEXT,
|
|
ADD COLUMN IF NOT EXISTS denial_note TEXT
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS access_request_tasks (
|
|
request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
|
|
task TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
detail TEXT,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
PRIMARY KEY (request_code, task)
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS access_request_onboarding_steps (
|
|
request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
|
|
step TEXT NOT NULL,
|
|
completed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
PRIMARY KEY (request_code, step)
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS access_request_onboarding_artifacts (
|
|
request_code TEXT NOT NULL REFERENCES access_requests(request_code) ON DELETE CASCADE,
|
|
artifact TEXT NOT NULL,
|
|
value_hash TEXT NOT NULL,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
PRIMARY KEY (request_code, artifact)
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS access_requests_status_created_at
|
|
ON access_requests (status, created_at)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS access_request_tasks_request_code
|
|
ON access_request_tasks (request_code)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS access_request_onboarding_steps_request_code
|
|
ON access_request_onboarding_steps (request_code)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS access_request_onboarding_artifacts_request_code
|
|
ON access_request_onboarding_artifacts (request_code)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS access_requests_username_pending
|
|
ON access_requests (username)
|
|
WHERE status = 'pending'
|
|
"""
|
|
)
|
|
finally:
|
|
_release_advisory_lock(conn, MIGRATION_LOCK_ID)
|
|
|
|
|
|
def ensure_schema() -> None:
|
|
run_migrations()
|