ariadne: refactor checks for ruff limits

This commit is contained in:
Brad Stein 2026-01-22 02:55:11 -03:00
parent b45911c50e
commit 7c1c7ec904
3 changed files with 85 additions and 67 deletions

View File

@ -17,6 +17,9 @@ from .mailu import mailu
HTTP_OK = 200 HTTP_OK = 200
EXIT_PASSWORD_MATCH = 0
EXIT_PASSWORD_MISMATCH = 1
EXIT_USER_MISSING = 3
FIREFLY_PASSWORD_ATTR = "firefly_password" FIREFLY_PASSWORD_ATTR = "firefly_password"
FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at"
FIREFLY_PASSWORD_ROTATED_ATTR = "firefly_password_rotated_at" FIREFLY_PASSWORD_ROTATED_ATTR = "firefly_password_rotated_at"
@ -519,14 +522,29 @@ class FireflyService:
return {"status": "error", "detail": str(exc)} return {"status": "error", "detail": str(exc)}
detail = (result.stdout or result.stderr).strip() detail = (result.stdout or result.stderr).strip()
if result.exit_code == 0: if result.exit_code == EXIT_PASSWORD_MATCH:
return {"status": "match", "detail": detail} return {"status": "match", "detail": detail}
if result.exit_code == 1: if result.exit_code == EXIT_PASSWORD_MISMATCH:
return {"status": "mismatch", "detail": detail} return {"status": "mismatch", "detail": detail}
if result.exit_code == 3: if result.exit_code == EXIT_USER_MISSING:
return {"status": "missing", "detail": detail or "user missing"} return {"status": "missing", "detail": detail or "user missing"}
return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"} return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"}
def _rotation_outcome(self, prepared: FireflySyncInput) -> UserSyncOutcome:
if prepared.rotated_at:
return UserSyncOutcome("skipped")
check = self.check_password(prepared.mailu_email, prepared.password)
status = check.get("status") if isinstance(check, dict) else "error"
if status == "match":
return UserSyncOutcome("skipped")
if status == "mismatch":
if not _set_firefly_rotated_at(prepared.username):
return UserSyncOutcome("failed", "failed to set rotated_at")
return UserSyncOutcome("synced")
detail = check.get("detail") if isinstance(check, dict) else ""
detail = detail or "password check failed"
return UserSyncOutcome("failed", detail)
def run_cron(self) -> dict[str, Any]: def run_cron(self) -> dict[str, Any]:
if not settings.firefly_cron_token: if not settings.firefly_cron_token:
raise RuntimeError("firefly cron token missing") raise RuntimeError("firefly cron token missing")
@ -544,29 +562,19 @@ class FireflyService:
prepared = _build_sync_input(user) prepared = _build_sync_input(user)
if isinstance(prepared, UserSyncOutcome): if isinstance(prepared, UserSyncOutcome):
return prepared return prepared
outcome = None
if _should_skip_sync(prepared.password_generated, prepared.updated_at): if _should_skip_sync(prepared.password_generated, prepared.updated_at):
if prepared.rotated_at: outcome = self._rotation_outcome(prepared)
return UserSyncOutcome("skipped") if outcome is None:
check = self.check_password(prepared.mailu_email, prepared.password)
status = check.get("status") if isinstance(check, dict) else "error"
if status == "match":
return UserSyncOutcome("skipped")
if status == "mismatch":
if not _set_firefly_rotated_at(prepared.username):
return UserSyncOutcome("failed", "failed to set rotated_at")
return UserSyncOutcome("synced")
detail = check.get("detail") if isinstance(check, dict) else ""
detail = detail or "password check failed"
return UserSyncOutcome("failed", detail)
result = self.sync_user(prepared.mailu_email, prepared.password, wait=True) result = self.sync_user(prepared.mailu_email, prepared.password, wait=True)
result_status = result.get("status") if isinstance(result, dict) else "error" result_status = result.get("status") if isinstance(result, dict) else "error"
if result_status != "ok": if result_status != "ok":
return UserSyncOutcome("failed", f"sync {result_status}") outcome = UserSyncOutcome("failed", f"sync {result_status}")
if not _set_firefly_updated_at(prepared.username): elif not _set_firefly_updated_at(prepared.username):
return UserSyncOutcome("failed", "failed to set updated_at") outcome = UserSyncOutcome("failed", "failed to set updated_at")
else:
return UserSyncOutcome("synced") outcome = UserSyncOutcome("synced")
return outcome
def sync_users(self) -> dict[str, Any]: def sync_users(self) -> dict[str, Any]:
if not keycloak_admin.ready(): if not keycloak_admin.ready():

View File

@ -49,6 +49,16 @@ class VaultwardenSyncCounters:
return "ok" if self.failures == 0 else "error" return "ok" if self.failures == 0 else "error"
@dataclass
class VaultwardenInviteState:
username: str
status: str
synced_at: str
synced_ts: float | None
full_user: dict[str, Any]
counters: VaultwardenSyncCounters
def _extract_attr(attrs: Any, key: str) -> str: def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict): if not isinstance(attrs, dict):
return "" return ""
@ -216,26 +226,19 @@ def _should_refresh_invite(synced_ts: float | None) -> bool:
return (time.time() - synced_ts) >= settings.vaultwarden_invite_refresh_sec return (time.time() - synced_ts) >= settings.vaultwarden_invite_refresh_sec
def _handle_existing_invite( def _handle_existing_invite(state: VaultwardenInviteState) -> bool:
username: str, if state.status not in {"invited", "already_present"}:
current_status: str,
current_synced_at: str,
current_synced_ts: float | None,
full_user: dict[str, Any],
counters: VaultwardenSyncCounters,
) -> bool:
if current_status not in {"invited", "already_present"}:
return False return False
if current_status == "already_present": if state.status == "already_present":
if not current_synced_at: if not state.synced_at:
_set_sync_status(username, current_status) _set_sync_status(state.username, state.status)
_set_master_password_set(username, full_user) _set_master_password_set(state.username, state.full_user)
counters.skipped += 1 state.counters.skipped += 1
return True return True
if not _should_refresh_invite(current_synced_ts): if not _should_refresh_invite(state.synced_ts):
if not current_synced_at: if not state.synced_at:
_set_sync_status(username, current_status) _set_sync_status(state.username, state.status)
counters.skipped += 1 state.counters.skipped += 1
return True return True
return False return False
@ -262,14 +265,15 @@ def _sync_user(
counters.skipped += 1 counters.skipped += 1
else: else:
_ensure_email_attrs(username, full_user, email) _ensure_email_attrs(username, full_user, email)
if _handle_existing_invite( state = VaultwardenInviteState(
username, username=username,
current_status, status=current_status,
current_synced_at, synced_at=current_synced_at,
current_synced_ts, synced_ts=current_synced_ts,
full_user, full_user=full_user,
counters, counters=counters,
): )
if _handle_existing_invite(state):
status = None status = None
else: else:
counters.processed += 1 counters.processed += 1

View File

@ -14,6 +14,9 @@ from .keycloak_admin import keycloak_admin
from .mailu import mailu from .mailu import mailu
EXIT_PASSWORD_MATCH = 0
EXIT_PASSWORD_MISMATCH = 1
EXIT_USER_MISSING = 3
WGER_PASSWORD_ATTR = "wger_password" WGER_PASSWORD_ATTR = "wger_password"
WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at"
WGER_PASSWORD_ROTATED_ATTR = "wger_password_rotated_at" WGER_PASSWORD_ROTATED_ATTR = "wger_password_rotated_at"
@ -479,14 +482,29 @@ class WgerService:
return {"status": "error", "detail": str(exc)} return {"status": "error", "detail": str(exc)}
detail = (result.stdout or result.stderr).strip() detail = (result.stdout or result.stderr).strip()
if result.exit_code == 0: if result.exit_code == EXIT_PASSWORD_MATCH:
return {"status": "match", "detail": detail} return {"status": "match", "detail": detail}
if result.exit_code == 1: if result.exit_code == EXIT_PASSWORD_MISMATCH:
return {"status": "mismatch", "detail": detail} return {"status": "mismatch", "detail": detail}
if result.exit_code == 3: if result.exit_code == EXIT_USER_MISSING:
return {"status": "missing", "detail": detail or "user missing"} return {"status": "missing", "detail": detail or "user missing"}
return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"} return {"status": "error", "detail": detail or f"exit_code={result.exit_code}"}
def _rotation_outcome(self, prepared: WgerSyncInput) -> UserSyncOutcome:
if prepared.rotated_at:
return UserSyncOutcome("skipped")
check = self.check_password(prepared.username, prepared.password)
status = check.get("status") if isinstance(check, dict) else "error"
if status == "match":
return UserSyncOutcome("skipped")
if status == "mismatch":
if not _set_wger_rotated_at(prepared.username):
return UserSyncOutcome("failed", "failed to set rotated_at")
return UserSyncOutcome("synced")
detail = check.get("detail") if isinstance(check, dict) else ""
detail = detail or "password check failed"
return UserSyncOutcome("failed", detail)
def ensure_admin(self, wait: bool = False) -> dict[str, Any]: def ensure_admin(self, wait: bool = False) -> dict[str, Any]:
if not settings.wger_namespace: if not settings.wger_namespace:
raise RuntimeError("wger admin sync not configured") raise RuntimeError("wger admin sync not configured")
@ -516,19 +534,7 @@ class WgerService:
if isinstance(prepared, UserSyncOutcome): if isinstance(prepared, UserSyncOutcome):
return prepared return prepared
if _should_skip_sync(prepared.password_generated, prepared.updated_at): if _should_skip_sync(prepared.password_generated, prepared.updated_at):
if prepared.rotated_at: return self._rotation_outcome(prepared)
return UserSyncOutcome("skipped")
check = self.check_password(prepared.username, prepared.password)
status = check.get("status") if isinstance(check, dict) else "error"
if status == "match":
return UserSyncOutcome("skipped")
if status == "mismatch":
if not _set_wger_rotated_at(prepared.username):
return UserSyncOutcome("failed", "failed to set rotated_at")
return UserSyncOutcome("synced")
detail = check.get("detail") if isinstance(check, dict) else ""
detail = detail or "password check failed"
return UserSyncOutcome("failed", detail)
result = self.sync_user(prepared.username, prepared.mailu_email, prepared.password, wait=True) result = self.sync_user(prepared.username, prepared.mailu_email, prepared.password, wait=True)
result_status = result.get("status") if isinstance(result, dict) else "error" result_status = result.get("status") if isinstance(result, dict) else "error"