ananke/cmd/hecate/main.go

406 lines
12 KiB
Go

package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"scm.bstein.dev/bstein/hecate/internal/cluster"
"scm.bstein.dev/bstein/hecate/internal/config"
"scm.bstein.dev/bstein/hecate/internal/execx"
"scm.bstein.dev/bstein/hecate/internal/service"
"scm.bstein.dev/bstein/hecate/internal/state"
"scm.bstein.dev/bstein/hecate/internal/ups"
)
// main dispatches os.Args[1] to the matching subcommand handler.
//
// A missing or unknown command prints usage and exits with status 2, a
// handler that returns an error is logged and exits with status 1, and
// "help"/"-h"/"--help" prints usage and returns normally.
func main() {
	logger := log.New(os.Stdout, "[hecate] ", log.LstdFlags)
	if len(os.Args) < 2 {
		usage()
		os.Exit(2)
	}

	handlers := map[string]func(*log.Logger, []string) error{
		"startup":  runStartup,
		"shutdown": runShutdown,
		"daemon":   runDaemon,
		"status":   runStatus,
	}

	command, rest := os.Args[1], os.Args[2:]
	if command == "help" || command == "-h" || command == "--help" {
		usage()
		return
	}

	handler, known := handlers[command]
	if !known {
		logger.Printf("unknown command: %s", command)
		usage()
		os.Exit(2)
	}
	// The prefix is the command name itself, so this prints e.g.
	// "startup failed: <err>".
	if err := handler(logger, rest); err != nil {
		logger.Printf("%s failed: %v", command, err)
		os.Exit(1)
	}
}
// runStartup implements the "startup" subcommand.
//
// It parses flags and builds the orchestrator, which runs in dry-run mode
// unless --execute is given. Only when executing does it apply two preflight
// gates before handing off to the orchestrator's staged startup:
//
//   - Peer role: when the config's coordination role is "peer", startup is
//     refused unless --allow-peer-startup is set. With --auto-peer-failover it
//     first asks the coordinator to bootstrap, waiting up to
//     --peer-wait-seconds (at least 1s). It returns nil if that handoff
//     succeeds, and otherwise falls back to a local startup.
//   - Power: when UPS monitoring is enabled and neither the config nor
//     --allow-on-battery permits on-battery startup, startup is refused if any
//     UPS target cannot be read or reports on-battery. The check has a 30s
//     timeout.
func runStartup(logger *log.Logger, args []string) error {
	fs := flag.NewFlagSet("startup", flag.ExitOnError)
	configPath := fs.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
	execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)")
	forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume")
	skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies")
	allowPeerStartup := fs.Bool("allow-peer-startup", false, "Allow startup to run on a peer instance")
	autoPeerFailover := fs.Bool("auto-peer-failover", false, "On peer role, try coordinator bootstrap handoff first and only run local startup as fallback")
	peerWaitSeconds := fs.Int("peer-wait-seconds", 180, "How long auto peer failover waits for coordinator handoff before local fallback startup")
	allowOnBattery := fs.Bool("allow-on-battery", false, "Allow startup when UPS reports on-battery")
	reason := fs.String("reason", "manual-startup", "Startup reason for run history")
	// With flag.ExitOnError, Parse exits the process on bad flags, so the
	// returned error is always nil here.
	_ = fs.Parse(args)

	cfg, orch, err := buildOrchestrator(logger, *configPath, !*execute)
	if err != nil {
		return err
	}

	if *execute {
		if cfg.Coordination.Role == "peer" && !*allowPeerStartup {
			if !*autoPeerFailover {
				return fmt.Errorf("startup blocked: this instance is configured as role=peer (use --allow-peer-startup to override)")
			}
			// Release the handoff context as soon as the handoff returns,
			// instead of deferring it past the (potentially long) startup.
			handoffCtx, cancelHandoff := context.WithTimeout(context.Background(), time.Duration(maxInt(*peerWaitSeconds, 1))*time.Second)
			handoff, handoffErr := tryPeerBootstrapHandoff(handoffCtx, cfg, logger)
			cancelHandoff()
			if handoffErr != nil {
				logger.Printf("warning: peer bootstrap handoff failed: %v", handoffErr)
			}
			if handoff {
				logger.Printf("peer startup handoff complete; skipping local startup")
				return nil
			}
			logger.Printf("peer startup handoff unavailable; proceeding with local peer startup fallback")
		}

		if cfg.UPS.Enabled && !cfg.Coordination.AllowStartupOnBattery && !*allowOnBattery {
			targets, targetErr := buildUPSTargets(cfg)
			if targetErr != nil {
				return targetErr
			}
			checkCtx, cancelCheck := context.WithTimeout(context.Background(), 30*time.Second)
			powerErr := ensureStartupPowerSafe(checkCtx, targets)
			cancelCheck()
			if powerErr != nil {
				return powerErr
			}
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return orch.Startup(ctx, cluster.StartupOptions{
		ForceFluxBranch:    *forceBranch,
		SkipLocalBootstrap: *skipLocalBootstrap,
		Reason:             *reason,
	})
}
// runShutdown implements the "shutdown" subcommand.
//
// It parses flags and builds the orchestrator, which runs in dry-run mode
// unless --execute is given. It then runs the orchestrator's shutdown with the
// requested etcd-snapshot/drain skips and the run-history reason.
func runShutdown(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("shutdown", flag.ExitOnError)
	var (
		cfgPath   = flags.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
		execute   = flags.Bool("execute", false, "Actually execute changes (default dry-run)")
		skipEtcd  = flags.Bool("skip-etcd-snapshot", false, "Skip etcd snapshot")
		skipDrain = flags.Bool("skip-drain", false, "Skip worker drain")
		reason    = flags.String("reason", "manual-shutdown", "Shutdown reason for run history")
	)
	// flag.ExitOnError makes Parse exit on malformed flags; the error is nil here.
	_ = flags.Parse(args)

	dryRun := !*execute
	_, orch, err := buildOrchestrator(logger, *cfgPath, dryRun)
	if err != nil {
		return err
	}

	opts := cluster.ShutdownOptions{
		SkipEtcdSnapshot: *skipEtcd,
		SkipDrain:        *skipDrain,
		Reason:           *reason,
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return orch.Shutdown(ctx, opts)
}
// runDaemon implements the "daemon" subcommand.
//
// It loads the config, requires UPS monitoring to be enabled, builds the UPS
// targets, and runs the service daemon until SIGINT or SIGTERM cancels its
// context. An error wrapping context.Canceled is treated as a clean exit.
// --dry-run-actions makes the orchestrator's runner log instead of execute.
func runDaemon(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("daemon", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
	dryRunActions := flags.Bool("dry-run-actions", false, "Log planned actions without executing")
	// flag.ExitOnError makes Parse exit on malformed flags; the error is nil here.
	_ = flags.Parse(args)

	cfg, orch, err := buildOrchestrator(logger, *cfgPath, *dryRunActions)
	switch {
	case err != nil:
		return err
	case !cfg.UPS.Enabled:
		return fmt.Errorf("UPS monitoring is disabled in config")
	}

	targets, err := buildUPSTargets(cfg)
	if err != nil {
		return err
	}
	daemon := service.NewDaemon(cfg, orch, targets, logger)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	runErr := daemon.Run(ctx)
	if runErr == nil || errors.Is(runErr, context.Canceled) {
		return nil
	}
	return runErr
}
// runStatus implements the "status" subcommand.
//
// It builds the orchestrator in dry-run mode and loads the run history. It
// then logs, in order: the expected Flux branch, the control planes, the
// estimated shutdown budget, the recorded intent (or the error reading it),
// and a summary of the most recent run-history record ("none" if empty).
// Config and run-history load failures are returned. An intent read failure
// is only logged.
func runStatus(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("status", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/hecate/hecate.yaml", "Path to config file")
	// flag.ExitOnError makes Parse exit on malformed flags; the error is nil here.
	_ = flags.Parse(args)

	cfg, orch, err := buildOrchestrator(logger, *cfgPath, true)
	if err != nil {
		return err
	}
	history, err := state.New(cfg.State.RunHistoryPath).Load()
	if err != nil {
		return err
	}

	lastRun := "none"
	if n := len(history); n > 0 {
		latest := history[n-1]
		lastRun = fmt.Sprintf("%s success=%t duration=%ds at=%s",
			latest.Action, latest.Success, latest.DurationSeconds, latest.EndedAt.Format(time.RFC3339))
	}

	logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch)
	logger.Printf("control_planes=%v", cfg.ControlPlanes)
	logger.Printf("estimated_shutdown_budget_seconds=%d", orch.EstimatedShutdownSeconds())

	switch intent, intentErr := state.ReadIntent(cfg.State.IntentPath); {
	case intentErr != nil:
		logger.Printf("intent_read_error=%v", intentErr)
	case intent.State == "":
		logger.Printf("intent=none")
	default:
		logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
			intent.State, intent.Reason, intent.Source, intent.UpdatedAt.Format(time.RFC3339))
	}

	logger.Printf("last_run=%s", lastRun)
	return nil
}
// buildUPSTargets converts the UPS section of cfg into the list of targets to
// monitor. Only the "nut" provider is supported.
//
// When cfg.UPS.Targets is empty, the single legacy cfg.UPS.Target is used and
// named "primary". Otherwise each entry becomes one target; unnamed entries
// are labelled "target-<n>" with n starting at 1.
//
// Target addresses are whitespace-trimmed. An error is returned for an
// unsupported provider or for any target whose address is empty, so a bad
// entry is rejected here instead of failing later at read time.
func buildUPSTargets(cfg config.Config) ([]service.Target, error) {
	if cfg.UPS.Provider != "nut" {
		return nil, fmt.Errorf("unsupported UPS provider: %s", cfg.UPS.Provider)
	}

	if len(cfg.UPS.Targets) == 0 {
		target := strings.TrimSpace(cfg.UPS.Target)
		if target == "" {
			return nil, fmt.Errorf("ups target must be set")
		}
		return []service.Target{{
			Name:     "primary",
			Target:   target,
			Provider: ups.NewNUTProvider(target),
		}}, nil
	}

	targets := make([]service.Target, 0, len(cfg.UPS.Targets))
	for idx, t := range cfg.UPS.Targets {
		name := t.Name
		if name == "" {
			name = fmt.Sprintf("target-%d", idx+1)
		}
		addr := strings.TrimSpace(t.Target)
		if addr == "" {
			// Mirror the single-target validation above; previously an empty
			// entry silently produced a provider for "".
			return nil, fmt.Errorf("ups target %q has no target address", name)
		}
		targets = append(targets, service.Target{
			Name:     name,
			Target:   addr,
			Provider: ups.NewNUTProvider(addr),
		})
	}
	return targets, nil
}
// ensureStartupPowerSafe reads each UPS target once, in order, using ctx.
//
// It returns an error as soon as any read fails. Otherwise it returns an error
// listing every target that reports OnBattery, with each target's raw status
// and runtime. It returns nil only if every read succeeded and no target is
// on battery.
func ensureStartupPowerSafe(ctx context.Context, targets []service.Target) error {
	var onBattery []string
	for i := range targets {
		tgt := targets[i]
		reading, readErr := tgt.Provider.Read(ctx)
		if readErr != nil {
			return fmt.Errorf("startup blocked: unable to verify UPS target %s (%s): %w", tgt.Name, tgt.Target, readErr)
		}
		if !reading.OnBattery {
			continue
		}
		onBattery = append(onBattery, fmt.Sprintf("%s(status=%s runtime_s=%d)", tgt.Name, reading.RawStatus, reading.RuntimeSeconds))
	}
	if len(onBattery) == 0 {
		return nil
	}
	return fmt.Errorf("startup blocked: UPS is on battery for %s", strings.Join(onBattery, ", "))
}
// buildOrchestrator loads the config at cfgPath and ensures the state
// directory exists. It then wires an execx.Runner (dry-run when dryRun is
// true) and the run-history store into a cluster orchestrator.
//
// On any error it returns the zero Config and a nil orchestrator.
func buildOrchestrator(logger *log.Logger, cfgPath string, dryRun bool) (config.Config, *cluster.Orchestrator, error) {
	loaded, loadErr := config.Load(cfgPath)
	if loadErr != nil {
		return config.Config{}, nil, loadErr
	}
	if dirErr := state.EnsureDir(loaded.State.Dir); dirErr != nil {
		return config.Config{}, nil, dirErr
	}

	runner := &execx.Runner{
		Logger:     logger,
		Kubeconfig: loaded.Kubeconfig,
		DryRun:     dryRun,
	}
	history := state.New(loaded.State.RunHistoryPath)
	return loaded, cluster.New(loaded, runner, history, logger), nil
}
// usage prints the command overview and example invocations to standard output.
func usage() {
	const helpText = `hecate: staged startup/shutdown + UPS-triggered protection
Usage:
hecate <command> [flags]
Commands:
startup Perform staged cluster startup
shutdown Perform graceful cluster shutdown
daemon Monitor UPS and auto-trigger shutdown
status Print current hecate status and estimates
Examples:
hecate startup --config /etc/hecate/hecate.yaml --execute --force-flux-branch main
hecate shutdown --config /etc/hecate/hecate.yaml --execute --reason "manual-maintenance"
hecate daemon --config /etc/hecate/hecate.yaml
hecate status --config /etc/hecate/hecate.yaml
`
	fmt.Print(helpText)
}
// tryPeerBootstrapHandoff asks the coordinator node to run its bootstrap by
// executing "sudo -n systemctl start hecate-bootstrap.service" over ssh. It
// retries every 5 seconds until an attempt succeeds or ctx is done.
//
// The coordinator is cfg.Coordination.ForwardShutdownHost. It returns
// (true, nil) once an ssh attempt exits successfully. It returns (false, err)
// when no coordinator is configured or ctx ends before a successful attempt.
// Individual attempt failures, including ssh output, are logged rather than
// returned.
func tryPeerBootstrapHandoff(ctx context.Context, cfg config.Config, logger *log.Logger) (bool, error) {
	coordinator := strings.TrimSpace(cfg.Coordination.ForwardShutdownHost)
	if coordinator == "" {
		return false, fmt.Errorf("coordination.forward_shutdown_host is empty for peer role")
	}

	const (
		remote     = "sudo -n systemctl start hecate-bootstrap.service"
		retryDelay = 5 * time.Second
	)
	// The argv never changes between attempts, so build it once.
	// exec.CommandContext copies it into cmd.Args.
	cmdArgs := append(peerHandoffSSHArgs(cfg), peerHandoffTarget(cfg, coordinator), remote)

	for attempt := 1; ; attempt++ {
		out, err := exec.CommandContext(ctx, "ssh", cmdArgs...).CombinedOutput()
		if err == nil {
			logger.Printf("peer bootstrap handoff succeeded on %s (attempt=%d)", coordinator, attempt)
			return true, nil
		}
		if detail := strings.TrimSpace(string(out)); detail != "" {
			logger.Printf("peer bootstrap handoff attempt %d failed for %s: %v: %s", attempt, coordinator, err, detail)
		} else {
			logger.Printf("peer bootstrap handoff attempt %d failed for %s: %v", attempt, coordinator, err)
		}

		wait := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			wait.Stop()
			return false, fmt.Errorf("coordinator handoff timeout for %s: %w", coordinator, ctx.Err())
		case <-wait.C:
		}
	}
}

// peerHandoffTarget returns the ssh destination for coordinator, either
// "user@host" or a bare "host" when no user is configured.
//
// The user is the first non-blank of coordination.forward_shutdown_user,
// cfg.SSHNodeUsers[coordinator], and cfg.SSHUser. The host is
// cfg.SSHNodeHosts[coordinator] when that mapping is non-blank, otherwise
// coordinator itself.
func peerHandoffTarget(cfg config.Config, coordinator string) string {
	user := strings.TrimSpace(cfg.Coordination.ForwardShutdownUser)
	if user == "" {
		if override := strings.TrimSpace(cfg.SSHNodeUsers[coordinator]); override != "" {
			user = override
		} else {
			user = strings.TrimSpace(cfg.SSHUser)
		}
	}

	host := coordinator
	if mapped := strings.TrimSpace(cfg.SSHNodeHosts[coordinator]); mapped != "" {
		host = mapped
	}

	if user == "" {
		return host
	}
	return user + "@" + host
}

// peerHandoffSSHArgs builds the non-interactive ssh options used for the
// handoff: batch mode, an 8s connect timeout, accept-new host keys, plus the
// optional config file (-F), identity (-i), port (-p) and jump host (-J).
func peerHandoffSSHArgs(cfg config.Config) []string {
	args := []string{
		"-o", "BatchMode=yes",
		"-o", "ConnectTimeout=8",
		"-o", "StrictHostKeyChecking=accept-new",
	}
	if cfgPath := resolveSSHConfigFile(cfg); cfgPath != "" {
		args = append(args, "-F", cfgPath)
	}
	if idPath := resolveSSHIdentityFile(cfg); idPath != "" {
		args = append(args, "-i", idPath)
	}
	if cfg.SSHPort > 0 {
		args = append(args, "-p", strconv.Itoa(cfg.SSHPort))
	}
	// Trim like every other ssh setting so a whitespace-only value does not
	// produce a bogus -J argument.
	if jump := strings.TrimSpace(cfg.SSHJumpHost); jump != "" {
		if jumpUser := strings.TrimSpace(cfg.SSHJumpUser); jumpUser != "" {
			jump = jumpUser + "@" + jump
		}
		// The jump host reuses cfg.SSHPort unless an explicit ":port" is
		// already present.
		if cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
			jump = fmt.Sprintf("%s:%d", jump, cfg.SSHPort)
		}
		args = append(args, "-J", jump)
	}
	return args
}
// resolveSSHConfigFile returns the ssh client config file to pass with -F.
// That is the trimmed cfg.SSHConfigFile when set, otherwise the first of the
// built-in fallback paths that exists and is not a directory, or "" if none
// does.
func resolveSSHConfigFile(cfg config.Config) string {
	return configuredOrFirstFile(cfg.SSHConfigFile,
		"/home/atlas/.ssh/config",
		"/home/tethys/.ssh/config",
	)
}

// resolveSSHIdentityFile returns the ssh identity file to pass with -i.
// That is the trimmed cfg.SSHIdentityFile when set, otherwise the first of the
// built-in fallback keys that exists and is not a directory, or "" if none
// does.
func resolveSSHIdentityFile(cfg config.Config) string {
	return configuredOrFirstFile(cfg.SSHIdentityFile,
		"/home/atlas/.ssh/id_ed25519",
		"/home/tethys/.ssh/id_ed25519",
	)
}

// configuredOrFirstFile returns strings.TrimSpace(configured) when it is
// non-blank, without checking that it exists. Otherwise it returns the first
// candidate that os.Stat finds and that is not a directory, or "".
func configuredOrFirstFile(configured string, candidates ...string) string {
	if explicit := strings.TrimSpace(configured); explicit != "" {
		return explicit
	}
	for _, p := range candidates {
		if info, err := os.Stat(p); err == nil && !info.IsDir() {
			return p
		}
	}
	return ""
}
// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if a < b {
		return b
	}
	return a
}