import importlib.util import pathlib import pytest def load_sync_module(monkeypatch): # Minimal env required by module import env = { "KEYCLOAK_BASE_URL": "http://keycloak", "KEYCLOAK_REALM": "atlas", "KEYCLOAK_CLIENT_ID": "mailu-sync", "KEYCLOAK_CLIENT_SECRET": "secret", "MAILU_DOMAIN": "example.com", "MAILU_DB_HOST": "localhost", "MAILU_DB_PORT": "5432", "MAILU_DB_NAME": "mailu", "MAILU_DB_USER": "mailu", "MAILU_DB_PASSWORD": "pw", } for k, v in env.items(): monkeypatch.setenv(k, v) module_path = ( pathlib.Path(__file__).resolve().parents[2] / "services" / "mailu" / "scripts" / "mailu_sync.py" ) spec = importlib.util.spec_from_file_location("mailu_sync_testmod", module_path) module = importlib.util.module_from_spec(spec) assert spec.loader is not None spec.loader.exec_module(module) return module def test_random_password_length_and_charset(monkeypatch): sync = load_sync_module(monkeypatch) pw = sync.random_password() assert len(pw) == 24 assert all(ch.isalnum() for ch in pw) class _FakeResponse: def __init__(self, json_data=None, status=200): self._json_data = json_data or {} self.status_code = status def raise_for_status(self): if self.status_code >= 400: raise AssertionError(f"status {self.status_code}") def json(self): return self._json_data class _FakeSession: def __init__(self, put_resp, get_resps): self.put_resp = put_resp self.get_resps = list(get_resps) self.put_called = False self.get_calls = 0 def post(self, *args, **kwargs): return _FakeResponse({"access_token": "dummy"}) def put(self, *args, **kwargs): self.put_called = True return self.put_resp def get(self, *args, **kwargs): self.get_calls += 1 if self.get_resps: return self.get_resps.pop(0) return _FakeResponse({}) def test_kc_update_attributes_succeeds(monkeypatch): sync = load_sync_module(monkeypatch) current_resp = _FakeResponse({"attributes": {}}) ok_resp = _FakeResponse({"attributes": {"mailu_app_password": ["abc"]}}) sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, ok_resp]) sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"}) assert sync.SESSION.put_called and sync.SESSION.get_calls == 2 def test_kc_update_attributes_raises_without_attribute(monkeypatch): sync = load_sync_module(monkeypatch) current_resp = _FakeResponse({"attributes": {}}) missing_attr_resp = _FakeResponse({"attributes": {}}, status=200) sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, missing_attr_resp]) with pytest.raises(Exception): sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"}) def test_kc_get_users_paginates(monkeypatch): sync = load_sync_module(monkeypatch) class _PagedSession: def __init__(self): self.calls = 0 def post(self, *_, **__): return _FakeResponse({"access_token": "tok"}) def get(self, *_, **__): self.calls += 1 if self.calls == 1: return _FakeResponse([{"id": "u1"}, {"id": "u2"}]) return _FakeResponse([]) # stop pagination sync.SESSION = _PagedSession() users = sync.kc_get_users("tok") assert [u["id"] for u in users] == ["u1", "u2"] # Pagination stops when results < page size. assert sync.SESSION.calls == 1 def test_ensure_mailu_user_skips_foreign_domain(monkeypatch): sync = load_sync_module(monkeypatch) executed = [] class _Cursor: def execute(self, sql, params): executed.append((sql, params)) sync.ensure_mailu_user(_Cursor(), "user@other.com", "pw", "User") assert not executed def test_ensure_mailu_user_upserts(monkeypatch): sync = load_sync_module(monkeypatch) monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}") captured = {} class _Cursor: def execute(self, sql, params): captured.update(params) sync.ensure_mailu_user(_Cursor(), "user@example.com", "pw", "User Example") assert captured["email"] == "user@example.com" assert captured["localpart"] == "user" # password should be hashed, not the raw string assert captured["password"] != "pw" def test_main_generates_password_and_upserts(monkeypatch): sync = load_sync_module(monkeypatch) monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}") users = [ { "id": "u1", "username": "user1", "email": "user1@example.com", "attributes": {"mailu_enabled": ["true"]}, }, { "id": "u2", "username": "user2", "email": "user2@example.com", "attributes": {"mailu_app_password": ["keepme"], "mailu_enabled": ["true"]}, }, { "id": "u3", "username": "user3", "email": "user3@example.com", "attributes": {"mailu_email": ["user3@example.com"]}, }, {"id": "u4", "username": "user4", "email": "user4@other.com", "attributes": {}}, ] updated = [] class _Cursor: def __init__(self): self.executions = [] def execute(self, sql, params): self.executions.append(params) def close(self): return None class _Conn: def __init__(self): self.autocommit = False self._cursor = _Cursor() def cursor(self, cursor_factory=None): return self._cursor def close(self): return None monkeypatch.setattr(sync, "get_kc_token", lambda: "tok") monkeypatch.setattr(sync, "kc_get_users", lambda token: users) monkeypatch.setattr(sync, "kc_update_attributes", lambda token, user, attrs: updated.append((user["id"], attrs["mailu_app_password"]))) conns = [] def _connect(**kwargs): conn = _Conn() conns.append(conn) return conn monkeypatch.setattr(sync.psycopg2, "connect", _connect) sync.main() # Only mail-enabled users (or legacy users with a mailbox) are synced and backfilled. assert len(updated) == 3 assert conns and len(conns[0]._cursor.executions) == 3