ananke/internal/cluster/orchestrator_workload_ignore.go

package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"
)

// maybeAutoRecycleStuckPods recycles stuck controller pods on a best-effort
// basis. It is a no-op in dry-run mode or when auto-recycling is disabled,
// and throttles itself to at most one attempt per 30 seconds via lastAttempt.
func (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttempt *time.Time) {
	if o.runner.DryRun || !o.cfg.Startup.AutoRecycleStuckPods {
		return
	}
	now := time.Now()
	if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
		return
	}
	if lastAttempt != nil {
		*lastAttempt = now
	}
	o.bestEffort("recycle stuck controller pods", func() error { return o.recycleStuckControllerPods(ctx) })
}
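
// Illustrative usage sketch (not from this package): a caller's polling loop
// would keep the throttle timestamp alive across iterations and pass its
// address in. The 10s tick below is an assumed interval, not a real setting.
//
//	var lastRecycle time.Time
//	for range time.Tick(10 * time.Second) {
//		o.maybeAutoRecycleStuckPods(ctx, &lastRecycle)
//	}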

// maybeAutoHealCriticalWorkloadReplicas restores critical workloads that have
// been scaled to zero. It is a no-op in dry-run mode, throttles itself to at
// most one attempt per 30 seconds via lastAttempt, and logs and records any
// replicas it restores.
func (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time) {
	if o.runner.DryRun {
		return
	}
	now := time.Now()
	if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
		return
	}
	if lastAttempt != nil {
		*lastAttempt = now
	}
	healed, err := o.healCriticalWorkloadReplicas(ctx)
	if err != nil {
		o.log.Printf("warning: critical workload replica auto-heal failed: %v", err)
		return
	}
	if len(healed) == 0 {
		return
	}
	sort.Strings(healed)
	detail := fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8))
	o.log.Printf("%s", detail)
	o.noteStartupAutoHeal(detail)
}
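
// For illustration (workload names are made up): with
// healed = ["default/deployment/api", "vault/statefulset/vault"], the logged
// detail reads:
//
//	restored critical workload replicas: default/deployment/api; vault/statefulset/vault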

// healCriticalWorkloadReplicas lists all Deployments and StatefulSets in the
// cluster and scales any entry of criticalStartupWorkloads whose desired
// replica count has dropped below one back up to a single replica. It returns
// the "namespace/kind/name" keys of the workloads it restored.
func (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query workloads: %w", err)
	}
	var list workloadList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode workloads: %w", err)
	}
	// Index the desired replica count of every deployment and statefulset by
	// "namespace/kind/name"; an absent spec.replicas defaults to one.
	current := map[string]int32{}
	for _, item := range list.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		ns := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if kind == "" || ns == "" || name == "" {
			continue
		}
		if kind != "deployment" && kind != "statefulset" {
			continue
		}
		desired := int32(1)
		if item.Spec.Replicas != nil {
			desired = *item.Spec.Replicas
		}
		key := ns + "/" + kind + "/" + name
		current[key] = desired
	}
	healed := []string{}
	for _, w := range criticalStartupWorkloads {
		key := w.Namespace + "/" + strings.ToLower(w.Kind) + "/" + w.Name
		desired, ok := current[key]
		if !ok || desired >= 1 {
			continue
		}
		if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
			if isNotFoundErr(err) {
				continue
			}
			return healed, fmt.Errorf("scale %s to 1: %w", key, err)
		}
		healed = append(healed, key)
	}
	return healed, nil
}
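
// For reference, a minimal sketch of the kubectl JSON this function decodes;
// the item below would be keyed as "kube-system/deployment/coredns" with
// desired = 2 (workloadList itself is defined elsewhere in this package):
//
//	{"items": [{"kind": "Deployment",
//	            "metadata": {"namespace": "kube-system", "name": "coredns"},
//	            "spec": {"replicas": 2}}]}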

// startupFailurePods returns a sorted list of pods that look stuck during
// startup, formatted as "namespace/name(reason)". Pods in ignored namespaces
// or pinned to ignored nodes are skipped; a pod counts as stuck when a
// container reports one of the known failure reasons below, or when Vault
// initialization has run past the configured grace period (default 180s).
func (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query pods: %w", err)
	}
	var list podList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode pods: %w", err)
	}
	ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
	ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	stuckReasons := map[string]struct{}{
		"ImagePullBackOff":           {},
		"ErrImagePull":               {},
		"CrashLoopBackOff":           {},
		"CreateContainerConfigError": {},
		"CreateContainerError":       {},
		"RunContainerError":          {},
	}
	grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
	if grace <= 0 {
		grace = 180 * time.Second
	}
	failures := []string{}
	for _, pod := range list.Items {
		ns := strings.TrimSpace(pod.Metadata.Namespace)
		name := strings.TrimSpace(pod.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		if _, ok := ignoredNamespaces[ns]; ok {
			continue
		}
		if podTargetsIgnoredNode(pod, ignoredNodes) {
			continue
		}
		reason := stuckContainerReason(pod, stuckReasons)
		if reason == "" {
			reason = stuckVaultInitReason(pod, grace)
		}
		if reason == "" {
			continue
		}
		failures = append(failures, fmt.Sprintf("%s/%s(%s)", ns, name, reason))
	}
	sort.Strings(failures)
	return failures, nil
}
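
// For illustration (pod name is made up): a crash-looping pod in the default
// namespace surfaces as
//
//	default/api-7f9c4d(CrashLoopBackOff)
//
// while Vault-init entries carry whatever reason string stuckVaultInitReason
// (defined elsewhere in this package) reports.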

// podTargetsIgnoredNode reports whether a pod runs on, or is pinned to, a
// node in the ignored set. A scheduled pod is matched by its node name alone;
// an unscheduled pod falls back to its node selector and affinity rules.
func podTargetsIgnoredNode(p podResource, ignored map[string]struct{}) bool {
	if len(ignored) == 0 {
		return false
	}
	node := strings.TrimSpace(p.Spec.NodeName)
	if node != "" {
		_, ok := ignored[node]
		return ok
	}
	return workloadTargetsIgnoredNodes(p.Spec.podSpec, ignored)
}

// workloadTargetsIgnoredNodes reports whether a pod spec can only be
// scheduled onto ignored nodes, either through a kubernetes.io/hostname node
// selector or through a required node affinity whose single term restricts
// the hostname to ignored values only.
func workloadTargetsIgnoredNodes(spec podSpec, ignored map[string]struct{}) bool {
	if len(ignored) == 0 {
		return false
	}
	if hostname, ok := spec.NodeSelector["kubernetes.io/hostname"]; ok {
		_, ignoredHost := ignored[strings.TrimSpace(hostname)]
		if ignoredHost {
			return true
		}
	}
	if spec.Affinity == nil || spec.Affinity.NodeAffinity == nil || spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		return false
	}
	// Only the unambiguous case of a single required term is considered;
	// multiple terms are ORed by the scheduler, so the pod might still land
	// on a non-ignored node.
	terms := spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	if len(terms) != 1 {
		return false
	}
	for _, expr := range terms[0].MatchExpressions {
		if strings.TrimSpace(expr.Key) != "kubernetes.io/hostname" {
			continue
		}
		if !strings.EqualFold(strings.TrimSpace(expr.Operator), "In") {
			return false
		}
		if len(expr.Values) == 0 {
			return false
		}
		// Every listed hostname must be ignored; one reachable node is
		// enough for the pod to schedule elsewhere.
		for _, value := range expr.Values {
			if _, ok := ignored[strings.TrimSpace(value)]; !ok {
				return false
			}
		}
		return true
	}
	return false
}
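
// A minimal sketch of the selector path (node name is made up): a spec pinned
// to an ignored node matches immediately, without consulting affinity.
//
//	ignored := map[string]struct{}{"worker-2": {}}
//	spec := podSpec{NodeSelector: map[string]string{"kubernetes.io/hostname": "worker-2"}}
//	workloadTargetsIgnoredNodes(spec, ignored) // true
//
// On the affinity path, a single required In-term on kubernetes.io/hostname
// matches only when every listed value is ignored: Values ["worker-2"]
// matches above, while ["worker-2", "worker-3"] would not unless both nodes
// are in the ignored set.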

// parseWorkloadIgnoreRules parses ignore entries of the form "namespace/name"
// or "namespace/kind/name" into workloadIgnoreRule values. Kinds are
// lower-cased; blank and malformed entries are dropped.
func parseWorkloadIgnoreRules(entries []string) []workloadIgnoreRule {
	out := []workloadIgnoreRule{}
	for _, entry := range entries {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.Split(entry, "/")
		switch len(parts) {
		case 2:
			out = append(out, workloadIgnoreRule{
				Namespace: strings.TrimSpace(parts[0]),
				Name:      strings.TrimSpace(parts[1]),
			})
		case 3:
			out = append(out, workloadIgnoreRule{
				Namespace: strings.TrimSpace(parts[0]),
				Kind:      strings.ToLower(strings.TrimSpace(parts[1])),
				Name:      strings.TrimSpace(parts[2]),
			})
		}
	}
	return out
}
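
// For example (entries are made up), both accepted forms parse as follows;
// the empty string is dropped, as is any entry without 2 or 3 segments:
//
//	rules := parseWorkloadIgnoreRules([]string{"media/plex", "db/statefulset/postgres", ""})
//	// rules[0] = {Namespace: "media", Name: "plex"}
//	// rules[1] = {Namespace: "db", Kind: "statefulset", Name: "postgres"}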

// workloadIgnored reports whether the given workload matches any ignore rule.
// Namespace and name must match exactly after trimming; an empty rule Kind
// matches any kind, otherwise kinds are compared case-insensitively.
func workloadIgnored(rules []workloadIgnoreRule, namespace, kind, name string) bool {
	ns := strings.TrimSpace(namespace)
	k := strings.ToLower(strings.TrimSpace(kind))
	n := strings.TrimSpace(name)
	for _, rule := range rules {
		if rule.Namespace != ns {
			continue
		}
		if rule.Kind != "" && rule.Kind != k {
			continue
		}
		if rule.Name == n {
			return true
		}
	}
	return false
}
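
// Continuing the example above, matching against those two rules:
//
//	workloadIgnored(rules, "media", "Deployment", "plex")   // true: kindless rule matches any kind
//	workloadIgnored(rules, "db", "deployment", "postgres")  // false: kind mismatch
//	workloadIgnored(rules, "db", "StatefulSet", "postgres") // true: kinds compared case-insensitively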

// makeStringSet builds a lookup set from entries, trimming whitespace and
// dropping empty strings.
func makeStringSet(entries []string) map[string]struct{} {
	out := make(map[string]struct{}, len(entries))
	for _, entry := range entries {
		entry = strings.TrimSpace(entry)
		if entry != "" {
			out[entry] = struct{}{}
		}
	}
	return out
}

// readyCondition returns a pointer to the first condition whose type equals
// "Ready" (case-insensitive), or nil when none is present.
func readyCondition(conditions []fluxCondition) *fluxCondition {
	for i := range conditions {
		cond := &conditions[i]
		if strings.EqualFold(strings.TrimSpace(cond.Type), "Ready") {
			return cond
		}
	}
	return nil
}
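
// For illustration (fields abbreviated; fluxCondition is defined elsewhere in
// this package): given conditions [{Type: "Reconciling"}, {Type: "ready"}],
// readyCondition returns a pointer to the second entry, since the type match
// is case-insensitive.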

// joinLimited joins items with "; ". When the list is longer than limit (and
// limit is positive), only the first limit entries are shown, followed by a
// "(+N more)" suffix.
func joinLimited(items []string, limit int) string {
	if len(items) <= limit || limit <= 0 {
		return strings.Join(items, "; ")
	}
	return strings.Join(items[:limit], "; ") + fmt.Sprintf("; ... (+%d more)", len(items)-limit)
}
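
// For example:
//
//	joinLimited([]string{"a", "b", "c"}, 2) // "a; b; ... (+1 more)"
//	joinLimited([]string{"a", "b", "c"}, 0) // "a; b; c" (non-positive limit disables truncation)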

// namespaceCandidatesFromIgnoreKustomizations collects candidate namespace
// names from ignore entries of the form "namespace/name", taking the name
// component of each well-formed entry. Entries without a "/" are skipped.
func namespaceCandidatesFromIgnoreKustomizations(entries []string) map[string]struct{} {
	out := map[string]struct{}{}
	for _, entry := range entries {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.SplitN(entry, "/", 2)
		if len(parts) != 2 {
			continue
		}
		name := strings.TrimSpace(parts[1])
		if name != "" {
			out[name] = struct{}{}
		}
	}
	return out
}
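
// For example (entry names are made up), "flux-system/media" and
// "flux-system/db" yield the candidate namespace set {"media", "db"}; an
// entry like "standalone" with no "/" contributes nothing.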