ananke/internal/cluster/orchestrator_poststart_exec.go

382 lines
12 KiB
Go

package cluster
import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/sshutil"
)
// waitForPostStartProbes blocks until every configured post-start probe
// reports ready, the wait budget is exhausted, or ctx is cancelled.
// Returns nil on success (or in dry-run mode); otherwise an error naming
// the last failing probe.
func (o *Orchestrator) waitForPostStartProbes(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	// Fall back to defaults when the configured durations are zero or negative.
	totalWait := time.Duration(o.cfg.Startup.PostStartProbeWaitSeconds) * time.Second
	if totalWait <= 0 {
		totalWait = 240 * time.Second
	}
	interval := time.Duration(o.cfg.Startup.PostStartProbePollSeconds) * time.Second
	if interval <= 0 {
		interval = 5 * time.Second
	}
	deadline := time.Now().Add(totalWait)
	prevFailure := "unknown"
	var prevLogAt time.Time
	for {
		ready, failure := o.postStartProbesReady(ctx)
		if ready {
			o.log.Printf("post-start probes passed")
			return nil
		}
		// Log whenever the failure reason changes, but rate-limit repeats to
		// one message per 30s so long waits do not flood the log.
		if failure != prevFailure || time.Since(prevLogAt) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for post-start probes (%s remaining): %s", remaining, failure)
			prevLogAt = time.Now()
		}
		prevFailure = failure
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: post-start probes did not pass within %s (%s)", totalWait, prevFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}
// postStartProbesReady runs each configured post-start probe once and reports
// whether all passed, plus a human-readable status string (the first failure,
// or a success message). An empty probe list counts as ready.
func (o *Orchestrator) postStartProbesReady(ctx context.Context) (bool, string) {
	var targets []string
	for _, raw := range o.cfg.Startup.PostStartProbes {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			targets = append(targets, trimmed)
		}
	}
	if len(targets) == 0 {
		return true, "no probes configured"
	}
	for _, target := range targets {
		status, err := o.httpProbe(ctx, target)
		switch {
		case err != nil:
			return false, fmt.Sprintf("%s: %v", target, err)
		case !probeStatusAccepted(target, status):
			return false, fmt.Sprintf("%s: unexpected status code=%d", target, status)
		}
	}
	return true, "all probes successful"
}
// probeStatusAccepted reports whether an HTTP status code counts as a passing
// probe result. The probe URL parameter is currently unused but kept for
// per-probe policies.
func probeStatusAccepted(_ string, code int) bool {
	switch {
	case code >= 200 && code < 400:
		// Success and redirect classes both prove the endpoint is serving.
		return true
	case code == 401, code == 403:
		// Auth fronts often return unauthorized/forbidden while still proving the service is up.
		return true
	default:
		return false
	}
}
// httpProbe fetches probeURL via curl (following redirects, discarding the
// body) and returns the final HTTP status code. The curl invocation has its
// own 12s transfer cap inside a 20s overall command timeout.
func (o *Orchestrator) httpProbe(ctx context.Context, probeURL string) (int, error) {
	curlArgs := []string{
		"--silent",
		"--show-error",
		"--location",
		"--max-time", "12",
		"--output", "/dev/null",
		"--write-out", "%{http_code}",
		probeURL,
	}
	raw, err := o.run(ctx, 20*time.Second, "curl", curlArgs...)
	if err != nil {
		return 0, err
	}
	statusText := strings.TrimSpace(raw)
	status, parseErr := strconv.Atoi(statusText)
	if parseErr != nil {
		return 0, fmt.Errorf("parse http status %q: %w", statusText, parseErr)
	}
	return status, nil
}
// resumeFluxAndReconcile unsuspends all Flux resources and then best-effort
// nudges Kustomizations, HelmReleases, and (when the flux CLI exists) the
// flux-system git source to reconcile now. Only the unsuspend step can fail
// the call; the reconcile nudges merely log warnings.
func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
	if err := o.patchFluxSuspendAll(ctx, false); err != nil {
		return err
	}
	requestedAt := time.Now().UTC().Format(time.RFC3339)
	annotation := "reconcile.fluxcd.io/requestedAt=" + requestedAt
	// Nudge every Kustomization in flux-system to reconcile immediately.
	kustomizeArgs := []string{
		"-n", "flux-system",
		"annotate",
		"kustomizations.kustomize.toolkit.fluxcd.io",
		"--all",
		annotation,
		"--overwrite",
	}
	if _, err := o.kubectl(ctx, 25*time.Second, kustomizeArgs...); err != nil {
		o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err)
	}
	// Likewise nudge every HelmRelease across all namespaces.
	helmArgs := []string{
		"annotate",
		"--all-namespaces",
		"helmreleases.helm.toolkit.fluxcd.io",
		"--all",
		annotation,
		"--overwrite",
	}
	if _, err := o.kubectl(ctx, 25*time.Second, helmArgs...); err != nil {
		o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err)
	}
	// With the flux CLI available, also force a git source refresh.
	if o.runner.CommandExists("flux") {
		sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"}
		if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil {
			o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err)
		}
	}
	return nil
}
// kubectl invokes the kubectl binary through the shared runner with the
// given timeout and arguments, returning the command's output and error.
func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) {
	const bin = "kubectl"
	return o.run(ctx, timeout, bin, args...)
}
// ssh runs command on node over ssh using the default 45-second timeout.
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
	const defaultSSHTimeout = 45 * time.Second
	return o.sshWithTimeout(ctx, node, command, defaultSSHTimeout)
}
// sshWithTimeout runs command on node over ssh, honoring per-node host/user
// overrides from config. It tries a jump-host path first (when configured),
// then a direct connection; each failed attempt that looks host-key related
// gets one known_hosts repair and retry. Returns the output of the first
// successful attempt, or the last attempt's output and error.
func (o *Orchestrator) sshWithTimeout(ctx context.Context, node string, command string, timeout time.Duration) (string, error) {
	// Resolve the dial address: a per-node host mapping overrides the node name.
	host := node
	if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
		host = strings.TrimSpace(mapped)
	}
	// Resolve the login user: a per-node override beats the global SSHUser.
	sshUser := o.cfg.SSHUser
	if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
		sshUser = strings.TrimSpace(override)
	}
	target := host
	if sshUser != "" {
		target = sshUser + "@" + host
	}
	sshConfigFile := o.resolveSSHConfigFile()
	sshIdentity := o.resolveSSHIdentityFile()
	// Non-interactive flags: fail fast instead of prompting, and auto-accept
	// first-seen host keys (changed keys still fail, triggering repair below).
	baseArgs := []string{
		"-o", "BatchMode=yes",
		"-o", "ConnectTimeout=8",
		"-o", "StrictHostKeyChecking=accept-new",
	}
	if sshConfigFile != "" {
		baseArgs = append(baseArgs, "-F", sshConfigFile)
	}
	if sshIdentity != "" {
		baseArgs = append(baseArgs, "-i", sshIdentity)
	}
	if o.cfg.SSHPort > 0 {
		baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
	}
	// Build the ordered attempt list: jump path first (if configured), then direct.
	attempts := make([][]string, 0, 2)
	attemptNames := make([]string, 0, 2)
	knownHostsFiles := sshutil.KnownHostsFiles(sshConfigFile, sshIdentity)
	// Hosts whose known_hosts entries may be repaired on a suspected key mismatch.
	repairHosts := []string{node, host}
	if o.cfg.SSHJumpHost != "" {
		jump := o.cfg.SSHJumpHost
		repairHosts = append(repairHosts, jump)
		if mapped, ok := o.cfg.SSHNodeHosts[jump]; ok && strings.TrimSpace(mapped) != "" {
			repairHosts = append(repairHosts, strings.TrimSpace(mapped))
		}
		if o.cfg.SSHJumpUser != "" {
			jump = o.cfg.SSHJumpUser + "@" + jump
		}
		// -J accepts host:port; only append the port when none is present yet.
		if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
			jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
		}
		withJump := append([]string{}, baseArgs...)
		withJump = append(withJump, "-J", jump, target, command)
		attempts = append(attempts, withJump)
		attemptNames = append(attemptNames, "jump")
	}
	direct := append([]string{}, baseArgs...)
	direct = append(direct, target, command)
	attempts = append(attempts, direct)
	attemptNames = append(attemptNames, "direct")
	var lastOut string
	var lastErr error
	for i, args := range attempts {
		out, err := o.run(ctx, timeout, "ssh", args...)
		if err == nil {
			if i > 0 {
				// Note when the preferred path failed but a later path worked.
				o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
			}
			return out, nil
		}
		// A host-key-looking failure gets one repair-and-retry on the same
		// attempt path before falling through to the next path.
		if sshutil.ShouldAttemptKnownHostsRepair(out, err) {
			o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i])
			sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort)
			retryOut, retryErr := o.run(ctx, timeout, "ssh", args...)
			if retryErr == nil {
				return retryOut, nil
			}
			// Carry the retry's output/error forward as this attempt's result.
			out = retryOut
			err = retryErr
		}
		lastOut = out
		lastErr = err
		if i < len(attempts)-1 {
			o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
		}
	}
	return lastOut, lastErr
}
// run executes name with args under a per-call timeout, delegating to the
// test override when one is installed, otherwise to the shared runner.
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	if override := o.runOverride; override != nil {
		return override(ctx, timeout, name, args...)
	}
	bounded, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return o.runner.Run(bounded, name, args...)
}
// runSensitive executes name with args directly via os/exec (bypassing the
// shared runner, e.g. so arguments are not echoed by it), injecting
// KUBECONFIG when configured. Returns trimmed combined output; on failure the
// output (possibly empty) is returned alongside a wrapped error.
func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	if o.runSensitiveOverride != nil {
		return o.runSensitiveOverride(ctx, timeout, name, args...)
	}
	bounded, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	cmd := exec.CommandContext(bounded, name, args...)
	env := os.Environ()
	if o.runner.Kubeconfig != "" {
		env = append(env, "KUBECONFIG="+o.runner.Kubeconfig)
	}
	cmd.Env = env
	combined, runErr := cmd.CombinedOutput()
	output := strings.TrimSpace(string(combined))
	if runErr != nil {
		// When output is empty this still returns "", matching callers that
		// only inspect the error.
		return output, fmt.Errorf("%s failed: %w", name, runErr)
	}
	return output, nil
}
// lines splits in on newlines, trims whitespace from each line, and returns
// the non-empty results. Returns nil when the input contains no content.
func lines(in string) []string {
	var out []string
	for _, raw := range strings.Split(strings.TrimSpace(in), "\n") {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			out = append(out, trimmed)
		}
	}
	return out
}
// sshManaged reports whether node is in the SSHManagedNodes allow-list.
// An empty list means every node is managed.
func (o *Orchestrator) sshManaged(node string) bool {
	managed := o.cfg.SSHManagedNodes
	if len(managed) == 0 {
		return true
	}
	for i := range managed {
		if strings.TrimSpace(managed[i]) == node {
			return true
		}
	}
	return false
}
// resolveSSHConfigFile returns the configured ssh config file path, or falls
// back to the first well-known candidate path that exists as a regular file.
// Returns "" when nothing is configured and no candidate exists.
func (o *Orchestrator) resolveSSHConfigFile() string {
	if v := strings.TrimSpace(o.cfg.SSHConfigFile); v != "" {
		return v
	}
	return firstRegularFile(
		"/home/atlas/.ssh/config",
		"/home/tethys/.ssh/config",
	)
}

// resolveSSHIdentityFile returns the configured ssh identity file path, or
// falls back to the first well-known candidate path that exists as a regular
// file. Returns "" when nothing is configured and no candidate exists.
func (o *Orchestrator) resolveSSHIdentityFile() string {
	if v := strings.TrimSpace(o.cfg.SSHIdentityFile); v != "" {
		return v
	}
	return firstRegularFile(
		"/home/atlas/.ssh/id_ed25519",
		"/home/tethys/.ssh/id_ed25519",
	)
}

// firstRegularFile returns the first candidate path that stats successfully
// and is not a directory, or "" when none qualifies. Shared by the ssh
// config/identity resolvers to avoid duplicating the probe loop.
func firstRegularFile(candidates ...string) string {
	for _, p := range candidates {
		if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
			return p
		}
	}
	return ""
}