"""Tests for ``ariadne.services.vaultwarden_sync``.

Each test swaps the module-level collaborators of ``vaultwarden_sync``
(``keycloak_admin``, ``settings``, ``mailu.mailbox_exists`` and
``vaultwarden.invite_user``) for in-memory stand-ins via ``monkeypatch`` and
then checks the summary returned by ``run_vaultwarden_sync`` or the result of
one of the module's private helpers.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
import types

from ariadne.services.vaultwarden import VaultwardenInvite
from ariadne.services import vaultwarden_sync


@dataclass
class DummyAdmin:
    """In-memory stand-in installed as ``vaultwarden_sync.keycloak_admin``.

    The container fields default to ``None`` rather than to empty containers
    on purpose: several tests assert ``set_calls is None`` to prove that
    ``set_user_attribute`` was never invoked.
    """

    # Value returned by ready().
    ready_value: bool = True
    # User records returned by iter_users(); None behaves like an empty list.
    users: list[dict] | None = None
    # Per-user records returned by get_user(), keyed by user id.
    attrs: dict[str, dict] | None = None
    # (username, key, value) tuples recorded by set_user_attribute();
    # stays None until the first call.
    set_calls: list[tuple[str, str, str]] | None = None

    def ready(self) -> bool:
        """Return the configured readiness flag."""
        return self.ready_value

    def iter_users(self, page_size: int = 200, brief: bool = False) -> list[dict]:
        """Return the configured users; ``page_size`` and ``brief`` are ignored."""
        return self.users or []

    def get_user(self, user_id: str) -> dict:
        """Return the record stored for ``user_id``, or ``{}`` when there is none."""
        return self.attrs.get(user_id, {}) if self.attrs else {}

    def set_user_attribute(self, username: str, key: str, value: str) -> None:
        """Record the call so tests can inspect it through ``set_calls``."""
        if self.set_calls is None:
            self.set_calls = []
        self.set_calls.append((username, key, value))


def test_vaultwarden_sync_requires_admin(monkeypatch) -> None:
    """An admin that is not ready yields one failure and an explanatory detail."""
    dummy = DummyAdmin(ready_value=False)
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.failures == 1
    assert summary.detail == "keycloak admin not configured"


def test_vaultwarden_sync_skips_without_pending_failures(monkeypatch) -> None:
    """A user already ``invited`` and synced just now leaves nothing to process."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"vaultwarden_status": ["invited"], "vaultwarden_synced_at": [now]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"vaultwarden_status": ["invited"], "vaultwarden_synced_at": [now]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.detail == "no pending failures"
    assert summary.processed == 0


def test_vaultwarden_sync_skips_when_missing_mailbox(monkeypatch) -> None:
    """A user whose mailbox does not exist is counted as skipped, not processed."""
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: False)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.skipped == 1
    assert summary.processed == 0


def test_vaultwarden_sync_invites(monkeypatch) -> None:
    """A successful invite is counted and at least one attribute write is recorded."""
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(True, "invited"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.created_or_present == 1
    assert dummy.set_calls


def test_vaultwarden_sync_respects_retry_cooldown(monkeypatch) -> None:
    """A ``rate_limited`` user synced just now is left alone under a long cooldown."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=9999,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=9999,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {
                    "mailu_email": ["alice@bstein.dev"],
                    "vaultwarden_status": ["rate_limited"],
                    "vaultwarden_synced_at": [now],
                },
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {
                    "mailu_email": ["alice@bstein.dev"],
                    "vaultwarden_status": ["rate_limited"],
                    "vaultwarden_synced_at": [now],
                },
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.detail == "no pending failures"


def test_vaultwarden_sync_bails_after_failures(monkeypatch) -> None:
    """With a bailout of 1 and every invite failing, two users yield one failure."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=1,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            },
            {
                "id": "2",
                "username": "bob",
                "enabled": True,
                "attributes": {"mailu_email": ["bob@bstein.dev"]},
            },
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            },
            "2": {
                "id": "2",
                "username": "bob",
                "attributes": {"mailu_email": ["bob@bstein.dev"]},
            },
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(False, "error", "error"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.failures == 1


def test_vaultwarden_sync_uses_keycloak_email(monkeypatch) -> None:
    """A user with only a top-level ``email`` and no attributes is still invited."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "email": "alice@bstein.dev",
                "attributes": {},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "email": "alice@bstein.dev",
                "attributes": {},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(True, "invited"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.created_or_present == 1


def test_extract_attr_variants() -> None:
    """``_extract_attr`` copes with a non-dict, blank list entries and a bare string."""
    assert vaultwarden_sync._extract_attr("bad", "key") == ""
    assert vaultwarden_sync._extract_attr({"key": ["", " "]}, "key") == ""
    assert vaultwarden_sync._extract_attr({"key": "value"}, "key") == "value"


def test_parse_synced_at_invalid() -> None:
    """Empty and unparseable timestamps come back as ``None``."""
    assert vaultwarden_sync._parse_synced_at("") is None
    assert vaultwarden_sync._parse_synced_at("not-a-date") is None


def test_parse_synced_at_valid() -> None:
    """Both the ``Z`` suffix and a ``+0000`` offset parse to a truthy value."""
    assert vaultwarden_sync._parse_synced_at("2025-01-01T00:00:00Z")
    assert vaultwarden_sync._parse_synced_at("2025-01-01T00:00:00+0000")


def test_vaultwarden_email_for_user_prefers_attributes(monkeypatch) -> None:
    """The ``vaultwarden_email`` attribute is returned when it is present."""
    dummy_settings = types.SimpleNamespace(mailu_domain="bstein.dev")
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    user = {"username": "alice", "attributes": {"vaultwarden_email": ["alice@bstein.dev"]}}
    assert vaultwarden_sync._vaultwarden_email_for_user(user) == "alice@bstein.dev"


def test_vaultwarden_email_for_user_uses_mailu_attr(monkeypatch) -> None:
    """The ``mailu_email`` attribute is returned even when it differs from the username."""
    dummy_settings = types.SimpleNamespace(mailu_domain="bstein.dev")
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    user = {"username": "alice", "attributes": {"mailu_email": ["alias@bstein.dev"]}}
    assert vaultwarden_sync._vaultwarden_email_for_user(user) == "alias@bstein.dev"


def test_vaultwarden_email_for_user_missing_username(monkeypatch) -> None:
    """A whitespace-only username resolves to an empty email."""
    dummy_settings = types.SimpleNamespace(mailu_domain="bstein.dev")
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    assert vaultwarden_sync._vaultwarden_email_for_user({"username": " "}) == ""


def test_vaultwarden_email_for_user_defaults_mailu(monkeypatch) -> None:
    """With no attributes the expected result is ``alice@bstein.dev``, not the Keycloak email."""
    dummy_settings = types.SimpleNamespace(mailu_domain="bstein.dev")
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    user = {"username": "alice", "email": "alice@example.com", "attributes": {}}
    assert vaultwarden_sync._vaultwarden_email_for_user(user) == "alice@bstein.dev"


def test_set_user_attribute_if_missing_skips_existing(monkeypatch) -> None:
    """No write is recorded when the attribute is already set on the user."""
    dummy = DummyAdmin(users=[])
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    vaultwarden_sync._set_user_attribute_if_missing(
        "alice",
        {"attributes": {"vaultwarden_email": ["alice@bstein.dev"]}},
        "vaultwarden_email",
        "alice@bstein.dev",
    )
    assert dummy.set_calls is None


def test_set_user_attribute_if_missing_empty_value(monkeypatch) -> None:
    """No write is recorded when the value to store is empty."""
    dummy = DummyAdmin(users=[])
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    vaultwarden_sync._set_user_attribute_if_missing("alice", {"attributes": {}}, "vaultwarden_email", "")
    assert dummy.set_calls is None


def test_set_user_attribute_ignores_empty(monkeypatch) -> None:
    """``_set_user_attribute`` records no write for an empty value."""
    dummy = DummyAdmin(users=[])
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    vaultwarden_sync._set_user_attribute("alice", "vaultwarden_email", "")
    assert dummy.set_calls is None


def test_vaultwarden_sync_sets_synced_at_for_invited(monkeypatch) -> None:
    """An ``invited`` user with no sync timestamp is skipped but gets an attribute write."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"], "vaultwarden_status": ["invited"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"], "vaultwarden_status": ["invited"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.skipped == 1
    assert dummy.set_calls


def test_vaultwarden_sync_skips_disabled_and_service_accounts(monkeypatch) -> None:
    """Blank-username, disabled and ``service-account-`` users leave nothing pending."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    dummy = DummyAdmin(
        users=[
            {"id": "1", "username": "", "enabled": True, "attributes": {}},
            {"id": "2", "username": "bob", "enabled": False, "attributes": {}},
            {"id": "3", "username": "service-account-test", "enabled": True, "attributes": {}},
        ],
        attrs={},
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.detail == "no pending failures"


def test_vaultwarden_sync_get_user_failure(monkeypatch) -> None:
    """The invite is still counted when ``get_user`` raises."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)

    class Admin(DummyAdmin):
        """Admin double whose per-user lookup always fails."""

        def get_user(self, user_id: str):
            raise RuntimeError("fail")

    dummy = Admin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        ],
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(True, "invited"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.created_or_present == 1


def test_vaultwarden_sync_invited_attribute_failure(monkeypatch) -> None:
    """An ``invited`` user is still counted as skipped when attribute writes raise."""
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
        vaultwarden_invite_refresh_sec=0,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)

    class Admin(DummyAdmin):
        """Admin double whose attribute writes always fail."""

        def set_user_attribute(self, username: str, key: str, value: str) -> None:
            raise RuntimeError("fail")

    dummy = Admin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"], "vaultwarden_status": ["invited"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"], "vaultwarden_status": ["invited"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.skipped == 1


def test_vaultwarden_sync_set_attribute_failure_on_success(monkeypatch) -> None:
    """A successful invite is still counted when attribute writes raise."""
    # This settings stub deliberately mirrors the original fixture, which does
    # not define vaultwarden_invite_refresh_sec.
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)

    class Admin(DummyAdmin):
        """Admin double whose attribute writes always fail."""

        def set_user_attribute(self, username: str, key: str, value: str) -> None:
            raise RuntimeError("fail")

    dummy = Admin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(True, "invited"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.created_or_present == 1


def test_vaultwarden_sync_set_attribute_failure_on_error(monkeypatch) -> None:
    """A failed invite is counted exactly once even when attribute writes raise."""
    # This settings stub deliberately mirrors the original fixture, which does
    # not define vaultwarden_invite_refresh_sec.
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)

    class Admin(DummyAdmin):
        """Admin double whose attribute writes always fail."""

        def set_user_attribute(self, username: str, key: str, value: str) -> None:
            raise RuntimeError("fail")

    dummy = Admin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "attributes": {"mailu_email": ["alice@bstein.dev"]},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(False, "error", "error"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.failures == 1


def test_vaultwarden_sync_defaults_mailu_email(monkeypatch) -> None:
    """A user with an external Keycloak email and no attributes is processed and invited."""
    # This settings stub deliberately mirrors the original fixture, which does
    # not define vaultwarden_invite_refresh_sec.
    dummy_settings = types.SimpleNamespace(
        mailu_domain="bstein.dev",
        vaultwarden_retry_cooldown_sec=0,
        vaultwarden_failure_bailout=2,
    )
    monkeypatch.setattr(vaultwarden_sync, "settings", dummy_settings)
    dummy = DummyAdmin(
        users=[
            {
                "id": "1",
                "username": "alice",
                "enabled": True,
                "email": "alice@example.com",
                "attributes": {},
            }
        ],
        attrs={
            "1": {
                "id": "1",
                "username": "alice",
                "email": "alice@example.com",
                "attributes": {},
            }
        },
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda email: VaultwardenInvite(True, "invited"),
    )
    summary = vaultwarden_sync.run_vaultwarden_sync()
    assert summary.processed == 1
    assert summary.created_or_present == 1