from __future__ import annotations import re import secrets import string from typing import Any from flask import jsonify, request import psycopg from ..db import connect, configured from ..keycloak import admin_client from ..rate_limit import rate_limit_allow from .. import settings def _extract_request_payload() -> tuple[str, str, str]: payload = request.get_json(silent=True) or {} username = (payload.get("username") or "").strip() email = (payload.get("email") or "").strip() note = (payload.get("note") or "").strip() return username, email, note def _random_request_code(username: str) -> str: suffix = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(10)) return f"{username}~{suffix}" def register(app) -> None: @app.route("/api/access/request", methods=["POST"]) def request_access() -> Any: if not settings.ACCESS_REQUEST_ENABLED: return jsonify({"error": "request access disabled"}), 503 if not configured(): return jsonify({"error": "server not configured"}), 503 ip = request.remote_addr or "unknown" if not rate_limit_allow(ip): return jsonify({"error": "rate limited"}), 429 username, email, note = _extract_request_payload() if not username: return jsonify({"error": "username is required"}), 400 if len(username) < 3 or len(username) > 32: return jsonify({"error": "username must be 3-32 characters"}), 400 if not re.fullmatch(r"[a-zA-Z0-9._-]+", username): return jsonify({"error": "username contains invalid characters"}), 400 if email and "@" not in email: return jsonify({"error": "invalid email"}), 400 if admin_client().ready() and admin_client().find_user(username): return jsonify({"error": "username already exists"}), 409 try: with connect() as conn: existing = conn.execute( """ SELECT request_code, status FROM access_requests WHERE username = %s AND status = 'pending' ORDER BY created_at DESC LIMIT 1 """, (username,), ).fetchone() if existing: return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]}) request_code = _random_request_code(username) try: conn.execute( """ INSERT INTO access_requests (request_code, username, contact_email, note, status) VALUES (%s, %s, %s, %s, 'pending') """, (request_code, username, email or None, note or None), ) except psycopg.errors.UniqueViolation: conn.rollback() existing = conn.execute( """ SELECT request_code, status FROM access_requests WHERE username = %s AND status = 'pending' ORDER BY created_at DESC LIMIT 1 """, (username,), ).fetchone() if not existing: raise return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]}) except Exception: return jsonify({"error": "failed to submit request"}), 502 return jsonify({"ok": True, "request_code": request_code}) @app.route("/api/access/request/status", methods=["POST"]) def request_access_status() -> Any: if not settings.ACCESS_REQUEST_ENABLED: return jsonify({"error": "request access disabled"}), 503 if not configured(): return jsonify({"error": "server not configured"}), 503 ip = request.remote_addr or "unknown" if not rate_limit_allow(ip): return jsonify({"error": "rate limited"}), 429 payload = request.get_json(silent=True) or {} code = (payload.get("request_code") or payload.get("code") or "").strip() if not code: return jsonify({"error": "request_code is required"}), 400 try: with connect() as conn: row = conn.execute( "SELECT status FROM access_requests WHERE request_code = %s", (code,), ).fetchone() if not row: return jsonify({"error": "not found"}), 404 return jsonify({"ok": True, "status": row["status"] or "unknown"}) except Exception: return jsonify({"error": "failed to load status"}), 502