package cluster

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForNodeInventoryReachability blocks until every required inventory node
// answers a trivial SSH probe, or until the configured wait expires.
// Signature: (o *Orchestrator) waitForNodeInventoryReachability(ctx context.Context) error.
// Why: Startup deadlocks often begin when the coordinator races ahead while one or
// more expected hosts are still unavailable; this gate blocks bootstrap until
// inventory hosts are actually reachable and authenticated over SSH.
//
// Behavior:
//   - Returns nil immediately in dry-run mode, when the gate is disabled in the
//     startup config, or when no target nodes remain after filtering.
//   - Targets are the nodes returned by startupRequiredNodes, trimmed,
//     de-duplicated, stripped of nodes listed in IgnoreUnavailableNodes, and
//     sorted so probe order and messages are deterministic.
//   - A node that is not SSH-managed, or whose probe error text looks like an
//     authentication failure, fails the gate immediately; waiting is not
//     expected to fix those.
//   - Any other probe failure is treated as transient and retried every poll
//     interval until the deadline passes.
//   - Returns ctx.Err() as soon as cancellation is observed.
func (o *Orchestrator) waitForNodeInventoryReachability(ctx context.Context) error {
	if o.runner.DryRun || !o.cfg.Startup.RequireNodeInventoryReach {
		return nil
	}

	// Non-positive configuration values fall back to built-in defaults.
	wait := time.Duration(o.cfg.Startup.NodeInventoryReachWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 5 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.NodeInventoryReachPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}

	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	// Fetch the inventory once; it is used both as a capacity hint and as the
	// input to the required-node selection.
	inventory := o.inventoryNodesForValidation()
	targets := make([]string, 0, len(inventory))
	seen := make(map[string]struct{}, len(inventory))
	for _, node := range startupRequiredNodes(inventory, o.cfg.Startup.NodeInventoryReachRequiredNodes) {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, skip := ignored[node]; skip {
			continue
		}
		if _, dup := seen[node]; dup {
			continue
		}
		seen[node] = struct{}{}
		targets = append(targets, node)
	}
	if len(targets) == 0 {
		return nil
	}
	sort.Strings(targets)

	deadline := time.Now().Add(wait)
	// The zero time guarantees the first waiting round is logged.
	var lastLogged time.Time
	for {
		var authDenied, pending []string
		for _, node := range targets {
			// Do not start further probes once the caller has given up.
			if err := ctx.Err(); err != nil {
				return err
			}
			if !o.sshManaged(node) {
				authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
				continue
			}
			out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_NODE_REACHABLE__", 12*time.Second)
			if err != nil {
				// A probe interrupted by cancellation says nothing about the
				// node; report the cancellation rather than "unreachable".
				if ctxErr := ctx.Err(); ctxErr != nil {
					return ctxErr
				}
				detail := strings.ToLower(strings.TrimSpace(err.Error() + " " + out))
				switch {
				case strings.Contains(detail, "permission denied"),
					strings.Contains(detail, "publickey"),
					strings.Contains(detail, "authentication"):
					authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
				default:
					pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
				}
				continue
			}
			// The command succeeded but the marker is missing, so the reply
			// cannot be trusted as proof of a working shell.
			if !strings.Contains(out, "__ANANKE_NODE_REACHABLE__") {
				pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
			}
		}

		// Fail fast on problems that retrying will not resolve.
		if len(authDenied) > 0 {
			sort.Strings(authDenied)
			return fmt.Errorf("node inventory reachability gate failed: %s", joinLimited(authDenied, 10))
		}
		if len(pending) == 0 {
			return nil
		}

		sort.Strings(pending)
		lastFailure := joinLimited(pending, 10)
		if time.Now().After(deadline) {
			return fmt.Errorf("node inventory reachability gate did not pass within %s (%s)", wait, lastFailure)
		}
		// Throttle progress logging to at most one line every 30 seconds.
		if time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for node inventory reachability (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}

		// Never sleep past the deadline: the final round then runs close to
		// the configured wait instead of up to one poll interval later.
		sleep := poll
		if untilDeadline := time.Until(deadline); untilDeadline < sleep {
			sleep = untilDeadline
		}
		timer := time.NewTimer(sleep)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}