diff --git a/scripts/mailu_sync_listener.py b/scripts/mailu_sync_listener.py new file mode 100644 index 0000000..27070c0 --- /dev/null +++ b/scripts/mailu_sync_listener.py @@ -0,0 +1,93 @@ +import http.server +import json +import subprocess +import threading + +from time import time + +# Simple debounce to avoid hammering on bursts +MIN_INTERVAL_SECONDS = 10 +last_run = 0.0 +lock = threading.Lock() +sync_done = threading.Event() +sync_done.set() +sync_running = False + + +def _run_sync_blocking() -> int: + global last_run, sync_running + with lock: + if sync_running: + return 0 + sync_running = True + sync_done.clear() + + try: + print("mailu-sync-listener: starting sync", flush=True) + proc = subprocess.run(["python", "/app/sync.py"], check=False) + rc = int(proc.returncode) + print(f"mailu-sync-listener: sync completed rc={rc}", flush=True) + return rc + finally: + with lock: + sync_running = False + last_run = time() + sync_done.set() + + +def _trigger_sync_async() -> bool: + with lock: + now = time() + if sync_running: + return False + if now - last_run < MIN_INTERVAL_SECONDS: + return False + + thread = threading.Thread(target=_run_sync_blocking, daemon=True) + thread.start() + return True + + +class Handler(http.server.BaseHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) if length else b"" + try: + payload = json.loads(body or b"{}") + except json.JSONDecodeError: + self.send_response(400) + self.end_headers() + return + + wait = False + if isinstance(payload, dict): + wait = bool(payload.get("wait")) + + if wait: + with lock: + already_running = sync_running + if already_running: + sync_done.wait(timeout=120) + with lock: + still_running = sync_running + self.send_response(200 if not still_running else 503) + self.end_headers() + return + + rc = _run_sync_blocking() + self.send_response(200 if rc == 0 else 500) + self.end_headers() + return + + _trigger_sync_async() + self.send_response(202) + self.end_headers() + + def log_message(self, fmt, *args): + # Quiet logging + return + + +if __name__ == "__main__": + server = http.server.ThreadingHTTPServer(("", 8080), Handler) + server.serve_forever() diff --git a/services/mailu/kustomization.yaml b/services/mailu/kustomization.yaml index a23e0b1..9e9359b 100644 --- a/services/mailu/kustomization.yaml +++ b/services/mailu/kustomization.yaml @@ -22,3 +22,7 @@ configMapGenerator: - sync.py=../../scripts/mailu_sync.py options: disableNameSuffixHash: true + - name: mailu-sync-listener + namespace: mailu-mailserver + files: + - listener.py=../../scripts/mailu_sync_listener.py diff --git a/services/mailu/mailu-sync-listener.yaml b/services/mailu/mailu-sync-listener.yaml index 4d70716..2127313 100644 --- a/services/mailu/mailu-sync-listener.yaml +++ b/services/mailu/mailu-sync-listener.yaml @@ -100,102 +100,3 @@ spec: configMap: name: mailu-sync-listener defaultMode: 0444 ---- -apiVersion: v1 -kind: ConfigMap -metadata: - name: mailu-sync-listener - namespace: mailu-mailserver -data: - listener.py: | - import http.server - import json - import os - import subprocess - import threading - - from time import time - - # Simple debounce to avoid hammering on bursts - MIN_INTERVAL_SECONDS = 10 - last_run = 0.0 - lock = threading.Lock() - sync_done = threading.Event() - sync_done.set() - sync_running = False - - def _run_sync_blocking() -> int: - global last_run, sync_running - with lock: - if sync_running: - return 0 - sync_running = True - sync_done.clear() - - try: - print("mailu-sync-listener: starting sync", flush=True) - proc = subprocess.run(["python", "/app/sync.py"], check=False) - rc = int(proc.returncode) - print(f"mailu-sync-listener: sync completed rc={rc}", flush=True) - return rc - finally: - with lock: - sync_running = False - last_run = time() - sync_done.set() - - def _trigger_sync_async() -> bool: - with lock: - now = time() - if sync_running: - return False - if now - last_run < MIN_INTERVAL_SECONDS: - return False - - thread = threading.Thread(target=_run_sync_blocking, daemon=True) - thread.start() - return True - - class Handler(http.server.BaseHTTPRequestHandler): - def do_POST(self): - length = int(self.headers.get("Content-Length", 0)) - body = self.rfile.read(length) if length else b"" - try: - payload = json.loads(body or b"{}") - except json.JSONDecodeError: - self.send_response(400) - self.end_headers() - return - - wait = False - if isinstance(payload, dict): - wait = bool(payload.get("wait")) - - if wait: - # If a sync is already running, wait for it to complete. - with lock: - already_running = sync_running - if already_running: - sync_done.wait(timeout=120) - with lock: - still_running = sync_running - self.send_response(200 if not still_running else 503) - self.end_headers() - return - - rc = _run_sync_blocking() - self.send_response(200 if rc == 0 else 500) - self.end_headers() - return - - _trigger_sync_async() - self.send_response(202) - self.end_headers() - - def log_message(self, fmt, *args): - # Quiet logging - return - - if __name__ == "__main__": - server = http.server.ThreadingHTTPServer(("", 8080), Handler) - server.serve_forever()