backup: add restic size accounting and dedupe controls
This commit is contained in:
parent 9e210ceffb
commit 9a26274242
@@ -6,6 +6,7 @@ type BackupRequest struct {
 	Tags     []string `json:"tags,omitempty"`
 	Snapshot bool     `json:"snapshot"`
 	DryRun   bool     `json:"dry_run"`
+	Dedupe   *bool    `json:"dedupe,omitempty"`
 }
 
 type BackupResponse struct {
@@ -17,6 +18,7 @@ type BackupResponse struct {
 	Secret      string `json:"secret,omitempty"`
 	RequestedBy string `json:"requested_by,omitempty"`
 	DryRun      bool   `json:"dry_run"`
+	Dedupe      bool   `json:"dedupe"`
 }
 
 type RestoreTestRequest struct {
@@ -24,6 +26,7 @@ type RestoreTestRequest struct {
 	PVC             string `json:"pvc,omitempty"`
 	Snapshot        string `json:"snapshot,omitempty"`
 	BackupURL       string `json:"backup_url,omitempty"`
+	Repository      string `json:"repository,omitempty"`
 	TargetNamespace string `json:"target_namespace,omitempty"`
 	TargetPVC       string `json:"target_pvc,omitempty"`
 	DryRun          bool   `json:"dry_run"`
@@ -108,6 +111,7 @@ type BackupPolicy struct {
 	PVC           string  `json:"pvc,omitempty"`
 	IntervalHours float64 `json:"interval_hours"`
 	Enabled       bool    `json:"enabled"`
+	Dedupe        bool    `json:"dedupe"`
 	CreatedAt     string  `json:"created_at,omitempty"`
 	UpdatedAt     string  `json:"updated_at,omitempty"`
 }
@@ -117,6 +121,7 @@ type BackupPolicyUpsertRequest struct {
 	PVC           string  `json:"pvc,omitempty"`
 	IntervalHours float64 `json:"interval_hours"`
 	Enabled       *bool   `json:"enabled,omitempty"`
+	Dedupe        *bool   `json:"dedupe,omitempty"`
 }
 
 type BackupPolicyListResponse struct {
@@ -126,6 +131,7 @@ type BackupPolicyListResponse struct {
 type NamespaceBackupRequest struct {
 	Namespace string `json:"namespace"`
 	DryRun    bool   `json:"dry_run"`
+	Dedupe    *bool  `json:"dedupe,omitempty"`
 }
 
 type NamespaceBackupResult struct {
@@ -142,6 +148,7 @@ type NamespaceBackupResponse struct {
 	RequestedBy string `json:"requested_by,omitempty"`
 	Driver      string `json:"driver"`
 	DryRun      bool   `json:"dry_run"`
+	Dedupe      bool   `json:"dedupe"`
 	Total       int    `json:"total"`
 	Succeeded   int    `json:"succeeded"`
 	Failed      int    `json:"failed"`
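Note: the new "dedupe" request field is deliberately a *bool tri-state so that clients which omit it keep the default behavior (dedupe on). A minimal standalone sketch of how the server resolves it, mirroring the dedupeDefault helper added later in this commit (the package wrapper here is illustrative only):

package main

import (
	"encoding/json"
	"fmt"
)

type backupRequest struct {
	Namespace string `json:"namespace"`
	PVC       string `json:"pvc,omitempty"`
	DryRun    bool   `json:"dry_run"`
	Dedupe    *bool  `json:"dedupe,omitempty"`
}

// dedupeDefault mirrors the server-side helper: a missing flag means dedupe on.
func dedupeDefault(value *bool) bool {
	if value == nil {
		return true
	}
	return *value
}

func main() {
	for _, body := range []string{
		`{"namespace":"apps","pvc":"data","dry_run":false}`,                // flag omitted -> true
		`{"namespace":"apps","pvc":"data","dry_run":false,"dedupe":false}`, // explicit -> false
	} {
		var req backupRequest
		if err := json.Unmarshal([]byte(body), &req); err != nil {
			panic(err)
		}
		fmt.Println("dedupe resolved to", dedupeDefault(req.Dedupe))
	}
}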
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -17,16 +18,20 @@ import (
 )
 
 const (
-	labelAppName   = "app.kubernetes.io/name"
-	labelComponent = "app.kubernetes.io/component"
-	labelAction    = "soteria.bstein.dev/action"
-	labelPVC       = "soteria.bstein.dev/pvc"
+	labelAppName               = "app.kubernetes.io/name"
+	labelComponent             = "app.kubernetes.io/component"
+	labelAction                = "soteria.bstein.dev/action"
+	labelPVC                   = "soteria.bstein.dev/pvc"
+	annotationResticRepository = "soteria.bstein.dev/restic-repository"
+	annotationDedupeEnabled    = "soteria.bstein.dev/dedupe-enabled"
 )
 
 type BackupJobSummary struct {
 	Name           string
 	Namespace      string
 	PVC            string
+	Repository     string
+	DedupeEnabled  bool
 	CreatedAt      time.Time
 	CompletionTime time.Time
 	State          string
@@ -52,6 +57,42 @@ func (c *Client) ListBackupJobs(ctx context.Context, namespace string) ([]BackupJobSummary, error) {
 	return out, nil
 }
 
+func (c *Client) ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error) {
+	selector := fmt.Sprintf("job-name=%s", jobName)
+	pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
+	if err != nil {
+		return "", fmt.Errorf("list pods for backup job %s/%s: %w", namespace, jobName, err)
+	}
+	if len(pods.Items) == 0 {
+		return "", fmt.Errorf("no pod found for backup job %s/%s", namespace, jobName)
+	}
+
+	sort.Slice(pods.Items, func(i, j int) bool {
+		left := pods.Items[i].Status.StartTime
+		right := pods.Items[j].Status.StartTime
+		switch {
+		case left != nil && right != nil:
+			if left.Time.Equal(right.Time) {
+				return pods.Items[i].Name > pods.Items[j].Name
+			}
+			return left.Time.After(right.Time)
+		case left != nil:
+			return true
+		case right != nil:
+			return false
+		default:
+			return pods.Items[i].CreationTimestamp.Time.After(pods.Items[j].CreationTimestamp.Time)
+		}
+	})
+
+	podName := pods.Items[0].Name
+	raw, err := c.Clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx)
+	if err != nil {
+		return "", fmt.Errorf("read logs for backup job pod %s/%s: %w", namespace, podName, err)
+	}
+	return string(raw), nil
+}
+
 func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) {
 	selector := fmt.Sprintf("%s=soteria,%s=backup,%s=backup,%s=%s", labelAppName, labelComponent, labelAction, labelPVC, pvc)
 	jobs, err := c.Clientset.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
@@ -71,12 +112,17 @@ func (c *Client) ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]BackupJobSummary, error) {
 
 func summarizeBackupJob(job batchv1.Job, pvc string) BackupJobSummary {
 	summary := BackupJobSummary{
-		Name:      job.Name,
-		Namespace: job.Namespace,
-		PVC:       pvc,
-		CreatedAt: job.CreationTimestamp.Time,
-		State:     "Pending",
+		Name:          job.Name,
+		Namespace:     job.Namespace,
+		PVC:           pvc,
+		DedupeEnabled: true,
+		CreatedAt:     job.CreationTimestamp.Time,
+		State:         "Pending",
 	}
+	if raw := strings.TrimSpace(job.Annotations[annotationResticRepository]); raw != "" {
+		summary.Repository = raw
+	}
+	summary.DedupeEnabled = parseBoolWithDefault(job.Annotations[annotationDedupeEnabled], true)
 	if job.Status.CompletionTime != nil {
 		summary.CompletionTime = job.Status.CompletionTime.Time
 	}
@@ -121,6 +167,8 @@ func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) {
 
 	jobName := jobName("backup", req.PVC)
 	secretName := fmt.Sprintf("soteria-%s-restic", jobName)
+	dedupeEnabled := dedupeEnabled(req.Dedupe)
+	repository := resticRepositoryForBackup(cfg.ResticRepository, req.Namespace, req.PVC, dedupeEnabled)
 
 	if req.DryRun {
 		return jobName, secretName, nil
@@ -136,7 +184,7 @@ func (c *Client) CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) {
 		return "", "", err
 	}
 
-	job := buildBackupJob(cfg, req, jobName, secretName)
+	job := buildBackupJob(cfg, req, jobName, secretName, repository, dedupeEnabled)
 	if nodeName, err := c.resolvePVCMountedNode(ctx, req.Namespace, req.PVC); err == nil && nodeName != "" {
 		job.Spec.Template.Spec.NodeName = nodeName
 	}
@@ -192,6 +240,10 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) {
 
 	jobName := jobName("restore", snapshot)
 	secretName := fmt.Sprintf("soteria-%s-restic", jobName)
+	repository := strings.TrimSpace(req.Repository)
+	if repository == "" {
+		repository = cfg.ResticRepository
+	}
 
 	if req.DryRun {
 		return jobName, secretName, nil
@@ -206,7 +258,7 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) {
 		return "", "", err
 	}
 
-	job := buildRestoreJob(cfg, req, jobName, secretName, snapshot)
+	job := buildRestoreJob(cfg, req, jobName, secretName, snapshot, repository)
 	created, err := c.Clientset.BatchV1().Jobs(req.Namespace).Create(ctx, job, metav1.CreateOptions{})
 	if err != nil {
 		_ = c.Clientset.CoreV1().Secrets(req.Namespace).Delete(ctx, secretName, metav1.DeleteOptions{})
@@ -220,13 +272,17 @@ func (c *Client) CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) {
 	return jobName, secretName, nil
 }
 
-func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName string) *batchv1.Job {
+func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName, repository string, dedupeEnabled bool) *batchv1.Job {
 	labels := map[string]string{
 		labelAppName:   "soteria",
 		labelComponent: "backup",
 		labelAction:    "backup",
 		labelPVC:       req.PVC,
 	}
+	annotations := map[string]string{
+		annotationResticRepository: repository,
+		annotationDedupeEnabled:    strconv.FormatBool(dedupeEnabled),
+	}
 
 	command := backupCommand(cfg, req)
 
@@ -239,7 +295,7 @@ func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName, repository string, dedupeEnabled bool) *batchv1.Job {
 					ImagePullPolicy: corev1.PullIfNotPresent,
 					Command:         []string{"/bin/sh", "-c"},
 					Args:            []string{command},
-					Env:             resticEnv(cfg, secretName),
+					Env:             resticEnv(cfg, secretName, repository),
 					VolumeMounts: []corev1.VolumeMount{
 						{Name: "data", MountPath: "/data", ReadOnly: true},
 						{Name: "cache", MountPath: "/cache"},
@@ -274,27 +330,31 @@ func buildBackupJob(cfg *config.Config, req api.BackupRequest, jobName, secretName, repository string, dedupeEnabled bool) *batchv1.Job {
 
 	return &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      jobName,
-			Namespace: req.Namespace,
-			Labels:    labels,
+			Name:        jobName,
+			Namespace:   req.Namespace,
+			Labels:      labels,
+			Annotations: annotations,
 		},
 		Spec: batchv1.JobSpec{
 			BackoffLimit:            int32Ptr(0),
 			TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds),
 			Template: corev1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{Labels: labels},
+				ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations},
 				Spec:       pod,
 			},
 		},
 	}
 }
 
-func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot string) *batchv1.Job {
+func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job {
 	labels := map[string]string{
 		labelAppName:   "soteria",
 		labelComponent: "restore",
 		labelAction:    "restore",
 	}
+	annotations := map[string]string{
+		annotationResticRepository: repository,
+	}
 
 	command := restoreCommand(snapshot)
 
@@ -307,7 +367,7 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job {
 					ImagePullPolicy: corev1.PullIfNotPresent,
 					Command:         []string{"/bin/sh", "-c"},
 					Args:            []string{command},
-					Env:             resticEnv(cfg, secretName),
+					Env:             resticEnv(cfg, secretName, repository),
 					VolumeMounts: []corev1.VolumeMount{
 						{Name: "restore", MountPath: "/restore"},
 						{Name: "cache", MountPath: "/cache"},
@@ -350,15 +410,16 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job {
 
 	return &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      jobName,
-			Namespace: req.Namespace,
-			Labels:    labels,
+			Name:        jobName,
+			Namespace:   req.Namespace,
+			Labels:      labels,
+			Annotations: annotations,
 		},
 		Spec: batchv1.JobSpec{
 			BackoffLimit:            int32Ptr(0),
 			TTLSecondsAfterFinished: int32Ptr(cfg.JobTTLSeconds),
 			Template: corev1.PodTemplateSpec{
-				ObjectMeta: metav1.ObjectMeta{Labels: labels},
+				ObjectMeta: metav1.ObjectMeta{Labels: labels, Annotations: annotations},
 				Spec:       pod,
 			},
 		},
@@ -366,7 +427,16 @@ func buildRestoreJob(cfg *config.Config, req api.RestoreTestRequest, jobName, secretName, snapshot, repository string) *batchv1.Job {
 }
 
 func backupCommand(cfg *config.Config, req api.BackupRequest) string {
-	args := []string{"restic", "backup", "/data", "--tag", "soteria", "--tag", fmt.Sprintf("pvc=%s", req.PVC)}
+	mode := "on"
+	if !dedupeEnabled(req.Dedupe) {
+		mode = "off"
+	}
+	args := []string{
+		"restic", "backup", "/data",
+		"--tag", "soteria",
+		"--tag", fmt.Sprintf("pvc=%s", req.PVC),
+		"--tag", fmt.Sprintf("dedupe=%s", mode),
+	}
 	for _, tag := range req.Tags {
 		tag = strings.TrimSpace(tag)
 		if tag == "" {
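For reference, a sketch of the command line the revised backupCommand assembles, assuming a PVC named "data" with dedupe switched off and no extra request tags (the PVC name is illustrative):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// mode comes from dedupeEnabled(req.Dedupe); "off" when the caller disabled dedupe.
	mode := "off"
	args := []string{
		"restic", "backup", "/data",
		"--tag", "soteria",
		"--tag", fmt.Sprintf("pvc=%s", "data"),
		"--tag", fmt.Sprintf("dedupe=%s", mode),
	}
	fmt.Println(strings.Join(args, " "))
	// restic backup /data --tag soteria --tag pvc=data --tag dedupe=off
}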
@@ -393,9 +463,12 @@ func restoreCommand(snapshot string) string {
 	)
 }
 
-func resticEnv(cfg *config.Config, secretName string) []corev1.EnvVar {
+func resticEnv(cfg *config.Config, secretName, repository string) []corev1.EnvVar {
+	if strings.TrimSpace(repository) == "" {
+		repository = cfg.ResticRepository
+	}
 	env := []corev1.EnvVar{
-		{Name: "RESTIC_REPOSITORY", Value: cfg.ResticRepository},
+		{Name: "RESTIC_REPOSITORY", Value: repository},
 		{Name: "RESTIC_CACHE_DIR", Value: "/cache"},
 		{
 			Name: "AWS_ACCESS_KEY_ID",
@@ -493,6 +566,66 @@ func sanitizeName(value string) string {
 	return value
 }
 
+func dedupeEnabled(raw *bool) bool {
+	if raw == nil {
+		return true
+	}
+	return *raw
+}
+
+func resticRepositoryForBackup(base, namespace, pvc string, dedupe bool) string {
+	if dedupe {
+		return strings.TrimSpace(base)
+	}
+	ns := sanitizeRepositorySegment(namespace)
+	pvcName := sanitizeRepositorySegment(pvc)
+	suffix := strings.Trim(strings.Join([]string{"isolated", ns, pvcName}, "/"), "/")
+	return appendRepositoryPath(base, suffix)
+}
+
+func sanitizeRepositorySegment(value string) string {
+	sanitized := sanitizeName(value)
+	if sanitized == "" {
+		return "unknown"
+	}
+	return sanitized
+}
+
+func appendRepositoryPath(base, suffix string) string {
+	base = strings.TrimSpace(base)
+	suffix = strings.Trim(suffix, "/")
+	if base == "" || suffix == "" {
+		return base
+	}
+
+	backendPrefix := ""
+	location := base
+	if idx := strings.Index(base, ":"); idx > 0 {
+		backendPrefix = base[:idx+1]
+		location = base[idx+1:]
+	}
+	location = strings.TrimRight(location, "/")
+	if location == "" {
+		return base
+	}
+	return backendPrefix + location + "/" + suffix
+}
+
+func parseBoolWithDefault(raw string, fallback bool) bool {
+	value := strings.ToLower(strings.TrimSpace(raw))
+	if value == "" {
+		return fallback
+	}
+	switch value {
+	case "1", "true", "yes", "on":
+		return true
+	case "0", "false", "no", "off":
+		return false
+	default:
+		return fallback
+	}
+}
+
 func int32Ptr(val int32) *int32 {
 	return &val
 }
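Worked example of the repository isolation above, reusing appendRepositoryPath verbatim with the base repository string that appears in the tests further down. With dedupe on the base repository is returned unchanged; with dedupe off the backup lands under an isolated/<namespace>/<pvc> sub-path while the "s3:" backend prefix stays intact:

package main

import (
	"fmt"
	"strings"
)

// appendRepositoryPath is copied from the diff above: it splits off the restic
// backend prefix at the first ":" so the suffix extends the location, not the scheme.
func appendRepositoryPath(base, suffix string) string {
	base = strings.TrimSpace(base)
	suffix = strings.Trim(suffix, "/")
	if base == "" || suffix == "" {
		return base
	}

	backendPrefix := ""
	location := base
	if idx := strings.Index(base, ":"); idx > 0 {
		backendPrefix = base[:idx+1] // e.g. "s3:"
		location = base[idx+1:]
	}
	location = strings.TrimRight(location, "/")
	if location == "" {
		return base
	}
	return backendPrefix + location + "/" + suffix
}

func main() {
	base := "s3:https://example.invalid/atlas-soteria"
	fmt.Println(appendRepositoryPath(base, "isolated/apps/data"))
	// s3:https://example.invalid/atlas-soteria/isolated/apps/data
}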
@@ -2,12 +2,14 @@ package server
 
 import (
 	"context"
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
 	"log"
 	"math"
 	"net/http"
 	"net/url"
+	"regexp"
 	"sort"
 	"strconv"
 	"strings"
@@ -30,6 +32,7 @@ type kubeClient interface {
 	CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error)
 	ListBackupJobs(ctx context.Context, namespace string) ([]k8s.BackupJobSummary, error)
 	ListBackupJobsForPVC(ctx context.Context, namespace, pvc string) ([]k8s.BackupJobSummary, error)
+	ReadBackupJobLog(ctx context.Context, namespace, jobName string) (string, error)
 	ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error)
 	PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error)
 	LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error)
@@ -48,18 +51,20 @@ type longhornClient interface {
 }
 
 type Server struct {
-	cfg      *config.Config
-	client   kubeClient
-	longhorn longhornClient
-	metrics  *telemetry
-	handler  http.Handler
-	ui       *uiRenderer
-	policyMu sync.RWMutex
-	policies map[string]api.BackupPolicy
-	runMu    sync.Mutex
-	running  bool
-	b2Mu     sync.RWMutex
-	b2Usage  api.B2UsageResponse
+	cfg        *config.Config
+	client     kubeClient
+	longhorn   longhornClient
+	metrics    *telemetry
+	handler    http.Handler
+	ui         *uiRenderer
+	policyMu   sync.RWMutex
+	policies   map[string]api.BackupPolicy
+	runMu      sync.Mutex
+	running    bool
+	b2Mu       sync.RWMutex
+	b2Usage    api.B2UsageResponse
+	jobUsage   map[string]resticJobUsageCacheEntry
+	jobUsageMu sync.RWMutex
 }
 
 type authIdentity struct {
@@ -77,6 +82,19 @@ const (
 	policySecretKey      = "policies.json"
 	defaultPolicyHours   = 24.0
 	maxPolicyIntervalHrs = 24 * 365
+	maxUsageSampleJobs   = 20
+	resticSelectorPrefix = "restic-latest:"
 )
 
+type resticJobUsageCacheEntry struct {
+	Known     bool
+	Bytes     float64
+	CheckedAt time.Time
+}
+
+var (
+	resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`)
+	resticDataAddedPattern   = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`)
+)
+
 func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server {
@@ -87,6 +105,7 @@ func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server {
 		metrics:  newTelemetry(),
 		ui:       newUIRenderer(),
 		policies: map[string]api.BackupPolicy{},
+		jobUsage: map[string]resticJobUsageCacheEntry{},
 	}
 	s.handler = http.HandlerFunc(s.route)
 	return s
@@ -287,7 +306,7 @@ func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) {
 			Namespace: namespace,
 			PVC:       pvcName,
 			Volume:    volumeName,
-			Backups:   buildResticBackupRecords(jobs),
+			Backups:   buildResticBackupRecords(jobs, s.cfg.ResticRepository),
 		})
 	default:
 		writeError(w, http.StatusBadRequest, "unsupported backup driver")
@@ -430,11 +449,13 @@ func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) {
 	}
 
 	requester := currentRequester(r.Context())
+	resolvedDedupe := dedupeDefault(req.Dedupe)
 	response := api.NamespaceBackupResponse{
 		Namespace:   req.Namespace,
 		RequestedBy: requester,
 		Driver:      s.cfg.BackupDriver,
 		DryRun:      req.DryRun,
+		Dedupe:      resolvedDedupe,
 		Results:     make([]api.NamespaceBackupResult, 0, len(pvcs)),
 	}
 
@@ -443,6 +464,7 @@ func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) {
 			Namespace: req.Namespace,
 			PVC:       pvc.Name,
 			DryRun:    req.DryRun,
+			Dedupe:    boolPtr(resolvedDedupe),
 		}
 		result, status, execErr := s.executeBackup(r.Context(), backupReq, requester)
 		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status)
@@ -614,6 +636,8 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) {
 	if req.Namespace == "" || req.PVC == "" {
 		return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required")
 	}
+	resolvedDedupe := dedupeDefault(req.Dedupe)
+	req.Dedupe = boolPtr(resolvedDedupe)
 
 	switch s.cfg.BackupDriver {
 	case "longhorn":
@@ -630,6 +654,7 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) {
 			Namespace:   req.Namespace,
 			RequestedBy: requester,
 			DryRun:      req.DryRun,
+			Dedupe:      resolvedDedupe,
 		}
 		if req.DryRun {
 			return response, "dry_run", nil
@@ -663,6 +688,7 @@ func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) {
 			Secret:      secretName,
 			RequestedBy: requester,
 			DryRun:      req.DryRun,
+			Dedupe:      resolvedDedupe,
 		}, result, nil
 	default:
 		return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
@@ -745,6 +771,14 @@ func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) {
 		}
 		return response, "success", nil
 	case "restic":
+		if repo, snapshot, ok := decodeResticSelector(req.BackupURL); ok {
+			if strings.TrimSpace(req.Snapshot) == "" {
+				req.Snapshot = snapshot
+			}
+			if strings.TrimSpace(req.Repository) == "" {
+				req.Repository = repo
+			}
+		}
 		jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req)
 		if err != nil {
 			return api.RestoreTestResponse{}, "backend_error", err
@@ -1008,6 +1042,25 @@ func (s *Server) enrichPVCInventory(
 	}
 	entry.ActiveBackups = active
 	entry.CompletedBackups = len(completed)
+	totalStoredBytes := 0.0
+	storedSamples := 0
+	for index, job := range completed {
+		if index >= maxUsageSampleJobs {
+			break
+		}
+		storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name)
+		if !ok {
+			continue
+		}
+		if index == 0 {
+			entry.LastBackupSizeBytes = storedBytes
+		}
+		totalStoredBytes += storedBytes
+		storedSamples++
+	}
+	if storedSamples > 0 {
+		entry.TotalBackupSizeBytes = totalStoredBytes
+	}
 	if len(completed) == 0 {
 		entry.Healthy = false
 		switch {
@@ -1058,6 +1111,98 @@ func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) {
 	})
 }
 
+func (s *Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) {
+	key := namespace + "/" + jobName
+
+	s.jobUsageMu.RLock()
+	cached, ok := s.jobUsage[key]
+	s.jobUsageMu.RUnlock()
+	if ok && time.Since(cached.CheckedAt) < 15*time.Minute {
+		return cached.Bytes, cached.Known
+	}
+
+	logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName)
+	entry := resticJobUsageCacheEntry{
+		Known:     false,
+		Bytes:     0,
+		CheckedAt: time.Now().UTC(),
+	}
+	if err == nil {
+		if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed {
+			entry.Known = true
+			entry.Bytes = parsedBytes
+		}
+	}
+
+	s.jobUsageMu.Lock()
+	if s.jobUsage == nil {
+		s.jobUsage = map[string]resticJobUsageCacheEntry{}
+	}
+	s.jobUsage[key] = entry
+	s.jobUsageMu.Unlock()
+
+	return entry.Bytes, entry.Known
+}
+
+func parseResticStoredBytes(logBody string) (float64, bool) {
+	if logBody == "" {
+		return 0, false
+	}
+	matches := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1)
+	if len(matches) > 0 {
+		last := matches[len(matches)-1]
+		if len(last) > 1 {
+			if value, err := strconv.ParseFloat(strings.TrimSpace(last[1]), 64); err == nil {
+				return value, true
+			}
+		}
+	}
+
+	textMatches := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1)
+	if len(textMatches) == 0 {
+		return 0, false
+	}
+	last := textMatches[len(textMatches)-1]
+	if len(last) < 2 {
+		return 0, false
+	}
+	return parseHumanByteSize(last[1])
+}
+
+func parseHumanByteSize(raw string) (float64, bool) {
+	parts := strings.Fields(strings.TrimSpace(raw))
+	if len(parts) < 2 {
+		return 0, false
+	}
+	value, err := strconv.ParseFloat(strings.ReplaceAll(parts[0], ",", ""), 64)
+	if err != nil {
+		return 0, false
+	}
+	unit := strings.ToUpper(strings.TrimSpace(parts[1]))
+	switch unit {
+	case "B":
+		return value, true
+	case "KIB":
+		return value * 1024, true
+	case "MIB":
+		return value * 1024 * 1024, true
+	case "GIB":
+		return value * 1024 * 1024 * 1024, true
+	case "TIB":
+		return value * 1024 * 1024 * 1024 * 1024, true
+	case "KB":
+		return value * 1000, true
+	case "MB":
+		return value * 1000 * 1000, true
+	case "GB":
+		return value * 1000 * 1000 * 1000, true
+	case "TB":
+		return value * 1000 * 1000 * 1000 * 1000, true
+	default:
+		return 0, false
+	}
+}
+
 func (s *Server) refreshTelemetry(ctx context.Context) {
 	refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
 	defer cancel()
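A quick sketch of the log-scrape path above, run against the exact restic summary line used in the tests below: the capture group yields "10.500 MiB", and the binary-unit conversion works out to 10.5 * 1024 * 1024 = 11010048 bytes.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as resticAddedStoredPattern above.
	pattern := regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`)
	logLine := "Added to the repository: 25.000 MiB (10.500 MiB stored)"
	m := pattern.FindStringSubmatch(logLine)
	fmt.Printf("captured %q\n", m[1]) // "10.500 MiB"
	fmt.Println(10.5 * 1024 * 1024)   // 1.1010048e+07 bytes
}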
@@ -1102,7 +1247,11 @@ func (s *Server) runPolicyCycle(ctx context.Context) {
 		}
 	}
 
-	effectiveIntervals := map[string]float64{}
+	type effectivePolicy struct {
+		IntervalHours float64
+		Dedupe        bool
+	}
+	effectivePolicies := map[string]effectivePolicy{}
 	for _, policy := range policies {
 		matches := []api.PVCInventory{}
 		if policy.PVC != "" {
@@ -1115,19 +1264,22 @@ func (s *Server) runPolicyCycle(ctx context.Context) {
 
 		for _, pvc := range matches {
 			key := pvc.Namespace + "/" + pvc.PVC
-			current, exists := effectiveIntervals[key]
-			if !exists || policy.IntervalHours < current {
-				effectiveIntervals[key] = policy.IntervalHours
+			current, exists := effectivePolicies[key]
+			if !exists || policy.IntervalHours < current.IntervalHours {
+				effectivePolicies[key] = effectivePolicy{
+					IntervalHours: policy.IntervalHours,
+					Dedupe:        policy.Dedupe,
+				}
 			}
 		}
 	}
 
-	for key, intervalHours := range effectiveIntervals {
+	for key, effective := range effectivePolicies {
 		pvc, ok := pvcMap[key]
 		if !ok {
 			continue
 		}
-		if !backupDue(pvc.LastBackupAt, intervalHours) {
+		if !backupDue(pvc.LastBackupAt, effective.IntervalHours) {
 			s.metrics.RecordPolicyBackup("not_due")
 			continue
 		}
@@ -1136,6 +1288,7 @@ func (s *Server) runPolicyCycle(ctx context.Context) {
 			Namespace: pvc.Namespace,
 			PVC:       pvc.PVC,
 			DryRun:    false,
+			Dedupe:    boolPtr(effective.Dedupe),
 		}, "policy-scheduler")
 		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
 		if err != nil {
@@ -1173,7 +1326,16 @@ func (s *Server) loadPolicies(ctx context.Context) error {
 	}
 
 	var doc struct {
-		Policies []api.BackupPolicy `json:"policies"`
+		Policies []struct {
+			ID            string  `json:"id"`
+			Namespace     string  `json:"namespace"`
+			PVC           string  `json:"pvc,omitempty"`
+			IntervalHours float64 `json:"interval_hours"`
+			Enabled       bool    `json:"enabled"`
+			Dedupe        *bool   `json:"dedupe,omitempty"`
+			CreatedAt     string  `json:"created_at,omitempty"`
+			UpdatedAt     string  `json:"updated_at,omitempty"`
+		} `json:"policies"`
 	}
 	if err := json.Unmarshal(raw, &doc); err != nil {
 		return fmt.Errorf("decode policy document: %w", err)
@@ -1191,6 +1353,10 @@ func (s *Server) loadPolicies(ctx context.Context) error {
 		if interval <= 0 {
 			interval = defaultPolicyHours
 		}
+		dedupe := true
+		if policy.Dedupe != nil {
+			dedupe = *policy.Dedupe
+		}
 		id := policyKey(namespace, pvc)
 		createdAt := policy.CreatedAt
 		if createdAt == "" {
@@ -1206,6 +1372,7 @@ func (s *Server) loadPolicies(ctx context.Context) error {
 			PVC:           pvc,
 			IntervalHours: interval,
 			Enabled:       policy.Enabled,
+			Dedupe:        dedupe,
 			CreatedAt:     createdAt,
 			UpdatedAt:     updatedAt,
 		}
@@ -1289,6 +1456,7 @@ func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
 	if req.Enabled != nil {
 		enabled = *req.Enabled
 	}
+	dedupe := dedupeDefault(req.Dedupe)
 
 	id := policyKey(namespace, pvc)
 	now := time.Now().UTC().Format(time.RFC3339)
@@ -1305,6 +1473,7 @@ func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
 		PVC:           pvc,
 		IntervalHours: interval,
 		Enabled:       enabled,
+		Dedupe:        dedupe,
 		CreatedAt:     createdAt,
 		UpdatedAt:     now,
 	}
@@ -1542,7 +1711,7 @@ func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord {
 	return records
 }
 
-func buildResticBackupRecords(jobs []k8s.BackupJobSummary) []api.BackupRecord {
+func buildResticBackupRecords(jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord {
 	records := make([]api.BackupRecord, 0, len(jobs))
 	latestName := ""
 	for _, job := range jobs {
@@ -1560,8 +1729,11 @@ func buildResticBackupRecords(jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord {
 		url := ""
 		latest := job.Name == latestName
 		if latest && strings.EqualFold(job.State, "Completed") {
-			// The restore API defaults to "latest"; expose one selectable option in the UI.
-			url = "latest"
+			repository := strings.TrimSpace(job.Repository)
+			if repository == "" {
+				repository = strings.TrimSpace(defaultRepository)
+			}
+			url = encodeResticSelector(repository)
 		}
 		records = append(records, api.BackupRecord{
 			Name: job.Name,
@@ -1575,6 +1747,40 @@ func buildResticBackupRecords(jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord {
 	return records
 }
 
+func encodeResticSelector(repository string) string {
+	repository = strings.TrimSpace(repository)
+	if repository == "" {
+		return "latest"
+	}
+	return resticSelectorPrefix + base64.RawURLEncoding.EncodeToString([]byte(repository))
+}
+
+func decodeResticSelector(raw string) (string, string, bool) {
+	raw = strings.TrimSpace(raw)
+	if raw == "" {
+		return "", "", false
+	}
+	if raw == "latest" {
+		return "", "latest", true
+	}
+	if !strings.HasPrefix(raw, resticSelectorPrefix) {
+		return "", "", false
+	}
+	encoded := strings.TrimPrefix(raw, resticSelectorPrefix)
+	if encoded == "" {
+		return "", "", false
+	}
+	decoded, err := base64.RawURLEncoding.DecodeString(encoded)
+	if err != nil {
+		return "", "", false
+	}
+	repository := strings.TrimSpace(string(decoded))
+	if repository == "" {
+		return "", "", false
+	}
+	return repository, "latest", true
+}
+
 func backupJobTimestamp(job k8s.BackupJobSummary) time.Time {
 	if !job.CompletionTime.IsZero() {
 		return job.CompletionTime
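Round-trip sketch of the selector scheme above: the backups endpoint hands the UI "restic-latest:" plus the base64url-encoded repository, and the restore path decodes it back to (repository, "latest"). The repository string is the one used in the tests:

package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

const resticSelectorPrefix = "restic-latest:"

func main() {
	repository := "s3:https://example.invalid/atlas-soteria/isolated/apps/data"

	// Encode the way buildResticBackupRecords does for the latest completed job.
	selector := resticSelectorPrefix + base64.RawURLEncoding.EncodeToString([]byte(repository))
	fmt.Println(selector)

	// Decode the way executeRestore does before creating the restore job.
	encoded := strings.TrimPrefix(selector, resticSelectorPrefix)
	decoded, err := base64.RawURLEncoding.DecodeString(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(decoded) == repository, "snapshot: latest")
}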
@@ -1699,3 +1905,15 @@ func parseSizeBytes(raw string) int64 {
 	}
 	return 0
 }
+
+func dedupeDefault(value *bool) bool {
+	if value == nil {
+		return true
+	}
+	return *value
+}
+
+func boolPtr(value bool) *bool {
+	ptr := value
+	return &ptr
+}
@@ -3,6 +3,7 @@ package server
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -19,21 +20,26 @@
 )
 
 type fakeKubeClient struct {
-	pvcs         []k8s.PVCSummary
-	backupJobs   map[string][]k8s.BackupJobSummary
-	targetExists bool
-	secretData   map[string][]byte
+	pvcs           []k8s.PVCSummary
+	backupJobs     map[string][]k8s.BackupJobSummary
+	jobLogs        map[string]string
+	lastBackupReq  api.BackupRequest
+	lastRestoreReq api.RestoreTestRequest
+	targetExists   bool
+	secretData     map[string][]byte
 }
 
 func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) {
 	return namespace + "-" + pvcName + "-pv", nil, nil, nil
 }
 
-func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, _ api.BackupRequest) (string, string, error) {
+func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, req api.BackupRequest) (string, string, error) {
+	f.lastBackupReq = req
 	return "backup-job", "backup-secret", nil
 }
 
-func (f *fakeKubeClient) CreateRestoreJob(_ context.Context, _ *config.Config, _ api.RestoreTestRequest) (string, string, error) {
+func (f *fakeKubeClient) CreateRestoreJob(_ context.Context, _ *config.Config, req api.RestoreTestRequest) (string, string, error) {
+	f.lastRestoreReq = req
 	return "restore-job", "restore-secret", nil
 }
 
@@ -76,6 +82,14 @@ func (f *fakeKubeClient) ListBackupJobs(_ context.Context, namespace string) ([]k8s.BackupJobSummary, error) {
 	return out, nil
 }
 
+func (f *fakeKubeClient) ReadBackupJobLog(_ context.Context, namespace, jobName string) (string, error) {
+	if f.jobLogs == nil {
+		return "", nil
+	}
+	key := namespace + "/" + jobName
+	return f.jobLogs[key], nil
+}
+
 func (f *fakeKubeClient) PersistentVolumeClaimExists(_ context.Context, _, _ string) (bool, error) {
 	return f.targetExists, nil
 }
@@ -298,6 +312,41 @@ func TestBackupCreatesSnapshotBeforeBackup(t *testing.T) {
 	}
 }
 
+func TestResticBackupDefaultsDedupeEnabled(t *testing.T) {
+	kube := &fakeKubeClient{}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+	}
+	srv.handler = http.HandlerFunc(srv.route)
+
+	body := `{"namespace":"apps","pvc":"data","dry_run":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/backup", strings.NewReader(body))
+	req.Header.Set("Content-Type", "application/json")
+	res := httptest.NewRecorder()
+	srv.Handler().ServeHTTP(res, req)
+
+	if res.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String())
+	}
+	if kube.lastBackupReq.Dedupe == nil || !*kube.lastBackupReq.Dedupe {
+		t.Fatalf("expected dedupe default true, got %#v", kube.lastBackupReq.Dedupe)
+	}
+
+	var payload api.BackupResponse
+	if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil {
+		t.Fatalf("decode response: %v", err)
+	}
+	if !payload.Dedupe {
+		t.Fatalf("expected response dedupe=true, got %#v", payload)
+	}
+}
+
 func TestResticInventoryUsesCompletedBackupJobs(t *testing.T) {
 	completedAt := time.Now().UTC().Add(-2 * time.Hour)
 	srv := &Server{
@@ -322,9 +371,13 @@
 					},
 				},
 			},
+			jobLogs: map[string]string{
+				"apps/soteria-backup-data-20260413-000000": "Added to the repository: 25.000 MiB (10.500 MiB stored)",
+			},
 		},
 		longhorn: &fakeLonghornClient{},
 		metrics:  newTelemetry(),
+		jobUsage: map[string]resticJobUsageCacheEntry{},
 	}
 	srv.handler = http.HandlerFunc(srv.route)
 
@@ -353,6 +406,12 @@
 	if entry.CompletedBackups != 1 || entry.BackupCount != 1 {
 		t.Fatalf("expected one completed backup, got %#v", entry)
 	}
+	if entry.LastBackupSizeBytes <= 0 {
+		t.Fatalf("expected restic stored bytes from job logs, got %#v", entry.LastBackupSizeBytes)
+	}
+	if entry.TotalBackupSizeBytes <= 0 {
+		t.Fatalf("expected restic total stored bytes from job logs, got %#v", entry.TotalBackupSizeBytes)
+	}
 }
 
 func TestResticInventoryMarksInProgressWhenOnlyActiveJobsExist(t *testing.T) {
@@ -463,6 +522,66 @@ func TestResticBackupsEndpointReturnsLatestSelector(t *testing.T) {
 	}
 }
 
+func TestResticRestoreUsesRepositorySelector(t *testing.T) {
+	repository := "s3:https://example.invalid/atlas-soteria/isolated/apps/data"
+	kube := &fakeKubeClient{}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+	}
+	srv.handler = http.HandlerFunc(srv.route)
+
+	body := fmt.Sprintf(
+		`{"namespace":"apps","pvc":"data","backup_url":"%s","target_namespace":"apps","target_pvc":"restore-data","dry_run":false}`,
+		encodeResticSelector(repository),
+	)
+	req := httptest.NewRequest(http.MethodPost, "/v1/restores", strings.NewReader(body))
+	req.Header.Set("Content-Type", "application/json")
+	res := httptest.NewRecorder()
+	srv.Handler().ServeHTTP(res, req)
+
+	if res.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String())
+	}
+	if kube.lastRestoreReq.Repository != repository {
+		t.Fatalf("expected repository selector %q, got %#v", repository, kube.lastRestoreReq.Repository)
+	}
+	if kube.lastRestoreReq.Snapshot != "latest" {
+		t.Fatalf("expected latest snapshot selector, got %#v", kube.lastRestoreReq.Snapshot)
+	}
+}
+
+func TestParseResticStoredBytesFromTextSummary(t *testing.T) {
+	logBody := `
+Files: 100 new, 10 changed, 900 unmodified
+Added to the repository: 120.500 MiB (35.250 MiB stored)
+snapshot 12345678 saved
+`
+	value, ok := parseResticStoredBytes(logBody)
+	if !ok {
+		t.Fatalf("expected to parse stored bytes from text summary")
+	}
+	if value <= 0 {
+		t.Fatalf("expected positive parsed value, got %f", value)
+	}
+}
+
+func TestParseResticStoredBytesFromJSONSummary(t *testing.T) {
+	logBody := `{"message_type":"summary","files_new":1,"data_added":2048}`
+	value, ok := parseResticStoredBytes(logBody)
+	if !ok {
+		t.Fatalf("expected to parse stored bytes from json summary")
+	}
+	if value != 2048 {
+		t.Fatalf("expected 2048 bytes, got %f", value)
+	}
+}
+
 func TestMetricsStayPublic(t *testing.T) {
 	srv := &Server{
 		cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin"}},
@@ -550,6 +669,9 @@ func TestPoliciesCRUD(t *testing.T) {
 	if created.Namespace != "apps" || created.IntervalHours != 6 {
 		t.Fatalf("unexpected created policy: %#v", created)
 	}
+	if !created.Dedupe {
+		t.Fatalf("expected policy dedupe default true, got %#v", created)
+	}
 
 	listReq := httptest.NewRequest(http.MethodGet, "/v1/policies", nil)
 	listRes := httptest.NewRecorder()
@@ -573,6 +695,36 @@
 	}
 }
 
+func TestLoadPoliciesDefaultsDedupeEnabledWhenMissing(t *testing.T) {
+	kube := &fakeKubeClient{
+		secretData: map[string][]byte{
+			policySecretKey: []byte(`{"policies":[{"namespace":"apps","pvc":"data","interval_hours":12,"enabled":true}]}`),
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired:     false,
+			BackupDriver:     "restic",
+			Namespace:        "maintenance",
+			PolicySecretName: "soteria-policies",
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{},
+	}
+	if err := srv.loadPolicies(context.Background()); err != nil {
+		t.Fatalf("load policies: %v", err)
+	}
+	policies := srv.listPolicies()
+	if len(policies) != 1 {
+		t.Fatalf("expected one policy, got %#v", policies)
+	}
+	if !policies[0].Dedupe {
+		t.Fatalf("expected dedupe to default true for legacy policy, got %#v", policies[0])
+	}
+}
+
 func TestNamespaceBackupDryRun(t *testing.T) {
 	srv := &Server{
 		cfg: &config.Config{
@@ -610,6 +762,46 @@ func TestNamespaceBackupDryRun(t *testing.T) {
 	}
 }
 
+func TestNamespaceBackupUsesDedupeFlag(t *testing.T) {
+	kube := &fakeKubeClient{
+		pvcs: []k8s.PVCSummary{
+			{Namespace: "apps", Name: "data-a", VolumeName: "pv-apps-a", Phase: "Bound"},
+		},
+	}
+	srv := &Server{
+		cfg: &config.Config{
+			AuthRequired: false,
+			BackupDriver: "restic",
+		},
+		client:   kube,
+		longhorn: &fakeLonghornClient{},
+		metrics:  newTelemetry(),
+		policies: map[string]api.BackupPolicy{},
+	}
+	srv.handler = http.HandlerFunc(srv.route)
+
+	body := `{"namespace":"apps","dry_run":false,"dedupe":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/backup/namespace", strings.NewReader(body))
+	req.Header.Set("Content-Type", "application/json")
+	res := httptest.NewRecorder()
+	srv.Handler().ServeHTTP(res, req)
+
+	if res.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String())
+	}
+
+	var payload api.NamespaceBackupResponse
+	if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil {
+		t.Fatalf("decode payload: %v", err)
+	}
+	if payload.Dedupe {
+		t.Fatalf("expected response dedupe false, got %#v", payload)
+	}
+	if kube.lastBackupReq.Dedupe == nil || *kube.lastBackupReq.Dedupe {
+		t.Fatalf("expected backup request dedupe false, got %#v", kube.lastBackupReq.Dedupe)
+	}
+}
+
 func TestNamespaceRestoreDryRun(t *testing.T) {
 	srv := &Server{
 		cfg: &config.Config{
@@ -67,6 +67,7 @@ interface BackupPolicy {
   pvc?: string;
   interval_hours: number;
   enabled: boolean;
+  dedupe?: boolean;
   created_at?: string;
   updated_at?: string;
 }
@@ -268,6 +269,8 @@ function App() {
   const [policyPVC, setPolicyPVC] = useState<string>('');
   const [policyIntervalHours, setPolicyIntervalHours] = useState<number>(24);
   const [policyEnabled, setPolicyEnabled] = useState<boolean>(true);
+  const [policyDedupe, setPolicyDedupe] = useState<boolean>(true);
+  const [manualDedupe, setManualDedupe] = useState<boolean>(true);
 
   const [lastAction, setLastAction] = useState<string>('No action yet.');
   const [busy, setBusy] = useState<boolean>(false);
@@ -401,7 +404,7 @@ function App() {
       const payload = await fetchJSON<unknown>('/v1/backup', {
         method: 'POST',
         headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify({ namespace, pvc, dry_run: false })
+        body: JSON.stringify({ namespace, pvc, dry_run: false, dedupe: manualDedupe })
       });
       writeAction(payload);
       await Promise.all([loadInventory(), loadB2Usage()]);
@@ -418,7 +421,7 @@ function App() {
      const payload = await fetchJSON<unknown>('/v1/backup/namespace', {
         method: 'POST',
         headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify({ namespace, dry_run: false })
+        body: JSON.stringify({ namespace, dry_run: false, dedupe: manualDedupe })
       });
       writeAction(payload);
       await Promise.all([loadInventory(), loadB2Usage()]);
@@ -530,7 +533,8 @@ function App() {
           namespace: policyNamespace,
           pvc: policyPVC,
           interval_hours: policyIntervalHours,
-          enabled: policyEnabled
+          enabled: policyEnabled,
+          dedupe: policyDedupe
         })
       });
       writeAction(payload);
@@ -591,6 +595,11 @@ function App() {
           <h2>PVC Inventory</h2>
          <span className="subtle">{inventory?.generated_at ? `Updated ${formatTimestamp(inventory.generated_at)}` : 'No inventory yet'}</span>
         </div>
+        <label className="checkbox-row">
+          <input type="checkbox" checked={manualDedupe} onChange={(event) => setManualDedupe(event.target.checked)} />
+          Dedupe unchanged blocks (default)
+        </label>
+        <p className="subtle tiny">This setting applies to both `Backup now` and `Backup namespace` actions.</p>
         {inventoryError && <p className="error">{inventoryError}</p>}
         {!inventory && !inventoryError && <p className="subtle">Loading inventory...</p>}
         {inventory?.namespaces.map((namespace) => (
@@ -613,8 +622,10 @@ function App() {
               const progressPct = Math.max(0, Math.min(100, Number(pvc.last_job_progress_pct || 0)));
               const progressClass = progressChipClass(pvc.last_job_state);
               const showProgress = Boolean(pvc.last_job_name) || (pvc.active_backups || 0) > 0;
-              const latestSizeLabel = pvc.driver === 'restic' ? 'n/a' : formatBytes(pvc.last_backup_size_bytes);
-              const totalStoredLabel = pvc.driver === 'restic' ? 'n/a' : formatBytes(pvc.total_backup_size_bytes);
+              const latestSizeLabel = formatBytes(pvc.last_backup_size_bytes);
+              const totalStoredLabel = formatBytes(pvc.total_backup_size_bytes);
+              const showResticSizeHint = pvc.driver === 'restic'
+                && (pvc.last_backup_size_bytes === undefined || pvc.total_backup_size_bytes === undefined);
 
               return (
                 <article key={`${pvc.namespace}/${pvc.pvc}`} className="pvc-card">
@@ -633,8 +644,8 @@ function App() {
                   <p className="subtle tiny">
                     Backups: {pvc.completed_backups}/{pvc.backup_count} completed | Latest size: {latestSizeLabel} | Total stored: {totalStoredLabel}
                   </p>
-                  {pvc.driver === 'restic' && (
-                    <p className="subtle tiny">Per-PVC size is not currently emitted for restic snapshots because repository storage is deduplicated and shared.</p>
+                  {showResticSizeHint && (
+                    <p className="subtle tiny">Per-PVC upload bytes are estimated from retained restic backup job logs; older jobs outside retention may show n/a.</p>
                   )}
                   {showProgress && (
                     <div className="backup-progress">
@@ -809,7 +820,7 @@ function App() {
 
       <section className="panel scroll-panel">
         <h2>Backup Policies</h2>
-        <p className="subtle tiny">Policy backups create new restic snapshots, but unchanged blocks are deduplicated, so repeated runs do not re-upload identical data.</p>
+        <p className="subtle tiny">Policy backups create new restic snapshots. With dedupe on, unchanged blocks are reused in the shared repository. With dedupe off, Soteria isolates each PVC to its own repository path.</p>
         <div className="stack">
           <label>
             Namespace
@@ -834,6 +845,10 @@ function App() {
             <input type="checkbox" checked={policyEnabled} onChange={(event) => setPolicyEnabled(event.target.checked)} />
             Enabled
           </label>
+          <label className="checkbox-row">
+            <input type="checkbox" checked={policyDedupe} onChange={(event) => setPolicyDedupe(event.target.checked)} />
+            Dedupe unchanged blocks
+          </label>
           <button type="button" onClick={() => void savePolicy()} disabled={busy || !policyNamespace}>Save policy</button>
         </div>
@@ -846,7 +861,7 @@ function App() {
                 <strong>{policy.namespace}/{policy.pvc || '*'}</strong>
                 <span className={`chip ${policy.enabled ? 'good' : 'bad'}`}>{policy.enabled ? 'Enabled' : 'Disabled'}</span>
               </div>
-              <p className="subtle tiny">Every {policy.interval_hours}h | Updated {formatTimestamp(policy.updated_at || policy.created_at)}</p>
+              <p className="subtle tiny">Every {policy.interval_hours}h | Dedupe: {policy.dedupe === false ? 'off' : 'on'} | Updated {formatTimestamp(policy.updated_at || policy.created_at)}</p>
               <div className="actions">
                 <button
                   type="button"
@@ -856,6 +871,7 @@ function App() {
                     setPolicyPVC(policy.pvc || '');
                     setPolicyIntervalHours(policy.interval_hours);
                     setPolicyEnabled(policy.enabled);
+                    setPolicyDedupe(policy.dedupe !== false);
                   }}
                 >
                   Load