From e4533f7c51db8af3d970876cccc79bc395ee05cf Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 23 Jan 2026 18:42:25 -0300 Subject: [PATCH] refactor: split rotation check helpers --- ariadne/services/firefly.py | 81 +++++++++++++++++-------------------- ariadne/services/wger.py | 81 +++++++++++++++++-------------------- 2 files changed, 76 insertions(+), 86 deletions(-) diff --git a/ariadne/services/firefly.py b/ariadne/services/firefly.py index 6ba4087..7e461a2 100644 --- a/ariadne/services/firefly.py +++ b/ariadne/services/firefly.py @@ -462,6 +462,30 @@ def _build_sync_input(user: dict[str, Any]) -> FireflySyncInput | UserSyncOutcom ) +def _rotation_result(status: str, detail: str = "", rotated: bool | None = None) -> dict[str, Any]: + result = {"status": status} + if status == "ok": + result["rotated"] = bool(rotated) + elif detail: + result["detail"] = detail + return result + + +def _rotation_check_input(username: str) -> tuple[FireflySyncInput | UserSyncOutcome | None, str]: + if not username: + return None, "missing username" + if not keycloak_admin.ready(): + return None, "keycloak admin not configured" + user = keycloak_admin.find_user(username) + if not isinstance(user, dict): + return None, "user not found" + user_id = user.get("id") if isinstance(user.get("id"), str) else "" + if not user_id: + return None, "missing user id" + full = keycloak_admin.get_user(user_id) + return _build_sync_input(full), "" + + class FireflyService: def __init__(self) -> None: self._executor = PodExecutor( @@ -471,49 +495,20 @@ class FireflyService: ) def check_rotation_for_user(self, username: str) -> dict[str, Any]: - username = (username or "").strip() - status = "error" - detail = "" - rotated = None - if not username: - detail = "missing username" - elif not keycloak_admin.ready(): - detail = "keycloak admin not configured" - else: - user = keycloak_admin.find_user(username) - if not isinstance(user, dict): - detail = "user not found" - else: - user_id = user.get("id") if isinstance(user.get("id"), str) else "" - if not user_id: - detail = "missing user id" - else: - full = keycloak_admin.get_user(user_id) - prepared = _build_sync_input(full) - if isinstance(prepared, UserSyncOutcome): - if prepared.status == "skipped": - status = "ok" - rotated = False - else: - status = prepared.status - detail = prepared.detail or "" - else: - outcome = self._rotation_outcome(prepared) - if outcome.status == "synced": - status = "ok" - rotated = True - elif outcome.status == "skipped": - status = "ok" - rotated = False - else: - detail = outcome.detail or "rotation check failed" - - result = {"status": status} - if status == "ok": - result["rotated"] = bool(rotated) - elif detail: - result["detail"] = detail - return result + cleaned = (username or "").strip() + prepared, error = _rotation_check_input(cleaned) + if error: + return _rotation_result("error", error) + if isinstance(prepared, UserSyncOutcome): + if prepared.status == "skipped": + return _rotation_result("ok", rotated=False) + return _rotation_result("error", prepared.detail or "") + outcome = self._rotation_outcome(prepared) + if outcome.status == "synced": + return _rotation_result("ok", rotated=True) + if outcome.status == "skipped": + return _rotation_result("ok", rotated=False) + return _rotation_result("error", outcome.detail or "rotation check failed") def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]: email = (email or "").strip() diff --git a/ariadne/services/wger.py b/ariadne/services/wger.py index e6138c2..ddff949 100644 --- a/ariadne/services/wger.py +++ b/ariadne/services/wger.py @@ -421,6 +421,30 @@ def _build_sync_input(user: dict[str, Any]) -> WgerSyncInput | UserSyncOutcome: ) +def _rotation_result(status: str, detail: str = "", rotated: bool | None = None) -> dict[str, Any]: + result = {"status": status} + if status == "ok": + result["rotated"] = bool(rotated) + elif detail: + result["detail"] = detail + return result + + +def _rotation_check_input(username: str) -> tuple[WgerSyncInput | UserSyncOutcome | None, str]: + if not username: + return None, "missing username" + if not keycloak_admin.ready(): + return None, "keycloak admin not configured" + user = keycloak_admin.find_user(username) + if not isinstance(user, dict): + return None, "user not found" + user_id = user.get("id") if isinstance(user.get("id"), str) else "" + if not user_id: + return None, "missing user id" + full = keycloak_admin.get_user(user_id) + return _build_sync_input(full), "" + + class WgerService: def __init__(self) -> None: self._executor = PodExecutor( @@ -430,49 +454,20 @@ class WgerService: ) def check_rotation_for_user(self, username: str) -> dict[str, Any]: - username = (username or "").strip() - status = "error" - detail = "" - rotated = None - if not username: - detail = "missing username" - elif not keycloak_admin.ready(): - detail = "keycloak admin not configured" - else: - user = keycloak_admin.find_user(username) - if not isinstance(user, dict): - detail = "user not found" - else: - user_id = user.get("id") if isinstance(user.get("id"), str) else "" - if not user_id: - detail = "missing user id" - else: - full = keycloak_admin.get_user(user_id) - prepared = _build_sync_input(full) - if isinstance(prepared, UserSyncOutcome): - if prepared.status == "skipped": - status = "ok" - rotated = False - else: - status = prepared.status - detail = prepared.detail or "" - else: - outcome = self._rotation_outcome(prepared) - if outcome.status == "synced": - status = "ok" - rotated = True - elif outcome.status == "skipped": - status = "ok" - rotated = False - else: - detail = outcome.detail or "rotation check failed" - - result = {"status": status} - if status == "ok": - result["rotated"] = bool(rotated) - elif detail: - result["detail"] = detail - return result + cleaned = (username or "").strip() + prepared, error = _rotation_check_input(cleaned) + if error: + return _rotation_result("error", error) + if isinstance(prepared, UserSyncOutcome): + if prepared.status == "skipped": + return _rotation_result("ok", rotated=False) + return _rotation_result("error", prepared.detail or "") + outcome = self._rotation_outcome(prepared) + if outcome.status == "synced": + return _rotation_result("ok", rotated=True) + if outcome.status == "skipped": + return _rotation_result("ok", rotated=False) + return _rotation_result("error", outcome.detail or "rotation check failed") def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]: username = (username or "").strip()