from __future__ import annotations import ipaddress from typing import Any from flask import jsonify, request from .. import ariadne_client from ..keycloak import require_auth, require_account_access def _valid_ip(value: str) -> str: return str(ipaddress.ip_address(value.strip())) def _public_ip_from_chain(values: list[str]) -> str: """Return the nearest public client IP from a trusted proxy chain.""" candidates: list[str] = [] for value in values: for item in value.split(","): item = item.strip() if item: candidates.append(item) for candidate in reversed(candidates): try: ip = ipaddress.ip_address(candidate) except ValueError: continue if ip.is_global: return str(ip) for candidate in reversed(candidates): try: return _valid_ip(candidate) except ValueError: continue return "" def _client_ip() -> str: # Walk right-to-left through proxy-added headers. This ignores spoofed # left-most values while still finding the public IP before cluster hops. chain = [ request.headers.get("X-Forwarded-For", ""), request.headers.get("X-Real-IP", ""), request.remote_addr or "", ] return _public_ip_from_chain(chain) def _public_payload_ip(payload: dict[str, Any] | None = None) -> str: values = [] if payload: value = payload.get("ip") or payload.get("source_ip") if isinstance(value, str): values.append(value) query_ip = request.args.get("source_ip", "") if query_ip: values.append(query_ip) for value in values: try: ip = ipaddress.ip_address(value.strip()) except ValueError: continue if ip.is_global: return str(ip) return "" def _source_ip(payload: dict[str, Any] | None = None) -> str: return _public_payload_ip(payload) or _client_ip() def _json_payload() -> dict[str, Any]: payload = request.get_json(silent=True) return payload if isinstance(payload, dict) else {} def _require_account() -> tuple[bool, Any]: ok, resp = require_account_access() if not ok: return False, resp if not ariadne_client.enabled(): return False, (jsonify({"error": "ariadne not configured"}), 503) return True, None def register_account_wolf(app) -> None: """Register Wolf/Moonlight self-service account routes.""" @app.route("/api/account/wolf", methods=["GET"]) @require_auth def account_wolf_status() -> Any: ok, resp = _require_account() if not ok: return resp return ariadne_client.proxy("GET", "/api/game-stream/status", params={"source_ip": _source_ip()}) @app.route("/api/account/wolf/firewall/unlock", methods=["POST"]) @require_auth def account_wolf_firewall_unlock() -> Any: ok, resp = _require_account() if not ok: return resp payload = _json_payload() payload["ip"] = _source_ip(payload) return ariadne_client.proxy("POST", "/api/game-stream/firewall/unlock", payload=payload) @app.route("/api/account/wolf/firewall/revoke", methods=["POST"]) @require_auth def account_wolf_firewall_revoke() -> Any: ok, resp = _require_account() if not ok: return resp payload = _json_payload() payload["ip"] = _source_ip(payload) return ariadne_client.proxy("POST", "/api/game-stream/firewall/revoke", payload=payload) @app.route("/api/account/wolf/pairing/status", methods=["GET"]) @require_auth def account_wolf_pairing_status() -> Any: ok, resp = _require_account() if not ok: return resp return ariadne_client.proxy("GET", "/api/game-stream/pairing/status", params={"source_ip": _source_ip()}) @app.route("/api/account/wolf/pairing/submit-pin", methods=["POST"]) @require_auth def account_wolf_pairing_submit_pin() -> Any: ok, resp = _require_account() if not ok: return resp payload = _json_payload() payload["source_ip"] = _source_ip(payload) return ariadne_client.proxy("POST", "/api/game-stream/pairing/submit-pin", payload=payload) @app.route("/api/account/wolf/game-mode/start", methods=["POST"]) @require_auth def account_wolf_game_mode_start() -> Any: ok, resp = _require_account() if not ok: return resp return ariadne_client.proxy("POST", "/api/admin/game-mode/start", payload=_json_payload()) @app.route("/api/account/wolf/game-mode/stop", methods=["POST"]) @require_auth def account_wolf_game_mode_stop() -> Any: ok, resp = _require_account() if not ok: return resp return ariadne_client.proxy("POST", "/api/admin/game-mode/stop", payload=_json_payload()) @app.route("/api/account/wolf/admin/firewall/unlock", methods=["POST"]) @require_auth def account_wolf_admin_firewall_unlock() -> Any: ok, resp = _require_account() if not ok: return resp return ariadne_client.proxy("POST", "/api/admin/game-stream/firewall/unlock", payload=_json_payload())