# ariadne/tests/unit/services/test_vaultwarden_service.py
#
# 470 lines
# 20 KiB
# Python

from tests.unit.services.service_helpers import *
from ariadne.services.vaultwarden import VaultwardenInvite, VaultwardenLookup
def test_vaultwarden_invite_uses_admin_session(monkeypatch) -> None:
    """Expect a successful invite whose fake client saw ``/admin`` and ``/admin/invite``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        # Every httpx.Client(...) call hands back the same recording client.
        return fake_client

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    def _was_called(path):
        return any(entry[0] == path for entry in fake_client.calls)

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.ok is True
    assert _was_called("/admin")
    assert _was_called("/admin/invite")
def test_vaultwarden_invite_handles_rate_limit(monkeypatch) -> None:
    """A 429 from ``/admin/invite`` is expected to surface as ``rate_limited``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()
    fake_client.responses["/admin/invite"] = DummyResponse(429, "rate limited")

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return fake_client

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "rate_limited"
def test_vaultwarden_invite_existing_user(monkeypatch) -> None:
    """A 409 "user already exists" invite response is expected to map to ``already_present``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()
    fake_client.responses["/admin/invite"] = DummyResponse(409, "user already exists")

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return fake_client

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "already_present"
def test_vaultwarden_invite_rejects_invalid_email() -> None:
    """``invite_user("bad-email")`` is expected to report ``invalid_email``."""
    outcome = VaultwardenService().invite_user("bad-email")
    assert outcome.status == "invalid_email"
def test_vaultwarden_invite_rate_limited_short_circuit() -> None:
    """With ``_rate_limited_until`` set in the future, an invite reports ``rate_limited``."""
    service = VaultwardenService()
    # Set the rate-limit deadline one minute ahead of now.
    service._rate_limited_until = time.time() + 60

    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "rate_limited"
def test_vaultwarden_lookup_user_present(monkeypatch) -> None:
    """Looking up an e-mail that is in the ``/admin/users`` listing yields ``present``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()
    listed_users = [{"email": "alice@bstein.dev"}]
    fake_client.responses["/admin/users"] = DummyResponse(200, "", json_data=listed_users)

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return fake_client

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    lookup = service.find_user_by_email("alice@bstein.dev")

    assert lookup.status == "present"
def test_vaultwarden_lookup_user_missing(monkeypatch) -> None:
    """Looking up an e-mail absent from the ``/admin/users`` listing yields ``missing``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()
    # The listing contains only a different account.
    listed_users = [{"email": "bob@bstein.dev"}]
    fake_client.responses["/admin/users"] = DummyResponse(200, "", json_data=listed_users)

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return fake_client

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    lookup = service.find_user_by_email("alice@bstein.dev")

    assert lookup.status == "missing"
def test_vaultwarden_invite_handles_admin_exception(monkeypatch) -> None:
    """An ``_admin_session`` that raises ``RuntimeError("rate limited")`` gives ``rate_limited``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )

    def _pod_lookup_fails(*_args, **_kwargs):
        raise RuntimeError("boom")

    def _session_fails(*_args, **_kwargs):
        raise RuntimeError("rate limited")

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_lookup_fails),
    )

    service = VaultwardenService()
    # Patched on the instance, after construction.
    monkeypatch.setattr(service, "_admin_session", _session_fails)

    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "rate_limited"
def test_vaultwarden_invite_handles_bad_body(monkeypatch) -> None:
    """A 500 response whose ``text`` property raises is expected to produce ``error``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )

    class UnreadableResponse:
        """Response stub whose body cannot be read."""

        def __init__(self, status_code=500):
            self.status_code = status_code

        def raise_for_status(self):
            return None

        @property
        def text(self):
            raise RuntimeError("boom")

    class UnreadableClient(DummyVaultwardenClient):
        """Client stub that records each POST and answers with an unreadable 500."""

        def post(self, path, json=None, data=None):
            record = (path, json, data)
            self.calls.append(record)
            return UnreadableResponse(500)

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        # A fresh client per call, as in the patched factory contract.
        return UnreadableClient()

    def _pod_ip(*_args, **_kwargs):
        return "127.0.0.1"

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_ip),
    )

    service = VaultwardenService()
    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "error"
def test_vaultwarden_invite_handles_fallback_skip(monkeypatch) -> None:
    """With pod lookup raising and ``_admin_session`` raising "nope", expect ``error``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )

    def _pod_lookup_fails(*_args, **_kwargs):
        raise RuntimeError("boom")

    def _session_fails(*_args, **_kwargs):
        raise RuntimeError("nope")

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr(
        "ariadne.services.vaultwarden.VaultwardenService._find_pod_ip",
        staticmethod(_pod_lookup_fails),
    )

    service = VaultwardenService()
    monkeypatch.setattr(service, "_admin_session", _session_fails)

    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "error"
def test_vaultwarden_find_pod_ip(monkeypatch) -> None:
    """A Running pod with a ``Ready``/``True`` condition has its ``podIP`` returned."""

    def _pod_listing(*_args, **_kwargs):
        ready_pod = {
            "status": {
                "phase": "Running",
                "podIP": "10.0.0.1",
                "conditions": [{"type": "Ready", "status": "True"}],
            }
        }
        return {"items": [ready_pod]}

    monkeypatch.setattr("ariadne.services.vaultwarden.get_json", _pod_listing)

    found = VaultwardenService._find_pod_ip("ns", "app=vaultwarden")

    assert found == "10.0.0.1"
def test_vaultwarden_find_pod_ip_skips_missing_ip(monkeypatch) -> None:
    """A pod with an empty ``podIP`` is passed over in favour of the next pod's IP."""

    def _pod_listing(*_args, **_kwargs):
        without_ip = {"status": {"phase": "Running", "podIP": ""}}
        with_ip = {"status": {"phase": "Running", "podIP": "10.0.0.2", "conditions": []}}
        return {"items": [without_ip, with_ip]}

    monkeypatch.setattr("ariadne.services.vaultwarden.get_json", _pod_listing)

    found = VaultwardenService._find_pod_ip("ns", "app=vaultwarden")

    assert found == "10.0.0.2"
def test_vaultwarden_find_pod_ip_conditions_default_ready(monkeypatch) -> None:
    """A Running pod whose ``conditions`` holds a non-dict entry still has its IP returned."""

    def _pod_listing(*_args, **_kwargs):
        odd_pod = {"status": {"phase": "Running", "podIP": "10.0.0.3", "conditions": ["bad"]}}
        return {"items": [odd_pod]}

    monkeypatch.setattr("ariadne.services.vaultwarden.get_json", _pod_listing)

    found = VaultwardenService._find_pod_ip("ns", "app=vaultwarden")

    assert found == "10.0.0.3"
def test_vaultwarden_find_pod_ip_no_pods(monkeypatch) -> None:
    """An empty ``items`` listing makes ``_find_pod_ip`` raise ``RuntimeError``."""

    def _empty_listing(*_args, **_kwargs):
        return {"items": []}

    monkeypatch.setattr("ariadne.services.vaultwarden.get_json", _empty_listing)

    with pytest.raises(RuntimeError):
        VaultwardenService._find_pod_ip("ns", "app=vaultwarden")
def test_vaultwarden_find_pod_ip_missing_ip(monkeypatch) -> None:
    """A lone Pending pod with no ``podIP`` makes ``_find_pod_ip`` raise ``RuntimeError``."""

    def _pod_listing(*_args, **_kwargs):
        pending_pod = {"status": {"phase": "Pending", "conditions": ["bad"]}}
        return {"items": [pending_pod]}

    monkeypatch.setattr("ariadne.services.vaultwarden.get_json", _pod_listing)

    with pytest.raises(RuntimeError):
        VaultwardenService._find_pod_ip("ns", "app=vaultwarden")
def test_vaultwarden_admin_session_rate_limit(monkeypatch) -> None:
    """A 429 from ``/admin`` while opening the session raises ``RuntimeError``."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            # This test uses a 1-second backoff.
            "vaultwarden_admin_rate_limit_backoff_sec": 1,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    fake_client = DummyVaultwardenClient()
    fake_client.responses["/admin"] = DummyResponse(429, "")

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return fake_client

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)

    service = VaultwardenService()

    with pytest.raises(RuntimeError):
        service._admin_session("http://vaultwarden")
def test_vaultwarden_admin_session_reuses_client() -> None:
    """An unexpired session for the same base URL returns the stored admin client."""
    base_url = "http://vaultwarden"
    service = VaultwardenService()
    service._admin_client = DummyVaultwardenClient()
    service._admin_session_expires_at = time.time() + 60
    service._admin_session_base_url = base_url

    session = service._admin_session(base_url)

    assert session is service._admin_client
def test_vaultwarden_admin_session_rate_limited_until() -> None:
    """With ``_rate_limited_until`` in the future, ``_admin_session`` raises ``RuntimeError``."""
    service = VaultwardenService()
    service._rate_limited_until = time.time() + 60

    with pytest.raises(RuntimeError):
        service._admin_session("http://vaultwarden")
def test_vaultwarden_admin_session_closes_existing(monkeypatch) -> None:
    """An expired session whose old client fails on ``close()`` still yields a session."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )

    class FailingClose:
        """Stale client stub whose ``close`` always raises."""

        def close(self):
            raise RuntimeError("boom")

    def _secret(*_args, **_kwargs):
        return "token"

    def _client_factory(*_args, **_kwargs):
        return DummyVaultwardenClient()

    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    monkeypatch.setattr("ariadne.services.vaultwarden.get_secret_value", _secret)
    monkeypatch.setattr("ariadne.services.vaultwarden.httpx.Client", _client_factory)

    service = VaultwardenService()
    # Seed an already-expired session that points at a different base URL.
    service._admin_client = FailingClose()
    service._admin_session_expires_at = time.time() - 10
    service._admin_session_base_url = "http://old"

    session = service._admin_session("http://vaultwarden")

    assert session is not None
def test_vaultwarden_invite_conflict_status_edges() -> None:
    """Check ``_invite_conflict_status`` for an invited, unrecognised and unreadable body."""

    class UnreadableResponse:
        """Response stub whose ``text`` cannot be read."""

        @property
        def text(self):
            raise RuntimeError("bad body")

    classify = VaultwardenService._invite_conflict_status

    already_invited = classify(DummyResponse(409, "already invited"))
    assert already_invited == "invited"

    unrecognised = classify(DummyResponse(409, "something else"))
    assert unrecognised is None

    unreadable = classify(UnreadableResponse())
    assert unreadable is None
def test_vaultwarden_invite_via_direct_edges(monkeypatch) -> None:
    """Exercise ``_invite_via`` with a blank URL, an "already invited" 409 and a 400."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    service = VaultwardenService()

    def _serve_with(fake_client):
        """Make ``_admin_session`` hand back *fake_client*."""

        def _session(_base_url):
            return fake_client

        monkeypatch.setattr(service, "_admin_session", _session)

    # A blank base URL produces no result at all.
    assert service._invite_via("", "alice@bstein.dev") is None

    invited_client = DummyVaultwardenClient()
    invited_client.responses["/admin/invite"] = DummyResponse(409, "already invited")
    _serve_with(invited_client)
    invited_outcome = service._invite_via("http://vaultwarden", "alice@bstein.dev")
    assert invited_outcome.detail == "user already invited"

    bad_request_client = DummyVaultwardenClient()
    bad_request_client.responses["/admin/invite"] = DummyResponse(400, "bad request")
    _serve_with(bad_request_client)
    bad_request_outcome = service._invite_via("http://vaultwarden", "alice@bstein.dev")
    assert bad_request_outcome.detail == "status 400"
def test_vaultwarden_invite_user_skips_blank_candidate(monkeypatch) -> None:
    """A ``None`` result for the blank candidate URL is skipped; the next one is returned."""
    service = VaultwardenService()

    def _candidates():
        return ["", "http://vaultwarden"]

    def _invite(base_url, _email):
        if not base_url:
            return None
        return VaultwardenInvite(True, "invited")

    monkeypatch.setattr(service, "_candidate_urls", _candidates)
    monkeypatch.setattr(service, "_invite_via", _invite)

    outcome = service.invite_user("alice@bstein.dev")

    assert outcome.status == "invited"
def test_vaultwarden_lookup_via_direct_edges(monkeypatch) -> None:
    """Exercise ``_lookup_via`` with a blank URL, odd responses and failing sessions."""
    fake_settings = types.SimpleNamespace(
        **{
            "vaultwarden_namespace": "vaultwarden",
            "vaultwarden_admin_secret_name": "vaultwarden-admin",
            "vaultwarden_admin_secret_key": "ADMIN_TOKEN",
            "vaultwarden_admin_rate_limit_backoff_sec": 600,
            "vaultwarden_admin_session_ttl_sec": 900,
            "vaultwarden_service_host": "vaultwarden-service.vaultwarden.svc.cluster.local",
            "vaultwarden_pod_label": "app=vaultwarden",
            "vaultwarden_pod_port": 80,
        }
    )
    monkeypatch.setattr("ariadne.services.vaultwarden.settings", fake_settings)
    service = VaultwardenService()

    def _serve_with(fake_client):
        """Make ``_admin_session`` hand back *fake_client*."""

        def _session(_base_url):
            return fake_client

        monkeypatch.setattr(service, "_admin_session", _session)

    def _fail_with(message):
        """Make ``_admin_session`` raise ``RuntimeError(message)``."""

        def _session(_base_url):
            raise RuntimeError(message)

        monkeypatch.setattr(service, "_admin_session", _session)

    # A blank base URL produces no result at all.
    assert service._lookup_via("", "alice@bstein.dev") is None

    # 429 from the users endpoint.
    throttled_client = DummyVaultwardenClient()
    throttled_client.responses["/admin/users"] = DummyResponse(429, "rate limited")
    _serve_with(throttled_client)
    throttled = service._lookup_via("http://vaultwarden", "alice@bstein.dev")
    assert throttled.status == "rate_limited"

    # JSON payload that is a dict rather than a list.
    malformed_client = DummyVaultwardenClient()
    malformed_client.responses["/admin/users"] = DummyResponse(200, "", json_data={"bad": "shape"})
    _serve_with(malformed_client)
    malformed = service._lookup_via("http://vaultwarden", "alice@bstein.dev")
    assert malformed.detail == "unexpected users response"

    # List mixing a non-dict entry with a real user; the lookup uses different casing.
    mixed_client = DummyVaultwardenClient()
    mixed_client.responses["/admin/users"] = DummyResponse(
        200, "", json_data=["bad", {"email": "alice@bstein.dev"}]
    )
    _serve_with(mixed_client)
    mixed = service._lookup_via("http://vaultwarden", "ALICE@bstein.dev")
    assert mixed.status == "present"

    # Session creation raising with a "rate limited" message.
    _fail_with("rate limited")
    session_throttled = service._lookup_via("http://vaultwarden", "alice@bstein.dev")
    assert session_throttled.status == "rate_limited"

    # Session creation raising with any other message.
    _fail_with("boom")
    session_broken = service._lookup_via("http://vaultwarden", "alice@bstein.dev")
    assert session_broken.detail == "boom"
def test_vaultwarden_find_user_by_email_edges() -> None:
    """``find_user_by_email`` reports ``invalid_email`` and, once throttled, ``rate_limited``."""
    service = VaultwardenService()

    malformed = service.find_user_by_email("bad-email")
    assert malformed.status == "invalid_email"

    # Set the rate-limit deadline one minute ahead before the second lookup.
    service._rate_limited_until = time.time() + 60
    throttled = service.find_user_by_email("alice@bstein.dev")
    assert throttled.status == "rate_limited"
def test_vaultwarden_find_user_fallbacks(monkeypatch) -> None:
    """``find_user_by_email`` returns the non-``None`` lookup from the second candidate URL."""
    service = VaultwardenService()

    def _candidates():
        return ["", "http://vaultwarden"]

    def _throttled_lookup(base_url, _email):
        if not base_url:
            return None
        return VaultwardenLookup(False, "rate_limited", "slow down")

    def _failed_lookup(base_url, _email):
        if not base_url:
            return None
        return VaultwardenLookup(False, "error", "lookup failed")

    monkeypatch.setattr(service, "_candidate_urls", _candidates)
    monkeypatch.setattr(service, "_lookup_via", _throttled_lookup)
    throttled = service.find_user_by_email("alice@bstein.dev")
    assert throttled.status == "rate_limited"

    monkeypatch.setattr(service, "_candidate_urls", _candidates)
    monkeypatch.setattr(service, "_lookup_via", _failed_lookup)
    failed = service.find_user_by_email("alice@bstein.dev")
    assert failed.status == "error"
    assert failed.detail == "lookup failed"