#!/usr/bin/env python3
"""Invite Keycloak users to Vaultwarden, recording progress on Keycloak user attributes.

For each enabled, non-service-account Keycloak user with a usable mailbox address, this
script calls ``invite_user`` and stores the outcome in the ``vaultwarden_status`` and
``vaultwarden_synced_at`` user attributes. Later runs use those attributes to skip users
who were already invited, and to hold off on users whose last attempt was rate limited or
errored until a cooldown has passed.

Exit status: 0 when no invite failed during the run, 2 otherwise.
"""

from __future__ import annotations

import os
import sys
import time
from datetime import datetime, timezone
from typing import Any, Iterable

import httpx

from atlas_portal import settings
from atlas_portal.keycloak import admin_client
from atlas_portal.vaultwarden import invite_user

# Keycloak user attribute names used to make the sync idempotent across cron runs.
VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email"
VAULTWARDEN_STATUS_ATTR = "vaultwarden_status"
VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at"

# Seconds to wait before retrying a user whose last attempt ended "rate_limited"/"error".
VAULTWARDEN_RETRY_COOLDOWN_SEC = int(os.getenv("VAULTWARDEN_RETRY_COOLDOWN_SEC", "1800"))
# Abort the run after this many consecutive "rate_limited"/"error" invite results.
VAULTWARDEN_FAILURE_BAILOUT = int(os.getenv("VAULTWARDEN_FAILURE_BAILOUT", "2"))

# Format written to VAULTWARDEN_SYNCED_AT_ATTR (UTC, second precision).
_SYNCED_AT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Stored statuses meaning the user needs no further invite.
_DONE_STATUSES = frozenset({"invited", "already_present"})
# Statuses that trigger the retry cooldown and count toward the failure bailout.
_RETRYABLE_STATUSES = frozenset({"rate_limited", "error"})
# Maximum attempts per Keycloak user-list page request.
_PAGE_FETCH_ATTEMPTS = 5


def _utc_now_stamp() -> str:
    """Return the current UTC time formatted for VAULTWARDEN_SYNCED_AT_ATTR."""
    return time.strftime(_SYNCED_AT_FORMAT, time.gmtime())


def _str_field(user: dict[str, Any], key: str) -> str:
    """Return ``user[key]`` stripped when it is a string, otherwise ``""``."""
    value = user.get(key)
    return value.strip() if isinstance(value, str) else ""


def _warn(username: str, what: str, exc: Exception) -> None:
    """Report a non-fatal, best-effort failure for ``username`` on stderr."""
    print(f"warn {username}: {what}: {exc}", file=sys.stderr)


def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
    """Yield every user in the configured Keycloak realm, one page at a time.

    Each page request is tried up to ``_PAGE_FETCH_ATTEMPTS`` times on
    ``httpx.HTTPError``, sleeping ``attempt * 2`` seconds between attempts. Non-dict
    entries in a page are skipped. Iteration stops at the first empty, non-list, or
    short page.

    Raises:
        RuntimeError: if the Keycloak admin client is not configured.
        httpx.HTTPError: if a page request still fails on the final attempt.
    """
    client = admin_client()
    if not client.ready():
        raise RuntimeError("keycloak admin client not configured")
    url = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
    first = 0
    while True:
        # We need attributes for idempotency (vaultwarden_status/vaultwarden_email). Keycloak
        # defaults to a brief representation which may omit these.
        params = {"first": str(first), "max": str(page_size), "briefRepresentation": "false"}
        payload = None
        for attempt in range(1, _PAGE_FETCH_ATTEMPTS + 1):
            # Fetch headers per attempt (not once per page), so a retry does not resend
            # the exact credentials that may have caused the failure (e.g. an expired token).
            # Header failures are raised directly, as before, rather than retried here.
            headers = _headers_with_retry(client)
            try:
                with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
                    resp = http.get(url, params=params, headers=headers)
                    resp.raise_for_status()
                    payload = resp.json()
                break
            except httpx.HTTPError:
                if attempt == _PAGE_FETCH_ATTEMPTS:
                    raise
                time.sleep(attempt * 2)
        if not isinstance(payload, list) or not payload:
            return
        for item in payload:
            if isinstance(item, dict):
                yield item
        if len(payload) < page_size:
            return
        first += page_size


def _headers_with_retry(client, attempts: int = 6) -> dict[str, str]:
    """Return ``client.headers()``, retrying on any exception.

    Sleeps ``attempt * 2`` seconds between attempts, but not after the last one, so a
    persistent failure is reported without an extra useless delay.

    Raises:
        Exception: whatever the final ``client.headers()`` call raised.
        RuntimeError: if ``attempts`` is less than 1 (no attempt was made).
    """
    last_exc: Exception | None = None
    for attempt in range(1, attempts + 1):
        try:
            return client.headers()
        except Exception as exc:
            last_exc = exc
            if attempt < attempts:
                time.sleep(attempt * 2)
    if last_exc is not None:
        raise last_exc
    raise RuntimeError("failed to fetch keycloak headers")


def _extract_attr(attrs: Any, key: str) -> str:
    """Return the first non-blank string value stored under ``attrs[key]``, stripped.

    The value may be a list (its first non-blank string wins) or a bare string. Returns
    ``""`` when ``attrs`` is not a dict or no non-blank string value exists.
    """
    if not isinstance(attrs, dict):
        return ""
    raw = attrs.get(key)
    candidates = raw if isinstance(raw, list) else [raw]
    for item in candidates:
        if isinstance(item, str) and item.strip():
            return item.strip()
    return ""


def _parse_synced_at(value: str) -> float | None:
    """Parse a stored ``vaultwarden_synced_at`` value into a POSIX timestamp.

    Accepts ``YYYY-MM-DDTHH:MM:SSZ`` (treated as UTC) or the same with a numeric UTC
    offset. Returns ``None`` for blank or unparseable input.
    """
    value = (value or "").strip()
    if not value:
        return None
    for fmt in (_SYNCED_AT_FORMAT, "%Y-%m-%dT%H:%M:%S%z"):
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.timestamp()
    return None


def _vaultwarden_email_for_user(user: dict[str, Any]) -> str:
    """Pick the address to invite to Vaultwarden for ``user``, or ``""`` to skip them.

    Preference order: the stored ``vaultwarden_email`` attribute, then the ``mailu_email``
    attribute, then the Keycloak ``email`` field, but only if it is on
    ``settings.MAILU_DOMAIN``. Users without a username always yield ``""``.
    """
    if not _str_field(user, "username"):
        return ""
    attrs = user.get("attributes")
    for attr_key in (VAULTWARDEN_EMAIL_ATTR, "mailu_email"):
        stored = _extract_attr(attrs, attr_key)
        if stored:
            return stored
    email = _str_field(user, "email")
    if email and email.lower().endswith(f"@{settings.MAILU_DOMAIN.lower()}"):
        return email
    # Don't guess an internal mailbox address until Mailu sync has run and stored mailu_email.
    # This avoids spamming Vaultwarden invites that can never be delivered (unknown recipient).
    return ""


def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None:
    """Set Keycloak attribute ``key`` to ``value`` unless ``user`` already has a non-blank one.

    Blank ``value`` is ignored.
    """
    value = (value or "").strip()
    if not value:
        return
    if _extract_attr(user.get("attributes"), key):
        return
    admin_client().set_user_attribute(username, key, value)


def _set_user_attribute(username: str, key: str, value: str) -> None:
    """Set Keycloak attribute ``key`` to the stripped ``value``; blank values are ignored."""
    value = (value or "").strip()
    if not value:
        return
    admin_client().set_user_attribute(username, key, value)


def _load_full_user(user: dict[str, Any], username: str) -> dict[str, Any]:
    """Re-fetch ``user`` by id so its attributes can be read reliably.

    Falls back to the listing entry ``user`` when it has no id, when the fetch raises
    (reported on stderr), or when the fetch returns something other than a dict.
    """
    user_id = _str_field(user, "id")
    if not user_id:
        return user
    try:
        fetched = admin_client().get_user(user_id)
    except Exception as exc:
        _warn(username, "failed to fetch full user; using list entry", exc)
        return user
    return fetched if isinstance(fetched, dict) else user


def _record_result(username: str, status: str) -> None:
    """Store an invite outcome and the current UTC time on the Keycloak user.

    This is best-effort: a failure is reported on stderr and never aborts the run. If
    writing the status fails, the timestamp is not written.
    """
    try:
        _set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, status)
        _set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, _utc_now_stamp())
    except Exception as exc:
        _warn(username, "failed to record vaultwarden status", exc)


def main() -> int:
    """Run one sync pass over all Keycloak users.

    Returns 0 if no invite failed during the run, otherwise 2. The run stops early after
    VAULTWARDEN_FAILURE_BAILOUT consecutive "rate_limited"/"error" results.
    """
    processed = 0
    created = 0
    skipped = 0
    failures = 0
    consecutive_failures = 0

    for user in _iter_keycloak_users():
        username = _str_field(user, "username")
        if not username:
            skipped += 1
            continue
        if user.get("enabled") is False:
            skipped += 1
            continue
        if user.get("serviceAccountClientId") or username.startswith("service-account-"):
            skipped += 1
            continue

        # Fetch the full user payload so we can reliably read attributes (and skip re-invites).
        full_user = _load_full_user(user, username)
        attrs = full_user.get("attributes")
        current_status = _extract_attr(attrs, VAULTWARDEN_STATUS_ATTR)
        current_synced_at = _extract_attr(attrs, VAULTWARDEN_SYNCED_AT_ATTR)
        current_synced_ts = _parse_synced_at(current_synced_at)
        # Respect the cooldown after a recent rate-limit/error. The check is `is not None`,
        # not truthiness, so a valid timestamp is never ignored.
        if (
            current_status in _RETRYABLE_STATUSES
            and current_synced_ts is not None
            and time.time() - current_synced_ts < VAULTWARDEN_RETRY_COOLDOWN_SEC
        ):
            skipped += 1
            continue

        email = _vaultwarden_email_for_user(full_user)
        if not email:
            print(f"skip {username}: missing email", file=sys.stderr)
            skipped += 1
            continue

        try:
            _set_user_attribute_if_missing(username, full_user, VAULTWARDEN_EMAIL_ATTR, email)
        except Exception as exc:
            _warn(username, f"failed to store {VAULTWARDEN_EMAIL_ATTR}", exc)

        # If we've already successfully invited or confirmed presence, do not re-invite on every
        # cron run. Vaultwarden returns 409 for "already exists", which is idempotent but noisy
        # and can trigger rate limits.
        if current_status in _DONE_STATUSES:
            if not current_synced_at:
                try:
                    _set_user_attribute(username, VAULTWARDEN_SYNCED_AT_ATTR, _utc_now_stamp())
                except Exception as exc:
                    _warn(username, f"failed to store {VAULTWARDEN_SYNCED_AT_ATTR}", exc)
            skipped += 1
            continue

        processed += 1
        result = invite_user(email)
        if result.ok:
            created += 1
            consecutive_failures = 0
            print(f"ok {username}: {result.status}")
            _record_result(username, result.status)
            continue

        failures += 1
        if result.status in _RETRYABLE_STATUSES:
            consecutive_failures += 1
        print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)
        _record_result(username, result.status)
        if consecutive_failures >= VAULTWARDEN_FAILURE_BAILOUT:
            print("vaultwarden: too many consecutive failures; aborting run", file=sys.stderr)
            break

    print(
        f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",
        file=sys.stderr,
    )
    return 0 if failures == 0 else 2


if __name__ == "__main__":
    raise SystemExit(main())