// ananke/internal/service/daemon.go
package service
import (
	"context"
	"errors"
	"fmt"
	"log"
	"math"
	"net/http"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"scm.bstein.dev/bstein/hecate/internal/cluster"
	"scm.bstein.dev/bstein/hecate/internal/config"
	"scm.bstein.dev/bstein/hecate/internal/metrics"
	"scm.bstein.dev/bstein/hecate/internal/sshutil"
	"scm.bstein.dev/bstein/hecate/internal/state"
	"scm.bstein.dev/bstein/hecate/internal/ups"
)
// Target describes a single UPS to monitor: a logical name, its raw target
// address string, and the provider used to read telemetry samples from it.
type Target struct {
	Name     string       // logical name; used as the key in logs and per-target state
	Target   string       // UPS address/endpoint as configured (presumably a NUT-style target — TODO confirm)
	Provider ups.Provider // telemetry source; Read(ctx) is polled each tick by Run
}
// Daemon runs the UPS monitoring loop and decides when to trigger a cluster
// shutdown. Construct with NewDaemon and start with Run.
type Daemon struct {
	cfg      config.Config         // full service configuration (UPS, metrics, coordination, ssh)
	orch     *cluster.Orchestrator // supplies the shutdown budget and performs the local shutdown
	targets  []Target              // UPS targets polled each tick
	log      *log.Logger           // destination for operational logging
	exporter *metrics.Exporter     // metrics state, served over HTTP when metrics are enabled
}
// NewDaemon wires a Daemon from its collaborators and attaches a fresh
// metrics exporter. The returned daemon is inert until Run is called.
func NewDaemon(cfg config.Config, orch *cluster.Orchestrator, targets []Target, logger *log.Logger) *Daemon {
	daemon := &Daemon{
		cfg:      cfg,
		orch:     orch,
		targets:  targets,
		log:      logger,
		exporter: metrics.New(),
	}
	return daemon
}
// Run is the daemon main loop. It polls every configured UPS target at the
// configured interval, compares remaining battery runtime against the
// cluster's estimated emergency-shutdown budget (scaled by the runtime
// safety factor), and triggers a shutdown once the condition has held for
// `debounce` consecutive polls — or when telemetry is lost past the timeout
// while a UPS was last seen on battery. Blocks until ctx is cancelled or a
// shutdown is triggered, returning the error from either path.
func (d *Daemon) Run(ctx context.Context) error {
	if !d.cfg.UPS.Enabled {
		return fmt.Errorf("ups monitoring is disabled in config")
	}
	if len(d.targets) == 0 {
		return fmt.Errorf("no UPS targets configured")
	}
	// Defaults applied when config values are unset or non-positive.
	poll := time.Duration(d.cfg.UPS.PollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	telemetryTimeout := time.Duration(d.cfg.UPS.TelemetryTimeoutSeconds) * time.Second
	if telemetryTimeout <= 0 {
		telemetryTimeout = 90 * time.Second
	}
	debounce := d.cfg.UPS.DebounceCount
	if debounce <= 0 {
		debounce = 3
	}
	if d.cfg.Metrics.Enabled {
		if err := d.startMetricsServer(); err != nil {
			return err
		}
	}
	t := time.NewTicker(poll)
	defer t.Stop()
	// Per-target state, keyed by Target.Name:
	//   lastGood      — time of the last successful telemetry read
	//   lastOnBattery — whether the last good sample reported on-battery
	//   breachCount   — consecutive polls the trigger condition has held
	lastGood := map[string]time.Time{}
	lastOnBattery := map[string]bool{}
	breachCount := map[string]int{}
	// NOTE(review): this loop variable shadows the ticker `t` above —
	// harmless in this scope, but easy to misread.
	for _, t := range d.targets {
		lastGood[t.Name] = time.Now()
	}
	d.log.Printf("hecate daemon started: poll=%s debounce=%d telemetry_timeout=%s targets=%s",
		poll, debounce, telemetryTimeout, d.targetList())
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			// Budget is re-read every tick; the trigger threshold scales it
			// by the configured safety factor, rounded up.
			budget := d.orch.EstimatedEmergencyShutdownSeconds()
			threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor))
			d.exporter.UpdateBudget(budget)
			for _, target := range d.targets {
				sample, err := target.Provider.Read(ctx)
				if err != nil {
					// Telemetry read failed: publish the error in metrics, and
					// if this UPS was last seen on battery and we have been
					// blind for longer than the timeout, assume the worst.
					d.log.Printf("warning: ups read failed target=%s (%s): %v", target.Name, target.Target, err)
					d.exporter.UpdateSample(metrics.Sample{
						Name:         target.Name,
						Target:       target.Target,
						ThresholdSec: threshold,
						BreachCount:  breachCount[target.Name],
						LastError:    err.Error(),
						UpdatedAt:    time.Now().UTC(),
					})
					if lastOnBattery[target.Name] && time.Since(lastGood[target.Name]) > telemetryTimeout {
						d.log.Printf("ups telemetry timeout while on battery (target=%s), triggering shutdown", target.Name)
						reason := fmt.Sprintf("ups-telemetry-timeout target=%s", target.Name)
						return d.triggerShutdown(ctx, reason)
					}
					continue
				}
				lastGood[target.Name] = time.Now()
				lastOnBattery[target.Name] = sample.OnBattery
				// Trigger when the UPS self-reports low battery, or is on
				// battery with a known runtime at or below the threshold.
				// (RuntimeSeconds <= 0 is treated as "unknown", not "empty".)
				trigger := sample.LowBattery || (sample.OnBattery && sample.RuntimeSeconds > 0 && sample.RuntimeSeconds <= threshold)
				if trigger {
					breachCount[target.Name]++
				} else {
					// Any non-triggering sample resets the debounce window.
					breachCount[target.Name] = 0
				}
				d.log.Printf("ups target=%s status=%s on_battery=%t runtime_s=%d threshold_s=%d budget_s=%d trigger=%t breach=%d",
					target.Name, sample.RawStatus, sample.OnBattery, sample.RuntimeSeconds, threshold, budget, trigger, breachCount[target.Name])
				d.exporter.UpdateSample(metrics.Sample{
					Name:          target.Name,
					Target:        target.Target,
					OnBattery:     sample.OnBattery,
					LowBattery:    sample.LowBattery,
					RuntimeSecond: sample.RuntimeSeconds,
					BatteryCharge: sample.BatteryCharge,
					LoadPercent:   sample.LoadPercent,
					PowerNominalW: sample.NominalPowerW,
					ThresholdSec:  threshold,
					Trigger:       trigger,
					BreachCount:   breachCount[target.Name],
					Status:        sample.RawStatus,
					UpdatedAt:     time.Now().UTC(),
				})
				// Debounce: act only after `debounce` consecutive breaches.
				if breachCount[target.Name] >= debounce {
					reason := fmt.Sprintf("ups-threshold target=%s runtime=%ds threshold=%ds status=%s", target.Name, sample.RuntimeSeconds, threshold, sample.RawStatus)
					return d.triggerShutdown(ctx, reason)
				}
			}
		}
	}
}
// triggerShutdown initiates a cluster shutdown for the given reason.
// Ordering: (1) skip if a persisted intent already records a shutdown in
// progress; (2) persist the shutting-down intent (best effort); (3) if a
// forward host is configured, delegate the shutdown there, falling back to
// a local shutdown only when the fallback is enabled; (4) otherwise run the
// local orchestrated shutdown and persist completion.
func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error {
	// Idempotence guard: a read error here is treated as "no prior intent".
	intent, err := state.ReadIntent(d.cfg.State.IntentPath)
	if err == nil && intent.State == state.IntentShuttingDown {
		d.log.Printf("shutdown already in progress; skipping duplicate trigger: %s", reason)
		return nil
	}
	// Best-effort persistence: a failed intent write is logged but must not
	// block the shutdown itself.
	if err := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShuttingDown, reason, "daemon"); err != nil {
		d.log.Printf("warning: unable to persist shutdown intent before trigger: %v", err)
	}
	d.log.Printf("triggering shutdown: %s", reason)
	d.exporter.MarkShutdown(reason)
	if d.cfg.Coordination.ForwardShutdownHost != "" {
		if err := d.forwardShutdown(ctx, reason); err == nil {
			if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-forwarded"); setErr != nil {
				d.log.Printf("warning: unable to persist forwarded shutdown completion intent: %v", setErr)
			}
			d.log.Printf("shutdown trigger forwarded to %s", d.cfg.Coordination.ForwardShutdownHost)
			return nil
		} else if !d.cfg.Coordination.FallbackLocalShutdown {
			return fmt.Errorf("forward shutdown failed and local fallback disabled: %w", err)
		} else {
			d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err)
		}
	}
	// Local (emergency) shutdown path; skip flags come from emergency config.
	if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{
		Reason:           reason,
		SkipDrain:        d.cfg.Shutdown.EmergencySkipDrain,
		SkipEtcdSnapshot: d.cfg.Shutdown.EmergencySkipEtcd,
	}); err != nil {
		return err
	}
	if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil {
		d.log.Printf("warning: unable to persist local shutdown completion intent: %v", setErr)
	}
	return nil
}
// forwardShutdown executes the shutdown on a remote coordinator host via
// ssh, building the remote `hecate shutdown` command from config. Host and
// user may be remapped via SSHNodeHosts/SSHNodeUsers; jump host, port,
// identity file and ssh config file are honoured when configured. On ssh
// failures that look like known-hosts problems, it repairs known_hosts once
// and retries. Returns nil when the remote command succeeds.
func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error {
	timeout := time.Duration(d.cfg.Coordination.CommandTimeoutSeconds) * time.Second
	if timeout <= 0 {
		timeout = 25 * time.Second
	}
	// One timeout budget covers the first attempt, any known-hosts repair,
	// and the single retry.
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	remoteCmd := fmt.Sprintf("sudo /usr/local/bin/hecate shutdown --config %q --execute --reason %q", d.cfg.Coordination.ForwardShutdownConfig, reason)
	if d.cfg.Shutdown.EmergencySkipEtcd {
		remoteCmd += " --skip-etcd-snapshot"
	}
	if d.cfg.Shutdown.EmergencySkipDrain {
		remoteCmd += " --skip-drain"
	}
	// Effective host: the configured forward host, optionally remapped.
	host := d.cfg.Coordination.ForwardShutdownHost
	if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" {
		host = strings.TrimSpace(mapped)
	}
	// Effective user: explicit forward user, else the per-node override
	// (keyed by the UNmapped host name), else the global ssh user.
	user := d.cfg.Coordination.ForwardShutdownUser
	if user == "" {
		if override, ok := d.cfg.SSHNodeUsers[d.cfg.Coordination.ForwardShutdownHost]; ok && strings.TrimSpace(override) != "" {
			user = strings.TrimSpace(override)
		} else {
			user = d.cfg.SSHUser
		}
	}
	target := host
	if user != "" {
		target = user + "@" + host
	}
	// Non-interactive ssh: fail instead of prompting; accept new host keys.
	args := []string{
		"-o", "BatchMode=yes",
		"-o", "ConnectTimeout=8",
		"-o", "StrictHostKeyChecking=accept-new",
	}
	if cfgFile := d.resolveSSHConfigFile(); cfgFile != "" {
		args = append(args, "-F", cfgFile)
	}
	if idFile := d.resolveSSHIdentityFile(); idFile != "" {
		args = append(args, "-i", idFile)
	}
	if d.cfg.SSHPort > 0 {
		args = append(args, "-p", strconv.Itoa(d.cfg.SSHPort))
	}
	if d.cfg.SSHJumpHost != "" {
		jump := d.cfg.SSHJumpHost
		if d.cfg.SSHJumpUser != "" {
			jump = d.cfg.SSHJumpUser + "@" + jump
		}
		// NOTE(review): the target's SSHPort is also applied to the jump
		// host (unless it already carries a port) — confirm both hosts run
		// sshd on the same port.
		if d.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
			jump = fmt.Sprintf("%s:%d", jump, d.cfg.SSHPort)
		}
		args = append(args, "-J", jump)
	}
	args = append(args, target, remoteCmd)
	// try runs one ssh attempt, returning the trimmed combined output and a
	// wrapped error on failure (output included for diagnosis when present).
	try := func() (string, error) {
		cmd := exec.CommandContext(runCtx, "ssh", args...)
		out, err := cmd.CombinedOutput()
		trimmed := strings.TrimSpace(string(out))
		if err != nil {
			if trimmed == "" {
				return "", fmt.Errorf("forward shutdown via ssh failed: %w", err)
			}
			return trimmed, fmt.Errorf("forward shutdown via ssh failed: %w: %s", err, trimmed)
		}
		return trimmed, nil
	}
	out, err := try()
	if err != nil && sshutil.ShouldAttemptKnownHostsRepair(out, err) {
		// Repair known_hosts for both the configured and mapped host names
		// (plus the jump host, if any), then retry exactly once.
		repairHosts := []string{d.cfg.Coordination.ForwardShutdownHost, host}
		if d.cfg.SSHJumpHost != "" {
			repairHosts = append(repairHosts, d.cfg.SSHJumpHost)
		}
		sshutil.RepairKnownHosts(runCtx, d.log, sshutil.KnownHostsFiles(d.resolveSSHConfigFile(), d.resolveSSHIdentityFile()), repairHosts, d.cfg.SSHPort)
		if _, err2 := try(); err2 == nil {
			return nil
		} else {
			return err2
		}
	}
	if err != nil {
		return err
	}
	return nil
}
// resolveSSHConfigFile returns the ssh config file to pass via -F: the
// configured path when set, otherwise the first existing well-known
// per-user config file, or "" when none is found.
func (d *Daemon) resolveSSHConfigFile() string {
	if configured := strings.TrimSpace(d.cfg.SSHConfigFile); configured != "" {
		return configured
	}
	for _, candidate := range []string{
		"/home/atlas/.ssh/config",
		"/home/tethys/.ssh/config",
	} {
		if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
			return candidate
		}
	}
	return ""
}
// resolveSSHIdentityFile returns the ssh identity file to pass via -i: the
// configured path when set, otherwise the first existing well-known
// per-user ed25519 key, or "" when none is found.
func (d *Daemon) resolveSSHIdentityFile() string {
	if configured := strings.TrimSpace(d.cfg.SSHIdentityFile); configured != "" {
		return configured
	}
	for _, candidate := range []string{
		"/home/atlas/.ssh/id_ed25519",
		"/home/tethys/.ssh/id_ed25519",
	} {
		if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
			return candidate
		}
	}
	return ""
}
// targetList renders the configured targets as comma-separated
// "name=target" pairs for log output.
func (d *Daemon) targetList() string {
	var b strings.Builder
	for i, tgt := range d.targets {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(tgt.Name)
		b.WriteByte('=')
		b.WriteString(tgt.Target)
	}
	return b.String()
}
// startMetricsServer validates the metrics configuration and starts an HTTP
// server on cfg.Metrics.BindAddr serving the exporter at cfg.Metrics.Path.
// The server runs in a background goroutine for the life of the process;
// failures after startup are logged rather than returned.
func (d *Daemon) startMetricsServer() error {
	if d.cfg.Metrics.BindAddr == "" {
		return fmt.Errorf("metrics.bind_addr must not be empty when metrics are enabled")
	}
	handler := d.exporter.Handler(d.cfg.Metrics.Path)
	srv := &http.Server{
		Addr:              d.cfg.Metrics.BindAddr,
		Handler:           handler,
		ReadHeaderTimeout: 5 * time.Second, // defend against slowloris-style header dribble
	}
	go func() {
		d.log.Printf("metrics server listening on %s%s", d.cfg.Metrics.BindAddr, d.cfg.Metrics.Path)
		// errors.Is matches the sentinel even if it is ever wrapped;
		// ErrServerClosed only signals a deliberate Shutdown/Close, which is
		// not an error worth logging.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			d.log.Printf("warning: metrics server failed: %v", err)
		}
	}()
	// NOTE(review): there is no shutdown hook for srv; it lives until
	// process exit, which matches the daemon's trigger-then-exit lifecycle.
	return nil
}