soteria/internal/k8s/jobs.go

373 lines
10 KiB
Go
Raw Normal View History

2026-01-31 03:34:34 -03:00
package k8s
import (
"context"
"errors"
"fmt"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/config"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Label keys applied to the Jobs, pod templates, and copied secrets created
// by this package.
const (
	// labelAppName carries the application name; this package always sets it
	// to "soteria".
	labelAppName = "app.kubernetes.io/name"
	// labelComponent distinguishes the kind of object: "backup", "restore",
	// or "restic" for copied credential secrets.
	labelComponent = "app.kubernetes.io/component"
	// labelAction records the operation ("backup" or "restore") an object
	// was created for.
	labelAction = "soteria.bstein.dev/action"
	// labelPVC names the PersistentVolumeClaim an object relates to, when
	// there is one.
	labelPVC = "soteria.bstein.dev/pvc"
)
// CreateBackupJob launches a restic backup Job for the PVC named in req.
//
// It validates the request, copies the restic credentials secret from
// cfg.SecretNamespace into req.Namespace under a name derived from the job
// name, creates the Job, and finally records the Job as the owner of the
// copied secret.
//
// It returns the job name and the copied secret's name. For a dry run both
// names are computed and returned without contacting the cluster. If the Job
// was created but the owner reference could not be attached, both names are
// still returned alongside the error, because the Job already exists.
func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) {
	// How long the detached secret cleanup may take after a failed Job create.
	const cleanupTimeout = 10 * time.Second

	switch {
	case req.Namespace == "":
		return "", "", errors.New("namespace is required")
	case req.PVC == "":
		return "", "", errors.New("pvc is required")
	case req.Snapshot:
		return "", "", errors.New("snapshot support is not implemented yet")
	}

	name := jobName("backup", req.PVC)
	secretName := fmt.Sprintf("soteria-%s-restic", name)
	if req.DryRun {
		return name, secretName, nil
	}

	secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{
		labelAppName:   "soteria",
		labelComponent: "restic",
		labelAction:    "backup",
		labelPVC:       req.PVC,
	})
	if err != nil {
		return "", "", err
	}

	job := buildBackupJob(cfg, req, name, secretName)
	created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{})
	if err != nil {
		// The request context may already be cancelled or expired (possibly
		// the very reason Create failed), so clean up on a detached context;
		// otherwise the copied credentials would be left behind.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
		defer cancel()
		// Best effort: the job-creation error is the one worth reporting.
		_ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(cleanupCtx, secretName, metav1.DeleteOptions{})
		return "", "", fmt.Errorf("create job %s/%s: %w", req.Namespace, name, err)
	}

	if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil {
		// The Job exists at this point, so report its identifiers with the error.
		return name, secretName, err
	}
	return name, secretName, nil
}
// CreateRestoreJob launches a restic restore Job in req.Namespace.
//
// The snapshot to restore is req.Snapshot, or "latest" when that is empty.
// The restic credentials secret is copied from cfg.SecretNamespace into
// req.Namespace under a name derived from the job name, the Job is created,
// and the Job is then recorded as the owner of the copied secret.
//
// It returns the job name and the copied secret's name. For a dry run both
// names are computed and returned without contacting the cluster. If the Job
// was created but the owner reference could not be attached, both names are
// still returned alongside the error, because the Job already exists.
func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) {
	// How long the detached secret cleanup may take after a failed Job create.
	const cleanupTimeout = 10 * time.Second

	if req.Namespace == "" {
		return "", "", errors.New("namespace is required")
	}

	snapshot := req.Snapshot
	if snapshot == "" {
		snapshot = "latest"
	}

	name := jobName("restore", snapshot)
	secretName := fmt.Sprintf("soteria-%s-restic", name)
	if req.DryRun {
		return name, secretName, nil
	}

	secret, err := c.copySecret(ctx, cfg.SecretNamespace, cfg.ResticSecretName, req.Namespace, secretName, map[string]string{
		labelAppName:   "soteria",
		labelComponent: "restic",
		labelAction:    "restore",
	})
	if err != nil {
		return "", "", err
	}

	job := buildRestoreJob(cfg, req, name, secretName, snapshot)
	created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{})
	if err != nil {
		// The request context may already be cancelled or expired (possibly
		// the very reason Create failed), so clean up on a detached context;
		// otherwise the copied credentials would be left behind.
		cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
		defer cancel()
		// Best effort: the job-creation error is the one worth reporting.
		_ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(cleanupCtx, secretName, metav1.DeleteOptions{})
		return "", "", fmt.Errorf("create job %s/%s: %w", req.Namespace, name, err)
	}

	if err := c.bindSecretToJob(ctx, req.Namespace, secret.Name, created); err != nil {
		// The Job exists at this point, so report its identifiers with the error.
		return name, secretName, err
	}
	return name, secretName, nil
}
// buildBackupJob assembles, without submitting, the Job object that backs up
// req.PVC with restic.
//
// The pod runs a single "restic" container whose script comes from
// backupCommand and whose environment comes from resticEnv. The PVC is
// mounted read-only at /data and an emptyDir is mounted at /cache. The Job
// never retries (backoff limit 0) and carries cfg.JobTTLSeconds as its
// TTL-after-finished.
func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName string) *batchv1.Job {
	meta := map[string]string{
		labelAppName:   "soteria",
		labelComponent: "backup",
		labelAction:    "backup",
		labelPVC:       req.PVC,
	}

	worker := corev1.Container{
		Name:            "restic",
		Image:           cfg.ResticImage,
		ImagePullPolicy: corev1.PullIfNotPresent,
		Command:         []string{"/bin/sh", "-c"},
		Args:            []string{backupCommand(cfg, req)},
		Env:             resticEnv(cfg, secretName),
		VolumeMounts: []corev1.VolumeMount{
			{Name: "data", MountPath: "/data", ReadOnly: true},
			{Name: "cache", MountPath: "/cache"},
		},
	}

	// The claim being backed up, attached read-only.
	source := corev1.Volume{
		Name: "data",
		VolumeSource: corev1.VolumeSource{
			PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
				ClaimName: req.PVC,
				ReadOnly:  true,
			},
		},
	}
	// Scratch space backing RESTIC_CACHE_DIR.
	scratch := corev1.Volume{
		Name:         "cache",
		VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
	}

	spec := corev1.PodSpec{
		RestartPolicy: corev1.RestartPolicyNever,
		// An empty WorkerServiceAccount leaves this field at its zero value,
		// exactly as if it had never been assigned.
		ServiceAccountName: cfg.WorkerServiceAccount,
		Containers:         []corev1.Container{worker},
		Volumes:            []corev1.Volume{source, scratch},
	}

	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      jobName,
			Namespace: req.Namespace,
			Labels:    meta,
		},
		Spec: batchv1.JobSpec{
			BackoffLimit:            int32Ptr(0),
			TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds),
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: meta},
				Spec:       spec,
			},
		},
	}
}
// buildRestoreJob assembles, without submitting, the Job object that restores
// the given restic snapshot.
//
// The restore target is mounted at /restore. It is an emptyDir unless
// req.TargetPVC is set, in which case that claim is mounted read-write and
// its name is added to the labels. An emptyDir is always mounted at /cache.
// The Job never retries (backoff limit 0) and carries cfg.JobTTLSeconds as
// its TTL-after-finished.
func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot string) *batchv1.Job {
	meta := map[string]string{
		labelAppName:   "soteria",
		labelComponent: "restore",
		labelAction:    "restore",
	}

	// Pick the backing store for /restore before assembling the pod, so the
	// volume list never has to be patched by index afterwards.
	target := corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}
	if req.TargetPVC != "" {
		target = corev1.VolumeSource{
			PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
				ClaimName: req.TargetPVC,
				ReadOnly:  false,
			},
		}
		meta[labelPVC] = req.TargetPVC
	}

	worker := corev1.Container{
		Name:            "restic",
		Image:           cfg.ResticImage,
		ImagePullPolicy: corev1.PullIfNotPresent,
		Command:         []string{"/bin/sh", "-c"},
		Args:            []string{restoreCommand(snapshot)},
		Env:             resticEnv(cfg, secretName),
		VolumeMounts: []corev1.VolumeMount{
			{Name: "restore", MountPath: "/restore"},
			{Name: "cache", MountPath: "/cache"},
		},
	}

	spec := corev1.PodSpec{
		RestartPolicy: corev1.RestartPolicyNever,
		// An empty WorkerServiceAccount leaves this field at its zero value,
		// exactly as if it had never been assigned.
		ServiceAccountName: cfg.WorkerServiceAccount,
		Containers:         []corev1.Container{worker},
		Volumes: []corev1.Volume{
			{Name: "restore", VolumeSource: target},
			{Name: "cache", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}},
		},
	}

	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      jobName,
			Namespace: req.Namespace,
			Labels:    meta,
		},
		Spec: batchv1.JobSpec{
			BackoffLimit:            int32Ptr(0),
			TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds),
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: meta},
				Spec:       spec,
			},
		},
	}
}
// backupCommand renders the shell script that the backup container passes to
// "/bin/sh -c".
//
// The script runs "restic backup /data" tagged with "soteria", "pvc=<PVC>",
// and every non-blank entry of req.Tags (trimmed of surrounding whitespace),
// followed by cfg.ResticBackupArgs. When cfg.ResticForgetArgs is non-empty a
// "restic forget" invocation with those arguments is chained with "&&".
//
// Request-supplied values (the PVC name and the tags) are single-quoted so
// that shell metacharacters in them cannot alter the command. The
// config-supplied argument lists are appended verbatim and are NOT quoted;
// NOTE(review): this assumes they come from trusted operator configuration.
func backupCommand(cfg *config.Config, req api.BackupRequest) string {
	// quote wraps s in single quotes for POSIX sh; an embedded single quote
	// is written as '\'' (close quote, escaped quote, reopen quote).
	quote := func(s string) string {
		return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
	}

	args := []string{"restic", "backup", "/data", "--tag", "soteria", "--tag", quote(fmt.Sprintf("pvc=%s", req.PVC))}
	for _, tag := range req.Tags {
		tag = strings.TrimSpace(tag)
		if tag == "" {
			continue
		}
		args = append(args, "--tag", quote(tag))
	}
	args = append(args, cfg.ResticBackupArgs...)

	cmd := strings.Join(args, " ")
	if len(cfg.ResticForgetArgs) > 0 {
		forget := strings.Join(append([]string{"restic", "forget"}, cfg.ResticForgetArgs...), " ")
		cmd = fmt.Sprintf("%s && %s", cmd, forget)
	}
	return "set -euo pipefail; " + cmd
}
// restoreCommand renders the shell script that restores snapshot into
// /restore.
//
// The snapshot identifier is single-quoted so that shell metacharacters in it
// cannot alter the command; an embedded single quote is written as '\''
// (close quote, escaped quote, reopen quote).
func restoreCommand(snapshot string) string {
	quoted := "'" + strings.ReplaceAll(snapshot, "'", `'\''`) + "'"
	return fmt.Sprintf("set -euo pipefail; restic restore %s --target /restore", quoted)
}
// resticEnv builds the environment for a restic container.
//
// RESTIC_REPOSITORY comes from cfg and RESTIC_CACHE_DIR is fixed to /cache.
// AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and RESTIC_PASSWORD are read from
// identically named keys of the secret called secretName. The S3 endpoint and
// region variables are appended only when the matching cfg field is set.
func resticEnv(cfg *config.Config, secretName string) []corev1.EnvVar {
	// fromSecret yields an env var whose value is taken from the key of the
	// same name in the job's credentials secret.
	fromSecret := func(key string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: key,
			ValueFrom: &corev1.EnvVarSource{
				SecretKeyRef: &corev1.SecretKeySelector{
					LocalObjectReference: corev1.LocalObjectReference{Name: secretName},
					Key:                  key,
				},
			},
		}
	}

	vars := []corev1.EnvVar{
		{Name: "RESTIC_REPOSITORY", Value: cfg.ResticRepository},
		{Name: "RESTIC_CACHE_DIR", Value: "/cache"},
		fromSecret("AWS_ACCESS_KEY_ID"),
		fromSecret("AWS_SECRET_ACCESS_KEY"),
		fromSecret("RESTIC_PASSWORD"),
	}

	if endpoint := cfg.S3Endpoint; endpoint != "" {
		vars = append(vars,
			corev1.EnvVar{Name: "RESTIC_S3_ENDPOINT", Value: endpoint},
			corev1.EnvVar{Name: "AWS_ENDPOINT", Value: endpoint},
		)
	}
	if region := cfg.S3Region; region != "" {
		vars = append(vars,
			corev1.EnvVar{Name: "AWS_REGION", Value: region},
			corev1.EnvVar{Name: "AWS_DEFAULT_REGION", Value: region},
		)
	}
	return vars
}
// copySecret duplicates the secret srcNS/srcName as dstNS/dstName.
//
// The new secret keeps the source's type and data and carries the supplied
// labels. It returns the secret as stored by the API server, or an error
// that says whether reading the source or creating the duplicate failed.
func (c *Client) copySecret(ctx context.Context, srcNS, srcName, dstNS, dstName string, labels map[string]string) (*corev1.Secret, error) {
	source, err := c.Clientset.CoreV1().Secrets(srcNS).Get(ctx, srcName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("read secret %s/%s: %w", srcNS, srcName, err)
	}

	meta := metav1.ObjectMeta{
		Name:      dstName,
		Namespace: dstNS,
		Labels:    labels,
	}
	replica := &corev1.Secret{ObjectMeta: meta, Type: source.Type, Data: source.Data}

	stored, err := c.Clientset.CoreV1().Secrets(dstNS).Create(ctx, replica, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("create secret %s/%s: %w", dstNS, dstName, err)
	}
	return stored, nil
}
// bindSecretToJob records job as the controlling owner of the secret
// namespace/secretName.
//
// The secret is re-read, an owner reference pointing at the Job (by name and
// UID, with Controller set) is appended, and the secret is updated. Errors
// are wrapped so the caller can tell whether the read or the update failed;
// the underlying API error remains reachable through errors.Is / errors.As.
func (c *Client) bindSecretToJob(ctx context.Context, namespace, secretName string, job *batchv1.Job) error {
	secrets := c.Clientset.CoreV1().Secrets(namespace)

	secret, err := secrets.Get(ctx, secretName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("read secret %s/%s: %w", namespace, secretName, err)
	}

	isController := true
	secret.OwnerReferences = append(secret.OwnerReferences, metav1.OwnerReference{
		APIVersion: "batch/v1",
		Kind:       "Job",
		Name:       job.Name,
		UID:        job.UID,
		Controller: &isController,
	})

	if _, err := secrets.Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("set job %s as owner of secret %s/%s: %w", job.Name, namespace, secretName, err)
	}
	return nil
}
// jobName builds a name of the form "soteria-<action>-<suffix>-<timestamp>".
//
// The "soteria-<action>-<suffix>" part is passed through sanitizeName and the
// timestamp is the current UTC time formatted as YYYYMMDD-HHMMSS. When the
// result would exceed 63 bytes, the sanitized prefix is cut short so that the
// full timestamp always survives.
func jobName(action, suffix string) string {
	// NOTE(review): 63 is presumably the Kubernetes DNS-label length limit;
	// confirm before changing.
	const maxLen = 63

	prefix := sanitizeName(fmt.Sprintf("soteria-%s-%s", action, suffix))
	stamp := time.Now().UTC().Format("20060102-150405")

	if full := prefix + "-" + stamp; len(full) <= maxLen {
		return full
	}

	// Room left for the prefix once the "-" separator and stamp are reserved.
	budget := maxLen - len(stamp) - 1
	if budget < 1 {
		budget = 1
	}
	if len(prefix) > budget {
		prefix = prefix[:budget]
	}
	return prefix + "-" + stamp
}
// sanitizeName lower-cases value, turns every underscore, dot, and space into
// a hyphen, and strips leading and trailing hyphens. Other characters are
// passed through unchanged, and runs of hyphens are not collapsed.
func sanitizeName(value string) string {
	hyphenate := strings.NewReplacer("_", "-", ".", "-", " ", "-")
	return strings.Trim(hyphenate.Replace(strings.ToLower(value)), "-")
}
// int32Ptr returns a pointer to a freshly allocated int32 holding val, for
// populating optional *int32 API fields from a plain value.
func int32Ptr(val int32) *int32 {
	p := new(int32)
	*p = val
	return p
}