from tests.unit.manager.provisioning_helpers import * def test_provisioning_run_loop_waits_for_admin(monkeypatch) -> None: dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: False) class DB: def fetchall(self, *_args, **_kwargs): return [] class Storage: def list_provision_candidates(self): raise AssertionError("should not list candidates") manager = prov.ProvisioningManager(DB(), Storage()) def fake_sleep(_): manager._stop_event.set() monkeypatch.setattr(prov.time, "sleep", fake_sleep) manager._stop_event.clear() manager._run_loop() def test_provisioning_initial_password_missing(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="", nextcloud_mail_sync_cronjob="", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) admin = DummyAdmin() monkeypatch.setattr(prov, "keycloak_admin", admin) monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True) monkeypatch.setattr(prov.wger, "sync_user", lambda username, email, password, wait=True: {"status": "ok"}) monkeypatch.setattr(prov.firefly, "sync_user", lambda email, password, wait=True: {"status": "ok"}) monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited")) monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "awaiting_onboarding", "initial_password": None, "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } db = DummyDB(row) storage = DummyStorage() manager = prov.ProvisioningManager(db, storage) outcome = manager.provision_access_request("REQ201") assert outcome.status == "awaiting_onboarding" def test_provisioning_group_and_mailu_errors(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_sync_url="http://mailu", mailu_mailbox_wait_timeout_sec=1.0, nextcloud_namespace="nextcloud", nextcloud_mail_sync_cronjob="nextcloud-mail-sync", provision_retry_cooldown_sec=0.0, default_user_groups=["dev"], allowed_flag_groups=[], welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) class Admin(DummyAdmin): def get_group_id(self, group_name: str): return None def set_user_attribute(self, username: str, key: str, value: str): if key == "mailu_app_password": raise RuntimeError("fail") return None admin = Admin() monkeypatch.setattr(prov, "keycloak_admin", admin) monkeypatch.setattr(prov.mailu, "sync", lambda reason, force=False: None) monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True) monkeypatch.setattr(prov.nextcloud, "sync_mail", lambda username, wait=True: (_ for _ in ()).throw(RuntimeError("fail"))) monkeypatch.setattr(prov.wger, "sync_user", lambda username, email, password, wait=True: {"status": "ok"}) monkeypatch.setattr(prov.firefly, "sync_user", lambda email, password, wait=True: {"status": "ok"}) monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited")) monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False) row = { "username": "alice", "contact_email": "alice@example.com", "email_verified_at": datetime.now(timezone.utc), "status": "accounts_building", "initial_password": "temp", "initial_password_revealed_at": None, "provision_attempted_at": None, "approval_flags": [], } db = DummyDB(row) storage = DummyStorage() manager = prov.ProvisioningManager(db, storage) outcome = manager.provision_access_request("REQ202") assert outcome.status == "accounts_building" def test_provisioning_task_helpers() -> None: class Conn: def execute(self, *_args, **_kwargs): class Result: def fetchall(self): return [ {"task": "a", "status": "ok"}, {"task": "b", "status": "error"}, "bad", ] return Result() manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) statuses = manager._task_statuses(Conn(), "REQ") assert statuses == {"a": "ok", "b": "error"} assert manager._all_tasks_ok(Conn(), "REQ", ["a"]) is True assert manager._all_tasks_ok(Conn(), "REQ", ["b"]) is False def test_provisioning_retryable_detail_detection() -> None: manager = prov.ProvisioningManager(DummyDB({}, locked=True), DummyStorage()) assert manager._is_retryable_detail("") is False assert manager._is_retryable_detail("timeout") is True assert manager._is_retryable_detail("http 503: service unavailable") is True assert manager._is_retryable_detail("mailbox not ready") is True assert manager._is_retryable_detail("invalid credentials") is False assert manager._retryable_detail("") == "retryable: temporary failure" assert manager._retryable_detail("timeout").startswith("retryable:") def test_provisioning_retry_at_parsing_and_due_state() -> None: manager = prov.ProvisioningManager(DummyDB({}, locked=True), DummyStorage()) assert manager._parse_retry_at("") is None assert manager._parse_retry_at("not a retry") is None assert manager._parse_retry_at("rate limited until not-a-date") is None assert manager._parse_retry_at("rate limited until 2099-01-01T00:00:00+0000").tzinfo is not None class Conn: def __init__(self, row): self.row = row def execute(self, *_args, **_kwargs): return types.SimpleNamespace(fetchone=lambda: self.row) assert manager._vaultwarden_retry_due(Conn({"status": "ok", "detail": ""}), "REQ") is True assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": "not a retry"}), "REQ") is True future = "rate limited until 2099-01-01T00:00:00Z" assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": future}), "REQ") is False past = "rate limited until 2000-01-01T00:00:00Z" assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": past}), "REQ") is True def test_provisioning_ensure_task_rows_empty() -> None: manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) manager._ensure_task_rows(DummyConn({}, locked=True), "REQ", []) def test_provisioning_record_task_ignores_storage_errors(monkeypatch) -> None: class Storage: def record_event(self, *args, **kwargs): raise RuntimeError("fail") def record_task_run(self, *args, **kwargs): raise RuntimeError("fail") db = DummyDB({}) manager = prov.ProvisioningManager(db, Storage()) manager._record_task("REQ", "task", "ok", None, datetime.now(timezone.utc)) def test_provisioning_send_welcome_email_variants(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( welcome_email_enabled=False, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) manager._send_welcome_email("REQ", "alice", "alice@example.com") dummy_settings = types.SimpleNamespace( welcome_email_enabled=True, portal_public_base_url="https://bstein.dev", ) monkeypatch.setattr(prov, "settings", dummy_settings) _patch_mailu_ready(monkeypatch, dummy_settings) manager._send_welcome_email("REQ", "alice", "") class DB(DummyDB): def fetchone(self, *_args, **_kwargs): return None class Storage(DummyStorage): def mark_welcome_sent(self, *args, **kwargs): raise AssertionError("should not be called") manager = prov.ProvisioningManager(DB({}), Storage()) monkeypatch.setattr( prov.mailer, "send_welcome", lambda *args, **kwargs: (_ for _ in ()).throw(prov.MailerError("fail")), ) manager._send_welcome_email("REQ", "alice", "alice@example.com") def test_provisioning_start_stop(monkeypatch) -> None: class DummyThread: def __init__(self, target=None, name=None, daemon=None): self.started = False self.joined = False def is_alive(self) -> bool: return False def start(self) -> None: self.started = True def join(self, timeout=None) -> None: self.joined = True monkeypatch.setattr(prov.threading, "Thread", DummyThread) manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) manager.start() assert manager._thread.started is True manager.stop() assert manager._thread.joined is True def test_provisioning_start_skips_when_running() -> None: class LiveThread: def __init__(self): self.started = False def is_alive(self) -> bool: return True def start(self) -> None: self.started = True manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) manager._thread = LiveThread() manager.start() assert manager._thread.started is False def test_provisioning_run_loop_skips_when_admin_not_ready(monkeypatch) -> None: manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: False) monkeypatch.setattr(manager, "_sync_status_metrics", lambda: (_ for _ in ()).throw(RuntimeError("fail"))) monkeypatch.setattr( prov.time, "sleep", lambda *_args, **_kwargs: manager._stop_event.set(), ) manager._run_loop() def test_provisioning_run_loop_processes_candidates(monkeypatch) -> None: manager = prov.ProvisioningManager(DummyDB({}), DummyStorage()) monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: True) monkeypatch.setattr(manager._storage, "list_provision_candidates", lambda: [types.SimpleNamespace(request_code="REQ")]) calls: list[str] = [] monkeypatch.setattr(manager, "provision_access_request", lambda code: calls.append(code)) monkeypatch.setattr( prov.time, "sleep", lambda *_args, **_kwargs: manager._stop_event.set(), ) manager._run_loop() assert calls == ["REQ"] def test_provisioning_sync_status_metrics(monkeypatch) -> None: db = DummyDB({}) db.fetchall = lambda *_args, **_kwargs: [{"status": "pending", "count": 2}] manager = prov.ProvisioningManager(db, DummyStorage()) captured = {} monkeypatch.setattr(prov, "set_access_request_counts", lambda payload: captured.update(payload)) manager._sync_status_metrics() assert captured["pending"] == 2