1108 lines
44 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
import hashlib
import hmac
import re
import secrets
import string
from typing import Any
from urllib.parse import quote
from flask import jsonify, request, g, redirect
import psycopg
from .. import ariadne_client
from ..db import connect, configured
from ..keycloak import admin_client, require_auth
from ..mailer import MailerError, access_request_verification_body, send_text_email
from ..rate_limit import rate_limit_allow
from ..provisioning import provision_access_request, provision_tasks_complete
from .. import settings
def _extract_request_payload() -> tuple[str, str, str, str, str]:
    """Read the access-request form fields from the JSON request body.

    The body is untrusted input. Anything that is not a JSON object is treated
    as an empty payload, and any field that is missing or not a string is
    treated as an empty string, so a malformed body (a JSON array, a numeric
    ``username``, ...) yields empty fields instead of raising
    ``AttributeError`` here.

    Returns:
        ``(username, email, note, first_name, last_name)``, each stripped of
        surrounding whitespace.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    def _field(key: str) -> str:
        # Only real strings are accepted; every other JSON type counts as absent.
        value = payload.get(key)
        return value.strip() if isinstance(value, str) else ""

    return (
        _field("username"),
        _field("email"),
        _field("note"),
        _field("first_name"),
        _field("last_name"),
    )
def _normalize_name(value: str) -> str:
    """Trim *value* and collapse every internal whitespace run to one space."""
    words = value.strip().split()
    return " ".join(words)
def _validate_name(value: str, *, label: str, required: bool) -> str | None:
    """Validate a personal name and return an error message, or ``None``.

    Args:
        value: Raw or already-normalized name.
        label: Human-readable field name used as the prefix of error messages.
        required: Whether an empty name is an error.

    Returns:
        ``None`` when the name is acceptable, otherwise the error text.
    """
    trimmed = value.strip()
    # Same normalization as ``_normalize_name``: collapse whitespace runs.
    cleaned = " ".join(trimmed.split())
    if not cleaned:
        return f"{label} is required" if required else None
    if len(cleaned) > 80:
        return f"{label} must be 1-80 characters"
    # Inspect the trimmed input rather than the normalized form: normalization
    # rewrites every whitespace run to a single space, so a CR/LF/TAB check on
    # ``cleaned`` could never fire.
    if any(ch in "\r\n\t" for ch in trimmed):
        return f"{label} contains invalid whitespace"
    return None
def _validate_username(username: str) -> str | None:
    """Return why *username* is unacceptable, or ``None`` when it is valid.

    A valid username is 3-32 characters long and uses only ASCII letters,
    digits, ``.``, ``_`` and ``-``.
    """
    if not username:
        return "username is required"
    if not 3 <= len(username) <= 32:
        return "username must be 3-32 characters"
    if re.fullmatch(r"[a-zA-Z0-9._-]+", username) is None:
        return "username contains invalid characters"
    return None
def _random_request_code(username: str) -> str:
    """Build a request code: *username*, ``~``, then 10 random characters.

    The suffix is drawn with ``secrets`` from uppercase ASCII letters and
    digits.
    """
    alphabet = string.ascii_uppercase + string.digits
    picks = [secrets.choice(alphabet) for _ in range(10)]
    return username + "~" + "".join(picks)
def _client_ip() -> str:
    """Best-effort client address for the current request.

    Preference order: the first entry of ``X-Forwarded-For``, then
    ``X-Real-IP``, then the peer address reported by Flask. Falls back to
    ``"unknown"`` when none of them yields a value.

    NOTE(review): both headers are taken at face value; presumably a trusted
    reverse proxy sets them — confirm, since this value keys the rate limits.
    """
    forwarded = (request.headers.get("X-Forwarded-For") or "").strip()
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        return first_hop if first_hop else "unknown"
    real_ip = (request.headers.get("X-Real-IP") or "").strip()
    return real_ip or request.remote_addr or "unknown"
# Status of a request whose contact email has not been confirmed yet;
# `_verify_request` moves such a request on to 'pending'.
EMAIL_VERIFY_PENDING_STATUS = "pending_email_verification"
def _hash_verification_token(token: str) -> str:
    """Return the hex SHA-256 digest of *token* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()
def _verify_url(request_code: str, token: str) -> str:
    """Build the absolute verify-link URL carrying the code and token.

    The base comes from ``settings.PORTAL_PUBLIC_BASE_URL`` with trailing
    slashes removed; both query values are percent-encoded with ``quote``.
    """
    base = settings.PORTAL_PUBLIC_BASE_URL.rstrip("/")
    link = f"{base}/api/access/request/verify-link?code={quote(request_code)}&token={quote(token)}"
    return link
def _send_verification_email(*, request_code: str, email: str, token: str) -> None:
    """Email the verification link for *request_code* to *email*.

    Errors from ``send_text_email`` propagate; the callers in this module
    catch ``MailerError``.
    """
    link = _verify_url(request_code, token)
    message = access_request_verification_body(request_code=request_code, verify_url=link)
    send_text_email(to_addr=email, subject="Atlas: confirm your email", body=message)
class VerificationError(Exception):
    """Verification failure carrying the HTTP status and message to report."""

    def __init__(self, status_code: int, message: str) -> None:
        # Store the response details first, then let Exception keep the text.
        self.status_code = status_code
        self.message = message
        super().__init__(message)
def _verify_request(conn, code: str, token: str) -> str:
    """Check an email-verification token and move the request to 'pending'.

    Args:
        conn: Open database connection; rows are read with ``.get``.
        code: Request code identifying the access request.
        token: Plain verification token supplied by the user.

    Returns:
        The normalized current status when the request is no longer awaiting
        email verification (the token is not checked in that case), otherwise
        ``"pending"`` after a successful verification.

    Raises:
        VerificationError: 404 unknown code, 409 no stored token hash,
            401 token mismatch, 410 token older than the configured TTL.
    """
    record = conn.execute(
        """
SELECT status, email_verification_token_hash, email_verification_sent_at, email_verified_at
FROM access_requests
WHERE request_code = %s
""",
        (code,),
    ).fetchone()
    if not record:
        raise VerificationError(404, "not found")

    current = _normalize_status(record.get("status") or "")
    if current != EMAIL_VERIFY_PENDING_STATUS:
        # Already past the verification stage: just report where it stands.
        return current

    expected_hash = str(record.get("email_verification_token_hash") or "")
    if not expected_hash:
        raise VerificationError(409, "verification token missing")
    supplied_hash = _hash_verification_token(token)
    if not hmac.compare_digest(expected_hash, supplied_hash):
        raise VerificationError(401, "invalid token")

    issued_at = record.get("email_verification_sent_at")
    if isinstance(issued_at, datetime):
        reference = datetime.now(timezone.utc)
        if issued_at.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            issued_at = issued_at.replace(tzinfo=timezone.utc)
        elapsed = reference - issued_at
        if elapsed.total_seconds() > settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC:
            raise VerificationError(410, "verification token expired")

    # The status guard keeps the update from touching a request that has
    # already moved on.
    conn.execute(
        """
UPDATE access_requests
SET status = 'pending',
email_verified_at = NOW(),
email_verification_token_hash = NULL
WHERE request_code = %s AND status = %s
""",
        (code, EMAIL_VERIFY_PENDING_STATUS),
    )
    return "pending"
# Every onboarding step name the attest endpoint accepts.
ONBOARDING_STEPS: tuple[str, ...] = (
    "vaultwarden_master_password",
    "vaultwarden_browser_extension",
    "vaultwarden_mobile_app",
    "keycloak_password_rotated",
    "element_recovery_key",
    "element_recovery_key_stored",
    "element_mobile_app",
    "mail_client_setup",
    "nextcloud_web_access",
    "nextcloud_mail_integration",
    "nextcloud_desktop_app",
    "nextcloud_mobile_app",
    "budget_encryption_ack",
    "firefly_password_rotated",
    "wger_password_rotated",
    "jellyfin_web_access",
    "jellyfin_mobile_app",
    "jellyfin_tv_setup",
)
# Steps reported to clients as optional; they are not needed for 'ready'.
ONBOARDING_OPTIONAL_STEPS: set[str] = {
    "element_mobile_app",
    "nextcloud_desktop_app",
    "nextcloud_mobile_app",
    "jellyfin_web_access",
    "jellyfin_mobile_app",
    "jellyfin_tv_setup",
}
# Steps that must all be completed before `_advance_status` marks a request
# 'ready'. As listed, this is ONBOARDING_STEPS minus ONBOARDING_OPTIONAL_STEPS.
ONBOARDING_REQUIRED_STEPS: tuple[str, ...] = (
    "vaultwarden_master_password",
    "vaultwarden_browser_extension",
    "vaultwarden_mobile_app",
    "keycloak_password_rotated",
    "element_recovery_key",
    "element_recovery_key_stored",
    "mail_client_setup",
    "nextcloud_web_access",
    "nextcloud_mail_integration",
    "budget_encryption_ack",
    "firefly_password_rotated",
    "wger_password_rotated",
)
# Steps the attest endpoint refuses ("step is managed by keycloak"); their
# completion is derived from Keycloak user data instead.
KEYCLOAK_MANAGED_STEPS: set[str] = {
    "keycloak_password_rotated",
    "vaultwarden_master_password",
    "nextcloud_mail_integration",
    "firefly_password_rotated",
    "wger_password_rotated",
}
# Artifact name recorded when the user asks for a Keycloak password rotation.
_KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT = "keycloak_password_rotation_requested_at"
# For each step, the steps that must already be completed before it can be
# marked done; the onboarding endpoints answer 409 "step is blocked" otherwise.
ONBOARDING_STEP_PREREQUISITES: dict[str, set[str]] = {
    "vaultwarden_master_password": set(),
    "vaultwarden_browser_extension": {"vaultwarden_master_password"},
    "vaultwarden_mobile_app": {"vaultwarden_master_password"},
    "keycloak_password_rotated": {"vaultwarden_master_password"},
    "element_recovery_key": {"keycloak_password_rotated"},
    "element_recovery_key_stored": {"element_recovery_key"},
    "element_mobile_app": {"element_recovery_key"},
    "mail_client_setup": {"vaultwarden_master_password"},
    "nextcloud_web_access": {"vaultwarden_master_password"},
    "nextcloud_mail_integration": {"nextcloud_web_access"},
    "nextcloud_desktop_app": {"nextcloud_web_access"},
    "nextcloud_mobile_app": {"nextcloud_web_access"},
    "budget_encryption_ack": {"nextcloud_mail_integration"},
    "firefly_password_rotated": {"element_recovery_key_stored"},
    "wger_password_rotated": {"firefly_password_rotated"},
    "jellyfin_web_access": {"vaultwarden_master_password"},
    "jellyfin_mobile_app": {"jellyfin_web_access"},
    "jellyfin_tv_setup": {"jellyfin_web_access"},
}
# Artifact name under which the element-recovery endpoint stores the
# submitted SHA-256 value.
_ELEMENT_RECOVERY_ARTIFACT = "element_recovery_key_sha256"
# Exactly 64 lowercase hex characters (callers lowercase before matching).
_SHA256_HEX_RE = re.compile(r"^[0-9a-f]{64}$")
# `vaultwarden_status` attribute values that count as the master password
# step being complete.
_VAULTWARDEN_READY_STATUSES = {"already_present", "active", "ready"}
def _normalize_status(status: str) -> str:
    """Canonicalize a stored request status.

    The value is trimmed and lowercased; ``"approved"`` is reported as
    ``"accounts_building"`` and an empty or missing status as ``"unknown"``.
    """
    normalized = (status or "").strip().lower()
    if not normalized:
        return "unknown"
    return "accounts_building" if normalized == "approved" else normalized
def _fetch_completed_onboarding_steps(conn, request_code: str) -> set[str]:
    """Load the step names recorded as completed for *request_code*.

    Rows that are not dicts, and ``step`` values that are not non-empty
    strings, are ignored.
    """
    cursor = conn.execute(
        "SELECT step FROM access_request_onboarding_steps WHERE request_code = %s",
        (request_code,),
    )
    candidates = (row.get("step") for row in cursor.fetchall() if isinstance(row, dict))
    return {name for name in candidates if isinstance(name, str) and name}
def _password_rotation_requested(conn, request_code: str) -> bool:
    """Report whether a password-rotation artifact exists for the request."""
    lookup = (request_code, _KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT)
    found = conn.execute(
        """
SELECT 1
FROM access_request_onboarding_artifacts
WHERE request_code = %s AND artifact = %s
LIMIT 1
""",
        lookup,
    ).fetchone()
    return True if found else False
def _extract_attr(attrs: Any, key: str) -> str:
    """Return the first non-blank string stored under *key*, stripped.

    The value may be a single string or a list of values. Anything else, a
    missing key, or a non-dict *attrs* yields ``""``.
    """
    if not isinstance(attrs, dict):
        return ""
    raw = attrs.get(key)
    # Treat a scalar like a one-element list so both shapes share one path.
    candidates = raw if isinstance(raw, list) else [raw]
    return next(
        (item.strip() for item in candidates if isinstance(item, str) and item.strip()),
        "",
    )
def _auto_completed_service_steps(attrs: Any) -> set[str]:
    """Derive the onboarding steps that user attributes show as complete.

    ``vaultwarden_master_password`` counts as done when a master-password
    timestamp attribute is present or ``vaultwarden_status`` is one of
    ``_VAULTWARDEN_READY_STATUSES``. The other three steps count as done when
    their corresponding ``*_at`` attribute is non-blank. A non-dict *attrs*
    yields an empty set.
    """
    if not isinstance(attrs, dict):
        return set()

    done: set[str] = set()
    master_set_at = _extract_attr(attrs, "vaultwarden_master_password_set_at")
    vault_status = _extract_attr(attrs, "vaultwarden_status")
    if master_set_at or vault_status in _VAULTWARDEN_READY_STATUSES:
        done.add("vaultwarden_master_password")

    timestamp_steps = (
        ("nextcloud_mail_synced_at", "nextcloud_mail_integration"),
        ("firefly_password_rotated_at", "firefly_password_rotated"),
        ("wger_password_rotated_at", "wger_password_rotated"),
    )
    for attr_name, step in timestamp_steps:
        if _extract_attr(attrs, attr_name):
            done.add(step)
    return done
def _auto_completed_keycloak_steps(conn, request_code: str, username: str) -> set[str]:
    """Infer completed onboarding steps from the user's Keycloak record.

    Returns an empty set when *username* or *request_code* is empty, when the
    admin client is not ready, when the user cannot be found, or when any
    unexpected error occurs.

    Side effect: if the user still has the ``CONFIGURE_TOTP`` required action,
    it is removed (best effort).
    """
    if not username or not admin_client().ready() or not request_code:
        return set()

    done: set[str] = set()
    try:
        summary = admin_client().find_user(username) or {}
        user_id = summary.get("id") if isinstance(summary, dict) else None
        if not (isinstance(user_id, str) and user_id):
            return set()

        # Prefer the full representation; fall back to the search result.
        try:
            full = admin_client().get_user(user_id)
        except Exception:
            full = summary if isinstance(summary, dict) else {}

        attrs = full.get("attributes") if isinstance(full, dict) else {}
        done |= _auto_completed_service_steps(attrs)

        raw_actions = full.get("requiredActions")
        pending_actions: list[str] = []
        if isinstance(raw_actions, list):
            pending_actions = [a for a in raw_actions if isinstance(a, str)]

        # The rotation is complete once it was requested and Keycloak no
        # longer lists UPDATE_PASSWORD as outstanding.
        if _password_rotation_requested(conn, request_code) and "UPDATE_PASSWORD" not in pending_actions:
            done.add("keycloak_password_rotated")

        # Backfill: older accounts were created with CONFIGURE_TOTP required,
        # forcing MFA enrollment at first login. That is no longer required,
        # so drop it when present.
        if "CONFIGURE_TOTP" in pending_actions:
            remaining = [a for a in pending_actions if a != "CONFIGURE_TOTP"]
            try:
                admin_client().update_user_safe(user_id, {"requiredActions": remaining})
            except Exception:
                # Best effort only; the inferred steps are still returned.
                pass
    except Exception:
        return set()
    return done
def _completed_onboarding_steps(conn, request_code: str, username: str) -> set[str]:
    """Return recorded steps combined with the steps inferred from Keycloak."""
    recorded = _fetch_completed_onboarding_steps(conn, request_code)
    inferred = _auto_completed_keycloak_steps(conn, request_code, username)
    return recorded.union(inferred)
def _automation_ready(conn, request_code: str, username: str) -> bool:
    """Decide whether account provisioning for the request has finished.

    When task rows exist for the request, the answer comes from
    ``provision_tasks_complete``. Otherwise (legacy requests) the Keycloak
    user must exist and carry a non-empty ``mailu_app_password`` attribute.
    Any error in the legacy check counts as not ready.
    """
    if not (username and admin_client().ready()):
        return False

    # Task-based readiness takes precedence whenever tasks were recorded.
    has_tasks = conn.execute(
        "SELECT 1 FROM access_request_tasks WHERE request_code = %s LIMIT 1",
        (request_code,),
    ).fetchone()
    if has_tasks:
        return provision_tasks_complete(conn, request_code)

    # Legacy fallback: look for the mail app password on the Keycloak user.
    try:
        found = admin_client().find_user(username)
        if not found:
            return False
        user_id = found.get("id") if isinstance(found, dict) else None
        if not user_id:
            return False
        attrs = admin_client().get_user(str(user_id)).get("attributes") or {}
        if not isinstance(attrs, dict):
            return False
        app_password = attrs.get("mailu_app_password")
        if isinstance(app_password, list):
            # Only the first list entry is considered.
            app_password = app_password[0] if app_password else None
        return isinstance(app_password, str) and bool(app_password)
    except Exception:
        return False
def _advance_status(conn, request_code: str, username: str, status: str) -> str:
    """Move a request one stage forward when its conditions are met.

    ``accounts_building`` becomes ``awaiting_onboarding`` once automation is
    ready; ``awaiting_onboarding`` becomes ``ready`` once every required
    onboarding step is complete. At most one transition happens per call.

    Returns:
        The resulting (normalized) status.
    """
    current = _normalize_status(status)

    if current == "accounts_building":
        if not _automation_ready(conn, request_code, username):
            return current
        conn.execute(
            "UPDATE access_requests SET status = 'awaiting_onboarding' WHERE request_code = %s AND status = 'accounts_building'",
            (request_code,),
        )
        return "awaiting_onboarding"

    if current != "awaiting_onboarding":
        return current

    done = _completed_onboarding_steps(conn, request_code, username)
    if all(step in done for step in ONBOARDING_REQUIRED_STEPS):
        conn.execute(
            "UPDATE access_requests SET status = 'ready' WHERE request_code = %s AND status = 'awaiting_onboarding'",
            (request_code,),
        )
        return "ready"
    return current
def _onboarding_payload(conn, request_code: str, username: str) -> dict[str, Any]:
    """Assemble the onboarding section returned to clients.

    Contains the required and optional step names, the sorted completed
    steps, and whether a Keycloak password rotation has been requested.
    """
    done = _completed_onboarding_steps(conn, request_code, username)
    rotation_requested = _password_rotation_requested(conn, request_code)

    payload: dict[str, Any] = {}
    payload["required_steps"] = list(ONBOARDING_REQUIRED_STEPS)
    payload["optional_steps"] = sorted(ONBOARDING_OPTIONAL_STEPS)
    payload["completed_steps"] = sorted(done)
    payload["keycloak"] = {"password_rotation_requested": rotation_requested}
    return payload
def register(app) -> None:
    """Attach the access-request and onboarding HTTP routes to *app*."""
@app.route("/api/access/request/availability", methods=["GET"])
def request_access_availability() -> Any:
    """Report whether the ``username`` query parameter can still be requested.

    Responds 503 when the feature is disabled or the database is not
    configured, and 502 when the database lookup fails. Otherwise the JSON
    body has ``available`` plus, when unavailable, a ``reason`` of
    ``invalid``, ``exists`` or ``requested``.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    username = (request.args.get("username") or "").strip()
    problem = _validate_username(username)
    if problem:
        return jsonify({"available": False, "reason": "invalid", "detail": problem})

    # An existing Keycloak account takes the name outright.
    if admin_client().ready() and admin_client().find_user(username):
        return jsonify({"available": False, "reason": "exists", "detail": "username already exists"})

    try:
        with connect() as conn:
            latest = conn.execute(
                """
SELECT status
FROM access_requests
WHERE username = %s
ORDER BY created_at DESC
LIMIT 1
""",
                (username,),
            ).fetchone()
    except Exception:
        return jsonify({"error": "failed to check availability"}), 502

    if not latest:
        return jsonify({"available": True})

    latest_status = str(latest.get("status") or "")
    return jsonify(
        {
            "available": False,
            "reason": "requested",
            "status": _normalize_status(latest_status),
        }
    )
@app.route("/api/access/request", methods=["POST"])
def request_access() -> Any:
    """Submit (or re-submit) an access request and email a verification link.

    Responses visible in this handler: 503 when the feature is disabled or
    the database is not configured, 429 when rate limited, 400 on validation
    errors, 409 when Keycloak already has the username or email, 502 when the
    database work or the verification email fails, otherwise a JSON body with
    ``ok``, ``request_code`` and ``status``.

    NOTE(review): the text this was restored from had lost its indentation.
    The nesting below is reconstructed; in particular both email sends are
    assumed to run inside the ``with connect()`` block — confirm against the
    canonical file.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503
    ip = _client_ip()
    username, email, note, first_name, last_name = _extract_request_payload()
    first_name = _normalize_name(first_name)
    last_name = _normalize_name(last_name)
    # Rate-limit per client address, narrowed to the username when one is given.
    rate_key = ip
    if username:
        rate_key = f"{ip}:{username}"
    if not rate_limit_allow(
        rate_key,
        key="access_request_submit",
        limit=settings.ACCESS_REQUEST_SUBMIT_RATE_LIMIT,
        window_sec=settings.ACCESS_REQUEST_SUBMIT_RATE_WINDOW_SEC,
    ):
        return jsonify({"error": "rate limited"}), 429
    username_error = _validate_username(username)
    if username_error:
        return jsonify({"error": username_error}), 400
    # First name is optional, last name is mandatory.
    name_error = _validate_name(first_name, label="first name", required=False)
    if name_error:
        return jsonify({"error": name_error}), 400
    name_error = _validate_name(last_name, label="last name", required=True)
    if name_error:
        return jsonify({"error": name_error}), 400
    if not email:
        return jsonify({"error": "email is required"}), 400
    if "@" not in email:
        return jsonify({"error": "invalid email"}), 400
    # Addresses on the MAILU_DOMAIN are refused unless explicitly allow-listed.
    email_lower = email.lower()
    if email_lower.endswith(f"@{settings.MAILU_DOMAIN.lower()}") and (
        email_lower not in settings.ACCESS_REQUEST_INTERNAL_EMAIL_ALLOWLIST
    ):
        return jsonify({"error": "email must be an external address"}), 400
    # Keycloak checks are skipped when the admin client is not ready.
    if admin_client().ready() and admin_client().find_user(username):
        return jsonify({"error": "username already exists"}), 409
    if admin_client().ready() and admin_client().find_user_by_email(email):
        return jsonify({"error": "email is already associated with an existing Atlas account"}), 409
    try:
        with connect() as conn:
            # Look for an open request (unverified or pending) for this username.
            existing = conn.execute(
                """
SELECT request_code, status
FROM access_requests
WHERE username = %s AND status IN (%s, 'pending')
ORDER BY created_at DESC
LIMIT 1
""",
                (username, EMAIL_VERIFY_PENDING_STATUS),
            ).fetchone()
            if existing:
                existing_status = str(existing.get("status") or "")
                request_code = str(existing.get("request_code") or "")
                if existing_status != EMAIL_VERIFY_PENDING_STATUS:
                    # Already verified: report the existing request unchanged.
                    return jsonify({"ok": True, "request_code": request_code, "status": existing_status})
                # Still unverified: overwrite the details and issue a fresh
                # token (only its hash is stored).
                token = secrets.token_urlsafe(24)
                token_hash = _hash_verification_token(token)
                conn.execute(
                    """
UPDATE access_requests
SET contact_email = %s,
note = %s,
first_name = %s,
last_name = %s,
email_verification_token_hash = %s,
email_verification_sent_at = NOW(),
email_verified_at = NULL
WHERE request_code = %s AND status = %s
""",
                    (
                        email,
                        note or None,
                        first_name or None,
                        last_name or None,
                        token_hash,
                        request_code,
                        EMAIL_VERIFY_PENDING_STATUS,
                    ),
                )
                try:
                    _send_verification_email(request_code=request_code, email=email, token=token)
                except MailerError:
                    # The request code is still returned alongside the error.
                    return (
                        jsonify({"error": "failed to send verification email", "request_code": request_code}),
                        502,
                    )
                return jsonify({"ok": True, "request_code": request_code, "status": EMAIL_VERIFY_PENDING_STATUS})
            # No open request: create one awaiting email verification.
            request_code = _random_request_code(username)
            token = secrets.token_urlsafe(24)
            token_hash = _hash_verification_token(token)
            try:
                conn.execute(
                    """
INSERT INTO access_requests
(request_code, username, contact_email, note, first_name, last_name, status,
email_verification_token_hash, email_verification_sent_at)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, NOW())
""",
                    (
                        request_code,
                        username,
                        email,
                        note or None,
                        first_name or None,
                        last_name or None,
                        EMAIL_VERIFY_PENDING_STATUS,
                        token_hash,
                    ),
                )
            except psycopg.errors.UniqueViolation:
                # Presumably a concurrent submit inserted first: roll back the
                # failed insert and answer with the open request that exists.
                conn.rollback()
                existing = conn.execute(
                    """
SELECT request_code, status
FROM access_requests
WHERE username = %s AND status IN (%s, 'pending')
ORDER BY created_at DESC
LIMIT 1
""",
                    (username, EMAIL_VERIFY_PENDING_STATUS),
                ).fetchone()
                if not existing:
                    # Nothing open to report: re-raise into the generic 502 below.
                    raise
                return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]})
            try:
                _send_verification_email(request_code=request_code, email=email, token=token)
            except MailerError:
                return jsonify({"error": "failed to send verification email", "request_code": request_code}), 502
    except Exception:
        return jsonify({"error": "failed to submit request"}), 502
    return jsonify({"ok": True, "request_code": request_code, "status": EMAIL_VERIFY_PENDING_STATUS})
@app.route("/api/access/request/verify", methods=["POST"])
def request_access_verify() -> Any:
    """Verify a request's email token supplied in a JSON body.

    Accepts ``request_code`` (or ``code``) and ``token`` (or ``verify``).
    The body is untrusted: a non-object body or non-string field is treated
    as missing and answered with 400 rather than raising.

    Responses: 503 feature disabled or database not configured, 429 rate
    limited, 400 missing field, the status carried by ``VerificationError``
    on a failed check, 502 on any other failure, otherwise
    ``{"ok": true, "status": ...}``.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503
    ip = _client_ip()
    if not rate_limit_allow(
        ip,
        key="access_request_verify",
        limit=60,
        window_sec=60,
    ):
        return jsonify({"error": "rate limited"}), 429

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_code = payload.get("request_code") or payload.get("code")
    raw_token = payload.get("token") or payload.get("verify")
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    token = raw_token.strip() if isinstance(raw_token, str) else ""
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if not token:
        return jsonify({"error": "token is required"}), 400

    # Second, tighter limiter scoped to this client and request code.
    if not rate_limit_allow(
        f"{ip}:{code}",
        key="access_request_verify_code",
        limit=30,
        window_sec=60,
    ):
        return jsonify({"error": "rate limited"}), 429
    try:
        with connect() as conn:
            status = _verify_request(conn, code, token)
            return jsonify({"ok": True, "status": status})
    except VerificationError as exc:
        return jsonify({"error": exc.message}), exc.status_code
    except Exception:
        return jsonify({"error": "failed to verify"}), 502
@app.route("/api/access/request/verify-link", methods=["GET"])
def request_access_verify_link() -> Any:
    """Verify via the emailed link, then redirect to the request page.

    Reads ``code`` and ``token`` from the query string. Every outcome after
    the 503 guards is a redirect to ``/request-access`` carrying either
    ``verified=1`` or a ``verify_error`` message.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    code = (request.args.get("code") or "").strip()
    token = (request.args.get("token") or "").strip()
    if not (code and token):
        return redirect(f"/request-access?code={quote(code)}&verify_error=missing+token")

    try:
        with connect() as conn:
            _verify_request(conn, code, token)
    except VerificationError as exc:
        return redirect(f"/request-access?code={quote(code)}&verify_error={quote(exc.message)}")
    except Exception:
        return redirect(f"/request-access?code={quote(code)}&verify_error=failed+to+verify")
    return redirect(f"/request-access?code={quote(code)}&verified=1")
@app.route("/api/access/request/resend", methods=["POST"])
def request_access_resend() -> Any:
    """Issue a fresh verification token and email it again.

    Accepts ``request_code`` (or ``code``) in the JSON body. When the request
    is no longer awaiting verification, its current status is returned and no
    email is sent.

    Responses: 503, 429, 400 missing code, 404 unknown code, 409 request has
    no contact email, 502 on mail or database failure.

    NOTE(review): the email send is assumed to sit inside the ``with`` block
    (source indentation was lost) — confirm.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    ip = _client_ip()
    if not rate_limit_allow(ip, key="access_request_resend", limit=30, window_sec=60):
        return jsonify({"error": "rate limited"}), 429

    body = request.get_json(silent=True) or {}
    code = (body.get("request_code") or body.get("code") or "").strip()
    if not code:
        return jsonify({"error": "request_code is required"}), 400

    # Tighter limiter scoped to this client and request code.
    if not rate_limit_allow(
        f"{ip}:{code}",
        key="access_request_resend_code",
        limit=10,
        window_sec=300,
    ):
        return jsonify({"error": "rate limited"}), 429

    try:
        with connect() as conn:
            record = conn.execute(
                """
SELECT status, contact_email
FROM access_requests
WHERE request_code = %s
""",
                (code,),
            ).fetchone()
            if not record:
                return jsonify({"error": "not found"}), 404

            status = _normalize_status(record.get("status") or "")
            if status != EMAIL_VERIFY_PENDING_STATUS:
                return jsonify({"ok": True, "status": status})

            email = str(record.get("contact_email") or "").strip()
            if not email:
                return jsonify({"error": "missing email"}), 409

            # Replace the stored hash so the new token is the one that verifies.
            token = secrets.token_urlsafe(24)
            conn.execute(
                """
UPDATE access_requests
SET email_verification_token_hash = %s,
email_verification_sent_at = NOW()
WHERE request_code = %s AND status = %s
""",
                (_hash_verification_token(token), code, EMAIL_VERIFY_PENDING_STATUS),
            )
            try:
                _send_verification_email(request_code=code, email=email, token=token)
            except MailerError:
                return jsonify({"error": "failed to send verification email", "request_code": code}), 502
            return jsonify({"ok": True, "status": EMAIL_VERIFY_PENDING_STATUS})
    except Exception:
        return jsonify({"error": "failed to resend verification"}), 502
@app.route("/api/access/request/status", methods=["POST"])
def request_access_status() -> Any:
    """Report the state of an access request, advancing it when possible.

    Accepts ``request_code`` (or ``code``) and an optional
    ``reveal_initial_password`` (or ``reveal_password``) flag in the JSON
    body. The response always carries ``status``, ``username`` and
    ``email_verified``; task details, the initial password, the onboarding
    URL and the onboarding payload are added when applicable.

    Responses: 503 feature disabled or database not configured, 429 rate
    limited, 400 missing code, 404 unknown code, 502 on any failure inside
    the database block.

    NOTE(review): the text this was restored from had lost its indentation.
    The nesting below is reconstructed; in particular the re-query after
    provisioning and the ``revealed_at`` check are assumed to be nested as
    shown — confirm against the canonical file.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503
    ip = _client_ip()
    if not rate_limit_allow(
        ip,
        key="access_request_status",
        limit=settings.ACCESS_REQUEST_STATUS_RATE_LIMIT,
        window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC,
    ):
        return jsonify({"error": "rate limited"}), 429
    payload = request.get_json(silent=True) or {}
    code = (payload.get("request_code") or payload.get("code") or "").strip()
    reveal_initial_password = bool(
        payload.get("reveal_initial_password") or payload.get("reveal_password")
    )
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    # Additional per-code limiter to avoid global NAT rate-limit blowups.
    if not rate_limit_allow(
        f"{ip}:{code}",
        key="access_request_status_code",
        limit=max(20, settings.ACCESS_REQUEST_STATUS_RATE_LIMIT),
        window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC,
    ):
        return jsonify({"error": "rate limited"}), 429
    try:
        with connect() as conn:
            row = conn.execute(
                """
SELECT status,
username,
initial_password,
initial_password_revealed_at,
email_verified_at
FROM access_requests
WHERE request_code = %s
""",
                (code,),
            ).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404
            current_status = _normalize_status(row.get("status") or "")
            # When ariadne is not enabled, this handler triggers provisioning
            # itself (best effort) and then re-reads the row.
            if current_status == "accounts_building" and not ariadne_client.enabled():
                try:
                    provision_access_request(code)
                except Exception:
                    pass
                row = conn.execute(
                    """
SELECT status,
username,
initial_password,
initial_password_revealed_at,
email_verified_at
FROM access_requests
WHERE request_code = %s
""",
                    (code,),
                ).fetchone()
                if not row:
                    return jsonify({"error": "not found"}), 404
            # May move the request one stage forward and persist the change.
            status = _advance_status(conn, code, row.get("username") or "", row.get("status") or "")
            response: dict[str, Any] = {
                "ok": True,
                "status": status,
                "username": row.get("username") or "",
                "email_verified": bool(row.get("email_verified_at")),
            }
            task_rows = conn.execute(
                """
SELECT task, status, detail, updated_at
FROM access_request_tasks
WHERE request_code = %s
ORDER BY task
""",
                (code,),
            ).fetchall()
            if task_rows:
                tasks: list[dict[str, Any]] = []
                # Becomes True as soon as any task reports the "error" status.
                blocked = False
                for task_row in task_rows:
                    task_name = task_row.get("task") if isinstance(task_row, dict) else None
                    task_status = task_row.get("status") if isinstance(task_row, dict) else None
                    detail = task_row.get("detail") if isinstance(task_row, dict) else None
                    updated_at = task_row.get("updated_at") if isinstance(task_row, dict) else None
                    if isinstance(task_status, str) and task_status == "error":
                        blocked = True
                    task_payload: dict[str, Any] = {
                        "task": task_name or "",
                        "status": task_status or "",
                    }
                    if isinstance(detail, str) and detail:
                        task_payload["detail"] = detail
                    if isinstance(updated_at, datetime):
                        # Timestamps are emitted as ISO-8601 in UTC.
                        task_payload["updated_at"] = updated_at.astimezone(timezone.utc).isoformat()
                    tasks.append(task_payload)
                response["tasks"] = tasks
                response["automation_complete"] = provision_tasks_complete(conn, code)
                response["blocked"] = blocked
            if status in {"awaiting_onboarding", "ready"} and reveal_initial_password:
                password = row.get("initial_password")
                revealed_at = row.get("initial_password_revealed_at")
                if isinstance(password, str) and password:
                    # The password is returned whenever it is requested and
                    # stored; only the first reveal is timestamped.
                    response["initial_password"] = password
                    if revealed_at is None:
                        conn.execute(
                            "UPDATE access_requests SET initial_password_revealed_at = NOW() WHERE request_code = %s AND initial_password_revealed_at IS NULL",
                            (code,),
                        )
            if status in {"awaiting_onboarding", "ready"}:
                response["onboarding_url"] = f"/onboarding?code={code}"
            if status in {"awaiting_onboarding", "ready"}:
                response["onboarding"] = _onboarding_payload(conn, code, row.get("username") or "")
            return jsonify(response)
    except Exception:
        return jsonify({"error": "failed to load status"}), 502
@app.route("/api/access/request/onboarding/attest", methods=["POST"])
@require_auth
def request_access_onboarding_attest() -> Any:
    """Mark (or un-mark) a self-attested onboarding step for the caller.

    JSON body: ``request_code`` (or ``code``), ``step``, and optional
    ``completed``; only a boolean ``false`` un-marks the step. The request
    must belong to the authenticated Keycloak user.

    Responses: 503 not configured, 400 missing code, unknown step,
    Keycloak-managed step, or marking ``element_recovery_key`` done
    ("step requires verification"); 401 no username on the token; 404 unknown
    code; 403 request owned by someone else; 409 onboarding not available or
    prerequisites missing; 502 on failure.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    body = request.get_json(silent=True) or {}
    code = (body.get("request_code") or body.get("code") or "").strip()
    step = (body.get("step") or "").strip()
    flag = body.get("completed")
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if step not in ONBOARDING_STEPS:
        return jsonify({"error": "invalid step"}), 400
    if step in KEYCLOAK_MANAGED_STEPS:
        return jsonify({"error": "step is managed by keycloak"}), 400

    username = getattr(g, "keycloak_username", "") or ""
    if not username:
        return jsonify({"error": "invalid token"}), 401

    # Anything other than an explicit boolean means "mark as done".
    mark_done = flag if isinstance(flag, bool) else True

    try:
        with connect() as conn:
            record = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not record:
                return jsonify({"error": "not found"}), 404
            if (record.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403
            status = _normalize_status(record.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            if not mark_done:
                conn.execute(
                    "DELETE FROM access_request_onboarding_steps WHERE request_code = %s AND step = %s",
                    (code, step),
                )
                if step == "element_recovery_key":
                    # Un-marking also drops the stored recovery-key hash.
                    conn.execute(
                        "DELETE FROM access_request_onboarding_artifacts WHERE request_code = %s AND artifact = %s",
                        (code, _ELEMENT_RECOVERY_ARTIFACT),
                    )
            else:
                if step == "element_recovery_key":
                    return jsonify({"error": "step requires verification"}), 400
                required_first = ONBOARDING_STEP_PREREQUISITES.get(step, set())
                if required_first:
                    done = _completed_onboarding_steps(conn, code, username)
                    blockers = sorted(required_first - done)
                    if blockers:
                        return jsonify({"error": "step is blocked", "blocked_by": blockers}), 409
                conn.execute(
                    """
INSERT INTO access_request_onboarding_steps (request_code, step)
VALUES (%s, %s)
ON CONFLICT (request_code, step) DO NOTHING
""",
                    (code, step),
                )

            # Re-evaluate so the request can move to 'ready' when applicable.
            status = _advance_status(conn, code, username, status)
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to update onboarding"}), 502

    return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})
@app.route("/api/access/request/onboarding/element-recovery", methods=["POST"])
@require_auth
def request_access_onboarding_element_recovery() -> Any:
    """Record a SHA-256 for the Element recovery key and complete that step.

    JSON body: ``request_code`` (or ``code``) and ``sha256`` (or
    ``sha256_hex``), which must be 64 hex characters (case-insensitive). The
    request must belong to the authenticated Keycloak user.

    Responses: 503 not configured, 400 missing/invalid input, 401 no username
    on the token, 404 unknown code, 403 request owned by someone else, 409
    onboarding not available or prerequisites missing, 502 on failure.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    body = request.get_json(silent=True) or {}
    code = (body.get("request_code") or body.get("code") or "").strip()
    digest_hex = (body.get("sha256") or body.get("sha256_hex") or "").strip().lower()
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if not digest_hex:
        return jsonify({"error": "sha256 is required"}), 400
    if _SHA256_HEX_RE.fullmatch(digest_hex) is None:
        return jsonify({"error": "invalid sha256"}), 400

    username = getattr(g, "keycloak_username", "") or ""
    if not username:
        return jsonify({"error": "invalid token"}), 401

    try:
        with connect() as conn:
            record = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not record:
                return jsonify({"error": "not found"}), 404
            if (record.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403
            status = _normalize_status(record.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            required_first = ONBOARDING_STEP_PREREQUISITES.get("element_recovery_key", set())
            if required_first:
                done = _completed_onboarding_steps(conn, code, username)
                blockers = sorted(required_first - done)
                if blockers:
                    return jsonify({"error": "step is blocked", "blocked_by": blockers}), 409

            # Store (or replace) the hash, then record the step itself.
            conn.execute(
                """
INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash)
VALUES (%s, %s, %s)
ON CONFLICT (request_code, artifact) DO UPDATE
SET value_hash = EXCLUDED.value_hash,
created_at = NOW()
""",
                (code, _ELEMENT_RECOVERY_ARTIFACT, digest_hex),
            )
            conn.execute(
                """
INSERT INTO access_request_onboarding_steps (request_code, step)
VALUES (%s, %s)
ON CONFLICT (request_code, step) DO NOTHING
""",
                (code, "element_recovery_key"),
            )
            status = _advance_status(conn, code, username, status)
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to verify element recovery key"}), 502

    return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})
@app.route("/api/access/request/onboarding/keycloak-password-rotate", methods=["POST"])
@require_auth
def request_access_onboarding_keycloak_password_rotate() -> Any:
    """Require the caller to change their Keycloak password and record that.

    Adds ``UPDATE_PASSWORD`` to the user's Keycloak required actions and
    stores a rotation-requested artifact for the request (kept as-is when it
    already exists). The request must belong to the authenticated user.

    Responses: 503 not configured or Keycloak admin unavailable, 400 missing
    code, 401 no username on the token, 404 unknown code, 403 request owned
    by someone else, 409 onboarding not available, prerequisites missing, or
    Keycloak user not found; 502 on failure.

    NOTE(review): the Keycloak update is assumed to run whether or not
    ``UPDATE_PASSWORD`` was already listed (source indentation was lost) —
    confirm.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    body = request.get_json(silent=True) or {}
    code = (body.get("request_code") or body.get("code") or "").strip()
    if not code:
        return jsonify({"error": "request_code is required"}), 400

    username = getattr(g, "keycloak_username", "") or ""
    if not username:
        return jsonify({"error": "invalid token"}), 401
    if not admin_client().ready():
        return jsonify({"error": "keycloak admin unavailable"}), 503

    try:
        with connect() as conn:
            record = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not record:
                return jsonify({"error": "not found"}), 404
            if (record.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403
            status = _normalize_status(record.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            required_first = ONBOARDING_STEP_PREREQUISITES.get("keycloak_password_rotated", set())
            if required_first:
                done = _completed_onboarding_steps(conn, code, username)
                blockers = sorted(required_first - done)
                if blockers:
                    return jsonify({"error": "step is blocked", "blocked_by": blockers}), 409

            account = admin_client().find_user(username) or {}
            user_id = account.get("id") if isinstance(account, dict) else None
            if not (isinstance(user_id, str) and user_id):
                return jsonify({"error": "keycloak user not found"}), 409

            details = admin_client().get_user(user_id)
            current_actions = details.get("requiredActions")
            wanted: list[str] = []
            if isinstance(current_actions, list):
                wanted = [a for a in current_actions if isinstance(a, str)]
            if "UPDATE_PASSWORD" not in wanted:
                wanted.append("UPDATE_PASSWORD")
            admin_client().update_user_safe(user_id, {"requiredActions": wanted})

            # DO NOTHING on conflict keeps the timestamp of the first request.
            conn.execute(
                """
INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash)
VALUES (%s, %s, NOW()::text)
ON CONFLICT (request_code, artifact) DO NOTHING
""",
                (code, _KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT),
            )
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to request password rotation"}), 502

    return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})