test(ariadne): cover Mailu event runner edges

This commit is contained in:
codex 2026-04-21 03:48:59 -03:00
parent d3ae03f935
commit e6c7b1ab9f
2 changed files with 96 additions and 7 deletions

View File

@ -56,13 +56,7 @@ def _event_context(payload: dict[str, Any] | None) -> dict[str, Any]:
class MailuEventRunner:
"""Debounce Keycloak events into Mailu synchronization runs."""
def __init__(
self,
min_interval_sec: float,
wait_timeout_sec: float,
runner: Callable[[str, bool], tuple[str, str]] | None = None,
thread_factory: Callable[..., threading.Thread] = threading.Thread,
) -> None:
def __init__(self, min_interval_sec: float, wait_timeout_sec: float, runner: Callable[[str, bool], tuple[str, str]] | None = None, thread_factory: Callable[..., threading.Thread] = threading.Thread) -> None:
self._min_interval_sec = min_interval_sec
self._wait_timeout_sec = wait_timeout_sec
self._runner = runner or self._default_runner

View File

@ -1,5 +1,8 @@
from __future__ import annotations
from types import SimpleNamespace
from ariadne.services import mailu_events as mailu_events_module
from ariadne.services.mailu_events import MailuEventRunner
@ -52,3 +55,95 @@ def test_mailu_event_debounce() -> None:
status, payload = events.handle_event({"force": True})
assert status == 202
assert payload["status"] == "accepted"
def test_mailu_event_parses_string_flags_and_context() -> None:
calls = []
def runner(reason: str, force: bool):
calls.append((reason, force))
return "ok", ""
events = MailuEventRunner(
min_interval_sec=0.0,
wait_timeout_sec=0.1,
runner=runner,
thread_factory=_instant_thread_factory,
)
status, payload = events.handle_event({"wait": "yes", "force": "on", "eventType": " UPDATE_PROFILE ", "userId": " u1 "})
assert status == 200
assert payload["status"] == "ok"
assert calls == [("keycloak_event:UPDATE_PROFILE", True)]
def test_mailu_event_defaults_for_missing_payload() -> None:
calls = []
def runner(reason: str, force: bool):
calls.append((reason, force))
return "ok", ""
events = MailuEventRunner(
min_interval_sec=0.0,
wait_timeout_sec=0.1,
runner=runner,
thread_factory=_instant_thread_factory,
)
status, payload = events.handle_event(None)
assert status == 202
assert payload == {"status": "accepted", "triggered": True}
assert calls == [("keycloak_event", False)]
def test_mailu_event_running_skip_and_wait_timeout() -> None:
def parked_thread_factory(target=None, args=(), daemon=None):
class ParkedThread:
def start(self) -> None:
return None
return ParkedThread()
events = MailuEventRunner(
min_interval_sec=0.0,
wait_timeout_sec=0.0,
runner=lambda _reason, _force: ("ok", ""),
thread_factory=parked_thread_factory,
)
status, payload = events.handle_event({"wait": True})
assert status == 200
assert payload == {"status": "running"}
status, payload = events.handle_event({})
assert status == 202
assert payload == {"status": "skipped", "triggered": False}
def test_mailu_event_runner_reports_exceptions() -> None:
def failing_runner(_reason: str, _force: bool):
raise RuntimeError("mailu exploded")
events = MailuEventRunner(
min_interval_sec=0.0,
wait_timeout_sec=0.1,
runner=failing_runner,
thread_factory=_instant_thread_factory,
)
status, payload = events.handle_event({"wait": True})
assert status == 500
assert payload == {"status": "error", "detail": "mailu exploded"}
def test_default_runner_maps_mailu_summary(monkeypatch) -> None:
events = MailuEventRunner(min_interval_sec=0.0, wait_timeout_sec=0.1)
monkeypatch.setattr(mailu_events_module.mailu, "sync", lambda reason, force=False: SimpleNamespace(failures=0, detail="synced"))
assert events._default_runner("test", True) == ("ok", "synced")
monkeypatch.setattr(mailu_events_module.mailu, "sync", lambda reason, force=False: SimpleNamespace(failures=1, detail="failed"))
assert events._default_runner("test", False) == ("error", "failed")