# bstein-dev-home/backend/tests/test_provisioning.py

from __future__ import annotations
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Any
from atlas_portal import provisioning
class DummyResult:
    """Minimal query-result double that hands back one pre-set row."""

    def __init__(self, row: dict[str, Any] | None = None) -> None:
        # Stored as-is; ``None`` stands for "no row".
        self.row = row

    def fetchone(self) -> dict[str, Any] | None:
        """Return the row given at construction time (or ``None``)."""
        stored_row = self.row
        return stored_row
class DummyConn:
    """Fake database connection that logs statements and answers with canned rows."""

    def __init__(self, row: dict[str, Any] | None = None, *, locked: bool = True) -> None:
        # Row served for the access-request SELECT; ``None`` means no row.
        self.row = row
        # Value reported under the "locked" key for the advisory-lock query.
        self.locked = locked
        # Chronological log of every (query, params) pair passed to execute().
        self.executed: list[tuple[str, object | None]] = []

    def execute(self, query: str, params: object | None = None) -> DummyResult:
        """Record the statement and return a ``DummyResult`` chosen by its SQL text.

        * SQL containing ``pg_try_advisory_lock`` -> ``{"locked": self.locked}``
        * SQL containing both ``FROM access_requests`` and ``SELECT username``
          -> the row given at construction
        * anything else -> a result with no row
        """
        self.executed.append((query, params))

        if "pg_try_advisory_lock" in query:
            payload = {"locked": self.locked}
        elif "FROM access_requests" in query and "SELECT username" in query:
            payload = self.row
        else:
            payload = None
        return DummyResult(payload)
class DummyAdmin:
    """In-memory double for the admin client returned by ``provisioning.admin_client``.

    Lookup methods return values fixed at construction time and ignore their
    arguments; mutating methods only append their arguments to a list so the
    tests can assert on what was requested.
    """

    def __init__(
        self,
        *,
        ready: bool = True,
        user: dict[str, Any] | None = None,
        full: dict[str, Any] | None = None,
        group_id: str | None = "group-1",
        email_user: dict[str, Any] | None = None,
    ) -> None:
        if full is None:
            # Default full record: a user with no attributes and no required actions.
            full = {"id": "user-1", "attributes": {}, "requiredActions": []}

        # Canned answers for the lookup methods.
        self._ready = ready
        self.user = user
        self.full = full
        self.group_id = group_id
        self.email_user = email_user

        # Call logs, one list per mutating method.
        self.created: list[dict[str, Any]] = []
        self.updated: list[tuple[str, dict[str, Any]]] = []
        self.attributes: list[tuple[str, str, str]] = []
        self.passwords: list[tuple[str, str, bool]] = []
        self.groups: list[tuple[str, str]] = []

    # -- lookups -----------------------------------------------------------

    def ready(self) -> bool:
        """Return the readiness flag given at construction."""
        return self._ready

    def find_user(self, username: str) -> dict[str, Any] | None:
        """Return the configured user regardless of ``username``."""
        return self.user

    def find_user_by_email(self, email: str) -> dict[str, Any] | None:
        """Return the configured e-mail match regardless of ``email``."""
        return self.email_user

    def get_user(self, user_id: str) -> dict[str, Any]:
        """Return the configured full user record regardless of ``user_id``."""
        return self.full

    def get_group_id(self, group_name: str) -> str | None:
        """Return the configured group id regardless of ``group_name``."""
        return self.group_id

    # -- recorded mutations --------------------------------------------------

    def create_user(self, payload: dict[str, Any]) -> str:
        """Log the payload, make ``find_user`` return ``{"id": "user-1"}`` and return that id."""
        new_id = "user-1"
        self.created.append(payload)
        self.user = {"id": new_id}
        return new_id

    def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
        """Log an ``(user_id, payload)`` update request."""
        self.updated.append((user_id, payload))

    def set_user_attribute(self, username: str, key: str, value: str) -> None:
        """Log a ``(username, key, value)`` attribute write."""
        self.attributes.append((username, key, value))

    def reset_password(self, user_id: str, password: str, *, temporary: bool) -> None:
        """Log a ``(user_id, password, temporary)`` password reset."""
        self.passwords.append((user_id, password, temporary))

    def add_user_to_group(self, user_id: str, group_id: str) -> None:
        """Log a ``(user_id, group_id)`` group membership request."""
        self.groups.append((user_id, group_id))
class MailuClient:
    """Fake HTTP client whose ``post`` always answers with status 200.

    The tests install it as ``provisioning.httpx.Client``; it supports the
    context-manager protocol and a ``post`` call.
    """

    def __init__(self, *, timeout: int) -> None:
        # Kept only so the constructor accepts the keyword; nothing reads it here.
        self.timeout = timeout

    def __enter__(self) -> MailuClient:
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # False: never suppress an exception raised inside the ``with`` body.
        return False

    def post(self, url: str, json: dict[str, Any] | None = None) -> SimpleNamespace:
        """Ignore the request and return an object with ``status_code == 200``."""
        response = SimpleNamespace(status_code=200)
        return response
class FailingMailuClient(MailuClient):
    """``MailuClient`` variant whose ``post`` always answers with status 503."""

    def post(self, url: str, json: dict[str, Any] | None = None):
        """Ignore the request and return an object with ``status_code == 503``."""
        response = SimpleNamespace(status_code=503)
        return response
class ExplodingMailuClient(MailuClient):
    """``MailuClient`` variant whose ``post`` always raises ``RuntimeError``."""

    def post(self, url: str, json: dict[str, Any] | None = None):
        """Ignore the request and raise ``RuntimeError("mailu offline")``."""
        failure = RuntimeError("mailu offline")
        raise failure
def request_row(**overrides: Any) -> dict[str, Any]:
    """Build an access-request row dict for ``DummyConn``.

    The defaults describe user ``alice`` with status ``accounts_building``, an
    ``email_verified_at`` of "now" (UTC, timezone-aware) and no password or
    provisioning-attempt data. Keyword arguments replace defaults with the same
    key; unknown keys are appended.
    """
    defaults: dict[str, Any] = {
        "username": "alice",
        "contact_email": "alice@example.dev",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": None,
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
    }
    return {**defaults, **overrides}
def install_common_patches(monkeypatch, conn: DummyConn, admin: DummyAdmin, *, all_ok: bool = True) -> None:
    """Point ``provisioning`` at in-memory fakes for one test scenario.

    Args:
        monkeypatch: Object providing ``setattr(target, name, value)``
            (presumably pytest's ``monkeypatch`` fixture).
        conn: Connection yielded by the patched ``provisioning.connect``.
        admin: Object returned by the patched ``provisioning.admin_client``.
        all_ok: Value the patched ``provisioning.all_tasks_ok`` returns,
            whatever its arguments.
    """

    @contextmanager
    def fake_connect():
        yield conn

    def fake_password(length=16):
        # Deterministic "password" that exposes the requested length.
        return f"pw-{length}"

    def sync_ok(*args, **kwargs):
        return {"status": "ok"}

    def fake_invite(email):
        return SimpleNamespace(ok=True, status="invited", detail="")

    def fake_all_tasks_ok(conn, code, tasks):
        return all_ok

    monkeypatch.setattr(provisioning, "connect", fake_connect)
    monkeypatch.setattr(provisioning, "admin_client", lambda: admin)

    # The sync URL and Nextcloud settings start out empty; tests that need
    # them set their own values after calling this helper.
    settings_overrides = {
        "MAILU_DOMAIN": "bstein.dev",
        "DEFAULT_USER_GROUPS": ["dev"],
        "MAILU_SYNC_URL": "",
        "NEXTCLOUD_NAMESPACE": "",
        "NEXTCLOUD_MAIL_SYNC_CRONJOB": "",
        "ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC": 300,
    }
    for setting_name, setting_value in settings_overrides.items():
        monkeypatch.setattr(provisioning.settings, setting_name, setting_value)

    module_overrides = {
        "random_password": fake_password,
        "trigger_wger_user_sync": sync_ok,
        "trigger_firefly_user_sync": sync_ok,
        "invite_user": fake_invite,
        "all_tasks_ok": fake_all_tasks_ok,
    }
    for attr_name, replacement in module_overrides.items():
        monkeypatch.setattr(provisioning, attr_name, replacement)
def task_statuses(conn: DummyConn) -> dict[str, str]:
    """Summarise the task rows written through ``conn`` as a ``{key: value}`` dict.

    Only logged statements whose SQL contains ``INSERT INTO access_request_tasks``
    and whose params are a tuple of at least three items are considered.
    ``params[1]`` becomes the key and ``params[2]`` the value, both converted
    with ``str``; when a key occurs more than once the last value wins.
    (Presumably these are the task name and its status - confirm against the
    INSERT statement in ``provisioning``.)
    """
    return {
        str(params[1]): str(params[2])
        for query, params in conn.executed
        if "INSERT INTO access_request_tasks" in query
        and isinstance(params, tuple)
        and len(params) >= 3
    }
def test_provision_preflight_lock_and_status_paths(monkeypatch) -> None:
    """Check the status reported for inputs that should stop provisioning early.

    Covers ``provision_tasks_complete`` with a selective ``all_tasks_ok`` stub,
    an empty request code, an admin client that is not ready, and five
    connection/row set-ups (lock not obtained, no row, denied row, and a recent
    ``provision_attempted_at`` both timezone-aware and naive).
    """

    def approve_only_x(conn, code, task_list):
        return task_list == ["x"]

    monkeypatch.setattr(provisioning, "all_tasks_ok", approve_only_x)
    assert provisioning.provision_tasks_complete(DummyConn(), "code") is False

    # Empty request code.
    blank_outcome = provisioning.provision_access_request("")
    assert blank_outcome.status == "unknown"

    # Admin client reports that it is not ready.
    monkeypatch.setattr(provisioning, "admin_client", lambda: DummyAdmin(ready=False))
    not_ready_outcome = provisioning.provision_access_request("code")
    assert not_ready_outcome.status == "accounts_building"

    # Attempt timestamps 30 seconds old, against the 300-second cooldown that
    # install_common_patches configures.
    half_minute = timedelta(seconds=30)
    recent = datetime.now(timezone.utc) - half_minute
    naive_recent = datetime.now(timezone.utc).replace(tzinfo=None) - half_minute

    scenarios = [
        ("lock not obtained", DummyConn(request_row(), locked=False), "accounts_building"),
        ("no request row", DummyConn(None), "unknown"),
        ("denied request", DummyConn(request_row(status="denied")), "denied"),
        ("recent aware attempt", DummyConn(request_row(provision_attempted_at=recent)), "accounts_building"),
        ("recent naive attempt", DummyConn(request_row(provision_attempted_at=naive_recent)), "accounts_building"),
    ]
    for label, conn, expected_status in scenarios:
        install_common_patches(monkeypatch, conn, DummyAdmin())
        outcome = provisioning.provision_access_request("code")
        assert outcome.status == expected_status, label
def test_provision_happy_path_creates_user_and_downstream_accounts(monkeypatch) -> None:
    """Provision an approved request for a user the admin double does not know yet.

    Expects an ``awaiting_onboarding`` result, the admin calls recorded by
    ``DummyAdmin``, an ``ok`` status for every listed task, and an advisory
    unlock statement in the connection log.
    """
    full_record = {"id": "user-1", "attributes": {}, "requiredActions": ["CONFIGURE_TOTP"]}
    admin = DummyAdmin(full=full_record)
    conn = DummyConn(request_row(status="approved"))
    install_common_patches(monkeypatch, conn, admin)

    result = provisioning.provision_access_request("code")

    assert result.ok is True
    assert result.status == "awaiting_onboarding"

    # Calls recorded by the admin double.
    assert admin.created[0]["username"] == "alice"
    assert admin.updated == [("user-1", {"requiredActions": []})]
    assert admin.passwords == [("user-1", "pw-20", False)]
    assert admin.groups == [("user-1", "group-1")]

    # Every task must have been written with status "ok".
    statuses = task_statuses(conn)
    expected_ok_tasks = (
        "keycloak_user",
        "keycloak_password",
        "keycloak_groups",
        "mailu_app_password",
        "mailu_sync",
        "nextcloud_mail_sync",
        "wger_account",
        "firefly_account",
        "vaultwarden_invite",
    )
    for task_name in expected_ok_tasks:
        assert statuses[task_name] == "ok"

    executed_sql = [query for query, _ in conn.executed]
    assert any("pg_advisory_unlock" in sql for sql in executed_sql)
def test_provision_uses_existing_user_attributes_and_enabled_syncs(monkeypatch) -> None:
    """Provision for an existing user whose attributes are already populated.

    The Mailu sync URL and the Nextcloud settings are configured, and
    ``all_tasks_ok`` is forced to ``False``. Expects no user creation, the
    stored initial password to be the one set, and ``ok`` for both sync tasks.
    """
    existing_attrs = {
        provisioning.MAILU_EMAIL_ATTR: ["custom@example.dev"],
        provisioning.MAILU_ENABLED_ATTR: ["yes"],
        provisioning.MAILU_APP_PASSWORD_ATTR: ["mail-pw"],
        provisioning.WGER_PASSWORD_ATTR: ["wger-pw"],
        provisioning.WGER_PASSWORD_UPDATED_ATTR: ["done"],
        provisioning.FIREFLY_PASSWORD_ATTR: "firefly-pw",
        provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: "done",
        "vaultwarden_email": ["vault@example.dev"],
    }
    full_record = {"id": "user-1", "attributes": existing_attrs, "requiredActions": []}
    conn = DummyConn(request_row(initial_password="existing-pw"))
    admin = DummyAdmin(user={"id": "user-1"}, full=full_record)
    install_common_patches(monkeypatch, conn, admin, all_ok=False)

    # Replace the empty sync settings installed by the common patches.
    enabled_settings = (
        ("MAILU_SYNC_URL", "https://mailu-sync.example.dev"),
        ("NEXTCLOUD_NAMESPACE", "nextcloud"),
        ("NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync"),
    )
    for setting_name, setting_value in enabled_settings:
        monkeypatch.setattr(provisioning.settings, setting_name, setting_value)
    monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=MailuClient))
    monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: {"status": "ok"})

    result = provisioning.provision_access_request("code")

    assert result.ok is False
    assert result.status == "accounts_building"
    assert admin.created == []
    assert admin.passwords == [("user-1", "existing-pw", False)]

    statuses = task_statuses(conn)
    for task_name in ("mailu_sync", "nextcloud_mail_sync"):
        assert statuses[task_name] == "ok"
def test_provision_existing_user_attribute_variants(monkeypatch) -> None:
    """Provision with attributes stored as plain strings (Firefly ones as lists).

    Expects the stored ``vaultwarden_email`` to be written back as a user
    attribute for ``alice``.
    """
    existing_attrs = {
        provisioning.MAILU_EMAIL_ATTR: "custom@example.dev",
        provisioning.MAILU_ENABLED_ATTR: "yes",
        provisioning.MAILU_APP_PASSWORD_ATTR: "mail-pw",
        provisioning.WGER_PASSWORD_ATTR: "wger-pw",
        provisioning.WGER_PASSWORD_UPDATED_ATTR: "done",
        provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"],
        provisioning.FIREFLY_PASSWORD_UPDATED_ATTR: ["done"],
        "vaultwarden_email": "vault@example.dev",
    }
    full_record = {"id": "user-1", "attributes": existing_attrs, "requiredActions": []}
    conn = DummyConn(request_row(initial_password="existing-pw"))
    admin = DummyAdmin(user={"id": "user-1"}, full=full_record)
    install_common_patches(monkeypatch, conn, admin)

    provisioning.provision_access_request("code")

    expected_write = ("alice", "vaultwarden_email", "vault@example.dev")
    assert expected_write in admin.attributes
def test_provision_keycloak_user_error_paths(monkeypatch) -> None:
    """Expect the ``keycloak_user`` task to be recorded as ``error`` in two set-ups.

    1. The e-mail lookup returns a user with a different username.
    2. The username lookup returns a user dict that has no ``id`` key.
    """
    admin_setups = (
        {"email_user": {"username": "other"}},
        {"user": {"username": "alice"}},
    )
    for admin_kwargs in admin_setups:
        conn = DummyConn(request_row(contact_email="alice@example.dev"))
        admin = DummyAdmin(**admin_kwargs)
        install_common_patches(monkeypatch, conn, admin, all_ok=False)

        provisioning.provision_access_request("code")

        assert task_statuses(conn)["keycloak_user"] == "error"
def test_provision_attribute_and_sync_error_paths(monkeypatch) -> None:
    """Exercise failures in attribute writes, user reads and the Mailu sync call.

    * Writing ``MAILU_ENABLED_ATTR`` raises -> ``mailu_app_password`` is still ``ok``.
    * ``get_user`` raises -> ``mailu_app_password`` and ``wger_account`` are ``error``.
    * The Mailu client answers 503 or raises -> ``mailu_sync`` is ``error``.
    """

    class SelectiveFailAdmin(DummyAdmin):
        """Admin double that refuses to write ``MAILU_ENABLED_ATTR``."""

        def set_user_attribute(self, username: str, key: str, value: str) -> None:
            if key == provisioning.MAILU_ENABLED_ATTR:
                raise RuntimeError("mailu enabled write failed")
            super().set_user_attribute(username, key, value)

    class GetUserFailsAdmin(DummyAdmin):
        """Admin double whose ``get_user`` always raises."""

        def get_user(self, user_id: str) -> dict[str, Any]:
            raise RuntimeError("keycloak read failed")

    def pending_conn() -> DummyConn:
        # Fresh connection whose request row already carries an initial password.
        return DummyConn(request_row(initial_password="existing-pw"))

    # --- attribute write failure --------------------------------------
    conn = pending_conn()
    full_record = {
        "id": "user-1",
        "attributes": {provisioning.MAILU_ENABLED_ATTR: "no"},
        "requiredActions": [],
    }
    admin = SelectiveFailAdmin(user={"id": "user-1"}, full=full_record)
    install_common_patches(monkeypatch, conn, admin, all_ok=False)
    provisioning.provision_access_request("code")
    assert task_statuses(conn)["mailu_app_password"] == "ok"

    # --- full user record cannot be read ------------------------------
    conn = pending_conn()
    admin = GetUserFailsAdmin(user={"id": "user-1"})
    install_common_patches(monkeypatch, conn, admin, all_ok=False)
    provisioning.provision_access_request("code")
    statuses = task_statuses(conn)
    for task_name in ("mailu_app_password", "wger_account"):
        assert statuses[task_name] == "error"

    # --- Mailu sync endpoint fails (bad status, then exception) --------
    for client_cls in (FailingMailuClient, ExplodingMailuClient):
        conn = pending_conn()
        admin = DummyAdmin(
            user={"id": "user-1"},
            full={"id": "user-1", "attributes": {}, "requiredActions": []},
        )
        install_common_patches(monkeypatch, conn, admin, all_ok=False)
        monkeypatch.setattr(provisioning.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev")
        monkeypatch.setattr(provisioning, "httpx", SimpleNamespace(Client=client_cls))
        provisioning.provision_access_request("code")
        assert task_statuses(conn)["mailu_sync"] == "error"
def test_provision_nextcloud_and_password_edge_paths(monkeypatch) -> None:
    """Cover the already-revealed-password row and failing Nextcloud mail syncs.

    * A row with no ``initial_password`` but a set
      ``initial_password_revealed_at`` must still record ``keycloak_password``
      as ``ok``.
    * With the Nextcloud settings configured, a sync that returns
      ``{"status": "failed"}`` or raises must record ``nextcloud_mail_sync``
      as ``error``.
    """

    def nextcloud_stub(outcome):
        """Build a fake ``trigger_nextcloud_mail_sync`` bound to one outcome.

        Binding through a factory argument avoids closing over the loop
        variable below: each stub keeps its own outcome even if it were
        called after the loop has moved on.
        """

        def fake_sync(*args, **kwargs):
            if isinstance(outcome, Exception):
                raise outcome
            return outcome

        return fake_sync

    # --- password already revealed, nothing stored ---------------------
    conn = DummyConn(request_row(initial_password=None, initial_password_revealed_at=datetime.now(timezone.utc)))
    admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
    install_common_patches(monkeypatch, conn, admin, all_ok=False)
    provisioning.provision_access_request("code")
    assert task_statuses(conn)["keycloak_password"] == "ok"

    # --- Nextcloud sync reports failure, then raises --------------------
    for sync_result in ({"status": "failed"}, RuntimeError("nextcloud failed")):
        conn = DummyConn(request_row(initial_password="existing-pw"))
        admin = DummyAdmin(user={"id": "user-1"}, full={"id": "user-1", "attributes": {}, "requiredActions": []})
        install_common_patches(monkeypatch, conn, admin, all_ok=False)
        monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_NAMESPACE", "nextcloud")
        monkeypatch.setattr(provisioning.settings, "NEXTCLOUD_MAIL_SYNC_CRONJOB", "sync")
        monkeypatch.setattr(provisioning, "trigger_nextcloud_mail_sync", nextcloud_stub(sync_result))
        provisioning.provision_access_request("code")
        assert task_statuses(conn)["nextcloud_mail_sync"] == "error"
def test_provision_records_task_errors_without_throwing(monkeypatch) -> None:
    """Check that failing steps are recorded as ``error`` task statuses.

    Three set-ups are run:

    1. Empty contact e-mail, unknown user, no group id, failing syncs and
       invite: the result is not ok, ``keycloak_user`` is ``error`` and no
       ``keycloak_password`` task is recorded.
    2. Existing user, no group id, failing syncs and invite: groups, wger,
       firefly and vaultwarden tasks are ``error``.
    3. Existing user with stored wger/firefly passwords and failing syncs:
       wger and firefly tasks are ``error``.
    """

    def failed_sync(*args, **kwargs):
        return {"status": "failed"}

    def failed_invite(email):
        return SimpleNamespace(ok=False, status="error", detail="invite failed")

    def patch_failing_syncs() -> None:
        monkeypatch.setattr(provisioning, "trigger_wger_user_sync", failed_sync)
        monkeypatch.setattr(provisioning, "trigger_firefly_user_sync", failed_sync)

    # --- 1. user cannot be created -------------------------------------
    conn = DummyConn(request_row(contact_email=""))
    admin = DummyAdmin(group_id=None)
    install_common_patches(monkeypatch, conn, admin, all_ok=False)
    patch_failing_syncs()
    monkeypatch.setattr(provisioning, "invite_user", failed_invite)

    result = provisioning.provision_access_request("code")

    statuses = task_statuses(conn)
    assert result.ok is False
    assert statuses["keycloak_user"] == "error"
    assert "keycloak_password" not in statuses

    # --- 2. existing user, every downstream step fails ------------------
    existing = DummyAdmin(
        user={"id": "user-1"},
        full={"id": "user-1", "attributes": {}, "requiredActions": []},
        group_id=None,
    )
    conn = DummyConn(request_row(initial_password=None))
    install_common_patches(monkeypatch, conn, existing, all_ok=False)
    patch_failing_syncs()
    monkeypatch.setattr(provisioning, "invite_user", failed_invite)

    provisioning.provision_access_request("code")

    statuses = task_statuses(conn)
    for task_name in ("keycloak_groups", "wger_account", "firefly_account", "vaultwarden_invite"):
        assert statuses[task_name] == "error"

    # --- 3. stored wger/firefly passwords, syncs still fail -------------
    conn = DummyConn(request_row(initial_password="existing-pw"))
    stored_passwords = {
        provisioning.WGER_PASSWORD_ATTR: "wger-pw",
        provisioning.FIREFLY_PASSWORD_ATTR: ["firefly-pw"],
    }
    existing = DummyAdmin(
        user={"id": "user-1"},
        full={"id": "user-1", "attributes": stored_passwords, "requiredActions": []},
    )
    install_common_patches(monkeypatch, conn, existing, all_ok=False)
    patch_failing_syncs()

    provisioning.provision_access_request("code")

    statuses = task_statuses(conn)
    for task_name in ("wger_account", "firefly_account"):
        assert statuses[task_name] == "error"
def test_provision_falls_back_for_vaultwarden_invite(monkeypatch) -> None:
    """Check the Vaultwarden invite fallback and its error handling.

    * When the invite for ``alice@bstein.dev`` fails, the contact e-mail is
      invited next and stored as the ``vaultwarden_email`` attribute.
    * When writing ``vaultwarden_*`` attributes raises, the invite task is
      still recorded as ``ok``.
    * When ``invite_user`` itself raises, the invite task is ``error``.
    """

    class VaultAttrFailAdmin(DummyAdmin):
        """Admin double that refuses to write any ``vaultwarden_*`` attribute."""

        def set_user_attribute(self, username: str, key: str, value: str) -> None:
            if key.startswith("vaultwarden_"):
                raise RuntimeError("vault attr failed")
            super().set_user_attribute(username, key, value)

    def existing_admin(admin_cls=DummyAdmin):
        # Known user with an empty attribute set; fresh dicts on every call.
        return admin_cls(
            user={"id": "user-1"},
            full={"id": "user-1", "attributes": {}, "requiredActions": []},
        )

    def exploding_invite(email):
        raise RuntimeError("vault down")

    # --- primary invite fails, contact e-mail is used instead -----------
    conn = DummyConn(request_row(contact_email="fallback@example.dev"))
    admin = existing_admin()
    install_common_patches(monkeypatch, conn, admin)
    invited: list[str] = []

    def fake_invite(email: str):
        invited.append(email)
        if email != "alice@bstein.dev":
            return SimpleNamespace(ok=True, status="fallback_invited", detail="")
        return SimpleNamespace(ok=False, status="error", detail="primary failed")

    monkeypatch.setattr(provisioning, "invite_user", fake_invite)
    provisioning.provision_access_request("code")
    assert invited == ["alice@bstein.dev", "fallback@example.dev"]
    assert ("alice", "vaultwarden_email", "fallback@example.dev") in admin.attributes

    # --- attribute writes fail, invite itself succeeds ------------------
    conn = DummyConn(request_row(initial_password="existing-pw"))
    admin = existing_admin(VaultAttrFailAdmin)
    install_common_patches(monkeypatch, conn, admin)
    provisioning.provision_access_request("code")
    assert task_statuses(conn)["vaultwarden_invite"] == "ok"

    # --- invite call raises ---------------------------------------------
    conn = DummyConn(request_row(initial_password="existing-pw"))
    admin = existing_admin()
    install_common_patches(monkeypatch, conn, admin, all_ok=False)
    monkeypatch.setattr(provisioning, "invite_user", exploding_invite)
    provisioning.provision_access_request("code")
    assert task_statuses(conn)["vaultwarden_invite"] == "error"