289 lines
7.9 KiB
Go

package server
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/soteria/internal/api"
"github.com/minio/minio-go/v7"
miniocreds "github.com/minio/minio-go/v7/pkg/credentials"
)
type b2Credentials struct {
Endpoint string
Region string
AccessKeyID string
SecretAccessKey string
}
func (s *Server) handleB2Usage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
forceRefresh := queryBool(r.URL.Query().Get("refresh")) || queryBool(r.URL.Query().Get("force"))
snapshot := s.getB2Usage()
if s.cfg.B2Enabled && (snapshot.ScannedAt == "" || forceRefresh) {
s.refreshB2Usage(r.Context())
snapshot = s.getB2Usage()
}
writeJSON(w, http.StatusOK, snapshot)
}
func queryBool(raw string) bool {
switch strings.ToLower(strings.TrimSpace(raw)) {
case "1", "true", "yes", "y", "on":
return true
default:
return false
}
}
func (s *Server) getB2Usage() api.B2UsageResponse {
s.b2Mu.RLock()
defer s.b2Mu.RUnlock()
return s.b2Usage
}
func (s *Server) setB2Usage(usage api.B2UsageResponse) {
s.b2Mu.Lock()
defer s.b2Mu.Unlock()
s.b2Usage = usage
}
func (s *Server) refreshB2Usage(ctx context.Context) {
usage := api.B2UsageResponse{
Enabled: s.cfg.B2Enabled,
Endpoint: s.cfg.B2Endpoint,
Region: s.cfg.B2Region,
}
if !s.cfg.B2Enabled {
s.setB2Usage(usage)
s.metrics.RecordB2Usage(usage)
return
}
startedAt := time.Now()
scanCtx, cancel := context.WithTimeout(ctx, s.cfg.B2ScanTimeout)
defer cancel()
creds, err := s.resolveB2Credentials(scanCtx)
if err != nil {
usage.ScannedAt = time.Now().UTC().Format(time.RFC3339)
usage.ScanDurationMS = time.Since(startedAt).Milliseconds()
usage.Error = err.Error()
s.setB2Usage(usage)
s.metrics.RecordB2Usage(usage)
return
}
usage.Endpoint = creds.Endpoint
usage.Region = creds.Region
scanned, err := scanB2Usage(scanCtx, creds, s.cfg.B2Buckets)
if err != nil {
usage.ScannedAt = time.Now().UTC().Format(time.RFC3339)
usage.ScanDurationMS = time.Since(startedAt).Milliseconds()
usage.Error = err.Error()
s.setB2Usage(usage)
s.metrics.RecordB2Usage(usage)
return
}
scanned.Enabled = true
scanned.Endpoint = creds.Endpoint
scanned.Region = creds.Region
scanned.ScannedAt = time.Now().UTC().Format(time.RFC3339)
scanned.ScanDurationMS = time.Since(startedAt).Milliseconds()
s.setB2Usage(scanned)
s.metrics.RecordB2Usage(scanned)
}
func (s *Server) resolveB2Credentials(ctx context.Context) (b2Credentials, error) {
creds := b2Credentials{
Endpoint: strings.TrimSpace(s.cfg.B2Endpoint),
Region: strings.TrimSpace(s.cfg.B2Region),
AccessKeyID: strings.TrimSpace(s.cfg.B2AccessKeyID),
SecretAccessKey: strings.TrimSpace(s.cfg.B2SecretAccessKey),
}
if s.cfg.B2SecretName != "" {
if creds.AccessKeyID == "" {
value, err := s.loadB2SecretValue(ctx, s.cfg.B2AccessKeyField)
if err != nil {
return b2Credentials{}, err
}
creds.AccessKeyID = value
}
if creds.SecretAccessKey == "" {
value, err := s.loadB2SecretValue(ctx, s.cfg.B2SecretKeyField)
if err != nil {
return b2Credentials{}, err
}
creds.SecretAccessKey = value
}
if creds.Endpoint == "" && strings.TrimSpace(s.cfg.B2EndpointField) != "" {
value, err := s.loadB2SecretValue(ctx, s.cfg.B2EndpointField)
if err != nil {
return b2Credentials{}, err
}
creds.Endpoint = value
}
}
if creds.Endpoint == "" {
return b2Credentials{}, errors.New("B2 endpoint is not configured")
}
if creds.AccessKeyID == "" {
return b2Credentials{}, errors.New("B2 access key ID is not configured")
}
if creds.SecretAccessKey == "" {
return b2Credentials{}, errors.New("B2 secret access key is not configured")
}
creds.Region = inferB2Region(creds.Endpoint, creds.Region)
return creds, nil
}
func (s *Server) loadB2SecretValue(ctx context.Context, key string) (string, error) {
key = strings.TrimSpace(key)
if key == "" {
return "", errors.New("B2 secret key name is empty")
}
data, err := s.client.LoadSecretData(ctx, s.cfg.B2SecretNamespace, s.cfg.B2SecretName, key)
if err != nil {
return "", fmt.Errorf("load B2 secret %s/%s key %s: %w", s.cfg.B2SecretNamespace, s.cfg.B2SecretName, key, err)
}
value := strings.TrimSpace(string(data))
if value == "" {
return "", fmt.Errorf("B2 secret %s/%s key %s is empty", s.cfg.B2SecretNamespace, s.cfg.B2SecretName, key)
}
return value, nil
}
func scanB2Usage(ctx context.Context, creds b2Credentials, configuredBuckets []string) (api.B2UsageResponse, error) {
endpoint, secure, err := normalizeS3Endpoint(creds.Endpoint)
if err != nil {
return api.B2UsageResponse{}, err
}
client, err := minio.New(endpoint, &minio.Options{
Creds: miniocreds.NewStaticV4(creds.AccessKeyID, creds.SecretAccessKey, ""),
Secure: secure,
Region: creds.Region,
BucketLookup: minio.BucketLookupPath,
})
if err != nil {
return api.B2UsageResponse{}, fmt.Errorf("init B2 S3 client: %w", err)
}
bucketNames := make([]string, 0, len(configuredBuckets))
for _, bucket := range configuredBuckets {
bucket = strings.TrimSpace(bucket)
if bucket == "" {
continue
}
bucketNames = append(bucketNames, bucket)
}
if len(bucketNames) == 0 {
buckets, err := client.ListBuckets(ctx)
if err != nil {
return api.B2UsageResponse{}, fmt.Errorf("list B2 buckets: %w", err)
}
for _, bucket := range buckets {
bucketNames = append(bucketNames, bucket.Name)
}
}
if len(bucketNames) == 0 {
return api.B2UsageResponse{}, errors.New("no B2 buckets available for scan")
}
sort.Strings(bucketNames)
cutoff := time.Now().UTC().Add(-24 * time.Hour)
result := api.B2UsageResponse{
Enabled: true,
Available: true,
Buckets: make([]api.B2BucketUsage, 0, len(bucketNames)),
}
for _, bucketName := range bucketNames {
bucketUsage := api.B2BucketUsage{Name: bucketName}
lastModified := time.Time{}
objects := client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{Recursive: true})
for object := range objects {
if object.Err != nil {
return api.B2UsageResponse{}, fmt.Errorf("scan B2 bucket %s: %w", bucketName, object.Err)
}
bucketUsage.ObjectCount++
bucketUsage.TotalBytes += object.Size
modified := object.LastModified.UTC()
if !modified.IsZero() {
if modified.After(cutoff) {
bucketUsage.RecentObjects24h++
bucketUsage.RecentBytes24h += object.Size
}
if modified.After(lastModified) {
lastModified = modified
}
}
}
if !lastModified.IsZero() {
bucketUsage.LastModifiedAt = lastModified.Format(time.RFC3339)
}
result.Buckets = append(result.Buckets, bucketUsage)
result.TotalObjects += bucketUsage.ObjectCount
result.TotalBytes += bucketUsage.TotalBytes
result.RecentObjects24h += bucketUsage.RecentObjects24h
result.RecentBytes24h += bucketUsage.RecentBytes24h
}
return result, nil
}
func normalizeS3Endpoint(raw string) (string, bool, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", false, errors.New("S3 endpoint is empty")
}
if strings.Contains(raw, "://") {
parsed, err := url.Parse(raw)
if err != nil {
return "", false, fmt.Errorf("parse S3 endpoint: %w", err)
}
if parsed.Host == "" {
return "", false, errors.New("S3 endpoint host is empty")
}
return parsed.Host, !strings.EqualFold(parsed.Scheme, "http"), nil
}
return strings.TrimRight(raw, "/"), true, nil
}
func inferB2Region(endpoint, fallback string) string {
fallback = strings.TrimSpace(fallback)
host, _, err := normalizeS3Endpoint(endpoint)
if err != nil {
return fallback
}
if strings.Contains(host, ":") {
host = strings.Split(host, ":")[0]
}
if strings.HasPrefix(host, "s3.") && strings.HasSuffix(host, ".backblazeb2.com") {
region := strings.TrimPrefix(host, "s3.")
region = strings.TrimSuffix(region, ".backblazeb2.com")
if region != "" {
return region
}
}
return fallback
}