soteria/internal/server/restic_usage_store.go

260 lines
6.3 KiB
Go
Raw Normal View History

package server
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"regexp"
"sort"
"strconv"
"strings"
"time"
)
// Patterns used to pull the amount of newly added data out of a restic backup log.
var (
	// resticDataAddedPattern captures the run of digits following a
	// `"data_added":` key, as found in JSON-formatted log lines. The closing
	// quote is part of the pattern, so longer keys such as
	// `"data_added_packed"` do not match.
	resticDataAddedPattern = regexp.MustCompile(`(?m)"data_added":\s*([0-9]+)`)

	// resticAddedStoredPattern matches, case-insensitively, a line containing
	// "added to the repository:" (or "repo:") and captures the text between the
	// opening parenthesis and the trailing " stored)", e.g. "800.000 KiB" from
	// "(800.000 KiB stored)".
	resticAddedStoredPattern = regexp.MustCompile(`(?mi)added to the (?:repository|repo):[^\n]*\(([^)]+)\s+stored\)`)
)
// lookupResticStoredBytesForJob reports the byte count recorded for the backup
// job namespace/jobName and whether that count is known.
//
// Sources are consulted in this order:
//  1. the in-memory cache s.jobUsage, while the entry is younger than cacheTTL;
//  2. the persisted usage store, via lookupPersistedResticUsage;
//  3. the job's log, read with s.client.ReadBackupJobLog and parsed by
//     parseResticStoredBytes. A successfully parsed value is also handed to
//     storePersistedResticUsage.
//
// Every outcome, including "unknown", is cached so that a job whose log is
// missing or unparseable is not re-read on each call. The one exception is a
// log read that failed while ctx was already cancelled or past its deadline:
// that failure describes the request, not the job, so it is returned as
// unknown without being cached and the next call retries.
func (s *Server) lookupResticStoredBytesForJob(ctx context.Context, namespace, jobName string) (float64, bool) {
	const cacheTTL = 15 * time.Minute

	key := namespace + "/" + jobName

	s.jobUsageMu.RLock()
	cached, ok := s.jobUsage[key]
	s.jobUsageMu.RUnlock()
	if ok && time.Since(cached.CheckedAt) < cacheTTL {
		return cached.Bytes, cached.Known
	}

	// remember writes entry into the cache, creating the map on first use.
	remember := func(entry resticJobUsageCacheEntry) {
		s.jobUsageMu.Lock()
		defer s.jobUsageMu.Unlock()
		if s.jobUsage == nil {
			s.jobUsage = map[string]resticJobUsageCacheEntry{}
		}
		s.jobUsage[key] = entry
	}

	if bytes, known := s.lookupPersistedResticUsage(key); known {
		remember(resticJobUsageCacheEntry{
			Known:     true,
			Bytes:     bytes,
			CheckedAt: time.Now().UTC(),
		})
		return bytes, true
	}

	logBody, err := s.client.ReadBackupJobLog(ctx, namespace, jobName)
	if err != nil && ctx != nil && ctx.Err() != nil {
		// The caller went away or timed out; caching "unknown" here would hide
		// the job's usage for a full cacheTTL even though nothing is wrong
		// with the job itself.
		return 0, false
	}

	entry := resticJobUsageCacheEntry{CheckedAt: time.Now().UTC()}
	if err == nil {
		if parsedBytes, parsed := parseResticStoredBytes(logBody); parsed {
			entry.Known = true
			entry.Bytes = parsedBytes
			s.storePersistedResticUsage(ctx, key, parsedBytes)
		}
	}
	remember(entry)
	return entry.Bytes, entry.Known
}
// parseResticStoredBytes extracts a byte count from the text of a restic
// backup log.
//
// Two forms are recognised, in order of preference:
//   - the last match of resticDataAddedPattern (a `"data_added"` number), used
//     when its digits parse as a float;
//   - the last match of resticAddedStoredPattern (the "(… stored)" figure),
//     converted with parseHumanByteSize.
//
// The second return value is false when logBody is empty or neither form
// yields a value.
//
// NOTE(review): the first form reads "data_added" while the second reads the
// "stored" figure; confirm the two are meant to be interchangeable.
func parseResticStoredBytes(logBody string) (float64, bool) {
	if logBody == "" {
		return 0, false
	}

	// Prefer the machine-readable figure; the final occurrence wins.
	if jsonHits := resticDataAddedPattern.FindAllStringSubmatch(logBody, -1); len(jsonHits) > 0 {
		if newest := jsonHits[len(jsonHits)-1]; len(newest) > 1 {
			added, err := strconv.ParseFloat(strings.TrimSpace(newest[1]), 64)
			if err == nil {
				return added, true
			}
		}
	}

	// Fall back to the human-readable summary line.
	textHits := resticAddedStoredPattern.FindAllStringSubmatch(logBody, -1)
	if len(textHits) == 0 {
		return 0, false
	}
	newest := textHits[len(textHits)-1]
	if len(newest) >= 2 {
		return parseHumanByteSize(newest[1])
	}
	return 0, false
}
// parseHumanByteSize converts a human-readable size such as "1.5 GiB" or
// "1,024 B" into a number of bytes.
//
// raw must contain a number and a unit separated by whitespace; any fields
// after the unit are ignored. Commas in the number are removed before parsing.
// The unit is matched case-insensitively and may be B, a binary unit
// (KiB, MiB, GiB, TiB; powers of 1024) or a decimal unit (KB, MB, GB, TB;
// powers of 1000).
//
// It returns (0, false) when raw has fewer than two fields, the number does
// not parse, the unit is not recognised, or the resulting size is negative,
// NaN or infinite.
func parseHumanByteSize(raw string) (float64, bool) {
	fields := strings.Fields(raw)
	if len(fields) < 2 {
		return 0, false
	}

	size, err := strconv.ParseFloat(strings.ReplaceAll(fields[0], ",", ""), 64)
	if err != nil {
		return 0, false
	}

	// Each unit is base^steps bytes.
	var (
		base  float64
		steps int
	)
	switch strings.ToUpper(fields[1]) {
	case "B":
		base, steps = 1, 0
	case "KIB":
		base, steps = 1024, 1
	case "MIB":
		base, steps = 1024, 2
	case "GIB":
		base, steps = 1024, 3
	case "TIB":
		base, steps = 1024, 4
	case "KB":
		base, steps = 1000, 1
	case "MB":
		base, steps = 1000, 2
	case "GB":
		base, steps = 1000, 3
	case "TB":
		base, steps = 1000, 4
	default:
		return 0, false
	}
	// Multiply one step at a time so the floating-point result is the same as
	// writing value * base * base * ... out by hand.
	for i := 0; i < steps; i++ {
		size *= base
	}

	// strconv.ParseFloat accepts "NaN", "Inf" and signed numbers without
	// returning an error, and scaling a huge value can overflow to +Inf. None
	// of those is a meaningful byte count.
	if size < 0 || math.IsNaN(size) || math.IsInf(size, 0) {
		return 0, false
	}
	return size, true
}
// loadResticUsage replaces the in-memory usage store with the document read
// through s.client.LoadSecretData.
//
// It does nothing and returns nil when s.cfg.UsageSecretName is blank or when
// the loaded data is empty. A load error is returned unchanged; a JSON decoding
// error is wrapped. Jobs whose trimmed key is empty, or whose byte count is
// negative, NaN or infinite, are skipped. When the same key appears more than
// once, the later job overwrites the earlier one.
func (s *Server) loadResticUsage(ctx context.Context) error {
	secretName := s.cfg.UsageSecretName
	if strings.TrimSpace(secretName) == "" {
		return nil
	}

	raw, err := s.client.LoadSecretData(ctx, s.cfg.Namespace, secretName, usageSecretKey)
	switch {
	case err != nil:
		return err
	case len(raw) == 0:
		return nil
	}

	var doc resticPersistedUsageDocument
	if err := json.Unmarshal(raw, &doc); err != nil {
		return fmt.Errorf("decode restic usage document: %w", err)
	}

	loaded := map[string]resticPersistedUsageEntry{}
	for _, job := range doc.Jobs {
		key := strings.TrimSpace(job.Key)
		if key == "" {
			continue
		}
		if b := job.Bytes; b < 0 || math.IsNaN(b) || math.IsInf(b, 0) {
			continue
		}
		loaded[key] = resticPersistedUsageEntry{
			Bytes:     job.Bytes,
			UpdatedAt: strings.TrimSpace(job.UpdatedAt),
		}
	}

	// Swap the whole map in one step so readers never see a half-built store.
	s.usageMu.Lock()
	s.usageStore = loaded
	s.usageMu.Unlock()
	return nil
}
// lookupPersistedResticUsage returns the byte count held in s.usageStore for
// key. The second return value is false when the key is absent (including when
// the store has not been initialised) or when the stored count is negative,
// NaN or infinite.
func (s *Server) lookupPersistedResticUsage(key string) (float64, bool) {
	s.usageMu.RLock()
	// Indexing a nil map is safe in Go and simply reports "not found".
	entry, found := s.usageStore[key]
	s.usageMu.RUnlock()

	switch {
	case !found:
		return 0, false
	case entry.Bytes < 0, math.IsNaN(entry.Bytes), math.IsInf(entry.Bytes, 0):
		return 0, false
	default:
		return entry.Bytes, true
	}
}
// storePersistedResticUsage records value as the byte count for key in
// s.usageStore and, if that changed anything, writes the store out with
// persistResticUsage.
//
// Calls with an empty key or a negative, NaN or infinite value are ignored.
// An existing entry is left alone when it already holds the same byte count
// and a non-blank UpdatedAt; otherwise it is replaced and stamped with the
// current UTC time in RFC 3339 format. A persistence failure is logged, not
// returned.
func (s *Server) storePersistedResticUsage(ctx context.Context, key string, value float64) {
	if key == "" || value < 0 || math.IsNaN(value) || math.IsInf(value, 0) {
		return
	}
	stamp := time.Now().UTC().Format(time.RFC3339)

	// The lock is released before persisting; persistResticUsage takes its
	// own read lock on the same mutex.
	updated := func() bool {
		s.usageMu.Lock()
		defer s.usageMu.Unlock()

		if s.usageStore == nil {
			s.usageStore = map[string]resticPersistedUsageEntry{}
		}
		prev, ok := s.usageStore[key]
		if ok && prev.Bytes == value && strings.TrimSpace(prev.UpdatedAt) != "" {
			return false
		}
		s.usageStore[key] = resticPersistedUsageEntry{
			Bytes:     value,
			UpdatedAt: stamp,
		}
		return true
	}()
	if !updated {
		return
	}

	if err := s.persistResticUsage(ctx); err != nil {
		log.Printf("persist restic usage failed: %v", err)
	}
}
// persistResticUsage serialises s.usageStore to JSON and saves it through
// s.client.SaveSecretData.
//
// It does nothing and returns nil when s.cfg.UsageSecretName is blank. Entries
// with an empty key or a negative, NaN or infinite byte count are left out,
// and the remaining jobs are written in ascending key order. A JSON encoding
// error is wrapped; the result of the save call is returned unchanged.
func (s *Server) persistResticUsage(ctx context.Context) error {
	if strings.TrimSpace(s.cfg.UsageSecretName) == "" {
		return nil
	}

	// jobRecord is an alias for the anonymous element type of
	// resticPersistedUsageDocument.Jobs, so a []jobRecord can be assigned to
	// that field directly.
	type jobRecord = struct {
		Key       string  `json:"key"`
		Bytes     float64 `json:"bytes"`
		UpdatedAt string  `json:"updated_at,omitempty"`
	}

	// Copy what is needed while holding the read lock; sorting and encoding
	// happen after it is released.
	s.usageMu.RLock()
	jobs := make([]jobRecord, 0, len(s.usageStore))
	for key, entry := range s.usageStore {
		if key == "" || entry.Bytes < 0 || math.IsNaN(entry.Bytes) || math.IsInf(entry.Bytes, 0) {
			continue
		}
		jobs = append(jobs, jobRecord{
			Key:       key,
			Bytes:     entry.Bytes,
			UpdatedAt: strings.TrimSpace(entry.UpdatedAt),
		})
	}
	s.usageMu.RUnlock()

	// Map iteration order is random; sort so the output is deterministic.
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].Key < jobs[j].Key
	})

	payload, err := json.Marshal(resticPersistedUsageDocument{Jobs: jobs})
	if err != nil {
		return fmt.Errorf("encode restic usage document: %w", err)
	}

	// The final argument is a string map of label-style keys.
	labels := map[string]string{
		"app.kubernetes.io/name":      "soteria",
		"app.kubernetes.io/component": "usage-store",
	}
	return s.client.SaveSecretData(ctx, s.cfg.Namespace, s.cfg.UsageSecretName, usageSecretKey, payload, labels)
}