2025-12-12 22:09:04 -03:00
|
|
|
import importlib.util
|
|
|
|
|
import pathlib
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_sync_module(monkeypatch):
|
|
|
|
|
# Minimal env required by module import
|
|
|
|
|
env = {
|
|
|
|
|
"KEYCLOAK_BASE_URL": "http://keycloak",
|
|
|
|
|
"KEYCLOAK_REALM": "atlas",
|
|
|
|
|
"KEYCLOAK_CLIENT_ID": "mailu-sync",
|
|
|
|
|
"KEYCLOAK_CLIENT_SECRET": "secret",
|
|
|
|
|
"MAILU_DOMAIN": "example.com",
|
|
|
|
|
"MAILU_DB_HOST": "localhost",
|
|
|
|
|
"MAILU_DB_PORT": "5432",
|
|
|
|
|
"MAILU_DB_NAME": "mailu",
|
|
|
|
|
"MAILU_DB_USER": "mailu",
|
|
|
|
|
"MAILU_DB_PASSWORD": "pw",
|
|
|
|
|
}
|
|
|
|
|
for k, v in env.items():
|
|
|
|
|
monkeypatch.setenv(k, v)
|
2026-01-13 12:07:03 -03:00
|
|
|
module_path = (
|
|
|
|
|
pathlib.Path(__file__).resolve().parents[2]
|
|
|
|
|
/ "services"
|
|
|
|
|
/ "mailu"
|
|
|
|
|
/ "scripts"
|
|
|
|
|
/ "mailu_sync.py"
|
|
|
|
|
)
|
2025-12-12 22:09:04 -03:00
|
|
|
spec = importlib.util.spec_from_file_location("mailu_sync_testmod", module_path)
|
|
|
|
|
module = importlib.util.module_from_spec(spec)
|
|
|
|
|
assert spec.loader is not None
|
|
|
|
|
spec.loader.exec_module(module)
|
|
|
|
|
return module
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_random_password_length_and_charset(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
|
|
|
|
pw = sync.random_password()
|
|
|
|
|
assert len(pw) == 24
|
|
|
|
|
assert all(ch.isalnum() for ch in pw)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeResponse:
|
|
|
|
|
def __init__(self, json_data=None, status=200):
|
|
|
|
|
self._json_data = json_data or {}
|
|
|
|
|
self.status_code = status
|
|
|
|
|
|
|
|
|
|
def raise_for_status(self):
|
|
|
|
|
if self.status_code >= 400:
|
|
|
|
|
raise AssertionError(f"status {self.status_code}")
|
|
|
|
|
|
|
|
|
|
def json(self):
|
|
|
|
|
return self._json_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _FakeSession:
|
2026-01-18 01:08:31 -03:00
|
|
|
def __init__(self, put_resp, get_resps):
|
2025-12-12 22:09:04 -03:00
|
|
|
self.put_resp = put_resp
|
2026-01-18 01:08:31 -03:00
|
|
|
self.get_resps = list(get_resps)
|
2025-12-12 22:09:04 -03:00
|
|
|
self.put_called = False
|
2026-01-18 01:08:31 -03:00
|
|
|
self.get_calls = 0
|
2025-12-12 22:09:04 -03:00
|
|
|
|
|
|
|
|
def post(self, *args, **kwargs):
|
|
|
|
|
return _FakeResponse({"access_token": "dummy"})
|
|
|
|
|
|
|
|
|
|
def put(self, *args, **kwargs):
|
|
|
|
|
self.put_called = True
|
|
|
|
|
return self.put_resp
|
|
|
|
|
|
|
|
|
|
def get(self, *args, **kwargs):
|
2026-01-18 01:08:31 -03:00
|
|
|
self.get_calls += 1
|
|
|
|
|
if self.get_resps:
|
|
|
|
|
return self.get_resps.pop(0)
|
|
|
|
|
return _FakeResponse({})
|
2025-12-12 22:09:04 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_kc_update_attributes_succeeds(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
2026-01-18 01:08:31 -03:00
|
|
|
current_resp = _FakeResponse({"attributes": {}})
|
2025-12-12 22:09:04 -03:00
|
|
|
ok_resp = _FakeResponse({"attributes": {"mailu_app_password": ["abc"]}})
|
2026-01-18 01:08:31 -03:00
|
|
|
sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, ok_resp])
|
2025-12-12 22:09:04 -03:00
|
|
|
sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"})
|
2026-01-18 01:08:31 -03:00
|
|
|
assert sync.SESSION.put_called and sync.SESSION.get_calls == 2
|
2025-12-12 22:09:04 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_kc_update_attributes_raises_without_attribute(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
2026-01-18 01:08:31 -03:00
|
|
|
current_resp = _FakeResponse({"attributes": {}})
|
2025-12-12 22:09:04 -03:00
|
|
|
missing_attr_resp = _FakeResponse({"attributes": {}}, status=200)
|
2026-01-18 01:08:31 -03:00
|
|
|
sync.SESSION = _FakeSession(_FakeResponse({}), [current_resp, missing_attr_resp])
|
2025-12-12 22:09:04 -03:00
|
|
|
with pytest.raises(Exception):
|
|
|
|
|
sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"})
|
2025-12-14 14:15:19 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_kc_get_users_paginates(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
|
|
|
|
|
|
|
|
|
class _PagedSession:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.calls = 0
|
|
|
|
|
|
|
|
|
|
def post(self, *_, **__):
|
|
|
|
|
return _FakeResponse({"access_token": "tok"})
|
|
|
|
|
|
|
|
|
|
def get(self, *_, **__):
|
|
|
|
|
self.calls += 1
|
|
|
|
|
if self.calls == 1:
|
|
|
|
|
return _FakeResponse([{"id": "u1"}, {"id": "u2"}])
|
|
|
|
|
return _FakeResponse([]) # stop pagination
|
|
|
|
|
|
|
|
|
|
sync.SESSION = _PagedSession()
|
|
|
|
|
users = sync.kc_get_users("tok")
|
|
|
|
|
assert [u["id"] for u in users] == ["u1", "u2"]
|
2026-01-03 02:35:47 -03:00
|
|
|
# Pagination stops when results < page size.
|
|
|
|
|
assert sync.SESSION.calls == 1
|
2025-12-14 14:15:19 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_ensure_mailu_user_skips_foreign_domain(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
|
|
|
|
executed = []
|
|
|
|
|
|
|
|
|
|
class _Cursor:
|
|
|
|
|
def execute(self, sql, params):
|
|
|
|
|
executed.append((sql, params))
|
|
|
|
|
|
|
|
|
|
sync.ensure_mailu_user(_Cursor(), "user@other.com", "pw", "User")
|
|
|
|
|
assert not executed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_ensure_mailu_user_upserts(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
2026-01-03 02:35:47 -03:00
|
|
|
monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}")
|
2025-12-14 14:15:19 -03:00
|
|
|
captured = {}
|
|
|
|
|
|
|
|
|
|
class _Cursor:
|
|
|
|
|
def execute(self, sql, params):
|
|
|
|
|
captured.update(params)
|
|
|
|
|
|
|
|
|
|
sync.ensure_mailu_user(_Cursor(), "user@example.com", "pw", "User Example")
|
|
|
|
|
assert captured["email"] == "user@example.com"
|
|
|
|
|
assert captured["localpart"] == "user"
|
|
|
|
|
# password should be hashed, not the raw string
|
|
|
|
|
assert captured["password"] != "pw"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_main_generates_password_and_upserts(monkeypatch):
|
|
|
|
|
sync = load_sync_module(monkeypatch)
|
2026-01-03 02:35:47 -03:00
|
|
|
monkeypatch.setattr(sync.bcrypt_sha256, "hash", lambda password: f"hash:{password}")
|
2025-12-14 14:15:19 -03:00
|
|
|
users = [
|
2026-01-18 00:47:38 -03:00
|
|
|
{
|
|
|
|
|
"id": "u1",
|
|
|
|
|
"username": "user1",
|
|
|
|
|
"email": "user1@example.com",
|
|
|
|
|
"attributes": {"mailu_enabled": ["true"]},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
"id": "u2",
|
|
|
|
|
"username": "user2",
|
|
|
|
|
"email": "user2@example.com",
|
|
|
|
|
"attributes": {"mailu_app_password": ["keepme"], "mailu_enabled": ["true"]},
|
|
|
|
|
},
|
2025-12-14 14:15:19 -03:00
|
|
|
{"id": "u3", "username": "user3", "email": "user3@other.com", "attributes": {}},
|
|
|
|
|
]
|
|
|
|
|
updated = []
|
|
|
|
|
|
|
|
|
|
class _Cursor:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.executions = []
|
|
|
|
|
|
|
|
|
|
def execute(self, sql, params):
|
|
|
|
|
self.executions.append(params)
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
class _Conn:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.autocommit = False
|
|
|
|
|
self._cursor = _Cursor()
|
|
|
|
|
|
|
|
|
|
def cursor(self, cursor_factory=None):
|
|
|
|
|
return self._cursor
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
monkeypatch.setattr(sync, "get_kc_token", lambda: "tok")
|
|
|
|
|
monkeypatch.setattr(sync, "kc_get_users", lambda token: users)
|
|
|
|
|
monkeypatch.setattr(sync, "kc_update_attributes", lambda token, user, attrs: updated.append((user["id"], attrs["mailu_app_password"])))
|
|
|
|
|
conns = []
|
|
|
|
|
|
|
|
|
|
def _connect(**kwargs):
|
|
|
|
|
conn = _Conn()
|
|
|
|
|
conns.append(conn)
|
|
|
|
|
return conn
|
|
|
|
|
|
|
|
|
|
monkeypatch.setattr(sync.psycopg2, "connect", _connect)
|
|
|
|
|
|
|
|
|
|
sync.main()
|
|
|
|
|
|
2026-01-18 00:47:38 -03:00
|
|
|
# Only mail-enabled users are synced and backfilled.
|
|
|
|
|
assert len(updated) == 2
|
|
|
|
|
assert conns and len(conns[0]._cursor.executions) == 2
|