from tests.unit.services.service_helpers import * def test_mailu_sync_updates_attrs(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_default_quota=20000000000, mailu_system_users=[], mailu_system_password="", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.keycloak_admin.ready", lambda: True) monkeypatch.setattr( "ariadne.services.mailu.keycloak_admin.iter_users", lambda *args, **kwargs: [ { "id": "1", "username": "alice", "enabled": True, "email": "alice@example.com", "attributes": {}, "firstName": "Alice", "lastName": "Example", } ], ) updates: list[tuple[str, dict[str, object]]] = [] monkeypatch.setattr( "ariadne.services.mailu.keycloak_admin.update_user_safe", lambda user_id, payload: updates.append((user_id, payload)), ) mailbox_calls: list[tuple[str, str, str]] = [] monkeypatch.setattr( "ariadne.services.mailu.MailuService._ensure_mailbox", lambda self, _conn, email, password, display: mailbox_calls.append((email, password, display)) or True, ) class DummyConn: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() summary = svc.sync("provision", force=True) assert summary.processed == 1 assert summary.updated == 1 assert mailbox_calls assert updates assert "mailu_email" in updates[0][1]["attributes"] def test_mailu_sync_rotates_long_password(monkeypatch) -> None: long_password = "x" * 100 dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_default_quota=20000000000, mailu_system_users=[], mailu_system_password="", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.keycloak_admin.ready", lambda: True) monkeypatch.setattr( "ariadne.services.mailu.keycloak_admin.iter_users", lambda *args, **kwargs: [ { "id": "1", "username": "alice", "enabled": True, "email": "alice@example.com", "attributes": {"mailu_app_password": [long_password]}, "firstName": "Alice", "lastName": "Example", } ], ) monkeypatch.setattr("ariadne.services.mailu.random_password", lambda *_args, **_kwargs: "short-pass-123") updates: list[tuple[str, dict[str, object]]] = [] monkeypatch.setattr( "ariadne.services.mailu.keycloak_admin.update_user_safe", lambda user_id, payload: updates.append((user_id, payload)), ) mailbox_calls: list[tuple[str, str, str]] = [] monkeypatch.setattr( "ariadne.services.mailu.MailuService._ensure_mailbox", lambda self, _conn, email, password, display: mailbox_calls.append((email, password, display)) or True, ) class DummyConn: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() summary = svc.sync("provision", force=True) assert summary.processed == 1 assert updates attrs = updates[0][1]["attributes"] assert attrs["mailu_app_password"] == ["short-pass-123"] assert mailbox_calls assert mailbox_calls[0][1] == "short-pass-123" def test_mailu_sync_retries_on_password_limit(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_default_quota=20000000000, mailu_system_users=[], mailu_system_password="", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr(mailu_module.keycloak_admin, "ready", lambda: True) update_calls: list[tuple[str, dict[str, object]]] = [] monkeypatch.setattr( mailu_module.keycloak_admin, "update_user_safe", lambda user_id, payload: update_calls.append((user_id, payload)), ) monkeypatch.setattr( mailu_module.keycloak_admin, "iter_users", lambda *args, **kwargs: [ { "id": "1", "username": "alice", "enabled": True, "email": "alice@example.com", "attributes": {"mailu_app_password": ["short-pass"]}, "firstName": "Alice", "lastName": "Example", } ], ) monkeypatch.setattr("ariadne.services.mailu.random_password", lambda *_args, **_kwargs: "retry-pass-1") set_calls: list[tuple[str, str, str]] = [] monkeypatch.setattr( mailu_module.keycloak_admin, "set_user_attribute", lambda username, key, value: set_calls.append((username, key, value)), ) call_count = {"count": 0} def fake_ensure(self, _conn, _email, password, _display): call_count["count"] += 1 if call_count["count"] == 1: raise mailu_module.PasswordTooLongError("password cannot be longer than 72 bytes") assert password == "retry-pass-1" return True monkeypatch.setattr("ariadne.services.mailu.MailuService._ensure_mailbox", fake_ensure) class DummyConn: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() summary = svc.sync("provision", force=True) assert summary.processed == 1 assert update_calls assert call_count["count"] == 2 assert set_calls def test_mailu_sync_skips_disabled(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_default_quota=20000000000, mailu_system_users=[], mailu_system_password="", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.keycloak_admin.ready", lambda: True) monkeypatch.setattr( "ariadne.services.mailu.keycloak_admin.iter_users", lambda *args, **kwargs: [ { "id": "1", "username": "alice", "enabled": True, "attributes": {"mailu_enabled": ["false"]}, } ], ) mailbox_calls: list[tuple[str, str, str]] = [] monkeypatch.setattr( "ariadne.services.mailu.MailuService._ensure_mailbox", lambda self, _conn, email, password, display: mailbox_calls.append((email, password, display)) or True, ) class DummyConn: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() summary = svc.sync("provision") assert summary.skipped == 1 assert mailbox_calls == [] def test_mailu_sync_system_mailboxes(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_domain="bstein.dev", mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_default_quota=20000000000, mailu_system_users=["no-reply-portal@bstein.dev"], mailu_system_password="systempw", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.keycloak_admin.ready", lambda: True) monkeypatch.setattr("ariadne.services.mailu.keycloak_admin.iter_users", lambda *args, **kwargs: []) mailbox_calls: list[tuple[str, str, str]] = [] monkeypatch.setattr( "ariadne.services.mailu.MailuService._ensure_mailbox", lambda self, _conn, email, password, display: mailbox_calls.append((email, password, display)) or True, ) class DummyConn: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() summary = svc.sync("schedule") assert summary.system_mailboxes == 1 assert mailbox_calls[0][0] == "no-reply-portal@bstein.dev" def test_mailu_mailbox_exists_handles_error(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_sync_url="", mailu_sync_wait_timeout_sec=10.0, mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_domain="bstein.dev", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("boom"))) svc = MailuService() assert svc.mailbox_exists("alice@bstein.dev") is False def test_mailu_mailbox_exists_success(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_sync_url="", mailu_sync_wait_timeout_sec=10.0, mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_domain="bstein.dev", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) class DummyCursor: def execute(self, *_args, **_kwargs): return None def fetchone(self): return {"id": 1} def __enter__(self): return self def __exit__(self, *_args): return False class DummyConn: def cursor(self): return DummyCursor() def __enter__(self): return self def __exit__(self, *_args): return False monkeypatch.setattr("ariadne.services.mailu.psycopg.connect", lambda *args, **kwargs: DummyConn()) svc = MailuService() assert svc.mailbox_exists("alice@bstein.dev") is True def test_mailu_wait_for_mailbox(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_sync_url="", mailu_sync_wait_timeout_sec=10.0, mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_domain="bstein.dev", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr(MailuService, "mailbox_exists", lambda self, email: True) svc = MailuService() assert svc.wait_for_mailbox("alice@bstein.dev", timeout_sec=1.0) is True def test_mailu_mailbox_exists_empty_email(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_sync_url="", mailu_sync_wait_timeout_sec=10.0, mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", mailu_domain="bstein.dev", ) monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) svc = MailuService() assert svc.mailbox_exists("") is False