# bstein-dev-home/backend/atlas_portal/routes/access_request_onboarding.py
#
# 246 lines
# 12 KiB
# Python

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from flask import jsonify, redirect, request
_LOG = logging.getLogger(__name__)

# Request statuses in which onboarding steps may still be changed.
_ONBOARDING_OPEN_STATUSES = frozenset({"awaiting_onboarding", "ready"})

# Steps whose completion first makes sure a Keycloak password rotation was requested.
_PASSWORD_ROTATION_STEPS = frozenset(
    {"vaultwarden_master_password", "vaultwarden_store_temp_password"}
)


def _json_object() -> dict[str, Any]:
    """Return the request's JSON body, or ``{}`` when it is missing or not a JSON object.

    ``get_json`` may hand back a list or scalar for valid non-object JSON; treating
    that as an empty payload lets the handlers answer 400 instead of crashing.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _first_text(payload: dict[str, Any], *keys: str) -> str:
    """Return the stripped value of the first truthy key in *keys*.

    A truthy value that is not a string yields ``""`` so the caller's normal
    "missing/invalid" validation applies.
    """
    for key in keys:
        value = payload.get(key)
        if value:
            return value.strip() if isinstance(value, str) else ""
    return ""


def _invalid_token() -> tuple[Any, int]:
    """Build the 401 response used for every bearer-token failure."""
    return jsonify({"error": "invalid token"}), 401


def _optional_bearer_claims(deps) -> tuple[Any, Any]:
    """Verify the optional ``Authorization: Bearer`` header.

    Returns:
        ``(claims, error)``. Without an Authorization header the claims are an
        empty dict and the error is ``None``. With a malformed header, an empty
        token, or a token the OIDC client rejects, the claims are ``None`` and the
        error is a ready-to-return 401 response.
    """
    header = request.headers.get("Authorization", "")
    if not header:
        return {}, None

    parts = header.split(None, 1)
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None, _invalid_token()

    token = parts[1].strip()
    if not token:
        return None, _invalid_token()

    try:
        claims = deps.oidc_client().verify(token)
    except Exception:
        return None, _invalid_token()
    return claims, None


def _missing_prerequisites(deps, conn, code: str, request_username: str, step: str) -> list[str]:
    """Return the sorted prerequisite steps of *step* that are not completed yet.

    The completed steps are only looked up when *step* has prerequisites at all.
    """
    required = deps.ONBOARDING_STEP_PREREQUISITES.get(step, set())
    if not required:
        return []
    completed_steps = deps._completed_onboarding_steps(conn, code, request_username)
    return sorted(required - completed_steps)


def _sync_vaultwarden_attributes(
    deps,
    *,
    request_username: str,
    contact_email: str,
    vaultwarden_claim: bool,
    token_username: str,
    token_groups: set[str],
    approval_flags: Any,
) -> Any:
    """Validate a Vaultwarden claim and write the related Keycloak user attributes.

    Returns:
        ``None`` when the step may be recorded, otherwise the error response to
        return to the client.
    """
    if vaultwarden_claim and not token_username:
        return jsonify({"error": "login required"}), 401

    flag = deps.VAULTWARDEN_GRANDFATHERED_FLAG
    grandfathered = (
        flag in approval_flags
        or flag in token_groups
        or deps._user_in_group(request_username, flag)
    )
    if vaultwarden_claim and not grandfathered:
        return jsonify({"error": "vaultwarden claim not allowed"}), 403
    if vaultwarden_claim and not deps.admin_client().ready():
        return jsonify({"error": "keycloak admin unavailable"}), 503

    # Without a claim the attribute update is best-effort: it is skipped when the
    # request has no username or the admin client is not ready.
    if not (request_username and deps.admin_client().ready()):
        return None

    try:
        now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        if vaultwarden_claim:
            recovery_email = deps._resolve_recovery_email(request_username, contact_email)
            if not recovery_email:
                return jsonify({"error": "recovery email missing"}), 409
            attributes = [
                ("vaultwarden_email", recovery_email),
                ("vaultwarden_status", "grandfathered"),
                ("vaultwarden_synced_at", now),
            ]
        else:
            attributes = [
                ("vaultwarden_status", "already_present"),
                ("vaultwarden_master_password_set_at", now),
            ]
        for attribute, value in attributes:
            deps.admin_client().set_user_attribute(request_username, attribute, value)
    except Exception:
        _LOG.exception("failed to update vaultwarden status")
        return jsonify({"error": "failed to update vaultwarden status"}), 502
    return None


def register_access_request_onboarding(app, deps) -> None:
    """Register access request onboarding routes.

    Args:
        app: Object exposing a Flask-style ``route`` decorator.
        deps: Provider of the collaborators the handlers use (database connection
            factory, OIDC verifier, Keycloak admin client, onboarding constants
            and helper functions).
    """

    @app.route("/api/access/request/onboarding/attest", methods=["POST"])
    def request_access_onboarding_attest() -> Any:
        """Mark an onboarding step of an access request as completed or not completed.

        JSON body: ``request_code`` (or ``code``), ``step``, optional boolean
        ``completed`` (anything else means "completed") and optional
        ``vaultwarden_claim``. A bearer token is optional; when present its
        ``preferred_username`` must match the request's username.
        """
        if not deps.configured():
            return jsonify({"error": "server not configured"}), 503

        payload = _json_object()
        code = _first_text(payload, "request_code", "code")
        step = _first_text(payload, "step")
        completed = payload.get("completed")
        vaultwarden_claim = bool(payload.get("vaultwarden_claim"))

        if not code:
            return jsonify({"error": "request_code is required"}), 400
        if step not in deps.ONBOARDING_STEPS:
            return jsonify({"error": "invalid step"}), 400
        if step in deps.KEYCLOAK_MANAGED_STEPS:
            return jsonify({"error": "step is managed by keycloak"}), 400

        claims, auth_error = _optional_bearer_claims(deps)
        if auth_error is not None:
            return auth_error
        username = claims.get("preferred_username") or ""
        token_groups: set[str] = set()
        groups = claims.get("groups")
        if isinstance(groups, list):
            # Group names may carry a leading "/" path separator; compare without it.
            token_groups = {g.lstrip("/") for g in groups if isinstance(g, str) and g}

        try:
            with deps.connect() as conn:
                row = conn.execute(
                    "SELECT username, status, approval_flags, contact_email FROM access_requests WHERE request_code = %s",
                    (code,),
                ).fetchone()
                if not row:
                    return jsonify({"error": "not found"}), 404

                request_username = row.get("username") or ""
                if username and request_username != username:
                    return jsonify({"error": "forbidden"}), 403

                status = deps._normalize_status(row.get("status") or "")
                if status not in _ONBOARDING_OPEN_STATUSES:
                    return jsonify({"error": "onboarding not available"}), 409

                mark_done = completed if isinstance(completed, bool) else True
                approval_flags = deps._normalize_flag_list(row.get("approval_flags"))
                contact_email = (row.get("contact_email") or "").strip()

                if mark_done:
                    missing = _missing_prerequisites(deps, conn, code, request_username, step)
                    if missing:
                        return jsonify({"error": "step is blocked", "blocked_by": missing}), 409

                    if step in _PASSWORD_ROTATION_STEPS and not deps._password_rotation_requested(conn, code):
                        try:
                            deps._request_keycloak_password_rotation(conn, code, request_username)
                        except Exception:
                            _LOG.exception("failed to request keycloak password rotation")
                            return jsonify({"error": "failed to request keycloak password rotation"}), 502

                    if step == "vaultwarden_master_password":
                        vaultwarden_error = _sync_vaultwarden_attributes(
                            deps,
                            request_username=request_username,
                            contact_email=contact_email,
                            vaultwarden_claim=vaultwarden_claim,
                            token_username=username,
                            token_groups=token_groups,
                            approval_flags=approval_flags,
                        )
                        if vaultwarden_error is not None:
                            return vaultwarden_error

                    conn.execute(
                        """
                        INSERT INTO access_request_onboarding_steps (request_code, step)
                        VALUES (%s, %s)
                        ON CONFLICT (request_code, step) DO NOTHING
                        """,
                        (code, step),
                    )
                else:
                    conn.execute(
                        "DELETE FROM access_request_onboarding_steps WHERE request_code = %s AND step = %s",
                        (code, step),
                    )

                # Re-evaluate completion to update request status to ready if applicable.
                status = deps._advance_status(conn, code, request_username, status)
                onboarding_payload = deps._onboarding_payload(conn, code, request_username)
        except Exception:
            _LOG.exception("failed to update onboarding for step %s", step)
            return jsonify({"error": "failed to update onboarding"}), 502

        return jsonify(
            {
                "ok": True,
                "status": status,
                "onboarding": onboarding_payload,
            }
        )

    @app.route("/api/access/request/onboarding/keycloak-password-rotate", methods=["POST"])
    def request_access_onboarding_keycloak_password_rotate() -> Any:
        """Require the request's Keycloak user to update their password.

        JSON body: ``request_code`` (or ``code``). A bearer token is optional; when
        present its ``preferred_username`` must match the request's username.
        """
        if not deps.configured():
            return jsonify({"error": "server not configured"}), 503

        payload = _json_object()
        code = _first_text(payload, "request_code", "code")
        if not code:
            return jsonify({"error": "request_code is required"}), 400

        claims, auth_error = _optional_bearer_claims(deps)
        if auth_error is not None:
            return auth_error
        token_username = claims.get("preferred_username") or ""

        if not deps.admin_client().ready():
            return jsonify({"error": "keycloak admin unavailable"}), 503

        try:
            with deps.connect() as conn:
                row = conn.execute(
                    "SELECT username, status FROM access_requests WHERE request_code = %s",
                    (code,),
                ).fetchone()
                if not row:
                    return jsonify({"error": "not found"}), 404

                request_username = row.get("username") or ""
                if token_username and request_username != token_username:
                    return jsonify({"error": "forbidden"}), 403

                status = deps._normalize_status(row.get("status") or "")
                if status not in _ONBOARDING_OPEN_STATUSES:
                    return jsonify({"error": "onboarding not available"}), 409

                missing = _missing_prerequisites(
                    deps, conn, code, request_username, "keycloak_password_rotated"
                )
                if missing:
                    return jsonify({"error": "step is blocked", "blocked_by": missing}), 409

                user = deps.admin_client().find_user(request_username) or {}
                user_id = user.get("id") if isinstance(user, dict) else None
                if not isinstance(user_id, str) or not user_id:
                    return jsonify({"error": "keycloak user not found"}), 409

                full = deps.admin_client().get_user(user_id)
                actions = full.get("requiredActions")
                actions_list: list[str] = []
                if isinstance(actions, list):
                    actions_list = [a for a in actions if isinstance(a, str)]

                rotation_requested = deps._password_rotation_requested(conn, code)
                # A recorded request whose UPDATE_PASSWORD action is gone again is
                # treated as an already completed rotation and left untouched.
                already_rotated = rotation_requested and "UPDATE_PASSWORD" not in actions_list
                if not already_rotated:
                    if "UPDATE_PASSWORD" not in actions_list:
                        # Only push to Keycloak when the action list actually changed.
                        actions_list.append("UPDATE_PASSWORD")
                        deps.admin_client().update_user_safe(user_id, {"requiredActions": actions_list})
                    if not rotation_requested:
                        conn.execute(
                            """
                            INSERT INTO access_request_onboarding_artifacts (request_code, artifact, value_hash)
                            VALUES (%s, %s, NOW()::text)
                            ON CONFLICT (request_code, artifact) DO NOTHING
                            """,
                            (code, deps._KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT),
                        )

                onboarding_payload = deps._onboarding_payload(conn, code, request_username)
        except Exception:
            _LOG.exception("failed to request password rotation")
            return jsonify({"error": "failed to request password rotation"}), 502

        return jsonify({"ok": True, "status": status, "onboarding": onboarding_payload})