from __future__ import annotations

from ariadne.services.mailu_events import MailuEventRunner


def _instant_thread_factory(target=None, args=(), daemon=None):
    """Build a thread stand-in whose ``start()`` runs ``target`` inline.

    The signature accepts ``target``, ``args`` and ``daemon`` so it can be
    passed as ``thread_factory``; ``daemon`` is accepted but never used.
    """

    class _InlineThread:
        def start(self) -> None:
            # Run the work synchronously in the caller instead of spawning
            # a real thread; a falsy target makes start() a no-op.
            if not target:
                return
            target(*args)

    return _InlineThread()


def test_mailu_event_wait_success() -> None:
    """A ``wait`` event yields HTTP 200 / ``"ok"`` and invokes the runner."""
    invocations = []

    def record_run(reason: str, force: bool):
        # Remember every call so the test can confirm the runner was used.
        invocations.append((reason, force))
        return "ok", ""

    dispatcher = MailuEventRunner(
        min_interval_sec=0.0,
        wait_timeout_sec=0.1,
        runner=record_run,
        thread_factory=_instant_thread_factory,
    )

    http_status, body = dispatcher.handle_event({"wait": True})

    assert http_status == 200
    assert body["status"] == "ok"
    assert invocations


def test_mailu_event_debounce() -> None:
    """Back-to-back events: accepted, then skipped, then accepted when forced."""

    def always_ok(_reason: str, _force: bool):
        return "ok", ""

    dispatcher = MailuEventRunner(
        min_interval_sec=60.0,
        wait_timeout_sec=0.1,
        runner=always_ok,
        thread_factory=_instant_thread_factory,
    )

    # (event payload, expected "status" field), issued in this exact order.
    # Every call is expected to answer 202 regardless of the outcome.
    scenario = (
        ({}, "accepted"),
        ({}, "skipped"),
        ({"force": True}, "accepted"),
    )
    for event, expected_state in scenario:
        http_status, body = dispatcher.handle_event(event)
        assert http_status == 202
        assert body["status"] == expected_state