test(ariadne): cover Vaultwarden service edges

This commit is contained in:
codex 2026-04-21 03:53:30 -03:00
parent 1b2d30e67e
commit 160dbd5f3d

View File

@ -1,4 +1,5 @@
from tests.unit.services.service_helpers import * from tests.unit.services.service_helpers import *
from ariadne.services.vaultwarden import VaultwardenInvite, VaultwardenLookup
def test_vaultwarden_invite_uses_admin_session(monkeypatch) -> None: def test_vaultwarden_invite_uses_admin_session(monkeypatch) -> None:
@ -353,3 +354,116 @@ def test_vaultwarden_admin_session_closes_existing(monkeypatch) -> None:
svc._admin_session_base_url = "http://old" svc._admin_session_base_url = "http://old"
assert svc._admin_session("http://vaultwarden") is not None assert svc._admin_session("http://vaultwarden") is not None
def test_vaultwarden_invite_conflict_status_edges() -> None:
class BadTextResponse:
@property
def text(self):
raise RuntimeError("bad body")
assert VaultwardenService._invite_conflict_status(DummyResponse(409, "already invited")) == "invited"
assert VaultwardenService._invite_conflict_status(DummyResponse(409, "something else")) is None
assert VaultwardenService._invite_conflict_status(BadTextResponse()) is None
def test_vaultwarden_invite_via_direct_edges(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
vaultwarden_namespace="vaultwarden",
vaultwarden_admin_secret_name="vaultwarden-admin",
vaultwarden_admin_secret_key="ADMIN_TOKEN",
vaultwarden_admin_rate_limit_backoff_sec=600,
vaultwarden_admin_session_ttl_sec=900,
vaultwarden_service_host="vaultwarden-service.vaultwarden.svc.cluster.local",
vaultwarden_pod_label="app=vaultwarden",
vaultwarden_pod_port=80,
)
monkeypatch.setattr("ariadne.services.vaultwarden.settings", dummy_settings)
svc = VaultwardenService()
assert svc._invite_via("", "alice@bstein.dev") is None
invited = DummyVaultwardenClient()
invited.responses["/admin/invite"] = DummyResponse(409, "already invited")
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: invited)
assert svc._invite_via("http://vaultwarden", "alice@bstein.dev").detail == "user already invited"
unknown_conflict = DummyVaultwardenClient()
unknown_conflict.responses["/admin/invite"] = DummyResponse(400, "bad request")
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: unknown_conflict)
assert svc._invite_via("http://vaultwarden", "alice@bstein.dev").detail == "status 400"
def test_vaultwarden_invite_user_skips_blank_candidate(monkeypatch) -> None:
svc = VaultwardenService()
monkeypatch.setattr(svc, "_candidate_urls", lambda: ["", "http://vaultwarden"])
monkeypatch.setattr(svc, "_invite_via", lambda base_url, _email: None if not base_url else VaultwardenInvite(True, "invited"))
assert svc.invite_user("alice@bstein.dev").status == "invited"
def test_vaultwarden_lookup_via_direct_edges(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
vaultwarden_namespace="vaultwarden",
vaultwarden_admin_secret_name="vaultwarden-admin",
vaultwarden_admin_secret_key="ADMIN_TOKEN",
vaultwarden_admin_rate_limit_backoff_sec=600,
vaultwarden_admin_session_ttl_sec=900,
vaultwarden_service_host="vaultwarden-service.vaultwarden.svc.cluster.local",
vaultwarden_pod_label="app=vaultwarden",
vaultwarden_pod_port=80,
)
monkeypatch.setattr("ariadne.services.vaultwarden.settings", dummy_settings)
svc = VaultwardenService()
assert svc._lookup_via("", "alice@bstein.dev") is None
rate_limited = DummyVaultwardenClient()
rate_limited.responses["/admin/users"] = DummyResponse(429, "rate limited")
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: rate_limited)
assert svc._lookup_via("http://vaultwarden", "alice@bstein.dev").status == "rate_limited"
bad_payload = DummyVaultwardenClient()
bad_payload.responses["/admin/users"] = DummyResponse(200, "", json_data={"bad": "shape"})
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: bad_payload)
assert svc._lookup_via("http://vaultwarden", "alice@bstein.dev").detail == "unexpected users response"
mixed_users = DummyVaultwardenClient()
mixed_users.responses["/admin/users"] = DummyResponse(200, "", json_data=["bad", {"email": "alice@bstein.dev"}])
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: mixed_users)
assert svc._lookup_via("http://vaultwarden", "ALICE@bstein.dev").status == "present"
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: (_ for _ in ()).throw(RuntimeError("rate limited")))
assert svc._lookup_via("http://vaultwarden", "alice@bstein.dev").status == "rate_limited"
monkeypatch.setattr(svc, "_admin_session", lambda _base_url: (_ for _ in ()).throw(RuntimeError("boom")))
assert svc._lookup_via("http://vaultwarden", "alice@bstein.dev").detail == "boom"
def test_vaultwarden_find_user_by_email_edges() -> None:
svc = VaultwardenService()
assert svc.find_user_by_email("bad-email").status == "invalid_email"
svc._rate_limited_until = time.time() + 60
assert svc.find_user_by_email("alice@bstein.dev").status == "rate_limited"
def test_vaultwarden_find_user_fallbacks(monkeypatch) -> None:
svc = VaultwardenService()
monkeypatch.setattr(svc, "_candidate_urls", lambda: ["", "http://vaultwarden"])
monkeypatch.setattr(
svc,
"_lookup_via",
lambda base_url, _email: None if not base_url else VaultwardenLookup(False, "rate_limited", "slow down"),
)
assert svc.find_user_by_email("alice@bstein.dev").status == "rate_limited"
monkeypatch.setattr(svc, "_candidate_urls", lambda: ["", "http://vaultwarden"])
monkeypatch.setattr(
svc,
"_lookup_via",
lambda base_url, _email: None if not base_url else VaultwardenLookup(False, "error", "lookup failed"),
)
result = svc.find_user_by_email("alice@bstein.dev")
assert result.status == "error"
assert result.detail == "lookup failed"