game-stream: add Wolf orchestration groundwork

This commit is contained in:
codex 2026-05-21 02:04:19 -03:00
parent 76499d46e3
commit 2687ac441e
16 changed files with 4354 additions and 101 deletions

File diff suppressed because it is too large Load Diff

View File

@ -24,10 +24,17 @@ def _read_service_account() -> tuple[str, str]:
return token, str(ca_path) return token, str(ca_path)
def _k8s_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any: def _k8s_request(
method: str,
path: str,
payload: dict[str, Any] | None = None,
extra_headers: dict[str, str] | None = None,
) -> Any:
token, ca_path = _read_service_account() token, ca_path = _read_service_account()
url = f"{_K8S_BASE_URL}{path}" url = f"{_K8S_BASE_URL}{path}"
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
if extra_headers:
headers.update(extra_headers)
with httpx.Client(verify=ca_path, timeout=settings.k8s_api_timeout_sec, headers=headers) as client: with httpx.Client(verify=ca_path, timeout=settings.k8s_api_timeout_sec, headers=headers) as client:
resp = client.request(method, url, json=payload) resp = client.request(method, url, json=payload)
resp.raise_for_status() resp.raise_for_status()
@ -52,6 +59,20 @@ def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
return data return data
def patch_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
"""Patch a Kubernetes API resource with a JSON merge patch."""
data = _k8s_request(
"PATCH",
path,
payload,
{"Content-Type": "application/merge-patch+json"},
)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
return data
def delete_json(path: str) -> dict[str, Any]: def delete_json(path: str) -> dict[str, Any]:
"""Delete a Kubernetes API resource and return the response payload.""" """Delete a Kubernetes API resource and return the response payload."""

View File

@ -69,6 +69,26 @@ CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY = Gauge(
"ariadne_cluster_kustomizations_not_ready", "ariadne_cluster_kustomizations_not_ready",
"Flux kustomizations not Ready", "Flux kustomizations not Ready",
) )
GAME_MODE_ACTIVE = Gauge(
"ariadne_game_mode_active",
"Ariadne game mode state (1=active,0=idle)",
["node", "game"],
)
GAME_MODE_TRANSITIONS_TOTAL = Counter(
"ariadne_game_mode_transitions_total",
"Ariadne game mode transitions by action and status",
["action", "status", "game"],
)
GAME_MODE_MANAGED_REPLICAS = Gauge(
"ariadne_game_mode_managed_replicas",
"Replicas for workloads managed by Ariadne game mode",
["namespace", "deployment"],
)
GAME_MODE_LAST_TRANSITION_TS = Gauge(
"ariadne_game_mode_last_transition_timestamp_seconds",
"Last Ariadne game mode transition timestamp",
["action", "status", "game"],
)
def record_task_run(task: str, status: str, duration_sec: float | None) -> None: def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
@ -79,7 +99,13 @@ def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
TASK_DURATION_SECONDS.labels(task=task, status=status).observe(duration_sec) TASK_DURATION_SECONDS.labels(task=task, status=status).observe(duration_sec)
def record_schedule_state(task: str, last_run_ts: float | None, last_success_ts: float | None, next_run_ts: float | None, ok: bool | None) -> None: def record_schedule_state(
task: str,
last_run_ts: float | None,
last_success_ts: float | None,
next_run_ts: float | None,
ok: bool | None,
) -> None:
"""Publish the latest scheduler timestamps and status for a task.""" """Publish the latest scheduler timestamps and status for a task."""
if last_run_ts: if last_run_ts:
@ -101,7 +127,13 @@ def set_access_request_counts(counts: dict[str, int]) -> None:
ACCESS_REQUESTS.labels(status=status).set(count) ACCESS_REQUESTS.labels(status=status).set(count)
def set_cluster_state_metrics(collected_at: datetime, nodes_total: int | None, nodes_ready: int | None, pods_running: float | None, kustomizations_not_ready: int | None) -> None: def set_cluster_state_metrics(
collected_at: datetime,
nodes_total: int | None,
nodes_ready: int | None,
pods_running: float | None,
kustomizations_not_ready: int | None,
) -> None:
"""Set cluster-state gauges from the most recent collector snapshot.""" """Set cluster-state gauges from the most recent collector snapshot."""
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp()) CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
@ -113,3 +145,24 @@ def set_cluster_state_metrics(collected_at: datetime, nodes_total: int | None, n
CLUSTER_STATE_PODS_RUNNING.set(pods_running) CLUSTER_STATE_PODS_RUNNING.set(pods_running)
if kustomizations_not_ready is not None: if kustomizations_not_ready is not None:
CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY.set(kustomizations_not_ready) CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY.set(kustomizations_not_ready)
def set_game_mode_state(node: str, game: str, active: bool) -> None:
"""Publish the current game-mode state."""
GAME_MODE_ACTIVE.labels(node=node or "unknown", game=game or "unknown").set(1 if active else 0)
def record_game_mode_transition(action: str, status: str, game: str) -> None:
"""Increment game-mode transition metrics."""
labels = {"action": action or "unknown", "status": status or "unknown", "game": game or "unknown"}
GAME_MODE_TRANSITIONS_TOTAL.labels(**labels).inc()
GAME_MODE_LAST_TRANSITION_TS.labels(**labels).set(datetime.now().timestamp())
def set_game_mode_managed_replicas(namespace: str, deployment: str, replicas: int | None) -> None:
"""Publish desired replicas for a workload that game mode can move."""
if replicas is not None:
GAME_MODE_MANAGED_REPLICAS.labels(namespace=namespace, deployment=deployment).set(replicas)

View File

@ -0,0 +1,162 @@
from __future__ import annotations
from dataclasses import dataclass
import threading
from typing import Any
from ..k8s.client import get_json, patch_json
from ..metrics.metrics import (
record_game_mode_transition,
set_game_mode_managed_replicas,
set_game_mode_state,
)
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)
@dataclass(frozen=True)
class ManagedWorkload:
kind: str
namespace: str
name: str
restore_replicas: int
class GameModeService:
"""Move shared titan-24 GPU resources between infrastructure and gaming."""
def __init__(self) -> None:
self._lock = threading.Lock()
self._current_game = ""
def _workloads(self) -> list[ManagedWorkload]:
workloads: list[ManagedWorkload] = []
for item in settings.game_mode_displace_workloads:
namespace = str(item.get("namespace") or "").strip()
name = str(item.get("name") or "").strip()
kind = str(item.get("kind") or "Deployment").strip() or "Deployment"
replicas = item.get("restoreReplicas", item.get("restore_replicas", 1))
if not namespace or not name:
continue
try:
restore_replicas = int(replicas)
except (TypeError, ValueError):
restore_replicas = 1
workloads.append(
ManagedWorkload(
kind=kind,
namespace=namespace,
name=name,
restore_replicas=max(0, restore_replicas),
)
)
return workloads
@staticmethod
def _game_name(game: str | None) -> str:
normalized = (game or "wolf").strip().lower().replace(" ", "-")
return normalized[:64] or "wolf"
@staticmethod
def _scale_path(workload: ManagedWorkload) -> str:
resource = {
"deployment": "deployments",
"deployments": "deployments",
"statefulset": "statefulsets",
"statefulsets": "statefulsets",
}.get(workload.kind.lower())
if not resource:
raise ValueError(f"unsupported game-mode workload kind: {workload.kind}")
return f"/apis/apps/v1/namespaces/{workload.namespace}/{resource}/{workload.name}/scale"
def _replicas(self, workload: ManagedWorkload) -> tuple[int | None, int | None]:
payload = get_json(self._scale_path(workload))
spec = payload.get("spec") if isinstance(payload.get("spec"), dict) else {}
status = payload.get("status") if isinstance(payload.get("status"), dict) else {}
desired = spec.get("replicas")
current = status.get("replicas")
return (
int(desired) if isinstance(desired, int) else None,
int(current) if isinstance(current, int) else None,
)
def _set_replicas(self, workload: ManagedWorkload, replicas: int) -> dict[str, Any]:
payload = patch_json(self._scale_path(workload), {"spec": {"replicas": replicas}})
set_game_mode_managed_replicas(workload.namespace, workload.name, replicas)
return payload
def status(self) -> dict[str, Any]:
workloads: list[dict[str, Any]] = []
for workload in self._workloads():
desired, current = self._replicas(workload)
set_game_mode_managed_replicas(workload.namespace, workload.name, desired)
workloads.append(
{
"kind": workload.kind,
"namespace": workload.namespace,
"name": workload.name,
"desired_replicas": desired,
"current_replicas": current,
"restore_replicas": workload.restore_replicas,
}
)
active = bool(workloads) and all(item["desired_replicas"] == 0 for item in workloads)
game = self._current_game or "unknown"
set_game_mode_state(settings.game_mode_node_name, game, active)
return {
"status": "active" if active else "idle",
"active": active,
"node": settings.game_mode_node_name,
"game": game,
"workloads": workloads,
}
def start(self, game: str | None = None, note: str | None = None) -> dict[str, Any]:
game_name = self._game_name(game)
with self._lock:
try:
for workload in self._workloads():
self._set_replicas(workload, 0)
self._current_game = game_name
set_game_mode_state(settings.game_mode_node_name, game_name, True)
record_game_mode_transition("start", "ok", game_name)
logger.info(
"game mode started",
extra={"event": "game_mode_start", "game": game_name, "note": note or ""},
)
result = self.status()
result["action"] = "start"
return result
except Exception:
record_game_mode_transition("start", "error", game_name)
logger.exception("game mode start failed", extra={"event": "game_mode_start", "game": game_name})
raise
def stop(self, game: str | None = None, note: str | None = None) -> dict[str, Any]:
game_name = self._game_name(game or self._current_game or "wolf")
with self._lock:
try:
for workload in self._workloads():
self._set_replicas(workload, workload.restore_replicas)
self._current_game = ""
set_game_mode_state(settings.game_mode_node_name, game_name, False)
record_game_mode_transition("stop", "ok", game_name)
logger.info(
"game mode stopped",
extra={"event": "game_mode_stop", "game": game_name, "note": note or ""},
)
result = self.status()
result["action"] = "stop"
result["game"] = game_name
return result
except Exception:
record_game_mode_transition("stop", "error", game_name)
logger.exception("game mode stop failed", extra={"event": "game_mode_stop", "game": game_name})
raise
game_mode = GameModeService()

View File

@ -0,0 +1,63 @@
from __future__ import annotations
import re
from typing import Any
from ..settings import settings
_SLUG_RE = re.compile(r"[^a-z0-9_-]+")
def _slug(value: str, fallback: str) -> str:
normalized = _SLUG_RE.sub("-", value.strip().lower()).strip("-")
return normalized[:64] or fallback
def _group_values(groups: list[str]) -> set[str]:
values: set[str] = set()
for group in groups:
stripped = group.strip()
if not stripped:
continue
values.add(stripped)
values.add(stripped.lstrip("/"))
return values
class GameStreamProfileService:
"""Map Keycloak users and groups into Wolf profile policy."""
def profile_for(self, username: str, groups: list[str]) -> dict[str, Any]:
clean_username = _slug(username, "user")
group_values = _group_values(groups)
user_group = settings.game_stream_user_group
admin_group = settings.game_stream_admin_group
prefix = settings.game_stream_profile_group_prefix
allowed = user_group in group_values or admin_group in group_values
profile_group = ""
profile_id = f"user-{clean_username}"
for group in sorted(group_values):
if not group.startswith(prefix):
continue
suffix = group[len(prefix) :]
if not suffix:
continue
profile_group = group
profile_id = _slug(suffix, profile_id)
allowed = True
break
return {
"username": username,
"allowed": allowed,
"profile_id": profile_id,
"profile_group": profile_group,
"user_group": user_group,
"admin_group": admin_group,
"profile_group_prefix": prefix,
}
game_stream_profiles = GameStreamProfileService()

View File

@ -158,6 +158,66 @@ class KeycloakAdminClient:
return location.rstrip("/").split("/")[-1] return location.rstrip("/").split("/")[-1]
raise RuntimeError("failed to determine created user id") raise RuntimeError("failed to determine created user id")
def find_client(self, client_id: str) -> dict[str, Any] | None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/clients"
params = {"clientId": client_id, "max": "1"}
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, list) or not payload:
return None
item = payload[0]
return item if isinstance(item, dict) else None
def create_client(self, payload: dict[str, Any]) -> None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/clients"
with httpx.Client(timeout=10.0) as client:
resp = client.post(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()
def update_client(self, client_uuid: str, payload: dict[str, Any]) -> None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/clients/{client_uuid}"
with httpx.Client(timeout=10.0) as client:
resp = client.put(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()
def get_client_secret(self, client_uuid: str) -> str:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/clients/{client_uuid}/client-secret"
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, headers=self._headers())
resp.raise_for_status()
payload = resp.json()
secret = payload.get("value") if isinstance(payload, dict) else None
if not isinstance(secret, str) or not secret:
raise RuntimeError("client secret missing")
return secret
def find_client_scope_id(self, scope_name: str) -> str | None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/client-scopes"
params = {"search": scope_name}
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, list):
return None
for item in payload:
if not isinstance(item, dict):
continue
if item.get("name") == scope_name and item.get("id"):
return str(item["id"])
return None
def attach_optional_client_scope(self, client_uuid: str, scope_id: str) -> None:
url = (
f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}"
f"/clients/{client_uuid}/optional-client-scopes/{scope_id}"
)
with httpx.Client(timeout=10.0) as client:
resp = client.put(url, headers=self._headers())
resp.raise_for_status()
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None: def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
url = ( url = (
f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}" f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}"

View File

@ -0,0 +1,91 @@
from __future__ import annotations
import secrets
from typing import Any
from .keycloak_admin import keycloak_admin
from .vault import vault
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)
def _oauth_client_payload(client_id: str, base_url: str) -> dict[str, Any]:
return {
"clientId": client_id,
"enabled": True,
"protocol": "openid-connect",
"publicClient": False,
"standardFlowEnabled": True,
"implicitFlowEnabled": False,
"directAccessGrantsEnabled": False,
"serviceAccountsEnabled": False,
"redirectUris": [f"{base_url}/oauth2/callback"],
"webOrigins": [base_url],
"rootUrl": base_url,
"baseUrl": "/",
}
def _valid_cookie_secret(value: Any) -> str:
if not isinstance(value, str):
return ""
stripped = value.strip()
return stripped if len(stripped) in {16, 24, 32} else ""
class OAuth2ProxyService:
"""Ensure Keycloak and Vault state for oauth2-proxy frontends."""
def ensure_wolf(self) -> dict[str, Any]:
if not keycloak_admin.ready():
return {"status": "error", "detail": "keycloak admin client not configured"}
client_id = settings.wolf_oidc_client_id
base_url = settings.wolf_oidc_base_url
client = keycloak_admin.find_client(client_id)
payload = _oauth_client_payload(client_id, base_url)
if not client:
keycloak_admin.create_client(payload)
client = keycloak_admin.find_client(client_id)
if not client:
raise RuntimeError(f"keycloak client {client_id} not found")
client_uuid = str(client.get("id") or "")
if not client_uuid:
raise RuntimeError("keycloak client id missing")
keycloak_admin.update_client(client_uuid, payload)
scope_id = keycloak_admin.find_client_scope_id("groups")
if scope_id:
keycloak_admin.attach_optional_client_scope(client_uuid, scope_id)
client_secret = keycloak_admin.get_client_secret(client_uuid)
existing = vault.read_kv_secret(settings.wolf_oidc_vault_path) or {}
cookie_secret = _valid_cookie_secret(existing.get("cookie_secret")) or secrets.token_hex(16)
vault.write_kv_secret(
settings.wolf_oidc_vault_path,
{
"client_id": client_id,
"client_secret": client_secret,
"cookie_secret": cookie_secret,
},
)
logger.info(
"wolf oauth2 proxy secret ensured",
extra={"event": "wolf_oidc_ensure", "client_id": client_id, "vault_path": settings.wolf_oidc_vault_path},
)
return {
"status": "ok",
"client_id": client_id,
"base_url": base_url,
"vault_path": settings.wolf_oidc_vault_path,
}
def ensure_sunshine(self) -> dict[str, Any]:
return self.ensure_wolf()
oauth2_proxy = OAuth2ProxyService()

View File

@ -8,12 +8,10 @@ import httpx
from ..settings import settings from ..settings import settings
from ..utils.logging import get_logger from ..utils.logging import get_logger
from .vault_policies import DEV_KV_POLICY as _DEV_KV_POLICY
from .vault_policies import K8S_ROLES as _K8S_ROLES
from .vault_policies import VAULT_ADMIN_POLICY as _VAULT_ADMIN_POLICY
logger = get_logger(__name__) logger = get_logger(__name__)
HTTP_NOT_FOUND = 404
@dataclass(frozen=True) @dataclass(frozen=True)
@ -48,6 +46,270 @@ def _build_policy(read_paths: str, write_paths: str) -> str:
) )
return "\n".join(policy_parts).strip() + "\n" return "\n".join(policy_parts).strip() + "\n"
_K8S_ROLES: list[dict[str, str]] = [
{
"role": "outline",
"namespace": "outline",
"service_accounts": "outline-vault",
"read_paths": "outline/* shared/postmark-relay",
"write_paths": "",
},
{
"role": "planka",
"namespace": "planka",
"service_accounts": "planka-vault",
"read_paths": "planka/* shared/postmark-relay",
"write_paths": "",
},
{
"role": "bstein-dev-home",
"namespace": "bstein-dev-home",
"service_accounts": "bstein-dev-home,bstein-dev-home-vault-sync",
"read_paths": "portal/* shared/chat-ai-keys-runtime shared/portal-e2e-client shared/postmark-relay "
"mailu/mailu-initial-account-secret shared/harbor-pull",
"write_paths": "",
},
{
"role": "gitea",
"namespace": "gitea",
"service_accounts": "gitea-vault",
"read_paths": "gitea/*",
"write_paths": "",
},
{
"role": "vaultwarden",
"namespace": "vaultwarden",
"service_accounts": "vaultwarden-vault",
"read_paths": "vaultwarden/* mailu/mailu-initial-account-secret",
"write_paths": "",
},
{
"role": "sso",
"namespace": "sso",
"service_accounts": "sso-vault,sso-vault-sync,mas-secrets-ensure",
"read_paths": "sso/* portal/bstein-dev-home-keycloak-admin shared/keycloak-admin "
"shared/portal-e2e-client shared/postmark-relay shared/harbor-pull",
"write_paths": "",
},
{
"role": "mailu-mailserver",
"namespace": "mailu-mailserver",
"service_accounts": "mailu-vault-sync",
"read_paths": "mailu/* shared/postmark-relay shared/harbor-pull",
"write_paths": "",
},
{
"role": "harbor",
"namespace": "harbor",
"service_accounts": "harbor-vault-sync",
"read_paths": "harbor/* shared/harbor-pull",
"write_paths": "",
},
{
"role": "nextcloud",
"namespace": "nextcloud",
"service_accounts": "nextcloud-vault",
"read_paths": "nextcloud/* shared/keycloak-admin shared/postmark-relay",
"write_paths": "",
},
{
"role": "comms",
"namespace": "comms",
"service_accounts": "comms-vault,atlasbot",
"read_paths": "comms/* shared/chat-ai-keys-runtime shared/harbor-pull",
"write_paths": "",
},
{
"role": "jenkins",
"namespace": "jenkins",
"service_accounts": "jenkins",
"read_paths": "jenkins/*",
"write_paths": "",
},
{
"role": "monitoring",
"namespace": "monitoring",
"service_accounts": "monitoring-vault-sync",
"read_paths": "monitoring/* shared/postmark-relay shared/harbor-pull",
"write_paths": "",
},
{
"role": "logging",
"namespace": "logging",
"service_accounts": "logging-vault-sync",
"read_paths": "logging/* shared/harbor-pull",
"write_paths": "",
},
{
"role": "pegasus",
"namespace": "jellyfin",
"service_accounts": "pegasus-vault-sync",
"read_paths": "pegasus/* shared/harbor-pull",
"write_paths": "",
},
{
"role": "crypto",
"namespace": "crypto",
"service_accounts": "crypto-vault-sync",
"read_paths": "crypto/* shared/harbor-pull",
"write_paths": "",
},
{
"role": "health",
"namespace": "health",
"service_accounts": "health-vault-sync",
"read_paths": "health/*",
"write_paths": "",
},
{
"role": "game-stream",
"namespace": "game-stream",
"service_accounts": "game-stream-vault",
"read_paths": "game-stream/*",
"write_paths": "",
},
{
"role": "maintenance",
"namespace": "maintenance",
"service_accounts": "ariadne,maintenance-vault-sync",
"read_paths": "maintenance/ariadne-db portal/bstein-dev-home-keycloak-admin mailu/mailu-db-secret "
"mailu/mailu-initial-account-secret comms/synapse-admin shared/harbor-pull",
"write_paths": "",
},
{
"role": "finance",
"namespace": "finance",
"service_accounts": "finance-vault",
"read_paths": "finance/* shared/postmark-relay",
"write_paths": "",
},
{
"role": "finance-secrets",
"namespace": "finance",
"service_accounts": "finance-secrets-ensure",
"read_paths": "",
"write_paths": "finance/*",
},
{
"role": "longhorn",
"namespace": "longhorn-system",
"service_accounts": "longhorn-vault,longhorn-vault-sync",
"read_paths": "longhorn/* shared/harbor-pull",
"write_paths": "",
},
{
"role": "postgres",
"namespace": "postgres",
"service_accounts": "postgres-vault",
"read_paths": "postgres/postgres-db",
"write_paths": "",
},
{
"role": "vault",
"namespace": "vault",
"service_accounts": "vault",
"read_paths": "vault/*",
"write_paths": "",
},
{
"role": "sso-secrets",
"namespace": "sso",
"service_accounts": "mas-secrets-ensure",
"read_paths": "shared/keycloak-admin",
"write_paths": "harbor/harbor-oidc vault/vault-oidc-config comms/synapse-oidc "
"logging/oauth2-proxy-logs-oidc finance/actual-oidc",
},
{
"role": "crypto-secrets",
"namespace": "crypto",
"service_accounts": "crypto-secrets-ensure",
"read_paths": "",
"write_paths": "crypto/wallet-monero-temp-rpc-auth",
},
{
"role": "comms-secrets",
"namespace": "comms",
"service_accounts": "comms-secrets-ensure,mas-db-ensure,mas-admin-client-secret-writer,othrys-synapse-signingkey-job",
"read_paths": "",
"write_paths": "comms/turn-shared-secret comms/livekit-api comms/synapse-redis comms/synapse-macaroon "
"comms/atlasbot-credentials-runtime comms/synapse-db comms/synapse-admin comms/synapse-registration "
"comms/mas-db comms/mas-admin-client-runtime comms/mas-secrets-runtime comms/othrys-synapse-signingkey",
},
]
_VAULT_ADMIN_POLICY = """
path "sys/auth" {
capabilities = ["read"]
}
path "sys/auth/*" {
capabilities = ["create", "update", "delete", "sudo", "read"]
}
path "auth/kubernetes/*" {
capabilities = ["create", "update", "read"]
}
path "auth/oidc/*" {
capabilities = ["create", "update", "read"]
}
path "sys/policies/acl" {
capabilities = ["list"]
}
path "sys/policies/acl/*" {
capabilities = ["create", "update", "read"]
}
path "sys/internal/ui/mounts" {
capabilities = ["read"]
}
path "sys/mounts" {
capabilities = ["read"]
}
path "sys/mounts/auth/*" {
capabilities = ["read", "update", "sudo"]
}
path "kv/data/atlas/vault/*" {
capabilities = ["read"]
}
path "kv/metadata/atlas/vault/*" {
capabilities = ["list"]
}
path "kv/data/*" {
capabilities = ["create", "update", "read", "delete", "patch"]
}
path "kv/metadata" {
capabilities = ["list"]
}
path "kv/metadata/*" {
capabilities = ["read", "list", "delete"]
}
path "kv/data/atlas/shared/*" {
capabilities = ["create", "update", "read", "patch"]
}
path "kv/metadata/atlas/shared/*" {
capabilities = ["list"]
}
""".strip()
_DEV_KV_POLICY = """
path "kv/metadata" {
capabilities = ["list"]
}
path "kv/metadata/atlas" {
capabilities = ["list"]
}
path "kv/metadata/atlas/shared" {
capabilities = ["list"]
}
path "kv/metadata/atlas/shared/*" {
capabilities = ["list"]
}
path "kv/data/atlas/shared/*" {
capabilities = ["read"]
}
""".strip()
class VaultClient: class VaultClient:
"""Minimal HTTP client for Vault API requests.""" """Minimal HTTP client for Vault API requests."""
@ -108,6 +370,21 @@ class VaultService:
token = self._ensure_token() token = self._ensure_token()
return VaultClient(settings.vault_addr, token) return VaultClient(settings.vault_addr, token)
def read_kv_secret(self, path: str) -> dict[str, Any] | None:
clean_path = path.strip("/")
resp = self._client().request("GET", f"/v1/kv/data/atlas/{clean_path}")
if resp.status_code == HTTP_NOT_FOUND:
return None
resp.raise_for_status()
payload = resp.json()
data = payload.get("data", {}).get("data") if isinstance(payload, dict) else None
return data if isinstance(data, dict) else {}
def write_kv_secret(self, path: str, data: dict[str, Any]) -> None:
clean_path = path.strip("/")
resp = self._client().request("POST", f"/v1/kv/data/atlas/{clean_path}", json={"data": data})
resp.raise_for_status()
def _ensure_auth_enabled(self, client: VaultClient, auth_name: str, auth_type: str) -> None: def _ensure_auth_enabled(self, client: VaultClient, auth_name: str, auth_type: str) -> None:
resp = client.request("GET", "/v1/sys/auth") resp = client.request("GET", "/v1/sys/auth")
resp.raise_for_status() resp.raise_for_status()

View File

@ -1,29 +1,47 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
import json
import os
from typing import Any
from .settings_env import _env, _env_bool, _env_float, _env_int
from .settings_sections import ( def _env(name: str, default: str = "") -> str:
_cluster_state_config, value = os.getenv(name, default)
_comms_config, return value.strip() if isinstance(value, str) else default
_firefly_config,
_image_sweeper_config,
_jenkins_build_weather_config, def _env_bool(name: str, default: str = "false") -> bool:
_jenkins_workspace_cleanup_config, return _env(name, default).lower() in {"1", "true", "yes", "y", "on"}
_keycloak_config,
_mailu_config,
_metis_config, def _env_int(name: str, default: int) -> int:
_nextcloud_config, raw = _env(name, str(default))
_opensearch_config, try:
_platform_quality_probe_config, return int(raw)
_portal_group_config, except ValueError:
_schedule_config, return default
_smtp_config,
_testing_triage_config,
_vault_config, def _env_float(name: str, default: float) -> float:
_vaultwarden_config, raw = _env(name, str(default))
_wger_config, try:
) return float(raw)
except ValueError:
return default
def _env_json_list(name: str, default: list[dict[str, Any]]) -> list[dict[str, Any]]:
raw = _env(name, "")
if not raw:
return default
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return default
if not isinstance(parsed, list):
return default
return [item for item in parsed if isinstance(item, dict)]
@dataclass(frozen=True) @dataclass(frozen=True)
@ -173,9 +191,6 @@ class Settings:
jenkins_workspace_cleanup_min_age_hours: float jenkins_workspace_cleanup_min_age_hours: float
jenkins_workspace_cleanup_dry_run: bool jenkins_workspace_cleanup_dry_run: bool
jenkins_workspace_cleanup_max_deletions_per_run: int jenkins_workspace_cleanup_max_deletions_per_run: int
testing_triage_model_url: str
testing_triage_model: str
testing_triage_model_timeout_sec: float
vaultwarden_namespace: str vaultwarden_namespace: str
vaultwarden_pod_label: str vaultwarden_pod_label: str
@ -228,6 +243,19 @@ class Settings:
keycloak_profile_cron: str keycloak_profile_cron: str
cluster_state_cron: str cluster_state_cron: str
cluster_state_keep: int cluster_state_keep: int
game_mode_node_name: str
game_mode_displace_workloads: list[dict[str, Any]]
game_mode_ollama_namespace: str
game_mode_ollama_deployment: str
game_mode_ollama_restore_replicas: int
game_mode_hook_token: str
wolf_oidc_client_id: str
wolf_oidc_base_url: str
wolf_oidc_vault_path: str
wolf_oidc_cron: str
game_stream_user_group: str
game_stream_admin_group: str
game_stream_profile_group_prefix: str
metis_base_url: str metis_base_url: str
metis_watch_url: str metis_watch_url: str
metis_timeout_sec: float metis_timeout_sec: float
@ -244,7 +272,6 @@ class Settings:
platform_quality_suite_probe_cron: str platform_quality_suite_probe_cron: str
jenkins_build_weather_cron: str jenkins_build_weather_cron: str
jenkins_workspace_cleanup_cron: str jenkins_workspace_cleanup_cron: str
testing_triage_cron: str
opensearch_url: str opensearch_url: str
opensearch_limit_bytes: int opensearch_limit_bytes: int
@ -253,27 +280,394 @@ class Settings:
metrics_path: str metrics_path: str
@classmethod
def _keycloak_config(cls) -> dict[str, Any]:
keycloak_url = _env("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/")
keycloak_realm = _env("KEYCLOAK_REALM", "atlas")
keycloak_client_id = _env("KEYCLOAK_CLIENT_ID", "bstein-dev-home")
keycloak_issuer = _env("KEYCLOAK_ISSUER", f"{keycloak_url}/realms/{keycloak_realm}").rstrip("/")
keycloak_jwks_url = _env("KEYCLOAK_JWKS_URL", f"{keycloak_issuer}/protocol/openid-connect/certs").rstrip("/")
return {
"keycloak_url": keycloak_url,
"keycloak_realm": keycloak_realm,
"keycloak_client_id": keycloak_client_id,
"keycloak_issuer": keycloak_issuer,
"keycloak_jwks_url": keycloak_jwks_url,
"keycloak_admin_url": _env("KEYCLOAK_ADMIN_URL", keycloak_url).rstrip("/"),
"keycloak_admin_realm": _env("KEYCLOAK_ADMIN_REALM", keycloak_realm),
"keycloak_admin_client_id": _env("KEYCLOAK_ADMIN_CLIENT_ID", ""),
"keycloak_admin_client_secret": _env("KEYCLOAK_ADMIN_CLIENT_SECRET", ""),
}
@classmethod
def _portal_group_config(cls) -> dict[str, Any]:
return {
"portal_admin_users": [u for u in (_env("PORTAL_ADMIN_USERS", "bstein")).split(",") if u.strip()],
"portal_admin_groups": [g for g in (_env("PORTAL_ADMIN_GROUPS", "admin")).split(",") if g.strip()],
"account_allowed_groups": [
g for g in (_env("ACCOUNT_ALLOWED_GROUPS", "dev,admin")).split(",") if g.strip()
],
"allowed_flag_groups": [g for g in (_env("ALLOWED_FLAG_GROUPS", "demo,test")).split(",") if g.strip()],
"default_user_groups": [g for g in (_env("DEFAULT_USER_GROUPS", "dev")).split(",") if g.strip()],
}
@classmethod
def _mailu_config(cls) -> dict[str, Any]:
mailu_domain = _env("MAILU_DOMAIN", "bstein.dev")
return {
"mailu_domain": mailu_domain,
"mailu_sync_url": _env(
"MAILU_SYNC_URL",
"http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events",
).rstrip("/"),
"mailu_event_min_interval_sec": _env_float("MAILU_EVENT_MIN_INTERVAL_SEC", 10.0),
"mailu_sync_wait_timeout_sec": _env_float("MAILU_SYNC_WAIT_TIMEOUT_SEC", 60.0),
"mailu_mailbox_wait_timeout_sec": _env_float("MAILU_MAILBOX_WAIT_TIMEOUT_SEC", 60.0),
"mailu_db_host": _env("MAILU_DB_HOST", "postgres-service.postgres.svc.cluster.local"),
"mailu_db_port": _env_int("MAILU_DB_PORT", 5432),
"mailu_db_name": _env("MAILU_DB_NAME", "mailu"),
"mailu_db_user": _env("MAILU_DB_USER", "mailu"),
"mailu_db_password": _env("MAILU_DB_PASSWORD", ""),
"mailu_host": _env("MAILU_HOST", f"mail.{mailu_domain}"),
"mailu_default_quota": _env_int("MAILU_DEFAULT_QUOTA", 20000000000),
"mailu_system_users": [u for u in _env("MAILU_SYSTEM_USERS", "").split(",") if u.strip()],
"mailu_system_password": _env("MAILU_SYSTEM_PASSWORD", ""),
}
@classmethod
def _smtp_config(cls, mailu_domain: str) -> dict[str, Any]:
return {
"smtp_host": _env("SMTP_HOST", ""),
"smtp_port": _env_int("SMTP_PORT", 25),
"smtp_username": _env("SMTP_USERNAME", ""),
"smtp_password": _env("SMTP_PASSWORD", ""),
"smtp_starttls": _env_bool("SMTP_STARTTLS", "false"),
"smtp_use_tls": _env_bool("SMTP_USE_TLS", "false"),
"smtp_from": _env("SMTP_FROM", f"postmaster@{mailu_domain}"),
"smtp_timeout_sec": _env_float("SMTP_TIMEOUT_SEC", 10.0),
"welcome_email_enabled": _env_bool("WELCOME_EMAIL_ENABLED", "true"),
}
@classmethod
def _nextcloud_config(cls) -> dict[str, Any]:
return {
"nextcloud_namespace": _env("NEXTCLOUD_NAMESPACE", "nextcloud"),
"nextcloud_pod_label": _env("NEXTCLOUD_POD_LABEL", "app=nextcloud"),
"nextcloud_container": _env("NEXTCLOUD_CONTAINER", "nextcloud"),
"nextcloud_exec_timeout_sec": _env_float("NEXTCLOUD_EXEC_TIMEOUT_SEC", 120.0),
"nextcloud_db_host": _env("NEXTCLOUD_DB_HOST", "postgres-service.postgres.svc.cluster.local"),
"nextcloud_db_port": _env_int("NEXTCLOUD_DB_PORT", 5432),
"nextcloud_db_name": _env("NEXTCLOUD_DB_NAME", "nextcloud"),
"nextcloud_db_user": _env("NEXTCLOUD_DB_USER", "nextcloud"),
"nextcloud_db_password": _env("NEXTCLOUD_DB_PASSWORD", ""),
"nextcloud_url": _env("NEXTCLOUD_URL", "https://cloud.bstein.dev").rstrip("/"),
"nextcloud_admin_user": _env("NEXTCLOUD_ADMIN_USER", ""),
"nextcloud_admin_password": _env("NEXTCLOUD_ADMIN_PASSWORD", ""),
}
@classmethod
def _wger_config(cls) -> dict[str, Any]:
return {
"wger_namespace": _env("WGER_NAMESPACE", "health"),
"wger_user_sync_wait_timeout_sec": _env_float("WGER_USER_SYNC_WAIT_TIMEOUT_SEC", 60.0),
"wger_pod_label": _env("WGER_POD_LABEL", "app=wger"),
"wger_container": _env("WGER_CONTAINER", "wger"),
"wger_admin_username": _env("WGER_ADMIN_USERNAME", ""),
"wger_admin_password": _env("WGER_ADMIN_PASSWORD", ""),
"wger_admin_email": _env("WGER_ADMIN_EMAIL", ""),
}
@classmethod
def _firefly_config(cls) -> dict[str, Any]:
return {
"firefly_namespace": _env("FIREFLY_NAMESPACE", "finance"),
"firefly_user_sync_wait_timeout_sec": _env_float("FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC", 90.0),
"firefly_pod_label": _env("FIREFLY_POD_LABEL", "app=firefly"),
"firefly_container": _env("FIREFLY_CONTAINER", "firefly"),
"firefly_cron_base_url": _env(
"FIREFLY_CRON_BASE_URL",
"http://firefly.finance.svc.cluster.local/api/v1/cron",
),
"firefly_cron_token": _env("FIREFLY_CRON_TOKEN", ""),
"firefly_cron_timeout_sec": _env_float("FIREFLY_CRON_TIMEOUT_SEC", 30.0),
}
@classmethod
def _vault_config(cls) -> dict[str, Any]:
return {
"vault_namespace": _env("VAULT_NAMESPACE", "vault"),
"vault_addr": _env("VAULT_ADDR", "http://vault.vault.svc.cluster.local:8200").rstrip("/"),
"vault_token": _env("VAULT_TOKEN", ""),
"vault_k8s_role": _env("VAULT_K8S_ROLE", "vault"),
"vault_k8s_role_ttl": _env("VAULT_K8S_ROLE_TTL", "1h"),
"vault_k8s_token_reviewer_jwt": _env("VAULT_K8S_TOKEN_REVIEWER_JWT", ""),
"vault_k8s_token_reviewer_jwt_file": _env("VAULT_K8S_TOKEN_REVIEWER_JWT_FILE", ""),
"vault_oidc_discovery_url": _env("VAULT_OIDC_DISCOVERY_URL", ""),
"vault_oidc_client_id": _env("VAULT_OIDC_CLIENT_ID", ""),
"vault_oidc_client_secret": _env("VAULT_OIDC_CLIENT_SECRET", ""),
"vault_oidc_default_role": _env("VAULT_OIDC_DEFAULT_ROLE", "admin"),
"vault_oidc_scopes": _env("VAULT_OIDC_SCOPES", "openid profile email groups"),
"vault_oidc_user_claim": _env("VAULT_OIDC_USER_CLAIM", "preferred_username"),
"vault_oidc_groups_claim": _env("VAULT_OIDC_GROUPS_CLAIM", "groups"),
"vault_oidc_token_policies": _env("VAULT_OIDC_TOKEN_POLICIES", ""),
"vault_oidc_admin_group": _env("VAULT_OIDC_ADMIN_GROUP", "admin"),
"vault_oidc_admin_policies": _env("VAULT_OIDC_ADMIN_POLICIES", "default,vault-admin"),
"vault_oidc_dev_group": _env("VAULT_OIDC_DEV_GROUP", "dev"),
"vault_oidc_dev_policies": _env("VAULT_OIDC_DEV_POLICIES", "default,dev-kv"),
"vault_oidc_user_group": _env("VAULT_OIDC_USER_GROUP", ""),
"vault_oidc_user_policies": _env("VAULT_OIDC_USER_POLICIES", ""),
"vault_oidc_redirect_uris": _env(
"VAULT_OIDC_REDIRECT_URIS",
"https://secret.bstein.dev/ui/vault/auth/oidc/oidc/callback",
),
"vault_oidc_bound_audiences": _env("VAULT_OIDC_BOUND_AUDIENCES", ""),
"vault_oidc_bound_claims_type": _env("VAULT_OIDC_BOUND_CLAIMS_TYPE", "string"),
}
@classmethod
def _comms_config(cls) -> dict[str, Any]:
return {
"comms_namespace": _env("COMMS_NAMESPACE", "comms"),
"comms_synapse_base": _env(
"COMMS_SYNAPSE_BASE",
"http://othrys-synapse-matrix-synapse:8008",
).rstrip("/"),
"comms_auth_base": _env(
"COMMS_AUTH_BASE",
"http://matrix-authentication-service:8080",
).rstrip("/"),
"comms_mas_admin_api_base": _env(
"COMMS_MAS_ADMIN_API_BASE",
"http://matrix-authentication-service:8081/api/admin/v1",
).rstrip("/"),
"comms_mas_token_url": _env(
"COMMS_MAS_TOKEN_URL",
"http://matrix-authentication-service:8080/oauth2/token",
),
"comms_mas_admin_client_id": _env("COMMS_MAS_ADMIN_CLIENT_ID", "01KDXMVQBQ5JNY6SEJPZW6Z8BM"),
"comms_mas_admin_client_secret": _env("COMMS_MAS_ADMIN_CLIENT_SECRET", ""),
"comms_server_name": _env("COMMS_SERVER_NAME", "live.bstein.dev"),
"comms_room_alias": _env("COMMS_ROOM_ALIAS", "#othrys:live.bstein.dev"),
"comms_room_name": _env("COMMS_ROOM_NAME", "Othrys"),
"comms_pin_message": _env(
"COMMS_PIN_MESSAGE",
"Invite guests: share https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join and choose 'Continue' -> 'Join as guest'.",
),
"comms_seeder_user": _env("COMMS_SEEDER_USER", "othrys-seeder"),
"comms_seeder_password": _env("COMMS_SEEDER_PASSWORD", ""),
"comms_bot_user": _env("COMMS_BOT_USER", "atlasbot"),
"comms_bot_password": _env("COMMS_BOT_PASSWORD", ""),
"comms_synapse_db_host": _env(
"COMMS_SYNAPSE_DB_HOST",
"postgres-service.postgres.svc.cluster.local",
),
"comms_synapse_db_port": _env_int("COMMS_SYNAPSE_DB_PORT", 5432),
"comms_synapse_db_name": _env("COMMS_SYNAPSE_DB_NAME", "synapse"),
"comms_synapse_db_user": _env("COMMS_SYNAPSE_DB_USER", "synapse"),
"comms_synapse_db_password": _env("COMMS_SYNAPSE_DB_PASSWORD", ""),
"comms_synapse_admin_token": _env("COMMS_SYNAPSE_ADMIN_TOKEN", ""),
"comms_timeout_sec": _env_float("COMMS_TIMEOUT_SEC", 30.0),
"comms_guest_stale_days": _env_int("COMMS_GUEST_STALE_DAYS", 14),
}
@classmethod
def _image_sweeper_config(cls) -> dict[str, Any]:
return {
"image_sweeper_namespace": _env("IMAGE_SWEEPER_NAMESPACE", "maintenance"),
"image_sweeper_service_account": _env("IMAGE_SWEEPER_SERVICE_ACCOUNT", "node-image-sweeper"),
"image_sweeper_job_ttl_sec": _env_int("IMAGE_SWEEPER_JOB_TTL_SEC", 3600),
"image_sweeper_wait_timeout_sec": _env_float("IMAGE_SWEEPER_WAIT_TIMEOUT_SEC", 1200.0),
}
@classmethod
def _platform_quality_probe_config(cls) -> dict[str, Any]:
return {
"platform_quality_probe_namespace": _env("PLATFORM_QUALITY_PROBE_NAMESPACE", "monitoring"),
"platform_quality_probe_script_configmap": _env(
"PLATFORM_QUALITY_PROBE_SCRIPT_CONFIGMAP",
"platform-quality-suite-probe-script",
),
"platform_quality_probe_image": _env("PLATFORM_QUALITY_PROBE_IMAGE", "curlimages/curl:8.12.1"),
"platform_quality_probe_job_ttl_sec": _env_int("PLATFORM_QUALITY_PROBE_JOB_TTL_SEC", 1800),
"platform_quality_probe_wait_timeout_sec": _env_float("PLATFORM_QUALITY_PROBE_WAIT_TIMEOUT_SEC", 180.0),
"platform_quality_probe_pushgateway_url": _env(
"PLATFORM_QUALITY_PROBE_PUSHGATEWAY_URL",
"http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
).rstrip("/"),
"platform_quality_probe_http_timeout_sec": _env_int("PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12),
}
@classmethod
def _jenkins_build_weather_config(cls) -> dict[str, Any]:
return {
"jenkins_base_url": _env("JENKINS_BASE_URL", "https://ci.bstein.dev").rstrip("/"),
"jenkins_api_user": _env("JENKINS_API_USER", ""),
"jenkins_api_token": _env("JENKINS_API_TOKEN", ""),
"jenkins_api_timeout_sec": _env_float("JENKINS_API_TIMEOUT_SEC", 10.0),
}
@classmethod
def _jenkins_workspace_cleanup_config(cls) -> dict[str, Any]:
return {
"jenkins_workspace_namespace": _env("JENKINS_WORKSPACE_NAMESPACE", "jenkins"),
"jenkins_workspace_pvc_prefix": _env("JENKINS_WORKSPACE_PVC_PREFIX", "pvc-workspace-"),
"jenkins_workspace_cleanup_min_age_hours": _env_float("JENKINS_WORKSPACE_CLEANUP_MIN_AGE_HOURS", 12.0),
"jenkins_workspace_cleanup_dry_run": _env_bool("JENKINS_WORKSPACE_CLEANUP_DRY_RUN", "false"),
"jenkins_workspace_cleanup_max_deletions_per_run": _env_int(
"JENKINS_WORKSPACE_CLEANUP_MAX_DELETIONS_PER_RUN",
20,
),
}
@classmethod
def _vaultwarden_config(cls) -> dict[str, Any]:
return {
"vaultwarden_namespace": _env("VAULTWARDEN_NAMESPACE", "vaultwarden"),
"vaultwarden_pod_label": _env("VAULTWARDEN_POD_LABEL", "app=vaultwarden"),
"vaultwarden_pod_port": _env_int("VAULTWARDEN_POD_PORT", 80),
"vaultwarden_service_host": _env(
"VAULTWARDEN_SERVICE_HOST",
"vaultwarden-service.vaultwarden.svc.cluster.local",
),
"vaultwarden_admin_secret_name": _env("VAULTWARDEN_ADMIN_SECRET_NAME", "vaultwarden-admin"),
"vaultwarden_admin_secret_key": _env("VAULTWARDEN_ADMIN_SECRET_KEY", "ADMIN_TOKEN"),
"vaultwarden_admin_session_ttl_sec": _env_float("VAULTWARDEN_ADMIN_SESSION_TTL_SEC", 300.0),
"vaultwarden_admin_rate_limit_backoff_sec": _env_float("VAULTWARDEN_ADMIN_RATE_LIMIT_BACKOFF_SEC", 600.0),
"vaultwarden_retry_cooldown_sec": _env_float("VAULTWARDEN_RETRY_COOLDOWN_SEC", 1800.0),
"vaultwarden_failure_bailout": _env_int("VAULTWARDEN_FAILURE_BAILOUT", 2),
"vaultwarden_invite_refresh_sec": _env_float("VAULTWARDEN_INVITE_REFRESH_SEC", 86400.0),
}
@classmethod
def _schedule_config(cls) -> dict[str, Any]:
return {
"mailu_sync_cron": _env("ARIADNE_SCHEDULE_MAILU_SYNC", "30 4 * * *"),
"nextcloud_sync_cron": _env("ARIADNE_SCHEDULE_NEXTCLOUD_SYNC", "0 5 * * *"),
"nextcloud_cron": _env("ARIADNE_SCHEDULE_NEXTCLOUD_CRON", "*/5 * * * *"),
"nextcloud_maintenance_cron": _env("ARIADNE_SCHEDULE_NEXTCLOUD_MAINTENANCE", "30 4 * * *"),
"vaultwarden_sync_cron": _env("ARIADNE_SCHEDULE_VAULTWARDEN_SYNC", "0 * * * *"),
"wger_user_sync_cron": _env("ARIADNE_SCHEDULE_WGER_USER_SYNC", "0 5 * * *"),
"wger_admin_cron": _env("ARIADNE_SCHEDULE_WGER_ADMIN", "15 3 * * *"),
"firefly_user_sync_cron": _env("ARIADNE_SCHEDULE_FIREFLY_USER_SYNC", "0 6 * * *"),
"firefly_cron": _env("ARIADNE_SCHEDULE_FIREFLY_CRON", "0 3 * * *"),
"pod_cleaner_cron": _env("ARIADNE_SCHEDULE_POD_CLEANER", "0 * * * *"),
"opensearch_prune_cron": _env("ARIADNE_SCHEDULE_OPENSEARCH_PRUNE", "23 3 * * *"),
"image_sweeper_cron": _env("ARIADNE_SCHEDULE_IMAGE_SWEEPER", "30 4 * * 0"),
"vault_k8s_auth_cron": _env("ARIADNE_SCHEDULE_VAULT_K8S_AUTH", "0 * * * *"),
"vault_oidc_cron": _env("ARIADNE_SCHEDULE_VAULT_OIDC", "0 * * * *"),
"comms_guest_name_cron": _env("ARIADNE_SCHEDULE_COMMS_GUEST_NAME", "*/5 * * * *"),
"comms_pin_invite_cron": _env("ARIADNE_SCHEDULE_COMMS_PIN_INVITE", "*/30 * * * *"),
"comms_reset_room_cron": _env("ARIADNE_SCHEDULE_COMMS_RESET_ROOM", "0 0 1 1 *"),
"comms_seed_room_cron": _env("ARIADNE_SCHEDULE_COMMS_SEED_ROOM", "*/10 * * * *"),
"keycloak_profile_cron": _env("ARIADNE_SCHEDULE_KEYCLOAK_PROFILE", "0 */6 * * *"),
"metis_k3s_token_sync_cron": _env("ARIADNE_SCHEDULE_METIS_K3S_TOKEN_SYNC", "11 */6 * * *"),
"platform_quality_suite_probe_cron": _env(
"ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE",
"*/15 * * * *",
),
"jenkins_build_weather_cron": _env(
"ARIADNE_SCHEDULE_JENKINS_BUILD_WEATHER",
"*/10 * * * *",
),
"jenkins_workspace_cleanup_cron": _env(
"ARIADNE_SCHEDULE_JENKINS_WORKSPACE_CLEANUP",
"45 */6 * * *",
),
}
@classmethod
def _cluster_state_config(cls) -> dict[str, Any]:
return {
"vm_url": _env(
"ARIADNE_VM_URL",
"http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/"),
"cluster_state_vm_timeout_sec": _env_float("ARIADNE_CLUSTER_STATE_VM_TIMEOUT_SEC", 5.0),
"alertmanager_url": _env("ARIADNE_ALERTMANAGER_URL", "").rstrip("/"),
"cluster_state_cron": _env("ARIADNE_SCHEDULE_CLUSTER_STATE", "*/15 * * * *"),
"cluster_state_keep": _env_int("ARIADNE_CLUSTER_STATE_KEEP", 168),
}
@classmethod
def _game_mode_config(cls) -> dict[str, Any]:
legacy_ollama = {
"kind": "Deployment",
"namespace": _env("GAME_MODE_OLLAMA_NAMESPACE", "openclaw"),
"name": _env("GAME_MODE_OLLAMA_DEPLOYMENT", "openclaw-ollama"),
"restoreReplicas": _env_int("GAME_MODE_OLLAMA_RESTORE_REPLICAS", 1),
}
return {
"game_mode_node_name": _env("GAME_MODE_NODE_NAME", "titan-24"),
"game_mode_displace_workloads": _env_json_list("GAME_MODE_DISPLACE_WORKLOADS", [legacy_ollama]),
"game_mode_ollama_namespace": legacy_ollama["namespace"],
"game_mode_ollama_deployment": legacy_ollama["name"],
"game_mode_ollama_restore_replicas": legacy_ollama["restoreReplicas"],
"game_mode_hook_token": _env("GAME_MODE_HOOK_TOKEN", ""),
"wolf_oidc_client_id": _env("WOLF_OIDC_CLIENT_ID", _env("SUNSHINE_OIDC_CLIENT_ID", "wolf")),
"wolf_oidc_base_url": _env(
"WOLF_OIDC_BASE_URL",
_env("SUNSHINE_OIDC_BASE_URL", "https://wolf.bstein.dev"),
).rstrip("/"),
"wolf_oidc_vault_path": _env("WOLF_OIDC_VAULT_PATH", _env("SUNSHINE_OIDC_VAULT_PATH", "game-stream/wolf-oidc")),
"wolf_oidc_cron": _env("ARIADNE_SCHEDULE_WOLF_OIDC", _env("ARIADNE_SCHEDULE_SUNSHINE_OIDC", "17 */6 * * *")),
"game_stream_user_group": _env("GAME_STREAM_USER_GROUP", "game-stream-users"),
"game_stream_admin_group": _env("GAME_STREAM_ADMIN_GROUP", "admin"),
"game_stream_profile_group_prefix": _env("GAME_STREAM_PROFILE_GROUP_PREFIX", "game-stream-profile-"),
}
@classmethod
def _metis_config(cls) -> dict[str, Any]:
return {
"metis_base_url": _env("METIS_BASE_URL", "http://metis.maintenance.svc.cluster.local").rstrip("/"),
"metis_watch_url": _env("METIS_WATCH_URL", "").rstrip("/"),
"metis_timeout_sec": _env_float("METIS_TIMEOUT_SEC", 10.0),
"metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"),
"metis_token_sync_namespace": _env("METIS_TOKEN_SYNC_NAMESPACE", "maintenance"),
"metis_token_sync_service_account": _env("METIS_TOKEN_SYNC_SERVICE_ACCOUNT", "metis-token-sync"),
"metis_token_sync_node_name": _env("METIS_TOKEN_SYNC_NODE_NAME", "titan-0a"),
"metis_token_sync_image": _env("METIS_TOKEN_SYNC_IMAGE", "hashicorp/vault:1.17.6"),
"metis_token_sync_job_ttl_sec": _env_int("METIS_TOKEN_SYNC_JOB_TTL_SEC", 1800),
"metis_token_sync_wait_timeout_sec": _env_float("METIS_TOKEN_SYNC_WAIT_TIMEOUT_SEC", 180.0),
"metis_token_sync_vault_addr": _env(
"METIS_TOKEN_SYNC_VAULT_ADDR",
"http://vault.vault.svc.cluster.local:8200",
).rstrip("/"),
"metis_token_sync_vault_k8s_role": _env("METIS_TOKEN_SYNC_VAULT_K8S_ROLE", "maintenance-metis-token-sync"),
}
@classmethod
def _opensearch_config(cls) -> dict[str, Any]:
return {
"opensearch_url": _env(
"OPENSEARCH_URL",
"http://opensearch-master.logging.svc.cluster.local:9200",
).rstrip("/"),
"opensearch_limit_bytes": _env_int("OPENSEARCH_LIMIT_BYTES", 1024**4),
"opensearch_index_patterns": _env("OPENSEARCH_INDEX_PATTERNS", "kube-*,journald-*"),
"opensearch_timeout_sec": _env_float("OPENSEARCH_TIMEOUT_SEC", 30.0),
}
@classmethod @classmethod
def from_env(cls) -> "Settings": def from_env(cls) -> "Settings":
keycloak_cfg = _keycloak_config() keycloak_cfg = cls._keycloak_config()
portal_cfg = _portal_group_config() portal_cfg = cls._portal_group_config()
mailu_cfg = _mailu_config() mailu_cfg = cls._mailu_config()
smtp_cfg = _smtp_config(mailu_cfg["mailu_domain"]) smtp_cfg = cls._smtp_config(mailu_cfg["mailu_domain"])
nextcloud_cfg = _nextcloud_config() nextcloud_cfg = cls._nextcloud_config()
wger_cfg = _wger_config() wger_cfg = cls._wger_config()
firefly_cfg = _firefly_config() firefly_cfg = cls._firefly_config()
vault_cfg = _vault_config() vault_cfg = cls._vault_config()
comms_cfg = _comms_config() comms_cfg = cls._comms_config()
image_cfg = _image_sweeper_config() image_cfg = cls._image_sweeper_config()
platform_quality_probe_cfg = _platform_quality_probe_config() platform_quality_probe_cfg = cls._platform_quality_probe_config()
jenkins_build_weather_cfg = _jenkins_build_weather_config() jenkins_build_weather_cfg = cls._jenkins_build_weather_config()
jenkins_workspace_cleanup_cfg = _jenkins_workspace_cleanup_config() jenkins_workspace_cleanup_cfg = cls._jenkins_workspace_cleanup_config()
testing_triage_cfg = _testing_triage_config() vaultwarden_cfg = cls._vaultwarden_config()
vaultwarden_cfg = _vaultwarden_config() schedule_cfg = cls._schedule_config()
schedule_cfg = _schedule_config() cluster_cfg = cls._cluster_state_config()
cluster_cfg = _cluster_state_config() game_mode_cfg = cls._game_mode_config()
metis_cfg = _metis_config() metis_cfg = cls._metis_config()
opensearch_cfg = _opensearch_config() opensearch_cfg = cls._opensearch_config()
portal_db = _env("PORTAL_DATABASE_URL", "") portal_db = _env("PORTAL_DATABASE_URL", "")
ariadne_db = _env("ARIADNE_DATABASE_URL", portal_db) ariadne_db = _env("ARIADNE_DATABASE_URL", portal_db)
@ -311,10 +705,10 @@ class Settings:
**platform_quality_probe_cfg, **platform_quality_probe_cfg,
**jenkins_build_weather_cfg, **jenkins_build_weather_cfg,
**jenkins_workspace_cleanup_cfg, **jenkins_workspace_cleanup_cfg,
**testing_triage_cfg,
**vaultwarden_cfg, **vaultwarden_cfg,
**schedule_cfg, **schedule_cfg,
**cluster_cfg, **cluster_cfg,
**game_mode_cfg,
**metis_cfg, **metis_cfg,
**opensearch_cfg, **opensearch_cfg,
) )

1094
tests/test_app.py Normal file

File diff suppressed because it is too large Load Diff

102
tests/test_game_mode.py Normal file
View File

@ -0,0 +1,102 @@
from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import game_mode as game_mode_module
from ariadne.services.game_mode import GameModeService
def _settings() -> SimpleNamespace:
return SimpleNamespace(
game_mode_node_name="titan-24",
game_mode_displace_workloads=[
{
"kind": "Deployment",
"namespace": "openclaw",
"name": "openclaw-ollama",
"restoreReplicas": 1,
}
],
)
def test_game_mode_start_and_stop_patch_scale(monkeypatch) -> None:
monkeypatch.setattr(game_mode_module, "settings", _settings())
calls: list[tuple[str, dict]] = []
replicas = {"desired": 1, "current": 1}
def fake_get_json(_path):
return {"spec": {"replicas": replicas["desired"]}, "status": {"replicas": replicas["current"]}}
def fake_patch_json(path, payload):
calls.append((path, payload))
replicas["desired"] = payload["spec"]["replicas"]
replicas["current"] = payload["spec"]["replicas"]
return {"ok": True}
monkeypatch.setattr(game_mode_module, "get_json", fake_get_json)
monkeypatch.setattr(game_mode_module, "patch_json", fake_patch_json)
monkeypatch.setattr(game_mode_module, "set_game_mode_state", lambda *args, **kwargs: None)
monkeypatch.setattr(game_mode_module, "set_game_mode_managed_replicas", lambda *args, **kwargs: None)
monkeypatch.setattr(game_mode_module, "record_game_mode_transition", lambda *args, **kwargs: None)
svc = GameModeService()
started = svc.start("Arc Raiders")
assert started["active"] is True
assert calls[-1][1] == {"spec": {"replicas": 0}}
stopped = svc.stop()
assert stopped["active"] is False
assert calls[-1][1] == {"spec": {"replicas": 1}}
def test_game_mode_status_reports_workload(monkeypatch) -> None:
monkeypatch.setattr(game_mode_module, "settings", _settings())
monkeypatch.setattr(
game_mode_module,
"get_json",
lambda _path: {"spec": {"replicas": 0}, "status": {"replicas": 0}},
)
monkeypatch.setattr(game_mode_module, "set_game_mode_state", lambda *args, **kwargs: None)
monkeypatch.setattr(game_mode_module, "set_game_mode_managed_replicas", lambda *args, **kwargs: None)
status = GameModeService().status()
assert status["status"] == "active"
assert status["workloads"][0]["name"] == "openclaw-ollama"
def test_game_mode_supports_statefulset_workload(monkeypatch) -> None:
monkeypatch.setattr(
game_mode_module,
"settings",
SimpleNamespace(
game_mode_node_name="titan-24",
game_mode_displace_workloads=[
{
"kind": "StatefulSet",
"namespace": "hermes",
"name": "hermes-llm",
"restoreReplicas": "2",
}
],
),
)
calls: list[str] = []
monkeypatch.setattr(
game_mode_module,
"get_json",
lambda _path: {"spec": {"replicas": 2}, "status": {"replicas": 2}},
)
def fake_patch_json(path, _payload):
calls.append(path)
return {"ok": True}
monkeypatch.setattr(game_mode_module, "patch_json", fake_patch_json)
monkeypatch.setattr(game_mode_module, "set_game_mode_state", lambda *args, **kwargs: None)
monkeypatch.setattr(game_mode_module, "set_game_mode_managed_replicas", lambda *args, **kwargs: None)
monkeypatch.setattr(game_mode_module, "record_game_mode_transition", lambda *args, **kwargs: None)
GameModeService().start("wolf")
assert calls == ["/apis/apps/v1/namespaces/hermes/statefulsets/hermes-llm/scale"]

View File

@ -0,0 +1,43 @@
from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import game_stream_profiles as profile_module
from ariadne.services.game_stream_profiles import GameStreamProfileService
def _settings() -> SimpleNamespace:
return SimpleNamespace(
game_stream_user_group="game-stream-users",
game_stream_admin_group="admin",
game_stream_profile_group_prefix="game-stream-profile-",
)
def test_profile_defaults_to_user_profile(monkeypatch) -> None:
monkeypatch.setattr(profile_module, "settings", _settings())
profile = GameStreamProfileService().profile_for("Brad Stein", ["game-stream-users"])
assert profile["allowed"] is True
assert profile["profile_id"] == "user-brad-stein"
assert profile["profile_group"] == ""
def test_profile_group_overrides_user_profile(monkeypatch) -> None:
monkeypatch.setattr(profile_module, "settings", _settings())
profile = GameStreamProfileService().profile_for("brad", ["/game-stream-profile-family"])
assert profile["allowed"] is True
assert profile["profile_id"] == "family"
assert profile["profile_group"] == "game-stream-profile-family"
def test_profile_denies_unlisted_user(monkeypatch) -> None:
monkeypatch.setattr(profile_module, "settings", _settings())
profile = GameStreamProfileService().profile_for("guest", ["other"])
assert profile["allowed"] is False
assert profile["profile_id"] == "user-guest"

View File

@ -70,6 +70,30 @@ def test_post_json_success(monkeypatch) -> None:
assert result == {"ok": True} assert result == {"ok": True}
def test_patch_json_success(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0)
monkeypatch.setattr(k8s_client, "settings", dummy_settings)
monkeypatch.setattr(k8s_client, "_read_service_account", lambda: ("token", "/tmp/ca"))
client = DummyClient()
monkeypatch.setattr(k8s_client.httpx, "Client", lambda *args, **kwargs: client)
result = k8s_client.patch_json("/api/test", {"spec": {"replicas": 0}})
assert result == {"ok": True}
assert client.calls[0][0] == "PATCH"
def test_patch_json_rejects_non_dict(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0)
monkeypatch.setattr(k8s_client, "settings", dummy_settings)
monkeypatch.setattr(k8s_client, "_read_service_account", lambda: ("token", "/tmp/ca"))
client = DummyClient()
client.payload = ["bad"]
monkeypatch.setattr(k8s_client.httpx, "Client", lambda *args, **kwargs: client)
with pytest.raises(RuntimeError):
k8s_client.patch_json("/api/test", {"spec": {"replicas": 0}})
def test_get_json_rejects_non_dict(monkeypatch) -> None: def test_get_json_rejects_non_dict(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0) dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0)
monkeypatch.setattr(k8s_client, "settings", dummy_settings) monkeypatch.setattr(k8s_client, "settings", dummy_settings)

View File

@ -0,0 +1,768 @@
from __future__ import annotations
from typing import Any
import types
import httpx
import pytest
from ariadne.services.keycloak_admin import KeycloakAdminClient
class DummyResponse:
def __init__(self, payload=None, status_code=200, headers=None):
self._payload = payload
self.status_code = status_code
self.headers = headers or {}
def json(self):
return self._payload
def raise_for_status(self):
if self.status_code >= 400:
request = httpx.Request("GET", "https://example.com")
response = httpx.Response(self.status_code, request=request)
raise httpx.HTTPStatusError("error", request=request, response=response)
class DummyClient:
def __init__(self, responses):
self._responses = list(responses)
self.calls = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def _next(self):
if not self._responses:
raise RuntimeError("missing response")
return self._responses.pop(0)
def get(self, url, params=None, headers=None):
self.calls.append(("get", url, params))
return self._next()
def post(self, url, data=None, json=None, headers=None):
self.calls.append(("post", url, data, json))
return self._next()
def put(self, url, headers=None, json=None):
self.calls.append(("put", url, json))
return self._next()
def test_set_user_attribute_preserves_profile(monkeypatch) -> None:
client = KeycloakAdminClient()
captured: dict[str, Any] = {}
def fake_find_user(username: str) -> dict[str, Any]:
return {"id": "user-123"}
def fake_get_user(user_id: str) -> dict[str, Any]:
return {
"id": user_id,
"username": "alice",
"email": "alice@bstein.dev",
"emailVerified": True,
"enabled": True,
"firstName": "Alice",
"lastName": "Smith",
"requiredActions": ["UPDATE_PASSWORD", 123],
"attributes": {"existing": ["value"]},
}
def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
captured["user_id"] = user_id
captured["payload"] = payload
monkeypatch.setattr(client, "find_user", fake_find_user)
monkeypatch.setattr(client, "get_user", fake_get_user)
monkeypatch.setattr(client, "update_user", fake_update_user)
client.set_user_attribute("alice", "mailu_app_password", "secret")
payload = captured.get("payload") or {}
assert payload.get("username") == "alice"
assert payload.get("email") == "alice@bstein.dev"
assert payload.get("emailVerified") is True
assert payload.get("enabled") is True
assert payload.get("firstName") == "Alice"
assert payload.get("lastName") == "Smith"
assert payload.get("requiredActions") == ["UPDATE_PASSWORD"]
assert payload.get("attributes") == {
"existing": ["value"],
"mailu_app_password": ["secret"],
}
def test_update_user_safe_merges_payload(monkeypatch) -> None:
client = KeycloakAdminClient()
captured: dict[str, Any] = {}
def fake_get_user(user_id: str) -> dict[str, Any]:
return {
"id": user_id,
"username": "alice",
"enabled": True,
"attributes": {"existing": ["value"]},
}
def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
captured["user_id"] = user_id
captured["payload"] = payload
monkeypatch.setattr(client, "get_user", fake_get_user)
monkeypatch.setattr(client, "update_user", fake_update_user)
client.update_user_safe(
"user-123",
{"attributes": {"new": ["item"]}, "requiredActions": ["UPDATE_PASSWORD"]},
)
payload = captured.get("payload") or {}
assert payload.get("username") == "alice"
assert payload.get("attributes") == {"existing": ["value"], "new": ["item"]}
assert payload.get("requiredActions") == ["UPDATE_PASSWORD"]
def test_get_token_fetches_once(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
dummy = DummyClient([DummyResponse({"access_token": "token", "expires_in": 120})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client._get_token() == "token"
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("should not call")))
assert client._get_token() == "token"
def test_find_user_by_email_case_insensitive(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([{"email": "Alice@Example.com", "id": "1"}])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
user = client.find_user_by_email("alice@example.com")
assert user["id"] == "1"
def test_find_user_invalid_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse(["bad"])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_user("alice") is None
def test_find_user_by_email_empty() -> None:
client = KeycloakAdminClient()
assert client.find_user_by_email("") is None
def test_find_user_by_email_invalid_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({"bad": "payload"})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_user_by_email("alice@example.com") is None
def test_list_group_names_filters(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([{"name": "demo"}, {"name": "admin"}, {"name": "test"}])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.list_group_names(exclude={"admin"}) == ["demo", "test"]
def test_find_user_by_email_skips_non_dict(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse(["bad"])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_user_by_email("alice@example.com") is None
def test_get_user_invalid_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse("bad")])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client.get_user("user-1")
def test_update_user_calls_put(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
client.update_user("user-1", {"enabled": True})
assert dummy.calls
def test_update_user_safe_handles_bad_attrs(monkeypatch) -> None:
client = KeycloakAdminClient()
captured: dict[str, Any] = {}
def fake_get_user(user_id: str) -> dict[str, Any]:
return {"id": user_id, "username": "alice", "attributes": "bad"}
def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
captured["payload"] = payload
monkeypatch.setattr(client, "get_user", fake_get_user)
monkeypatch.setattr(client, "update_user", fake_update_user)
client.update_user_safe("user-1", {"attributes": {"new": ["item"]}})
assert captured["payload"]["attributes"] == {"new": ["item"]}
def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
client = KeycloakAdminClient()
def fake_find_user(username: str) -> dict[str, Any]:
return {"id": ""}
monkeypatch.setattr(client, "find_user", fake_find_user)
with pytest.raises(RuntimeError):
client.set_user_attribute("alice", "attr", "val")
def test_set_user_attribute_handles_bad_attrs(monkeypatch) -> None:
client = KeycloakAdminClient()
def fake_find_user(username: str) -> dict[str, Any]:
return {"id": "user-1"}
def fake_get_user(user_id: str) -> dict[str, Any]:
return {"id": user_id, "username": "alice", "attributes": "bad"}
monkeypatch.setattr(client, "find_user", fake_find_user)
monkeypatch.setattr(client, "get_user", fake_get_user)
monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)
client.set_user_attribute("alice", "attr", "val")
def test_get_group_id_skips_non_dict(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse(["bad"])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.get_group_id("demo") is None
def test_get_group_id_cached(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([{"name": "demo", "id": "gid"}])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.get_group_id("demo") == "gid"
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("no call")))
assert client.get_group_id("demo") == "gid"
def test_get_group_id_invalid_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({"bad": "payload"})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.get_group_id("demo") is None
def test_iter_users_paginates(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient(
[
DummyResponse([{"id": "1"}, {"id": "2"}]),
DummyResponse([{"id": "3"}]),
]
)
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
users = client.iter_users(page_size=2, brief=True)
assert [u["id"] for u in users] == ["1", "2", "3"]
def test_iter_users_empty(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.iter_users(page_size=2) == []
def test_create_user_parses_location(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({}, headers={"Location": "http://kc/admin/realms/atlas/users/abc"})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.create_user({"username": "alice"}) == "abc"
def test_create_user_missing_location(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({}, headers={})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client.create_user({"username": "alice"})
def test_find_client(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([{"id": "abc", "clientId": "sunshine"}])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_client("sunshine") == {"id": "abc", "clientId": "sunshine"}
def test_find_client_missing(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_client("sunshine") is None
def test_client_crud_helpers(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient(
[
DummyResponse({}),
DummyResponse({}),
DummyResponse({"value": "secret"}),
DummyResponse([{"name": "groups", "id": "scope-id"}]),
DummyResponse({}),
]
)
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
client.create_client({"clientId": "sunshine"})
client.update_client("uuid", {"clientId": "sunshine"})
assert client.get_client_secret("uuid") == "secret"
assert client.find_client_scope_id("groups") == "scope-id"
client.attach_optional_client_scope("uuid", "scope-id")
def test_get_client_secret_missing(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client.get_client_secret("uuid")
def test_get_token_missing_access_token(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
dummy = DummyClient([DummyResponse({"expires_in": 120})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client._get_token()
def test_reset_password_raises_on_error(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({}, status_code=400)])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(httpx.HTTPStatusError):
client.reset_password("user", "pw", temporary=True)
def test_get_token_requires_config(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="",
keycloak_admin_client_secret="",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
with pytest.raises(RuntimeError):
client._get_token()
def test_headers_includes_bearer(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "_get_token", lambda: "token")
headers = client.headers()
assert headers["Authorization"] == "Bearer token"
def test_find_user_returns_none(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse([])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
assert client.find_user("alice") is None
def test_get_user_invalid_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse(["bad"])])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client.get_user("id")
def test_get_user_success(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({"id": "1"})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
user = client.get_user("id")
assert user["id"] == "1"
def test_set_user_attribute_user_missing(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "find_user", lambda username: None)
with pytest.raises(RuntimeError):
client.set_user_attribute("alice", "attr", "value")
def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "find_user", lambda username: {})
with pytest.raises(RuntimeError):
client.set_user_attribute("alice", "attr", "value")
def test_add_user_to_group(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse({})])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
client.add_user_to_group("user", "group")
assert dummy.calls[0][0] == "put"
def test_get_user_raises_on_non_dict_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse("bad")])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError):
client.get_user("user-1")
def test_update_user_safe_coerces_bad_attrs(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "get_user", lambda *_args, **_kwargs: {"id": "user-1"})
monkeypatch.setattr(client, "_safe_update_payload", lambda *_args, **_kwargs: {"attributes": "bad"})
monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)
client.update_user_safe("user-1", {"attributes": {"new": ["item"]}})
def test_set_user_attribute_coerces_bad_attrs(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "find_user", lambda username: {"id": "user-1"})
monkeypatch.setattr(client, "get_user", lambda *_args, **_kwargs: {"id": "user-1"})
monkeypatch.setattr(client, "_safe_update_payload", lambda *_args, **_kwargs: {"attributes": "bad"})
monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)
client.set_user_attribute("alice", "attr", "value")
def test_set_user_attribute_user_id_missing_raises(monkeypatch) -> None:
client = KeycloakAdminClient()
monkeypatch.setattr(client, "find_user", lambda username: {"id": ""})
with pytest.raises(RuntimeError):
client.set_user_attribute("alice", "attr", "value")
def test_get_user_rejects_non_dict_payload(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
keycloak_admin_client_id="client",
keycloak_admin_client_secret="secret",
keycloak_realm="atlas",
)
monkeypatch.setattr("ariadne.services.keycloak_admin.settings", dummy_settings)
client = KeycloakAdminClient()
client._token = "token"
client._expires_at = 9999999999
dummy = DummyClient([DummyResponse(123)])
monkeypatch.setattr("ariadne.services.keycloak_admin.httpx.Client", lambda *args, **kwargs: dummy)
with pytest.raises(RuntimeError) as exc:
client.get_user("user-1")
assert "unexpected user payload" in str(exc.value)

View File

@ -0,0 +1,95 @@
from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import oauth2_proxy as oauth_module
from ariadne.services.oauth2_proxy import OAuth2ProxyService, _oauth_client_payload, _valid_cookie_secret
def test_oauth_client_payload() -> None:
payload = _oauth_client_payload("wolf", "https://wolf.bstein.dev")
assert payload["clientId"] == "wolf"
assert payload["redirectUris"] == ["https://wolf.bstein.dev/oauth2/callback"]
assert payload["webOrigins"] == ["https://wolf.bstein.dev"]
def test_valid_cookie_secret() -> None:
assert _valid_cookie_secret("x" * 32) == "x" * 32
assert _valid_cookie_secret("short") == ""
assert _valid_cookie_secret(None) == ""
def test_ensure_wolf_creates_client_and_writes_vault(monkeypatch) -> None:
monkeypatch.setattr(
oauth_module,
"settings",
SimpleNamespace(
wolf_oidc_client_id="wolf",
wolf_oidc_base_url="https://wolf.bstein.dev",
wolf_oidc_vault_path="game-stream/wolf-oidc",
),
)
calls: list[str] = []
written = {}
class DummyKeycloak:
def __init__(self) -> None:
self.created = False
def ready(self):
return True
def find_client(self, client_id):
if not self.created:
return None
return {"id": "client-uuid", "clientId": client_id}
def create_client(self, _payload):
self.created = True
calls.append("create")
def update_client(self, _client_uuid, _payload):
calls.append("update")
def find_client_scope_id(self, name):
assert name == "groups"
return "scope-uuid"
def attach_optional_client_scope(self, _client_uuid, _scope_id):
calls.append("scope")
def get_client_secret(self, _client_uuid):
return "client-secret"
class DummyVault:
def read_kv_secret(self, path):
assert path == "game-stream/wolf-oidc"
return {"cookie_secret": "a" * 32}
def write_kv_secret(self, path, data):
written["path"] = path
written["data"] = data
monkeypatch.setattr(oauth_module, "keycloak_admin", DummyKeycloak())
monkeypatch.setattr(oauth_module, "vault", DummyVault())
result = OAuth2ProxyService().ensure_wolf()
assert result["status"] == "ok"
assert calls == ["create", "update", "scope"]
assert written["data"]["client_secret"] == "client-secret"
assert written["data"]["cookie_secret"] == "a" * 32
def test_ensure_wolf_reports_missing_keycloak(monkeypatch) -> None:
monkeypatch.setattr(
oauth_module,
"settings",
SimpleNamespace(
wolf_oidc_client_id="wolf",
wolf_oidc_base_url="https://wolf.bstein.dev",
wolf_oidc_vault_path="game-stream/wolf-oidc",
),
)
monkeypatch.setattr(oauth_module, "keycloak_admin", SimpleNamespace(ready=lambda: False))
assert OAuth2ProxyService().ensure_wolf()["status"] == "error"

View File

@ -186,3 +186,52 @@ def test_vault_ensure_token_login(monkeypatch) -> None:
svc = VaultService() svc = VaultService()
assert svc._ensure_token() == "tok" assert svc._ensure_token() == "tok"
def test_vault_read_write_kv(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
vault_addr="http://vault",
vault_token="token",
vault_k8s_role="vault",
vault_k8s_token_reviewer_jwt="jwt",
vault_k8s_token_reviewer_jwt_file="",
k8s_api_timeout_sec=5.0,
)
monkeypatch.setattr(vault_module, "settings", dummy_settings)
calls: list[tuple[str, str, dict | None]] = []
def fake_request(self, method: str, path: str, json=None):
calls.append((method, path, json))
if method == "GET":
return DummyResponse({"data": {"data": {"client_id": "sunshine"}}})
return DummyResponse({})
monkeypatch.setattr(vault_module.VaultClient, "request", fake_request)
svc = VaultService()
assert svc.read_kv_secret("game-stream/sunshine-oidc") == {"client_id": "sunshine"}
svc.write_kv_secret("game-stream/sunshine-oidc", {"client_id": "sunshine"})
assert calls[-1] == (
"POST",
"/v1/kv/data/atlas/game-stream/sunshine-oidc",
{"data": {"client_id": "sunshine"}},
)
def test_vault_read_kv_missing(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
vault_addr="http://vault",
vault_token="token",
vault_k8s_role="vault",
vault_k8s_token_reviewer_jwt="jwt",
vault_k8s_token_reviewer_jwt_file="",
k8s_api_timeout_sec=5.0,
)
monkeypatch.setattr(vault_module, "settings", dummy_settings)
monkeypatch.setattr(
vault_module.VaultClient,
"request",
lambda *args, **kwargs: DummyResponse({}, status_code=404),
)
assert VaultService().read_kv_secret("missing") is None