"""Edge-case tests for the Firefly and wger service modules.

Each test is parametrized over both modules and runs against the in-memory
fakes defined below (``_Executor`` in place of ``PodExecutor``, ``_Admin`` in
place of ``keycloak_admin``), so no pod or Keycloak server is contacted.
"""

from __future__ import annotations

import types

import pytest

from ariadne.services import firefly as firefly_module
from ariadne.services import wger as wger_module
from ariadne.services.firefly import FireflyService, FireflySyncInput
from ariadne.services.wger import WgerService, WgerSyncInput


class _Executor:
    """Fake executor that ``_install`` patches in for the module's ``PodExecutor``.

    Records every ``exec`` call in ``calls`` and then either raises the
    configured exception or returns a canned result object.
    """

    def __init__(self, *, exit_code: int = 0, stdout: str = "ok", stderr: str = "", exc: Exception | None = None):
        self.exit_code = exit_code
        self.stdout = stdout
        self.stderr = stderr
        self.exc = exc
        # One (command, env, timeout_sec, check) tuple per exec() call.
        self.calls = []

    def exec(self, command, env=None, timeout_sec=None, check=True):
        """Record the call, then raise ``exc`` if set, else return the canned result."""
        # The call is recorded before raising, so failed calls are visible too.
        self.calls.append((command, env or {}, timeout_sec, check))
        if self.exc:
            raise self.exc
        return types.SimpleNamespace(
            stdout=self.stdout,
            stderr=self.stderr,
            exit_code=self.exit_code,
            ok=self.exit_code == 0,
        )


class _Admin:
    """Configurable fake that the tests patch in as the module's ``keycloak_admin``.

    ``full`` serves two purposes: a non-list value is what ``get_user``
    returns, while a list value is what ``iter_users`` returns.
    """

    def __init__(self, *, ready: bool = True, user=None, full=None, fail_get: bool = False, fail_set: bool = False):
        self._ready = ready
        self._user = user
        self._full = full
        self.fail_get = fail_get
        self.fail_set = fail_set
        # One (username, key, value) tuple per successful set_user_attribute().
        self.calls = []

    def ready(self):
        """Return the configured readiness flag."""
        return self._ready

    def find_user(self, username):
        """Return the configured user, or a default record with id "1"."""
        # NOTE: user=None selects the default record, not a "not found" result;
        # tests that need a lookup failure patch this method directly.
        return self._user if self._user is not None else {"id": "1", "username": username}

    def get_user(self, user_id):
        """Return ``full`` if configured, else a minimal record; raise if ``fail_get``."""
        if self.fail_get:
            raise RuntimeError("get failed")
        return self._full if self._full is not None else {"id": user_id, "username": "alice", "attributes": {}}

    def iter_users(self, page_size=200, brief=False):
        """Return ``full`` when it is a list, otherwise an empty list."""
        # page_size and brief are accepted but ignored.
        return self._full if isinstance(self._full, list) else []

    def set_user_attribute(self, username, key, value):
        """Record the attribute write, or raise if ``fail_set`` is set."""
        # Nothing is recorded when the write is configured to fail.
        if self.fail_set:
            raise RuntimeError("set failed")
        self.calls.append((username, key, value))


def _settings(prefix: str, namespace: str = "apps"):
    """Build a ``SimpleNamespace`` of settings for the service named by ``prefix``.

    The namespace, pod label, container and sync-timeout keys are derived from
    ``prefix``. ``"firefly"`` additionally gets the cron settings; any other
    prefix gets the wger admin credentials.
    """
    values = {
        f"{prefix}_namespace": namespace,
        f"{prefix}_pod_label": f"app={prefix}",
        f"{prefix}_container": prefix,
        f"{prefix}_user_sync_wait_timeout_sec": 60.0,
        "mailu_domain": "bstein.dev",
    }
    if prefix == "firefly":
        values.update(
            {
                "firefly_cron_base_url": "http://firefly/cron",
                "firefly_cron_token": "token",
                "firefly_cron_timeout_sec": 10.0,
            }
        )
    else:
        values.update(
            {
                "wger_admin_username": "admin",
                "wger_admin_password": "pw",
                "wger_admin_email": "admin@bstein.dev",
            }
        )
    return types.SimpleNamespace(**values)


def _install(monkeypatch, module, service_cls, prefix: str, executor: _Executor | None = None, namespace: str = "apps"):
    """Patch ``module`` with fake settings and a fake ``PodExecutor``.

    Returns ``(service_cls(), executor)``. The service is constructed after
    both patches are in place. When ``executor`` is omitted a default
    ``_Executor`` is used.
    """
    monkeypatch.setattr(module, "settings", _settings(prefix, namespace))
    executor = executor or _Executor()
    # Whatever arguments the module passes to PodExecutor, it gets this instance.
    monkeypatch.setattr(module, "PodExecutor", lambda *_args, **_kwargs: executor)
    return service_cls(), executor


@pytest.mark.parametrize(
    ("module", "service_cls", "input_cls", "prefix", "password_attr", "updated_attr", "rotated_attr"),
    [
        (
            firefly_module,
            FireflyService,
            FireflySyncInput,
            "firefly",
            firefly_module.FIREFLY_PASSWORD_ATTR,
            firefly_module.FIREFLY_PASSWORD_UPDATED_ATTR,
            firefly_module.FIREFLY_PASSWORD_ROTATED_ATTR,
        ),
        (
            wger_module,
            WgerService,
            WgerSyncInput,
            "wger",
            wger_module.WGER_PASSWORD_ATTR,
            wger_module.WGER_PASSWORD_UPDATED_ATTR,
            wger_module.WGER_PASSWORD_ROTATED_ATTR,
        ),
    ],
)
def test_shared_helper_edges(monkeypatch, module, service_cls, input_cls, prefix, password_attr, updated_attr, rotated_attr) -> None:
    """Exercise the module-level helpers that both service modules expose.

    The statements are order-dependent: each assert observes whatever
    ``keycloak_admin`` / ``resolve_mailu_email`` patch was applied most
    recently. ``service_cls``, ``input_cls``, ``updated_attr`` and
    ``rotated_attr`` are part of the parametrization but unused in the body.
    """
    # Check-command builder and sync counters.
    assert "PASSWORD" in getattr(module, f"_{prefix}_check_command")()
    counters = getattr(module, f"{prefix.title()}SyncCounters")(failures=1)
    assert counters.status() == "error"
    assert counters.summary("detail").detail == "detail"

    # _extract_attr: None attrs, list values with blanks, and a bare string value.
    assert module._extract_attr(None, password_attr) == ""
    assert module._extract_attr({password_attr: ["", " pw "]}, password_attr) == "pw"
    assert module._extract_attr({password_attr: ["", " "]}, password_attr) == ""
    assert module._extract_attr({password_attr: " pw "}, password_attr) == "pw"

    # _should_skip_user: disabled users, service-account clients, service-account usernames.
    assert module._should_skip_user({"enabled": False}, "alice")
    assert module._should_skip_user({"serviceAccountClientId": "svc"}, "alice")
    assert module._should_skip_user({}, "service-account-demo")
    assert not module._should_skip_user({"enabled": True}, "alice")

    # _load_attrs: failing get_user, then a record whose "attributes" is not a mapping.
    monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_get=True))
    assert module._load_attrs("1", {}) is None
    monkeypatch.setattr(module, "keycloak_admin", _Admin(full={"attributes": "bad"}))
    assert module._load_attrs("1", {}) == {}

    # _ensure_mailu_email: nothing resolved, resolved value, and a failing attribute write.
    monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "")
    assert module._ensure_mailu_email("alice", {}, "") is None
    monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "alice@bstein.dev")
    assert module._ensure_mailu_email("alice", {"mailu_email": ["stored@bstein.dev"]}, "") == "alice@bstein.dev"
    monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
    assert module._ensure_mailu_email("alice", {}, "") is None

    # _ensure_<prefix>_password: existing password, generated password, failing write.
    # The first case runs while the fail_set admin from above is still installed.
    assert getattr(module, f"_ensure_{prefix}_password")("alice", {password_attr: ["pw"]}) == ("pw", False)
    monkeypatch.setattr(module, "random_password", lambda *_args: "generated")
    admin = _Admin()
    monkeypatch.setattr(module, "keycloak_admin", admin)
    assert getattr(module, f"_ensure_{prefix}_password")("alice", {}) == ("generated", True)
    # calls entries are (username, key, value); index 1 is the attribute key written.
    assert admin.calls[-1][1] == password_attr
    monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
    assert getattr(module, f"_ensure_{prefix}_password")("alice", {}) == (None, False)

    # The timestamp setters report falsy when the attribute write raises.
    assert not getattr(module, f"_set_{prefix}_updated_at")("alice")
    assert not getattr(module, f"_set_{prefix}_rotated_at")("alice")

    # _normalize_user: empty record, then a record with a username but no id.
    assert module._normalize_user({})[0].status == "skipped"
    assert module._normalize_user({"username": "alice"})[0].detail == "missing user id"

    # The *_or_outcome wrappers: each failure produces an outcome with a specific detail.
    identity = module.UserIdentity("alice", "1")
    monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_get=True))
    assert module._load_attrs_or_outcome(identity, {})[1].detail == "missing attributes"
    monkeypatch.setattr(module.mailu, "resolve_mailu_email", lambda *_args: "")
    assert module._mailu_email_or_outcome(identity, {}, {"email": ""})[1].detail == "missing mailu email"
    monkeypatch.setattr(module, "keycloak_admin", _Admin(fail_set=True))
    assert getattr(module, f"_{prefix}_password_or_outcome")(identity, {})[2].detail.endswith("password")
    assert module._should_skip_sync(False, "2025-01-01T00:00:00Z")

    # _build_sync_input: stub each stage in turn so the next stage's fallback detail is hit.
    monkeypatch.setattr(module, "_normalize_user", lambda _user: (module.UserSyncOutcome("skipped"), None))
    assert module._build_sync_input({"username": "alice"}).status == "skipped"
    monkeypatch.setattr(module, "_normalize_user", lambda _user: (None, None))
    assert module._build_sync_input({"username": "alice"}).detail == "missing identity"
    monkeypatch.setattr(module, "_normalize_user", lambda _user: (None, identity))
    monkeypatch.setattr(module, "_load_attrs_or_outcome", lambda *_args: (None, None))
    assert module._build_sync_input({"username": "alice"}).detail == "missing attributes"
    monkeypatch.setattr(module, "_load_attrs_or_outcome", lambda *_args: ({}, None))
    monkeypatch.setattr(module, "_mailu_email_or_outcome", lambda *_args: (None, None))
    assert module._build_sync_input({"username": "alice"}).detail == "missing mailu email"
    monkeypatch.setattr(module, "_mailu_email_or_outcome", lambda *_args: ("alice@bstein.dev", None))
    monkeypatch.setattr(module, f"_{prefix}_password_or_outcome", lambda *_args: (None, False, None))
    assert module._build_sync_input({"username": "alice"}).detail.endswith("password")

    # _rotation_result: error payload carries detail, ok payload carries rotated.
    assert module._rotation_result("error", "detail") == {"status": "error", "detail": "detail"}
    assert module._rotation_result("ok", rotated=False) == {"status": "ok", "rotated": False}


@pytest.mark.parametrize(
    ("module", "service_cls", "input_cls", "prefix"),
    [
        (firefly_module, FireflyService, FireflySyncInput, "firefly"),
        (wger_module, WgerService, WgerSyncInput, "wger"),
    ],
)
def test_rotation_input_and_service_outcomes(monkeypatch, module, service_cls, input_cls, prefix) -> None:
    """Cover ``_rotation_check_input``, ``check_rotation_for_user`` and ``_rotation_outcome``."""
    _install(monkeypatch, module, service_cls, prefix)

    # _rotation_check_input: the second tuple element is the error detail.
    assert module._rotation_check_input("")[1] == "missing username"
    monkeypatch.setattr(module, "keycloak_admin", _Admin(ready=False))
    assert module._rotation_check_input("alice")[1] == "keycloak admin not configured"
    monkeypatch.setattr(module, "keycloak_admin", _Admin(user=None))
    # _Admin.find_user falls back to a default record when user is None, so the
    # method is patched on the installed fake to return a non-dict value instead.
    monkeypatch.setattr(module.keycloak_admin, "find_user", lambda _username: "bad")
    assert module._rotation_check_input("alice")[1] == "user not found"
    monkeypatch.setattr(module, "keycloak_admin", _Admin(user={"username": "alice"}))
    assert module._rotation_check_input("alice")[1] == "missing user id"

    # check_rotation_for_user: with _rotation_check_input stubbed, map each
    # (prepared, error) shape to the returned payload.
    svc, _executor = _install(monkeypatch, module, service_cls, prefix)
    monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (None, "boom"))
    assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "boom"}
    monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (module.UserSyncOutcome("skipped"), ""))
    assert svc.check_rotation_for_user("alice") == {"status": "ok", "rotated": False}
    monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (module.UserSyncOutcome("failed", "bad"), ""))
    assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "bad"}
    prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
    monkeypatch.setattr(module, "_rotation_check_input", lambda _username: (prepared, ""))
    monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("skipped"))
    assert svc.check_rotation_for_user("alice") == {"status": "ok", "rotated": False}
    # A failed outcome with an empty detail gets the generic message.
    monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("failed", ""))
    assert svc.check_rotation_for_user("alice") == {"status": "error", "detail": "rotation check failed"}

    # _rotation_outcome on a fresh service (the instance-level stub above is gone).
    svc, _executor = _install(monkeypatch, module, service_cls, prefix)
    prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
    # Differs from `prepared` only in the last positional field, presumably the
    # rotated-at marker; verify against the SyncInput definitions.
    rotated = input_cls("alice", "alice@bstein.dev", "pw", False, "", "rotated")
    assert svc._rotation_outcome(rotated).status == "skipped"
    monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "match"})
    assert svc._rotation_outcome(prepared).status == "skipped"
    monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"})
    monkeypatch.setattr(module, f"_set_{prefix}_rotated_at", lambda _username: False)
    assert svc._rotation_outcome(prepared).detail == "failed to set rotated_at"
    # A payload without a "status" key is reported as a failed password check.
    monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {})
    assert svc._rotation_outcome(prepared).detail == "password check failed"


@pytest.mark.parametrize(
    ("module", "service_cls", "input_cls", "prefix"),
    [
        (firefly_module, FireflyService, FireflySyncInput, "firefly"),
        (wger_module, WgerService, WgerSyncInput, "wger"),
    ],
)
def test_exec_and_sync_entry_edges(monkeypatch, module, service_cls, input_cls, prefix) -> None:
    """Cover executor failures, ``check_password`` statuses, ``_sync_user_entry`` and Firefly cron.

    The two services take different ``sync_user`` / ``check_password``
    arguments, so those calls are branched on ``prefix``.
    """
    # Every exec() call made through this service raises TimeoutError.
    svc, _executor = _install(monkeypatch, module, service_cls, prefix, _Executor(exc=TimeoutError("timeout")))
    if prefix == "firefly":
        assert svc.sync_user("alice@bstein.dev", "pw")["status"] == "error"
        assert svc.check_password("", "pw", "alice")["status"] == "error"
        with pytest.raises(RuntimeError, match="missing email"):
            svc.check_password("", "pw")
        with pytest.raises(RuntimeError, match="missing password"):
            svc.check_password("alice@bstein.dev", "")
        # An empty namespace setting makes the service report "not configured".
        no_config, _executor = _install(monkeypatch, module, service_cls, prefix, namespace="")
        with pytest.raises(RuntimeError, match="not configured"):
            no_config.check_password("alice@bstein.dev", "pw", "alice")
    else:
        assert svc.sync_user("alice", "alice@bstein.dev", "pw")["status"] == "error"
        assert svc.check_password("alice", "pw")["status"] == "error"
        assert svc.ensure_admin()["status"] == "error"
        with pytest.raises(RuntimeError, match="missing username"):
            svc.check_password("", "pw")
        with pytest.raises(RuntimeError, match="missing password"):
            svc.check_password("alice", "")
        # An empty namespace setting makes the service report "not configured".
        no_config, _executor = _install(monkeypatch, module, service_cls, prefix, namespace="")
        with pytest.raises(RuntimeError, match="not configured"):
            no_config.check_password("alice", "pw")

    # The executor's exit code determines the status check_password reports.
    for exit_code, expected in ((0, "match"), (1, "mismatch"), (3, "missing"), (42, "error")):
        svc, _executor = _install(monkeypatch, module, service_cls, prefix, _Executor(exit_code=exit_code, stdout=""))
        if prefix == "firefly":
            assert svc.check_password("alice@bstein.dev", "pw", "alice")["status"] == expected
        else:
            assert svc.check_password("alice", "pw")["status"] == expected

    # _sync_user_entry with every collaborator stubbed out.
    prepared = input_cls("alice", "alice@bstein.dev", "pw", False, "", "")
    svc, _executor = _install(monkeypatch, module, service_cls, prefix)
    monkeypatch.setattr(module, "_build_sync_input", lambda _user: module.UserSyncOutcome("skipped"))
    assert svc._sync_user_entry({}).status == "skipped"
    monkeypatch.setattr(module, "_build_sync_input", lambda _user: prepared)
    monkeypatch.setattr(module, "_should_skip_sync", lambda *_args: True)
    monkeypatch.setattr(svc, "_rotation_outcome", lambda _prepared: module.UserSyncOutcome("skipped"))
    assert svc._sync_user_entry({}).status == "skipped"
    monkeypatch.setattr(module, "_should_skip_sync", lambda *_args: False)
    monkeypatch.setattr(svc, "sync_user", lambda *_args, **_kwargs: {"status": "error", "detail": "bad"})
    # Either detail is accepted here.
    assert svc._sync_user_entry({}).detail in {"bad", "sync error"}
    monkeypatch.setattr(svc, "sync_user", lambda *_args, **_kwargs: {"status": "ok"})
    monkeypatch.setattr(module, f"_set_{prefix}_updated_at", lambda _username: False)
    assert svc._sync_user_entry({}).detail == "failed to set updated_at"

    # run_cron is only exercised for Firefly.
    if prefix == "firefly":
        svc, _executor = _install(monkeypatch, module, service_cls, prefix)
        # Install a fresh SimpleNamespace so the in-place token edits below only
        # touch an object that monkeypatch discards at teardown.
        monkeypatch.setattr(module, "settings", _settings(prefix))
        module.settings.firefly_cron_token = ""
        with pytest.raises(RuntimeError, match="cron token"):
            svc.run_cron()

        class StatusClient:
            """Context-manager stand-in for ``httpx.Client`` whose ``get`` answers 500."""

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                # False: exceptions raised inside the with-block are not suppressed.
                return False

            def get(self, _url):
                return types.SimpleNamespace(status_code=500)

        monkeypatch.setattr(module.httpx, "Client", lambda *args, **kwargs: StatusClient())
        module.settings.firefly_cron_token = "token"
        assert svc.run_cron() == {"status": "error", "detail": "status=500"}

        class RaisingClient(StatusClient):
            """Same stand-in, but ``get`` raises instead of returning a response."""

            def get(self, _url):
                raise RuntimeError("network")

        monkeypatch.setattr(module.httpx, "Client", lambda *args, **kwargs: RaisingClient())
        assert svc.run_cron() == {"status": "error", "detail": "network"}


@pytest.mark.parametrize(
    ("module", "service_cls", "prefix"),
    [
        (firefly_module, FireflyService, "firefly"),
        (wger_module, WgerService, "wger"),
    ],
)
def test_sync_users_summary_edges(monkeypatch, module, service_cls, prefix) -> None:
    """Cover ``sync_users`` with an unready admin, an empty namespace, and mixed outcomes."""
    # Admin not ready: reported as an error status rather than raised.
    _install(monkeypatch, module, service_cls, prefix)
    monkeypatch.setattr(module, "keycloak_admin", _Admin(ready=False))
    assert service_cls().sync_users()["status"] == "error"

    # Empty namespace setting with a ready admin: raises RuntimeError.
    _install(monkeypatch, module, service_cls, prefix, namespace="")
    monkeypatch.setattr(module, "keycloak_admin", _Admin())
    with pytest.raises(RuntimeError):
        service_cls().sync_users()

    # One user per outcome; _Admin.iter_users returns this list because `full` is a list.
    users = [{"username": "ok"}, {"username": "skip"}, {"username": "fail"}]
    svc, _executor = _install(monkeypatch, module, service_cls, prefix)
    monkeypatch.setattr(module, "keycloak_admin", _Admin(full=users))

    def outcome(user):
        # Usernames other than "ok" and "skip" map to a "failed" outcome.
        return module.UserSyncOutcome({"ok": "synced", "skip": "skipped"}.get(user["username"], "failed"))

    monkeypatch.setattr(svc, "_sync_user_entry", outcome)
    result = svc.sync_users()
    # A single failure makes the overall status "error".
    assert result["status"] == "error"
    assert result["summary"]["synced"] == 1
    assert result["summary"]["skipped"] == 1
    assert result["summary"]["failures"] == 1