138 lines
4.7 KiB
Python
138 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
import time
|
|
from typing import Any
|
|
|
|
from . import settings
|
|
from .k8s import get_json, post_json
|
|
|
|
|
|
def _safe_name_fragment(value: str, max_len: int = 24) -> str:
|
|
cleaned = re.sub(r"[^a-z0-9-]+", "-", (value or "").lower()).strip("-")
|
|
if not cleaned:
|
|
cleaned = "user"
|
|
return cleaned[:max_len].rstrip("-") or "user"
|
|
|
|
|
|
def _job_from_cronjob(
    cronjob: dict[str, Any],
    username: str,
    email: str,
    password: str,
) -> dict[str, Any]:
    """Build a one-off ``batch/v1`` Job manifest from a CronJob's job template.

    The Job spec is a deep copy of ``cronjob["spec"]["jobTemplate"]["spec"]``,
    so the caller's ``cronjob`` object is never modified. The Job is named
    ``wger-user-sync-<safe username>-<unix seconds>``, placed in
    ``settings.WGER_NAMESPACE`` and labelled with the sanitised username.

    When the pod template has at least one container and the first one is a
    dict, any existing ``WGER_USERNAME`` / ``WGER_EMAIL`` / ``WGER_PASSWORD``
    entries in its ``env`` list are dropped and replaced with the supplied
    values. Other containers are left untouched. If the template, pod spec or
    container list is missing or malformed, the manifest is returned without
    credentials injected.

    Args:
        cronjob: CronJob object as a plain dict.
        username: Value for ``WGER_USERNAME``; also used (sanitised) in the
            Job name and the username label.
        email: Value for ``WGER_EMAIL``.
        password: Value for ``WGER_PASSWORD``.

    Returns:
        The Job manifest as a dict.
    """
    import copy  # function-scope import keeps this block self-contained

    spec = cronjob.get("spec") if isinstance(cronjob.get("spec"), dict) else {}
    jt = spec.get("jobTemplate") if isinstance(spec.get("jobTemplate"), dict) else {}
    template_spec = jt.get("spec") if isinstance(jt.get("spec"), dict) else {}
    # Deep copy: the env injection below must not leak into the caller's dict.
    job_spec: dict[str, Any] = copy.deepcopy(template_spec)

    now = int(time.time())
    safe_user = _safe_name_fragment(username)
    job_name = f"wger-user-sync-{safe_user}-{now}"

    job: dict[str, Any] = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": job_name,
            "namespace": settings.WGER_NAMESPACE,
            "labels": {
                "app": "wger-user-sync",
                "atlas.bstein.dev/trigger": "portal",
                "atlas.bstein.dev/username": safe_user,
            },
        },
        "spec": job_spec,
    }

    # Walk down to the first container; bail out (manifest unchanged) as soon
    # as any level is missing or has an unexpected type.
    tpl = job_spec.get("template")
    if not isinstance(tpl, dict):
        return job
    pod_spec = tpl.get("spec")
    if not isinstance(pod_spec, dict):
        return job
    containers = pod_spec.get("containers")
    if not isinstance(containers, list) or not containers:
        return job
    first = containers[0]
    if not isinstance(first, dict):
        return job

    existing = first.get("env")
    if not isinstance(existing, list):
        existing = []
    managed = {"WGER_USERNAME", "WGER_EMAIL", "WGER_PASSWORD"}
    # Drop stale copies of the managed variables so ours are the only ones.
    env = [
        entry
        for entry in existing
        if not (isinstance(entry, dict) and entry.get("name") in managed)
    ]
    env.extend(
        [
            {"name": "WGER_USERNAME", "value": username},
            {"name": "WGER_EMAIL", "value": email},
            {"name": "WGER_PASSWORD", "value": password},
        ]
    )
    # `first` lives inside the copied job_spec, so this updates the manifest.
    first["env"] = env

    return job
|
|
|
|
|
|
def _job_succeeded(job: dict[str, Any]) -> bool:
|
|
status = job.get("status") if isinstance(job.get("status"), dict) else {}
|
|
if int(status.get("succeeded") or 0) > 0:
|
|
return True
|
|
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
|
|
for cond in conditions:
|
|
if not isinstance(cond, dict):
|
|
continue
|
|
if cond.get("type") == "Complete" and cond.get("status") == "True":
|
|
return True
|
|
return False
|
|
|
|
|
|
def _job_failed(job: dict[str, Any]) -> bool:
|
|
status = job.get("status") if isinstance(job.get("status"), dict) else {}
|
|
if int(status.get("failed") or 0) > 0:
|
|
return True
|
|
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
|
|
for cond in conditions:
|
|
if not isinstance(cond, dict):
|
|
continue
|
|
if cond.get("type") == "Failed" and cond.get("status") == "True":
|
|
return True
|
|
return False
|
|
|
|
|
|
def trigger(username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
    """Create a one-off user-sync Job from the configured CronJob.

    The CronJob named by ``settings.WGER_USER_SYNC_CRONJOB`` in
    ``settings.WGER_NAMESPACE`` is fetched, turned into a Job manifest carrying
    the given credentials, and posted to the ``batch/v1`` jobs endpoint. With
    ``wait`` enabled the Job is then polled until it succeeds, fails, or
    ``settings.WGER_USER_SYNC_WAIT_TIMEOUT_SEC`` seconds have elapsed.

    Args:
        username: Account name; surrounding whitespace is stripped.
        email: Account e-mail, passed through to the Job unchanged.
        password: Account password; must be non-empty.
        wait: When False, return right after the Job has been created.

    Returns:
        ``{"job": <name>, "status": <state>}`` where the state is ``"queued"``
        (not waited for), ``"ok"``, ``"error"``, or ``"running"`` when the
        wait timed out before the Job finished.

    Raises:
        RuntimeError: If the username or password is empty, the namespace or
            CronJob name is not configured, or no Job name can be determined.
    """
    username = (username or "").strip()
    if not username:
        raise RuntimeError("missing username")
    if not password:
        raise RuntimeError("missing password")

    namespace = settings.WGER_NAMESPACE
    cronjob_name = settings.WGER_USER_SYNC_CRONJOB
    if not namespace or not cronjob_name:
        raise RuntimeError("wger sync not configured")

    cronjob = get_json(f"/apis/batch/v1/namespaces/{namespace}/cronjobs/{cronjob_name}")
    job_payload = _job_from_cronjob(cronjob, username, email, password)
    created = post_json(f"/apis/batch/v1/namespaces/{namespace}/jobs", job_payload)

    # Prefer the name echoed back in the response; fall back to the name we
    # submitted when the response is not a dict or carries no metadata dict.
    created_meta = created.get("metadata") if isinstance(created, dict) else None
    if isinstance(created_meta, dict):
        job_name = created_meta.get("name")
    else:
        job_name = job_payload.get("metadata", {}).get("name")
    if not isinstance(job_name, str) or not job_name:
        raise RuntimeError("job name missing")

    if not wait:
        return {"job": job_name, "status": "queued"}

    poll_interval = 2.0
    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + float(settings.WGER_USER_SYNC_WAIT_TIMEOUT_SEC)
    while time.monotonic() < deadline:
        job = get_json(f"/apis/batch/v1/namespaces/{namespace}/jobs/{job_name}")
        if _job_succeeded(job):
            return {"job": job_name, "status": "ok"}
        if _job_failed(job):
            return {"job": job_name, "status": "error"}
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

    # Timed out without reaching a terminal state.
    return {"job": job_name, "status": "running"}
|