From 11f49ec80750f3f980eb180fa4041a5ea94a4096 Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 07:39:34 -0300 Subject: [PATCH] test(bstein-home): cover keycloak integration client --- backend/tests/test_keycloak_client.py | 380 ++++++++++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 backend/tests/test_keycloak_client.py diff --git a/backend/tests/test_keycloak_client.py b/backend/tests/test_keycloak_client.py new file mode 100644 index 0000000..f194bfc --- /dev/null +++ b/backend/tests/test_keycloak_client.py @@ -0,0 +1,380 @@ +from __future__ import annotations + +"""Coverage for Keycloak token verification, admin operations, and route guards.""" + +from types import SimpleNamespace + +import pytest + +from atlas_portal import keycloak +from atlas_portal.app_factory import create_app + + +class DummyResponse: + """Small HTTP response double for Keycloak admin tests.""" + + def __init__(self, payload=None, *, headers=None, status_code: int = 200) -> None: + self._payload = payload if payload is not None else {} + self.headers = headers or {} + self.status_code = status_code + + def json(self): + """Return the configured response payload.""" + + return self._payload + + def raise_for_status(self) -> None: + """Raise for configured error statuses.""" + + if self.status_code >= 400: + raise RuntimeError("bad status") + + +class SequenceClient: + """httpx.Client replacement that returns queued responses.""" + + responses: list[DummyResponse] = [] + calls: list[tuple[str, str, dict]] = [] + + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + @classmethod + def reset(cls, *responses: DummyResponse) -> None: + """Replace queued responses and clear captured calls.""" + + cls.responses = list(responses) + cls.calls = [] + + def _next(self) -> DummyResponse: + return self.responses.pop(0) + + def get(self, url, **kwargs): + self.calls.append(("GET", url, kwargs)) + return self._next() + + def post(self, url, **kwargs): + self.calls.append(("POST", url, kwargs)) + return self._next() + + def put(self, url, **kwargs): + self.calls.append(("PUT", url, kwargs)) + return self._next() + + +def test_oidc_verify_checks_issuer_and_client(monkeypatch) -> None: + verifier = keycloak.KeycloakOIDC() + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", False) + with pytest.raises(ValueError, match="not enabled"): + verifier.verify("token") + + class DummyJwk: + key = "signing-key" + + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", True) + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_CLIENT_ID", "portal") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ISSUER", "https://sso/realms/atlas") + monkeypatch.setattr(verifier, "_client", lambda: SimpleNamespace(get_signing_key_from_jwt=lambda token: DummyJwk())) + monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "portal", "aud": ["other"]}) + + assert verifier.verify("token")["azp"] == "portal" + + monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "other", "aud": ["not-portal"]}) + with pytest.raises(ValueError, match="not issued"): + verifier.verify("token") + + made: list[str] = [] + + class DummyJwkClient: + def __init__(self, url): + made.append(url) + + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_JWKS_URL", "https://sso/jwks") + monkeypatch.setattr(keycloak, "PyJWKClient", DummyJwkClient) + verifier = keycloak.KeycloakOIDC() + assert verifier._client() is verifier._client() + assert made == ["https://sso/jwks"] + + verifier = keycloak.KeycloakOIDC() + monkeypatch.setattr(verifier, "_client", lambda: SimpleNamespace(get_signing_key_from_jwt=lambda token: SimpleNamespace(key="k"))) + monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "other", "aud": "portal"}) + assert verifier.verify("token")["aud"] == "portal" + + +def test_admin_token_cache_and_basic_user_operations(monkeypatch) -> None: + monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_REALM", "master") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") + + client = keycloak.KeycloakAdminClient() + SequenceClient.reset(DummyResponse({"access_token": "tok", "expires_in": 120})) + assert client.ready() + assert client.headers() == {"Authorization": "Bearer tok"} + assert client.headers() == {"Authorization": "Bearer tok"} + assert len([call for call in SequenceClient.calls if call[0] == "POST"]) == 1 + + SequenceClient.reset(DummyResponse([{"id": "u1", "username": "alice"}])) + assert client.find_user("alice") == {"id": "u1", "username": "alice"} + + SequenceClient.reset(DummyResponse([{"email": "ALICE@example.dev"}])) + assert client.find_user_by_email("alice@example.dev") == {"email": "ALICE@example.dev"} + assert client.find_user_by_email("") is None + + SequenceClient.reset(DummyResponse({"id": "u1", "username": "alice"})) + assert client.get_user("u1")["username"] == "alice" + + SequenceClient.reset(DummyResponse({})) + client.update_user("u1", {"enabled": True}) + assert SequenceClient.calls[-1][0] == "PUT" + + SequenceClient.reset( + DummyResponse({"username": "alice", "enabled": True, "attributes": {"old": ["1"]}}), + DummyResponse({}), + ) + client.update_user_safe("u1", {"attributes": {"new": ["2"]}, "email": "alice@example.dev"}) + sent = SequenceClient.calls[-1][2]["json"] + assert sent["attributes"]["old"] == ["1"] + assert sent["attributes"]["new"] == ["2"] + assert sent["email"] == "alice@example.dev" + + full_payload = keycloak.KeycloakAdminClient._safe_update_payload( + { + "username": "alice", + "enabled": True, + "email": "alice@example.dev", + "emailVerified": True, + "firstName": "Alice", + "lastName": "Atlas", + "requiredActions": ["UPDATE_PASSWORD", 7], + "attributes": "bad", + } + ) + assert full_payload["emailVerified"] is True + assert full_payload["firstName"] == "Alice" + assert full_payload["lastName"] == "Atlas" + assert full_payload["requiredActions"] == ["UPDATE_PASSWORD"] + assert full_payload["attributes"] == {} + + unready = keycloak.KeycloakAdminClient() + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "") + with pytest.raises(RuntimeError, match="not configured"): + unready._get_token() + + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") + SequenceClient.reset(DummyResponse({})) + client = keycloak.KeycloakAdminClient() + with pytest.raises(RuntimeError, match="no access_token"): + client._get_token() + + client._token = "tok" + client._expires_at = 9999999999 + SequenceClient.reset(DummyResponse([]), DummyResponse("not-list"), DummyResponse([{"email": "other@example.dev"}])) + assert client.find_user("missing") is None + assert client.find_user_by_email("alice@example.dev") is None + assert client.find_user_by_email("alice@example.dev") is None + + SequenceClient.reset(DummyResponse([])) + assert client.find_user_by_email("alice@example.dev") is None + + SequenceClient.reset(DummyResponse("not-dict")) + with pytest.raises(RuntimeError, match="unexpected user payload"): + client.get_user("u1") + + SequenceClient.reset(DummyResponse({"username": "alice", "attributes": "bad"}), DummyResponse({})) + client.update_user_safe("u1", {"attributes": {"new": ["2"]}}) + assert SequenceClient.calls[-1][2]["json"]["attributes"] == {"new": ["2"]} + + +def test_admin_create_password_groups_and_credentials(monkeypatch) -> None: + monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_REALM", "master") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_CLIENT_ID", "portal") + + client = keycloak.KeycloakAdminClient() + client._token = "tok" + client._expires_at = 9999999999 + + SequenceClient.reset(DummyResponse({}, headers={"Location": "https://sso/admin/users/u1"})) + assert client.create_user({"username": "alice"}) == "u1" + + SequenceClient.reset(DummyResponse({})) + client.reset_password("u1", "pw", temporary=False) + assert SequenceClient.calls[-1][2]["json"]["temporary"] is False + + SequenceClient.reset( + DummyResponse([{"name": "dev", "id": "g1"}]), + DummyResponse([{"name": "ignored", "id": "g2"}]), + ) + assert client.get_group_id("dev") == "g1" + assert client.get_group_id("dev") == "g1" + assert len(SequenceClient.calls) == 1 + + groups = [{"name": "root", "subGroups": [{"name": "child"}]}] + SequenceClient.reset(DummyResponse(groups)) + assert client.list_group_names() == ["child", "root"] + + SequenceClient.reset(DummyResponse([{"name": "/dev"}, {"name": "admin"}, "bad"])) + assert client.list_user_groups("u1") == ["dev", "admin"] + + SequenceClient.reset(DummyResponse({})) + client.add_user_to_group("u1", "g1") + assert "/groups/g1" in SequenceClient.calls[-1][1] + + SequenceClient.reset(DummyResponse({})) + client.execute_actions_email("u1", ["UPDATE_PASSWORD"], "https://portal/account") + assert SequenceClient.calls[-1][2]["json"] == ["UPDATE_PASSWORD"] + + SequenceClient.reset(DummyResponse([{"type": "password"}, "bad"])) + assert client.get_user_credentials("u1") == [{"type": "password"}] + + SequenceClient.reset(DummyResponse({"not": "a-list"})) + assert client.get_user_credentials("u1") == [] + + SequenceClient.reset(DummyResponse([{"name": "other", "id": "g2"}, "bad"])) + assert client.get_group_id("missing") is None + + SequenceClient.reset(DummyResponse([{"name": "root", "subGroups": ["bad"]}])) + assert client.list_group_names() == ["root"] + + +def test_admin_set_attribute_and_error_edges(monkeypatch) -> None: + client = keycloak.KeycloakAdminClient() + monkeypatch.setattr(client, "find_user", lambda username: {"id": "u1"} if username == "alice" else None) + monkeypatch.setattr(client, "get_user", lambda user_id: {"username": "alice", "attributes": {"old": ["1"]}}) + updated: list[dict] = [] + monkeypatch.setattr(client, "update_user", lambda user_id, payload: updated.append(payload)) + + client.set_user_attribute("alice", "mailu", "pw") + assert updated[0]["attributes"]["mailu"] == ["pw"] + + monkeypatch.setattr(client, "get_user", lambda user_id: {"username": "alice", "attributes": "bad"}) + client.set_user_attribute("alice", "next", "value") + assert updated[-1]["attributes"]["next"] == ["value"] + with pytest.raises(RuntimeError, match="user not found"): + client.set_user_attribute("nobody", "mailu", "pw") + + monkeypatch.setattr(client, "find_user", lambda username: {"id": ""}) + with pytest.raises(RuntimeError, match="user id missing"): + client.set_user_attribute("alice", "mailu", "pw") + + monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") + client = keycloak.KeycloakAdminClient() + client._token = "tok" + client._expires_at = 9999999999 + + SequenceClient.reset(DummyResponse({})) + with pytest.raises(RuntimeError, match="created user id"): + client.create_user({"username": "alice"}) + + SequenceClient.reset(DummyResponse([])) + assert client.get_group_id("missing") is None + + SequenceClient.reset(DummyResponse({"not": "groups"})) + assert client.list_group_names() == [] + + SequenceClient.reset(DummyResponse({"not": "groups"})) + assert client.list_user_groups("u1") == [] + + +def test_singletons_and_auth_guards(monkeypatch) -> None: + app = create_app() + monkeypatch.setattr(keycloak, "_OIDC", None) + monkeypatch.setattr(keycloak, "_ADMIN", None) + assert keycloak.oidc_client() is keycloak.oidc_client() + assert keycloak.admin_client() is keycloak.admin_client() + + with app.test_request_context(): + assert keycloak._extract_bearer_token() is None + + with app.test_request_context(headers={"Authorization": "Bearer tok"}): + assert keycloak._extract_bearer_token() == "tok" + with app.test_request_context(headers={"Authorization": "Token tok"}): + assert keycloak._extract_bearer_token() is None + with app.test_request_context(headers={"Authorization": "Bearer "}): + assert keycloak._extract_bearer_token() is None + + assert keycloak._normalize_groups("bad") == [] + assert keycloak._normalize_groups(["/dev", 7, ""]) == ["dev"] + + @keycloak.require_auth + def protected(): + return {"ok": True} + + monkeypatch.setattr( + keycloak, + "oidc_client", + lambda: SimpleNamespace(verify=lambda token: {"preferred_username": "alice", "email": "a@example.dev", "groups": ["/dev"]}), + ) + with app.test_request_context(headers={"Authorization": "Bearer tok"}): + assert protected() == {"ok": True} + assert keycloak.g.keycloak_groups == ["dev"] + + monkeypatch.setattr( + keycloak, + "oidc_client", + lambda: SimpleNamespace(verify=lambda token: (_ for _ in ()).throw(ValueError("bad"))), + ) + with app.test_request_context(headers={"Authorization": "Bearer tok"}): + response, status = protected() + assert status == 401 + assert response.get_json()["error"] == "invalid token" + + with app.test_request_context(): + response, status = protected() + assert status == 401 + assert response.get_json()["error"] == "missing bearer token" + + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", False) + with app.test_request_context(): + ok, response = keycloak.require_portal_admin() + assert not ok and response[1] == 503 + ok, response = keycloak.require_account_access() + assert not ok and response[1] == 503 + + monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", True) + monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_USERS", {"alice"}) + monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_GROUPS", {"admin"}) + monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", {"dev"}) + with app.test_request_context(): + keycloak.g.keycloak_username = "alice" + keycloak.g.keycloak_groups = ["dev"] + assert keycloak.require_portal_admin() == (True, None) + assert keycloak.require_account_access() == (True, None) + + monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_USERS", set()) + with app.test_request_context(): + keycloak.g.keycloak_username = "carol" + keycloak.g.keycloak_groups = ["admin"] + assert keycloak.require_portal_admin() == (True, None) + + monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", set()) + with app.test_request_context(): + assert keycloak.require_account_access() == (True, None) + + monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", {"dev"}) + with app.test_request_context(): + keycloak.g.keycloak_groups = [] + assert keycloak.require_account_access() == (True, None) + + with app.test_request_context(): + keycloak.g.keycloak_username = "bob" + keycloak.g.keycloak_groups = ["other"] + assert keycloak.require_portal_admin()[0] is False + assert keycloak.require_account_access()[0] is False