ariadne/tests/unit/app/test_app_game_routes.py

586 lines
25 KiB
Python
Raw Normal View History

import asyncio
import pytest
from ariadne import app_game_routes
from tests.unit.app.app_route_helpers import *
2026-05-21 15:53:17 -03:00
class DummyWolfApi:
    """In-memory stand-in for the Wolf API client whose calls all succeed.

    Every query method returns a fixed payload; ``pair_client`` additionally
    records its arguments on ``self.paired`` so tests can inspect them.
    """

    def __init__(self) -> None:
        # One (pair_secret, pin) tuple per pair_client() call, in call order.
        self.paired: list[tuple[str, str]] = []

    def enabled(self) -> bool:
        """Report the fake API as switched on."""
        return True

    def clients(self) -> dict:
        """Return a canned payload listing a single client."""
        deck = {"client_id": "abc123456", "hostname": "Moonlight Deck"}
        return {"success": True, "clients": [deck]}

    def pending_pair_requests(self) -> dict:
        """Return a canned payload with one pending pair request from 1.2.3.4."""
        laptop = {"pair_secret": "secret-1", "client_ip": "1.2.3.4", "hostname": "Laptop"}
        return {"success": True, "requests": [laptop]}

    def sessions(self) -> dict:
        """Return a canned payload with one session."""
        return {"success": True, "sessions": [{"id": "session-1"}]}

    def apps(self) -> dict:
        """Return a canned payload with one app."""
        return {"success": True, "apps": [{"name": "Steam"}]}

    def pair_client(self, pair_secret: str, pin: str) -> dict:
        """Remember the submitted secret/PIN pair and report success."""
        submission = (pair_secret, pin)
        self.paired.append(submission)
        return {"success": True}
class DummyGatekeeper:
def __init__(self) -> None:
self.unlocks: list[tuple[str, int, str, str | None]] = []
def enabled(self) -> bool:
return True
def status(self):
return {"success": True, "active_unlocks": [{"ip": "1.2.3.4", "expires_in_seconds": 300}]}
def unlock(self, ip: str, ttl_seconds: int, actor: str, target_user: str | None = None):
self.unlocks.append((ip, ttl_seconds, actor, target_user))
return {"success": True, "ip": ip, "ttl_seconds": ttl_seconds, "target_user": target_user}
def revoke(self, ip: str):
return {"success": True, "revoked": ip}
class DisabledService:
    """Stand-in for a backend that is switched off; exposes only ``enabled``."""

    def enabled(self) -> bool:
        """Always report the service as disabled."""
        return False
class FailingWolfApi(DummyWolfApi):
    """Wolf API double whose query and pairing calls all raise ``RuntimeError``.

    ``enabled`` is inherited from ``DummyWolfApi``, so the service still
    reports itself as available.
    """

    def clients(self):
        """Raise to simulate the client listing being unreachable."""
        failure = RuntimeError("wolf down")
        raise failure

    def pending_pair_requests(self):
        """Raise to simulate the pending-request listing being unreachable."""
        failure = RuntimeError("wolf down")
        raise failure

    def sessions(self):
        """Raise to simulate the session listing being unreachable."""
        failure = RuntimeError("wolf down")
        raise failure

    def apps(self):
        """Raise to simulate the app listing being unreachable."""
        failure = RuntimeError("wolf down")
        raise failure

    def pair_client(self, pair_secret: str, pin: str):
        """Raise to simulate a pairing failure; nothing is recorded."""
        failure = RuntimeError("pair down")
        raise failure
class FailingPendingWolfApi(DummyWolfApi):
    """Wolf API double where only the pending-pair-request lookup raises."""

    def pending_pair_requests(self):
        """Raise ``RuntimeError`` instead of returning pending requests."""
        failure = RuntimeError("pending down")
        raise failure
class FailingGatekeeper(DummyGatekeeper):
    """Gatekeeper double whose status, unlock and revoke calls all raise.

    ``enabled`` is inherited from ``DummyGatekeeper`` and still returns True.
    """

    def status(self):
        """Raise ``RuntimeError`` instead of returning a status payload."""
        failure = RuntimeError("gatekeeper down")
        raise failure

    def unlock(self, ip: str, ttl_seconds: int, actor: str, target_user: str | None = None):
        """Raise ``RuntimeError``; the unlock is not recorded."""
        failure = RuntimeError("unlock down")
        raise failure

    def revoke(self, ip: str):
        """Raise ``RuntimeError`` instead of confirming the revoke."""
        failure = RuntimeError("revoke down")
        raise failure
def test_game_stream_helper_edges() -> None:
    """Pin edge-case results of the private helpers in ``app_game_routes``."""
    # A comma-separated value with surrounding whitespace is expected to
    # reduce to its first, trimmed entry.
    assert app_game_routes._clean_ip(" 127.0.0.1, proxy") == "127.0.0.1"

    # Both an empty string and a non-IP string must be rejected with HTTP 400.
    for rejected_value in ("", "not-an-ip"):
        with pytest.raises(HTTPException) as rejection:
            app_game_routes._clean_ip(rejected_value)
        assert rejection.value.status_code == 400

    # A non-list value under the requested key is expected to yield [].
    assert app_game_routes._as_list({"clients": "nope"}, ("clients",)) == []

    summarized_clients = app_game_routes._summarize_clients({"clients": [{"client_id": "abcdef123456"}]})
    assert summarized_clients[0]["name"] == "paired-123456"

    # NOTE(review): the "paired-ding-1" fallback name looks unusual; it is
    # pinned here exactly as the original test expects - confirm it is intended.
    expected_pending = [{"name": "paired-ding-1", "client_ip": "", "raw": {}, "pair_secret": ""}]
    assert app_game_routes._summarize_pending({"requests": [{}]}) == expected_pending

    assert app_game_routes._gpu_priority({"active": True}) == "wolf"
    assert app_game_routes._gpu_priority({"active": False, "workloads": []}) == "unknown"

    split_workloads = {"active": False, "workloads": [{"effective_replicas": 0}, {"effective_replicas": 1}]}
    assert app_game_routes._gpu_priority(split_workloads) == "mixed"
def test_game_stream_dashboard(monkeypatch) -> None:
    """An admin GET of ``/`` returns 200 and the expected dashboard markers."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)

    page = http.get("/", headers={"Authorization": "Bearer token"})

    assert page.status_code == 200
    markup = page.text
    assert "Wolf Game Stream" in markup
    # The literal "${kind}" placeholder must appear verbatim in the page body.
    assert "/api/admin/game-mode/${kind}" in markup
def test_game_stream_profile_me(monkeypatch) -> None:
    """A ``game-stream-users`` member is reported as allowed by the profile route."""
    member = AuthContext(username="brad", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)

    profile = http.get("/api/game-stream/me", headers={"Authorization": "Bearer token"})

    assert profile.status_code == 200
    payload = profile.json()
    assert payload["allowed"] is True
2026-05-21 15:53:17 -03:00
def test_game_stream_status_for_user(monkeypatch) -> None:
    """A regular stream user sees the combined status but cannot control the GPU."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)

    def _idle_game_mode() -> dict:
        # Game mode is idle while one workload still has a running replica.
        return {
            "status": "idle",
            "active": False,
            "node": "titan-24",
            "game": "unknown",
            "workloads": [{"namespace": "openclaw", "name": "openclaw-ollama", "effective_replicas": 1}],
        }

    monkeypatch.setattr(app_module, "wolf_api", DummyWolfApi())
    monkeypatch.setattr(app_module, "wolf_gatekeeper", DummyGatekeeper())
    monkeypatch.setattr(app_module.game_mode, "status", _idle_game_mode)

    response = http.get(
        "/api/game-stream/status?source_ip=1.2.3.4",
        headers={"Authorization": "Bearer token"},
    )
    body = response.json()

    assert response.status_code == 200
    assert body["can_control_gpu"] is False
    assert body["gpu"]["priority"] == "ai"
    assert body["moonlight"]["host"] == app_module.settings.game_stream_moonlight_host
    wolf_section = body["wolf"]
    assert wolf_section["clients"][0]["name"] == "Moonlight Deck"
    assert wolf_section["pending_pair_requests"][0]["pair_secret"] == "secret-1"
def test_game_stream_status_edges(monkeypatch) -> None:
    """Status still answers 200 when backends are disabled or raising."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)

    def _boom():
        raise RuntimeError("boom")

    # Phase 1: both backends disabled and the game-mode status lookup raising.
    monkeypatch.setattr(app_module, "wolf_api", DisabledService())
    monkeypatch.setattr(app_module, "wolf_gatekeeper", DisabledService())
    monkeypatch.setattr(app_module.game_mode, "status", _boom)
    degraded = http.get(
        "/api/game-stream/status",
        headers={"Authorization": "Bearer token", "x-portal-client-ip": "1.2.3.4"},
    )
    body = degraded.json()
    assert degraded.status_code == 200
    # The client-ip header is sent, yet the reported source_ip must stay empty.
    assert body["moonlight"]["source_ip"] == ""
    assert body["gpu"]["game_mode"]["status"] == "unavailable"
    assert body["wolf"]["api_enabled"] is False
    assert body["firewall"]["enabled"] is False

    # Phase 2: an enabled Wolf API whose calls raise must surface errors in the body.
    monkeypatch.setattr(app_module, "wolf_api", FailingWolfApi())
    erroring = http.get(
        "/api/game-stream/status?source_ip=1.2.3.4",
        headers={"Authorization": "Bearer token"},
    )
    assert erroring.status_code == 200
    assert erroring.json()["wolf"]["errors"]
def test_game_stream_status_rejects_forbidden_and_bad_ip(monkeypatch) -> None:
    """Status returns 403 for a user outside the stream group and 400 for a bad IP."""
    bearer = {"Authorization": "Bearer token"}

    outsider = AuthContext(username="viewer", email="", groups=["other"], claims={})
    outsider_http = _client(monkeypatch, outsider)
    denied = outsider_http.get("/api/game-stream/status", headers=bearer)
    assert denied.status_code == 403

    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    member_http = _client(monkeypatch, member)
    malformed = member_http.get("/api/game-stream/status?source_ip=bad-ip", headers=bearer)
    assert malformed.status_code == 400
def test_game_stream_firewall_status(monkeypatch) -> None:
    """Firewall status exposes the gatekeeper's active unlock list."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    monkeypatch.setattr(app_module, "wolf_gatekeeper", DummyGatekeeper())

    response = http.get("/api/game-stream/firewall/status", headers={"Authorization": "Bearer token"})

    assert response.status_code == 200
    first_unlock = response.json()["active_unlocks"][0]
    assert first_unlock["ip"] == "1.2.3.4"
def test_game_stream_firewall_status_errors(monkeypatch) -> None:
    """Firewall status returns 503 when disabled and 502 when the gatekeeper raises."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/firewall/status"
    bearer = {"Authorization": "Bearer token"}

    monkeypatch.setattr(app_module, "wolf_gatekeeper", DisabledService())
    when_disabled = http.get(route, headers=bearer)
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_gatekeeper", FailingGatekeeper())
    when_failing = http.get(route, headers=bearer)
    assert when_failing.status_code == 502
2026-05-21 15:53:17 -03:00
def test_game_stream_firewall_unlocks(monkeypatch) -> None:
    """A user unlock with an explicit IP uses the default TTL and targets the caller."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    recorder = DummyGatekeeper()
    monkeypatch.setattr(app_module, "wolf_gatekeeper", recorder)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    response = http.post(
        "/api/game-stream/firewall/unlock",
        headers={"Authorization": "Bearer token"},
        json={"ip": "1.2.3.4"},
    )

    assert response.status_code == 200
    default_ttl = app_module.settings.game_stream_firewall_unlock_ttl_sec
    assert recorder.unlocks == [("1.2.3.4", default_ttl, "olya", "olya")]
def test_game_stream_firewall_unlock_uses_header_and_clamps_ttl(monkeypatch) -> None:
    """Without a body IP the header address is unlocked; a non-numeric TTL yields the default."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    recorder = DummyGatekeeper()
    monkeypatch.setattr(app_module, "wolf_gatekeeper", recorder)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    request_headers = {"Authorization": "Bearer token", "x-portal-client-ip": "5.6.7.8"}
    response = http.post(
        "/api/game-stream/firewall/unlock",
        headers=request_headers,
        json={"ttl_seconds": "bad"},
    )

    assert response.status_code == 200
    default_ttl = app_module.settings.game_stream_firewall_unlock_ttl_sec
    assert recorder.unlocks == [("5.6.7.8", default_ttl, "olya", "olya")]
def test_game_stream_firewall_unlock_errors(monkeypatch) -> None:
    """User unlock returns 503 when disabled and 502 when the gatekeeper raises."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/firewall/unlock"
    bearer = {"Authorization": "Bearer token"}

    monkeypatch.setattr(app_module, "wolf_gatekeeper", DisabledService())
    when_disabled = http.post(route, headers=bearer, json={"ip": "1.2.3.4"})
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_gatekeeper", FailingGatekeeper())
    when_failing = http.post(route, headers=bearer, json={"ip": "1.2.3.4"})
    assert when_failing.status_code == 502
2026-05-21 15:53:17 -03:00
def test_game_stream_admin_manual_unlock(monkeypatch) -> None:
    """An admin unlock records the admin as actor and the named user as target."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)
    recorder = DummyGatekeeper()
    monkeypatch.setattr(app_module, "wolf_gatekeeper", recorder)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    payload = {"ip": "5.6.7.8", "target_user": "olya"}
    response = http.post(
        "/api/admin/game-stream/firewall/unlock",
        headers={"Authorization": "Bearer token"},
        json=payload,
    )

    assert response.status_code == 200
    default_ttl = app_module.settings.game_stream_firewall_unlock_ttl_sec
    assert recorder.unlocks == [("5.6.7.8", default_ttl, "bstein", "olya")]
def test_game_stream_admin_manual_unlock_edges(monkeypatch) -> None:
    """Admin unlock: bad TTL yields the default, disabled gives 503, failure gives 502."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)
    recorder = DummyGatekeeper()
    monkeypatch.setattr(app_module, "wolf_gatekeeper", recorder)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    route = "/api/admin/game-stream/firewall/unlock"
    bearer = {"Authorization": "Bearer token"}

    # A non-numeric TTL is accepted; no target_user in the body records None.
    bad_ttl = http.post(route, headers=bearer, json={"ip": "5.6.7.8", "ttl_seconds": "bad"})
    assert bad_ttl.status_code == 200
    default_ttl = app_module.settings.game_stream_firewall_unlock_ttl_sec
    assert recorder.unlocks[-1] == ("5.6.7.8", default_ttl, "bstein", None)

    monkeypatch.setattr(app_module, "wolf_gatekeeper", DisabledService())
    when_disabled = http.post(route, headers=bearer, json={"ip": "5.6.7.8"})
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_gatekeeper", FailingGatekeeper())
    when_failing = http.post(route, headers=bearer, json={"ip": "5.6.7.8"})
    assert when_failing.status_code == 502
def test_game_stream_firewall_revoke(monkeypatch) -> None:
    """Revoking an IP returns 200 and echoes the revoked address."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    monkeypatch.setattr(app_module, "wolf_gatekeeper", DummyGatekeeper())

    response = http.post(
        "/api/game-stream/firewall/revoke",
        headers={"Authorization": "Bearer token"},
        json={"ip": "1.2.3.4"},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["revoked"] == "1.2.3.4"
def test_game_stream_firewall_revoke_errors(monkeypatch) -> None:
    """Revoke returns 503 when disabled and 502 when the gatekeeper raises."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/firewall/revoke"
    bearer = {"Authorization": "Bearer token"}

    monkeypatch.setattr(app_module, "wolf_gatekeeper", DisabledService())
    when_disabled = http.post(route, headers=bearer, json={"ip": "1.2.3.4"})
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_gatekeeper", FailingGatekeeper())
    when_failing = http.post(route, headers=bearer, json={"ip": "1.2.3.4"})
    assert when_failing.status_code == 502
def test_game_stream_pairing_status(monkeypatch) -> None:
    """Pairing status lists the pending request and the known client by hostname."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    monkeypatch.setattr(app_module, "wolf_api", DummyWolfApi())

    response = http.get(
        "/api/game-stream/pairing/status?source_ip=1.2.3.4",
        headers={"Authorization": "Bearer token"},
    )
    body = response.json()

    assert response.status_code == 200
    first_pending = body["pending_pair_requests"][0]
    assert first_pending["name"] == "Laptop"
    first_client = body["clients"][0]
    assert first_client["name"] == "Moonlight Deck"
def test_game_stream_pairing_status_errors(monkeypatch) -> None:
    """Pairing status returns 503 when Wolf is disabled and 502 when it raises."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/pairing/status"
    bearer = {"Authorization": "Bearer token"}

    monkeypatch.setattr(app_module, "wolf_api", DisabledService())
    when_disabled = http.get(route, headers=bearer)
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_api", FailingWolfApi())
    when_failing = http.get(route, headers=bearer)
    assert when_failing.status_code == 502
def test_game_stream_clients(monkeypatch) -> None:
    """The clients route returns the known client named by its hostname."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    monkeypatch.setattr(app_module, "wolf_api", DummyWolfApi())

    response = http.get("/api/game-stream/clients", headers={"Authorization": "Bearer token"})

    assert response.status_code == 200
    first_client = response.json()["clients"][0]
    assert first_client["name"] == "Moonlight Deck"
def test_game_stream_clients_errors(monkeypatch) -> None:
    """The clients route returns 503 when Wolf is disabled and 502 when it raises."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/clients"
    bearer = {"Authorization": "Bearer token"}

    monkeypatch.setattr(app_module, "wolf_api", DisabledService())
    when_disabled = http.get(route, headers=bearer)
    assert when_disabled.status_code == 503

    monkeypatch.setattr(app_module, "wolf_api", FailingWolfApi())
    when_failing = http.get(route, headers=bearer)
    assert when_failing.status_code == 502
2026-05-21 15:53:17 -03:00
def test_game_stream_pairing_submit_pin(monkeypatch) -> None:
    """A user submitting a PIN from the pending request's IP gets paired."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    wolf_double = DummyWolfApi()
    monkeypatch.setattr(app_module, "wolf_api", wolf_double)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    # source_ip matches the client_ip of the dummy's pending "secret-1" request.
    submission = {"source_ip": "1.2.3.4", "pair_secret": "secret-1", "pin": "1234"}
    response = http.post(
        "/api/game-stream/pairing/submit-pin",
        headers={"Authorization": "Bearer token"},
        json=submission,
    )

    assert response.status_code == 200
    assert wolf_double.paired == [("secret-1", "1234")]
def test_game_stream_pairing_submit_pin_admin_bypass(monkeypatch) -> None:
    """An admin can pair with a secret that is not pending and without a source IP."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)
    wolf_double = DummyWolfApi()
    monkeypatch.setattr(app_module, "wolf_api", wolf_double)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    # This secret is not among the dummy's pending requests, and no source_ip is sent.
    submission = {"pair_secret": "secret-without-ip-check", "pin": "1234"}
    response = http.post(
        "/api/game-stream/pairing/submit-pin",
        headers={"Authorization": "Bearer token"},
        json=submission,
    )

    assert response.status_code == 200
    assert wolf_double.paired == [("secret-without-ip-check", "1234")]
2026-05-21 15:53:17 -03:00
def test_game_stream_pairing_blocks_other_pending_ip(monkeypatch) -> None:
    """A user whose source IP differs from the pending request's IP gets 403."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    monkeypatch.setattr(app_module, "wolf_api", DummyWolfApi())

    # The dummy's pending "secret-1" request came from 1.2.3.4, not 5.6.7.8.
    submission = {"source_ip": "5.6.7.8", "pair_secret": "secret-1", "pin": "1234"}
    response = http.post(
        "/api/game-stream/pairing/submit-pin",
        headers={"Authorization": "Bearer token"},
        json=submission,
    )

    assert response.status_code == 403
def test_game_stream_pairing_submit_pin_errors(monkeypatch) -> None:
    """PIN submission: 503 when disabled, 400 for missing fields, 502 on backend failure."""
    member = AuthContext(username="olya", email="", groups=["game-stream-users"], claims={})
    http = _client(monkeypatch, member)
    route = "/api/game-stream/pairing/submit-pin"
    bearer = {"Authorization": "Bearer token"}
    complete_submission = {"source_ip": "1.2.3.4", "pair_secret": "secret-1", "pin": "1234"}

    monkeypatch.setattr(app_module, "wolf_api", DisabledService())
    when_disabled = http.post(route, headers=bearer, json={"pair_secret": "secret-1", "pin": "1234"})
    assert when_disabled.status_code == 503

    # With a working backend, each required field is validated on its own.
    monkeypatch.setattr(app_module, "wolf_api", DummyWolfApi())
    without_secret = http.post(route, headers=bearer, json={"pin": "1234"})
    assert without_secret.status_code == 400
    without_pin = http.post(route, headers=bearer, json={"pair_secret": "secret-1"})
    assert without_pin.status_code == 400

    # Only the pending-request lookup raises here.
    monkeypatch.setattr(app_module, "wolf_api", FailingPendingWolfApi())
    lookup_failure = http.post(route, headers=bearer, json=dict(complete_submission))
    assert lookup_failure.status_code == 502

    # Every Wolf call, including pair_client, raises here.
    monkeypatch.setattr(app_module, "wolf_api", FailingWolfApi())
    pairing_failure = http.post(route, headers=bearer, json=dict(complete_submission))
    assert pairing_failure.status_code == 502
def test_game_mode_admin_start_and_stop(monkeypatch) -> None:
    """Admin start and stop routes return 200 and invoke the matching game-mode hooks.

    Both hooks are replaced by recorders. The start call is pinned in full
    (game and note); the stop call is pinned by kind and game only, because
    the stop request carries no note and the forwarded default is not
    visible from this test.
    """
    ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    client = _client(monkeypatch, ctx)
    calls: list[tuple] = []

    def _record_start(game, note=None):
        calls.append(("start", game, note))
        return {"status": "active"}

    def _record_stop(game, note=None):
        calls.append(("stop", game, note))
        return {"status": "idle"}

    monkeypatch.setattr(app_module.game_mode, "start", _record_start)
    monkeypatch.setattr(app_module.game_mode, "stop", _record_stop)
    monkeypatch.setattr(app_module.storage, "record_task_run", lambda *args, **kwargs: None)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    bearer = {"Authorization": "Bearer token"}
    start = client.post("/api/admin/game-mode/start", headers=bearer, json={"game": "arc", "note": "now"})
    stop = client.post("/api/admin/game-mode/stop", headers=bearer, json={"game": "arc"})

    assert start.status_code == 200
    assert stop.status_code == 200
    assert calls[0] == ("start", "arc", "now")
    # Previously only the start call was checked, so a stop route that never
    # reached game_mode.stop would still have passed.
    assert calls[1][:2] == ("stop", "arc")
def test_game_mode_hook_requires_token(monkeypatch) -> None:
    """With a hook token configured, a start request without any token gets 401."""
    anonymous = AuthContext(username="", email="", groups=[], claims={})
    http = _client(monkeypatch, anonymous)
    settings_with_token = dataclasses.replace(app_module.settings, game_mode_hook_token="secret")
    monkeypatch.setattr(app_module, "settings", settings_with_token)

    # No Authorization or token header is sent.
    response = http.post("/api/game-mode/start", json={"game": "arc"})

    assert response.status_code == 401
def test_game_mode_hook_requires_configured_token(monkeypatch) -> None:
    """With an empty hook token in settings, even a bearer-carrying request gets 503."""
    anonymous = AuthContext(username="", email="", groups=[], claims={})
    http = _client(monkeypatch, anonymous)
    settings_without_token = dataclasses.replace(app_module.settings, game_mode_hook_token="")
    monkeypatch.setattr(app_module, "settings", settings_without_token)

    response = http.post(
        "/api/game-mode/start",
        headers={"Authorization": "Bearer secret"},
        json={"game": "arc"},
    )

    assert response.status_code == 503
def test_game_mode_hook_start_and_stop(monkeypatch) -> None:
    """The hook routes accept the token as a bearer header or as the dedicated header."""
    anonymous = AuthContext(username="", email="", groups=[], claims={})
    http = _client(monkeypatch, anonymous)
    settings_with_token = dataclasses.replace(app_module.settings, game_mode_hook_token="secret")
    monkeypatch.setattr(app_module, "settings", settings_with_token)

    def _fake_start(game, note=None):
        return {"status": "active", "game": game}

    def _fake_stop(game, note=None):
        return {"status": "idle", "game": game}

    monkeypatch.setattr(app_module.game_mode, "start", _fake_start)
    monkeypatch.setattr(app_module.game_mode, "stop", _fake_stop)
    monkeypatch.setattr(app_module.storage, "record_task_run", lambda *args, **kwargs: None)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    # Start authenticates via "Authorization: Bearer", stop via the custom token header.
    started = http.post("/api/game-mode/start", headers={"Authorization": "Bearer secret"}, json={"game": "arc"})
    stopped = http.post("/api/game-mode/stop", headers={"x-ariadne-game-mode-token": "secret"}, json={"game": "arc"})

    assert started.status_code == 200
    assert stopped.status_code == 200
def test_game_mode_status_error(monkeypatch) -> None:
    """The admin game-mode status route returns 502 when the status lookup raises."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)

    def _boom():
        raise RuntimeError("boom")

    monkeypatch.setattr(app_module.game_mode, "status", _boom)

    response = http.get("/api/admin/game-mode/status", headers={"Authorization": "Bearer token"})

    assert response.status_code == 502
def test_game_mode_action_error_records(monkeypatch) -> None:
    """A failing start returns 502 and still records a task run."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)
    recorded = []

    def _boom(*_args, **_kwargs):
        raise RuntimeError("boom")

    def _capture_task_run(*args, **kwargs):
        # Only the positional arguments are kept, as a single tuple per call.
        recorded.append(args)

    monkeypatch.setattr(app_module.game_mode, "start", _boom)
    monkeypatch.setattr(app_module.storage, "record_task_run", _capture_task_run)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    response = http.post(
        "/api/admin/game-mode/start",
        headers={"Authorization": "Bearer token"},
        json={"game": "arc"},
    )

    assert response.status_code == 502
    assert recorded
def test_game_mode_action_rejects_invalid_kind(monkeypatch) -> None:
    """Running the game-mode action with the kind ``"pause"`` raises HTTP 400."""
    monkeypatch.setattr(app_module.storage, "record_task_run", lambda *args, **kwargs: None)

    with pytest.raises(HTTPException) as rejection:
        # Build and run the coroutine inside the context so either step may raise.
        pending_action = app_game_routes._run_game_mode_action(app_module, "pause", {}, "tester")
        asyncio.run(pending_action)

    assert rejection.value.status_code == 400
def test_wolf_oauth2_ensure(monkeypatch) -> None:
    """The Wolf OAuth2 ensure route returns 200 and passes through the client id."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)

    def _ensure_ok():
        return {"status": "ok", "client_id": "wolf"}

    monkeypatch.setattr(app_module.oauth2_proxy, "ensure_wolf", _ensure_ok)
    monkeypatch.setattr(app_module.storage, "record_task_run", lambda *args, **kwargs: None)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    response = http.post(
        "/api/admin/game-stream/wolf/oauth2/ensure",
        headers={"Authorization": "Bearer token"},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["client_id"] == "wolf"
def test_wolf_oauth2_ensure_error_paths(monkeypatch) -> None:
    """OAuth2 ensure returns 502 for an error status and for a raised exception."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)
    bearer = {"Authorization": "Bearer token"}
    monkeypatch.setattr(app_module.storage, "record_task_run", lambda *args, **kwargs: None)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    def _ensure_reports_error():
        return {"status": "error", "detail": "missing"}

    def _ensure_raises():
        raise RuntimeError("boom")

    monkeypatch.setattr(app_module.oauth2_proxy, "ensure_wolf", _ensure_reports_error)
    reported = http.post("/api/admin/game-stream/wolf/oauth2/ensure", headers=bearer)
    assert reported.status_code == 502

    # The sunshine-named route is exercised with the same patched ensure_wolf
    # (presumably a legacy alias of the wolf route - confirm against the router).
    monkeypatch.setattr(app_module.oauth2_proxy, "ensure_wolf", _ensure_raises)
    via_sunshine = http.post("/api/admin/game-stream/sunshine/oauth2/ensure", headers=bearer)
    assert via_sunshine.status_code == 502
def test_record_simple_task_swallows_storage_errors(monkeypatch) -> None:
    """A raising ``record_task_run`` does not stop a successful start from returning 200."""
    admin = AuthContext(username="bstein", email="", groups=["admin"], claims={})
    http = _client(monkeypatch, admin)

    def _fake_start(game, note=None):
        return {"status": "active", "game": game}

    def _storage_down(*args, **kwargs):
        raise RuntimeError("db")

    monkeypatch.setattr(app_module.game_mode, "start", _fake_start)
    monkeypatch.setattr(app_module.storage, "record_task_run", _storage_down)
    monkeypatch.setattr(app_module, "_record_event", lambda *args, **kwargs: None)

    response = http.post(
        "/api/admin/game-mode/start",
        headers={"Authorization": "Bearer token"},
        json={"game": "arc"},
    )

    assert response.status_code == 200