from tests.unit.manager.provisioning_helpers import * def test_provisioning_missing_verified_email(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def find_user(self, username): return None monkeypatch.setattr(prov, "keycloak_admin", Admin()) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": None, "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_MISSING") assert outcome.status == "accounts_building" def test_provisioning_email_conflict(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def find_user(self, username): return None def find_user_by_email(self, email): return {"username": "other"} monkeypatch.setattr(prov, "keycloak_admin", Admin()) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_CONFLICT") assert outcome.status == "accounts_building" def test_provisioning_missing_contact_email(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def find_user(self, username): return None monkeypatch.setattr(prov, "keycloak_admin", Admin()) row = { "username": "alice", "contact_email": "", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_EMPTY") assert outcome.status == "accounts_building" def test_provisioning_user_id_missing(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def find_user(self, username): return {"id": ""} monkeypatch.setattr(prov, "keycloak_admin", Admin()) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_ID") assert outcome.status == "accounts_building" def test_provisioning_initial_password_revealed(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin()) monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True) monkeypatch.setattr(prov.wger, "sync_user", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(prov.firefly, "sync_user", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited")) monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": None, "initial_password_revealed_at": datetime.now(timezone.utc), "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_REVEALED") assert outcome.status == "accounts_building" def test_provisioning_vaultwarden_attribute_failure(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def set_user_attribute(self, username, key, value): raise RuntimeError("fail") monkeypatch.setattr(prov, "keycloak_admin", Admin()) monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True) monkeypatch.setattr(prov.wger, "sync_user", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(prov.firefly, "sync_user", lambda *args, **kwargs: {"status": "ok"}) monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited")) monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } outcome = prov.ProvisioningManager(DummyDB(row), DummyStorage()).provision_access_request("REQ_VAULT") assert outcome.status == "accounts_building" def test_provisioning_vaultwarden_rate_limited(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, vaultwarden_admin_rate_limit_backoff_sec=600.0, provision_retry_cooldown_sec=0.0, ) monkeypatch.setattr(prov, "settings", dummy_settings) monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin()) monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda *_args, **_kwargs: True) monkeypatch.setattr( prov.vaultwarden, "invite_user", lambda _email: VaultwardenInvite(False, "rate_limited", "vaultwarden rate limited"), ) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } manager = prov.ProvisioningManager(DummyDB(row), DummyStorage()) ctx = prov.RequestContext( request_code="REQ_RATE", username="alice", first_name="", last_name="", contact_email="alice@example.com", email_verified_at=row["email_verified_at"], status="accounts_building", initial_password="temp", revealed_at=None, attempted_at=None, approval_flags=[], user_id="1", mailu_email="alice@bstein.dev", ) conn = DummyConn(row) manager._ensure_vaultwarden_invite(conn, ctx) inserts = [ params for query, params in conn.executed if "INSERT INTO access_request_tasks" in query and isinstance(params, tuple) and len(params) >= 4 ] assert any(params[2] == "pending" and "rate limited until" in (params[3] or "") for params in inserts) def test_provisioning_vaultwarden_grandfathered(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, vaultwarden_admin_rate_limit_backoff_sec=600.0, provision_retry_cooldown_sec=0.0, ) monkeypatch.setattr(prov, "settings", dummy_settings) monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin()) monkeypatch.setattr( prov.vaultwarden, "find_user_by_email", lambda _email: VaultwardenLookup(True, "present", "user found"), ) monkeypatch.setattr( prov.vaultwarden, "invite_user", lambda _email: (_ for _ in ()).throw(RuntimeError("invite should not run")), ) row = { "username": "legacy", "contact_email": "legacy@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [prov.VAULTWARDEN_GRANDFATHERED_FLAG], } manager = prov.ProvisioningManager(DummyDB(row), DummyStorage()) ctx = prov.RequestContext( request_code="REQ_GRANDFATHER", username="legacy", first_name="", last_name="", contact_email="legacy@example.com", email_verified_at=row["email_verified_at"], status="accounts_building", initial_password="temp", revealed_at=None, attempted_at=None, approval_flags=row["approval_flags"], user_id="1", mailu_email="legacy@bstein.dev", ) conn = DummyConn(row) manager._ensure_vaultwarden_invite(conn, ctx) inserts = [ params for query, params in conn.executed if "INSERT INTO access_request_tasks" in query and isinstance(params, tuple) and len(params) >= 4 ] assert any(params[2] == "ok" and params[3] == "grandfathered" for params in inserts)