182 lines
5.6 KiB
Python
182 lines
5.6 KiB
Python
import importlib.util
|
|
import pathlib
|
|
|
|
import pytest
|
|
|
|
|
|
def load_sync_module(monkeypatch):
|
|
# Minimal env required by module import
|
|
env = {
|
|
"KEYCLOAK_BASE_URL": "http://keycloak",
|
|
"KEYCLOAK_REALM": "atlas",
|
|
"KEYCLOAK_CLIENT_ID": "mailu-sync",
|
|
"KEYCLOAK_CLIENT_SECRET": "secret",
|
|
"MAILU_DOMAIN": "example.com",
|
|
"MAILU_DB_HOST": "localhost",
|
|
"MAILU_DB_PORT": "5432",
|
|
"MAILU_DB_NAME": "mailu",
|
|
"MAILU_DB_USER": "mailu",
|
|
"MAILU_DB_PASSWORD": "pw",
|
|
}
|
|
for k, v in env.items():
|
|
monkeypatch.setenv(k, v)
|
|
module_path = pathlib.Path(__file__).resolve().parents[1] / "mailu_sync.py"
|
|
spec = importlib.util.spec_from_file_location("mailu_sync_testmod", module_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
assert spec.loader is not None
|
|
spec.loader.exec_module(module)
|
|
return module
|
|
|
|
|
|
def test_random_password_length_and_charset(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
pw = sync.random_password()
|
|
assert len(pw) == 24
|
|
assert all(ch.isalnum() for ch in pw)
|
|
|
|
|
|
class _FakeResponse:
|
|
def __init__(self, json_data=None, status=200):
|
|
self._json_data = json_data or {}
|
|
self.status_code = status
|
|
|
|
def raise_for_status(self):
|
|
if self.status_code >= 400:
|
|
raise AssertionError(f"status {self.status_code}")
|
|
|
|
def json(self):
|
|
return self._json_data
|
|
|
|
|
|
class _FakeSession:
|
|
def __init__(self, put_resp, get_resp):
|
|
self.put_resp = put_resp
|
|
self.get_resp = get_resp
|
|
self.put_called = False
|
|
self.get_called = False
|
|
|
|
def post(self, *args, **kwargs):
|
|
return _FakeResponse({"access_token": "dummy"})
|
|
|
|
def put(self, *args, **kwargs):
|
|
self.put_called = True
|
|
return self.put_resp
|
|
|
|
def get(self, *args, **kwargs):
|
|
self.get_called = True
|
|
return self.get_resp
|
|
|
|
|
|
def test_kc_update_attributes_succeeds(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
ok_resp = _FakeResponse({"attributes": {"mailu_app_password": ["abc"]}})
|
|
sync.SESSION = _FakeSession(_FakeResponse({}), ok_resp)
|
|
sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"})
|
|
assert sync.SESSION.put_called and sync.SESSION.get_called
|
|
|
|
|
|
def test_kc_update_attributes_raises_without_attribute(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
missing_attr_resp = _FakeResponse({"attributes": {}}, status=200)
|
|
sync.SESSION = _FakeSession(_FakeResponse({}), missing_attr_resp)
|
|
with pytest.raises(Exception):
|
|
sync.kc_update_attributes("token", {"id": "u1", "username": "u1"}, {"mailu_app_password": "abc"})
|
|
|
|
|
|
def test_kc_get_users_paginates(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
|
|
class _PagedSession:
|
|
def __init__(self):
|
|
self.calls = 0
|
|
|
|
def post(self, *_, **__):
|
|
return _FakeResponse({"access_token": "tok"})
|
|
|
|
def get(self, *_, **__):
|
|
self.calls += 1
|
|
if self.calls == 1:
|
|
return _FakeResponse([{"id": "u1"}, {"id": "u2"}])
|
|
return _FakeResponse([]) # stop pagination
|
|
|
|
sync.SESSION = _PagedSession()
|
|
users = sync.kc_get_users("tok")
|
|
assert [u["id"] for u in users] == ["u1", "u2"]
|
|
assert sync.SESSION.calls == 2
|
|
|
|
|
|
def test_ensure_mailu_user_skips_foreign_domain(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
executed = []
|
|
|
|
class _Cursor:
|
|
def execute(self, sql, params):
|
|
executed.append((sql, params))
|
|
|
|
sync.ensure_mailu_user(_Cursor(), "user@other.com", "pw", "User")
|
|
assert not executed
|
|
|
|
|
|
def test_ensure_mailu_user_upserts(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
captured = {}
|
|
|
|
class _Cursor:
|
|
def execute(self, sql, params):
|
|
captured.update(params)
|
|
|
|
sync.ensure_mailu_user(_Cursor(), "user@example.com", "pw", "User Example")
|
|
assert captured["email"] == "user@example.com"
|
|
assert captured["localpart"] == "user"
|
|
# password should be hashed, not the raw string
|
|
assert captured["password"] != "pw"
|
|
|
|
|
|
def test_main_generates_password_and_upserts(monkeypatch):
|
|
sync = load_sync_module(monkeypatch)
|
|
users = [
|
|
{"id": "u1", "username": "user1", "email": "user1@example.com", "attributes": {}},
|
|
{"id": "u2", "username": "user2", "email": "user2@example.com", "attributes": {"mailu_app_password": ["keepme"]}},
|
|
{"id": "u3", "username": "user3", "email": "user3@other.com", "attributes": {}},
|
|
]
|
|
updated = []
|
|
|
|
class _Cursor:
|
|
def __init__(self):
|
|
self.executions = []
|
|
|
|
def execute(self, sql, params):
|
|
self.executions.append(params)
|
|
|
|
def close(self):
|
|
return None
|
|
|
|
class _Conn:
|
|
def __init__(self):
|
|
self.autocommit = False
|
|
self._cursor = _Cursor()
|
|
|
|
def cursor(self, cursor_factory=None):
|
|
return self._cursor
|
|
|
|
def close(self):
|
|
return None
|
|
|
|
monkeypatch.setattr(sync, "get_kc_token", lambda: "tok")
|
|
monkeypatch.setattr(sync, "kc_get_users", lambda token: users)
|
|
monkeypatch.setattr(sync, "kc_update_attributes", lambda token, user, attrs: updated.append((user["id"], attrs["mailu_app_password"])))
|
|
conns = []
|
|
|
|
def _connect(**kwargs):
|
|
conn = _Conn()
|
|
conns.append(conn)
|
|
return conn
|
|
|
|
monkeypatch.setattr(sync.psycopg2, "connect", _connect)
|
|
|
|
sync.main()
|
|
|
|
# Should attempt two inserts (third user skipped due to domain mismatch)
|
|
assert len(updated) == 1 # only one missing attr was backfilled
|
|
assert conns and len(conns[0]._cursor.executions) == 2
|