"""Unit tests for ``services/mailu/scripts/mailu_sync.py``.

The sync script is imported from its file path with ``psycopg2`` and
``passlib`` replaced by in-memory stand-ins, so these tests need neither a
database driver nor a password-hashing backend.
"""

import importlib.util
import pathlib
import sys
import types

import pytest


def load_sync_module(monkeypatch):
    """Import ``mailu_sync.py`` as a fresh module with stubbed dependencies.

    Sets the environment variables listed below, registers fake ``psycopg2``,
    ``psycopg2.extras``, ``passlib`` and ``passlib.hash`` modules in
    ``sys.modules``, then executes the script located relative to this test
    file. All patches go through ``monkeypatch`` so they are undone at
    teardown.

    Args:
        monkeypatch: The pytest ``monkeypatch`` fixture.

    Returns:
        The freshly executed module object (not registered in ``sys.modules``).
    """
    # Minimal env required by module import
    env = {
        "KEYCLOAK_BASE_URL": "http://keycloak",
        "KEYCLOAK_REALM": "atlas",
        "KEYCLOAK_CLIENT_ID": "mailu-sync",
        "KEYCLOAK_CLIENT_SECRET": "secret",
        "MAILU_DOMAIN": "example.com",
        "MAILU_DB_HOST": "localhost",
        "MAILU_DB_PORT": "5432",
        "MAILU_DB_NAME": "mailu",
        "MAILU_DB_USER": "mailu",
        "MAILU_DB_PASSWORD": "pw",
    }
    for k, v in env.items():
        monkeypatch.setenv(k, v)

    fake_psycopg2 = types.ModuleType("psycopg2")
    fake_psycopg2.Error = Exception
    fake_psycopg2.connect = lambda **kwargs: None
    fake_psycopg2_extras = types.ModuleType("psycopg2.extras")
    fake_psycopg2_extras.RealDictCursor = object
    # Bind the submodule on its parent (as done for passlib.hash below): the
    # import system only sets this attribute when it loads the submodule
    # itself, not when it finds it pre-seeded in sys.modules.
    fake_psycopg2.extras = fake_psycopg2_extras

    fake_passlib = types.ModuleType("passlib")
    fake_passlib_hash = types.ModuleType("passlib.hash")

    class _FakeBcryptSha256:
        """Deterministic stand-in for ``passlib.hash.bcrypt_sha256``."""

        @staticmethod
        def hash(password):
            return f"stub:{password}"

    fake_passlib_hash.bcrypt_sha256 = _FakeBcryptSha256
    fake_passlib.hash = fake_passlib_hash

    monkeypatch.setitem(sys.modules, "psycopg2", fake_psycopg2)
    monkeypatch.setitem(sys.modules, "psycopg2.extras", fake_psycopg2_extras)
    monkeypatch.setitem(sys.modules, "passlib", fake_passlib)
    monkeypatch.setitem(sys.modules, "passlib.hash", fake_passlib_hash)

    module_path = (
        pathlib.Path(__file__).resolve().parents[2]
        / "services"
        / "mailu"
        / "scripts"
        / "mailu_sync.py"
    )
    spec = importlib.util.spec_from_file_location("mailu_sync_testmod", module_path)
    # Validate the spec before using it; module_from_spec() needs a real spec.
    assert spec is not None and spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def test_random_password_length_and_charset(monkeypatch):
    """``random_password`` yields 24 alphanumeric characters."""
    sync = load_sync_module(monkeypatch)
    pw = sync.random_password()
    assert len(pw) == 24
    assert all(ch.isalnum() for ch in pw)


class _FakeResponse:
    """Minimal HTTP response double exposing ``status_code``, ``json()`` and
    ``raise_for_status()``."""

    def __init__(self, json_data=None, status=200):
        # Only substitute the default for a missing payload; a deliberately
        # empty payload such as [] must be returned unchanged by json().
        self._json_data = {} if json_data is None else json_data
        self.status_code = status

    def raise_for_status(self):
        """Raise ``AssertionError`` for any status code of 400 or above."""
        if self.status_code >= 400:
            raise AssertionError(f"status {self.status_code}")

    def json(self):
        """Return the payload supplied at construction time."""
        return self._json_data


class _FakeSession:
    """Session double that replays canned responses and records usage.

    ``put`` always returns ``put_resp``; ``get`` returns the queued
    ``get_resps`` in order and an empty-dict response once they run out.
    """

    def __init__(self, put_resp, get_resps):
        self.put_resp = put_resp
        self.get_resps = list(get_resps)
        self.put_called = False
        self.get_calls = 0

    def post(self, *args, **kwargs):
        return _FakeResponse({"access_token": "dummy"})

    def put(self, *args, **kwargs):
        self.put_called = True
        return self.put_resp

    def get(self, *args, **kwargs):
        self.get_calls += 1
        if self.get_resps:
            return self.get_resps.pop(0)
        return _FakeResponse({})


def test_kc_update_attributes_succeeds(monkeypatch):
    """A PUT followed by a GET that shows the attribute completes cleanly."""
    sync = load_sync_module(monkeypatch)
    current_resp = _FakeResponse({"attributes": {}})
    ok_resp = _FakeResponse({"attributes": {"mailu_app_password": ["abc"]}})
    sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, ok_resp])

    sync.kc_update_attributes(
        "token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"}
    )

    # Separate asserts so a failure reports which expectation was missed.
    assert sync.SESSION.put_called
    assert sync.SESSION.get_calls == 2


def test_kc_update_attributes_raises_without_attribute(monkeypatch):
    """The update raises when the follow-up GET lacks the new attribute."""
    sync = load_sync_module(monkeypatch)
    current_resp = _FakeResponse({"attributes": {}})
    missing_attr_resp = _FakeResponse({"attributes": {}}, status=200)
    sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, missing_attr_resp])

    # NOTE(review): the concrete exception type is defined by mailu_sync.py,
    # which is not visible here; narrow this once it is confirmed.
    with pytest.raises(Exception):
        sync.kc_update_attributes(
            "token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"}
        )


def test_kc_get_users_paginates(monkeypatch):
    """A short first page is returned as-is after a single GET."""
    sync = load_sync_module(monkeypatch)

    class _PagedSession:
        def __init__(self):
            self.calls = 0

        def post(self, *_, **__):
            return _FakeResponse({"access_token": "tok"})

        def get(self, *_, **__):
            self.calls += 1
            if self.calls == 1:
                return _FakeResponse([{"id": "u1"}, {"id": "u2"}])
            return _FakeResponse([])  # stop pagination

    sync.SESSION = _PagedSession()
    users = sync.kc_get_users("tok")
    assert [u["id"] for u in users] == ["u1", "u2"]
    # Pagination stops when results < page size.
    assert sync.SESSION.calls == 1


def test_kc_get_users_fetches_second_page_after_full_batch(monkeypatch):
    """A 200-user first page triggers a second GET with ``first=200``."""
    sync = load_sync_module(monkeypatch)

    class _PagedSession:
        def __init__(self):
            self.calls = 0
            self.first_params = []

        def get(self, *_, **kwargs):
            self.calls += 1
            self.first_params.append(kwargs["params"]["first"])
            if self.calls == 1:
                return _FakeResponse([{"id": f"u{i}"} for i in range(200)])
            return _FakeResponse([{"id": "last"}])

    sync.SESSION = _PagedSession()
    users = sync.kc_get_users("tok")
    assert len(users) == 201
    assert sync.SESSION.first_params == [0, 200]


def test_get_kc_token_posts_client_credentials(monkeypatch):
    """The token request uses the ``client_credentials`` grant type."""
    sync = load_sync_module(monkeypatch)
    calls = []

    class _TokenSession:
        def post(self, url, data, timeout):
            calls.append((url, data, timeout))
            return _FakeResponse({"access_token": "tok"})

    sync.SESSION = _TokenSession()
    assert sync.get_kc_token() == "tok"
    assert calls[0][1]["grant_type"] == "client_credentials"


def test_retry_request_retries_then_succeeds(monkeypatch):
    """One failed attempt is followed by a sleep of 2 and a successful retry."""
    sync = load_sync_module(monkeypatch)
    attempts = []
    sleeps = []

    def _flaky():
        attempts.append(1)
        if len(attempts) == 1:
            raise sync.requests.RequestException("temporary")
        return "ok"

    monkeypatch.setattr(sync.time, "sleep", lambda seconds: sleeps.append(seconds))
    assert sync.retry_request("request", _flaky, attempts=2) == "ok"
    assert sleeps == [2]


def test_retry_request_reraises_final_error(monkeypatch):
    """With a single attempt the request error propagates to the caller."""
    sync = load_sync_module(monkeypatch)
    monkeypatch.setattr(sync.time, "sleep", lambda seconds: None)

    def _always_fail():
        raise sync.requests.RequestException("nope")

    with pytest.raises(sync.requests.RequestException):
        sync.retry_request(
            "request",
            _always_fail,
            attempts=1,
        )


def test_retry_db_connect_retries_then_succeeds(monkeypatch):
    """One failed connect is followed by a sleep of 2 and a successful retry."""
    sync = load_sync_module(monkeypatch)
    attempts = []
    sleeps = []

    def _connect(**kwargs):
        attempts.append(kwargs)
        if len(attempts) == 1:
            raise sync.psycopg2.Error("not yet")
        return "conn"

    monkeypatch.setattr(sync.psycopg2, "connect", _connect)
    monkeypatch.setattr(sync.time, "sleep", lambda seconds: sleeps.append(seconds))
    assert sync.retry_db_connect(attempts=2) == "conn"
    assert sleeps == [2]


def test_retry_db_connect_reraises_final_error(monkeypatch):
    """With a single attempt the connection error propagates to the caller."""
    sync = load_sync_module(monkeypatch)

    def _always_down(**kwargs):
        raise sync.psycopg2.Error("down")

    monkeypatch.setattr(sync.psycopg2, "connect", _always_down)
    monkeypatch.setattr(sync.time, "sleep", lambda seconds: None)
    with pytest.raises(sync.psycopg2.Error):
        sync.retry_db_connect(attempts=1)


def test_ensure_mailu_user_skips_foreign_domain(monkeypatch):
    """No SQL is executed for an address outside ``example.com``."""
    sync = load_sync_module(monkeypatch)
    executed = []

    class _Cursor:
        def execute(self, sql, params):
            executed.append((sql, params))

    sync.ensure_mailu_user(_Cursor(), "user@other.com", "pw", "User")
    assert not executed


def test_ensure_mailu_user_upserts(monkeypatch):
    """The executed statement carries the email, localpart and a non-raw password."""
    sync = load_sync_module(monkeypatch)
    monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}")
    captured = {}

    class _Cursor:
        def execute(self, sql, params):
            captured.update(params)

    sync.ensure_mailu_user(_Cursor(), "user@example.com", "pw", "User Example")
    assert captured["email"] == "user@example.com"
    assert captured["localpart"] == "user"
    # password should be hashed, not the raw string
    assert captured["password"] != "pw"


def test_attribute_and_email_helpers(monkeypatch):
    """Spot-check ``get_attribute_value``, ``mailu_enabled`` and ``resolve_mailu_email``."""
    sync = load_sync_module(monkeypatch)
    assert sync.get_attribute_value({"x": ["first", "second"]}, "x") == "first"
    assert sync.get_attribute_value({"x": []}, "x") is None
    assert sync.get_attribute_value({"x": "value"}, "x") == "value"
    assert sync.mailu_enabled({"mailu_email": ["legacy@example.com"]}) is True
    assert sync.mailu_enabled({"mailu_enabled": ["off"]}) is False
    assert (
        sync.resolve_mailu_email({"username": "fallback", "email": "user@example.com"}, {})
        == "user@example.com"
    )
    assert (
        sync.resolve_mailu_email({"username": "fallback", "email": "user@other.com"}, {})
        == "fallback@example.com"
    )


def test_safe_update_payload_filters_fields(monkeypatch):
    """Unknown keys, non-string actions and a non-dict ``attributes`` are normalised."""
    sync = load_sync_module(monkeypatch)
    payload = sync._safe_update_payload(
        {
            "username": "user",
            "enabled": True,
            "email": "user@example.com",
            "emailVerified": False,
            "firstName": "User",
            "lastName": "Example",
            "requiredActions": ["UPDATE_PASSWORD", 7],
            "attributes": "not-a-dict",
            "ignored": "value",
        }
    )
    assert payload == {
        "username": "user",
        "enabled": True,
        "email": "user@example.com",
        "emailVerified": False,
        "firstName": "User",
        "lastName": "Example",
        "requiredActions": ["UPDATE_PASSWORD"],
        "attributes": {},
    }


def test_ensure_system_mailboxes_handles_configurations(monkeypatch, capsys):
    """Covers the missing-password message and per-mailbox success/failure output."""
    sync = load_sync_module(monkeypatch)
    ensured = []
    monkeypatch.setattr(sync, "MAILU_SYSTEM_USERS", ["postmaster@example.com", "abuse"])

    # Phase 1: an empty system password only produces the diagnostic message.
    monkeypatch.setattr(sync, "MAILU_SYSTEM_PASSWORD", "")
    sync.ensure_system_mailboxes(object())
    assert "MAILU_SYSTEM_PASSWORD is missing" in capsys.readouterr().out

    def _ensure(cursor, email, password, display_name):
        ensured.append((email, password, display_name))
        if email == "abuse":
            raise RuntimeError("boom")

    # Phase 2: with a password set, each mailbox is attempted and a failure
    # on one of them is reported rather than propagated.
    monkeypatch.setattr(sync, "MAILU_SYSTEM_PASSWORD", "pw")
    monkeypatch.setattr(sync, "ensure_mailu_user", _ensure)
    sync.ensure_system_mailboxes(object())
    out = capsys.readouterr().out
    assert ensured == [
        ("postmaster@example.com", "pw", "postmaster"),
        ("abuse", "pw", "abuse"),
    ]
    assert "Ensured system mailbox for postmaster@example.com" in out
    assert "Failed to ensure system mailbox abuse" in out


def test_main_exits_without_users_or_system_mailboxes(monkeypatch, capsys):
    """``main`` prints the no-users message when there is nothing to sync."""
    sync = load_sync_module(monkeypatch)
    monkeypatch.setattr(sync, "MAILU_SYSTEM_USERS", [])
    monkeypatch.setattr(sync, "get_kc_token", lambda: "tok")
    monkeypatch.setattr(sync, "kc_get_users", lambda token: [])
    sync.main()
    assert "No users found; exiting." in capsys.readouterr().out


def test_main_generates_password_and_upserts(monkeypatch):
    """``main`` updates attributes and executes SQL for three of the four users."""
    sync = load_sync_module(monkeypatch)
    monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}")
    users = [
        {
            "id": "u1",
            "username": "user1",
            "email": "user1@example.com",
            "attributes": {"mailu_enabled": ["true"]},
        },
        {
            "id": "u2",
            "username": "user2",
            "email": "user2@example.com",
            "attributes": {"mailu_app_password": ["keepme"], "mailu_enabled": ["true"]},
        },
        {
            "id": "u3",
            "username": "user3",
            "email": "user3@example.com",
            "attributes": {"mailu_email": ["user3@example.com"]},
        },
        {"id": "u4", "username": "user4", "email": "user4@other.com", "attributes": {}},
    ]
    updated = []

    class _Cursor:
        def __init__(self):
            self.executions = []

        def execute(self, sql, params):
            self.executions.append(params)

        def close(self):
            return None

    class _Conn:
        def __init__(self):
            self.autocommit = False
            self._cursor = _Cursor()

        def cursor(self, cursor_factory=None):
            return self._cursor

        def close(self):
            return None

    monkeypatch.setattr(sync, "get_kc_token", lambda: "tok")
    monkeypatch.setattr(sync, "kc_get_users", lambda token: users)
    monkeypatch.setattr(
        sync,
        "kc_update_attributes",
        lambda token, user, attrs: updated.append((user["id"], attrs["mailu_app_password"])),
    )
    conns = []

    def _connect(**kwargs):
        conn = _Conn()
        conns.append(conn)
        return conn

    monkeypatch.setattr(sync.psycopg2, "connect", _connect)

    sync.main()

    # Only mail-enabled users (or legacy users with a mailbox) are synced and backfilled.
    assert len(updated) == 3
    assert conns and len(conns[0]._cursor.executions) == 3