diff --git a/ariadne/services/mailu.py b/ariadne/services/mailu.py index 092a6a6..ad9adcc 100644 --- a/ariadne/services/mailu.py +++ b/ariadne/services/mailu.py @@ -138,11 +138,7 @@ class MailuService: ) @staticmethod - def resolve_mailu_email( - username: str, - attributes: dict[str, Any] | None, - fallback_email: str = "", - ) -> str: + def resolve_mailu_email(username: str, attributes: dict[str, Any] | None, fallback_email: str = "") -> str: attrs = attributes or {} explicit = _extract_attr(attrs, MAILU_EMAIL_ATTR) if explicit: @@ -182,12 +178,7 @@ class MailuService: }, ) - def _prepare_updates( - self, - username: str, - attrs: dict[str, Any], - mailu_email: str, - ) -> tuple[bool, dict[str, list[str]], str]: + def _prepare_updates(self, username: str, attrs: dict[str, Any], mailu_email: str) -> tuple[bool, dict[str, list[str]], str]: updates: dict[str, list[str]] = {} if not _extract_attr(attrs, MAILU_EMAIL_ATTR): updates[MAILU_EMAIL_ATTR] = [mailu_email] @@ -228,10 +219,7 @@ class MailuService: return True return self._is_service_account(user, username) - def _build_sync_context( - self, - user: dict[str, Any], - ) -> tuple[MailuSyncContext | None, MailuUserSyncResult | None]: + def _build_sync_context(self, user: dict[str, Any]) -> tuple[MailuSyncContext | None, MailuUserSyncResult | None]: username = self._username(user) if self._should_skip_user(user, username): return None, MailuUserSyncResult(skipped=1) @@ -270,11 +258,7 @@ class MailuService: None, ) - def _ensure_mailbox_with_retry( - self, - conn: psycopg.Connection, - ctx: MailuSyncContext, - ) -> tuple[bool, bool, bool]: + def _ensure_mailbox_with_retry(self, conn: psycopg.Connection, ctx: MailuSyncContext) -> tuple[bool, bool, bool]: mailbox_ok = False rotated = False failed = False @@ -305,12 +289,7 @@ class MailuService: return mailbox_ok, failed, rotated @staticmethod - def _build_sync_result( - updated: int, - mailbox_ok: bool, - failed: bool, - rotated: bool, - ) -> MailuUserSyncResult: + def _build_sync_result(updated: int, mailbox_ok: bool, failed: bool, rotated: bool) -> MailuUserSyncResult: if failed: return MailuUserSyncResult(failures=1, updated=updated) if mailbox_ok: @@ -326,13 +305,7 @@ class MailuService: mailbox_ok, failed, rotated = self._ensure_mailbox_with_retry(conn, ctx) return self._build_sync_result(ctx.updated, mailbox_ok, failed, rotated) - def _ensure_mailbox( - self, - conn: psycopg.Connection, - email: str, - password: str, - display_name: str, - ) -> bool: + def _ensure_mailbox(self, conn: psycopg.Connection, email: str, password: str, display_name: str) -> bool: email = (email or "").strip() if not email or "@" not in email: return False diff --git a/tests/unit/services/test_mailu_service_edges.py b/tests/unit/services/test_mailu_service_edges.py new file mode 100644 index 0000000..05b26a1 --- /dev/null +++ b/tests/unit/services/test_mailu_service_edges.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import types + +import pytest + +from ariadne.services import mailu as mailu_module +from ariadne.services.mailu import ( + MAILU_APP_PASSWORD_ATTR, + MAILU_EMAIL_ATTR, + MAILU_ENABLED_ATTR, + MailuService, + MailuSyncContext, + MailuUserSyncResult, + PasswordTooLongError, +) + + +def _settings(**overrides): + values = { + "mailu_domain": "bstein.dev", + "mailu_db_host": "localhost", + "mailu_db_port": 5432, + "mailu_db_name": "mailu", + "mailu_db_user": "mailu", + "mailu_db_password": "secret", + "mailu_default_quota": 20_000_000_000, + "mailu_system_users": [], + "mailu_system_password": "", + } + values.update(overrides) + return types.SimpleNamespace(**values) + + +class _Admin: + def __init__(self, *, ready: bool = True, fail_update: bool = False): + self._ready = ready + self.fail_update = fail_update + self.calls = [] + + def ready(self): + return self._ready + + def iter_users(self, page_size=200, brief=False): + return [] + + def update_user_safe(self, user_id, payload): + if self.fail_update: + raise RuntimeError("update failed") + self.calls.append((user_id, payload)) + + def set_user_attribute(self, username, key, value): + self.calls.append((username, key, value)) + + +class _Cursor: + def __init__(self): + self.executed = [] + + def execute(self, query, params=None): + self.executed.append((query, params)) + + def fetchone(self): + return None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class _Conn: + def __init__(self): + self.cursor_obj = _Cursor() + + def cursor(self): + return self.cursor_obj + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +def _service(monkeypatch, **settings_overrides) -> MailuService: + monkeypatch.setattr(mailu_module, "settings", _settings(**settings_overrides)) + return MailuService() + + +def test_mailu_helper_and_context_edges(monkeypatch) -> None: + svc = _service(monkeypatch) + assert mailu_module._extract_attr(None, MAILU_EMAIL_ATTR) is None + assert mailu_module._extract_attr({MAILU_EMAIL_ATTR: ["", " alias@bstein.dev "]}, MAILU_EMAIL_ATTR) == "alias@bstein.dev" + assert mailu_module._extract_attr({MAILU_EMAIL_ATTR: ["", " "]}, MAILU_EMAIL_ATTR) is None + assert mailu_module._extract_attr({MAILU_EMAIL_ATTR: " alias@bstein.dev "}, MAILU_EMAIL_ATTR) == "alias@bstein.dev" + assert mailu_module._display_name({"firstName": " Alice ", "lastName": " Example "}) == "Alice Example" + assert mailu_module._domain_matches("ALICE@BSTEIN.DEV") + assert not mailu_module._domain_matches("alice@example.com") + assert mailu_module._password_too_long("x" * 100) + assert svc.ready() + + updates = {} + assert svc._mailu_enabled({}, updates) + assert updates == {MAILU_ENABLED_ATTR: ["true"]} + assert svc._mailu_enabled({MAILU_ENABLED_ATTR: ["YES"]}, {}) + assert not svc._mailu_enabled({MAILU_ENABLED_ATTR: ["no"]}, {}) + assert svc.resolve_mailu_email("alice", {}, "alice@example.com") == "alice@bstein.dev" + + monkeypatch.setattr(mailu_module, "random_password", lambda *_args: "generated") + enabled, prepared_updates, password = svc._prepare_updates("alice", {}, "alice@bstein.dev") + assert enabled + assert prepared_updates[MAILU_EMAIL_ATTR] == ["alice@bstein.dev"] + assert prepared_updates[MAILU_APP_PASSWORD_ATTR] == ["generated"] + assert password == "generated" + + admin = _Admin() + monkeypatch.setattr(mailu_module, "keycloak_admin", admin) + assert svc._apply_updates("1", {}, "alice") + assert svc._apply_updates("1", {MAILU_EMAIL_ATTR: ["alice@bstein.dev"]}, "alice") + assert admin.calls + monkeypatch.setattr(mailu_module, "keycloak_admin", _Admin(fail_update=True)) + assert not svc._apply_updates("1", {MAILU_EMAIL_ATTR: ["alice@bstein.dev"]}, "alice") + + assert svc._should_skip_user({}, "") + assert svc._should_skip_user({"enabled": False}, "alice") + assert svc._should_skip_user({"serviceAccountClientId": "svc"}, "alice") + assert svc._build_sync_context({})[1].skipped == 1 + assert svc._build_sync_context({"username": "alice"})[1].failures == 1 + monkeypatch.setattr(svc, "_apply_updates", lambda *_args: False) + assert svc._build_sync_context({"id": "1", "username": "alice", "attributes": "bad"})[1].failures == 1 + + +def test_mailu_retry_and_result_edges(monkeypatch) -> None: + svc = _service(monkeypatch) + ctx = MailuSyncContext("alice", "1", "alice@bstein.dev", "pw", 0, "Alice") + monkeypatch.setattr(mailu_module, "random_password", lambda *_args: "retry") + admin = _Admin() + monkeypatch.setattr(mailu_module, "keycloak_admin", admin) + calls = {"count": 0} + + def retrying_ensure(_conn, _email, password, _display): + calls["count"] += 1 + if calls["count"] == 1: + raise PasswordTooLongError("too long") + assert password == "retry" + return True + + monkeypatch.setattr(svc, "_ensure_mailbox", retrying_ensure) + assert svc._ensure_mailbox_with_retry(_Conn(), ctx) == (True, False, True) + assert admin.calls + + def failing_retry(_conn, _email, _password, _display): + raise PasswordTooLongError("too long") + + monkeypatch.setattr(svc, "_ensure_mailbox", failing_retry) + monkeypatch.setattr(mailu_module, "keycloak_admin", _Admin(fail_update=True)) + assert svc._ensure_mailbox_with_retry(_Conn(), ctx) == (False, True, True) + monkeypatch.setattr(svc, "_ensure_mailbox", lambda *_args: (_ for _ in ()).throw(RuntimeError("boom"))) + assert svc._ensure_mailbox_with_retry(_Conn(), ctx) == (False, True, False) + + assert svc._build_sync_result(1, False, True, False) == MailuUserSyncResult(failures=1, updated=1) + assert svc._build_sync_result(0, True, False, False) == MailuUserSyncResult(processed=1, mailboxes=1) + assert svc._build_sync_result(0, False, False, True) == MailuUserSyncResult(skipped=1, updated=1) + assert svc._build_sync_result(0, False, False, False) == MailuUserSyncResult(skipped=1) + + +def test_mailu_ensure_mailbox_edges(monkeypatch) -> None: + svc = _service(monkeypatch) + assert not svc._ensure_mailbox(_Conn(), "", "pw", "") + assert not svc._ensure_mailbox(_Conn(), "invalid", "pw", "") + assert not svc._ensure_mailbox(_Conn(), "alice@example.com", "pw", "") + with pytest.raises(PasswordTooLongError): + svc._ensure_mailbox(_Conn(), "alice@bstein.dev", "x" * 100, "") + + conn = _Conn() + monkeypatch.setattr(mailu_module.bcrypt_sha256, "hash", lambda password: f"hashed:{password}") + assert svc._ensure_mailbox(conn, "alice@bstein.dev", "pw", "Alice") + query, params = conn.cursor_obj.executed[0] + assert "INSERT INTO" in query + assert params["localpart"] == "alice" + assert params["display"] == "Alice" + + def raise_password_limit(_password): + raise ValueError("password cannot be longer than 72 bytes") + + monkeypatch.setattr(mailu_module.bcrypt_sha256, "hash", raise_password_limit) + with pytest.raises(PasswordTooLongError): + svc._ensure_mailbox(_Conn(), "alice@bstein.dev", "pw", "") + monkeypatch.setattr(mailu_module.bcrypt_sha256, "hash", lambda _password: (_ for _ in ()).throw(ValueError("other"))) + with pytest.raises(ValueError, match="other"): + svc._ensure_mailbox(_Conn(), "alice@bstein.dev", "pw", "") + + +def test_mailu_system_sync_and_wait_edges(monkeypatch) -> None: + svc = _service(monkeypatch) + assert svc._ensure_system_mailboxes(_Conn()) == 0 + svc = _service(monkeypatch, mailu_system_users=["ops@bstein.dev"], mailu_system_password="") + assert svc._ensure_system_mailboxes(_Conn()) == 0 + svc = _service(monkeypatch, mailu_system_users=["ops@bstein.dev"], mailu_system_password="x" * 100) + assert svc._ensure_system_mailboxes(_Conn()) == 0 + svc = _service(monkeypatch, mailu_system_users=["", "ops@bstein.dev"], mailu_system_password="pw") + monkeypatch.setattr(svc, "_ensure_mailbox", lambda *_args: True) + assert svc._ensure_system_mailboxes(_Conn()) == 1 + + monkeypatch.setattr(mailu_module, "keycloak_admin", _Admin(ready=False)) + with pytest.raises(RuntimeError, match="keycloak"): + svc.sync("schedule") + monkeypatch.setattr(mailu_module, "keycloak_admin", _Admin()) + svc = _service(monkeypatch, mailu_db_password="") + with pytest.raises(RuntimeError, match="database"): + svc.sync("schedule") + + svc = _service(monkeypatch) + monkeypatch.setattr(svc, "mailbox_exists", lambda _email: False) + ticks = iter([0, 1, 3]) + monkeypatch.setattr(mailu_module.time, "time", lambda: next(ticks)) + monkeypatch.setattr(mailu_module.time, "sleep", lambda _seconds: None) + assert not svc.wait_for_mailbox("missing@bstein.dev", timeout_sec=2) +