vaultwarden: disable signups and sync invites

This commit is contained in:
Brad Stein 2026-01-03 16:55:02 -03:00
parent 70980a2ca9
commit c386ff7c7a
2 changed files with 71 additions and 5 deletions

View File

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import sys import sys
import time
from typing import Any, Iterable from typing import Any, Iterable
import httpx import httpx
@ -12,6 +13,11 @@ from atlas_portal.keycloak import admin_client
from atlas_portal.vaultwarden import invite_user from atlas_portal.vaultwarden import invite_user
VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email"
VAULTWARDEN_STATUS_ATTR = "vaultwarden_status"
VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at"
def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]: def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
client = admin_client() client = admin_client()
if not client.ready(): if not client.ready():
@ -39,17 +45,60 @@ def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
first += page_size first += page_size
def _email_for_user(user: dict[str, Any]) -> str: def _extract_attr(attrs: Any, key: str) -> str:
email = (user.get("email") if isinstance(user.get("email"), str) else "") or "" if not isinstance(attrs, dict):
if email.strip(): return ""
return email.strip() raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
def _vaultwarden_email_for_user(user: dict[str, Any]) -> str:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or "" username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip() username = username.strip()
if not username: if not username:
return "" return ""
attrs = user.get("attributes")
vaultwarden_email = _extract_attr(attrs, VAULTWARDEN_EMAIL_ATTR)
if vaultwarden_email:
return vaultwarden_email
mailu_email = _extract_attr(attrs, "mailu_email")
if mailu_email:
return mailu_email
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
email = email.strip()
if email and email.lower().endswith(f"@{settings.MAILU_DOMAIN.lower()}"):
return email
return f"{username}@{settings.MAILU_DOMAIN}" return f"{username}@{settings.MAILU_DOMAIN}"
def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
existing = _extract_attr(user.get("attributes"), key)
if existing:
return
admin_client().set_user_attribute(username, key, value)
def _set_user_attribute(username: str, key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
admin_client().set_user_attribute(username, key, value)
def main() -> int: def main() -> int:
processed = 0 processed = 0
created = 0 created = 0
@ -72,20 +121,35 @@ def main() -> int:
skipped += 1 skipped += 1
continue continue
email = _email_for_user(user) email = _vaultwarden_email_for_user(user)
if not email: if not email:
print(f"skip {username}: missing email", file=sys.stderr) print(f"skip {username}: missing email", file=sys.stderr)
skipped += 1 skipped += 1
continue continue
try:
_set_user_attribute_if_missing(username, user, VAULTWARDEN_EMAIL_ATTR, email)
except Exception:
pass
processed += 1 processed += 1
result = invite_user(email) result = invite_user(email)
if result.ok: if result.ok:
created += 1 created += 1
print(f"ok {username}: {result.status}") print(f"ok {username}: {result.status}")
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
except Exception:
pass
else: else:
failures += 1 failures += 1
print(f"err {username}: {result.status} {result.detail}", file=sys.stderr) print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
except Exception:
pass
print( print(
f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}", f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",

View File

@ -19,6 +19,8 @@ spec:
image: vaultwarden/server:1.33.2 image: vaultwarden/server:1.33.2
env: env:
- name: SIGNUPS_ALLOWED - name: SIGNUPS_ALLOWED
value: "false"
- name: INVITATIONS_ALLOWED
value: "true" value: "true"
- name: DATABASE_URL - name: DATABASE_URL
valueFrom: valueFrom: