120 lines
4.5 KiB
Python
Raw Normal View History

from __future__ import annotations

import logging
import re
import secrets
import string
import time
from typing import Any

from flask import jsonify, request

from ..keycloak import admin_client
from ..rate_limit import rate_limit_allow
from .. import settings
def _extract_request_payload() -> tuple[str, str, str]:
    """Read ``username``, ``email`` and ``note`` from the JSON request body.

    The body comes from the client and is untrusted. A body that is missing,
    unparseable, or not a JSON object is treated as empty, and any field whose
    value is not a string is treated as absent, so malformed input reaches the
    caller's validation instead of raising ``AttributeError``.

    Returns:
        A ``(username, email, note)`` tuple. Each element is the field value
        with surrounding whitespace stripped, or ``""`` when the field is
        absent or not a string.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # Valid JSON that is a list, string, number, ... has no ``.get``.
        body = {}

    def _text(key: str) -> str:
        value = body.get(key)
        return value.strip() if isinstance(value, str) else ""

    return _text("username"), _text("email"), _text("note")
def _random_request_code(username: str) -> str:
    """Build a request code of the form ``<username>~<suffix>``.

    The suffix is ten characters, each picked with ``secrets.choice`` from the
    uppercase ASCII letters and the decimal digits.
    """
    alphabet = string.ascii_uppercase + string.digits
    picked = []
    for _ in range(10):
        picked.append(secrets.choice(alphabet))
    return "{}~{}".format(username, "".join(picked))
def register(app) -> None:
    """Attach the access-request endpoints to *app*.

    Two POST routes are registered:

    * ``/api/access/request`` validates the submitted username/email/note and
      asks the admin client to create a disabled user that carries the request
      in its ``access_request_*`` attributes.
    * ``/api/access/request/status`` returns the stored
      ``access_request_status`` of the user whose stored request code matches
      the submitted one.

    Both routes answer 503 when the feature is disabled or the admin client is
    not ready, and 429 when ``rate_limit_allow`` rejects the caller's address.

    Args:
        app: Object exposing a Flask-style ``route(path, methods=...)``
            decorator.
    """
    log = logging.getLogger(__name__)

    def _precheck_failure() -> Any:
        """Return the shared early-exit response, or ``None`` to proceed."""
        if not settings.ACCESS_REQUEST_ENABLED:
            return jsonify({"error": "request access disabled"}), 503
        if not admin_client().ready():
            return jsonify({"error": "server not configured"}), 503
        # Checked last so requests rejected above are not passed to the limiter.
        if not rate_limit_allow(request.remote_addr or "unknown"):
            return jsonify({"error": "rate limited"}), 429
        return None

    def _text(value: Any) -> str:
        """Strip *value* if it is a string; anything else counts as empty."""
        return value.strip() if isinstance(value, str) else ""

    def _first_attr(attrs: dict[str, Any], key: str) -> Any:
        """Unwrap an attribute stored either as a list or as a bare string."""
        raw = attrs.get(key) or [""]
        if isinstance(raw, list) and raw:
            return raw[0]
        return raw if isinstance(raw, str) else ""

    @app.route("/api/access/request", methods=["POST"])
    def request_access() -> Any:
        """Validate an access request and create the pending, disabled user.

        Responds 400 on invalid input, 409 when the username is already
        taken, 502 when user creation raises, and otherwise a JSON object
        with ``ok``, ``id`` and ``request_code``.
        """
        failure = _precheck_failure()
        if failure is not None:
            return failure

        username, email, note = _extract_request_payload()
        if not username:
            return jsonify({"error": "username is required"}), 400
        if not 3 <= len(username) <= 32:
            return jsonify({"error": "username must be 3-32 characters"}), 400
        if not re.fullmatch(r"[a-zA-Z0-9._-]+", username):
            return jsonify({"error": "username contains invalid characters"}), 400
        if email and "@" not in email:
            return jsonify({"error": "invalid email"}), 400
        if admin_client().find_user(username):
            return jsonify({"error": "username already exists"}), 409

        request_code = _random_request_code(username)
        user_payload: dict[str, Any] = {
            "username": username,
            "enabled": False,
            "attributes": {
                "access_request_code": [request_code],
                "access_request_status": ["pending"],
                # ``note`` and ``email`` are already "" when absent.
                "access_request_note": [note],
                "access_request_created_at": [str(int(time.time()))],
                "access_request_contact_email": [email],
            },
        }
        if email:
            user_payload["email"] = email
            user_payload["emailVerified"] = False

        try:
            user_id = admin_client().create_user(user_payload)
        except Exception:
            # Boundary handler: keep the generic 502 for the client, but leave
            # a traceback for operators instead of discarding the failure.
            log.exception("access request: creating user %r failed", username)
            return jsonify({"error": "failed to submit request"}), 502
        return jsonify({"ok": True, "id": user_id, "request_code": request_code})

    @app.route("/api/access/request/status", methods=["POST"])
    def request_access_status() -> Any:
        """Report the status recorded for a previously issued request code.

        The code is read from ``request_code`` (or ``code``). The username is
        read from ``username`` or, failing that, taken from the part of the
        code before the first ``~``. Every lookup miss or code mismatch
        yields the same 404 response.
        """
        failure = _precheck_failure()
        if failure is not None:
            return failure

        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            # Untrusted body: a JSON list/string/number has no ``.get``.
            payload = {}

        code = _text(payload.get("request_code") or payload.get("code"))
        if not code:
            return jsonify({"error": "request_code is required"}), 400

        username = _text(payload.get("username"))
        if not username:
            prefix, sep, _ = code.partition("~")
            username = prefix.strip()
            if not sep or not username:
                return jsonify({"error": "invalid request code"}), 400

        user = admin_client().find_user(username)
        if not user:
            return jsonify({"error": "not found"}), 404
        attrs = user.get("attributes") or {}
        if not isinstance(attrs, dict):
            return jsonify({"error": "not found"}), 404

        stored_code = _first_attr(attrs, "access_request_code")
        # The request code is a secret, so compare it in constant time. Bytes
        # are used because compare_digest rejects non-ASCII ``str`` input, and
        # "surrogatepass" keeps lone surrogates from JSON escapes from raising.
        codes_match = isinstance(stored_code, str) and secrets.compare_digest(
            stored_code.encode("utf-8", "surrogatepass"),
            code.encode("utf-8", "surrogatepass"),
        )
        if not codes_match:
            return jsonify({"error": "not found"}), 404

        status_value = _first_attr(attrs, "access_request_status")
        return jsonify({"ok": True, "status": status_value or "unknown"})