titan-iac/scripts/vaultwarden_cred_sync.py

184 lines
6.0 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import sys
import time
from typing import Any, Iterable
import httpx
from atlas_portal import settings
from atlas_portal.keycloak import admin_client
from atlas_portal.vaultwarden import invite_user
VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email"
VAULTWARDEN_STATUS_ATTR = "vaultwarden_status"
VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at"
def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
client = admin_client()
if not client.ready():
raise RuntimeError("keycloak admin client not configured")
url = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
first = 0
while True:
headers = client.headers()
# We need attributes for idempotency (vaultwarden_status/vaultwarden_email). Keycloak defaults to a
# brief representation which may omit these.
params = {"first": str(first), "max": str(page_size), "briefRepresentation": "false"}
with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
resp = http.get(url, params=params, headers=headers)
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, list) or not payload:
return
for item in payload:
if isinstance(item, dict):
yield item
if len(payload) < page_size:
return
first += page_size
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
def _vaultwarden_email_for_user(user: dict[str, Any]) -> str:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
return ""
attrs = user.get("attributes")
vaultwarden_email = _extract_attr(attrs, VAULTWARDEN_EMAIL_ATTR)
if vaultwarden_email:
return vaultwarden_email
mailu_email = _extract_attr(attrs, "mailu_email")
if mailu_email:
return mailu_email
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
email = email.strip()
if email and email.lower().endswith(f"@{settings.MAILU_DOMAIN.lower()}"):
return email
# Don't guess an internal mailbox address until Mailu sync has run and stored mailu_email.
# This avoids spamming Vaultwarden invites that can never be delivered (unknown recipient).
return ""
def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
existing = _extract_attr(user.get("attributes"), key)
if existing:
return
admin_client().set_user_attribute(username, key, value)
def _set_user_attribute(username: str, key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
admin_client().set_user_attribute(username, key, value)
def main() -> int:
processed = 0
created = 0
skipped = 0
failures = 0
for user in _iter_keycloak_users():
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
skipped += 1
continue
enabled = user.get("enabled")
if enabled is False:
skipped += 1
continue
if user.get("serviceAccountClientId") or username.startswith("service-account-"):
skipped += 1
continue
# Fetch the full user payload so we can reliably read attributes (and skip re-invites).
user_id = (user.get("id") if isinstance(user.get("id"), str) else "") or ""
user_id = user_id.strip()
full_user = user
if user_id:
try:
full_user = admin_client().get_user(user_id)
except Exception:
full_user = user
current_status = _extract_attr(full_user.get("attributes"), VAULTWARDEN_STATUS_ATTR)
email = _vaultwarden_email_for_user(full_user)
if not email:
print(f"skip {username}: missing email", file=sys.stderr)
skipped += 1
continue
try:
_set_user_attribute_if_missing(username, full_user, VAULTWARDEN_EMAIL_ATTR, email)
except Exception:
pass
# If we've already successfully invited or confirmed presence, do not re-invite on every cron run.
# Vaultwarden returns 409 for "already exists", which is idempotent but noisy and can trigger rate limits.
if current_status in {"invited", "already_present"}:
skipped += 1
continue
processed += 1
result = invite_user(email)
if result.ok:
created += 1
print(f"ok {username}: {result.status}")
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
except Exception:
pass
else:
failures += 1
print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))
except Exception:
pass
print(
f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",
file=sys.stderr,
)
return 0 if failures == 0 else 2
if __name__ == "__main__":
raise SystemExit(main())