package service import ( "context" "fmt" "log" "math" "net/http" "os" "os/exec" "strconv" "strings" "time" "scm.bstein.dev/bstein/ananke/internal/cluster" "scm.bstein.dev/bstein/ananke/internal/config" "scm.bstein.dev/bstein/ananke/internal/metrics" "scm.bstein.dev/bstein/ananke/internal/sshutil" "scm.bstein.dev/bstein/ananke/internal/state" "scm.bstein.dev/bstein/ananke/internal/ups" ) type Target struct { Name string Target string Provider ups.Provider } type Daemon struct { cfg config.Config orch *cluster.Orchestrator targets []Target log *log.Logger exporter *metrics.Exporter } var sshConfigCandidates = []string{ "/home/atlas/.ssh/config", "/home/tethys/.ssh/config", } var sshIdentityCandidates = []string{ "/home/atlas/.ssh/id_ed25519", "/home/tethys/.ssh/id_ed25519", } // NewDaemon runs one orchestration or CLI step. // Signature: NewDaemon(cfg config.Config, orch *cluster.Orchestrator, targets []Target, logger *log.Logger) *Daemon. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func NewDaemon(cfg config.Config, orch *cluster.Orchestrator, targets []Target, logger *log.Logger) *Daemon { return &Daemon{ cfg: cfg, orch: orch, targets: targets, log: logger, exporter: metrics.New(), } } // Run runs one orchestration or CLI step. // Signature: (d *Daemon) Run(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (d *Daemon) Run(ctx context.Context) error {
	// Refuse to run without UPS monitoring enabled and at least one target.
	if !d.cfg.UPS.Enabled {
		return fmt.Errorf("ups monitoring is disabled in config")
	}
	if len(d.targets) == 0 {
		return fmt.Errorf("no UPS targets configured")
	}
	// Clamp unset/invalid config values to sane defaults.
	poll := time.Duration(d.cfg.UPS.PollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	telemetryTimeout := time.Duration(d.cfg.UPS.TelemetryTimeoutSeconds) * time.Second
	if telemetryTimeout <= 0 {
		telemetryTimeout = 90 * time.Second
	}
	debounce := d.cfg.UPS.DebounceCount
	if debounce <= 0 {
		debounce = 3
	}
	if d.cfg.Metrics.Enabled {
		if err := d.startMetricsServer(); err != nil {
			return err
		}
	}
	t := time.NewTicker(poll)
	defer t.Stop()
	// Per-target bookkeeping, keyed by target name:
	//   lastGood      — last time telemetry was read successfully
	//   lastOnBattery — whether the last good sample reported on-battery
	//   breachCount   — consecutive polls that met the trigger condition
	lastGood := map[string]time.Time{}
	lastOnBattery := map[string]bool{}
	breachCount := map[string]int{}
	// Seed lastGood so a target that never answers still times out relative
	// to daemon start. (The loop variable t shadows the ticker here.)
	for _, t := range d.targets {
		lastGood[t.Name] = time.Now()
	}
	d.log.Printf("ananke daemon started: poll=%s debounce=%d telemetry_timeout=%s targets=%s", poll, debounce, telemetryTimeout, d.targetList())
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			// Recompute the shutdown budget each poll; the trigger threshold
			// scales it by the configured safety factor, rounded up.
			budget := d.orch.EstimatedEmergencyShutdownSeconds()
			threshold := int(math.Ceil(float64(budget) * d.cfg.UPS.RuntimeSafetyFactor))
			d.exporter.UpdateBudget(budget)
			for _, target := range d.targets {
				sample, err := target.Provider.Read(ctx)
				if err != nil {
					d.log.Printf("warning: ups read failed target=%s (%s): %v", target.Name, target.Target, err)
					d.exporter.UpdateSample(metrics.Sample{
						Name:         target.Name,
						Target:       target.Target,
						ThresholdSec: threshold,
						BreachCount:  breachCount[target.Name],
						LastError:    err.Error(),
						UpdatedAt:    time.Now().UTC(),
					})
					// Losing telemetry while known to be on battery is treated
					// as an emergency once the timeout elapses.
					if lastOnBattery[target.Name] && time.Since(lastGood[target.Name]) > telemetryTimeout {
						d.log.Printf("ups telemetry timeout while on battery (target=%s), triggering shutdown", target.Name)
						reason := fmt.Sprintf("ups-telemetry-timeout target=%s", target.Name)
						return d.triggerShutdown(ctx, reason)
					}
					continue
				}
				lastGood[target.Name] = time.Now()
				lastOnBattery[target.Name] = sample.OnBattery
				// Trigger on an explicit low-battery flag, or when on battery
				// with a known (>0) runtime at or below the threshold.
				trigger := sample.LowBattery ||
					(sample.OnBattery && sample.RuntimeSeconds > 0 && sample.RuntimeSeconds <= threshold)
				if trigger {
					breachCount[target.Name]++
				} else {
					// Any non-triggering poll resets the debounce counter.
					breachCount[target.Name] = 0
				}
				d.log.Printf("ups target=%s status=%s on_battery=%t runtime_s=%d threshold_s=%d budget_s=%d trigger=%t breach=%d", target.Name, sample.RawStatus, sample.OnBattery, sample.RuntimeSeconds, threshold, budget, trigger, breachCount[target.Name])
				d.exporter.UpdateSample(metrics.Sample{
					Name:       target.Name,
					Target:     target.Target,
					OnBattery:  sample.OnBattery,
					LowBattery: sample.LowBattery,
					// NOTE(review): field is RuntimeSecond (singular) while the
					// sample field is RuntimeSeconds — presumably this matches
					// metrics.Sample's declaration; confirm against that struct.
					RuntimeSecond: sample.RuntimeSeconds,
					BatteryCharge: sample.BatteryCharge,
					LoadPercent:   sample.LoadPercent,
					PowerNominalW: sample.NominalPowerW,
					ThresholdSec:  threshold,
					Trigger:       trigger,
					BreachCount:   breachCount[target.Name],
					Status:        sample.RawStatus,
					UpdatedAt:     time.Now().UTC(),
				})
				// Shut down only after `debounce` consecutive breaching polls.
				if breachCount[target.Name] >= debounce {
					reason := fmt.Sprintf("ups-threshold target=%s runtime=%ds threshold=%ds status=%s", target.Name, sample.RuntimeSeconds, threshold, sample.RawStatus)
					return d.triggerShutdown(ctx, reason)
				}
			}
		}
	}
}

// triggerShutdown persists shutdown intent and performs the shutdown:
// forwarded to the configured coordinator host when one is set, with an
// optional fallback to a local orchestrated shutdown. Duplicate triggers
// while a shutdown is already in progress are skipped.
func (d *Daemon) triggerShutdown(ctx context.Context, reason string) error { intent, err := state.ReadIntent(d.cfg.State.IntentPath) if err == nil && intent.State == state.IntentShuttingDown { d.log.Printf("shutdown already in progress; skipping duplicate trigger: %s", reason) return nil } if err := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShuttingDown, reason, "daemon"); err != nil { d.log.Printf("warning: unable to persist shutdown intent before trigger: %v", err) } d.log.Printf("triggering shutdown: %s", reason) d.exporter.MarkShutdown(reason) if d.cfg.Coordination.ForwardShutdownHost != "" { if err := d.forwardShutdown(ctx, reason); err == nil { if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-forwarded"); setErr != nil { d.log.Printf("warning: unable to persist forwarded shutdown completion intent: %v", setErr) } d.log.Printf("shutdown trigger forwarded to %s", d.cfg.Coordination.ForwardShutdownHost) return nil } else if !d.cfg.Coordination.FallbackLocalShutdown { return fmt.Errorf("forward shutdown failed and local fallback disabled: %w", err) } else { d.log.Printf("warning: forward shutdown failed; falling back to local shutdown: %v", err) } } if err := d.orch.Shutdown(ctx, cluster.ShutdownOptions{ Reason: reason, SkipDrain: d.cfg.Shutdown.EmergencySkipDrain, SkipEtcdSnapshot: d.cfg.Shutdown.EmergencySkipEtcd, }); err != nil { return err } if setErr := state.MustWriteIntent(d.cfg.State.IntentPath, state.IntentShutdownComplete, reason, "daemon-local"); setErr != nil { d.log.Printf("warning: unable to persist local shutdown completion intent: %v", setErr) } return nil } // forwardShutdown runs one orchestration or CLI step. // Signature: (d *Daemon) forwardShutdown(ctx context.Context, reason string) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (d *Daemon) forwardShutdown(ctx context.Context, reason string) error { timeout := time.Duration(d.cfg.Coordination.CommandTimeoutSeconds) * time.Second if timeout <= 0 { timeout = 25 * time.Second } runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() remoteCmd := fmt.Sprintf("sudo /usr/local/bin/ananke shutdown --config %q --execute --mode cluster-only --reason %q", d.cfg.Coordination.ForwardShutdownConfig, reason) if d.cfg.Shutdown.EmergencySkipEtcd { remoteCmd += " --skip-etcd-snapshot" } if d.cfg.Shutdown.EmergencySkipDrain { remoteCmd += " --skip-drain" } host := d.cfg.Coordination.ForwardShutdownHost if mapped, ok := d.cfg.SSHNodeHosts[host]; ok && strings.TrimSpace(mapped) != "" { host = strings.TrimSpace(mapped) } user := d.cfg.Coordination.ForwardShutdownUser if user == "" { if override, ok := d.cfg.SSHNodeUsers[d.cfg.Coordination.ForwardShutdownHost]; ok && strings.TrimSpace(override) != "" { user = strings.TrimSpace(override) } else { user = d.cfg.SSHUser } } target := host if user != "" { target = user + "@" + host } args := []string{ "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", "-o", "StrictHostKeyChecking=accept-new", } if cfgFile := d.resolveSSHConfigFile(); cfgFile != "" { args = append(args, "-F", cfgFile) } if idFile := d.resolveSSHIdentityFile(); idFile != "" { args = append(args, "-i", idFile) } if d.cfg.SSHPort > 0 { args = append(args, "-p", strconv.Itoa(d.cfg.SSHPort)) } if d.cfg.SSHJumpHost != "" { jump := d.cfg.SSHJumpHost if d.cfg.SSHJumpUser != "" { jump = d.cfg.SSHJumpUser + "@" + jump } if d.cfg.SSHPort > 0 && !strings.Contains(jump, ":") { jump = fmt.Sprintf("%s:%d", jump, d.cfg.SSHPort) } args = append(args, "-J", jump) } args = append(args, target, remoteCmd) try := func() (string, error) { cmd := exec.CommandContext(runCtx, "ssh", args...) 
out, err := cmd.CombinedOutput() trimmed := strings.TrimSpace(string(out)) if err != nil { if trimmed == "" { return "", fmt.Errorf("forward shutdown via ssh failed: %w", err) } return trimmed, fmt.Errorf("forward shutdown via ssh failed: %w: %s", err, trimmed) } return trimmed, nil } out, err := try() if err != nil && sshutil.ShouldAttemptKnownHostsRepair(out, err) { repairHosts := []string{d.cfg.Coordination.ForwardShutdownHost, host} if d.cfg.SSHJumpHost != "" { repairHosts = append(repairHosts, d.cfg.SSHJumpHost) } sshutil.RepairKnownHosts(runCtx, d.log, sshutil.KnownHostsFiles(d.resolveSSHConfigFile(), d.resolveSSHIdentityFile()), repairHosts, d.cfg.SSHPort) if _, err2 := try(); err2 == nil { return nil } else { return err2 } } if err != nil { return err } return nil } // resolveSSHConfigFile runs one orchestration or CLI step. // Signature: (d *Daemon) resolveSSHConfigFile() string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (d *Daemon) resolveSSHConfigFile() string { if strings.TrimSpace(d.cfg.SSHConfigFile) != "" { return strings.TrimSpace(d.cfg.SSHConfigFile) } for _, p := range sshConfigCandidates { if stat, err := os.Stat(p); err == nil && !stat.IsDir() { return p } } return "" } // resolveSSHIdentityFile runs one orchestration or CLI step. // Signature: (d *Daemon) resolveSSHIdentityFile() string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (d *Daemon) resolveSSHIdentityFile() string { if strings.TrimSpace(d.cfg.SSHIdentityFile) != "" { return strings.TrimSpace(d.cfg.SSHIdentityFile) } for _, p := range sshIdentityCandidates { if stat, err := os.Stat(p); err == nil && !stat.IsDir() { return p } } return "" } // targetList runs one orchestration or CLI step. // Signature: (d *Daemon) targetList() string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (d *Daemon) targetList() string { names := make([]string, 0, len(d.targets)) for _, t := range d.targets { names = append(names, t.Name+"="+t.Target) } return strings.Join(names, ",") } // startMetricsServer runs one orchestration or CLI step. // Signature: (d *Daemon) startMetricsServer() error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (d *Daemon) startMetricsServer() error { if d.cfg.Metrics.BindAddr == "" { return fmt.Errorf("metrics.bind_addr must not be empty when metrics are enabled") } handler := d.exporter.Handler(d.cfg.Metrics.Path) srv := &http.Server{ Addr: d.cfg.Metrics.BindAddr, Handler: handler, ReadHeaderTimeout: 5 * time.Second, } go func() { d.log.Printf("metrics server listening on %s%s", d.cfg.Metrics.BindAddr, d.cfg.Metrics.Path) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { d.log.Printf("warning: metrics server failed: %v", err) } }() return nil }