627 lines
23 KiB
Go
627 lines
23 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// waitForWorkloadConvergence runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
|
|
wait := time.Duration(o.cfg.Startup.WorkloadConvergenceWaitSeconds) * time.Second
|
|
if wait <= 0 {
|
|
wait = 15 * time.Minute
|
|
}
|
|
poll := time.Duration(o.cfg.Startup.WorkloadConvergencePollSeconds) * time.Second
|
|
if poll <= 0 {
|
|
poll = 5 * time.Second
|
|
}
|
|
deadline := time.Now().Add(wait)
|
|
lastFailure := "unknown"
|
|
lastLogged := time.Time{}
|
|
lastRecycleAttempt := time.Time{}
|
|
lastReplicaHeal := time.Time{}
|
|
lastSchedulingStormHeal := time.Time{}
|
|
for {
|
|
prevFailure := lastFailure
|
|
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
|
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
|
o.maybeAutoQuarantineSchedulingStorms(ctx, &lastSchedulingStormHeal)
|
|
ready, detail, err := o.workloadConvergenceReady(ctx)
|
|
if err != nil {
|
|
lastFailure = err.Error()
|
|
} else {
|
|
lastFailure = detail
|
|
}
|
|
if ready {
|
|
o.log.Printf("workload convergence check passed (%s)", detail)
|
|
return nil
|
|
}
|
|
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
|
remaining := time.Until(deadline).Round(time.Second)
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
o.log.Printf("waiting for workload convergence (%s remaining): %s", remaining, lastFailure)
|
|
lastLogged = time.Now()
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return fmt.Errorf("startup blocked: workload convergence not satisfied within %s (%s)", wait, lastFailure)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(poll):
|
|
}
|
|
}
|
|
}
|
|
|
|
// workloadConvergenceReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error) {
|
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset,daemonset", "-A", "-o", "json")
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("query controllers: %w", err)
|
|
}
|
|
var list workloadList
|
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
|
return false, "", fmt.Errorf("decode controllers: %w", err)
|
|
}
|
|
requiredNamespaces := o.startupRequiredWorkloadNamespaces()
|
|
ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
|
|
ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
|
|
ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
|
|
ignoredByFlux := namespaceCandidatesFromIgnoreKustomizations(o.cfg.Startup.IgnoreFluxKustomizations)
|
|
pending := []string{}
|
|
checked := 0
|
|
for _, item := range list.Items {
|
|
kind := strings.ToLower(strings.TrimSpace(item.Kind))
|
|
ns := strings.TrimSpace(item.Metadata.Namespace)
|
|
name := strings.TrimSpace(item.Metadata.Name)
|
|
if kind == "" || ns == "" || name == "" {
|
|
continue
|
|
}
|
|
if len(requiredNamespaces) > 0 {
|
|
if _, ok := requiredNamespaces[ns]; !ok {
|
|
continue
|
|
}
|
|
}
|
|
if _, ok := ignoredNamespaces[ns]; ok {
|
|
continue
|
|
}
|
|
if _, ok := ignoredByFlux[ns]; ok {
|
|
continue
|
|
}
|
|
if workloadIgnored(ignoreRules, ns, kind, name) {
|
|
continue
|
|
}
|
|
if workloadTargetsIgnoredNodes(item.Spec.Template.Spec, ignoredNodes) {
|
|
continue
|
|
}
|
|
desired, ready, ok := desiredReady(item)
|
|
if !ok || desired <= 0 {
|
|
continue
|
|
}
|
|
if kind == "daemonset" && desired > ready && len(ignoredNodes) > 0 {
|
|
missing := desired - ready
|
|
if missing <= int32(len(ignoredNodes)) {
|
|
ready = desired
|
|
}
|
|
}
|
|
checked++
|
|
if ready < desired {
|
|
pending = append(pending, fmt.Sprintf("%s/%s/%s ready=%d desired=%d", ns, kind, name, ready, desired))
|
|
}
|
|
}
|
|
if len(pending) > 0 {
|
|
sort.Strings(pending)
|
|
return false, "not ready: " + joinLimited(pending, 8), nil
|
|
}
|
|
return true, fmt.Sprintf("controllers ready=%d", checked), nil
|
|
}
|
|
|
|
// desiredReady runs one orchestration or CLI step.
|
|
// Signature: desiredReady(item workloadResource) (int32, int32, bool).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func desiredReady(item workloadResource) (int32, int32, bool) {
|
|
switch strings.ToLower(strings.TrimSpace(item.Kind)) {
|
|
case "deployment", "statefulset":
|
|
desired := int32(1)
|
|
if item.Spec.Replicas != nil {
|
|
desired = *item.Spec.Replicas
|
|
}
|
|
return desired, item.Status.ReadyReplicas, true
|
|
case "daemonset":
|
|
return item.Status.DesiredNumberScheduled, item.Status.NumberReady, true
|
|
default:
|
|
return 0, 0, false
|
|
}
|
|
}
|
|
|
|
// recycleStuckControllerPods runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
|
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
|
|
if err != nil {
|
|
return fmt.Errorf("query pods: %w", err)
|
|
}
|
|
var list podList
|
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
|
return fmt.Errorf("decode pods: %w", err)
|
|
}
|
|
ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
|
|
ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
|
|
ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
|
|
grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
|
|
if grace <= 0 {
|
|
grace = 180 * time.Second
|
|
}
|
|
stuckReasons := map[string]struct{}{
|
|
"ImagePullBackOff": {},
|
|
"ErrImagePull": {},
|
|
"CrashLoopBackOff": {},
|
|
"CreateContainerConfigError": {},
|
|
"CreateContainerError": {},
|
|
}
|
|
longhornAttachReasons := map[string]string{}
|
|
if reasons, scanErr := o.longhornAttachBlockedPodReasons(ctx, list, grace); scanErr != nil {
|
|
o.log.Printf("warning: longhorn attach-blocked pod scan failed: %v", scanErr)
|
|
} else {
|
|
longhornAttachReasons = reasons
|
|
}
|
|
encryptedMountReasons := map[string]string{}
|
|
if reasons, scanErr := o.repairEncryptedVolumeMountPrereqs(ctx, list, grace); scanErr != nil {
|
|
o.log.Printf("warning: encrypted volume mount prerequisite scan failed: %v", scanErr)
|
|
} else {
|
|
encryptedMountReasons = reasons
|
|
}
|
|
stalePhaseReasons := map[string]string{}
|
|
if reasons, scanErr := o.staleControllerPodReasons(ctx, list, grace); scanErr != nil {
|
|
o.log.Printf("warning: stale controller pod scan failed: %v", scanErr)
|
|
} else {
|
|
stalePhaseReasons = reasons
|
|
}
|
|
recycled := []string{}
|
|
for _, pod := range list.Items {
|
|
ns := strings.TrimSpace(pod.Metadata.Namespace)
|
|
name := strings.TrimSpace(pod.Metadata.Name)
|
|
if ns == "" || name == "" {
|
|
continue
|
|
}
|
|
if _, ok := ignoredNamespaces[ns]; ok {
|
|
continue
|
|
}
|
|
if workloadIgnored(ignoreRules, ns, "", name) {
|
|
continue
|
|
}
|
|
if podTargetsIgnoredNode(pod, ignoredNodes) {
|
|
continue
|
|
}
|
|
if !podControllerOwned(pod) {
|
|
continue
|
|
}
|
|
age := time.Since(pod.Metadata.CreationTimestamp)
|
|
if !pod.Metadata.CreationTimestamp.IsZero() && age < grace {
|
|
continue
|
|
}
|
|
reason := stuckContainerReason(pod, stuckReasons)
|
|
if reason == "" {
|
|
reason = stuckVaultInitReason(pod, grace)
|
|
}
|
|
if reason == "" {
|
|
reason = longhornAttachReasons[ns+"/"+name]
|
|
}
|
|
if reason == "" {
|
|
reason = encryptedMountReasons[ns+"/"+name]
|
|
}
|
|
if reason == "" {
|
|
reason = stalePhaseReasons[ns+"/"+name]
|
|
}
|
|
if reason == "" {
|
|
continue
|
|
}
|
|
deleteArgs := []string{"-n", ns, "delete", "pod", name, "--wait=false"}
|
|
forceDelete := staleControllerPodForceDeleteSafe(pod, grace)
|
|
if forceDelete {
|
|
deleteArgs = append(deleteArgs, "--grace-period=0", "--force")
|
|
}
|
|
if forceDelete {
|
|
o.log.Printf("warning: force recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second))
|
|
} else {
|
|
o.log.Printf("warning: recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second))
|
|
}
|
|
if _, err := o.kubectl(ctx, 30*time.Second, deleteArgs...); err != nil && !isNotFoundErr(err) {
|
|
o.log.Printf("warning: recycle pod failed for %s/%s: %v", ns, name, err)
|
|
continue
|
|
}
|
|
recycled = append(recycled, ns+"/"+name)
|
|
}
|
|
if len(recycled) > 0 {
|
|
sort.Strings(recycled)
|
|
o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10))
|
|
o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// staleControllerPodReasons runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
|
|
// Why: after node or kubelet recovery, controller-owned pods can stay in
|
|
// terminal or unknown status even though the node is Ready and a replacement may
|
|
// already be healthy. A normal pod delete lets Kubernetes clean the stale status
|
|
// without touching storage objects or forcing deletion on a partitioned node.
|
|
func (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
|
|
unavailable, err := o.unavailableNodeSet(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
reasons := map[string]string{}
|
|
for _, pod := range pods.Items {
|
|
ns := strings.TrimSpace(pod.Metadata.Namespace)
|
|
name := strings.TrimSpace(pod.Metadata.Name)
|
|
node := strings.TrimSpace(pod.Spec.NodeName)
|
|
if ns == "" || name == "" || node == "" {
|
|
continue
|
|
}
|
|
phase := strings.TrimSpace(pod.Status.Phase)
|
|
if !strings.EqualFold(phase, "Unknown") && !strings.EqualFold(phase, "Failed") {
|
|
continue
|
|
}
|
|
if _, badNode := unavailable[node]; badNode {
|
|
continue
|
|
}
|
|
if !podControllerOwned(pod) {
|
|
continue
|
|
}
|
|
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
|
|
continue
|
|
}
|
|
reasons[ns+"/"+name] = "StaleControllerPodOnReadyNode:" + node + ":" + phase
|
|
}
|
|
return reasons, nil
|
|
}
|
|
|
|
// staleControllerPodForceDeleteSafe runs one orchestration or CLI step.
|
|
// Signature: staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool.
|
|
// Why: a stale pod already marked for deletion may need force removal after a
|
|
// node outage. Keep that fallback away from PVC-bearing pods so Ananke never
|
|
// risks duplicating a storage writer.
|
|
func staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool {
|
|
if pod.Metadata.DeletionTimestamp == nil {
|
|
return false
|
|
}
|
|
if time.Since(*pod.Metadata.DeletionTimestamp) < grace {
|
|
return false
|
|
}
|
|
if podUsesPersistentVolumeClaim(pod) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// podUsesPersistentVolumeClaim runs one orchestration or CLI step.
|
|
// Signature: podUsesPersistentVolumeClaim(pod podResource) bool.
|
|
// Why: force-delete recovery is deliberately disallowed for pods with PVCs; the
|
|
// scheduler and storage controller need to settle those normally.
|
|
func podUsesPersistentVolumeClaim(pod podResource) bool {
|
|
for _, volume := range pod.Spec.Volumes {
|
|
if volume.PersistentVolumeClaim != nil && strings.TrimSpace(volume.PersistentVolumeClaim.ClaimName) != "" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
|
|
// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node
|
|
// rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot
|
|
// mount encrypted PVCs; installing the missing host tool and recycling the
|
|
// controller-owned pod lets kubelet retry the same volume safely.
|
|
func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
|
|
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err)
|
|
}
|
|
var events eventList
|
|
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
|
|
return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err)
|
|
}
|
|
|
|
podsByKey := map[string]podResource{}
|
|
for _, pod := range pods.Items {
|
|
ns := strings.TrimSpace(pod.Metadata.Namespace)
|
|
name := strings.TrimSpace(pod.Metadata.Name)
|
|
node := strings.TrimSpace(pod.Spec.NodeName)
|
|
if ns == "" || name == "" || node == "" {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
|
|
continue
|
|
}
|
|
if !podControllerOwned(pod) {
|
|
continue
|
|
}
|
|
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
|
|
continue
|
|
}
|
|
podsByKey[ns+"/"+name] = pod
|
|
}
|
|
if len(podsByKey) == 0 {
|
|
return map[string]string{}, nil
|
|
}
|
|
|
|
repairedNodes := map[string]bool{}
|
|
reasons := map[string]string{}
|
|
for _, event := range events.Items {
|
|
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
|
|
continue
|
|
}
|
|
if strings.TrimSpace(event.Reason) != "FailedMount" {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
|
|
continue
|
|
}
|
|
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
|
|
pod, ok := podsByKey[key]
|
|
if !ok {
|
|
continue
|
|
}
|
|
lastSeen := eventLastObservedAt(event)
|
|
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
|
|
continue
|
|
}
|
|
message := strings.ToLower(strings.TrimSpace(event.Message))
|
|
if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") {
|
|
continue
|
|
}
|
|
node := strings.TrimSpace(pod.Spec.NodeName)
|
|
if node == "" || !o.sshManaged(node) {
|
|
o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key)
|
|
continue
|
|
}
|
|
if repaired, ok := repairedNodes[node]; ok {
|
|
if repaired {
|
|
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
|
|
}
|
|
continue
|
|
}
|
|
if err := o.ensureHostCryptsetup(ctx, node); err != nil {
|
|
repairedNodes[node] = false
|
|
o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err)
|
|
if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
|
|
o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr)
|
|
continue
|
|
}
|
|
reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node
|
|
continue
|
|
}
|
|
repairedNodes[node] = true
|
|
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
|
|
}
|
|
return reasons, nil
|
|
}
|
|
|
|
// ensureHostCryptsetup runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error.
|
|
// Why: kubelet's encrypted Longhorn mount helper shells into the host namespace,
|
|
// so the package must exist on the node host, not merely inside a workload pod.
|
|
func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error {
|
|
command := strings.Join([]string{
|
|
"set -eu",
|
|
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi",
|
|
"if ! command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi",
|
|
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update",
|
|
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin",
|
|
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi",
|
|
"echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__",
|
|
"exit 43",
|
|
}, "; ")
|
|
out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute)
|
|
if err != nil {
|
|
return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out))
|
|
}
|
|
o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, strings.TrimSpace(out))
|
|
o.noteStartupAutoHeal(fmt.Sprintf("ensured cryptsetup on %s", node))
|
|
return nil
|
|
}
|
|
|
|
// cordonNodeForMissingCryptsetup runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error.
|
|
// Why: when host package repair is not permitted, cordoning is the safest
|
|
// automatic fallback: it prevents new encrypted-volume pods from landing on a
|
|
// node kubelet cannot mount from, while leaving existing workloads and storage
|
|
// objects untouched.
|
|
func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error {
|
|
if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil {
|
|
return err
|
|
}
|
|
o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node)
|
|
o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node))
|
|
return nil
|
|
}
|
|
|
|
// longhornAttachBlockedPodReasons runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
|
|
// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a
|
|
// node Longhorn still marks unready. Recycling the unattached Pending pod lets
|
|
// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane
|
|
// objects.
|
|
func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
|
|
unreadyNodes, err := o.longhornUnreadyNodes(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(unreadyNodes) == 0 {
|
|
return map[string]string{}, nil
|
|
}
|
|
|
|
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err)
|
|
}
|
|
var events eventList
|
|
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
|
|
return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err)
|
|
}
|
|
|
|
podsByKey := map[string]podResource{}
|
|
for _, pod := range pods.Items {
|
|
ns := strings.TrimSpace(pod.Metadata.Namespace)
|
|
name := strings.TrimSpace(pod.Metadata.Name)
|
|
node := strings.TrimSpace(pod.Spec.NodeName)
|
|
if ns == "" || name == "" || node == "" {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
|
|
continue
|
|
}
|
|
if _, unready := unreadyNodes[node]; !unready {
|
|
continue
|
|
}
|
|
if !podControllerOwned(pod) {
|
|
continue
|
|
}
|
|
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
|
|
continue
|
|
}
|
|
podsByKey[ns+"/"+name] = pod
|
|
}
|
|
if len(podsByKey) == 0 {
|
|
return map[string]string{}, nil
|
|
}
|
|
|
|
reasons := map[string]string{}
|
|
for _, event := range events.Items {
|
|
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
|
|
continue
|
|
}
|
|
if strings.TrimSpace(event.Reason) != "FailedAttachVolume" {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
|
|
continue
|
|
}
|
|
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
|
|
pod, ok := podsByKey[key]
|
|
if !ok {
|
|
continue
|
|
}
|
|
lastSeen := eventLastObservedAt(event)
|
|
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
|
|
continue
|
|
}
|
|
node := strings.TrimSpace(pod.Spec.NodeName)
|
|
message := strings.ToLower(strings.TrimSpace(event.Message))
|
|
if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") {
|
|
continue
|
|
}
|
|
if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") {
|
|
continue
|
|
}
|
|
reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node
|
|
}
|
|
return reasons, nil
|
|
}
|
|
|
|
// longhornUnreadyNodes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error).
|
|
// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes
|
|
// node readiness; attach recovery must use Longhorn's view for safety.
|
|
func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) {
|
|
out, err := o.kubectl(ctx, 30*time.Second,
|
|
"-n", "longhorn-system",
|
|
"get", "nodes.longhorn.io",
|
|
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}",
|
|
)
|
|
if err != nil {
|
|
if isNotFoundErr(err) {
|
|
return map[string]struct{}{}, nil
|
|
}
|
|
return nil, fmt.Errorf("query longhorn node readiness: %w", err)
|
|
}
|
|
unready := map[string]struct{}{}
|
|
for _, line := range lines(out) {
|
|
fields := strings.Fields(line)
|
|
if len(fields) < 2 {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") {
|
|
unready[strings.TrimSpace(fields[0])] = struct{}{}
|
|
}
|
|
}
|
|
return unready, nil
|
|
}
|
|
|
|
// podControllerOwned runs one orchestration or CLI step.
|
|
// Signature: podControllerOwned(p podResource) bool.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func podControllerOwned(p podResource) bool {
|
|
for _, owner := range p.Metadata.OwnerReferences {
|
|
switch strings.TrimSpace(owner.Kind) {
|
|
case "ReplicaSet", "StatefulSet", "DaemonSet":
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// stuckContainerReason runs one orchestration or CLI step.
|
|
// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func stuckContainerReason(p podResource, reasons map[string]struct{}) string {
|
|
check := func(statuses []podContainerStatus) string {
|
|
for _, st := range statuses {
|
|
if st.State.Waiting == nil {
|
|
continue
|
|
}
|
|
reason := strings.TrimSpace(st.State.Waiting.Reason)
|
|
if reason == "" {
|
|
continue
|
|
}
|
|
if _, ok := reasons[reason]; ok {
|
|
return reason
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
if reason := check(p.Status.InitContainerStatuses); reason != "" {
|
|
return reason
|
|
}
|
|
return check(p.Status.ContainerStatuses)
|
|
}
|
|
|
|
// stuckVaultInitReason runs one orchestration or CLI step.
|
|
// Signature: stuckVaultInitReason(p podResource, grace time.Duration) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func stuckVaultInitReason(p podResource, grace time.Duration) string {
|
|
if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") {
|
|
return ""
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") {
|
|
return ""
|
|
}
|
|
for _, st := range p.Status.InitContainerStatuses {
|
|
if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil {
|
|
continue
|
|
}
|
|
startedAt := st.State.Running.StartedAt
|
|
if startedAt.IsZero() {
|
|
continue
|
|
}
|
|
if time.Since(startedAt) < grace {
|
|
return ""
|
|
}
|
|
return "VaultInitStuck"
|
|
}
|
|
return ""
|
|
}
|