refactor: reduce rotation check branches
This commit is contained in:
parent
e9b5f32ef4
commit
22dc4e8be4
@ -472,28 +472,48 @@ class FireflyService:
|
||||
|
||||
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
|
||||
username = (username or "").strip()
|
||||
status = "error"
|
||||
detail = ""
|
||||
rotated = None
|
||||
if not username:
|
||||
return {"status": "error", "detail": "missing username"}
|
||||
if not keycloak_admin.ready():
|
||||
return {"status": "error", "detail": "keycloak admin not configured"}
|
||||
detail = "missing username"
|
||||
elif not keycloak_admin.ready():
|
||||
detail = "keycloak admin not configured"
|
||||
else:
|
||||
user = keycloak_admin.find_user(username)
|
||||
if not isinstance(user, dict):
|
||||
detail = "user not found"
|
||||
else:
|
||||
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
|
||||
if not user_id:
|
||||
detail = "missing user id"
|
||||
else:
|
||||
full = keycloak_admin.get_user(user_id)
|
||||
prepared = _build_sync_input(full)
|
||||
if isinstance(prepared, UserSyncOutcome):
|
||||
if prepared.status == "skipped":
|
||||
status = "ok"
|
||||
rotated = False
|
||||
else:
|
||||
status = prepared.status
|
||||
detail = prepared.detail or ""
|
||||
else:
|
||||
outcome = self._rotation_outcome(prepared)
|
||||
if outcome.status == "synced":
|
||||
status = "ok"
|
||||
rotated = True
|
||||
elif outcome.status == "skipped":
|
||||
status = "ok"
|
||||
rotated = False
|
||||
else:
|
||||
detail = outcome.detail or "rotation check failed"
|
||||
|
||||
user = keycloak_admin.find_user(username)
|
||||
if not isinstance(user, dict):
|
||||
return {"status": "error", "detail": "user not found"}
|
||||
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
|
||||
if not user_id:
|
||||
return {"status": "error", "detail": "missing user id"}
|
||||
full = keycloak_admin.get_user(user_id)
|
||||
prepared = _build_sync_input(full)
|
||||
if isinstance(prepared, UserSyncOutcome):
|
||||
return {"status": prepared.status, "detail": prepared.detail or ""}
|
||||
|
||||
outcome = self._rotation_outcome(prepared)
|
||||
if outcome.status == "synced":
|
||||
return {"status": "ok", "rotated": True}
|
||||
if outcome.status == "skipped":
|
||||
return {"status": "ok", "rotated": False}
|
||||
return {"status": "error", "detail": outcome.detail or "rotation check failed"}
|
||||
result = {"status": status}
|
||||
if status == "ok":
|
||||
result["rotated"] = bool(rotated)
|
||||
elif detail:
|
||||
result["detail"] = detail
|
||||
return result
|
||||
|
||||
def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]:
|
||||
email = (email or "").strip()
|
||||
|
||||
@ -431,28 +431,48 @@ class WgerService:
|
||||
|
||||
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
|
||||
username = (username or "").strip()
|
||||
status = "error"
|
||||
detail = ""
|
||||
rotated = None
|
||||
if not username:
|
||||
return {"status": "error", "detail": "missing username"}
|
||||
if not keycloak_admin.ready():
|
||||
return {"status": "error", "detail": "keycloak admin not configured"}
|
||||
detail = "missing username"
|
||||
elif not keycloak_admin.ready():
|
||||
detail = "keycloak admin not configured"
|
||||
else:
|
||||
user = keycloak_admin.find_user(username)
|
||||
if not isinstance(user, dict):
|
||||
detail = "user not found"
|
||||
else:
|
||||
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
|
||||
if not user_id:
|
||||
detail = "missing user id"
|
||||
else:
|
||||
full = keycloak_admin.get_user(user_id)
|
||||
prepared = _build_sync_input(full)
|
||||
if isinstance(prepared, UserSyncOutcome):
|
||||
if prepared.status == "skipped":
|
||||
status = "ok"
|
||||
rotated = False
|
||||
else:
|
||||
status = prepared.status
|
||||
detail = prepared.detail or ""
|
||||
else:
|
||||
outcome = self._rotation_outcome(prepared)
|
||||
if outcome.status == "synced":
|
||||
status = "ok"
|
||||
rotated = True
|
||||
elif outcome.status == "skipped":
|
||||
status = "ok"
|
||||
rotated = False
|
||||
else:
|
||||
detail = outcome.detail or "rotation check failed"
|
||||
|
||||
user = keycloak_admin.find_user(username)
|
||||
if not isinstance(user, dict):
|
||||
return {"status": "error", "detail": "user not found"}
|
||||
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
|
||||
if not user_id:
|
||||
return {"status": "error", "detail": "missing user id"}
|
||||
full = keycloak_admin.get_user(user_id)
|
||||
prepared = _build_sync_input(full)
|
||||
if isinstance(prepared, UserSyncOutcome):
|
||||
return {"status": prepared.status, "detail": prepared.detail or ""}
|
||||
|
||||
outcome = self._rotation_outcome(prepared)
|
||||
if outcome.status == "synced":
|
||||
return {"status": "ok", "rotated": True}
|
||||
if outcome.status == "skipped":
|
||||
return {"status": "ok", "rotated": False}
|
||||
return {"status": "error", "detail": outcome.detail or "rotation check failed"}
|
||||
result = {"status": status}
|
||||
if status == "ok":
|
||||
result["rotated"] = bool(rotated)
|
||||
elif detail:
|
||||
result["detail"] = detail
|
||||
return result
|
||||
|
||||
def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
|
||||
username = (username or "").strip()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user