"""Unit tests for ``KeycloakAdminClient`` driven by stubbed HTTP responses."""

from __future__ import annotations

import types
from typing import Any

import httpx
import pytest

from ariadne.services.keycloak_admin import KeycloakAdminClient

# Dotted path of the module under test; used as the prefix for monkeypatch targets.
_MODULE = "ariadne.services.keycloak_admin"

# Expiry timestamp seeded on pre-authenticated clients (far in the future).
_FAR_FUTURE = 9999999999


class DummyResponse:
    """Stand-in for an HTTP response exposing only what these tests use.

    Provides ``status_code``, ``headers``, ``json()`` and ``raise_for_status()``.
    """

    def __init__(
        self,
        payload: Any = None,
        status_code: int = 200,
        headers: dict[str, str] | None = None,
    ) -> None:
        self._payload = payload
        self.status_code = status_code
        self.headers = headers or {}

    def json(self) -> Any:
        """Return the canned payload unchanged."""
        return self._payload

    def raise_for_status(self) -> None:
        """Raise ``httpx.HTTPStatusError`` when the status code is 400 or above."""
        if self.status_code >= 400:
            # HTTPStatusError requires real request/response objects, so build
            # throwaway ones carrying the configured status code.
            request = httpx.Request("GET", "https://example.com")
            response = httpx.Response(self.status_code, request=request)
            raise httpx.HTTPStatusError("error", request=request, response=response)


class DummyClient:
    """Scripted replacement for ``httpx.Client``.

    Each ``get``/``post``/``put`` call is recorded in ``calls`` and answered
    with the next queued response, in order. Running out of queued responses
    raises ``RuntimeError`` so unexpected extra requests fail loudly.
    """

    def __init__(self, responses) -> None:
        self._responses = list(responses)
        self.calls: list[tuple[Any, ...]] = []

    def __enter__(self) -> DummyClient:
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Returning False lets exceptions raised inside the ``with`` propagate.
        return False

    def _next(self) -> Any:
        """Pop and return the next queued response."""
        if not self._responses:
            raise RuntimeError("missing response")
        return self._responses.pop(0)

    def get(self, url, params=None, headers=None):
        self.calls.append(("get", url, params))
        return self._next()

    def post(self, url, data=None, json=None, headers=None):
        self.calls.append(("post", url, data, json))
        return self._next()

    def put(self, url, headers=None, json=None):
        self.calls.append(("put", url, json))
        return self._next()


def _patch_settings(monkeypatch, **overrides: Any) -> None:
    """Replace the module's ``settings`` object with a stub namespace.

    Keyword ``overrides`` replace individual fields of the default stub.
    """
    values: dict[str, Any] = {
        "keycloak_admin_url": "http://kc",
        "keycloak_admin_realm": "atlas",
        "keycloak_admin_client_id": "client",
        "keycloak_admin_client_secret": "secret",
        "keycloak_realm": "atlas",
    }
    values.update(overrides)
    monkeypatch.setattr(f"{_MODULE}.settings", types.SimpleNamespace(**values))


def _authed_client() -> KeycloakAdminClient:
    """Build a client with ``_token`` and ``_expires_at`` pre-seeded.

    The tests queue exactly one response per request they expect, so the
    seeded token is presumably what keeps the client from issuing an extra
    token request first.
    """
    client = KeycloakAdminClient()
    client._token = "token"
    client._expires_at = _FAR_FUTURE
    return client


def _patch_http(monkeypatch, responses) -> DummyClient:
    """Make ``httpx.Client(...)`` return a ``DummyClient`` serving ``responses``."""
    dummy = DummyClient(responses)
    monkeypatch.setattr(f"{_MODULE}.httpx.Client", lambda *args, **kwargs: dummy)
    return dummy


def _forbid_http(monkeypatch, message: str) -> None:
    """Make any ``httpx.Client(...)`` construction raise ``RuntimeError(message)``."""

    def _raise(*args: Any, **kwargs: Any) -> None:
        raise RuntimeError(message)

    monkeypatch.setattr(f"{_MODULE}.httpx.Client", _raise)


def test_set_user_attribute_preserves_profile(monkeypatch) -> None:
    client = KeycloakAdminClient()
    captured: dict[str, Any] = {}

    def fake_find_user(username: str) -> dict[str, Any]:
        return {"id": "user-123"}

    def fake_get_user(user_id: str) -> dict[str, Any]:
        return {
            "id": user_id,
            "username": "alice",
            "email": "alice@bstein.dev",
            "emailVerified": True,
            "enabled": True,
            "firstName": "Alice",
            "lastName": "Smith",
            # The non-string entry is expected to be dropped from the payload.
            "requiredActions": ["UPDATE_PASSWORD", 123],
            "attributes": {"existing": ["value"]},
        }

    def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
        captured["user_id"] = user_id
        captured["payload"] = payload

    monkeypatch.setattr(client, "find_user", fake_find_user)
    monkeypatch.setattr(client, "get_user", fake_get_user)
    monkeypatch.setattr(client, "update_user", fake_update_user)

    client.set_user_attribute("alice", "mailu_app_password", "secret")

    payload = captured.get("payload") or {}
    assert payload.get("username") == "alice"
    assert payload.get("email") == "alice@bstein.dev"
    assert payload.get("emailVerified") is True
    assert payload.get("enabled") is True
    assert payload.get("firstName") == "Alice"
    assert payload.get("lastName") == "Smith"
    assert payload.get("requiredActions") == ["UPDATE_PASSWORD"]
    assert payload.get("attributes") == {
        "existing": ["value"],
        "mailu_app_password": ["secret"],
    }


def test_update_user_safe_merges_payload(monkeypatch) -> None:
    client = KeycloakAdminClient()
    captured: dict[str, Any] = {}

    def fake_get_user(user_id: str) -> dict[str, Any]:
        return {
            "id": user_id,
            "username": "alice",
            "enabled": True,
            "attributes": {"existing": ["value"]},
        }

    def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
        captured["user_id"] = user_id
        captured["payload"] = payload

    monkeypatch.setattr(client, "get_user", fake_get_user)
    monkeypatch.setattr(client, "update_user", fake_update_user)

    client.update_user_safe(
        "user-123",
        {"attributes": {"new": ["item"]}, "requiredActions": ["UPDATE_PASSWORD"]},
    )

    payload = captured.get("payload") or {}
    assert payload.get("username") == "alice"
    assert payload.get("attributes") == {"existing": ["value"], "new": ["item"]}
    assert payload.get("requiredActions") == ["UPDATE_PASSWORD"]


def test_get_token_fetches_once(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = KeycloakAdminClient()
    _patch_http(monkeypatch, [DummyResponse({"access_token": "token", "expires_in": 120})])
    assert client._get_token() == "token"

    # A second call must be served without constructing another HTTP client.
    _forbid_http(monkeypatch, "should not call")
    assert client._get_token() == "token"


def test_find_user_by_email_case_insensitive(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse([{"email": "Alice@Example.com", "id": "1"}])])

    user = client.find_user_by_email("alice@example.com")
    assert user["id"] == "1"


def test_find_user_invalid_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse(["bad"])])

    assert client.find_user("alice") is None


def test_find_user_by_email_empty() -> None:
    client = KeycloakAdminClient()
    assert client.find_user_by_email("") is None


def test_find_user_by_email_invalid_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse({"bad": "payload"})])

    assert client.find_user_by_email("alice@example.com") is None


def test_list_group_names_filters(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(
        monkeypatch,
        [DummyResponse([{"name": "demo"}, {"name": "admin"}, {"name": "test"}])],
    )

    assert client.list_group_names(exclude={"admin"}) == ["demo", "test"]


def test_find_user_by_email_skips_non_dict(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse(["bad"])])

    assert client.find_user_by_email("alice@example.com") is None


# Renamed from ``test_get_user_invalid_payload``: a later test in this module
# reuses that name, which rebound it and kept this test from being collected.
def test_get_user_invalid_payload_str(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse("bad")])

    with pytest.raises(RuntimeError):
        client.get_user("user-1")


def test_update_user_calls_put(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    dummy = _patch_http(monkeypatch, [DummyResponse({})])

    client.update_user("user-1", {"enabled": True})
    assert dummy.calls


def test_update_user_safe_handles_bad_attrs(monkeypatch) -> None:
    client = KeycloakAdminClient()
    captured: dict[str, Any] = {}

    def fake_get_user(user_id: str) -> dict[str, Any]:
        return {"id": user_id, "username": "alice", "attributes": "bad"}

    def fake_update_user(user_id: str, payload: dict[str, Any]) -> None:
        captured["payload"] = payload

    monkeypatch.setattr(client, "get_user", fake_get_user)
    monkeypatch.setattr(client, "update_user", fake_update_user)

    client.update_user_safe("user-1", {"attributes": {"new": ["item"]}})
    assert captured["payload"]["attributes"] == {"new": ["item"]}


# Renamed from ``test_set_user_attribute_user_id_missing``: a later test in
# this module reuses that name, which rebound it and kept this test from
# being collected.
def test_set_user_attribute_user_id_blank(monkeypatch) -> None:
    client = KeycloakAdminClient()

    def fake_find_user(username: str) -> dict[str, Any]:
        return {"id": ""}

    monkeypatch.setattr(client, "find_user", fake_find_user)

    with pytest.raises(RuntimeError):
        client.set_user_attribute("alice", "attr", "val")


def test_set_user_attribute_handles_bad_attrs(monkeypatch) -> None:
    client = KeycloakAdminClient()

    def fake_find_user(username: str) -> dict[str, Any]:
        return {"id": "user-1"}

    def fake_get_user(user_id: str) -> dict[str, Any]:
        return {"id": user_id, "username": "alice", "attributes": "bad"}

    monkeypatch.setattr(client, "find_user", fake_find_user)
    monkeypatch.setattr(client, "get_user", fake_get_user)
    monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)

    # Passes as long as the malformed ``attributes`` value does not raise.
    client.set_user_attribute("alice", "attr", "val")


def test_get_group_id_skips_non_dict(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse(["bad"])])

    assert client.get_group_id("demo") is None


def test_get_group_id_cached(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse([{"name": "demo", "id": "gid"}])])
    assert client.get_group_id("demo") == "gid"

    # The second lookup must be answered without constructing an HTTP client.
    _forbid_http(monkeypatch, "no call")
    assert client.get_group_id("demo") == "gid"


def test_get_group_id_invalid_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse({"bad": "payload"})])

    assert client.get_group_id("demo") is None


def test_iter_users_paginates(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(
        monkeypatch,
        [
            DummyResponse([{"id": "1"}, {"id": "2"}]),
            DummyResponse([{"id": "3"}]),
        ],
    )

    users = client.iter_users(page_size=2, brief=True)
    assert [u["id"] for u in users] == ["1", "2", "3"]


def test_iter_users_empty(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse([])])

    assert client.iter_users(page_size=2) == []


def test_create_user_parses_location(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(
        monkeypatch,
        [DummyResponse({}, headers={"Location": "http://kc/admin/realms/atlas/users/abc"})],
    )

    assert client.create_user({"username": "alice"}) == "abc"


def test_create_user_missing_location(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse({}, headers={})])

    with pytest.raises(RuntimeError):
        client.create_user({"username": "alice"})


def test_get_token_missing_access_token(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = KeycloakAdminClient()
    _patch_http(monkeypatch, [DummyResponse({"expires_in": 120})])

    with pytest.raises(RuntimeError):
        client._get_token()


def test_reset_password_raises_on_error(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse({}, status_code=400)])

    with pytest.raises(httpx.HTTPStatusError):
        client.reset_password("user", "pw", temporary=True)


def test_get_token_requires_config(monkeypatch) -> None:
    _patch_settings(
        monkeypatch,
        keycloak_admin_client_id="",
        keycloak_admin_client_secret="",
    )
    client = KeycloakAdminClient()

    with pytest.raises(RuntimeError):
        client._get_token()


def test_headers_includes_bearer(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "_get_token", lambda: "token")

    headers = client.headers()
    assert headers["Authorization"] == "Bearer token"


def test_find_user_returns_none(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse([])])

    assert client.find_user("alice") is None


def test_get_user_invalid_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse(["bad"])])

    with pytest.raises(RuntimeError):
        client.get_user("id")


def test_get_user_success(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse({"id": "1"})])

    user = client.get_user("id")
    assert user["id"] == "1"


def test_set_user_attribute_user_missing(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "find_user", lambda username: None)

    with pytest.raises(RuntimeError):
        client.set_user_attribute("alice", "attr", "value")


def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "find_user", lambda username: {})

    with pytest.raises(RuntimeError):
        client.set_user_attribute("alice", "attr", "value")


def test_add_user_to_group(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    dummy = _patch_http(monkeypatch, [DummyResponse({})])

    client.add_user_to_group("user", "group")
    assert dummy.calls[0][0] == "put"


def test_get_user_raises_on_non_dict_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse("bad")])

    with pytest.raises(RuntimeError):
        client.get_user("user-1")


def test_update_user_safe_coerces_bad_attrs(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "get_user", lambda *_args, **_kwargs: {"id": "user-1"})
    monkeypatch.setattr(
        client,
        "_safe_update_payload",
        lambda *_args, **_kwargs: {"attributes": "bad"},
    )
    monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)

    # Passes as long as the malformed ``attributes`` value does not raise.
    client.update_user_safe("user-1", {"attributes": {"new": ["item"]}})


def test_set_user_attribute_coerces_bad_attrs(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "find_user", lambda username: {"id": "user-1"})
    monkeypatch.setattr(client, "get_user", lambda *_args, **_kwargs: {"id": "user-1"})
    monkeypatch.setattr(
        client,
        "_safe_update_payload",
        lambda *_args, **_kwargs: {"attributes": "bad"},
    )
    monkeypatch.setattr(client, "update_user", lambda *_args, **_kwargs: None)

    # Passes as long as the malformed ``attributes`` value does not raise.
    client.set_user_attribute("alice", "attr", "value")


def test_set_user_attribute_user_id_missing_raises(monkeypatch) -> None:
    client = KeycloakAdminClient()
    monkeypatch.setattr(client, "find_user", lambda username: {"id": ""})

    with pytest.raises(RuntimeError):
        client.set_user_attribute("alice", "attr", "value")


def test_get_user_rejects_non_dict_payload(monkeypatch) -> None:
    _patch_settings(monkeypatch)
    client = _authed_client()
    _patch_http(monkeypatch, [DummyResponse(123)])

    with pytest.raises(RuntimeError) as excinfo:
        client.get_user("user-1")
    assert "unexpected user payload" in str(excinfo.value)