#!/usr/bin/env python3
"""Call ``atlas_portal.vaultwarden.invite_user`` for each eligible Keycloak user.

The script pages through the users of the realm named by
``settings.KEYCLOAK_REALM`` using the Keycloak admin REST endpoint, skips
entries that have no username, are explicitly disabled, or look like service
accounts, and passes an e-mail address for every remaining user to
``invite_user``.

Per-user results go to stdout (success) or stderr (skip/failure); a summary
line is written to stderr at the end.  The process exit status is ``0`` when
no invite failed and ``2`` otherwise.
"""

from __future__ import annotations

import sys
from typing import Any, Iterable

import httpx

from atlas_portal import settings
from atlas_portal.keycloak import admin_client
from atlas_portal.vaultwarden import invite_user


def _str_field(user: dict[str, Any], key: str) -> str:
    """Return ``user[key]`` stripped of surrounding whitespace.

    A missing key or a non-string value yields the empty string.
    """
    value = user.get(key)
    return value.strip() if isinstance(value, str) else ""


def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
    """Yield every user record of the configured Keycloak realm, page by page.

    Args:
        page_size: Number of records requested per HTTP call (sent as ``max``).

    Yields:
        Each ``dict`` element of the JSON lists returned by the users
        endpoint; non-dict elements are ignored.

    Raises:
        ValueError: If ``page_size`` is smaller than 1.
        RuntimeError: If the Keycloak admin client reports it is not ready.
        httpx.HTTPStatusError: If a page request returns an error status.

    Note:
        This is a generator, so the checks above run on first iteration, not
        when the function is called.
    """
    if page_size < 1:
        # A zero page size would never advance ``first`` and could loop forever.
        raise ValueError(f"page_size must be >= 1, got {page_size!r}")

    client = admin_client()
    if not client.ready():
        raise RuntimeError("keycloak admin client not configured")

    url = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
    first = 0

    # One HTTP client for the whole listing so connections can be reused
    # across pages instead of being set up again for each request.
    with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
        while True:
            # Headers are requested per page rather than cached.
            # NOTE(review): presumably this lets the admin client hand out a
            # refreshed token during long runs - confirm against admin_client.
            headers = client.headers()
            params = {"first": str(first), "max": str(page_size)}
            resp = http.get(url, params=params, headers=headers)
            resp.raise_for_status()
            payload = resp.json()

            # An empty or non-list body ends the listing.
            if not isinstance(payload, list) or not payload:
                return

            for item in payload:
                if isinstance(item, dict):
                    yield item

            # A short page means there is nothing left to fetch.
            if len(payload) < page_size:
                return
            first += page_size


def _email_for_user(user: dict[str, Any]) -> str:
    """Return the address to invite for ``user``.

    The user's own ``email`` is preferred.  When it is missing or blank the
    address is built as ``<username>@<settings.MAILU_DOMAIN>``.  An empty
    string is returned when neither an e-mail nor a username is available.
    """
    email = _str_field(user, "email")
    if email:
        return email

    username = _str_field(user, "username")
    if not username:
        return ""
    return f"{username}@{settings.MAILU_DOMAIN}"


def main() -> int:
    """Invite all eligible Keycloak users and report the outcome.

    Returns:
        ``0`` if every attempted invite reported ``ok``, otherwise ``2``.
    """
    processed = 0
    created = 0
    skipped = 0
    failures = 0

    for user in _iter_keycloak_users():
        username = _str_field(user, "username")
        if not username:
            skipped += 1
            continue

        # Only an explicit ``False`` disables a user; a missing flag does not.
        if user.get("enabled") is False:
            skipped += 1
            continue

        if user.get("serviceAccountClientId") or username.startswith("service-account-"):
            skipped += 1
            continue

        email = _email_for_user(user)
        if not email:
            # Defensive: with a non-empty username the helper always has a
            # fallback address, so this is not expected to trigger.
            print(f"skip {username}: missing email", file=sys.stderr)
            skipped += 1
            continue

        processed += 1
        result = invite_user(email)
        if result.ok:
            created += 1
            print(f"ok {username}: {result.status}")
        else:
            failures += 1
            print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)

    print(
        f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",
        file=sys.stderr,
    )
    return 0 if failures == 0 else 2


if __name__ == "__main__":
    raise SystemExit(main())