144 lines
5.5 KiB
Python

from __future__ import annotations

import logging
from typing import Any
from urllib.parse import quote

from flask import jsonify, g, request

from .. import ariadne_client
from ..db import connect, configured
from ..keycloak import require_auth, require_portal_admin
from ..provisioning import provision_access_request
logger = logging.getLogger(__name__)


def _admin_precheck() -> tuple[bool, Any]:
    """Run the checks shared by every admin access-request endpoint.

    Returns:
        ``(True, None)`` when the caller may proceed. Otherwise
        ``(False, response)``, where ``response`` is what the view must
        return: the value supplied by ``require_portal_admin()`` when that
        check fails, or a 503 JSON error when ``configured()`` is false.
    """
    ok, resp = require_portal_admin()
    if not ok:
        return False, resp
    if not configured():
        return False, (jsonify({"error": "server not configured"}), 503)
    return True, None


def _note_from_payload(payload: Any) -> str | None:
    """Return the stripped ``"note"`` string from a JSON payload, else ``None``.

    ``None`` is returned when the payload is not a dict or when the note is
    missing or not a string.
    """
    if not isinstance(payload, dict):
        return None
    note = payload.get("note")
    return note.strip() if isinstance(note, str) else None


def _serialize_request(row: Any) -> dict[str, Any]:
    """Convert one ``access_requests`` row into its JSON representation.

    Missing or falsy ``contact_email``, ``created_at`` and ``note`` values
    are rendered as empty strings.
    """
    created_at = row.get("created_at")
    return {
        "id": row["request_code"],
        "username": row["username"],
        "email": row.get("contact_email") or "",
        "request_code": row["request_code"],
        "created_at": created_at.isoformat() if created_at else "",
        "note": row.get("note") or "",
    }


def register(app) -> None:
    """Register the admin access-request endpoints on ``app``.

    Three routes are added, each wrapped in ``require_auth`` and gated by
    ``require_portal_admin()`` and ``configured()``:

    * ``GET  /api/admin/access/requests``
    * ``POST /api/admin/access/requests/<username>/approve``
    * ``POST /api/admin/access/requests/<username>/deny``

    When ``ariadne_client.enabled()`` is true, each view forwards the call
    through ``ariadne_client.proxy`` instead of touching the database.
    """

    @app.route("/api/admin/access/requests", methods=["GET"])
    @require_auth
    def admin_list_requests() -> Any:
        """Return up to 200 pending access requests, oldest first.

        Responds with ``{"requests": [...]}`` on success and a 502 JSON
        error when the query raises.
        """
        ok, resp = _admin_precheck()
        if not ok:
            return resp

        if ariadne_client.enabled():
            return ariadne_client.proxy("GET", "/api/admin/access/requests")

        try:
            with connect() as conn:
                rows = conn.execute(
                    """
                    SELECT request_code, username, contact_email, note, status, created_at
                    FROM access_requests
                    WHERE status = 'pending'
                    ORDER BY created_at ASC
                    LIMIT 200
                    """
                ).fetchall()
        except Exception:
            # Record the cause; the client only ever sees the generic 502.
            logger.exception("failed to load pending access requests")
            return jsonify({"error": "failed to load requests"}), 502

        return jsonify({"requests": [_serialize_request(row) for row in rows]})

    @app.route("/api/admin/access/requests/<username>/approve", methods=["POST"])
    @require_auth
    def admin_approve_request(username: str) -> Any:
        """Approve the pending request for ``username`` and start provisioning.

        The optional JSON body may carry ``flags`` (a list; non-string items
        are dropped) and ``note`` (a string, stripped). Only a row that is
        ``pending`` with a non-null ``email_verified_at`` is updated; it
        moves to ``accounts_building``. If no row matches, the response is
        still ``{"ok": True}`` with an empty ``request_code``. A database
        error yields a 502 JSON error.
        """
        ok, resp = _admin_precheck()
        if not ok:
            return resp

        payload = request.get_json(silent=True) or {}
        if ariadne_client.enabled():
            return ariadne_client.proxy(
                "POST",
                f"/api/admin/access/requests/{quote(username, safe='')}/approve",
                payload=payload,
            )

        decided_by = getattr(g, "keycloak_username", "") or ""
        flags_raw = payload.get("flags") if isinstance(payload, dict) else None
        flags = (
            [flag for flag in flags_raw if isinstance(flag, str)]
            if isinstance(flags_raw, list)
            else []
        )
        note = _note_from_payload(payload)

        try:
            with connect() as conn:
                row = conn.execute(
                    """
                    UPDATE access_requests
                    SET status = 'accounts_building',
                        decided_at = NOW(),
                        decided_by = %s,
                        approval_flags = %s,
                        approval_note = %s
                    WHERE username = %s AND status = 'pending'
                      AND email_verified_at IS NOT NULL
                    RETURNING request_code
                    """,
                    # Empty strings/lists are stored as NULL.
                    (decided_by or None, flags or None, note, username),
                ).fetchone()
        except Exception:
            logger.exception("failed to approve access request (username=%r)", username)
            return jsonify({"error": "failed to approve request"}), 502

        if not row:
            return jsonify({"ok": True, "request_code": ""})

        # Provision the account best-effort (Keycloak user + Mailu password + sync).
        try:
            provision_access_request(row["request_code"])
        except Exception:
            # Keep the request in accounts_building; status checks will surface it.
            # Log instead of silently discarding so the failure can be diagnosed.
            logger.exception(
                "provisioning failed for approved access request (username=%r)",
                username,
            )

        return jsonify({"ok": True, "request_code": row["request_code"]})

    @app.route("/api/admin/access/requests/<username>/deny", methods=["POST"])
    @require_auth
    def admin_deny_request(username: str) -> Any:
        """Deny the pending request for ``username``.

        The optional JSON body may carry ``note`` (a string, stripped),
        which is stored as ``denial_note``. Only a ``pending`` row is
        updated; it moves to ``denied``. If no row matches, the response is
        still ``{"ok": True}`` with an empty ``request_code``. A database
        error yields a 502 JSON error.
        """
        ok, resp = _admin_precheck()
        if not ok:
            return resp

        payload = request.get_json(silent=True) or {}
        if ariadne_client.enabled():
            return ariadne_client.proxy(
                "POST",
                f"/api/admin/access/requests/{quote(username, safe='')}/deny",
                payload=payload,
            )

        decided_by = getattr(g, "keycloak_username", "") or ""
        note = _note_from_payload(payload)

        try:
            with connect() as conn:
                row = conn.execute(
                    """
                    UPDATE access_requests
                    SET status = 'denied',
                        decided_at = NOW(),
                        decided_by = %s,
                        denial_note = %s
                    WHERE username = %s AND status = 'pending'
                    RETURNING request_code
                    """,
                    (decided_by or None, note, username),
                ).fetchone()
        except Exception:
            logger.exception("failed to deny access request (username=%r)", username)
            return jsonify({"error": "failed to deny request"}), 502

        if not row:
            return jsonify({"ok": True, "request_code": ""})
        return jsonify({"ok": True, "request_code": row["request_code"]})