155 lines
5.4 KiB
Python

from __future__ import annotations
import time
from typing import Any
from flask import jsonify, request
import httpx
from .. import settings
from ..keycloak import admin_client, require_auth, require_portal_admin
def _kc_admin_list_pending_requests(limit: int = 100) -> list[dict[str, Any]]:
    """Return Keycloak user records whose access request is still pending.

    The realm's admin ``/users`` endpoint is queried with up to three
    parameter sets, from most to least specific. A parameter set is skipped
    when the response carries an HTTP error status or when the JSON body is
    not a list; the first list-shaped body is used and the remaining sets are
    not tried. The records are then filtered locally on the
    ``access_request_status`` attribute and truncated to ``limit`` entries.

    Only ``httpx.HTTPStatusError`` is handled here; any other exception
    (transport errors, invalid JSON, ...) propagates to the caller.

    NOTE(review): the broadest fallback asks for the first ``limit`` users of
    any kind, so pending users beyond that page would presumably be missed —
    confirm whether pagination is needed.
    """

    def has_pending_status(record: dict[str, Any]) -> bool:
        # The status attribute is accepted both as a list of values and as a
        # bare string; anything else counts as "not pending".
        attributes = record.get("attributes") or {}
        if not isinstance(attributes, dict):
            return False
        raw_status = attributes.get("access_request_status")
        if isinstance(raw_status, str):
            return raw_status == "pending"
        if isinstance(raw_status, list) and raw_status:
            return str(raw_status[0]) == "pending"
        return False

    endpoint = f"{settings.KEYCLOAK_ADMIN_URL}/admin/realms/{settings.KEYCLOAK_REALM}/users"
    page_size = str(limit)
    query_attempts = (
        {"max": page_size, "enabled": "false", "q": "access_request_status:pending"},
        {"max": page_size, "enabled": "false"},
        {"max": page_size},
    )

    fetched: list[dict[str, Any]] = []
    for query in query_attempts:
        try:
            with httpx.Client(timeout=settings.HTTP_CHECK_TIMEOUT_SEC) as http:
                reply = http.get(endpoint, params=query, headers=admin_client().headers())
                reply.raise_for_status()
                payload = reply.json()
        except httpx.HTTPStatusError:
            # This query shape was rejected; fall through to the next one.
            continue
        if isinstance(payload, list):
            fetched = [entry for entry in payload if isinstance(entry, dict)]
            break

    still_pending = [entry for entry in fetched if has_pending_status(entry)]
    return still_pending[:limit]
def _first_attr(attrs: dict[str, Any], key: str) -> str:
    """Return the first value stored under ``key`` as a string.

    The attribute is accepted either as a list of values or as a bare scalar
    (the same two shapes the pending-status check in this module accepts).
    A missing key, ``None`` or an empty list yields ``""``.
    """
    value = attrs.get(key)
    if isinstance(value, (list, tuple)):
        value = value[0] if value else None
    return "" if value is None else str(value)


def _apply_decision(
    user: dict[str, Any], *, enabled: bool, status: str, stamp_key: str
) -> dict[str, Any]:
    """Record an approve/deny decision on a user representation, in place.

    Sets ``enabled``, writes ``access_request_status = [status]`` and stores
    the current Unix time (whole seconds, as a string) under ``stamp_key``.
    A missing or non-dict ``attributes`` value is replaced by a fresh dict.
    Returns the same ``user`` object for convenience.
    """
    attrs = user.get("attributes")
    if not isinstance(attrs, dict):
        attrs = {}
    attrs["access_request_status"] = [status]
    attrs[stamp_key] = [str(int(time.time()))]
    user["enabled"] = enabled
    user["attributes"] = attrs
    return user


def register(app) -> None:
    """Attach the admin access-request endpoints to ``app``.

    Three routes are registered, each wrapped in ``require_auth`` and each
    returning early with the response from ``require_portal_admin()`` when
    that check does not pass:

    * ``GET  /api/admin/access/requests`` — list pending requests.
    * ``POST /api/admin/access/requests/<username>/approve``
    * ``POST /api/admin/access/requests/<username>/deny``

    All routes answer 503 when ``admin_client().ready()`` is false and 502
    when a required admin-client call fails.
    """
    # Local import: keeps this block self-contained without touching the
    # module's import section.
    import logging

    log = logging.getLogger(__name__)

    @app.route("/api/admin/access/requests", methods=["GET"])
    @require_auth
    def admin_list_requests() -> Any:
        """Return ``{"requests": [...]}`` describing every pending request."""
        ok, resp = require_portal_admin()
        if not ok:
            return resp
        if not admin_client().ready():
            return jsonify({"error": "server not configured"}), 503
        try:
            items = _kc_admin_list_pending_requests()
        except Exception:
            log.exception("access requests: failed to load pending requests")
            return jsonify({"error": "failed to load requests"}), 502

        output: list[dict[str, Any]] = []
        for user in items:
            attrs = user.get("attributes") or {}
            if not isinstance(attrs, dict):
                attrs = {}
            output.append(
                {
                    "id": user.get("id") or "",
                    "username": user.get("username") or "",
                    "email": user.get("email") or "",
                    # _first_attr tolerates both list-valued and plain-string
                    # attributes; indexing a plain string with [0] would
                    # return only its first character.
                    "request_code": _first_attr(attrs, "access_request_code"),
                    "created_at": _first_attr(attrs, "access_request_created_at"),
                    "note": _first_attr(attrs, "access_request_note"),
                }
            )
        return jsonify({"requests": output})

    @app.route("/api/admin/access/requests/<username>/approve", methods=["POST"])
    @require_auth
    def admin_approve_request(username: str) -> Any:
        """Enable ``username``, mark the request approved, then run follow-ups.

        Responds 404 when the user is not found and 502 when the user cannot
        be loaded or updated. Adding the user to the ``dev`` group and sending
        the ``UPDATE_PASSWORD`` action e-mail are best-effort: failures are
        logged and the response is still ``{"ok": true}``.
        """
        ok, resp = require_portal_admin()
        if not ok:
            return resp
        client = admin_client()
        if not client.ready():
            return jsonify({"error": "server not configured"}), 503

        try:
            user = client.find_user(username)
        except Exception:
            log.exception("access approve: lookup failed for %r", username)
            return jsonify({"error": "failed to load user"}), 502
        if not user:
            return jsonify({"error": "user not found"}), 404
        user_id = user.get("id") or ""
        if not user_id:
            return jsonify({"error": "user id missing"}), 502

        try:
            full = client.get_user(user_id)
        except Exception:
            log.exception("access approve: fetch failed for %r", username)
            return jsonify({"error": "failed to load user"}), 502
        if not isinstance(full, dict):
            return jsonify({"error": "failed to load user"}), 502

        _apply_decision(
            full,
            enabled=True,
            status="approved",
            stamp_key="access_request_approved_at",
        )
        try:
            client.update_user(user_id, full)
        except Exception:
            log.exception("access approve: update failed for %r", username)
            return jsonify({"error": "failed to enable user"}), 502

        # From here on the approval itself has succeeded; the remaining steps
        # must never turn that success into an error response.
        try:
            group_id = client.get_group_id("dev")
            if group_id:
                client.add_user_to_group(user_id, group_id)
        except Exception:
            log.warning(
                "access approve: could not add %r to group 'dev'",
                username,
                exc_info=True,
            )

        if (full.get("email") or "").strip():
            # NOTE(review): the link base is taken from the incoming request's
            # host URL — confirm the deployment validates the Host header.
            try:
                client.execute_actions_email(
                    user_id,
                    ["UPDATE_PASSWORD"],
                    request.host_url.rstrip("/") + "/",
                )
            except Exception:
                log.warning(
                    "access approve: could not send action e-mail to %r",
                    username,
                    exc_info=True,
                )
        return jsonify({"ok": True})

    @app.route("/api/admin/access/requests/<username>/deny", methods=["POST"])
    @require_auth
    def admin_deny_request(username: str) -> Any:
        """Disable ``username`` and mark the access request as denied.

        An unknown username is answered with ``{"ok": true}`` (nothing to
        deny). Responds 502 when the user cannot be loaded or updated.
        """
        ok, resp = require_portal_admin()
        if not ok:
            return resp
        client = admin_client()
        if not client.ready():
            return jsonify({"error": "server not configured"}), 503

        try:
            user = client.find_user(username)
        except Exception:
            log.exception("access deny: lookup failed for %r", username)
            return jsonify({"error": "failed to load user"}), 502
        if not user:
            return jsonify({"ok": True})
        user_id = user.get("id") or ""
        if not user_id:
            return jsonify({"error": "user id missing"}), 502

        try:
            full = client.get_user(user_id)
        except Exception:
            log.exception("access deny: fetch failed for %r", username)
            return jsonify({"error": "failed to load user"}), 502
        if not isinstance(full, dict):
            return jsonify({"error": "failed to load user"}), 502

        _apply_decision(
            full,
            enabled=False,
            status="denied",
            stamp_key="access_request_denied_at",
        )
        try:
            client.update_user(user_id, full)
        except Exception:
            log.exception("access deny: update failed for %r", username)
            return jsonify({"error": "failed to deny user"}), 502
        return jsonify({"ok": True})