package cluster

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/sshutil"
)

// waitForPostStartProbes blocks until every configured post-start probe
// passes, the wait window elapses, or ctx is cancelled. Progress is logged
// whenever the failure reason changes or 30 seconds have passed since the
// last log line. Dry-run mode skips probing entirely.
func (o *Orchestrator) waitForPostStartProbes(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.PostStartProbeWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 240 * time.Second
	}
	poll := time.Duration(o.cfg.Startup.PostStartProbePollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	for {
		ok, failure := o.postStartProbesReady(ctx)
		if ok {
			o.log.Printf("post-start probes passed")
			return nil
		}
		if failure != lastFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for post-start probes (%s remaining): %s", remaining, failure)
			lastLogged = time.Now()
		}
		lastFailure = failure
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: post-start probes did not pass within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// postStartProbesReady runs each configured probe once and reports whether
// all passed, along with a human-readable reason for the first failure.
// An empty probe list counts as ready.
func (o *Orchestrator) postStartProbesReady(ctx context.Context) (bool, string) {
	probes := make([]string, 0, len(o.cfg.Startup.PostStartProbes))
	for _, p := range o.cfg.Startup.PostStartProbes {
		p = strings.TrimSpace(p)
		if p != "" {
			probes = append(probes, p)
		}
	}
	if len(probes) == 0 {
		return true, "no probes configured"
	}
	for _, probe := range probes {
		code, err := o.httpProbe(ctx, probe)
		if err != nil {
			return false, fmt.Sprintf("%s: %v", probe, err)
		}
		if !probeStatusAccepted(probe, code) {
			return false, fmt.Sprintf("%s: unexpected status code=%d", probe, code)
		}
	}
	return true, "all probes successful"
}

// probeStatusAccepted reports whether an HTTP status code counts as a
// passing probe result.
func probeStatusAccepted(_ string, code int) bool {
	if code >= 200 && code < 400 {
		return true
	}
	// Auth fronts often return unauthorized/forbidden while still proving the service is up.
	if code == 401 || code == 403 {
		return true
	}
	return false
}

// httpProbe fetches probeURL via curl, following redirects, and returns the
// final HTTP status code.
func (o *Orchestrator) httpProbe(ctx context.Context, probeURL string) (int, error) {
	out, err := o.run(
		ctx,
		20*time.Second,
		"curl",
		"--silent", "--show-error", "--location",
		"--max-time", "12",
		"--output", "/dev/null",
		"--write-out", "%{http_code}",
		probeURL,
	)
	if err != nil {
		return 0, err
	}
	code, convErr := strconv.Atoi(strings.TrimSpace(out))
	if convErr != nil {
		return 0, fmt.Errorf("parse http status %q: %w", strings.TrimSpace(out), convErr)
	}
	return code, nil
}
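
// Illustrative sketch only (the URL is hypothetical, not part of any config):
// how a single probe flows through the helpers above. curl prints the final
// status via --write-out %{http_code}, and the gate accepts 2xx/3xx plus
// 401/403, since an auth-fronted service that answers at all is considered up.
//
//	code, err := o.httpProbe(ctx, "https://grafana.cluster.internal/healthz")
//	if err == nil && probeStatusAccepted("https://grafana.cluster.internal/healthz", code) {
//		// probe passes: e.g. code == 302 (redirect to login) or 401 (auth front)
//	}
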
// resumeFluxAndReconcile unsuspends all Flux objects, then requests an
// immediate reconcile by annotating every Kustomization and HelmRelease
// with a fresh requestedAt timestamp and, when the flux CLI is available,
// reconciling the flux-system git source. Annotation and CLI failures are
// logged as warnings rather than aborting startup.
func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
	if err := o.patchFluxSuspendAll(ctx, false); err != nil {
		return err
	}
	now := time.Now().UTC().Format(time.RFC3339)
	if _, err := o.kubectl(
		ctx,
		25*time.Second,
		"-n", "flux-system",
		"annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all",
		"reconcile.fluxcd.io/requestedAt="+now,
		"--overwrite",
	); err != nil {
		o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err)
	}
	if _, err := o.kubectl(
		ctx,
		25*time.Second,
		"annotate", "--all-namespaces", "helmreleases.helm.toolkit.fluxcd.io", "--all",
		"reconcile.fluxcd.io/requestedAt="+now,
		"--overwrite",
	); err != nil {
		o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err)
	}
	if o.runner.CommandExists("flux") {
		sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"}
		if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil {
			o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err)
		}
	}
	return nil
}

// kubectl runs a kubectl command with the given timeout.
func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) {
	return o.run(ctx, timeout, "kubectl", args...)
}

// ssh runs a command on a node with the default 45-second timeout.
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
	return o.sshWithTimeout(ctx, node, command, 45*time.Second)
}
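
// For orientation, resumeFluxAndReconcile above is roughly equivalent to the
// following shell session (a sketch, not an exact transcript; $NOW stands in
// for the RFC3339 timestamp the code computes):
//
//	NOW=$(date -u +%Y-%m-%dT%H:%M:%SZ)
//	kubectl -n flux-system annotate kustomizations.kustomize.toolkit.fluxcd.io --all \
//		"reconcile.fluxcd.io/requestedAt=$NOW" --overwrite
//	kubectl annotate --all-namespaces helmreleases.helm.toolkit.fluxcd.io --all \
//		"reconcile.fluxcd.io/requestedAt=$NOW" --overwrite
//	flux reconcile source git flux-system -n flux-system --timeout=60s
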
// sshWithTimeout runs a command on a node over SSH. It resolves per-node
// host and user overrides, tries the configured jump host first (when set)
// with a direct connection as fallback, and retries once after repairing
// known_hosts when a failure looks host-key related.
func (o *Orchestrator) sshWithTimeout(ctx context.Context, node string, command string, timeout time.Duration) (string, error) {
	host := node
	if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
		host = strings.TrimSpace(mapped)
	}
	sshUser := o.cfg.SSHUser
	if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
		sshUser = strings.TrimSpace(override)
	}
	target := host
	if sshUser != "" {
		target = sshUser + "@" + host
	}
	sshConfigFile := o.resolveSSHConfigFile()
	sshIdentity := o.resolveSSHIdentityFile()
	baseArgs := []string{
		"-o", "BatchMode=yes",
		"-o", "ConnectTimeout=8",
		"-o", "StrictHostKeyChecking=accept-new",
	}
	if sshConfigFile != "" {
		baseArgs = append(baseArgs, "-F", sshConfigFile)
	}
	if sshIdentity != "" {
		baseArgs = append(baseArgs, "-i", sshIdentity)
	}
	if o.cfg.SSHPort > 0 {
		baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
	}
	attempts := make([][]string, 0, 2)
	attemptNames := make([]string, 0, 2)
	knownHostsFiles := sshutil.KnownHostsFiles(sshConfigFile, sshIdentity)
	repairHosts := []string{node, host}
	if o.cfg.SSHJumpHost != "" {
		jump := o.cfg.SSHJumpHost
		repairHosts = append(repairHosts, jump)
		if mapped, ok := o.cfg.SSHNodeHosts[jump]; ok && strings.TrimSpace(mapped) != "" {
			repairHosts = append(repairHosts, strings.TrimSpace(mapped))
		}
		if o.cfg.SSHJumpUser != "" {
			jump = o.cfg.SSHJumpUser + "@" + jump
		}
		if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
			jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
		}
		withJump := append([]string{}, baseArgs...)
		withJump = append(withJump, "-J", jump, target, command)
		attempts = append(attempts, withJump)
		attemptNames = append(attemptNames, "jump")
	}
	direct := append([]string{}, baseArgs...)
	direct = append(direct, target, command)
	attempts = append(attempts, direct)
	attemptNames = append(attemptNames, "direct")
	var lastOut string
	var lastErr error
	for i, args := range attempts {
		out, err := o.run(ctx, timeout, "ssh", args...)
		if err == nil {
			if i > 0 {
				o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
			}
			return out, nil
		}
		if sshutil.ShouldAttemptKnownHostsRepair(out, err) {
			o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i])
			sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort)
			retryOut, retryErr := o.run(ctx, timeout, "ssh", args...)
			if retryErr == nil {
				return retryOut, nil
			}
			out = retryOut
			err = retryErr
		}
		lastOut = out
		lastErr = err
		if i < len(attempts)-1 {
			o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
		}
	}
	return lastOut, lastErr
}

// run executes an external command with a per-call timeout, honoring the
// override hook (used by tests) when set.
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	if o.runOverride != nil {
		return o.runOverride(ctx, timeout, name, args...)
	}
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return o.runner.Run(runCtx, name, args...)
}
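
// A minimal sketch of how callers compose per-command timeouts with an
// outer deadline (names and durations here are hypothetical, not part of
// this package's API):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) // whole startup budget
//	defer cancel()
//	out, err := o.run(ctx, 25*time.Second, "kubectl", "get", "nodes") // per-command cap
//	// whichever deadline fires first cancels the command
//	_ = out
//	_ = err
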
// runSensitive executes a command directly via os/exec, bypassing the shared
// runner (and any argument logging it performs), so secrets in args stay out
// of logs. Combined output is returned trimmed; on failure the trimmed
// output accompanies the error.
func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	if o.runSensitiveOverride != nil {
		return o.runSensitiveOverride(ctx, timeout, name, args...)
	}
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	cmd := exec.CommandContext(runCtx, name, args...)
	cmd.Env = os.Environ()
	if o.runner.Kubeconfig != "" {
		cmd.Env = append(cmd.Env, "KUBECONFIG="+o.runner.Kubeconfig)
	}
	out, err := cmd.CombinedOutput()
	trimmed := strings.TrimSpace(string(out))
	if err != nil {
		if trimmed == "" {
			return "", fmt.Errorf("%s failed: %w", name, err)
		}
		return trimmed, fmt.Errorf("%s failed: %w", name, err)
	}
	return trimmed, nil
}

// lines splits command output into trimmed, non-empty lines.
func lines(in string) []string {
	in = strings.TrimSpace(in)
	if in == "" {
		return nil
	}
	parts := strings.Split(in, "\n")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		v := strings.TrimSpace(p)
		if v != "" {
			out = append(out, v)
		}
	}
	return out
}

// sshManaged reports whether a node may be managed over SSH. An empty
// allowlist means every node is managed.
func (o *Orchestrator) sshManaged(node string) bool {
	if len(o.cfg.SSHManagedNodes) == 0 {
		return true
	}
	for _, allowed := range o.cfg.SSHManagedNodes {
		if strings.TrimSpace(allowed) == node {
			return true
		}
	}
	return false
}

// resolveSSHConfigFile returns the configured SSH config path, falling back
// to well-known per-user locations when none is set.
func (o *Orchestrator) resolveSSHConfigFile() string {
	if strings.TrimSpace(o.cfg.SSHConfigFile) != "" {
		return strings.TrimSpace(o.cfg.SSHConfigFile)
	}
	candidates := []string{
		"/home/atlas/.ssh/config",
		"/home/tethys/.ssh/config",
	}
	for _, p := range candidates {
		if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
			return p
		}
	}
	return ""
}

// resolveSSHIdentityFile returns the configured SSH identity path, falling
// back to well-known per-user locations when none is set.
func (o *Orchestrator) resolveSSHIdentityFile() string {
	if strings.TrimSpace(o.cfg.SSHIdentityFile) != "" {
		return strings.TrimSpace(o.cfg.SSHIdentityFile)
	}
	candidates := []string{
		"/home/atlas/.ssh/id_ed25519",
		"/home/tethys/.ssh/id_ed25519",
	}
	for _, p := range candidates {
		if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
			return p
		}
	}
	return ""
}
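
// For orientation, the two argv shapes sshWithTimeout can build (hosts,
// users, ports, and file paths below are illustrative only; real values come
// from o.cfg and the resolvers above):
//
//	ssh -o BatchMode=yes -o ConnectTimeout=8 -o StrictHostKeyChecking=accept-new \
//		-F /home/atlas/.ssh/config -i /home/atlas/.ssh/id_ed25519 -p 22 \
//		-J ops@bastion.example:22 admin@node1.example "uptime"   # jump path, tried first
//	ssh -o BatchMode=yes -o ConnectTimeout=8 -o StrictHostKeyChecking=accept-new \
//		-F /home/atlas/.ssh/config -i /home/atlas/.ssh/id_ed25519 -p 22 \
//		admin@node1.example "uptime"                             # direct fallback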