464 lines
16 KiB
Go
464 lines
16 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// reconcileNodeAccess runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error {
|
|
if len(nodes) == 0 {
|
|
return nil
|
|
}
|
|
parallelism := o.cfg.Shutdown.SSHParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
if parallelism > len(nodes) {
|
|
parallelism = len(nodes)
|
|
}
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan error, len(nodes))
|
|
for _, node := range nodes {
|
|
node := strings.TrimSpace(node)
|
|
if node == "" || !o.sshManaged(node) {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
if _, err := o.ssh(ctx, node, "sudo -n /usr/bin/systemctl --version"); err != nil {
|
|
errCh <- fmt.Errorf("%s: missing sudo access to /usr/bin/systemctl (--version): %w", node, err)
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
close(errCh)
|
|
if len(errCh) == 0 {
|
|
return nil
|
|
}
|
|
samples := []string{}
|
|
for err := range errCh {
|
|
samples = append(samples, err.Error())
|
|
if len(samples) >= 4 {
|
|
break
|
|
}
|
|
}
|
|
return fmt.Errorf("access validation had %d errors (first: %s)", len(errCh), strings.Join(samples, " | "))
|
|
}
|
|
|
|
// waitForNodeSSHAuth runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error {
|
|
if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth {
|
|
return nil
|
|
}
|
|
|
|
wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second
|
|
if wait <= 0 {
|
|
wait = 4 * time.Minute
|
|
}
|
|
poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second
|
|
if poll <= 0 {
|
|
poll = 5 * time.Second
|
|
}
|
|
ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
|
|
seen := map[string]struct{}{}
|
|
targets := make([]string, 0, len(nodes))
|
|
for _, node := range nodes {
|
|
node = strings.TrimSpace(node)
|
|
if node == "" {
|
|
continue
|
|
}
|
|
if _, skip := ignored[node]; skip {
|
|
continue
|
|
}
|
|
if _, ok := seen[node]; ok {
|
|
continue
|
|
}
|
|
seen[node] = struct{}{}
|
|
targets = append(targets, node)
|
|
}
|
|
sort.Strings(targets)
|
|
if len(targets) == 0 {
|
|
return nil
|
|
}
|
|
|
|
deadline := time.Now().Add(wait)
|
|
lastFailure := "unknown"
|
|
lastLogged := time.Time{}
|
|
for {
|
|
authDenied := []string{}
|
|
pending := []string{}
|
|
for _, node := range targets {
|
|
if !o.sshManaged(node) {
|
|
authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
|
|
continue
|
|
}
|
|
out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second)
|
|
if err != nil {
|
|
detail := strings.TrimSpace(err.Error())
|
|
full := strings.ToLower(strings.TrimSpace(detail + " " + out))
|
|
switch {
|
|
case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"):
|
|
authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
|
|
default:
|
|
pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
|
|
}
|
|
continue
|
|
}
|
|
if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") {
|
|
pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
|
|
}
|
|
}
|
|
if len(authDenied) > 0 {
|
|
sort.Strings(authDenied)
|
|
return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8))
|
|
}
|
|
if len(pending) == 0 {
|
|
return nil
|
|
}
|
|
|
|
sort.Strings(pending)
|
|
lastFailure = joinLimited(pending, 8)
|
|
if time.Now().After(deadline) {
|
|
return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure)
|
|
}
|
|
if time.Since(lastLogged) >= 30*time.Second {
|
|
remaining := time.Until(deadline).Round(time.Second)
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure)
|
|
lastLogged = time.Now()
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(poll):
|
|
}
|
|
}
|
|
}
|
|
|
|
// fluxSourceReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
|
|
out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return strings.Contains(out, "True"), nil
|
|
}
|
|
|
|
// reportFluxSource runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
|
|
urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}")
|
|
if urlErr == nil {
|
|
currentURL := strings.TrimSpace(urlOut)
|
|
o.log.Printf("flux-source-url=%s", currentURL)
|
|
expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource)
|
|
if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) {
|
|
o.log.Printf("warning: flux source URL is %q, expected %q", currentURL, expectedURL)
|
|
}
|
|
}
|
|
branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
|
|
if branchErr == nil {
|
|
branch := strings.TrimSpace(branchOut)
|
|
o.log.Printf("flux-source-branch=%s", branch)
|
|
if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
|
|
o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
|
|
}
|
|
}
|
|
}
|
|
|
|
// guardFluxSourceDrift runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error {
|
|
urlOut, err := o.kubectl(
|
|
ctx,
|
|
10*time.Second,
|
|
"-n", "flux-system",
|
|
"get", "gitrepository", "flux-system",
|
|
"-o", "jsonpath={.spec.url}",
|
|
)
|
|
if err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: flux gitrepository/flux-system not found while checking source drift")
|
|
return nil
|
|
}
|
|
return fmt.Errorf("read flux source url: %w", err)
|
|
}
|
|
currentURL := strings.TrimSpace(urlOut)
|
|
expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource)
|
|
if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) {
|
|
return fmt.Errorf("startup blocked: flux source url drift detected (current=%q expected=%q)", currentURL, expectedURL)
|
|
}
|
|
|
|
branchOut, err := o.kubectl(
|
|
ctx,
|
|
10*time.Second,
|
|
"-n", "flux-system",
|
|
"get", "gitrepository", "flux-system",
|
|
"-o", "jsonpath={.spec.ref.branch}",
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("read flux source branch: %w", err)
|
|
}
|
|
currentBranch := strings.TrimSpace(branchOut)
|
|
if expectedBranch == "" || currentBranch == expectedBranch || allowBranchPatch {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("startup blocked: flux source branch drift detected (current=%q expected=%q)", currentBranch, expectedBranch)
|
|
}
|
|
|
|
// ensureFluxBranch runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error {
|
|
branch = strings.TrimSpace(branch)
|
|
if branch == "" {
|
|
return nil
|
|
}
|
|
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
10*time.Second,
|
|
"-n", "flux-system",
|
|
"get", "gitrepository", "flux-system",
|
|
"-o", "jsonpath={.spec.ref.branch}",
|
|
)
|
|
if err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch)
|
|
return nil
|
|
}
|
|
return fmt.Errorf("read flux source branch: %w", err)
|
|
}
|
|
current := strings.TrimSpace(out)
|
|
if current == branch {
|
|
return nil
|
|
}
|
|
if !allowPatch {
|
|
return fmt.Errorf("startup blocked: flux source branch is %q but expected %q (use --force-flux-branch to patch intentionally)", current, branch)
|
|
}
|
|
patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, branch)
|
|
if _, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"-n", "flux-system",
|
|
"patch", "gitrepository", "flux-system",
|
|
"--type=merge",
|
|
"-p", patch,
|
|
); err != nil {
|
|
return fmt.Errorf("set flux source branch %q (current %q): %w", branch, current, err)
|
|
}
|
|
o.log.Printf("updated flux source branch from %q to %q", current, branch)
|
|
return nil
|
|
}
|
|
|
|
// normalizeGitURL runs one orchestration or CLI step.
|
|
// Signature: normalizeGitURL(raw string) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func normalizeGitURL(raw string) string {
|
|
raw = strings.TrimSpace(strings.ToLower(raw))
|
|
raw = strings.TrimSuffix(raw, "/")
|
|
raw = strings.TrimSuffix(raw, ".git")
|
|
return raw
|
|
}
|
|
|
|
// bootstrapLocal runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) bootstrapLocal(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
|
|
failures := 0
|
|
successes := 0
|
|
for _, rel := range o.cfg.LocalBootstrapPaths {
|
|
full := filepath.Join(o.cfg.IACRepoPath, rel)
|
|
o.log.Printf("local bootstrap apply rel=%s path=%s", rel, full)
|
|
if o.runner.DryRun {
|
|
successes++
|
|
continue
|
|
}
|
|
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
|
|
o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
|
|
o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
|
|
if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
|
|
o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
|
|
o.log.Printf("local bootstrap cache apply for rel=%s", rel)
|
|
if cacheErr := o.applyBootstrapCache(ctx, rel); cacheErr != nil {
|
|
failures++
|
|
o.log.Printf("warning: local bootstrap cache apply failed for rel=%s: %v", rel, cacheErr)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
successes++
|
|
}
|
|
if failures > 0 && successes == 0 {
|
|
return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// applyKustomizeFallback runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
|
|
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
|
|
if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// syncLocalIACRepo runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error {
|
|
repo := strings.TrimSpace(o.cfg.IACRepoPath)
|
|
if repo == "" {
|
|
return fmt.Errorf("iac repo path is empty")
|
|
}
|
|
gitDir := filepath.Join(repo, ".git")
|
|
if stat, err := os.Stat(gitDir); err != nil || !stat.IsDir() {
|
|
return fmt.Errorf("iac repo %s is not a git checkout", repo)
|
|
}
|
|
statusOut, statusErr := o.runSensitive(ctx, 10*time.Second, "git", "-C", repo, "status", "--porcelain")
|
|
if statusErr != nil {
|
|
return fmt.Errorf("inspect iac repo working tree: %w", statusErr)
|
|
}
|
|
if strings.TrimSpace(statusOut) != "" {
|
|
o.log.Printf("warning: skipping local titan-iac sync because working tree is dirty")
|
|
return nil
|
|
}
|
|
branch := strings.TrimSpace(o.cfg.ExpectedFluxBranch)
|
|
if branch == "" {
|
|
branch = "main"
|
|
}
|
|
if _, err := o.runSensitive(ctx, 45*time.Second, "git", "-C", repo, "fetch", "origin", "--prune"); err != nil {
|
|
return fmt.Errorf("git fetch origin: %w", err)
|
|
}
|
|
if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "checkout", branch); err != nil {
|
|
return fmt.Errorf("git checkout %s: %w", branch, err)
|
|
}
|
|
if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "reset", "--hard", "origin/"+branch); err != nil {
|
|
return fmt.Errorf("git reset --hard origin/%s: %w", branch, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// refreshBootstrapCache runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error {
|
|
if len(o.cfg.LocalBootstrapPaths) == 0 {
|
|
return nil
|
|
}
|
|
if err := os.MkdirAll(o.bootstrapCacheDir(), 0o755); err != nil {
|
|
return fmt.Errorf("ensure bootstrap cache dir: %w", err)
|
|
}
|
|
rendered := 0
|
|
for _, rel := range o.cfg.LocalBootstrapPaths {
|
|
rel = strings.TrimSpace(rel)
|
|
if rel == "" {
|
|
continue
|
|
}
|
|
full := filepath.Join(o.cfg.IACRepoPath, rel)
|
|
if stat, err := os.Stat(full); err != nil || !stat.IsDir() {
|
|
o.log.Printf("warning: skip bootstrap cache render for rel=%s (path missing)", rel)
|
|
continue
|
|
}
|
|
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q", full)
|
|
manifest, err := o.runSensitive(ctx, 2*time.Minute, "sh", "-lc", cmd)
|
|
if err != nil {
|
|
o.log.Printf("warning: bootstrap cache render failed for rel=%s: %v", rel, err)
|
|
continue
|
|
}
|
|
cachePath := o.bootstrapCachePath(rel)
|
|
if err := os.WriteFile(cachePath, []byte(manifest+"\n"), 0o644); err != nil {
|
|
o.log.Printf("warning: bootstrap cache write failed for rel=%s path=%s: %v", rel, cachePath, err)
|
|
continue
|
|
}
|
|
rendered++
|
|
}
|
|
if rendered == 0 {
|
|
return fmt.Errorf("no bootstrap cache manifests rendered")
|
|
}
|
|
o.log.Printf("bootstrap cache refreshed (%d paths)", rendered)
|
|
return nil
|
|
}
|
|
|
|
// applyBootstrapCache runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error {
|
|
cachePath := o.bootstrapCachePath(rel)
|
|
if _, err := os.Stat(cachePath); err != nil {
|
|
return fmt.Errorf("bootstrap cache missing at %s: %w", cachePath, err)
|
|
}
|
|
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-f", cachePath); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// bootstrapCacheDir runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) bootstrapCacheDir() string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) bootstrapCacheDir() string {
|
|
return filepath.Join(o.cfg.State.Dir, "bootstrap-cache")
|
|
}
|
|
|
|
// bootstrapCachePath runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) bootstrapCachePath(rel string) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) bootstrapCachePath(rel string) string {
|
|
safe := strings.TrimSpace(rel)
|
|
safe = strings.ReplaceAll(safe, "/", "__")
|
|
safe = strings.ReplaceAll(safe, string(os.PathSeparator), "__")
|
|
return filepath.Join(o.bootstrapCacheDir(), safe+".yaml")
|
|
}
|
|
|
|
// waitForFluxSourceReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
|
|
if o.runner.DryRun {
|
|
return true, nil
|
|
}
|
|
deadline := time.Now().Add(window)
|
|
for {
|
|
ready, err := o.fluxSourceReady(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if ready {
|
|
return true, nil
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return false, nil
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|