from __future__ import annotations from datetime import datetime, timezone from typing import Any from flask import jsonify, redirect, request def register_access_request_onboarding(app, deps) -> None: """Register access request onboarding routes.""" @app.route("/api/access/request/onboarding/attest", methods=["POST"]) def request_access_onboarding_attest() -> Any: """Record or clear a user-attested onboarding step. WHY: onboarding mixes manual tasks with Keycloak-managed tasks, so this route enforces prerequisites and only accepts attestations for UI-owned steps. """ if not deps.configured(): return jsonify({"error": "server not deps.configured"}), 503 payload = request.get_json(silent=True) or {} code = (payload.get("request_code") or payload.get("code") or "").strip() step = (payload.get("step") or "").strip() completed = payload.get("completed") vaultwarden_claim = bool(payload.get("vaultwarden_claim")) if not code: return jsonify({"error": "request_code is required"}), 400 if step not in deps.ONBOARDING_STEPS: return jsonify({"error": "invalid step"}), 400 if step in deps.KEYCLOAK_MANAGED_STEPS: return jsonify({"error": "step is managed by keycloak"}), 400 username = "" token_groups: set[str] = set() bearer = request.headers.get("Authorization", "") if bearer: parts = bearer.split(None, 1) if len(parts) != 2 or parts[0].lower() != "bearer": return jsonify({"error": "invalid token"}), 401 token = parts[1].strip() if not token: return jsonify({"error": "invalid token"}), 401 try: claims = deps.oidc_client().verify(token) except Exception: return jsonify({"error": "invalid token"}), 401 username = claims.get("preferred_username") or "" groups = claims.get("groups") if isinstance(groups, list): token_groups = {g.lstrip("/") for g in groups if isinstance(g, str) and g} try: with deps.connect() as conn: row = conn.execute( "SELECT username, status, approval_flags, contact_email FROM access_requests WHERE request_code = %s", (code,), ).fetchone() if not row: return jsonify({"error": "not found"}), 404 if username and (row.get("username") or "") != username: return jsonify({"error": "forbidden"}), 403 status = deps._normalize_status(row.get("status") or "") if status not in {"awaiting_onboarding", "ready"}: return jsonify({"error": "onboarding not available"}), 409 mark_done = True if isinstance(completed, bool): mark_done = completed request_username = row.get("username") or "" approval_flags = deps._normalize_flag_list(row.get("approval_flags")) contact_email = (row.get("contact_email") or "").strip() if mark_done: prerequisites = deps.ONBOARDING_STEP_PREREQUISITES.get(step, set()) if prerequisites: current_completed = deps._completed_onboarding_steps(conn, code, request_username) missing = sorted(prerequisites - current_completed) if missing: return jsonify({"error": "step is blocked", "blocked_by": missing}), 409 if step in {"vaultwarden_master_password", "vaultwarden_store_temp_password"}: if not deps._password_rotation_requested(conn, code): try: deps._request_keycloak_password_rotation(conn, code, request_username) except Exception: return jsonify({"error": "failed to request keycloak password rotation"}), 502 if step == "vaultwarden_master_password": if vaultwarden_claim and not username: return jsonify({"error": "login required"}), 401 grandfathered = ( deps.VAULTWARDEN_GRANDFATHERED_FLAG in approval_flags or deps.VAULTWARDEN_GRANDFATHERED_FLAG in token_groups or deps._user_in_group(request_username, deps.VAULTWARDEN_GRANDFATHERED_FLAG) ) if vaultwarden_claim and not grandfathered: return jsonify({"error": "vaultwarden claim not allowed"}), 403 if vaultwarden_claim and not deps.admin_client().ready(): return jsonify({"error": "keycloak admin unavailable"}), 503 if request_username and deps.admin_client().ready(): try: now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") if vaultwarden_claim: recovery_email = deps._resolve_recovery_email(request_username, contact_email) if not recovery_email: return jsonify({"error": "recovery email missing"}), 409 deps.admin_client().set_user_attribute( request_username, "vaultwarden_email", recovery_email, ) deps.admin_client().set_user_attribute( request_username, "vaultwarden_status", "grandfathered", ) deps.admin_client().set_user_attribute( request_username, "vaultwarden_synced_at", now, ) else: deps.admin_client().set_user_attribute( request_username, "vaultwarden_status", "already_present", ) deps.admin_client().set_user_attribute( request_username, "vaultwarden_master_password_set_at", now, ) except Exception: return jsonify({"error": "failed to update vaultwarden status"}), 502 conn.execute( """ INSERT INTO access_request_onboarding_steps (request_code, step) VALUES (%s, %s) ON CONFLICT (request_code, step) DO NOTHING """, (code, step), ) else: conn.execute( "DELETE FROM access_request_onboarding_steps WHERE request_code = %s AND step = %s", (code, step), ) # Re-evaluate completion to update request status to ready if applicable. status = deps._advance_status(conn, code, request_username, status) onboarding_payload = deps._onboarding_payload(conn, code, request_username) except Exception: return jsonify({"error": "failed to update onboarding"}), 502 return jsonify( { "ok": True, "status": status, "onboarding": onboarding_payload, } ) @app.route("/api/access/request/onboarding/keycloak-password-rotate", methods=["POST"]) def request_access_onboarding_keycloak_password_rotate() -> Any: """Request Keycloak password rotation for an onboarding user.""" if not deps.configured(): return jsonify({"error": "server not deps.configured"}), 503 payload = request.get_json(silent=True) or {} code = (payload.get("request_code") or payload.get("code") or "").strip() if not code: return jsonify({"error": "request_code is required"}), 400 token_username = "" bearer = request.headers.get("Authorization", "") if bearer: parts = bearer.split(None, 1) if len(parts) != 2 or parts[0].lower() != "bearer": return jsonify({"error": "invalid token"}), 401 token = parts[1].strip() if not token: return jsonify({"error": "invalid token"}), 401 try: claims = deps.oidc_client().verify(token) except Exception: return jsonify({"error": "invalid token"}), 401 token_username = claims.get("preferred_username") or "" if not deps.admin_client().ready(): return jsonify({"error": "keycloak admin unavailable"}), 503 try: with deps.connect() as conn: row = conn.execute( "SELECT username, status FROM access_requests WHERE request_code = %s", (code,), ).fetchone() if not row: return jsonify({"error": "not found"}), 404 request_username = row.get("username") or "" if token_username and request_username != token_username: return jsonify({"error": "forbidden"}), 403 status = deps._normalize_status(row.get("status") or "") if status not in {"awaiting_onboarding", "ready"}: return jsonify({"error": "onboarding not available"}), 409 prerequisites = deps.ONBOARDING_STEP_PREREQUISITES.get("keycloak_password_rotated", set()) if prerequisites: current_completed = deps._completed_onboarding_steps(conn, code, request_username) missing = sorted(prerequisites - current_completed) if missing: return jsonify({"error": "step is blocked", "blocked_by": missing}), 409 user = deps.admin_client().find_user(request_username) or {} user_id = user.get("id") if isinstance(user, dict) else None if not isinstance(user_id, str) or not user_id: return jsonify({"error": "keycloak user not found"}), 409 full = deps.admin_client().get_user(user_id) actions = full.get("requiredActions") actions_list: list[str] = [] if isinstance(actions, list): actions_list = [a for a in actions if isinstance(a, str)] rotation_requested = deps._password_rotation_requested(conn, code) already_rotated = rotation_requested and "UPDATE_PASSWORD" not in actions_list if not already_rotated: if "UPDATE_PASSWORD" not in actions_list: actions_list.append("UPDATE_PASSWORD") deps.admin_client().update_user_safe(user_id, {"requiredActions": actions_list}) if not rotation_requested: conn.execute( """ INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash) VALUES (%s, %s, NOW()::text) ON CONFLICT (request_code, artifact) DO NOTHING """, (code, deps._KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT), ) onboarding_payload = deps._onboarding_payload(conn, code, request_username) except Exception: return jsonify({"error": "failed to request password rotation"}), 502 return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})