"""Account self-service API routes: overview, Mailu rotation, Nextcloud mail sync."""

from __future__ import annotations

import logging
import socket
import time
from typing import Any

import httpx
from flask import jsonify, g, request

from .. import settings
from ..keycloak import admin_client, require_auth, require_account_access
from ..nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync
from ..utils import random_password

logger = logging.getLogger(__name__)

# Keycloak user attributes surfaced by the account overview endpoint.
_PROFILE_ATTRS: tuple[str, ...] = (
    "mailu_email",
    "mailu_app_password",
    "nextcloud_mail_primary_email",
    "nextcloud_mail_account_count",
    "nextcloud_mail_synced_at",
    "vaultwarden_email",
    "vaultwarden_status",
    "vaultwarden_synced_at",
)

# Attributes whose absence after ``find_user`` triggers a follow-up
# ``get_user`` call. The Nextcloud attributes are deliberately not listed:
# they are filled from the follow-up call when it happens, but never cause it.
_REFETCH_ATTRS: tuple[str, ...] = (
    "mailu_email",
    "mailu_app_password",
    "vaultwarden_email",
    "vaultwarden_status",
    "vaultwarden_synced_at",
)


def _tcp_check(host: str, port: int, timeout_sec: float) -> bool:
    """Return True if a TCP connection to ``host:port`` can be opened.

    An empty host or a non-positive port yields False without attempting a
    connection. Any ``OSError`` raised while connecting also yields False.
    """
    if not host or port <= 0:
        return False
    try:
        with socket.create_connection((host, port), timeout=timeout_sec):
            return True
    except OSError:
        return False


def _first_attr(attrs: Any, key: str) -> str:
    """Return attribute ``key`` from ``attrs`` as a string, or ``""``.

    The value may be a non-empty list (its first element is stringified) or a
    non-empty string (returned as is). Anything else, including ``attrs`` not
    being a dict, yields the empty string.
    """
    if not isinstance(attrs, dict):
        return ""
    raw = attrs.get(key)
    if isinstance(raw, list) and raw:
        return str(raw[0])
    if isinstance(raw, str) and raw:
        return raw
    return ""


def register(app) -> None:
    """Attach the ``/api/account/*`` routes to ``app``.

    Registers three view functions via ``app.route``:

    * ``GET  /api/account/overview``
    * ``POST /api/account/mailu/rotate``
    * ``POST /api/account/nextcloud/mail/sync``
    """

    @app.route("/api/account/overview", methods=["GET"])
    @require_auth
    def account_overview() -> Any:
        """Return a JSON summary of the caller's linked service accounts."""
        ok, resp = require_account_access()
        if not ok:
            return resp

        username = g.keycloak_username
        keycloak_email = g.keycloak_email or ""
        profile: dict[str, str] = dict.fromkeys(_PROFILE_ATTRS, "")

        mailu_status = "ready"
        nextcloud_mail_status = "unknown"
        jellyfin_status = "ready"
        jellyfin_sync_status = "unknown"
        jellyfin_sync_detail = ""
        jellyfin_user_is_ldap = False

        if not admin_client().ready():
            mailu_status = "server not configured"
            jellyfin_status = "server not configured"
            jellyfin_sync_status = "unknown"
            jellyfin_sync_detail = "keycloak admin not configured"
        elif username:
            try:
                user = admin_client().find_user(username) or {}
                if not isinstance(user, dict):
                    # Normalise once so the lookups below cannot raise on an
                    # unexpected return type.
                    user = {}
                jellyfin_user_is_ldap = bool(user.get("federationLink"))
                if not keycloak_email:
                    keycloak_email = str(user.get("email") or "")

                attrs = user.get("attributes")
                for key in _PROFILE_ATTRS:
                    profile[key] = _first_attr(attrs, key)

                user_id = user.get("id")
                incomplete = not keycloak_email or any(
                    not profile[key] for key in _REFETCH_ATTRS
                )
                if user_id and incomplete:
                    # Second lookup by id; only fills values still missing.
                    full = admin_client().get_user(str(user_id))
                    if not keycloak_email:
                        keycloak_email = str(full.get("email") or "")
                    full_attrs = full.get("attributes") or {}
                    for key in _PROFILE_ATTRS:
                        if not profile[key]:
                            profile[key] = _first_attr(full_attrs, key)
            except Exception:
                # Best effort: report every backend as unavailable rather than
                # failing the request. Values gathered before the failure are
                # kept.
                logger.exception("account overview lookup failed for %s", username)
                mailu_status = "unavailable"
                nextcloud_mail_status = "unavailable"
                profile["vaultwarden_status"] = "unavailable"
                jellyfin_status = "unavailable"
                jellyfin_sync_status = "unknown"
                jellyfin_sync_detail = "unavailable"

        mailu_email = profile["mailu_email"]
        mailu_app_password = profile["mailu_app_password"]
        nextcloud_mail_account_count = profile["nextcloud_mail_account_count"]
        vaultwarden_status = profile["vaultwarden_status"]

        mailu_username = mailu_email or (
            f"{username}@{settings.MAILU_DOMAIN}" if username else ""
        )
        vaultwarden_username = profile["vaultwarden_email"] or mailu_username

        if not mailu_app_password and mailu_status == "ready":
            mailu_status = "needs app password"

        if nextcloud_mail_status == "unknown":
            try:
                count_val = (
                    int(nextcloud_mail_account_count)
                    if nextcloud_mail_account_count
                    else 0
                )
            except ValueError:
                # A non-numeric stored count is treated as "no accounts".
                count_val = 0
            nextcloud_mail_status = "ready" if count_val > 0 else "needs sync"

        if jellyfin_status == "ready":
            ldap_reachable = _tcp_check(
                settings.JELLYFIN_LDAP_HOST,
                settings.JELLYFIN_LDAP_PORT,
                settings.JELLYFIN_LDAP_CHECK_TIMEOUT_SEC,
            )
            if not ldap_reachable:
                jellyfin_sync_status = "degraded"
                jellyfin_sync_detail = "LDAP unreachable"
            elif not jellyfin_user_is_ldap:
                jellyfin_sync_status = "degraded"
                jellyfin_sync_detail = "Keycloak user is not LDAP-backed"
            else:
                jellyfin_sync_status = "ok"
                jellyfin_sync_detail = "LDAP-backed (Keycloak is source of truth)"

        if not vaultwarden_status:
            vaultwarden_status = "needs provisioning"

        return jsonify(
            {
                "user": {
                    "username": username,
                    "email": keycloak_email,
                    "groups": g.keycloak_groups,
                },
                "mailu": {
                    "status": mailu_status,
                    "username": mailu_username,
                    "app_password": mailu_app_password,
                },
                "nextcloud_mail": {
                    "status": nextcloud_mail_status,
                    "primary_email": profile["nextcloud_mail_primary_email"],
                    "account_count": nextcloud_mail_account_count,
                    "synced_at": profile["nextcloud_mail_synced_at"],
                },
                "vaultwarden": {
                    "status": vaultwarden_status,
                    "username": vaultwarden_username,
                    "synced_at": profile["vaultwarden_synced_at"],
                },
                "jellyfin": {
                    "status": jellyfin_status,
                    "username": username,
                    "sync_status": jellyfin_sync_status,
                    "sync_detail": jellyfin_sync_detail,
                },
            }
        )

    @app.route("/api/account/mailu/rotate", methods=["POST"])
    @require_auth
    def account_mailu_rotate() -> Any:
        """Generate a new Mailu app password, store it, and trigger syncs.

        Responds 503 when the Keycloak admin client is not ready, 400 when the
        session has no username, and 502 when storing the new password fails.
        Failures of the follow-up syncs are reported in the JSON body instead
        of failing the request.
        """
        ok, resp = require_account_access()
        if not ok:
            return resp
        if not admin_client().ready():
            return jsonify({"error": "server not configured"}), 503

        username = g.keycloak_username
        if not username:
            return jsonify({"error": "missing username"}), 400

        password = random_password()
        try:
            admin_client().set_user_attribute(username, "mailu_app_password", password)
        except Exception as exc:
            # Log only the exception type: the failing call carried the new
            # password, so the message is kept out of the log as a precaution.
            logger.error(
                "failed to store rotated mailu app password for %s (%s)",
                username,
                type(exc).__name__,
            )
            return jsonify({"error": "failed to update mail password"}), 502

        sync_enabled = bool(settings.MAILU_SYNC_URL)
        sync_ok = False
        sync_error = ""
        if sync_enabled:
            try:
                with httpx.Client(timeout=30) as client:
                    sync_resp = client.post(
                        settings.MAILU_SYNC_URL,
                        json={
                            "ts": int(time.time()),
                            "wait": True,
                            "reason": "portal_mailu_rotate",
                        },
                    )
                sync_ok = sync_resp.status_code == 200
                if not sync_ok:
                    sync_error = f"sync status {sync_resp.status_code}"
            except Exception:
                logger.exception("mailu sync request failed for %s", username)
                sync_error = "sync request failed"

        nextcloud_sync: dict[str, Any] = {"status": "skipped"}
        try:
            nextcloud_sync = trigger_nextcloud_mail_sync(username, wait=True)
        except Exception:
            logger.exception("nextcloud mail sync failed for %s", username)
            nextcloud_sync = {"status": "error"}

        return jsonify(
            {
                "password": password,
                "sync_enabled": sync_enabled,
                "sync_ok": sync_ok,
                "sync_error": sync_error,
                "nextcloud_sync": nextcloud_sync,
            }
        )

    @app.route("/api/account/nextcloud/mail/sync", methods=["POST"])
    @require_auth
    def account_nextcloud_mail_sync() -> Any:
        """Trigger a Nextcloud mail sync for the caller and return its result.

        The optional JSON body may carry ``{"wait": <bool>}`` (default True).
        Responds 503 when the Keycloak admin client is not ready, 400 when the
        session has no username, and 502 when the sync trigger raises.
        """
        ok, resp = require_account_access()
        if not ok:
            return resp
        if not admin_client().ready():
            return jsonify({"error": "server not configured"}), 503

        username = g.keycloak_username
        if not username:
            return jsonify({"error": "missing username"}), 400

        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            # A valid JSON body may be a list, string or number; calling
            # ``.get`` on those would raise, so treat them like an empty body.
            payload = {}
        wait = bool(payload.get("wait", True))

        try:
            result = trigger_nextcloud_mail_sync(username, wait=wait)
            return jsonify(result)
        except Exception:
            logger.exception("nextcloud mail sync failed for %s", username)
            return jsonify({"error": "failed to sync nextcloud mail"}), 502