from __future__ import annotations import re import secrets import string from typing import Any from flask import jsonify, request import psycopg from ..db import connect, configured from ..keycloak import admin_client from ..rate_limit import rate_limit_allow from .. import settings def _extract_request_payload() -> tuple[str, str, str]: payload = request.get_json(silent=True) or {} username = (payload.get("username") or "").strip() email = (payload.get("email") or "").strip() note = (payload.get("note") or "").strip() return username, email, note def _random_request_code(username: str) -> str: suffix = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(10)) return f"{username}~{suffix}" def _client_ip() -> str: xff = (request.headers.get("X-Forwarded-For") or "").strip() if xff: return xff.split(",", 1)[0].strip() or "unknown" x_real_ip = (request.headers.get("X-Real-IP") or "").strip() if x_real_ip: return x_real_ip return request.remote_addr or "unknown" def register(app) -> None: @app.route("/api/access/request", methods=["POST"]) def request_access() -> Any: if not settings.ACCESS_REQUEST_ENABLED: return jsonify({"error": "request access disabled"}), 503 if not configured(): return jsonify({"error": "server not configured"}), 503 ip = _client_ip() if not rate_limit_allow( ip, key="access_request_submit", limit=settings.ACCESS_REQUEST_SUBMIT_RATE_LIMIT, window_sec=settings.ACCESS_REQUEST_SUBMIT_RATE_WINDOW_SEC, ): return jsonify({"error": "rate limited"}), 429 username, email, note = _extract_request_payload() if not username: return jsonify({"error": "username is required"}), 400 if len(username) < 3 or len(username) > 32: return jsonify({"error": "username must be 3-32 characters"}), 400 if not re.fullmatch(r"[a-zA-Z0-9._-]+", username): return jsonify({"error": "username contains invalid characters"}), 400 if email and "@" not in email: return jsonify({"error": "invalid email"}), 400 if admin_client().ready() and admin_client().find_user(username): return jsonify({"error": "username already exists"}), 409 try: with connect() as conn: existing = conn.execute( """ SELECT request_code, status FROM access_requests WHERE username = %s AND status = 'pending' ORDER BY created_at DESC LIMIT 1 """, (username,), ).fetchone() if existing: return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]}) request_code = _random_request_code(username) try: conn.execute( """ INSERT INTO access_requests (request_code, username, contact_email, note, status) VALUES (%s, %s, %s, %s, 'pending') """, (request_code, username, email or None, note or None), ) except psycopg.errors.UniqueViolation: conn.rollback() existing = conn.execute( """ SELECT request_code, status FROM access_requests WHERE username = %s AND status = 'pending' ORDER BY created_at DESC LIMIT 1 """, (username,), ).fetchone() if not existing: raise return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]}) except Exception: return jsonify({"error": "failed to submit request"}), 502 return jsonify({"ok": True, "request_code": request_code}) @app.route("/api/access/request/status", methods=["POST"]) def request_access_status() -> Any: if not settings.ACCESS_REQUEST_ENABLED: return jsonify({"error": "request access disabled"}), 503 if not configured(): return jsonify({"error": "server not configured"}), 503 ip = _client_ip() if not rate_limit_allow( ip, key="access_request_status", limit=settings.ACCESS_REQUEST_STATUS_RATE_LIMIT, window_sec=settings.ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC, ): return jsonify({"error": "rate limited"}), 429 payload = request.get_json(silent=True) or {} code = (payload.get("request_code") or payload.get("code") or "").strip() if not code: return jsonify({"error": "request_code is required"}), 400 try: with connect() as conn: row = conn.execute( "SELECT status, username FROM access_requests WHERE request_code = %s", (code,), ).fetchone() if not row: return jsonify({"error": "not found"}), 404 status = row["status"] or "unknown" response: dict[str, Any] = { "ok": True, "status": status, "username": row.get("username") or "", } if status == "approved": response["onboarding_url"] = f"/onboarding?code={code}" return jsonify(response) except Exception: return jsonify({"error": "failed to load status"}), 502