refactor: split rotation check helpers

This commit is contained in:
Brad Stein 2026-01-23 18:42:25 -03:00
parent 22dc4e8be4
commit e4533f7c51
2 changed files with 76 additions and 86 deletions

View File

@ -462,6 +462,30 @@ def _build_sync_input(user: dict[str, Any]) -> FireflySyncInput | UserSyncOutcom
)
def _rotation_result(status: str, detail: str = "", rotated: bool | None = None) -> dict[str, Any]:
result = {"status": status}
if status == "ok":
result["rotated"] = bool(rotated)
elif detail:
result["detail"] = detail
return result
def _rotation_check_input(username: str) -> tuple[FireflySyncInput | UserSyncOutcome | None, str]:
if not username:
return None, "missing username"
if not keycloak_admin.ready():
return None, "keycloak admin not configured"
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
return None, "user not found"
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
return None, "missing user id"
full = keycloak_admin.get_user(user_id)
return _build_sync_input(full), ""
class FireflyService:
def __init__(self) -> None:
self._executor = PodExecutor(
@ -471,49 +495,20 @@ class FireflyService:
)
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
username = (username or "").strip()
status = "error"
detail = ""
rotated = None
if not username:
detail = "missing username"
elif not keycloak_admin.ready():
detail = "keycloak admin not configured"
else:
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
detail = "user not found"
else:
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
detail = "missing user id"
else:
full = keycloak_admin.get_user(user_id)
prepared = _build_sync_input(full)
if isinstance(prepared, UserSyncOutcome):
if prepared.status == "skipped":
status = "ok"
rotated = False
else:
status = prepared.status
detail = prepared.detail or ""
else:
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
status = "ok"
rotated = True
elif outcome.status == "skipped":
status = "ok"
rotated = False
else:
detail = outcome.detail or "rotation check failed"
result = {"status": status}
if status == "ok":
result["rotated"] = bool(rotated)
elif detail:
result["detail"] = detail
return result
cleaned = (username or "").strip()
prepared, error = _rotation_check_input(cleaned)
if error:
return _rotation_result("error", error)
if isinstance(prepared, UserSyncOutcome):
if prepared.status == "skipped":
return _rotation_result("ok", rotated=False)
return _rotation_result("error", prepared.detail or "")
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
return _rotation_result("ok", rotated=True)
if outcome.status == "skipped":
return _rotation_result("ok", rotated=False)
return _rotation_result("error", outcome.detail or "rotation check failed")
def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]:
email = (email or "").strip()

View File

@ -421,6 +421,30 @@ def _build_sync_input(user: dict[str, Any]) -> WgerSyncInput | UserSyncOutcome:
)
def _rotation_result(status: str, detail: str = "", rotated: bool | None = None) -> dict[str, Any]:
result = {"status": status}
if status == "ok":
result["rotated"] = bool(rotated)
elif detail:
result["detail"] = detail
return result
def _rotation_check_input(username: str) -> tuple[WgerSyncInput | UserSyncOutcome | None, str]:
if not username:
return None, "missing username"
if not keycloak_admin.ready():
return None, "keycloak admin not configured"
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
return None, "user not found"
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
return None, "missing user id"
full = keycloak_admin.get_user(user_id)
return _build_sync_input(full), ""
class WgerService:
def __init__(self) -> None:
self._executor = PodExecutor(
@ -430,49 +454,20 @@ class WgerService:
)
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
username = (username or "").strip()
status = "error"
detail = ""
rotated = None
if not username:
detail = "missing username"
elif not keycloak_admin.ready():
detail = "keycloak admin not configured"
else:
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
detail = "user not found"
else:
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
detail = "missing user id"
else:
full = keycloak_admin.get_user(user_id)
prepared = _build_sync_input(full)
if isinstance(prepared, UserSyncOutcome):
if prepared.status == "skipped":
status = "ok"
rotated = False
else:
status = prepared.status
detail = prepared.detail or ""
else:
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
status = "ok"
rotated = True
elif outcome.status == "skipped":
status = "ok"
rotated = False
else:
detail = outcome.detail or "rotation check failed"
result = {"status": status}
if status == "ok":
result["rotated"] = bool(rotated)
elif detail:
result["detail"] = detail
return result
cleaned = (username or "").strip()
prepared, error = _rotation_check_input(cleaned)
if error:
return _rotation_result("error", error)
if isinstance(prepared, UserSyncOutcome):
if prepared.status == "skipped":
return _rotation_result("ok", rotated=False)
return _rotation_result("error", prepared.detail or "")
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
return _rotation_result("ok", rotated=True)
if outcome.status == "skipped":
return _rotation_result("ok", rotated=False)
return _rotation_result("error", outcome.detail or "rotation check failed")
def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
username = (username or "").strip()