auth: allow Wolf OIDC audience
This commit is contained in:
parent
a1e197b4e3
commit
0bbcdfb7a7
@ -21,10 +21,10 @@ class AuthContext:
|
||||
class KeycloakOIDC:
|
||||
"""Validate Keycloak-issued OIDC tokens and return trusted claims."""
|
||||
|
||||
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
|
||||
def __init__(self, jwks_url: str, issuer: str, client_id: str, extra_client_ids: list[str] | None = None) -> None:
|
||||
self._jwks_url = jwks_url
|
||||
self._issuer = issuer
|
||||
self._client_id = client_id
|
||||
self._client_ids = [client_id, *(extra_client_ids or [])]
|
||||
self._jwks: dict[str, Any] | None = None
|
||||
self._jwks_fetched_at: float = 0.0
|
||||
self._jwks_ttl_sec = 300.0
|
||||
@ -77,7 +77,7 @@ class KeycloakOIDC:
|
||||
aud_list = [aud]
|
||||
elif isinstance(aud, list):
|
||||
aud_list = [item for item in aud if isinstance(item, str)]
|
||||
if azp != self._client_id and self._client_id not in aud_list:
|
||||
if not any(client_id and (azp == client_id or client_id in aud_list) for client_id in self._client_ids):
|
||||
raise ValueError("token not issued for expected client")
|
||||
|
||||
def verify(self, token: str) -> dict[str, Any]:
|
||||
@ -108,7 +108,12 @@ class Authenticator:
|
||||
"""Translate bearer tokens into Ariadne authorization context."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)
|
||||
self._oidc = KeycloakOIDC(
|
||||
settings.keycloak_jwks_url,
|
||||
settings.keycloak_issuer,
|
||||
settings.keycloak_client_id,
|
||||
settings.keycloak_extra_client_ids,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_groups(groups: Any) -> list[str]:
|
||||
|
||||
@ -47,6 +47,7 @@ class Settings:
|
||||
keycloak_url: str
|
||||
keycloak_realm: str
|
||||
keycloak_client_id: str
|
||||
keycloak_extra_client_ids: list[str]
|
||||
keycloak_issuer: str
|
||||
keycloak_jwks_url: str
|
||||
|
||||
|
||||
@ -16,6 +16,9 @@ def _keycloak_config() -> dict[str, Any]:
|
||||
"keycloak_url": keycloak_url,
|
||||
"keycloak_realm": keycloak_realm,
|
||||
"keycloak_client_id": keycloak_client_id,
|
||||
"keycloak_extra_client_ids": [
|
||||
item.strip() for item in _env("KEYCLOAK_EXTRA_CLIENT_IDS", "").split(",") if item.strip()
|
||||
],
|
||||
"keycloak_issuer": keycloak_issuer,
|
||||
"keycloak_jwks_url": keycloak_jwks_url,
|
||||
"keycloak_admin_url": _env("KEYCLOAK_ADMIN_URL", keycloak_url).rstrip("/"),
|
||||
|
||||
@ -47,6 +47,22 @@ def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
|
||||
kc.verify(token)
|
||||
|
||||
|
||||
def test_keycloak_verify_accepts_extra_client_audience(monkeypatch) -> None:
|
||||
token = _make_token()
|
||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal", ["wolf"])
|
||||
|
||||
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
||||
monkeypatch.setattr(kc, "_key_from_jwk", lambda key: "dummy")
|
||||
monkeypatch.setattr(
|
||||
jwt,
|
||||
"decode",
|
||||
lambda *args, **kwargs: {"azp": "wolf", "preferred_username": "robotuser", "groups": ["admin"]},
|
||||
)
|
||||
|
||||
claims = kc.verify(token)
|
||||
assert claims["preferred_username"] == "robotuser"
|
||||
|
||||
|
||||
def test_keycloak_verify_missing_token() -> None:
|
||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||
with pytest.raises(ValueError):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user