ariadne/ariadne/services/keycloak_profile.py

93 lines
2.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from ..utils.logging import get_logger
from .keycloak_admin import keycloak_admin
# Required-action names that run_profile_sync strips from a user's
# "requiredActions" list once _profile_complete reports the profile as complete.
PROFILE_ACTIONS = {"UPDATE_PROFILE", "UPDATE_EMAIL", "VERIFY_EMAIL"}
# Module-level logger used for the structured sync status records below.
logger = get_logger(__name__)
@dataclass(frozen=True)
class ProfileSyncSummary:
    """Immutable counters describing the outcome of one profile sync run."""

    # Users for which an update of "requiredActions" was attempted.
    processed: int
    # Users whose update call completed without raising.
    updated: int
    # Users left untouched because they did not qualify for an update.
    skipped: int
    # Users that could not be updated, plus one when the admin client is not ready.
    failures: int
    # Optional free-form explanation; empty unless the run was aborted early.
    detail: str = ""
def _profile_complete(user: dict[str, Any]) -> bool:
email = user.get("email") if isinstance(user.get("email"), str) else ""
last_name = user.get("lastName") if isinstance(user.get("lastName"), str) else ""
email_verified = bool(user.get("emailVerified"))
return bool(email.strip() and last_name.strip() and email_verified)
def run_profile_sync() -> ProfileSyncSummary:
    """Remove profile-related required actions from users with complete profiles.

    Walks every user yielded by ``keycloak_admin.iter_users``. A user is
    updated only when all of the following hold:

    * it has a non-empty string "username" and "enabled" is not ``False``;
    * it is not a service account ("serviceAccountClientId" unset and the
      username does not start with "service-account-");
    * "requiredActions" is a non-empty list containing at least one entry
      from ``PROFILE_ACTIONS``;
    * ``_profile_complete`` accepts the user.

    Qualifying users get their "requiredActions" replaced by the same list
    minus the ``PROFILE_ACTIONS`` entries via
    ``keycloak_admin.update_user_safe``.

    Returns:
        A ``ProfileSyncSummary`` with the processed/updated/skipped/failure
        counters. When the admin client is not ready, no user is touched and
        the summary reports a single failure with an explanatory ``detail``.

    Only errors raised by ``update_user_safe`` are caught (and counted as
    failures); errors from ``ready()`` or from iterating users propagate.
    """
    if not keycloak_admin.ready():
        summary = ProfileSyncSummary(0, 0, 0, 1, detail="keycloak admin not configured")
        logger.info(
            "keycloak profile sync skipped",
            extra={"event": "keycloak_profile_sync", "status": "error", "detail": summary.detail},
        )
        return summary

    processed = 0
    updated = 0
    skipped = 0
    failures = 0

    users = keycloak_admin.iter_users(page_size=200, brief=False)
    for user in users:
        username = user.get("username") if isinstance(user.get("username"), str) else ""
        # Disabled or nameless accounts are never modified.
        if not username or user.get("enabled") is False:
            skipped += 1
            continue
        # Service accounts have no interactive profile to complete.
        if user.get("serviceAccountClientId") or username.startswith("service-account-"):
            skipped += 1
            continue

        required = user.get("requiredActions") if isinstance(user.get("requiredActions"), list) else []
        if not required:
            skipped += 1
            continue
        # Keep the pending actions in place until the profile is really complete.
        if not _profile_complete(user):
            skipped += 1
            continue

        trimmed = [action for action in required if action not in PROFILE_ACTIONS]
        # Nothing profile-related is pending, so there is nothing to write back.
        if trimmed == required:
            skipped += 1
            continue

        user_id = user.get("id") if isinstance(user.get("id"), str) else ""
        if not user_id:
            # The user qualifies but cannot be addressed for an update.
            failures += 1
            continue

        processed += 1
        try:
            keycloak_admin.update_user_safe(user_id, {"requiredActions": trimmed})
        except Exception as exc:
            # Best-effort: one failed update must not abort the whole run, but
            # record why it failed so the failure counter can be diagnosed.
            failures += 1
            logger.warning(
                "keycloak profile sync update failed",
                extra={
                    "event": "keycloak_profile_sync",
                    "status": "error",
                    "user_id": user_id,
                    "detail": str(exc),
                },
            )
        else:
            updated += 1

    summary = ProfileSyncSummary(processed, updated, skipped, failures)
    logger.info(
        "keycloak profile sync finished",
        extra={
            "event": "keycloak_profile_sync",
            "status": "ok" if failures == 0 else "error",
            "processed": processed,
            "updated": updated,
            "skipped": skipped,
            "failures": failures,
        },
    )
    return summary