"""Account self-service routes: overview plus per-service credential actions."""

from __future__ import annotations

import socket
import time
from typing import Any

import httpx
from flask import jsonify, g, request

from .. import settings
from ..keycloak import admin_client, require_auth, require_account_access
from ..nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync
from ..utils import random_password
from ..firefly_user_sync import trigger as trigger_firefly_user_sync
from ..wger_user_sync import trigger as trigger_wger_user_sync

# Keycloak user attributes surfaced by the account overview.
_OVERVIEW_ATTR_KEYS = (
    "mailu_email",
    "mailu_app_password",
    "nextcloud_mail_primary_email",
    "nextcloud_mail_account_count",
    "nextcloud_mail_synced_at",
    "wger_password",
    "wger_password_updated_at",
    "firefly_password",
    "firefly_password_updated_at",
    "vaultwarden_email",
    "vaultwarden_status",
    "vaultwarden_synced_at",
)

# Attributes whose absence after the first lookup triggers a second lookup by
# user id. The nextcloud_mail_* attributes do not trigger that lookup on their
# own, but are filled from it when it happens.
_OVERVIEW_REFETCH_KEYS = (
    "mailu_email",
    "mailu_app_password",
    "wger_password",
    "wger_password_updated_at",
    "firefly_password",
    "firefly_password_updated_at",
    "vaultwarden_email",
    "vaultwarden_status",
    "vaultwarden_synced_at",
)


def _tcp_check(host: str, port: int, timeout_sec: float) -> bool:
    """Return True when a TCP connection to ``host:port`` can be opened.

    Args:
        host: Target host; an empty value yields False without connecting.
        port: Target port; values <= 0 yield False without connecting.
        timeout_sec: Connection timeout in seconds.

    Returns:
        True if the connection succeeded, False on any ``OSError``.
    """
    if not host or port <= 0:
        return False
    try:
        with socket.create_connection((host, port), timeout=timeout_sec):
            return True
    except OSError:
        return False


def _first_attr(attrs: Any, key: str) -> str:
    """Return the value stored under ``key`` in an attribute mapping as a string.

    A non-empty list contributes its first element (stringified), a string is
    returned as-is. Anything else -- including a non-dict ``attrs``, a missing
    key, an empty list, or a ``None`` first element -- yields ``""``.
    """
    if not isinstance(attrs, dict):
        return ""
    raw = attrs.get(key)
    if isinstance(raw, list) and raw:
        first = raw[0]
        # Avoid surfacing the literal text "None" as an email or password.
        return "" if first is None else str(first)
    if isinstance(raw, str):
        return raw
    return ""


def _fill_missing(values: dict[str, str], attrs: Any) -> None:
    """Fill still-empty entries of ``values`` in place from ``attrs``.

    Entries that already hold a non-empty value are left untouched, so values
    found by an earlier lookup win over later ones.
    """
    for key in _OVERVIEW_ATTR_KEYS:
        if values[key]:
            continue
        found = _first_attr(attrs, key)
        if found:
            values[key] = found


def _authorized_username() -> tuple[str, Any]:
    """Run the checks shared by the mutating account routes.

    Returns:
        ``(username, None)`` when the request may proceed, otherwise
        ``("", response)`` where ``response`` is what the view must return.
        Callers should branch on the username being empty.
    """
    ok, resp = require_account_access()
    if not ok:
        return "", resp
    if not admin_client().ready():
        return "", (jsonify({"error": "server not configured"}), 503)
    username = g.keycloak_username
    if not username:
        return "", (jsonify({"error": "missing username"}), 400)
    return username, None


def _preferred_email(username: str) -> str:
    """Pick the email address to hand to a downstream user sync.

    Preference order: the user's ``mailu_email`` attribute, then the email
    stored on ``g.keycloak_email``, then ``<username>@<MAILU_DOMAIN>``.
    """
    keycloak_email = g.keycloak_email or ""
    mailu_email = ""
    try:
        user = admin_client().find_user(username) or {}
        attrs = user.get("attributes") if isinstance(user, dict) else None
        mailu_email = _first_attr(attrs, "mailu_email")
    except Exception:
        # Deliberately best-effort: a failed lookup just means we fall back
        # to the other email sources below.
        pass
    return mailu_email or keycloak_email or f"{username}@{settings.MAILU_DOMAIN}"


def _sync_and_store_password(
    username: str,
    email: str,
    password: str,
    service: str,
    sync_user: Any,
) -> Any:
    """Push a new password to a service, then record it on the user.

    Args:
        username: Account being reset.
        email: Email passed to the sync trigger.
        password: The new password.
        service: Attribute/message prefix, e.g. ``"wger"`` or ``"firefly"``.
        sync_user: Trigger callable invoked as
            ``sync_user(username, email, password, wait=True)``.

    Returns:
        A Flask response: 502 with an ``error`` message if the sync or the
        attribute write fails, otherwise ``{"status": "ok", "password": ...}``.
    """
    try:
        result = sync_user(username, email, password, wait=True)
        status_val = result.get("status") if isinstance(result, dict) else "error"
        if status_val != "ok":
            raise RuntimeError(f"{service} sync {status_val}")
    except Exception as exc:
        message = str(exc).strip() or f"{service} sync failed"
        return jsonify({"error": message}), 502

    # Only persist the password once the service has accepted it.
    try:
        admin_client().set_user_attribute(username, f"{service}_password", password)
        admin_client().set_user_attribute(
            username,
            f"{service}_password_updated_at",
            time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        )
    except Exception:
        return jsonify({"error": f"failed to store {service} password"}), 502

    return jsonify({"status": "ok", "password": password})


def register(app) -> None:
    """Attach the ``/api/account/...`` routes to ``app``."""

    @app.route("/api/account/overview", methods=["GET"])
    @require_auth
    def account_overview() -> Any:
        """Return per-service status and stored credentials for the caller."""
        ok, resp = require_account_access()
        if not ok:
            return resp

        username = g.keycloak_username
        keycloak_email = g.keycloak_email or ""
        found = dict.fromkeys(_OVERVIEW_ATTR_KEYS, "")

        mailu_status = "ready"
        nextcloud_mail_status = "unknown"
        wger_status = "ready"
        firefly_status = "ready"
        jellyfin_status = "ready"
        jellyfin_sync_status = "unknown"
        jellyfin_sync_detail = ""
        jellyfin_user_is_ldap = False

        if not admin_client().ready():
            mailu_status = "server not configured"
            wger_status = "server not configured"
            firefly_status = "server not configured"
            jellyfin_status = "server not configured"
            jellyfin_sync_status = "unknown"
            jellyfin_sync_detail = "keycloak admin not configured"
        elif username:
            try:
                user = admin_client().find_user(username) or {}
                user_is_dict = isinstance(user, dict)
                if user_is_dict:
                    jellyfin_user_is_ldap = bool(user.get("federationLink"))
                    if not keycloak_email:
                        keycloak_email = str(user.get("email") or "")
                _fill_missing(found, user.get("attributes") if user_is_dict else None)

                # Second lookup by id, only when the first result left gaps.
                user_id = user.get("id") if user_is_dict else None
                incomplete = not keycloak_email or any(
                    not found[key] for key in _OVERVIEW_REFETCH_KEYS
                )
                if user_id and incomplete:
                    full = admin_client().get_user(str(user_id))
                    if not keycloak_email:
                        keycloak_email = str(full.get("email") or "")
                    _fill_missing(found, full.get("attributes") or {})
            except Exception:
                mailu_status = "unavailable"
                nextcloud_mail_status = "unavailable"
                wger_status = "unavailable"
                firefly_status = "unavailable"
                found["vaultwarden_status"] = "unavailable"
                jellyfin_status = "unavailable"
                jellyfin_sync_status = "unknown"
                jellyfin_sync_detail = "unavailable"

        mailu_email = found["mailu_email"]
        mailu_app_password = found["mailu_app_password"]
        wger_password = found["wger_password"]
        firefly_password = found["firefly_password"]
        nextcloud_mail_account_count = found["nextcloud_mail_account_count"]
        vaultwarden_status = found["vaultwarden_status"]

        mailu_username = mailu_email or (
            f"{username}@{settings.MAILU_DOMAIN}" if username else ""
        )
        vaultwarden_username = found["vaultwarden_email"] or mailu_username

        if not mailu_app_password and mailu_status == "ready":
            mailu_status = "needs app password"
        if not wger_password and wger_status == "ready":
            wger_status = "needs provisioning"
        if not firefly_password and firefly_status == "ready":
            firefly_status = "needs provisioning"

        if nextcloud_mail_status == "unknown":
            try:
                count_val = (
                    int(nextcloud_mail_account_count)
                    if nextcloud_mail_account_count
                    else 0
                )
            except ValueError:
                count_val = 0
            nextcloud_mail_status = "ready" if count_val > 0 else "needs sync"

        if jellyfin_status == "ready":
            ldap_reachable = _tcp_check(
                settings.JELLYFIN_LDAP_HOST,
                settings.JELLYFIN_LDAP_PORT,
                settings.JELLYFIN_LDAP_CHECK_TIMEOUT_SEC,
            )
            if not ldap_reachable:
                jellyfin_sync_status = "degraded"
                jellyfin_sync_detail = "LDAP unreachable"
            elif not jellyfin_user_is_ldap:
                jellyfin_sync_status = "degraded"
                jellyfin_sync_detail = "Keycloak user is not LDAP-backed"
            else:
                jellyfin_sync_status = "ok"
                jellyfin_sync_detail = "LDAP-backed (Keycloak is source of truth)"

        if not vaultwarden_status:
            vaultwarden_status = "needs provisioning"

        return jsonify(
            {
                "user": {
                    "username": username,
                    "email": keycloak_email,
                    "groups": g.keycloak_groups,
                },
                "mailu": {
                    "status": mailu_status,
                    "username": mailu_username,
                    "app_password": mailu_app_password,
                },
                "nextcloud_mail": {
                    "status": nextcloud_mail_status,
                    "primary_email": found["nextcloud_mail_primary_email"],
                    "account_count": nextcloud_mail_account_count,
                    "synced_at": found["nextcloud_mail_synced_at"],
                },
                "wger": {
                    "status": wger_status,
                    "username": username,
                    "password": wger_password,
                    "password_updated_at": found["wger_password_updated_at"],
                },
                "firefly": {
                    "status": firefly_status,
                    "username": mailu_email or username,
                    "password": firefly_password,
                    "password_updated_at": found["firefly_password_updated_at"],
                },
                "vaultwarden": {
                    "status": vaultwarden_status,
                    "username": vaultwarden_username,
                    "synced_at": found["vaultwarden_synced_at"],
                },
                "jellyfin": {
                    "status": jellyfin_status,
                    "username": username,
                    "sync_status": jellyfin_sync_status,
                    "sync_detail": jellyfin_sync_detail,
                },
            }
        )

    @app.route("/api/account/mailu/rotate", methods=["POST"])
    @require_auth
    def account_mailu_rotate() -> Any:
        """Store a fresh ``mailu_app_password`` and kick off the mail syncs."""
        username, error = _authorized_username()
        if not username:
            return error

        password = random_password()
        try:
            admin_client().set_user_attribute(username, "mailu_app_password", password)
        except Exception:
            return jsonify({"error": "failed to update mail password"}), 502

        sync_enabled = bool(settings.MAILU_SYNC_URL)
        sync_ok = False
        sync_error = ""
        if sync_enabled:
            try:
                with httpx.Client(timeout=30) as client:
                    sync_resp = client.post(
                        settings.MAILU_SYNC_URL,
                        json={
                            "ts": int(time.time()),
                            "wait": True,
                            "reason": "portal_mailu_rotate",
                        },
                    )
                    sync_ok = sync_resp.status_code == 200
                    if not sync_ok:
                        sync_error = f"sync status {sync_resp.status_code}"
            except Exception:
                sync_error = "sync request failed"

        # A sync failure is reported in the payload; the rotation itself
        # has already been stored at this point.
        nextcloud_sync: dict[str, Any] = {"status": "skipped"}
        try:
            nextcloud_sync = trigger_nextcloud_mail_sync(username, wait=True)
        except Exception:
            nextcloud_sync = {"status": "error"}

        return jsonify(
            {
                "password": password,
                "sync_enabled": sync_enabled,
                "sync_ok": sync_ok,
                "sync_error": sync_error,
                "nextcloud_sync": nextcloud_sync,
            }
        )

    @app.route("/api/account/wger/reset", methods=["POST"])
    @require_auth
    def account_wger_reset() -> Any:
        """Generate a new wger password, sync it, and record it on the user."""
        username, error = _authorized_username()
        if not username:
            return error

        email = _preferred_email(username)
        password = random_password()
        return _sync_and_store_password(
            username, email, password, "wger", trigger_wger_user_sync
        )

    @app.route("/api/account/firefly/reset", methods=["POST"])
    @require_auth
    def account_firefly_reset() -> Any:
        """Generate a new firefly password, sync it, and record it on the user."""
        username, error = _authorized_username()
        if not username:
            return error

        email = _preferred_email(username)
        password = random_password(24)
        return _sync_and_store_password(
            username, email, password, "firefly", trigger_firefly_user_sync
        )

    @app.route("/api/account/nextcloud/mail/sync", methods=["POST"])
    @require_auth
    def account_nextcloud_mail_sync() -> Any:
        """Trigger a Nextcloud mail sync; JSON body may carry a ``wait`` flag."""
        username, error = _authorized_username()
        if not username:
            return error

        payload = request.get_json(silent=True)
        # A missing, unparsable, or non-object body falls back to wait=True
        # instead of failing on ``.get``.
        wait = bool(payload.get("wait", True)) if isinstance(payload, dict) else True
        try:
            result = trigger_nextcloud_mail_sync(username, wait=wait)
            return jsonify(result)
        except Exception:
            return jsonify({"error": "failed to sync nextcloud mail"}), 502