328 lines
10 KiB
Go
328 lines
10 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// maybeAutoRecycleStuckPods runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttempt *time.Time).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttempt *time.Time) {
|
|
if o.runner.DryRun || !o.cfg.Startup.AutoRecycleStuckPods {
|
|
return
|
|
}
|
|
now := time.Now()
|
|
if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
|
|
return
|
|
}
|
|
if lastAttempt != nil {
|
|
*lastAttempt = now
|
|
}
|
|
o.bestEffort("recycle stuck controller pods", func() error { return o.recycleStuckControllerPods(ctx) })
|
|
}
|
|
|
|
// maybeAutoHealCriticalWorkloadReplicas runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time) {
|
|
if o.runner.DryRun {
|
|
return
|
|
}
|
|
now := time.Now()
|
|
if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
|
|
return
|
|
}
|
|
if lastAttempt != nil {
|
|
*lastAttempt = now
|
|
}
|
|
healed, err := o.healCriticalWorkloadReplicas(ctx)
|
|
if err != nil {
|
|
o.log.Printf("warning: critical workload replica auto-heal failed: %v", err)
|
|
return
|
|
}
|
|
if len(healed) == 0 {
|
|
return
|
|
}
|
|
sort.Strings(healed)
|
|
detail := fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8))
|
|
o.log.Printf("%s", detail)
|
|
o.noteStartupAutoHeal(detail)
|
|
}
|
|
|
|
// healCriticalWorkloadReplicas runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error) {
|
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query workloads: %w", err)
|
|
}
|
|
var list workloadList
|
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
|
return nil, fmt.Errorf("decode workloads: %w", err)
|
|
}
|
|
current := map[string]int32{}
|
|
for _, item := range list.Items {
|
|
kind := strings.ToLower(strings.TrimSpace(item.Kind))
|
|
ns := strings.TrimSpace(item.Metadata.Namespace)
|
|
name := strings.TrimSpace(item.Metadata.Name)
|
|
if kind == "" || ns == "" || name == "" {
|
|
continue
|
|
}
|
|
if kind != "deployment" && kind != "statefulset" {
|
|
continue
|
|
}
|
|
desired := int32(1)
|
|
if item.Spec.Replicas != nil {
|
|
desired = *item.Spec.Replicas
|
|
}
|
|
key := ns + "/" + kind + "/" + name
|
|
current[key] = desired
|
|
}
|
|
|
|
healed := []string{}
|
|
for _, w := range criticalStartupWorkloads {
|
|
key := w.Namespace + "/" + strings.ToLower(w.Kind) + "/" + w.Name
|
|
desired, ok := current[key]
|
|
if !ok || desired >= 1 {
|
|
continue
|
|
}
|
|
if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
|
|
if isNotFoundErr(err) {
|
|
continue
|
|
}
|
|
return healed, fmt.Errorf("scale %s to 1: %w", key, err)
|
|
}
|
|
healed = append(healed, key)
|
|
}
|
|
return healed, nil
|
|
}
|
|
|
|
// startupFailurePods runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error) {
|
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query pods: %w", err)
|
|
}
|
|
var list podList
|
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
|
return nil, fmt.Errorf("decode pods: %w", err)
|
|
}
|
|
|
|
requiredNamespaces := o.startupRequiredWorkloadNamespaces()
|
|
ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
|
|
ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
|
|
stuckReasons := map[string]struct{}{
|
|
"ImagePullBackOff": {},
|
|
"ErrImagePull": {},
|
|
"CrashLoopBackOff": {},
|
|
"CreateContainerConfigError": {},
|
|
"CreateContainerError": {},
|
|
"RunContainerError": {},
|
|
}
|
|
grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
|
|
if grace <= 0 {
|
|
grace = 180 * time.Second
|
|
}
|
|
|
|
failures := []string{}
|
|
for _, pod := range list.Items {
|
|
ns := strings.TrimSpace(pod.Metadata.Namespace)
|
|
name := strings.TrimSpace(pod.Metadata.Name)
|
|
if ns == "" || name == "" {
|
|
continue
|
|
}
|
|
if len(requiredNamespaces) > 0 {
|
|
if _, ok := requiredNamespaces[ns]; !ok {
|
|
continue
|
|
}
|
|
}
|
|
if _, ok := ignoredNamespaces[ns]; ok {
|
|
continue
|
|
}
|
|
if podTargetsIgnoredNode(pod, ignoredNodes) {
|
|
continue
|
|
}
|
|
reason := stuckContainerReason(pod, stuckReasons)
|
|
if reason == "" {
|
|
reason = stuckVaultInitReason(pod, grace)
|
|
}
|
|
if reason == "" {
|
|
continue
|
|
}
|
|
failures = append(failures, fmt.Sprintf("%s/%s(%s)", ns, name, reason))
|
|
}
|
|
sort.Strings(failures)
|
|
return failures, nil
|
|
}
|
|
|
|
// podTargetsIgnoredNode runs one orchestration or CLI step.
|
|
// Signature: podTargetsIgnoredNode(p podResource, ignored map[string]struct{}) bool.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func podTargetsIgnoredNode(p podResource, ignored map[string]struct{}) bool {
|
|
if len(ignored) == 0 {
|
|
return false
|
|
}
|
|
node := strings.TrimSpace(p.Spec.NodeName)
|
|
if node != "" {
|
|
_, ok := ignored[node]
|
|
return ok
|
|
}
|
|
return workloadTargetsIgnoredNodes(p.Spec.podSpec, ignored)
|
|
}
|
|
|
|
// workloadTargetsIgnoredNodes runs one orchestration or CLI step.
|
|
// Signature: workloadTargetsIgnoredNodes(spec podSpec, ignored map[string]struct{}) bool.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func workloadTargetsIgnoredNodes(spec podSpec, ignored map[string]struct{}) bool {
|
|
if len(ignored) == 0 {
|
|
return false
|
|
}
|
|
if hostname, ok := spec.NodeSelector["kubernetes.io/hostname"]; ok {
|
|
_, ignoredHost := ignored[strings.TrimSpace(hostname)]
|
|
if ignoredHost {
|
|
return true
|
|
}
|
|
}
|
|
if spec.Affinity == nil || spec.Affinity.NodeAffinity == nil || spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
|
|
return false
|
|
}
|
|
terms := spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
|
|
if len(terms) != 1 {
|
|
return false
|
|
}
|
|
for _, expr := range terms[0].MatchExpressions {
|
|
if strings.TrimSpace(expr.Key) != "kubernetes.io/hostname" {
|
|
continue
|
|
}
|
|
if !strings.EqualFold(strings.TrimSpace(expr.Operator), "In") {
|
|
return false
|
|
}
|
|
if len(expr.Values) == 0 {
|
|
return false
|
|
}
|
|
for _, value := range expr.Values {
|
|
if _, ok := ignored[strings.TrimSpace(value)]; !ok {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// parseWorkloadIgnoreRules runs one orchestration or CLI step.
|
|
// Signature: parseWorkloadIgnoreRules(entries []string) []workloadIgnoreRule.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func parseWorkloadIgnoreRules(entries []string) []workloadIgnoreRule {
|
|
out := []workloadIgnoreRule{}
|
|
for _, entry := range entries {
|
|
entry = strings.TrimSpace(entry)
|
|
if entry == "" {
|
|
continue
|
|
}
|
|
parts := strings.Split(entry, "/")
|
|
switch len(parts) {
|
|
case 2:
|
|
out = append(out, workloadIgnoreRule{
|
|
Namespace: strings.TrimSpace(parts[0]),
|
|
Name: strings.TrimSpace(parts[1]),
|
|
})
|
|
case 3:
|
|
out = append(out, workloadIgnoreRule{
|
|
Namespace: strings.TrimSpace(parts[0]),
|
|
Kind: strings.ToLower(strings.TrimSpace(parts[1])),
|
|
Name: strings.TrimSpace(parts[2]),
|
|
})
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// workloadIgnored runs one orchestration or CLI step.
|
|
// Signature: workloadIgnored(rules []workloadIgnoreRule, namespace, kind, name string) bool.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func workloadIgnored(rules []workloadIgnoreRule, namespace, kind, name string) bool {
|
|
ns := strings.TrimSpace(namespace)
|
|
k := strings.ToLower(strings.TrimSpace(kind))
|
|
n := strings.TrimSpace(name)
|
|
for _, rule := range rules {
|
|
if rule.Namespace != ns {
|
|
continue
|
|
}
|
|
if rule.Kind != "" && rule.Kind != k {
|
|
continue
|
|
}
|
|
if rule.Name == n {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// makeStringSet builds a membership set from the entries, trimming
// surrounding whitespace and dropping blank strings.
func makeStringSet(entries []string) map[string]struct{} {
	set := make(map[string]struct{}, len(entries))
	for _, raw := range entries {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			set[trimmed] = struct{}{}
		}
	}
	return set
}
|
|
|
|
// readyCondition runs one orchestration or CLI step.
|
|
// Signature: readyCondition(conditions []fluxCondition) *fluxCondition.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func readyCondition(conditions []fluxCondition) *fluxCondition {
|
|
for i := range conditions {
|
|
cond := &conditions[i]
|
|
if strings.EqualFold(strings.TrimSpace(cond.Type), "Ready") {
|
|
return cond
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// joinLimited joins items with "; ", truncating to the first limit entries
// and appending "; ... (+N more)" when the list is longer. A non-positive
// limit disables truncation.
func joinLimited(items []string, limit int) string {
	if limit <= 0 || len(items) <= limit {
		return strings.Join(items, "; ")
	}
	shown := strings.Join(items[:limit], "; ")
	return fmt.Sprintf("%s; ... (+%d more)", shown, len(items)-limit)
}
|
|
|
|
// namespaceCandidatesFromIgnoreKustomizations extracts the name component
// from entries of the form "<prefix>/<name>" into a set. Entries without a
// "/" and entries whose name trims to empty are skipped; only the first
// "/" splits, so the name may itself contain slashes.
func namespaceCandidatesFromIgnoreKustomizations(entries []string) map[string]struct{} {
	candidates := map[string]struct{}{}
	for _, raw := range entries {
		raw = strings.TrimSpace(raw)
		if raw == "" {
			continue
		}
		_, after, found := strings.Cut(raw, "/")
		if !found {
			continue
		}
		if name := strings.TrimSpace(after); name != "" {
			candidates[name] = struct{}{}
		}
	}
	return candidates
}
|