from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import hashlib import time import httpx from . import settings from .db import connect from .keycloak import admin_client from .nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync from .utils import random_password from .vaultwarden import invite_user from .firefly_user_sync import trigger as trigger_firefly_user_sync from .wger_user_sync import trigger as trigger_wger_user_sync MAILU_EMAIL_ATTR = "mailu_email" MAILU_APP_PASSWORD_ATTR = "mailu_app_password" MAILU_ENABLED_ATTR = "mailu_enabled" WGER_PASSWORD_ATTR = "wger_password" WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" FIREFLY_PASSWORD_ATTR = "firefly_password" FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" REQUIRED_PROVISION_TASKS: tuple[str, ...] = ( "keycloak_user", "keycloak_password", "keycloak_groups", "mailu_app_password", "mailu_sync", "nextcloud_mail_sync", "wger_account", "firefly_account", "vaultwarden_invite", ) @dataclass(frozen=True) class ProvisionResult: ok: bool status: str def _advisory_lock_id(request_code: str) -> int: digest = hashlib.sha256(request_code.encode("utf-8")).digest() return int.from_bytes(digest[:8], "big", signed=True) def _upsert_task(conn, request_code: str, task: str, status: str, detail: str | None = None) -> None: conn.execute( """ INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) VALUES (%s, %s, %s, %s, NOW()) ON CONFLICT (request_code, task) DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() """, (request_code, task, status, detail), ) def _ensure_task_rows(conn, request_code: str, tasks: list[str]) -> None: if not tasks: return conn.execute( """ INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) SELECT %s, task, 'pending', NULL, NOW() FROM UNNEST(%s::text[]) AS task ON CONFLICT (request_code, task) DO NOTHING """, (request_code, tasks), ) def _safe_error_detail(exc: Exception, fallback: str) -> str: if isinstance(exc, RuntimeError): msg = str(exc).strip() if msg: return msg if isinstance(exc, httpx.HTTPStatusError): detail = f"http {exc.response.status_code}" try: payload = exc.response.json() msg: str | None = None if isinstance(payload, dict): raw = payload.get("errorMessage") or payload.get("error") or payload.get("message") if isinstance(raw, str) and raw.strip(): msg = raw.strip() elif isinstance(payload, str) and payload.strip(): msg = payload.strip() if msg: msg = " ".join(msg.split()) detail = f"{detail}: {msg[:200]}" except Exception: text = (exc.response.text or "").strip() if text: text = " ".join(text.split()) detail = f"{detail}: {text[:200]}" return detail if isinstance(exc, httpx.TimeoutException): return "timeout" return fallback def _task_statuses(conn, request_code: str) -> dict[str, str]: rows = conn.execute( "SELECT task, status FROM access_request_tasks WHERE request_code = %s", (request_code,), ).fetchall() output: dict[str, str] = {} for row in rows: task = row.get("task") if isinstance(row, dict) else None status = row.get("status") if isinstance(row, dict) else None if isinstance(task, str) and isinstance(status, str): output[task] = status return output def _all_tasks_ok(conn, request_code: str, tasks: list[str]) -> bool: statuses = _task_statuses(conn, request_code) for task in tasks: if statuses.get(task) != "ok": return False return True def provision_tasks_complete(conn, request_code: str) -> bool: return _all_tasks_ok(conn, request_code, list(REQUIRED_PROVISION_TASKS)) def provision_access_request(request_code: str) -> ProvisionResult: if not request_code: return ProvisionResult(ok=False, status="unknown") if not admin_client().ready(): return ProvisionResult(ok=False, status="accounts_building") required_tasks = list(REQUIRED_PROVISION_TASKS) with connect() as conn: lock_id = _advisory_lock_id(request_code) lock_row = conn.execute( "SELECT pg_try_advisory_lock(%s) AS locked", (lock_id,), ).fetchone() if not lock_row or not lock_row.get("locked"): return ProvisionResult(ok=False, status="accounts_building") try: row = conn.execute( """ SELECT username, contact_email, email_verified_at, status, initial_password, initial_password_revealed_at, provision_attempted_at FROM access_requests WHERE request_code = %s """, (request_code,), ).fetchone() if not row: return ProvisionResult(ok=False, status="unknown") username = str(row.get("username") or "") contact_email = str(row.get("contact_email") or "") email_verified_at = row.get("email_verified_at") status = str(row.get("status") or "") initial_password = row.get("initial_password") revealed_at = row.get("initial_password_revealed_at") attempted_at = row.get("provision_attempted_at") if status == "approved": conn.execute( "UPDATE access_requests SET status = 'accounts_building' WHERE request_code = %s AND status = 'approved'", (request_code,), ) status = "accounts_building" if status not in {"accounts_building", "awaiting_onboarding", "ready"}: return ProvisionResult(ok=False, status=status or "unknown") _ensure_task_rows(conn, request_code, required_tasks) if status == "accounts_building": now = datetime.now(timezone.utc) if isinstance(attempted_at, datetime): if attempted_at.tzinfo is None: attempted_at = attempted_at.replace(tzinfo=timezone.utc) age_sec = (now - attempted_at).total_seconds() if age_sec < settings.ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC: return ProvisionResult(ok=False, status="accounts_building") conn.execute( "UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s", (request_code,), ) user_id = "" mailu_email = f"{username}@{settings.MAILU_DOMAIN}" # Task: ensure Keycloak user exists try: user = admin_client().find_user(username) if not user: email = contact_email.strip() if not email: raise RuntimeError("missing verified email address") existing_email_user = admin_client().find_user_by_email(email) if existing_email_user and (existing_email_user.get("username") or "") != username: raise RuntimeError("email is already associated with an existing Atlas account") # The portal already verified the external contact email before approval, # so mark it as verified in Keycloak. # # Do not force password rotation on first login: the onboarding flow # intentionally guides users through Vaultwarden first, then triggers a # Keycloak password change step later. # # Do not force MFA enrollment during initial login: users can opt into MFA # later. required_actions: list[str] = [] payload = { "username": username, "enabled": True, "email": email, "emailVerified": True, "requiredActions": required_actions, "attributes": { MAILU_EMAIL_ATTR: [mailu_email], MAILU_ENABLED_ATTR: ["true"], }, } created_id = admin_client().create_user(payload) user = admin_client().get_user(created_id) user_id = str((user or {}).get("id") or "") if not user_id: raise RuntimeError("user id missing") try: full = admin_client().get_user(user_id) attrs = full.get("attributes") or {} actions = full.get("requiredActions") if isinstance(actions, list) and "CONFIGURE_TOTP" in actions: # Backfill earlier accounts created when we forced MFA enrollment. new_actions = [a for a in actions if a != "CONFIGURE_TOTP"] admin_client().update_user_safe(user_id, {"requiredActions": new_actions}) mailu_from_attr: str | None = None if isinstance(attrs, dict): raw_mailu = attrs.get(MAILU_EMAIL_ATTR) if isinstance(raw_mailu, list): for item in raw_mailu: if isinstance(item, str) and item.strip(): mailu_from_attr = item.strip() break elif isinstance(raw_mailu, str) and raw_mailu.strip(): mailu_from_attr = raw_mailu.strip() if mailu_from_attr: mailu_email = mailu_from_attr else: mailu_email = f"{username}@{settings.MAILU_DOMAIN}" admin_client().set_user_attribute(username, MAILU_EMAIL_ATTR, mailu_email) try: raw_enabled = attrs.get(MAILU_ENABLED_ATTR) if isinstance(attrs, dict) else None enabled_value = "" if isinstance(raw_enabled, list) and raw_enabled: enabled_value = str(raw_enabled[0]).strip() elif isinstance(raw_enabled, str): enabled_value = raw_enabled.strip() if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}: admin_client().set_user_attribute(username, MAILU_ENABLED_ATTR, "true") except Exception: pass except Exception: mailu_email = f"{username}@{settings.MAILU_DOMAIN}" _upsert_task(conn, request_code, "keycloak_user", "ok", None) except Exception as exc: _upsert_task(conn, request_code, "keycloak_user", "error", _safe_error_detail(exc, "failed to ensure user")) if not user_id: return ProvisionResult(ok=False, status="accounts_building") # Task: set initial password and store it for "show once" onboarding. try: if not user_id: raise RuntimeError("missing user id") should_reset = status == "accounts_building" and revealed_at is None password_value: str | None = None if should_reset: if isinstance(initial_password, str) and initial_password: password_value = initial_password elif initial_password is None: password_value = random_password(20) conn.execute( """ UPDATE access_requests SET initial_password = %s WHERE request_code = %s AND initial_password IS NULL """, (password_value, request_code), ) initial_password = password_value if password_value: admin_client().reset_password(user_id, password_value, temporary=False) if isinstance(initial_password, str) and initial_password: _upsert_task(conn, request_code, "keycloak_password", "ok", None) elif revealed_at is not None: _upsert_task(conn, request_code, "keycloak_password", "ok", "initial password already revealed") else: raise RuntimeError("initial password missing") except Exception as exc: _upsert_task(conn, request_code, "keycloak_password", "error", _safe_error_detail(exc, "failed to set password")) # Task: group membership (default dev) try: if not user_id: raise RuntimeError("missing user id") groups = settings.DEFAULT_USER_GROUPS or ["dev"] for group_name in groups: gid = admin_client().get_group_id(group_name) if not gid: raise RuntimeError("group missing") admin_client().add_user_to_group(user_id, gid) _upsert_task(conn, request_code, "keycloak_groups", "ok", None) except Exception as exc: _upsert_task(conn, request_code, "keycloak_groups", "error", _safe_error_detail(exc, "failed to add groups")) # Task: ensure mailu_app_password attribute exists try: if not user_id: raise RuntimeError("missing user id") full = admin_client().get_user(user_id) attrs = full.get("attributes") or {} existing = None if isinstance(attrs, dict): raw = attrs.get(MAILU_APP_PASSWORD_ATTR) if isinstance(raw, list) and raw and isinstance(raw[0], str): existing = raw[0] elif isinstance(raw, str) and raw: existing = raw if not existing: admin_client().set_user_attribute(username, MAILU_APP_PASSWORD_ATTR, random_password()) _upsert_task(conn, request_code, "mailu_app_password", "ok", None) except Exception as exc: _upsert_task(conn, request_code, "mailu_app_password", "error", _safe_error_detail(exc, "failed to set mail password")) # Task: trigger Mailu sync if configured try: if not settings.MAILU_SYNC_URL: _upsert_task(conn, request_code, "mailu_sync", "ok", "sync disabled") else: with httpx.Client(timeout=30) as client: resp = client.post( settings.MAILU_SYNC_URL, json={"ts": int(time.time()), "wait": True, "reason": "portal_access_approve"}, ) if resp.status_code != 200: raise RuntimeError("mailu sync failed") _upsert_task(conn, request_code, "mailu_sync", "ok", None) except Exception as exc: _upsert_task(conn, request_code, "mailu_sync", "error", _safe_error_detail(exc, "failed to sync mailu")) # Task: trigger Nextcloud mail sync if configured try: if not settings.NEXTCLOUD_NAMESPACE or not settings.NEXTCLOUD_MAIL_SYNC_CRONJOB: _upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", "sync disabled") else: result = trigger_nextcloud_mail_sync(username, wait=True) if isinstance(result, dict) and result.get("status") == "ok": _upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", None) else: status_val = result.get("status") if isinstance(result, dict) else "error" _upsert_task(conn, request_code, "nextcloud_mail_sync", "error", str(status_val)) except Exception as exc: _upsert_task(conn, request_code, "nextcloud_mail_sync", "error", _safe_error_detail(exc, "failed to sync nextcloud")) # Task: ensure wger account exists try: if not user_id: raise RuntimeError("missing user id") full = admin_client().get_user(user_id) attrs = full.get("attributes") or {} wger_password = "" wger_password_updated_at = "" if isinstance(attrs, dict): raw_pw = attrs.get(WGER_PASSWORD_ATTR) if isinstance(raw_pw, list) and raw_pw and isinstance(raw_pw[0], str): wger_password = raw_pw[0] elif isinstance(raw_pw, str) and raw_pw: wger_password = raw_pw raw_updated = attrs.get(WGER_PASSWORD_UPDATED_ATTR) if isinstance(raw_updated, list) and raw_updated and isinstance(raw_updated[0], str): wger_password_updated_at = raw_updated[0] elif isinstance(raw_updated, str) and raw_updated: wger_password_updated_at = raw_updated if not wger_password: wger_password = random_password(20) admin_client().set_user_attribute(username, WGER_PASSWORD_ATTR, wger_password) wger_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}" if not wger_password_updated_at: result = trigger_wger_user_sync(username, wger_email, wger_password, wait=True) status_val = result.get("status") if isinstance(result, dict) else "error" if status_val != "ok": raise RuntimeError(f"wger sync {status_val}") now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") admin_client().set_user_attribute(username, WGER_PASSWORD_UPDATED_ATTR, now_iso) _upsert_task(conn, request_code, "wger_account", "ok", None) except Exception as exc: _upsert_task(conn, request_code, "wger_account", "error", _safe_error_detail(exc, "failed to provision wger")) # Task: ensure firefly account exists try: if not user_id: raise RuntimeError("missing user id") full = admin_client().get_user(user_id) attrs = full.get("attributes") or {} firefly_password = "" firefly_password_updated_at = "" if isinstance(attrs, dict): raw_pw = attrs.get(FIREFLY_PASSWORD_ATTR) if isinstance(raw_pw, list) and raw_pw and isinstance(raw_pw[0], str): firefly_password = raw_pw[0] elif isinstance(raw_pw, str) and raw_pw: firefly_password = raw_pw raw_updated = attrs.get(FIREFLY_PASSWORD_UPDATED_ATTR) if isinstance(raw_updated, list) and raw_updated and isinstance(raw_updated[0], str): firefly_password_updated_at = raw_updated[0] elif isinstance(raw_updated, str) and raw_updated: firefly_password_updated_at = raw_updated if not firefly_password: firefly_password = random_password(24) admin_client().set_user_attribute(username, FIREFLY_PASSWORD_ATTR, firefly_password) firefly_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}" if not firefly_password_updated_at: result = trigger_firefly_user_sync(username, firefly_email, firefly_password, wait=True) status_val = result.get("status") if isinstance(result, dict) else "error" if status_val != "ok": raise RuntimeError(f"firefly sync {status_val}") now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") admin_client().set_user_attribute(username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso) _upsert_task(conn, request_code, "firefly_account", "ok", None) except Exception as exc: _upsert_task( conn, request_code, "firefly_account", "error", _safe_error_detail(exc, "failed to provision firefly"), ) # Task: ensure Vaultwarden account exists (invite flow) try: if not user_id: raise RuntimeError("missing user id") vaultwarden_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}" try: full = admin_client().get_user(user_id) attrs = full.get("attributes") or {} override = None if isinstance(attrs, dict): raw = attrs.get("vaultwarden_email") if isinstance(raw, list): for item in raw: if isinstance(item, str) and item.strip(): override = item.strip() break elif isinstance(raw, str) and raw.strip(): override = raw.strip() if override: vaultwarden_email = override except Exception: pass result = invite_user(vaultwarden_email) if not result.ok and result.status == "error": fallback_email = contact_email.strip() if fallback_email and fallback_email != vaultwarden_email: fallback_result = invite_user(fallback_email) if fallback_result.ok: vaultwarden_email = fallback_email result = fallback_result if result.ok: _upsert_task(conn, request_code, "vaultwarden_invite", "ok", result.status) else: _upsert_task(conn, request_code, "vaultwarden_invite", "error", result.detail or result.status) # Persist Vaultwarden association/status on the Keycloak user so the portal can display it quickly. try: now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") admin_client().set_user_attribute(username, "vaultwarden_email", vaultwarden_email) admin_client().set_user_attribute(username, "vaultwarden_status", result.status) admin_client().set_user_attribute(username, "vaultwarden_synced_at", now_iso) except Exception: pass except Exception as exc: _upsert_task( conn, request_code, "vaultwarden_invite", "error", _safe_error_detail(exc, "failed to provision vaultwarden"), ) if _all_tasks_ok(conn, request_code, required_tasks): conn.execute( """ UPDATE access_requests SET status = 'awaiting_onboarding' WHERE request_code = %s AND status = 'accounts_building' """, (request_code,), ) return ProvisionResult(ok=True, status="awaiting_onboarding") return ProvisionResult(ok=False, status="accounts_building") finally: conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))