titan-iac/scripts/vaultwarden_cred_sync.py

99 lines
2.8 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import sys
from typing import Any, Iterable
import httpx
from atlas_portal import settings
from atlas_portal.keycloak import admin_client
from atlas_portal.vaultwarden import invite_user
def _iter_keycloak_users(page_size: int = 200) -> Iterable[dict[str, Any]]:
client = admin_client()
if not client.ready():
raise RuntimeError("keycloak admin client not configured")
url = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
first = 0
while True:
headers = client.headers()
params = {"first": str(first), "max": str(page_size)}
with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
resp = http.get(url, params=params, headers=headers)
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, list) or not payload:
return
for item in payload:
if isinstance(item, dict):
yield item
if len(payload) < page_size:
return
first += page_size
def _email_for_user(user: dict[str, Any]) -> str:
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
if email.strip():
return email.strip()
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
return ""
return f"{username}@{settings.MAILU_DOMAIN}"
def main() -> int:
processed = 0
created = 0
skipped = 0
failures = 0
for user in _iter_keycloak_users():
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
skipped += 1
continue
enabled = user.get("enabled")
if enabled is False:
skipped += 1
continue
if user.get("serviceAccountClientId") or username.startswith("service-account-"):
skipped += 1
continue
email = _email_for_user(user)
if not email:
print(f"skip {username}: missing email", file=sys.stderr)
skipped += 1
continue
processed += 1
result = invite_user(email)
if result.ok:
created += 1
print(f"ok {username}: {result.status}")
else:
failures += 1
print(f"err {username}: {result.status} {result.detail}", file=sys.stderr)
print(
f"done processed={processed} created_or_present={created} skipped={skipped} failures={failures}",
file=sys.stderr,
)
return 0 if failures == 0 else 2
if __name__ == "__main__":
raise SystemExit(main())