"""Provision Keycloak, Mailu and Vaultwarden accounts for an access request.

Every provisioning step is recorded as a row in ``access_request_tasks`` so a
later call can tell which steps succeeded and which still need a retry.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import time

import httpx

from . import settings
from .db import connect
from .keycloak import admin_client
from .utils import random_password
from .vaultwarden import invite_user

# Keycloak user attribute names used to carry Mailu data.
MAILU_EMAIL_ATTR = "mailu_email"
MAILU_APP_PASSWORD_ATTR = "mailu_app_password"

# Every task listed here must be recorded as "ok" before a request is
# moved from 'accounts_building' to 'awaiting_onboarding'.
REQUIRED_PROVISION_TASKS: tuple[str, ...] = (
    "keycloak_user",
    "keycloak_password",
    "keycloak_groups",
    "mailu_app_password",
    "mailu_sync",
    "vaultwarden_invite",
)


@dataclass(frozen=True)
class ProvisionResult:
    """Outcome of a single provisioning attempt."""

    # True only when every required task is recorded as "ok".
    ok: bool
    # Status string reported to the caller (e.g. "accounts_building").
    status: str


def _advisory_lock_id(request_code: str) -> int:
    """Map *request_code* to a signed 64-bit integer for use as a lock key.

    The key is the first 8 bytes of the SHA-256 digest of the UTF-8 encoded
    code, read as a big-endian signed integer.
    """
    digest = hashlib.sha256(request_code.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


def _upsert_task(
    conn,
    request_code: str,
    task: str,
    status: str,
    detail: str | None = None,
) -> None:
    """Insert or overwrite the status row for one (request_code, task) pair."""
    conn.execute(
        """
        INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
        VALUES (%s, %s, %s, %s, NOW())
        ON CONFLICT (request_code, task)
        DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW()
        """,
        (request_code, task, status, detail),
    )


def _task_statuses(conn, request_code: str) -> dict[str, str]:
    """Return ``{task: status}`` for every task row stored for *request_code*.

    Rows that are not dicts, or whose task/status values are not strings, are
    skipped.
    """
    rows = conn.execute(
        "SELECT task, status FROM access_request_tasks WHERE request_code = %s",
        (request_code,),
    ).fetchall()
    output: dict[str, str] = {}
    for row in rows:
        if not isinstance(row, dict):
            continue
        task = row.get("task")
        status = row.get("status")
        if isinstance(task, str) and isinstance(status, str):
            output[task] = status
    return output


def _all_tasks_ok(conn, request_code: str, tasks: list[str]) -> bool:
    """Return True when every name in *tasks* has a stored status of "ok"."""
    statuses = _task_statuses(conn, request_code)
    return all(statuses.get(task) == "ok" for task in tasks)


def provision_tasks_complete(conn, request_code: str) -> bool:
    """Return True when all ``REQUIRED_PROVISION_TASKS`` are recorded as "ok"."""
    return _all_tasks_ok(conn, request_code, list(REQUIRED_PROVISION_TASKS))


def provision_access_request(request_code: str) -> ProvisionResult:
    """Run every provisioning task for *request_code* and record the outcomes.

    The tasks run in a fixed order (Keycloak user, initial password, groups,
    Mailu app password, Mailu sync, Vaultwarden invite). A failing task is
    recorded with status "error" and does not stop the remaining tasks.

    Args:
        request_code: Identifier of the row in ``access_requests``.

    Returns:
        ``ProvisionResult(ok=True, status="awaiting_onboarding")`` when every
        required task is recorded as "ok". Otherwise ``ok`` is False and
        ``status`` is "unknown" (empty code or no such row), the row's current
        status (when it is not one this function handles), or
        "accounts_building" (admin client not ready, lock held elsewhere,
        retry cooldown not elapsed, or at least one task not "ok").
    """
    if not request_code:
        return ProvisionResult(ok=False, status="unknown")
    if not admin_client().ready():
        return ProvisionResult(ok=False, status="accounts_building")

    required_tasks = list(REQUIRED_PROVISION_TASKS)

    with connect() as conn:
        # Non-blocking lock keyed on the request code: if another caller holds
        # it, report "still building" instead of waiting.
        lock_id = _advisory_lock_id(request_code)
        lock_row = conn.execute(
            "SELECT pg_try_advisory_lock(%s) AS locked",
            (lock_id,),
        ).fetchone()
        if not lock_row or not lock_row.get("locked"):
            return ProvisionResult(ok=False, status="accounts_building")

        try:
            row = conn.execute(
                """
                SELECT username,
                       contact_email,
                       email_verified_at,
                       status,
                       initial_password,
                       initial_password_revealed_at,
                       provision_attempted_at
                FROM access_requests
                WHERE request_code = %s
                """,
                (request_code,),
            ).fetchone()
            if not row:
                return ProvisionResult(ok=False, status="unknown")

            username = str(row.get("username") or "")
            contact_email = str(row.get("contact_email") or "")
            email_verified_at = row.get("email_verified_at")
            status = str(row.get("status") or "")
            initial_password = row.get("initial_password")
            revealed_at = row.get("initial_password_revealed_at")
            attempted_at = row.get("provision_attempted_at")

            if status not in {"accounts_building", "awaiting_onboarding", "ready"}:
                return ProvisionResult(ok=False, status=status or "unknown")

            if status == "accounts_building":
                now = datetime.now(timezone.utc)
                if isinstance(attempted_at, datetime):
                    # Treat a naive timestamp as UTC so it can be compared
                    # with the timezone-aware "now".
                    if attempted_at.tzinfo is None:
                        attempted_at = attempted_at.replace(tzinfo=timezone.utc)
                    age_sec = (now - attempted_at).total_seconds()
                    if age_sec < settings.ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC:
                        return ProvisionResult(ok=False, status="accounts_building")
                # Stamp this attempt so the cooldown check above applies to
                # the next call.
                conn.execute(
                    "UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s",
                    (request_code,),
                )

            user_id = ""
            default_mailu_email = f"{username}@{settings.MAILU_DOMAIN}"
            mailu_email = default_mailu_email

            # Task: ensure Keycloak user exists
            try:
                # Without a username there is nothing meaningful to look up or
                # create, and the derived mail address would be "@<domain>".
                if not username:
                    raise RuntimeError("username missing")

                user = admin_client().find_user(username)
                if not user:
                    email = contact_email.strip()
                    if not email:
                        raise RuntimeError("contact email missing")
                    payload = {
                        "username": username,
                        "enabled": True,
                        "email": email,
                        "emailVerified": bool(email_verified_at),
                        "requiredActions": ["CONFIGURE_TOTP"],
                        "attributes": {MAILU_EMAIL_ATTR: [mailu_email]},
                    }
                    created_id = admin_client().create_user(payload)
                    user = admin_client().get_user(created_id)
                user_id = str((user or {}).get("id") or "")
                if not user_id:
                    raise RuntimeError("user id missing")

                # Best effort: prefer a mail address already stored on the
                # user; otherwise store the default. Any failure here falls
                # back to the default without failing the task.
                try:
                    full = admin_client().get_user(user_id)
                    attrs = full.get("attributes") or {}
                    mailu_from_attr: str | None = None
                    if isinstance(attrs, dict):
                        raw_mailu = attrs.get(MAILU_EMAIL_ATTR)
                        if isinstance(raw_mailu, list):
                            for item in raw_mailu:
                                if isinstance(item, str) and item.strip():
                                    mailu_from_attr = item.strip()
                                    break
                        elif isinstance(raw_mailu, str) and raw_mailu.strip():
                            mailu_from_attr = raw_mailu.strip()

                    if mailu_from_attr:
                        mailu_email = mailu_from_attr
                    else:
                        mailu_email = default_mailu_email
                        admin_client().set_user_attribute(username, MAILU_EMAIL_ATTR, mailu_email)
                except Exception:
                    mailu_email = default_mailu_email

                _upsert_task(conn, request_code, "keycloak_user", "ok", None)
            except Exception:
                _upsert_task(conn, request_code, "keycloak_user", "error", "failed to ensure user")

            # Task: set initial temporary password and store it for "show once" onboarding.
            try:
                if not user_id:
                    raise RuntimeError("missing user id")

                # Only (re)set the password while the request is still being
                # built and the password has not been revealed yet.
                should_reset = status == "accounts_building" and revealed_at is None
                password_value: str | None = None

                if should_reset:
                    if isinstance(initial_password, str) and initial_password:
                        password_value = initial_password
                    elif initial_password is None:
                        password_value = random_password(20)
                        conn.execute(
                            """
                            UPDATE access_requests
                            SET initial_password = %s
                            WHERE request_code = %s AND initial_password IS NULL
                            """,
                            (password_value, request_code),
                        )
                        initial_password = password_value

                    if password_value:
                        admin_client().reset_password(user_id, password_value, temporary=True)

                if isinstance(initial_password, str) and initial_password:
                    _upsert_task(conn, request_code, "keycloak_password", "ok", None)
                elif revealed_at is not None:
                    _upsert_task(
                        conn,
                        request_code,
                        "keycloak_password",
                        "ok",
                        "initial password already revealed",
                    )
                else:
                    raise RuntimeError("initial password missing")
            except Exception:
                _upsert_task(conn, request_code, "keycloak_password", "error", "failed to set password")

            # Task: group membership (default dev)
            try:
                if not user_id:
                    raise RuntimeError("missing user id")

                groups = settings.DEFAULT_USER_GROUPS or ["dev"]
                for group_name in groups:
                    gid = admin_client().get_group_id(group_name)
                    if not gid:
                        raise RuntimeError("group missing")
                    admin_client().add_user_to_group(user_id, gid)

                _upsert_task(conn, request_code, "keycloak_groups", "ok", None)
            except Exception:
                _upsert_task(conn, request_code, "keycloak_groups", "error", "failed to add groups")

            # Task: ensure mailu_app_password attribute exists
            try:
                if not user_id:
                    raise RuntimeError("missing user id")

                full = admin_client().get_user(user_id)
                attrs = full.get("attributes") or {}
                existing = None
                if isinstance(attrs, dict):
                    raw = attrs.get(MAILU_APP_PASSWORD_ATTR)
                    if isinstance(raw, list) and raw and isinstance(raw[0], str):
                        existing = raw[0]
                    elif isinstance(raw, str) and raw:
                        existing = raw

                # Never overwrite an app password that is already set.
                if not existing:
                    admin_client().set_user_attribute(username, MAILU_APP_PASSWORD_ATTR, random_password())

                _upsert_task(conn, request_code, "mailu_app_password", "ok", None)
            except Exception:
                _upsert_task(conn, request_code, "mailu_app_password", "error", "failed to set mail password")

            # Task: trigger Mailu sync if configured
            try:
                if not settings.MAILU_SYNC_URL:
                    _upsert_task(conn, request_code, "mailu_sync", "ok", "sync disabled")
                else:
                    with httpx.Client(timeout=30) as client:
                        resp = client.post(
                            settings.MAILU_SYNC_URL,
                            json={"ts": int(time.time()), "wait": True, "reason": "portal_access_approve"},
                        )
                    if resp.status_code != 200:
                        raise RuntimeError("mailu sync failed")
                    _upsert_task(conn, request_code, "mailu_sync", "ok", None)
            except Exception:
                _upsert_task(conn, request_code, "mailu_sync", "error", "failed to sync mailu")

            # Task: ensure Vaultwarden account exists (invite flow)
            try:
                if not user_id:
                    raise RuntimeError("missing user id")

                result = invite_user(mailu_email or default_mailu_email)
                if result.ok:
                    _upsert_task(conn, request_code, "vaultwarden_invite", "ok", result.status)
                else:
                    _upsert_task(
                        conn,
                        request_code,
                        "vaultwarden_invite",
                        "error",
                        result.detail or result.status,
                    )
            except Exception:
                _upsert_task(
                    conn,
                    request_code,
                    "vaultwarden_invite",
                    "error",
                    "failed to provision vaultwarden",
                )

            if _all_tasks_ok(conn, request_code, required_tasks):
                # The status guard keeps a request that has already moved past
                # 'accounts_building' from being reset.
                conn.execute(
                    """
                    UPDATE access_requests
                    SET status = 'awaiting_onboarding'
                    WHERE request_code = %s AND status = 'accounts_building'
                    """,
                    (request_code,),
                )
                return ProvisionResult(ok=True, status="awaiting_onboarding")

            return ProvisionResult(ok=False, status="accounts_building")
        finally:
            # Release the lock on every exit path taken after it was acquired.
            conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))