99 lines
2.8 KiB
Python
99 lines
2.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from typing import Any, Iterable
|
|
|
|
import httpx
|
|
|
|
from atlas_portal import settings
|
|
from atlas_portal.keycloak import admin_client
|
|
from atlas_portal.vaultwarden import invite_user
|
|
|
|
|
|
def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
|
|
client = admin_client()
|
|
if not client.ready():
|
|
raise RuntimeError("keycloak admin client not configured")
|
|
|
|
url = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
|
|
first = 0
|
|
while True:
|
|
headers = client.headers()
|
|
params = {"first": str(first), "max": str(page_size)}
|
|
with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
|
|
resp = http.get(url, params=params, headers=headers)
|
|
resp.raise_for_status()
|
|
payload = resp.json()
|
|
|
|
if not isinstance(payload, list) or not payload:
|
|
return
|
|
|
|
for item in payload:
|
|
if isinstance(item, dict):
|
|
yield item
|
|
|
|
if len(payload) < page_size:
|
|
return
|
|
first += page_size
|
|
|
|
|
|
def _email_for_user(user: dict[str, Any]) -> str:
|
|
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
|
|
if email.strip():
|
|
return email.strip()
|
|
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
|
|
username = username.strip()
|
|
if not username:
|
|
return ""
|
|
return f"{username}@{settings.MAILU_DOMAIN}"
|
|
|
|
|
|
def main() -> int:
|
|
processed = 0
|
|
created = 0
|
|
skipped = 0
|
|
failures = 0
|
|
|
|
for user in _iter_keycloak_users():
|
|
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
|
|
username = username.strip()
|
|
if not username:
|
|
skipped += 1
|
|
continue
|
|
|
|
enabled = user.get("enabled")
|
|
if enabled is False:
|
|
skipped += 1
|
|
continue
|
|
|
|
if user.get("serviceAccountClientId") or username.startswith("service-account-"):
|
|
skipped += 1
|
|
continue
|
|
|
|
email = _email_for_user(user)
|
|
if not email:
|
|
print(f"skip {username}: missing email", file=sys.stderr)
|
|
skipped += 1
|
|
continue
|
|
|
|
processed += 1
|
|
result = invite_user(email)
|
|
if result.ok:
|
|
created += 1
|
|
print(f"ok {username}: {result.status}")
|
|
else:
|
|
failures += 1
|
|
print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)
|
|
|
|
print(
|
|
f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",
|
|
file=sys.stderr,
|
|
)
|
|
return 0 if failures == 0 else 2
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|