# ariadne/tests/test_mailu_events.py

from __future__ import annotations
from ariadne.services.mailu_events import MailuEventRunner
def _instant_thread_factory(target=None, args=(), daemon=None):
class DummyThread:
def start(self) -> None:
if target:
target(*args)
return DummyThread()
def test_mailu_event_wait_success() -> None:
    """An event carrying ``"wait": True`` yields 200/"ok" and invokes the runner."""
    calls = []

    def runner(reason: str, force: bool):
        # Record every invocation so the test can confirm the runner was used.
        calls.append((reason, force))
        return "ok", ""

    runner_options = {
        "min_interval_sec": 0.0,
        "wait_timeout_sec": 0.1,
        "runner": runner,
        "thread_factory": _instant_thread_factory,
    }
    events = MailuEventRunner(**runner_options)

    response = events.handle_event({"wait": True})
    status, payload = response

    assert status == 200
    assert payload["status"] == "ok"
    # The runner must have been called at least once.
    assert calls
def test_mailu_event_debounce() -> None:
    """With a 60 s minimum interval, a repeat event is skipped unless forced.

    Every call is expected to answer 202; only the payload status differs:
    the first event is "accepted", the immediate repeat is "skipped", and a
    repeat carrying ``"force": True`` is "accepted" again.
    """

    def runner(_reason: str, _force: bool):
        return "ok", ""

    events = MailuEventRunner(
        min_interval_sec=60.0,
        wait_timeout_sec=0.1,
        runner=runner,
        thread_factory=_instant_thread_factory,
    )

    # (event sent, expected payload status), submitted strictly in this order.
    scenario = (
        ({}, "accepted"),
        ({}, "skipped"),
        ({"force": True}, "accepted"),
    )
    for event, expected_status in scenario:
        status, payload = events.handle_event(event)
        assert status == 202
        assert payload["status"] == expected_status