refactor(server): split API concerns and retire LOC waivers

This commit is contained in:
codex 2026-04-20 17:24:58 -03:00
parent e3863b9109
commit 0932cae7c3
11 changed files with 2184 additions and 1946 deletions

View File

@ -0,0 +1,119 @@
package server
import (
	"context"
	"crypto/subtle"
	"fmt"
	"net/http"
	"strings"
)
// authorize authenticates the request and enforces optional group-based
// authorization. It returns the resolved identity, an HTTP status, and a
// non-nil error when access must be denied.
//
// Resolution order:
//  1. When AuthRequired is false, every request is allowed anonymously.
//  2. A matching bearer token short-circuits as a trusted service identity.
//  3. Otherwise forwarded-auth headers are consulted and, when
//     AllowedGroups is configured, group membership is required.
func (s *Server) authorize(r *http.Request) (authIdentity, int, error) {
	if !s.cfg.AuthRequired {
		return authIdentity{}, http.StatusOK, nil
	}
	authorization := strings.TrimSpace(r.Header.Get("Authorization"))
	if strings.HasPrefix(strings.ToLower(authorization), "bearer ") {
		token := strings.TrimSpace(authorization[7:])
		for _, expected := range s.cfg.AuthBearerTokens {
			// Constant-time comparison so token checks do not leak match
			// position through response timing.
			if token != "" && subtle.ConstantTimeCompare([]byte(token), []byte(expected)) == 1 {
				return authIdentity{Authenticated: true, User: "service-token", Groups: []string{"service-token"}}, http.StatusOK, nil
			}
		}
	}
	identity := authIdentity{
		Authenticated: true,
		User:          firstHeader(r, "X-Auth-Request-User", "X-Forwarded-User"),
		Email:         firstHeader(r, "X-Auth-Request-Email", "X-Forwarded-Email"),
		Groups:        normalizeGroups(splitGroups(firstHeader(r, "X-Auth-Request-Groups", "X-Forwarded-Groups"))),
	}
	if identity.User == "" && identity.Email == "" {
		return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required")
	}
	if len(s.cfg.AllowedGroups) == 0 {
		return identity, http.StatusOK, nil
	}
	if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) {
		return identity, http.StatusOK, nil
	}
	return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", "))
}
// requesterFromContext extracts the identity stored on the request context,
// returning the zero identity when none is present.
func requesterFromContext(ctx context.Context) authIdentity {
	if identity, ok := ctx.Value(authContextKey).(authIdentity); ok {
		return identity
	}
	return authIdentity{}
}
// currentRequester renders a requester label for audit fields: the user name
// when known, falling back to email, then a generic "authenticated" marker,
// and finally "anonymous".
func currentRequester(ctx context.Context) string {
	identity := requesterFromContext(ctx)
	switch {
	case identity.User != "":
		return identity.User
	case identity.Email != "":
		return identity.Email
	case identity.Authenticated:
		return "authenticated"
	default:
		return "anonymous"
	}
}
func authzReason(status int, err error) string {
if err == nil {
return "unknown"
}
switch status {
case http.StatusUnauthorized:
return "unauthenticated"
case http.StatusForbidden:
return "forbidden_group"
default:
return "error"
}
}
// hasAllowedGroup reports whether any group in actual also appears in
// allowed; both sides are run through normalizeGroups before comparing.
func hasAllowedGroup(actual, allowed []string) bool {
	permitted := map[string]struct{}{}
	for _, name := range normalizeGroups(allowed) {
		permitted[name] = struct{}{}
	}
	for _, name := range normalizeGroups(actual) {
		if _, found := permitted[name]; found {
			return true
		}
	}
	return false
}
// normalizeGroups strips a single leading "/" then surrounding whitespace
// from each value and drops entries that end up empty.
func normalizeGroups(values []string) []string {
	out := make([]string, 0, len(values))
	for _, raw := range values {
		cleaned := strings.TrimSpace(strings.TrimPrefix(raw, "/"))
		if cleaned != "" {
			out = append(out, cleaned)
		}
	}
	return out
}
func firstHeader(r *http.Request, names ...string) string {
for _, name := range names {
value := strings.TrimSpace(r.Header.Get(name))
if value != "" {
return value
}
}
return ""
}
// splitGroups splits a comma- or semicolon-delimited group header into raw
// tokens. A blank input yields nil; tokens are not trimmed here
// (normalizeGroups handles that).
func splitGroups(raw string) []string {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return nil
	}
	isSeparator := func(r rune) bool { return r == ',' || r == ';' }
	return strings.FieldsFunc(trimmed, isSeparator)
}

View File

@ -0,0 +1,303 @@
package server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/k8s"
)
// handleInventory serves GET /v1/inventory: a snapshot of all bound PVCs
// grouped by namespace with backup-health enrichment.
func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	if inventory, err := s.buildInventory(r.Context()); err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
	} else {
		writeJSON(w, http.StatusOK, inventory)
	}
}
// handleBackups serves GET /v1/backups?namespace=...&pvc=...: the list of
// known backups for one PVC under the configured driver.
func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	query := r.URL.Query()
	namespace := strings.TrimSpace(query.Get("namespace"))
	pvcName := strings.TrimSpace(query.Get("pvc"))
	if namespace == "" || pvcName == "" {
		writeError(w, http.StatusBadRequest, "namespace and pvc are required")
		return
	}
	volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	response := api.BackupListResponse{
		Namespace: namespace,
		PVC:       pvcName,
		Volume:    volumeName,
	}
	switch s.cfg.BackupDriver {
	case "longhorn":
		backups, err := s.longhorn.ListBackups(r.Context(), volumeName)
		if err != nil {
			writeError(w, http.StatusBadGateway, err.Error())
			return
		}
		response.Backups = buildBackupRecords(backups)
	case "restic":
		jobs, err := s.client.ListBackupJobsForPVC(r.Context(), namespace, pvcName)
		if err != nil {
			writeError(w, http.StatusBadGateway, err.Error())
			return
		}
		response.Backups = s.buildResticBackupRecords(r.Context(), namespace, jobs, s.cfg.ResticRepository)
	default:
		writeError(w, http.StatusBadRequest, "unsupported backup driver")
		return
	}
	writeJSON(w, http.StatusOK, response)
}
// handleBackup serves POST /v1/backup: validates the JSON body, delegates to
// executeBackup, and records one per-driver outcome metric per request.
func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	// Cap request bodies at 1 MiB to bound memory use.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.BackupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json")
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	if req.Namespace == "" || req.PVC == "" {
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, "namespace and pvc are required")
		return
	}
	if err := validateKeepLast(req.KeepLast); err != nil {
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	requester := currentRequester(r.Context())
	response, result, err := s.executeBackup(r.Context(), req, requester)
	s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
	if err != nil {
		writeError(w, backupStatusCode(result), err.Error())
		return
	}
	writeJSON(w, http.StatusOK, response)
}
// handleNamespaceBackup serves POST /v1/backup/namespace: it expands one
// namespace-wide request into a per-PVC executeBackup call for every bound
// PVC in the namespace and returns per-PVC results plus aggregate counts.
// Each PVC attempt records a driver-level backup metric; the fan-out as a
// whole records one namespace-level metric via namespaceResultStatus.
func (s *Server) handleNamespaceBackup(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	// Cap request bodies at 1 MiB to bound memory use.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.NamespaceBackupRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "invalid_json")
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	if req.Namespace == "" {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, "namespace is required")
		return
	}
	if err := validateKeepLast(req.KeepLast); err != nil {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	if err := validateKubernetesName("namespace", req.Namespace); err != nil {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "validation_error")
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace)
	if err != nil {
		s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, "backend_error")
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	requester := currentRequester(r.Context())
	// Resolve defaults once so every per-PVC request uses identical settings.
	resolvedDedupe := dedupeDefault(req.Dedupe)
	resolvedKeepLast := keepLastDefault(req.KeepLast)
	response := api.NamespaceBackupResponse{
		Namespace:   req.Namespace,
		RequestedBy: requester,
		Driver:      s.cfg.BackupDriver,
		DryRun:      req.DryRun,
		Dedupe:      resolvedDedupe,
		KeepLast:    resolvedKeepLast,
		Results:     make([]api.NamespaceBackupResult, 0, len(pvcs)),
	}
	for _, pvc := range pvcs {
		backupReq := api.BackupRequest{
			Namespace: req.Namespace,
			PVC:       pvc.Name,
			DryRun:    req.DryRun,
			Dedupe:    boolPtr(resolvedDedupe),
			KeepLast:  intPtr(resolvedKeepLast),
		}
		result, status, execErr := s.executeBackup(r.Context(), backupReq, requester)
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, status)
		item := api.NamespaceBackupResult{
			Namespace: req.Namespace,
			PVC:       pvc.Name,
			Status:    status,
			Volume:    result.Volume,
			Backup:    result.Backup,
		}
		// A single failing PVC does not abort the rest of the namespace.
		if execErr != nil {
			item.Error = execErr.Error()
			response.Failed++
		} else {
			response.Succeeded++
		}
		response.Results = append(response.Results, item)
	}
	response.Total = len(response.Results)
	s.metrics.RecordNamespaceBackupRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed))
	writeJSON(w, http.StatusOK, response)
}
// executeBackup performs one PVC backup under the configured driver and
// returns the response, a short result label for metrics (success, dry_run,
// validation_error, backend_error, unsupported_driver), and an error when
// the backup could not be started.
//
// Longhorn: resolves the PVC's volume, then creates a snapshot and submits a
// snapshot-backup, tagging both with namespace/pvc/requester labels.
// Restic: delegates Job creation to the k8s client; the dry-run flag is
// passed through in req (presumably honored inside CreateBackupJob — confirm).
func (s *Server) executeBackup(ctx context.Context, req api.BackupRequest, requester string) (api.BackupResponse, string, error) {
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	if req.Namespace == "" || req.PVC == "" {
		return api.BackupResponse{}, "validation_error", fmt.Errorf("namespace and pvc are required")
	}
	// Resolve defaults once so the response reflects the effective settings.
	resolvedDedupe := dedupeDefault(req.Dedupe)
	resolvedKeepLast := keepLastDefault(req.KeepLast)
	req.Dedupe = boolPtr(resolvedDedupe)
	req.KeepLast = intPtr(resolvedKeepLast)
	switch s.cfg.BackupDriver {
	case "longhorn":
		volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC)
		if err != nil {
			return api.BackupResponse{}, "validation_error", err
		}
		backupID := backupName("backup", req.Namespace+"-"+req.PVC)
		response := api.BackupResponse{
			Driver:      "longhorn",
			Volume:      volumeName,
			Backup:      backupID,
			Namespace:   req.Namespace,
			RequestedBy: requester,
			DryRun:      req.DryRun,
			Dedupe:      resolvedDedupe,
			KeepLast:    resolvedKeepLast,
		}
		// Dry run: report what would happen without touching Longhorn.
		if req.DryRun {
			return response, "dry_run", nil
		}
		labels := map[string]string{
			"soteria.bstein.dev/namespace":    req.Namespace,
			"soteria.bstein.dev/pvc":          req.PVC,
			"soteria.bstein.dev/requested-by": requester,
		}
		// Snapshot first, then back the snapshot up; both must succeed.
		if err := s.longhorn.CreateSnapshot(ctx, volumeName, backupID, labels); err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		if _, err := s.longhorn.SnapshotBackup(ctx, volumeName, backupID, labels, s.cfg.LonghornBackupMode); err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		return response, "success", nil
	case "restic":
		jobName, secretName, err := s.client.CreateBackupJob(ctx, s.cfg, req)
		if err != nil {
			return api.BackupResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.BackupResponse{
			Driver:      "restic",
			JobName:     jobName,
			Namespace:   req.Namespace,
			Secret:      secretName,
			RequestedBy: requester,
			DryRun:      req.DryRun,
			Dedupe:      resolvedDedupe,
			KeepLast:    resolvedKeepLast,
		}, result, nil
	default:
		return api.BackupResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}
// listNamespaceBoundPVCs returns the bound PVCs in the given namespace,
// filtered from the cluster-wide listing.
func (s *Server) listNamespaceBoundPVCs(ctx context.Context, namespace string) ([]k8s.PVCSummary, error) {
	all, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return nil, err
	}
	matches := make([]k8s.PVCSummary, 0, len(all))
	for _, candidate := range all {
		if candidate.Namespace != namespace {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches, nil
}
// backupStatusCode translates an executeBackup result label into the HTTP
// status returned to the caller.
func backupStatusCode(result string) int {
	if result == "validation_error" || result == "unsupported_driver" {
		return http.StatusBadRequest
	}
	if result == "backend_error" {
		return http.StatusBadGateway
	}
	return http.StatusInternalServerError
}
// namespaceResultStatus summarizes a namespace-wide backup run for metrics:
// dry_run, empty (no PVCs), success, failed (all failed), or partial.
func namespaceResultStatus(dryRun bool, total, succeeded, failed int) string {
	switch {
	case dryRun:
		return "dry_run"
	case total == 0:
		return "empty"
	case failed == 0:
		return "success"
	case succeeded == 0:
		return "failed"
	default:
		return "partial"
	}
}

View File

@ -0,0 +1,266 @@
package server
import (
"context"
"log"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/k8s"
)
// buildInventory assembles the namespace-grouped PVC inventory, enriching
// each PVC with backup health under the active driver. Namespaces are sorted
// so the output is deterministic.
func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) {
	pvcs, err := s.client.ListBoundPVCs(ctx)
	if err != nil {
		return api.InventoryResponse{}, err
	}
	// Restic job lookups are batched up front to avoid per-PVC API calls.
	resticJobsByPVC, resticLookupErrors := s.prefetchResticBackupJobs(ctx, pvcs)
	grouped := map[string][]api.PVCInventory{}
	for _, summary := range pvcs {
		item := api.PVCInventory{
			Namespace:    summary.Namespace,
			PVC:          summary.Name,
			Volume:       summary.VolumeName,
			Phase:        summary.Phase,
			StorageClass: summary.StorageClass,
			Capacity:     summary.Capacity,
			AccessModes:  summary.AccessModes,
			Driver:       s.cfg.BackupDriver,
		}
		s.enrichPVCInventory(ctx, &item, resticJobsByPVC, resticLookupErrors)
		grouped[summary.Namespace] = append(grouped[summary.Namespace], item)
	}
	names := make([]string, 0, len(grouped))
	for name := range grouped {
		names = append(names, name)
	}
	sort.Strings(names)
	response := api.InventoryResponse{
		GeneratedAt: time.Now().UTC().Format(time.RFC3339),
		Namespaces:  make([]api.NamespaceInventory, 0, len(names)),
	}
	for _, name := range names {
		response.Namespaces = append(response.Namespaces, api.NamespaceInventory{
			Name: name,
			PVCs: grouped[name],
		})
	}
	return response, nil
}
// prefetchResticBackupJobs lists backup jobs once per namespace (restic
// driver only) and indexes them by "namespace/pvc", newest first. Namespaces
// whose listing failed are reported in the second return value.
func (s *Server) prefetchResticBackupJobs(ctx context.Context, pvcs []k8s.PVCSummary) (map[string][]k8s.BackupJobSummary, map[string]error) {
	if s.cfg.BackupDriver != "restic" {
		return nil, nil
	}
	seen := map[string]struct{}{}
	var names []string
	for _, pvc := range pvcs {
		if _, dup := seen[pvc.Namespace]; dup {
			continue
		}
		seen[pvc.Namespace] = struct{}{}
		names = append(names, pvc.Namespace)
	}
	sort.Strings(names)
	jobsByPVC := map[string][]k8s.BackupJobSummary{}
	lookupErrors := map[string]error{}
	for _, namespace := range names {
		jobs, err := s.client.ListBackupJobs(ctx, namespace)
		if err != nil {
			lookupErrors[namespace] = err
			continue
		}
		for _, job := range jobs {
			key := job.Namespace + "/" + job.PVC
			jobsByPVC[key] = append(jobsByPVC[key], job)
		}
	}
	for key := range jobsByPVC {
		sortBackupJobsNewestFirst(jobsByPVC[key])
	}
	return jobsByPVC, lookupErrors
}
// enrichPVCInventory fills backup-health fields on a single PVC inventory
// entry, dispatching on the configured backup driver.
//
// Longhorn queries backups for the PVC's volume directly. Restic consumes
// the prefetched maps (resticJobsByPVC keyed by "namespace/pvc",
// resticLookupErrors keyed by namespace) so no extra API calls happen here.
//
// Health semantics: Healthy is true only when a completed backup exists, its
// timestamp parses, and its age is within cfg.BackupMaxAge. HealthReason
// explains the verdict: lookup_failed, missing, no_completed, in_progress,
// unknown_timestamp, stale, fresh, or unsupported_driver.
func (s *Server) enrichPVCInventory(
	ctx context.Context,
	entry *api.PVCInventory,
	resticJobsByPVC map[string][]k8s.BackupJobSummary,
	resticLookupErrors map[string]error,
) {
	switch s.cfg.BackupDriver {
	case "longhorn":
		backups, err := s.longhorn.ListBackups(ctx, entry.Volume)
		if err != nil {
			entry.Healthy = false
			entry.HealthReason = "lookup_failed"
			entry.Error = err.Error()
			return
		}
		entry.BackupCount = len(backups)
		totalBackupSize := int64(0)
		completedBackups := 0
		for _, backup := range backups {
			totalBackupSize += parseSizeBytes(backup.Size)
			if strings.EqualFold(backup.State, "Completed") {
				completedBackups++
			}
		}
		entry.CompletedBackups = completedBackups
		entry.TotalBackupSizeBytes = float64(totalBackupSize)
		latest, latestTime, ok := latestCompletedBackup(backups)
		if !ok {
			// No completed backup: distinguish "never backed up" from
			// "attempts exist but none completed".
			entry.Healthy = false
			if len(backups) == 0 {
				entry.HealthReason = "missing"
			} else {
				entry.HealthReason = "no_completed"
			}
			return
		}
		entry.LastBackupAt = latest.Created
		entry.LastBackupSizeBytes = float64(parseSizeBytes(latest.Size))
		if latestTime.IsZero() {
			entry.Healthy = false
			entry.HealthReason = "unknown_timestamp"
			return
		}
		entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
		// Freshness gate: older than BackupMaxAge marks the PVC stale.
		entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
		if entry.Healthy {
			entry.HealthReason = "fresh"
		} else {
			entry.HealthReason = "stale"
		}
	case "restic":
		// A failed namespace-wide job listing poisons every PVC in it.
		if err, hasErr := resticLookupErrors[entry.Namespace]; hasErr {
			entry.Healthy = false
			entry.HealthReason = "lookup_failed"
			entry.Error = err.Error()
			return
		}
		key := entry.Namespace + "/" + entry.PVC
		jobs := resticJobsByPVC[key]
		if jobs == nil {
			jobs = []k8s.BackupJobSummary{}
		}
		entry.BackupCount = len(jobs)
		// Jobs were sorted newest-first by the prefetch, so jobs[0] is the
		// most recent attempt.
		if len(jobs) > 0 {
			entry.LastJobName = jobs[0].Name
			entry.LastJobState = jobs[0].State
			entry.LastJobProgressPct = backupJobProgressPct(jobs[0].State)
			if !jobs[0].CreatedAt.IsZero() {
				entry.LastJobStartedAt = jobs[0].CreatedAt.UTC().Format(time.RFC3339)
			}
		}
		completed := make([]k8s.BackupJobSummary, 0, len(jobs))
		active := 0
		for _, job := range jobs {
			if backupJobInProgress(job.State) {
				active++
			}
			if strings.EqualFold(job.State, "Completed") {
				completed = append(completed, job)
			}
		}
		entry.ActiveBackups = active
		entry.CompletedBackups = len(completed)
		// Size accounting considers only jobs still retained by keep-last
		// pruning, and samples at most maxUsageSampleJobs log lookups.
		sizeSamples := completed
		if len(sizeSamples) > 0 {
			retained := sizeSamples[0].KeepLast
			if retained > 0 && retained < len(sizeSamples) {
				sizeSamples = sizeSamples[:retained]
			}
		}
		totalStoredBytes := 0.0
		storedSamples := 0
		for index, job := range sizeSamples {
			if index >= maxUsageSampleJobs {
				break
			}
			storedBytes, ok := s.lookupResticStoredBytesForJob(ctx, entry.Namespace, job.Name)
			if !ok {
				continue
			}
			if index == 0 {
				entry.LastBackupSizeBytes = storedBytes
			}
			totalStoredBytes += storedBytes
			storedSamples++
		}
		if storedSamples > 0 {
			entry.TotalBackupSizeBytes = totalStoredBytes
		}
		if len(completed) == 0 {
			entry.Healthy = false
			switch {
			case active > 0:
				entry.HealthReason = "in_progress"
			case len(jobs) == 0:
				entry.HealthReason = "missing"
			default:
				entry.HealthReason = "no_completed"
			}
			return
		}
		latestTime := backupJobTimestamp(completed[0])
		if latestTime.IsZero() {
			entry.Healthy = false
			entry.HealthReason = "unknown_timestamp"
			return
		}
		entry.LastBackupAt = latestTime.UTC().Format(time.RFC3339)
		entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours())
		entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge
		if entry.Healthy {
			entry.HealthReason = "fresh"
		} else {
			entry.HealthReason = "stale"
		}
	default:
		entry.Healthy = false
		entry.HealthReason = "unsupported_driver"
	}
}
// sortBackupJobsNewestFirst orders jobs by effective timestamp, newest
// first, preferring CompletionTime and falling back to CreatedAt. Ties break
// on Name descending so the ordering stays deterministic.
func sortBackupJobsNewestFirst(items []k8s.BackupJobSummary) {
	effective := func(job k8s.BackupJobSummary) time.Time {
		if !job.CompletionTime.IsZero() {
			return job.CompletionTime
		}
		return job.CreatedAt
	}
	sort.Slice(items, func(i, j int) bool {
		left, right := effective(items[i]), effective(items[j])
		if left.Equal(right) {
			return items[i].Name > items[j].Name
		}
		return left.After(right)
	})
}
// refreshTelemetry rebuilds the inventory under a 2-minute deadline and
// pushes the result into metrics; failures are logged and counted.
func (s *Server) refreshTelemetry(ctx context.Context) {
	refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	if inventory, err := s.buildInventory(refreshCtx); err != nil {
		log.Printf("inventory refresh failed: %v", err)
		s.metrics.RecordInventoryFailure()
	} else {
		s.metrics.RecordInventory(inventory)
	}
}

View File

@ -0,0 +1,60 @@
package server
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"scm.bstein.dev/bstein/soteria/internal/api"
)
// handlePolicies serves /v1/policies: GET lists stored policies, POST
// creates or updates one from a JSON body (capped at 1 MiB).
func (s *Server) handlePolicies(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		writeJSON(w, http.StatusOK, api.BackupPolicyListResponse{Policies: s.listPolicies()})
		return
	}
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.BackupPolicyUpsertRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	policy, err := s.upsertPolicy(r.Context(), req)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, policy)
}
// handlePolicyByID serves DELETE /v1/policies/{id}; the id path segment is
// URL-unescaped before lookup.
func (s *Server) handlePolicyByID(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodDelete {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	encoded := strings.TrimPrefix(r.URL.Path, "/v1/policies/")
	if encoded == "" {
		writeError(w, http.StatusBadRequest, "policy id is required")
		return
	}
	id, unescapeErr := url.PathUnescape(encoded)
	if unescapeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid policy id")
		return
	}
	removed, err := s.deletePolicy(r.Context(), id)
	switch {
	case err != nil:
		writeError(w, http.StatusBadGateway, err.Error())
	case !removed:
		writeError(w, http.StatusNotFound, "policy not found")
	default:
		writeJSON(w, http.StatusOK, map[string]string{"deleted": id})
	}
}

View File

@ -0,0 +1,379 @@
package server
import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
)
// runPolicyCycle performs one scheduler pass: it resolves the effective
// policy per PVC and triggers backups that are due. Overlapping cycles are
// skipped via the beginRun/endRun single-flight guard.
//
// Effective-policy resolution: PVC-scoped and namespace-scoped policies both
// match; when several policies cover one PVC, the smallest interval wins,
// with a stricter keep_last as tie-breaker (see keepLastStricter).
func (s *Server) runPolicyCycle(ctx context.Context) {
	if !s.beginRun() {
		return
	}
	defer s.endRun()
	policies := s.activePolicies()
	if len(policies) == 0 {
		return
	}
	// Bound the whole pass, including inventory construction and backups.
	runCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()
	inventory, err := s.buildInventory(runCtx)
	if err != nil {
		log.Printf("policy cycle inventory failed: %v", err)
		s.metrics.RecordPolicyBackup("inventory_error")
		return
	}
	// Index inventory by "namespace/pvc" and by namespace for matching.
	pvcMap := make(map[string]api.PVCInventory)
	namespaceMap := make(map[string][]api.PVCInventory)
	for _, group := range inventory.Namespaces {
		for _, pvc := range group.PVCs {
			key := pvc.Namespace + "/" + pvc.PVC
			pvcMap[key] = pvc
			namespaceMap[pvc.Namespace] = append(namespaceMap[pvc.Namespace], pvc)
		}
	}
	type effectivePolicy struct {
		IntervalHours float64
		Dedupe        bool
		KeepLast      int
	}
	effectivePolicies := map[string]effectivePolicy{}
	for _, policy := range policies {
		matches := []api.PVCInventory{}
		if policy.PVC != "" {
			if pvc, ok := pvcMap[policy.Namespace+"/"+policy.PVC]; ok {
				matches = append(matches, pvc)
			}
		} else {
			matches = append(matches, namespaceMap[policy.Namespace]...)
		}
		for _, pvc := range matches {
			key := pvc.Namespace + "/" + pvc.PVC
			current, exists := effectivePolicies[key]
			if !exists || policy.IntervalHours < current.IntervalHours || (policy.IntervalHours == current.IntervalHours && keepLastStricter(policy.KeepLast, current.KeepLast)) {
				effectivePolicies[key] = effectivePolicy{
					IntervalHours: policy.IntervalHours,
					Dedupe:        policy.Dedupe,
					KeepLast:      policy.KeepLast,
				}
			}
		}
	}
	for key, effective := range effectivePolicies {
		pvc, ok := pvcMap[key]
		if !ok {
			continue
		}
		// Never enqueue a new policy backup while one is already active for this PVC.
		// This prevents runaway job storms when a backup is stuck Pending/Running.
		if pvc.ActiveBackups > 0 {
			s.metrics.RecordPolicyBackup("in_progress")
			continue
		}
		lastRunRef := strings.TrimSpace(pvc.LastBackupAt)
		if lastRunRef == "" {
			// If no successful backup exists yet, fall back to the most recent job start
			// so failed attempts are still throttled by interval_hours.
			lastRunRef = strings.TrimSpace(pvc.LastJobStartedAt)
		}
		if !backupDue(lastRunRef, effective.IntervalHours) {
			s.metrics.RecordPolicyBackup("not_due")
			continue
		}
		_, result, err := s.executeBackup(runCtx, api.BackupRequest{
			Namespace: pvc.Namespace,
			PVC:       pvc.PVC,
			DryRun:    false,
			Dedupe:    boolPtr(effective.Dedupe),
			KeepLast:  intPtr(effective.KeepLast),
		}, "policy-scheduler")
		s.metrics.RecordBackupRequest(s.cfg.BackupDriver, result)
		if err != nil {
			s.metrics.RecordPolicyBackup(result)
			log.Printf("policy backup failed for %s/%s: %v", pvc.Namespace, pvc.PVC, err)
			continue
		}
		s.metrics.RecordPolicyBackup("success")
	}
}
// beginRun attempts to claim the single-flight policy-run slot; it returns
// false when a cycle is already executing.
func (s *Server) beginRun() bool {
	s.runMu.Lock()
	defer s.runMu.Unlock()
	alreadyRunning := s.running
	s.running = true
	return !alreadyRunning
}
// endRun releases the single-flight slot claimed by beginRun.
func (s *Server) endRun() {
	s.runMu.Lock()
	s.running = false
	s.runMu.Unlock()
}
// loadPolicies hydrates the in-memory policy map from the policy secret.
// An absent or empty payload is not an error and leaves the map unchanged.
// Each stored entry is re-normalized on load: blank namespaces are dropped,
// non-positive intervals fall back to defaultPolicyHours, dedupe defaults to
// true, keep_last passes through keepLastDefault, the ID is re-derived with
// policyKey, and missing created/updated timestamps are backfilled.
func (s *Server) loadPolicies(ctx context.Context) error {
	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}
	// Decode into a tolerant local shape so optional fields may be omitted.
	var doc struct {
		Policies []struct {
			ID            string  `json:"id"`
			Namespace     string  `json:"namespace"`
			PVC           string  `json:"pvc,omitempty"`
			IntervalHours float64 `json:"interval_hours"`
			Enabled       bool    `json:"enabled"`
			Dedupe        *bool   `json:"dedupe,omitempty"`
			KeepLast      *int    `json:"keep_last,omitempty"`
			CreatedAt     string  `json:"created_at,omitempty"`
			UpdatedAt     string  `json:"updated_at,omitempty"`
		} `json:"policies"`
	}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode policy document: %w", err)
	}
	next := map[string]api.BackupPolicy{}
	now := time.Now().UTC().Format(time.RFC3339)
	for _, policy := range doc.Policies {
		namespace := strings.TrimSpace(policy.Namespace)
		pvc := strings.TrimSpace(policy.PVC)
		if namespace == "" {
			continue
		}
		interval := policy.IntervalHours
		if interval <= 0 {
			interval = defaultPolicyHours
		}
		dedupe := true
		if policy.Dedupe != nil {
			dedupe = *policy.Dedupe
		}
		keepLast := keepLastDefault(policy.KeepLast)
		id := policyKey(namespace, pvc)
		createdAt := policy.CreatedAt
		if createdAt == "" {
			createdAt = now
		}
		updatedAt := policy.UpdatedAt
		if updatedAt == "" {
			updatedAt = createdAt
		}
		next[id] = api.BackupPolicy{
			ID:            id,
			Namespace:     namespace,
			PVC:           pvc,
			IntervalHours: interval,
			Enabled:       policy.Enabled,
			Dedupe:        dedupe,
			KeepLast:      keepLast,
			CreatedAt:     createdAt,
			UpdatedAt:     updatedAt,
		}
	}
	// Swap the freshly built map in under the lock.
	s.policyMu.Lock()
	s.policies = next
	s.policyMu.Unlock()
	return nil
}
// persistPolicies serializes the policy list and writes it into the policy
// secret, stamping standard app labels.
func (s *Server) persistPolicies(ctx context.Context, policies []api.BackupPolicy) error {
	type document struct {
		Policies []api.BackupPolicy `json:"policies"`
	}
	payload, err := json.Marshal(document{Policies: policies})
	if err != nil {
		return fmt.Errorf("encode policy document: %w", err)
	}
	labels := map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "policy-store",
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.PolicySecretName, policySecretKey, payload, labels)
}
// listPolicies returns a stable-ordered snapshot of all stored policies
// (namespace, then PVC, then ID).
func (s *Server) listPolicies() []api.BackupPolicy {
	s.policyMu.RLock()
	defer s.policyMu.RUnlock()
	out := make([]api.BackupPolicy, 0, len(s.policies))
	for _, policy := range s.policies {
		out = append(out, policy)
	}
	sort.Slice(out, func(i, j int) bool {
		left, right := out[i], out[j]
		switch {
		case left.Namespace != right.Namespace:
			return left.Namespace < right.Namespace
		case left.PVC != right.PVC:
			return left.PVC < right.PVC
		default:
			return left.ID < right.ID
		}
	})
	return out
}
// activePolicies returns only the enabled policies, in listPolicies order.
func (s *Server) activePolicies() []api.BackupPolicy {
	policies := s.listPolicies()
	enabled := make([]api.BackupPolicy, 0, len(policies))
	for _, policy := range policies {
		if !policy.Enabled {
			continue
		}
		enabled = append(enabled, policy)
	}
	return enabled
}
// upsertPolicy validates a policy request, stores it under its derived ID
// (policyKey(namespace, pvc)), and persists the full policy set to the
// secret. On persistence failure the in-memory map is rolled back to its
// prior state and the error returned. CreatedAt survives updates; UpdatedAt
// is always refreshed to "now".
//
// Validation: namespace is required and checked via validateKubernetesName;
// pvc, when set, is checked the same way; interval defaults to
// defaultPolicyHours and may not exceed maxPolicyIntervalHrs; enabled
// defaults to true; keep_last is validated then defaulted.
func (s *Server) upsertPolicy(ctx context.Context, req api.BackupPolicyUpsertRequest) (api.BackupPolicy, error) {
	namespace := strings.TrimSpace(req.Namespace)
	pvc := strings.TrimSpace(req.PVC)
	if namespace == "" {
		return api.BackupPolicy{}, fmt.Errorf("namespace is required")
	}
	if err := validateKubernetesName("namespace", namespace); err != nil {
		return api.BackupPolicy{}, err
	}
	if pvc != "" {
		if err := validateKubernetesName("pvc", pvc); err != nil {
			return api.BackupPolicy{}, err
		}
	}
	interval := req.IntervalHours
	if interval <= 0 {
		interval = defaultPolicyHours
	}
	if interval > maxPolicyIntervalHrs {
		return api.BackupPolicy{}, fmt.Errorf("interval_hours must be <= %d", maxPolicyIntervalHrs)
	}
	enabled := true
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	dedupe := dedupeDefault(req.Dedupe)
	if err := validateKeepLast(req.KeepLast); err != nil {
		return api.BackupPolicy{}, err
	}
	keepLast := keepLastDefault(req.KeepLast)
	id := policyKey(namespace, pvc)
	now := time.Now().UTC().Format(time.RFC3339)
	s.policyMu.Lock()
	// Snapshot current state so a failed persist can be rolled back.
	before := clonePolicyMap(s.policies)
	createdAt := now
	if existing, ok := s.policies[id]; ok && existing.CreatedAt != "" {
		createdAt = existing.CreatedAt
	}
	policy := api.BackupPolicy{
		ID:            id,
		Namespace:     namespace,
		PVC:           pvc,
		IntervalHours: interval,
		Enabled:       enabled,
		Dedupe:        dedupe,
		KeepLast:      keepLast,
		CreatedAt:     createdAt,
		UpdatedAt:     now,
	}
	s.policies[id] = policy
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()
	// Persist outside the lock; on failure restore the previous map.
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = before
		s.policyMu.Unlock()
		return api.BackupPolicy{}, err
	}
	return policy, nil
}
// deletePolicy removes a stored policy by ID and persists the updated set.
// It returns (false, nil) when the ID is blank or unknown; if persistence
// fails, the in-memory state is rolled back and the error returned.
func (s *Server) deletePolicy(ctx context.Context, id string) (bool, error) {
	id = strings.TrimSpace(id)
	if id == "" {
		return false, nil
	}
	s.policyMu.Lock()
	_, exists := s.policies[id]
	if !exists {
		s.policyMu.Unlock()
		return false, nil
	}
	rollback := clonePolicyMap(s.policies)
	delete(s.policies, id)
	snapshot := policySliceFromMap(s.policies)
	s.policyMu.Unlock()
	if err := s.persistPolicies(ctx, snapshot); err != nil {
		s.policyMu.Lock()
		s.policies = rollback
		s.policyMu.Unlock()
		return false, err
	}
	return true, nil
}
// policyKey derives the canonical policy ID "namespace__pvc", substituting
// "_all" when the policy applies to the whole namespace.
func policyKey(namespace, pvc string) string {
	target := strings.TrimSpace(pvc)
	if target == "" {
		target = "_all"
	}
	return fmt.Sprintf("%s__%s", strings.TrimSpace(namespace), target)
}
// policySliceFromMap flattens the policy map into a slice sorted by ID.
func policySliceFromMap(source map[string]api.BackupPolicy) []api.BackupPolicy {
	flattened := make([]api.BackupPolicy, 0, len(source))
	for _, policy := range source {
		flattened = append(flattened, policy)
	}
	sort.Slice(flattened, func(i, j int) bool { return flattened[i].ID < flattened[j].ID })
	return flattened
}
// clonePolicyMap returns a shallow copy of the policy map; used to snapshot
// state for rollback when persistence fails.
func clonePolicyMap(source map[string]api.BackupPolicy) map[string]api.BackupPolicy {
	duplicate := make(map[string]api.BackupPolicy, len(source))
	for id, policy := range source {
		duplicate[id] = policy
	}
	return duplicate
}
// backupDue reports whether a new backup should run, given the timestamp of
// the last run and the policy interval (hours). Blank or unparsable
// timestamps are treated as "due now" so a PVC never silently falls out of
// rotation; a non-positive interval falls back to defaultPolicyHours.
func backupDue(lastBackupAt string, intervalHours float64) bool {
	if intervalHours <= 0 {
		intervalHours = defaultPolicyHours
	}
	if strings.TrimSpace(lastBackupAt) == "" {
		return true
	}
	timestamp, ok := parseBackupTime(lastBackupAt)
	if !ok {
		return true
	}
	window := time.Duration(intervalHours * float64(time.Hour))
	return time.Since(timestamp) >= window
}

View File

@ -0,0 +1,259 @@
package server
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
)
var (
	// resticAddedStoredPattern matches restic's human-readable summary line
	// (e.g. "added to the repository: ... (1.2 GiB stored)"), capturing the
	// size text inside the parentheses before "stored".
	resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`)
	// resticDataAddedPattern matches the "data_added" field of restic's JSON
	// summary output, capturing the raw byte count.
	resticDataAddedPattern = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`)
)
// lookupResticStoredBytesForJob resolves the stored-bytes figure for one
// backup job, trying three layers in order:
//  1. the in-memory cache (entries valid for 15 minutes),
//  2. the persisted usage store (secret-backed),
//  3. parsing the job's log (successful parses are persisted and cached).
// The boolean reports whether a figure is known. Unknown results are cached
// too, so repeated log fetches are throttled by the same 15-minute window.
func (s *Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) {
	key := namespace + "/" + jobName
	s.jobUsageMu.RLock()
	cached, ok := s.jobUsage[key]
	s.jobUsageMu.RUnlock()
	if ok && time.Since(cached.CheckedAt) < 15*time.Minute {
		return cached.Bytes, cached.Known
	}
	if bytes, known := s.lookupPersistedResticUsage(key); known {
		entry := resticJobUsageCacheEntry{
			Known:     true,
			Bytes:     bytes,
			CheckedAt: time.Now().UTC(),
		}
		s.jobUsageMu.Lock()
		if s.jobUsage == nil {
			s.jobUsage = map[string]resticJobUsageCacheEntry{}
		}
		s.jobUsage[key] = entry
		s.jobUsageMu.Unlock()
		return bytes, true
	}
	// Fall back to the job log; read or parse failures are cached as unknown.
	logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName)
	entry := resticJobUsageCacheEntry{
		Known:     false,
		Bytes:     0,
		CheckedAt: time.Now().UTC(),
	}
	if err == nil {
		if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed {
			entry.Known = true
			entry.Bytes = parsedBytes
			s.storePersistedResticUsage(ctx, key, parsedBytes)
		}
	}
	s.jobUsageMu.Lock()
	if s.jobUsage == nil {
		s.jobUsage = map[string]resticJobUsageCacheEntry{}
	}
	s.jobUsage[key] = entry
	s.jobUsageMu.Unlock()
	return entry.Bytes, entry.Known
}
// parseResticStoredBytes extracts the stored-bytes figure from a restic job
// log, preferring the JSON "data_added" field and falling back to the
// human-readable "(... stored)" summary line. The last occurrence wins in
// both cases. Returns false when neither form is present.
func parseResticStoredBytes(logBody string) (float64, bool) {
	if logBody == "" {
		return 0, false
	}
	if matches := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1); len(matches) > 0 {
		final := matches[len(matches)-1]
		if len(final) > 1 {
			if bytes, err := strconv.ParseFloat(strings.TrimSpace(final[1]), 64); err == nil {
				return bytes, true
			}
		}
	}
	textMatches := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1)
	if len(textMatches) == 0 {
		return 0, false
	}
	final := textMatches[len(textMatches)-1]
	if len(final) < 2 {
		return 0, false
	}
	return parseHumanByteSize(final[1])
}
// parseHumanByteSize converts strings like "1.5 GiB" or "512 KB" into bytes.
// Binary (KiB/MiB/GiB/TiB) and decimal (KB/MB/GB/TB) units are supported,
// case-insensitively; comma thousands separators in the number are
// tolerated. Returns false for anything it cannot interpret.
func parseHumanByteSize(raw string) (float64, bool) {
	fields := strings.Fields(strings.TrimSpace(raw))
	if len(fields) < 2 {
		return 0, false
	}
	value, err := strconv.ParseFloat(strings.ReplaceAll(fields[0], ",", ""), 64)
	if err != nil {
		return 0, false
	}
	multipliers := map[string]float64{
		"B":   1,
		"KIB": 1 << 10,
		"MIB": 1 << 20,
		"GIB": 1 << 30,
		"TIB": 1 << 40,
		"KB":  1e3,
		"MB":  1e6,
		"GB":  1e9,
		"TB":  1e12,
	}
	factor, known := multipliers[strings.ToUpper(strings.TrimSpace(fields[1]))]
	if !known {
		return 0, false
	}
	return value * factor, true
}
// loadResticUsage hydrates the persisted per-job stored-bytes map from the
// usage secret. A blank secret name disables persistence; an empty payload
// is treated as no data. Entries with blank keys or negative/non-finite byte
// counts are discarded.
func (s *Server) loadResticUsage(ctx context.Context) error {
	if strings.TrimSpace(s.cfg.UsageSecretName) == "" {
		return nil
	}
	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		return nil
	}
	var doc resticPersistedUsageDocument
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode restic usage document: %w", err)
	}
	loaded := make(map[string]resticPersistedUsageEntry, len(doc.Jobs))
	for _, job := range doc.Jobs {
		key := strings.TrimSpace(job.Key)
		if key == "" {
			continue
		}
		if job.Bytes < 0 || math.IsNaN(job.Bytes) || math.IsInf(job.Bytes, 0) {
			continue
		}
		loaded[key] = resticPersistedUsageEntry{
			Bytes:     job.Bytes,
			UpdatedAt: strings.TrimSpace(job.UpdatedAt),
		}
	}
	s.usageMu.Lock()
	s.usageStore = loaded
	s.usageMu.Unlock()
	return nil
}
// lookupPersistedResticUsage returns the persisted byte count for a job key,
// or false when the key is unknown or the stored value is invalid
// (negative, NaN, or infinite). Safe for concurrent use.
func (s *Server) lookupPersistedResticUsage(key string) (float64, bool) {
	s.usageMu.RLock()
	defer s.usageMu.RUnlock()
	// Reading from a nil map is safe in Go and simply misses.
	entry, ok := s.usageStore[key]
	if !ok || entry.Bytes < 0 || math.IsNaN(entry.Bytes) || math.IsInf(entry.Bytes, 0) {
		return 0, false
	}
	return entry.Bytes, true
}
// storePersistedResticUsage records a usage sample for a job key and, when
// the value actually changed (or the entry was missing a timestamp),
// persists the whole store to the usage secret. Invalid input is ignored.
// Persistence failures are logged, not returned.
func (s *Server) storePersistedResticUsage(ctx context.Context, key string, value float64) {
	if key == "" || value < 0 || math.IsNaN(value) || math.IsInf(value, 0) {
		return
	}
	timestamp := time.Now().UTC().Format(time.RFC3339)
	s.usageMu.Lock()
	if s.usageStore == nil {
		s.usageStore = map[string]resticPersistedUsageEntry{}
	}
	previous, seen := s.usageStore[key]
	dirty := !seen || previous.Bytes != value || strings.TrimSpace(previous.UpdatedAt) == ""
	if dirty {
		s.usageStore[key] = resticPersistedUsageEntry{
			Bytes:     value,
			UpdatedAt: timestamp,
		}
	}
	s.usageMu.Unlock()
	if !dirty {
		return // nothing new to write out
	}
	if err := s.persistResticUsage(ctx); err != nil {
		log.Printf("persist restic usage failed: %v", err)
	}
}
// persistResticUsage serializes the in-memory usage store to the configured
// Kubernetes secret. It is a no-op when no usage secret name is configured.
// Entries are sorted by key so the JSON payload is deterministic across runs.
func (s *Server) persistResticUsage(ctx context.Context) error {
	if strings.TrimSpace(s.cfg.UsageSecretName) == "" {
		return nil
	}
	// Snapshot the store under the read lock; release before JSON/network I/O.
	s.usageMu.RLock()
	entries := make([]struct {
		Key   string
		Value resticPersistedUsageEntry
	}, 0, len(s.usageStore))
	for key, value := range s.usageStore {
		entries = append(entries, struct {
			Key   string
			Value resticPersistedUsageEntry
		}{Key: key, Value: value})
	}
	s.usageMu.RUnlock()
	// Map iteration order is random; sort for a stable document.
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].Key < entries[j].Key
	})
	// The anonymous struct shape must match resticPersistedUsageDocument.Jobs.
	doc := resticPersistedUsageDocument{
		Jobs: make([]struct {
			Key       string  `json:"key"`
			Bytes     float64 `json:"bytes"`
			UpdatedAt string  `json:"updated_at,omitempty"`
		}, 0, len(entries)),
	}
	for _, entry := range entries {
		// Defensively drop entries that should never have been stored.
		if entry.Key == "" || entry.Value.Bytes < 0 || math.IsNaN(entry.Value.Bytes) || math.IsInf(entry.Value.Bytes, 0) {
			continue
		}
		doc.Jobs = append(doc.Jobs, struct {
			Key       string  `json:"key"`
			Bytes     float64 `json:"bytes"`
			UpdatedAt string  `json:"updated_at,omitempty"`
		}{
			Key:       entry.Key,
			Bytes:     entry.Value.Bytes,
			UpdatedAt: strings.TrimSpace(entry.Value.UpdatedAt),
		})
	}
	payload, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("encode restic usage document: %w", err)
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey, payload, map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "usage-store",
	})
}

View File

@ -0,0 +1,316 @@
package server
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"scm.bstein.dev/bstein/soteria/internal/api"
)
// handleRestore serves POST requests that restore a single PVC into a
// (different) target namespace/PVC. Every outcome — including validation
// failures — is recorded in the restore-request metric before responding.
func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	// Cap request bodies at 1 MiB to bound memory use.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.RestoreTestRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "invalid_json")
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	req.TargetPVC = strings.TrimSpace(req.TargetPVC)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)

	// fail records the metric and writes the error response in one step.
	fail := func(status int, metric, message string) {
		s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, metric)
		writeError(w, status, message)
	}
	// Required fields, checked in the documented order.
	for _, item := range []struct{ field, value string }{
		{"namespace", req.Namespace},
		{"pvc", req.PVC},
		{"target_pvc", req.TargetPVC},
	} {
		if item.value == "" {
			fail(http.StatusBadRequest, "validation_error", item.field+" is required")
			return
		}
	}
	// The target namespace defaults to the source namespace.
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}
	// All names must be valid DNS-1123 labels.
	for _, item := range []struct{ field, value string }{
		{"namespace", req.Namespace},
		{"pvc", req.PVC},
		{"target_namespace", req.TargetNamespace},
		{"target_pvc", req.TargetPVC},
	} {
		if err := validateKubernetesName(item.field, item.value); err != nil {
			fail(http.StatusBadRequest, "validation_error", err.Error())
			return
		}
	}
	// Restoring a PVC onto itself would clobber the source.
	if req.Namespace == req.TargetNamespace && req.PVC == req.TargetPVC {
		fail(http.StatusConflict, "conflict", "target namespace/pvc must differ from source")
		return
	}
	requester := currentRequester(r.Context())
	response, result, err := s.executeRestore(r.Context(), req, requester)
	s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, result)
	if err != nil {
		writeError(w, restoreStatusCode(result), err.Error())
		return
	}
	writeJSON(w, http.StatusOK, response)
}
// handleNamespaceRestore serves POST requests that restore every bound PVC in
// a namespace. Each PVC restore is attempted independently; per-PVC outcomes
// are collected in the response and an aggregate status is recorded in the
// namespace-restore metric.
func (s *Server) handleNamespaceRestore(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "method not allowed")
		return
	}
	// Cap request bodies at 1 MiB to bound memory use.
	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
	var req api.NamespaceRestoreRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, "invalid_json")
		writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err))
		return
	}
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)
	req.TargetPrefix = strings.TrimSpace(req.TargetPrefix)
	req.Snapshot = strings.TrimSpace(req.Snapshot)

	// fail records the metric and writes the error response in one step.
	fail := func(status int, metric, message string) {
		s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, metric)
		writeError(w, status, message)
	}
	if req.Namespace == "" {
		fail(http.StatusBadRequest, "validation_error", "namespace is required")
		return
	}
	// The target namespace defaults to the source namespace.
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}
	for _, item := range []struct{ field, value string }{
		{"namespace", req.Namespace},
		{"target_namespace", req.TargetNamespace},
	} {
		if err := validateKubernetesName(item.field, item.value); err != nil {
			fail(http.StatusBadRequest, "validation_error", err.Error())
			return
		}
	}
	pvcs, err := s.listNamespaceBoundPVCs(r.Context(), req.Namespace)
	if err != nil {
		fail(http.StatusBadGateway, "backend_error", err.Error())
		return
	}
	requester := currentRequester(r.Context())
	response := api.NamespaceRestoreResponse{
		Namespace:       req.Namespace,
		TargetNamespace: req.TargetNamespace,
		RequestedBy:     requester,
		Driver:          s.cfg.BackupDriver,
		DryRun:          req.DryRun,
		Results:         make([]api.NamespaceRestoreResult, 0, len(pvcs)),
	}
	for _, pvc := range pvcs {
		targetPVC := targetPVCName(req.TargetPrefix, pvc.Name)
		result, status, execErr := s.executeRestore(r.Context(), api.RestoreTestRequest{
			Namespace:       req.Namespace,
			PVC:             pvc.Name,
			Snapshot:        req.Snapshot,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       targetPVC,
			DryRun:          req.DryRun,
		}, requester)
		// Each per-PVC attempt counts against the single-restore metric too.
		s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, status)
		item := api.NamespaceRestoreResult{
			Namespace:       req.Namespace,
			PVC:             pvc.Name,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       targetPVC,
			Status:          status,
			Volume:          result.Volume,
			BackupURL:       result.BackupURL,
		}
		if execErr != nil {
			item.Error = execErr.Error()
			response.Failed++
		} else {
			response.Succeeded++
		}
		response.Results = append(response.Results, item)
	}
	response.Total = len(response.Results)
	s.metrics.RecordNamespaceRestoreRequest(s.cfg.BackupDriver, namespaceResultStatus(req.DryRun, response.Total, response.Succeeded, response.Failed))
	writeJSON(w, http.StatusOK, response)
}
// executeRestore performs a single PVC restore under the configured backup
// driver and returns the API response plus a short result label consumed by
// metrics and restoreStatusCode ("success", "dry_run", "conflict",
// "validation_error", "backend_error", or "unsupported_driver").
func (s *Server) executeRestore(ctx context.Context, req api.RestoreTestRequest, requester string) (api.RestoreTestResponse, string, error) {
	// Normalize all request fields; callers may pass untrimmed input.
	req.Namespace = strings.TrimSpace(req.Namespace)
	req.PVC = strings.TrimSpace(req.PVC)
	req.TargetNamespace = strings.TrimSpace(req.TargetNamespace)
	req.TargetPVC = strings.TrimSpace(req.TargetPVC)
	req.BackupURL = strings.TrimSpace(req.BackupURL)
	req.Snapshot = strings.TrimSpace(req.Snapshot)
	if req.TargetNamespace == "" {
		req.TargetNamespace = req.Namespace
	}
	switch s.cfg.BackupDriver {
	case "longhorn":
		// Refuse to clobber an already-existing target PVC.
		exists, err := s.client.PersistentVolumeClaimExists(ctx, req.TargetNamespace, req.TargetPVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}
		if exists {
			return api.RestoreTestResponse{}, "conflict", fmt.Errorf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)
		}
		volumeName, _, _, err := s.client.ResolvePVCVolume(ctx, req.Namespace, req.PVC)
		if err != nil {
			return api.RestoreTestResponse{}, "validation_error", err
		}
		// Resolve the backup URL from the snapshot when not given explicitly.
		backupURL := req.BackupURL
		if backupURL == "" {
			backup, err := s.longhorn.FindBackup(ctx, volumeName, req.Snapshot)
			if err != nil {
				return api.RestoreTestResponse{}, "validation_error", err
			}
			backupURL = strings.TrimSpace(backup.URL)
		}
		if backupURL == "" {
			return api.RestoreTestResponse{}, "validation_error", fmt.Errorf("backup_url is required")
		}
		restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC)
		response := api.RestoreTestResponse{
			Driver:          "longhorn",
			Volume:          restoreVolumeName,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			BackupURL:       backupURL,
			Namespace:       req.Namespace,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}
		// Dry-run stops after validation/resolution, before any mutation.
		if req.DryRun {
			return response, "dry_run", nil
		}
		sourceVolume, err := s.longhorn.GetVolume(ctx, volumeName)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		// Mirror the source volume's replica count; default to 2 when unset.
		replicas := sourceVolume.NumberOfReplicas
		if replicas == 0 {
			replicas = 2
		}
		if _, err := s.longhorn.CreateVolumeFromBackup(ctx, restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		if err := s.longhorn.CreatePVC(ctx, restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil {
			// Best-effort cleanup of the volume we just created; surface the
			// cleanup failure alongside the original error when it also fails.
			cleanupErr := s.longhorn.DeleteVolume(ctx, restoreVolumeName)
			if cleanupErr != nil {
				log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr)
				return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr)
			}
			return api.RestoreTestResponse{}, "backend_error", fmt.Errorf("create restore pvc: %v", err)
		}
		return response, "success", nil
	case "restic":
		// BackupURL may carry an encoded repository selector; use it to fill
		// in Snapshot/Repository when the caller left them blank.
		if repo, snapshot, ok := decodeResticSelector(req.BackupURL); ok {
			if strings.TrimSpace(req.Snapshot) == "" {
				req.Snapshot = snapshot
			}
			if strings.TrimSpace(req.Repository) == "" {
				req.Repository = repo
			}
		}
		jobName, secretName, err := s.client.CreateRestoreJob(ctx, s.cfg, req)
		if err != nil {
			return api.RestoreTestResponse{}, "backend_error", err
		}
		result := "success"
		if req.DryRun {
			result = "dry_run"
		}
		return api.RestoreTestResponse{
			Driver:          "restic",
			JobName:         jobName,
			Namespace:       req.Namespace,
			TargetNamespace: req.TargetNamespace,
			TargetPVC:       req.TargetPVC,
			Secret:          secretName,
			RequestedBy:     requester,
			DryRun:          req.DryRun,
		}, result, nil
	default:
		return api.RestoreTestResponse{}, "unsupported_driver", fmt.Errorf("unsupported backup driver")
	}
}
// restoreStatusCode maps an executeRestore result label to the HTTP status
// used when the restore returned an error. Unknown labels map to 500.
func restoreStatusCode(result string) int {
	codes := map[string]int{
		"validation_error":   http.StatusBadRequest,
		"unsupported_driver": http.StatusBadRequest,
		"conflict":           http.StatusConflict,
		"backend_error":      http.StatusBadGateway,
	}
	if code, ok := codes[result]; ok {
		return code
	}
	return http.StatusInternalServerError
}
// targetPVCName derives the restore-target PVC name by joining a sanitized
// prefix with the source PVC name, truncating to the 63-character Kubernetes
// name limit. "restore" is used whenever sanitization empties the result.
func targetPVCName(prefix, sourcePVC string) string {
	const fallback = "restore"
	cleanPrefix := sanitizeName(prefix)
	if cleanPrefix == "" {
		cleanPrefix = fallback
	}
	if !strings.HasSuffix(cleanPrefix, "-") {
		cleanPrefix += "-"
	}
	name := sanitizeName(cleanPrefix + sourcePVC)
	if name == "" {
		name = fallback
	}
	if len(name) > 63 {
		// Truncation can leave a trailing dash; trim it off.
		name = strings.Trim(name[:63], "-")
	}
	if name == "" {
		name = fallback
	}
	return name
}

File diff suppressed because it is too large Load Diff

View File

@ -175,6 +175,152 @@ func (f *fakeLonghornClient) ListBackups(_ context.Context, volumeName string) (
return f.backups, nil
}
// TestNewInitializesCoreServerFields verifies that New wires up every core
// field (telemetry, UI renderer, handler, and the internal maps) even when
// constructed from an empty config with nil clients.
func TestNewInitializesCoreServerFields(t *testing.T) {
	srv := New(&config.Config{}, nil, nil)
	if srv == nil {
		t.Fatalf("expected server instance")
	}
	if srv.metrics == nil {
		t.Fatalf("expected telemetry to be initialized")
	}
	if srv.ui == nil {
		t.Fatalf("expected UI renderer to be initialized")
	}
	if srv.handler == nil {
		t.Fatalf("expected handler to be initialized")
	}
	// Maps must be non-nil so later writes don't panic.
	if srv.policies == nil || srv.jobUsage == nil || srv.usageStore == nil {
		t.Fatalf("expected server maps to be initialized: %#v", srv)
	}
}
// TestHealthAndReadyEndpoints checks that /healthz and /readyz both return
// 200 with the expected JSON status payloads.
func TestHealthAndReadyEndpoints(t *testing.T) {
	srv := New(&config.Config{}, nil, nil)
	testCases := []struct {
		path   string
		status string
	}{
		{path: "/healthz", status: "ok"},
		{path: "/readyz", status: "ready"},
	}
	for _, tc := range testCases {
		req := httptest.NewRequest(http.MethodGet, tc.path, nil)
		res := httptest.NewRecorder()
		srv.Handler().ServeHTTP(res, req)
		if res.Code != http.StatusOK {
			t.Fatalf("%s expected 200, got %d: %s", tc.path, res.Code, res.Body.String())
		}
		var payload map[string]string
		if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil {
			t.Fatalf("%s decode response: %v", tc.path, err)
		}
		if payload["status"] != tc.status {
			t.Fatalf("%s expected status %q, got %#v", tc.path, tc.status, payload)
		}
	}
}
// TestWhoAmIReturnsForwardedIdentity verifies that /v1/whoami echoes the
// identity supplied via X-Forwarded-* headers, with group names normalized
// (leading "/" stripped).
func TestWhoAmIReturnsForwardedIdentity(t *testing.T) {
	srv := New(&config.Config{AuthRequired: true}, nil, nil)
	req := httptest.NewRequest(http.MethodGet, "/v1/whoami", nil)
	req.Header.Set("X-Forwarded-User", "brad")
	req.Header.Set("X-Forwarded-Email", "brad@bstein.dev")
	req.Header.Set("X-Forwarded-Groups", "/ops,/dev")
	res := httptest.NewRecorder()
	srv.Handler().ServeHTTP(res, req)
	if res.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String())
	}
	var payload api.AuthInfoResponse
	if err := json.Unmarshal(res.Body.Bytes(), &payload); err != nil {
		t.Fatalf("decode whoami response: %v", err)
	}
	if !payload.Authenticated || payload.User != "brad" || payload.Email != "brad@bstein.dev" {
		t.Fatalf("unexpected whoami payload: %#v", payload)
	}
	// "/ops,/dev" should come back as ["ops", "dev"].
	if len(payload.Groups) != 2 || payload.Groups[0] != "ops" || payload.Groups[1] != "dev" {
		t.Fatalf("unexpected whoami groups: %#v", payload.Groups)
	}
}
// TestWhoAmIRejectsUnsupportedMethod verifies that POSTing to /v1/whoami
// yields a 405.
func TestWhoAmIRejectsUnsupportedMethod(t *testing.T) {
	srv := New(&config.Config{}, nil, nil)
	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/v1/whoami", nil))
	if rec.Code != http.StatusMethodNotAllowed {
		t.Fatalf("expected 405, got %d: %s", rec.Code, rec.Body.String())
	}
}
// TestRootRejectsUnsupportedMethod verifies that POSTing to / yields a 405.
func TestRootRejectsUnsupportedMethod(t *testing.T) {
	srv := New(&config.Config{}, nil, nil)
	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
	if rec.Code != http.StatusMethodNotAllowed {
		t.Fatalf("expected 405, got %d: %s", rec.Code, rec.Body.String())
	}
}
// TestRootFailsWhenUIRendererUnavailable verifies that GET / returns a 500
// when the UI renderer has been cleared.
func TestRootFailsWhenUIRendererUnavailable(t *testing.T) {
	srv := New(&config.Config{}, nil, nil)
	srv.ui = nil // simulate a missing renderer
	rec := httptest.NewRecorder()
	srv.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	if rec.Code != http.StatusInternalServerError {
		t.Fatalf("expected 500, got %d: %s", rec.Code, rec.Body.String())
	}
}
// TestStartSeedsInitialBackgroundState starts a fully hand-assembled server
// against fake clients, then cancels the context and checks that B2 usage
// stays disabled when B2Enabled is false.
func TestStartSeedsInitialBackgroundState(t *testing.T) {
	srv := &Server{
		cfg: &config.Config{
			AuthRequired:           false,
			BackupDriver:           "longhorn",
			BackupMaxAge:           24 * time.Hour,
			MetricsRefreshInterval: time.Hour,
			PolicyEvalInterval:     time.Hour,
			B2Enabled:              false,
			Namespace:              "maintenance",
			PolicySecretName:       "soteria-policies",
			UsageSecretName:        "",
		},
		client:     &fakeKubeClient{},
		longhorn:   &fakeLonghornClient{},
		metrics:    newTelemetry(),
		ui:         newUIRenderer(),
		policies:   map[string]api.BackupPolicy{},
		jobUsage:   map[string]resticJobUsageCacheEntry{},
		usageStore: map[string]resticPersistedUsageEntry{},
	}
	srv.handler = http.HandlerFunc(srv.route)
	ctx, cancel := context.WithCancel(context.Background())
	srv.Start(ctx)
	// Cancel immediately; Start's background loops should observe ctx.Done().
	cancel()
	b2Usage := srv.getB2Usage()
	if b2Usage.Enabled {
		t.Fatalf("expected B2 usage to remain disabled, got %#v", b2Usage)
	}
}
func TestProtectedInventoryRequiresAuth(t *testing.T) {
srv := &Server{
cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin", "maintenance"}, BackupDriver: "longhorn"},

View File

@ -0,0 +1,336 @@
package server
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math"
"net/http"
"sort"
"strconv"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"scm.bstein.dev/bstein/soteria/internal/k8s"
"scm.bstein.dev/bstein/soteria/internal/longhorn"
"k8s.io/apimachinery/pkg/api/resource"
k8svalidation "k8s.io/apimachinery/pkg/util/validation"
)
// validateKubernetesName checks that value is a valid DNS-1123 label and
// reports the offending field name in the error otherwise.
func validateKubernetesName(field, value string) error {
	if len(k8svalidation.IsDNS1123Label(value)) == 0 {
		return nil
	}
	return fmt.Errorf("%s must be a valid Kubernetes DNS-1123 label", field)
}
// buildBackupRecords converts Longhorn backups into API records sorted
// newest-first, flagging the most recent completed backup as Latest.
// NOTE(review): sort.Slice reorders the caller's slice in place — confirm
// callers do not depend on the original ordering.
func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord {
	records := make([]api.BackupRecord, 0, len(backups))
	latestName := ""
	if latest, _, ok := latestCompletedBackup(backups); ok {
		latestName = latest.Name
	}
	// Newest first; entries with unparseable Created timestamps sort below
	// parseable ones and fall back to reverse-lexicographic name order.
	sort.Slice(backups, func(i, j int) bool {
		left, lok := parseBackupTime(backups[i].Created)
		right, rok := parseBackupTime(backups[j].Created)
		switch {
		case lok && rok:
			return left.After(right)
		case lok:
			return true
		case rok:
			return false
		default:
			return backups[i].Name > backups[j].Name
		}
	})
	for _, backup := range backups {
		records = append(records, api.BackupRecord{
			Name:         backup.Name,
			SnapshotName: backup.SnapshotName,
			Created:      backup.Created,
			State:        backup.State,
			URL:          backup.URL,
			Size:         backup.Size,
			Latest:       backup.Name == latestName,
		})
	}
	return records
}
// buildResticBackupRecords converts restic backup job summaries into API
// records. The first Completed job is treated as the latest — this assumes
// jobs arrive sorted newest-first (TODO confirm with the caller). Only the
// latest completed job carries a selector URL; completed jobs also get a
// human-readable size when cached usage data is available.
func (s *Server) buildResticBackupRecords(ctx context.Context, namespace string, jobs []k8s.BackupJobSummary, defaultRepository string) []api.BackupRecord {
	records := make([]api.BackupRecord, 0, len(jobs))
	latestName := ""
	for _, job := range jobs {
		if strings.EqualFold(job.State, "Completed") {
			latestName = job.Name
			break
		}
	}
	for _, job := range jobs {
		created := ""
		if ts := backupJobTimestamp(job); !ts.IsZero() {
			created = ts.UTC().Format(time.RFC3339)
		}
		url := ""
		size := ""
		latest := job.Name == latestName
		if latest && strings.EqualFold(job.State, "Completed") {
			// Fall back to the server-wide default repository when the job
			// did not record one.
			repository := strings.TrimSpace(job.Repository)
			if repository == "" {
				repository = strings.TrimSpace(defaultRepository)
			}
			url = encodeResticSelector(repository)
		}
		if strings.EqualFold(job.State, "Completed") {
			if bytes, ok := s.lookupResticStoredBytesForJob(ctx, namespace, job.Name); ok {
				size = formatBytesIEC(bytes)
			}
		}
		records = append(records, api.BackupRecord{
			Name:         job.Name,
			SnapshotName: job.Name,
			Created:      created,
			State:        job.State,
			URL:          url,
			Size:         size,
			Latest:       latest,
		})
	}
	return records
}
// encodeResticSelector packs a repository reference into an opaque selector
// string (prefix + raw-URL base64). A blank repository encodes as "latest".
func encodeResticSelector(repository string) string {
	trimmed := strings.TrimSpace(repository)
	if trimmed == "" {
		return "latest"
	}
	encoded := base64.RawURLEncoding.EncodeToString([]byte(trimmed))
	return resticSelectorPrefix + encoded
}
// decodeResticSelector reverses encodeResticSelector, returning the
// repository, the snapshot selector (always "latest"), and whether the input
// was a recognizable selector.
func decodeResticSelector(raw string) (string, string, bool) {
	selector := strings.TrimSpace(raw)
	switch {
	case selector == "":
		return "", "", false
	case selector == "latest":
		return "", "latest", true
	case !strings.HasPrefix(selector, resticSelectorPrefix):
		return "", "", false
	}
	encoded := selector[len(resticSelectorPrefix):]
	if encoded == "" {
		return "", "", false
	}
	payload, err := base64.RawURLEncoding.DecodeString(encoded)
	if err != nil {
		return "", "", false
	}
	repository := strings.TrimSpace(string(payload))
	if repository == "" {
		return "", "", false
	}
	return repository, "latest", true
}
// backupJobTimestamp returns the job's completion time when set, otherwise
// its creation time.
func backupJobTimestamp(job k8s.BackupJobSummary) time.Time {
	if job.CompletionTime.IsZero() {
		return job.CreatedAt
	}
	return job.CompletionTime
}
// backupJobInProgress reports whether a job state (case- and
// whitespace-insensitive) denotes a still-running backup.
func backupJobInProgress(state string) bool {
	normalized := strings.ToLower(strings.TrimSpace(state))
	return normalized == "pending" || normalized == "running"
}
// backupJobProgressPct maps a job state to a coarse progress percentage for
// the UI; unknown states map to 0.
func backupJobProgressPct(state string) int {
	progress := map[string]int{
		"pending":   20,
		"running":   70,
		"completed": 100,
		"failed":    100,
	}
	// Missing keys yield the zero value, matching the original default.
	return progress[strings.ToLower(strings.TrimSpace(state))]
}
// latestCompletedBackup returns the completed backup with the most recent
// parseable Created timestamp. A completed backup whose timestamp cannot be
// parsed is only selected when nothing has been selected yet — a later
// parseable backup always replaces it.
func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) {
	var selected longhorn.Backup
	var selectedTime time.Time
	found := false
	for _, backup := range backups {
		if backup.State != "Completed" {
			continue
		}
		createdAt, ok := parseBackupTime(backup.Created)
		if !ok {
			// Unparseable timestamp: keep as a placeholder only if first.
			if !found {
				selected = backup
				found = true
			}
			continue
		}
		if !found || createdAt.After(selectedTime) {
			selected = backup
			selectedTime = createdAt
			found = true
		}
	}
	return selected, selectedTime, found
}
func parseBackupTime(raw string) (time.Time, bool) {
layouts := []string{time.RFC3339Nano, time.RFC3339}
for _, layout := range layouts {
parsed, err := time.Parse(layout, raw)
if err == nil {
return parsed, true
}
}
return time.Time{}, false
}
func writeJSON(w http.ResponseWriter, status int, payload any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(payload)
}
// writeError writes a JSON error envelope {"error": message} with the given
// status code.
func writeError(w http.ResponseWriter, status int, message string) {
	body := map[string]string{"error": message}
	writeJSON(w, status, body)
}
// backupName builds a "soteria-<prefix>-<value>-<UTC timestamp>" resource
// name, truncating the base portion so the result fits the 63-character
// Kubernetes name limit while always keeping the full timestamp.
func backupName(prefix, value string) string {
	stamp := time.Now().UTC().Format("20060102-150405")
	base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value))
	full := base + "-" + stamp
	if len(full) <= 63 {
		return full
	}
	// Reserve room for the timestamp plus its separating dash.
	limit := 63 - len(stamp) - 1
	if limit < 1 {
		limit = 1
	}
	if len(base) > limit {
		base = base[:limit]
	}
	return base + "-" + stamp
}
// sanitizeName lowercases value, maps '_', '.', and ' ' to '-', and strips
// leading/trailing dashes, bringing the result closer to a DNS-1123 label.
// The chained ReplaceAll calls were folded into a single Replacer pass to
// avoid three intermediate string allocations.
func sanitizeName(value string) string {
	replacer := strings.NewReplacer("_", "-", ".", "-", " ", "-")
	return strings.Trim(replacer.Replace(strings.ToLower(value)), "-")
}
// roundHours rounds an hour count to two decimal places for display.
func roundHours(value float64) float64 {
	const precision = 100
	return math.Round(value*precision) / precision
}
// parseSizeBytes converts a size string into a byte count. It accepts plain
// integers ("1024"), floats ("12.5"), or Kubernetes resource quantities
// ("1Gi"). Unparseable or negative input yields 0.
//
// Fix: the integer branch previously let negative values through while the
// float branch clamped them to 0; all branches now consistently return 0 for
// negative sizes.
func parseSizeBytes(raw string) int64 {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0
	}
	if value, err := strconv.ParseInt(raw, 10, 64); err == nil {
		if value < 0 {
			return 0
		}
		return value
	}
	if value, err := strconv.ParseFloat(raw, 64); err == nil {
		if value < 0 {
			return 0
		}
		return int64(value)
	}
	if quantity, err := resource.ParseQuantity(raw); err == nil {
		if v := quantity.Value(); v >= 0 {
			return v
		}
	}
	return 0
}
func formatBytesIEC(value float64) string {
if value <= 0 || math.IsNaN(value) || math.IsInf(value, 0) {
return "0 B"
}
units := []string{"B", "KiB", "MiB", "GiB", "TiB"}
size := value
unit := 0
for size >= 1024 && unit < len(units)-1 {
size /= 1024
unit++
}
if unit == 0 {
return fmt.Sprintf("%.0f %s", size, units[unit])
}
return fmt.Sprintf("%.2f %s", size, units[unit])
}
// dedupeDefault resolves an optional dedupe flag; nil defaults to true.
func dedupeDefault(value *bool) bool {
	if value != nil {
		return *value
	}
	return true
}
// boolPtr returns a pointer to a copy of value.
func boolPtr(value bool) *bool {
	// The parameter is already a copy, so its address is safe to return.
	return &value
}
// keepLastDefault resolves an optional keep_last setting; nil or negative
// values collapse to 0 (meaning "no limit").
func keepLastDefault(value *int) int {
	if value == nil || *value < 0 {
		return 0
	}
	return *value
}
// intPtr returns a pointer to a copy of value.
func intPtr(value int) *int {
	// The parameter is already a copy, so its address is safe to return.
	return &value
}
// validateKeepLast checks an optional keep_last value: nil is valid, and set
// values must lie in [0, maxPolicyKeepLast].
func validateKeepLast(value *int) error {
	if value == nil {
		return nil
	}
	switch {
	case *value < 0:
		return fmt.Errorf("keep_last must be >= 0")
	case *value > maxPolicyKeepLast:
		return fmt.Errorf("keep_last must be <= %d", maxPolicyKeepLast)
	}
	return nil
}
// keepLastStricter reports whether candidate is a stricter retention limit
// than current, where 0 means "unlimited": any positive limit beats
// unlimited, unlimited never beats a limit, and between two limits the
// smaller wins.
func keepLastStricter(candidate, current int) bool {
	if candidate > 0 && current == 0 {
		return true
	}
	if candidate == 0 {
		return false
	}
	if current == 0 {
		return true
	}
	return candidate < current
}

View File

@ -1,2 +1 @@
# relative_path max_lines reason
internal/server/server.go 2203 legacy-oversize

1 # relative_path max_lines reason
internal/server/server.go 2203 legacy-oversize