From 9a262742429bc617a2f21b2ee94686533c86b951 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 13 Apr 2026 12:51:19 -0300 Subject: [PATCH] backup: add restic size accounting and dedupe controls --- internal/api/types.go | 7 + internal/k8s/jobs.go | 185 +++++++++++++++++++---- internal/server/server.go | 264 ++++++++++++++++++++++++++++++--- internal/server/server_test.go | 204 ++++++++++++++++++++++++- web/src/App.tsx | 34 +++-- 5 files changed, 630 insertions(+), 64 deletions(-) diff --git a/internal/api/types.go b/internal/api/types.go index 678df2c..2aa9234 100644 --- a/internal/api/types.go +++ b/internal/api/types.go @@ -6,6 +6,7 @@ type BackupRequest struct { Tags []string `json:"tags,omitempty"` Snapshot bool `json:"snapshot"` DryRun bool `json:"dry_run"` + Dedupe *bool `json:"dedupe,omitempty"` } type BackupResponse struct { @@ -17,6 +18,7 @@ type BackupResponse struct { Secret string `json:"secret,omitempty"` RequestedBy string `json:"requested_by,omitempty"` DryRun bool `json:"dry_run"` + Dedupe bool `json:"dedupe"` } type RestoreTestRequest struct { @@ -24,6 +26,7 @@ type RestoreTestRequest struct { PVC string `json:"pvc,omitempty"` Snapshot string `json:"snapshot,omitempty"` BackupURL string `json:"backup_url,omitempty"` + Repository string `json:"repository,omitempty"` TargetNamespace string `json:"target_namespace,omitempty"` TargetPVC string `json:"target_pvc,omitempty"` DryRun bool `json:"dry_run"` @@ -108,6 +111,7 @@ type BackupPolicy struct { PVC string `json:"pvc,omitempty"` IntervalHours float64 `json:"interval_hours"` Enabled bool `json:"enabled"` + Dedupe bool `json:"dedupe"` CreatedAt string `json:"created_at,omitempty"` UpdatedAt string `json:"updated_at,omitempty"` } @@ -117,6 +121,7 @@ type BackupPolicyUpsertRequest struct { PVC string `json:"pvc,omitempty"` IntervalHours float64 `json:"interval_hours"` Enabled *bool `json:"enabled,omitempty"` + Dedupe *bool `json:"dedupe,omitempty"` } type BackupPolicyListResponse struct { @@ -126,6 +131,7 @@ type BackupPolicyListResponse struct { type NamespaceBackupRequest struct { Namespace string `json:"namespace"` DryRun bool `json:"dry_run"` + Dedupe *bool `json:"dedupe,omitempty"` } type NamespaceBackupResult struct { @@ -142,6 +148,7 @@ type NamespaceBackupResponse struct { RequestedBy string `json:"requested_by,omitempty"` Driver string `json:"driver"` DryRun bool `json:"dry_run"` + Dedupe bool `json:"dedupe"` Total int `json:"total"` Succeeded int `json:"succeeded"` Failed int `json:"failed"` diff --git a/internal/k8s/jobs.go b/internal/k8s/jobs.go index 5132b8e..e03925d 100644 --- a/internal/k8s/jobs.go +++ b/internal/k8s/jobs.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sort" + "strconv" "strings" "time" @@ -17,16 +18,20 @@ import ( ) const ( - labelAppName = "app.kubernetes.io/name" - labelComponent = "app.kubernetes.io/component" - labelAction = "soteria.bstein.dev/action" - labelPVC = "soteria.bstein.dev/pvc" + labelAppName = "app.kubernetes.io/name" + labelComponent = "app.kubernetes.io/component" + labelAction = "soteria.bstein.dev/action" + labelPVC = "soteria.bstein.dev/pvc" + annotationResticRepository = "soteria.bstein.dev/restic-repository" + annotationDedupeEnabled = "soteria.bstein.dev/dedupe-enabled" ) type BackupJobSummary struct { Name string Namespace string PVC string + Repository string + DedupeEnabled bool CreatedAt time.Time CompletionTime time.Time State string @@ -52,6 +57,42 @@ func (c *Client) ListBackupJobs(ctx context.Context, namespace string) ([]Backup return out, nil } +func (c *Client) ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) { + selector := fmt.Sprintf("job-name=%s", jobName) + pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) + if err != nil { + return "", fmt.Errorf("list pods for backup job %s/%s: %w", namespace, jobName, err) + } + if len(pods.Items) == 0 { + return "", fmt.Errorf("no pod found for backup job %s/%s", namespace, jobName) + } + + sort.Slice(pods.Items, func(i, j int) bool { + left := pods.Items[i].Status.StartTime + right := pods.Items[j].Status.StartTime + switch { + case left != nil && right != nil: + if left.Time.Equal(right.Time) { + return pods.Items[i].Name > pods.Items[j].Name + } + return left.Time.After(right.Time) + case left != nil: + return true + case right != nil: + return false + default: + return pods.Items[i].CreationTimestamp.Time.After(pods.Items[j].CreationTimestamp.Time) + } + }) + + podName := pods.Items[0].Name + raw, err := c.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx) + if err != nil { + return "", fmt.Errorf("read logs for backup job pod %s/%s: %w", namespace, podName, err) + } + return string(raw), nil +} + func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) { selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup,%s=%s", labelAppName, labelComponent, labelAction, labelPVC, pvc) jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector}) @@ -71,12 +112,17 @@ func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string func summarizeBackupJob(job batchv1.Job, pvc string) BackupJobSummary { summary := BackupJobSummary{ - Name: job.Name, - Namespace: job.Namespace, - PVC: pvc, - CreatedAt: job.CreationTimestamp.Time, - State: "Pending", + Name: job.Name, + Namespace: job.Namespace, + PVC: pvc, + DedupeEnabled: true, + CreatedAt: job.CreationTimestamp.Time, + State: "Pending", } + if raw := strings.TrimSpace(job.Annotations[annotationResticRepository]); raw != "" { + summary.Repository = raw + } + summary.DedupeEnabled = parseBoolWithDefault(job.Annotations[annotationDedupeEnabled], true) if job.Status.CompletionTime != nil { summary.CompletionTime = job.Status.CompletionTime.Time } @@ -121,6 +167,8 @@ func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req ap jobName := jobName("backup", req.PVC) secretName := fmt.Sprintf("soteria-%s-restic", jobName) + dedupeEnabled := dedupeEnabled(req.Dedupe) + repository := resticRepositoryForBackup(cfg.ResticRepository, req.Namespace, req.PVC, dedupeEnabled) if req.DryRun { return jobName, secretName, nil @@ -136,7 +184,7 @@ func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req ap return "", "", err } - job := buildBackupJob(cfg, req, jobName, secretName) + job := buildBackupJob(cfg, req, jobName, secretName, repository, dedupeEnabled) if nodeName, err := c.resolvePVCMountedNode(ctx, req.Namespace, req.PVC); err == nil && nodeName != "" { job.Spec.Template.Spec.NodeName = nodeName } @@ -192,6 +240,10 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req a jobName := jobName("restore", snapshot) secretName := fmt.Sprintf("soteria-%s-restic", jobName) + repository := strings.TrimSpace(req.Repository) + if repository == "" { + repository = cfg.ResticRepository + } if req.DryRun { return jobName, secretName, nil @@ -206,7 +258,7 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req a return "", "", err } - job := buildRestoreJob(cfg, req, jobName, secretName, snapshot) + job := buildRestoreJob(cfg, req, jobName, secretName, snapshot, repository) created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { _ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) @@ -220,13 +272,17 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req a return jobName, secretName, nil } -func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName string) *batchv1.Job { +func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName, repository string, dedupeEnabled bool) *batchv1.Job { labels := map[string]string{ labelAppName: "soteria", labelComponent: "backup", labelAction: "backup", labelPVC: req.PVC, } + annotations := map[string]string{ + annotationResticRepository: repository, + annotationDedupeEnabled: strconv.FormatBool(dedupeEnabled), + } command := backupCommand(cfg, req) @@ -239,7 +295,7 @@ func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretNa ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{"/bin/sh", "-c"}, Args: []string{command}, - Env: resticEnv(cfg, secretName), + Env: resticEnv(cfg, secretName, repository), VolumeMounts: []corev1.VolumeMount{ {Name: "data", MountPath: "/data", ReadOnly: true}, {Name: "cache", MountPath: "/cache"}, @@ -274,27 +330,31 @@ func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretNa return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: jobName, - Namespace: req.Namespace, - Labels: labels, + Name: jobName, + Namespace: req.Namespace, + Labels: labels, + Annotations: annotations, }, Spec: batchv1.JobSpec{ BackoffLimit: int32Ptr(0), TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds), Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations}, Spec: pod, }, }, } } -func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot string) *batchv1.Job { +func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job { labels := map[string]string{ labelAppName: "soteria", labelComponent: "restore", labelAction: "restore", } + annotations := map[string]string{ + annotationResticRepository: repository, + } command := restoreCommand(snapshot) @@ -307,7 +367,7 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, se ImagePullPolicy: corev1.PullIfNotPresent, Command: []string{"/bin/sh", "-c"}, Args: []string{command}, - Env: resticEnv(cfg, secretName), + Env: resticEnv(cfg, secretName, repository), VolumeMounts: []corev1.VolumeMount{ {Name: "restore", MountPath: "/restore"}, {Name: "cache", MountPath: "/cache"}, @@ -350,15 +410,16 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, se return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: jobName, - Namespace: req.Namespace, - Labels: labels, + Name: jobName, + Namespace: req.Namespace, + Labels: labels, + Annotations: annotations, }, Spec: batchv1.JobSpec{ BackoffLimit: int32Ptr(0), TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds), Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{Labels: labels}, + ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations}, Spec: pod, }, }, @@ -366,7 +427,16 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, se } func backupCommand(cfg *config.Config, req api.BackupRequest) string { - args := []string{"restic", "backup", "/data", "--tag", "soteria", "--tag", fmt.Sprintf("pvc=%s", req.PVC)} + mode := "on" + if !dedupeEnabled(req.Dedupe) { + mode = "off" + } + args := []string{ + "restic", "backup", "/data", + "--tag", "soteria", + "--tag", fmt.Sprintf("pvc=%s", req.PVC), + "--tag", fmt.Sprintf("dedupe=%s", mode), + } for _, tag := range req.Tags { tag = strings.TrimSpace(tag) if tag == "" { @@ -393,9 +463,12 @@ func restoreCommand(snapshot string) string { ) } -func resticEnv(cfg *config.Config, secretName string) []corev1.EnvVar { +func resticEnv(cfg *config.Config, secretName, repository string) []corev1.EnvVar { + if strings.TrimSpace(repository) == "" { + repository = cfg.ResticRepository + } env := []corev1.EnvVar{ - {Name: "RESTIC_REPOSITORY", Value: cfg.ResticRepository}, + {Name: "RESTIC_REPOSITORY", Value: repository}, {Name: "RESTIC_CACHE_DIR", Value: "/cache"}, { Name: "AWS_ACCESS_KEY_ID", @@ -493,6 +566,66 @@ func sanitizeName(value string) string { return value } +func dedupeEnabled(raw *bool) bool { + if raw == nil { + return true + } + return *raw +} + +func resticRepositoryForBackup(base, namespace, pvc string, dedupe bool) string { + if dedupe { + return strings.TrimSpace(base) + } + ns := sanitizeRepositorySegment(namespace) + pvcName := sanitizeRepositorySegment(pvc) + suffix := strings.Trim(strings.Join([]string{"isolated", ns, pvcName}, "/"), "/") + return appendRepositoryPath(base, suffix) +} + +func sanitizeRepositorySegment(value string) string { + sanitized := sanitizeName(value) + if sanitized == "" { + return "unknown" + } + return sanitized +} + +func appendRepositoryPath(base, suffix string) string { + base = strings.TrimSpace(base) + suffix = strings.Trim(suffix, "/") + if base == "" || suffix == "" { + return base + } + + backendPrefix := "" + location := base + if idx := strings.Index(base, ":"); idx > 0 { + backendPrefix = base[:idx+1] + location = base[idx+1:] + } + location = strings.TrimRight(location, "/") + if location == "" { + return base + } + return backendPrefix + location + "/" + suffix +} + +func parseBoolWithDefault(raw string, fallback bool) bool { + value := strings.ToLower(strings.TrimSpace(raw)) + if value == "" { + return fallback + } + switch value { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return fallback + } +} + func int32Ptr(val int32) *int32 { return &val } diff --git a/internal/server/server.go b/internal/server/server.go index 0c64517..8758088 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -2,12 +2,14 @@ package server import ( "context" + "encoding/base64" "encoding/json" "fmt" "log" "math" "net/http" "net/url" + "regexp" "sort" "strconv" "strings" @@ -30,6 +32,7 @@ type kubeClient interface { CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) ListBackupJobs(ctx context.Context, namespace string) ([]k8s.BackupJobSummary, error) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error) + ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error) PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error) @@ -48,18 +51,20 @@ type longhornClient interface { } type Server struct { - cfg *config.Config - client kubeClient - longhorn longhornClient - metrics *telemetry - handler http.Handler - ui *uiRenderer - policyMu sync.RWMutex - policies map[string]api.BackupPolicy - runMu sync.Mutex - running bool - b2Mu sync.RWMutex - b2Usage api.B2UsageResponse + cfg *config.Config + client kubeClient + longhorn longhornClient + metrics *telemetry + handler http.Handler + ui *uiRenderer + policyMu sync.RWMutex + policies map[string]api.BackupPolicy + runMu sync.Mutex + running bool + b2Mu sync.RWMutex + b2Usage api.B2UsageResponse + jobUsage map[string]resticJobUsageCacheEntry + jobUsageMu sync.RWMutex } type authIdentity struct { @@ -77,6 +82,19 @@ const ( policySecretKey = "policies.json" defaultPolicyHours = 24.0 maxPolicyIntervalHrs = 24 * 365 + maxUsageSampleJobs = 20 + resticSelectorPrefix = "restic-latest:" +) + +type resticJobUsageCacheEntry struct { + Known bool + Bytes float64 + CheckedAt time.Time +} + +var ( + resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`) + resticDataAddedPattern = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`) ) func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { @@ -87,6 +105,7 @@ func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { metrics: newTelemetry(), ui: newUIRenderer(), policies: map[string]api.BackupPolicy{}, + jobUsage: map[string]resticJobUsageCacheEntry{}, } s.handler = http.HandlerFunc(s.route) return s @@ -287,7 +306,7 @@ func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { Namespace: namespace, PVC: pvcName, Volume: volumeName, - Backups: buildResticBackupRecords(jobs), + Backups: buildResticBackupRecords(jobs, s.cfg.ResticRepository), }) default: writeError(w, http.StatusBadRequest, "unsupported backup driver") @@ -430,11 +449,13 @@ func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) { } requester := currentRequester(r.Context()) + resolvedDedupe := dedupeDefault(req.Dedupe) response := api.NamespaceBackupResponse{ Namespace: req.Namespace, RequestedBy: requester, Driver: s.cfg.BackupDriver, DryRun: req.DryRun, + Dedupe: resolvedDedupe, Results: make([]api.NamespaceBackupResult, 0, len(pvcs)), } @@ -443,6 +464,7 @@ func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) { Namespace: req.Namespace, PVC: pvc.Name, DryRun: req.DryRun, + Dedupe: boolPtr(resolvedDedupe), } result, status, execErr := s.executeBackup(r.Context(), backupReq, requester) s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status) @@ -614,6 +636,8 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, reque if req.Namespace == "" || req.PVC == "" { return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required") } + resolvedDedupe := dedupeDefault(req.Dedupe) + req.Dedupe = boolPtr(resolvedDedupe) switch s.cfg.BackupDriver { case "longhorn": @@ -630,6 +654,7 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, reque Namespace: req.Namespace, RequestedBy: requester, DryRun: req.DryRun, + Dedupe: resolvedDedupe, } if req.DryRun { return response, "dry_run", nil @@ -663,6 +688,7 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, reque Secret: secretName, RequestedBy: requester, DryRun: req.DryRun, + Dedupe: resolvedDedupe, }, result, nil default: return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver") @@ -745,6 +771,14 @@ func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, } return response, "success", nil case "restic": + if repo, snapshot, ok := decodeResticSelector(req.BackupURL); ok { + if strings.TrimSpace(req.Snapshot) == "" { + req.Snapshot = snapshot + } + if strings.TrimSpace(req.Repository) == "" { + req.Repository = repo + } + } jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req) if err != nil { return api.RestoreTestResponse{}, "backend_error", err @@ -1008,6 +1042,25 @@ func (s *Server) enrichPVCInventory( } entry.ActiveBackups = active entry.CompletedBackups = len(completed) + totalStoredBytes := 0.0 + storedSamples := 0 + for index, job := range completed { + if index >= maxUsageSampleJobs { + break + } + storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name) + if !ok { + continue + } + if index == 0 { + entry.LastBackupSizeBytes = storedBytes + } + totalStoredBytes += storedBytes + storedSamples++ + } + if storedSamples > 0 { + entry.TotalBackupSizeBytes = totalStoredBytes + } if len(completed) == 0 { entry.Healthy = false switch { @@ -1058,6 +1111,98 @@ func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) { }) } +func (s *Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) { + key := namespace + "/" + jobName + + s.jobUsageMu.RLock() + cached, ok := s.jobUsage[key] + s.jobUsageMu.RUnlock() + if ok && time.Since(cached.CheckedAt) < 15*time.Minute { + return cached.Bytes, cached.Known + } + + logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName) + entry := resticJobUsageCacheEntry{ + Known: false, + Bytes: 0, + CheckedAt: time.Now().UTC(), + } + if err == nil { + if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed { + entry.Known = true + entry.Bytes = parsedBytes + } + } + + s.jobUsageMu.Lock() + if s.jobUsage == nil { + s.jobUsage = map[string]resticJobUsageCacheEntry{} + } + s.jobUsage[key] = entry + s.jobUsageMu.Unlock() + + return entry.Bytes, entry.Known +} + +func parseResticStoredBytes(logBody string) (float64, bool) { + if logBody == "" { + return 0, false + } + matches := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1) + if len(matches) > 0 { + last := matches[len(matches)-1] + if len(last) > 1 { + if value, err := strconv.ParseFloat(strings.TrimSpace(last[1]), 64); err == nil { + return value, true + } + } + } + + textMatches := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1) + if len(textMatches) == 0 { + return 0, false + } + last := textMatches[len(textMatches)-1] + if len(last) < 2 { + return 0, false + } + return parseHumanByteSize(last[1]) +} + +func parseHumanByteSize(raw string) (float64, bool) { + parts := strings.Fields(strings.TrimSpace(raw)) + if len(parts) < 2 { + return 0, false + } + value, err := strconv.ParseFloat(strings.ReplaceAll(parts[0], ",", ""), 64) + if err != nil { + return 0, false + } + unit := strings.ToUpper(strings.TrimSpace(parts[1])) + switch unit { + case "B": + return value, true + case "KIB": + return value * 1024, true + case "MIB": + return value * 1024 * 1024, true + case "GIB": + return value * 1024 * 1024 * 1024, true + case "TIB": + return value * 1024 * 1024 * 1024 * 1024, true + case "KB": + return value * 1000, true + case "MB": + return value * 1000 * 1000, true + case "GB": + return value * 1000 * 1000 * 1000, true + case "TB": + return value * 1000 * 1000 * 1000 * 1000, true + default: + return 0, false + } +} + func (s *Server) refreshTelemetry(ctx context.Context) { refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() @@ -1102,7 +1247,11 @@ func (s *Server) runPolicyCycle(ctx context.Context) { } } - effectiveIntervals := map[string]float64{} + type effectivePolicy struct { + IntervalHours float64 + Dedupe bool + } + effectivePolicies := map[string]effectivePolicy{} for _, policy := range policies { matches := []api.PVCInventory{} if policy.PVC != "" { @@ -1115,19 +1264,22 @@ func (s *Server) runPolicyCycle(ctx context.Context) { for _, pvc := range matches { key := pvc.Namespace + "/" + pvc.PVC - current, exists := effectiveIntervals[key] - if !exists || policy.IntervalHours < current { - effectiveIntervals[key] = policy.IntervalHours + current, exists := effectivePolicies[key] + if !exists || policy.IntervalHours < current.IntervalHours { + effectivePolicies[key] = effectivePolicy{ + IntervalHours: policy.IntervalHours, + Dedupe: policy.Dedupe, + } } } } - for key, intervalHours := range effectiveIntervals { + for key, effective := range effectivePolicies { pvc, ok := pvcMap[key] if !ok { continue } - if !backupDue(pvc.LastBackupAt, intervalHours) { + if !backupDue(pvc.LastBackupAt, effective.IntervalHours) { s.metrics.RecordPolicyBackup("not_due") continue } @@ -1136,6 +1288,7 @@ func (s *Server) runPolicyCycle(ctx context.Context) { Namespace: pvc.Namespace, PVC: pvc.PVC, DryRun: false, + Dedupe: boolPtr(effective.Dedupe), }, "policy-scheduler") s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) if err != nil { @@ -1173,7 +1326,16 @@ func (s *Server) loadPolicies(ctx context.Context) error { } var doc struct { - Policies []api.BackupPolicy `json:"policies"` + Policies []struct { + ID string `json:"id"` + Namespace string `json:"namespace"` + PVC string `json:"pvc,omitempty"` + IntervalHours float64 `json:"interval_hours"` + Enabled bool `json:"enabled"` + Dedupe *bool `json:"dedupe,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` + } `json:"policies"` } if err := json.Unmarshal(raw, &doc); err != nil { return fmt.Errorf("decode policy document: %w", err) @@ -1191,6 +1353,10 @@ func (s *Server) loadPolicies(ctx context.Context) error { if interval <= 0 { interval = defaultPolicyHours } + dedupe := true + if policy.Dedupe != nil { + dedupe = *policy.Dedupe + } id := policyKey(namespace, pvc) createdAt := policy.CreatedAt if createdAt == "" { @@ -1206,6 +1372,7 @@ func (s *Server) loadPolicies(ctx context.Context) error { PVC: pvc, IntervalHours: interval, Enabled: policy.Enabled, + Dedupe: dedupe, CreatedAt: createdAt, UpdatedAt: updatedAt, } @@ -1289,6 +1456,7 @@ func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertReq if req.Enabled != nil { enabled = *req.Enabled } + dedupe := dedupeDefault(req.Dedupe) id := policyKey(namespace, pvc) now := time.Now().UTC().Format(time.RFC3339) @@ -1305,6 +1473,7 @@ func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertReq PVC: pvc, IntervalHours: interval, Enabled: enabled, + Dedupe: dedupe, CreatedAt: createdAt, UpdatedAt: now, } @@ -1542,7 +1711,7 @@ func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { return records } -func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord { +func buildResticBackupRecords(jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord { records := make([]api.BackupRecord, 0, len(jobs)) latestName := "" for _, job := range jobs { @@ -1560,8 +1729,11 @@ func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord { url := "" latest := job.Name == latestName if latest && strings.EqualFold(job.State, "Completed") { - // The restore API defaults to "latest"; expose one selectable option in the UI. - url = "latest" + repository := strings.TrimSpace(job.Repository) + if repository == "" { + repository = strings.TrimSpace(defaultRepository) + } + url = encodeResticSelector(repository) } records = append(records, api.BackupRecord{ Name: job.Name, @@ -1575,6 +1747,40 @@ func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord { return records } +func encodeResticSelector(repository string) string { + repository = strings.TrimSpace(repository) + if repository == "" { + return "latest" + } + return resticSelectorPrefix + base64.RawURLEncoding.EncodeToString([]byte(repository)) +} + +func decodeResticSelector(raw string) (string, string, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return "", "", false + } + if raw == "latest" { + return "", "latest", true + } + if !strings.HasPrefix(raw, resticSelectorPrefix) { + return "", "", false + } + encoded := strings.TrimPrefix(raw, resticSelectorPrefix) + if encoded == "" { + return "", "", false + } + decoded, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + return "", "", false + } + repository := strings.TrimSpace(string(decoded)) + if repository == "" { + return "", "", false + } + return repository, "latest", true +} + func backupJobTimestamp(job k8s.BackupJobSummary) time.Time { if !job.CompletionTime.IsZero() { return job.CompletionTime @@ -1699,3 +1905,15 @@ func parseSizeBytes(raw string) int64 { } return 0 } + +func dedupeDefault(value *bool) bool { + if value == nil { + return true + } + return *value +} + +func boolPtr(value bool) *bool { + ptr := value + return &ptr +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 20f03f7..facec44 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -3,6 +3,7 @@ package server import ( "context" "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -19,21 +20,26 @@ import ( ) type fakeKubeClient struct { - pvcs []k8s.PVCSummary - backupJobs map[string][]k8s.BackupJobSummary - targetExists bool - secretData map[string][]byte + pvcs []k8s.PVCSummary + backupJobs map[string][]k8s.BackupJobSummary + jobLogs map[string]string + lastBackupReq api.BackupRequest + lastRestoreReq api.RestoreTestRequest + targetExists bool + secretData map[string][]byte } func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { return namespace + "-" + pvcName + "-pv", nil, nil, nil } -func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, _ api.BackupRequest) (string, string, error) { +func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, req api.BackupRequest) (string, string, error) { + f.lastBackupReq = req return "backup-job", "backup-secret", nil } -func (f *fakeKubeClient) CreateRestoreJob(_ context.Context, _ *config.Config, _ api.RestoreTestRequest) (string, string, error) { +func (f *fakeKubeClient) CreateRestoreJob(_ context.Context, _ *config.Config, req api.RestoreTestRequest) (string, string, error) { + f.lastRestoreReq = req return "restore-job", "restore-secret", nil } @@ -76,6 +82,14 @@ func (f *fakeKubeClient) ListBackupJobs(_ context.Context, namespace string) ([] return out, nil } +func (f *fakeKubeClient) ReadBackupJobLog(_ context.Context, namespace, jobName string) (string, error) { + if f.jobLogs == nil { + return "", nil + } + key := namespace + "/" + jobName + return f.jobLogs[key], nil +} + func (f *fakeKubeClient) PersistentVolumeClaimExists(_ context.Context, _, _ string) (bool, error) { return f.targetExists, nil } @@ -298,6 +312,41 @@ func TestBackupCreatesSnapshotBeforeBackup(t *testing.T) { } } +func TestResticBackupDefaultsDedupeEnabled(t *testing.T) { + kube := &fakeKubeClient{} + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + }, + client: kube, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + body := `{"namespace":"apps","pvc":"data","dry_run":false}` + req := httptest.NewRequest(http.MethodPost, "/v1/backup", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + if kube.lastBackupReq.Dedupe == nil || !*kube.lastBackupReq.Dedupe { + t.Fatalf("expected dedupe default true, got %#v", kube.lastBackupReq.Dedupe) + } + + var payload api.BackupResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode response: %v", err) + } + if !payload.Dedupe { + t.Fatalf("expected response dedupe=true, got %#v", payload) + } +} + func TestResticInventoryUsesCompletedBackupJobs(t *testing.T) { completedAt := time.Now().UTC().Add(-2 * time.Hour) srv := &Server{ @@ -322,9 +371,13 @@ func TestResticInventoryUsesCompletedBackupJobs(t *testing.T) { }, }, }, + jobLogs: map[string]string{ + "apps/soteria-backup-data-20260413-000000": "Added to the repository: 25.000 MiB (10.500 MiB stored)", + }, }, longhorn: &fakeLonghornClient{}, metrics: newTelemetry(), + jobUsage: map[string]resticJobUsageCacheEntry{}, } srv.handler = http.HandlerFunc(srv.route) @@ -353,6 +406,12 @@ func TestResticInventoryUsesCompletedBackupJobs(t *testing.T) { if entry.CompletedBackups != 1 || entry.BackupCount != 1 { t.Fatalf("expected one completed backup, got %#v", entry) } + if entry.LastBackupSizeBytes <= 0 { + t.Fatalf("expected restic stored bytes from job logs, got %#v", entry.LastBackupSizeBytes) + } + if entry.TotalBackupSizeBytes <= 0 { + t.Fatalf("expected restic total stored bytes from job logs, got %#v", entry.TotalBackupSizeBytes) + } } func TestResticInventoryMarksInProgressWhenOnlyActiveJobsExist(t *testing.T) { @@ -463,6 +522,66 @@ func TestResticBackupsEndpointReturnsLatestSelector(t *testing.T) { } } +func TestResticRestoreUsesRepositorySelector(t *testing.T) { + repository := "s3:https://example.invalid/atlas-soteria/isolated/apps/data" + kube := &fakeKubeClient{} + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + }, + client: kube, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + body := fmt.Sprintf( + `{"namespace":"apps","pvc":"data","backup_url":"%s","target_namespace":"apps","target_pvc":"restore-data","dry_run":false}`, + encodeResticSelector(repository), + ) + req := httptest.NewRequest(http.MethodPost, "/v1/restores", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + if kube.lastRestoreReq.Repository != repository { + t.Fatalf("expected repository selector %q, got %#v", repository, kube.lastRestoreReq.Repository) + } + if kube.lastRestoreReq.Snapshot != "latest" { + t.Fatalf("expected latest snapshot selector, got %#v", kube.lastRestoreReq.Snapshot) + } +} + +func TestParseResticStoredBytesFromTextSummary(t *testing.T) { + logBody := ` +Files: 100 new, 10 changed, 900 unmodified +Added to the repository: 120.500 MiB (35.250 MiB stored) +snapshot 12345678 saved +` + value, ok := parseResticStoredBytes(logBody) + if !ok { + t.Fatalf("expected to parse stored bytes from text summary") + } + if value <= 0 { + t.Fatalf("expected positive parsed value, got %f", value) + } +} + +func TestParseResticStoredBytesFromJSONSummary(t *testing.T) { + logBody := `{"message_type":"summary","files_new":1,"data_added":2048}` + value, ok := parseResticStoredBytes(logBody) + if !ok { + t.Fatalf("expected to parse stored bytes from json summary") + } + if value != 2048 { + t.Fatalf("expected 2048 bytes, got %f", value) + } +} + func TestMetricsStayPublic(t *testing.T) { srv := &Server{ cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin"}}, @@ -550,6 +669,9 @@ func TestPoliciesCRUD(t *testing.T) { if created.Namespace != "apps" || created.IntervalHours != 6 { t.Fatalf("unexpected created policy: %#v", created) } + if !created.Dedupe { + t.Fatalf("expected policy dedupe default true, got %#v", created) + } listReq := httptest.NewRequest(http.MethodGet, "/v1/policies", nil) listRes := httptest.NewRecorder() @@ -573,6 +695,36 @@ func TestPoliciesCRUD(t *testing.T) { } } +func TestLoadPoliciesDefaultsDedupeEnabledWhenMissing(t *testing.T) { + kube := &fakeKubeClient{ + secretData: map[string][]byte{ + policySecretKey: []byte(`{"policies":[{"namespace":"apps","pvc":"data","interval_hours":12,"enabled":true}]}`), + }, + } + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + Namespace: "maintenance", + PolicySecretName: "soteria-policies", + }, + client: kube, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, + } + if err := srv.loadPolicies(context.Background()); err != nil { + t.Fatalf("load policies: %v", err) + } + policies := srv.listPolicies() + if len(policies) != 1 { + t.Fatalf("expected one policy, got %#v", policies) + } + if !policies[0].Dedupe { + t.Fatalf("expected dedupe to default true for legacy policy, got %#v", policies[0]) + } +} + func TestNamespaceBackupDryRun(t *testing.T) { srv := &Server{ cfg: &config.Config{ @@ -610,6 +762,46 @@ func TestNamespaceBackupDryRun(t *testing.T) { } } +func TestNamespaceBackupUsesDedupeFlag(t *testing.T) { + kube := &fakeKubeClient{ + pvcs: []k8s.PVCSummary{ + {Namespace: "apps", Name: "data-a", VolumeName: "pv-apps-a", Phase: "Bound"}, + }, + } + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "restic", + }, + client: kube, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, + } + srv.handler = http.HandlerFunc(srv.route) + + body := `{"namespace":"apps","dry_run":false,"dedupe":false}` + req := httptest.NewRequest(http.MethodPost, "/v1/backup/namespace", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + + var payload api.NamespaceBackupResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode payload: %v", err) + } + if payload.Dedupe { + t.Fatalf("expected response dedupe false, got %#v", payload) + } + if kube.lastBackupReq.Dedupe == nil || *kube.lastBackupReq.Dedupe { + t.Fatalf("expected backup request dedupe false, got %#v", kube.lastBackupReq.Dedupe) + } +} + func TestNamespaceRestoreDryRun(t *testing.T) { srv := &Server{ cfg: &config.Config{ diff --git a/web/src/App.tsx b/web/src/App.tsx index f8a3291..3d7ee5a 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -67,6 +67,7 @@ interface BackupPolicy { pvc?: string; interval_hours: number; enabled: boolean; + dedupe?: boolean; created_at?: string; updated_at?: string; } @@ -268,6 +269,8 @@ function App() { const [policyPVC, setPolicyPVC] = useState(''); const [policyIntervalHours, setPolicyIntervalHours] = useState(24); const [policyEnabled, setPolicyEnabled] = useState(true); + const [policyDedupe, setPolicyDedupe] = useState(true); + const [manualDedupe, setManualDedupe] = useState(true); const [lastAction, setLastAction] = useState('No action yet.'); const [busy, setBusy] = useState(false); @@ -401,7 +404,7 @@ function App() { const payload = await fetchJSON('/v1/backup', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ namespace, pvc, dry_run: false }) + body: JSON.stringify({ namespace, pvc, dry_run: false, dedupe: manualDedupe }) }); writeAction(payload); await Promise.all([loadInventory(), loadB2Usage()]); @@ -418,7 +421,7 @@ function App() { const payload = await fetchJSON('/v1/backup/namespace', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ namespace, dry_run: false }) + body: JSON.stringify({ namespace, dry_run: false, dedupe: manualDedupe }) }); writeAction(payload); await Promise.all([loadInventory(), loadB2Usage()]); @@ -530,7 +533,8 @@ function App() { namespace: policyNamespace, pvc: policyPVC, interval_hours: policyIntervalHours, - enabled: policyEnabled + enabled: policyEnabled, + dedupe: policyDedupe }) }); writeAction(payload); @@ -591,6 +595,11 @@ function App() {

PVC Inventory

{inventory?.generated_at ? `Updated ${formatTimestamp(inventory.generated_at)}` : 'No inventory yet'} + +

This setting applies to both `Backup now` and `Backup namespace` actions.

{inventoryError &&

{inventoryError}

} {!inventory && !inventoryError &&

Loading inventory...

} {inventory?.namespaces.map((namespace) => ( @@ -613,8 +622,10 @@ function App() { const progressPct = Math.max(0, Math.min(100, Number(pvc.last_job_progress_pct || 0))); const progressClass = progressChipClass(pvc.last_job_state); const showProgress = Boolean(pvc.last_job_name) || (pvc.active_backups || 0) > 0; - const latestSizeLabel = pvc.driver === 'restic' ? 'n/a' : formatBytes(pvc.last_backup_size_bytes); - const totalStoredLabel = pvc.driver === 'restic' ? 'n/a' : formatBytes(pvc.total_backup_size_bytes); + const latestSizeLabel = formatBytes(pvc.last_backup_size_bytes); + const totalStoredLabel = formatBytes(pvc.total_backup_size_bytes); + const showResticSizeHint = pvc.driver === 'restic' + && (pvc.last_backup_size_bytes === undefined || pvc.total_backup_size_bytes === undefined); return (
@@ -633,8 +644,8 @@ function App() {

Backups: {pvc.completed_backups}/{pvc.backup_count} completed | Latest size: {latestSizeLabel} | Total stored: {totalStoredLabel}

- {pvc.driver === 'restic' && ( -

Per-PVC size is not currently emitted for restic snapshots because repository storage is deduplicated and shared.

+ {showResticSizeHint && ( +

Per-PVC upload bytes are estimated from retained restic backup job logs; older jobs outside retention may show n/a.

)} {showProgress && (
@@ -809,7 +820,7 @@ function App() {

Backup Policies

-

Policy backups create new restic snapshots, but unchanged blocks are deduplicated, so repeated runs do not re-upload identical data.

+

Policy backups create new restic snapshots. With dedupe on, unchanged blocks are reused in the shared repository. With dedupe off, Soteria isolates each PVC to its own repository path.

+
@@ -846,7 +861,7 @@ function App() { {policy.namespace}/{policy.pvc || '*'} {policy.enabled ? 'Enabled' : 'Disabled'}
-

Every {policy.interval_hours}h | Updated {formatTimestamp(policy.updated_at || policy.created_at)}

+

Every {policy.interval_hours}h | Dedupe: {policy.dedupe === false ? 'off' : 'on'} | Updated {formatTimestamp(policy.updated_at || policy.created_at)}