package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForWorkloadConvergence blocks until workloadConvergenceReady reports
// ready, the wait budget is exhausted, or ctx is cancelled.
// Signature: (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error.
// Why: startup is reported as blocked when controllers never reach their
// desired readiness, so this loop re-checks on a fixed poll interval and runs
// the three auto-heal helpers before every check.
//
// The wait budget and poll interval are read from o.cfg.Startup; non-positive
// values fall back to 15 minutes and 5 seconds. It returns nil on convergence,
// ctx.Err() on cancellation, or a "startup blocked" error that carries the
// most recent failure detail once the budget is spent.
func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
	// An unchanged status line is repeated at most this often.
	const relogEvery = 30 * time.Second

	budget := time.Duration(o.cfg.Startup.WorkloadConvergenceWaitSeconds) * time.Second
	if budget <= 0 {
		budget = 15 * time.Minute
	}
	interval := time.Duration(o.cfg.Startup.WorkloadConvergencePollSeconds) * time.Second
	if interval <= 0 {
		interval = 5 * time.Second
	}

	var (
		cutoff  = time.Now().Add(budget)
		failure = "unknown"
		// Zero-valued timestamps. loggedAt being zero guarantees a status line
		// on the first pass; the others are handed to the helpers by pointer.
		loggedAt, recycledAt, replicasHealedAt, stormHealedAt time.Time
	)

	for {
		previous := failure

		o.maybeAutoRecycleStuckPods(ctx, &recycledAt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &replicasHealedAt)
		o.maybeAutoQuarantineSchedulingStorms(ctx, &stormHealedAt)

		converged, detail, err := o.workloadConvergenceReady(ctx)
		failure = detail
		if err != nil {
			failure = err.Error()
		}
		if converged {
			o.log.Printf("workload convergence check passed (%s)", detail)
			return nil
		}

		// Log when the failure text changes, otherwise only periodically.
		if failure != previous || time.Since(loggedAt) >= relogEvery {
			left := time.Until(cutoff).Round(time.Second)
			if left < 0 {
				left = 0
			}
			o.log.Printf("waiting for workload convergence (%s remaining): %s", left, failure)
			loggedAt = time.Now()
		}

		if time.Now().After(cutoff) {
			return fmt.Errorf("startup blocked: workload convergence not satisfied within %s (%s)", budget, failure)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}

// workloadConvergenceReady runs one orchestration or CLI step.
// Signature: (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error) { out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset,daemonset", "-A", "-o", "json") if err != nil { return false, "", fmt.Errorf("query controllers: %w", err) } var list workloadList if err := json.Unmarshal([]byte(out), &list); err != nil { return false, "", fmt.Errorf("decode controllers: %w", err) } requiredNamespaces := o.startupRequiredWorkloadNamespaces() ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) ignoredByFlux := namespaceCandidatesFromIgnoreKustomizations(o.cfg.Startup.IgnoreFluxKustomizations) pending := []string{} checked := 0 for _, item := range list.Items { kind := strings.ToLower(strings.TrimSpace(item.Kind)) ns := strings.TrimSpace(item.Metadata.Namespace) name := strings.TrimSpace(item.Metadata.Name) if kind == "" || ns == "" || name == "" { continue } if len(requiredNamespaces) > 0 { if _, ok := requiredNamespaces[ns]; !ok { continue } } if _, ok := ignoredNamespaces[ns]; ok { continue } if _, ok := ignoredByFlux[ns]; ok { continue } if workloadIgnored(ignoreRules, ns, kind, name) { continue } if workloadTargetsIgnoredNodes(item.Spec.Template.Spec, ignoredNodes) { continue } desired, ready, ok := desiredReady(item) if !ok || desired <= 0 { continue } if kind == "daemonset" && desired > ready && len(ignoredNodes) > 0 { missing := desired - ready if missing <= int32(len(ignoredNodes)) { ready = desired } } checked++ if ready < desired { pending = append(pending, fmt.Sprintf("%s/%s/%s ready=%d desired=%d", ns, kind, name, ready, desired)) } } if len(pending) > 0 { sort.Strings(pending) return false, "not ready: " + joinLimited(pending, 8), nil } return true, 
fmt.Sprintf("controllers ready=%d", checked), nil } // desiredReady runs one orchestration or CLI step. // Signature: desiredReady(item workloadResource) (int32, int32, bool). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func desiredReady(item workloadResource) (int32, int32, bool) { switch strings.ToLower(strings.TrimSpace(item.Kind)) { case "deployment", "statefulset": desired := int32(1) if item.Spec.Replicas != nil { desired = *item.Spec.Replicas } return desired, item.Status.ReadyReplicas, true case "daemonset": return item.Status.DesiredNumberScheduled, item.Status.NumberReady, true default: return 0, 0, false } } // recycleStuckControllerPods runs one orchestration or CLI step. // Signature: (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") if err != nil { return fmt.Errorf("query pods: %w", err) } var list podList if err := json.Unmarshal([]byte(out), &list); err != nil { return fmt.Errorf("decode pods: %w", err) } ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second if grace <= 0 { grace = 180 * time.Second } stuckReasons := map[string]struct{}{ "ImagePullBackOff": {}, "ErrImagePull": {}, "CrashLoopBackOff": {}, "CreateContainerConfigError": {}, "CreateContainerError": {}, } longhornAttachReasons := map[string]string{} if reasons, scanErr := o.longhornAttachBlockedPodReasons(ctx, list, grace); scanErr != nil { o.log.Printf("warning: longhorn attach-blocked pod scan failed: %v", scanErr) } else 
{ longhornAttachReasons = reasons } encryptedMountReasons := map[string]string{} if reasons, scanErr := o.repairEncryptedVolumeMountPrereqs(ctx, list, grace); scanErr != nil { o.log.Printf("warning: encrypted volume mount prerequisite scan failed: %v", scanErr) } else { encryptedMountReasons = reasons } recycled := []string{} for _, pod := range list.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) if ns == "" || name == "" { continue } if _, ok := ignoredNamespaces[ns]; ok { continue } if workloadIgnored(ignoreRules, ns, "", name) { continue } if podTargetsIgnoredNode(pod, ignoredNodes) { continue } if !podControllerOwned(pod) { continue } age := time.Since(pod.Metadata.CreationTimestamp) if !pod.Metadata.CreationTimestamp.IsZero() && age < grace { continue } reason := stuckContainerReason(pod, stuckReasons) if reason == "" { reason = stuckVaultInitReason(pod, grace) } if reason == "" { reason = longhornAttachReasons[ns+"/"+name] } if reason == "" { reason = encryptedMountReasons[ns+"/"+name] } if reason == "" { continue } o.log.Printf("warning: recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second)) if _, err := o.kubectl(ctx, 30*time.Second, "-n", ns, "delete", "pod", name, "--wait=false"); err != nil && !isNotFoundErr(err) { o.log.Printf("warning: recycle pod failed for %s/%s: %v", ns, name, err) continue } recycled = append(recycled, ns+"/"+name) } if len(recycled) > 0 { sort.Strings(recycled) o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10)) o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10))) } return nil } // repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step. // Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). 
// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node // rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot // mount encrypted PVCs; installing the missing host tool and recycling the // controller-owned pod lets kubelet retry the same volume safely. func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") if err != nil { return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err) } var events eventList if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err) } podsByKey := map[string]podResource{} for _, pod := range pods.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) node := strings.TrimSpace(pod.Spec.NodeName) if ns == "" || name == "" || node == "" { continue } if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { continue } if !podControllerOwned(pod) { continue } if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { continue } podsByKey[ns+"/"+name] = pod } if len(podsByKey) == 0 { return map[string]string{}, nil } repairedNodes := map[string]bool{} reasons := map[string]string{} for _, event := range events.Items { if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { continue } if strings.TrimSpace(event.Reason) != "FailedMount" { continue } if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { continue } key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) pod, ok := podsByKey[key] if !ok { continue } lastSeen := eventLastObservedAt(event) if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && 
lastSeen.Before(pod.Metadata.CreationTimestamp) { continue } message := strings.ToLower(strings.TrimSpace(event.Message)) if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") { continue } node := strings.TrimSpace(pod.Spec.NodeName) if node == "" || !o.sshManaged(node) { o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key) continue } if repaired, ok := repairedNodes[node]; ok { if repaired { reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node } continue } if err := o.ensureHostCryptsetup(ctx, node); err != nil { repairedNodes[node] = false o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err) if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil { o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr) continue } reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node continue } repairedNodes[node] = true reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node } return reasons, nil } // ensureHostCryptsetup runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error. // Why: kubelet's encrypted Longhorn mount helper shells into the host namespace, // so the package must exist on the node host, not merely inside a workload pod. func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error { command := strings.Join([]string{ "set -eu", "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi", "if ! 
command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi", "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update", "sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin", "if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi", "echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__", "exit 43", }, "; ") out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute) if err != nil { return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out)) } o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, strings.TrimSpace(out)) o.noteStartupAutoHeal(fmt.Sprintf("ensured cryptsetup on %s", node)) return nil } // cordonNodeForMissingCryptsetup runs one orchestration or CLI step. // Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error. // Why: when host package repair is not permitted, cordoning is the safest // automatic fallback: it prevents new encrypted-volume pods from landing on a // node kubelet cannot mount from, while leaving existing workloads and storage // objects untouched. func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error { if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil { return err } o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node) o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node)) return nil } // longhornAttachBlockedPodReasons runs one orchestration or CLI step. // Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). // Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a // node Longhorn still marks unready. 
Recycling the unattached Pending pod lets // the scheduler pick a Longhorn-ready node without touching Longhorn data-plane // objects. func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { unreadyNodes, err := o.longhornUnreadyNodes(ctx) if err != nil { return nil, err } if len(unreadyNodes) == 0 { return map[string]string{}, nil } eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") if err != nil { return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err) } var events eventList if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err) } podsByKey := map[string]podResource{} for _, pod := range pods.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) node := strings.TrimSpace(pod.Spec.NodeName) if ns == "" || name == "" || node == "" { continue } if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { continue } if _, unready := unreadyNodes[node]; !unready { continue } if !podControllerOwned(pod) { continue } if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { continue } podsByKey[ns+"/"+name] = pod } if len(podsByKey) == 0 { return map[string]string{}, nil } reasons := map[string]string{} for _, event := range events.Items { if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { continue } if strings.TrimSpace(event.Reason) != "FailedAttachVolume" { continue } if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { continue } key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) pod, ok := podsByKey[key] if !ok { continue } lastSeen := eventLastObservedAt(event) if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && 
lastSeen.Before(pod.Metadata.CreationTimestamp) { continue } node := strings.TrimSpace(pod.Spec.NodeName) message := strings.ToLower(strings.TrimSpace(event.Message)) if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") { continue } if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") { continue } reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node } return reasons, nil } // longhornUnreadyNodes runs one orchestration or CLI step. // Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error). // Why: Longhorn node readiness can lag or intentionally differ from Kubernetes // node readiness; attach recovery must use Longhorn's view for safety. func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) { out, err := o.kubectl(ctx, 30*time.Second, "-n", "longhorn-system", "get", "nodes.longhorn.io", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}", ) if err != nil { if isNotFoundErr(err) { return map[string]struct{}{}, nil } return nil, fmt.Errorf("query longhorn node readiness: %w", err) } unready := map[string]struct{}{} for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 2 { continue } if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") { unready[strings.TrimSpace(fields[0])] = struct{}{} } } return unready, nil } // podControllerOwned runs one orchestration or CLI step. // Signature: podControllerOwned(p podResource) bool. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func podControllerOwned(p podResource) bool { for _, owner := range p.Metadata.OwnerReferences { switch strings.TrimSpace(owner.Kind) { case "ReplicaSet", "StatefulSet", "DaemonSet": return true } } return false } // stuckContainerReason runs one orchestration or CLI step. 
// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func stuckContainerReason(p podResource, reasons map[string]struct{}) string { check := func(statuses []podContainerStatus) string { for _, st := range statuses { if st.State.Waiting == nil { continue } reason := strings.TrimSpace(st.State.Waiting.Reason) if reason == "" { continue } if _, ok := reasons[reason]; ok { return reason } } return "" } if reason := check(p.Status.InitContainerStatuses); reason != "" { return reason } return check(p.Status.ContainerStatuses) } // stuckVaultInitReason runs one orchestration or CLI step. // Signature: stuckVaultInitReason(p podResource, grace time.Duration) string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func stuckVaultInitReason(p podResource, grace time.Duration) string { if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") { return "" } if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") { return "" } for _, st := range p.Status.InitContainerStatuses { if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil { continue } startedAt := st.State.Running.StartedAt if startedAt.IsZero() { continue } if time.Since(startedAt) < grace { return "" } return "VaultInitStuck" } return "" }