# 1119 lines · 44 KiB · Python
# (file-viewer header residue: "Raw" / "Normal View" / "History")

from __future__ import annotations
from datetime import datetime, timezone
import hashlib
import hmac
import re
import secrets
import string
from typing import Any
from urllib.parse import quote
2026-01-21 20:44:07 -03:00
from flask import jsonify, request, g, redirect
import psycopg
2026-01-19 19:21:22 -03:00
from .. import ariadne_client
from ..db import connect, configured
from ..keycloak import admin_client, oidc_client, require_auth
from ..mailer import MailerError, access_request_verification_body, send_text_email
from ..rate_limit import rate_limit_allow
2026-01-03 04:08:13 -03:00
from ..provisioning import provision_access_request, provision_tasks_complete
from .. import settings
def _extract_request_payload() -> tuple[str, str, str, str, str]:
    """Read the access-request form fields from the current request's JSON body.

    Returns:
        ``(username, email, note, first_name, last_name)``, each stripped of
        surrounding whitespace. A field that is missing, falsy, or not a JSON
        string is returned as ``""``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no named fields; handle it like an empty object
        # instead of failing on ``.get``.
        payload = {}

    def _text(field: str) -> str:
        raw = payload.get(field)
        # Numbers, lists and objects have no ``.strip()``; treat them as absent so the
        # caller's validation answers with a 400 rather than the request crashing.
        return raw.strip() if isinstance(raw, str) else ""

    return (
        _text("username"),
        _text("email"),
        _text("note"),
        _text("first_name"),
        _text("last_name"),
    )
def _normalize_name(value: str) -> str:
    """Trim *value* and collapse every internal run of whitespace to one space."""
    words = value.split()
    return " ".join(words)
def _validate_name(value: str, *, label: str, required: bool) -> str | None:
    """Validate a personal name and describe the first problem found.

    Args:
        value: Raw name; it is normalised with ``_normalize_name`` before checking.
        label: Human-readable field name used as the prefix of the error message.
        required: Whether an empty name counts as an error.

    Returns:
        An error message, or ``None`` when the name is acceptable.
    """
    name = _normalize_name(value)
    if name:
        if len(name) > 80:
            return f"{label} must be 1-80 characters"
        # Defensive: ``_normalize_name`` already turns CR/LF/TAB into single spaces.
        if set(name) & {"\r", "\n", "\t"}:
            return f"{label} contains invalid whitespace"
        return None
    if required:
        return f"{label} is required"
    return None
def _validate_username(username: str) -> str | None:
if not username:
return "username is required"
if len(username) < 3 or len(username) > 32:
return "username must be 3-32 characters"
if not re.fullmatch(r"[a-zA-Z0-9._-]+", username):
return "username contains invalid characters"
return None
def _random_request_code(username: str) -> str:
    """Build a request code of the form ``<username>~<10 random A-Z/0-9 characters>``."""
    alphabet = string.ascii_uppercase + string.digits
    suffix_chars = [secrets.choice(alphabet) for _ in range(10)]
    return username + "~" + "".join(suffix_chars)
def _client_ip() -> str:
    """Best-effort client address for the current request.

    Preference order: first entry of ``X-Forwarded-For``, then ``X-Real-IP``, then
    the socket peer address; ``"unknown"`` when none is usable.
    """
    # NOTE(review): both headers are client-controllable unless a trusted proxy
    # overwrites them — confirm the deployment before relying on this for rate limits.
    headers = request.headers
    forwarded = (headers.get("X-Forwarded-For") or "").strip()
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        return first_hop if first_hop else "unknown"
    real_ip = (headers.get("X-Real-IP") or "").strip()
    return real_ip or request.remote_addr or "unknown"
# Status stored on an access request from submission until its e-mail verification
# token has been accepted (the verification step then moves it to 'pending').
EMAIL_VERIFY_PENDING_STATUS = "pending_email_verification"
def _hash_verification_token(token: str) -> str:
    """Return the lowercase hex SHA-256 digest of *token* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()
def _verify_url(request_code: str, token: str) -> str:
    """Build the absolute verification link for a request code and plaintext token.

    Both values are percent-encoded; the base comes from
    ``settings.PORTAL_PUBLIC_BASE_URL`` with any trailing slashes removed.
    """
    root = settings.PORTAL_PUBLIC_BASE_URL.rstrip("/")
    query = f"code={quote(request_code)}&token={quote(token)}"
    return f"{root}/api/access/request/verify-link?{query}"
2026-01-21 20:18:48 -03:00
def _send_verification_email(*, request_code: str, email: str, token: str) -> None:
    """Send the "confirm your email" message containing the verification link.

    Exceptions raised by ``send_text_email`` are not handled here; the callers in
    this file catch ``MailerError``.
    """
    link = _verify_url(request_code, token)
    message = access_request_verification_body(request_code=request_code, verify_url=link)
    send_text_email(to_addr=email, subject="Atlas: confirm your email", body=message)
2026-01-21 20:44:07 -03:00
class VerificationError(Exception):
    """Verification failure carrying the HTTP status code the route should answer with."""

    def __init__(self, status_code: int, message: str) -> None:
        # The message is also handed to Exception so ``str(exc)`` yields it.
        super().__init__(message)
        self.message = message
        self.status_code = status_code
def _verify_request(conn, code: str, token: str) -> str:
    """Check an e-mail verification token and move the request to ``'pending'``.

    Args:
        conn: Database connection used for the lookup and the update.
        code: Request code identifying the access request.
        token: Plaintext token; compared by SHA-256 hash in constant time.

    Returns:
        ``"pending"`` after a successful verification, or the request's current
        normalised status when it is no longer awaiting verification (the token is
        not checked in that case).

    Raises:
        VerificationError: 404 unknown code, 409 no stored token hash, 401 hash
            mismatch, 410 token older than
            ``settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC``.
    """
    record = conn.execute(
        """
        SELECT status, email_verification_token_hash, email_verification_sent_at, email_verified_at
        FROM access_requests
        WHERE request_code = %s
        """,
        (code,),
    ).fetchone()
    if not record:
        raise VerificationError(404, "not found")

    current = _normalize_status(record.get("status") or "")
    if current != EMAIL_VERIFY_PENDING_STATUS:
        return current

    expected_hash = str(record.get("email_verification_token_hash") or "")
    if not expected_hash:
        raise VerificationError(409, "verification token missing")
    if not hmac.compare_digest(expected_hash, _hash_verification_token(token)):
        raise VerificationError(401, "invalid token")

    issued_at = record.get("email_verification_sent_at")
    # Expiry is only enforced when the column holds a datetime.
    if isinstance(issued_at, datetime):
        if issued_at.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            issued_at = issued_at.replace(tzinfo=timezone.utc)
        elapsed = datetime.now(timezone.utc) - issued_at
        if elapsed.total_seconds() > settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC:
            raise VerificationError(410, "verification token expired")

    # Clearing the hash makes the token single-use.
    conn.execute(
        """
        UPDATE access_requests
        SET status = 'pending',
            email_verified_at = NOW(),
            email_verification_token_hash = NULL
        WHERE request_code = %s AND status = %s
        """,
        (code, EMAIL_VERIFY_PENDING_STATUS),
    )
    return "pending"
# Every onboarding step name accepted by the attest endpoint.
ONBOARDING_STEPS: tuple[str, ...] = (
    "vaultwarden_master_password",
    "vaultwarden_browser_extension",
    "vaultwarden_mobile_app",
    "keycloak_password_rotated",
    "element_recovery_key",
    "element_recovery_key_stored",
    "element_mobile_app",
    "mail_client_setup",
    "nextcloud_web_access",
    "nextcloud_mail_integration",
    "nextcloud_desktop_app",
    "nextcloud_mobile_app",
    "budget_encryption_ack",
    "firefly_password_rotated",
    "wger_password_rotated",
    "jellyfin_web_access",
    "jellyfin_mobile_app",
    "jellyfin_tv_setup",
)

# Steps reported as optional; they are not needed for a request to become 'ready'.
ONBOARDING_OPTIONAL_STEPS: set[str] = {
    "element_mobile_app",
    "nextcloud_desktop_app",
    "nextcloud_mobile_app",
    "jellyfin_web_access",
    "jellyfin_mobile_app",
    "jellyfin_tv_setup",
}

# Steps that must all be completed before a request advances to 'ready'.
# Derived from the two definitions above (order of ONBOARDING_STEPS preserved) so a
# step cannot be added to one list and forgotten in the other.
ONBOARDING_REQUIRED_STEPS: tuple[str, ...] = tuple(
    step for step in ONBOARDING_STEPS if step not in ONBOARDING_OPTIONAL_STEPS
)

# Steps the attest endpoint refuses to set manually ("step is managed by keycloak").
KEYCLOAK_MANAGED_STEPS: set[str] = {
    "keycloak_password_rotated",
    "nextcloud_mail_integration",
    "firefly_password_rotated",
    "wger_password_rotated",
}

# Artifact name recorded when a Keycloak password rotation has been requested.
_KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT = "keycloak_password_rotation_requested_at"

# For each step, the steps that must already be completed before it may be marked done.
ONBOARDING_STEP_PREREQUISITES: dict[str, set[str]] = {
    "vaultwarden_master_password": set(),
    "vaultwarden_browser_extension": {"vaultwarden_master_password"},
    "vaultwarden_mobile_app": {"vaultwarden_master_password"},
    "keycloak_password_rotated": {"vaultwarden_master_password"},
    "element_recovery_key": {"keycloak_password_rotated"},
    "element_recovery_key_stored": {"element_recovery_key"},
    "element_mobile_app": {"element_recovery_key"},
    "mail_client_setup": {"vaultwarden_master_password"},
    "nextcloud_web_access": {"vaultwarden_master_password"},
    "nextcloud_mail_integration": {"nextcloud_web_access"},
    "nextcloud_desktop_app": {"nextcloud_web_access"},
    "nextcloud_mobile_app": {"nextcloud_web_access"},
    "budget_encryption_ack": {"nextcloud_mail_integration"},
    "firefly_password_rotated": {"element_recovery_key_stored"},
    "wger_password_rotated": {"firefly_password_rotated"},
    "jellyfin_web_access": {"vaultwarden_master_password"},
    "jellyfin_mobile_app": {"jellyfin_web_access"},
    "jellyfin_tv_setup": {"jellyfin_web_access"},
}

# Artifact name under which the submitted Element recovery-key hash is stored.
_ELEMENT_RECOVERY_ARTIFACT = "element_recovery_key_sha256"
# Exactly 64 lowercase hexadecimal characters.
_SHA256_HEX_RE = re.compile(r"^[0-9a-f]{64}$")
# Values of the `vaultwarden_status` user attribute that count as "master password set".
_VAULTWARDEN_READY_STATUSES = {"already_present", "active", "ready"}
def _normalize_status(status: str) -> str:
    """Canonicalise a stored request status.

    The value is trimmed and lower-cased; ``"approved"`` is reported as
    ``"accounts_building"`` and an empty value as ``"unknown"``.
    """
    canonical = (status or "").strip().lower()
    if not canonical:
        return "unknown"
    return "accounts_building" if canonical == "approved" else canonical
def _fetch_completed_onboarding_steps(conn, request_code: str) -> set[str]:
    """Return the step names stored as completed for *request_code*.

    Rows that are not dicts, and ``step`` values that are not non-empty strings,
    are ignored.
    """
    cursor = conn.execute(
        "SELECT step FROM access_request_onboarding_steps WHERE request_code = %s",
        (request_code,),
    )
    names = (row.get("step") for row in cursor.fetchall() if isinstance(row, dict))
    return {name for name in names if isinstance(name, str) and name}
def _password_rotation_requested(conn, request_code: str) -> bool:
    """Tell whether the password-rotation-requested artifact exists for *request_code*."""
    hit = conn.execute(
        """
        SELECT 1
        FROM access_request_onboarding_artifacts
        WHERE request_code = %s AND artifact = %s
        LIMIT 1
        """,
        (request_code, _KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT),
    ).fetchone()
    return True if hit else False
def _extract_attr(attrs: Any, key: str) -> str:
    """Return the first non-blank string stored under *key* in an attribute mapping.

    The value may be a single string or a list of values; the result is stripped.
    ``""`` is returned when *attrs* is not a dict or no usable string is present.
    """
    if not isinstance(attrs, dict):
        return ""
    value = attrs.get(key)
    candidates = value if isinstance(value, list) else [value]
    for candidate in candidates:
        if isinstance(candidate, str):
            trimmed = candidate.strip()
            if trimmed:
                return trimmed
    return ""
def _auto_completed_service_steps(attrs: Any) -> set[str]:
    """Derive completed onboarding steps from a user's attribute mapping.

    Returns an empty set when *attrs* is not a dict.
    """
    if not isinstance(attrs, dict):
        return set()

    done: set[str] = set()
    # The master-password step counts as done when either a set-at marker exists or
    # the status attribute holds one of the "ready" values.
    if (
        _extract_attr(attrs, "vaultwarden_master_password_set_at")
        or _extract_attr(attrs, "vaultwarden_status") in _VAULTWARDEN_READY_STATUSES
    ):
        done.add("vaultwarden_master_password")

    # (attribute that must be non-blank, step it completes)
    markers = (
        ("nextcloud_mail_synced_at", "nextcloud_mail_integration"),
        ("firefly_password_rotated_at", "firefly_password_rotated"),
        ("wger_password_rotated_at", "wger_password_rotated"),
    )
    for attribute, step in markers:
        if _extract_attr(attrs, attribute):
            done.add(step)
    return done
def _auto_completed_keycloak_steps(conn, request_code: str, username: str) -> set[str]:
    """Detect onboarding steps that the user's Keycloak record shows as completed.

    Returns an empty set when *username* or *request_code* is empty, the admin
    client is not ready, the user cannot be resolved, or any lookup fails.

    Side effect: removes a leftover ``CONFIGURE_TOTP`` required action from the
    user (best effort).
    """
    if not username or not admin_client().ready() or not request_code:
        return set()

    try:
        summary = admin_client().find_user(username) or {}
        user_id = summary.get("id") if isinstance(summary, dict) else None
        if not (isinstance(user_id, str) and user_id):
            return set()

        try:
            detail = admin_client().get_user(user_id)
        except Exception:
            # Fall back to the search result when the full record cannot be loaded.
            detail = summary if isinstance(summary, dict) else {}

        attributes = detail.get("attributes") if isinstance(detail, dict) else {}
        done: set[str] = set(_auto_completed_service_steps(attributes))

        raw_actions = detail.get("requiredActions")
        pending_actions: list[str] = []
        if isinstance(raw_actions, list):
            pending_actions = [action for action in raw_actions if isinstance(action, str)]

        # Rotation counts as done once it was requested and Keycloak no longer lists
        # UPDATE_PASSWORD as a required action.
        if _password_rotation_requested(conn, request_code) and "UPDATE_PASSWORD" not in pending_actions:
            done.add("keycloak_password_rotated")

        # Backfill: earlier accounts were created with CONFIGURE_TOTP as a required action,
        # which forces users to enroll MFA at first login. We no longer require that, so
        # remove it if present.
        if "CONFIGURE_TOTP" in pending_actions:
            remaining = [action for action in pending_actions if action != "CONFIGURE_TOTP"]
            try:
                admin_client().update_user_safe(user_id, {"requiredActions": remaining})
            except Exception:
                # Best effort only; a failed cleanup must not affect step detection.
                pass
    except Exception:
        return set()
    return done
def _completed_onboarding_steps(conn, request_code: str, username: str) -> set[str]:
    """Union of the steps stored in the database and those detected via Keycloak."""
    attested = _fetch_completed_onboarding_steps(conn, request_code)
    detected = _auto_completed_keycloak_steps(conn, request_code, username)
    return attested.union(detected)
def _automation_ready(conn, request_code: str, username: str) -> bool:
    """Tell whether account provisioning for the request has finished.

    Returns ``False`` when *username* is empty, the admin client is not ready, or
    the legacy Keycloak lookup fails.
    """
    if not (username and admin_client().ready()):
        return False

    # Prefer task-based readiness when we have task rows for the request.
    has_tasks = conn.execute(
        "SELECT 1 FROM access_request_tasks WHERE request_code = %s LIMIT 1",
        (request_code,),
    ).fetchone()
    if has_tasks:
        return provision_tasks_complete(conn, request_code)

    # Fallback for legacy requests: confirm user exists and has a mail app password.
    try:
        summary = admin_client().find_user(username)
        if not summary:
            return False
        user_id = summary.get("id") if isinstance(summary, dict) else None
        if not user_id:
            return False
        attributes = admin_client().get_user(str(user_id)).get("attributes") or {}
        if not isinstance(attributes, dict):
            return False
        app_password = attributes.get("mailu_app_password")
        if isinstance(app_password, list):
            # Only the first list entry is considered.
            app_password = app_password[0] if app_password else None
        return isinstance(app_password, str) and bool(app_password)
    except Exception:
        return False
def _advance_status(conn, request_code: str, username: str, status: str) -> str:
    """Move a request one lifecycle stage forward when its conditions are met.

    ``accounts_building`` becomes ``awaiting_onboarding`` once automation is ready;
    ``awaiting_onboarding`` becomes ``ready`` once every required step is completed.

    Returns:
        The new status after a transition, otherwise the normalised input status.
    """
    current = _normalize_status(status)
    if current == "accounts_building":
        if _automation_ready(conn, request_code, username):
            conn.execute(
                "UPDATE access_requests SET status = 'awaiting_onboarding' WHERE request_code = %s AND status = 'accounts_building'",
                (request_code,),
            )
            return "awaiting_onboarding"
    elif current == "awaiting_onboarding":
        done = _completed_onboarding_steps(conn, request_code, username)
        if all(step in done for step in ONBOARDING_REQUIRED_STEPS):
            conn.execute(
                "UPDATE access_requests SET status = 'ready' WHERE request_code = %s AND status = 'awaiting_onboarding'",
                (request_code,),
            )
            return "ready"
    return current
2026-01-04 21:57:31 -03:00
def _onboarding_payload(conn, request_code: str, username: str) -> dict[str, Any]:
    """Assemble the ``onboarding`` object returned by the status and onboarding routes.

    Keys: ``required_steps``, ``optional_steps`` (sorted), ``completed_steps``
    (sorted) and ``keycloak.password_rotation_requested``.
    """
    done = _completed_onboarding_steps(conn, request_code, username)
    rotation_requested = _password_rotation_requested(conn, request_code)

    payload: dict[str, Any] = {}
    payload["required_steps"] = list(ONBOARDING_REQUIRED_STEPS)
    payload["optional_steps"] = sorted(ONBOARDING_OPTIONAL_STEPS)
    payload["completed_steps"] = sorted(done)
    payload["keycloak"] = {"password_rotation_requested": rotation_requested}
    return payload
def register(app) -> None:
    """Attach the access-request and onboarding HTTP routes to *app*."""
@app.route("/api/access/request/availability", methods=["GET"])
def request_access_availability() -> Any:
    """Report whether the ``username`` query parameter can still be requested.

    Responds with ``{"available": true}`` or ``{"available": false, "reason": ...}``
    where the reason is ``invalid``, ``exists`` (already in Keycloak) or
    ``requested`` (an access request for it is on record).
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    username = (request.args.get("username") or "").strip()
    problem = _validate_username(username)
    if problem:
        return jsonify({"available": False, "reason": "invalid", "detail": problem})

    if admin_client().ready() and admin_client().find_user(username):
        taken = {"available": False, "reason": "exists", "detail": "username already exists"}
        return jsonify(taken)

    try:
        with connect() as conn:
            latest = conn.execute(
                """
                SELECT status
                FROM access_requests
                WHERE username = %s
                ORDER BY created_at DESC
                LIMIT 1
                """,
                (username,),
            ).fetchone()
    except Exception:
        return jsonify({"error": "failed to check availability"}), 502

    if not latest:
        return jsonify({"available": True})

    stored_status = str(latest.get("status") or "")
    return jsonify(
        {
            "available": False,
            "reason": "requested",
            "status": _normalize_status(stored_status),
        }
    )
@app.route("/api/access/request", methods=["POST"])
def request_access() -> Any:
    """Create an access request, or refresh one still awaiting e-mail verification.

    Flow: rate limit, validate the fields, reject usernames/e-mails already known
    to Keycloak, then either
      * return the existing request when one is already ``pending``,
      * update the existing unverified request and send a fresh token, or
      * insert a new request and send its first verification e-mail.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    ip = _client_ip()
    username, email, note, first_name, last_name = _extract_request_payload()
    first_name = _normalize_name(first_name)
    last_name = _normalize_name(last_name)

    # The limiter bucket is per IP, narrowed to IP+username when a username was sent.
    rate_key = f"{ip}:{username}" if username else ip
    allowed = rate_limit_allow(
        rate_key,
        key="access_request_submit",
        limit=settings.ACCESS_REQUEST_SUBMIT_RATE_LIMIT,
        window_sec=settings.ACCESS_REQUEST_SUBMIT_RATE_WINDOW_SEC,
    )
    if not allowed:
        return jsonify({"error": "rate limited"}), 429

    problem = _validate_username(username)
    if problem:
        return jsonify({"error": problem}), 400
    # First name is optional, last name is mandatory.
    for value, label, required in (
        (first_name, "first name", False),
        (last_name, "last name", True),
    ):
        problem = _validate_name(value, label=label, required=required)
        if problem:
            return jsonify({"error": problem}), 400

    if not email:
        return jsonify({"error": "email is required"}), 400
    if "@" not in email:
        return jsonify({"error": "invalid email"}), 400

    lowered = email.lower()
    internal_suffix = f"@{settings.MAILU_DOMAIN.lower()}"
    if lowered.endswith(internal_suffix) and lowered not in settings.ACCESS_REQUEST_INTERNAL_EMAIL_ALLOWLIST:
        return jsonify({"error": "email must be an external address"}), 400

    if admin_client().ready() and admin_client().find_user(username):
        return jsonify({"error": "username already exists"}), 409
    if admin_client().ready() and admin_client().find_user_by_email(email):
        return jsonify({"error": "email is already associated with an existing Atlas account"}), 409

    # Newest request for this username that is unverified or pending.
    open_request_sql = """
        SELECT request_code, status
        FROM access_requests
        WHERE username = %s AND status IN (%s, 'pending')
        ORDER BY created_at DESC
        LIMIT 1
        """
    open_request_params = (username, EMAIL_VERIFY_PENDING_STATUS)

    try:
        with connect() as conn:
            open_request = conn.execute(open_request_sql, open_request_params).fetchone()
            if open_request:
                open_status = str(open_request.get("status") or "")
                request_code = str(open_request.get("request_code") or "")
                if open_status != EMAIL_VERIFY_PENDING_STATUS:
                    # Already verified and pending: just report it.
                    return jsonify({"ok": True, "request_code": request_code, "status": open_status})

                # Still unverified: overwrite the details and issue a new token.
                token = secrets.token_urlsafe(24)
                conn.execute(
                    """
                    UPDATE access_requests
                    SET contact_email = %s,
                        note = %s,
                        first_name = %s,
                        last_name = %s,
                        email_verification_token_hash = %s,
                        email_verification_sent_at = NOW(),
                        email_verified_at = NULL
                    WHERE request_code = %s AND status = %s
                    """,
                    (
                        email,
                        note or None,
                        first_name or None,
                        last_name or None,
                        _hash_verification_token(token),
                        request_code,
                        EMAIL_VERIFY_PENDING_STATUS,
                    ),
                )
                try:
                    _send_verification_email(request_code=request_code, email=email, token=token)
                except MailerError:
                    return (
                        jsonify({"error": "failed to send verification email", "request_code": request_code}),
                        502,
                    )
                return jsonify({"ok": True, "request_code": request_code, "status": EMAIL_VERIFY_PENDING_STATUS})

            request_code = _random_request_code(username)
            token = secrets.token_urlsafe(24)
            new_row = (
                request_code,
                username,
                email,
                note or None,
                first_name or None,
                last_name or None,
                EMAIL_VERIFY_PENDING_STATUS,
                _hash_verification_token(token),
            )
            try:
                conn.execute(
                    """
                    INSERT INTO access_requests
                      (request_code, username, contact_email, note, first_name, last_name, status,
                       email_verification_token_hash, email_verification_sent_at)
                    VALUES
                      (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
                    """,
                    new_row,
                )
            except psycopg.errors.UniqueViolation:
                # A conflicting row exists; report the open request if there is one,
                # otherwise let the error propagate to the generic 502 below.
                conn.rollback()
                winner = conn.execute(open_request_sql, open_request_params).fetchone()
                if not winner:
                    raise
                return jsonify({"ok": True, "request_code": winner["request_code"], "status": winner["status"]})

            try:
                _send_verification_email(request_code=request_code, email=email, token=token)
            except MailerError:
                return jsonify({"error": "failed to send verification email", "request_code": request_code}), 502
    except Exception:
        return jsonify({"error": "failed to submit request"}), 502

    return jsonify({"ok": True, "request_code": request_code, "status": EMAIL_VERIFY_PENDING_STATUS})
@app.route("/api/access/request/verify", methods=["POST"])
def request_access_verify() -> Any:
    """Verify an e-mail token submitted as JSON.

    Body fields: ``request_code`` (alias ``code``) and ``token`` (alias ``verify``).
    Responds with ``{"ok": true, "status": ...}`` on success, 400 when a field is
    missing, 429 when rate limited, the status carried by ``VerificationError``
    when verification fails, and 502 on any other failure.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    ip = _client_ip()
    if not rate_limit_allow(ip, key="access_request_verify", limit=60, window_sec=60):
        return jsonify({"error": "rate limited"}), 429

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}

    def _text_field(primary: str, alias: str) -> str:
        value = payload.get(primary) or payload.get(alias)
        # Non-string values have no .strip(); treat them as missing so the client
        # gets a 400 instead of the request crashing.
        return value.strip() if isinstance(value, str) else ""

    code = _text_field("request_code", "code")
    token = _text_field("token", "verify")
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if not token:
        return jsonify({"error": "token is required"}), 400

    # Second, tighter bucket per IP and request code.
    if not rate_limit_allow(f"{ip}:{code}", key="access_request_verify_code", limit=30, window_sec=60):
        return jsonify({"error": "rate limited"}), 429

    try:
        with connect() as conn:
            status = _verify_request(conn, code, token)
            return jsonify({"ok": True, "status": status})
    except VerificationError as exc:
        return jsonify({"error": exc.message}), exc.status_code
    except Exception:
        return jsonify({"error": "failed to verify"}), 502
2026-01-21 20:44:07 -03:00
@app.route("/api/access/request/verify-link", methods=["GET"])
def request_access_verify_link() -> Any:
    """Verify an e-mail token from the link in the verification e-mail.

    Reads ``code`` and ``token`` from the query string and always answers with a
    redirect to ``/request-access``: ``verified=1`` on success, otherwise
    ``verify_error=<message>``.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    code = (request.args.get("code") or "").strip()
    token = (request.args.get("token") or "").strip()
    landing = f"/request-access?code={quote(code)}"
    if not code or not token:
        return redirect(f"{landing}&verify_error=missing+token")

    # This route performs the same check as the JSON verify endpoint, so it draws on
    # the same limiter buckets; otherwise it would be an unthrottled way to probe
    # request codes and tokens.
    ip = _client_ip()
    within_limits = rate_limit_allow(
        ip, key="access_request_verify", limit=60, window_sec=60
    ) and rate_limit_allow(
        f"{ip}:{code}", key="access_request_verify_code", limit=30, window_sec=60
    )
    if not within_limits:
        return redirect(f"{landing}&verify_error=rate+limited")

    try:
        with connect() as conn:
            _verify_request(conn, code, token)
            return redirect(f"{landing}&verified=1")
    except VerificationError as exc:
        return redirect(f"{landing}&verify_error={quote(exc.message)}")
    except Exception:
        return redirect(f"{landing}&verify_error=failed+to+verify")
2026-01-21 20:18:48 -03:00
@app.route("/api/access/request/resend", methods=["POST"])
def request_access_resend() -> Any:
    """Issue a fresh verification token and e-mail it to the request's contact address.

    Body field: ``request_code`` (alias ``code``). When the request is no longer
    awaiting verification its current status is returned and nothing is sent.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    ip = _client_ip()
    if not rate_limit_allow(ip, key="access_request_resend", limit=30, window_sec=60):
        return jsonify({"error": "rate limited"}), 429

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}
    raw_code = payload.get("request_code") or payload.get("code")
    # Non-string values have no .strip(); treat them as missing (-> 400 below).
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    if not code:
        return jsonify({"error": "request_code is required"}), 400

    # Tighter bucket per IP and request code.
    if not rate_limit_allow(f"{ip}:{code}", key="access_request_resend_code", limit=10, window_sec=300):
        return jsonify({"error": "rate limited"}), 429

    try:
        with connect() as conn:
            row = conn.execute(
                """
                SELECT status, contact_email
                FROM access_requests
                WHERE request_code = %s
                """,
                (code,),
            ).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404

            status = _normalize_status(row.get("status") or "")
            if status != EMAIL_VERIFY_PENDING_STATUS:
                return jsonify({"ok": True, "status": status})

            email = str(row.get("contact_email") or "").strip()
            if not email:
                return jsonify({"error": "missing email"}), 409

            # Storing the new hash replaces the previously stored one.
            token = secrets.token_urlsafe(24)
            conn.execute(
                """
                UPDATE access_requests
                SET email_verification_token_hash = %s,
                    email_verification_sent_at = NOW()
                WHERE request_code = %s AND status = %s
                """,
                (_hash_verification_token(token), code, EMAIL_VERIFY_PENDING_STATUS),
            )
            try:
                _send_verification_email(request_code=code, email=email, token=token)
            except MailerError:
                return jsonify({"error": "failed to send verification email", "request_code": code}), 502
            return jsonify({"ok": True, "status": EMAIL_VERIFY_PENDING_STATUS})
    except Exception:
        return jsonify({"error": "failed to resend verification"}), 502
@app.route("/api/access/request/status", methods=["POST"])
def request_access_status() -> Any:
    """Report the lifecycle status of an access request.

    Body fields: ``request_code`` (alias ``code``) and the optional flag
    ``reveal_initial_password`` (alias ``reveal_password``).

    The response always carries ``ok``, ``status``, ``username`` and
    ``email_verified``. Task details are added when task rows exist, and the
    onboarding URL/payload (plus, on request, the not-yet-revealed initial
    password) once the request is ``awaiting_onboarding`` or ``ready``.
    """
    if not settings.ACCESS_REQUEST_ENABLED:
        return jsonify({"error": "request access disabled"}), 503
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    ip = _client_ip()
    if not rate_limit_allow(
        ip,
        key="access_request_status",
        limit=settings.ACCESS_REQUEST_STATUS_RATE_LIMIT,
        window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC,
    ):
        return jsonify({"error": "rate limited"}), 429

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}
    raw_code = payload.get("request_code") or payload.get("code")
    # Non-string values have no .strip(); treat them as missing (-> 400 below).
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    reveal_initial_password = bool(
        payload.get("reveal_initial_password") or payload.get("reveal_password")
    )
    if not code:
        return jsonify({"error": "request_code is required"}), 400

    # Additional per-code limiter to avoid global NAT rate-limit blowups.
    if not rate_limit_allow(
        f"{ip}:{code}",
        key="access_request_status_code",
        limit=max(20, settings.ACCESS_REQUEST_STATUS_RATE_LIMIT),
        window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC,
    ):
        return jsonify({"error": "rate limited"}), 429

    request_sql = """
        SELECT status,
               username,
               initial_password,
               initial_password_revealed_at,
               email_verified_at
        FROM access_requests
        WHERE request_code = %s
        """

    try:
        with connect() as conn:
            row = conn.execute(request_sql, (code,)).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404

            current_status = _normalize_status(row.get("status") or "")
            if current_status == "accounts_building" and not ariadne_client.enabled():
                try:
                    provision_access_request(code)
                except Exception:
                    # Deliberately best-effort: a provisioning failure must not break
                    # status polling. The row is re-read below either way.
                    pass
                row = conn.execute(request_sql, (code,)).fetchone()
                if not row:
                    return jsonify({"error": "not found"}), 404

            username = row.get("username") or ""
            status = _advance_status(conn, code, username, row.get("status") or "")

            response: dict[str, Any] = {
                "ok": True,
                "status": status,
                "username": username,
                "email_verified": bool(row.get("email_verified_at")),
            }

            task_rows = conn.execute(
                """
                SELECT task, status, detail, updated_at
                FROM access_request_tasks
                WHERE request_code = %s
                ORDER BY task
                """,
                (code,),
            ).fetchall()
            if task_rows:
                tasks: list[dict[str, Any]] = []
                blocked = False
                for task_row in task_rows:
                    # Rows that are not dicts contribute an entry with empty fields.
                    fields = task_row if isinstance(task_row, dict) else {}
                    task_status = fields.get("status")
                    detail = fields.get("detail")
                    updated_at = fields.get("updated_at")
                    if isinstance(task_status, str) and task_status == "error":
                        blocked = True
                    task_payload: dict[str, Any] = {
                        "task": fields.get("task") or "",
                        "status": task_status or "",
                    }
                    if isinstance(detail, str) and detail:
                        task_payload["detail"] = detail
                    if isinstance(updated_at, datetime):
                        task_payload["updated_at"] = updated_at.astimezone(timezone.utc).isoformat()
                    tasks.append(task_payload)
                response["tasks"] = tasks
                response["automation_complete"] = provision_tasks_complete(conn, code)
                response["blocked"] = blocked

            if status in {"awaiting_onboarding", "ready"}:
                revealed_at = row.get("initial_password_revealed_at")
                if isinstance(revealed_at, datetime):
                    response["initial_password_revealed_at"] = revealed_at.astimezone(timezone.utc).isoformat()
                if reveal_initial_password:
                    password = row.get("initial_password")
                    # The password is only handed out while no reveal has been recorded.
                    if isinstance(password, str) and password and revealed_at is None:
                        response["initial_password"] = password
                        conn.execute(
                            "UPDATE access_requests SET initial_password_revealed_at = NOW() WHERE request_code = %s AND initial_password_revealed_at IS NULL",
                            (code,),
                        )
                # Percent-encode the code like the other URLs built in this module.
                response["onboarding_url"] = f"/onboarding?code={quote(code)}"
                response["onboarding"] = _onboarding_payload(conn, code, username)

            return jsonify(response)
    except Exception:
        return jsonify({"error": "failed to load status"}), 502
@app.route("/api/access/request/onboarding/attest", methods=["POST"])
def request_access_onboarding_attest() -> Any:
    """Mark a self-attested onboarding step as completed, or clear it again.

    Body fields: ``request_code`` (alias ``code``), ``step`` and the optional
    boolean ``completed`` (anything other than ``false`` marks the step done).
    A bearer token is optional; when one is sent it must be valid and belong to
    the request's user.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}
    raw_code = payload.get("request_code") or payload.get("code")
    raw_step = payload.get("step")
    # Non-string values have no .strip(); treat them as missing so the checks below
    # answer with a 400 instead of the request crashing.
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    step = raw_step.strip() if isinstance(raw_step, str) else ""
    completed = payload.get("completed")

    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if step not in ONBOARDING_STEPS:
        return jsonify({"error": "invalid step"}), 400
    if step in KEYCLOAK_MANAGED_STEPS:
        return jsonify({"error": "step is managed by keycloak"}), 400

    username = ""
    bearer = request.headers.get("Authorization", "")
    if bearer:
        parts = bearer.split(None, 1)
        if len(parts) != 2 or parts[0].lower() != "bearer":
            return jsonify({"error": "invalid token"}), 401
        token = parts[1].strip()
        if not token:
            return jsonify({"error": "invalid token"}), 401
        try:
            claims = oidc_client().verify(token)
        except Exception:
            return jsonify({"error": "invalid token"}), 401
        username = claims.get("preferred_username") or ""

    try:
        with connect() as conn:
            row = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404
            # Ownership is only enforced when the caller identified themselves.
            if username and (row.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403

            status = _normalize_status(row.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            mark_done = completed if isinstance(completed, bool) else True
            if mark_done:
                if step == "element_recovery_key":
                    # This step can only be completed through the element-recovery route.
                    return (
                        jsonify({"error": "step requires verification"}),
                        400,
                    )
                prerequisites = ONBOARDING_STEP_PREREQUISITES.get(step, set())
                if prerequisites:
                    # NOTE(review): without a bearer token `username` is empty, so
                    # Keycloak-detected steps are not counted here — confirm intended.
                    current_completed = _completed_onboarding_steps(conn, code, username)
                    missing = sorted(prerequisites - current_completed)
                    if missing:
                        return jsonify({"error": "step is blocked", "blocked_by": missing}), 409
                conn.execute(
                    """
                    INSERT INTO access_request_onboarding_steps (request_code, step)
                    VALUES (%s, %s)
                    ON CONFLICT (request_code, step) DO NOTHING
                    """,
                    (code, step),
                )
            else:
                conn.execute(
                    "DELETE FROM access_request_onboarding_steps WHERE request_code = %s AND step = %s",
                    (code, step),
                )
                if step == "element_recovery_key":
                    # Un-marking the recovery-key step also drops its stored hash.
                    conn.execute(
                        "DELETE FROM access_request_onboarding_artifacts WHERE request_code = %s AND artifact = %s",
                        (code, _ELEMENT_RECOVERY_ARTIFACT),
                    )

            # Re-evaluate completion to update request status to ready if applicable.
            status = _advance_status(conn, code, username, status)
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to update onboarding"}), 502

    return jsonify(
        {
            "ok": True,
            "status": status,
            "onboarding": onboarding_payload,
        }
    )
@app.route("/api/access/request/onboarding/element-recovery", methods=["POST"])
@require_auth
def request_access_onboarding_element_recovery() -> Any:
    """Record the SHA-256 of the user's Element recovery key and complete that step.

    Body fields: ``request_code`` (alias ``code``) and ``sha256`` (alias
    ``sha256_hex``), which must be 64 hexadecimal characters (case-insensitive).
    The authenticated user must own the request.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}

    def _text_field(primary: str, alias: str) -> str:
        value = payload.get(primary) or payload.get(alias)
        # Non-string values have no .strip(); treat them as missing so the client
        # gets a 400 instead of the request crashing.
        return value.strip() if isinstance(value, str) else ""

    code = _text_field("request_code", "code")
    sha256_hex = _text_field("sha256", "sha256_hex").lower()
    if not code:
        return jsonify({"error": "request_code is required"}), 400
    if not sha256_hex:
        return jsonify({"error": "sha256 is required"}), 400
    if not _SHA256_HEX_RE.fullmatch(sha256_hex):
        return jsonify({"error": "invalid sha256"}), 400

    username = getattr(g, "keycloak_username", "") or ""
    if not username:
        return jsonify({"error": "invalid token"}), 401

    try:
        with connect() as conn:
            row = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404
            if (row.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403

            status = _normalize_status(row.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            prerequisites = ONBOARDING_STEP_PREREQUISITES.get("element_recovery_key", set())
            if prerequisites:
                current_completed = _completed_onboarding_steps(conn, code, username)
                missing = sorted(prerequisites - current_completed)
                if missing:
                    return jsonify({"error": "step is blocked", "blocked_by": missing}), 409

            # Upsert: a re-submission replaces the stored hash and its timestamp.
            conn.execute(
                """
                INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash)
                VALUES (%s, %s, %s)
                ON CONFLICT (request_code, artifact) DO UPDATE
                  SET value_hash = EXCLUDED.value_hash,
                      created_at = NOW()
                """,
                (code, _ELEMENT_RECOVERY_ARTIFACT, sha256_hex),
            )
            conn.execute(
                """
                INSERT INTO access_request_onboarding_steps (request_code, step)
                VALUES (%s, %s)
                ON CONFLICT (request_code, step) DO NOTHING
                """,
                (code, "element_recovery_key"),
            )

            status = _advance_status(conn, code, username, status)
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to verify element recovery key"}), 502

    return jsonify(
        {
            "ok": True,
            "status": status,
            "onboarding": onboarding_payload,
        }
    )
2026-01-04 21:57:31 -03:00
@app.route("/api/access/request/onboarding/keycloak-password-rotate", methods=["POST"])
@require_auth
def request_access_onboarding_keycloak_password_rotate() -> Any:
    """Require the authenticated user to change their Keycloak password.

    Body field: ``request_code`` (alias ``code``). Adds ``UPDATE_PASSWORD`` to the
    user's required actions and records that rotation was requested. The
    authenticated user must own the request.
    """
    if not configured():
        return jsonify({"error": "server not configured"}), 503

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar body has no fields; handle it like an empty object.
        payload = {}
    raw_code = payload.get("request_code") or payload.get("code")
    # Non-string values have no .strip(); treat them as missing (-> 400 below).
    code = raw_code.strip() if isinstance(raw_code, str) else ""
    if not code:
        return jsonify({"error": "request_code is required"}), 400

    username = getattr(g, "keycloak_username", "") or ""
    if not username:
        return jsonify({"error": "invalid token"}), 401
    if not admin_client().ready():
        return jsonify({"error": "keycloak admin unavailable"}), 503

    try:
        with connect() as conn:
            row = conn.execute(
                "SELECT username, status FROM access_requests WHERE request_code = %s",
                (code,),
            ).fetchone()
            if not row:
                return jsonify({"error": "not found"}), 404
            if (row.get("username") or "") != username:
                return jsonify({"error": "forbidden"}), 403

            status = _normalize_status(row.get("status") or "")
            if status not in {"awaiting_onboarding", "ready"}:
                return jsonify({"error": "onboarding not available"}), 409

            prerequisites = ONBOARDING_STEP_PREREQUISITES.get("keycloak_password_rotated", set())
            if prerequisites:
                current_completed = _completed_onboarding_steps(conn, code, username)
                missing = sorted(prerequisites - current_completed)
                if missing:
                    return jsonify({"error": "step is blocked", "blocked_by": missing}), 409

            user = admin_client().find_user(username) or {}
            user_id = user.get("id") if isinstance(user, dict) else None
            if not isinstance(user_id, str) or not user_id:
                return jsonify({"error": "keycloak user not found"}), 409

            full = admin_client().get_user(user_id)
            actions = full.get("requiredActions")
            actions_list: list[str] = []
            if isinstance(actions, list):
                actions_list = [a for a in actions if isinstance(a, str)]
            # Existing required actions are kept; UPDATE_PASSWORD is added once.
            if "UPDATE_PASSWORD" not in actions_list:
                actions_list.append("UPDATE_PASSWORD")
            admin_client().update_user_safe(user_id, {"requiredActions": actions_list})

            # DO NOTHING keeps the timestamp of the first request on repeated calls.
            conn.execute(
                """
                INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash)
                VALUES (%s, %s, NOW()::text)
                ON CONFLICT (request_code, artifact) DO NOTHING
                """,
                (code, _KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT),
            )
            onboarding_payload = _onboarding_payload(conn, code, username)
    except Exception:
        return jsonify({"error": "failed to request password rotation"}), 502

    return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})