148 lines
5.6 KiB
Python

from __future__ import annotations
import re
import secrets
import string
from typing import Any
from flask import jsonify, request
import psycopg
from ..db import connect, configured
from ..keycloak import admin_client
from ..rate_limit import rate_limit_allow
from .. import settings
def _extract_request_payload() -> tuple[str, str, str]:
payload = request.get_json(silent=True) or {}
username = (payload.get("username") or "").strip()
email = (payload.get("email") or "").strip()
note = (payload.get("note") or "").strip()
return username, email, note
def _random_request_code(username: str) -> str:
suffix = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(10))
return f"{username}~{suffix}"
def register(app) -> None:
@app.route("/api/access/request", methods=["POST"])
def request_access() -> Any:
if not settings.ACCESS_REQUEST_ENABLED:
return jsonify({"error": "request access disabled"}), 503
if not configured():
return jsonify({"error": "server not configured"}), 503
ip = request.remote_addr or "unknown"
if not rate_limit_allow(
ip,
key="access_request_submit",
limit=settings.ACCESS_REQUEST_SUBMIT_RATE_LIMIT,
window_sec=settings.ACCESS_REQUEST_SUBMIT_RATE_WINDOW_SEC,
):
return jsonify({"error": "rate limited"}), 429
username, email, note = _extract_request_payload()
if not username:
return jsonify({"error": "username is required"}), 400
if len(username) < 3 or len(username) > 32:
return jsonify({"error": "username must be 3-32 characters"}), 400
if not re.fullmatch(r"[a-zA-Z0-9._-]+", username):
return jsonify({"error": "username contains invalid characters"}), 400
if email and "@" not in email:
return jsonify({"error": "invalid email"}), 400
if admin_client().ready() and admin_client().find_user(username):
return jsonify({"error": "username already exists"}), 409
try:
with connect() as conn:
existing = conn.execute(
"""
SELECT request_code, status
FROM access_requests
WHERE username = %s AND status = 'pending'
ORDER BY created_at DESC
LIMIT 1
""",
(username,),
).fetchone()
if existing:
return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]})
request_code = _random_request_code(username)
try:
conn.execute(
"""
INSERT INTO access_requests
(request_code, username, contact_email, note, status)
VALUES
(%s, %s, %s, %s, 'pending')
""",
(request_code, username, email or None, note or None),
)
except psycopg.errors.UniqueViolation:
conn.rollback()
existing = conn.execute(
"""
SELECT request_code, status
FROM access_requests
WHERE username = %s AND status = 'pending'
ORDER BY created_at DESC
LIMIT 1
""",
(username,),
).fetchone()
if not existing:
raise
return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]})
except Exception:
return jsonify({"error": "failed to submit request"}), 502
return jsonify({"ok": True, "request_code": request_code})
@app.route("/api/access/request/status", methods=["POST"])
def request_access_status() -> Any:
if not settings.ACCESS_REQUEST_ENABLED:
return jsonify({"error": "request access disabled"}), 503
if not configured():
return jsonify({"error": "server not configured"}), 503
ip = request.remote_addr or "unknown"
if not rate_limit_allow(
ip,
key="access_request_status",
limit=settings.ACCESS_REQUEST_STATUS_RATE_LIMIT,
window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC,
):
return jsonify({"error": "rate limited"}), 429
payload = request.get_json(silent=True) or {}
code = (payload.get("request_code") or payload.get("code") or "").strip()
if not code:
return jsonify({"error": "request_code is required"}), 400
try:
with connect() as conn:
row = conn.execute(
"SELECT status, username FROM access_requests WHERE request_code = %s",
(code,),
).fetchone()
if not row:
return jsonify({"error": "not found"}), 404
status = row["status"] or "unknown"
response: dict[str, Any] = {
"ok": True,
"status": status,
"username": row.get("username") or "",
}
if status == "approved":
response["onboarding_url"] = f"/onboarding?code={code}"
return jsonify(response)
except Exception:
return jsonify({"error": "failed to load status"}), 502