feat: orchestrate longhorn backups
This commit is contained in:
parent
eb6de2db73
commit
3eee5fc7e1
26
README.md
26
README.md
@ -1,16 +1,16 @@
|
||||
# soteria
|
||||
|
||||
Soteria is a small in-cluster service that launches restic Jobs to back up PVCs. It is intended to be called by Ariadne (or another controller) and focuses on:
|
||||
Soteria is a small in-cluster service that orchestrates Longhorn backups for PVCs. It is intended to be called by Ariadne (or another controller) and focuses on:
|
||||
|
||||
- Encrypted restic backups to an S3-compatible backend (Backblaze B2 by default).
|
||||
- On-demand restore tests into an emptyDir or a target PVC.
|
||||
- Minimal long-running footprint (the backup work happens in Jobs).
|
||||
- Longhorn-managed backups to an S3-compatible backend (Backblaze B2 by default).
|
||||
- On-demand restore tests into a target PVC.
|
||||
- Minimal long-running footprint (the backup work happens in Longhorn).
|
||||
|
||||
Snapshots are not implemented yet; backups are crash-consistent for the PVC as mounted.
|
||||
Snapshots are managed by Longhorn; backups are crash-consistent for the PVC as mounted.
|
||||
|
||||
## API
|
||||
|
||||
### POST /v1/backup
|
||||
### POST /v1/backup (Longhorn)
|
||||
|
||||
```json
|
||||
{
|
||||
@ -32,22 +32,30 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
### POST /v1/restore-test
|
||||
### POST /v1/restore-test (Longhorn)
|
||||
|
||||
```json
|
||||
{
|
||||
"namespace": "ai",
|
||||
"snapshot": "latest",
|
||||
"pvc": "ollama-models",
|
||||
"target_pvc": "restore-sandbox",
|
||||
"dry_run": false
|
||||
}
|
||||
```
|
||||
|
||||
Notes:
|
||||
- `pvc` is required to resolve the Longhorn volume and locate the latest backup.
|
||||
- `snapshot` may be `latest` or a specific backup/snapshot name. You can also pass `backup_url`.
|
||||
|
||||
## Configuration
|
||||
|
||||
Environment variables:
|
||||
|
||||
- `SOTERIA_RESTIC_REPOSITORY` (required) Example: `s3:s3.us-west-004.backblazeb2.com/atlas-backups`
|
||||
- `SOTERIA_BACKUP_DRIVER` (default: `longhorn`, allowed: `longhorn`, `restic`)
|
||||
- `SOTERIA_LONGHORN_URL` (default: `http://longhorn-backend.longhorn-system.svc:9500`)
|
||||
- `SOTERIA_LONGHORN_BACKUP_MODE` (default: `incremental`, allowed: `incremental`, `full`)
|
||||
- `SOTERIA_RESTIC_REPOSITORY` (required for restic driver) Example: `s3:s3.us-west-004.backblazeb2.com/atlas-backups`
|
||||
- `SOTERIA_RESTIC_SECRET_NAME` (default: `soteria-restic`)
|
||||
- `SOTERIA_SECRET_NAMESPACE` (default: service namespace)
|
||||
- `SOTERIA_RESTIC_IMAGE` (default: `restic/restic:0.16.4`)
|
||||
@ -64,7 +72,7 @@ The restic repository is encrypted with `RESTIC_PASSWORD` from the secret below.
|
||||
|
||||
## Secrets
|
||||
|
||||
Create a secret named `soteria-restic` in the Soteria namespace (or set `SOTERIA_RESTIC_SECRET_NAME`). Keys required:
|
||||
Create a secret named `soteria-restic` in the Soteria namespace (or set `SOTERIA_RESTIC_SECRET_NAME`) if using the restic driver. Keys required:
|
||||
|
||||
- `AWS_ACCESS_KEY_ID`
|
||||
- `AWS_SECRET_ACCESS_KEY`
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"scm.bstein.dev/bstein/soteria/internal/config"
|
||||
"scm.bstein.dev/bstein/soteria/internal/k8s"
|
||||
"scm.bstein.dev/bstein/soteria/internal/longhorn"
|
||||
"scm.bstein.dev/bstein/soteria/internal/server"
|
||||
)
|
||||
|
||||
@ -28,7 +29,8 @@ func main() {
|
||||
log.Fatalf("k8s client: %v", err)
|
||||
}
|
||||
|
||||
srv := server.New(cfg, client)
|
||||
longhornClient := longhorn.New(cfg.LonghornURL)
|
||||
srv := server.New(cfg, client, longhornClient)
|
||||
httpServer := &http.Server{
|
||||
Addr: cfg.ListenAddr,
|
||||
Handler: srv.Handler(),
|
||||
|
||||
@ -9,22 +9,31 @@ type BackupRequest struct {
|
||||
}
|
||||
|
||||
type BackupResponse struct {
|
||||
JobName string `json:"job_name"`
|
||||
Namespace string `json:"namespace"`
|
||||
Secret string `json:"secret"`
|
||||
Driver string `json:"driver,omitempty"`
|
||||
Volume string `json:"volume,omitempty"`
|
||||
Backup string `json:"backup,omitempty"`
|
||||
JobName string `json:"job_name,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Secret string `json:"secret,omitempty"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
}
|
||||
|
||||
type RestoreTestRequest struct {
|
||||
Namespace string `json:"namespace"`
|
||||
PVC string `json:"pvc,omitempty"`
|
||||
Snapshot string `json:"snapshot,omitempty"`
|
||||
BackupURL string `json:"backup_url,omitempty"`
|
||||
TargetPVC string `json:"target_pvc,omitempty"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
}
|
||||
|
||||
type RestoreTestResponse struct {
|
||||
JobName string `json:"job_name"`
|
||||
Namespace string `json:"namespace"`
|
||||
Secret string `json:"secret"`
|
||||
Driver string `json:"driver,omitempty"`
|
||||
Volume string `json:"volume,omitempty"`
|
||||
TargetPVC string `json:"target_pvc,omitempty"`
|
||||
BackupURL string `json:"backup_url,omitempty"`
|
||||
JobName string `json:"job_name,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Secret string `json:"secret,omitempty"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
}
|
||||
|
||||
@ -8,16 +8,20 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultResticImage = "restic/restic:0.16.4"
|
||||
defaultJobTTLSeconds = 86400
|
||||
defaultListenAddr = ":8080"
|
||||
defaultResticSecret = "soteria-restic"
|
||||
serviceNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
|
||||
defaultBackupDriver = "longhorn"
|
||||
defaultResticImage = "restic/restic:0.16.4"
|
||||
defaultJobTTLSeconds = 86400
|
||||
defaultListenAddr = ":8080"
|
||||
defaultResticSecret = "soteria-restic"
|
||||
defaultLonghornURL = "http://longhorn-backend.longhorn-system.svc:9500"
|
||||
defaultLonghornMode = "incremental"
|
||||
serviceNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Namespace string
|
||||
SecretNamespace string
|
||||
BackupDriver string
|
||||
ResticImage string
|
||||
ResticRepository string
|
||||
ResticSecretName string
|
||||
@ -29,6 +33,8 @@ type Config struct {
|
||||
JobNodeSelector map[string]string
|
||||
WorkerServiceAccount string
|
||||
ListenAddr string
|
||||
LonghornURL string
|
||||
LonghornBackupMode string
|
||||
}
|
||||
|
||||
func Load() (*Config, error) {
|
||||
@ -48,6 +54,7 @@ func Load() (*Config, error) {
|
||||
cfg.SecretNamespace = cfg.Namespace
|
||||
}
|
||||
|
||||
cfg.BackupDriver = getenvDefault("SOTERIA_BACKUP_DRIVER", defaultBackupDriver)
|
||||
cfg.ResticImage = getenvDefault("SOTERIA_RESTIC_IMAGE", defaultResticImage)
|
||||
cfg.ResticRepository = getenv("SOTERIA_RESTIC_REPOSITORY")
|
||||
cfg.ResticSecretName = getenvDefault("SOTERIA_RESTIC_SECRET_NAME", defaultResticSecret)
|
||||
@ -58,6 +65,8 @@ func Load() (*Config, error) {
|
||||
cfg.WorkerServiceAccount = getenv("SOTERIA_JOB_SERVICE_ACCOUNT")
|
||||
cfg.ListenAddr = getenvDefault("SOTERIA_LISTEN_ADDR", defaultListenAddr)
|
||||
cfg.JobNodeSelector = parseNodeSelector(getenv("SOTERIA_JOB_NODE_SELECTOR"))
|
||||
cfg.LonghornURL = getenvDefault("SOTERIA_LONGHORN_URL", defaultLonghornURL)
|
||||
cfg.LonghornBackupMode = getenvDefault("SOTERIA_LONGHORN_BACKUP_MODE", defaultLonghornMode)
|
||||
|
||||
if ttl, ok := getenvInt("SOTERIA_JOB_TTL_SECONDS"); ok {
|
||||
cfg.JobTTLSeconds = int32(ttl)
|
||||
@ -66,18 +75,37 @@ func Load() (*Config, error) {
|
||||
}
|
||||
|
||||
if cfg.ResticRepository == "" {
|
||||
return nil, errors.New("SOTERIA_RESTIC_REPOSITORY is required")
|
||||
if cfg.BackupDriver == "restic" {
|
||||
return nil, errors.New("SOTERIA_RESTIC_REPOSITORY is required for restic driver")
|
||||
}
|
||||
}
|
||||
if cfg.ResticSecretName == "" {
|
||||
return nil, errors.New("SOTERIA_RESTIC_SECRET_NAME is required")
|
||||
if cfg.BackupDriver == "restic" {
|
||||
return nil, errors.New("SOTERIA_RESTIC_SECRET_NAME is required for restic driver")
|
||||
}
|
||||
}
|
||||
|
||||
if strings.Contains(cfg.ResticRepository, "..") {
|
||||
return nil, errors.New("SOTERIA_RESTIC_REPOSITORY contains invalid path segments")
|
||||
if cfg.BackupDriver == "restic" {
|
||||
return nil, errors.New("SOTERIA_RESTIC_REPOSITORY contains invalid path segments")
|
||||
}
|
||||
}
|
||||
if cfg.JobNodeSelector == nil {
|
||||
return nil, errors.New("SOTERIA_JOB_NODE_SELECTOR is invalid; expected key=value pairs")
|
||||
}
|
||||
if cfg.BackupDriver != "longhorn" && cfg.BackupDriver != "restic" {
|
||||
return nil, errors.New("SOTERIA_BACKUP_DRIVER must be longhorn or restic")
|
||||
}
|
||||
if cfg.BackupDriver == "longhorn" && cfg.LonghornURL == "" {
|
||||
return nil, errors.New("SOTERIA_LONGHORN_URL is required for longhorn driver")
|
||||
}
|
||||
if cfg.BackupDriver == "longhorn" && cfg.LonghornBackupMode != "" {
|
||||
switch cfg.LonghornBackupMode {
|
||||
case "incremental", "full":
|
||||
default:
|
||||
return nil, errors.New("SOTERIA_LONGHORN_BACKUP_MODE must be incremental or full")
|
||||
}
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
26
internal/k8s/volumes.go
Normal file
26
internal/k8s/volumes.go
Normal file
@ -0,0 +1,26 @@
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func (c *Client) ResolvePVCVolume(ctx context.Context, namespace, pvcName string) (string, *corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) {
|
||||
pvc, err := c.Clientset.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", nil, nil, fmt.Errorf("get pvc %s/%s: %w", namespace, pvcName, err)
|
||||
}
|
||||
if pvc.Spec.VolumeName == "" {
|
||||
return "", pvc, nil, fmt.Errorf("pvc %s/%s has no bound volume", namespace, pvcName)
|
||||
}
|
||||
|
||||
pv, err := c.Clientset.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", pvc, nil, fmt.Errorf("get pv %s: %w", pvc.Spec.VolumeName, err)
|
||||
}
|
||||
|
||||
return pvc.Spec.VolumeName, pvc, pv, nil
|
||||
}
|
||||
267
internal/longhorn/client.go
Normal file
267
internal/longhorn/client.go
Normal file
@ -0,0 +1,267 @@
|
||||
package longhorn
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const defaultTimeout = 30 * time.Second
|
||||
|
||||
type Client struct {
|
||||
baseURL string
|
||||
http *http.Client
|
||||
}
|
||||
|
||||
func New(baseURL string) *Client {
|
||||
baseURL = strings.TrimSuffix(baseURL, "/")
|
||||
return &Client{
|
||||
baseURL: baseURL,
|
||||
http: &http.Client{
|
||||
Timeout: defaultTimeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type APIError struct {
|
||||
Status int
|
||||
Message string
|
||||
}
|
||||
|
||||
func (e *APIError) Error() string {
|
||||
return fmt.Sprintf("longhorn api error: status=%d message=%s", e.Status, e.Message)
|
||||
}
|
||||
|
||||
type Volume struct {
|
||||
Name string `json:"name"`
|
||||
Size string `json:"size"`
|
||||
NumberOfReplicas int `json:"numberOfReplicas"`
|
||||
BackupStatus []BackupStatus `json:"backupStatus"`
|
||||
LastBackup string `json:"lastBackup"`
|
||||
LastBackupAt string `json:"lastBackupAt"`
|
||||
Actions map[string]any `json:"actions"`
|
||||
}
|
||||
|
||||
type BackupStatus struct {
|
||||
Snapshot string `json:"snapshot"`
|
||||
BackupURL string `json:"backupURL"`
|
||||
State string `json:"state"`
|
||||
Error string `json:"error"`
|
||||
Progress int `json:"progress"`
|
||||
}
|
||||
|
||||
type BackupVolume struct {
|
||||
Name string `json:"name"`
|
||||
Actions map[string]string `json:"actions"`
|
||||
}
|
||||
|
||||
type Backup struct {
|
||||
Name string `json:"name"`
|
||||
SnapshotName string `json:"snapshotName"`
|
||||
VolumeName string `json:"volumeName"`
|
||||
Created string `json:"created"`
|
||||
State string `json:"state"`
|
||||
URL string `json:"url"`
|
||||
Size string `json:"size"`
|
||||
}
|
||||
|
||||
type backupListOutput struct {
|
||||
Data []Backup `json:"data"`
|
||||
}
|
||||
|
||||
type backupVolumeListOutput struct {
|
||||
Data []BackupVolume `json:"data"`
|
||||
}
|
||||
|
||||
type snapshotInput struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
BackupMode string `json:"backupMode,omitempty"`
|
||||
}
|
||||
|
||||
type pvcCreateInput struct {
|
||||
Namespace string `json:"namespace"`
|
||||
PVCName string `json:"pvcName"`
|
||||
}
|
||||
|
||||
func (c *Client) SnapshotBackup(ctx context.Context, volume, name string, labels map[string]string, backupMode string) (*Volume, error) {
|
||||
path := fmt.Sprintf("%s/v1/volumes/%s?action=snapshotBackup", c.baseURL, url.PathEscape(volume))
|
||||
input := snapshotInput{
|
||||
Name: name,
|
||||
Labels: labels,
|
||||
}
|
||||
if backupMode != "" {
|
||||
input.BackupMode = backupMode
|
||||
}
|
||||
var out Volume
|
||||
if err := c.doJSON(ctx, http.MethodPost, path, input, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (c *Client) GetVolume(ctx context.Context, volume string) (*Volume, error) {
|
||||
path := fmt.Sprintf("%s/v1/volumes/%s", c.baseURL, url.PathEscape(volume))
|
||||
var out Volume
|
||||
if err := c.doJSON(ctx, http.MethodGet, path, nil, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (c *Client) CreateVolumeFromBackup(ctx context.Context, name, size string, replicas int, backupURL string) (*Volume, error) {
|
||||
path := fmt.Sprintf("%s/v1/volumes", c.baseURL)
|
||||
payload := map[string]any{
|
||||
"name": name,
|
||||
"size": size,
|
||||
"fromBackup": backupURL,
|
||||
"dataEngine": "v1",
|
||||
}
|
||||
if replicas > 0 {
|
||||
payload["numberOfReplicas"] = replicas
|
||||
}
|
||||
var out Volume
|
||||
if err := c.doJSON(ctx, http.MethodPost, path, payload, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (c *Client) CreatePVC(ctx context.Context, volumeName, namespace, pvcName string) error {
|
||||
path := fmt.Sprintf("%s/v1/volumes/%s?action=pvcCreate", c.baseURL, url.PathEscape(volumeName))
|
||||
input := pvcCreateInput{
|
||||
Namespace: namespace,
|
||||
PVCName: pvcName,
|
||||
}
|
||||
return c.doJSON(ctx, http.MethodPost, path, input, nil)
|
||||
}
|
||||
|
||||
func (c *Client) GetBackupVolume(ctx context.Context, volumeName string) (*BackupVolume, error) {
|
||||
path := fmt.Sprintf("%s/v1/backupvolumes/%s", c.baseURL, url.PathEscape(volumeName))
|
||||
var out BackupVolume
|
||||
if err := c.doJSON(ctx, http.MethodGet, path, nil, &out); err == nil {
|
||||
return &out, nil
|
||||
} else if apiErr, ok := err.(*APIError); ok && apiErr.Status == http.StatusNotFound {
|
||||
listPath := fmt.Sprintf("%s/v1/backupvolumes", c.baseURL)
|
||||
var list backupVolumeListOutput
|
||||
if listErr := c.doJSON(ctx, http.MethodGet, listPath, nil, &list); listErr != nil {
|
||||
return nil, listErr
|
||||
}
|
||||
for _, item := range list.Data {
|
||||
if item.Name == volumeName {
|
||||
return &item, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("backup volume %s not found", volumeName)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) ListBackups(ctx context.Context, volumeName string) ([]Backup, error) {
|
||||
backupVolume, err := c.GetBackupVolume(ctx, volumeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
listURL, ok := backupVolume.Actions["backupList"]
|
||||
if !ok || listURL == "" {
|
||||
return nil, fmt.Errorf("backup list action missing for volume %s", volumeName)
|
||||
}
|
||||
var out backupListOutput
|
||||
if err := c.doJSON(ctx, http.MethodPost, listURL, map[string]any{}, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out.Data, nil
|
||||
}
|
||||
|
||||
func (c *Client) FindBackup(ctx context.Context, volumeName, snapshot string) (*Backup, error) {
|
||||
backups, err := c.ListBackups(ctx, volumeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(backups) == 0 {
|
||||
return nil, fmt.Errorf("no backups found for volume %s", volumeName)
|
||||
}
|
||||
|
||||
if snapshot != "" && snapshot != "latest" {
|
||||
for _, backup := range backups {
|
||||
if backup.Name == snapshot || backup.SnapshotName == snapshot || backup.URL == snapshot {
|
||||
return &backup, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("backup %s not found for volume %s", snapshot, volumeName)
|
||||
}
|
||||
|
||||
var selected *Backup
|
||||
var selectedTime time.Time
|
||||
for _, backup := range backups {
|
||||
if backup.State != "Completed" {
|
||||
continue
|
||||
}
|
||||
createdAt, err := time.Parse(time.RFC3339, backup.Created)
|
||||
if err != nil {
|
||||
if selected == nil {
|
||||
candidate := backup
|
||||
selected = &candidate
|
||||
}
|
||||
continue
|
||||
}
|
||||
if selected == nil || createdAt.After(selectedTime) {
|
||||
candidate := backup
|
||||
selected = &candidate
|
||||
selectedTime = createdAt
|
||||
}
|
||||
}
|
||||
if selected == nil {
|
||||
return nil, fmt.Errorf("no completed backups found for volume %s", volumeName)
|
||||
}
|
||||
return selected, nil
|
||||
}
|
||||
|
||||
func (c *Client) doJSON(ctx context.Context, method, url string, payload any, out any) error {
|
||||
var body io.Reader
|
||||
if payload != nil {
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode request: %w", err)
|
||||
}
|
||||
body = bytes.NewReader(data)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("build request: %w", err)
|
||||
}
|
||||
if payload != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
msg := strings.TrimSpace(string(respBody))
|
||||
return &APIError{Status: resp.StatusCode, Message: msg}
|
||||
}
|
||||
|
||||
if out == nil || len(respBody) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := json.Unmarshal(respBody, out); err != nil {
|
||||
return fmt.Errorf("decode response: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -4,23 +4,28 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"scm.bstein.dev/bstein/soteria/internal/api"
|
||||
"scm.bstein.dev/bstein/soteria/internal/config"
|
||||
"scm.bstein.dev/bstein/soteria/internal/k8s"
|
||||
"scm.bstein.dev/bstein/soteria/internal/longhorn"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
cfg *config.Config
|
||||
client *k8s.Client
|
||||
mux *http.ServeMux
|
||||
cfg *config.Config
|
||||
client *k8s.Client
|
||||
longhorn *longhorn.Client
|
||||
mux *http.ServeMux
|
||||
}
|
||||
|
||||
func New(cfg *config.Config, client *k8s.Client) *Server {
|
||||
func New(cfg *config.Config, client *k8s.Client, lh *longhorn.Client) *Server {
|
||||
s := &Server{
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
mux: http.NewServeMux(),
|
||||
cfg: cfg,
|
||||
client: client,
|
||||
longhorn: lh,
|
||||
mux: http.NewServeMux(),
|
||||
}
|
||||
|
||||
s.mux.HandleFunc("/healthz", s.handleHealth)
|
||||
@ -56,19 +61,59 @@ func (s *Server) handleBackup(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
jobName, secretName, err := s.client.CreateBackupJob(r.Context(), s.cfg, req)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
switch s.cfg.BackupDriver {
|
||||
case "longhorn":
|
||||
volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
resp := api.BackupResponse{
|
||||
JobName: jobName,
|
||||
Namespace: req.Namespace,
|
||||
Secret: secretName,
|
||||
DryRun: req.DryRun,
|
||||
backupName := backupName("backup", req.PVC)
|
||||
if req.DryRun {
|
||||
writeJSON(w, http.StatusOK, api.BackupResponse{
|
||||
Driver: "longhorn",
|
||||
Volume: volumeName,
|
||||
Backup: backupName,
|
||||
Namespace: req.Namespace,
|
||||
DryRun: true,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
labels := map[string]string{
|
||||
"soteria.bstein.dev/namespace": req.Namespace,
|
||||
"soteria.bstein.dev/pvc": req.PVC,
|
||||
}
|
||||
if _, err := s.longhorn.SnapshotBackup(r.Context(), volumeName, backupName, labels, s.cfg.LonghornBackupMode); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, api.BackupResponse{
|
||||
Driver: "longhorn",
|
||||
Volume: volumeName,
|
||||
Backup: backupName,
|
||||
Namespace: req.Namespace,
|
||||
DryRun: false,
|
||||
})
|
||||
case "restic":
|
||||
jobName, secretName, err := s.client.CreateBackupJob(r.Context(), s.cfg, req)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, api.BackupResponse{
|
||||
Driver: "restic",
|
||||
JobName: jobName,
|
||||
Namespace: req.Namespace,
|
||||
Secret: secretName,
|
||||
DryRun: req.DryRun,
|
||||
})
|
||||
default:
|
||||
writeError(w, http.StatusBadRequest, "unsupported backup driver")
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) {
|
||||
@ -84,19 +129,94 @@ func (s *Server) handleRestore(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
jobName, secretName, err := s.client.CreateRestoreJob(r.Context(), s.cfg, req)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
switch s.cfg.BackupDriver {
|
||||
case "longhorn":
|
||||
if req.TargetPVC == "" {
|
||||
writeError(w, http.StatusBadRequest, "target_pvc is required")
|
||||
return
|
||||
}
|
||||
if req.PVC == "" {
|
||||
writeError(w, http.StatusBadRequest, "pvc is required to locate backup volume")
|
||||
return
|
||||
}
|
||||
|
||||
resp := api.RestoreTestResponse{
|
||||
JobName: jobName,
|
||||
Namespace: req.Namespace,
|
||||
Secret: secretName,
|
||||
DryRun: req.DryRun,
|
||||
volumeName, _, _, err := s.client.ResolvePVCVolume(r.Context(), req.Namespace, req.PVC)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
backupURL := strings.TrimSpace(req.BackupURL)
|
||||
if backupURL == "" {
|
||||
backup, err := s.longhorn.FindBackup(r.Context(), volumeName, req.Snapshot)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
backupURL = backup.URL
|
||||
}
|
||||
if backupURL == "" {
|
||||
writeError(w, http.StatusBadRequest, "backup_url is required")
|
||||
return
|
||||
}
|
||||
|
||||
restoreVolumeName := backupName("restore", req.TargetPVC)
|
||||
if req.DryRun {
|
||||
writeJSON(w, http.StatusOK, api.RestoreTestResponse{
|
||||
Driver: "longhorn",
|
||||
Volume: restoreVolumeName,
|
||||
TargetPVC: req.TargetPVC,
|
||||
BackupURL: backupURL,
|
||||
Namespace: req.Namespace,
|
||||
DryRun: true,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
sourceVolume, err := s.longhorn.GetVolume(r.Context(), volumeName)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
replicas := sourceVolume.NumberOfReplicas
|
||||
if replicas == 0 {
|
||||
replicas = 2
|
||||
}
|
||||
|
||||
if _, err := s.longhorn.CreateVolumeFromBackup(r.Context(), restoreVolumeName, sourceVolume.Size, replicas, backupURL); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if err := s.longhorn.CreatePVC(r.Context(), restoreVolumeName, req.Namespace, req.TargetPVC); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, api.RestoreTestResponse{
|
||||
Driver: "longhorn",
|
||||
Volume: restoreVolumeName,
|
||||
TargetPVC: req.TargetPVC,
|
||||
BackupURL: backupURL,
|
||||
Namespace: req.Namespace,
|
||||
DryRun: false,
|
||||
})
|
||||
case "restic":
|
||||
jobName, secretName, err := s.client.CreateRestoreJob(r.Context(), s.cfg, req)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, api.RestoreTestResponse{
|
||||
Driver: "restic",
|
||||
JobName: jobName,
|
||||
Namespace: req.Namespace,
|
||||
Secret: secretName,
|
||||
DryRun: req.DryRun,
|
||||
})
|
||||
default:
|
||||
writeError(w, http.StatusBadRequest, "unsupported backup driver")
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, payload any) {
|
||||
@ -108,3 +228,29 @@ func writeJSON(w http.ResponseWriter, status int, payload any) {
|
||||
func writeError(w http.ResponseWriter, status int, message string) {
|
||||
writeJSON(w, status, map[string]string{"error": message})
|
||||
}
|
||||
|
||||
func backupName(prefix, value string) string {
|
||||
base := sanitizeName(fmt.Sprintf("soteria-%s-%s", prefix, value))
|
||||
timestamp := time.Now().UTC().Format("20060102-150405")
|
||||
name := fmt.Sprintf("%s-%s", base, timestamp)
|
||||
if len(name) <= 63 {
|
||||
return name
|
||||
}
|
||||
maxBase := 63 - len(timestamp) - 1
|
||||
if maxBase < 1 {
|
||||
maxBase = 1
|
||||
}
|
||||
if len(base) > maxBase {
|
||||
base = base[:maxBase]
|
||||
}
|
||||
return fmt.Sprintf("%s-%s", base, timestamp)
|
||||
}
|
||||
|
||||
func sanitizeName(value string) string {
|
||||
value = strings.ToLower(value)
|
||||
value = strings.ReplaceAll(value, "_", "-")
|
||||
value = strings.ReplaceAll(value, ".", "-")
|
||||
value = strings.ReplaceAll(value, " ", "-")
|
||||
value = strings.Trim(value, "-")
|
||||
return value
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user