test(bstein-home): cover provisioning orchestration
This commit is contained in:
parent
46f5ac9482
commit
d9dcc54bce
433
backend/tests/test_provisioning.py
Normal file
433
backend/tests/test_provisioning.py
Normal file
@ -0,0 +1,433 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from atlas_portal import provisioning
|
||||||
|
|
||||||
|
|
||||||
|
class DummyResult:
|
||||||
|
def __init__(self, row: dict[str, Any] | None = None) -> None:
|
||||||
|
self.row = row
|
||||||
|
|
||||||
|
def fetchone(self) -> dict[str, Any] | None:
|
||||||
|
return self.row
|
||||||
|
|
||||||
|
|
||||||
|
class DummyConn:
|
||||||
|
def __init__(self, row: dict[str, Any] | None = None, *, locked: bool = True) -> None:
|
||||||
|
self.row = row
|
||||||
|
self.locked = locked
|
||||||
|
self.executed: list[tuple[str, object | None]] = []
|
||||||
|
|
||||||
|
def execute(self, query: str, params: object | None = None) -> DummyResult:
|
||||||
|
self.executed.append((query, params))
|
||||||
|
if "pg_try_advisory_lock" in query:
|
||||||
|
return DummyResult({"locked": self.locked})
|
||||||
|
if "FROM access_requests" in query and "SELECT username" in query:
|
||||||
|
return DummyResult(self.row)
|
||||||
|
return DummyResult()
|
||||||
|
|
||||||
|
|
||||||
|
class DummyAdmin:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
ready: bool = True,
|
||||||
|
user: dict[str, Any] | None = None,
|
||||||
|
full: dict[str, Any] | None = None,
|
||||||
|
group_id: str | None = "group-1",
|
||||||
|
email_user: dict[str, Any] | None = None,
|
||||||
|
) -> None:
|
||||||
|
self._ready = ready
|
||||||
|
self.user = user
|
||||||
|
self.full = full if full is not None else {"id": "user-1", "attributes": {}, "requiredActions": []}
|
||||||
|
self.group_id = group_id
|
||||||
|
self.email_user = email_user
|
||||||
|
self.created: list[dict[str, Any]] = []
|
||||||
|
self.updated: list[tuple[str, dict[str, Any]]] = []
|
||||||
|
self.attributes: list[tuple[str, str, str]] = []
|
||||||
|
self.passwords: list[tuple[str, str, bool]] = []
|
||||||
|
self.groups: list[tuple[str, str]] = []
|
||||||
|
|
||||||
|
def ready(self) -> bool:
|
||||||
|
return self._ready
|
||||||
|
|
||||||
|
def find_user(self, username: str) -> dict[str, Any] | None:
|
||||||
|
return self.user
|
||||||
|
|
||||||
|
def find_user_by_email(self, email: str) -> dict[str, Any] | None:
|
||||||
|
return self.email_user
|
||||||
|
|
||||||
|
def create_user(self, payload: dict[str, Any]) -> str:
|
||||||
|
self.created.append(payload)
|
||||||
|
self.user = {"id": "user-1"}
|
||||||
|
return "user-1"
|
||||||
|
|
||||||
|
def get_user(self, user_id: str) -> dict[str, Any]:
|
||||||
|
return self.full
|
||||||
|
|
||||||
|
def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
|
||||||
|
self.updated.append((user_id, payload))
|
||||||
|
|
||||||
|
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
||||||
|
self.attributes.append((username, key, value))
|
||||||
|
|
||||||
|
def reset_password(self, user_id: str, password: str, *, temporary: bool) -> None:
|
||||||
|
self.passwords.append((user_id, password, temporary))
|
||||||
|
|
||||||
|
def get_group_id(self, group_name: str) -> str | None:
|
||||||
|
return self.group_id
|
||||||
|
|
||||||
|
def add_user_to_group(self, user_id: str, group_id: str) -> None:
|
||||||
|
self.groups.append((user_id, group_id))
|
||||||
|
|
||||||
|
|
||||||
|
class MailuClient:
|
||||||
|
def __init__(self, *, timeout: int) -> None:
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def post(self, url: str, json: dict[str, Any] | None = None):
|
||||||
|
return SimpleNamespace(status_code=200)
|
||||||
|
|
||||||
|
|
||||||
|
class FailingMailuClient(MailuClient):
|
||||||
|
def post(self, url: str, json: dict[str, Any] | None = None):
|
||||||
|
return SimpleNamespace(status_code=503)
|
||||||
|
|
||||||
|
|
||||||
|
class ExplodingMailuClient(MailuClient):
|
||||||
|
def post(self, url: str, json: dict[str, Any] | None = None):
|
||||||
|
raise RuntimeError("mailu offline")
|
||||||
|
|
||||||
|
|
||||||
|
def request_row(**overrides: Any) -> dict[str, Any]:
|
||||||
|
row = {
|
||||||
|
"username": "alice",
|
||||||
|
"contact_email": "alice@example.dev",
|
||||||
|
"email_verified_at": datetime.now(timezone.utc),
|
||||||
|
"status": "accounts_building",
|
||||||
|
"initial_password": None,
|
||||||
|
"initial_password_revealed_at": None,
|
||||||
|
"provision_attempted_at": None,
|
||||||
|
}
|
||||||
|
row.update(overrides)
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def install_common_patches(monkeypatch, conn: DummyConn, admin: DummyAdmin, *, all_ok: bool = True) -> None:
|
||||||
|
@contextmanager
|
||||||
|
def connect():
|
||||||
|
yield conn
|
||||||
|
|
||||||
|
monkeypatch.setattr(provisioning, "connect", connect)
|
||||||
|
monkeypatch.setattr(provisioning, "admin_client", lambda: admin)
|
||||||
|
monkeypatch.setattr(provisioning.settings, "MAILU_DOMAIN", "bstein.dev")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "DEFAULT_USER_GROUPS", ["dev"])
|
||||||
|
monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC", 300)
|
||||||
|
monkeypatch.setattr(provisioning, "random_password", lambda length=16: f"pw-{length}")
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "ok"})
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "ok"})
|
||||||
|
monkeypatch.setattr(provisioning, "invite_user", lambda email: SimpleNamespace(ok=True, status="invited", detail=""))
|
||||||
|
monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, tasks: all_ok)
|
||||||
|
|
||||||
|
|
||||||
|
def task_statuses(conn: DummyConn) -> dict[str, str]:
|
||||||
|
statuses: dict[str, str] = {}
|
||||||
|
for query, params in conn.executed:
|
||||||
|
if "INSERT INTO access_request_tasks" in query and isinstance(params, tuple) and len(params) >= 3:
|
||||||
|
statuses[str(params[1])] = str(params[2])
|
||||||
|
return statuses
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_preflight_lock_and_status_paths(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(provisioning, "all_tasks_ok", lambda conn, code, task_list: task_list == ["x"])
|
||||||
|
assert provisioning.provision_tasks_complete(DummyConn(), "code") is False
|
||||||
|
|
||||||
|
assert provisioning.provision_access_request("").status == "unknown"
|
||||||
|
|
||||||
|
monkeypatch.setattr(provisioning, "admin_client", lambda: DummyAdmin(ready=False))
|
||||||
|
assert provisioning.provision_access_request("code").status == "accounts_building"
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(), locked=False)
|
||||||
|
install_common_patches(monkeypatch, conn, DummyAdmin())
|
||||||
|
assert provisioning.provision_access_request("code").status == "accounts_building"
|
||||||
|
|
||||||
|
conn = DummyConn(None)
|
||||||
|
install_common_patches(monkeypatch, conn, DummyAdmin())
|
||||||
|
assert provisioning.provision_access_request("code").status == "unknown"
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(status="denied"))
|
||||||
|
install_common_patches(monkeypatch, conn, DummyAdmin())
|
||||||
|
assert provisioning.provision_access_request("code").status == "denied"
|
||||||
|
|
||||||
|
recent = datetime.now(timezone.utc) - timedelta(seconds=30)
|
||||||
|
conn = DummyConn(request_row(provision_attempted_at=recent))
|
||||||
|
install_common_patches(monkeypatch, conn, DummyAdmin())
|
||||||
|
assert provisioning.provision_access_request("code").status == "accounts_building"
|
||||||
|
|
||||||
|
naive_recent = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(seconds=30)
|
||||||
|
conn = DummyConn(request_row(provision_attempted_at=naive_recent))
|
||||||
|
install_common_patches(monkeypatch, conn, DummyAdmin())
|
||||||
|
assert provisioning.provision_access_request("code").status == "accounts_building"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_happy_path_creates_user_and_downstream_accounts(monkeypatch) -> None:
|
||||||
|
conn = DummyConn(request_row(status="approved"))
|
||||||
|
admin = DummyAdmin(full={"id": "user-1", "attributes": {}, "requiredActions": ["CONFIGURE_TOTP"]})
|
||||||
|
install_common_patches(monkeypatch, conn, admin)
|
||||||
|
|
||||||
|
result = provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
|
||||||
|
assert result.ok is True
|
||||||
|
assert result.status == "awaiting_onboarding"
|
||||||
|
assert admin.created[0]["username"] == "alice"
|
||||||
|
assert admin.updated == [("user-1", {"requiredActions": []})]
|
||||||
|
assert admin.passwords == [("user-1", "pw-20", False)]
|
||||||
|
assert admin.groups == [("user-1", "group-1")]
|
||||||
|
assert statuses["keycloak_user"] == "ok"
|
||||||
|
assert statuses["keycloak_password"] == "ok"
|
||||||
|
assert statuses["keycloak_groups"] == "ok"
|
||||||
|
assert statuses["mailu_app_password"] == "ok"
|
||||||
|
assert statuses["mailu_sync"] == "ok"
|
||||||
|
assert statuses["nextcloud_mail_sync"] == "ok"
|
||||||
|
assert statuses["wger_account"] == "ok"
|
||||||
|
assert statuses["firefly_account"] == "ok"
|
||||||
|
assert statuses["vaultwarden_invite"] == "ok"
|
||||||
|
assert any("pg_advisory_unlock" in query for query, _ in conn.executed)
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_uses_existing_user_attributes_and_enabled_syncs(monkeypatch) -> None:
|
||||||
|
attrs = {
|
||||||
|
provisioning.MAILU_EMAIL_ATTR: ["custom@example.dev"],
|
||||||
|
provisioning.MAILU_ENABLED_ATTR: ["yes"],
|
||||||
|
provisioning.MAILU_APP_PASSWORD_ATTR: ["mail-pw"],
|
||||||
|
provisioning.WGER_PASSWORD_ATTR: ["wger-pw"],
|
||||||
|
provisioning.WGER_PASSWORD_UPDATED_ATTR: ["done"],
|
||||||
|
provisioning.FIREFLY_PASSWORD_ATTR: "firefly-pw",
|
||||||
|
provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: "done",
|
||||||
|
"vaultwarden_email": ["vault@example.dev"],
|
||||||
|
}
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev")
|
||||||
|
monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=MailuClient))
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync")
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: {"status": "ok"})
|
||||||
|
|
||||||
|
result = provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
|
||||||
|
assert result.ok is False
|
||||||
|
assert result.status == "accounts_building"
|
||||||
|
assert admin.created == []
|
||||||
|
assert admin.passwords == [("user-1", "existing-pw", False)]
|
||||||
|
assert statuses["mailu_sync"] == "ok"
|
||||||
|
assert statuses["nextcloud_mail_sync"] == "ok"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_existing_user_attribute_variants(monkeypatch) -> None:
|
||||||
|
attrs = {
|
||||||
|
provisioning.MAILU_EMAIL_ATTR: "custom@example.dev",
|
||||||
|
provisioning.MAILU_ENABLED_ATTR: "yes",
|
||||||
|
provisioning.MAILU_APP_PASSWORD_ATTR: "mail-pw",
|
||||||
|
provisioning.WGER_PASSWORD_ATTR: "wger-pw",
|
||||||
|
provisioning.WGER_PASSWORD_UPDATED_ATTR: "done",
|
||||||
|
provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"],
|
||||||
|
provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: ["done"],
|
||||||
|
"vaultwarden_email": "vault@example.dev",
|
||||||
|
}
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": attrs, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
|
||||||
|
assert ("alice", "vaultwarden_email", "vault@example.dev") in admin.attributes
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_keycloak_user_error_paths(monkeypatch) -> None:
|
||||||
|
conn = DummyConn(request_row(contact_email="alice@example.dev"))
|
||||||
|
admin = DummyAdmin(email_user={"username": "other"})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["keycloak_user"] == "error"
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(contact_email="alice@example.dev"))
|
||||||
|
admin = DummyAdmin(user={"username": "alice"})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["keycloak_user"] == "error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_attribute_and_sync_error_paths(monkeypatch) -> None:
|
||||||
|
class SelectiveFailAdmin(DummyAdmin):
|
||||||
|
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
||||||
|
if key == provisioning.MAILU_ENABLED_ATTR:
|
||||||
|
raise RuntimeError("mailu enabled write failed")
|
||||||
|
super().set_user_attribute(username, key, value)
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = SelectiveFailAdmin(
|
||||||
|
user={"id": "user-1"},
|
||||||
|
full={"id": "user-1", "attributes": {provisioning.MAILU_ENABLED_ATTR: "no"}, "requiredActions": []},
|
||||||
|
)
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["mailu_app_password"] == "ok"
|
||||||
|
|
||||||
|
class GetUserFailsAdmin(DummyAdmin):
|
||||||
|
def get_user(self, user_id: str) -> dict[str, Any]:
|
||||||
|
raise RuntimeError("keycloak read failed")
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = GetUserFailsAdmin(user={"id": "user-1"})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
assert statuses["mailu_app_password"] == "error"
|
||||||
|
assert statuses["wger_account"] == "error"
|
||||||
|
|
||||||
|
for client_cls in (FailingMailuClient, ExplodingMailuClient):
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev")
|
||||||
|
monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=client_cls))
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["mailu_sync"] == "error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_nextcloud_and_password_edge_paths(monkeypatch) -> None:
|
||||||
|
conn = DummyConn(request_row(initial_password=None, initial_password_revealed_at=datetime.now(timezone.utc)))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["keycloak_password"] == "ok"
|
||||||
|
|
||||||
|
for sync_result in ({"status": "failed"}, RuntimeError("nextcloud failed")):
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud")
|
||||||
|
monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync")
|
||||||
|
if isinstance(sync_result, Exception):
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: (_ for _ in ()).throw(sync_result))
|
||||||
|
else:
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: sync_result)
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["nextcloud_mail_sync"] == "error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_records_task_errors_without_throwing(monkeypatch) -> None:
|
||||||
|
conn = DummyConn(request_row(contact_email=""))
|
||||||
|
admin = DummyAdmin(group_id=None)
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
monkeypatch.setattr(
|
||||||
|
provisioning,
|
||||||
|
"invite_user",
|
||||||
|
lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
|
||||||
|
assert result.ok is False
|
||||||
|
assert statuses["keycloak_user"] == "error"
|
||||||
|
assert "keycloak_password" not in statuses
|
||||||
|
|
||||||
|
existing = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []}, group_id=None)
|
||||||
|
conn = DummyConn(request_row(initial_password=None))
|
||||||
|
install_common_patches(monkeypatch, conn, existing, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
monkeypatch.setattr(
|
||||||
|
provisioning,
|
||||||
|
"invite_user",
|
||||||
|
lambda email: SimpleNamespace(ok=False, status="error", detail="invite failed"),
|
||||||
|
)
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
assert statuses["keycloak_groups"] == "error"
|
||||||
|
assert statuses["wger_account"] == "error"
|
||||||
|
assert statuses["firefly_account"] == "error"
|
||||||
|
assert statuses["vaultwarden_invite"] == "error"
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
existing = DummyAdmin(
|
||||||
|
user={"id": "user-1"},
|
||||||
|
full={
|
||||||
|
"id": "user-1",
|
||||||
|
"attributes": {
|
||||||
|
provisioning.WGER_PASSWORD_ATTR: "wger-pw",
|
||||||
|
provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"],
|
||||||
|
},
|
||||||
|
"requiredActions": [],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
install_common_patches(monkeypatch, conn, existing, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
statuses = task_statuses(conn)
|
||||||
|
assert statuses["wger_account"] == "error"
|
||||||
|
assert statuses["firefly_account"] == "error"
|
||||||
|
|
||||||
|
|
||||||
|
def test_provision_falls_back_for_vaultwarden_invite(monkeypatch) -> None:
|
||||||
|
conn = DummyConn(request_row(contact_email="fallback@example.dev"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin)
|
||||||
|
invited: list[str] = []
|
||||||
|
|
||||||
|
def fake_invite(email: str):
|
||||||
|
invited.append(email)
|
||||||
|
if email == "alice@bstein.dev":
|
||||||
|
return SimpleNamespace(ok=False, status="error", detail="primary failed")
|
||||||
|
return SimpleNamespace(ok=True, status="fallback_invited", detail="")
|
||||||
|
|
||||||
|
monkeypatch.setattr(provisioning, "invite_user", fake_invite)
|
||||||
|
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
|
||||||
|
assert invited == ["alice@bstein.dev", "fallback@example.dev"]
|
||||||
|
assert ("alice", "vaultwarden_email", "fallback@example.dev") in admin.attributes
|
||||||
|
|
||||||
|
class VaultAttrFailAdmin(DummyAdmin):
|
||||||
|
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
||||||
|
if key.startswith("vaultwarden_"):
|
||||||
|
raise RuntimeError("vault attr failed")
|
||||||
|
super().set_user_attribute(username, key, value)
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = VaultAttrFailAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin)
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["vaultwarden_invite"] == "ok"
|
||||||
|
|
||||||
|
conn = DummyConn(request_row(initial_password="existing-pw"))
|
||||||
|
admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
|
||||||
|
install_common_patches(monkeypatch, conn, admin, all_ok=False)
|
||||||
|
monkeypatch.setattr(provisioning, "invite_user", lambda email: (_ for _ in ()).throw(RuntimeError("vault down")))
|
||||||
|
provisioning.provision_access_request("code")
|
||||||
|
assert task_statuses(conn)["vaultwarden_invite"] == "error"
|
||||||
Loading…
x
Reference in New Issue
Block a user