refactor(ariadne): split provisioning workflow helpers

This commit is contained in:
codex 2026-04-21 01:42:56 -03:00
parent c11996d860
commit 18a6471c08
5 changed files with 731 additions and 664 deletions

View File

@ -1,15 +1,11 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from datetime import datetime
from datetime import datetime, timedelta, timezone
import hashlib
import re
import threading import threading
import time import time
from typing import Any
from ..db.database import Database from ..db.database import Database
from ..db.storage import REQUIRED_TASKS, Storage, TaskRunRecord from ..db.storage import REQUIRED_TASKS, Storage
from ..metrics.metrics import record_task_run, set_access_request_counts from ..metrics.metrics import record_task_run, set_access_request_counts
from ..services.firefly import firefly from ..services.firefly import firefly
from ..services.keycloak_admin import keycloak_admin from ..services.keycloak_admin import keycloak_admin
@ -19,81 +15,30 @@ from ..services.nextcloud import nextcloud
from ..services.vaultwarden import vaultwarden from ..services.vaultwarden import vaultwarden
from ..services.wger import wger from ..services.wger import wger
from ..settings import settings from ..settings import settings
from ..utils.errors import safe_error_detail
from ..utils.logging import get_logger from ..utils.logging import get_logger
from ..utils.passwords import random_password from ..utils.passwords import random_password
from .provisioning_accounts import _ProvisioningAccountsMixin
from .provisioning_protocol import (
MAILU_EMAIL_ATTR = "mailu_email" FIREFLY_PASSWORD_ATTR,
MAILU_APP_PASSWORD_ATTR = "mailu_app_password" FIREFLY_PASSWORD_UPDATED_ATTR,
MAILU_ENABLED_ATTR = "mailu_enabled" MAILU_APP_PASSWORD_ATTR,
WGER_PASSWORD_ATTR = "wger_password" MAILU_EMAIL_ATTR,
WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" MAILU_ENABLED_ATTR,
FIREFLY_PASSWORD_ATTR = "firefly_password" VAULTWARDEN_GRANDFATHERED_FLAG,
FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" WGER_PASSWORD_ATTR,
VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered" WGER_PASSWORD_UPDATED_ATTR,
_RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504} ProvisionOutcome,
_RETRYABLE_TOKENS = ( RequestContext,
"timeout", _advisory_lock_id,
"temporar", _extract_attr,
"rate limited",
"mailbox not ready",
"connection refused",
"connection reset",
"network is unreachable",
"dns",
"name resolution",
"service unavailable",
"bad gateway",
"gateway timeout",
) )
from .provisioning_tasks import _ProvisioningTaskMixin
logger = get_logger(__name__) logger = get_logger(__name__)
@dataclass(frozen=True) class ProvisioningManager(_ProvisioningTaskMixin, _ProvisioningAccountsMixin):
class ProvisionOutcome:
ok: bool
status: str
@dataclass
class RequestContext:
request_code: str
username: str
first_name: str
last_name: str
contact_email: str
email_verified_at: datetime | None
status: str
initial_password: str | None
revealed_at: datetime | None
attempted_at: datetime | None
approval_flags: list[str]
user_id: str = ""
mailu_email: str = ""
def _advisory_lock_id(request_code: str) -> int:
digest = hashlib.sha256(request_code.encode("utf-8")).digest()
return int.from_bytes(digest[:8], "big", signed=True)
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
class ProvisioningManager:
"""Coordinate approved access requests across identity and app services.""" """Coordinate approved access requests across identity and app services."""
def __init__(self, db: Database, storage: Storage) -> None: def __init__(self, db: Database, storage: Storage) -> None:
@ -102,6 +47,48 @@ class ProvisioningManager:
self._thread: threading.Thread | None = None self._thread: threading.Thread | None = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
@property
def _settings(self):
return settings
@property
def _logger(self):
return logger
@property
def _keycloak_admin(self):
return keycloak_admin
@property
def _mailu(self):
return mailu
@property
def _nextcloud(self):
return nextcloud
@property
def _wger(self):
return wger
@property
def _firefly(self):
return firefly
@property
def _vaultwarden(self):
return vaultwarden
@property
def _mailer(self):
return mailer
def _random_password(self, length: int = 32) -> str:
return random_password(length)
def _record_task_run_metric(self, task: str, status: str, duration_sec: float) -> None:
record_task_run(task, status, duration_sec)
def start(self) -> None: def start(self) -> None:
if self._thread and self._thread.is_alive(): if self._thread and self._thread.is_alive():
return return
@ -209,12 +196,7 @@ class ProvisioningManager:
extra={"event": "provision_unlock_error", "request_code": request_code}, extra={"event": "provision_unlock_error", "request_code": request_code},
) )
def _provision_locked( def _provision_locked(self, conn, request_code: str, required_tasks: list[str]) -> ProvisionOutcome:
self,
conn,
request_code: str,
required_tasks: list[str],
) -> ProvisionOutcome:
ctx = self._load_request(conn, request_code) ctx = self._load_request(conn, request_code)
if not ctx: if not ctx:
return ProvisionOutcome(ok=False, status="unknown") return ProvisionOutcome(ok=False, status="unknown")
@ -229,12 +211,7 @@ class ProvisioningManager:
return self._run_task_pipeline(conn, ctx, required_tasks) return self._run_task_pipeline(conn, ctx, required_tasks)
def _run_task_pipeline( def _run_task_pipeline(self, conn, ctx: RequestContext, required_tasks: list[str]) -> ProvisionOutcome:
self,
conn,
ctx: RequestContext,
required_tasks: list[str],
) -> ProvisionOutcome:
if not self._ensure_keycloak_user(conn, ctx): if not self._ensure_keycloak_user(conn, ctx):
return ProvisionOutcome(ok=False, status="accounts_building") return ProvisionOutcome(ok=False, status="accounts_building")
if not self._run_account_tasks(conn, ctx): if not self._run_account_tasks(conn, ctx):
@ -355,581 +332,19 @@ class ProvisioningManager:
pass pass
return ProvisionOutcome(ok=False, status=pending_status) return ProvisionOutcome(ok=False, status=pending_status)
def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None:
if not tasks:
return
conn.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
SELECT %s, task, 'pending', NULL, NOW()
FROM UNNEST(%s::text[]) AS task
ON CONFLICT (request_code, task) DO NOTHING
""",
(request_code, tasks),
)
def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None: __all__ = [
conn.execute( "FIREFLY_PASSWORD_ATTR",
""" "FIREFLY_PASSWORD_UPDATED_ATTR",
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) "MAILU_APP_PASSWORD_ATTR",
VALUES (%s, %s, %s, %s, NOW()) "MAILU_EMAIL_ATTR",
ON CONFLICT (request_code, task) "MAILU_ENABLED_ATTR",
DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() "MailerError",
""", "ProvisionOutcome",
(request_code, task, status, detail), "ProvisioningManager",
) "RequestContext",
"VAULTWARDEN_GRANDFATHERED_FLAG",
def _task_statuses(self, conn, request_code: str) -> dict[str, str]: "WGER_PASSWORD_ATTR",
rows = conn.execute( "WGER_PASSWORD_UPDATED_ATTR",
"SELECT task, status FROM access_request_tasks WHERE request_code = %s", "_extract_attr",
(request_code,), ]
).fetchall()
output: dict[str, str] = {}
for row in rows:
task = row.get("task") if isinstance(row, dict) else None
status = row.get("status") if isinstance(row, dict) else None
if isinstance(task, str) and isinstance(status, str):
output[task] = status
return output
def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool:
statuses = self._task_statuses(conn, request_code)
for task in tasks:
if statuses.get(task) != "ok":
return False
return True
def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None:
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
record_task_run(task, status, duration_sec)
logger.info(
"task run",
extra={
"event": "task_run",
"request_code": request_code,
"task": task,
"status": status,
"duration_sec": round(duration_sec, 3),
"detail": detail or "",
},
)
try:
self._storage.record_event(
"provision_task",
{
"request_code": request_code,
"task": task,
"status": status,
"duration_sec": round(duration_sec, 3),
"detail": detail or "",
},
)
except Exception:
pass
try:
self._storage.record_task_run(
TaskRunRecord(
request_code=request_code,
task=task,
status=status,
detail=detail,
started_at=started,
finished_at=finished,
duration_ms=int(duration_sec * 1000),
)
)
except Exception:
pass
def _task_ok(
self,
conn,
request_code: str,
task: str,
detail: str | None,
started: datetime,
) -> None:
self._upsert_task(conn, request_code, task, "ok", detail)
self._record_task(request_code, task, "ok", detail, started)
def _task_error(
self,
conn,
request_code: str,
task: str,
detail: str,
started: datetime,
) -> None:
self._upsert_task(conn, request_code, task, "error", detail)
self._record_task(request_code, task, "error", detail, started)
def _task_pending(
self,
conn,
request_code: str,
task: str,
detail: str,
started: datetime,
) -> None:
self._upsert_task(conn, request_code, task, "pending", detail)
self._record_task(request_code, task, "pending", detail, started)
def _is_retryable_detail(self, detail: str) -> bool:
if not detail:
return False
detail_lower = detail.lower()
match = re.match(r"^http\s+(\d{3})", detail_lower)
if match:
try:
code = int(match.group(1))
except ValueError:
code = 0
if code in _RETRYABLE_HTTP_CODES:
return True
return any(token in detail_lower for token in _RETRYABLE_TOKENS)
def _retryable_detail(self, detail: str) -> str:
cleaned = detail.strip() if isinstance(detail, str) else ""
if not cleaned:
return "retryable: temporary failure"
return f"retryable: {cleaned}"
def _task_fail(
self,
conn,
request_code: str,
task: str,
detail: str,
started: datetime,
) -> None:
detail_lower = detail.lower()
if "missing verified email address" in detail_lower or "email not verified" in detail_lower:
self._task_pending(conn, request_code, task, "blocked: email not verified", started)
return
if self._is_retryable_detail(detail):
self._task_pending(conn, request_code, task, self._retryable_detail(detail), started)
return
self._task_error(conn, request_code, task, detail, started)
def _vaultwarden_rate_limit_detail(self) -> tuple[str, datetime]:
retry_at = datetime.now(timezone.utc) + timedelta(
seconds=float(settings.vaultwarden_admin_rate_limit_backoff_sec)
)
retry_iso = retry_at.strftime("%Y-%m-%dT%H:%M:%SZ")
return f"rate limited until {retry_iso}", retry_at
@staticmethod
def _parse_retry_at(detail: str) -> datetime | None:
prefix = "rate limited until "
if not isinstance(detail, str) or not detail.startswith(prefix):
return None
ts = detail[len(prefix) :].strip()
for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"):
try:
parsed = datetime.strptime(ts, fmt)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except ValueError:
continue
return None
def _vaultwarden_retry_due(self, conn, request_code: str) -> bool:
row = conn.execute(
"""
SELECT status, detail
FROM access_request_tasks
WHERE request_code = %s AND task = 'vaultwarden_invite'
""",
(request_code,),
).fetchone()
if not isinstance(row, dict):
return True
if row.get("status") != "pending":
return True
retry_at = self._parse_retry_at(row.get("detail") or "")
if not retry_at:
return True
return datetime.now(timezone.utc) >= retry_at
@staticmethod
def _set_vaultwarden_attrs(username: str, email: str, status: str) -> None:
if not username or not email or not status:
return
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(username, "vaultwarden_email", email)
keycloak_admin.set_user_attribute(username, "vaultwarden_status", status)
keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso)
except Exception:
return
def _ready_for_retry(self, ctx: RequestContext) -> bool:
if ctx.status != "accounts_building":
return True
attempted_at = ctx.attempted_at
if not isinstance(attempted_at, datetime):
return True
if attempted_at.tzinfo is None:
attempted_at = attempted_at.replace(tzinfo=timezone.utc)
age_sec = (datetime.now(timezone.utc) - attempted_at).total_seconds()
return age_sec >= settings.provision_retry_cooldown_sec
def _require_verified_email(self, ctx: RequestContext) -> str:
if not isinstance(ctx.email_verified_at, datetime):
raise RuntimeError("missing verified email address")
email = ctx.contact_email.strip()
if not email:
raise RuntimeError("missing verified email address")
return email
def _ensure_email_unused(self, email: str, username: str) -> None:
existing_email_user = keycloak_admin.find_user_by_email(email)
if existing_email_user and (existing_email_user.get("username") or "") != username:
raise RuntimeError("email is already associated with an existing Atlas account")
def _new_user_payload(
self,
username: str,
email: str,
mailu_email: str,
first_name: str,
last_name: str,
) -> dict[str, Any]:
payload = {
"username": username,
"enabled": True,
"email": email,
"emailVerified": True,
"requiredActions": [],
"attributes": {
MAILU_EMAIL_ATTR: [mailu_email],
MAILU_ENABLED_ATTR: ["true"],
},
}
if first_name:
payload["firstName"] = first_name
if last_name:
payload["lastName"] = last_name
else:
payload["lastName"] = username
return payload
def _create_or_fetch_user(self, ctx: RequestContext) -> dict[str, Any]:
user = keycloak_admin.find_user(ctx.username)
if user:
return user
email = self._require_verified_email(ctx)
self._ensure_email_unused(email, ctx.username)
payload = self._new_user_payload(ctx.username, email, ctx.mailu_email, ctx.first_name, ctx.last_name)
try:
created_id = keycloak_admin.create_user(payload)
return keycloak_admin.get_user(created_id)
except Exception as exc:
detail = safe_error_detail(exc, "create user failed")
logger.warning(
"keycloak create user failed, checking for existing user",
extra={"event": "keycloak_user_fallback", "username": ctx.username, "detail": detail},
)
user = keycloak_admin.find_user(ctx.username)
if user:
return user
user = keycloak_admin.find_user_by_email(email)
if user:
return user
raise
def _fetch_full_user(self, user_id: str, fallback: dict[str, Any]) -> dict[str, Any]:
try:
return keycloak_admin.get_user(user_id)
except Exception:
return fallback
def _strip_totp_action(self, user_id: str, full_user: dict[str, Any]) -> None:
actions = full_user.get("requiredActions")
if not isinstance(actions, list) or "CONFIGURE_TOTP" not in actions:
return
new_actions = [action for action in actions if action != "CONFIGURE_TOTP"]
keycloak_admin.update_user_safe(user_id, {"requiredActions": new_actions})
def _ensure_contact_email(self, ctx: RequestContext, full_user: dict[str, Any]) -> None:
email_value = full_user.get("email")
if isinstance(email_value, str) and email_value.strip():
return
if isinstance(ctx.email_verified_at, datetime) and ctx.contact_email.strip():
keycloak_admin.update_user_safe(
ctx.user_id,
{"email": ctx.contact_email.strip(), "emailVerified": True},
)
def _ensure_mailu_attrs(self, ctx: RequestContext, full_user: dict[str, Any]) -> None:
attrs = full_user.get("attributes") or {}
if not isinstance(attrs, dict):
return
existing = _extract_attr(attrs, MAILU_EMAIL_ATTR)
if existing:
ctx.mailu_email = existing
else:
ctx.mailu_email = f"{ctx.username}@{settings.mailu_domain}"
keycloak_admin.set_user_attribute(ctx.username, MAILU_EMAIL_ATTR, ctx.mailu_email)
enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR)
if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}:
keycloak_admin.set_user_attribute(ctx.username, MAILU_ENABLED_ATTR, "true")
def _sync_user_profile(self, ctx: RequestContext, user: dict[str, Any]) -> None:
try:
full_user = self._fetch_full_user(ctx.user_id, user)
self._strip_totp_action(ctx.user_id, full_user)
self._ensure_contact_email(ctx, full_user)
self._ensure_mailu_attrs(ctx, full_user)
except Exception:
ctx.mailu_email = f"{ctx.username}@{settings.mailu_domain}"
def _ensure_keycloak_user(self, conn, ctx: RequestContext) -> bool:
start = datetime.now(timezone.utc)
try:
user = self._create_or_fetch_user(ctx)
ctx.user_id = str((user or {}).get("id") or "")
if not ctx.user_id:
raise RuntimeError("user id missing")
self._sync_user_profile(ctx, user)
self._task_ok(conn, ctx.request_code, "keycloak_user", None, start)
return True
except Exception as exc:
detail = safe_error_detail(exc, "failed to ensure user")
self._task_fail(conn, ctx.request_code, "keycloak_user", detail, start)
return False
def _ensure_keycloak_password(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
should_reset = ctx.status == "accounts_building" and ctx.revealed_at is None
password_value: str | None = None
if should_reset:
if isinstance(ctx.initial_password, str) and ctx.initial_password:
password_value = ctx.initial_password
elif ctx.initial_password is None:
password_value = random_password(20)
conn.execute(
"""
UPDATE access_requests
SET initial_password = %s
WHERE request_code = %s AND initial_password IS NULL
""",
(password_value, ctx.request_code),
)
ctx.initial_password = password_value
if password_value:
keycloak_admin.reset_password(ctx.user_id, password_value, temporary=False)
if isinstance(ctx.initial_password, str) and ctx.initial_password:
self._task_ok(conn, ctx.request_code, "keycloak_password", None, start)
elif ctx.revealed_at is not None:
detail = "initial password already revealed"
self._task_ok(conn, ctx.request_code, "keycloak_password", detail, start)
else:
raise RuntimeError("initial password missing")
except Exception as exc:
detail = safe_error_detail(exc, "failed to set password")
self._task_fail(conn, ctx.request_code, "keycloak_password", detail, start)
def _ensure_keycloak_groups(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
approved_flags = [flag for flag in ctx.approval_flags if flag in settings.allowed_flag_groups]
groups = list(dict.fromkeys(settings.default_user_groups + approved_flags))
for group_name in groups:
gid = keycloak_admin.get_group_id(group_name)
if not gid:
raise RuntimeError(f"group missing: {group_name}")
keycloak_admin.add_user_to_group(ctx.user_id, gid)
self._task_ok(conn, ctx.request_code, "keycloak_groups", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to add groups")
self._task_fail(conn, ctx.request_code, "keycloak_groups", detail, start)
def _ensure_mailu_app_password(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR)
if not existing:
keycloak_admin.set_user_attribute(ctx.username, MAILU_APP_PASSWORD_ATTR, random_password())
self._task_ok(conn, ctx.request_code, "mailu_app_password", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to set mail password")
self._task_fail(conn, ctx.request_code, "mailu_app_password", detail, start)
def _sync_mailu(self, conn, ctx: RequestContext) -> bool:
start = datetime.now(timezone.utc)
try:
if not mailu.ready():
detail = "mailu not configured"
self._task_ok(conn, ctx.request_code, "mailu_sync", detail, start)
return True
mailu.sync(reason="ariadne_access_approve", force=True)
mailbox_ready = mailu.wait_for_mailbox(
ctx.mailu_email,
settings.mailu_mailbox_wait_timeout_sec,
)
if not mailbox_ready:
raise RuntimeError("mailbox not ready")
self._task_ok(conn, ctx.request_code, "mailu_sync", None, start)
return True
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync mailu")
self._task_fail(conn, ctx.request_code, "mailu_sync", detail, start)
return False
def _sync_nextcloud_mail(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
if not settings.nextcloud_namespace:
detail = "sync disabled"
self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
return
result = nextcloud.sync_mail(ctx.username, wait=True)
if isinstance(result, dict) and result.get("status") == "ok":
self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", None, start)
return
status_val = result.get("status") if isinstance(result, dict) else "error"
summary = result.get("summary") if isinstance(result, dict) else None
detail = ""
if summary is not None:
detail = getattr(summary, "detail", "") or ""
if not detail and isinstance(result, dict):
detail = str(result.get("detail") or "")
detail = detail or str(status_val)
self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync nextcloud")
self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
def _ensure_wger_account(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR)
wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR)
if not wger_password:
wger_password = random_password(20)
keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_ATTR, wger_password)
if not wger_password_updated_at:
result = wger.sync_user(ctx.username, ctx.mailu_email, wger_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
detail = result.get("detail") if isinstance(result, dict) else ""
detail = detail or f"wger sync {status_val}"
raise RuntimeError(detail)
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_UPDATED_ATTR, now_iso)
self._task_ok(conn, ctx.request_code, "wger_account", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision wger")
self._task_fail(conn, ctx.request_code, "wger_account", detail, start)
def _ensure_firefly_account(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR)
firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR)
if not firefly_password:
firefly_password = random_password(24)
keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_ATTR, firefly_password)
if not firefly_password_updated_at:
result = firefly.sync_user(ctx.mailu_email, firefly_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso)
self._task_ok(conn, ctx.request_code, "firefly_account", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision firefly")
self._task_fail(conn, ctx.request_code, "firefly_account", detail, start)
def _handle_vaultwarden_grandfathered(self, conn, ctx: RequestContext, start: datetime) -> None:
lookup = vaultwarden.find_user_by_email(ctx.contact_email)
if lookup.status == "rate_limited":
detail, _ = self._vaultwarden_rate_limit_detail()
self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start)
self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "rate_limited")
return
if lookup.ok and lookup.status == "present":
self._task_ok(conn, ctx.request_code, "vaultwarden_invite", "grandfathered", start)
self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "grandfathered")
return
if lookup.ok and lookup.status == "missing":
self._task_error(
conn,
ctx.request_code,
"vaultwarden_invite",
"vaultwarden account not found for recovery email",
start,
)
return
detail = lookup.detail or lookup.status
self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start)
def _ensure_vaultwarden_invite(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
if not self._vaultwarden_retry_due(conn, ctx.request_code):
return
if VAULTWARDEN_GRANDFATHERED_FLAG in ctx.approval_flags:
self._handle_vaultwarden_grandfathered(conn, ctx, start)
return
if not mailu.wait_for_mailbox(ctx.mailu_email, settings.mailu_mailbox_wait_timeout_sec):
try:
mailu.sync(reason="ariadne_vaultwarden_retry", force=True)
except Exception:
pass
if not mailu.wait_for_mailbox(ctx.mailu_email, settings.mailu_mailbox_wait_timeout_sec):
raise RuntimeError("mailbox not ready")
result = vaultwarden.invite_user(ctx.mailu_email)
if result.ok:
self._task_ok(conn, ctx.request_code, "vaultwarden_invite", result.status, start)
elif result.status == "rate_limited":
detail, _ = self._vaultwarden_rate_limit_detail()
self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start)
else:
detail = result.detail or result.status
self._task_error(conn, ctx.request_code, "vaultwarden_invite", detail, start)
status = result.status if result.status != "rate_limited" else "rate_limited"
self._set_vaultwarden_attrs(ctx.username, ctx.mailu_email, status)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision vaultwarden")
self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start)
def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None:
if not settings.welcome_email_enabled:
return
if not contact_email:
return
try:
row = self._db.fetchone(
"SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s",
(request_code,),
)
if row and row.get("welcome_email_sent_at"):
return
onboarding_url = f"{settings.portal_public_base_url}/onboarding?code={request_code}"
mailer.send_welcome(contact_email, request_code, onboarding_url, username=username)
self._storage.mark_welcome_sent(request_code)
except MailerError:
return

View File

@ -0,0 +1,401 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from ..services.mailer import MailerError
from ..utils.errors import safe_error_detail
from .provisioning_protocol import (
FIREFLY_PASSWORD_ATTR,
FIREFLY_PASSWORD_UPDATED_ATTR,
MAILU_APP_PASSWORD_ATTR,
MAILU_EMAIL_ATTR,
MAILU_ENABLED_ATTR,
VAULTWARDEN_GRANDFATHERED_FLAG,
WGER_PASSWORD_ATTR,
WGER_PASSWORD_UPDATED_ATTR,
RequestContext,
_extract_attr,
)
class _ProvisioningAccountsMixin:
def _set_vaultwarden_attrs(self, username: str, email: str, status: str) -> None:
if not username or not email or not status:
return
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
self._keycloak_admin.set_user_attribute(username, "vaultwarden_email", email)
self._keycloak_admin.set_user_attribute(username, "vaultwarden_status", status)
self._keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso)
except Exception:
return
def _ready_for_retry(self, ctx: RequestContext) -> bool:
if ctx.status != "accounts_building":
return True
attempted_at = ctx.attempted_at
if not isinstance(attempted_at, datetime):
return True
if attempted_at.tzinfo is None:
attempted_at = attempted_at.replace(tzinfo=timezone.utc)
age_sec = (datetime.now(timezone.utc) - attempted_at).total_seconds()
return age_sec >= self._settings.provision_retry_cooldown_sec
def _require_verified_email(self, ctx: RequestContext) -> str:
if not isinstance(ctx.email_verified_at, datetime):
raise RuntimeError("missing verified email address")
email = ctx.contact_email.strip()
if not email:
raise RuntimeError("missing verified email address")
return email
def _ensure_email_unused(self, email: str, username: str) -> None:
existing_email_user = self._keycloak_admin.find_user_by_email(email)
if existing_email_user and (existing_email_user.get("username") or "") != username:
raise RuntimeError("email is already associated with an existing Atlas account")
def _new_user_payload(
self,
username: str,
email: str,
mailu_email: str,
first_name: str,
last_name: str,
) -> dict[str, Any]:
payload = {
"username": username,
"enabled": True,
"email": email,
"emailVerified": True,
"requiredActions": [],
"attributes": {
MAILU_EMAIL_ATTR: [mailu_email],
MAILU_ENABLED_ATTR: ["true"],
},
}
if first_name:
payload["firstName"] = first_name
if last_name:
payload["lastName"] = last_name
else:
payload["lastName"] = username
return payload
def _create_or_fetch_user(self, ctx: RequestContext) -> dict[str, Any]:
user = self._keycloak_admin.find_user(ctx.username)
if user:
return user
email = self._require_verified_email(ctx)
self._ensure_email_unused(email, ctx.username)
payload = self._new_user_payload(ctx.username, email, ctx.mailu_email, ctx.first_name, ctx.last_name)
try:
created_id = self._keycloak_admin.create_user(payload)
return self._keycloak_admin.get_user(created_id)
except Exception as exc:
detail = safe_error_detail(exc, "create user failed")
self._logger.warning(
"keycloak create user failed, checking for existing user",
extra={"event": "keycloak_user_fallback", "username": ctx.username, "detail": detail},
)
user = self._keycloak_admin.find_user(ctx.username)
if user:
return user
user = self._keycloak_admin.find_user_by_email(email)
if user:
return user
raise
def _fetch_full_user(self, user_id: str, fallback: dict[str, Any]) -> dict[str, Any]:
try:
return self._keycloak_admin.get_user(user_id)
except Exception:
return fallback
def _strip_totp_action(self, user_id: str, full_user: dict[str, Any]) -> None:
actions = full_user.get("requiredActions")
if not isinstance(actions, list) or "CONFIGURE_TOTP" not in actions:
return
new_actions = [action for action in actions if action != "CONFIGURE_TOTP"]
self._keycloak_admin.update_user_safe(user_id, {"requiredActions": new_actions})
def _ensure_contact_email(self, ctx: RequestContext, full_user: dict[str, Any]) -> None:
email_value = full_user.get("email")
if isinstance(email_value, str) and email_value.strip():
return
if isinstance(ctx.email_verified_at, datetime) and ctx.contact_email.strip():
self._keycloak_admin.update_user_safe(
ctx.user_id,
{"email": ctx.contact_email.strip(), "emailVerified": True},
)
def _ensure_mailu_attrs(self, ctx: RequestContext, full_user: dict[str, Any]) -> None:
attrs = full_user.get("attributes") or {}
if not isinstance(attrs, dict):
return
existing = _extract_attr(attrs, MAILU_EMAIL_ATTR)
if existing:
ctx.mailu_email = existing
else:
ctx.mailu_email = f"{ctx.username}@{self._settings.mailu_domain}"
self._keycloak_admin.set_user_attribute(ctx.username, MAILU_EMAIL_ATTR, ctx.mailu_email)
enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR)
if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}:
self._keycloak_admin.set_user_attribute(ctx.username, MAILU_ENABLED_ATTR, "true")
def _sync_user_profile(self, ctx: RequestContext, user: dict[str, Any]) -> None:
try:
full_user = self._fetch_full_user(ctx.user_id, user)
self._strip_totp_action(ctx.user_id, full_user)
self._ensure_contact_email(ctx, full_user)
self._ensure_mailu_attrs(ctx, full_user)
except Exception:
ctx.mailu_email = f"{ctx.username}@{self._settings.mailu_domain}"
def _ensure_keycloak_user(self, conn, ctx: RequestContext) -> bool:
start = datetime.now(timezone.utc)
try:
user = self._create_or_fetch_user(ctx)
ctx.user_id = str((user or {}).get("id") or "")
if not ctx.user_id:
raise RuntimeError("user id missing")
self._sync_user_profile(ctx, user)
self._task_ok(conn, ctx.request_code, "keycloak_user", None, start)
return True
except Exception as exc:
detail = safe_error_detail(exc, "failed to ensure user")
self._task_fail(conn, ctx.request_code, "keycloak_user", detail, start)
return False
def _ensure_keycloak_password(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
should_reset = ctx.status == "accounts_building" and ctx.revealed_at is None
password_value: str | None = None
if should_reset:
if isinstance(ctx.initial_password, str) and ctx.initial_password:
password_value = ctx.initial_password
elif ctx.initial_password is None:
password_value = self._random_password(20)
conn.execute(
"""
UPDATE access_requests
SET initial_password = %s
WHERE request_code = %s AND initial_password IS NULL
""",
(password_value, ctx.request_code),
)
ctx.initial_password = password_value
if password_value:
self._keycloak_admin.reset_password(ctx.user_id, password_value, temporary=False)
if isinstance(ctx.initial_password, str) and ctx.initial_password:
self._task_ok(conn, ctx.request_code, "keycloak_password", None, start)
elif ctx.revealed_at is not None:
detail = "initial password already revealed"
self._task_ok(conn, ctx.request_code, "keycloak_password", detail, start)
else:
raise RuntimeError("initial password missing")
except Exception as exc:
detail = safe_error_detail(exc, "failed to set password")
self._task_fail(conn, ctx.request_code, "keycloak_password", detail, start)
def _ensure_keycloak_groups(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
approved_flags = [flag for flag in ctx.approval_flags if flag in self._settings.allowed_flag_groups]
groups = list(dict.fromkeys(self._settings.default_user_groups + approved_flags))
for group_name in groups:
gid = self._keycloak_admin.get_group_id(group_name)
if not gid:
raise RuntimeError(f"group missing: {group_name}")
self._keycloak_admin.add_user_to_group(ctx.user_id, gid)
self._task_ok(conn, ctx.request_code, "keycloak_groups", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to add groups")
self._task_fail(conn, ctx.request_code, "keycloak_groups", detail, start)
def _ensure_mailu_app_password(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = self._keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR)
if not existing:
self._keycloak_admin.set_user_attribute(ctx.username, MAILU_APP_PASSWORD_ATTR, self._random_password())
self._task_ok(conn, ctx.request_code, "mailu_app_password", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to set mail password")
self._task_fail(conn, ctx.request_code, "mailu_app_password", detail, start)
def _sync_mailu(self, conn, ctx: RequestContext) -> bool:
start = datetime.now(timezone.utc)
try:
if not self._mailu.ready():
detail = "mailu not configured"
self._task_ok(conn, ctx.request_code, "mailu_sync", detail, start)
return True
self._mailu.sync(reason="ariadne_access_approve", force=True)
mailbox_ready = self._mailu.wait_for_mailbox(
ctx.mailu_email,
self._settings.mailu_mailbox_wait_timeout_sec,
)
if not mailbox_ready:
raise RuntimeError("mailbox not ready")
self._task_ok(conn, ctx.request_code, "mailu_sync", None, start)
return True
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync mailu")
self._task_fail(conn, ctx.request_code, "mailu_sync", detail, start)
return False
def _sync_nextcloud_mail(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
if not self._settings.nextcloud_namespace:
detail = "sync disabled"
self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
return
result = self._nextcloud.sync_mail(ctx.username, wait=True)
if isinstance(result, dict) and result.get("status") == "ok":
self._task_ok(conn, ctx.request_code, "nextcloud_mail_sync", None, start)
return
status_val = result.get("status") if isinstance(result, dict) else "error"
summary = result.get("summary") if isinstance(result, dict) else None
detail = ""
if summary is not None:
detail = getattr(summary, "detail", "") or ""
if not detail and isinstance(result, dict):
detail = str(result.get("detail") or "")
detail = detail or str(status_val)
self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync nextcloud")
self._task_fail(conn, ctx.request_code, "nextcloud_mail_sync", detail, start)
def _ensure_wger_account(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = self._keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR)
wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR)
if not wger_password:
wger_password = self._random_password(20)
self._keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_ATTR, wger_password)
if not wger_password_updated_at:
result = self._wger.sync_user(ctx.username, ctx.mailu_email, wger_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
detail = result.get("detail") if isinstance(result, dict) else ""
detail = detail or f"wger sync {status_val}"
raise RuntimeError(detail)
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
self._keycloak_admin.set_user_attribute(ctx.username, WGER_PASSWORD_UPDATED_ATTR, now_iso)
self._task_ok(conn, ctx.request_code, "wger_account", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision wger")
self._task_fail(conn, ctx.request_code, "wger_account", detail, start)
def _ensure_firefly_account(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
full = self._keycloak_admin.get_user(ctx.user_id)
attrs = full.get("attributes") or {}
firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR)
firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR)
if not firefly_password:
firefly_password = self._random_password(24)
self._keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_ATTR, firefly_password)
if not firefly_password_updated_at:
result = self._firefly.sync_user(ctx.mailu_email, firefly_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
self._keycloak_admin.set_user_attribute(ctx.username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso)
self._task_ok(conn, ctx.request_code, "firefly_account", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision firefly")
self._task_fail(conn, ctx.request_code, "firefly_account", detail, start)
def _handle_vaultwarden_grandfathered(self, conn, ctx: RequestContext, start: datetime) -> None:
lookup = self._vaultwarden.find_user_by_email(ctx.contact_email)
if lookup.status == "rate_limited":
detail, _ = self._vaultwarden_rate_limit_detail()
self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start)
self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "rate_limited")
return
if lookup.ok and lookup.status == "present":
self._task_ok(conn, ctx.request_code, "vaultwarden_invite", "grandfathered", start)
self._set_vaultwarden_attrs(ctx.username, ctx.contact_email, "grandfathered")
return
if lookup.ok and lookup.status == "missing":
self._task_error(
conn,
ctx.request_code,
"vaultwarden_invite",
"vaultwarden account not found for recovery email",
start,
)
return
detail = lookup.detail or lookup.status
self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start)
def _ensure_vaultwarden_invite(self, conn, ctx: RequestContext) -> None:
start = datetime.now(timezone.utc)
try:
if not self._vaultwarden_retry_due(conn, ctx.request_code):
return
if VAULTWARDEN_GRANDFATHERED_FLAG in ctx.approval_flags:
self._handle_vaultwarden_grandfathered(conn, ctx, start)
return
if not self._mailu.wait_for_mailbox(ctx.mailu_email, self._settings.mailu_mailbox_wait_timeout_sec):
try:
self._mailu.sync(reason="ariadne_vaultwarden_retry", force=True)
except Exception:
pass
if not self._mailu.wait_for_mailbox(ctx.mailu_email, self._settings.mailu_mailbox_wait_timeout_sec):
raise RuntimeError("mailbox not ready")
result = self._vaultwarden.invite_user(ctx.mailu_email)
if result.ok:
self._task_ok(conn, ctx.request_code, "vaultwarden_invite", result.status, start)
elif result.status == "rate_limited":
detail, _ = self._vaultwarden_rate_limit_detail()
self._task_pending(conn, ctx.request_code, "vaultwarden_invite", detail, start)
else:
detail = result.detail or result.status
self._task_error(conn, ctx.request_code, "vaultwarden_invite", detail, start)
status = result.status if result.status != "rate_limited" else "rate_limited"
self._set_vaultwarden_attrs(ctx.username, ctx.mailu_email, status)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision vaultwarden")
self._task_fail(conn, ctx.request_code, "vaultwarden_invite", detail, start)
def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None:
if not self._settings.welcome_email_enabled:
return
if not contact_email:
return
try:
row = self._db.fetchone(
"SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s",
(request_code,),
)
if row and row.get("welcome_email_sent_at"):
return
onboarding_url = f"{self._settings.portal_public_base_url}/onboarding?code={request_code}"
self._mailer.send_welcome(contact_email, request_code, onboarding_url, username=username)
self._storage.mark_welcome_sent(request_code)
except MailerError:
return

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import hashlib
from typing import Any
MAILU_EMAIL_ATTR = "mailu_email"
MAILU_APP_PASSWORD_ATTR = "mailu_app_password"
MAILU_ENABLED_ATTR = "mailu_enabled"
WGER_PASSWORD_ATTR = "wger_password"
WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at"
FIREFLY_PASSWORD_ATTR = "firefly_password"
FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at"
VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered"
_RETRYABLE_HTTP_CODES = {429, 500, 502, 503, 504}
_RETRYABLE_TOKENS = (
"timeout",
"temporar",
"rate limited",
"mailbox not ready",
"connection refused",
"connection reset",
"network is unreachable",
"dns",
"name resolution",
"service unavailable",
"bad gateway",
"gateway timeout",
)
@dataclass(frozen=True)
class ProvisionOutcome:
ok: bool
status: str
@dataclass
class RequestContext:
request_code: str
username: str
first_name: str
last_name: str
contact_email: str
email_verified_at: datetime | None
status: str
initial_password: str | None
revealed_at: datetime | None
attempted_at: datetime | None
approval_flags: list[str]
user_id: str = ""
mailu_email: str = ""
def _advisory_lock_id(request_code: str) -> int:
digest = hashlib.sha256(request_code.encode("utf-8")).digest()
return int.from_bytes(digest[:8], "big", signed=True)
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""

View File

@ -0,0 +1,179 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import re
from ..db.storage import TaskRunRecord
from .provisioning_protocol import _RETRYABLE_HTTP_CODES, _RETRYABLE_TOKENS
class _ProvisioningTaskMixin:
def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None:
if not tasks:
return
conn.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
SELECT %s, task, 'pending', NULL, NOW()
FROM UNNEST(%s::text[]) AS task
ON CONFLICT (request_code, task) DO NOTHING
""",
(request_code, tasks),
)
def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None:
conn.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (request_code, task)
DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW()
""",
(request_code, task, status, detail),
)
def _task_statuses(self, conn, request_code: str) -> dict[str, str]:
rows = conn.execute(
"SELECT task, status FROM access_request_tasks WHERE request_code = %s",
(request_code,),
).fetchall()
output: dict[str, str] = {}
for row in rows:
task = row.get("task") if isinstance(row, dict) else None
status = row.get("status") if isinstance(row, dict) else None
if isinstance(task, str) and isinstance(status, str):
output[task] = status
return output
def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool:
statuses = self._task_statuses(conn, request_code)
for task in tasks:
if statuses.get(task) != "ok":
return False
return True
def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None:
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
self._record_task_run_metric(task, status, duration_sec)
self._logger.info(
"task run",
extra={
"event": "task_run",
"request_code": request_code,
"task": task,
"status": status,
"duration_sec": round(duration_sec, 3),
"detail": detail or "",
},
)
try:
self._storage.record_event(
"provision_task",
{
"request_code": request_code,
"task": task,
"status": status,
"duration_sec": round(duration_sec, 3),
"detail": detail or "",
},
)
except Exception:
pass
try:
self._storage.record_task_run(
TaskRunRecord(
request_code=request_code,
task=task,
status=status,
detail=detail,
started_at=started,
finished_at=finished,
duration_ms=int(duration_sec * 1000),
)
)
except Exception:
pass
def _task_ok(self, conn, request_code: str, task: str, detail: str | None, started: datetime) -> None:
self._upsert_task(conn, request_code, task, "ok", detail)
self._record_task(request_code, task, "ok", detail, started)
def _task_error(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None:
self._upsert_task(conn, request_code, task, "error", detail)
self._record_task(request_code, task, "error", detail, started)
def _task_pending(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None:
self._upsert_task(conn, request_code, task, "pending", detail)
self._record_task(request_code, task, "pending", detail, started)
def _is_retryable_detail(self, detail: str) -> bool:
if not detail:
return False
detail_lower = detail.lower()
match = re.match(r"^http\s+(\d{3})", detail_lower)
if match:
try:
code = int(match.group(1))
except ValueError:
code = 0
if code in _RETRYABLE_HTTP_CODES:
return True
return any(token in detail_lower for token in _RETRYABLE_TOKENS)
def _retryable_detail(self, detail: str) -> str:
cleaned = detail.strip() if isinstance(detail, str) else ""
if not cleaned:
return "retryable: temporary failure"
return f"retryable: {cleaned}"
def _task_fail(self, conn, request_code: str, task: str, detail: str, started: datetime) -> None:
detail_lower = detail.lower()
if "missing verified email address" in detail_lower or "email not verified" in detail_lower:
self._task_pending(conn, request_code, task, "blocked: email not verified", started)
return
if self._is_retryable_detail(detail):
self._task_pending(conn, request_code, task, self._retryable_detail(detail), started)
return
self._task_error(conn, request_code, task, detail, started)
def _vaultwarden_rate_limit_detail(self) -> tuple[str, datetime]:
retry_at = datetime.now(timezone.utc) + timedelta(
seconds=float(self._settings.vaultwarden_admin_rate_limit_backoff_sec)
)
retry_iso = retry_at.strftime("%Y-%m-%dT%H:%M:%SZ")
return f"rate limited until {retry_iso}", retry_at
@staticmethod
def _parse_retry_at(detail: str) -> datetime | None:
prefix = "rate limited until "
if not isinstance(detail, str) or not detail.startswith(prefix):
return None
ts = detail[len(prefix) :].strip()
for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"):
try:
parsed = datetime.strptime(ts, fmt)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed
except ValueError:
continue
return None
def _vaultwarden_retry_due(self, conn, request_code: str) -> bool:
row = conn.execute(
"""
SELECT status, detail
FROM access_request_tasks
WHERE request_code = %s AND task = 'vaultwarden_invite'
""",
(request_code,),
).fetchone()
if not isinstance(row, dict):
return True
if row.get("status") != "pending":
return True
retry_at = self._parse_retry_at(row.get("detail") or "")
if not retry_at:
return True
return datetime.now(timezone.utc) >= retry_at

View File

@ -1,7 +1,6 @@
# path reason # path reason
ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
ariadne/app.py split planned; Flask app bootstrap/routes currently co-located ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
tests/test_services.py test module split planned; broad service contract coverage retained meanwhile tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
tests/test_app.py test module split planned; API coverage retained meanwhile tests/test_app.py test module split planned; API coverage retained meanwhile

1 # path reason
2 ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
3 ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
4 tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
5 tests/test_services.py test module split planned; broad service contract coverage retained meanwhile
6 tests/test_app.py test module split planned; API coverage retained meanwhile