From 18a6471c087f1d76c1384e6e90078fa68a408bbb Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 01:42:56 -0300 Subject: [PATCH] refactor(ariadne): split provisioning workflow helpers --- ariadne/manager/provisioning.py | 741 +++-------------------- ariadne/manager/provisioning_accounts.py | 401 ++++++++++++ ariadne/manager/provisioning_protocol.py | 73 +++ ariadne/manager/provisioning_tasks.py | 179 ++++++ ci/loc_hygiene_waivers.tsv | 1 - 5 files changed, 731 insertions(+), 664 deletions(-) create mode 100644 ariadne/manager/provisioning_accounts.py create mode 100644 ariadne/manager/provisioning_protocol.py create mode 100644 ariadne/manager/provisioning_tasks.py diff --git a/ariadne/manager/provisioning.py b/ariadne/manager/provisioning.py index 625c6e6..7f66e4d 100644 --- a/ariadne/manager/provisioning.py +++ b/ariadne/manager/provisioning.py @@ -1,15 +1,11 @@ from __future__ import annotations -from dataclasses import dataclass -from datetime import datetime, timedelta, timezone -import hashlib -import re +from datetime import datetime import threading import time -from typing import Any from ..db.database import Database -from ..db.storage import REQUIRED_TASKS, Storage, TaskRunRecord +from ..db.storage import REQUIRED_TASKS, Storage from ..metrics.metrics import record_task_run, set_access_request_counts from ..services.firefly import firefly from ..services.keycloak_admin import keycloak_admin @@ -19,81 +15,30 @@ from ..services.nextcloud import nextcloud from ..services.vaultwarden import vaultwarden from ..services.wger import wger from ..settings import settings -from ..utils.errors import safe_error_detail from ..utils.logging import get_logger from ..utils.passwords import random_password - - -MAILU_EMAIL_ATTR = "mailu_email" -MAILU_APP_PASSWORD_ATTR = "mailu_app_password" -MAILU_ENABLED_ATTR = "mailu_enabled" -WGER_PASSWORD_ATTR = "wger_password" -WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" -FIREFLY_PASSWORD_ATTR = "firefly_password" -FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" -VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered" -_RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504} -_RETRYABLE_TOKENS = ( - "timeout", - "temporar", - "rate limited", - "mailbox not ready", - "connection refused", - "connection reset", - "network is unreachable", - "dns", - "name resolution", - "service unavailable", - "bad gateway", - "gateway timeout", +from .provisioning_accounts import _ProvisioningAccountsMixin +from .provisioning_protocol import ( + FIREFLY_PASSWORD_ATTR, + FIREFLY_PASSWORD_UPDATED_ATTR, + MAILU_APP_PASSWORD_ATTR, + MAILU_EMAIL_ATTR, + MAILU_ENABLED_ATTR, + VAULTWARDEN_GRANDFATHERED_FLAG, + WGER_PASSWORD_ATTR, + WGER_PASSWORD_UPDATED_ATTR, + ProvisionOutcome, + RequestContext, + _advisory_lock_id, + _extract_attr, ) +from .provisioning_tasks import _ProvisioningTaskMixin + logger = get_logger(__name__) -@dataclass(frozen=True) -class ProvisionOutcome: - ok: bool - status: str - - -@dataclass -class RequestContext: - request_code: str - username: str - first_name: str - last_name: str - contact_email: str - email_verified_at: datetime | None - status: str - initial_password: str | None - revealed_at: datetime | None - attempted_at: datetime | None - approval_flags: list[str] - user_id: str = "" - mailu_email: str = "" - - -def _advisory_lock_id(request_code: str) -> int: - digest = hashlib.sha256(request_code.encode("utf-8")).digest() - return int.from_bytes(digest[:8], "big", signed=True) - - -def _extract_attr(attrs: Any, key: str) -> str: - if not isinstance(attrs, dict): - return "" - raw = attrs.get(key) - if isinstance(raw, list): - for item in raw: - if isinstance(item, str) and item.strip(): - return item.strip() - return "" - if isinstance(raw, str) and raw.strip(): - return raw.strip() - return "" - - -class ProvisioningManager: +class ProvisioningManager(_ProvisioningTaskMixin, _ProvisioningAccountsMixin): """Coordinate approved access requests across identity and app services.""" def __init__(self, db: Database, storage: Storage) -> None: @@ -102,6 +47,48 @@ class ProvisioningManager: self._thread: threading.Thread | None = None self._stop_event = threading.Event() + @property + def _settings(self): + return settings + + @property + def _logger(self): + return logger + + @property + def _keycloak_admin(self): + return keycloak_admin + + @property + def _mailu(self): + return mailu + + @property + def _nextcloud(self): + return nextcloud + + @property + def _wger(self): + return wger + + @property + def _firefly(self): + return firefly + + @property + def _vaultwarden(self): + return vaultwarden + + @property + def _mailer(self): + return mailer + + def _random_password(self, length: int = 32) -> str: + return random_password(length) + + def _record_task_run_metric(self, task: str, status: str, duration_sec: float) -> None: + record_task_run(task, status, duration_sec) + def start(self) -> None: if self._thread and self._thread.is_alive(): return @@ -209,12 +196,7 @@ class ProvisioningManager: extra={"event": "provision_unlock_error", "request_code": request_code}, ) - def _provision_locked( - self, - conn, - request_code: str, - required_tasks: list[str], - ) -> ProvisionOutcome: + def _provision_locked(self, conn, request_code: str, required_tasks: list[str]) -> ProvisionOutcome: ctx = self._load_request(conn, request_code) if not ctx: return ProvisionOutcome(ok=False, status="unknown") @@ -229,12 +211,7 @@ class ProvisioningManager: return self._run_task_pipeline(conn, ctx, required_tasks) - def _run_task_pipeline( - self, - conn, - ctx: RequestContext, - required_tasks: list[str], - ) -> ProvisionOutcome: + def _run_task_pipeline(self, conn, ctx: RequestContext, required_tasks: list[str]) -> ProvisionOutcome: if not self._ensure_keycloak_user(conn, ctx): return ProvisionOutcome(ok=False, status="accounts_building") if not self._run_account_tasks(conn, ctx): @@ -355,581 +332,19 @@ class ProvisioningManager: pass return ProvisionOutcome(ok=False, status=pending_status) - def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None: - if not tasks: - return - conn.execute( - """ - INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) - SELECT %s, task, 'pending', NULL, NOW() - FROM UNNEST(%s::text[]) AS task - ON CONFLICT (request_code, task) DO NOTHING - """, - (request_code, tasks), - ) - def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None: - conn.execute( - """ - INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) - VALUES (%s, %s, %s, %s, NOW()) - ON CONFLICT (request_code, task) - DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() - """, - (request_code, task, status, detail), - ) - - def _task_statuses(self, conn, request_code: str) -> dict[str, str]: - rows = conn.execute( - "SELECT task, status FROM access_request_tasks WHERE request_code = %s", - (request_code,), - ).fetchall() - output: dict[str, str] = {} - for row in rows: - task = row.get("task") if isinstance(row, dict) else None - status = row.get("status") if isinstance(row, dict) else None - if isinstance(task, str) and isinstance(status, str): - output[task] = status - return output - - def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool: - statuses = self._task_statuses(conn, request_code) - for task in tasks: - if statuses.get(task) != "ok": - return False - return True - - def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None: - finished = datetime.now(timezone.utc) - duration_sec = (finished - started).total_seconds() - record_task_run(task, status, duration_sec) - logger.info( - "task run", - extra={ - "event": "task_run", - "request_code": request_code, - "task": task, - "status": status, - "duration_sec": round(duration_sec, 3), - "detail": detail or "", - }, - ) - try: - self._storage.record_event( - "provision_task", - { - "request_code": request_code, - "task": task, - "status": status, - "duration_sec": round(duration_sec, 3), - "detail": detail or "", - }, - ) - except Exception: - pass - try: - self._storage.record_task_run( - TaskRunRecord( - request_code=request_code, - task=task, - status=status, - detail=detail, - started_at=started, - finished_at=finished, - duration_ms=int(duration_sec * 1000), - ) - ) - except Exception: - pass - - def _task_ok( - self, - conn, - request_code: str, - task: str, - detail: str | None, - started: datetime, - ) -> None: - self._upsert_task(conn, request_code, task, "ok", detail) - self._record_task(request_code, task, "ok", detail, started) - - def _task_error( - self, - conn, - request_code: str, - task: str, - detail: str, - started: datetime, - ) -> None: - self._upsert_task(conn, request_code, task, "error", detail) - self._record_task(request_code, task, "error", detail, started) - - def _task_pending( - self, - conn, - request_code: str, - task: str, - detail: str, - started: datetime, - ) -> None: - self._upsert_task(conn, request_code, task, "pending", detail) - self._record_task(request_code, task, "pending", detail, started) - - def _is_retryable_detail(self, detail: str) -> bool: - if not detail: - return False - detail_lower = detail.lower() - match = re.match(r"^http\s+(\d{3})", detail_lower) - if match: - try: - code = int(match.group(1)) - except ValueError: - code = 0 - if code in _RETRYABLE_HTTP_CODES: - return True - return any(token in detail_lower for token in _RETRYABLE_TOKENS) - - def _retryable_detail(self, detail: str) -> str: - cleaned = detail.strip() if isinstance(detail, str) else "" - if not cleaned: - return "retryable: temporary failure" - return f"retryable: {cleaned}" - - def _task_fail( - self, - conn, - request_code: str, - task: str, - detail: str, - started: datetime, - ) -> None: - detail_lower = detail.lower() - if "missing verified email address" in detail_lower or "email not verified" in detail_lower: - self._task_pending(conn, request_code, task, "blocked: email not verified", started) - return - if self._is_retryable_detail(detail): - self._task_pending(conn, request_code, task, self._retryable_detail(detail), started) - return - self._task_error(conn, request_code, task, detail, started) - - def _vaultwarden_rate_limit_detail(self) -> tuple[str, datetime]: - retry_at = datetime.now(timezone.utc) + timedelta( - seconds=float(settings.vaultwarden_admin_rate_limit_backoff_sec) - ) - retry_iso = retry_at.strftime("%Y-%m-%dT%H:%M:%SZ") - return f"rate limited until {retry_iso}", retry_at - - @staticmethod - def _parse_retry_at(detail: str) -> datetime | None: - prefix = "rate limited until " - if not isinstance(detail, str) or not detail.startswith(prefix): - return None - ts = detail[len(prefix) :].strip() - for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"): - try: - parsed = datetime.strptime(ts, fmt) - if parsed.tzinfo is None: - parsed = parsed.replace(tzinfo=timezone.utc) - return parsed - except ValueError: - continue - return None - - def _vaultwarden_retry_due(self, conn, request_code: str) -> bool: - row = conn.execute( - """ - SELECT status, detail - FROM access_request_tasks - WHERE request_code = %s AND task = 'vaultwarden_invite' - """, - (request_code,), - ).fetchone() - if not isinstance(row, dict): - return True - if row.get("status") != "pending": - return True - retry_at = self._parse_retry_at(row.get("detail") or "") - if not retry_at: - return True - return datetime.now(timezone.utc) >= retry_at - - @staticmethod - def _set_vaultwarden_attrs(username: str, email: str, status: str) -> None: - if not username or not email or not status: - return - try: - now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - keycloak_admin.set_user_attribute(username, "vaultwarden_email", email) - keycloak_admin.set_user_attribute(username, "vaultwarden_status", status) - keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso) - except Exception: - return - - def _ready_for_retry(self, ctx: RequestContext) -> bool: - if ctx.status != "accounts_building": - return True - attempted_at = ctx.attempted_at - if not isinstance(attempted_at, datetime): - return True - if attempted_at.tzinfo is None: - attempted_at = attempted_at.replace(tzinfo=timezone.utc) - age_sec = (datetime.now(timezone.utc) - attempted_at).total_seconds() - return age_sec >= settings.provision_retry_cooldown_sec - - def _require_verified_email(self, ctx: RequestContext) -> str: - if not isinstance(ctx.email_verified_at, datetime): - raise RuntimeError("missing verified email address") - email = ctx.contact_email.strip() - if not email: - raise RuntimeError("missing verified email address") - return email - - def _ensure_email_unused(self, email: str, username: str) -> None: - existing_email_user = keycloak_admin.find_user_by_email(email) - if existing_email_user and (existing_email_user.get("username") or "") != username: - raise RuntimeError("email is already associated with an existing Atlas account") - - def _new_user_payload( - self, - username: str, - email: str, - mailu_email: str, - first_name: str, - last_name: str, - ) -> dict[str, Any]: - payload = { - "username": username, - "enabled": True, - "email": email, - "emailVerified": True, - "requiredActions": [], - "attributes": { - MAILU_EMAIL_ATTR: [mailu_email], - MAILU_ENABLED_ATTR: ["true"], - }, - } - if first_name: - payload["firstName"] = first_name - if last_name: - payload["lastName"] = last_name - else: - payload["lastName"] = username - return payload - - def _create_or_fetch_user(self, ctx: RequestContext) -> dict[str, Any]: - user = keycloak_admin.find_user(ctx.username) - if user: - return user - email = self._require_verified_email(ctx) - self._ensure_email_unused(email, ctx.username) - payload = self._new_user_payload(ctx.username, email, ctx.mailu_email, ctx.first_name, ctx.last_name) - try: - created_id = keycloak_admin.create_user(payload) - return keycloak_admin.get_user(created_id) - except Exception as exc: - detail = safe_error_detail(exc, "create user failed") - logger.warning( - "keycloak create user failed, checking for existing user", - extra={"event": "keycloak_user_fallback", "username": ctx.username, "detail": detail}, - ) - user = keycloak_admin.find_user(ctx.username) - if user: - return user - user = keycloak_admin.find_user_by_email(email) - if user: - return user - raise - - def _fetch_full_user(self, user_id: str, fallback: dict[str, Any]) -> dict[str, Any]: - try: - return keycloak_admin.get_user(user_id) - except Exception: - return fallback - - def _strip_totp_action(self, user_id: str, full_user: dict[str, Any]) -> None: - actions = full_user.get("requiredActions") - if not isinstance(actions, list) or "CONFIGURE_TOTP" not in actions: - return - new_actions = [action for action in actions if action != "CONFIGURE_TOTP"] - keycloak_admin.update_user_safe(user_id, {"requiredActions": new_actions}) - - def _ensure_contact_email(self, ctx: RequestContext, full_user: dict[str, Any]) -> None: - email_value = full_user.get("email") - if isinstance(email_value, str) and email_value.strip(): - return - if isinstance(ctx.email_verified_at, datetime) and ctx.contact_email.strip(): - keycloak_admin.update_user_safe( - ctx.user_id, - {"email": ctx.contact_email.strip(), "emailVerified": True}, - ) - - def _ensure_mailu_attrs(self, ctx: RequestContext, full_user: dict[str, Any]) -> None: - attrs = full_user.get("attributes") or {} - if not isinstance(attrs, dict): - return - existing = _extract_attr(attrs, MAILU_EMAIL_ATTR) - if existing: - ctx.mailu_email = existing - else: - ctx.mailu_email = f"{ctx.username}@{settings.mailu_domain}" - keycloak_admin.set_user_attribute(ctx.username, MAILU_EMAIL_ATTR, ctx.mailu_email) - enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR) - if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}: - keycloak_admin.set_user_attribute(ctx.username, MAILU_ENABLED_ATTR, "true") - - def _sync_user_profile(self, ctx: RequestContext, user: dict[str, Any]) -> None: - try: - full_user = self._fetch_full_user(ctx.user_id, user) - self._strip_totp_action(ctx.user_id, full_user) - self._ensure_contact_email(ctx, full_user) - self._ensure_mailu_attrs(ctx, full_user) - except Exception: - ctx.mailu_email = f"{ctx.username}@{settings.mailu_domain}" - - def _ensure_keycloak_user(self, conn, ctx: RequestContext) -> bool: - start = datetime.now(timezone.utc) - try: - user = self._create_or_fetch_user(ctx) - ctx.user_id = str((user or {}).get("id") or "") - if not ctx.user_id: - raise RuntimeError("user id missing") - self._sync_user_profile(ctx, user) - self._task_ok(conn, ctx.request_code, "keycloak_user", None, start) - return True - except Exception as exc: - detail = safe_error_detail(exc, "failed to ensure user") - self._task_fail(conn, ctx.request_code, "keycloak_user", detail, start) - return False - - def _ensure_keycloak_password(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - should_reset = ctx.status == "accounts_building" and ctx.revealed_at is None - password_value: str | None = None - - if should_reset: - if isinstance(ctx.initial_password, str) and ctx.initial_password: - password_value = ctx.initial_password - elif ctx.initial_password is None: - password_value = random_password(20) - conn.execute( - """ - UPDATE access_requests - SET initial_password = %s - WHERE request_code = %s AND initial_password IS NULL - """, - (password_value, ctx.request_code), - ) - ctx.initial_password = password_value - - if password_value: - keycloak_admin.reset_password(ctx.user_id, password_value, temporary=False) - - if isinstance(ctx.initial_password, str) and ctx.initial_password: - self._task_ok(conn, ctx.request_code, "keycloak_password", None, start) - elif ctx.revealed_at is not None: - detail = "initial password already revealed" - self._task_ok(conn, ctx.request_code, "keycloak_password", detail, start) - else: - raise RuntimeError("initial password missing") - except Exception as exc: - detail = safe_error_detail(exc, "failed to set password") - self._task_fail(conn, ctx.request_code, "keycloak_password", detail, start) - - def _ensure_keycloak_groups(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - approved_flags = [flag for flag in ctx.approval_flags if flag in settings.allowed_flag_groups] - groups = list(dict.fromkeys(settings.default_user_groups + approved_flags)) - for group_name in groups: - gid = keycloak_admin.get_group_id(group_name) - if not gid: - raise RuntimeError(f"group missing: {group_name}") - keycloak_admin.add_user_to_group(ctx.user_id, gid) - self._task_ok(conn, ctx.request_code, "keycloak_groups", None, start) - except Exception as exc: - detail = safe_error_detail(exc, "failed to add groups") - self._task_fail(conn, ctx.request_code, "keycloak_groups", detail, start) - - def _ensure_mailu_app_password(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - full = keycloak_admin.get_user(ctx.user_id) - attrs = full.get("attributes") or {} - existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR) - if not existing: - keycloak_admin.set_user_attribute(ctx.username, MAILU_APP_PASSWORD_ATTR, random_password()) - self._task_ok(conn, ctx.request_code, "mailu_app_password", None, start) - except Exception as exc: - detail = safe_error_detail(exc, "failed to set mail password") - self._task_fail(conn, ctx.request_code, "mailu_app_password", detail, start) - - def _sync_mailu(self, conn, ctx: RequestContext) -> bool: - start = datetime.now(timezone.utc) - try: - if not mailu.ready(): - detail = "mailu not configured" - self._task_ok(conn, ctx.request_code, "mailu_sync", detail, start) - return True - mailu.sync(reason="ariadne_access_approve", force=True) - mailbox_ready = mailu.wait_for_mailbox( - ctx.mailu_email, - settings.mailu_mailbox_wait_timeout_sec, - ) - if not mailbox_ready: - raise RuntimeError("mailbox not ready") - self._task_ok(conn, ctx.request_code, "mailu_sync", None, start) - return True - except Exception as exc: - detail = safe_error_detail(exc, "failed to sync mailu") - self._task_fail(conn, ctx.request_code, "mailu_sync", detail, start) - return False - - def _sync_nextcloud_mail(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - if not settings.nextcloud_namespace: - detail = "sync disabled" - self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) - return - result = nextcloud.sync_mail(ctx.username, wait=True) - if isinstance(result, dict) and result.get("status") == "ok": - self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", None, start) - return - status_val = result.get("status") if isinstance(result, dict) else "error" - summary = result.get("summary") if isinstance(result, dict) else None - detail = "" - if summary is not None: - detail = getattr(summary, "detail", "") or "" - if not detail and isinstance(result, dict): - detail = str(result.get("detail") or "") - detail = detail or str(status_val) - self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) - except Exception as exc: - detail = safe_error_detail(exc, "failed to sync nextcloud") - self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) - - def _ensure_wger_account(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - full = keycloak_admin.get_user(ctx.user_id) - attrs = full.get("attributes") or {} - wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR) - wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR) - - if not wger_password: - wger_password = random_password(20) - keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_ATTR, wger_password) - - if not wger_password_updated_at: - result = wger.sync_user(ctx.username, ctx.mailu_email, wger_password, wait=True) - status_val = result.get("status") if isinstance(result, dict) else "error" - if status_val != "ok": - detail = result.get("detail") if isinstance(result, dict) else "" - detail = detail or f"wger sync {status_val}" - raise RuntimeError(detail) - now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_UPDATED_ATTR, now_iso) - - self._task_ok(conn, ctx.request_code, "wger_account", None, start) - except Exception as exc: - detail = safe_error_detail(exc, "failed to provision wger") - self._task_fail(conn, ctx.request_code, "wger_account", detail, start) - - def _ensure_firefly_account(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - full = keycloak_admin.get_user(ctx.user_id) - attrs = full.get("attributes") or {} - firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR) - firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR) - - if not firefly_password: - firefly_password = random_password(24) - keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_ATTR, firefly_password) - - if not firefly_password_updated_at: - result = firefly.sync_user(ctx.mailu_email, firefly_password, wait=True) - status_val = result.get("status") if isinstance(result, dict) else "error" - if status_val != "ok": - raise RuntimeError(f"firefly sync {status_val}") - now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") - keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso) - - self._task_ok(conn, ctx.request_code, "firefly_account", None, start) - except Exception as exc: - detail = safe_error_detail(exc, "failed to provision firefly") - self._task_fail(conn, ctx.request_code, "firefly_account", detail, start) - - def _handle_vaultwarden_grandfathered(self, conn, ctx: RequestContext, start: datetime) -> None: - lookup = vaultwarden.find_user_by_email(ctx.contact_email) - if lookup.status == "rate_limited": - detail, _ = self._vaultwarden_rate_limit_detail() - self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start) - self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "rate_limited") - return - if lookup.ok and lookup.status == "present": - self._task_ok(conn, ctx.request_code, "vaultwarden_invite", "grandfathered", start) - self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "grandfathered") - return - if lookup.ok and lookup.status == "missing": - self._task_error( - conn, - ctx.request_code, - "vaultwarden_invite", - "vaultwarden account not found for recovery email", - start, - ) - return - detail = lookup.detail or lookup.status - self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start) - - def _ensure_vaultwarden_invite(self, conn, ctx: RequestContext) -> None: - start = datetime.now(timezone.utc) - try: - if not self._vaultwarden_retry_due(conn, ctx.request_code): - return - if VAULTWARDEN_GRANDFATHERED_FLAG in ctx.approval_flags: - self._handle_vaultwarden_grandfathered(conn, ctx, start) - return - if not mailu.wait_for_mailbox(ctx.mailu_email, settings.mailu_mailbox_wait_timeout_sec): - try: - mailu.sync(reason="ariadne_vaultwarden_retry", force=True) - except Exception: - pass - if not mailu.wait_for_mailbox(ctx.mailu_email, settings.mailu_mailbox_wait_timeout_sec): - raise RuntimeError("mailbox not ready") - - result = vaultwarden.invite_user(ctx.mailu_email) - if result.ok: - self._task_ok(conn, ctx.request_code, "vaultwarden_invite", result.status, start) - elif result.status == "rate_limited": - detail, _ = self._vaultwarden_rate_limit_detail() - self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start) - else: - detail = result.detail or result.status - self._task_error(conn, ctx.request_code, "vaultwarden_invite", detail, start) - - status = result.status if result.status != "rate_limited" else "rate_limited" - self._set_vaultwarden_attrs(ctx.username, ctx.mailu_email, status) - except Exception as exc: - detail = safe_error_detail(exc, "failed to provision vaultwarden") - self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start) - - def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None: - if not settings.welcome_email_enabled: - return - if not contact_email: - return - try: - row = self._db.fetchone( - "SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s", - (request_code,), - ) - if row and row.get("welcome_email_sent_at"): - return - onboarding_url = f"{settings.portal_public_base_url}/onboarding?code={request_code}" - mailer.send_welcome(contact_email, request_code, onboarding_url, username=username) - self._storage.mark_welcome_sent(request_code) - except MailerError: - return +__all__ = [ + "FIREFLY_PASSWORD_ATTR", + "FIREFLY_PASSWORD_UPDATED_ATTR", + "MAILU_APP_PASSWORD_ATTR", + "MAILU_EMAIL_ATTR", + "MAILU_ENABLED_ATTR", + "MailerError", + "ProvisionOutcome", + "ProvisioningManager", + "RequestContext", + "VAULTWARDEN_GRANDFATHERED_FLAG", + "WGER_PASSWORD_ATTR", + "WGER_PASSWORD_UPDATED_ATTR", + "_extract_attr", +] diff --git a/ariadne/manager/provisioning_accounts.py b/ariadne/manager/provisioning_accounts.py new file mode 100644 index 0000000..410bce5 --- /dev/null +++ b/ariadne/manager/provisioning_accounts.py @@ -0,0 +1,401 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from ..services.mailer import MailerError +from ..utils.errors import safe_error_detail +from .provisioning_protocol import ( + FIREFLY_PASSWORD_ATTR, + FIREFLY_PASSWORD_UPDATED_ATTR, + MAILU_APP_PASSWORD_ATTR, + MAILU_EMAIL_ATTR, + MAILU_ENABLED_ATTR, + VAULTWARDEN_GRANDFATHERED_FLAG, + WGER_PASSWORD_ATTR, + WGER_PASSWORD_UPDATED_ATTR, + RequestContext, + _extract_attr, +) + + +class _ProvisioningAccountsMixin: + def _set_vaultwarden_attrs(self, username: str, email: str, status: str) -> None: + if not username or not email or not status: + return + try: + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + self._keycloak_admin.set_user_attribute(username, "vaultwarden_email", email) + self._keycloak_admin.set_user_attribute(username, "vaultwarden_status", status) + self._keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso) + except Exception: + return + + def _ready_for_retry(self, ctx: RequestContext) -> bool: + if ctx.status != "accounts_building": + return True + attempted_at = ctx.attempted_at + if not isinstance(attempted_at, datetime): + return True + if attempted_at.tzinfo is None: + attempted_at = attempted_at.replace(tzinfo=timezone.utc) + age_sec = (datetime.now(timezone.utc) - attempted_at).total_seconds() + return age_sec >= self._settings.provision_retry_cooldown_sec + + def _require_verified_email(self, ctx: RequestContext) -> str: + if not isinstance(ctx.email_verified_at, datetime): + raise RuntimeError("missing verified email address") + email = ctx.contact_email.strip() + if not email: + raise RuntimeError("missing verified email address") + return email + + def _ensure_email_unused(self, email: str, username: str) -> None: + existing_email_user = self._keycloak_admin.find_user_by_email(email) + if existing_email_user and (existing_email_user.get("username") or "") != username: + raise RuntimeError("email is already associated with an existing Atlas account") + + def _new_user_payload( + self, + username: str, + email: str, + mailu_email: str, + first_name: str, + last_name: str, + ) -> dict[str, Any]: + payload = { + "username": username, + "enabled": True, + "email": email, + "emailVerified": True, + "requiredActions": [], + "attributes": { + MAILU_EMAIL_ATTR: [mailu_email], + MAILU_ENABLED_ATTR: ["true"], + }, + } + if first_name: + payload["firstName"] = first_name + if last_name: + payload["lastName"] = last_name + else: + payload["lastName"] = username + return payload + + def _create_or_fetch_user(self, ctx: RequestContext) -> dict[str, Any]: + user = self._keycloak_admin.find_user(ctx.username) + if user: + return user + email = self._require_verified_email(ctx) + self._ensure_email_unused(email, ctx.username) + payload = self._new_user_payload(ctx.username, email, ctx.mailu_email, ctx.first_name, ctx.last_name) + try: + created_id = self._keycloak_admin.create_user(payload) + return self._keycloak_admin.get_user(created_id) + except Exception as exc: + detail = safe_error_detail(exc, "create user failed") + self._logger.warning( + "keycloak create user failed, checking for existing user", + extra={"event": "keycloak_user_fallback", "username": ctx.username, "detail": detail}, + ) + user = self._keycloak_admin.find_user(ctx.username) + if user: + return user + user = self._keycloak_admin.find_user_by_email(email) + if user: + return user + raise + + def _fetch_full_user(self, user_id: str, fallback: dict[str, Any]) -> dict[str, Any]: + try: + return self._keycloak_admin.get_user(user_id) + except Exception: + return fallback + + def _strip_totp_action(self, user_id: str, full_user: dict[str, Any]) -> None: + actions = full_user.get("requiredActions") + if not isinstance(actions, list) or "CONFIGURE_TOTP" not in actions: + return + new_actions = [action for action in actions if action != "CONFIGURE_TOTP"] + self._keycloak_admin.update_user_safe(user_id, {"requiredActions": new_actions}) + + def _ensure_contact_email(self, ctx: RequestContext, full_user: dict[str, Any]) -> None: + email_value = full_user.get("email") + if isinstance(email_value, str) and email_value.strip(): + return + if isinstance(ctx.email_verified_at, datetime) and ctx.contact_email.strip(): + self._keycloak_admin.update_user_safe( + ctx.user_id, + {"email": ctx.contact_email.strip(), "emailVerified": True}, + ) + + def _ensure_mailu_attrs(self, ctx: RequestContext, full_user: dict[str, Any]) -> None: + attrs = full_user.get("attributes") or {} + if not isinstance(attrs, dict): + return + existing = _extract_attr(attrs, MAILU_EMAIL_ATTR) + if existing: + ctx.mailu_email = existing + else: + ctx.mailu_email = f"{ctx.username}@{self._settings.mailu_domain}" + self._keycloak_admin.set_user_attribute(ctx.username, MAILU_EMAIL_ATTR, ctx.mailu_email) + enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR) + if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}: + self._keycloak_admin.set_user_attribute(ctx.username, MAILU_ENABLED_ATTR, "true") + + def _sync_user_profile(self, ctx: RequestContext, user: dict[str, Any]) -> None: + try: + full_user = self._fetch_full_user(ctx.user_id, user) + self._strip_totp_action(ctx.user_id, full_user) + self._ensure_contact_email(ctx, full_user) + self._ensure_mailu_attrs(ctx, full_user) + except Exception: + ctx.mailu_email = f"{ctx.username}@{self._settings.mailu_domain}" + + def _ensure_keycloak_user(self, conn, ctx: RequestContext) -> bool: + start = datetime.now(timezone.utc) + try: + user = self._create_or_fetch_user(ctx) + ctx.user_id = str((user or {}).get("id") or "") + if not ctx.user_id: + raise RuntimeError("user id missing") + self._sync_user_profile(ctx, user) + self._task_ok(conn, ctx.request_code, "keycloak_user", None, start) + return True + except Exception as exc: + detail = safe_error_detail(exc, "failed to ensure user") + self._task_fail(conn, ctx.request_code, "keycloak_user", detail, start) + return False + + def _ensure_keycloak_password(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + should_reset = ctx.status == "accounts_building" and ctx.revealed_at is None + password_value: str | None = None + + if should_reset: + if isinstance(ctx.initial_password, str) and ctx.initial_password: + password_value = ctx.initial_password + elif ctx.initial_password is None: + password_value = self._random_password(20) + conn.execute( + """ + UPDATE access_requests + SET initial_password = %s + WHERE request_code = %s AND initial_password IS NULL + """, + (password_value, ctx.request_code), + ) + ctx.initial_password = password_value + + if password_value: + self._keycloak_admin.reset_password(ctx.user_id, password_value, temporary=False) + + if isinstance(ctx.initial_password, str) and ctx.initial_password: + self._task_ok(conn, ctx.request_code, "keycloak_password", None, start) + elif ctx.revealed_at is not None: + detail = "initial password already revealed" + self._task_ok(conn, ctx.request_code, "keycloak_password", detail, start) + else: + raise RuntimeError("initial password missing") + except Exception as exc: + detail = safe_error_detail(exc, "failed to set password") + self._task_fail(conn, ctx.request_code, "keycloak_password", detail, start) + + def _ensure_keycloak_groups(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + approved_flags = [flag for flag in ctx.approval_flags if flag in self._settings.allowed_flag_groups] + groups = list(dict.fromkeys(self._settings.default_user_groups + approved_flags)) + for group_name in groups: + gid = self._keycloak_admin.get_group_id(group_name) + if not gid: + raise RuntimeError(f"group missing: {group_name}") + self._keycloak_admin.add_user_to_group(ctx.user_id, gid) + self._task_ok(conn, ctx.request_code, "keycloak_groups", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to add groups") + self._task_fail(conn, ctx.request_code, "keycloak_groups", detail, start) + + def _ensure_mailu_app_password(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + full = self._keycloak_admin.get_user(ctx.user_id) + attrs = full.get("attributes") or {} + existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR) + if not existing: + self._keycloak_admin.set_user_attribute(ctx.username, MAILU_APP_PASSWORD_ATTR, self._random_password()) + self._task_ok(conn, ctx.request_code, "mailu_app_password", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to set mail password") + self._task_fail(conn, ctx.request_code, "mailu_app_password", detail, start) + + def _sync_mailu(self, conn, ctx: RequestContext) -> bool: + start = datetime.now(timezone.utc) + try: + if not self._mailu.ready(): + detail = "mailu not configured" + self._task_ok(conn, ctx.request_code, "mailu_sync", detail, start) + return True + self._mailu.sync(reason="ariadne_access_approve", force=True) + mailbox_ready = self._mailu.wait_for_mailbox( + ctx.mailu_email, + self._settings.mailu_mailbox_wait_timeout_sec, + ) + if not mailbox_ready: + raise RuntimeError("mailbox not ready") + self._task_ok(conn, ctx.request_code, "mailu_sync", None, start) + return True + except Exception as exc: + detail = safe_error_detail(exc, "failed to sync mailu") + self._task_fail(conn, ctx.request_code, "mailu_sync", detail, start) + return False + + def _sync_nextcloud_mail(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + if not self._settings.nextcloud_namespace: + detail = "sync disabled" + self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) + return + result = self._nextcloud.sync_mail(ctx.username, wait=True) + if isinstance(result, dict) and result.get("status") == "ok": + self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", None, start) + return + status_val = result.get("status") if isinstance(result, dict) else "error" + summary = result.get("summary") if isinstance(result, dict) else None + detail = "" + if summary is not None: + detail = getattr(summary, "detail", "") or "" + if not detail and isinstance(result, dict): + detail = str(result.get("detail") or "") + detail = detail or str(status_val) + self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to sync nextcloud") + self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start) + + def _ensure_wger_account(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + full = self._keycloak_admin.get_user(ctx.user_id) + attrs = full.get("attributes") or {} + wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR) + wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR) + + if not wger_password: + wger_password = self._random_password(20) + self._keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_ATTR, wger_password) + + if not wger_password_updated_at: + result = self._wger.sync_user(ctx.username, ctx.mailu_email, wger_password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + detail = result.get("detail") if isinstance(result, dict) else "" + detail = detail or f"wger sync {status_val}" + raise RuntimeError(detail) + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + self._keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_UPDATED_ATTR, now_iso) + + self._task_ok(conn, ctx.request_code, "wger_account", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision wger") + self._task_fail(conn, ctx.request_code, "wger_account", detail, start) + + def _ensure_firefly_account(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + full = self._keycloak_admin.get_user(ctx.user_id) + attrs = full.get("attributes") or {} + firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR) + firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR) + + if not firefly_password: + firefly_password = self._random_password(24) + self._keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_ATTR, firefly_password) + + if not firefly_password_updated_at: + result = self._firefly.sync_user(ctx.mailu_email, firefly_password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + raise RuntimeError(f"firefly sync {status_val}") + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + self._keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso) + + self._task_ok(conn, ctx.request_code, "firefly_account", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision firefly") + self._task_fail(conn, ctx.request_code, "firefly_account", detail, start) + + def _handle_vaultwarden_grandfathered(self, conn, ctx: RequestContext, start: datetime) -> None: + lookup = self._vaultwarden.find_user_by_email(ctx.contact_email) + if lookup.status == "rate_limited": + detail, _ = self._vaultwarden_rate_limit_detail() + self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start) + self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "rate_limited") + return + if lookup.ok and lookup.status == "present": + self._task_ok(conn, ctx.request_code, "vaultwarden_invite", "grandfathered", start) + self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "grandfathered") + return + if lookup.ok and lookup.status == "missing": + self._task_error( + conn, + ctx.request_code, + "vaultwarden_invite", + "vaultwarden account not found for recovery email", + start, + ) + return + detail = lookup.detail or lookup.status + self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start) + + def _ensure_vaultwarden_invite(self, conn, ctx: RequestContext) -> None: + start = datetime.now(timezone.utc) + try: + if not self._vaultwarden_retry_due(conn, ctx.request_code): + return + if VAULTWARDEN_GRANDFATHERED_FLAG in ctx.approval_flags: + self._handle_vaultwarden_grandfathered(conn, ctx, start) + return + if not self._mailu.wait_for_mailbox(ctx.mailu_email, self._settings.mailu_mailbox_wait_timeout_sec): + try: + self._mailu.sync(reason="ariadne_vaultwarden_retry", force=True) + except Exception: + pass + if not self._mailu.wait_for_mailbox(ctx.mailu_email, self._settings.mailu_mailbox_wait_timeout_sec): + raise RuntimeError("mailbox not ready") + + result = self._vaultwarden.invite_user(ctx.mailu_email) + if result.ok: + self._task_ok(conn, ctx.request_code, "vaultwarden_invite", result.status, start) + elif result.status == "rate_limited": + detail, _ = self._vaultwarden_rate_limit_detail() + self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start) + else: + detail = result.detail or result.status + self._task_error(conn, ctx.request_code, "vaultwarden_invite", detail, start) + + status = result.status if result.status != "rate_limited" else "rate_limited" + self._set_vaultwarden_attrs(ctx.username, ctx.mailu_email, status) + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision vaultwarden") + self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start) + + def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None: + if not self._settings.welcome_email_enabled: + return + if not contact_email: + return + try: + row = self._db.fetchone( + "SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s", + (request_code,), + ) + if row and row.get("welcome_email_sent_at"): + return + onboarding_url = f"{self._settings.portal_public_base_url}/onboarding?code={request_code}" + self._mailer.send_welcome(contact_email, request_code, onboarding_url, username=username) + self._storage.mark_welcome_sent(request_code) + except MailerError: + return diff --git a/ariadne/manager/provisioning_protocol.py b/ariadne/manager/provisioning_protocol.py new file mode 100644 index 0000000..b4aecc0 --- /dev/null +++ b/ariadne/manager/provisioning_protocol.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +import hashlib +from typing import Any + +MAILU_EMAIL_ATTR = "mailu_email" +MAILU_APP_PASSWORD_ATTR = "mailu_app_password" +MAILU_ENABLED_ATTR = "mailu_enabled" +WGER_PASSWORD_ATTR = "wger_password" +WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" +FIREFLY_PASSWORD_ATTR = "firefly_password" +FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" +VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered" + +_RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504} +_RETRYABLE_TOKENS = ( + "timeout", + "temporar", + "rate limited", + "mailbox not ready", + "connection refused", + "connection reset", + "network is unreachable", + "dns", + "name resolution", + "service unavailable", + "bad gateway", + "gateway timeout", +) + + +@dataclass(frozen=True) +class ProvisionOutcome: + ok: bool + status: str + + +@dataclass +class RequestContext: + request_code: str + username: str + first_name: str + last_name: str + contact_email: str + email_verified_at: datetime | None + status: str + initial_password: str | None + revealed_at: datetime | None + attempted_at: datetime | None + approval_flags: list[str] + user_id: str = "" + mailu_email: str = "" + + +def _advisory_lock_id(request_code: str) -> int: + digest = hashlib.sha256(request_code.encode("utf-8")).digest() + return int.from_bytes(digest[:8], "big", signed=True) + + +def _extract_attr(attrs: Any, key: str) -> str: + if not isinstance(attrs, dict): + return "" + raw = attrs.get(key) + if isinstance(raw, list): + for item in raw: + if isinstance(item, str) and item.strip(): + return item.strip() + return "" + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return "" diff --git a/ariadne/manager/provisioning_tasks.py b/ariadne/manager/provisioning_tasks.py new file mode 100644 index 0000000..fb16cf2 --- /dev/null +++ b/ariadne/manager/provisioning_tasks.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +import re + +from ..db.storage import TaskRunRecord +from .provisioning_protocol import _RETRYABLE_HTTP_CODES, _RETRYABLE_TOKENS + + +class _ProvisioningTaskMixin: + def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None: + if not tasks: + return + conn.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + SELECT %s, task, 'pending', NULL, NOW() + FROM UNNEST(%s::text[]) AS task + ON CONFLICT (request_code, task) DO NOTHING + """, + (request_code, tasks), + ) + + def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None: + conn.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + VALUES (%s, %s, %s, %s, NOW()) + ON CONFLICT (request_code, task) + DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() + """, + (request_code, task, status, detail), + ) + + def _task_statuses(self, conn, request_code: str) -> dict[str, str]: + rows = conn.execute( + "SELECT task, status FROM access_request_tasks WHERE request_code = %s", + (request_code,), + ).fetchall() + output: dict[str, str] = {} + for row in rows: + task = row.get("task") if isinstance(row, dict) else None + status = row.get("status") if isinstance(row, dict) else None + if isinstance(task, str) and isinstance(status, str): + output[task] = status + return output + + def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool: + statuses = self._task_statuses(conn, request_code) + for task in tasks: + if statuses.get(task) != "ok": + return False + return True + + def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None: + finished = datetime.now(timezone.utc) + duration_sec = (finished - started).total_seconds() + self._record_task_run_metric(task, status, duration_sec) + self._logger.info( + "task run", + extra={ + "event": "task_run", + "request_code": request_code, + "task": task, + "status": status, + "duration_sec": round(duration_sec, 3), + "detail": detail or "", + }, + ) + try: + self._storage.record_event( + "provision_task", + { + "request_code": request_code, + "task": task, + "status": status, + "duration_sec": round(duration_sec, 3), + "detail": detail or "", + }, + ) + except Exception: + pass + try: + self._storage.record_task_run( + TaskRunRecord( + request_code=request_code, + task=task, + status=status, + detail=detail, + started_at=started, + finished_at=finished, + duration_ms=int(duration_sec * 1000), + ) + ) + except Exception: + pass + + def _task_ok(self, conn, request_code: str, task: str, detail: str | None, started: datetime) -> None: + self._upsert_task(conn, request_code, task, "ok", detail) + self._record_task(request_code, task, "ok", detail, started) + + def _task_error(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None: + self._upsert_task(conn, request_code, task, "error", detail) + self._record_task(request_code, task, "error", detail, started) + + def _task_pending(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None: + self._upsert_task(conn, request_code, task, "pending", detail) + self._record_task(request_code, task, "pending", detail, started) + + def _is_retryable_detail(self, detail: str) -> bool: + if not detail: + return False + detail_lower = detail.lower() + match = re.match(r"^http\s+(\d{3})", detail_lower) + if match: + try: + code = int(match.group(1)) + except ValueError: + code = 0 + if code in _RETRYABLE_HTTP_CODES: + return True + return any(token in detail_lower for token in _RETRYABLE_TOKENS) + + def _retryable_detail(self, detail: str) -> str: + cleaned = detail.strip() if isinstance(detail, str) else "" + if not cleaned: + return "retryable: temporary failure" + return f"retryable: {cleaned}" + + def _task_fail(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None: + detail_lower = detail.lower() + if "missing verified email address" in detail_lower or "email not verified" in detail_lower: + self._task_pending(conn, request_code, task, "blocked: email not verified", started) + return + if self._is_retryable_detail(detail): + self._task_pending(conn, request_code, task, self._retryable_detail(detail), started) + return + self._task_error(conn, request_code, task, detail, started) + + def _vaultwarden_rate_limit_detail(self) -> tuple[str, datetime]: + retry_at = datetime.now(timezone.utc) + timedelta( + seconds=float(self._settings.vaultwarden_admin_rate_limit_backoff_sec) + ) + retry_iso = retry_at.strftime("%Y-%m-%dT%H:%M:%SZ") + return f"rate limited until {retry_iso}", retry_at + + @staticmethod + def _parse_retry_at(detail: str) -> datetime | None: + prefix = "rate limited until " + if not isinstance(detail, str) or not detail.startswith(prefix): + return None + ts = detail[len(prefix) :].strip() + for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"): + try: + parsed = datetime.strptime(ts, fmt) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + except ValueError: + continue + return None + + def _vaultwarden_retry_due(self, conn, request_code: str) -> bool: + row = conn.execute( + """ + SELECT status, detail + FROM access_request_tasks + WHERE request_code = %s AND task = 'vaultwarden_invite' + """, + (request_code,), + ).fetchone() + if not isinstance(row, dict): + return True + if row.get("status") != "pending": + return True + retry_at = self._parse_retry_at(row.get("detail") or "") + if not retry_at: + return True + return datetime.now(timezone.utc) >= retry_at diff --git a/ci/loc_hygiene_waivers.tsv b/ci/loc_hygiene_waivers.tsv index 0b36927..6d5a9bc 100644 --- a/ci/loc_hygiene_waivers.tsv +++ b/ci/loc_hygiene_waivers.tsv @@ -1,7 +1,6 @@ # path reason ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog ariadne/app.py split planned; Flask app bootstrap/routes currently co-located -ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile tests/test_services.py test module split planned; broad service contract coverage retained meanwhile tests/test_app.py test module split planned; API coverage retained meanwhile