keycloak(atlas): harden realm settings job

This commit is contained in:
Brad Stein 2026-01-02 20:01:51 -03:00
parent 3f1780daed
commit 54d324f555

View File

@ -2,7 +2,7 @@
apiVersion: batch/v1 apiVersion: batch/v1
kind: Job kind: Job
metadata: metadata:
name: keycloak-realm-settings-6 name: keycloak-realm-settings-7
namespace: sso namespace: sso
spec: spec:
backoffLimit: 2 backoffLimit: 2
@ -64,6 +64,19 @@ spec:
admin_user = os.environ["KEYCLOAK_ADMIN_USER"] admin_user = os.environ["KEYCLOAK_ADMIN_USER"]
admin_password = os.environ["KEYCLOAK_ADMIN_PASSWORD"] admin_password = os.environ["KEYCLOAK_ADMIN_PASSWORD"]
def http_json(method: str, url: str, token: str, payload=None):
data = None
headers = {"Authorization": f"Bearer {token}"}
if payload is not None:
data = json.dumps(payload).encode()
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=30) as resp:
body = resp.read()
if not body:
return resp.status, None
return resp.status, json.loads(body.decode())
token_data = urllib.parse.urlencode( token_data = urllib.parse.urlencode(
{ {
"grant_type": "password", "grant_type": "password",
@ -82,9 +95,17 @@ spec:
token_body = json.loads(resp.read().decode()) token_body = json.loads(resp.read().decode())
access_token = token_body["access_token"] access_token = token_body["access_token"]
payload = { # Update realm settings safely by fetching the full realm representation first.
"resetPasswordAllowed": True, realm_url = f"{base_url}/admin/realms/{realm}"
"smtpServer": { status, realm_rep = http_json("GET", realm_url, access_token)
if status != 200 or not realm_rep:
raise SystemExit(f"Unable to fetch realm {realm} (status={status})")
realm_rep["resetPasswordAllowed"] = True
smtp = realm_rep.get("smtpServer") or {}
smtp.update(
{
"host": os.environ["KEYCLOAK_SMTP_HOST"], "host": os.environ["KEYCLOAK_SMTP_HOST"],
"port": os.environ["KEYCLOAK_SMTP_PORT"], "port": os.environ["KEYCLOAK_SMTP_PORT"],
"from": os.environ["KEYCLOAK_SMTP_FROM"], "from": os.environ["KEYCLOAK_SMTP_FROM"],
@ -94,19 +115,43 @@ spec:
"auth": "false", "auth": "false",
"starttls": "false", "starttls": "false",
"ssl": "false", "ssl": "false",
}, }
}
update_req = urllib.request.Request(
f"{base_url}/admin/realms/{realm}",
data=json.dumps(payload).encode(),
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
method="PUT",
) )
with urllib.request.urlopen(update_req, timeout=10) as resp: realm_rep["smtpServer"] = smtp
if resp.status not in (200, 204):
raise SystemExit(f"Unexpected response: {resp.status}") status, _ = http_json("PUT", realm_url, access_token, realm_rep)
if status not in (200, 204):
raise SystemExit(f"Unexpected realm update response: {status}")
# Disable Identity Provider Redirector in the browser flow for this realm.
status, executions = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/authentication/flows/browser/executions",
access_token,
)
if status == 200 and executions:
for ex in executions:
if ex.get("providerId") != "identity-provider-redirector":
continue
ex_id = ex.get("id")
if not ex_id:
continue
status, ex_rep = http_json(
"GET",
f"{base_url}/admin/realms/{realm}/authentication/executions/{ex_id}",
access_token,
)
if status != 200 or not ex_rep:
raise SystemExit(f"Unable to fetch browser execution {ex_id} (status={status})")
if ex_rep.get("requirement") == "DISABLED":
continue
ex_rep["requirement"] = "DISABLED"
status, _ = http_json(
"PUT",
f"{base_url}/admin/realms/{realm}/authentication/executions/{ex_id}",
access_token,
ex_rep,
)
if status not in (200, 204):
raise SystemExit(f"Unexpected execution update response for {ex_id}: {status}")
PY PY