from __future__ import annotations from contextlib import contextmanager from datetime import datetime, timedelta, timezone from types import SimpleNamespace from typing import Any from atlas_portal import provisioning class DummyResult: def __init__(self, row: dict[str, Any] | None = None) -> None: self.row = row def fetchone(self) -> dict[str, Any] | None: return self.row class DummyConn: def __init__(self, row: dict[str, Any] | None = None, *, locked: bool = True) -> None: self.row = row self.locked = locked self.executed: list[tuple[str, object | None]] = [] def execute(self, query: str, params: object | None = None) -> DummyResult: self.executed.append((query, params)) if "pg_try_advisory_lock" in query: return DummyResult({"locked": self.locked}) if "FROM access_requests" in query and "SELECT username" in query: return DummyResult(self.row) return DummyResult() class DummyAdmin: def __init__( self, *, ready: bool = True, user: dict[str, Any] | None = None, full: dict[str, Any] | None = None, group_id: str | None = "group-1", email_user: dict[str, Any] | None = None, ) -> None: self._ready = ready self.user = user self.full = full if full is not None else {"id": "user-1", "attributes": {}, "requiredActions": []} self.group_id = group_id self.email_user = email_user self.created: list[dict[str, Any]] = [] self.updated: list[tuple[str, dict[str, Any]]] = [] self.attributes: list[tuple[str, str, str]] = [] self.passwords: list[tuple[str, str, bool]] = [] self.groups: list[tuple[str, str]] = [] def ready(self) -> bool: return self._ready def find_user(self, username: str) -> dict[str, Any] | None: return self.user def find_user_by_email(self, email: str) -> dict[str, Any] | None: return self.email_user def create_user(self, payload: dict[str, Any]) -> str: self.created.append(payload) self.user = {"id": "user-1"} return "user-1" def get_user(self, user_id: str) -> dict[str, Any]: return self.full def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None: self.updated.append((user_id, payload)) def set_user_attribute(self, username: str, key: str, value: str) -> None: self.attributes.append((username, key, value)) def reset_password(self, user_id: str, password: str, *, temporary: bool) -> None: self.passwords.append((user_id, password, temporary)) def get_group_id(self, group_name: str) -> str | None: return self.group_id def add_user_to_group(self, user_id: str, group_id: str) -> None: self.groups.append((user_id, group_id)) class MailuClient: def __init__(self, *, timeout: int) -> None: self.timeout = timeout def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False def post(self, url: str, json: dict[str, Any] | None = None): return SimpleNamespace(status_code=200) class FailingMailuClient(MailuClient): def post(self, url: str, json: dict[str, Any] | None = None): return SimpleNamespace(status_code=503) class ExplodingMailuClient(MailuClient): def post(self, url: str, json: dict[str, Any] | None = None): raise RuntimeError("mailu offline") def request_row(**overrides: Any) -> dict[str, Any]: row = { "username": "alice", "contact_email": "alice@example.dev", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": None, "initial_password_revealed_at": None, "provision_attempted_at": None, } row.update(overrides) return row def install_common_patches(monkeypatch, conn: DummyConn, admin: DummyAdmin, *, all_ok: bool = True) -> None: @contextmanager def connect(): yield conn monkeypatch.setattr(provisioning, "connect", connect) monkeypatch.setattr(provisioning, "admin_client", lambda: admin) monkeypatch.setattr(provisioning.settings, "MAILU_DOMAIN", "bstein.dev") monkeypatch.setattr(provisioning.settings, "DEFAULT_USER_GROUPS", ["dev"]) monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "") monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "") monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "") monkeypatch.setattr(provisioning.settings, "ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC", 300) monkeypatch.setattr(provisioning, "random_password", lambda length=16: f"pw-{length}") monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(provisioning, "invite_user", lambda email: SimpleNamespace(ok=True, status="invited", detail="")) monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, tasks: all_ok) def task_statuses(conn: DummyConn) -> dict[str, str]: statuses: dict[str, str] = {} for query, params in conn.executed: if "INSERT INTO access_request_tasks" in query and isinstance(params, tuple) and len(params) >= 3: statuses[str(params[1])] = str(params[2]) return statuses def test_provision_preflight_lock_and_status_paths(monkeypatch) -> None: monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, task_list: task_list == ["x"]) assert provisioning.provision_tasks_complete(DummyConn(), "code") is False assert provisioning.provision_access_request("").status == "unknown" monkeypatch.setattr(provisioning, "admin_client", lambda: DummyAdmin(ready=False)) assert provisioning.provision_access_request("code").status == "accounts_building" conn = DummyConn(request_row(), locked=False) install_common_patches(monkeypatch, conn, DummyAdmin()) assert provisioning.provision_access_request("code").status == "accounts_building" conn = DummyConn(None) install_common_patches(monkeypatch, conn, DummyAdmin()) assert provisioning.provision_access_request("code").status == "unknown" conn = DummyConn(request_row(status="denied")) install_common_patches(monkeypatch, conn, DummyAdmin()) assert provisioning.provision_access_request("code").status == "denied" recent = datetime.now(timezone.utc) - timedelta(seconds=30) conn = DummyConn(request_row(provision_attempted_at=recent)) install_common_patches(monkeypatch, conn, DummyAdmin()) assert provisioning.provision_access_request("code").status == "accounts_building" naive_recent = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(seconds=30) conn = DummyConn(request_row(provision_attempted_at=naive_recent)) install_common_patches(monkeypatch, conn, DummyAdmin()) assert provisioning.provision_access_request("code").status == "accounts_building" def test_provision_happy_path_creates_user_and_downstream_accounts(monkeypatch) -> None: conn = DummyConn(request_row(status="approved")) admin = DummyAdmin(full={"id": "user-1", "attributes": {}, "requiredActions": ["CONFIGURE_TOTP"]}) install_common_patches(monkeypatch, conn, admin) result = provisioning.provision_access_request("code") statuses = task_statuses(conn) assert result.ok is True assert result.status == "awaiting_onboarding" assert admin.created[0]["username"] == "alice" assert admin.updated == [("user-1", {"requiredActions": []})] assert admin.passwords == [("user-1", "pw-20", False)] assert admin.groups == [("user-1", "group-1")] assert statuses["keycloak_user"] == "ok" assert statuses["keycloak_password"] == "ok" assert statuses["keycloak_groups"] == "ok" assert statuses["mailu_app_password"] == "ok" assert statuses["mailu_sync"] == "ok" assert statuses["nextcloud_mail_sync"] == "ok" assert statuses["wger_account"] == "ok" assert statuses["firefly_account"] == "ok" assert statuses["vaultwarden_invite"] == "ok" assert any("pg_advisory_unlock" in query for query, _ in conn.executed) def test_provision_uses_existing_user_attributes_and_enabled_syncs(monkeypatch) -> None: attrs = { provisioning.MAILU_EMAIL_ATTR: ["custom@example.dev"], provisioning.MAILU_ENABLED_ATTR: ["yes"], provisioning.MAILU_APP_PASSWORD_ATTR: ["mail-pw"], provisioning.WGER_PASSWORD_ATTR: ["wger-pw"], provisioning.WGER_PASSWORD_UPDATED_ATTR: ["done"], provisioning.FIREFLY_PASSWORD_ATTR: "firefly-pw", provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: "done", "vaultwarden_email": ["vault@example.dev"], } conn = DummyConn(request_row(initial_password="existing-pw")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin, all_ok=False) monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev") monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=MailuClient)) monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud") monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync") monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: {"status": "ok"}) result = provisioning.provision_access_request("code") statuses = task_statuses(conn) assert result.ok is False assert result.status == "accounts_building" assert admin.created == [] assert admin.passwords == [("user-1", "existing-pw", False)] assert statuses["mailu_sync"] == "ok" assert statuses["nextcloud_mail_sync"] == "ok" def test_provision_existing_user_attribute_variants(monkeypatch) -> None: attrs = { provisioning.MAILU_EMAIL_ATTR: "custom@example.dev", provisioning.MAILU_ENABLED_ATTR: "yes", provisioning.MAILU_APP_PASSWORD_ATTR: "mail-pw", provisioning.WGER_PASSWORD_ATTR: "wger-pw", provisioning.WGER_PASSWORD_UPDATED_ATTR: "done", provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"], provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: ["done"], "vaultwarden_email": "vault@example.dev", } conn = DummyConn(request_row(initial_password="existing-pw")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin) provisioning.provision_access_request("code") assert ("alice", "vaultwarden_email", "vault@example.dev") in admin.attributes def test_provision_keycloak_user_error_paths(monkeypatch) -> None: conn = DummyConn(request_row(contact_email="alice@example.dev")) admin = DummyAdmin(email_user={"username": "other"}) install_common_patches(monkeypatch, conn, admin, all_ok=False) provisioning.provision_access_request("code") assert task_statuses(conn)["keycloak_user"] == "error" conn = DummyConn(request_row(contact_email="alice@example.dev")) admin = DummyAdmin(user={"username": "alice"}) install_common_patches(monkeypatch, conn, admin, all_ok=False) provisioning.provision_access_request("code") assert task_statuses(conn)["keycloak_user"] == "error" def test_provision_attribute_and_sync_error_paths(monkeypatch) -> None: class SelectiveFailAdmin(DummyAdmin): def set_user_attribute(self, username: str, key: str, value: str) -> None: if key == provisioning.MAILU_ENABLED_ATTR: raise RuntimeError("mailu enabled write failed") super().set_user_attribute(username, key, value) conn = DummyConn(request_row(initial_password="existing-pw")) admin = SelectiveFailAdmin( user={"id": "user-1"}, full={"id": "user-1", "attributes": {provisioning.MAILU_ENABLED_ATTR: "no"}, "requiredActions": []}, ) install_common_patches(monkeypatch, conn, admin, all_ok=False) provisioning.provision_access_request("code") assert task_statuses(conn)["mailu_app_password"] == "ok" class GetUserFailsAdmin(DummyAdmin): def get_user(self, user_id: str) -> dict[str, Any]: raise RuntimeError("keycloak read failed") conn = DummyConn(request_row(initial_password="existing-pw")) admin = GetUserFailsAdmin(user={"id": "user-1"}) install_common_patches(monkeypatch, conn, admin, all_ok=False) provisioning.provision_access_request("code") statuses = task_statuses(conn) assert statuses["mailu_app_password"] == "error" assert statuses["wger_account"] == "error" for client_cls in (FailingMailuClient, ExplodingMailuClient): conn = DummyConn(request_row(initial_password="existing-pw")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin, all_ok=False) monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev") monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=client_cls)) provisioning.provision_access_request("code") assert task_statuses(conn)["mailu_sync"] == "error" def test_provision_nextcloud_and_password_edge_paths(monkeypatch) -> None: conn = DummyConn(request_row(initial_password=None, initial_password_revealed_at=datetime.now(timezone.utc))) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin, all_ok=False) provisioning.provision_access_request("code") assert task_statuses(conn)["keycloak_password"] == "ok" for sync_result in ({"status": "failed"}, RuntimeError("nextcloud failed")): conn = DummyConn(request_row(initial_password="existing-pw")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin, all_ok=False) monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud") monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync") if isinstance(sync_result, Exception): monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: (_ for _ in ()).throw(sync_result)) else: monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: sync_result) provisioning.provision_access_request("code") assert task_statuses(conn)["nextcloud_mail_sync"] == "error" def test_provision_records_task_errors_without_throwing(monkeypatch) -> None: conn = DummyConn(request_row(contact_email="")) admin = DummyAdmin(group_id=None) install_common_patches(monkeypatch, conn, admin, all_ok=False) monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) monkeypatch.setattr( provisioning, "invite_user", lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"), ) result = provisioning.provision_access_request("code") statuses = task_statuses(conn) assert result.ok is False assert statuses["keycloak_user"] == "error" assert "keycloak_password" not in statuses existing = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}, group_id=None) conn = DummyConn(request_row(initial_password=None)) install_common_patches(monkeypatch, conn, existing, all_ok=False) monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) monkeypatch.setattr( provisioning, "invite_user", lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"), ) provisioning.provision_access_request("code") statuses = task_statuses(conn) assert statuses["keycloak_groups"] == "error" assert statuses["wger_account"] == "error" assert statuses["firefly_account"] == "error" assert statuses["vaultwarden_invite"] == "error" conn = DummyConn(request_row(initial_password="existing-pw")) existing = DummyAdmin( user={"id": "user-1"}, full={ "id": "user-1", "attributes": { provisioning.WGER_PASSWORD_ATTR: "wger-pw", provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"], }, "requiredActions": [], }, ) install_common_patches(monkeypatch, conn, existing, all_ok=False) monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) provisioning.provision_access_request("code") statuses = task_statuses(conn) assert statuses["wger_account"] == "error" assert statuses["firefly_account"] == "error" def test_provision_falls_back_for_vaultwarden_invite(monkeypatch) -> None: conn = DummyConn(request_row(contact_email="fallback@example.dev")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin) invited: list[str] = [] def fake_invite(email: str): invited.append(email) if email == "alice@bstein.dev": return SimpleNamespace(ok=False, status="error", detail="primary failed") return SimpleNamespace(ok=True, status="fallback_invited", detail="") monkeypatch.setattr(provisioning, "invite_user", fake_invite) provisioning.provision_access_request("code") assert invited == ["alice@bstein.dev", "fallback@example.dev"] assert ("alice", "vaultwarden_email", "fallback@example.dev") in admin.attributes class VaultAttrFailAdmin(DummyAdmin): def set_user_attribute(self, username: str, key: str, value: str) -> None: if key.startswith("vaultwarden_"): raise RuntimeError("vault attr failed") super().set_user_attribute(username, key, value) conn = DummyConn(request_row(initial_password="existing-pw")) admin = VaultAttrFailAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin) provisioning.provision_access_request("code") assert task_statuses(conn)["vaultwarden_invite"] == "ok" conn = DummyConn(request_row(initial_password="existing-pw")) admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) install_common_patches(monkeypatch, conn, admin, all_ok=False) monkeypatch.setattr(provisioning, "invite_user", lambda email: (_ for _ in ()).throw(RuntimeError("vault down"))) provisioning.provision_access_request("code") assert task_statuses(conn)["vaultwarden_invite"] == "error"