from __future__ import annotations """Coverage for Keycloak token verification, admin operations, and route guards.""" from types import SimpleNamespace import pytest from atlas_portal import keycloak from atlas_portal.app_factory import create_app class DummyResponse: """Small HTTP response double for Keycloak admin tests.""" def __init__(self, payload=None, *, headers=None, status_code: int = 200) -> None: self._payload = payload if payload is not None else {} self.headers = headers or {} self.status_code = status_code def json(self): """Return the configured response payload.""" return self._payload def raise_for_status(self) -> None: """Raise for configured error statuses.""" if self.status_code >= 400: raise RuntimeError("bad status") class SequenceClient: """httpx.Client replacement that returns queued responses.""" responses: list[DummyResponse] = [] calls: list[tuple[str, str, dict]] = [] def __init__(self, timeout): self.timeout = timeout def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False @classmethod def reset(cls, *responses: DummyResponse) -> None: """Replace queued responses and clear captured calls.""" cls.responses = list(responses) cls.calls = [] def _next(self) -> DummyResponse: return self.responses.pop(0) def get(self, url, **kwargs): self.calls.append(("GET", url, kwargs)) return self._next() def post(self, url, **kwargs): self.calls.append(("POST", url, kwargs)) return self._next() def put(self, url, **kwargs): self.calls.append(("PUT", url, kwargs)) return self._next() def test_oidc_verify_checks_issuer_and_client(monkeypatch) -> None: verifier = keycloak.KeycloakOIDC() monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", False) with pytest.raises(ValueError, match="not enabled"): verifier.verify("token") class DummyJwk: key = "signing-key" monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", True) monkeypatch.setattr(keycloak.settings, "KEYCLOAK_CLIENT_ID", "portal") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ISSUER", "https://sso/realms/atlas") monkeypatch.setattr(verifier, "_client", lambda: SimpleNamespace(get_signing_key_from_jwt=lambda token: DummyJwk())) monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "portal", "aud": ["other"]}) assert verifier.verify("token")["azp"] == "portal" monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "other", "aud": ["not-portal"]}) with pytest.raises(ValueError, match="not issued"): verifier.verify("token") made: list[str] = [] class DummyJwkClient: def __init__(self, url): made.append(url) monkeypatch.setattr(keycloak.settings, "KEYCLOAK_JWKS_URL", "https://sso/jwks") monkeypatch.setattr(keycloak, "PyJWKClient", DummyJwkClient) verifier = keycloak.KeycloakOIDC() assert verifier._client() is verifier._client() assert made == ["https://sso/jwks"] verifier = keycloak.KeycloakOIDC() monkeypatch.setattr(verifier, "_client", lambda: SimpleNamespace(get_signing_key_from_jwt=lambda token: SimpleNamespace(key="k"))) monkeypatch.setattr(keycloak.jwt, "decode", lambda *a, **k: {"azp": "other", "aud": "portal"}) assert verifier.verify("token")["aud"] == "portal" def test_admin_token_cache_and_basic_user_operations(monkeypatch) -> None: monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_REALM", "master") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") client = keycloak.KeycloakAdminClient() SequenceClient.reset(DummyResponse({"access_token": "tok", "expires_in": 120})) assert client.ready() assert client.headers() == {"Authorization": "Bearer tok"} assert client.headers() == {"Authorization": "Bearer tok"} assert len([call for call in SequenceClient.calls if call[0] == "POST"]) == 1 SequenceClient.reset(DummyResponse([{"id": "u1", "username": "alice"}])) assert client.find_user("alice") == {"id": "u1", "username": "alice"} SequenceClient.reset(DummyResponse([{"email": "ALICE@example.dev"}])) assert client.find_user_by_email("alice@example.dev") == {"email": "ALICE@example.dev"} assert client.find_user_by_email("") is None SequenceClient.reset(DummyResponse({"id": "u1", "username": "alice"})) assert client.get_user("u1")["username"] == "alice" SequenceClient.reset(DummyResponse({})) client.update_user("u1", {"enabled": True}) assert SequenceClient.calls[-1][0] == "PUT" SequenceClient.reset( DummyResponse({"username": "alice", "enabled": True, "attributes": {"old": ["1"]}}), DummyResponse({}), ) client.update_user_safe("u1", {"attributes": {"new": ["2"]}, "email": "alice@example.dev"}) sent = SequenceClient.calls[-1][2]["json"] assert sent["attributes"]["old"] == ["1"] assert sent["attributes"]["new"] == ["2"] assert sent["email"] == "alice@example.dev" full_payload = keycloak.KeycloakAdminClient._safe_update_payload( { "username": "alice", "enabled": True, "email": "alice@example.dev", "emailVerified": True, "firstName": "Alice", "lastName": "Atlas", "requiredActions": ["UPDATE_PASSWORD", 7], "attributes": "bad", } ) assert full_payload["emailVerified"] is True assert full_payload["firstName"] == "Alice" assert full_payload["lastName"] == "Atlas" assert full_payload["requiredActions"] == ["UPDATE_PASSWORD"] assert full_payload["attributes"] == {} unready = keycloak.KeycloakAdminClient() monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "") with pytest.raises(RuntimeError, match="not configured"): unready._get_token() monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") SequenceClient.reset(DummyResponse({})) client = keycloak.KeycloakAdminClient() with pytest.raises(RuntimeError, match="no access_token"): client._get_token() client._token = "tok" client._expires_at = 9999999999 SequenceClient.reset(DummyResponse([]), DummyResponse("not-list"), DummyResponse([{"email": "other@example.dev"}])) assert client.find_user("missing") is None assert client.find_user_by_email("alice@example.dev") is None assert client.find_user_by_email("alice@example.dev") is None SequenceClient.reset(DummyResponse([])) assert client.find_user_by_email("alice@example.dev") is None SequenceClient.reset(DummyResponse("not-dict")) with pytest.raises(RuntimeError, match="unexpected user payload"): client.get_user("u1") SequenceClient.reset(DummyResponse({"username": "alice", "attributes": "bad"}), DummyResponse({})) client.update_user_safe("u1", {"attributes": {"new": ["2"]}}) assert SequenceClient.calls[-1][2]["json"]["attributes"] == {"new": ["2"]} def test_admin_create_password_groups_and_credentials(monkeypatch) -> None: monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_REALM", "master") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_CLIENT_ID", "portal") client = keycloak.KeycloakAdminClient() client._token = "tok" client._expires_at = 9999999999 SequenceClient.reset(DummyResponse({}, headers={"Location": "https://sso/admin/users/u1"})) assert client.create_user({"username": "alice"}) == "u1" SequenceClient.reset(DummyResponse({})) client.reset_password("u1", "pw", temporary=False) assert SequenceClient.calls[-1][2]["json"]["temporary"] is False SequenceClient.reset( DummyResponse([{"name": "dev", "id": "g1"}]), DummyResponse([{"name": "ignored", "id": "g2"}]), ) assert client.get_group_id("dev") == "g1" assert client.get_group_id("dev") == "g1" assert len(SequenceClient.calls) == 1 groups = [{"name": "root", "subGroups": [{"name": "child"}]}] SequenceClient.reset(DummyResponse(groups)) assert client.list_group_names() == ["child", "root"] SequenceClient.reset(DummyResponse([{"name": "/dev"}, {"name": "admin"}, "bad"])) assert client.list_user_groups("u1") == ["dev", "admin"] SequenceClient.reset(DummyResponse({})) client.add_user_to_group("u1", "g1") assert "/groups/g1" in SequenceClient.calls[-1][1] SequenceClient.reset(DummyResponse({})) client.execute_actions_email("u1", ["UPDATE_PASSWORD"], "https://portal/account") assert SequenceClient.calls[-1][2]["json"] == ["UPDATE_PASSWORD"] SequenceClient.reset(DummyResponse([{"type": "password"}, "bad"])) assert client.get_user_credentials("u1") == [{"type": "password"}] SequenceClient.reset(DummyResponse({"not": "a-list"})) assert client.get_user_credentials("u1") == [] SequenceClient.reset(DummyResponse([{"name": "other", "id": "g2"}, "bad"])) assert client.get_group_id("missing") is None SequenceClient.reset(DummyResponse([{"name": "root", "subGroups": ["bad"]}])) assert client.list_group_names() == ["root"] def test_admin_set_attribute_and_error_edges(monkeypatch) -> None: client = keycloak.KeycloakAdminClient() monkeypatch.setattr(client, "find_user", lambda username: {"id": "u1"} if username == "alice" else None) monkeypatch.setattr(client, "get_user", lambda user_id: {"username": "alice", "attributes": {"old": ["1"]}}) updated: list[dict] = [] monkeypatch.setattr(client, "update_user", lambda user_id, payload: updated.append(payload)) client.set_user_attribute("alice", "mailu", "pw") assert updated[0]["attributes"]["mailu"] == ["pw"] monkeypatch.setattr(client, "get_user", lambda user_id: {"username": "alice", "attributes": "bad"}) client.set_user_attribute("alice", "next", "value") assert updated[-1]["attributes"]["next"] == ["value"] with pytest.raises(RuntimeError, match="user not found"): client.set_user_attribute("nobody", "mailu", "pw") monkeypatch.setattr(client, "find_user", lambda username: {"id": ""}) with pytest.raises(RuntimeError, match="user id missing"): client.set_user_attribute("alice", "mailu", "pw") monkeypatch.setattr(keycloak.httpx, "Client", SequenceClient) monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_ID", "admin-client") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_CLIENT_SECRET", "secret") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ADMIN_URL", "https://sso.example.dev") monkeypatch.setattr(keycloak.settings, "KEYCLOAK_REALM", "atlas") client = keycloak.KeycloakAdminClient() client._token = "tok" client._expires_at = 9999999999 SequenceClient.reset(DummyResponse({})) with pytest.raises(RuntimeError, match="created user id"): client.create_user({"username": "alice"}) SequenceClient.reset(DummyResponse([])) assert client.get_group_id("missing") is None SequenceClient.reset(DummyResponse({"not": "groups"})) assert client.list_group_names() == [] SequenceClient.reset(DummyResponse({"not": "groups"})) assert client.list_user_groups("u1") == [] def test_singletons_and_auth_guards(monkeypatch) -> None: app = create_app() monkeypatch.setattr(keycloak, "_OIDC", None) monkeypatch.setattr(keycloak, "_ADMIN", None) assert keycloak.oidc_client() is keycloak.oidc_client() assert keycloak.admin_client() is keycloak.admin_client() with app.test_request_context(): assert keycloak._extract_bearer_token() is None with app.test_request_context(headers={"Authorization": "Bearer tok"}): assert keycloak._extract_bearer_token() == "tok" with app.test_request_context(headers={"Authorization": "Token tok"}): assert keycloak._extract_bearer_token() is None with app.test_request_context(headers={"Authorization": "Bearer "}): assert keycloak._extract_bearer_token() is None assert keycloak._normalize_groups("bad") == [] assert keycloak._normalize_groups(["/dev", 7, ""]) == ["dev"] @keycloak.require_auth def protected(): return {"ok": True} monkeypatch.setattr( keycloak, "oidc_client", lambda: SimpleNamespace(verify=lambda token: {"preferred_username": "alice", "email": "a@example.dev", "groups": ["/dev"]}), ) with app.test_request_context(headers={"Authorization": "Bearer tok"}): assert protected() == {"ok": True} assert keycloak.g.keycloak_groups == ["dev"] monkeypatch.setattr( keycloak, "oidc_client", lambda: SimpleNamespace(verify=lambda token: (_ for _ in ()).throw(ValueError("bad"))), ) with app.test_request_context(headers={"Authorization": "Bearer tok"}): response, status = protected() assert status == 401 assert response.get_json()["error"] == "invalid token" with app.test_request_context(): response, status = protected() assert status == 401 assert response.get_json()["error"] == "missing bearer token" monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", False) with app.test_request_context(): ok, response = keycloak.require_portal_admin() assert not ok and response[1] == 503 ok, response = keycloak.require_account_access() assert not ok and response[1] == 503 monkeypatch.setattr(keycloak.settings, "KEYCLOAK_ENABLED", True) monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_USERS", {"alice"}) monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_GROUPS", {"admin"}) monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", {"dev"}) with app.test_request_context(): keycloak.g.keycloak_username = "alice" keycloak.g.keycloak_groups = ["dev"] assert keycloak.require_portal_admin() == (True, None) assert keycloak.require_account_access() == (True, None) monkeypatch.setattr(keycloak.settings, "PORTAL_ADMIN_USERS", set()) with app.test_request_context(): keycloak.g.keycloak_username = "carol" keycloak.g.keycloak_groups = ["admin"] assert keycloak.require_portal_admin() == (True, None) monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", set()) with app.test_request_context(): assert keycloak.require_account_access() == (True, None) monkeypatch.setattr(keycloak.settings, "ACCOUNT_ALLOWED_GROUPS", {"dev"}) with app.test_request_context(): keycloak.g.keycloak_groups = [] assert keycloak.require_account_access() == (True, None) with app.test_request_context(): keycloak.g.keycloak_username = "bob" keycloak.g.keycloak_groups = ["other"] assert keycloak.require_portal_admin()[0] is False assert keycloak.require_account_access()[0] is False