471 lines
22 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import time
import httpx
from . import settings
from .db import connect
from .keycloak import admin_client
from .nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync
from .provisioning_tasks import (
REQUIRED_PROVISION_TASKS,
all_tasks_ok,
ensure_task_rows,
safe_error_detail,
upsert_task,
)
from .utils import random_password
from .vaultwarden import invite_user
from .firefly_user_sync import trigger as trigger_firefly_user_sync
from .wger_user_sync import trigger as trigger_wger_user_sync
MAILU_EMAIL_ATTR = "mailu_email"
MAILU_APP_PASSWORD_ATTR = "mailu_app_password"
MAILU_ENABLED_ATTR = "mailu_enabled"
WGER_PASSWORD_ATTR = "wger_password"
WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at"
FIREFLY_PASSWORD_ATTR = "firefly_password"
FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at"
@dataclass(frozen=True)
class ProvisionResult:
"""Outcome returned by one provisioning attempt."""
ok: bool
status: str
def _advisory_lock_id(request_code: str) -> int:
"""Derive a stable Postgres advisory lock id from a request code."""
digest = hashlib.sha256(request_code.encode("utf-8")).digest()
return int.from_bytes(digest[:8], "big", signed=True)
def provision_tasks_complete(conn, request_code: str) -> bool:
"""Return whether all required provisioning tasks are marked complete."""
return all_tasks_ok(conn, request_code, list(REQUIRED_PROVISION_TASKS))
def provision_access_request(request_code: str) -> ProvisionResult:
"""Provision all downstream accounts required for an approved request.
Args:
request_code: Access request code being provisioned.
Returns:
A ``ProvisionResult`` describing whether provisioning reached a terminal
ready state or still needs another retry.
"""
if not request_code:
return ProvisionResult(ok=False, status="unknown")
if not admin_client().ready():
return ProvisionResult(ok=False, status="accounts_building")
required_tasks = list(REQUIRED_PROVISION_TASKS)
with connect() as conn:
lock_id = _advisory_lock_id(request_code)
lock_row = conn.execute(
"SELECT pg_try_advisory_lock(%s) AS locked",
(lock_id,),
).fetchone()
if not lock_row or not lock_row.get("locked"):
return ProvisionResult(ok=False, status="accounts_building")
try:
row = conn.execute(
"""
SELECT username,
contact_email,
email_verified_at,
status,
initial_password,
initial_password_revealed_at,
provision_attempted_at
FROM access_requests
WHERE request_code = %s
""",
(request_code,),
).fetchone()
if not row:
return ProvisionResult(ok=False, status="unknown")
username = str(row.get("username") or "")
contact_email = str(row.get("contact_email") or "")
email_verified_at = row.get("email_verified_at")
status = str(row.get("status") or "")
initial_password = row.get("initial_password")
revealed_at = row.get("initial_password_revealed_at")
attempted_at = row.get("provision_attempted_at")
if status == "approved":
conn.execute(
"UPDATE access_requests SET status = 'accounts_building' WHERE request_code = %s AND status = 'approved'",
(request_code,),
)
status = "accounts_building"
if status not in {"accounts_building", "awaiting_onboarding", "ready"}:
return ProvisionResult(ok=False, status=status or "unknown")
ensure_task_rows(conn, request_code, required_tasks)
if status == "accounts_building":
now = datetime.now(timezone.utc)
if isinstance(attempted_at, datetime):
if attempted_at.tzinfo is None:
attempted_at = attempted_at.replace(tzinfo=timezone.utc)
age_sec = (now - attempted_at).total_seconds()
if age_sec < settings.ACCESS_REQUEST_PROVISION_RETRY_COOLDOWN_SEC:
return ProvisionResult(ok=False, status="accounts_building")
conn.execute(
"UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s",
(request_code,),
)
user_id = ""
mailu_email = f"{username}@{settings.MAILU_DOMAIN}"
# Task: ensure Keycloak user exists
try:
user = admin_client().find_user(username)
if not user:
email = contact_email.strip()
if not email:
raise RuntimeError("missing verified email address")
existing_email_user = admin_client().find_user_by_email(email)
if existing_email_user and (existing_email_user.get("username") or "") != username:
raise RuntimeError("email is already associated with an existing Atlas account")
# The portal already verified the external contact email before approval,
# so mark it as verified in Keycloak.
#
# Do not force password rotation on first login: the onboarding flow
# intentionally guides users through Vaultwarden first, then triggers a
# Keycloak password change step later.
#
# Do not force MFA enrollment during initial login: users can opt into MFA
# later.
required_actions: list[str] = []
payload = {
"username": username,
"enabled": True,
"email": email,
"emailVerified": True,
"requiredActions": required_actions,
"attributes": {
MAILU_EMAIL_ATTR: [mailu_email],
MAILU_ENABLED_ATTR: ["true"],
},
}
created_id = admin_client().create_user(payload)
user = admin_client().get_user(created_id)
user_id = str((user or {}).get("id") or "")
if not user_id:
raise RuntimeError("user id missing")
try:
full = admin_client().get_user(user_id)
attrs = full.get("attributes") or {}
actions = full.get("requiredActions")
if isinstance(actions, list) and "CONFIGURE_TOTP" in actions:
# Backfill earlier accounts created when we forced MFA enrollment.
new_actions = [a for a in actions if a != "CONFIGURE_TOTP"]
admin_client().update_user_safe(user_id, {"requiredActions": new_actions})
mailu_from_attr: str | None = None
if isinstance(attrs, dict):
raw_mailu = attrs.get(MAILU_EMAIL_ATTR)
if isinstance(raw_mailu, list):
for item in raw_mailu:
if isinstance(item, str) and item.strip():
mailu_from_attr = item.strip()
break
elif isinstance(raw_mailu, str) and raw_mailu.strip():
mailu_from_attr = raw_mailu.strip()
if mailu_from_attr:
mailu_email = mailu_from_attr
else:
mailu_email = f"{username}@{settings.MAILU_DOMAIN}"
admin_client().set_user_attribute(username, MAILU_EMAIL_ATTR, mailu_email)
try:
raw_enabled = attrs.get(MAILU_ENABLED_ATTR) if isinstance(attrs, dict) else None
enabled_value = ""
if isinstance(raw_enabled, list) and raw_enabled:
enabled_value = str(raw_enabled[0]).strip()
elif isinstance(raw_enabled, str):
enabled_value = raw_enabled.strip()
if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}:
admin_client().set_user_attribute(username, MAILU_ENABLED_ATTR, "true")
except Exception:
pass
except Exception:
mailu_email = f"{username}@{settings.MAILU_DOMAIN}"
upsert_task(conn, request_code, "keycloak_user", "ok", None)
except Exception as exc:
upsert_task(conn, request_code, "keycloak_user", "error", safe_error_detail(exc, "failed to ensure user"))
if not user_id:
return ProvisionResult(ok=False, status="accounts_building")
# Task: set initial password and store it for "show once" onboarding.
try:
if not user_id:
raise RuntimeError("missing user id")
should_reset = status == "accounts_building" and revealed_at is None
password_value: str | None = None
if should_reset:
if isinstance(initial_password, str) and initial_password:
password_value = initial_password
elif initial_password is None:
password_value = random_password(20)
conn.execute(
"""
UPDATE access_requests
SET initial_password = %s
WHERE request_code = %s AND initial_password IS NULL
""",
(password_value, request_code),
)
initial_password = password_value
if password_value:
admin_client().reset_password(user_id, password_value, temporary=False)
if isinstance(initial_password, str) and initial_password:
upsert_task(conn, request_code, "keycloak_password", "ok", None)
elif revealed_at is not None:
upsert_task(conn, request_code, "keycloak_password", "ok", "initial password already revealed")
else:
raise RuntimeError("initial password missing")
except Exception as exc:
upsert_task(conn, request_code, "keycloak_password", "error", safe_error_detail(exc, "failed to set password"))
# Task: group membership (default dev)
try:
if not user_id:
raise RuntimeError("missing user id")
groups = settings.DEFAULT_USER_GROUPS or ["dev"]
for group_name in groups:
gid = admin_client().get_group_id(group_name)
if not gid:
raise RuntimeError("group missing")
admin_client().add_user_to_group(user_id, gid)
upsert_task(conn, request_code, "keycloak_groups", "ok", None)
except Exception as exc:
upsert_task(conn, request_code, "keycloak_groups", "error", safe_error_detail(exc, "failed to add groups"))
# Task: ensure mailu_app_password attribute exists
try:
if not user_id:
raise RuntimeError("missing user id")
full = admin_client().get_user(user_id)
attrs = full.get("attributes") or {}
existing = None
if isinstance(attrs, dict):
raw = attrs.get(MAILU_APP_PASSWORD_ATTR)
if isinstance(raw, list) and raw and isinstance(raw[0], str):
existing = raw[0]
elif isinstance(raw, str) and raw:
existing = raw
if not existing:
admin_client().set_user_attribute(username, MAILU_APP_PASSWORD_ATTR, random_password())
upsert_task(conn, request_code, "mailu_app_password", "ok", None)
except Exception as exc:
upsert_task(conn, request_code, "mailu_app_password", "error", safe_error_detail(exc, "failed to set mail password"))
# Task: trigger Mailu sync if configured
try:
if not settings.MAILU_SYNC_URL:
upsert_task(conn, request_code, "mailu_sync", "ok", "sync disabled")
else:
with httpx.Client(timeout=30) as client:
resp = client.post(
settings.MAILU_SYNC_URL,
json={"ts": int(time.time()), "wait": True, "reason": "portal_access_approve"},
)
if resp.status_code != 200:
raise RuntimeError("mailu sync failed")
upsert_task(conn, request_code, "mailu_sync", "ok", None)
except Exception as exc:
upsert_task(conn, request_code, "mailu_sync", "error", safe_error_detail(exc, "failed to sync mailu"))
# Task: trigger Nextcloud mail sync if configured
try:
if not settings.NEXTCLOUD_NAMESPACE or not settings.NEXTCLOUD_MAIL_SYNC_CRONJOB:
upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", "sync disabled")
else:
result = trigger_nextcloud_mail_sync(username, wait=True)
if isinstance(result, dict) and result.get("status") == "ok":
upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", None)
else:
status_val = result.get("status") if isinstance(result, dict) else "error"
upsert_task(conn, request_code, "nextcloud_mail_sync", "error", str(status_val))
except Exception as exc:
upsert_task(conn, request_code, "nextcloud_mail_sync", "error", safe_error_detail(exc, "failed to sync nextcloud"))
# Task: ensure wger account exists
try:
if not user_id:
raise RuntimeError("missing user id")
full = admin_client().get_user(user_id)
attrs = full.get("attributes") or {}
wger_password = ""
wger_password_updated_at = ""
if isinstance(attrs, dict):
raw_pw = attrs.get(WGER_PASSWORD_ATTR)
if isinstance(raw_pw, list) and raw_pw and isinstance(raw_pw[0], str):
wger_password = raw_pw[0]
elif isinstance(raw_pw, str) and raw_pw:
wger_password = raw_pw
raw_updated = attrs.get(WGER_PASSWORD_UPDATED_ATTR)
if isinstance(raw_updated, list) and raw_updated and isinstance(raw_updated[0], str):
wger_password_updated_at = raw_updated[0]
elif isinstance(raw_updated, str) and raw_updated:
wger_password_updated_at = raw_updated
if not wger_password:
wger_password = random_password(20)
admin_client().set_user_attribute(username, WGER_PASSWORD_ATTR, wger_password)
wger_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}"
if not wger_password_updated_at:
result = trigger_wger_user_sync(username, wger_email, wger_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"wger sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
admin_client().set_user_attribute(username, WGER_PASSWORD_UPDATED_ATTR, now_iso)
upsert_task(conn, request_code, "wger_account", "ok", None)
except Exception as exc:
upsert_task(conn, request_code, "wger_account", "error", safe_error_detail(exc, "failed to provision wger"))
# Task: ensure firefly account exists
try:
if not user_id:
raise RuntimeError("missing user id")
full = admin_client().get_user(user_id)
attrs = full.get("attributes") or {}
firefly_password = ""
firefly_password_updated_at = ""
if isinstance(attrs, dict):
raw_pw = attrs.get(FIREFLY_PASSWORD_ATTR)
if isinstance(raw_pw, list) and raw_pw and isinstance(raw_pw[0], str):
firefly_password = raw_pw[0]
elif isinstance(raw_pw, str) and raw_pw:
firefly_password = raw_pw
raw_updated = attrs.get(FIREFLY_PASSWORD_UPDATED_ATTR)
if isinstance(raw_updated, list) and raw_updated and isinstance(raw_updated[0], str):
firefly_password_updated_at = raw_updated[0]
elif isinstance(raw_updated, str) and raw_updated:
firefly_password_updated_at = raw_updated
if not firefly_password:
firefly_password = random_password(24)
admin_client().set_user_attribute(username, FIREFLY_PASSWORD_ATTR, firefly_password)
firefly_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}"
if not firefly_password_updated_at:
result = trigger_firefly_user_sync(username, firefly_email, firefly_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
admin_client().set_user_attribute(username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso)
upsert_task(conn, request_code, "firefly_account", "ok", None)
except Exception as exc:
upsert_task(
conn,
request_code,
"firefly_account",
"error",
safe_error_detail(exc, "failed to provision firefly"),
)
# Task: ensure Vaultwarden account exists (invite flow)
try:
if not user_id:
raise RuntimeError("missing user id")
vaultwarden_email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}"
try:
full = admin_client().get_user(user_id)
attrs = full.get("attributes") or {}
override = None
if isinstance(attrs, dict):
raw = attrs.get("vaultwarden_email")
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
override = item.strip()
break
elif isinstance(raw, str) and raw.strip():
override = raw.strip()
if override:
vaultwarden_email = override
except Exception:
pass
result = invite_user(vaultwarden_email)
if not result.ok and result.status == "error":
fallback_email = contact_email.strip()
if fallback_email and fallback_email != vaultwarden_email:
fallback_result = invite_user(fallback_email)
if fallback_result.ok:
vaultwarden_email = fallback_email
result = fallback_result
if result.ok:
upsert_task(conn, request_code, "vaultwarden_invite", "ok", result.status)
else:
upsert_task(conn, request_code, "vaultwarden_invite", "error", result.detail or result.status)
# Persist Vaultwarden association/status on the Keycloak user so the portal can display it quickly.
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
admin_client().set_user_attribute(username, "vaultwarden_email", vaultwarden_email)
admin_client().set_user_attribute(username, "vaultwarden_status", result.status)
admin_client().set_user_attribute(username, "vaultwarden_synced_at", now_iso)
except Exception:
pass
except Exception as exc:
upsert_task(
conn,
request_code,
"vaultwarden_invite",
"error",
safe_error_detail(exc, "failed to provision vaultwarden"),
)
if all_tasks_ok(conn, request_code, required_tasks):
conn.execute(
"""
UPDATE access_requests
SET status = 'awaiting_onboarding'
WHERE request_code = %s AND status = 'accounts_building'
""",
(request_code,),
)
return ProvisionResult(ok=True, status="awaiting_onboarding")
return ProvisionResult(ok=False, status="accounts_building")
finally:
conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))