package cluster

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"
)

// reconcileNodeAccess verifies, concurrently, that every SSH-managed node in
// nodes grants passwordless sudo for /usr/bin/systemctl (probed via
// "sudo -n ... --version"). Blank entries and nodes not covered by
// o.sshManaged are skipped. Returns nil when all probed nodes pass;
// otherwise a single error summarizing the total failure count and up to
// four sample messages.
func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error {
	if len(nodes) == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.SSHParallelism
	if parallelism <= 0 {
		parallelism = 8 // default when the config leaves it unset
	}
	if parallelism > len(nodes) {
		parallelism = len(nodes)
	}
	sem := make(chan struct{}, parallelism) // bounds concurrent SSH sessions
	var wg sync.WaitGroup
	errCh := make(chan error, len(nodes)) // buffered: one slot per potential failure
	for _, node := range nodes {
		node := strings.TrimSpace(node) // shadow so each goroutine sees its own value
		if node == "" || !o.sshManaged(node) {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if _, err := o.ssh(ctx, node, "sudo -n /usr/bin/systemctl --version"); err != nil {
				errCh <- fmt.Errorf("%s: missing sudo access to /usr/bin/systemctl (--version): %w", node, err)
			}
		}()
	}
	wg.Wait()
	close(errCh)
	// Capture the total BEFORE draining samples: the previous code read
	// len(errCh) after consuming up to four errors, which understated the
	// count (it reported 0 whenever there were <= 4 failures).
	total := len(errCh)
	if total == 0 {
		return nil
	}
	samples := make([]string, 0, 4)
	for err := range errCh {
		samples = append(samples, err.Error())
		if len(samples) >= 4 {
			break
		}
	}
	return fmt.Errorf("access validation had %d errors (first: %s)", total, strings.Join(samples, " | "))
}

// waitForNodeSSHAuth gates startup on SSH authentication to the given nodes:
// it polls each target until an echo probe succeeds everywhere, fails fast
// when any node denies authentication outright, and times out per the
// Startup.NodeSSHAuthWaitSeconds/PollSeconds configuration.
func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error {
	// Gate is a no-op in dry-run mode or when explicitly disabled by config.
	if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth {
		return nil
	}
	// Overall wait window and poll interval, with defensive defaults when
	// the configured values are zero or negative.
	wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 4 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	// Build the deduplicated, sorted target list, dropping blanks and nodes
	// the operator marked as ignorable when unavailable.
	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	seen := map[string]struct{}{}
	targets := make([]string, 0, len(nodes))
	for _, node := range nodes {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, skip := ignored[node]; skip {
			continue
		}
		if _, ok := seen[node]; ok {
			continue
		}
		seen[node] = struct{}{}
		targets = append(targets, node)
	}
	sort.Strings(targets)
	if len(targets) == 0 {
		return nil
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{} // zero value => first progress log fires immediately
	for {
		// Each round classifies every target as OK, hard auth failure, or
		// still pending (unreachable / unexpected output).
		authDenied := []string{}
		pending := []string{}
		for _, node := range targets {
			if !o.sshManaged(node) {
				// Not in the managed set: treated as a hard failure, since
				// polling will never fix it.
				authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
				continue
			}
			out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second)
			if err != nil {
				// Inspect both the error text and command output (lowercased)
				// to distinguish auth rejection from plain unreachability.
				detail := strings.TrimSpace(err.Error())
				full := strings.ToLower(strings.TrimSpace(detail + " " + out))
				switch {
				case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"):
					authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
				default:
					pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
				}
				continue
			}
			if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") {
				pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
			}
		}
		// Auth denials are terminal: retrying will not help, so fail now.
		if len(authDenied) > 0 {
			sort.Strings(authDenied)
			return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8))
		}
		if len(pending) == 0 {
			return nil // every target answered the probe
		}
		sort.Strings(pending)
		lastFailure = joinLimited(pending, 8)
		if time.Now().After(deadline) {
			return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure)
		}
		// Throttle progress logging to one line per 30 seconds.
		if time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		// Sleep until the next poll, honoring context cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// fluxSourceReady reports whether the flux-system GitRepository's Ready
// condition is "True", as read via kubectl jsonpath. Errors from kubectl
// are returned unwrapped.
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
	out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
	if err != nil {
		return false, err
	}
	return strings.Contains(out, "True"), nil
}

// reportFluxSource logs the Flux GitRepository's current URL and branch and
// warns on drift from the expected configuration. Branch drift warnings are
// suppressed when forceBranch is non-empty (the caller is overriding the
// branch on purpose). Read failures are silently ignored: this is a
// best-effort report, not a gate.
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) { urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}") if urlErr == nil { currentURL := strings.TrimSpace(urlOut) o.log.Printf("flux-source-url=%s", currentURL) expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource) if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) { o.log.Printf("warning: flux source URL is %q, expected %q", currentURL, expectedURL) } } branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}") if branchErr == nil { branch := strings.TrimSpace(branchOut) o.log.Printf("flux-source-branch=%s", branch) if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch { o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch) } } } // guardFluxSourceDrift runs one orchestration or CLI step. // Signature: (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error { urlOut, err := o.kubectl( ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}", ) if err != nil { if isNotFoundErr(err) { o.log.Printf("warning: flux gitrepository/flux-system not found while checking source drift") return nil } return fmt.Errorf("read flux source url: %w", err) } currentURL := strings.TrimSpace(urlOut) expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource) if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) { return fmt.Errorf("startup blocked: flux source url drift detected (current=%q expected=%q)", currentURL, expectedURL) } branchOut, err := o.kubectl( ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}", ) if err != nil { return fmt.Errorf("read flux source branch: %w", err) } currentBranch := strings.TrimSpace(branchOut) if expectedBranch == "" || currentBranch == expectedBranch || allowBranchPatch { return nil } return fmt.Errorf("startup blocked: flux source branch drift detected (current=%q expected=%q)", currentBranch, expectedBranch) } // ensureFluxBranch runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error { branch = strings.TrimSpace(branch) if branch == "" { return nil } out, err := o.kubectl( ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}", ) if err != nil { if isNotFoundErr(err) { o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch) return nil } return fmt.Errorf("read flux source branch: %w", err) } current := strings.TrimSpace(out) if current == branch { return nil } if !allowPatch { return fmt.Errorf("startup blocked: flux source branch is %q but expected %q (use --force-flux-branch to patch intentionally)", current, branch) } patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, branch) if _, err := o.kubectl( ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch, ); err != nil { return fmt.Errorf("set flux source branch %q (current %q): %w", branch, current, err) } o.log.Printf("updated flux source branch from %q to %q", current, branch) return nil } // normalizeGitURL runs one orchestration or CLI step. // Signature: normalizeGitURL(raw string) string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func normalizeGitURL(raw string) string { raw = strings.TrimSpace(strings.ToLower(raw)) raw = strings.TrimSuffix(raw, "/") raw = strings.TrimSuffix(raw, ".git") return raw } // bootstrapLocal runs one orchestration or CLI step. // Signature: (o *Orchestrator) bootstrapLocal(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { failures := 0 successes := 0 for _, rel := range o.cfg.LocalBootstrapPaths { full := filepath.Join(o.cfg.IACRepoPath, rel) o.log.Printf("local bootstrap apply rel=%s path=%s", rel, full) if o.runner.DryRun { successes++ continue } if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil { o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err) o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full) if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil { o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr) o.log.Printf("local bootstrap cache apply for rel=%s", rel) if cacheErr := o.applyBootstrapCache(ctx, rel); cacheErr != nil { failures++ o.log.Printf("warning: local bootstrap cache apply failed for rel=%s: %v", rel, cacheErr) continue } } } successes++ } if failures > 0 && successes == 0 { return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures) } return nil } // applyKustomizeFallback runs one orchestration or CLI step. // Signature: (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error { cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full) if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil { return err } return nil } // syncLocalIACRepo runs one orchestration or CLI step. // Signature: (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error { repo := strings.TrimSpace(o.cfg.IACRepoPath) if repo == "" { return fmt.Errorf("iac repo path is empty") } gitDir := filepath.Join(repo, ".git") if stat, err := os.Stat(gitDir); err != nil || !stat.IsDir() { return fmt.Errorf("iac repo %s is not a git checkout", repo) } statusOut, statusErr := o.runSensitive(ctx, 10*time.Second, "git", "-C", repo, "status", "--porcelain") if statusErr != nil { return fmt.Errorf("inspect iac repo working tree: %w", statusErr) } if strings.TrimSpace(statusOut) != "" { o.log.Printf("warning: skipping local titan-iac sync because working tree is dirty") return nil } branch := strings.TrimSpace(o.cfg.ExpectedFluxBranch) if branch == "" { branch = "main" } if _, err := o.runSensitive(ctx, 45*time.Second, "git", "-C", repo, "fetch", "origin", "--prune"); err != nil { return fmt.Errorf("git fetch origin: %w", err) } if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "checkout", branch); err != nil { return fmt.Errorf("git checkout %s: %w", branch, err) } if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "reset", "--hard", "origin/"+branch); err != nil { return fmt.Errorf("git reset --hard origin/%s: %w", branch, err) } return nil } // refreshBootstrapCache runs one orchestration or CLI step. // Signature: (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error { if len(o.cfg.LocalBootstrapPaths) == 0 { return nil } if err := os.MkdirAll(o.bootstrapCacheDir(), 0o755); err != nil { return fmt.Errorf("ensure bootstrap cache dir: %w", err) } rendered := 0 for _, rel := range o.cfg.LocalBootstrapPaths { rel = strings.TrimSpace(rel) if rel == "" { continue } full := filepath.Join(o.cfg.IACRepoPath, rel) if stat, err := os.Stat(full); err != nil || !stat.IsDir() { o.log.Printf("warning: skip bootstrap cache render for rel=%s (path missing)", rel) continue } cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q", full) manifest, err := o.runSensitive(ctx, 2*time.Minute, "sh", "-lc", cmd) if err != nil { o.log.Printf("warning: bootstrap cache render failed for rel=%s: %v", rel, err) continue } cachePath := o.bootstrapCachePath(rel) if err := os.WriteFile(cachePath, []byte(manifest+"\n"), 0o644); err != nil { o.log.Printf("warning: bootstrap cache write failed for rel=%s path=%s: %v", rel, cachePath, err) continue } rendered++ } if rendered == 0 { return fmt.Errorf("no bootstrap cache manifests rendered") } o.log.Printf("bootstrap cache refreshed (%d paths)", rendered) return nil } // applyBootstrapCache runs one orchestration or CLI step. // Signature: (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error { cachePath := o.bootstrapCachePath(rel) if _, err := os.Stat(cachePath); err != nil { return fmt.Errorf("bootstrap cache missing at %s: %w", cachePath, err) } if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-f", cachePath); err != nil { return err } return nil } // bootstrapCacheDir runs one orchestration or CLI step. // Signature: (o *Orchestrator) bootstrapCacheDir() string. 
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) bootstrapCacheDir() string { return filepath.Join(o.cfg.State.Dir, "bootstrap-cache") } // bootstrapCachePath runs one orchestration or CLI step. // Signature: (o *Orchestrator) bootstrapCachePath(rel string) string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) bootstrapCachePath(rel string) string { safe := strings.TrimSpace(rel) safe = strings.ReplaceAll(safe, "/", "__") safe = strings.ReplaceAll(safe, string(os.PathSeparator), "__") return filepath.Join(o.bootstrapCacheDir(), safe+".yaml") } // waitForFluxSourceReady runs one orchestration or CLI step. // Signature: (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) { if o.runner.DryRun { return true, nil } deadline := time.Now().Add(window) for { ready, err := o.fluxSourceReady(ctx) if err != nil { return false, err } if ready { return true, nil } if time.Now().After(deadline) { return false, nil } select { case <-ctx.Done(): return false, ctx.Err() case <-time.After(5 * time.Second): } } }