diff --git a/backend/tests/test_provisioning.py b/backend/tests/test_provisioning.py new file mode 100644 index 0000000..78609ae --- /dev/null +++ b/backend/tests/test_provisioning.py @@ -0,0 +1,433 @@ +from __future__ import annotations + +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace +from typing import Any + +from atlas_portal import provisioning + + +class DummyResult: + def __init__(self, row: dict[str, Any] | None = None) -> None: + self.row = row + + def fetchone(self) -> dict[str, Any] | None: + return self.row + + +class DummyConn: + def __init__(self, row: dict[str, Any] | None = None, *, locked: bool = True) -> None: + self.row = row + self.locked = locked + self.executed: list[tuple[str, object | None]] = [] + + def execute(self, query: str, params: object | None = None) -> DummyResult: + self.executed.append((query, params)) + if "pg_try_advisory_lock" in query: + return DummyResult({"locked": self.locked}) + if "FROM access_requests" in query and "SELECT username" in query: + return DummyResult(self.row) + return DummyResult() + + +class DummyAdmin: + def __init__( + self, + *, + ready: bool = True, + user: dict[str, Any] | None = None, + full: dict[str, Any] | None = None, + group_id: str | None = "group-1", + email_user: dict[str, Any] | None = None, + ) -> None: + self._ready = ready + self.user = user + self.full = full if full is not None else {"id": "user-1", "attributes": {}, "requiredActions": []} + self.group_id = group_id + self.email_user = email_user + self.created: list[dict[str, Any]] = [] + self.updated: list[tuple[str, dict[str, Any]]] = [] + self.attributes: list[tuple[str, str, str]] = [] + self.passwords: list[tuple[str, str, bool]] = [] + self.groups: list[tuple[str, str]] = [] + + def ready(self) -> bool: + return self._ready + + def find_user(self, username: str) -> dict[str, Any] | None: + return self.user + + def find_user_by_email(self, email: str) -> dict[str, Any] | None: + return self.email_user + + def create_user(self, payload: dict[str, Any]) -> str: + self.created.append(payload) + self.user = {"id": "user-1"} + return "user-1" + + def get_user(self, user_id: str) -> dict[str, Any]: + return self.full + + def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None: + self.updated.append((user_id, payload)) + + def set_user_attribute(self, username: str, key: str, value: str) -> None: + self.attributes.append((username, key, value)) + + def reset_password(self, user_id: str, password: str, *, temporary: bool) -> None: + self.passwords.append((user_id, password, temporary)) + + def get_group_id(self, group_name: str) -> str | None: + return self.group_id + + def add_user_to_group(self, user_id: str, group_id: str) -> None: + self.groups.append((user_id, group_id)) + + +class MailuClient: + def __init__(self, *, timeout: int) -> None: + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def post(self, url: str, json: dict[str, Any] | None = None): + return SimpleNamespace(status_code=200) + + +class FailingMailuClient(MailuClient): + def post(self, url: str, json: dict[str, Any] | None = None): + return SimpleNamespace(status_code=503) + + +class ExplodingMailuClient(MailuClient): + def post(self, url: str, json: dict[str, Any] | None = None): + raise RuntimeError("mailu offline") + + +def request_row(**overrides: Any) -> dict[str, Any]: + row = { + "username": "alice", + "contact_email": "alice@example.dev", + "email_verified_at": datetime.now(timezone.utc), + "status": "accounts_building", + "initial_password": None, + "initial_password_revealed_at": None, + "provision_attempted_at": None, + } + row.update(overrides) + return row + + +def install_common_patches(monkeypatch, conn: DummyConn, admin: DummyAdmin, *, all_ok: bool = True) -> None: + @contextmanager + def connect(): + yield conn + + monkeypatch.setattr(provisioning, "connect", connect) + monkeypatch.setattr(provisioning, "admin_client", lambda: admin) + monkeypatch.setattr(provisioning.settings, "MAILU_DOMAIN", "bstein.dev") + monkeypatch.setattr(provisioning.settings, "DEFAULT_USER_GROUPS", ["dev"]) + monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "") + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "") + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "") + monkeypatch.setattr(provisioning.settings, "ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC", 300) + monkeypatch.setattr(provisioning, "random_password", lambda length=16: f"pw-{length}") + monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "ok"}) + monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "ok"}) + monkeypatch.setattr(provisioning, "invite_user", lambda email: SimpleNamespace(ok=True, status="invited", detail="")) + monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, tasks: all_ok) + + +def task_statuses(conn: DummyConn) -> dict[str, str]: + statuses: dict[str, str] = {} + for query, params in conn.executed: + if "INSERT INTO access_request_tasks" in query and isinstance(params, tuple) and len(params) >= 3: + statuses[str(params[1])] = str(params[2]) + return statuses + + +def test_provision_preflight_lock_and_status_paths(monkeypatch) -> None: + monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, task_list: task_list == ["x"]) + assert provisioning.provision_tasks_complete(DummyConn(), "code") is False + + assert provisioning.provision_access_request("").status == "unknown" + + monkeypatch.setattr(provisioning, "admin_client", lambda: DummyAdmin(ready=False)) + assert provisioning.provision_access_request("code").status == "accounts_building" + + conn = DummyConn(request_row(), locked=False) + install_common_patches(monkeypatch, conn, DummyAdmin()) + assert provisioning.provision_access_request("code").status == "accounts_building" + + conn = DummyConn(None) + install_common_patches(monkeypatch, conn, DummyAdmin()) + assert provisioning.provision_access_request("code").status == "unknown" + + conn = DummyConn(request_row(status="denied")) + install_common_patches(monkeypatch, conn, DummyAdmin()) + assert provisioning.provision_access_request("code").status == "denied" + + recent = datetime.now(timezone.utc) - timedelta(seconds=30) + conn = DummyConn(request_row(provision_attempted_at=recent)) + install_common_patches(monkeypatch, conn, DummyAdmin()) + assert provisioning.provision_access_request("code").status == "accounts_building" + + naive_recent = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(seconds=30) + conn = DummyConn(request_row(provision_attempted_at=naive_recent)) + install_common_patches(monkeypatch, conn, DummyAdmin()) + assert provisioning.provision_access_request("code").status == "accounts_building" + + +def test_provision_happy_path_creates_user_and_downstream_accounts(monkeypatch) -> None: + conn = DummyConn(request_row(status="approved")) + admin = DummyAdmin(full={"id": "user-1", "attributes": {}, "requiredActions": ["CONFIGURE_TOTP"]}) + install_common_patches(monkeypatch, conn, admin) + + result = provisioning.provision_access_request("code") + statuses = task_statuses(conn) + + assert result.ok is True + assert result.status == "awaiting_onboarding" + assert admin.created[0]["username"] == "alice" + assert admin.updated == [("user-1", {"requiredActions": []})] + assert admin.passwords == [("user-1", "pw-20", False)] + assert admin.groups == [("user-1", "group-1")] + assert statuses["keycloak_user"] == "ok" + assert statuses["keycloak_password"] == "ok" + assert statuses["keycloak_groups"] == "ok" + assert statuses["mailu_app_password"] == "ok" + assert statuses["mailu_sync"] == "ok" + assert statuses["nextcloud_mail_sync"] == "ok" + assert statuses["wger_account"] == "ok" + assert statuses["firefly_account"] == "ok" + assert statuses["vaultwarden_invite"] == "ok" + assert any("pg_advisory_unlock" in query for query, _ in conn.executed) + + +def test_provision_uses_existing_user_attributes_and_enabled_syncs(monkeypatch) -> None: + attrs = { + provisioning.MAILU_EMAIL_ATTR: ["custom@example.dev"], + provisioning.MAILU_ENABLED_ATTR: ["yes"], + provisioning.MAILU_APP_PASSWORD_ATTR: ["mail-pw"], + provisioning.WGER_PASSWORD_ATTR: ["wger-pw"], + provisioning.WGER_PASSWORD_UPDATED_ATTR: ["done"], + provisioning.FIREFLY_PASSWORD_ATTR: "firefly-pw", + provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: "done", + "vaultwarden_email": ["vault@example.dev"], + } + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev") + monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=MailuClient)) + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud") + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync") + monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: {"status": "ok"}) + + result = provisioning.provision_access_request("code") + statuses = task_statuses(conn) + + assert result.ok is False + assert result.status == "accounts_building" + assert admin.created == [] + assert admin.passwords == [("user-1", "existing-pw", False)] + assert statuses["mailu_sync"] == "ok" + assert statuses["nextcloud_mail_sync"] == "ok" + + +def test_provision_existing_user_attribute_variants(monkeypatch) -> None: + attrs = { + provisioning.MAILU_EMAIL_ATTR: "custom@example.dev", + provisioning.MAILU_ENABLED_ATTR: "yes", + provisioning.MAILU_APP_PASSWORD_ATTR: "mail-pw", + provisioning.WGER_PASSWORD_ATTR: "wger-pw", + provisioning.WGER_PASSWORD_UPDATED_ATTR: "done", + provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"], + provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: ["done"], + "vaultwarden_email": "vault@example.dev", + } + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin) + + provisioning.provision_access_request("code") + + assert ("alice", "vaultwarden_email", "vault@example.dev") in admin.attributes + + +def test_provision_keycloak_user_error_paths(monkeypatch) -> None: + conn = DummyConn(request_row(contact_email="alice@example.dev")) + admin = DummyAdmin(email_user={"username": "other"}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + + provisioning.provision_access_request("code") + assert task_statuses(conn)["keycloak_user"] == "error" + + conn = DummyConn(request_row(contact_email="alice@example.dev")) + admin = DummyAdmin(user={"username": "alice"}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + + provisioning.provision_access_request("code") + assert task_statuses(conn)["keycloak_user"] == "error" + + +def test_provision_attribute_and_sync_error_paths(monkeypatch) -> None: + class SelectiveFailAdmin(DummyAdmin): + def set_user_attribute(self, username: str, key: str, value: str) -> None: + if key == provisioning.MAILU_ENABLED_ATTR: + raise RuntimeError("mailu enabled write failed") + super().set_user_attribute(username, key, value) + + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = SelectiveFailAdmin( + user={"id": "user-1"}, + full={"id": "user-1", "attributes": {provisioning.MAILU_ENABLED_ATTR: "no"}, "requiredActions": []}, + ) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + + provisioning.provision_access_request("code") + assert task_statuses(conn)["mailu_app_password"] == "ok" + + class GetUserFailsAdmin(DummyAdmin): + def get_user(self, user_id: str) -> dict[str, Any]: + raise RuntimeError("keycloak read failed") + + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = GetUserFailsAdmin(user={"id": "user-1"}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + provisioning.provision_access_request("code") + statuses = task_statuses(conn) + assert statuses["mailu_app_password"] == "error" + assert statuses["wger_account"] == "error" + + for client_cls in (FailingMailuClient, ExplodingMailuClient): + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev") + monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=client_cls)) + provisioning.provision_access_request("code") + assert task_statuses(conn)["mailu_sync"] == "error" + + +def test_provision_nextcloud_and_password_edge_paths(monkeypatch) -> None: + conn = DummyConn(request_row(initial_password=None, initial_password_revealed_at=datetime.now(timezone.utc))) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + + provisioning.provision_access_request("code") + assert task_statuses(conn)["keycloak_password"] == "ok" + + for sync_result in ({"status": "failed"}, RuntimeError("nextcloud failed")): + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud") + monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync") + if isinstance(sync_result, Exception): + monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: (_ for _ in ()).throw(sync_result)) + else: + monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: sync_result) + provisioning.provision_access_request("code") + assert task_statuses(conn)["nextcloud_mail_sync"] == "error" + + +def test_provision_records_task_errors_without_throwing(monkeypatch) -> None: + conn = DummyConn(request_row(contact_email="")) + admin = DummyAdmin(group_id=None) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) + monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) + monkeypatch.setattr( + provisioning, + "invite_user", + lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"), + ) + + result = provisioning.provision_access_request("code") + statuses = task_statuses(conn) + + assert result.ok is False + assert statuses["keycloak_user"] == "error" + assert "keycloak_password" not in statuses + + existing = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}, group_id=None) + conn = DummyConn(request_row(initial_password=None)) + install_common_patches(monkeypatch, conn, existing, all_ok=False) + monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) + monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) + monkeypatch.setattr( + provisioning, + "invite_user", + lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"), + ) + provisioning.provision_access_request("code") + statuses = task_statuses(conn) + assert statuses["keycloak_groups"] == "error" + assert statuses["wger_account"] == "error" + assert statuses["firefly_account"] == "error" + assert statuses["vaultwarden_invite"] == "error" + + conn = DummyConn(request_row(initial_password="existing-pw")) + existing = DummyAdmin( + user={"id": "user-1"}, + full={ + "id": "user-1", + "attributes": { + provisioning.WGER_PASSWORD_ATTR: "wger-pw", + provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"], + }, + "requiredActions": [], + }, + ) + install_common_patches(monkeypatch, conn, existing, all_ok=False) + monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"}) + monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"}) + provisioning.provision_access_request("code") + statuses = task_statuses(conn) + assert statuses["wger_account"] == "error" + assert statuses["firefly_account"] == "error" + + +def test_provision_falls_back_for_vaultwarden_invite(monkeypatch) -> None: + conn = DummyConn(request_row(contact_email="fallback@example.dev")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin) + invited: list[str] = [] + + def fake_invite(email: str): + invited.append(email) + if email == "alice@bstein.dev": + return SimpleNamespace(ok=False, status="error", detail="primary failed") + return SimpleNamespace(ok=True, status="fallback_invited", detail="") + + monkeypatch.setattr(provisioning, "invite_user", fake_invite) + + provisioning.provision_access_request("code") + + assert invited == ["alice@bstein.dev", "fallback@example.dev"] + assert ("alice", "vaultwarden_email", "fallback@example.dev") in admin.attributes + + class VaultAttrFailAdmin(DummyAdmin): + def set_user_attribute(self, username: str, key: str, value: str) -> None: + if key.startswith("vaultwarden_"): + raise RuntimeError("vault attr failed") + super().set_user_attribute(username, key, value) + + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = VaultAttrFailAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin) + provisioning.provision_access_request("code") + assert task_statuses(conn)["vaultwarden_invite"] == "ok" + + conn = DummyConn(request_row(initial_password="existing-pw")) + admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}) + install_common_patches(monkeypatch, conn, admin, all_ok=False) + monkeypatch.setattr(provisioning, "invite_user", lambda email: (_ for _ in ()).throw(RuntimeError("vault down"))) + provisioning.provision_access_request("code") + assert task_statuses(conn)["vaultwarden_invite"] == "error"