From 09ab3ec8893f8090c09cc36906b775fb34e44700 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 12 Apr 2026 11:09:49 -0300 Subject: [PATCH] backup: add pvc inventory, restore UI, and metrics baseline --- README.md | 187 +++++++--- cmd/soteria/main.go | 11 +- deploy/clusterrole.yaml | 5 +- deploy/configmap.yaml | 4 + deploy/service.yaml | 4 + internal/api/types.go | 97 ++++-- internal/config/config.go | 113 ++++-- internal/k8s/volumes.go | 73 ++++ internal/longhorn/client.go | 5 + internal/server/metrics.go | 203 +++++++++++ internal/server/server.go | 617 +++++++++++++++++++++++++++++---- internal/server/server_test.go | 161 +++++++++ internal/server/ui.go | 6 + internal/server/ui.html | 336 ++++++++++++++++++ 14 files changed, 1662 insertions(+), 160 deletions(-) create mode 100644 internal/server/metrics.go create mode 100644 internal/server/server_test.go create mode 100644 internal/server/ui.go create mode 100644 internal/server/ui.html diff --git a/README.md b/README.md index a149cdb..9d1eda9 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,36 @@ # soteria -Soteria is a small in-cluster service that orchestrates Longhorn backups for PVCs. It is intended to be called by Ariadne (or another controller) and focuses on: +Soteria is an in-cluster service for PVC backup and restore operations. The current production baseline focuses on Longhorn-backed PVCs and provides: -- Longhorn-managed backups to an S3-compatible backend (Backblaze B2 by default). -- On-demand restore tests into a target PVC. -- Minimal long-running footprint (the backup work happens in Longhorn). +- Namespace-grouped PVC inventory for backup and restore selection. +- On-demand backup creation for Longhorn volumes. +- Restore into a new target PVC with conflict checks and best-effort cleanup on failure. +- A simple built-in UI suitable for publishing behind an authenticated ingress. +- Prometheus-format backup freshness telemetry for Grafana rollups. -Snapshots are managed by Longhorn; backups are crash-consistent for the PVC as mounted. +For Longhorn, backups are crash-consistent at the volume level and delegated to the Longhorn control plane. 
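A quick way to exercise the service end to end is a dry-run backup via `curl` (a sketch: the `kubectl port-forward` target assumes a Deployment named `soteria` in a `soteria` namespace; adjust for your install):

```bash
# Forward the container port locally (SOTERIA_LISTEN_ADDR defaults to :8080).
kubectl -n soteria port-forward deploy/soteria 8080:8080

# Ask Soteria what it would back up without creating a Longhorn snapshot.
curl -s -X POST http://localhost:8080/v1/backup \
  -H 'Content-Type: application/json' \
  -d '{"namespace": "ai", "pvc": "llm-cache", "dry_run": true}'
```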
-## API +## Endpoints -### POST /v1/backup (Longhorn) +Public endpoints: + +- `GET /healthz` +- `GET /readyz` +- `GET /metrics` + +Protected endpoints when `SOTERIA_AUTH_REQUIRED=true`: + +- `GET /` UI console +- `GET /v1/whoami` +- `GET /v1/inventory` +- `GET /v1/backups?namespace=&pvc=` +- `POST /v1/backup` +- `POST /v1/restores` +- `POST /v1/restore-test` legacy alias for `/v1/restores` + +## API examples + +### POST /v1/backup ```json { @@ -21,78 +41,149 @@ Snapshots are managed by Longhorn; backups are crash-consistent for the PVC as m } ``` -Response: +Longhorn response: ```json { - "job_name": "soteria-backup-llm-cache-20260131-013001", + "driver": "longhorn", + "volume": "pvc-1234abcd", + "backup": "soteria-backup-ai-llm-cache-20260412-153000", "namespace": "ai", - "secret": "soteria-soteria-backup-llm-cache-20260131-013001-restic", + "requested_by": "brad", "dry_run": false } ``` -### POST /v1/restore-test (Longhorn) +### GET /v1/inventory + +Response shape: + +```json +{ + "generated_at": "2026-04-12T15:30:00Z", + "namespaces": [ + { + "name": "ai", + "pvcs": [ + { + "namespace": "ai", + "pvc": "llm-cache", + "volume": "pvc-1234abcd", + "storage_class": "longhorn", + "capacity": "50Gi", + "driver": "longhorn", + "last_backup_at": "2026-04-12T14:55:00Z", + "last_backup_age_hours": 0.58, + "backup_count": 14, + "healthy": true, + "health_reason": "fresh" + } + ] + } + ] +} +``` + +### GET /v1/backups + +```text +/v1/backups?namespace=ai&pvc=llm-cache +``` + +Returns the resolved volume name and backup records so the UI or automation can select a restore source. + +### POST /v1/restores ```json { "namespace": "ai", + "pvc": "llm-cache", "snapshot": "latest", - "pvc": "ollama-models", - "target_pvc": "restore-sandbox", + "target_namespace": "ai", + "target_pvc": "restore-llm-cache", "dry_run": false } ``` Notes: -- `pvc` is required to resolve the Longhorn volume and locate the latest backup. -- `snapshot` may be `latest` or a specific backup/snapshot name. You can also pass `backup_url`. + +- `namespace` and `pvc` identify the source PVC. +- `target_pvc` is required. +- `target_namespace` defaults to `namespace`. +- Soteria refuses to overwrite an existing target PVC. +- If Longhorn volume creation succeeds but PVC creation fails, Soteria attempts to delete the just-created restore volume. +- You may provide `backup_url` directly instead of `snapshot`. + +## Authentication and authorization + +When `SOTERIA_AUTH_REQUIRED=true`, Soteria expects trusted auth headers from a fronting proxy such as `oauth2-proxy`: + +- `X-Auth-Request-User` +- `X-Auth-Request-Email` +- `X-Auth-Request-Groups` + +Allowed groups are configured with `SOTERIA_ALLOWED_GROUPS` and compared after normalizing leading `/` prefixes, so both `maintenance` and `/maintenance` are accepted. + +Optional machine-to-machine access can be enabled with `SOTERIA_AUTH_BEARER_TOKENS`, which accepts a comma-separated list of bearer tokens. + +## Prometheus metrics + +Soteria exports Prometheus-format metrics at `GET /metrics`. 
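A fragment of the exposition, with illustrative values:

```text
# HELP pvc_backup_age_hours Age in hours of the latest successful PVC backup known to Soteria.
# TYPE pvc_backup_age_hours gauge
pvc_backup_age_hours{driver="longhorn",namespace="ai",pvc="llm-cache",volume="pvc-1234abcd"} 0.58
# HELP pvc_backup_health PVC backup health according to Soteria: 1=fresh backup within policy, 0=missing/stale/error.
# TYPE pvc_backup_health gauge
pvc_backup_health{driver="longhorn",namespace="ai",pvc="llm-cache",volume="pvc-1234abcd"} 1
```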
+ +Implemented metrics: + +- `soteria_backup_requests_total{driver,result}` +- `soteria_restore_requests_total{driver,result}` +- `soteria_authz_denials_total{reason}` +- `soteria_inventory_refresh_failures_total` +- `soteria_inventory_refresh_timestamp_seconds` +- `pvc_backup_age_hours{namespace,pvc,volume,driver}` +- `pvc_backup_health{namespace,pvc,volume,driver}` +- `pvc_backup_last_success_timestamp_seconds{namespace,pvc,volume,driver}` +- `pvc_backup_count{namespace,pvc,volume,driver}` + +`pvc_backup_health` is `1` when the most recent successful backup is within `SOTERIA_BACKUP_MAX_AGE_HOURS`, otherwise `0`. ## Configuration Environment variables: -- `SOTERIA_BACKUP_DRIVER` (default: `longhorn`, allowed: `longhorn`, `restic`) -- `SOTERIA_LONGHORN_URL` (default: `http://longhorn-backend.longhorn-system.svc:9500`) -- `SOTERIA_LONGHORN_BACKUP_MODE` (default: `incremental`, allowed: `incremental`, `full`) -- `SOTERIA_RESTIC_REPOSITORY` (required for restic driver) Example: `s3:s3.us-west-004.backblazeb2.com/atlas-backups` -- `SOTERIA_RESTIC_SECRET_NAME` (default: `soteria-restic`) -- `SOTERIA_SECRET_NAMESPACE` (default: service namespace) -- `SOTERIA_RESTIC_IMAGE` (default: `restic/restic:0.16.4`) -- `SOTERIA_RESTIC_BACKUP_ARGS` (optional) Extra args for `restic backup` -- `SOTERIA_RESTIC_FORGET_ARGS` (optional) Extra args for `restic forget` (include `--prune` if desired) -- `SOTERIA_S3_ENDPOINT` (optional) Example: `s3.us-west-004.backblazeb2.com` -- `SOTERIA_S3_REGION` (optional) Example: `us-west-004` -- `SOTERIA_JOB_TTL_SECONDS` (default: 86400) -- `SOTERIA_JOB_NODE_SELECTOR` (optional) Comma-separated node selector, e.g. `kubernetes.io/arch=arm64,node-role.kubernetes.io/worker=true` -- `SOTERIA_JOB_SERVICE_ACCOUNT` (optional) ServiceAccount for backup Jobs -- `SOTERIA_LISTEN_ADDR` (default: `:8080`) - -The restic repository is encrypted with `RESTIC_PASSWORD` from the secret below. +- `SOTERIA_BACKUP_DRIVER` default `longhorn`, allowed `longhorn`, `restic` +- `SOTERIA_LONGHORN_URL` default `http://longhorn-backend.longhorn-system.svc:9500` +- `SOTERIA_LONGHORN_BACKUP_MODE` default `incremental`, allowed `incremental`, `full` +- `SOTERIA_RESTIC_REPOSITORY` required for restic driver +- `SOTERIA_RESTIC_SECRET_NAME` default `soteria-restic` +- `SOTERIA_SECRET_NAMESPACE` default service namespace +- `SOTERIA_RESTIC_IMAGE` default `restic/restic:0.16.4` +- `SOTERIA_RESTIC_BACKUP_ARGS` optional extra args for `restic backup` +- `SOTERIA_RESTIC_FORGET_ARGS` optional extra args for `restic forget` +- `SOTERIA_S3_ENDPOINT` optional S3-compatible endpoint +- `SOTERIA_S3_REGION` optional region +- `SOTERIA_JOB_TTL_SECONDS` default `86400` +- `SOTERIA_JOB_NODE_SELECTOR` optional comma-separated `key=value` list +- `SOTERIA_JOB_SERVICE_ACCOUNT` optional ServiceAccount for restic Jobs +- `SOTERIA_LISTEN_ADDR` default `:8080` +- `SOTERIA_AUTH_REQUIRED` default `false` +- `SOTERIA_ALLOWED_GROUPS` default `admin,maintenance` +- `SOTERIA_AUTH_BEARER_TOKENS` optional comma-separated bearer tokens +- `SOTERIA_BACKUP_MAX_AGE_HOURS` default `24` +- `SOTERIA_METRICS_REFRESH_SECONDS` default `300` ## Secrets -Create a secret named `soteria-restic` in the Soteria namespace (or set `SOTERIA_RESTIC_SECRET_NAME`) if using the restic driver. Keys required: +Create a secret named `soteria-restic` in the Soteria namespace, or set `SOTERIA_RESTIC_SECRET_NAME`, when using the restic driver. 
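For example, it can be created with `kubectl` from the keys listed below (a sketch with placeholder values; the `soteria` namespace is an assumption):

```bash
kubectl -n soteria create secret generic soteria-restic \
  --from-literal=AWS_ACCESS_KEY_ID=CHANGE_ME \
  --from-literal=AWS_SECRET_ACCESS_KEY=CHANGE_ME \
  --from-literal=RESTIC_PASSWORD=CHANGE_ME
```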
Required keys: - `AWS_ACCESS_KEY_ID` - `AWS_SECRET_ACCESS_KEY` - `RESTIC_PASSWORD` -The service copies this secret into the target namespace per job and attaches an owner reference so it gets cleaned up with the Job. +The service copies this secret into the target namespace per job and attaches an owner reference so it is cleaned up with the Job. -A template is in `deploy/secret-example.yaml` (do not commit real credentials). +A template is in `deploy/secret-example.yaml`. Do not commit real credentials. ## Deployment -The `deploy/` folder includes Kustomize-ready manifests: - -- `namespace.yaml` -- `configmap.yaml` (set your repository and endpoint) -- `serviceaccount.yaml` -- `clusterrole.yaml` -- `clusterrolebinding.yaml` -- `deployment.yaml` -- `service.yaml` +The `deploy/` folder includes Kustomize-ready manifests for namespace, RBAC, config, deployment, and service. Apply with: @@ -100,8 +191,10 @@ Apply with: kubectl apply -k deploy ``` +The example Service is annotated for Prometheus scraping of `/metrics`. + ## Notes -- Backups mount the PVC read-only at `/data` and run `restic backup /data`. -- Restore tests write into `/restore` (either an emptyDir or a target PVC). -- For Backblaze B2, use the S3 endpoint and region for your bucket (example: `s3.us-west-004.backblazeb2.com`). +- Longhorn inventory and metrics are based on discovered backup records per PVC. +- Restic backup and restore execution exists, but inventory-style telemetry is currently Longhorn-focused. +- For Atlas production, place Soteria behind an authenticated ingress and trust only proxy-injected auth headers. diff --git a/cmd/soteria/main.go b/cmd/soteria/main.go index 916a7be..972cc41 100644 --- a/cmd/soteria/main.go +++ b/cmd/soteria/main.go @@ -5,7 +5,6 @@ import ( "errors" "log" "net/http" - "os" "os/signal" "syscall" "time" @@ -18,6 +17,8 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.LUTC) + runCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() cfg, err := config.Load() if err != nil { @@ -31,6 +32,7 @@ func main() { longhornClient := longhorn.New(cfg.LonghornURL) srv := server.New(cfg, client, longhornClient) + srv.Start(runCtx) httpServer := &http.Server{ Addr: cfg.ListenAddr, Handler: srv.Handler(), @@ -46,12 +48,9 @@ func main() { errCh <- httpServer.ListenAndServe() }() - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) - select { - case sig := <-signalCh: - log.Printf("shutdown signal: %s", sig) + case <-runCtx.Done(): + log.Printf("shutdown signal: %v", runCtx.Err()) case err := <-errCh: if err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("server error: %v", err) diff --git a/deploy/clusterrole.yaml b/deploy/clusterrole.yaml index 57ed475..ec2f883 100644 --- a/deploy/clusterrole.yaml +++ b/deploy/clusterrole.yaml @@ -5,9 +5,12 @@ metadata: labels: app.kubernetes.io/name: soteria rules: + - apiGroups: [""] + resources: ["persistentvolumeclaims", "persistentvolumes"] + verbs: ["get", "list"] - apiGroups: [""] resources: ["secrets"] - verbs: ["get", "list", "create", "update"] + verbs: ["get", "list", "create", "update", "delete"] - apiGroups: ["batch"] resources: ["jobs"] verbs: ["get", "list", "create"] diff --git a/deploy/configmap.yaml b/deploy/configmap.yaml index 82dd636..019445a 100644 --- a/deploy/configmap.yaml +++ b/deploy/configmap.yaml @@ -9,3 +9,7 @@ data: SOTERIA_BACKUP_DRIVER: "longhorn" SOTERIA_LONGHORN_URL: "http://longhorn-backend.longhorn-system.svc:9500" 
SOTERIA_LONGHORN_BACKUP_MODE: "incremental" + SOTERIA_AUTH_REQUIRED: "false" + SOTERIA_ALLOWED_GROUPS: "admin,maintenance" + SOTERIA_BACKUP_MAX_AGE_HOURS: "24" + SOTERIA_METRICS_REFRESH_SECONDS: "300" diff --git a/deploy/service.yaml b/deploy/service.yaml index a73dbb2..6ad8334 100644 --- a/deploy/service.yaml +++ b/deploy/service.yaml @@ -5,6 +5,10 @@ metadata: labels: app.kubernetes.io/name: soteria app.kubernetes.io/component: api + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "80" + prometheus.io/path: "/metrics" spec: type: ClusterIP selector: diff --git a/internal/api/types.go b/internal/api/types.go index 084a243..a842a09 100644 --- a/internal/api/types.go +++ b/internal/api/types.go @@ -9,31 +9,86 @@ type BackupRequest struct { } type BackupResponse struct { - Driver string `json:"driver,omitempty"` - Volume string `json:"volume,omitempty"` - Backup string `json:"backup,omitempty"` - JobName string `json:"job_name,omitempty"` - Namespace string `json:"namespace,omitempty"` - Secret string `json:"secret,omitempty"` - DryRun bool `json:"dry_run"` + Driver string `json:"driver,omitempty"` + Volume string `json:"volume,omitempty"` + Backup string `json:"backup,omitempty"` + JobName string `json:"job_name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Secret string `json:"secret,omitempty"` + RequestedBy string `json:"requested_by,omitempty"` + DryRun bool `json:"dry_run"` } type RestoreTestRequest struct { - Namespace string `json:"namespace"` - PVC string `json:"pvc,omitempty"` - Snapshot string `json:"snapshot,omitempty"` - BackupURL string `json:"backup_url,omitempty"` - TargetPVC string `json:"target_pvc,omitempty"` - DryRun bool `json:"dry_run"` + Namespace string `json:"namespace"` + PVC string `json:"pvc,omitempty"` + Snapshot string `json:"snapshot,omitempty"` + BackupURL string `json:"backup_url,omitempty"` + TargetNamespace string `json:"target_namespace,omitempty"` + TargetPVC string `json:"target_pvc,omitempty"` + DryRun bool `json:"dry_run"` } type RestoreTestResponse struct { - Driver string `json:"driver,omitempty"` - Volume string `json:"volume,omitempty"` - TargetPVC string `json:"target_pvc,omitempty"` - BackupURL string `json:"backup_url,omitempty"` - JobName string `json:"job_name,omitempty"` - Namespace string `json:"namespace,omitempty"` - Secret string `json:"secret,omitempty"` - DryRun bool `json:"dry_run"` + Driver string `json:"driver,omitempty"` + Volume string `json:"volume,omitempty"` + TargetNamespace string `json:"target_namespace,omitempty"` + TargetPVC string `json:"target_pvc,omitempty"` + BackupURL string `json:"backup_url,omitempty"` + JobName string `json:"job_name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Secret string `json:"secret,omitempty"` + RequestedBy string `json:"requested_by,omitempty"` + DryRun bool `json:"dry_run"` +} + +type InventoryResponse struct { + GeneratedAt string `json:"generated_at"` + Namespaces []NamespaceInventory `json:"namespaces"` +} + +type NamespaceInventory struct { + Name string `json:"name"` + PVCs []PVCInventory `json:"pvcs"` +} + +type PVCInventory struct { + Namespace string `json:"namespace"` + PVC string `json:"pvc"` + Volume string `json:"volume,omitempty"` + Phase string `json:"phase,omitempty"` + StorageClass string `json:"storage_class,omitempty"` + Capacity string `json:"capacity,omitempty"` + AccessModes []string `json:"access_modes,omitempty"` + Driver string `json:"driver,omitempty"` + LastBackupAt string `json:"last_backup_at,omitempty"` + 
LastBackupAgeHours float64 `json:"last_backup_age_hours,omitempty"` + BackupCount int `json:"backup_count"` + Healthy bool `json:"healthy"` + HealthReason string `json:"health_reason,omitempty"` + Error string `json:"error,omitempty"` +} + +type BackupListResponse struct { + Namespace string `json:"namespace"` + PVC string `json:"pvc"` + Volume string `json:"volume"` + Backups []BackupRecord `json:"backups"` +} + +type BackupRecord struct { + Name string `json:"name"` + SnapshotName string `json:"snapshot_name,omitempty"` + Created string `json:"created,omitempty"` + State string `json:"state,omitempty"` + URL string `json:"url,omitempty"` + Size string `json:"size,omitempty"` + Latest bool `json:"latest,omitempty"` +} + +type AuthInfoResponse struct { + Authenticated bool `json:"authenticated"` + User string `json:"user,omitempty"` + Email string `json:"email,omitempty"` + Groups []string `json:"groups,omitempty"` } diff --git a/internal/config/config.go b/internal/config/config.go index 15912ec..ee12718 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,36 +5,45 @@ import ( "os" "strconv" "strings" + "time" ) const ( - defaultBackupDriver = "longhorn" - defaultResticImage = "restic/restic:0.16.4" - defaultJobTTLSeconds = 86400 - defaultListenAddr = ":8080" - defaultResticSecret = "soteria-restic" - defaultLonghornURL = "http://longhorn-backend.longhorn-system.svc:9500" - defaultLonghornMode = "incremental" - serviceNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + defaultBackupDriver = "longhorn" + defaultResticImage = "restic/restic:0.16.4" + defaultJobTTLSeconds = 86400 + defaultListenAddr = ":8080" + defaultResticSecret = "soteria-restic" + defaultLonghornURL = "http://longhorn-backend.longhorn-system.svc:9500" + defaultLonghornMode = "incremental" + defaultAllowedGroups = "admin,maintenance" + defaultMetricsRefresh = 300 * time.Second + defaultBackupMaxAge = 24 * time.Hour + serviceNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" ) type Config struct { - Namespace string - SecretNamespace string - BackupDriver string - ResticImage string - ResticRepository string - ResticSecretName string - ResticBackupArgs []string - ResticForgetArgs []string - S3Endpoint string - S3Region string - JobTTLSeconds int32 - JobNodeSelector map[string]string - WorkerServiceAccount string - ListenAddr string - LonghornURL string - LonghornBackupMode string + Namespace string + SecretNamespace string + BackupDriver string + ResticImage string + ResticRepository string + ResticSecretName string + ResticBackupArgs []string + ResticForgetArgs []string + S3Endpoint string + S3Region string + JobTTLSeconds int32 + JobNodeSelector map[string]string + WorkerServiceAccount string + ListenAddr string + LonghornURL string + LonghornBackupMode string + AuthRequired bool + AllowedGroups []string + AuthBearerTokens []string + MetricsRefreshInterval time.Duration + BackupMaxAge time.Duration } func Load() (*Config, error) { @@ -67,6 +76,11 @@ func Load() (*Config, error) { cfg.JobNodeSelector = parseNodeSelector(getenv("SOTERIA_JOB_NODE_SELECTOR")) cfg.LonghornURL = getenvDefault("SOTERIA_LONGHORN_URL", defaultLonghornURL) cfg.LonghornBackupMode = getenvDefault("SOTERIA_LONGHORN_BACKUP_MODE", defaultLonghornMode) + cfg.AuthRequired = getenvBool("SOTERIA_AUTH_REQUIRED") + cfg.AllowedGroups = parseCSV(getenvDefault("SOTERIA_ALLOWED_GROUPS", defaultAllowedGroups)) + cfg.AuthBearerTokens = parseCSV(getenv("SOTERIA_AUTH_BEARER_TOKENS")) + 
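+	// Telemetry knobs start from compiled-in defaults; the env overrides below replace them when set.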
cfg.MetricsRefreshInterval = defaultMetricsRefresh + cfg.BackupMaxAge = defaultBackupMaxAge if ttl, ok := getenvInt("SOTERIA_JOB_TTL_SECONDS"); ok { cfg.JobTTLSeconds = int32(ttl) @@ -74,6 +88,13 @@ func Load() (*Config, error) { cfg.JobTTLSeconds = defaultJobTTLSeconds } + if seconds, ok := getenvInt("SOTERIA_METRICS_REFRESH_SECONDS"); ok { + cfg.MetricsRefreshInterval = time.Duration(seconds) * time.Second + } + if hours, ok := getenvFloat("SOTERIA_BACKUP_MAX_AGE_HOURS"); ok { + cfg.BackupMaxAge = time.Duration(hours * float64(time.Hour)) + } + if cfg.ResticRepository == "" { if cfg.BackupDriver == "restic" { return nil, errors.New("SOTERIA_RESTIC_REPOSITORY is required for restic driver") @@ -106,6 +127,12 @@ func Load() (*Config, error) { return nil, errors.New("SOTERIA_LONGHORN_BACKUP_MODE must be incremental or full") } } + if cfg.MetricsRefreshInterval <= 0 { + return nil, errors.New("SOTERIA_METRICS_REFRESH_SECONDS must be greater than zero") + } + if cfg.BackupMaxAge <= 0 { + return nil, errors.New("SOTERIA_BACKUP_MAX_AGE_HOURS must be greater than zero") + } return cfg, nil } @@ -133,6 +160,28 @@ func getenvInt(key string) (int, bool) { return parsed, true } +func getenvFloat(key string) (float64, bool) { + val := getenv(key) + if val == "" { + return 0, false + } + parsed, err := strconv.ParseFloat(val, 64) + if err != nil { + return 0, false + } + return parsed, true +} + +func getenvBool(key string) bool { + val := strings.ToLower(getenv(key)) + switch val { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + func parseNodeSelector(raw string) map[string]string { raw = strings.TrimSpace(raw) if raw == "" { @@ -160,3 +209,19 @@ func parseNodeSelector(raw string) map[string]string { return selector } + +func parseCSV(raw string) []string { + if strings.TrimSpace(raw) == "" { + return nil + } + parts := strings.Split(raw, ",") + values := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + values = append(values, part) + } + return values +} diff --git a/internal/k8s/volumes.go b/internal/k8s/volumes.go index 9d9183d..959a393 100644 --- a/internal/k8s/volumes.go +++ b/internal/k8s/volumes.go @@ -3,11 +3,23 @@ package k8s import ( "context" "fmt" + "sort" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type PVCSummary struct { + Namespace string + Name string + VolumeName string + Phase string + StorageClass string + Capacity string + AccessModes []string +} + func (c *Client) ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { pvc, err := c.Clientset.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) if err != nil { @@ -24,3 +36,64 @@ func (c *Client) ResolvePVCVolume(ctx context.Context, namespace, pvcName string return pvc.Spec.VolumeName, pvc, pv, nil } + +func (c *Client) ListBoundPVCs(ctx context.Context) ([]PVCSummary, error) { + list, err := c.Clientset.CoreV1().PersistentVolumeClaims("").List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("list pvcs: %w", err) + } + + items := make([]PVCSummary, 0, len(list.Items)) + for _, pvc := range list.Items { + if pvc.Spec.VolumeName == "" || pvc.Status.Phase != corev1.ClaimBound { + continue + } + + storageClass := "" + if pvc.Spec.StorageClassName != nil { + storageClass = *pvc.Spec.StorageClassName + } + + 
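+		// Prefer the bound capacity reported in status; fall back to the spec request when status is not yet populated.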
capacity := "" + if quantity, ok := pvc.Status.Capacity[corev1.ResourceStorage]; ok { + capacity = quantity.String() + } else if quantity, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; ok { + capacity = quantity.String() + } + + accessModes := make([]string, 0, len(pvc.Spec.AccessModes)) + for _, mode := range pvc.Spec.AccessModes { + accessModes = append(accessModes, string(mode)) + } + + items = append(items, PVCSummary{ + Namespace: pvc.Namespace, + Name: pvc.Name, + VolumeName: pvc.Spec.VolumeName, + Phase: string(pvc.Status.Phase), + StorageClass: storageClass, + Capacity: capacity, + AccessModes: accessModes, + }) + } + + sort.Slice(items, func(i, j int) bool { + if items[i].Namespace == items[j].Namespace { + return items[i].Name < items[j].Name + } + return items[i].Namespace < items[j].Namespace + }) + + return items, nil +} + +func (c *Client) PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) { + _, err := c.Clientset.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) + if err == nil { + return true, nil + } + if apierrors.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("get pvc %s/%s: %w", namespace, pvcName, err) +} diff --git a/internal/longhorn/client.go b/internal/longhorn/client.go index a833118..3f0ea63 100644 --- a/internal/longhorn/client.go +++ b/internal/longhorn/client.go @@ -142,6 +142,11 @@ func (c *Client) CreatePVC(ctx context.Context, volumeName, namespace, pvcName s return c.doJSON(ctx, http.MethodPost, path, input, nil) } +func (c *Client) DeleteVolume(ctx context.Context, volumeName string) error { + path := fmt.Sprintf("%s/v1/volumes/%s", c.baseURL, url.PathEscape(volumeName)) + return c.doJSON(ctx, http.MethodDelete, path, nil, nil) +} + func (c *Client) GetBackupVolume(ctx context.Context, volumeName string) (*BackupVolume, error) { path := fmt.Sprintf("%s/v1/backupvolumes/%s", c.baseURL, url.PathEscape(volumeName)) var out BackupVolume diff --git a/internal/server/metrics.go b/internal/server/metrics.go new file mode 100644 index 0000000..47a7b64 --- /dev/null +++ b/internal/server/metrics.go @@ -0,0 +1,203 @@ +package server + +import ( + "fmt" + "net/http" + "sort" + "strings" + "sync" + "time" + + "scm.bstein.dev/bstein/soteria/internal/api" +) + +type metricSample struct { + labels map[string]string + value float64 +} + +type telemetry struct { + mu sync.RWMutex + backupRequests map[string]metricSample + restoreRequests map[string]metricSample + authzDenials map[string]metricSample + inventoryRefreshFailure float64 + inventoryRefreshTime float64 + pvcBackupAgeHours map[string]metricSample + pvcBackupHealth map[string]metricSample + pvcBackupLastSuccess map[string]metricSample + pvcBackupCount map[string]metricSample +} + +func newTelemetry() *telemetry { + return &telemetry{ + backupRequests: map[string]metricSample{}, + restoreRequests: map[string]metricSample{}, + authzDenials: map[string]metricSample{}, + pvcBackupAgeHours: map[string]metricSample{}, + pvcBackupHealth: map[string]metricSample{}, + pvcBackupLastSuccess: map[string]metricSample{}, + pvcBackupCount: map[string]metricSample{}, + } +} + +func (t *telemetry) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + _, _ = w.Write([]byte(t.render())) + }) +} + +func (t *telemetry) RecordBackupRequest(driver, result string) { + t.mu.Lock() + defer t.mu.Unlock() + 
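+	// Counters are keyed by their rendered label set, so repeated driver/result pairs accumulate in one sample.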
incMetric(t.backupRequests, map[string]string{"driver": driver, "result": result}) +} + +func (t *telemetry) RecordRestoreRequest(driver, result string) { + t.mu.Lock() + defer t.mu.Unlock() + incMetric(t.restoreRequests, map[string]string{"driver": driver, "result": result}) +} + +func (t *telemetry) RecordAuthzDenied(reason string) { + t.mu.Lock() + defer t.mu.Unlock() + incMetric(t.authzDenials, map[string]string{"reason": reason}) +} + +func (t *telemetry) RecordInventoryFailure() { + t.mu.Lock() + defer t.mu.Unlock() + t.inventoryRefreshFailure++ +} + +func (t *telemetry) RecordInventory(inv api.InventoryResponse) { + t.mu.Lock() + defer t.mu.Unlock() + + t.pvcBackupAgeHours = map[string]metricSample{} + t.pvcBackupHealth = map[string]metricSample{} + t.pvcBackupLastSuccess = map[string]metricSample{} + t.pvcBackupCount = map[string]metricSample{} + + for _, namespace := range inv.Namespaces { + for _, pvc := range namespace.PVCs { + labels := map[string]string{ + "namespace": pvc.Namespace, + "pvc": pvc.PVC, + "volume": pvc.Volume, + "driver": pvc.Driver, + } + setMetric(t.pvcBackupCount, labels, float64(pvc.BackupCount)) + if pvc.Healthy { + setMetric(t.pvcBackupHealth, labels, 1) + } else { + setMetric(t.pvcBackupHealth, labels, 0) + } + if pvc.LastBackupAt == "" { + continue + } + setMetric(t.pvcBackupAgeHours, labels, pvc.LastBackupAgeHours) + if ts, ok := parseBackupTime(pvc.LastBackupAt); ok { + setMetric(t.pvcBackupLastSuccess, labels, float64(ts.Unix())) + } + } + } + + t.inventoryRefreshTime = float64(time.Now().Unix()) +} + +func (t *telemetry) render() string { + t.mu.RLock() + defer t.mu.RUnlock() + + var b strings.Builder + writeMetricFamily(&b, "soteria_backup_requests_total", "counter", "Backup requests handled by Soteria.", metricValues(t.backupRequests)) + writeMetricFamily(&b, "soteria_restore_requests_total", "counter", "Restore requests handled by Soteria.", metricValues(t.restoreRequests)) + writeMetricFamily(&b, "soteria_authz_denials_total", "counter", "Authorization denials emitted by Soteria.", metricValues(t.authzDenials)) + writeMetricFamily(&b, "soteria_inventory_refresh_failures_total", "counter", "Inventory refresh failures while computing PVC backup telemetry.", []metricSample{{value: t.inventoryRefreshFailure}}) + writeMetricFamily(&b, "soteria_inventory_refresh_timestamp_seconds", "gauge", "Unix timestamp of the last successful inventory refresh.", []metricSample{{value: t.inventoryRefreshTime}}) + writeMetricFamily(&b, "pvc_backup_age_hours", "gauge", "Age in hours of the latest successful PVC backup known to Soteria.", metricValues(t.pvcBackupAgeHours)) + writeMetricFamily(&b, "pvc_backup_health", "gauge", "PVC backup health according to Soteria: 1=fresh backup within policy, 0=missing/stale/error.", metricValues(t.pvcBackupHealth)) + writeMetricFamily(&b, "pvc_backup_last_success_timestamp_seconds", "gauge", "Unix timestamp of the latest successful PVC backup known to Soteria.", metricValues(t.pvcBackupLastSuccess)) + writeMetricFamily(&b, "pvc_backup_count", "gauge", "Count of backup records discovered for a PVC.", metricValues(t.pvcBackupCount)) + return b.String() +} + +func metricValues(source map[string]metricSample) []metricSample { + keys := make([]string, 0, len(source)) + for key := range source { + keys = append(keys, key) + } + sort.Strings(keys) + values := make([]metricSample, 0, len(keys)) + for _, key := range keys { + values = append(values, source[key]) + } + return values +} + +func writeMetricFamily(b *strings.Builder, name, 
metricType, help string, samples []metricSample) { + b.WriteString("# HELP ") + b.WriteString(name) + b.WriteString(" ") + b.WriteString(help) + b.WriteString("\n") + b.WriteString("# TYPE ") + b.WriteString(name) + b.WriteString(" ") + b.WriteString(metricType) + b.WriteString("\n") + for _, sample := range samples { + b.WriteString(name) + b.WriteString(renderLabels(sample.labels)) + b.WriteString(" ") + b.WriteString(fmt.Sprintf("%g", sample.value)) + b.WriteString("\n") + } +} + +func renderLabels(labels map[string]string) string { + if len(labels) == 0 { + return "" + } + keys := make([]string, 0, len(labels)) + for key := range labels { + keys = append(keys, key) + } + sort.Strings(keys) + parts := make([]string, 0, len(keys)) + for _, key := range keys { + parts = append(parts, fmt.Sprintf("%s=%q", key, labels[key])) + } + return "{" + strings.Join(parts, ",") + "}" +} + +func metricKey(labels map[string]string) string { + return renderLabels(labels) +} + +func incMetric(target map[string]metricSample, labels map[string]string) { + key := metricKey(labels) + sample, ok := target[key] + if !ok { + target[key] = metricSample{labels: cloneLabels(labels), value: 1} + return + } + sample.value++ + target[key] = sample +} + +func setMetric(target map[string]metricSample, labels map[string]string, value float64) { + key := metricKey(labels) + target[key] = metricSample{labels: cloneLabels(labels), value: value} +} + +func cloneLabels(labels map[string]string) map[string]string { + out := make(map[string]string, len(labels)) + for key, value := range labels { + out[key] = value + } + return out +} diff --git a/internal/server/server.go b/internal/server/server.go index f1001b6..203dc06 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,9 +1,13 @@ package server import ( + "context" "encoding/json" "fmt" + "log" + "math" "net/http" + "sort" "strings" "time" @@ -11,43 +15,196 @@ import ( "scm.bstein.dev/bstein/soteria/internal/config" "scm.bstein.dev/bstein/soteria/internal/k8s" "scm.bstein.dev/bstein/soteria/internal/longhorn" + + corev1 "k8s.io/api/core/v1" ) +type kubeClient interface { + ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) + CreateBackupJob(ctx context.Context, cfg *config.Config, req api.BackupRequest) (string, string, error) + CreateRestoreJob(ctx context.Context, cfg *config.Config, req api.RestoreTestRequest) (string, string, error) + ListBoundPVCs(ctx context.Context) ([]k8s.PVCSummary, error) + PersistentVolumeClaimExists(ctx context.Context, namespace, pvcName string) (bool, error) +} + +type longhornClient interface { + SnapshotBackup(ctx context.Context, volume, name string, labels map[string]string, backupMode string) (*longhorn.Volume, error) + GetVolume(ctx context.Context, volume string) (*longhorn.Volume, error) + CreateVolumeFromBackup(ctx context.Context, name, size string, replicas int, backupURL string) (*longhorn.Volume, error) + CreatePVC(ctx context.Context, volumeName, namespace, pvcName string) error + DeleteVolume(ctx context.Context, volumeName string) error + FindBackup(ctx context.Context, volumeName, snapshot string) (*longhorn.Backup, error) + ListBackups(ctx context.Context, volumeName string) ([]longhorn.Backup, error) +} + type Server struct { cfg *config.Config - client *k8s.Client - longhorn *longhorn.Client - mux *http.ServeMux + client kubeClient + longhorn longhornClient + metrics *telemetry + handler http.Handler } +type 
authIdentity struct { + Authenticated bool + User string + Email string + Groups []string +} + +type ctxKey string + +const authContextKey ctxKey = "soteria-auth" + func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server { s := &Server{ cfg: cfg, client: client, longhorn: lh, - mux: http.NewServeMux(), + metrics: newTelemetry(), } - - s.mux.HandleFunc("/healthz", s.handleHealth) - s.mux.HandleFunc("/readyz", s.handleReady) - s.mux.HandleFunc("/v1/backup", s.handleBackup) - s.mux.HandleFunc("/v1/restore-test", s.handleRestore) - + s.handler = http.HandlerFunc(s.route) return s } -func (s *Server) Handler() http.Handler { - return s.mux +func (s *Server) Start(ctx context.Context) { + s.refreshTelemetry(ctx) + + ticker := time.NewTicker(s.cfg.MetricsRefreshInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.refreshTelemetry(ctx) + } + } + }() } -func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { +func (s *Server) Handler() http.Handler { + return s.handler +} + +func (s *Server) route(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/healthz": + s.handleHealth(w, r) + return + case "/readyz": + s.handleReady(w, r) + return + case "/metrics": + s.metrics.Handler().ServeHTTP(w, r) + return + } + + identity, status, err := s.authorize(r) + if err != nil { + s.metrics.RecordAuthzDenied(authzReason(status, err)) + writeError(w, status, err.Error()) + return + } + + r = r.WithContext(context.WithValue(r.Context(), authContextKey, identity)) + + switch r.URL.Path { + case "/": + s.handleUI(w, r) + case "/v1/whoami": + s.handleWhoAmI(w, r) + case "/v1/inventory": + s.handleInventory(w, r) + case "/v1/backups": + s.handleBackups(w, r) + case "/v1/backup": + s.handleBackup(w, r) + case "/v1/restores", "/v1/restore-test": + s.handleRestore(w, r) + default: + writeError(w, http.StatusNotFound, "not found") + } +} + +func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } -func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleReady(w http.ResponseWriter, _ *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) } +func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = w.Write([]byte(uiHTML)) +} + +func (s *Server) handleWhoAmI(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + identity := requesterFromContext(r.Context()) + writeJSON(w, http.StatusOK, api.AuthInfoResponse{ + Authenticated: identity.Authenticated, + User: identity.User, + Email: identity.Email, + Groups: identity.Groups, + }) +} + +func (s *Server) handleInventory(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + inventory, err := s.buildInventory(r.Context()) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, inventory) +} + +func (s *Server) handleBackups(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") 
+ return + } + + namespace := strings.TrimSpace(r.URL.Query().Get("namespace")) + pvcName := strings.TrimSpace(r.URL.Query().Get("pvc")) + if namespace == "" || pvcName == "" { + writeError(w, http.StatusBadRequest, "namespace and pvc are required") + return + } + + volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), namespace, pvcName) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + backups, err := s.longhorn.ListBackups(r.Context(), volumeName) + if err != nil { + writeError(w, http.StatusBadGateway, err.Error()) + return + } + + writeJSON(w, http.StatusOK, api.BackupListResponse{ + Namespace: namespace, + PVC: pvcName, + Volume: volumeName, + Backups: buildBackupRecords(backups), + }) +} + func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -57,61 +214,83 @@ func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, 1<<20) var req api.BackupRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "invalid_json") writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) return } + if strings.TrimSpace(req.Namespace) == "" || strings.TrimSpace(req.PVC) == "" { + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace and pvc are required") + return + } + + requester := currentRequester(r.Context()) switch s.cfg.BackupDriver { case "longhorn": volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) if err != nil { + s.metrics.RecordBackupRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } - backupName := backupName("backup", req.PVC) + backupName := backupName("backup", req.Namespace+"-"+req.PVC) if req.DryRun { + s.metrics.RecordBackupRequest("longhorn", "dry_run") writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "longhorn", - Volume: volumeName, - Backup: backupName, - Namespace: req.Namespace, - DryRun: true, + Driver: "longhorn", + Volume: volumeName, + Backup: backupName, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: true, }) return } labels := map[string]string{ - "soteria.bstein.dev/namespace": req.Namespace, - "soteria.bstein.dev/pvc": req.PVC, + "soteria.bstein.dev/namespace": req.Namespace, + "soteria.bstein.dev/pvc": req.PVC, + "soteria.bstein.dev/requested-by": requester, } if _, err := s.longhorn.SnapshotBackup(r.Context(), volumeName, backupName, labels, s.cfg.LonghornBackupMode); err != nil { - writeError(w, http.StatusBadRequest, err.Error()) + s.metrics.RecordBackupRequest("longhorn", "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) return } + s.metrics.RecordBackupRequest("longhorn", "success") writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "longhorn", - Volume: volumeName, - Backup: backupName, - Namespace: req.Namespace, - DryRun: false, + Driver: "longhorn", + Volume: volumeName, + Backup: backupName, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: false, }) case "restic": jobName, secretName, err := s.client.CreateBackupJob(r.Context(), s.cfg, req) if err != nil { + s.metrics.RecordBackupRequest("restic", "backend_error") writeError(w, http.StatusBadRequest, err.Error()) return } - + if req.DryRun { + s.metrics.RecordBackupRequest("restic", "dry_run") + } 
else { + s.metrics.RecordBackupRequest("restic", "success") + } writeJSON(w, http.StatusOK, api.BackupResponse{ - Driver: "restic", - JobName: jobName, - Namespace: req.Namespace, - Secret: secretName, - DryRun: req.DryRun, + Driver: "restic", + JobName: jobName, + Namespace: req.Namespace, + Secret: secretName, + RequestedBy: requester, + DryRun: req.DryRun, }) default: + s.metrics.RecordBackupRequest(s.cfg.BackupDriver, "unsupported_driver") writeError(w, http.StatusBadRequest, "unsupported backup driver") } } @@ -125,23 +304,48 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, 1<<20) var req api.RestoreTestRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "invalid_json") writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid JSON: %v", err)) return } + if strings.TrimSpace(req.Namespace) == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "namespace is required") + return + } + if strings.TrimSpace(req.PVC) == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "pvc is required") + return + } + if strings.TrimSpace(req.TargetPVC) == "" { + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "validation_error") + writeError(w, http.StatusBadRequest, "target_pvc is required") + return + } + if strings.TrimSpace(req.TargetNamespace) == "" { + req.TargetNamespace = req.Namespace + } + + requester := currentRequester(r.Context()) switch s.cfg.BackupDriver { case "longhorn": - if req.TargetPVC == "" { - writeError(w, http.StatusBadRequest, "target_pvc is required") + exists, err := s.client.PersistentVolumeClaimExists(r.Context(), req.TargetNamespace, req.TargetPVC) + if err != nil { + s.metrics.RecordRestoreRequest("longhorn", "validation_error") + writeError(w, http.StatusBadRequest, err.Error()) return } - if req.PVC == "" { - writeError(w, http.StatusBadRequest, "pvc is required to locate backup volume") + if exists { + s.metrics.RecordRestoreRequest("longhorn", "conflict") + writeError(w, http.StatusConflict, fmt.Sprintf("target pvc %s/%s already exists", req.TargetNamespace, req.TargetPVC)) return } volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC) if err != nil { + s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } @@ -150,32 +354,38 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { if backupURL == "" { backup, err := s.longhorn.FindBackup(r.Context(), volumeName, req.Snapshot) if err != nil { + s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, err.Error()) return } backupURL = backup.URL } if backupURL == "" { + s.metrics.RecordRestoreRequest("longhorn", "validation_error") writeError(w, http.StatusBadRequest, "backup_url is required") return } - restoreVolumeName := backupName("restore", req.TargetPVC) + restoreVolumeName := backupName("restore", req.TargetNamespace+"-"+req.TargetPVC) if req.DryRun { + s.metrics.RecordRestoreRequest("longhorn", "dry_run") writeJSON(w, http.StatusOK, api.RestoreTestResponse{ - Driver: "longhorn", - Volume: restoreVolumeName, - TargetPVC: req.TargetPVC, - BackupURL: backupURL, - Namespace: req.Namespace, - DryRun: true, + Driver: "longhorn", + Volume: restoreVolumeName, + TargetNamespace: 
req.TargetNamespace, + TargetPVC: req.TargetPVC, + BackupURL: backupURL, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: true, }) return } sourceVolume, err := s.longhorn.GetVolume(r.Context(), volumeName) if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) + s.metrics.RecordRestoreRequest("longhorn", "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) return } replicas := sourceVolume.NumberOfReplicas @@ -184,41 +394,322 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) { } if _, err := s.longhorn.CreateVolumeFromBackup(r.Context(), restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil { - writeError(w, http.StatusBadRequest, err.Error()) + s.metrics.RecordRestoreRequest("longhorn", "backend_error") + writeError(w, http.StatusBadGateway, err.Error()) return } - if err := s.longhorn.CreatePVC(r.Context(), restoreVolumeName, req.Namespace, req.TargetPVC); err != nil { - writeError(w, http.StatusBadRequest, err.Error()) + if err := s.longhorn.CreatePVC(r.Context(), restoreVolumeName, req.TargetNamespace, req.TargetPVC); err != nil { + cleanupErr := s.longhorn.DeleteVolume(r.Context(), restoreVolumeName) + if cleanupErr != nil { + log.Printf("restore cleanup failed for %s: %v", restoreVolumeName, cleanupErr) + writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v (cleanup failed: %v)", err, cleanupErr)) + } else { + writeError(w, http.StatusBadGateway, fmt.Sprintf("create restore pvc: %v", err)) + } + s.metrics.RecordRestoreRequest("longhorn", "backend_error") return } + s.metrics.RecordRestoreRequest("longhorn", "success") writeJSON(w, http.StatusOK, api.RestoreTestResponse{ - Driver: "longhorn", - Volume: restoreVolumeName, - TargetPVC: req.TargetPVC, - BackupURL: backupURL, - Namespace: req.Namespace, - DryRun: false, + Driver: "longhorn", + Volume: restoreVolumeName, + TargetNamespace: req.TargetNamespace, + TargetPVC: req.TargetPVC, + BackupURL: backupURL, + Namespace: req.Namespace, + RequestedBy: requester, + DryRun: false, }) case "restic": jobName, secretName, err := s.client.CreateRestoreJob(r.Context(), s.cfg, req) if err != nil { + s.metrics.RecordRestoreRequest("restic", "backend_error") writeError(w, http.StatusBadRequest, err.Error()) return } - + if req.DryRun { + s.metrics.RecordRestoreRequest("restic", "dry_run") + } else { + s.metrics.RecordRestoreRequest("restic", "success") + } writeJSON(w, http.StatusOK, api.RestoreTestResponse{ - Driver: "restic", - JobName: jobName, - Namespace: req.Namespace, - Secret: secretName, - DryRun: req.DryRun, + Driver: "restic", + JobName: jobName, + Namespace: req.Namespace, + TargetNamespace: req.TargetNamespace, + TargetPVC: req.TargetPVC, + Secret: secretName, + RequestedBy: requester, + DryRun: req.DryRun, }) default: + s.metrics.RecordRestoreRequest(s.cfg.BackupDriver, "unsupported_driver") writeError(w, http.StatusBadRequest, "unsupported backup driver") } } +func (s *Server) buildInventory(ctx context.Context) (api.InventoryResponse, error) { + pvcs, err := s.client.ListBoundPVCs(ctx) + if err != nil { + return api.InventoryResponse{}, err + } + + groups := make(map[string][]api.PVCInventory) + for _, summary := range pvcs { + entry := api.PVCInventory{ + Namespace: summary.Namespace, + PVC: summary.Name, + Volume: summary.VolumeName, + Phase: summary.Phase, + StorageClass: summary.StorageClass, + Capacity: summary.Capacity, + AccessModes: summary.AccessModes, + Driver: s.cfg.BackupDriver, + } + s.enrichPVCInventory(ctx, 
&entry) + groups[summary.Namespace] = append(groups[summary.Namespace], entry) + } + + namespaceNames := make([]string, 0, len(groups)) + for namespace := range groups { + namespaceNames = append(namespaceNames, namespace) + } + sort.Strings(namespaceNames) + + response := api.InventoryResponse{ + GeneratedAt: time.Now().UTC().Format(time.RFC3339), + Namespaces: make([]api.NamespaceInventory, 0, len(namespaceNames)), + } + for _, namespace := range namespaceNames { + response.Namespaces = append(response.Namespaces, api.NamespaceInventory{ + Name: namespace, + PVCs: groups[namespace], + }) + } + return response, nil +} + +func (s *Server) enrichPVCInventory(ctx context.Context, entry *api.PVCInventory) { + switch s.cfg.BackupDriver { + case "longhorn": + backups, err := s.longhorn.ListBackups(ctx, entry.Volume) + if err != nil { + entry.Healthy = false + entry.HealthReason = "lookup_failed" + entry.Error = err.Error() + return + } + entry.BackupCount = len(backups) + latest, latestTime, ok := latestCompletedBackup(backups) + if !ok { + entry.Healthy = false + if len(backups) == 0 { + entry.HealthReason = "missing" + } else { + entry.HealthReason = "no_completed" + } + return + } + entry.LastBackupAt = latest.Created + if latestTime.IsZero() { + entry.Healthy = false + entry.HealthReason = "unknown_timestamp" + return + } + entry.LastBackupAgeHours = roundHours(time.Since(latestTime).Hours()) + entry.Healthy = time.Since(latestTime) <= s.cfg.BackupMaxAge + if entry.Healthy { + entry.HealthReason = "fresh" + } else { + entry.HealthReason = "stale" + } + case "restic": + entry.Healthy = false + entry.HealthReason = "inventory_unavailable" + entry.Error = "restic inventory telemetry is not implemented yet" + default: + entry.Healthy = false + entry.HealthReason = "unsupported_driver" + } +} + +func (s *Server) refreshTelemetry(ctx context.Context) { + refreshCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + inventory, err := s.buildInventory(refreshCtx) + if err != nil { + log.Printf("inventory refresh failed: %v", err) + s.metrics.RecordInventoryFailure() + return + } + s.metrics.RecordInventory(inventory) +} + +func (s *Server) authorize(r *http.Request) (authIdentity, int, error) { + if !s.cfg.AuthRequired { + return authIdentity{}, http.StatusOK, nil + } + + authorization := strings.TrimSpace(r.Header.Get("Authorization")) + if strings.HasPrefix(strings.ToLower(authorization), "bearer ") { + token := strings.TrimSpace(authorization[7:]) + for _, expected := range s.cfg.AuthBearerTokens { + if token != "" && token == expected { + return authIdentity{Authenticated: true, User: "service-token", Groups: []string{"service-token"}}, http.StatusOK, nil + } + } + } + + identity := authIdentity{ + Authenticated: true, + User: strings.TrimSpace(r.Header.Get("X-Auth-Request-User")), + Email: strings.TrimSpace(r.Header.Get("X-Auth-Request-Email")), + Groups: normalizeGroups(strings.Split(strings.TrimSpace(r.Header.Get("X-Auth-Request-Groups")), ",")), + } + if identity.User == "" && identity.Email == "" { + return authIdentity{}, http.StatusUnauthorized, fmt.Errorf("authentication required") + } + if len(s.cfg.AllowedGroups) == 0 { + return identity, http.StatusOK, nil + } + if hasAllowedGroup(identity.Groups, s.cfg.AllowedGroups) { + return identity, http.StatusOK, nil + } + return authIdentity{}, http.StatusForbidden, fmt.Errorf("access requires one of: %s", strings.Join(s.cfg.AllowedGroups, ", ")) +} + +func requesterFromContext(ctx context.Context) authIdentity { + 
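+	// When auth is disabled the stored identity is the zero value, so currentRequester falls back to "anonymous".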
identity, _ := ctx.Value(authContextKey).(authIdentity) + return identity +} + +func currentRequester(ctx context.Context) string { + identity := requesterFromContext(ctx) + if identity.User != "" { + return identity.User + } + if identity.Email != "" { + return identity.Email + } + if identity.Authenticated { + return "authenticated" + } + return "anonymous" +} + +func authzReason(status int, err error) string { + if err == nil { + return "unknown" + } + switch status { + case http.StatusUnauthorized: + return "unauthenticated" + case http.StatusForbidden: + return "forbidden_group" + default: + return "error" + } +} + +func hasAllowedGroup(actual, allowed []string) bool { + allowedSet := make(map[string]struct{}, len(allowed)) + for _, group := range normalizeGroups(allowed) { + allowedSet[group] = struct{}{} + } + for _, group := range normalizeGroups(actual) { + if _, ok := allowedSet[group]; ok { + return true + } + } + return false +} + +func normalizeGroups(values []string) []string { + groups := make([]string, 0, len(values)) + for _, value := range values { + value = strings.TrimSpace(strings.TrimPrefix(value, "/")) + if value == "" { + continue + } + groups = append(groups, value) + } + return groups +} + +func buildBackupRecords(backups []longhorn.Backup) []api.BackupRecord { + records := make([]api.BackupRecord, 0, len(backups)) + latestName := "" + if latest, _, ok := latestCompletedBackup(backups); ok { + latestName = latest.Name + } + + sort.Slice(backups, func(i, j int) bool { + left, lok := parseBackupTime(backups[i].Created) + right, rok := parseBackupTime(backups[j].Created) + switch { + case lok && rok: + return left.After(right) + case lok: + return true + case rok: + return false + default: + return backups[i].Name > backups[j].Name + } + }) + + for _, backup := range backups { + records = append(records, api.BackupRecord{ + Name: backup.Name, + SnapshotName: backup.SnapshotName, + Created: backup.Created, + State: backup.State, + URL: backup.URL, + Size: backup.Size, + Latest: backup.Name == latestName, + }) + } + return records +} + +func latestCompletedBackup(backups []longhorn.Backup) (longhorn.Backup, time.Time, bool) { + var selected longhorn.Backup + var selectedTime time.Time + found := false + for _, backup := range backups { + if backup.State != "Completed" { + continue + } + createdAt, ok := parseBackupTime(backup.Created) + if !ok { + if !found { + selected = backup + found = true + } + continue + } + if !found || createdAt.After(selectedTime) { + selected = backup + selectedTime = createdAt + found = true + } + } + return selected, selectedTime, found +} + +func parseBackupTime(raw string) (time.Time, bool) { + layouts := []string{time.RFC3339Nano, time.RFC3339} + for _, layout := range layouts { + parsed, err := time.Parse(layout, raw) + if err == nil { + return parsed, true + } + } + return time.Time{}, false +} + func writeJSON(w http.ResponseWriter, status int, payload any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) @@ -254,3 +745,7 @@ func sanitizeName(value string) string { value = strings.Trim(value, "-") return value } + +func roundHours(value float64) float64 { + return math.Round(value*100) / 100 +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 0000000..e172460 --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,161 @@ +package server + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + 
"scm.bstein.dev/bstein/soteria/internal/api" + "scm.bstein.dev/bstein/soteria/internal/config" + "scm.bstein.dev/bstein/soteria/internal/k8s" + "scm.bstein.dev/bstein/soteria/internal/longhorn" + + corev1 "k8s.io/api/core/v1" +) + +type fakeKubeClient struct { + pvcs []k8s.PVCSummary + targetExists bool +} + +func (f *fakeKubeClient) ResolvePVCVolume(_ context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { + return namespace + "-" + pvcName + "-pv", nil, nil, nil +} + +func (f *fakeKubeClient) CreateBackupJob(_ context.Context, _ *config.Config, _ api.BackupRequest) (string, string, error) { + return "backup-job", "backup-secret", nil +} + +func (f *fakeKubeClient) CreateRestoreJob(_ context.Context, _ *config.Config, _ api.RestoreTestRequest) (string, string, error) { + return "restore-job", "restore-secret", nil +} + +func (f *fakeKubeClient) ListBoundPVCs(_ context.Context) ([]k8s.PVCSummary, error) { + return f.pvcs, nil +} + +func (f *fakeKubeClient) PersistentVolumeClaimExists(_ context.Context, _, _ string) (bool, error) { + return f.targetExists, nil +} + +type fakeLonghornClient struct { + backups []longhorn.Backup +} + +func (f *fakeLonghornClient) SnapshotBackup(_ context.Context, volume, name string, labels map[string]string, backupMode string) (*longhorn.Volume, error) { + return &longhorn.Volume{Name: volume}, nil +} + +func (f *fakeLonghornClient) GetVolume(_ context.Context, volume string) (*longhorn.Volume, error) { + return &longhorn.Volume{Name: volume, Size: "1073741824", NumberOfReplicas: 2}, nil +} + +func (f *fakeLonghornClient) CreateVolumeFromBackup(_ context.Context, name, size string, replicas int, backupURL string) (*longhorn.Volume, error) { + return &longhorn.Volume{Name: name, Size: size, NumberOfReplicas: replicas}, nil +} + +func (f *fakeLonghornClient) CreatePVC(_ context.Context, volumeName, namespace, pvcName string) error { + return nil +} + +func (f *fakeLonghornClient) DeleteVolume(_ context.Context, volumeName string) error { + return nil +} + +func (f *fakeLonghornClient) FindBackup(_ context.Context, volumeName, snapshot string) (*longhorn.Backup, error) { + return &longhorn.Backup{Name: "backup-latest", URL: "s3://bucket/backup-latest", State: "Completed"}, nil +} + +func (f *fakeLonghornClient) ListBackups(_ context.Context, volumeName string) ([]longhorn.Backup, error) { + return f.backups, nil +} + +func TestProtectedInventoryRequiresAuth(t *testing.T) { + srv := &Server{ + cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin", "maintenance"}, BackupDriver: "longhorn"}, + client: &fakeKubeClient{}, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + req := httptest.NewRequest(http.MethodGet, "/v1/inventory", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", res.Code) + } +} + +func TestProtectedInventoryAllowsMaintenanceGroup(t *testing.T) { + srv := &Server{ + cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin", "maintenance"}, BackupDriver: "longhorn", BackupMaxAge: 24 * time.Hour}, + client: &fakeKubeClient{pvcs: []k8s.PVCSummary{{Namespace: "apps", Name: "data", VolumeName: "pv-apps-data", Phase: "Bound"}}}, + longhorn: &fakeLonghornClient{backups: []longhorn.Backup{{Name: "backup-1", Created: "2026-04-12T00:00:00Z", State: "Completed", URL: "s3://bucket/backup-1"}}}, + metrics: 
newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + req := httptest.NewRequest(http.MethodGet, "/v1/inventory", nil) + req.Header.Set("X-Auth-Request-User", "brad") + req.Header.Set("X-Auth-Request-Groups", "/maintenance") + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", res.Code, res.Body.String()) + } + + var payload api.InventoryResponse + if err := json.NewDecoder(strings.NewReader(res.Body.String())).Decode(&payload); err != nil { + t.Fatalf("decode inventory: %v", err) + } + if len(payload.Namespaces) != 1 || payload.Namespaces[0].Name != "apps" { + t.Fatalf("unexpected inventory payload: %#v", payload) + } +} + +func TestRestoreRejectsExistingTargetPVC(t *testing.T) { + srv := &Server{ + cfg: &config.Config{AuthRequired: false, BackupDriver: "longhorn"}, + client: &fakeKubeClient{targetExists: true}, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + body := `{"namespace":"apps","pvc":"data","target_namespace":"apps","target_pvc":"restore-data"}` + req := httptest.NewRequest(http.MethodPost, "/v1/restores", strings.NewReader(body)) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusConflict { + t.Fatalf("expected 409, got %d: %s", res.Code, res.Body.String()) + } +} + +func TestMetricsStayPublic(t *testing.T) { + srv := &Server{ + cfg: &config.Config{AuthRequired: true, AllowedGroups: []string{"admin"}}, + client: &fakeKubeClient{}, + longhorn: &fakeLonghornClient{}, + metrics: newTelemetry(), + } + srv.handler = http.HandlerFunc(srv.route) + + req := httptest.NewRequest(http.MethodGet, "/metrics", nil) + res := httptest.NewRecorder() + srv.Handler().ServeHTTP(res, req) + + if res.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", res.Code) + } + if !strings.Contains(res.Body.String(), "soteria_backup_requests_total") { + t.Fatalf("expected prometheus metrics body, got %q", res.Body.String()) + } +} diff --git a/internal/server/ui.go b/internal/server/ui.go new file mode 100644 index 0000000..f0bf955 --- /dev/null +++ b/internal/server/ui.go @@ -0,0 +1,6 @@ +package server + +import _ "embed" + +//go:embed ui.html +var uiHTML string diff --git a/internal/server/ui.html b/internal/server/ui.html new file mode 100644 index 0000000..564f59c --- /dev/null +++ b/internal/server/ui.html @@ -0,0 +1,336 @@ + + + + + + Soteria Backup Console + + + +
[Remainder of ui.html elided: the embedded single-page console markup was stripped during extraction. Surviving fragments of the 336-line file show a page titled "Soteria Backup Console" with the tagline "Namespace-grouped PVC backup and restore control plane for Atlas.", an access-check banner ("Checking access..."), and a "PVC Inventory" panel with a "Loading PVC inventory..." placeholder.]