From a5aa9e6a5f461c0a9e00c98e951bb82a1f236505 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 12 Apr 2026 14:32:39 -0300 Subject: [PATCH] backup: add policy scheduler and namespace bulk operations --- README.md | 55 ++ internal/api/types.go | 77 +++ internal/config/config.go | 15 + internal/k8s/state.go | 73 +++ internal/server/metrics.go | 41 +- internal/server/server.go | 929 +++++++++++++++++++++++++++------ internal/server/server_test.go | 151 ++++++ internal/server/ui.html | 210 +++++++- 8 files changed, 1392 insertions(+), 159 deletions(-) create mode 100644 internal/k8s/state.go diff --git a/README.md b/README.md index 827b86b..6ef2bd2 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,9 @@ Soteria is an in-cluster service for PVC backup and restore operations. The curr - Namespace-grouped PVC inventory for backup and restore selection. - On-demand backup creation for Longhorn volumes. +- Namespace-wide backup and restore batch execution. - Restore into a new target PVC with conflict checks and best-effort cleanup on failure. +- Policy-based scheduled backups (per PVC or all PVCs in a namespace), persisted in-cluster. - A simple built-in UI suitable for publishing behind an authenticated ingress. - Prometheus-format backup freshness telemetry for Grafana rollups. @@ -25,8 +27,13 @@ Protected endpoints when `SOTERIA_AUTH_REQUIRED=true`: - `GET /v1/inventory` - `GET /v1/backups?namespace=&pvc=` - `POST /v1/backup` +- `POST /v1/backup/namespace` - `POST /v1/restores` +- `POST /v1/restores/namespace` - `POST /v1/restore-test` legacy alias for `/v1/restores` +- `GET /v1/policies` +- `POST /v1/policies` +- `DELETE /v1/policies/` ## API examples @@ -114,6 +121,48 @@ Notes: - If Longhorn volume creation succeeds but PVC creation fails, Soteria attempts to delete the just-created restore volume. - You may provide `backup_url` directly instead of `snapshot`. +### POST /v1/backup/namespace + +```json +{ + "namespace": "ai", + "dry_run": false +} +``` + +Runs backup for every currently bound PVC in the namespace and returns a per-PVC result list. + +### POST /v1/restores/namespace + +```json +{ + "namespace": "ai", + "target_namespace": "ai-restore", + "target_prefix": "restore-20260412-", + "snapshot": "", + "dry_run": true +} +``` + +Runs restore planning/execution for every bound PVC in the source namespace. `snapshot` is optional and blank means latest completed backup per PVC. + +### Policy API + +Create or update a policy: + +```json +POST /v1/policies +{ + "namespace": "ai", + "pvc": "llm-cache", + "interval_hours": 6, + "enabled": true +} +``` + +- Leave `pvc` empty to target all PVCs in that namespace. +- Policies are stored in secret `SOTERIA_POLICY_SECRET_NAME` under key `policies.json`. + ## Authentication and authorization When `SOTERIA_AUTH_REQUIRED=true`, Soteria expects trusted auth headers from a fronting proxy such as `oauth2-proxy`: @@ -137,6 +186,9 @@ Implemented metrics: - `soteria_backup_requests_total{driver,result}` - `soteria_restore_requests_total{driver,result}` +- `soteria_policy_backups_total{result}` +- `soteria_namespace_backup_requests_total{driver,result}` +- `soteria_namespace_restore_requests_total{driver,result}` - `soteria_authz_denials_total{reason}` - `soteria_inventory_refresh_failures_total` - `soteria_inventory_refresh_timestamp_seconds` @@ -171,6 +223,8 @@ Environment variables: - `SOTERIA_AUTH_BEARER_TOKENS` optional comma-separated bearer tokens - `SOTERIA_BACKUP_MAX_AGE_HOURS` default `24` - `SOTERIA_METRICS_REFRESH_SECONDS` default `300` +- `SOTERIA_POLICY_EVAL_SECONDS` default `300` +- `SOTERIA_POLICY_SECRET_NAME` default `soteria-policies` ## Secrets @@ -199,5 +253,6 @@ The example Service is annotated for Prometheus scraping of `/metrics`. ## Notes - Longhorn inventory and metrics are based on discovered backup records per PVC. +- Scheduled policy execution currently applies to Longhorn driver. - Restic backup and restore execution exists, but inventory-style telemetry is currently Longhorn-focused. - For Atlas production, place Soteria behind an authenticated ingress and trust only proxy-injected auth headers. diff --git a/internal/api/types.go b/internal/api/types.go index a842a09..92ead47 100644 --- a/internal/api/types.go +++ b/internal/api/types.go @@ -92,3 +92,80 @@ type AuthInfoResponse struct { Email string `json:"email,omitempty"` Groups []string `json:"groups,omitempty"` } + +type BackupPolicy struct { + ID string `json:"id"` + Namespace string `json:"namespace"` + PVC string `json:"pvc,omitempty"` + IntervalHours float64 `json:"interval_hours"` + Enabled bool `json:"enabled"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +type BackupPolicyUpsertRequest struct { + Namespace string `json:"namespace"` + PVC string `json:"pvc,omitempty"` + IntervalHours float64 `json:"interval_hours"` + Enabled *bool `json:"enabled,omitempty"` +} + +type BackupPolicyListResponse struct { + Policies []BackupPolicy `json:"policies"` +} + +type NamespaceBackupRequest struct { + Namespace string `json:"namespace"` + DryRun bool `json:"dry_run"` +} + +type NamespaceBackupResult struct { + Namespace string `json:"namespace"` + PVC string `json:"pvc"` + Status string `json:"status"` + Volume string `json:"volume,omitempty"` + Backup string `json:"backup,omitempty"` + Error string `json:"error,omitempty"` +} + +type NamespaceBackupResponse struct { + Namespace string `json:"namespace"` + RequestedBy string `json:"requested_by,omitempty"` + Driver string `json:"driver"` + DryRun bool `json:"dry_run"` + Total int `json:"total"` + Succeeded int `json:"succeeded"` + Failed int `json:"failed"` + Results []NamespaceBackupResult `json:"results"` +} + +type NamespaceRestoreRequest struct { + Namespace string `json:"namespace"` + TargetNamespace string `json:"target_namespace,omitempty"` + TargetPrefix string `json:"target_prefix,omitempty"` + Snapshot string `json:"snapshot,omitempty"` + DryRun bool `json:"dry_run"` +} + +type NamespaceRestoreResult struct { + Namespace string `json:"namespace"` + PVC string `json:"pvc"` + TargetNamespace string `json:"target_namespace"` + TargetPVC string `json:"target_pvc"` + Status string `json:"status"` + Volume string `json:"volume,omitempty"` + BackupURL string `json:"backup_url,omitempty"` + Error string `json:"error,omitempty"` +} + +type NamespaceRestoreResponse struct { + Namespace string `json:"namespace"` + TargetNamespace string `json:"target_namespace"` + RequestedBy string `json:"requested_by,omitempty"` + Driver string `json:"driver"` + DryRun bool `json:"dry_run"` + Total int `json:"total"` + Succeeded int `json:"succeeded"` + Failed int `json:"failed"` + Results []NamespaceRestoreResult `json:"results"` +} diff --git a/internal/config/config.go b/internal/config/config.go index ee12718..63137f8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,7 +18,9 @@ const ( defaultLonghornMode = "incremental" defaultAllowedGroups = "admin,maintenance" defaultMetricsRefresh = 300 * time.Second + defaultPolicyEval = 300 * time.Second defaultBackupMaxAge = 24 * time.Hour + defaultPolicySecret = "soteria-policies" serviceNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" ) @@ -43,6 +45,8 @@ type Config struct { AllowedGroups []string AuthBearerTokens []string MetricsRefreshInterval time.Duration + PolicyEvalInterval time.Duration + PolicySecretName string BackupMaxAge time.Duration } @@ -80,7 +84,9 @@ func Load() (*Config, error) { cfg.AllowedGroups = parseCSV(getenvDefault("SOTERIA_ALLOWED_GROUPS", defaultAllowedGroups)) cfg.AuthBearerTokens = parseCSV(getenv("SOTERIA_AUTH_BEARER_TOKENS")) cfg.MetricsRefreshInterval = defaultMetricsRefresh + cfg.PolicyEvalInterval = defaultPolicyEval cfg.BackupMaxAge = defaultBackupMaxAge + cfg.PolicySecretName = getenvDefault("SOTERIA_POLICY_SECRET_NAME", defaultPolicySecret) if ttl, ok := getenvInt("SOTERIA_JOB_TTL_SECONDS"); ok { cfg.JobTTLSeconds = int32(ttl) @@ -91,6 +97,9 @@ func Load() (*Config, error) { if seconds, ok := getenvInt("SOTERIA_METRICS_REFRESH_SECONDS"); ok { cfg.MetricsRefreshInterval = time.Duration(seconds) * time.Second } + if seconds, ok := getenvInt("SOTERIA_POLICY_EVAL_SECONDS"); ok { + cfg.PolicyEvalInterval = time.Duration(seconds) * time.Second + } if hours, ok := getenvFloat("SOTERIA_BACKUP_MAX_AGE_HOURS"); ok { cfg.BackupMaxAge = time.Duration(hours * float64(time.Hour)) } @@ -130,6 +139,12 @@ func Load() (*Config, error) { if cfg.MetricsRefreshInterval <= 0 { return nil, errors.New("SOTERIA_METRICS_REFRESH_SECONDS must be greater than zero") } + if cfg.PolicyEvalInterval <= 0 { + return nil, errors.New("SOTERIA_POLICY_EVAL_SECONDS must be greater than zero") + } + if strings.TrimSpace(cfg.PolicySecretName) == "" { + return nil, errors.New("SOTERIA_POLICY_SECRET_NAME must not be empty") + } if cfg.BackupMaxAge <= 0 { return nil, errors.New("SOTERIA_BACKUP_MAX_AGE_HOURS must be greater than zero") } diff --git a/internal/k8s/state.go b/internal/k8s/state.go new file mode 100644 index 0000000..316afbe --- /dev/null +++ b/internal/k8s/state.go @@ -0,0 +1,73 @@ +package k8s + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (c *Client) LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error) { + secret, err := c.Clientset.CoreV1().Secrets(namespace).Get(ctx, secretName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get secret %s/%s: %w", namespace, secretName, err) + } + if secret.Data == nil { + return nil, nil + } + value, ok := secret.Data[key] + if !ok || len(value) == 0 { + return nil, nil + } + out := make([]byte, len(value)) + copy(out, value) + return out, nil +} + +func (c *Client) SaveSecretData(ctx context.Context, namespace, secretName, key string, value []byte, labels map[string]string) error { + secretClient := c.Clientset.CoreV1().Secrets(namespace) + secret, err := secretClient.Get(ctx, secretName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("get secret %s/%s: %w", namespace, secretName, err) + } + secret = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: namespace, + Labels: map[string]string{}, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{}, + } + } + + if secret.Data == nil { + secret.Data = map[string][]byte{} + } + secret.Data[key] = value + + if secret.Labels == nil { + secret.Labels = map[string]string{} + } + for labelKey, labelValue := range labels { + secret.Labels[labelKey] = labelValue + } + + if secret.ResourceVersion == "" { + if _, err := secretClient.Create(ctx, secret, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create secret %s/%s: %w", namespace, secretName, err) + } + return nil + } + + if _, err := secretClient.Update(ctx, secret, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update secret %s/%s: %w", namespace, secretName, err) + } + return nil +} diff --git a/internal/server/metrics.go b/internal/server/metrics.go index 47a7b64..2c53f14 100644 --- a/internal/server/metrics.go +++ b/internal/server/metrics.go @@ -20,6 +20,9 @@ type telemetry struct { mu sync.RWMutex backupRequests map[string]metricSample restoreRequests map[string]metricSample + policyBackups map[string]metricSample + namespaceBackupRequests map[string]metricSample + namespaceRestoreReqs map[string]metricSample authzDenials map[string]metricSample inventoryRefreshFailure float64 inventoryRefreshTime float64 @@ -31,13 +34,16 @@ type telemetry struct { func newTelemetry() *telemetry { return &telemetry{ - backupRequests: map[string]metricSample{}, - restoreRequests: map[string]metricSample{}, - authzDenials: map[string]metricSample{}, - pvcBackupAgeHours: map[string]metricSample{}, - pvcBackupHealth: map[string]metricSample{}, - pvcBackupLastSuccess: map[string]metricSample{}, - pvcBackupCount: map[string]metricSample{}, + backupRequests: map[string]metricSample{}, + restoreRequests: map[string]metricSample{}, + policyBackups: map[string]metricSample{}, + namespaceBackupRequests: map[string]metricSample{}, + namespaceRestoreReqs: map[string]metricSample{}, + authzDenials: map[string]metricSample{}, + pvcBackupAgeHours: map[string]metricSample{}, + pvcBackupHealth: map[string]metricSample{}, + pvcBackupLastSuccess: map[string]metricSample{}, + pvcBackupCount: map[string]metricSample{}, } } @@ -60,6 +66,24 @@ func (t *telemetry) RecordRestoreRequest(driver, result string) { incMetric(t.restoreRequests, map[string]string{"driver": driver, "result": result}) } +func (t *telemetry) RecordPolicyBackup(result string) { + t.mu.Lock() + defer t.mu.Unlock() + incMetric(t.policyBackups, map[string]string{"result": result}) +} + +func (t *telemetry) RecordNamespaceBackupRequest(driver, result string) { + t.mu.Lock() + defer t.mu.Unlock() + incMetric(t.namespaceBackupRequests, map[string]string{"driver": driver, "result": result}) +} + +func (t *telemetry) RecordNamespaceRestoreRequest(driver, result string) { + t.mu.Lock() + defer t.mu.Unlock() + incMetric(t.namespaceRestoreReqs, map[string]string{"driver": driver, "result": result}) +} + func (t *telemetry) RecordAuthzDenied(reason string) { t.mu.Lock() defer t.mu.Unlock() @@ -115,6 +139,9 @@ func (t *telemetry) render() string { var b strings.Builder writeMetricFamily(&b, "soteria_backup_requests_total", "counter", "Backup requests handled by Soteria.", metricValues(t.backupRequests)) writeMetricFamily(&b, "soteria_restore_requests_total", "counter", "Restore requests handled by Soteria.", metricValues(t.restoreRequests)) + writeMetricFamily(&b, "soteria_policy_backups_total", "counter", "Policy scheduler backup execution outcomes.", metricValues(t.policyBackups)) + writeMetricFamily(&b, "soteria_namespace_backup_requests_total", "counter", "Namespace-level backup request outcomes.", metricValues(t.namespaceBackupRequests)) + writeMetricFamily(&b, "soteria_namespace_restore_requests_total", "counter", "Namespace-level restore request outcomes.", metricValues(t.namespaceRestoreReqs)) writeMetricFamily(&b, "soteria_authz_denials_total", "counter", "Authorization denials emitted by Soteria.", metricValues(t.authzDenials)) writeMetricFamily(&b, "soteria_inventory_refresh_failures_total", "counter", "Inventory refresh failures while computing PVC backup telemetry.", []metricSample{{value: t.inventoryRefreshFailure}}) writeMetricFamily(&b, "soteria_inventory_refresh_timestamp_seconds", "gauge", "Unix timestamp of the last successful inventory refresh.", []metricSample{{value: t.inventoryRefreshTime}}) diff --git a/internal/server/server.go b/internal/server/server.go index 3444a11..a13b952 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -7,8 +7,10 @@ import ( "log" "math" "net/http" + "net/url" "sort" "strings" + "sync" "time" "scm.bstein.dev/bstein/soteria/internal/api" @@ -26,6 +28,8 @@ type kubeClient interface { CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error) PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) + LoadSecretData(ctx context.Context, namespace, secretName, key string) ([]byte, error) + SaveSecretData(ctx context.Context, namespace, secretName, key string, value []byte, labels map[string]string) error } type longhornClient interface { @@ -44,6 +48,10 @@ type Server struct { longhorn longhornClient metrics *telemetry handler http.Handler + policyMu sync.RWMutex + policies map[string]api.BackupPolicy + runMu sync.Mutex + running bool } type authIdentity struct { @@ -57,29 +65,45 @@ type ctxKey string const authContextKey ctxKey = "soteria-auth" +const ( + policySecretKey = "policies.json" + defaultPolicyHours = 24.0 + maxPolicyIntervalHrs = 24 * 365 +) + func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { s := &Server{ cfg: cfg, client: client, longhorn: lh, metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, } s.handler = http.HandlerFunc(s.route) return s } func (s *Server) Start(ctx context.Context) { - s.refreshTelemetry(ctx) + if err := s.loadPolicies(ctx); err != nil { + log.Printf("policy load failed: %v", err) + } - ticker := time.NewTicker(s.cfg.MetricsRefreshInterval) + s.refreshTelemetry(ctx) + s.runPolicyCycle(ctx) + + metricsTicker := time.NewTicker(s.cfg.MetricsRefreshInterval) + policyTicker := time.NewTicker(s.cfg.PolicyEvalInterval) go func() { - defer ticker.Stop() + defer metricsTicker.Stop() + defer policyTicker.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-metricsTicker.C: s.refreshTelemetry(ctx) + case <-policyTicker.C: + s.runPolicyCycle(ctx) } } }() @@ -122,9 +146,19 @@ func (s *Server) route(w http.ResponseWriter, r *http.Request) { s.handleBackups(w, r) case "/v1/backup": s.handleBackup(w, r) + case "/v1/backup/namespace": + s.handleNamespaceBackup(w, r) case "/v1/restores", "/v1/restore-test": s.handleRestore(w, r) + case "/v1/restores/namespace": + s.handleNamespaceRestore(w, r) + case "/v1/policies": + s.handlePolicies(w, r) default: + if strings.HasPrefix(r.URL.Path, "/v1/policies/") { + s.handlePolicyByID(w, r) + return + } writeError(w, http.StatusNotFound, "not found") } } @@ -224,76 +258,17 @@ func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "namespace and pvc are required") return } - + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) requester := currentRequester(r.Context()) - switch s.cfg.BackupDriver { - case "longhorn": - volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) - if err != nil { - s.metrics.RecordBackupRequest("longhorn", "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - backupName := backupName("backup", req.Namespace+"-"+req.PVC) - if req.DryRun { - s.metrics.RecordBackupRequest("longhorn", "dry_run") - writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "longhorn", - Volume: volumeName, - Backup: backupName, - Namespace: req.Namespace, - RequestedBy: requester, - DryRun: true, - }) - return - } - - labels := map[string]string{ - "soteria.bstein.dev/namespace": req.Namespace, - "soteria.bstein.dev/pvc": req.PVC, - "soteria.bstein.dev/requested-by": requester, - } - if _, err := s.longhorn.SnapshotBackup(r.Context(), volumeName, backupName, labels, s.cfg.LonghornBackupMode); err != nil { - s.metrics.RecordBackupRequest("longhorn", "backend_error") - writeError(w, http.StatusBadGateway, err.Error()) - return - } - - s.metrics.RecordBackupRequest("longhorn", "success") - writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "longhorn", - Volume: volumeName, - Backup: backupName, - Namespace: req.Namespace, - RequestedBy: requester, - DryRun: false, - }) - case "restic": - jobName, secretName, err := s.client.CreateBackupJob(r.Context(), s.cfg, req) - if err != nil { - s.metrics.RecordBackupRequest("restic", "backend_error") - writeError(w, http.StatusBadRequest, err.Error()) - return - } - if req.DryRun { - s.metrics.RecordBackupRequest("restic", "dry_run") - } else { - s.metrics.RecordBackupRequest("restic", "success") - } - writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "restic", - JobName: jobName, - Namespace: req.Namespace, - Secret: secretName, - RequestedBy: requester, - DryRun: req.DryRun, - }) - default: - s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "unsupported_driver") - writeError(w, http.StatusBadRequest, "unsupported backup driver") + response, result, err := s.executeBackup(r.Context(), req, requester) + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) + if err != nil { + writeError(w, backupStatusCode(result), err.Error()) + return } + writeJSON(w, http.StatusOK, response) } func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { @@ -359,90 +334,325 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { } requester := currentRequester(r.Context()) + response, result, err := s.executeRestore(r.Context(), req, requester) + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result) + if err != nil { + writeError(w, restoreStatusCode(result), err.Error()) + return + } + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.NamespaceBackupRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + req.Namespace = strings.TrimSpace(req.Namespace) + if req.Namespace == "" { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if err := validateKubernetesName("namespace", req.Namespace); err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace) + if err != nil { + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) + return + } + + requester := currentRequester(r.Context()) + response := api.NamespaceBackupResponse{ + Namespace: req.Namespace, + RequestedBy: requester, + Driver: s.cfg.BackupDriver, + DryRun: req.DryRun, + Results: make([]api.NamespaceBackupResult, 0, len(pvcs)), + } + + for _, pvc := range pvcs { + backupReq := api.BackupRequest{ + Namespace: req.Namespace, + PVC: pvc.Name, + DryRun: req.DryRun, + } + result, status, execErr := s.executeBackup(r.Context(), backupReq, requester) + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status) + + item := api.NamespaceBackupResult{ + Namespace: req.Namespace, + PVC: pvc.Name, + Status: status, + Volume: result.Volume, + Backup: result.Backup, + } + if execErr != nil { + item.Error = execErr.Error() + response.Failed++ + } else { + response.Succeeded++ + } + response.Results = append(response.Results, item) + } + + response.Total = len(response.Results) + s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) handleNamespaceRestore(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.NamespaceRestoreRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "invalid_json") + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + + req.Namespace = strings.TrimSpace(req.Namespace) + req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) + req.TargetPrefix = strings.TrimSpace(req.TargetPrefix) + req.Snapshot = strings.TrimSpace(req.Snapshot) + + if req.Namespace == "" { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if req.TargetNamespace == "" { + req.TargetNamespace = req.Namespace + } + if err := validateKubernetesName("namespace", req.Namespace); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if err := validateKubernetesName("target_namespace", req.TargetNamespace); err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace) + if err != nil { + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) + return + } + + requester := currentRequester(r.Context()) + response := api.NamespaceRestoreResponse{ + Namespace: req.Namespace, + TargetNamespace: req.TargetNamespace, + RequestedBy: requester, + Driver: s.cfg.BackupDriver, + DryRun: req.DryRun, + Results: make([]api.NamespaceRestoreResult, 0, len(pvcs)), + } + + for _, pvc := range pvcs { + targetPVC := targetPVCName(req.TargetPrefix, pvc.Name) + restoreReq := api.RestoreTestRequest{ + Namespace: req.Namespace, + PVC: pvc.Name, + Snapshot: req.Snapshot, + TargetNamespace: req.TargetNamespace, + TargetPVC: targetPVC, + DryRun: req.DryRun, + } + result, status, execErr := s.executeRestore(r.Context(), restoreReq, requester) + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, status) + + item := api.NamespaceRestoreResult{ + Namespace: req.Namespace, + PVC: pvc.Name, + TargetNamespace: req.TargetNamespace, + TargetPVC: targetPVC, + Status: status, + Volume: result.Volume, + BackupURL: result.BackupURL, + } + if execErr != nil { + item.Error = execErr.Error() + response.Failed++ + } else { + response.Succeeded++ + } + response.Results = append(response.Results, item) + } + + response.Total = len(response.Results) + s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed)) + writeJSON(w, http.StatusOK, response) +} + +func (s *Server) handlePolicies(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + writeJSON(w, http.StatusOK, api.BackupPolicyListResponse{Policies: s.listPolicies()}) + case http.MethodPost: + r.Body = http.MaxBytesReader(w, r.Body, 1<<20) + var req api.BackupPolicyUpsertRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) + return + } + policy, err := s.upsertPolicy(r.Context(), req) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + writeJSON(w, http.StatusOK, policy) + default: + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + } +} + +func (s *Server) handlePolicyByID(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + encodedID := strings.TrimPrefix(r.URL.Path, "/v1/policies/") + if encodedID == "" { + writeError(w, http.StatusBadRequest, "policy id is required") + return + } + id, err := url.PathUnescape(encodedID) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid policy id") + return + } + removed, err := s.deletePolicy(r.Context(), id) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + if !removed { + writeError(w, http.StatusNotFound, "policy not found") + return + } + writeJSON(w, http.StatusOK, map[string]string{"deleted": id}) +} + +func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) { + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + if req.Namespace == "" || req.PVC == "" { + return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required") + } switch s.cfg.BackupDriver { case "longhorn": - exists, err := s.client.PersistentVolumeClaimExists(r.Context(), req.TargetNamespace, req.TargetPVC) + volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC) if err != nil { - s.metrics.RecordRestoreRequest("longhorn", "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return + return api.BackupResponse{}, "validation_error", err + } + + backupID := backupName("backup", req.Namespace+"-"+req.PVC) + response := api.BackupResponse{ + Driver: "longhorn", + Volume: volumeName, + Backup: backupID, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: req.DryRun, + } + if req.DryRun { + return response, "dry_run", nil + } + + labels := map[string]string{ + "soteria.bstein.dev/namespace": req.Namespace, + "soteria.bstein.dev/pvc": req.PVC, + "soteria.bstein.dev/requested-by": requester, + } + if _, err := s.longhorn.SnapshotBackup(ctx, volumeName, backupID, labels, s.cfg.LonghornBackupMode); err != nil { + return api.BackupResponse{}, "backend_error", err + } + return response, "success", nil + case "restic": + jobName, secretName, err := s.client.CreateBackupJob(ctx, s.cfg, req) + if err != nil { + return api.BackupResponse{}, "backend_error", err + } + result := "success" + if req.DryRun { + result = "dry_run" + } + return api.BackupResponse{ + Driver: "restic", + JobName: jobName, + Namespace: req.Namespace, + Secret: secretName, + RequestedBy: requester, + DryRun: req.DryRun, + }, result, nil + default: + return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver") + } +} + +func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) { + req.Namespace = strings.TrimSpace(req.Namespace) + req.PVC = strings.TrimSpace(req.PVC) + req.TargetNamespace = strings.TrimSpace(req.TargetNamespace) + req.TargetPVC = strings.TrimSpace(req.TargetPVC) + req.BackupURL = strings.TrimSpace(req.BackupURL) + req.Snapshot = strings.TrimSpace(req.Snapshot) + + if req.TargetNamespace == "" { + req.TargetNamespace = req.Namespace + } + + switch s.cfg.BackupDriver { + case "longhorn": + exists, err := s.client.PersistentVolumeClaimExists(ctx, req.TargetNamespace, req.TargetPVC) + if err != nil { + return api.RestoreTestResponse{}, "validation_error", err } if exists { - s.metrics.RecordRestoreRequest("longhorn", "conflict") - writeError(w, http.StatusConflict, fmt.Sprintf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)) - return + return api.RestoreTestResponse{}, "conflict", fmt.Errorf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC) } - volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) + volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC) if err != nil { - s.metrics.RecordRestoreRequest("longhorn", "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return + return api.RestoreTestResponse{}, "validation_error", err } - backupURL := strings.TrimSpace(req.BackupURL) + backupURL := req.BackupURL if backupURL == "" { - backup, err := s.longhorn.FindBackup(r.Context(), volumeName, req.Snapshot) + backup, err := s.longhorn.FindBackup(ctx, volumeName, req.Snapshot) if err != nil { - s.metrics.RecordRestoreRequest("longhorn", "validation_error") - writeError(w, http.StatusBadRequest, err.Error()) - return + return api.RestoreTestResponse{}, "validation_error", err } - backupURL = backup.URL + backupURL = strings.TrimSpace(backup.URL) } if backupURL == "" { - s.metrics.RecordRestoreRequest("longhorn", "validation_error") - writeError(w, http.StatusBadRequest, "backup_url is required") - return + return api.RestoreTestResponse{}, "validation_error", fmt.Errorf("backup_url is required") } restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC) - if req.DryRun { - s.metrics.RecordRestoreRequest("longhorn", "dry_run") - writeJSON(w, http.StatusOK, api.RestoreTestResponse{ - Driver: "longhorn", - Volume: restoreVolumeName, - TargetNamespace: req.TargetNamespace, - TargetPVC: req.TargetPVC, - BackupURL: backupURL, - Namespace: req.Namespace, - RequestedBy: requester, - DryRun: true, - }) - return - } - - sourceVolume, err := s.longhorn.GetVolume(r.Context(), volumeName) - if err != nil { - s.metrics.RecordRestoreRequest("longhorn", "backend_error") - writeError(w, http.StatusBadGateway, err.Error()) - return - } - replicas := sourceVolume.NumberOfReplicas - if replicas == 0 { - replicas = 2 - } - - if _, err := s.longhorn.CreateVolumeFromBackup(r.Context(), restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil { - s.metrics.RecordRestoreRequest("longhorn", "backend_error") - writeError(w, http.StatusBadGateway, err.Error()) - return - } - if err := s.longhorn.CreatePVC(r.Context(), restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil { - cleanupErr := s.longhorn.DeleteVolume(r.Context(), restoreVolumeName) - if cleanupErr != nil { - log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr) - writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr)) - } else { - writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v", err)) - } - s.metrics.RecordRestoreRequest("longhorn", "backend_error") - return - } - - s.metrics.RecordRestoreRequest("longhorn", "success") - writeJSON(w, http.StatusOK, api.RestoreTestResponse{ + response := api.RestoreTestResponse{ Driver: "longhorn", Volume: restoreVolumeName, TargetNamespace: req.TargetNamespace, @@ -450,21 +660,43 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { BackupURL: backupURL, Namespace: req.Namespace, RequestedBy: requester, - DryRun: false, - }) - case "restic": - jobName, secretName, err := s.client.CreateRestoreJob(r.Context(), s.cfg, req) - if err != nil { - s.metrics.RecordRestoreRequest("restic", "backend_error") - writeError(w, http.StatusBadRequest, err.Error()) - return + DryRun: req.DryRun, } if req.DryRun { - s.metrics.RecordRestoreRequest("restic", "dry_run") - } else { - s.metrics.RecordRestoreRequest("restic", "success") + return response, "dry_run", nil } - writeJSON(w, http.StatusOK, api.RestoreTestResponse{ + + sourceVolume, err := s.longhorn.GetVolume(ctx, volumeName) + if err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + replicas := sourceVolume.NumberOfReplicas + if replicas == 0 { + replicas = 2 + } + + if _, err := s.longhorn.CreateVolumeFromBackup(ctx, restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + if err := s.longhorn.CreatePVC(ctx, restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil { + cleanupErr := s.longhorn.DeleteVolume(ctx, restoreVolumeName) + if cleanupErr != nil { + log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr) + return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr) + } + return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v", err) + } + return response, "success", nil + case "restic": + jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req) + if err != nil { + return api.RestoreTestResponse{}, "backend_error", err + } + result := "success" + if req.DryRun { + result = "dry_run" + } + return api.RestoreTestResponse{ Driver: "restic", JobName: jobName, Namespace: req.Namespace, @@ -473,13 +705,87 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { Secret: secretName, RequestedBy: requester, DryRun: req.DryRun, - }) + }, result, nil default: - s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "unsupported_driver") - writeError(w, http.StatusBadRequest, "unsupported backup driver") + return api.RestoreTestResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver") } } +func (s *Server) listNamespaceBoundPVCs(ctx context.Context, namespace string) ([]k8s.PVCSummary, error) { + items, err := s.client.ListBoundPVCs(ctx) + if err != nil { + return nil, err + } + filtered := make([]k8s.PVCSummary, 0, len(items)) + for _, item := range items { + if item.Namespace == namespace { + filtered = append(filtered, item) + } + } + return filtered, nil +} + +func backupStatusCode(result string) int { + switch result { + case "validation_error", "unsupported_driver": + return http.StatusBadRequest + case "backend_error": + return http.StatusBadGateway + default: + return http.StatusInternalServerError + } +} + +func restoreStatusCode(result string) int { + switch result { + case "validation_error", "unsupported_driver": + return http.StatusBadRequest + case "conflict": + return http.StatusConflict + case "backend_error": + return http.StatusBadGateway + default: + return http.StatusInternalServerError + } +} + +func namespaceResultStatus(dryRun bool, total, succeeded, failed int) string { + if dryRun { + return "dry_run" + } + if total == 0 { + return "empty" + } + if failed == 0 { + return "success" + } + if succeeded == 0 { + return "failed" + } + return "partial" +} + +func targetPVCName(prefix, sourcePVC string) string { + prefix = sanitizeName(prefix) + if prefix == "" { + prefix = "restore" + } + if !strings.HasSuffix(prefix, "-") { + prefix += "-" + } + name := sanitizeName(prefix + sourcePVC) + if name == "" { + name = "restore" + } + if len(name) > 63 { + name = strings.Trim(name[:63], "-") + } + if name == "" { + name = "restore" + } + return name +} + func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) { pvcs, err := s.client.ListBoundPVCs(ctx) if err != nil { @@ -578,6 +884,327 @@ func (s *Server) refreshTelemetry(ctx context.Context) { s.metrics.RecordInventory(inventory) } +func (s *Server) runPolicyCycle(ctx context.Context) { + if !s.beginRun() { + return + } + defer s.endRun() + + if s.cfg.BackupDriver != "longhorn" { + return + } + + policies := s.activePolicies() + if len(policies) == 0 { + return + } + + runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute) + defer cancel() + + inventory, err := s.buildInventory(runCtx) + if err != nil { + log.Printf("policy cycle inventory failed: %v", err) + s.metrics.RecordPolicyBackup("inventory_error") + return + } + + pvcMap := make(map[string]api.PVCInventory) + namespaceMap := make(map[string][]api.PVCInventory) + for _, group := range inventory.Namespaces { + for _, pvc := range group.PVCs { + key := pvc.Namespace + "/" + pvc.PVC + pvcMap[key] = pvc + namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc) + } + } + + effectiveIntervals := map[string]float64{} + for _, policy := range policies { + matches := []api.PVCInventory{} + if policy.PVC != "" { + if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok { + matches = append(matches, pvc) + } + } else { + matches = append(matches, namespaceMap[policy.Namespace]...) + } + + for _, pvc := range matches { + key := pvc.Namespace + "/" + pvc.PVC + current, exists := effectiveIntervals[key] + if !exists || policy.IntervalHours < current { + effectiveIntervals[key] = policy.IntervalHours + } + } + } + + for key, intervalHours := range effectiveIntervals { + pvc, ok := pvcMap[key] + if !ok { + continue + } + if !backupDue(pvc.LastBackupAt, intervalHours) { + s.metrics.RecordPolicyBackup("not_due") + continue + } + + _, result, err := s.executeBackup(runCtx, api.BackupRequest{ + Namespace: pvc.Namespace, + PVC: pvc.PVC, + DryRun: false, + }, "policy-scheduler") + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result) + if err != nil { + s.metrics.RecordPolicyBackup(result) + log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err) + continue + } + s.metrics.RecordPolicyBackup("success") + } +} + +func (s *Server) beginRun() bool { + s.runMu.Lock() + defer s.runMu.Unlock() + if s.running { + return false + } + s.running = true + return true +} + +func (s *Server) endRun() { + s.runMu.Lock() + defer s.runMu.Unlock() + s.running = false +} + +func (s *Server) loadPolicies(ctx context.Context) error { + raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey) + if err != nil { + return err + } + if len(raw) == 0 { + return nil + } + + var doc struct { + Policies []api.BackupPolicy `json:"policies"` + } + if err := json.Unmarshal(raw, &doc); err != nil { + return fmt.Errorf("decode policy document: %w", err) + } + + next := map[string]api.BackupPolicy{} + now := time.Now().UTC().Format(time.RFC3339) + for _, policy := range doc.Policies { + namespace := strings.TrimSpace(policy.Namespace) + pvc := strings.TrimSpace(policy.PVC) + if namespace == "" { + continue + } + interval := policy.IntervalHours + if interval <= 0 { + interval = defaultPolicyHours + } + id := policyKey(namespace, pvc) + createdAt := policy.CreatedAt + if createdAt == "" { + createdAt = now + } + updatedAt := policy.UpdatedAt + if updatedAt == "" { + updatedAt = createdAt + } + next[id] = api.BackupPolicy{ + ID: id, + Namespace: namespace, + PVC: pvc, + IntervalHours: interval, + Enabled: policy.Enabled, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + } + } + + s.policyMu.Lock() + s.policies = next + s.policyMu.Unlock() + return nil +} + +func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error { + doc := struct { + Policies []api.BackupPolicy `json:"policies"` + }{ + Policies: policies, + } + payload, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("encode policy document: %w", err) + } + return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, map[string]string{ + "app.kubernetes.io/name": "soteria", + "app.kubernetes.io/component": "policy-store", + }) +} + +func (s *Server) listPolicies() []api.BackupPolicy { + s.policyMu.RLock() + defer s.policyMu.RUnlock() + policies := make([]api.BackupPolicy, 0, len(s.policies)) + for _, policy := range s.policies { + policies = append(policies, policy) + } + sort.Slice(policies, func(i, j int) bool { + if policies[i].Namespace != policies[j].Namespace { + return policies[i].Namespace < policies[j].Namespace + } + if policies[i].PVC != policies[j].PVC { + return policies[i].PVC < policies[j].PVC + } + return policies[i].ID < policies[j].ID + }) + return policies +} + +func (s *Server) activePolicies() []api.BackupPolicy { + policies := s.listPolicies() + filtered := make([]api.BackupPolicy, 0, len(policies)) + for _, policy := range policies { + if policy.Enabled { + filtered = append(filtered, policy) + } + } + return filtered +} + +func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) { + namespace := strings.TrimSpace(req.Namespace) + pvc := strings.TrimSpace(req.PVC) + if namespace == "" { + return api.BackupPolicy{}, fmt.Errorf("namespace is required") + } + if err := validateKubernetesName("namespace", namespace); err != nil { + return api.BackupPolicy{}, err + } + if pvc != "" { + if err := validateKubernetesName("pvc", pvc); err != nil { + return api.BackupPolicy{}, err + } + } + + interval := req.IntervalHours + if interval <= 0 { + interval = defaultPolicyHours + } + if interval > maxPolicyIntervalHrs { + return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs) + } + enabled := true + if req.Enabled != nil { + enabled = *req.Enabled + } + + id := policyKey(namespace, pvc) + now := time.Now().UTC().Format(time.RFC3339) + + s.policyMu.Lock() + before := clonePolicyMap(s.policies) + createdAt := now + if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" { + createdAt = existing.CreatedAt + } + policy := api.BackupPolicy{ + ID: id, + Namespace: namespace, + PVC: pvc, + IntervalHours: interval, + Enabled: enabled, + CreatedAt: createdAt, + UpdatedAt: now, + } + s.policies[id] = policy + snapshot := policySliceFromMap(s.policies) + s.policyMu.Unlock() + + if err := s.persistPolicies(ctx, snapshot); err != nil { + s.policyMu.Lock() + s.policies = before + s.policyMu.Unlock() + return api.BackupPolicy{}, err + } + return policy, nil +} + +func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) { + id = strings.TrimSpace(id) + if id == "" { + return false, nil + } + + s.policyMu.Lock() + if _, ok := s.policies[id]; !ok { + s.policyMu.Unlock() + return false, nil + } + before := clonePolicyMap(s.policies) + delete(s.policies, id) + snapshot := policySliceFromMap(s.policies) + s.policyMu.Unlock() + + if err := s.persistPolicies(ctx, snapshot); err != nil { + s.policyMu.Lock() + s.policies = before + s.policyMu.Unlock() + return false, err + } + return true, nil +} + +func policyKey(namespace, pvc string) string { + scope := strings.TrimSpace(pvc) + if scope == "" { + scope = "_all" + } + return strings.TrimSpace(namespace) + "__" + scope +} + +func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy { + out := make([]api.BackupPolicy, 0, len(source)) + for _, policy := range source { + out = append(out, policy) + } + sort.Slice(out, func(i, j int) bool { + return out[i].ID < out[j].ID + }) + return out +} + +func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy { + cloned := make(map[string]api.BackupPolicy, len(source)) + for key, value := range source { + cloned[key] = value + } + return cloned +} + +func backupDue(lastBackupAt string, intervalHours float64) bool { + if intervalHours <= 0 { + intervalHours = defaultPolicyHours + } + if strings.TrimSpace(lastBackupAt) == "" { + return true + } + timestamp, ok := parseBackupTime(lastBackupAt) + if !ok { + return true + } + interval := time.Duration(intervalHours * float64(time.Hour)) + return time.Since(timestamp) >= interval +} + func (s *Server) authorize(r *http.Request) (authIdentity, int, error) { if !s.cfg.AuthRequired { return authIdentity{}, http.StatusOK, nil diff --git a/internal/server/server_test.go b/internal/server/server_test.go index b41b2b8..3f60ff8 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -20,6 +21,7 @@ import ( type fakeKubeClient struct { pvcs []k8s.PVCSummary targetExists bool + secretData map[string][]byte } func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { @@ -42,6 +44,29 @@ func (f *fakeKubeClient) PersistentVolumeClaimExists(_ context.Context, _, _ str return f.targetExists, nil } +func (f *fakeKubeClient) LoadSecretData(_ context.Context, _, _, key string) ([]byte, error) { + if f.secretData == nil { + return nil, nil + } + value, ok := f.secretData[key] + if !ok { + return nil, nil + } + copyValue := make([]byte, len(value)) + copy(copyValue, value) + return copyValue, nil +} + +func (f *fakeKubeClient) SaveSecretData(_ context.Context, _, _, key string, value []byte, _ map[string]string) error { + if f.secretData == nil { + f.secretData = map[string][]byte{} + } + copyValue := make([]byte, len(value)) + copy(copyValue, value) + f.secretData[key] = copyValue + return nil +} + type fakeLonghornClient struct { backups []longhorn.Backup } @@ -217,3 +242,129 @@ func TestMetricsStayPublic(t *testing.T) { t.Fatalf("expected prometheus metrics body, got %q", res.Body.String()) } } + +func TestPoliciesCRUD(t *testing.T) { + srv := &Server{ + cfg: &config.Config{AuthRequired: false, BackupDriver: "longhorn", Namespace: "maintenance", PolicySecretName: "soteria-policies"}, + client: &fakeKubeClient{}, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, + } + srv.handler = http.HandlerFunc(srv.route) + + create := httptest.NewRequest(http.MethodPost, "/v1/policies", strings.NewReader(`{"namespace":"apps","interval_hours":6}`)) + create.Header.Set("Content-Type", "application/json") + createRes := httptest.NewRecorder() + srv.Handler().ServeHTTP(createRes, create) + if createRes.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", createRes.Code, createRes.Body.String()) + } + + var created api.BackupPolicy + if err := json.Unmarshal(createRes.Body.Bytes(), &created); err != nil { + t.Fatalf("decode policy: %v", err) + } + if created.Namespace != "apps" || created.IntervalHours != 6 { + t.Fatalf("unexpected created policy: %#v", created) + } + + listReq := httptest.NewRequest(http.MethodGet, "/v1/policies", nil) + listRes := httptest.NewRecorder() + srv.Handler().ServeHTTP(listRes, listReq) + if listRes.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", listRes.Code, listRes.Body.String()) + } + var listPayload api.BackupPolicyListResponse + if err := json.Unmarshal(listRes.Body.Bytes(), &listPayload); err != nil { + t.Fatalf("decode list: %v", err) + } + if len(listPayload.Policies) != 1 { + t.Fatalf("expected one policy, got %#v", listPayload.Policies) + } + + deleteReq := httptest.NewRequest(http.MethodDelete, "/v1/policies/"+url.PathEscape(created.ID), nil) + deleteRes := httptest.NewRecorder() + srv.Handler().ServeHTTP(deleteRes, deleteReq) + if deleteRes.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", deleteRes.Code, deleteRes.Body.String()) + } +} + +func TestNamespaceBackupDryRun(t *testing.T) { + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "longhorn", + }, + client: &fakeKubeClient{ + pvcs: []k8s.PVCSummary{ + {Namespace: "apps", Name: "data-a", VolumeName: "pv-apps-a", Phase: "Bound"}, + {Namespace: "apps", Name: "data-b", VolumeName: "pv-apps-b", Phase: "Bound"}, + {Namespace: "infra", Name: "data-c", VolumeName: "pv-infra-c", Phase: "Bound"}, + }, + }, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, + } + srv.handler = http.HandlerFunc(srv.route) + + body := `{"namespace":"apps","dry_run":true}` + req := httptest.NewRequest(http.MethodPost, "/v1/backup/namespace", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + var payload api.NamespaceBackupResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode payload: %v", err) + } + if payload.Total != 2 || payload.Succeeded != 2 || payload.Failed != 0 { + t.Fatalf("unexpected namespace backup payload: %#v", payload) + } +} + +func TestNamespaceRestoreDryRun(t *testing.T) { + srv := &Server{ + cfg: &config.Config{ + AuthRequired: false, + BackupDriver: "longhorn", + }, + client: &fakeKubeClient{ + pvcs: []k8s.PVCSummary{ + {Namespace: "apps", Name: "cache", VolumeName: "pv-apps-cache", Phase: "Bound"}, + {Namespace: "apps", Name: "models", VolumeName: "pv-apps-models", Phase: "Bound"}, + }, + }, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + policies: map[string]api.BackupPolicy{}, + } + srv.handler = http.HandlerFunc(srv.route) + + body := `{"namespace":"apps","target_namespace":"restore","target_prefix":"drill","dry_run":true}` + req := httptest.NewRequest(http.MethodPost, "/v1/restores/namespace", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + var payload api.NamespaceRestoreResponse + if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode payload: %v", err) + } + if payload.Total != 2 || payload.Succeeded != 2 || payload.Failed != 0 { + t.Fatalf("unexpected namespace restore payload: %#v", payload) + } + for _, result := range payload.Results { + if !strings.HasPrefix(result.TargetPVC, "drill-") { + t.Fatalf("expected target pvc with prefix drill-, got %q", result.TargetPVC) + } + } +} diff --git a/internal/server/ui.html b/internal/server/ui.html index e451843..e2824b9 100644 --- a/internal/server/ui.html +++ b/internal/server/ui.html @@ -129,6 +129,16 @@ } .muted { color: var(--muted); } .error { color: var(--bad); } + .policy-item { + border: 1px solid var(--line); + border-radius: 12px; + padding: 10px 12px; + background: rgba(255,255,255,0.65); + } + .mono { + font-family: "SFMono-Regular", Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; + font-size: 0.88rem; + } @media (max-width: 900px) { main { grid-template-columns: 1fr; } h1 { font-size: 1.7rem; } @@ -167,6 +177,23 @@

Last Action

No action yet.
+
+

Backup Policies

+
+ + + + + + +
+
+

Loading policies...

+
+