105 lines
3.7 KiB
Python
105 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from flask import jsonify, request
|
|
|
|
from .. import ariadne_client
|
|
from ..keycloak import require_auth, require_account_access
|
|
|
|
|
|
def _client_ip() -> str:
    """Return the best-guess client IP address for the current request.

    Proxy headers are consulted in priority order (``CF-Connecting-IP``,
    ``X-Forwarded-For``, ``X-Real-IP``). A header value may be a
    comma-separated list; the first non-empty entry wins. When no header
    yields an address, ``request.remote_addr`` is returned, or ``""`` if
    that is unset too.
    """
    # NOTE(review): these headers are only trustworthy when a fronting proxy
    # sets or overwrites them -- confirm against the deployment.
    for header in ("CF-Connecting-IP", "X-Forwarded-For", "X-Real-IP"):
        raw = request.headers.get(header, "")
        for entry in raw.split(","):
            candidate = entry.strip()
            # Skip empty list entries (e.g. ", 10.0.0.1") so a malformed
            # header never makes us return an empty address.
            if candidate:
                return candidate
    return request.remote_addr or ""
|
|
|
|
|
|
def _json_payload() -> dict[str, Any]:
    """Return the request's JSON body if it is an object, else an empty dict.

    The body is read with ``request.get_json(silent=True)``; any result that
    is not a ``dict`` (including ``None``) is replaced by ``{}``.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return {}
    return body
|
|
|
|
|
|
def _require_account() -> tuple[bool, Any]:
    """Check the preconditions shared by every Wolf account route.

    Returns ``(True, None)`` when ``require_account_access()`` succeeds and
    ``ariadne_client.enabled()`` is truthy. Otherwise returns ``(False, resp)``
    where ``resp`` is either the response produced by
    ``require_account_access()`` or a 503 JSON error when Ariadne is not
    configured.
    """
    allowed, denial = require_account_access()
    if allowed:
        if ariadne_client.enabled():
            return True, None
        unavailable = jsonify({"error": "ariadne not configured"})
        return False, (unavailable, 503)
    # Access check failed: hand back whatever response it produced.
    return False, denial
|
|
|
|
|
|
def register_account_wolf(app) -> None:
    """Attach the Wolf/Moonlight self-service account routes to ``app``.

    Every view is wrapped in ``require_auth`` and must pass the
    ``_require_account`` gate before the call is relayed through
    ``ariadne_client.proxy``; when the gate fails its response is returned
    unchanged.
    """

    # --- Status -----------------------------------------------------------

    @app.route("/api/account/wolf", methods=["GET"])
    @require_auth
    def account_wolf_status() -> Any:
        """Proxy the game-stream status query, tagged with the caller's IP."""
        granted, denial = _require_account()
        if not granted:
            return denial
        query = {"source_ip": _client_ip()}
        return ariadne_client.proxy("GET", "/api/game-stream/status", params=query)

    # --- Firewall ---------------------------------------------------------

    @app.route("/api/account/wolf/firewall/unlock", methods=["POST"])
    @require_auth
    def account_wolf_firewall_unlock() -> Any:
        """Proxy a firewall unlock; ``ip`` is always set to the caller's IP."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        # Any client-supplied "ip" is overwritten with the detected address.
        body["ip"] = _client_ip()
        return ariadne_client.proxy("POST", "/api/game-stream/firewall/unlock", payload=body)

    @app.route("/api/account/wolf/firewall/revoke", methods=["POST"])
    @require_auth
    def account_wolf_firewall_revoke() -> Any:
        """Proxy a firewall revoke for the body's ``ip`` or the caller's IP."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        # Unlike unlock, a truthy client-supplied "ip" is kept as-is.
        if not body.get("ip"):
            body["ip"] = _client_ip()
        return ariadne_client.proxy("POST", "/api/game-stream/firewall/revoke", payload=body)

    # --- Pairing ----------------------------------------------------------

    @app.route("/api/account/wolf/pairing/status", methods=["GET"])
    @require_auth
    def account_wolf_pairing_status() -> Any:
        """Proxy the pairing status query, tagged with the caller's IP."""
        granted, denial = _require_account()
        if not granted:
            return denial
        query = {"source_ip": _client_ip()}
        return ariadne_client.proxy("GET", "/api/game-stream/pairing/status", params=query)

    @app.route("/api/account/wolf/pairing/submit-pin", methods=["POST"])
    @require_auth
    def account_wolf_pairing_submit_pin() -> Any:
        """Proxy a pairing PIN submission; ``source_ip`` is the caller's IP."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        body["source_ip"] = _client_ip()
        return ariadne_client.proxy("POST", "/api/game-stream/pairing/submit-pin", payload=body)

    # --- Game mode / admin ------------------------------------------------
    # NOTE(review): the views below forward to "/api/admin/..." upstream paths
    # after only the account gate -- confirm the upstream enforces admin rights.

    @app.route("/api/account/wolf/game-mode/start", methods=["POST"])
    @require_auth
    def account_wolf_game_mode_start() -> Any:
        """Proxy a game-mode start request with the JSON body unchanged."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        return ariadne_client.proxy("POST", "/api/admin/game-mode/start", payload=body)

    @app.route("/api/account/wolf/game-mode/stop", methods=["POST"])
    @require_auth
    def account_wolf_game_mode_stop() -> Any:
        """Proxy a game-mode stop request with the JSON body unchanged."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        return ariadne_client.proxy("POST", "/api/admin/game-mode/stop", payload=body)

    @app.route("/api/account/wolf/admin/firewall/unlock", methods=["POST"])
    @require_auth
    def account_wolf_admin_firewall_unlock() -> Any:
        """Proxy an admin firewall unlock with the JSON body unchanged."""
        granted, denial = _require_account()
        if not granted:
            return denial
        body = _json_payload()
        return ariadne_client.proxy("POST", "/api/admin/game-stream/firewall/unlock", payload=body)
|