package cluster

import (
	"context"
	"fmt"
	"net"
	neturl "net/url"
	"sort"
	"strings"
	"time"
)

// waitForTimeSync blocks startup until system clocks report NTP
// synchronization. It polls the local host and every SSH-managed node in
// nodes via `timedatectl show -p NTPSynchronized`. In the default "strict"
// mode every checked host must be synced; in "quorum" mode the local host
// plus at least TimeSyncQuorum control planes are enough. It returns an
// error when the wait deadline expires or the context is cancelled, and is
// a no-op in dry-run mode.
func (o *Orchestrator) waitForTimeSync(ctx context.Context, nodes []string) error {
	if o.runner.DryRun {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.TimeSyncWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 240 * time.Second
	}
	poll := time.Duration(o.cfg.Startup.TimeSyncPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	mode := strings.ToLower(strings.TrimSpace(o.cfg.Startup.TimeSyncMode))
	if mode == "" {
		mode = "strict"
	}
	managedControlPlanes := 0
	for _, node := range nodes {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if o.sshManaged(node) {
			managedControlPlanes++
		}
	}
	requiredQuorum := o.cfg.Startup.TimeSyncQuorum
	if requiredQuorum <= 0 {
		requiredQuorum = managedControlPlanes
		if requiredQuorum <= 0 {
			requiredQuorum = 1
		}
	}
	if requiredQuorum > managedControlPlanes && managedControlPlanes > 0 {
		requiredQuorum = managedControlPlanes
	}
	deadline := time.Now().Add(wait)
	for {
		unsynced := []string{}
		syncedControlPlanes := 0
		checkedControlPlanes := 0
		localOut, localErr := o.run(ctx, 10*time.Second, "sh", "-lc", "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
		localSynced := localErr == nil && isTimeSynced(localOut)
		if !localSynced {
			if localErr != nil {
				unsynced = append(unsynced, fmt.Sprintf("local(%v)", localErr))
			} else {
				unsynced = append(unsynced, fmt.Sprintf("local(%s)", strings.TrimSpace(localOut)))
			}
		}
		for _, node := range nodes {
			node = strings.TrimSpace(node)
			if node == "" {
				continue
			}
			if !o.sshManaged(node) {
				continue
			}
			checkedControlPlanes++
			out, err := o.ssh(ctx, node, "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
			if err != nil || !isTimeSynced(out) {
				if err != nil {
					unsynced = append(unsynced, fmt.Sprintf("%s(%v)", node, err))
				} else {
					unsynced = append(unsynced, fmt.Sprintf("%s(%s)", node, strings.TrimSpace(out)))
				}
			} else {
				syncedControlPlanes++
			}
		}
		ready := false
		switch mode {
		case "quorum":
			if localSynced && syncedControlPlanes >= requiredQuorum {
				ready = true
			}
		default:
			if localSynced && len(unsynced) == 0 {
				ready = true
			}
		}
		if ready {
			return nil
		}
		if time.Now().After(deadline) {
			if mode == "quorum" {
				return fmt.Errorf(
					"startup blocked: time sync quorum not ready within %s (mode=quorum local_synced=%t synced_control_planes=%d required=%d checked=%d details=%s)",
					wait, localSynced, syncedControlPlanes, requiredQuorum, checkedControlPlanes, strings.Join(unsynced, ", "),
				)
			}
			return fmt.Errorf("startup blocked: time sync not ready within %s (%s)", wait, strings.Join(unsynced, ", "))
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// isTimeSynced reports whether a timedatectl NTPSynchronized value
// indicates a synchronized clock ("yes", "true", or "1", case-insensitive).
func isTimeSynced(raw string) bool {
	v := strings.ToLower(strings.TrimSpace(raw))
	return v == "yes" || v == "true" || v == "1"
}
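// The sketch below is illustrative only and not part of the orchestration
// flow: it shows how probe output from
// `timedatectl show -p NTPSynchronized --value` is interpreted, including
// the "unknown" fallback echoed by the probe command when timedatectl is
// unavailable. The function name and sample values are hypothetical.
func exampleTimeSyncProbeValues() {
	probes := map[string]bool{
		"yes\n":   true,  // normal synced output; trailing newline is trimmed
		"no":      false, // timedatectl reports the clock as unsynchronized
		"unknown": false, // fallback echoed when timedatectl is missing or fails
		"TRUE":    true,  // boolean-style values are accepted case-insensitively
	}
	for raw, want := range probes {
		if got := isTimeSynced(raw); got != want {
			fmt.Printf("unexpected interpretation of %q: got %t, want %t\n", raw, got, want)
		}
	}
}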
// preflightExternalDatastore checks that the external k3s datastore, if one
// is configured on the first control plane's k3s unit, is reachable before
// startup proceeds. When the endpoint is down it attempts a best-effort
// restart of the PostgreSQL service on the owning node and waits up to 90
// seconds for the port to come back; missing or unparsable configuration is
// logged and skipped rather than treated as fatal.
func (o *Orchestrator) preflightExternalDatastore(ctx context.Context) error {
	if len(o.cfg.ControlPlanes) == 0 {
		return nil
	}
	controlPlane := strings.TrimSpace(o.cfg.ControlPlanes[0])
	if controlPlane == "" || !o.sshManaged(controlPlane) {
		return nil
	}
	unitOut, err := o.ssh(ctx, controlPlane, "sudo systemctl cat k3s")
	if err != nil {
		o.log.Printf("warning: external datastore preflight skipped: unable to inspect %s k3s unit: %v", controlPlane, err)
		return nil
	}
	datastoreEndpoint := parseDatastoreEndpoint(unitOut)
	if datastoreEndpoint == "" {
		return nil
	}
	u, err := neturl.Parse(datastoreEndpoint)
	if err != nil || u.Host == "" {
		o.log.Printf("warning: external datastore preflight skipped: unable to parse datastore endpoint %q", datastoreEndpoint)
		return nil
	}
	host := strings.TrimSpace(u.Hostname())
	port := strings.TrimSpace(u.Port())
	if port == "" {
		port = "5432"
	}
	address := net.JoinHostPort(host, port)
	if o.tcpReachable(address, 3*time.Second) {
		return nil
	}
	o.log.Printf("warning: datastore endpoint %s is unreachable; attempting software recovery", address)
	if node := o.nodeNameForHost(host); node != "" && o.sshManaged(node) {
		o.bestEffort("restart datastore service on "+node, func() error {
			_, err := o.ssh(ctx, node, "sudo systemctl restart postgresql || sudo systemctl restart postgresql@16-main || sudo systemctl restart postgres")
			return err
		})
	}
	deadline := time.Now().Add(90 * time.Second)
	for time.Now().Before(deadline) {
		if o.tcpReachable(address, 3*time.Second) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(3 * time.Second):
		}
	}
	return fmt.Errorf("startup blocked: external datastore endpoint %s remained unreachable after recovery attempt", address)
}

// parseDatastoreEndpoint extracts the --datastore-endpoint value from the
// text of a k3s systemd unit. It tries the package-level
// datastoreEndpointPattern regexp first and falls back to a line-by-line
// scan that strips the "=" separator, surrounding quotes, and trailing
// line-continuation backslashes.
func parseDatastoreEndpoint(unitText string) string {
	if match := datastoreEndpointPattern.FindStringSubmatch(unitText); len(match) == 4 {
		for _, candidate := range match[1:] {
			value := strings.TrimSpace(candidate)
			if value != "" {
				return value
			}
		}
	}
	for _, raw := range strings.Split(unitText, "\n") {
		line := strings.TrimSpace(raw)
		idx := strings.Index(line, "--datastore-endpoint")
		if idx < 0 {
			continue
		}
		value := strings.TrimSpace(line[idx+len("--datastore-endpoint"):])
		value = strings.TrimSpace(strings.TrimPrefix(value, "="))
		value = strings.TrimSuffix(strings.TrimSpace(value), "\\")
		value = strings.Trim(value, `"'`)
		if value != "" {
			return value
		}
	}
	return ""
}

// nodeNameForHost maps an SSH host or address back to its logical node name
// using the ssh_node_hosts configuration, returning "" when no node is
// associated with the host.
func (o *Orchestrator) nodeNameForHost(host string) string {
	host = strings.TrimSpace(host)
	if host == "" {
		return ""
	}
	if _, ok := o.cfg.SSHNodeHosts[host]; ok {
		return host
	}
	for node, mapped := range o.cfg.SSHNodeHosts {
		if strings.TrimSpace(mapped) == host {
			return strings.TrimSpace(node)
		}
	}
	return ""
}
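// Illustrative sketch (hypothetical unit text and function name, not taken
// from a real cluster): a k3s systemd unit that passes --datastore-endpoint
// as a server flag. The regexp path (datastoreEndpointPattern, defined
// elsewhere in the package) is tried first; the line scan handles units
// where the flag sits on its own continuation line, as below.
func exampleParseDatastoreEndpoint() {
	unit := `[Service]
ExecStart=/usr/local/bin/k3s server \
    --disable=traefik \
    --datastore-endpoint=postgres://k3s:example@10.0.0.5:5432/kine`
	if endpoint := parseDatastoreEndpoint(unit); endpoint != "" {
		// Expected to yield the postgres:// URL, which preflightExternalDatastore
		// then parses to derive the host:port it probes.
		fmt.Println("datastore endpoint:", endpoint)
	}
}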
// inventoryNodesForValidation returns the sorted, de-duplicated set of node
// names referenced anywhere in the configuration: control planes, workers,
// SSH-managed nodes, coordination peers, and the forward-shutdown host.
func (o *Orchestrator) inventoryNodesForValidation() []string {
	set := map[string]struct{}{}
	add := func(node string) {
		node = strings.TrimSpace(node)
		if node == "" {
			return
		}
		set[node] = struct{}{}
	}
	for _, n := range o.cfg.ControlPlanes {
		add(n)
	}
	for _, n := range o.cfg.Workers {
		add(n)
	}
	for _, n := range o.cfg.SSHManagedNodes {
		add(n)
	}
	for _, n := range o.cfg.Coordination.PeerHosts {
		add(n)
	}
	add(o.cfg.Coordination.ForwardShutdownHost)
	nodes := make([]string, 0, len(set))
	for n := range set {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes)
	return nodes
}

// validateNodeInventory preflights the SSH inventory: the SSH port must be
// valid, every control plane and worker must appear in ssh_managed_nodes,
// every referenced node needs an ssh_node_hosts entry, and hosts and users
// must not contain whitespace, slashes, or "@". All issues are collected,
// sorted, and reported together in a single error.
func (o *Orchestrator) validateNodeInventory() error {
	issues := []string{}
	if o.cfg.SSHPort <= 0 || o.cfg.SSHPort > 65535 {
		issues = append(issues, fmt.Sprintf("ssh_port=%d is invalid", o.cfg.SSHPort))
	}
	managed := makeStringSet(o.cfg.SSHManagedNodes)
	for _, cp := range o.cfg.ControlPlanes {
		cp = strings.TrimSpace(cp)
		if cp == "" {
			continue
		}
		if _, ok := managed[cp]; !ok {
			issues = append(issues, fmt.Sprintf("control plane %s is missing from ssh_managed_nodes", cp))
		}
	}
	for _, node := range o.cfg.Workers {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, ok := managed[node]; !ok {
			issues = append(issues, fmt.Sprintf("worker %s is missing from ssh_managed_nodes", node))
		}
	}
	baseUser := strings.TrimSpace(o.cfg.SSHUser)
	for _, node := range o.inventoryNodesForValidation() {
		if _, ok := o.cfg.SSHNodeHosts[node]; !ok {
			issues = append(issues, fmt.Sprintf("%s is missing ssh_node_hosts entry", node))
		}
		host := strings.TrimSpace(o.cfg.SSHNodeHosts[node])
		if host == "" {
			host = node
		}
		if strings.ContainsAny(host, " \t\r\n") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains whitespace)", node, host))
		}
		if strings.Contains(host, "/") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains slash)", node, host))
		}
		user := baseUser
		if override, ok := o.cfg.SSHNodeUsers[node]; ok {
			user = strings.TrimSpace(override)
		}
		if user == "" {
			issues = append(issues, fmt.Sprintf("%s has empty ssh user (ssh_user/ssh_node_users)", node))
		}
		if strings.ContainsAny(user, " \t\r\n@") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh user %q", node, user))
		}
	}
	if len(issues) > 0 {
		sort.Strings(issues)
		return fmt.Errorf("node inventory preflight failed: %s", joinLimited(issues, 10))
	}
	return nil
}

// tcpReachable reports whether a TCP connection to address can be
// established within timeout.
func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", address, timeout)
	if err != nil {
		return false
	}
	_ = conn.Close()
	return true
}
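// Illustrative sketch (hypothetical host and function name): how the
// datastore preflight forms its probe address, including the PostgreSQL
// default-port fallback, and checks it with tcpReachable. A zero-value
// Orchestrator suffices here only because tcpReachable reads no receiver
// state; real callers use the fully configured orchestrator.
func exampleProbeDatastoreAddress() {
	host, port := "10.0.0.5", "" // endpoint URL without an explicit port
	if port == "" {
		port = "5432" // same default applied by preflightExternalDatastore
	}
	address := net.JoinHostPort(host, port)
	o := &Orchestrator{}
	fmt.Printf("datastore %s reachable: %t\n", address, o.tcpReachable(address, 3*time.Second))
}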