59 lines
2.0 KiB
Python

from __future__ import annotations
from typing import Any
from ..k8s.jobs import JobSpawner
from ..k8s.manifests import load_cronjob_manifest
from ..settings import settings
class WgerService:
def __init__(self) -> None:
self._user_spawner = JobSpawner(
settings.wger_namespace,
settings.wger_user_sync_cronjob,
load_cronjob_manifest("health/wger-user-sync-cronjob.yaml"),
)
self._admin_spawner = JobSpawner(
settings.wger_namespace,
settings.wger_admin_cronjob,
load_cronjob_manifest("health/wger-admin-ensure-cronjob.yaml"),
)
def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
username = (username or "").strip()
if not username:
raise RuntimeError("missing username")
if not password:
raise RuntimeError("missing password")
if not settings.wger_namespace or not settings.wger_user_sync_cronjob:
raise RuntimeError("wger sync not configured")
env_overrides = [
{"name": "WGER_USERNAME", "value": username},
{"name": "WGER_EMAIL", "value": email},
{"name": "WGER_PASSWORD", "value": password},
]
if wait:
return self._user_spawner.trigger_and_wait(
username,
env_overrides,
settings.wger_user_sync_wait_timeout_sec,
)
return self._user_spawner.trigger(username, env_overrides)
def ensure_admin(self, wait: bool = False) -> dict[str, Any]:
if not settings.wger_namespace or not settings.wger_admin_cronjob:
raise RuntimeError("wger admin sync not configured")
if wait:
return self._admin_spawner.trigger_and_wait(
"admin",
None,
settings.wger_user_sync_wait_timeout_sec,
)
return self._admin_spawner.trigger("admin", None)
wger = WgerService()