bstein-dev-home/backend/atlas_portal/nextcloud_mail_sync.py

124 lines
4.4 KiB
Python
Raw Permalink Normal View History

2026-01-03 12:18:46 -03:00
from __future__ import annotations

import copy
import re
import time
from typing import Any

from . import settings
from .k8s import get_json, post_json
def _safe_name_fragment(value: str, max_len: int = 24) -> str:
cleaned = re.sub(r"[^a-z0-9-]+", "-", (value or "").lower()).strip("-")
if not cleaned:
cleaned = "user"
return cleaned[:max_len].rstrip("-") or "user"
def _job_from_cronjob(cronjob: dict[str, Any], username: str) -> dict[str, Any]:
    """Build a one-off Job manifest for *username* from a CronJob object.

    The Job spec is taken from ``cronjob["spec"]["jobTemplate"]["spec"]`` and
    deep-copied, so the caller's *cronjob* is never modified. Missing or
    non-dict levels are treated as empty.

    Args:
        cronjob: CronJob object as a decoded JSON dict.
        username: User the sync should be limited to. It is passed verbatim
            to the first container as the ``ONLY_USERNAME`` env var and, in
            sanitized form, used in the job name and a label.

    Returns:
        A ``batch/v1`` Job manifest named
        ``nextcloud-mail-sync-<safe user>-<unix seconds>`` in
        ``settings.NEXTCLOUD_NAMESPACE``.
    """
    spec = cronjob.get("spec")
    job_template = spec.get("jobTemplate") if isinstance(spec, dict) else None
    template_spec = job_template.get("spec") if isinstance(job_template, dict) else None
    # Deep-copy so the edits below never leak back into the caller's CronJob.
    job_spec: dict[str, Any] = (
        copy.deepcopy(template_spec) if isinstance(template_spec, dict) else {}
    )

    now = int(time.time())
    safe_user = _safe_name_fragment(username)

    ttl = settings.NEXTCLOUD_MAIL_SYNC_JOB_TTL_SEC
    if isinstance(ttl, int) and ttl > 0:
        job_spec["ttlSecondsAfterFinished"] = int(ttl)

    # Each level may be absent or malformed; normalise to an empty container
    # instead of failing on a non-dict value such as ``None``.
    pod_template = job_spec.get("template")
    if not isinstance(pod_template, dict):
        pod_template = {}
    pod_spec = pod_template.get("spec")
    if not isinstance(pod_spec, dict):
        pod_spec = {}
    containers = pod_spec.get("containers")
    if not isinstance(containers, list):
        containers = []

    # Only the first container receives the per-user override.
    if containers and isinstance(containers[0], dict):
        existing_env = containers[0].get("env")
        env = (
            [
                entry
                for entry in existing_env
                if not (isinstance(entry, dict) and entry.get("name") == "ONLY_USERNAME")
            ]
            if isinstance(existing_env, list)
            else []
        )
        env.append({"name": "ONLY_USERNAME", "value": username})
        containers[0]["env"] = env

    pod_spec["containers"] = containers
    pod_template["spec"] = pod_spec
    job_spec["template"] = pod_template

    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": f"nextcloud-mail-sync-{safe_user}-{now}",
            "namespace": settings.NEXTCLOUD_NAMESPACE,
            "labels": {
                "app": "nextcloud-mail-sync",
                "atlas.bstein.dev/trigger": "portal",
                "atlas.bstein.dev/username": safe_user,
            },
        },
        "spec": job_spec,
    }
def _job_succeeded(job: dict[str, Any]) -> bool:
status = job.get("status") if isinstance(job.get("status"), dict) else {}
if int(status.get("succeeded") or 0) > 0:
return True
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
for cond in conditions:
if not isinstance(cond, dict):
continue
if cond.get("type") == "Complete" and cond.get("status") == "True":
return True
return False
def _job_failed(job: dict[str, Any]) -> bool:
status = job.get("status") if isinstance(job.get("status"), dict) else {}
if int(status.get("failed") or 0) > 0:
return True
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
for cond in conditions:
if not isinstance(cond, dict):
continue
if cond.get("type") == "Failed" and cond.get("status") == "True":
return True
return False
def trigger(username: str, wait: bool = True) -> dict[str, Any]:
    """Create a one-off mail-sync Job for *username* and optionally wait for it.

    The CronJob named by ``settings.NEXTCLOUD_MAIL_SYNC_CRONJOB`` is read from
    ``settings.NEXTCLOUD_NAMESPACE``, turned into a Job manifest by
    ``_job_from_cronjob`` and posted to the ``batch/v1`` jobs endpoint.

    Args:
        username: User to sync; surrounding whitespace is stripped.
        wait: When true, poll the Job roughly every two seconds until it
            succeeds, fails, or ``settings.NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC``
            seconds have elapsed.

    Returns:
        ``{"job": <name>, "status": <state>}`` where ``<state>`` is
        ``"queued"`` (``wait`` is false), ``"ok"``, ``"error"``, or
        ``"running"`` (the wait timed out before a final state was seen).

    Raises:
        RuntimeError: If *username* is empty or no job name can be determined.
    """
    username = (username or "").strip()
    if not username:
        raise RuntimeError("missing username")

    namespace = settings.NEXTCLOUD_NAMESPACE
    cronjob = get_json(
        f"/apis/batch/v1/namespaces/{namespace}/cronjobs/{settings.NEXTCLOUD_MAIL_SYNC_CRONJOB}"
    )
    job_payload = _job_from_cronjob(cronjob, username)
    created = post_json(f"/apis/batch/v1/namespaces/{namespace}/jobs", job_payload)

    # Prefer the name echoed by the API server; fall back to the name we sent
    # whenever the response does not carry a usable one.
    created_meta = created.get("metadata") if isinstance(created, dict) else None
    job_name = created_meta.get("name") if isinstance(created_meta, dict) else None
    if not isinstance(job_name, str) or not job_name:
        job_name = job_payload.get("metadata", {}).get("name")
    if not isinstance(job_name, str) or not job_name:
        raise RuntimeError("job name missing")

    if not wait:
        return {"job": job_name, "status": "queued"}

    poll_interval = 2.0
    job_path = f"/apis/batch/v1/namespaces/{namespace}/jobs/{job_name}"
    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    deadline = time.monotonic() + float(settings.NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC)
    while True:
        job = get_json(job_path)
        if _job_succeeded(job):
            return {"job": job_name, "status": "ok"}
        if _job_failed(job):
            return {"job": job_name, "status": "error"}
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return {"job": job_name, "status": "running"}
        # Never sleep past the deadline, so the last check happens right at it.
        time.sleep(min(poll_interval, remaining))