514 lines
24 KiB
Python

from __future__ import annotations
import socket
import time
from urllib.parse import quote
from typing import Any
import httpx
from flask import jsonify, g, request
from .. import settings
from ..db import connect
from ..keycloak import admin_client, require_auth, require_account_access
from ..nextcloud_mail_sync import trigger as trigger_nextcloud_mail_sync
from ..utils import random_password
from ..firefly_user_sync import trigger as trigger_firefly_user_sync
from ..wger_user_sync import trigger as trigger_wger_user_sync
def _tcp_check(host: str, port: int, timeout_sec: float) -> bool:
if not host or port <= 0:
return False
try:
with socket.create_connection((host, port), timeout=timeout_sec):
return True
except OSError:
return False
def register(app) -> None:
@app.route("/api/account/overview", methods=["GET"])
@require_auth
def account_overview() -> Any:
ok, resp = require_account_access()
if not ok:
return resp
username = g.keycloak_username
keycloak_email = g.keycloak_email or ""
mailu_email = ""
mailu_app_password = ""
mailu_status = "ready"
nextcloud_mail_status = "unknown"
nextcloud_mail_primary_email = ""
nextcloud_mail_account_count = ""
nextcloud_mail_synced_at = ""
wger_status = "ready"
wger_password = ""
wger_password_updated_at = ""
firefly_status = "ready"
firefly_password = ""
firefly_password_updated_at = ""
vaultwarden_email = ""
vaultwarden_status = ""
vaultwarden_synced_at = ""
jellyfin_status = "ready"
jellyfin_sync_status = "unknown"
jellyfin_sync_detail = ""
jellyfin_user_is_ldap = False
onboarding_url = ""
if not admin_client().ready():
mailu_status = "server not configured"
wger_status = "server not configured"
firefly_status = "server not configured"
jellyfin_status = "server not configured"
jellyfin_sync_status = "unknown"
jellyfin_sync_detail = "keycloak admin not configured"
elif username:
try:
user = admin_client().find_user(username) or {}
if isinstance(user, dict):
jellyfin_user_is_ldap = bool(user.get("federationLink"))
if not keycloak_email:
keycloak_email = str(user.get("email") or "")
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
mailu_email = str(raw_mailu[0])
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
raw_pw = attrs.get("mailu_app_password")
if isinstance(raw_pw, list) and raw_pw:
mailu_app_password = str(raw_pw[0])
elif isinstance(raw_pw, str) and raw_pw:
mailu_app_password = raw_pw
raw_primary = attrs.get("nextcloud_mail_primary_email")
if isinstance(raw_primary, list) and raw_primary:
nextcloud_mail_primary_email = str(raw_primary[0])
elif isinstance(raw_primary, str) and raw_primary:
nextcloud_mail_primary_email = raw_primary
raw_count = attrs.get("nextcloud_mail_account_count")
if isinstance(raw_count, list) and raw_count:
nextcloud_mail_account_count = str(raw_count[0])
elif isinstance(raw_count, str) and raw_count:
nextcloud_mail_account_count = raw_count
raw_synced = attrs.get("nextcloud_mail_synced_at")
if isinstance(raw_synced, list) and raw_synced:
nextcloud_mail_synced_at = str(raw_synced[0])
elif isinstance(raw_synced, str) and raw_synced:
nextcloud_mail_synced_at = raw_synced
raw_wger_password = attrs.get("wger_password")
if isinstance(raw_wger_password, list) and raw_wger_password:
wger_password = str(raw_wger_password[0])
elif isinstance(raw_wger_password, str) and raw_wger_password:
wger_password = raw_wger_password
raw_wger_updated = attrs.get("wger_password_updated_at")
if isinstance(raw_wger_updated, list) and raw_wger_updated:
wger_password_updated_at = str(raw_wger_updated[0])
elif isinstance(raw_wger_updated, str) and raw_wger_updated:
wger_password_updated_at = raw_wger_updated
raw_firefly_password = attrs.get("firefly_password")
if isinstance(raw_firefly_password, list) and raw_firefly_password:
firefly_password = str(raw_firefly_password[0])
elif isinstance(raw_firefly_password, str) and raw_firefly_password:
firefly_password = raw_firefly_password
raw_firefly_updated = attrs.get("firefly_password_updated_at")
if isinstance(raw_firefly_updated, list) and raw_firefly_updated:
firefly_password_updated_at = str(raw_firefly_updated[0])
elif isinstance(raw_firefly_updated, str) and raw_firefly_updated:
firefly_password_updated_at = raw_firefly_updated
raw_vw_email = attrs.get("vaultwarden_email")
if isinstance(raw_vw_email, list) and raw_vw_email:
vaultwarden_email = str(raw_vw_email[0])
elif isinstance(raw_vw_email, str) and raw_vw_email:
vaultwarden_email = raw_vw_email
raw_vw_status = attrs.get("vaultwarden_status")
if isinstance(raw_vw_status, list) and raw_vw_status:
vaultwarden_status = str(raw_vw_status[0])
elif isinstance(raw_vw_status, str) and raw_vw_status:
vaultwarden_status = raw_vw_status
raw_vw_synced = attrs.get("vaultwarden_synced_at")
if isinstance(raw_vw_synced, list) and raw_vw_synced:
vaultwarden_synced_at = str(raw_vw_synced[0])
elif isinstance(raw_vw_synced, str) and raw_vw_synced:
vaultwarden_synced_at = raw_vw_synced
user_id = user.get("id") if isinstance(user, dict) else None
if user_id and (
not keycloak_email
or not mailu_email
or not mailu_app_password
or not wger_password
or not wger_password_updated_at
or not firefly_password
or not firefly_password_updated_at
or not vaultwarden_email
or not vaultwarden_status
or not vaultwarden_synced_at
):
full = admin_client().get_user(str(user_id))
if not keycloak_email:
keycloak_email = str(full.get("email") or "")
attrs = full.get("attributes") or {}
if isinstance(attrs, dict):
if not mailu_email:
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu and isinstance(raw_mailu[0], str):
mailu_email = raw_mailu[0]
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
if not mailu_app_password:
raw_pw = attrs.get("mailu_app_password")
if isinstance(raw_pw, list) and raw_pw:
mailu_app_password = str(raw_pw[0])
elif isinstance(raw_pw, str) and raw_pw:
mailu_app_password = raw_pw
if not nextcloud_mail_primary_email:
raw_primary = attrs.get("nextcloud_mail_primary_email")
if isinstance(raw_primary, list) and raw_primary:
nextcloud_mail_primary_email = str(raw_primary[0])
elif isinstance(raw_primary, str) and raw_primary:
nextcloud_mail_primary_email = raw_primary
if not nextcloud_mail_account_count:
raw_count = attrs.get("nextcloud_mail_account_count")
if isinstance(raw_count, list) and raw_count:
nextcloud_mail_account_count = str(raw_count[0])
elif isinstance(raw_count, str) and raw_count:
nextcloud_mail_account_count = raw_count
if not nextcloud_mail_synced_at:
raw_synced = attrs.get("nextcloud_mail_synced_at")
if isinstance(raw_synced, list) and raw_synced:
nextcloud_mail_synced_at = str(raw_synced[0])
elif isinstance(raw_synced, str) and raw_synced:
nextcloud_mail_synced_at = raw_synced
if not wger_password:
raw_wger_password = attrs.get("wger_password")
if isinstance(raw_wger_password, list) and raw_wger_password:
wger_password = str(raw_wger_password[0])
elif isinstance(raw_wger_password, str) and raw_wger_password:
wger_password = raw_wger_password
if not wger_password_updated_at:
raw_wger_updated = attrs.get("wger_password_updated_at")
if isinstance(raw_wger_updated, list) and raw_wger_updated:
wger_password_updated_at = str(raw_wger_updated[0])
elif isinstance(raw_wger_updated, str) and raw_wger_updated:
wger_password_updated_at = raw_wger_updated
if not firefly_password:
raw_firefly_password = attrs.get("firefly_password")
if isinstance(raw_firefly_password, list) and raw_firefly_password:
firefly_password = str(raw_firefly_password[0])
elif isinstance(raw_firefly_password, str) and raw_firefly_password:
firefly_password = raw_firefly_password
if not firefly_password_updated_at:
raw_firefly_updated = attrs.get("firefly_password_updated_at")
if isinstance(raw_firefly_updated, list) and raw_firefly_updated:
firefly_password_updated_at = str(raw_firefly_updated[0])
elif isinstance(raw_firefly_updated, str) and raw_firefly_updated:
firefly_password_updated_at = raw_firefly_updated
if not vaultwarden_email:
raw_vw_email = attrs.get("vaultwarden_email")
if isinstance(raw_vw_email, list) and raw_vw_email:
vaultwarden_email = str(raw_vw_email[0])
elif isinstance(raw_vw_email, str) and raw_vw_email:
vaultwarden_email = raw_vw_email
if not vaultwarden_status:
raw_vw_status = attrs.get("vaultwarden_status")
if isinstance(raw_vw_status, list) and raw_vw_status:
vaultwarden_status = str(raw_vw_status[0])
elif isinstance(raw_vw_status, str) and raw_vw_status:
vaultwarden_status = raw_vw_status
if not vaultwarden_synced_at:
raw_vw_synced = attrs.get("vaultwarden_synced_at")
if isinstance(raw_vw_synced, list) and raw_vw_synced:
vaultwarden_synced_at = str(raw_vw_synced[0])
elif isinstance(raw_vw_synced, str) and raw_vw_synced:
vaultwarden_synced_at = raw_vw_synced
except Exception:
mailu_status = "unavailable"
nextcloud_mail_status = "unavailable"
wger_status = "unavailable"
firefly_status = "unavailable"
vaultwarden_status = "unavailable"
jellyfin_status = "unavailable"
jellyfin_sync_status = "unknown"
jellyfin_sync_detail = "unavailable"
mailu_username = mailu_email or (f"{username}@{settings.MAILU_DOMAIN}" if username else "")
firefly_username = mailu_username
vaultwarden_username = vaultwarden_email or mailu_username
if not mailu_app_password and mailu_status == "ready":
mailu_status = "needs app password"
if not wger_password and wger_status == "ready":
wger_status = "needs provisioning"
if not firefly_password and firefly_status == "ready":
firefly_status = "needs provisioning"
if nextcloud_mail_status == "unknown":
try:
count_val = int(nextcloud_mail_account_count) if nextcloud_mail_account_count else 0
except ValueError:
count_val = 0
if count_val > 0:
nextcloud_mail_status = "ready"
else:
nextcloud_mail_status = "needs sync"
if jellyfin_status == "ready":
ldap_reachable = _tcp_check(
settings.JELLYFIN_LDAP_HOST,
settings.JELLYFIN_LDAP_PORT,
settings.JELLYFIN_LDAP_CHECK_TIMEOUT_SEC,
)
if not ldap_reachable:
jellyfin_sync_status = "degraded"
jellyfin_sync_detail = "LDAP unreachable"
elif not jellyfin_user_is_ldap:
jellyfin_sync_status = "degraded"
jellyfin_sync_detail = "Keycloak user is not LDAP-backed"
else:
jellyfin_sync_status = "ok"
jellyfin_sync_detail = "LDAP-backed (Keycloak is source of truth)"
if not vaultwarden_status:
vaultwarden_status = "needs provisioning"
if settings.PORTAL_DATABASE_URL and username:
request_code = ""
try:
with connect() as conn:
row = conn.execute(
"SELECT request_code FROM access_requests WHERE username = %s ORDER BY created_at DESC LIMIT 1",
(username,),
).fetchone()
if not row and keycloak_email:
row = conn.execute(
"SELECT request_code FROM access_requests WHERE contact_email = %s ORDER BY created_at DESC LIMIT 1",
(keycloak_email,),
).fetchone()
if row and isinstance(row, dict):
request_code = str(row.get("request_code") or "").strip()
except Exception:
request_code = ""
if request_code:
onboarding_url = f"{settings.PORTAL_PUBLIC_BASE_URL}/onboarding?code={quote(request_code)}"
return jsonify(
{
"user": {"username": username, "email": keycloak_email, "groups": g.keycloak_groups},
"onboarding_url": onboarding_url,
"mailu": {"status": mailu_status, "username": mailu_username, "app_password": mailu_app_password},
"nextcloud_mail": {
"status": nextcloud_mail_status,
"primary_email": nextcloud_mail_primary_email,
"account_count": nextcloud_mail_account_count,
"synced_at": nextcloud_mail_synced_at,
},
"wger": {
"status": wger_status,
"username": username,
"password": wger_password,
"password_updated_at": wger_password_updated_at,
},
"firefly": {
"status": firefly_status,
"username": firefly_username,
"password": firefly_password,
"password_updated_at": firefly_password_updated_at,
},
"vaultwarden": {
"status": vaultwarden_status,
"username": vaultwarden_username,
"synced_at": vaultwarden_synced_at,
},
"jellyfin": {
"status": jellyfin_status,
"username": username,
"sync_status": jellyfin_sync_status,
"sync_detail": jellyfin_sync_detail,
},
}
)
@app.route("/api/account/mailu/rotate", methods=["POST"])
@require_auth
def account_mailu_rotate() -> Any:
ok, resp = require_account_access()
if not ok:
return resp
if not admin_client().ready():
return jsonify({"error": "server not configured"}), 503
username = g.keycloak_username
if not username:
return jsonify({"error": "missing username"}), 400
password = random_password()
try:
admin_client().set_user_attribute(username, "mailu_app_password", password)
except Exception:
return jsonify({"error": "failed to update mail password"}), 502
sync_enabled = bool(settings.MAILU_SYNC_URL)
sync_ok = False
sync_error = ""
if sync_enabled:
try:
with httpx.Client(timeout=30) as client:
resp = client.post(
settings.MAILU_SYNC_URL,
json={"ts": int(time.time()), "wait": True, "reason": "portal_mailu_rotate"},
)
sync_ok = resp.status_code == 200
if not sync_ok:
sync_error = f"sync status {resp.status_code}"
except Exception:
sync_error = "sync request failed"
nextcloud_sync: dict[str, Any] = {"status": "skipped"}
try:
nextcloud_sync = trigger_nextcloud_mail_sync(username, wait=True)
except Exception:
nextcloud_sync = {"status": "error"}
return jsonify(
{
"password": password,
"sync_enabled": sync_enabled,
"sync_ok": sync_ok,
"sync_error": sync_error,
"nextcloud_sync": nextcloud_sync,
}
)
@app.route("/api/account/wger/reset", methods=["POST"])
@require_auth
def account_wger_reset() -> Any:
ok, resp = require_account_access()
if not ok:
return resp
if not admin_client().ready():
return jsonify({"error": "server not configured"}), 503
username = g.keycloak_username
if not username:
return jsonify({"error": "missing username"}), 400
keycloak_email = g.keycloak_email or ""
mailu_email = ""
try:
user = admin_client().find_user(username) or {}
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
mailu_email = str(raw_mailu[0])
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
except Exception:
pass
email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}"
password = random_password()
try:
result = trigger_wger_user_sync(username, email, password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"wger sync {status_val}")
except Exception as exc:
message = str(exc).strip() or "wger sync failed"
return jsonify({"error": message}), 502
try:
admin_client().set_user_attribute(username, "wger_password", password)
admin_client().set_user_attribute(
username,
"wger_password_updated_at",
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
return jsonify({"error": "failed to store wger password"}), 502
return jsonify({"status": "ok", "password": password})
@app.route("/api/account/firefly/reset", methods=["POST"])
@require_auth
def account_firefly_reset() -> Any:
ok, resp = require_account_access()
if not ok:
return resp
if not admin_client().ready():
return jsonify({"error": "server not configured"}), 503
username = g.keycloak_username
if not username:
return jsonify({"error": "missing username"}), 400
keycloak_email = g.keycloak_email or ""
mailu_email = ""
try:
user = admin_client().find_user(username) or {}
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
mailu_email = str(raw_mailu[0])
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
except Exception:
pass
email = mailu_email or f"{username}@{settings.MAILU_DOMAIN}"
password = random_password(24)
try:
result = trigger_firefly_user_sync(username, email, password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
except Exception as exc:
message = str(exc).strip() or "firefly sync failed"
return jsonify({"error": message}), 502
try:
admin_client().set_user_attribute(username, "firefly_password", password)
admin_client().set_user_attribute(
username,
"firefly_password_updated_at",
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
return jsonify({"error": "failed to store firefly password"}), 502
return jsonify({"status": "ok", "password": password})
@app.route("/api/account/nextcloud/mail/sync", methods=["POST"])
@require_auth
def account_nextcloud_mail_sync() -> Any:
ok, resp = require_account_access()
if not ok:
return resp
if not admin_client().ready():
return jsonify({"error": "server not configured"}), 503
username = g.keycloak_username
if not username:
return jsonify({"error": "missing username"}), 400
payload = request.get_json(silent=True) or {}
wait = bool(payload.get("wait", True))
try:
result = trigger_nextcloud_mail_sync(username, wait=wait)
return jsonify(result)
except Exception:
return jsonify({"error": "failed to sync nextcloud mail"}), 502