630 lines
21 KiB
Go
630 lines
21 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"scm.bstein.dev/bstein/hecate/internal/cluster"
|
|
"scm.bstein.dev/bstein/hecate/internal/config"
|
|
"scm.bstein.dev/bstein/hecate/internal/execx"
|
|
"scm.bstein.dev/bstein/hecate/internal/service"
|
|
"scm.bstein.dev/bstein/hecate/internal/sshutil"
|
|
"scm.bstein.dev/bstein/hecate/internal/state"
|
|
"scm.bstein.dev/bstein/hecate/internal/ups"
|
|
)
|
|
|
|
func main() {
|
|
logger := log.New(os.Stdout, "[hecate] ", log.LstdFlags)
|
|
if len(os.Args) < 2 {
|
|
usage()
|
|
os.Exit(2)
|
|
}
|
|
|
|
sub := os.Args[1]
|
|
switch sub {
|
|
case "startup":
|
|
if err := runStartup(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("startup failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "shutdown":
|
|
if err := runShutdown(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("shutdown failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "etcd-restore":
|
|
if err := runEtcdRestore(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("etcd-restore failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "daemon":
|
|
if err := runDaemon(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("daemon failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "status":
|
|
if err := runStatus(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("status failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "intent":
|
|
if err := runIntent(logger, os.Args[2:]); err != nil {
|
|
logger.Printf("intent failed: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
case "help", "-h", "--help":
|
|
usage()
|
|
default:
|
|
logger.Printf("unknown command: %s", sub)
|
|
usage()
|
|
os.Exit(2)
|
|
}
|
|
}
|
|
|
|
func runStartup(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("startup", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)")
|
|
forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume")
|
|
skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies")
|
|
allowPeerStartup := fs.Bool("allow-peer-startup", false, "Allow startup to run on a peer instance")
|
|
autoPeerFailover := fs.Bool("auto-peer-failover", false, "On peer role, try coordinator bootstrap handoff first and only run local startup as fallback")
|
|
peerWaitSeconds := fs.Int("peer-wait-seconds", 180, "How long auto peer failover waits for coordinator handoff before local fallback startup")
|
|
allowOnBattery := fs.Bool("allow-on-battery", false, "Allow startup when UPS reports on-battery")
|
|
reason := fs.String("reason", "manual-startup", "Startup reason for run history")
|
|
_ = fs.Parse(args)
|
|
|
|
cfg, orch, err := buildOrchestrator(logger, *configPath, !*execute)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
allowPeer := *allowPeerStartup
|
|
if *execute {
|
|
if cfg.Coordination.Role == "peer" && !allowPeer {
|
|
if *autoPeerFailover {
|
|
handoffCtx, cancel := context.WithTimeout(context.Background(), time.Duration(maxInt(*peerWaitSeconds, 1))*time.Second)
|
|
defer cancel()
|
|
handoff, handoffErr := tryPeerBootstrapHandoff(handoffCtx, cfg, logger)
|
|
if handoffErr != nil {
|
|
logger.Printf("warning: peer bootstrap handoff failed: %v", handoffErr)
|
|
}
|
|
if handoff {
|
|
logger.Printf("peer startup handoff complete; skipping local startup")
|
|
return nil
|
|
}
|
|
guardCtx, guardCancel := context.WithTimeout(context.Background(), time.Duration(maxInt(cfg.Coordination.CommandTimeoutSeconds, 15))*time.Second)
|
|
defer guardCancel()
|
|
allowed, guardReason, guardErr := coordinatorAllowsPeerFallbackStartup(guardCtx, cfg, logger)
|
|
if guardErr != nil {
|
|
return fmt.Errorf("startup blocked: unable to evaluate coordinator startup guard: %w", guardErr)
|
|
}
|
|
if !allowed {
|
|
return fmt.Errorf("startup blocked: coordinator guard disallowed peer fallback (%s)", guardReason)
|
|
}
|
|
logger.Printf("peer startup handoff unavailable; proceeding with local peer startup fallback")
|
|
allowPeer = true
|
|
} else {
|
|
return fmt.Errorf("startup blocked: this instance is configured as role=peer (use --allow-peer-startup to override)")
|
|
}
|
|
}
|
|
if cfg.UPS.Enabled && !cfg.Coordination.AllowStartupOnBattery && !*allowOnBattery {
|
|
targets, targetErr := buildUPSTargets(cfg)
|
|
if targetErr != nil {
|
|
return targetErr
|
|
}
|
|
checkCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
if err := ensureStartupPowerSafe(checkCtx, targets); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
return orch.Startup(ctx, cluster.StartupOptions{
|
|
ForceFluxBranch: *forceBranch,
|
|
SkipLocalBootstrap: *skipLocalBootstrap,
|
|
Reason: *reason,
|
|
})
|
|
}
|
|
|
|
func runShutdown(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("shutdown", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)")
|
|
skipEtcd := fs.Bool("skip-etcd-snapshot", false, "Skip etcd snapshot")
|
|
skipDrain := fs.Bool("skip-drain", false, "Skip worker drain")
|
|
reason := fs.String("reason", "manual-shutdown", "Shutdown reason for run history")
|
|
_ = fs.Parse(args)
|
|
|
|
_, orch, err := buildOrchestrator(logger, *configPath, !*execute)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
return orch.Shutdown(ctx, cluster.ShutdownOptions{
|
|
SkipEtcdSnapshot: *skipEtcd,
|
|
SkipDrain: *skipDrain,
|
|
Reason: *reason,
|
|
})
|
|
}
|
|
|
|
func runDaemon(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("daemon", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
dryRunActions := fs.Bool("dry-run-actions", false, "Log planned actions without executing")
|
|
_ = fs.Parse(args)
|
|
|
|
cfg, orch, err := buildOrchestrator(logger, *configPath, *dryRunActions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !cfg.UPS.Enabled {
|
|
return fmt.Errorf("UPS monitoring is disabled in config")
|
|
}
|
|
targets, err := buildUPSTargets(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
d := service.NewDaemon(cfg, orch, targets, logger)
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer stop()
|
|
if err := d.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runEtcdRestore(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("etcd-restore", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
execute := fs.Bool("execute", false, "Actually execute restore (default dry-run)")
|
|
controlPlane := fs.String("control-plane", "", "Control plane to run restore on (defaults to startup.etcd_restore_control_plane)")
|
|
snapshotPath := fs.String("snapshot", "", "Explicit snapshot path (defaults to latest on selected control plane)")
|
|
_ = fs.Parse(args)
|
|
|
|
_, orch, err := buildOrchestrator(logger, *configPath, !*execute)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
return orch.EtcdRestore(ctx, cluster.EtcdRestoreOptions{
|
|
ControlPlane: *controlPlane,
|
|
SnapshotPath: *snapshotPath,
|
|
})
|
|
}
|
|
|
|
func runStatus(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("status", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
_ = fs.Parse(args)
|
|
|
|
cfg, orch, err := buildOrchestrator(logger, *configPath, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
recs, err := state.New(cfg.State.RunHistoryPath).Load()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
last := "none"
|
|
if len(recs) > 0 {
|
|
r := recs[len(recs)-1]
|
|
last = fmt.Sprintf("%s success=%t duration=%ds at=%s", r.Action, r.Success, r.DurationSeconds, r.EndedAt.Format(time.RFC3339))
|
|
}
|
|
logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch)
|
|
logger.Printf("control_planes=%v", cfg.ControlPlanes)
|
|
logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds())
|
|
intent, intentErr := state.ReadIntent(cfg.State.IntentPath)
|
|
if intentErr != nil {
|
|
logger.Printf("intent_read_error=%v", intentErr)
|
|
} else if intent.State == "" {
|
|
logger.Printf("intent=none")
|
|
} else {
|
|
logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
|
|
intent.State, intent.Reason, intent.Source, intent.UpdatedAt.Format(time.RFC3339))
|
|
}
|
|
logger.Printf("last_run=%s", last)
|
|
return nil
|
|
}
|
|
|
|
func runIntent(logger *log.Logger, args []string) error {
|
|
fs := flag.NewFlagSet("intent", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
|
|
setState := fs.String("set", "", "Set intent state (normal|startup_in_progress|shutting_down|shutdown_complete)")
|
|
reason := fs.String("reason", "manual-intent", "Intent reason (used with --set)")
|
|
source := fs.String("source", "manual", "Intent source (used with --set)")
|
|
execute := fs.Bool("execute", false, "Actually write intent state (default read-only)")
|
|
_ = fs.Parse(args)
|
|
|
|
cfg, err := config.Load(*configPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if strings.TrimSpace(*setState) == "" {
|
|
in, readErr := state.ReadIntent(cfg.State.IntentPath)
|
|
if readErr != nil {
|
|
return readErr
|
|
}
|
|
if in.State == "" {
|
|
logger.Printf("intent=none")
|
|
return nil
|
|
}
|
|
logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
|
|
in.State, in.Reason, in.Source, in.UpdatedAt.Format(time.RFC3339))
|
|
return nil
|
|
}
|
|
if !*execute {
|
|
return fmt.Errorf("refusing to write intent without --execute")
|
|
}
|
|
stateValue := strings.TrimSpace(*setState)
|
|
if err := state.MustWriteIntent(cfg.State.IntentPath, stateValue, *reason, *source); err != nil {
|
|
return err
|
|
}
|
|
logger.Printf("intent updated: state=%s reason=%q source=%s", stateValue, *reason, *source)
|
|
return nil
|
|
}
|
|
|
|
func buildUPSTargets(cfg config.Config) ([]service.Target, error) {
|
|
targets := make([]service.Target, 0, len(cfg.UPS.Targets)+1)
|
|
switch cfg.UPS.Provider {
|
|
case "nut":
|
|
if len(cfg.UPS.Targets) == 0 {
|
|
target := cfg.UPS.Target
|
|
if target == "" {
|
|
return nil, fmt.Errorf("ups target must be set")
|
|
}
|
|
targets = append(targets, service.Target{
|
|
Name: "primary",
|
|
Target: target,
|
|
Provider: ups.NewNUTProvider(target),
|
|
})
|
|
} else {
|
|
for idx, t := range cfg.UPS.Targets {
|
|
name := t.Name
|
|
if name == "" {
|
|
name = fmt.Sprintf("target-%d", idx+1)
|
|
}
|
|
targets = append(targets, service.Target{
|
|
Name: name,
|
|
Target: t.Target,
|
|
Provider: ups.NewNUTProvider(t.Target),
|
|
})
|
|
}
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider)
|
|
}
|
|
return targets, nil
|
|
}
|
|
|
|
func ensureStartupPowerSafe(ctx context.Context, targets []service.Target) error {
|
|
type targetState struct {
|
|
seenGood bool
|
|
lastErr error
|
|
}
|
|
states := make(map[string]*targetState, len(targets))
|
|
for _, t := range targets {
|
|
key := t.Name + "|" + t.Target
|
|
states[key] = &targetState{}
|
|
}
|
|
const pollInterval = 3 * time.Second
|
|
for {
|
|
onBatteryTargets := []string{}
|
|
allSeen := true
|
|
for _, t := range targets {
|
|
key := t.Name + "|" + t.Target
|
|
st := states[key]
|
|
sample, err := t.Provider.Read(ctx)
|
|
if err != nil {
|
|
st.lastErr = err
|
|
if !st.seenGood {
|
|
allSeen = false
|
|
}
|
|
continue
|
|
}
|
|
st.seenGood = true
|
|
st.lastErr = nil
|
|
if sample.OnBattery {
|
|
onBatteryTargets = append(onBatteryTargets, fmt.Sprintf("%s(status=%s runtime_s=%d)", t.Name, sample.RawStatus, sample.RuntimeSeconds))
|
|
}
|
|
}
|
|
if len(onBatteryTargets) > 0 {
|
|
return fmt.Errorf("startup blocked: UPS is on battery for %s", strings.Join(onBatteryTargets, ", "))
|
|
}
|
|
if allSeen {
|
|
return nil
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
unverified := make([]string, 0, len(targets))
|
|
for _, t := range targets {
|
|
key := t.Name + "|" + t.Target
|
|
st := states[key]
|
|
if st.seenGood {
|
|
continue
|
|
}
|
|
if st.lastErr != nil {
|
|
unverified = append(unverified, fmt.Sprintf("%s(%s): %v", t.Name, t.Target, st.lastErr))
|
|
} else {
|
|
unverified = append(unverified, fmt.Sprintf("%s(%s): no telemetry sample yet", t.Name, t.Target))
|
|
}
|
|
}
|
|
return fmt.Errorf("startup blocked: unable to verify UPS telemetry before timeout: %s", strings.Join(unverified, " | "))
|
|
case <-time.After(pollInterval):
|
|
}
|
|
}
|
|
}
|
|
|
|
func buildOrchestrator(logger *log.Logger, cfgPath string, dryRun bool) (config.Config, *cluster.Orchestrator, error) {
|
|
cfg, err := config.Load(cfgPath)
|
|
if err != nil {
|
|
return config.Config{}, nil, err
|
|
}
|
|
if err := state.EnsureDir(cfg.State.Dir); err != nil {
|
|
return config.Config{}, nil, err
|
|
}
|
|
runner := &execx.Runner{
|
|
DryRun: dryRun,
|
|
Kubeconfig: cfg.Kubeconfig,
|
|
Logger: logger,
|
|
}
|
|
store := state.New(cfg.State.RunHistoryPath)
|
|
orch := cluster.New(cfg, runner, store, logger)
|
|
return cfg, orch, nil
|
|
}
|
|
|
|
// usage prints the hecate command-line help text to standard output via
// fmt.Print: a one-line summary, the invocation synopsis, the list of
// subcommands, and example invocations. It takes no arguments and returns
// nothing; callers decide the exit status.
func usage() {
|
|
fmt.Print(`hecate: staged startup/shutdown + UPS-triggered protection
|
|
|
|
Usage:
|
|
hecate <command> [flags]
|
|
|
|
Commands:
|
|
startup Perform staged cluster startup
|
|
shutdown Perform graceful cluster shutdown
|
|
etcd-restore Restore etcd from snapshot on a control plane
|
|
daemon Monitor UPS and auto-trigger shutdown
|
|
status Print current hecate status and estimates
|
|
intent Read or manually set intent state
|
|
|
|
Examples:
|
|
hecate startup --config /etc/hecate/hecate.yaml --execute --force-flux-branch main
|
|
hecate shutdown --config /etc/hecate/hecate.yaml --execute --reason "manual-maintenance"
|
|
hecate etcd-restore --config /etc/hecate/hecate.yaml --execute
|
|
hecate daemon --config /etc/hecate/hecate.yaml
|
|
hecate status --config /etc/hecate/hecate.yaml
|
|
hecate intent --config /etc/hecate/hecate.yaml --set normal --reason "manual-clear" --execute
|
|
`)
|
|
}
|
|
|
|
func tryPeerBootstrapHandoff(ctx context.Context, cfg config.Config, logger *log.Logger) (bool, error) {
|
|
coordinator := strings.TrimSpace(cfg.Coordination.ForwardShutdownHost)
|
|
if coordinator == "" {
|
|
return false, fmt.Errorf("coordination.forward_shutdown_host is empty for peer role")
|
|
}
|
|
user := strings.TrimSpace(cfg.Coordination.ForwardShutdownUser)
|
|
if user == "" {
|
|
if override, ok := cfg.SSHNodeUsers[coordinator]; ok && strings.TrimSpace(override) != "" {
|
|
user = strings.TrimSpace(override)
|
|
} else {
|
|
user = strings.TrimSpace(cfg.SSHUser)
|
|
}
|
|
}
|
|
|
|
host := coordinator
|
|
if mapped, ok := cfg.SSHNodeHosts[coordinator]; ok && strings.TrimSpace(mapped) != "" {
|
|
host = strings.TrimSpace(mapped)
|
|
}
|
|
target := host
|
|
if user != "" {
|
|
target = user + "@" + host
|
|
}
|
|
|
|
args := buildSSHBaseArgs(cfg)
|
|
|
|
remote := "sudo -n systemctl start hecate-bootstrap.service"
|
|
attempt := 1
|
|
for {
|
|
cmdArgs := append(append([]string{}, args...), target, remote)
|
|
_, err := runSSHWithRecovery(ctx, logger, cfg, cmdArgs, []string{coordinator, host, cfg.SSHJumpHost})
|
|
if err == nil {
|
|
logger.Printf("peer bootstrap handoff succeeded on %s (attempt=%d)", coordinator, attempt)
|
|
return true, nil
|
|
}
|
|
logger.Printf("peer bootstrap handoff attempt %d failed for %s: %v", attempt, coordinator, err)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, fmt.Errorf("coordinator handoff timeout for %s: %w", coordinator, ctx.Err())
|
|
case <-time.After(5 * time.Second):
|
|
attempt++
|
|
}
|
|
}
|
|
}
|
|
|
|
func coordinatorAllowsPeerFallbackStartup(ctx context.Context, cfg config.Config, logger *log.Logger) (bool, string, error) {
|
|
coordinator := strings.TrimSpace(cfg.Coordination.ForwardShutdownHost)
|
|
if coordinator == "" {
|
|
return true, "no coordinator configured", nil
|
|
}
|
|
user := strings.TrimSpace(cfg.Coordination.ForwardShutdownUser)
|
|
if user == "" {
|
|
if override, ok := cfg.SSHNodeUsers[coordinator]; ok && strings.TrimSpace(override) != "" {
|
|
user = strings.TrimSpace(override)
|
|
} else {
|
|
user = strings.TrimSpace(cfg.SSHUser)
|
|
}
|
|
}
|
|
host := coordinator
|
|
if mapped, ok := cfg.SSHNodeHosts[coordinator]; ok && strings.TrimSpace(mapped) != "" {
|
|
host = strings.TrimSpace(mapped)
|
|
}
|
|
target := host
|
|
if user != "" {
|
|
target = user + "@" + host
|
|
}
|
|
remoteCmd := "sudo -n sh -lc 'if systemctl is-active --quiet hecate-bootstrap.service; then echo __HECATE_BOOTSTRAP_ACTIVE__; else echo __HECATE_BOOTSTRAP_IDLE__; fi; if [ -s /var/lib/hecate/intent.json ]; then cat /var/lib/hecate/intent.json; else echo \"{}\"; fi'"
|
|
args := append(buildSSHBaseArgs(cfg), target, remoteCmd)
|
|
out, err := runSSHWithRecovery(ctx, logger, cfg, args, []string{coordinator, host, cfg.SSHJumpHost})
|
|
if err != nil {
|
|
logger.Printf("warning: coordinator guard check unavailable on %s: %v; allowing peer fallback startup", coordinator, err)
|
|
return true, "coordinator unreachable", nil
|
|
}
|
|
trimmed := strings.TrimSpace(out)
|
|
if strings.Contains(trimmed, "__HECATE_BOOTSTRAP_ACTIVE__") {
|
|
return false, "coordinator bootstrap service is active", nil
|
|
}
|
|
start := strings.Index(trimmed, "{")
|
|
end := strings.LastIndex(trimmed, "}")
|
|
if start < 0 || end < start {
|
|
return false, "coordinator intent payload missing", nil
|
|
}
|
|
rawIntent := trimmed[start : end+1]
|
|
var remoteIntent state.Intent
|
|
if err := json.Unmarshal([]byte(rawIntent), &remoteIntent); err != nil {
|
|
return false, "", fmt.Errorf("decode coordinator intent: %w", err)
|
|
}
|
|
if remoteIntent.State == "" || remoteIntent.State == state.IntentNormal {
|
|
return true, "coordinator intent is normal", nil
|
|
}
|
|
guardAge := time.Duration(maxInt(cfg.Coordination.StartupGuardMaxAgeSec, 60)) * time.Second
|
|
intentAge := time.Duration(0)
|
|
if !remoteIntent.UpdatedAt.IsZero() {
|
|
intentAge = time.Since(remoteIntent.UpdatedAt)
|
|
}
|
|
switch remoteIntent.State {
|
|
case state.IntentShuttingDown:
|
|
if remoteIntent.UpdatedAt.IsZero() || intentAge <= guardAge {
|
|
return false, fmt.Sprintf("coordinator intent=%s age=%s reason=%q", remoteIntent.State, intentAge.Round(time.Second), remoteIntent.Reason), nil
|
|
}
|
|
logger.Printf("warning: coordinator shutdown intent appears stale (age=%s > guard=%s); allowing peer fallback startup", intentAge.Round(time.Second), guardAge)
|
|
return true, "coordinator shutdown intent stale", nil
|
|
case state.IntentStartupInProgress:
|
|
if remoteIntent.UpdatedAt.IsZero() || intentAge <= guardAge {
|
|
return false, fmt.Sprintf("coordinator intent=%s age=%s reason=%q", remoteIntent.State, intentAge.Round(time.Second), remoteIntent.Reason), nil
|
|
}
|
|
logger.Printf("warning: coordinator startup intent appears stale (age=%s > guard=%s); allowing peer fallback startup", intentAge.Round(time.Second), guardAge)
|
|
return true, "coordinator startup intent stale", nil
|
|
case state.IntentShutdownComplete:
|
|
if remoteIntent.UpdatedAt.IsZero() {
|
|
return false, "coordinator reported shutdown_complete with unknown age", nil
|
|
}
|
|
if intentAge <= 45*time.Second {
|
|
return false, fmt.Sprintf("coordinator recently completed shutdown (%s ago)", intentAge.Round(time.Second)), nil
|
|
}
|
|
return true, "coordinator shutdown_complete is old enough", nil
|
|
default:
|
|
return false, fmt.Sprintf("coordinator intent state %q is unknown", remoteIntent.State), nil
|
|
}
|
|
}
|
|
|
|
func runSSHWithRecovery(ctx context.Context, logger *log.Logger, cfg config.Config, args []string, repairHosts []string) (string, error) {
|
|
try := func() (string, error) {
|
|
cmd := exec.CommandContext(ctx, "ssh", args...)
|
|
out, err := cmd.CombinedOutput()
|
|
trimmed := strings.TrimSpace(string(out))
|
|
if err != nil {
|
|
if trimmed == "" {
|
|
return "", fmt.Errorf("ssh failed: %w", err)
|
|
}
|
|
return trimmed, fmt.Errorf("ssh failed: %w: %s", err, trimmed)
|
|
}
|
|
return trimmed, nil
|
|
}
|
|
|
|
out, err := try()
|
|
if err == nil {
|
|
return out, nil
|
|
}
|
|
if !sshutil.ShouldAttemptKnownHostsRepair(out, err) {
|
|
return out, err
|
|
}
|
|
|
|
sshutil.RepairKnownHosts(ctx, logger, sshutil.KnownHostsFiles(resolveSSHConfigFile(cfg), resolveSSHIdentityFile(cfg)), repairHosts, cfg.SSHPort)
|
|
return try()
|
|
}
|
|
|
|
func buildSSHBaseArgs(cfg config.Config) []string {
|
|
args := []string{
|
|
"-o", "BatchMode=yes",
|
|
"-o", "ConnectTimeout=8",
|
|
"-o", "StrictHostKeyChecking=accept-new",
|
|
}
|
|
if cfgPath := resolveSSHConfigFile(cfg); cfgPath != "" {
|
|
args = append(args, "-F", cfgPath)
|
|
}
|
|
if idPath := resolveSSHIdentityFile(cfg); idPath != "" {
|
|
args = append(args, "-i", idPath)
|
|
}
|
|
if cfg.SSHPort > 0 {
|
|
args = append(args, "-p", strconv.Itoa(cfg.SSHPort))
|
|
}
|
|
if cfg.SSHJumpHost != "" {
|
|
jump := cfg.SSHJumpHost
|
|
if cfg.SSHJumpUser != "" {
|
|
jump = cfg.SSHJumpUser + "@" + jump
|
|
}
|
|
if cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
|
|
jump = fmt.Sprintf("%s:%d", jump, cfg.SSHPort)
|
|
}
|
|
args = append(args, "-J", jump)
|
|
}
|
|
return args
|
|
}
|
|
|
|
func resolveSSHConfigFile(cfg config.Config) string {
|
|
if strings.TrimSpace(cfg.SSHConfigFile) != "" {
|
|
return strings.TrimSpace(cfg.SSHConfigFile)
|
|
}
|
|
candidates := []string{
|
|
"/home/atlas/.ssh/config",
|
|
"/home/tethys/.ssh/config",
|
|
}
|
|
for _, p := range candidates {
|
|
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
|
|
return p
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func resolveSSHIdentityFile(cfg config.Config) string {
|
|
if strings.TrimSpace(cfg.SSHIdentityFile) != "" {
|
|
return strings.TrimSpace(cfg.SSHIdentityFile)
|
|
}
|
|
candidates := []string{
|
|
"/home/atlas/.ssh/id_ed25519",
|
|
"/home/tethys/.ssh/id_ed25519",
|
|
}
|
|
for _, p := range candidates {
|
|
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
|
|
return p
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// maxInt returns the larger of a and b; when they are equal, that common
// value is returned.
func maxInt(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
|