# ariadne/tests/unit/services/test_vaultwarden_sync_edges.py
#
# 136 lines
# 6.1 KiB
# Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import types
from ariadne.services import vaultwarden_sync
from ariadne.services.vaultwarden import VaultwardenInvite
@dataclass
class Admin:
    """Test double that the tests install as ``vaultwarden_sync.keycloak_admin``.

    It serves canned user records, records every attribute write, and can be
    told to fail on writes.
    """

    # Canned user records keyed by user id; ``None`` behaves like an empty mapping.
    attrs: dict[str, dict] | None = None
    # When true, every ``set_user_attribute`` call raises ``RuntimeError``.
    fail_set: bool = False
    # Recorded ``(username, key, value)`` writes; stays ``None`` until the first write.
    calls: list[tuple[str, str, str]] | None = None

    def get_user(self, user_id: str):
        """Return the canned record for *user_id*, or ``{}`` when there is none.

        Raises:
            RuntimeError: when *user_id* is the sentinel ``"boom"``.
        """
        if user_id == "boom":
            raise RuntimeError("missing")
        known_users = self.attrs if self.attrs else {}
        return known_users.get(user_id, {})

    def set_user_attribute(self, username: str, key: str, value: str) -> None:
        """Record one attribute write as a ``(username, key, value)`` tuple.

        Raises:
            RuntimeError: when ``fail_set`` is true; nothing is recorded then.
        """
        if self.fail_set:
            raise RuntimeError("set failed")
        recorded = self.calls
        if recorded is None:
            # Create the log lazily so "no writes happened" stays observable as None.
            recorded = self.calls = []
        recorded.append((username, key, value))
def _settings(**overrides):
    """Build a settings stand-in with test defaults, letting *overrides* win.

    Returns a ``types.SimpleNamespace`` whose attributes are the default
    values below, updated with (and extended by) any keyword overrides.
    """
    defaults = {
        "mailu_domain": "bstein.dev",
        "vaultwarden_retry_cooldown_sec": 60,
        "vaultwarden_failure_bailout": 2,
        "vaultwarden_invite_refresh_sec": 3600,
    }
    # Later mappings take precedence, so overrides replace matching defaults.
    return types.SimpleNamespace(**{**defaults, **overrides})
def _old_timestamp() -> str:
    """Return a UTC timestamp from two days ago as ``YYYY-MM-DDTHH:MM:SSZ``."""
    two_days_ago = datetime.now(timezone.utc) - timedelta(days=2)
    return two_days_ago.strftime("%Y-%m-%dT%H:%M:%SZ")
def test_normalize_user_and_pending_failure_edges(monkeypatch) -> None:
    """Check ``_normalize_user`` rejections and fallback, and ``_has_pending_failures`` on stale records."""
    fake_admin = Admin(attrs={"1": {"username": "alice"}})
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", fake_admin)
    monkeypatch.setattr(
        vaultwarden_sync,
        "settings",
        _settings(vaultwarden_retry_cooldown_sec=0, vaultwarden_invite_refresh_sec=1),
    )

    # Each of these user records is expected to normalize to None.
    rejected_users = (
        {"username": ""},
        {"username": "alice", "enabled": False},
        {"username": "service-account-api"},
        {"username": "api", "serviceAccountClientId": "client"},
    )
    for candidate in rejected_users:
        assert vaultwarden_sync._normalize_user(candidate) is None

    # The Admin double raises for id "boom"; presumably this exercises the
    # lookup-failure fallback in _normalize_user (confirm against the module).
    expected_fallback = ("fallback", {"id": "boom", "username": "fallback"})
    assert vaultwarden_sync._normalize_user({"id": "boom", "username": "fallback"}) == expected_fallback

    # With a two-day-old sync timestamp, every one of these statuses is
    # expected to count as a pending failure.
    for status in ("already_present", "invited", "error"):
        stale_record = {
            "username": "alice",
            "attributes": {
                "vaultwarden_status": [status],
                "vaultwarden_synced_at": [_old_timestamp()],
            },
        }
        assert vaultwarden_sync._has_pending_failures([stale_record])
def test_master_password_and_existing_invite_handlers(monkeypatch) -> None:
    """Check ``_set_master_password_set`` write behavior and ``_handle_existing_invite`` skip counting."""
    admin = Admin()
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)
    monkeypatch.setattr(vaultwarden_sync, "settings", _settings(vaultwarden_invite_refresh_sec=0))
    counters = vaultwarden_sync.VaultwardenSyncCounters()

    # Attribute already present: no write should reach the admin double.
    already_marked = {"attributes": {"vaultwarden_master_password_set_at": ["already"]}}
    vaultwarden_sync._set_master_password_set("alice", already_marked)
    assert admin.calls is None

    # Attribute absent: the most recent write must target the marker key.
    vaultwarden_sync._set_master_password_set("alice", {"attributes": {}})
    recorded = admin.calls
    assert recorded and recorded[-1][1] == "vaultwarden_master_password_set_at"

    # With a failing admin the call must not propagate the RuntimeError.
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", Admin(fail_set=True))
    vaultwarden_sync._set_master_password_set("alice", {"attributes": {}})
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)

    def handle(*leading_fields) -> bool:
        # Append a fresh empty-attributes user and the shared counters to the
        # leading positional fields, then run the handler on that state.
        state = vaultwarden_sync.VaultwardenInviteState(*leading_fields, {"attributes": {}}, counters)
        return vaultwarden_sync._handle_existing_invite(state)

    assert handle("alice", "already_present", "", None) is True
    assert counters.skipped == 1

    assert handle("bob", "invited", "", None) is True
    assert counters.skipped == 2

    assert handle("carol", "invited", _old_timestamp(), 1.0) is True
    assert counters.skipped == 3
def test_sync_user_skip_and_already_present_edges(monkeypatch) -> None:
    """Check that ``_sync_user`` returns ``(None, False)`` and counts a skip for three inputs.

    Also checks that ``_should_refresh_invite(None)`` is true once the invite
    refresh interval is set to 1.
    """

    def _mailbox_always_exists(_email):
        return True

    monkeypatch.setattr(
        vaultwarden_sync,
        "settings",
        _settings(vaultwarden_retry_cooldown_sec=9999, vaultwarden_invite_refresh_sec=0),
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", Admin())
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", _mailbox_always_exists)
    counters = vaultwarden_sync.VaultwardenSyncCounters()
    skipped_result = (None, False)

    # Empty username.
    assert vaultwarden_sync._sync_user({"username": ""}, counters) == skipped_result
    assert counters.skipped == 1

    # "error" status synced just now, with a 9999-second retry cooldown configured.
    just_now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    cooling_down_user = {
        "username": "cooldown",
        "attributes": {"vaultwarden_status": ["error"], "vaultwarden_synced_at": [just_now]},
    }
    assert vaultwarden_sync._sync_user(cooling_down_user, counters) == skipped_result
    assert counters.skipped == 2

    # Empty email and no attributes; the Admin double has no record for this id.
    no_email_user = {"id": "missing-full-user", "username": "missing-email", "attributes": {}, "email": ""}
    assert vaultwarden_sync._sync_user(no_email_user, counters) == skipped_result
    assert counters.skipped == 3

    monkeypatch.setattr(vaultwarden_sync, "settings", _settings(vaultwarden_invite_refresh_sec=1))
    assert vaultwarden_sync._should_refresh_invite(None) is True
def test_sync_user_sets_master_password_for_already_present(monkeypatch) -> None:
    """Check that an ``already_present`` invite result writes the master-password marker attribute."""

    def _mailbox_always_exists(_email):
        return True

    def _invite_already_present(_email):
        return VaultwardenInvite(True, "already_present")

    admin = Admin()
    monkeypatch.setattr(
        vaultwarden_sync,
        "settings",
        _settings(vaultwarden_retry_cooldown_sec=0, vaultwarden_invite_refresh_sec=0),
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", _mailbox_always_exists)
    monkeypatch.setattr(vaultwarden_sync.vaultwarden, "invite_user", _invite_already_present)
    counters = vaultwarden_sync.VaultwardenSyncCounters()

    user = {"username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}
    result_status, succeeded = vaultwarden_sync._sync_user(user, counters)

    assert (result_status, succeeded) == ("already_present", True)
    # At least one recorded admin write must target the marker key.
    written_keys = [call[1] for call in admin.calls or []]
    assert "vaultwarden_master_password_set_at" in written_keys