keycloak: enforce bstein group membership

This commit is contained in:
Brad Stein 2026-01-16 17:36:07 -03:00
parent 401df4d68c
commit 1eb7d58259

View File

@ -2,7 +2,7 @@
apiVersion: batch/v1
kind: Job
metadata:
name: keycloak-user-overrides-6
name: keycloak-user-overrides-7
namespace: sso
spec:
backoffLimit: 0
@ -150,53 +150,62 @@ spec:
if not isinstance(attrs, dict):
attrs = {}
existing = attrs.get("mailu_email")
needs_update = True
if isinstance(existing, list) and existing and existing[0] == override_mailu_email:
raise SystemExit(0)
needs_update = False
if isinstance(existing, str) and existing == override_mailu_email:
raise SystemExit(0)
needs_update = False
attrs["mailu_email"] = [override_mailu_email]
status, _ = http_json(
"PUT",
f"{base_url}/admin/realms/{realm}/users/{user_id}",
access_token,
{"attributes": attrs},
)
if status not in (200, 204):
raise SystemExit(f"Unexpected user update response: {status}")
if needs_update:
attrs["mailu_email"] = [override_mailu_email]
status, _ = http_json(
"PUT",
f"{base_url}/admin/realms/{realm}/users/{user_id}",
access_token,
{"attributes": attrs},
)
if status not in (200, 204):
raise SystemExit(f"Unexpected user update response: {status}")
# Ensure the user is in the admin group for Vault access.
status, groups = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/groups?search=admin",
access_token,
)
if status != 200 or not isinstance(groups, list):
raise SystemExit("Unable to fetch groups")
group_id = ""
for item in groups:
if isinstance(item, dict) and item.get("name") == "admin":
group_id = item.get("id") or ""
break
if not group_id:
raise SystemExit("admin group not found")
status, memberships = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/users/{user_id}/groups",
access_token,
)
if status != 200 or not isinstance(memberships, list):
raise SystemExit("Unable to read user groups")
already = any(
isinstance(item, dict) and item.get("id") == group_id for item in memberships
)
if not already:
# Ensure the user is in the admin and planka-users groups.
def ensure_group(group_name: str) -> None:
status, groups = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/groups?search={urllib.parse.quote(group_name)}",
access_token,
)
if status != 200 or not isinstance(groups, list):
raise SystemExit("Unable to fetch groups")
group_id = ""
for item in groups:
if isinstance(item, dict) and item.get("name") == group_name:
group_id = item.get("id") or ""
break
if not group_id:
raise SystemExit(f"{group_name} group not found")
status, memberships = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/users/{user_id}/groups",
access_token,
)
if status != 200 or not isinstance(memberships, list):
raise SystemExit("Unable to read user groups")
already = any(
isinstance(item, dict) and item.get("id") == group_id for item in memberships
)
if already:
return
status, _ = http_json(
"PUT",
f"{base_url}/admin/realms/{realm}/users/{user_id}/groups/{group_id}",
access_token,
)
if status not in (200, 204):
raise SystemExit(f"Unexpected group update response: {status}")
raise SystemExit(
f"Unexpected group update response for {group_name}: {status}"
)
for group in ("admin", "planka-users"):
ensure_group(group)
PY
volumeMounts: