vaultwarden: reduce lookup returns

This commit is contained in:
Brad Stein 2026-01-24 06:18:35 -03:00
parent d21595aaac
commit dc0fb4eb22

View File

@ -112,29 +112,38 @@ class VaultwardenService:
def _lookup_via(self, base_url: str, email: str) -> VaultwardenLookup | None:
if not base_url:
return None
result: VaultwardenLookup | None = None
try:
session = self._admin_session(base_url)
resp = session.get("/admin/users")
if resp.status_code == HTTP_TOO_MANY_REQUESTS:
self._rate_limited_until = time.time() + float(settings.vaultwarden_admin_rate_limit_backoff_sec)
return VaultwardenLookup(ok=False, status="rate_limited", detail="vaultwarden rate limited")
resp.raise_for_status()
users = resp.json()
if not isinstance(users, list):
return VaultwardenLookup(ok=False, status="error", detail="unexpected users response")
target = email.lower()
for entry in users:
if not isinstance(entry, dict):
continue
user_email = entry.get("email")
if isinstance(user_email, str) and user_email.lower() == target:
return VaultwardenLookup(ok=True, status="present", detail="user found")
return VaultwardenLookup(ok=True, status="missing", detail="user missing")
result = VaultwardenLookup(ok=False, status="rate_limited", detail="vaultwarden rate limited")
else:
resp.raise_for_status()
users = resp.json()
if not isinstance(users, list):
result = VaultwardenLookup(ok=False, status="error", detail="unexpected users response")
else:
target = email.lower()
found = False
for entry in users:
if not isinstance(entry, dict):
continue
user_email = entry.get("email")
if isinstance(user_email, str) and user_email.lower() == target:
found = True
break
status = "present" if found else "missing"
detail = "user found" if found else "user missing"
result = VaultwardenLookup(ok=True, status=status, detail=detail)
except Exception as exc:
message = str(exc)
if "rate limited" in message.lower():
return VaultwardenLookup(ok=False, status="rate_limited", detail="vaultwarden rate limited")
return VaultwardenLookup(ok=False, status="error", detail=message)
result = VaultwardenLookup(ok=False, status="rate_limited", detail="vaultwarden rate limited")
else:
result = VaultwardenLookup(ok=False, status="error", detail=message)
return result or VaultwardenLookup(ok=False, status="error", detail="lookup failed")
def invite_user(self, email: str) -> VaultwardenInvite:
email = self._normalize_email(email)