ananke/cmd/ananke/command_handlers.go

421 lines
16 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"sort"
"strings"
"syscall"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/service"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// startupStatusSnapshot is the decoded form of the startup progress/report JSON
// that loadStartupStatusSnapshot reads from the state directory and runStatus
// summarizes. `ananke status --json` re-marshals this struct, so the field
// declaration order below is also the key order of that output.
// NOTE(review): the writer of these files is not in this file (presumably the
// startup orchestrator) — confirm the schema against it before changing tags.
type startupStatusSnapshot struct {
// Printed as startup_started_at ("none" when zero).
StartedAt time.Time `json:"started_at"`
// When Status is blank, runStatus reports "running" while this is zero.
Completed time.Time `json:"completed_at"`
// Printed (trimmed, quoted) as startup_reason.
Reason string `json:"reason"`
// Explicit status; when blank, runStatus derives running/success/failed from
// Completed and Success.
Status string `json:"status"`
// Printed as startup_phase; blank is shown as "unknown".
Phase string `json:"phase"`
Success bool `json:"success"`
// Not printed in the text output; only visible via --json.
Error string `json:"error,omitempty"`
// Checklist keyed by check name; loadStartupStatusSnapshot replaces a missing
// (nil) map with an empty one.
Checks map[string]startupCheckRecord `json:"checks"`
// Auto-heal entries, each printed trimmed as startup_auto_heal_detail.
AutoHeals []string `json:"auto_heals"`
// Not printed in the text output; only visible via --json.
SourceHost string `json:"source_host"`
// Printed as startup_last_updated ("none" when zero).
LastUpdated time.Time `json:"last_updated"`
}
// startupCheckRecord is one named entry of startupStatusSnapshot.Checks.
type startupCheckRecord struct {
// summarizeChecklist counts "passed", "failed" and "running" (compared after
// trimming surrounding whitespace); any other value is counted as "other".
Status string `json:"status"`
// Reported for failed checks; "no detail provided" is substituted when blank.
Detail string `json:"detail"`
// Not read by the handlers in this file; only visible via `status --json`.
UpdatedAt time.Time `json:"updated_at"`
}
// Indirection points for every external operation the subcommand handlers in
// this file perform. The handlers call these variables instead of the
// underlying functions/methods directly, presumably so tests can substitute
// fakes — confirm against the package's tests before changing any signature.
var (
buildOrchestratorCommand = buildOrchestrator
tryPeerBootstrapHandoffCommand = tryPeerBootstrapHandoff
coordinatorAllowsPeerFallbackCommand = coordinatorAllowsPeerFallbackStartup
buildUPSTargetsCommand = buildUPSTargets
ensureStartupPowerSafeCommand = ensureStartupPowerSafe
// Method calls are wrapped in closures so the orchestrator is an ordinary
// first parameter and the variable can be replaced with a plain func literal.
startupOrchestratorCommand = func(ctx context.Context, orch *cluster.Orchestrator, opts cluster.StartupOptions) error {
return orch.Startup(ctx, opts)
}
shutdownOrchestratorCommand = func(ctx context.Context, orch *cluster.Orchestrator, opts cluster.ShutdownOptions) error {
return orch.Shutdown(ctx, opts)
}
etcdRestoreOrchestratorCommand = func(ctx context.Context, orch *cluster.Orchestrator, opts cluster.EtcdRestoreOptions) error {
return orch.EtcdRestore(ctx, opts)
}
daemonRunCommand = func(ctx context.Context, daemon *service.Daemon) error { return daemon.Run(ctx) }
readIntentCommand = state.ReadIntent
// Despite the Must prefix, runIntent treats this as returning an error value
// (`if err := writeIntentCommand(...); err != nil`).
writeIntentCommand = state.MustWriteIntent
)
// runStartup implements the `startup` subcommand. It builds the orchestrator
// (dry-run unless --execute) and, for real runs only, applies two gates before
// calling the orchestrator's startup:
//   - role gate: a role=peer instance is refused unless --allow-peer-startup is
//     set, or --auto-peer-failover is set and, after the coordinator bootstrap
//     handoff does not complete, the coordinator guard permits a local fallback
//     (a completed handoff ends the command successfully without local startup);
//   - power gate: when UPS monitoring is enabled and on-battery startup is not
//     allowed by config or --allow-on-battery, ensureStartupPowerSafe must pass.
//
// Signature: runStartup(logger *log.Logger, args []string) error.
// Why: a peer must not start the cluster behind the coordinator's back, and
// startup on UPS battery is refused unless explicitly allowed, so both gates run
// before any cluster mutation.
func runStartup(logger *log.Logger, args []string) error {
	fs := flag.NewFlagSet("startup", flag.ExitOnError)
	configPath := fs.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
	execute := fs.Bool("execute", false, "Actually execute changes (default dry-run)")
	forceBranch := fs.String("force-flux-branch", "", "Patch Flux source branch before resume")
	skipLocalBootstrap := fs.Bool("skip-local-bootstrap", false, "Skip local fallback bootstrap applies")
	allowPeerStartup := fs.Bool("allow-peer-startup", false, "Allow startup to run on a peer instance")
	autoPeerFailover := fs.Bool("auto-peer-failover", false, "On peer role, try coordinator bootstrap handoff first and only run local startup as fallback")
	peerWaitSeconds := fs.Int("peer-wait-seconds", 180, "How long auto peer failover waits for coordinator handoff before local fallback startup")
	allowOnBattery := fs.Bool("allow-on-battery", false, "Allow startup when UPS reports on-battery")
	reason := fs.String("reason", "manual-startup", "Startup reason for run history")
	// With flag.ExitOnError, Parse exits the process on bad input rather than
	// returning an error, so the result is intentionally discarded.
	_ = fs.Parse(args)
	cfg, orch, err := buildOrchestratorCommand(logger, *configPath, !*execute)
	if err != nil {
		return err
	}
	// The gates apply to real runs only; a dry run can be previewed from any
	// instance regardless of role or power state.
	if *execute {
		if cfg.Coordination.Role == "peer" && !*allowPeerStartup {
			if !*autoPeerFailover {
				return fmt.Errorf("startup blocked: this instance is configured as role=peer (use --allow-peer-startup to override)")
			}
			// Wait time is floored via maxInt (defined elsewhere in the package).
			handoffCtx, cancelHandoff := context.WithTimeout(context.Background(), time.Duration(maxInt(*peerWaitSeconds, 1))*time.Second)
			handoff, handoffErr := tryPeerBootstrapHandoffCommand(handoffCtx, cfg, logger)
			// Release the timer now instead of holding it for the whole startup.
			cancelHandoff()
			if handoffErr != nil {
				// Not fatal: a failed handoff falls through to the guard check.
				logger.Printf("warning: peer bootstrap handoff failed: %v", handoffErr)
			}
			if handoff {
				logger.Printf("peer startup handoff complete; skipping local startup")
				return nil
			}
			guardCtx, cancelGuard := context.WithTimeout(context.Background(), time.Duration(maxInt(cfg.Coordination.CommandTimeoutSeconds, 15))*time.Second)
			allowed, guardReason, guardErr := coordinatorAllowsPeerFallbackCommand(guardCtx, cfg, logger)
			cancelGuard()
			if guardErr != nil {
				return fmt.Errorf("startup blocked: unable to evaluate coordinator startup guard: %w", guardErr)
			}
			if !allowed {
				return fmt.Errorf("startup blocked: coordinator guard disallowed peer fallback (%s)", guardReason)
			}
			logger.Printf("peer startup handoff unavailable; proceeding with local peer startup fallback")
		}
		if cfg.UPS.Enabled && !cfg.Coordination.AllowStartupOnBattery && !*allowOnBattery {
			targets, targetErr := buildUPSTargetsCommand(cfg)
			if targetErr != nil {
				return targetErr
			}
			checkCtx, cancelCheck := context.WithTimeout(context.Background(), 30*time.Second)
			powerErr := ensureStartupPowerSafeCommand(checkCtx, targets, cfg.Startup.MinimumBatteryPercent)
			cancelCheck()
			if powerErr != nil {
				return powerErr
			}
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return startupOrchestratorCommand(ctx, orch, cluster.StartupOptions{
		ForceFluxBranch:    *forceBranch,
		SkipLocalBootstrap: *skipLocalBootstrap,
		Reason:             *reason,
	})
}
// runShutdown implements the `shutdown` subcommand: it parses its flags, builds
// the orchestrator (dry-run unless --execute is given) and passes the snapshot,
// drain, mode and reason options to the orchestrator's shutdown.
// Signature: runShutdown(logger *log.Logger, args []string) error.
// Why: the CLI layer only translates flags into cluster.ShutdownOptions; all
// shutdown sequencing stays inside the orchestrator.
func runShutdown(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("shutdown", flag.ExitOnError)
	var (
		cfgPath      = flags.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
		doExecute    = flags.Bool("execute", false, "Actually execute changes (default dry-run)")
		noSnapshot   = flags.Bool("skip-etcd-snapshot", false, "Skip etcd snapshot")
		noDrain      = flags.Bool("skip-drain", false, "Skip worker drain")
		shutdownMode = flags.String("mode", "cluster-only", "Shutdown mode: config|cluster-only")
		why          = flags.String("reason", "manual-shutdown", "Shutdown reason for run history")
	)
	// ExitOnError: Parse exits the process itself on invalid flags.
	_ = flags.Parse(args)

	dryRun := !*doExecute
	_, orchestrator, err := buildOrchestratorCommand(logger, *cfgPath, dryRun)
	if err != nil {
		return err
	}

	opts := cluster.ShutdownOptions{
		SkipEtcdSnapshot: *noSnapshot,
		SkipDrain:        *noDrain,
		Mode:             *shutdownMode,
		Reason:           *why,
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return shutdownOrchestratorCommand(ctx, orchestrator, opts)
}
// runDaemon implements the `daemon` subcommand: it builds the orchestrator,
// refuses to run when UPS monitoring is disabled in config, builds the UPS
// targets and runs the service daemon until SIGINT or SIGTERM.
// Signature: runDaemon(logger *log.Logger, args []string) error.
// Why: a signal-triggered stop surfaces as context.Canceled, which is treated
// as a clean exit rather than an error.
func runDaemon(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("daemon", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
	plannedOnly := flags.Bool("dry-run-actions", false, "Log planned actions without executing")
	// ExitOnError: Parse exits the process itself on invalid flags.
	_ = flags.Parse(args)

	cfg, orchestrator, err := buildOrchestratorCommand(logger, *cfgPath, *plannedOnly)
	if err != nil {
		return err
	}
	if !cfg.UPS.Enabled {
		return fmt.Errorf("UPS monitoring is disabled in config")
	}
	targets, err := buildUPSTargetsCommand(cfg)
	if err != nil {
		return err
	}
	daemon := service.NewDaemon(cfg, orchestrator, targets, logger)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	err = daemonRunCommand(ctx, daemon)
	if errors.Is(err, context.Canceled) {
		// Cancellation via signal is the normal shutdown path.
		return nil
	}
	return err
}
// runEtcdRestore implements the `etcd-restore` subcommand: it builds the
// orchestrator (dry-run unless --execute is given) and forwards the optional
// control-plane and snapshot-path selections to the orchestrator's restore.
// Blank selections are passed through as-is; per the flag help, the
// orchestrator then falls back to startup.etcd_restore_control_plane and the
// latest snapshot on that control plane.
// Signature: runEtcdRestore(logger *log.Logger, args []string) error.
// Why: restore is destructive, so it stays a dry run unless explicitly executed.
func runEtcdRestore(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("etcd-restore", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
	doExecute := flags.Bool("execute", false, "Actually execute restore (default dry-run)")
	node := flags.String("control-plane", "", "Control plane to run restore on (defaults to startup.etcd_restore_control_plane)")
	snapshotFile := flags.String("snapshot", "", "Explicit snapshot path (defaults to latest on selected control plane)")
	// ExitOnError: Parse exits the process itself on invalid flags.
	_ = flags.Parse(args)

	dryRun := !*doExecute
	_, orchestrator, err := buildOrchestratorCommand(logger, *cfgPath, dryRun)
	if err != nil {
		return err
	}

	opts := cluster.EtcdRestoreOptions{
		ControlPlane: *node,
		SnapshotPath: *snapshotFile,
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return etcdRestoreOrchestratorCommand(ctx, orchestrator, opts)
}
// runStatus implements the read-only `status` subcommand. It logs, as
// key=value lines: the expected Flux branch, control planes, the orchestrator's
// shutdown budgets, the persisted intent, a summary of the startup snapshot (when
// one exists) and finally the most recent run-history record. With --json it
// also prints the raw snapshot to stdout.
// Signature: runStatus(logger *log.Logger, args []string) error.
// Why: operators poll this while startup is running, so checklist progress is
// shown alongside the terminal run history. A snapshot read error is logged
// rather than returned so the remaining status is still printed.
func runStatus(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("status", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
	asJSON := flags.Bool("json", false, "Emit startup status snapshot as JSON")
	// ExitOnError: Parse exits the process itself on invalid flags.
	_ = flags.Parse(args)

	// Always dry-run: status never mutates the cluster.
	cfg, orchestrator, err := buildOrchestratorCommand(logger, *cfgPath, true)
	if err != nil {
		return err
	}
	history, err := state.New(cfg.State.RunHistoryPath).Load()
	if err != nil {
		return err
	}
	lastRun := "none"
	if n := len(history); n > 0 {
		rec := history[n-1]
		lastRun = fmt.Sprintf("%s success=%t duration=%ds at=%s", rec.Action, rec.Success, rec.DurationSeconds, rec.EndedAt.Format(time.RFC3339))
	}

	snapshot, snapshotPath, readErr := loadStartupStatusSnapshot(cfg.State.Dir)
	if readErr != nil {
		logger.Printf("startup_status_read_error=%v", readErr)
	}
	logger.Printf("expected_flux_branch=%s", cfg.ExpectedFluxBranch)
	logger.Printf("control_planes=%v", cfg.ControlPlanes)
	logger.Printf("estimated_shutdown_budget_seconds=%d", orchestrator.EstimatedShutdownSeconds())
	logger.Printf("estimated_emergency_shutdown_budget_seconds=%d", orchestrator.EstimatedEmergencyShutdownSeconds())

	switch intent, intentErr := readIntentCommand(cfg.State.IntentPath); {
	case intentErr != nil:
		logger.Printf("intent_read_error=%v", intentErr)
	case intent.State == "":
		logger.Printf("intent=none")
	default:
		logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
			intent.State, intent.Reason, intent.Source, intent.UpdatedAt.Format(time.RFC3339))
	}

	if snapshot != nil {
		// An explicit status wins; otherwise derive one from completion/success.
		status := strings.TrimSpace(snapshot.Status)
		if status == "" {
			switch {
			case snapshot.Completed.IsZero():
				status = "running"
			case snapshot.Success:
				status = "success"
			default:
				status = "failed"
			}
		}
		phase := strings.TrimSpace(snapshot.Phase)
		if phase == "" {
			phase = "unknown"
		}
		nPassed, nFailed, nRunning, nOther, failures := summarizeChecklist(snapshot.Checks)

		logger.Printf("startup_status_source=%s", snapshotPath)
		logger.Printf("startup_status=%s", status)
		logger.Printf("startup_phase=%s", phase)
		logger.Printf("startup_reason=%q", strings.TrimSpace(snapshot.Reason))
		logger.Printf("startup_started_at=%s", formatMaybeTime(snapshot.StartedAt))
		logger.Printf("startup_last_updated=%s", formatMaybeTime(snapshot.LastUpdated))
		logger.Printf("startup_completed_at=%s", formatMaybeTime(snapshot.Completed))
		logger.Printf("startup_checklist_total=%d passed=%d failed=%d running=%d other=%d", len(snapshot.Checks), nPassed, nFailed, nRunning, nOther)
		if heals := snapshot.AutoHeals; len(heals) > 0 {
			logger.Printf("startup_auto_heals=%d", len(heals))
			for _, heal := range heals {
				logger.Printf("startup_auto_heal_detail=%s", strings.TrimSpace(heal))
			}
		}
		for _, failure := range failures {
			logger.Printf("startup_failed_check=%s", failure)
		}
		if *asJSON {
			encoded, encErr := json.MarshalIndent(snapshot, "", " ")
			if encErr != nil {
				return fmt.Errorf("encode startup status json: %w", encErr)
			}
			fmt.Println(string(encoded))
		}
	}
	logger.Printf("last_run=%s", lastRun)
	return nil
}
// loadStartupStatusSnapshot reads the startup snapshot from stateDir. It prefers
// the in-progress file (startup-progress.json) over the final report
// (last-startup-report.json).
// Signature: loadStartupStatusSnapshot(stateDir string) (*startupStatusSnapshot, string, error).
// It returns (nil, "", nil) when stateDir is blank or neither file exists. The
// first file that exists decides the outcome: a read or parse failure is
// returned with that file's path, with no fallback to the next candidate. A
// missing "checks" object is normalized to an empty map.
// Why: `ananke status` should expose active checklist progress while startup is
// still running, not only terminal run history.
func loadStartupStatusSnapshot(stateDir string) (*startupStatusSnapshot, string, error) {
	dir := strings.TrimSpace(stateDir)
	if dir == "" {
		return nil, "", nil
	}
	for _, name := range []string{"startup-progress.json", "last-startup-report.json"} {
		path := filepath.Join(dir, name)
		raw, readErr := os.ReadFile(path)
		switch {
		case readErr == nil:
		case os.IsNotExist(readErr):
			continue
		default:
			return nil, path, readErr
		}
		snapshot := &startupStatusSnapshot{}
		if parseErr := json.Unmarshal(raw, snapshot); parseErr != nil {
			return nil, path, fmt.Errorf("parse %s: %w", path, parseErr)
		}
		if snapshot.Checks == nil {
			snapshot.Checks = make(map[string]startupCheckRecord)
		}
		return snapshot, path, nil
	}
	return nil, "", nil
}
// summarizeChecklist tallies checklist entries by status and lists the failures.
// Signature: summarizeChecklist(checks map[string]startupCheckRecord) (int, int, int, int, []string).
// It returns the passed, failed, running and other counts, in that order, plus
// a non-nil slice of "name: detail" strings for failed checks, ordered by check
// name. Statuses are compared after trimming whitespace, and a blank failure
// detail becomes "no detail provided".
// Why: status polling should show checklist completion in a compact,
// operator-friendly form with explicit failed-check details; sorting keeps the
// output stable across polls despite random map iteration order.
func summarizeChecklist(checks map[string]startupCheckRecord) (int, int, int, int, []string) {
	var nPassed, nFailed, nRunning, nOther int
	failures := make([]string, 0)

	ordered := make([]string, 0, len(checks))
	for key := range checks {
		ordered = append(ordered, key)
	}
	sort.Strings(ordered)

	for _, key := range ordered {
		rec := checks[key]
		switch strings.TrimSpace(rec.Status) {
		case "passed":
			nPassed++
		case "running":
			nRunning++
		case "failed":
			nFailed++
			why := strings.TrimSpace(rec.Detail)
			if why == "" {
				why = "no detail provided"
			}
			failures = append(failures, fmt.Sprintf("%s: %s", key, why))
		default:
			nOther++
		}
	}
	return nPassed, nFailed, nRunning, nOther, failures
}
// formatMaybeTime renders value as an RFC 3339 timestamp in UTC, or "none" for
// the zero time.
// Signature: formatMaybeTime(value time.Time) string.
// Why: status output should avoid zero-time noise while still being parseable.
func formatMaybeTime(value time.Time) string {
	if !value.IsZero() {
		return value.UTC().Format(time.RFC3339)
	}
	return "none"
}
// runIntent implements the `intent` subcommand. Without --set it logs the
// persisted intent (or "intent=none" when no state is recorded). With --set
// (whitespace-trimmed) it writes the new intent, but only when --execute is
// also given.
// Signature: runIntent(logger *log.Logger, args []string) error.
// Why: intent drives startup/shutdown coordination, so writes require an
// explicit --execute. Only the config is loaded here; no orchestrator is built.
func runIntent(logger *log.Logger, args []string) error {
	flags := flag.NewFlagSet("intent", flag.ExitOnError)
	cfgPath := flags.String("config", "/etc/ananke/ananke.yaml", "Path to config file")
	requested := flags.String("set", "", "Set intent state (normal|startup_in_progress|shutting_down|shutdown_complete)")
	why := flags.String("reason", "manual-intent", "Intent reason (used with --set)")
	origin := flags.String("source", "manual", "Intent source (used with --set)")
	doWrite := flags.Bool("execute", false, "Actually write intent state (default read-only)")
	// ExitOnError: Parse exits the process itself on invalid flags.
	_ = flags.Parse(args)

	cfg, err := config.Load(*cfgPath)
	if err != nil {
		return err
	}

	target := strings.TrimSpace(*requested)
	if target != "" {
		if !*doWrite {
			return fmt.Errorf("refusing to write intent without --execute")
		}
		if writeErr := writeIntentCommand(cfg.State.IntentPath, target, *why, *origin); writeErr != nil {
			return writeErr
		}
		logger.Printf("intent updated: state=%s reason=%q source=%s", target, *why, *origin)
		return nil
	}

	current, err := readIntentCommand(cfg.State.IntentPath)
	if err != nil {
		return err
	}
	if current.State == "" {
		logger.Printf("intent=none")
		return nil
	}
	logger.Printf("intent=%s reason=%q source=%s updated_at=%s",
		current.State, current.Reason, current.Source, current.UpdatedAt.Format(time.RFC3339))
	return nil
}