startup: auto-heal ingress-backed workloads when checks fail
This commit is contained in:
parent
0f48773572
commit
14a9d67088
@ -2514,6 +2514,7 @@ func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error {
|
|||||||
lastLogged := time.Time{}
|
lastLogged := time.Time{}
|
||||||
lastRecycleAttempt := time.Time{}
|
lastRecycleAttempt := time.Time{}
|
||||||
lastReplicaHeal := time.Time{}
|
lastReplicaHeal := time.Time{}
|
||||||
|
lastIngressHeal := time.Time{}
|
||||||
for {
|
for {
|
||||||
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
||||||
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
||||||
@ -2525,6 +2526,7 @@ func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error {
|
|||||||
o.log.Printf("ingress checklist passed (%s)", detail)
|
o.log.Printf("ingress checklist passed (%s)", detail)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
|
||||||
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
||||||
remaining := time.Until(deadline).Round(time.Second)
|
remaining := time.Until(deadline).Round(time.Second)
|
||||||
if remaining < 0 {
|
if remaining < 0 {
|
||||||
@ -2603,6 +2605,166 @@ func (o *Orchestrator) discoverIngressHosts(ctx context.Context) ([]string, erro
|
|||||||
return outHosts, nil
|
return outHosts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) discoverIngressNamespacesForHost(ctx context.Context, host string) ([]string, error) {
|
||||||
|
host = strings.ToLower(strings.TrimSpace(host))
|
||||||
|
if host == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query ingresses: %w", err)
|
||||||
|
}
|
||||||
|
var list ingressList
|
||||||
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode ingresses: %w", err)
|
||||||
|
}
|
||||||
|
namespaces := map[string]struct{}{}
|
||||||
|
for _, item := range list.Items {
|
||||||
|
ns := strings.TrimSpace(item.Metadata.Namespace)
|
||||||
|
if ns == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, rule := range item.Spec.Rules {
|
||||||
|
ruleHost := strings.ToLower(strings.TrimSpace(rule.Host))
|
||||||
|
if ruleHost == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if ruleHost == host {
|
||||||
|
namespaces[ns] = struct{}{}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
outNamespaces := make([]string, 0, len(namespaces))
|
||||||
|
for ns := range namespaces {
|
||||||
|
outNamespaces = append(outNamespaces, ns)
|
||||||
|
}
|
||||||
|
sort.Strings(outNamespaces)
|
||||||
|
return outNamespaces, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) maybeAutoHealIngressHostBackends(ctx context.Context, lastAttempt *time.Time, failureDetail string) {
|
||||||
|
if o.runner.DryRun {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
host := o.checklistFailureHost(failureDetail)
|
||||||
|
if host == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 45*time.Second {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lastAttempt != nil {
|
||||||
|
*lastAttempt = now
|
||||||
|
}
|
||||||
|
healed, err := o.healIngressHostBackendReplicas(ctx, host)
|
||||||
|
if err != nil {
|
||||||
|
o.log.Printf("warning: ingress host auto-heal failed for %s: %v", host, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(healed) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sort.Strings(healed)
|
||||||
|
detail := fmt.Sprintf("restored ingress backend replicas for %s: %s", host, joinLimited(healed, 8))
|
||||||
|
o.log.Printf("%s", detail)
|
||||||
|
o.noteStartupAutoHeal(detail)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) checklistFailureHost(failureDetail string) string {
|
||||||
|
prefix := strings.TrimSpace(failureDetail)
|
||||||
|
if idx := strings.Index(prefix, ":"); idx > 0 {
|
||||||
|
prefix = strings.TrimSpace(prefix[:idx])
|
||||||
|
}
|
||||||
|
if isLikelyHostname(prefix) {
|
||||||
|
return strings.ToLower(prefix)
|
||||||
|
}
|
||||||
|
for _, check := range o.cfg.Startup.ServiceChecklist {
|
||||||
|
name := strings.TrimSpace(check.Name)
|
||||||
|
if !strings.EqualFold(name, prefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
host := hostFromURL(check.URL)
|
||||||
|
if host != "" {
|
||||||
|
return strings.ToLower(host)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if host := hostFromURL(prefix); host != "" {
|
||||||
|
return strings.ToLower(host)
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func hostFromURL(raw string) string {
|
||||||
|
parsed, err := neturl.Parse(strings.TrimSpace(raw))
|
||||||
|
if err != nil || parsed == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(parsed.Hostname())
|
||||||
|
}
|
||||||
|
|
||||||
|
func isLikelyHostname(value string) bool {
|
||||||
|
value = strings.TrimSpace(value)
|
||||||
|
if value == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if strings.Contains(value, " ") || strings.Contains(value, "/") {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return strings.Contains(value, ".")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error) {
|
||||||
|
namespaces, err := o.discoverIngressNamespacesForHost(ctx, host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(namespaces) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
targetNamespaces := makeStringSet(namespaces)
|
||||||
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query workloads: %w", err)
|
||||||
|
}
|
||||||
|
var list workloadList
|
||||||
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode workloads: %w", err)
|
||||||
|
}
|
||||||
|
healed := []string{}
|
||||||
|
for _, item := range list.Items {
|
||||||
|
kind := strings.ToLower(strings.TrimSpace(item.Kind))
|
||||||
|
ns := strings.TrimSpace(item.Metadata.Namespace)
|
||||||
|
name := strings.TrimSpace(item.Metadata.Name)
|
||||||
|
if kind == "" || ns == "" || name == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if kind != "deployment" && kind != "statefulset" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := targetNamespaces[ns]; !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
desired := int32(1)
|
||||||
|
if item.Spec.Replicas != nil {
|
||||||
|
desired = *item.Spec.Replicas
|
||||||
|
}
|
||||||
|
if desired >= 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
workload := startupWorkload{Namespace: ns, Kind: kind, Name: name}
|
||||||
|
if err := o.ensureWorkloadReplicas(ctx, workload, 1); err != nil {
|
||||||
|
if isNotFoundErr(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", ns, kind, name, err)
|
||||||
|
}
|
||||||
|
healed = append(healed, ns+"/"+kind+"/"+name)
|
||||||
|
}
|
||||||
|
return healed, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
|
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
|
||||||
wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
|
wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
|
||||||
if wait <= 0 {
|
if wait <= 0 {
|
||||||
@ -2617,6 +2779,7 @@ func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
|
|||||||
lastLogged := time.Time{}
|
lastLogged := time.Time{}
|
||||||
lastRecycleAttempt := time.Time{}
|
lastRecycleAttempt := time.Time{}
|
||||||
lastReplicaHeal := time.Time{}
|
lastReplicaHeal := time.Time{}
|
||||||
|
lastIngressHeal := time.Time{}
|
||||||
for {
|
for {
|
||||||
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
||||||
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
||||||
@ -2627,6 +2790,7 @@ func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
|
|||||||
o.log.Printf("external service checklist passed (%s)", detail)
|
o.log.Printf("external service checklist passed (%s)", detail)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
|
||||||
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
||||||
remaining := time.Until(deadline).Round(time.Second)
|
remaining := time.Until(deadline).Round(time.Second)
|
||||||
if remaining < 0 {
|
if remaining < 0 {
|
||||||
|
|||||||
@ -216,6 +216,54 @@ func TestServiceCheckReadyBodyContainsIgnoresWhitespace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestChecklistFailureHostFromIngressDetail(t *testing.T) {
|
||||||
|
orch := &Orchestrator{}
|
||||||
|
got := orch.checklistFailureHost("cloud.bstein.dev: unexpected status code=500")
|
||||||
|
if got != "cloud.bstein.dev" {
|
||||||
|
t.Fatalf("expected host cloud.bstein.dev, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChecklistFailureHostFromServiceCheckName(t *testing.T) {
|
||||||
|
orch := &Orchestrator{
|
||||||
|
cfg: config.Config{
|
||||||
|
Startup: config.Startup{
|
||||||
|
ServiceChecklist: []config.ServiceChecklistCheck{
|
||||||
|
{
|
||||||
|
Name: "harbor-registry",
|
||||||
|
URL: "https://registry.bstein.dev/v2/",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
got := orch.checklistFailureHost("harbor-registry: unexpected status code=404")
|
||||||
|
if got != "registry.bstein.dev" {
|
||||||
|
t.Fatalf("expected host registry.bstein.dev, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestChecklistFailureHostUnknown(t *testing.T) {
|
||||||
|
orch := &Orchestrator{
|
||||||
|
cfg: config.Config{
|
||||||
|
Startup: config.Startup{
|
||||||
|
ServiceChecklist: []config.ServiceChecklistCheck{
|
||||||
|
{
|
||||||
|
Name: "grafana-api",
|
||||||
|
URL: "https://metrics.bstein.dev/api/health",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if got := orch.checklistFailureHost("grafana-api: tcp timeout"); got != "metrics.bstein.dev" {
|
||||||
|
t.Fatalf("expected metrics host from configured URL, got %q", got)
|
||||||
|
}
|
||||||
|
if got := orch.checklistFailureHost("some-unmapped-check: fail"); got != "" {
|
||||||
|
t.Fatalf("expected empty host for unknown check, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestStuckVaultInitReasonDetectsHungInit(t *testing.T) {
|
func TestStuckVaultInitReasonDetectsHungInit(t *testing.T) {
|
||||||
var pod podResource
|
var pod podResource
|
||||||
pod.Status.Phase = "Pending"
|
pod.Status.Phase = "Pending"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user