// orchestrator_ingress.go — ingress/startup checklist orchestration helpers.

package cluster
import (
"context"
"encoding/json"
"fmt"
neturl "net/url"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
)
// ensureRequiredNodeLabels applies the configured required labels to each
// node via `kubectl label --overwrite`. It is a no-op in dry-run mode or
// when no required labels are configured.
//
// Nodes listed in Startup.IgnoreUnavailableNodes are skipped entirely. A
// node that does not exist is tolerated (with an auto-heal note) when it is
// not strictly required for startup; otherwise the error is returned.
func (o *Orchestrator) ensureRequiredNodeLabels(ctx context.Context) error {
	if o.runner.DryRun || len(o.cfg.Startup.RequiredNodeLabels) == 0 {
		return nil
	}
	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	// Normalize node names while keeping the label maps keyed by the
	// trimmed name. (Previously the trimmed name was used to re-index the
	// original config map, silently dropping nodes whose configured name
	// carried stray whitespace.)
	nodeLabels := make(map[string]map[string]string, len(o.cfg.Startup.RequiredNodeLabels))
	nodes := make([]string, 0, len(o.cfg.Startup.RequiredNodeLabels))
	for node, labels := range o.cfg.Startup.RequiredNodeLabels {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		nodes = append(nodes, node)
		nodeLabels[node] = labels
	}
	// Sort for deterministic processing order across runs.
	sort.Strings(nodes)
	for _, node := range nodes {
		if _, skip := ignored[node]; skip {
			o.log.Printf("skipping required node labels for ignored unavailable node %s", node)
			continue
		}
		labels := nodeLabels[node]
		if len(labels) == 0 {
			continue
		}
		// Same whitespace hazard for label keys: remember each trimmed
		// key's value instead of re-indexing the original map.
		values := make(map[string]string, len(labels))
		keys := make([]string, 0, len(labels))
		for key, value := range labels {
			key = strings.TrimSpace(key)
			if key == "" {
				continue
			}
			keys = append(keys, key)
			values[key] = value
		}
		sort.Strings(keys)
		args := []string{"label", "node", node, "--overwrite"}
		pairs := make([]string, 0, len(keys))
		for _, key := range keys {
			value := strings.TrimSpace(values[key])
			if value == "" {
				// Empty values are skipped rather than sent as "key=".
				continue
			}
			pair := fmt.Sprintf("%s=%s", key, value)
			args = append(args, pair)
			pairs = append(pairs, pair)
		}
		if len(pairs) == 0 {
			continue
		}
		if _, err := o.kubectl(ctx, 25*time.Second, args...); err != nil {
			if isNotFoundErr(err) && !o.startupNodeStrictlyRequired(node) {
				o.log.Printf("warning: skipping required labels for absent non-core node %s: %v", node, err)
				o.noteStartupAutoHeal(fmt.Sprintf("skipped required node labels for absent non-core node %s", node))
				continue
			}
			return fmt.Errorf("ensure required node labels on %s (%s): %w", node, strings.Join(pairs, ", "), err)
		}
		o.log.Printf("ensured required labels on node %s: %s", node, strings.Join(pairs, ", "))
	}
	return nil
}
// waitForStartupConvergence runs the configured startup gates in a fixed
// order: ingress checklist, service checklist, critical service endpoints,
// flux health, workload convergence, and finally the (always-on) stability
// soak window. Each gate's running/failed/passed state is recorded; the
// first failing gate aborts the sequence. Dry-run mode skips everything.
func (o *Orchestrator) waitForStartupConvergence(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	gates := []struct {
		enabled bool
		name    string
		running string
		passed  string
		run     func(context.Context) error
	}{
		{o.cfg.Startup.RequireIngressChecklist, "ingress-checklist", "waiting for ingress host reachability", "all ingress hosts reachable", o.waitForIngressChecklist},
		{o.cfg.Startup.RequireServiceChecklist, "service-checklist", "waiting for external service checklist", "all configured service checks passed", o.waitForServiceChecklist},
		{o.cfg.Startup.RequireCriticalServiceEndpoints, "critical-service-endpoints", "waiting for critical service endpoint backends", "critical service endpoints have active backends", o.waitForCriticalServiceEndpoints},
		{o.cfg.Startup.RequireFluxHealth, "flux-health", "waiting for flux kustomization readiness", "all flux kustomizations ready", o.waitForFluxHealth},
		{o.cfg.Startup.RequireWorkloadConvergence, "workload-convergence", "waiting for controller convergence", "controllers converged", o.waitForWorkloadConvergence},
		{true, "stability-window", "running startup stability soak window", "startup soak passed", o.waitForStabilityWindow},
	}
	for _, gate := range gates {
		if !gate.enabled {
			continue
		}
		o.noteStartupCheckState(gate.name, "running", gate.running)
		if err := gate.run(ctx); err != nil {
			o.noteStartupCheck(gate.name, false, err.Error())
			return err
		}
		o.noteStartupCheck(gate.name, true, gate.passed)
	}
	return nil
}
// waitForIngressChecklist polls until every discovered ingress host answers
// with an accepted HTTP status, or until the wait budget is exhausted.
//
// Defaults when unset/non-positive: 7 minute overall wait and 5 second poll
// interval (Startup.IngressChecklistWaitSeconds /
// IngressChecklistPollSeconds). While waiting it opportunistically triggers
// best-effort auto-heal passes (stuck pod recycling, critical replica
// healing, and per-host ingress backend healing), each throttled via its
// own timestamp, and logs progress whenever the failure detail changes or
// at most every 30 seconds. Returns ctx.Err() if the context is cancelled.
func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.IngressChecklistWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 7 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.IngressChecklistPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	lastIngressHeal := time.Time{}
	for {
		// Broad heals run before each readiness probe so a freshly healed
		// backend can pass on the same iteration.
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		prevFailure := lastFailure
		ready, detail := o.ingressChecklistReady(ctx)
		lastFailure = detail
		if ready {
			o.log.Printf("ingress checklist passed (%s)", detail)
			return nil
		}
		// Host-targeted heal uses the fresh failure detail to pick a host.
		o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
		// Log when the failure reason changes, or at most every 30s.
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for ingress checklist (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		// Deadline is checked after the probe, so a pass right at the
		// boundary still succeeds.
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: ingress checklist not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}
// ingressChecklistReady reports whether every discovered ingress host is
// currently reachable over HTTPS with an accepted status code. The second
// return value is a human-readable detail: the failing host and reason on
// failure, or a host-count summary on success. Discovering zero hosts
// counts as ready.
func (o *Orchestrator) ingressChecklistReady(ctx context.Context) (bool, string) {
	hosts, err := o.discoverIngressHosts(ctx)
	if err != nil {
		return false, err.Error()
	}
	if len(hosts) == 0 {
		return true, "no ingress hosts discovered"
	}
	statuses := o.cfg.Startup.IngressChecklistAccepted
	if len(statuses) == 0 {
		// Anything proving the ingress path is wired up counts, including
		// redirects and auth challenges.
		statuses = []int{200, 301, 302, 307, 308, 401, 403, 404}
	}
	for _, host := range hosts {
		probe := config.ServiceChecklistCheck{
			Name:             "ingress-" + host,
			URL:              "https://" + host + "/",
			AcceptedStatuses: statuses,
			TimeoutSeconds:   12,
			InsecureSkipTLS:  o.cfg.Startup.IngressChecklistInsecureSkip,
		}
		if ok, detail := o.serviceCheckReady(ctx, probe); !ok {
			return false, fmt.Sprintf("%s: %s", host, detail)
		}
	}
	return true, fmt.Sprintf("hosts=%d", len(hosts))
}
// discoverIngressHosts lists every concrete host referenced by any ingress
// rule in the cluster, excluding wildcard hosts and hosts named in
// Startup.IngressChecklistIgnoreHosts. The result is deduplicated and
// sorted. NOTE(review): matching here is case-sensitive, unlike
// discoverIngressNamespacesForHost — confirm ingress hosts are lowercase.
func (o *Orchestrator) discoverIngressHosts(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query ingresses: %w", err)
	}
	var list ingressList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode ingresses: %w", err)
	}
	skip := makeStringSet(o.cfg.Startup.IngressChecklistIgnoreHosts)
	seen := map[string]struct{}{}
	for _, ing := range list.Items {
		for _, rule := range ing.Spec.Rules {
			host := strings.TrimSpace(rule.Host)
			if host == "" || strings.Contains(host, "*") {
				continue
			}
			if _, ignored := skip[host]; ignored {
				continue
			}
			seen[host] = struct{}{}
		}
	}
	hosts := make([]string, 0, len(seen))
	for host := range seen {
		hosts = append(hosts, host)
	}
	sort.Strings(hosts)
	return hosts, nil
}
// discoverIngressNamespacesForHost returns the sorted set of namespaces
// containing at least one ingress rule whose host matches the given host
// (compared case-insensitively after trimming). An empty host yields nil
// with no error.
func (o *Orchestrator) discoverIngressNamespacesForHost(ctx context.Context, host string) ([]string, error) {
	target := strings.ToLower(strings.TrimSpace(host))
	if target == "" {
		return nil, nil
	}
	out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query ingresses: %w", err)
	}
	var list ingressList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode ingresses: %w", err)
	}
	seen := map[string]struct{}{}
	for _, ing := range list.Items {
		ns := strings.TrimSpace(ing.Metadata.Namespace)
		if ns == "" {
			continue
		}
		for _, rule := range ing.Spec.Rules {
			ruleHost := strings.ToLower(strings.TrimSpace(rule.Host))
			if ruleHost == "" {
				continue
			}
			if ruleHost == target {
				seen[ns] = struct{}{}
				break // one match is enough for this ingress
			}
		}
	}
	namespaces := make([]string, 0, len(seen))
	for ns := range seen {
		namespaces = append(namespaces, ns)
	}
	sort.Strings(namespaces)
	return namespaces, nil
}
// maybeAutoHealIngressHostBackends attempts to restore backend replicas for
// the host implicated by the latest checklist failure detail. Attempts are
// throttled to one per 45 seconds via *lastAttempt. It is a no-op in
// dry-run mode or when no host can be extracted from the failure detail;
// heal errors are logged as warnings, successes are logged and recorded as
// a startup auto-heal note.
func (o *Orchestrator) maybeAutoHealIngressHostBackends(ctx context.Context, lastAttempt *time.Time, failureDetail string) {
	if o.runner.DryRun {
		return
	}
	host := o.checklistFailureHost(failureDetail)
	if host == "" {
		return
	}
	now := time.Now()
	if lastAttempt != nil {
		if !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 45*time.Second {
			return // throttled: an attempt ran too recently
		}
		*lastAttempt = now
	}
	healed, err := o.healIngressHostBackendReplicas(ctx, host)
	if err != nil {
		o.log.Printf("warning: ingress host auto-heal failed for %s: %v", host, err)
		return
	}
	if len(healed) == 0 {
		return
	}
	sort.Strings(healed)
	detail := fmt.Sprintf("restored ingress backend replicas for %s: %s", host, joinLimited(healed, 8))
	o.log.Printf("%s", detail)
	o.noteStartupAutoHeal(detail)
}
// checklistFailureHost extracts the hostname implicated by a checklist
// failure detail string. It tries, in order: the text before the first
// colon when it already looks like a hostname, a configured service check
// whose name matches that text (taking the host from its URL), and finally
// parsing the text itself as a URL. Returns "" (lowercased otherwise) when
// no host can be determined.
func (o *Orchestrator) checklistFailureHost(failureDetail string) string {
	candidate := strings.TrimSpace(failureDetail)
	// Failure details are formatted as "host: reason"; keep only the head.
	if before, _, found := strings.Cut(candidate, ":"); found && before != "" {
		candidate = strings.TrimSpace(before)
	}
	if isLikelyHostname(candidate) {
		return strings.ToLower(candidate)
	}
	for _, check := range o.cfg.Startup.ServiceChecklist {
		if !strings.EqualFold(strings.TrimSpace(check.Name), candidate) {
			continue
		}
		if host := hostFromURL(check.URL); host != "" {
			return strings.ToLower(host)
		}
	}
	if host := hostFromURL(candidate); host != "" {
		return strings.ToLower(host)
	}
	return ""
}
// hostFromURL returns the hostname component of raw (whitespace-trimmed on
// both ends), or "" when raw does not parse as a URL or parses without a
// host (e.g. a bare scheme-less string).
func hostFromURL(raw string) string {
	trimmed := strings.TrimSpace(raw)
	parsed, err := neturl.Parse(trimmed)
	if err != nil || parsed == nil {
		return ""
	}
	return strings.TrimSpace(parsed.Hostname())
}