# Tests for private helpers in ariadne.services.vaultwarden_sync.
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
import types
|
|
|
|
from ariadne.services import vaultwarden_sync
|
|
from ariadne.services.vaultwarden import VaultwardenInvite
|
|
|
|
|
|
@dataclass
|
|
class Admin:
|
|
attrs: dict[str, dict] | None = None
|
|
fail_set: bool = False
|
|
calls: list[tuple[str, str, str]] | None = None
|
|
|
|
def get_user(self, user_id: str):
|
|
if user_id == "boom":
|
|
raise RuntimeError("missing")
|
|
return (self.attrs or {}).get(user_id, {})
|
|
|
|
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
|
if self.fail_set:
|
|
raise RuntimeError("set failed")
|
|
if self.calls is None:
|
|
self.calls = []
|
|
self.calls.append((username, key, value))
|
|
|
|
|
|
def _settings(**overrides):
|
|
base = {
|
|
"mailu_domain": "bstein.dev",
|
|
"vaultwarden_retry_cooldown_sec": 60,
|
|
"vaultwarden_failure_bailout": 2,
|
|
"vaultwarden_invite_refresh_sec": 3600,
|
|
}
|
|
base.update(overrides)
|
|
return types.SimpleNamespace(**base)
|
|
|
|
|
|
def _old_timestamp() -> str:
|
|
return (datetime.now(timezone.utc) - timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
|
|
def test_normalize_user_and_pending_failure_edges(monkeypatch) -> None:
    """Cover ``_normalize_user`` rejections and fallback, plus ``_has_pending_failures`` on two-day-old records."""
    fake_admin = Admin(attrs={"1": {"username": "alice"}})
    patched_settings = _settings(vaultwarden_retry_cooldown_sec=0, vaultwarden_invite_refresh_sec=1)
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", fake_admin)
    monkeypatch.setattr(vaultwarden_sync, "settings", patched_settings)

    # Each of these user payloads must be rejected outright.
    rejected_users = (
        {"username": ""},
        {"username": "alice", "enabled": False},
        {"username": "service-account-api"},
        {"username": "api", "serviceAccountClientId": "client"},
    )
    for user in rejected_users:
        assert vaultwarden_sync._normalize_user(user) is None

    # The fake admin raises for id "boom"; presumably this exercises the
    # lookup-failure path, where the payload passed in is returned as-is.
    fallback_user = {"id": "boom", "username": "fallback"}
    expected = ("fallback", {"id": "boom", "username": "fallback"})
    assert vaultwarden_sync._normalize_user(fallback_user) == expected

    # With a two-day-old sync timestamp, every one of these statuses counts as pending.
    for status in ("already_present", "invited", "error"):
        stale_user = {
            "username": "alice",
            "attributes": {"vaultwarden_status": [status], "vaultwarden_synced_at": [_old_timestamp()]},
        }
        assert vaultwarden_sync._has_pending_failures([stale_user])
|
|
|
|
|
|
def test_master_password_and_existing_invite_handlers(monkeypatch) -> None:
    """Exercise ``_set_master_password_set`` and ``_handle_existing_invite`` against the fake admin."""
    marker_key = "vaultwarden_master_password_set_at"
    admin = Admin()
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)
    monkeypatch.setattr(vaultwarden_sync, "settings", _settings(vaultwarden_invite_refresh_sec=0))
    counters = vaultwarden_sync.VaultwardenSyncCounters()

    # Marker attribute already present: no write may reach the admin.
    already_marked = {"attributes": {marker_key: ["already"]}}
    vaultwarden_sync._set_master_password_set("alice", already_marked)
    assert admin.calls is None

    # Marker missing: the most recent write must target the marker attribute.
    vaultwarden_sync._set_master_password_set("alice", {"attributes": {}})
    assert admin.calls and admin.calls[-1][1] == marker_key

    # An admin whose writes fail must not make the helper raise.
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", Admin(fail_set=True))
    vaultwarden_sync._set_master_password_set("alice", {"attributes": {}})

    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)
    # Leading positional arguments for VaultwardenInviteState; the attributes
    # payload and the shared counters are appended per call below.
    invite_cases = (
        ("alice", "already_present", "", None),
        ("bob", "invited", "", None),
        ("carol", "invited", _old_timestamp(), 1.0),
    )
    for expected_skipped, leading_args in enumerate(invite_cases, start=1):
        state = vaultwarden_sync.VaultwardenInviteState(*leading_args, {"attributes": {}}, counters)
        handled = vaultwarden_sync._handle_existing_invite(state)
        assert handled is True
        assert counters.skipped == expected_skipped
|
|
|
|
|
|
def test_sync_user_skip_and_already_present_edges(monkeypatch) -> None:
    """Check three ``_sync_user`` skip paths and that ``_should_refresh_invite(None)`` is true."""
    monkeypatch.setattr(
        vaultwarden_sync,
        "settings",
        _settings(vaultwarden_retry_cooldown_sec=9999, vaultwarden_invite_refresh_sec=0),
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", Admin())
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda _email: True)
    counters = vaultwarden_sync.VaultwardenSyncCounters()
    skipped_result = (None, False)

    # 1) Empty username.
    nameless_user = {"username": ""}
    assert vaultwarden_sync._sync_user(nameless_user, counters) == skipped_result
    assert counters.skipped == 1

    # 2) Error status stamped just now, with a 9999-second retry cooldown configured.
    just_now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    cooling_down_user = {
        "username": "cooldown",
        "attributes": {"vaultwarden_status": ["error"], "vaultwarden_synced_at": [just_now]},
    }
    assert vaultwarden_sync._sync_user(cooling_down_user, counters) == skipped_result
    assert counters.skipped == 2

    # 3) Empty email and no attributes; the fake admin has no record for this id.
    no_email_user = {"id": "missing-full-user", "username": "missing-email", "attributes": {}, "email": ""}
    assert vaultwarden_sync._sync_user(no_email_user, counters) == skipped_result
    assert counters.skipped == 3

    monkeypatch.setattr(vaultwarden_sync, "settings", _settings(vaultwarden_invite_refresh_sec=1))
    assert vaultwarden_sync._should_refresh_invite(None) is True
|
|
|
|
|
|
def test_sync_user_sets_master_password_for_already_present(monkeypatch) -> None:
    """An ``already_present`` invite result must lead to a master-password marker write."""
    admin = Admin()
    monkeypatch.setattr(
        vaultwarden_sync,
        "settings",
        _settings(vaultwarden_retry_cooldown_sec=0, vaultwarden_invite_refresh_sec=0),
    )
    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", admin)
    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda _email: True)
    # Every invite attempt reports that the account is already present.
    monkeypatch.setattr(
        vaultwarden_sync.vaultwarden,
        "invite_user",
        lambda _email: VaultwardenInvite(True, "already_present"),
    )
    counters = vaultwarden_sync.VaultwardenSyncCounters()

    user = {"username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}
    status, ok = vaultwarden_sync._sync_user(user, counters)

    assert (status, ok) == ("already_present", True)
    # The fake admin logs (username, key, value); index 1 is the attribute key.
    written_keys = [call[1] for call in admin.calls or []]
    assert "vaultwarden_master_password_set_at" in written_keys
|