ananke/internal/cluster/orchestrator_access_fluxsource.go

464 lines
17 KiB
Go
Raw Permalink Normal View History

package cluster
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)
// reconcileNodeAccess verifies, in parallel, that every SSH-managed node in
// nodes grants passwordless sudo for /usr/bin/systemctl (checked via
// `sudo -n ... --version`). Blank and non-managed entries are skipped.
// Concurrency is bounded by cfg.Shutdown.SSHParallelism (default 8, capped at
// len(nodes)). Returns nil when all probes succeed; otherwise an error that
// carries the total failure count plus up to four sample messages.
func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error {
	if len(nodes) == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.SSHParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	if parallelism > len(nodes) {
		parallelism = len(nodes)
	}
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	// Buffered to len(nodes) so workers never block on send.
	errCh := make(chan error, len(nodes))
	for _, node := range nodes {
		node := strings.TrimSpace(node) // shadow: each goroutine gets its own copy
		if node == "" || !o.sshManaged(node) {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			if _, err := o.ssh(ctx, node, "sudo -n /usr/bin/systemctl --version"); err != nil {
				errCh <- fmt.Errorf("%s: missing sudo access to /usr/bin/systemctl (--version): %w", node, err)
			}
		}()
	}
	wg.Wait()
	close(errCh)
	// Capture the total BEFORE draining samples: the previous version re-read
	// len(errCh) after pulling up to 4 samples out, under-reporting the count.
	total := len(errCh)
	if total == 0 {
		return nil
	}
	samples := make([]string, 0, 4)
	for err := range errCh {
		if len(samples) < 4 {
			samples = append(samples, err.Error())
		}
	}
	return fmt.Errorf("access validation had %d errors (first: %s)", total, strings.Join(samples, " | "))
}
// waitForNodeSSHAuth gates startup until every required node accepts an SSH
// echo probe, distinguishing hard auth failures (fail immediately) from
// transient unreachability (retry until a deadline).
//
// The gate is skipped entirely in dry-run mode or when
// cfg.Startup.RequireNodeSSHAuth is false. Wait and poll intervals come from
// config, defaulting to 4 minutes and 5 seconds. Returns nil once all targets
// respond with the sentinel, ctx.Err() on cancellation, and an error either on
// auth denial or when the deadline passes with nodes still pending.
func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error {
	if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 4 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	// Build a sorted, de-duplicated target list: the required set minus any
	// nodes the operator has explicitly marked as ignorable.
	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	seen := map[string]struct{}{}
	targets := make([]string, 0, len(nodes))
	for _, node := range startupRequiredNodes(nodes, o.cfg.Startup.NodeSSHAuthRequiredNodes) {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, skip := ignored[node]; skip {
			continue
		}
		if _, ok := seen[node]; ok {
			continue
		}
		seen[node] = struct{}{}
		targets = append(targets, node)
	}
	sort.Strings(targets)
	if len(targets) == 0 {
		return nil
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{} // zero value forces a progress log on the first slow pass
	for {
		authDenied := []string{}
		pending := []string{}
		for _, node := range targets {
			// A node outside ssh_managed_nodes can never pass this gate, so
			// classify it with the permanent failures and fail fast.
			if !o.sshManaged(node) {
				authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
				continue
			}
			out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second)
			if err != nil {
				// Classify by error/output text: auth-shaped failures are
				// treated as permanent; anything else is assumed transient.
				detail := strings.TrimSpace(err.Error())
				full := strings.ToLower(strings.TrimSpace(detail + " " + out))
				switch {
				case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"):
					authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
				default:
					pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
				}
				continue
			}
			if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") {
				pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
			}
		}
		// Auth denial is not retried: waiting will not fix missing keys.
		if len(authDenied) > 0 {
			sort.Strings(authDenied)
			return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8))
		}
		if len(pending) == 0 {
			return nil
		}
		sort.Strings(pending)
		lastFailure = joinLimited(pending, 8)
		if time.Now().After(deadline) {
			return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure)
		}
		// Throttle progress logging to at most once every 30 seconds.
		if time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}
// fluxSourceReady reports whether the flux-system GitRepository's Ready
// condition currently reads "True". kubectl errors are passed through.
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
	const readyJSONPath = `jsonpath={.status.conditions[?(@.type=="Ready")].status}`
	status, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", readyJSONPath)
	if err != nil {
		return false, err
	}
	return strings.Contains(status, "True"), nil
}
// reportFluxSource logs the flux-system GitRepository's current URL and
// branch, warning when either drifts from the configured expectation. The
// branch warning is suppressed while a forced branch override is in effect.
// Read failures are silently skipped (best-effort reporting only).
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
	if out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}"); err == nil {
		current := strings.TrimSpace(out)
		o.log.Printf("flux-source-url=%s", current)
		if expected := strings.TrimSpace(o.cfg.ExpectedFluxSource); expected != "" && normalizeGitURL(current) != normalizeGitURL(expected) {
			o.log.Printf("warning: flux source URL is %q, expected %q", current, expected)
		}
	}
	if out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}"); err == nil {
		branch := strings.TrimSpace(out)
		o.log.Printf("flux-source-branch=%s", branch)
		if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
			o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
		}
	}
}
// guardFluxSourceDrift blocks startup when the flux-system GitRepository's
// URL or branch has drifted from expectations. URL drift is always fatal;
// branch drift is tolerated when allowBranchPatch is set (a deliberate patch
// is expected to reconcile it) or when no expected branch is given. A missing
// GitRepository object only logs a warning.
func (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error {
	readField := func(jsonPath string) (string, error) {
		out, err := o.kubectl(
			ctx,
			10*time.Second,
			"-n", "flux-system",
			"get", "gitrepository", "flux-system",
			"-o", jsonPath,
		)
		return strings.TrimSpace(out), err
	}
	currentURL, err := readField("jsonpath={.spec.url}")
	if err != nil {
		if isNotFoundErr(err) {
			o.log.Printf("warning: flux gitrepository/flux-system not found while checking source drift")
			return nil
		}
		return fmt.Errorf("read flux source url: %w", err)
	}
	expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource)
	if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) {
		return fmt.Errorf("startup blocked: flux source url drift detected (current=%q expected=%q)", currentURL, expectedURL)
	}
	currentBranch, err := readField("jsonpath={.spec.ref.branch}")
	if err != nil {
		return fmt.Errorf("read flux source branch: %w", err)
	}
	if expectedBranch == "" || allowBranchPatch || currentBranch == expectedBranch {
		return nil
	}
	return fmt.Errorf("startup blocked: flux source branch drift detected (current=%q expected=%q)", currentBranch, expectedBranch)
}
// ensureFluxBranch makes the flux-system GitRepository track branch. A blank
// branch or a missing GitRepository object is a no-op (the latter with a
// warning). When the current branch differs, the call fails unless allowPatch
// is set, in which case it applies a JSON merge patch and logs the change.
func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error {
	branch = strings.TrimSpace(branch)
	if branch == "" {
		return nil
	}
	out, err := o.kubectl(
		ctx,
		10*time.Second,
		"-n", "flux-system",
		"get", "gitrepository", "flux-system",
		"-o", "jsonpath={.spec.ref.branch}",
	)
	if err != nil {
		if isNotFoundErr(err) {
			o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch)
			return nil
		}
		return fmt.Errorf("read flux source branch: %w", err)
	}
	current := strings.TrimSpace(out)
	if current == branch {
		return nil
	}
	if !allowPatch {
		return fmt.Errorf("startup blocked: flux source branch is %q but expected %q (use --force-flux-branch to patch intentionally)", current, branch)
	}
	// strconv.Quote escapes quotes and backslashes so the merge patch remains
	// valid JSON even for unusual (but legal) git ref names; the previous raw
	// interpolation produced broken JSON for a branch containing a double quote.
	patch := fmt.Sprintf(`{"spec":{"ref":{"branch":%s}}}`, strconv.Quote(branch))
	if _, err := o.kubectl(
		ctx,
		20*time.Second,
		"-n", "flux-system",
		"patch", "gitrepository", "flux-system",
		"--type=merge",
		"-p", patch,
	); err != nil {
		return fmt.Errorf("set flux source branch %q (current %q): %w", branch, current, err)
	}
	o.log.Printf("updated flux source branch from %q to %q", current, branch)
	return nil
}
// normalizeGitURL canonicalizes a git remote URL for comparison: trimmed of
// surrounding whitespace, lowercased, then stripped of a single trailing "/"
// followed by a single trailing ".git" suffix (in that order).
func normalizeGitURL(raw string) string {
	normalized := strings.ToLower(strings.TrimSpace(raw))
	for _, suffix := range []string{"/", ".git"} {
		normalized = strings.TrimSuffix(normalized, suffix)
	}
	return normalized
}
// bootstrapLocal applies each configured local bootstrap kustomization. When
// the direct `kubectl apply -k` fails it falls back first to a rendered
// LoadRestrictionsNone apply and then to the on-disk bootstrap cache. Dry-run
// counts every path as applied. The call only returns an error when no path
// at all could be applied.
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
	var applied, failed int
	for _, rel := range o.cfg.LocalBootstrapPaths {
		full := filepath.Join(o.cfg.IACRepoPath, rel)
		o.log.Printf("local bootstrap apply rel=%s path=%s", rel, full)
		if o.runner.DryRun {
			applied++
			continue
		}
		_, applyErr := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full)
		if applyErr != nil {
			o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, applyErr)
			o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
			if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
				o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
				o.log.Printf("local bootstrap cache apply for rel=%s", rel)
				if cacheErr := o.applyBootstrapCache(ctx, rel); cacheErr != nil {
					failed++
					o.log.Printf("warning: local bootstrap cache apply failed for rel=%s: %v", rel, cacheErr)
					continue // all three strategies failed for this path
				}
			}
		}
		applied++
	}
	if failed > 0 && applied == 0 {
		return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failed)
	}
	return nil
}
// applyKustomizeFallback renders the kustomization at full with
// LoadRestrictionsNone and pipes the manifest straight into `kubectl apply`,
// bypassing the load restrictions that made the direct apply fail.
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
	pipeline := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
	_, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", pipeline)
	return err
}
// syncLocalIACRepo hard-resets the local IaC checkout to origin's copy of the
// expected flux branch (default "main"). The sync is skipped with a warning
// when the working tree has local modifications, and fails when the configured
// path is empty or not a git checkout.
func (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error {
	repo := strings.TrimSpace(o.cfg.IACRepoPath)
	if repo == "" {
		return fmt.Errorf("iac repo path is empty")
	}
	if info, err := os.Stat(filepath.Join(repo, ".git")); err != nil || !info.IsDir() {
		return fmt.Errorf("iac repo %s is not a git checkout", repo)
	}
	status, err := o.runSensitive(ctx, 10*time.Second, "git", "-C", repo, "status", "--porcelain")
	if err != nil {
		return fmt.Errorf("inspect iac repo working tree: %w", err)
	}
	// Never clobber local work: bail out quietly on a dirty tree.
	if strings.TrimSpace(status) != "" {
		o.log.Printf("warning: skipping local titan-iac sync because working tree is dirty")
		return nil
	}
	branch := strings.TrimSpace(o.cfg.ExpectedFluxBranch)
	if branch == "" {
		branch = "main"
	}
	if _, err := o.runSensitive(ctx, 45*time.Second, "git", "-C", repo, "fetch", "origin", "--prune"); err != nil {
		return fmt.Errorf("git fetch origin: %w", err)
	}
	if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "checkout", branch); err != nil {
		return fmt.Errorf("git checkout %s: %w", branch, err)
	}
	if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "reset", "--hard", "origin/"+branch); err != nil {
		return fmt.Errorf("git reset --hard origin/%s: %w", branch, err)
	}
	return nil
}
// refreshBootstrapCache re-renders every configured bootstrap kustomization
// into the on-disk cache that bootstrapLocal uses as its last-resort apply
// source. Per-path render/write failures are logged and skipped; the call
// only errors when the cache dir cannot be created or nothing was rendered.
func (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error {
	if len(o.cfg.LocalBootstrapPaths) == 0 {
		return nil
	}
	if err := os.MkdirAll(o.bootstrapCacheDir(), 0o755); err != nil {
		return fmt.Errorf("ensure bootstrap cache dir: %w", err)
	}
	written := 0
	for _, raw := range o.cfg.LocalBootstrapPaths {
		rel := strings.TrimSpace(raw)
		if rel == "" {
			continue
		}
		full := filepath.Join(o.cfg.IACRepoPath, rel)
		if info, err := os.Stat(full); err != nil || !info.IsDir() {
			o.log.Printf("warning: skip bootstrap cache render for rel=%s (path missing)", rel)
			continue
		}
		render := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q", full)
		manifest, err := o.runSensitive(ctx, 2*time.Minute, "sh", "-lc", render)
		if err != nil {
			o.log.Printf("warning: bootstrap cache render failed for rel=%s: %v", rel, err)
			continue
		}
		target := o.bootstrapCachePath(rel)
		if err := os.WriteFile(target, []byte(manifest+"\n"), 0o644); err != nil {
			o.log.Printf("warning: bootstrap cache write failed for rel=%s path=%s: %v", rel, target, err)
			continue
		}
		written++
	}
	if written == 0 {
		return fmt.Errorf("no bootstrap cache manifests rendered")
	}
	o.log.Printf("bootstrap cache refreshed (%d paths)", written)
	return nil
}
// applyBootstrapCache applies the cached rendered manifest for rel, failing
// fast with a descriptive error when the cache file does not exist.
func (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error {
	cached := o.bootstrapCachePath(rel)
	if _, statErr := os.Stat(cached); statErr != nil {
		return fmt.Errorf("bootstrap cache missing at %s: %w", cached, statErr)
	}
	_, err := o.kubectl(ctx, 2*time.Minute, "apply", "-f", cached)
	return err
}
// bootstrapCacheDir returns the directory under the orchestrator state dir
// where rendered bootstrap manifests are cached.
func (o *Orchestrator) bootstrapCacheDir() string {
	const cacheSubdir = "bootstrap-cache"
	return filepath.Join(o.cfg.State.Dir, cacheSubdir)
}
// bootstrapCachePath maps a bootstrap-relative path to its cache file name:
// separators ("/" plus the OS separator, for Windows checkouts) become "__"
// so each rel flattens to a single .yaml file inside the cache dir.
func (o *Orchestrator) bootstrapCachePath(rel string) string {
	name := strings.TrimSpace(rel)
	for _, sep := range []string{"/", string(os.PathSeparator)} {
		name = strings.ReplaceAll(name, sep, "__")
	}
	return filepath.Join(o.bootstrapCacheDir(), name+".yaml")
}
// waitForFluxSourceReady polls fluxSourceReady every 5 seconds until the flux
// source reports Ready (true, nil), the window elapses (false, nil), the
// readiness check errors, or ctx is cancelled. Dry-run short-circuits to
// success without polling.
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
	if o.runner.DryRun {
		return true, nil
	}
	stopAt := time.Now().Add(window)
	for {
		switch ready, err := o.fluxSourceReady(ctx); {
		case err != nil:
			return false, err
		case ready:
			return true, nil
		}
		if time.Now().After(stopAt) {
			// Not an error: callers decide how to treat an unready source.
			return false, nil
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}