from __future__ import annotations import socket import time from typing import Any import httpx from flask import jsonify, g from .. import settings from ..keycloak import admin_client, require_auth, require_account_access from ..utils import random_password def _tcp_check(host: str, port: int, timeout_sec: float) -> bool: if not host or port <= 0: return False try: with socket.create_connection((host, port), timeout=timeout_sec): return True except OSError: return False def register(app) -> None: @app.route("/api/account/overview", methods=["GET"]) @require_auth def account_overview() -> Any: ok, resp = require_account_access() if not ok: return resp username = g.keycloak_username keycloak_email = g.keycloak_email or "" mailu_email = "" mailu_app_password = "" mailu_status = "ready" jellyfin_status = "ready" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "" jellyfin_user_is_ldap = False if not admin_client().ready(): mailu_status = "server not configured" jellyfin_status = "server not configured" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "keycloak admin not configured" elif username: try: user = admin_client().find_user(username) or {} if isinstance(user, dict): jellyfin_user_is_ldap = bool(user.get("federationLink")) if not keycloak_email: keycloak_email = str(user.get("email") or "") attrs = user.get("attributes") if isinstance(user, dict) else None if isinstance(attrs, dict): raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu: mailu_email = str(raw_mailu[0]) elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu raw_pw = attrs.get("mailu_app_password") if isinstance(raw_pw, list) and raw_pw: mailu_app_password = str(raw_pw[0]) elif isinstance(raw_pw, str) and raw_pw: mailu_app_password = raw_pw user_id = user.get("id") if isinstance(user, dict) else None if user_id and (not keycloak_email or not mailu_email or not mailu_app_password): full = admin_client().get_user(str(user_id)) if not keycloak_email: keycloak_email = str(full.get("email") or "") attrs = full.get("attributes") or {} if isinstance(attrs, dict): if not mailu_email: raw_mailu = attrs.get("mailu_email") if isinstance(raw_mailu, list) and raw_mailu and isinstance(raw_mailu[0], str): mailu_email = raw_mailu[0] elif isinstance(raw_mailu, str) and raw_mailu: mailu_email = raw_mailu if not mailu_app_password: raw_pw = attrs.get("mailu_app_password") if isinstance(raw_pw, list) and raw_pw: mailu_app_password = str(raw_pw[0]) elif isinstance(raw_pw, str) and raw_pw: mailu_app_password = raw_pw except Exception: mailu_status = "unavailable" jellyfin_status = "unavailable" jellyfin_sync_status = "unknown" jellyfin_sync_detail = "unavailable" mailu_username = mailu_email or (f"{username}@{settings.MAILU_DOMAIN}" if username else "") if not mailu_app_password and mailu_status == "ready": mailu_status = "needs app password" if jellyfin_status == "ready": ldap_reachable = _tcp_check( settings.JELLYFIN_LDAP_HOST, settings.JELLYFIN_LDAP_PORT, settings.JELLYFIN_LDAP_CHECK_TIMEOUT_SEC, ) if not ldap_reachable: jellyfin_sync_status = "degraded" jellyfin_sync_detail = "LDAP unreachable" elif not jellyfin_user_is_ldap: jellyfin_sync_status = "degraded" jellyfin_sync_detail = "Keycloak user is not LDAP-backed" else: jellyfin_sync_status = "ok" jellyfin_sync_detail = "LDAP-backed (Keycloak is source of truth)" return jsonify( { "user": {"username": username, "email": keycloak_email, "groups": g.keycloak_groups}, "mailu": {"status": mailu_status, "username": mailu_username, "app_password": mailu_app_password}, "jellyfin": { "status": jellyfin_status, "username": username, "sync_status": jellyfin_sync_status, "sync_detail": jellyfin_sync_detail, }, } ) @app.route("/api/account/mailu/rotate", methods=["POST"]) @require_auth def account_mailu_rotate() -> Any: ok, resp = require_account_access() if not ok: return resp if not admin_client().ready(): return jsonify({"error": "server not configured"}), 503 username = g.keycloak_username if not username: return jsonify({"error": "missing username"}), 400 password = random_password() try: admin_client().set_user_attribute(username, "mailu_app_password", password) except Exception: return jsonify({"error": "failed to update mail password"}), 502 sync_enabled = bool(settings.MAILU_SYNC_URL) sync_ok = False sync_error = "" if sync_enabled: try: with httpx.Client(timeout=30) as client: resp = client.post( settings.MAILU_SYNC_URL, json={"ts": int(time.time()), "wait": True, "reason": "portal_mailu_rotate"}, ) sync_ok = resp.status_code == 200 if not sync_ok: sync_error = f"sync status {resp.status_code}" except Exception: sync_error = "sync request failed" return jsonify( { "password": password, "sync_enabled": sync_enabled, "sync_ok": sync_ok, "sync_error": sync_error, } )