#!/usr/bin/env python3 import base64 import json import os import secrets import ssl import sys import urllib.error import urllib.request from pathlib import Path def read_file(path: Path) -> str: if not path.exists(): return "" return path.read_text(encoding="utf-8").strip() def require_value(label: str, value: str) -> None: if not value: raise RuntimeError(f"missing {label}") def http_json(method: str, url: str, headers=None, payload=None, context=None): data = None if payload is not None: data = json.dumps(payload).encode() req = urllib.request.Request(url, data=data, headers=headers or {}, method=method) with urllib.request.urlopen(req, timeout=15, context=context) as resp: body = resp.read() if not body: return resp.status, None return resp.status, json.loads(body.decode()) def k8s_context() -> ssl.SSLContext: ca_path = Path("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") if ca_path.exists(): return ssl.create_default_context(cafile=str(ca_path)) return ssl.create_default_context() def k8s_api_url(path: str) -> str: host = os.environ.get("KUBERNETES_SERVICE_HOST") port = os.environ.get("KUBERNETES_SERVICE_PORT", "443") if not host: raise RuntimeError("missing kubernetes service host") return f"https://{host}:{port}{path}" def k8s_get_secret(namespace: str, name: str, token: str): try: _, body = http_json( "GET", k8s_api_url(f"/api/v1/namespaces/{namespace}/secrets/{name}"), headers={"Authorization": f"Bearer {token}"}, context=k8s_context(), ) except urllib.error.HTTPError as exc: if exc.code == 404: return None raise return body def k8s_create_secret(namespace: str, name: str, token: str, string_data: dict): payload = { "apiVersion": "v1", "kind": "Secret", "metadata": {"name": name, "namespace": namespace}, "type": "Opaque", "stringData": string_data, } try: status, _ = http_json( "POST", k8s_api_url(f"/api/v1/namespaces/{namespace}/secrets"), headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json", }, payload=payload, context=k8s_context(), ) except urllib.error.HTTPError as exc: if exc.code == 409: return raise if status not in (200, 201): raise RuntimeError(f"k8s secret create failed for {name} (status {status})") def decode_secret_value(value: str) -> str: if not value: return "" return base64.b64decode(value.encode()).decode("utf-8") def vault_login(vault_addr: str, role: str, jwt: str) -> str: status, body = http_json( "POST", f"{vault_addr}/v1/auth/kubernetes/login", headers={"Content-Type": "application/json"}, payload={"jwt": jwt, "role": role}, ) if status != 200 or not body: raise RuntimeError("vault login failed") token = body.get("auth", {}).get("client_token") if not token: raise RuntimeError("vault login returned no token") return token def vault_read(vault_addr: str, token: str, path: str): try: status, body = http_json( "GET", f"{vault_addr}/v1/kv/data/atlas/{path}", headers={"X-Vault-Token": token}, ) except urllib.error.HTTPError as exc: if exc.code == 404: return {} raise if status != 200 or not body: return {} return body.get("data", {}).get("data", {}) or {} def vault_write(vault_addr: str, token: str, path: str, data: dict): payload = {"data": data} status, _ = http_json( "POST", f"{vault_addr}/v1/kv/data/atlas/{path}", headers={"X-Vault-Token": token, "Content-Type": "application/json"}, payload=payload, ) if status not in (200, 204): raise RuntimeError(f"vault write failed for {path} (status {status})") def ensure_firefly_db(vault_addr: str, token: str): base = Path("/secrets/firefly-db") host = read_file(base / "DB_HOST") or read_file(base / "DB_HOSTNAME") port = read_file(base / "DB_PORT") db_name = read_file(base / "DB_DATABASE") or read_file(base / "DB_NAME") user = read_file(base / "DB_USERNAME") or read_file(base / "DB_USER") password = read_file(base / "DB_PASSWORD") or read_file(base / "DB_PASS") require_value("firefly-db/DB_HOST", host) require_value("firefly-db/DB_PORT", port) require_value("firefly-db/DB_DATABASE", db_name) require_value("firefly-db/DB_USERNAME", user) require_value("firefly-db/DB_PASSWORD", password) vault_write( vault_addr, token, "finance/firefly-db", { "DB_HOST": host, "DB_PORT": port, "DB_DATABASE": db_name, "DB_USERNAME": user, "DB_PASSWORD": password, }, ) def ensure_firefly_secrets(vault_addr: str, token: str): current = vault_read(vault_addr, token, "finance/firefly-secrets") app_key = current.get("APP_KEY") if not app_key: app_key = "base64:" + base64.b64encode(secrets.token_bytes(32)).decode() cron_token = current.get("STATIC_CRON_TOKEN") if not cron_token: cron_token = secrets.token_urlsafe(32) vault_write( vault_addr, token, "finance/firefly-secrets", {"APP_KEY": app_key, "STATIC_CRON_TOKEN": cron_token}, ) def ensure_actual_db(vault_addr: str, token: str): base = Path("/secrets/actualbudget-db") if not base.exists(): return host = read_file(base / "DB_HOST") or read_file(base / "DB_HOSTNAME") port = read_file(base / "DB_PORT") db_name = read_file(base / "DB_DATABASE") or read_file(base / "DB_NAME") user = read_file(base / "DB_USERNAME") or read_file(base / "DB_USER") password = read_file(base / "DB_PASSWORD") or read_file(base / "DB_PASS") if not any([host, port, db_name, user, password]): return require_value("actualbudget-db/DB_HOST", host) require_value("actualbudget-db/DB_PORT", port) require_value("actualbudget-db/DB_DATABASE", db_name) require_value("actualbudget-db/DB_USERNAME", user) require_value("actualbudget-db/DB_PASSWORD", password) vault_write( vault_addr, token, "finance/actual-db", { "DB_HOST": host, "DB_PORT": port, "DB_DATABASE": db_name, "DB_USERNAME": user, "DB_PASSWORD": password, }, ) def ensure_actual_encryption(vault_addr: str, token: str, sa_token: str): namespace = os.environ.get("FINANCE_NAMESPACE", "finance") secret_name = os.environ.get("ACTUAL_BUDGET_PVC_NAME", "actual-budget-data-encrypted") if not sa_token: raise RuntimeError("missing service account token for k8s") vault_data = vault_read(vault_addr, token, "finance/actual-encryption") vault_key = vault_data.get("CRYPTO_KEY_VALUE", "") k8s_secret = k8s_get_secret(namespace, secret_name, sa_token) k8s_key = "" if k8s_secret: data = k8s_secret.get("data", {}) or {} k8s_key = decode_secret_value(data.get("CRYPTO_KEY_VALUE", "")) if vault_key and k8s_key and vault_key != k8s_key: raise RuntimeError("actual encryption key mismatch between vault and k8s") key = vault_key or k8s_key provider = "secret" if not key: key = secrets.token_urlsafe(48) vault_write( vault_addr, token, "finance/actual-encryption", {"CRYPTO_KEY_VALUE": key, "CRYPTO_KEY_PROVIDER": provider}, ) elif not vault_key: vault_write( vault_addr, token, "finance/actual-encryption", {"CRYPTO_KEY_VALUE": key, "CRYPTO_KEY_PROVIDER": provider}, ) if not k8s_secret: k8s_create_secret( namespace, secret_name, sa_token, {"CRYPTO_KEY_VALUE": key, "CRYPTO_KEY_PROVIDER": provider}, ) def main() -> int: vault_addr = os.environ.get("VAULT_ADDR", "http://vault.vault.svc.cluster.local:8200") vault_role = os.environ.get("VAULT_ROLE", "finance-secrets") sa_token = read_file(Path("/var/run/secrets/kubernetes.io/serviceaccount/token")) if not sa_token: raise RuntimeError("missing service account token") token = vault_login(vault_addr, vault_role, sa_token) ensure_firefly_db(vault_addr, token) ensure_firefly_secrets(vault_addr, token) ensure_actual_db(vault_addr, token) ensure_actual_encryption(vault_addr, token, sa_token) print("finance secrets ensured") return 0 if __name__ == "__main__": try: sys.exit(main()) except Exception as exc: print(f"finance secrets ensure failed: {exc}", file=sys.stderr) sys.exit(1)