mailu: add wait-mode sync endpoint
Also bump portal timeouts and relax access request rate limits.
This commit is contained in:
parent
9daf8b345a
commit
b95eab5876
@ -70,6 +70,16 @@ spec:
|
||||
secretKeyRef:
|
||||
name: atlas-portal-db
|
||||
key: PORTAL_DATABASE_URL
|
||||
- name: HTTP_CHECK_TIMEOUT_SEC
|
||||
value: "10"
|
||||
- name: ACCESS_REQUEST_SUBMIT_RATE_LIMIT
|
||||
value: "30"
|
||||
- name: ACCESS_REQUEST_SUBMIT_RATE_WINDOW_SEC
|
||||
value: "3600"
|
||||
- name: ACCESS_REQUEST_STATUS_RATE_LIMIT
|
||||
value: "120"
|
||||
- name: ACCESS_REQUEST_STATUS_RATE_WINDOW_SEC
|
||||
value: "60"
|
||||
ports:
|
||||
- name: http
|
||||
containerPort: 8080
|
||||
|
||||
@ -120,28 +120,75 @@ data:
|
||||
MIN_INTERVAL_SECONDS = 10
|
||||
last_run = 0.0
|
||||
lock = threading.Lock()
|
||||
sync_done = threading.Event()
|
||||
sync_done.set()
|
||||
sync_running = False
|
||||
|
||||
def trigger_sync():
|
||||
global last_run
|
||||
def _run_sync_blocking() -> int:
|
||||
global last_run, sync_running
|
||||
with lock:
|
||||
if sync_running:
|
||||
return 0
|
||||
sync_running = True
|
||||
sync_done.clear()
|
||||
|
||||
try:
|
||||
print("mailu-sync-listener: starting sync", flush=True)
|
||||
proc = subprocess.run(["python", "/app/sync.py"], check=False)
|
||||
rc = int(proc.returncode)
|
||||
print(f"mailu-sync-listener: sync completed rc={rc}", flush=True)
|
||||
return rc
|
||||
finally:
|
||||
with lock:
|
||||
sync_running = False
|
||||
last_run = time()
|
||||
sync_done.set()
|
||||
|
||||
def _trigger_sync_async() -> bool:
|
||||
with lock:
|
||||
now = time()
|
||||
if sync_running:
|
||||
return False
|
||||
if now - last_run < MIN_INTERVAL_SECONDS:
|
||||
return
|
||||
last_run = now
|
||||
# Fire and forget; output to stdout
|
||||
subprocess.Popen(["python", "/app/sync.py"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
return False
|
||||
|
||||
thread = threading.Thread(target=_run_sync_blocking, daemon=True)
|
||||
thread.start()
|
||||
return True
|
||||
|
||||
class Handler(http.server.BaseHTTPRequestHandler):
|
||||
def do_POST(self):
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(length) if length else b""
|
||||
try:
|
||||
json.loads(body or b"{}")
|
||||
payload = json.loads(body or b"{}")
|
||||
except json.JSONDecodeError:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
return
|
||||
trigger_sync()
|
||||
|
||||
wait = False
|
||||
if isinstance(payload, dict):
|
||||
wait = bool(payload.get("wait"))
|
||||
|
||||
if wait:
|
||||
# If a sync is already running, wait for it to complete.
|
||||
with lock:
|
||||
already_running = sync_running
|
||||
if already_running:
|
||||
sync_done.wait(timeout=120)
|
||||
with lock:
|
||||
still_running = sync_running
|
||||
self.send_response(200 if not still_running else 503)
|
||||
self.end_headers()
|
||||
return
|
||||
|
||||
rc = _run_sync_blocking()
|
||||
self.send_response(200 if rc == 0 else 500)
|
||||
self.end_headers()
|
||||
return
|
||||
|
||||
_trigger_sync_async()
|
||||
self.send_response(202)
|
||||
self.end_headers()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user