343 lines
17 KiB
Python
343 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
import types
|
|
|
|
import pytest
|
|
|
|
from ariadne.services import firefly as firefly_module
|
|
from ariadne.services import wger as wger_module
|
|
from ariadne.services.firefly import FireflyService, FireflySyncInput
|
|
from ariadne.services.wger import WgerService, WgerSyncInput
|
|
|
|
|
|
class _Executor:
|
|
def __init__(self, *, exit_code: int = 0, stdout: str = "ok", stderr: str = "", exc: Exception | None = None):
|
|
self.exit_code = exit_code
|
|
self.stdout = stdout
|
|
self.stderr = stderr
|
|
self.exc = exc
|
|
self.calls = []
|
|
|
|
def exec(self, command, env=None, timeout_sec=None, check=True):
|
|
self.calls.append((command, env or {}, timeout_sec, check))
|
|
if self.exc:
|
|
raise self.exc
|
|
return types.SimpleNamespace(
|
|
stdout=self.stdout,
|
|
stderr=self.stderr,
|
|
exit_code=self.exit_code,
|
|
ok=self.exit_code == 0,
|
|
)
|
|
|
|
|
|
class _Admin:
|
|
def __init__(self, *, ready: bool = True, user=None, full=None, fail_get: bool = False, fail_set: bool = False):
|
|
self._ready = ready
|
|
self._user = user
|
|
self._full = full
|
|
self.fail_get = fail_get
|
|
self.fail_set = fail_set
|
|
self.calls = []
|
|
|
|
def ready(self):
|
|
return self._ready
|
|
|
|
def find_user(self, username):
|
|
return self._user if self._user is not None else {"id": "1", "username": username}
|
|
|
|
def get_user(self, user_id):
|
|
if self.fail_get:
|
|
raise RuntimeError("get failed")
|
|
return self._full if self._full is not None else {"id": user_id, "username": "alice", "attributes": {}}
|
|
|
|
def iter_users(self, page_size=200, brief=False):
|
|
return self._full if isinstance(self._full, list) else []
|
|
|
|
def set_user_attribute(self, username, key, value):
|
|
if self.fail_set:
|
|
raise RuntimeError("set failed")
|
|
self.calls.append((username, key, value))
|
|
|
|
|
|
def _settings(prefix: str, namespace: str = "apps"):
|
|
values = {
|
|
f"{prefix}_namespace": namespace,
|
|
f"{prefix}_pod_label": f"app={prefix}",
|
|
f"{prefix}_container": prefix,
|
|
f"{prefix}_user_sync_wait_timeout_sec": 60.0,
|
|
"mailu_domain": "bstein.dev",
|
|
}
|
|
if prefix == "firefly":
|
|
values.update(
|
|
{
|
|
"firefly_cron_base_url": "http://firefly/cron",
|
|
"firefly_cron_token": "token",
|
|
"firefly_cron_timeout_sec": 10.0,
|
|
}
|
|
)
|
|
else:
|
|
values.update(
|
|
{
|
|
"wger_admin_username": "admin",
|
|
"wger_admin_password": "pw",
|
|
"wger_admin_email": "admin@bstein.dev",
|
|
}
|
|
)
|
|
return types.SimpleNamespace(**values)
|
|
|
|
|
|
def _install(monkeypatch, module, service_cls, prefix: str, executor: _Executor | None = None, namespace: str = "apps"):
|
|
monkeypatch.setattr(module, "settings", _settings(prefix, namespace))
|
|
executor = executor or _Executor()
|
|
monkeypatch.setattr(module, "PodExecutor", lambda *_args, **_kwargs: executor)
|
|
return service_cls(), executor
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("module", "service_cls", "input_cls", "prefix", "password_attr", "updated_attr", "rotated_attr"),
|
|
[
|
|
(
|
|
firefly_module,
|
|
FireflyService,
|
|
FireflySyncInput,
|
|
"firefly",
|
|
firefly_module.FIREFLY_PASSWORD_ATTR,
|
|
firefly_module.FIREFLY_PASSWORD_UPDATED_ATTR,
|
|
firefly_module.FIREFLY_PASSWORD_ROTATED_ATTR,
|
|
),
|
|
(
|
|
wger_module,
|
|
WgerService,
|
|
WgerSyncInput,
|
|
"wger",
|
|
wger_module.WGER_PASSWORD_ATTR,
|
|
wger_module.WGER_PASSWORD_UPDATED_ATTR,
|
|
wger_module.WGER_PASSWORD_ROTATED_ATTR,
|
|
),
|
|
],
|
|
)
|
|
def test_shared_helper_edges(monkeypatch, module, service_cls, input_cls, prefix, password_attr, updated_attr, rotated_attr) -> None:
|
|
assert "PASSWORD" in getattr(module, f"_{prefix}_check_command")()
|
|
counters = getattr(module, f"{prefix.title()}SyncCounters")(failures=1)
|
|
assert counters.status() == "error"
|
|
assert counters.summary("detail").detail == "detail"
|
|
|
|
assert module._extract_attr(None, password_attr) == ""
|
|
assert module._extract_attr({password_attr: ["", " pw "]}, password_attr) == "pw"
|
|
assert module._extract_attr({password_attr: ["", " "]}, password_attr) == ""
|
|
assert module._extract_attr({password_attr: " pw "}, password_attr) == "pw"
|
|
assert module._should_skip_user({"enabled": False}, "alice")
|
|
assert module._should_skip_user({"serviceAccountClientId": "svc"}, "alice")
|
|
assert module._should_skip_user({}, "service-account-demo")
|
|
assert not module._should_skip_user({"enabled": True}, "alice")
|
|
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_get=True))
|
|
assert module._load_attrs("1", {}) is None
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(full={"attributes": "bad"}))
|
|
assert module._load_attrs("1", {}) == {}
|
|
|
|
monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "")
|
|
assert module._ensure_mailu_email("alice", {}, "") is None
|
|
monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "alice@bstein.dev")
|
|
assert module._ensure_mailu_email("alice", {"mailu_email": ["stored@bstein.dev"]}, "") == "alice@bstein.dev"
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
|
|
assert module._ensure_mailu_email("alice", {}, "") is None
|
|
|
|
assert getattr(module, f"_ensure_{prefix}_password")("alice", {password_attr: ["pw"]}) == ("pw", False)
|
|
monkeypatch.setattr(module, "random_password", lambda *_args: "generated")
|
|
admin = _Admin()
|
|
monkeypatch.setattr(module, "keycloak_admin", admin)
|
|
assert getattr(module, f"_ensure_{prefix}_password")("alice", {}) == ("generated", True)
|
|
assert admin.calls[-1][1] == password_attr
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
|
|
assert getattr(module, f"_ensure_{prefix}_password")("alice", {}) == (None, False)
|
|
assert not getattr(module, f"_set_{prefix}_updated_at")("alice")
|
|
assert not getattr(module, f"_set_{prefix}_rotated_at")("alice")
|
|
|
|
assert module._normalize_user({})[0].status == "skipped"
|
|
assert module._normalize_user({"username": "alice"})[0].detail == "missing user id"
|
|
identity = module.UserIdentity("alice", "1")
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_get=True))
|
|
assert module._load_attrs_or_outcome(identity, {})[1].detail == "missing attributes"
|
|
monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "")
|
|
assert module._mailu_email_or_outcome(identity, {}, {"email": ""})[1].detail == "missing mailu email"
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
|
|
assert getattr(module, f"_{prefix}_password_or_outcome")(identity, {})[2].detail.endswith("password")
|
|
assert module._should_skip_sync(False, "2025-01-01T00:00:00Z")
|
|
|
|
monkeypatch.setattr(module, "_normalize_user", lambda _user: (module.UserSyncOutcome("skipped"), None))
|
|
assert module._build_sync_input({"username": "alice"}).status == "skipped"
|
|
monkeypatch.setattr(module, "_normalize_user", lambda _user: (None, None))
|
|
assert module._build_sync_input({"username": "alice"}).detail == "missing identity"
|
|
monkeypatch.setattr(module, "_normalize_user", lambda _user: (None, identity))
|
|
monkeypatch.setattr(module, "_load_attrs_or_outcome", lambda *_args: (None, None))
|
|
assert module._build_sync_input({"username": "alice"}).detail == "missing attributes"
|
|
monkeypatch.setattr(module, "_load_attrs_or_outcome", lambda *_args: ({}, None))
|
|
monkeypatch.setattr(module, "_mailu_email_or_outcome", lambda *_args: (None, None))
|
|
assert module._build_sync_input({"username": "alice"}).detail == "missing mailu email"
|
|
monkeypatch.setattr(module, "_mailu_email_or_outcome", lambda *_args: ("alice@bstein.dev", None))
|
|
monkeypatch.setattr(module, f"_{prefix}_password_or_outcome", lambda *_args: (None, False, None))
|
|
assert module._build_sync_input({"username": "alice"}).detail.endswith("password")
|
|
|
|
assert module._rotation_result("error", "detail") == {"status": "error", "detail": "detail"}
|
|
assert module._rotation_result("ok", rotated=False) == {"status": "ok", "rotated": False}
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("module", "service_cls", "input_cls", "prefix"),
|
|
[
|
|
(firefly_module, FireflyService, FireflySyncInput, "firefly"),
|
|
(wger_module, WgerService, WgerSyncInput, "wger"),
|
|
],
|
|
)
|
|
def test_rotation_input_and_service_outcomes(monkeypatch, module, service_cls, input_cls, prefix) -> None:
|
|
_install(monkeypatch, module, service_cls, prefix)
|
|
assert module._rotation_check_input("")[1] == "missing username"
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(ready=False))
|
|
assert module._rotation_check_input("alice")[1] == "keycloak admin not configured"
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(user=None))
|
|
monkeypatch.setattr(module.keycloak_admin, "find_user", lambda _username: "bad")
|
|
assert module._rotation_check_input("alice")[1] == "user not found"
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(user={"username": "alice"}))
|
|
assert module._rotation_check_input("alice")[1] == "missing user id"
|
|
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix)
|
|
monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (None, "boom"))
|
|
assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "boom"}
|
|
monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (module.UserSyncOutcome("skipped"), ""))
|
|
assert svc.check_rotation_for_user("alice") == {"status": "ok", "rotated": False}
|
|
monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (module.UserSyncOutcome("failed", "bad"), ""))
|
|
assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "bad"}
|
|
prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
|
|
monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (prepared, ""))
|
|
monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("skipped"))
|
|
assert svc.check_rotation_for_user("alice") == {"status": "ok", "rotated": False}
|
|
monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("failed", ""))
|
|
assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "rotation check failed"}
|
|
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix)
|
|
prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
|
|
rotated = input_cls("alice", "alice@bstein.dev", "pw", False, "", "rotated")
|
|
assert svc._rotation_outcome(rotated).status == "skipped"
|
|
monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "match"})
|
|
assert svc._rotation_outcome(prepared).status == "skipped"
|
|
monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"})
|
|
monkeypatch.setattr(module, f"_set_{prefix}_rotated_at", lambda _username: False)
|
|
assert svc._rotation_outcome(prepared).detail == "failed to set rotated_at"
|
|
monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {})
|
|
assert svc._rotation_outcome(prepared).detail == "password check failed"
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("module", "service_cls", "input_cls", "prefix"),
|
|
[
|
|
(firefly_module, FireflyService, FireflySyncInput, "firefly"),
|
|
(wger_module, WgerService, WgerSyncInput, "wger"),
|
|
],
|
|
)
|
|
def test_exec_and_sync_entry_edges(monkeypatch, module, service_cls, input_cls, prefix) -> None:
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix, _Executor(exc=TimeoutError("timeout")))
|
|
if prefix == "firefly":
|
|
assert svc.sync_user("alice@bstein.dev", "pw")["status"] == "error"
|
|
assert svc.check_password("", "pw", "alice")["status"] == "error"
|
|
with pytest.raises(RuntimeError, match="missing email"):
|
|
svc.check_password("", "pw")
|
|
with pytest.raises(RuntimeError, match="missing password"):
|
|
svc.check_password("alice@bstein.dev", "")
|
|
no_config, _executor = _install(monkeypatch, module, service_cls, prefix, namespace="")
|
|
with pytest.raises(RuntimeError, match="not configured"):
|
|
no_config.check_password("alice@bstein.dev", "pw", "alice")
|
|
else:
|
|
assert svc.sync_user("alice", "alice@bstein.dev", "pw")["status"] == "error"
|
|
assert svc.check_password("alice", "pw")["status"] == "error"
|
|
assert svc.ensure_admin()["status"] == "error"
|
|
with pytest.raises(RuntimeError, match="missing username"):
|
|
svc.check_password("", "pw")
|
|
with pytest.raises(RuntimeError, match="missing password"):
|
|
svc.check_password("alice", "")
|
|
no_config, _executor = _install(monkeypatch, module, service_cls, prefix, namespace="")
|
|
with pytest.raises(RuntimeError, match="not configured"):
|
|
no_config.check_password("alice", "pw")
|
|
|
|
for exit_code, expected in ((0, "match"), (1, "mismatch"), (3, "missing"), (42, "error")):
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix, _Executor(exit_code=exit_code, stdout=""))
|
|
if prefix == "firefly":
|
|
assert svc.check_password("alice@bstein.dev", "pw", "alice")["status"] == expected
|
|
else:
|
|
assert svc.check_password("alice", "pw")["status"] == expected
|
|
|
|
prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix)
|
|
monkeypatch.setattr(module, "_build_sync_input", lambda _user: module.UserSyncOutcome("skipped"))
|
|
assert svc._sync_user_entry({}).status == "skipped"
|
|
monkeypatch.setattr(module, "_build_sync_input", lambda _user: prepared)
|
|
monkeypatch.setattr(module, "_should_skip_sync", lambda *_args: True)
|
|
monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("skipped"))
|
|
assert svc._sync_user_entry({}).status == "skipped"
|
|
monkeypatch.setattr(module, "_should_skip_sync", lambda *_args: False)
|
|
monkeypatch.setattr(svc, "sync_user", lambda *_args, **_kwargs: {"status": "error", "detail": "bad"})
|
|
assert svc._sync_user_entry({}).detail in {"bad", "sync error"}
|
|
monkeypatch.setattr(svc, "sync_user", lambda *_args, **_kwargs: {"status": "ok"})
|
|
monkeypatch.setattr(module, f"_set_{prefix}_updated_at", lambda _username: False)
|
|
assert svc._sync_user_entry({}).detail == "failed to set updated_at"
|
|
|
|
if prefix == "firefly":
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix)
|
|
monkeypatch.setattr(module, "settings", _settings(prefix))
|
|
module.settings.firefly_cron_token = ""
|
|
with pytest.raises(RuntimeError, match="cron token"):
|
|
svc.run_cron()
|
|
|
|
class StatusClient:
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def get(self, _url):
|
|
return types.SimpleNamespace(status_code=500)
|
|
|
|
monkeypatch.setattr(module.httpx, "Client", lambda *args, **kwargs: StatusClient())
|
|
module.settings.firefly_cron_token = "token"
|
|
assert svc.run_cron() == {"status": "error", "detail": "status=500"}
|
|
|
|
class RaisingClient(StatusClient):
|
|
def get(self, _url):
|
|
raise RuntimeError("network")
|
|
|
|
monkeypatch.setattr(module.httpx, "Client", lambda *args, **kwargs: RaisingClient())
|
|
assert svc.run_cron() == {"status": "error", "detail": "network"}
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("module", "service_cls", "prefix"),
|
|
[
|
|
(firefly_module, FireflyService, "firefly"),
|
|
(wger_module, WgerService, "wger"),
|
|
],
|
|
)
|
|
def test_sync_users_summary_edges(monkeypatch, module, service_cls, prefix) -> None:
|
|
_install(monkeypatch, module, service_cls, prefix)
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(ready=False))
|
|
assert service_cls().sync_users()["status"] == "error"
|
|
|
|
_install(monkeypatch, module, service_cls, prefix, namespace="")
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin())
|
|
with pytest.raises(RuntimeError):
|
|
service_cls().sync_users()
|
|
|
|
users = [{"username": "ok"}, {"username": "skip"}, {"username": "fail"}]
|
|
svc, _executor = _install(monkeypatch, module, service_cls, prefix)
|
|
monkeypatch.setattr(module, "keycloak_admin", _Admin(full=users))
|
|
|
|
def outcome(user):
|
|
return module.UserSyncOutcome({"ok": "synced", "skip": "skipped"}.get(user["username"], "failed"))
|
|
|
|
monkeypatch.setattr(svc, "_sync_user_entry", outcome)
|
|
result = svc.sync_users()
|
|
assert result["status"] == "error"
|
|
assert result["summary"]["synced"] == 1
|
|
assert result["summary"]["skipped"] == 1
|
|
assert result["summary"]["failures"] == 1
|