keycloak: robust policy lookup for token exchange job

This commit is contained in:
Brad Stein 2026-01-03 15:50:43 -03:00
parent 3f19d01d00
commit e73baa6ecd

View File

@ -2,7 +2,7 @@
apiVersion: batch/v1
kind: Job
metadata:
name: keycloak-portal-e2e-token-exchange-permissions-4
name: keycloak-portal-e2e-token-exchange-permissions-5
namespace: sso
spec:
backoffLimit: 6
@ -163,17 +163,35 @@ spec:
raise SystemExit(f"Target client permissions missing token-exchange scope (have: {keys})")
policy_name = "test-portal-e2e-token-exchange"
status, policies = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy/search?name={urllib.parse.quote(policy_name)}&fields=id,name,type,config",
token,
)
policy = None
if status == 200 and isinstance(policies, list):
for item in policies:
if isinstance(item, dict) and item.get("name") == policy_name:
policy = item
break
policy_base_url = f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy"
def find_policy_by_name(name: str):
urls = [
f"{policy_base_url}/search?name={urllib.parse.quote(name)}&fields=id,name,type,config",
f"{policy_base_url}/search?name={urllib.parse.quote(name)}",
policy_base_url,
]
for url in urls:
st, body = http_json("GET", url, token)
if st != 200:
continue
items = None
if isinstance(body, list):
items = body
elif isinstance(body, dict):
for key in ("policies", "items", "data"):
value = body.get(key)
if isinstance(value, list):
items = value
break
if not isinstance(items, list):
continue
for item in items:
if isinstance(item, dict) and item.get("name") == name and item.get("id"):
return item
return None
policy = find_policy_by_name(policy_name)
if policy is None:
create_rep: dict[str, Any] = {
@ -185,27 +203,18 @@ spec:
}
status, created = http_json(
"POST",
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy",
policy_base_url,
token,
create_rep,
)
if status == 409:
status, policies = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/clients/{rm_uuid}/authz/resource-server/policy/search?name={urllib.parse.quote(policy_name)}&fields=id,name,type,config",
token,
)
if status == 200 and isinstance(policies, list):
for item in policies:
if isinstance(item, dict) and item.get("name") == policy_name:
policy = item
break
if status == 201 and isinstance(created, dict) and created.get("id"):
policy = created
elif status == 409:
policy = find_policy_by_name(policy_name)
if policy is None:
raise SystemExit(f"Policy {policy_name!r} exists but could not be retrieved")
else:
if status != 201 or not isinstance(created, dict) or not created.get("id"):
raise SystemExit(f"Failed creating policy {policy_name!r} (status={status}) resp={created}")
policy = created
raise SystemExit(f"Failed creating policy {policy_name!r} (status={status}) resp={created}")
policy_id = policy.get("id")
if not isinstance(policy_id, str) or not policy_id: