From 7c1c7ec9045e9ff2cbc8cf089c9ae47204b9337d Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 22 Jan 2026 02:55:11 -0300 Subject: [PATCH] ariadne: refactor checks for ruff limits --- ariadne/services/firefly.py | 58 ++++++++++++++++------------ ariadne/services/vaultwarden_sync.py | 56 ++++++++++++++------------- ariadne/services/wger.py | 38 ++++++++++-------- 3 files changed, 85 insertions(+), 67 deletions(-) diff --git a/ariadne/services/firefly.py b/ariadne/services/firefly.py index 0a57709..047729d 100644 --- a/ariadne/services/firefly.py +++ b/ariadne/services/firefly.py @@ -17,6 +17,9 @@ from .mailu import mailu HTTP_OK = 200 +EXIT_PASSWORD_MATCH = 0 +EXIT_PASSWORD_MISMATCH = 1 +EXIT_USER_MISSING = 3 FIREFLY_PASSWORD_ATTR = "firefly_password" FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" FIREFLY_PASSWORD_ROTATED_ATTR = "firefly_password_rotated_at" @@ -519,14 +522,29 @@ class FireflyService: return {"status": "error", "detail": str(exc)} detail = (result.stdout or result.stderr).strip() - if result.exit_code == 0: + if result.exit_code == EXIT_PASSWORD_MATCH: return {"status": "match", "detail": detail} - if result.exit_code == 1: + if result.exit_code == EXIT_PASSWORD_MISMATCH: return {"status": "mismatch", "detail": detail} - if result.exit_code == 3: + if result.exit_code == EXIT_USER_MISSING: return {"status": "missing", "detail": detail or "user missing"} return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"} + def _rotation_outcome(self, prepared: FireflySyncInput) -> UserSyncOutcome: + if prepared.rotated_at: + return UserSyncOutcome("skipped") + check = self.check_password(prepared.mailu_email, prepared.password) + status = check.get("status") if isinstance(check, dict) else "error" + if status == "match": + return UserSyncOutcome("skipped") + if status == "mismatch": + if not _set_firefly_rotated_at(prepared.username): + return UserSyncOutcome("failed", "failed to set rotated_at") + return UserSyncOutcome("synced") + detail = check.get("detail") if isinstance(check, dict) else "" + detail = detail or "password check failed" + return UserSyncOutcome("failed", detail) + def run_cron(self) -> dict[str, Any]: if not settings.firefly_cron_token: raise RuntimeError("firefly cron token missing") @@ -544,29 +562,19 @@ class FireflyService: prepared = _build_sync_input(user) if isinstance(prepared, UserSyncOutcome): return prepared + outcome = None if _should_skip_sync(prepared.password_generated, prepared.updated_at): - if prepared.rotated_at: - return UserSyncOutcome("skipped") - check = self.check_password(prepared.mailu_email, prepared.password) - status = check.get("status") if isinstance(check, dict) else "error" - if status == "match": - return UserSyncOutcome("skipped") - if status == "mismatch": - if not _set_firefly_rotated_at(prepared.username): - return UserSyncOutcome("failed", "failed to set rotated_at") - return UserSyncOutcome("synced") - detail = check.get("detail") if isinstance(check, dict) else "" - detail = detail or "password check failed" - return UserSyncOutcome("failed", detail) - - result = self.sync_user(prepared.mailu_email, prepared.password, wait=True) - result_status = result.get("status") if isinstance(result, dict) else "error" - if result_status != "ok": - return UserSyncOutcome("failed", f"sync {result_status}") - if not _set_firefly_updated_at(prepared.username): - return UserSyncOutcome("failed", "failed to set updated_at") - - return UserSyncOutcome("synced") + outcome = self._rotation_outcome(prepared) + if outcome is None: + result = self.sync_user(prepared.mailu_email, prepared.password, wait=True) + result_status = result.get("status") if isinstance(result, dict) else "error" + if result_status != "ok": + outcome = UserSyncOutcome("failed", f"sync {result_status}") + elif not _set_firefly_updated_at(prepared.username): + outcome = UserSyncOutcome("failed", "failed to set updated_at") + else: + outcome = UserSyncOutcome("synced") + return outcome def sync_users(self) -> dict[str, Any]: if not keycloak_admin.ready(): diff --git a/ariadne/services/vaultwarden_sync.py b/ariadne/services/vaultwarden_sync.py index 2cacc8f..5e9fb68 100644 --- a/ariadne/services/vaultwarden_sync.py +++ b/ariadne/services/vaultwarden_sync.py @@ -49,6 +49,16 @@ class VaultwardenSyncCounters: return "ok" if self.failures == 0 else "error" +@dataclass +class VaultwardenInviteState: + username: str + status: str + synced_at: str + synced_ts: float | None + full_user: dict[str, Any] + counters: VaultwardenSyncCounters + + def _extract_attr(attrs: Any, key: str) -> str: if not isinstance(attrs, dict): return "" @@ -216,26 +226,19 @@ def _should_refresh_invite(synced_ts: float | None) -> bool: return (time.time() - synced_ts) >= settings.vaultwarden_invite_refresh_sec -def _handle_existing_invite( - username: str, - current_status: str, - current_synced_at: str, - current_synced_ts: float | None, - full_user: dict[str, Any], - counters: VaultwardenSyncCounters, -) -> bool: - if current_status not in {"invited", "already_present"}: +def _handle_existing_invite(state: VaultwardenInviteState) -> bool: + if state.status not in {"invited", "already_present"}: return False - if current_status == "already_present": - if not current_synced_at: - _set_sync_status(username, current_status) - _set_master_password_set(username, full_user) - counters.skipped += 1 + if state.status == "already_present": + if not state.synced_at: + _set_sync_status(state.username, state.status) + _set_master_password_set(state.username, state.full_user) + state.counters.skipped += 1 return True - if not _should_refresh_invite(current_synced_ts): - if not current_synced_at: - _set_sync_status(username, current_status) - counters.skipped += 1 + if not _should_refresh_invite(state.synced_ts): + if not state.synced_at: + _set_sync_status(state.username, state.status) + state.counters.skipped += 1 return True return False @@ -262,14 +265,15 @@ def _sync_user( counters.skipped += 1 else: _ensure_email_attrs(username, full_user, email) - if _handle_existing_invite( - username, - current_status, - current_synced_at, - current_synced_ts, - full_user, - counters, - ): + state = VaultwardenInviteState( + username=username, + status=current_status, + synced_at=current_synced_at, + synced_ts=current_synced_ts, + full_user=full_user, + counters=counters, + ) + if _handle_existing_invite(state): status = None else: counters.processed += 1 diff --git a/ariadne/services/wger.py b/ariadne/services/wger.py index fc79778..220eaec 100644 --- a/ariadne/services/wger.py +++ b/ariadne/services/wger.py @@ -14,6 +14,9 @@ from .keycloak_admin import keycloak_admin from .mailu import mailu +EXIT_PASSWORD_MATCH = 0 +EXIT_PASSWORD_MISMATCH = 1 +EXIT_USER_MISSING = 3 WGER_PASSWORD_ATTR = "wger_password" WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" WGER_PASSWORD_ROTATED_ATTR = "wger_password_rotated_at" @@ -479,14 +482,29 @@ class WgerService: return {"status": "error", "detail": str(exc)} detail = (result.stdout or result.stderr).strip() - if result.exit_code == 0: + if result.exit_code == EXIT_PASSWORD_MATCH: return {"status": "match", "detail": detail} - if result.exit_code == 1: + if result.exit_code == EXIT_PASSWORD_MISMATCH: return {"status": "mismatch", "detail": detail} - if result.exit_code == 3: + if result.exit_code == EXIT_USER_MISSING: return {"status": "missing", "detail": detail or "user missing"} return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"} + def _rotation_outcome(self, prepared: WgerSyncInput) -> UserSyncOutcome: + if prepared.rotated_at: + return UserSyncOutcome("skipped") + check = self.check_password(prepared.username, prepared.password) + status = check.get("status") if isinstance(check, dict) else "error" + if status == "match": + return UserSyncOutcome("skipped") + if status == "mismatch": + if not _set_wger_rotated_at(prepared.username): + return UserSyncOutcome("failed", "failed to set rotated_at") + return UserSyncOutcome("synced") + detail = check.get("detail") if isinstance(check, dict) else "" + detail = detail or "password check failed" + return UserSyncOutcome("failed", detail) + def ensure_admin(self, wait: bool = False) -> dict[str, Any]: if not settings.wger_namespace: raise RuntimeError("wger admin sync not configured") @@ -516,19 +534,7 @@ class WgerService: if isinstance(prepared, UserSyncOutcome): return prepared if _should_skip_sync(prepared.password_generated, prepared.updated_at): - if prepared.rotated_at: - return UserSyncOutcome("skipped") - check = self.check_password(prepared.username, prepared.password) - status = check.get("status") if isinstance(check, dict) else "error" - if status == "match": - return UserSyncOutcome("skipped") - if status == "mismatch": - if not _set_wger_rotated_at(prepared.username): - return UserSyncOutcome("failed", "failed to set rotated_at") - return UserSyncOutcome("synced") - detail = check.get("detail") if isinstance(check, dict) else "" - detail = detail or "password check failed" - return UserSyncOutcome("failed", detail) + return self._rotation_outcome(prepared) result = self.sync_user(prepared.username, prepared.mailu_email, prepared.password, wait=True) result_status = result.get("status") if isinstance(result, dict) else "error"