from __future__ import annotations import socket import time from urllib.parse import quote from typing import Any import httpx from flask import jsonify, g, request from .. import settings from .. import ariadne_client from ..db import connect from ..keycloak import admin_client, require_auth, require_account_access from ..nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync from ..utils import random_password from ..firefly_user_sync import trigger as trigger_firefly_user_sync from ..wger_user_sync import trigger as trigger_wger_user_sync def _tcp_check(host: str, port: int, timeout_sec: float) -> bool: if not host or port <= 0: return False try: with socket.create_connection((host, port), timeout=timeout_sec): return True except OSError: return False def register(app) -> None: @app.route("/api/account/overview", methods=["GET"]) @require_auth def account_overview() -> Any: ok, resp = require_account_access() if not ok: return resp username = g.keycloak_username keycloak_email = g.keycloak_email or "" mailu_email = "" mailu_app_password = "" mailu_status = "ready" nextcloud_mail_status = "unknown" nextcloud_mail_primary_email = "" nextcloud_mail_account_count = "" nextcloud_mail_synced_at = "" wger_status = "ready" wger_password = "" wger_password_updated_at = "" firefly_status = "ready" firefly_password = "" firefly_password_updated_at = "" vaultwarden_email = "" vaultwarden_status = "" vaultwarden_synced_at = "" jellyfin_status = "ready" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "" jellyfin_user_is_ldap = False onboarding_url = "" if not admin_client().ready(): mailu_status = "server not configured" wger_status = "server not configured" firefly_status = "server not configured" jellyfin_status = "server not configured" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "keycloak admin not configured" elif username: try: user = admin_client().find_user(username) or {} if isinstance(user, dict): jellyfin_user_is_ldap = bool(user.get("federationLink")) if not keycloak_email: keycloak_email = str(user.get("email") or "") attrs = user.get("attributes") if isinstance(user, dict) else None if isinstance(attrs, dict): raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu: mailu_email = str(raw_mailu[0]) elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu raw_pw = attrs.get("mailu_app_password") if isinstance(raw_pw, list) and raw_pw: mailu_app_password = str(raw_pw[0]) elif isinstance(raw_pw, str) and raw_pw: mailu_app_password = raw_pw raw_primary = attrs.get("nextcloud_mail_primary_email") if isinstance(raw_primary, list) and raw_primary: nextcloud_mail_primary_email = str(raw_primary[0]) elif isinstance(raw_primary, str) and raw_primary: nextcloud_mail_primary_email = raw_primary raw_count = attrs.get("nextcloud_mail_account_count") if isinstance(raw_count, list) and raw_count: nextcloud_mail_account_count = str(raw_count[0]) elif isinstance(raw_count, str) and raw_count: nextcloud_mail_account_count = raw_count raw_synced = attrs.get("nextcloud_mail_synced_at") if isinstance(raw_synced, list) and raw_synced: nextcloud_mail_synced_at = str(raw_synced[0]) elif isinstance(raw_synced, str) and raw_synced: nextcloud_mail_synced_at = raw_synced raw_wger_password = attrs.get("wger_password") if isinstance(raw_wger_password, list) and raw_wger_password: wger_password = str(raw_wger_password[0]) elif isinstance(raw_wger_password, str) and raw_wger_password: wger_password = raw_wger_password raw_wger_updated = attrs.get("wger_password_updated_at") if isinstance(raw_wger_updated, list) and raw_wger_updated: wger_password_updated_at = str(raw_wger_updated[0]) elif isinstance(raw_wger_updated, str) and raw_wger_updated: wger_password_updated_at = raw_wger_updated raw_firefly_password = attrs.get("firefly_password") if isinstance(raw_firefly_password, list) and raw_firefly_password: firefly_password = str(raw_firefly_password[0]) elif isinstance(raw_firefly_password, str) and raw_firefly_password: firefly_password = raw_firefly_password raw_firefly_updated = attrs.get("firefly_password_updated_at") if isinstance(raw_firefly_updated, list) and raw_firefly_updated: firefly_password_updated_at = str(raw_firefly_updated[0]) elif isinstance(raw_firefly_updated, str) and raw_firefly_updated: firefly_password_updated_at = raw_firefly_updated raw_vw_email = attrs.get("vaultwarden_email") if isinstance(raw_vw_email, list) and raw_vw_email: vaultwarden_email = str(raw_vw_email[0]) elif isinstance(raw_vw_email, str) and raw_vw_email: vaultwarden_email = raw_vw_email raw_vw_status = attrs.get("vaultwarden_status") if isinstance(raw_vw_status, list) and raw_vw_status: vaultwarden_status = str(raw_vw_status[0]) elif isinstance(raw_vw_status, str) and raw_vw_status: vaultwarden_status = raw_vw_status raw_vw_synced = attrs.get("vaultwarden_synced_at") if isinstance(raw_vw_synced, list) and raw_vw_synced: vaultwarden_synced_at = str(raw_vw_synced[0]) elif isinstance(raw_vw_synced, str) and raw_vw_synced: vaultwarden_synced_at = raw_vw_synced user_id = user.get("id") if isinstance(user, dict) else None if user_id and ( not keycloak_email or not mailu_email or not mailu_app_password or not wger_password or not wger_password_updated_at or not firefly_password or not firefly_password_updated_at or not vaultwarden_email or not vaultwarden_status or not vaultwarden_synced_at ): full = admin_client().get_user(str(user_id)) if not keycloak_email: keycloak_email = str(full.get("email") or "") attrs = full.get("attributes") or {} if isinstance(attrs, dict): if not mailu_email: raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu and isinstance(raw_mailu[0], str): mailu_email = raw_mailu[0] elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu if not mailu_app_password: raw_pw = attrs.get("mailu_app_password") if isinstance(raw_pw, list) and raw_pw: mailu_app_password = str(raw_pw[0]) elif isinstance(raw_pw, str) and raw_pw: mailu_app_password = raw_pw if not nextcloud_mail_primary_email: raw_primary = attrs.get("nextcloud_mail_primary_email") if isinstance(raw_primary, list) and raw_primary: nextcloud_mail_primary_email = str(raw_primary[0]) elif isinstance(raw_primary, str) and raw_primary: nextcloud_mail_primary_email = raw_primary if not nextcloud_mail_account_count: raw_count = attrs.get("nextcloud_mail_account_count") if isinstance(raw_count, list) and raw_count: nextcloud_mail_account_count = str(raw_count[0]) elif isinstance(raw_count, str) and raw_count: nextcloud_mail_account_count = raw_count if not nextcloud_mail_synced_at: raw_synced = attrs.get("nextcloud_mail_synced_at") if isinstance(raw_synced, list) and raw_synced: nextcloud_mail_synced_at = str(raw_synced[0]) elif isinstance(raw_synced, str) and raw_synced: nextcloud_mail_synced_at = raw_synced if not wger_password: raw_wger_password = attrs.get("wger_password") if isinstance(raw_wger_password, list) and raw_wger_password: wger_password = str(raw_wger_password[0]) elif isinstance(raw_wger_password, str) and raw_wger_password: wger_password = raw_wger_password if not wger_password_updated_at: raw_wger_updated = attrs.get("wger_password_updated_at") if isinstance(raw_wger_updated, list) and raw_wger_updated: wger_password_updated_at = str(raw_wger_updated[0]) elif isinstance(raw_wger_updated, str) and raw_wger_updated: wger_password_updated_at = raw_wger_updated if not firefly_password: raw_firefly_password = attrs.get("firefly_password") if isinstance(raw_firefly_password, list) and raw_firefly_password: firefly_password = str(raw_firefly_password[0]) elif isinstance(raw_firefly_password, str) and raw_firefly_password: firefly_password = raw_firefly_password if not firefly_password_updated_at: raw_firefly_updated = attrs.get("firefly_password_updated_at") if isinstance(raw_firefly_updated, list) and raw_firefly_updated: firefly_password_updated_at = str(raw_firefly_updated[0]) elif isinstance(raw_firefly_updated, str) and raw_firefly_updated: firefly_password_updated_at = raw_firefly_updated if not vaultwarden_email: raw_vw_email = attrs.get("vaultwarden_email") if isinstance(raw_vw_email, list) and raw_vw_email: vaultwarden_email = str(raw_vw_email[0]) elif isinstance(raw_vw_email, str) and raw_vw_email: vaultwarden_email = raw_vw_email if not vaultwarden_status: raw_vw_status = attrs.get("vaultwarden_status") if isinstance(raw_vw_status, list) and raw_vw_status: vaultwarden_status = str(raw_vw_status[0]) elif isinstance(raw_vw_status, str) and raw_vw_status: vaultwarden_status = raw_vw_status if not vaultwarden_synced_at: raw_vw_synced = attrs.get("vaultwarden_synced_at") if isinstance(raw_vw_synced, list) and raw_vw_synced: vaultwarden_synced_at = str(raw_vw_synced[0]) elif isinstance(raw_vw_synced, str) and raw_vw_synced: vaultwarden_synced_at = raw_vw_synced except Exception: mailu_status = "unavailable" nextcloud_mail_status = "unavailable" wger_status = "unavailable" firefly_status = "unavailable" vaultwarden_status = "unavailable" jellyfin_status = "unavailable" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "unavailable" mailu_username = mailu_email or (f"{username}@{settings.MAILU_DOMAIN}" if username else "") firefly_username = mailu_username vaultwarden_username = vaultwarden_email or mailu_username if not mailu_app_password and mailu_status == "ready": mailu_status = "needs app password" if not wger_password and wger_status == "ready": wger_status = "needs provisioning" if not firefly_password and firefly_status == "ready": firefly_status = "needs provisioning" if nextcloud_mail_status == "unknown": try: count_val = int(nextcloud_mail_account_count) if nextcloud_mail_account_count else 0 except ValueError: count_val = 0 if count_val > 0: nextcloud_mail_status = "ready" else: nextcloud_mail_status = "needs sync" if jellyfin_status == "ready": ldap_reachable = _tcp_check( settings.JELLYFIN_LDAP_HOST, settings.JELLYFIN_LDAP_PORT, settings.JELLYFIN_LDAP_CHECK_TIMEOUT_SEC, ) if not ldap_reachable: jellyfin_sync_status = "degraded" jellyfin_sync_detail = "LDAP unreachable" elif not jellyfin_user_is_ldap: jellyfin_sync_status = "degraded" jellyfin_sync_detail = "Keycloak user is not LDAP-backed" else: jellyfin_sync_status = "ok" jellyfin_sync_detail = "LDAP-backed (Keycloak is source of truth)" if not vaultwarden_status: vaultwarden_status = "needs provisioning" if settings.PORTAL_DATABASE_URL and username: request_code = "" try: with connect() as conn: row = conn.execute( "SELECT request_code FROM access_requests WHERE username = %s ORDER BY created_at DESC LIMIT 1", (username,), ).fetchone() if not row and keycloak_email: row = conn.execute( "SELECT request_code FROM access_requests WHERE contact_email = %s ORDER BY created_at DESC LIMIT 1", (keycloak_email,), ).fetchone() if row and isinstance(row, dict): request_code = str(row.get("request_code") or "").strip() except Exception: request_code = "" if request_code: onboarding_url = f"{settings.PORTAL_PUBLIC_BASE_URL}/onboarding?code={quote(request_code)}" return jsonify( { "user": {"username": username, "email": keycloak_email, "groups": g.keycloak_groups}, "onboarding_url": onboarding_url, "mailu": {"status": mailu_status, "username": mailu_username, "app_password": mailu_app_password}, "nextcloud_mail": { "status": nextcloud_mail_status, "primary_email": nextcloud_mail_primary_email, "account_count": nextcloud_mail_account_count, "synced_at": nextcloud_mail_synced_at, }, "wger": { "status": wger_status, "username": username, "password": wger_password, "password_updated_at": wger_password_updated_at, }, "firefly": { "status": firefly_status, "username": firefly_username, "password": firefly_password, "password_updated_at": firefly_password_updated_at, }, "vaultwarden": { "status": vaultwarden_status, "username": vaultwarden_username, "synced_at": vaultwarden_synced_at, }, "jellyfin": { "status": jellyfin_status, "username": username, "sync_status": jellyfin_sync_status, "sync_detail": jellyfin_sync_detail, }, } ) @app.route("/api/account/mailu/rotate", methods=["POST"]) @require_auth def account_mailu_rotate() -> Any: ok, resp = require_account_access() if not ok: return resp if ariadne_client.enabled(): return ariadne_client.proxy("POST", "/api/account/mailu/rotate") if not admin_client().ready(): return jsonify({"error": "server not configured"}), 503 username = g.keycloak_username if not username: return jsonify({"error": "missing username"}), 400 password = random_password() try: admin_client().set_user_attribute(username, "mailu_app_password", password) except Exception: return jsonify({"error": "failed to update mail password"}), 502 sync_enabled = bool(settings.MAILU_SYNC_URL) sync_ok = False sync_error = "" if sync_enabled: try: with httpx.Client(timeout=30) as client: resp = client.post( settings.MAILU_SYNC_URL, json={"ts": int(time.time()), "wait": True, "reason": "portal_mailu_rotate"}, ) sync_ok = resp.status_code == 200 if not sync_ok: sync_error = f"sync status {resp.status_code}" except Exception: sync_error = "sync request failed" nextcloud_sync: dict[str, Any] = {"status": "skipped"} try: nextcloud_sync = trigger_nextcloud_mail_sync(username, wait=True) except Exception: nextcloud_sync = {"status": "error"} return jsonify( { "password": password, "sync_enabled": sync_enabled, "sync_ok": sync_ok, "sync_error": sync_error, "nextcloud_sync": nextcloud_sync, } ) @app.route("/api/account/wger/reset", methods=["POST"]) @require_auth def account_wger_reset() -> Any: ok, resp = require_account_access() if not ok: return resp if ariadne_client.enabled(): return ariadne_client.proxy("POST", "/api/account/wger/reset") if not admin_client().ready(): return jsonify({"error": "server not configured"}), 503 username = g.keycloak_username if not username: return jsonify({"error": "missing username"}), 400 keycloak_email = g.keycloak_email or "" mailu_email = "" try: user = admin_client().find_user(username) or {} attrs = user.get("attributes") if isinstance(user, dict) else None if isinstance(attrs, dict): raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu: mailu_email = str(raw_mailu[0]) elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu except Exception: pass email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}" password = random_password() try: result = trigger_wger_user_sync(username, email, password, wait=True) status_val = result.get("status") if isinstance(result, dict) else "error" if status_val != "ok": raise RuntimeError(f"wger sync {status_val}") except Exception as exc: message = str(exc).strip() or "wger sync failed" return jsonify({"error": message}), 502 try: admin_client().set_user_attribute(username, "wger_password", password) admin_client().set_user_attribute( username, "wger_password_updated_at", time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), ) except Exception: return jsonify({"error": "failed to store wger password"}), 502 return jsonify({"status": "ok", "password": password}) @app.route("/api/account/firefly/reset", methods=["POST"]) @require_auth def account_firefly_reset() -> Any: ok, resp = require_account_access() if not ok: return resp if ariadne_client.enabled(): return ariadne_client.proxy("POST", "/api/account/firefly/reset") if not admin_client().ready(): return jsonify({"error": "server not configured"}), 503 username = g.keycloak_username if not username: return jsonify({"error": "missing username"}), 400 keycloak_email = g.keycloak_email or "" mailu_email = "" try: user = admin_client().find_user(username) or {} attrs = user.get("attributes") if isinstance(user, dict) else None if isinstance(attrs, dict): raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu: mailu_email = str(raw_mailu[0]) elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu except Exception: pass email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}" password = random_password(24) try: result = trigger_firefly_user_sync(username, email, password, wait=True) status_val = result.get("status") if isinstance(result, dict) else "error" if status_val != "ok": raise RuntimeError(f"firefly sync {status_val}") except Exception as exc: message = str(exc).strip() or "firefly sync failed" return jsonify({"error": message}), 502 try: admin_client().set_user_attribute(username, "firefly_password", password) admin_client().set_user_attribute( username, "firefly_password_updated_at", time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), ) except Exception: return jsonify({"error": "failed to store firefly password"}), 502 return jsonify({"status": "ok", "password": password}) @app.route("/api/account/nextcloud/mail/sync", methods=["POST"]) @require_auth def account_nextcloud_mail_sync() -> Any: ok, resp = require_account_access() if not ok: return resp if ariadne_client.enabled(): payload = request.get_json(silent=True) or {} return ariadne_client.proxy("POST", "/api/account/nextcloud/mail/sync", payload=payload) if not admin_client().ready(): return jsonify({"error": "server not configured"}), 503 username = g.keycloak_username if not username: return jsonify({"error": "missing username"}), 400 payload = request.get_json(silent=True) or {} wait = bool(payload.get("wait", True)) try: result = trigger_nextcloud_mail_sync(username, wait=wait) return jsonify(result) except Exception as exc: message = str(exc).strip() or "failed to sync nextcloud mail" return jsonify({"error": message}), 502