ananke/internal/cluster/orchestrator_timesync_inventory.go

package cluster

import (
	"context"
	"fmt"
	"net"
	neturl "net/url"
	"sort"
	"strings"
	"time"
)

// waitForTimeSync blocks startup until the local host and the SSH-managed
// control-plane nodes passed in report NTP synchronization, or until the
// configured wait deadline expires. In "strict" mode (the default) every
// checked node must be synced; in "quorum" mode the local host plus at least
// requiredQuorum control planes are enough. Dry-run mode skips the check.
func (o *Orchestrator) waitForTimeSync(ctx context.Context, nodes []string) error {
	if o.runner.DryRun {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.TimeSyncWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 240 * time.Second
	}
	poll := time.Duration(o.cfg.Startup.TimeSyncPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	mode := strings.ToLower(strings.TrimSpace(o.cfg.Startup.TimeSyncMode))
	if mode == "" {
		mode = "strict"
	}
	// Count the control planes we can actually reach over SSH; the quorum
	// requirement can never exceed that number.
	managedControlPlanes := 0
	for _, node := range nodes {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if o.sshManaged(node) {
			managedControlPlanes++
		}
	}
	requiredQuorum := o.cfg.Startup.TimeSyncQuorum
	if requiredQuorum <= 0 {
		requiredQuorum = managedControlPlanes
		if requiredQuorum <= 0 {
			requiredQuorum = 1
		}
	}
	if requiredQuorum > managedControlPlanes && managedControlPlanes > 0 {
		requiredQuorum = managedControlPlanes
	}
	deadline := time.Now().Add(wait)
	for {
		unsynced := []string{}
		syncedControlPlanes := 0
		checkedControlPlanes := 0
		// Check the local host first, then each SSH-managed control plane.
		localOut, localErr := o.run(ctx, 10*time.Second, "sh", "-lc", "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
		localSynced := localErr == nil && isTimeSynced(localOut)
		if !localSynced {
			if localErr != nil {
				unsynced = append(unsynced, fmt.Sprintf("local(%v)", localErr))
			} else {
				unsynced = append(unsynced, fmt.Sprintf("local(%s)", strings.TrimSpace(localOut)))
			}
		}
		for _, node := range nodes {
			node = strings.TrimSpace(node)
			if node == "" {
				continue
			}
			if !o.sshManaged(node) {
				continue
			}
			checkedControlPlanes++
			out, err := o.ssh(ctx, node, "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
			if err != nil || !isTimeSynced(out) {
				if err != nil {
					unsynced = append(unsynced, fmt.Sprintf("%s(%v)", node, err))
				} else {
					unsynced = append(unsynced, fmt.Sprintf("%s(%s)", node, strings.TrimSpace(out)))
				}
			} else {
				syncedControlPlanes++
			}
		}
		ready := false
		switch mode {
		case "quorum":
			if localSynced && syncedControlPlanes >= requiredQuorum {
				ready = true
			}
		default:
			// strict: the local host and every checked node must be synced.
			if localSynced && len(unsynced) == 0 {
				ready = true
			}
		}
		if ready {
			return nil
		}
		if time.Now().After(deadline) {
			if mode == "quorum" {
				return fmt.Errorf(
					"startup blocked: time sync quorum not ready within %s (mode=quorum local_synced=%t synced_control_planes=%d required=%d checked=%d details=%s)",
					wait,
					localSynced,
					syncedControlPlanes,
					requiredQuorum,
					checkedControlPlanes,
					strings.Join(unsynced, ", "),
				)
			}
			return fmt.Errorf("startup blocked: time sync not ready within %s (%s)", wait, strings.Join(unsynced, ", "))
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}
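
// A worked example of the quorum arithmetic above (illustrative values only,
// not taken from any real configuration):
//
//	managedControlPlanes = 3, TimeSyncQuorum = 0 -> requiredQuorum = 3
//	managedControlPlanes = 3, TimeSyncQuorum = 2 -> requiredQuorum = 2
//	managedControlPlanes = 2, TimeSyncQuorum = 5 -> requiredQuorum = 2 (capped)
//	managedControlPlanes = 0, TimeSyncQuorum = 0 -> requiredQuorum = 1
//
// In "quorum" mode the local host must also report as synced before startup
// proceeds.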

// isTimeSynced reports whether a timedatectl NTPSynchronized value indicates
// an in-sync clock. The comparison is case-insensitive and ignores
// surrounding whitespace.
func isTimeSynced(raw string) bool {
	v := strings.ToLower(strings.TrimSpace(raw))
	return v == "yes" || v == "true" || v == "1"
}
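
// For illustration: `timedatectl show -p NTPSynchronized --value` prints "yes"
// or "no", and the shell fallback used above substitutes "unknown" when the
// command is unavailable, so:
//
//	isTimeSynced("yes")     // true
//	isTimeSynced("no")      // false
//	isTimeSynced("unknown") // false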

// preflightExternalDatastore inspects the first control plane's k3s unit for a
// --datastore-endpoint flag. If an external endpoint is configured but not
// reachable over TCP, it attempts a best-effort restart of the datastore
// service on the owning node and re-checks for up to 90 seconds before
// blocking startup. Inspection and parse failures are logged and treated as
// non-fatal.
func (o *Orchestrator) preflightExternalDatastore(ctx context.Context) error {
	if len(o.cfg.ControlPlanes) == 0 {
		return nil
	}
	controlPlane := strings.TrimSpace(o.cfg.ControlPlanes[0])
	if controlPlane == "" || !o.sshManaged(controlPlane) {
		return nil
	}
	unitOut, err := o.ssh(ctx, controlPlane, "sudo systemctl cat k3s")
	if err != nil {
		o.log.Printf("warning: external datastore preflight skipped: unable to inspect %s k3s unit: %v", controlPlane, err)
		return nil
	}
	datastoreEndpoint := parseDatastoreEndpoint(unitOut)
	if datastoreEndpoint == "" {
		// No external datastore configured; k3s is using its embedded store.
		return nil
	}
	u, err := neturl.Parse(datastoreEndpoint)
	if err != nil || u.Host == "" {
		o.log.Printf("warning: external datastore preflight skipped: unable to parse datastore endpoint %q", datastoreEndpoint)
		return nil
	}
	host := strings.TrimSpace(u.Hostname())
	port := strings.TrimSpace(u.Port())
	if port == "" {
		port = "5432"
	}
	address := net.JoinHostPort(host, port)
	if o.tcpReachable(address, 3*time.Second) {
		return nil
	}
	o.log.Printf("warning: datastore endpoint %s is unreachable; attempting software recovery", address)
	// If the datastore lives on a node we manage, try the common PostgreSQL
	// unit names before falling back to waiting.
	if node := o.nodeNameForHost(host); node != "" && o.sshManaged(node) {
		o.bestEffort("restart datastore service on "+node, func() error {
			_, err := o.ssh(ctx, node, "sudo systemctl restart postgresql || sudo systemctl restart postgresql@16-main || sudo systemctl restart postgres")
			return err
		})
	}
	deadline := time.Now().Add(90 * time.Second)
	for time.Now().Before(deadline) {
		if o.tcpReachable(address, 3*time.Second) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(3 * time.Second):
		}
	}
	return fmt.Errorf("startup blocked: external datastore endpoint %s remained unreachable after recovery attempt", address)
}
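
// For illustration (hypothetical endpoint value): a unit configured with
// --datastore-endpoint=postgres://k3s:secret@10.0.0.12/kine parses to host
// "10.0.0.12" with no explicit port, so the preflight dials "10.0.0.12:5432",
// using the PostgreSQL default port assumed above.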

// parseDatastoreEndpoint extracts the --datastore-endpoint value from k3s unit
// text. It first tries datastoreEndpointPattern, then falls back to scanning
// each line for the flag, stripping an optional "=", a trailing
// line-continuation backslash, and surrounding quotes. It returns "" when no
// endpoint is configured.
func parseDatastoreEndpoint(unitText string) string {
	if match := datastoreEndpointPattern.FindStringSubmatch(unitText); len(match) == 4 {
		for _, candidate := range match[1:] {
			value := strings.TrimSpace(candidate)
			if value != "" {
				return value
			}
		}
	}
	// Fallback: scan line by line, assuming the unit keeps one flag per
	// continuation line.
	for _, raw := range strings.Split(unitText, "\n") {
		line := strings.TrimSpace(raw)
		idx := strings.Index(line, "--datastore-endpoint")
		if idx < 0 {
			continue
		}
		value := strings.TrimSpace(line[idx+len("--datastore-endpoint"):])
		value = strings.TrimSpace(strings.TrimPrefix(value, "="))
		// Strip a line-continuation backslash and any whitespace left around
		// it before unquoting, so `"..." \` does not keep a stray quote.
		value = strings.TrimSpace(strings.TrimSuffix(strings.TrimSpace(value), "\\"))
		value = strings.Trim(value, `"'`)
		if value != "" {
			return value
		}
	}
	return ""
}
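
// For illustration (hypothetical unit text; real units vary by install
// method), input like
//
//	ExecStart=/usr/local/bin/k3s \
//	    server \
//	    --tls-san=cluster.internal \
//	    --datastore-endpoint=postgres://k3s:secret@db.internal:5432/kine
//
// yields "postgres://k3s:secret@db.internal:5432/kine".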

// nodeNameForHost maps an SSH host or address back to its inventory node name
// using cfg.SSHNodeHosts: first as a direct key, then by reverse lookup of the
// mapped host values. It returns "" when the host is not in the inventory.
func (o *Orchestrator) nodeNameForHost(host string) string {
	host = strings.TrimSpace(host)
	if host == "" {
		return ""
	}
	if _, ok := o.cfg.SSHNodeHosts[host]; ok {
		return host
	}
	for node, mapped := range o.cfg.SSHNodeHosts {
		if strings.TrimSpace(mapped) == host {
			return strings.TrimSpace(node)
		}
	}
	return ""
}
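
// For illustration (hypothetical inventory): with
// SSHNodeHosts = map[string]string{"cp1": "10.0.0.11", "db1": "10.0.0.12"},
//
//	nodeNameForHost("db1")       // "db1" (direct key match)
//	nodeNameForHost("10.0.0.12") // "db1" (reverse lookup by mapped host)
//	nodeNameForHost("10.0.0.99") // ""    (not in the inventory)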

// inventoryNodesForValidation returns the deduplicated, sorted union of every
// node name the orchestrator may touch: control planes, workers, SSH-managed
// nodes, coordination peers, and the forward-shutdown host.
func (o *Orchestrator) inventoryNodesForValidation() []string {
	set := map[string]struct{}{}
	add := func(node string) {
		node = strings.TrimSpace(node)
		if node == "" {
			return
		}
		set[node] = struct{}{}
	}
	for _, n := range o.cfg.ControlPlanes {
		add(n)
	}
	for _, n := range o.cfg.Workers {
		add(n)
	}
	for _, n := range o.cfg.SSHManagedNodes {
		add(n)
	}
	for _, n := range o.cfg.Coordination.PeerHosts {
		add(n)
	}
	add(o.cfg.Coordination.ForwardShutdownHost)
	nodes := make([]string, 0, len(set))
	for n := range set {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes)
	return nodes
}
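
// For illustration (hypothetical inventory): ControlPlanes=["cp1"],
// Workers=["w1", "w2"], SSHManagedNodes=["cp1", "w1", "w2"],
// Coordination.PeerHosts=["cp1"], and ForwardShutdownHost="nas" yield
//
//	["cp1", "nas", "w1", "w2"]
//
// after deduplication and sorting.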

// validateNodeInventory runs static preflight checks over the SSH inventory:
// the SSH port must be valid, every control plane and worker must appear in
// ssh_managed_nodes, every node needs an ssh_node_hosts entry, hosts must not
// contain whitespace or slashes, and users must be non-empty and free of
// whitespace and "@". All issues are collected, sorted, and reported together
// via joinLimited.
func (o *Orchestrator) validateNodeInventory() error {
	issues := []string{}
	if o.cfg.SSHPort <= 0 || o.cfg.SSHPort > 65535 {
		issues = append(issues, fmt.Sprintf("ssh_port=%d is invalid", o.cfg.SSHPort))
	}
	managed := makeStringSet(o.cfg.SSHManagedNodes)
	for _, cp := range o.cfg.ControlPlanes {
		cp = strings.TrimSpace(cp)
		if cp == "" {
			continue
		}
		if _, ok := managed[cp]; !ok {
			issues = append(issues, fmt.Sprintf("control plane %s is missing from ssh_managed_nodes", cp))
		}
	}
	for _, node := range o.cfg.Workers {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, ok := managed[node]; !ok {
			issues = append(issues, fmt.Sprintf("worker %s is missing from ssh_managed_nodes", node))
		}
	}
	baseUser := strings.TrimSpace(o.cfg.SSHUser)
	for _, node := range o.inventoryNodesForValidation() {
		if _, ok := o.cfg.SSHNodeHosts[node]; !ok {
			issues = append(issues, fmt.Sprintf("%s is missing ssh_node_hosts entry", node))
		}
		host := strings.TrimSpace(o.cfg.SSHNodeHosts[node])
		if host == "" {
			host = node
		}
		if strings.ContainsAny(host, " \t\r\n") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains whitespace)", node, host))
		}
		if strings.Contains(host, "/") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains slash)", node, host))
		}
		// Per-node user overrides take precedence over the base ssh_user.
		user := baseUser
		if override, ok := o.cfg.SSHNodeUsers[node]; ok {
			user = strings.TrimSpace(override)
		}
		if user == "" {
			issues = append(issues, fmt.Sprintf("%s has empty ssh user (ssh_user/ssh_node_users)", node))
		}
		if strings.ContainsAny(user, " \t\r\n@") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh user %q", node, user))
		}
	}
	if len(issues) > 0 {
		sort.Strings(issues)
		return fmt.Errorf("node inventory preflight failed: %s", joinLimited(issues, 10))
	}
	return nil
}
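
// For illustration (hypothetical values): a host of "10.0.0.5/24" fails the
// slash check (a CIDR range is not a dialable host), and a user of "root@cp1"
// fails the "@" check, since neither would form a usable SSH destination.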

// tcpReachable reports whether a TCP connection to address (host:port) can be
// established within timeout. The probe connection is closed immediately.
func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", address, timeout)
	if err != nil {
		return false
	}
	_ = conn.Close()
	return true
}