Compare commits

...

2 Commits

Author SHA1 Message Date
codex
57610c623a docs: shorten ananke README 2026-06-19 15:43:49 -03:00
codex
85c0741b3e recovery: expire automatic node cordons 2026-06-19 15:43:44 -03:00
22 changed files with 1643 additions and 1166 deletions

160
README.md
View File

@ -1,71 +1,32 @@
# ananke
Ananke is the host-side recovery orchestrator for Titan power events.
Ananke is the thing that gets Atlas back on its feet after power trouble.
It runs outside Kubernetes (systemd on host), so it can:
- shut the cluster down gracefully before runtime gets dangerous
- bootstrap the cluster after power is restored
- break known startup deadlocks (including Flux + in-cluster Gitea coupling)
- verify real service availability before declaring startup complete
It runs on the host, outside Kubernetes, because some failures start before the
cluster is healthy enough to fix itself. Its job is to bring nodes, Flux, core
workloads, ingresses, and service checks back into a known-good state.
The goal is not clever automation. The goal is boring, repeatable recovery.
It is deliberately boring software: do the checks, repair the known deadlocks,
and stop loudly when a human needs to touch hardware.
## Why `ananke`
## How it works
In Greek myth, **Ananke** is inevitability and necessity.
That is the exact constraint we operate under during outages and drills.
Ananke reads `/etc/ananke/ananke.yaml`, then walks the cluster through startup or
shutdown gates:
Power-domain names in this lab align with that naming:
- `Statera` UPS: `titan-23`, `titan-24`, `titan-jh`
- `Pyrphoros` UPS: all other nodes
- confirm the expected nodes and SSH access
- check that Flux is looking at the right repo and branch
- wait for required Flux kustomizations and namespaces
- repair known startup traps, including Harbor/Gitea/Flux coupling
- run ingress, service, endpoint, and soak checks before calling startup done
## Operating model (non-negotiable)
Recovery cordons are now treated as short leases. If Ananke cordons a node to
repair something, it must either clear the cordon within the configured window
or mark the node for manual action. The default window is one hour.
- Ananke does **cluster orchestration**, not host power control.
- Shutdown defaults to `cluster-only` and should remain that way for normal drills.
- Physical outages can cut host power themselves; Anankes job is clean state transitions.
## Daily commands
Flux source of truth remains `titan-iac.git`.
Anankes own repo (`ananke.git`) is software only; it is not the desired-state cluster config repo.
## Breakglass reminder
Vault breakglass is available through a remote Magic Mirror path.
If standard unseal retrieval fails, use `startup.vault_unseal_breakglass_command`.
## What "startup complete" means
Startup is complete only after all required gates pass:
- inventory mapping is valid
- expected SSH nodes are reachable/authenticated (minus explicit ignores)
- Flux source drift guard passes (expected URL + branch)
- required Flux kustomizations are healthy
- workload convergence is healthy
- ingress checklist passes
- service checklist passes (internal + externally exposed)
- critical endpoint checks pass
- stability soak passes with no regressions
If a manual intervention is needed during a drill, that is treated as an Ananke gap and must be encoded back into Ananke logic.
## Status and reports
Live status:
- `ananke status --config /etc/ananke/ananke.yaml`
- `ananke status --config /etc/ananke/ananke.yaml --json`
Artifacts:
- `/var/lib/ananke/startup-progress.json` (live run progress)
- `/var/lib/ananke/last-startup-report.json`
- `/var/lib/ananke/last-shutdown-report.json`
- `/var/lib/ananke/reports/*.json` (historical per-run reports)
- `/var/lib/ananke/runs.json` (timing history)
- `/var/lib/ananke/update-last.env` (latest self-update result)
- `/var/log/ananke/update.log` (self-update execution log)
## Quick commands
From `titan-db`:
Run these on `titan-db` unless you know you are using the `tethys` peer:
```bash
sudo /usr/local/bin/ananke status --config /etc/ananke/ananke.yaml
@ -73,76 +34,21 @@ sudo /usr/local/bin/ananke startup --config /etc/ananke/ananke.yaml --execute --
sudo /usr/local/bin/ananke shutdown --config /etc/ananke/ananke.yaml --execute --reason graceful-maintenance --mode cluster-only
```
From `titan-24` (`tethys` peer):
Useful files:
- `/var/lib/ananke/startup-progress.json`
- `/var/lib/ananke/last-startup-report.json`
- `/var/lib/ananke/last-shutdown-report.json`
- `/var/log/ananke/update.log`
## Development
Run the full local check before installing:
```bash
sudo /usr/local/bin/ananke shutdown --config /etc/ananke/ananke.yaml --execute --reason graceful-maintenance --mode cluster-only
./scripts/quality_gate.sh
```
Systemd control:
```bash
sudo systemctl status ananke.service
sudo systemctl start ananke-bootstrap.service
sudo systemctl start ananke-update.service
sudo cat /var/lib/ananke/update-last.env
sudo tail -n 200 /var/log/ananke/update.log
```
## Config
Primary config path:
- `/etc/ananke/ananke.yaml`
Keep these fields accurate:
- `expected_flux_source_url`
- `expected_flux_branch`
- `startup.service_checklist_explicit_only`
- `startup.service_checklist`
- `startup.critical_service_endpoints`
- `startup.require_ingress_checklist`
- `startup.require_node_inventory_reachability`
- `startup.node_inventory_reachability_required_nodes`
- `startup.node_ssh_auth_required_nodes`
- `startup.flux_health_required_kustomizations`
- `startup.workload_convergence_required_namespaces`
- `startup.ignore_unavailable_nodes`
- `coordination.role`
- `coordination.peer_hosts`
## Quality gate
Top-level quality/testing module:
- `testing/`
Deployment gate script:
- `scripts/quality_gate.sh`
Gate order:
1. docs contract checks
2. split test-module contract (`cmd/` + `internal/` cannot grow new in-tree `_test.go` files)
3. naming + LOC hygiene checks
4. pedantic lint
5. per-file coverage gate (95% minimum)
Current migration rule:
- keep new tests in the top-level `testing/` module
- legacy in-tree `_test.go` files are temporarily grandfathered through `testing/hygiene/in_tree_test_allowlist.txt` until they are migrated safely
Installer behavior:
- `scripts/install.sh` runs the quality gate by default
- override only for emergency break/fix: `ANANKE_ENFORCE_QUALITY_GATE=0`
- host quality runs keep writing local `ananke_quality_gate_*` metrics and also publish `platform_quality_gate_runs_total{suite="ananke",status=*}` to Pushgateway for shared Grafana panels
- override the Pushgateway target when running outside cluster DNS: `ANANKE_QUALITY_PUSHGATEWAY_URL=http://... ./scripts/quality_gate.sh`
## Growing with the lab
When adding nodes or services:
1. Update inventory and node mapping in config.
2. Keep the explicit service checklist focused on the core services that must come back during an outage.
3. Keep `*_required_*` startup scopes aligned with the same core set so optional stacks do not block bootstrap.
4. Add/adjust ingress expectations for exposed services.
5. Use temporary ignores only when truly intentional, then remove them.
6. Run `scripts/quality_gate.sh` before host deployment.
Recovery quality should improve over time: every drill should reduce manual work in the next drill.
Emergency installs can bypass the gate with
`ANANKE_ENFORCE_QUALITY_GATE=0`, but that should stay rare. If a recovery drill
needed manual work, the follow-up belongs in Ananke so the next one is cleaner.

View File

@ -156,6 +156,7 @@ startup:
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
post_start_auto_heal_seconds: 60
recovery_cordon_max_seconds: 3600
dead_node_cleanup_grace_seconds: 300
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: ""

View File

@ -291,6 +291,7 @@ startup:
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
post_start_auto_heal_seconds: 60
recovery_cordon_max_seconds: 3600
dead_node_cleanup_grace_seconds: 300
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/tethys/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'"

View File

@ -291,6 +291,7 @@ startup:
scheduling_storm_window_seconds: 180
stuck_pod_grace_seconds: 180
post_start_auto_heal_seconds: 60
recovery_cordon_max_seconds: 3600
dead_node_cleanup_grace_seconds: 300
vault_unseal_key_file: /var/lib/ananke/vault-unseal.key
vault_unseal_breakglass_command: "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new -i /home/atlas/.ssh/id_ed25519 -p 1122 brad@99.183.132.163 'cat ~/.ananke-breakglass/vault-unseal.key'"

View File

@ -10,20 +10,27 @@ import (
)
type nodeReadyList struct {
Items []struct {
Metadata struct {
Name string `json:"name"`
} `json:"metadata"`
Spec struct {
Unschedulable bool `json:"unschedulable"`
} `json:"spec"`
Status struct {
Conditions []struct {
Type string `json:"type"`
Status string `json:"status"`
} `json:"conditions"`
} `json:"status"`
} `json:"items"`
Items []nodeReadyItem `json:"items"`
}
type nodeReadyItem struct {
Metadata struct {
Name string `json:"name"`
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
Spec struct {
Unschedulable bool `json:"unschedulable"`
Taints []struct {
Key string `json:"key"`
TimeAdded time.Time `json:"timeAdded"`
} `json:"taints"`
} `json:"spec"`
Status struct {
Conditions []struct {
Type string `json:"type"`
Status string `json:"status"`
} `json:"conditions"`
} `json:"status"`
}
type readyNodeCandidate struct {
@ -68,6 +75,13 @@ func (o *Orchestrator) postStartAutoHeal(ctx context.Context) error {
errs = append(errs, fmt.Sprintf("required node labels: %v", err))
}
releasedCordons, err := o.enforceRecoveryCordonLeases(ctx)
if err != nil {
errs = append(errs, fmt.Sprintf("recovery cordon lease check: %v", err))
} else if releasedCordons > 0 {
requestReconcile = true
}
vaultRecovered, err := o.autoRecoverSealedVault(ctx)
if err != nil {
errs = append(errs, fmt.Sprintf("vault auto-recovery: %v", err))
@ -252,7 +266,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err
}
if !node.Unschedulable {
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node.Name); err != nil {
if err := o.cordonNodeWithLease(ctx, node.Name, cordonReasonKubeletProxy, "broken kubelet proxy before k3s-agent restart"); err != nil {
errs = append(errs, fmt.Sprintf("%s cordon before kubelet restart: %v", node.Name, err))
continue
}
@ -262,8 +276,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err
if _, err := o.sshWithTimeout(ctx, node.Name, "sudo -n systemctl restart k3s-agent", 90*time.Second); err != nil {
if !node.Unschedulable {
o.bestEffort("uncordon node after failed kubelet proxy repair", func() error {
_, uncordonErr := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name)
return uncordonErr
return o.uncordonAndClearCordonLease(ctx, node.Name, cordonReasonKubeletProxy)
})
}
errs = append(errs, fmt.Sprintf("%s restart k3s-agent: %v", node.Name, err))
@ -279,7 +292,7 @@ func (o *Orchestrator) repairBrokenKubeletProxies(ctx context.Context) (int, err
continue
}
if !node.Unschedulable {
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node.Name); err != nil {
if err := o.uncordonAndClearCordonLease(ctx, node.Name, cordonReasonKubeletProxy); err != nil {
errs = append(errs, fmt.Sprintf("%s uncordon after kubelet proxy repair: %v", node.Name, err))
continue
}

View File

@ -0,0 +1,339 @@
package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
)
const (
anankeCordonOwnerAnnotation = "ananke.bstein.dev/cordon-owner"
anankeCordonReasonAnnotation = "ananke.bstein.dev/cordon-reason"
anankeCordonDetailAnnotation = "ananke.bstein.dev/cordon-detail"
anankeCordonCreatedAnnotation = "ananke.bstein.dev/cordon-created-at"
anankeCordonDeadlineAnnotation = "ananke.bstein.dev/cordon-deadline"
anankeCordonManualActionAnnotation = "ananke.bstein.dev/manual-action"
anankeCordonManualAtAnnotation = "ananke.bstein.dev/manual-action-at"
)
const (
cordonReasonMissingCryptsetup = "missing-cryptsetup"
cordonReasonRuntimeWedge = "container-runtime-wedge"
cordonReasonKubeletProxy = "kubelet-proxy-repair"
)
// cordonNodeWithLease cordons a node and records an expiry-bound owner/reason.
// Signature: (o *Orchestrator) cordonNodeWithLease(ctx context.Context, node, reason, detail string) error.
// Why: recovery cordons must be accountable leases so a node cannot be stranded
// indefinitely after an automatic repair attempt.
func (o *Orchestrator) cordonNodeWithLease(ctx context.Context, node string, reason string, detail string) error {
node = strings.TrimSpace(node)
reason = strings.TrimSpace(reason)
detail = sanitizeCordonAnnotationValue(detail)
if node == "" {
return fmt.Errorf("node must not be empty")
}
if reason == "" {
reason = "recovery"
}
now := time.Now().UTC()
deadline := now.Add(o.recoveryCordonMaxDuration())
if err := o.annotateCordonLease(ctx, node, reason, detail, now, deadline); err != nil {
return err
}
if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil {
o.bestEffort("clear cordon lease after cordon failure", func() error {
return o.clearCordonLease(ctx, node)
})
return err
}
o.log.Printf("cordoned node %s with ananke lease reason=%s deadline=%s", node, reason, deadline.Format(time.RFC3339))
return nil
}
// annotateCordonLease records why Ananke is cordoning a node and when it expires.
// Signature: (o *Orchestrator) annotateCordonLease(ctx context.Context, node, reason, detail string, created, deadline time.Time) error.
// Why: the annotations are the durable handoff between startup recovery and the
// daemon's post-start reconciliation loop.
func (o *Orchestrator) annotateCordonLease(ctx context.Context, node string, reason string, detail string, created time.Time, deadline time.Time) error {
args := []string{
"annotate", "node", node,
anankeCordonOwnerAnnotation + "=ananke",
anankeCordonReasonAnnotation + "=" + reason,
anankeCordonDetailAnnotation + "=" + detail,
anankeCordonCreatedAnnotation + "=" + created.Format(time.RFC3339),
anankeCordonDeadlineAnnotation + "=" + deadline.Format(time.RFC3339),
"--overwrite",
}
if _, err := o.kubectl(ctx, 20*time.Second, args...); err != nil {
return fmt.Errorf("annotate cordon lease on %s: %w", node, err)
}
return nil
}
// clearCordonLease removes Ananke's cordon ownership markers from a node.
// Signature: (o *Orchestrator) clearCordonLease(ctx context.Context, node string) error.
// Why: once a node is safe to schedule again, stale lease annotations should not
// keep paging operators or confusing later recovery loops.
func (o *Orchestrator) clearCordonLease(ctx context.Context, node string) error {
args := []string{
"annotate", "node", node,
anankeCordonOwnerAnnotation + "-",
anankeCordonReasonAnnotation + "-",
anankeCordonDetailAnnotation + "-",
anankeCordonCreatedAnnotation + "-",
anankeCordonDeadlineAnnotation + "-",
anankeCordonManualActionAnnotation + "-",
anankeCordonManualAtAnnotation + "-",
"--overwrite",
}
if _, err := o.kubectl(ctx, 20*time.Second, args...); err != nil && !isNotFoundErr(err) {
return fmt.Errorf("clear cordon lease on %s: %w", node, err)
}
return nil
}
// uncordonAndClearCordonLease returns a recovered node to service.
// Signature: (o *Orchestrator) uncordonAndClearCordonLease(ctx context.Context, node, reason string) error.
// Why: the uncordon and annotation cleanup should happen as one logical recovery
// action so leased cordons do not linger after the hazard is gone.
func (o *Orchestrator) uncordonAndClearCordonLease(ctx context.Context, node string, reason string) error {
if _, err := o.kubectl(ctx, 30*time.Second, "uncordon", node); err != nil {
return fmt.Errorf("uncordon %s after %s recovery: %w", node, reason, err)
}
if err := o.clearCordonLease(ctx, node); err != nil {
o.log.Printf("warning: node %s uncordoned but lease cleanup failed: %v", node, err)
}
o.log.Printf("uncordoned node %s after recovering leased cordon reason=%s", node, reason)
o.noteStartupAutoHeal(fmt.Sprintf("uncordoned %s after recovering %s", node, reason))
return nil
}
// enforceRecoveryCordonLeases reconciles Ananke-created cordons and reports stale
// manual cordons.
// Signature: (o *Orchestrator) enforceRecoveryCordonLeases(ctx context.Context) (int, error).
// Why: automatic recovery can temporarily cordon a node, but it must either clear
// that cordon or tell the operator before the resource is stranded.
func (o *Orchestrator) enforceRecoveryCordonLeases(ctx context.Context) (int, error) {
nodes, err := o.queryReadyNodes(ctx)
if err != nil {
return 0, err
}
maxAge := o.recoveryCordonMaxDuration()
now := time.Now().UTC()
released := 0
errs := []string{}
for _, node := range nodes.Items {
name := strings.TrimSpace(node.Metadata.Name)
if name == "" {
continue
}
ann := node.Metadata.Annotations
if !node.Spec.Unschedulable {
if ann[anankeCordonOwnerAnnotation] == "ananke" {
if err := o.clearCordonLease(ctx, name); err != nil {
errs = append(errs, err.Error())
}
}
continue
}
if ann[anankeCordonOwnerAnnotation] != "ananke" {
cordonTime := nodeUnschedulableSince(node)
if !cordonTime.IsZero() && now.Sub(cordonTime) > maxAge {
message := fmt.Sprintf("manual action required: node %s has an unowned cordon older than %s; Ananke will not silently leave resources stranded or override a non-Ananke cordon", name, maxAge)
o.markManualActionRequired(ctx, name, message)
errs = append(errs, message)
}
continue
}
recovered, recoverErr := o.recoverLeasedCordon(ctx, name, ann)
if recovered {
released++
continue
}
if recoverErr == nil {
continue
}
expired, deadlineText := cordonLeaseExpired(ann, now)
if expired {
message := fmt.Sprintf("manual action required: node %s remains cordoned after Ananke repair lease expired at %s: %v", name, deadlineText, recoverErr)
o.markManualActionRequired(ctx, name, message)
errs = append(errs, message)
} else {
o.log.Printf("leased cordon on %s still pending repair before %s: %v", name, deadlineText, recoverErr)
}
}
if len(errs) > 0 {
return released, errors.New(strings.Join(errs, "; "))
}
return released, nil
}
// recoverLeasedCordon tries the reason-specific repair for an Ananke cordon.
// Signature: (o *Orchestrator) recoverLeasedCordon(ctx context.Context, node string, annotations map[string]string) (bool, error).
// Why: each cordon reason has a different safety gate; a node only comes back when
// the original hazard is known to be gone.
func (o *Orchestrator) recoverLeasedCordon(ctx context.Context, node string, annotations map[string]string) (bool, error) {
reason := strings.TrimSpace(annotations[anankeCordonReasonAnnotation])
switch reason {
case cordonReasonMissingCryptsetup:
if err := o.ensureHostCryptsetup(ctx, node); err != nil {
return false, fmt.Errorf("cryptsetup repair still failing: %w", err)
}
if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil {
return false, err
}
return true, nil
case cordonReasonRuntimeWedge:
stillWedged, err := o.nodeStillHasRuntimeWedge(ctx, node)
if err != nil {
return false, err
}
if stillWedged {
return false, fmt.Errorf("container runtime wedge evidence is still present")
}
if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil {
return false, err
}
return true, nil
case cordonReasonKubeletProxy:
healthy, err := o.kubeletProxyHealthy(ctx, node)
if !healthy {
return false, fmt.Errorf("kubelet proxy still unhealthy: %v", err)
}
if err := o.uncordonAndClearCordonLease(ctx, node, reason); err != nil {
return false, err
}
return true, nil
default:
return false, fmt.Errorf("unknown Ananke cordon reason %q", reason)
}
}
// nodeStillHasRuntimeWedge checks whether the original runtime-wedge symptom remains.
// Signature: (o *Orchestrator) nodeStillHasRuntimeWedge(ctx context.Context, node string) (bool, error).
// Why: a runtime-wedge cordon can be released once the cluster no longer shows
// repeated controller-owned, non-PVC pod start failures on that node.
func (o *Orchestrator) nodeStillHasRuntimeWedge(ctx context.Context, node string) (bool, error) {
out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
if err != nil {
return false, fmt.Errorf("query pods for runtime-wedge recovery: %w", err)
}
var list podList
if err := json.Unmarshal([]byte(out), &list); err != nil {
return false, fmt.Errorf("decode pods for runtime-wedge recovery: %w", err)
}
grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
if grace <= 0 {
grace = 180 * time.Second
}
reasons, err := o.containerRuntimeWedgePodReasons(ctx, list, grace)
if err != nil {
return false, err
}
for _, pod := range list.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
if ns == "" || name == "" || strings.TrimSpace(pod.Spec.NodeName) != node {
continue
}
if reasons[ns+"/"+name] == "" {
continue
}
if podUsesPersistentVolumeClaim(pod) || !podControllerOwned(pod) {
continue
}
return true, nil
}
return false, nil
}
// queryReadyNodes fetches nodes using the shared light-weight node shape.
// Signature: (o *Orchestrator) queryReadyNodes(ctx context.Context) (nodeReadyList, error).
// Why: lease checks need annotations and taint timestamps in addition to Ready state.
func (o *Orchestrator) queryReadyNodes(ctx context.Context) (nodeReadyList, error) {
out, err := o.kubectl(ctx, 20*time.Second, "get", "nodes", "-o", "json")
if err != nil {
return nodeReadyList{}, fmt.Errorf("query nodes: %w", err)
}
var nodes nodeReadyList
if err := json.Unmarshal([]byte(out), &nodes); err != nil {
return nodeReadyList{}, fmt.Errorf("decode nodes: %w", err)
}
return nodes, nil
}
// markManualActionRequired makes a stale cordon visible on the node object itself.
// Signature: (o *Orchestrator) markManualActionRequired(ctx context.Context, node, message string).
// Why: daemon logs are easy to miss during recovery; the node should carry the
// operator-facing action that prevented automatic release.
func (o *Orchestrator) markManualActionRequired(ctx context.Context, node string, message string) {
now := time.Now().UTC().Format(time.RFC3339)
message = sanitizeCordonAnnotationValue(message)
if _, err := o.kubectl(
ctx,
20*time.Second,
"annotate", "node", node,
anankeCordonManualActionAnnotation+"="+message,
anankeCordonManualAtAnnotation+"="+now,
"--overwrite",
); err != nil {
o.log.Printf("warning: mark manual action required on %s failed: %v", node, err)
}
}
// recoveryCordonMaxDuration returns the maximum allowed automatic cordon lease.
// Signature: (o *Orchestrator) recoveryCordonMaxDuration() time.Duration.
// Why: all recovery cordon decisions should share the same operator promise.
func (o *Orchestrator) recoveryCordonMaxDuration() time.Duration {
seconds := o.cfg.Startup.RecoveryCordonMaxSeconds
if seconds <= 0 {
seconds = 3600
}
return time.Duration(seconds) * time.Second
}
// cordonLeaseExpired reports whether a stored cordon lease is past deadline.
// Signature: cordonLeaseExpired(annotations map[string]string, now time.Time) (bool, string).
// Why: malformed or missing deadlines should fail closed so Ananke does not
// treat an indefinite cordon as healthy automation.
func cordonLeaseExpired(annotations map[string]string, now time.Time) (bool, string) {
rawDeadline := strings.TrimSpace(annotations[anankeCordonDeadlineAnnotation])
if rawDeadline == "" {
return true, "missing deadline"
}
deadline, err := time.Parse(time.RFC3339, rawDeadline)
if err != nil {
return true, rawDeadline
}
return !now.Before(deadline), deadline.Format(time.RFC3339)
}
// nodeUnschedulableSince reads the timestamp Kubernetes recorded for a cordon.
// Signature: nodeUnschedulableSince(node nodeReadyItem) time.Time.
// Why: unowned cordons need an age check before Ananke escalates them to manual
// action.
func nodeUnschedulableSince(node nodeReadyItem) time.Time {
for _, taint := range node.Spec.Taints {
if taint.Key == "node.kubernetes.io/unschedulable" {
return taint.TimeAdded
}
}
return time.Time{}
}
// sanitizeCordonAnnotationValue keeps node annotation values compact.
// Signature: sanitizeCordonAnnotationValue(value string) string.
// Why: recovery details are operator-facing, but Kubernetes annotation values
// should stay short enough to remain readable in node listings.
func sanitizeCordonAnnotationValue(value string) string {
value = strings.Join(strings.Fields(strings.TrimSpace(value)), " ")
if len(value) <= 240 {
return value
}
return value[:240]
}

View File

@ -0,0 +1,143 @@
package cluster
import (
"context"
"errors"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
)
// TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode runs one orchestration or CLI step.
// Signature: TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode(t *testing.T).
// Why: a cryptsetup safety cordon should clear automatically once the host
// prerequisite is actually present.
func TestEnforceRecoveryCordonLeasesUncordonsRecoveredCryptsetupNode(t *testing.T) {
now := time.Now().UTC()
nodeJSON := leasedNodeJSON(
"titan-15",
cordonReasonMissingCryptsetup,
now.Add(-10*time.Minute),
now.Add(50*time.Minute),
)
uncordoned := false
cleared := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{RecoveryCordonMaxSeconds: 3600},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON},
{match: matchContains("ssh", "cryptsetup"), out: "__ANANKE_CRYPTSETUP_PRESENT__"},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "uncordon", "titan-15")(name, args) {
return false
}
uncordoned = true
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "annotate", "node", "titan-15", anankeCordonOwnerAnnotation+"-")(name, args) {
return false
}
cleared = true
return true
},
},
})
released, err := orch.enforceRecoveryCordonLeases(context.Background())
if err != nil {
t.Fatalf("enforceRecoveryCordonLeases failed: %v", err)
}
if released != 1 || !uncordoned || !cleared {
t.Fatalf("expected recovered cordon release, released=%d uncordoned=%v cleared=%v", released, uncordoned, cleared)
}
}
// TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon runs one orchestration or CLI step.
// Signature: TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon(t *testing.T).
// Why: Ananke must not overwrite a manual cordon, but it also must not let it sit
// silently beyond the recovery lease window.
func TestEnforceRecoveryCordonLeasesReportsStaleUnownedCordon(t *testing.T) {
old := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339)
nodeJSON := `{"items":[{"metadata":{"name":"titan-18","annotations":{}},"spec":{"unschedulable":true,"taints":[{"key":"node.kubernetes.io/unschedulable","timeAdded":"` + old + `"}]},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`
manualMarked := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{RecoveryCordonMaxSeconds: 3600},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "annotate", "node", "titan-18", anankeCordonManualActionAnnotation+"=")(name, args) {
return false
}
manualMarked = true
return true
},
},
})
released, err := orch.enforceRecoveryCordonLeases(context.Background())
if released != 0 || err == nil || !strings.Contains(err.Error(), "unowned cordon") {
t.Fatalf("expected stale unowned cordon error, released=%d err=%v", released, err)
}
if !manualMarked {
t.Fatalf("expected manual-action annotation")
}
}
// TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair runs one orchestration or CLI step.
// Signature: TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair(t *testing.T).
// Why: after the lease expires, a failed automatic repair must become a clear
// manual action rather than another quiet retry.
func TestEnforceRecoveryCordonLeasesEscalatesExpiredCryptsetupRepair(t *testing.T) {
now := time.Now().UTC()
nodeJSON := leasedNodeJSON(
"titan-17",
cordonReasonMissingCryptsetup,
now.Add(-2*time.Hour),
now.Add(-time.Hour),
)
manualMarked := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{RecoveryCordonMaxSeconds: 3600},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: nodeJSON},
{match: matchContains("ssh", "cryptsetup"), err: errors.New("sudo rejected")},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "annotate", "node", "titan-17", anankeCordonManualActionAnnotation+"=")(name, args) {
return false
}
manualMarked = true
return true
},
},
})
released, err := orch.enforceRecoveryCordonLeases(context.Background())
if released != 0 || err == nil || !strings.Contains(err.Error(), "manual action required") {
t.Fatalf("expected expired lease manual action, released=%d err=%v", released, err)
}
if !manualMarked {
t.Fatalf("expected manual-action annotation")
}
}
// leasedNodeJSON builds a minimal node-list payload with an Ananke cordon lease.
// Signature: leasedNodeJSON(node, reason string, created, deadline time.Time) string.
// Why: the lease tests only need node annotations, cordon state, and Ready
// condition, so a small fixture keeps intent clear.
func leasedNodeJSON(node string, reason string, created time.Time, deadline time.Time) string {
return `{"items":[{"metadata":{"name":"` + node + `","annotations":{"` +
anankeCordonOwnerAnnotation + `":"ananke","` +
anankeCordonReasonAnnotation + `":"` + reason + `","` +
anankeCordonCreatedAnnotation + `":"` + created.Format(time.RFC3339) + `","` +
anankeCordonDeadlineAnnotation + `":"` + deadline.Format(time.RFC3339) +
`"}},"spec":{"unschedulable":true,"taints":[{"key":"node.kubernetes.io/unschedulable","timeAdded":"` + created.Format(time.RFC3339) +
`"}]},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`
}

View File

@ -0,0 +1,99 @@
package cluster
import (
"context"
"errors"
"io"
"log"
"path/filepath"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// TestCriticalEndpointHelpers runs one orchestration or CLI step.
// Signature: TestCriticalEndpointHelpers(t *testing.T).
// Why: covers critical endpoint parsing and readiness checks that gate startup completion.
func TestCriticalEndpointHelpers(t *testing.T) {
cfg := config.Config{
Startup: config.Startup{
CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"},
},
}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"},
})
ok, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background())
if err != nil || !ok {
t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ok, detail, ns, svc, err)
}
if detail != "services=1" {
t.Fatalf("unexpected readiness detail: %q", detail)
}
gotNS, gotSvc, err := parseCriticalServiceEndpoint("monitoring/victoria-metrics-single-server")
if err != nil || gotNS != "monitoring" || gotSvc != "victoria-metrics-single-server" {
t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", gotNS, gotSvc, err)
}
if _, _, err := parseCriticalServiceEndpoint("invalid"); err == nil {
t.Fatalf("expected parseCriticalServiceEndpoint error")
}
}
// TestCriticalEndpointAutoHealWorkflow runs one orchestration or CLI step.
// Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T).
// Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding.
func TestCriticalEndpointAutoHealWorkflow(t *testing.T) {
cfg := config.Config{
Startup: config.Startup{
CriticalServiceEndpointWaitSec: 2,
CriticalServiceEndpointPollSec: 1,
CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"},
},
State: config.State{
Dir: t.TempDir(),
ReportsDir: filepath.Join(t.TempDir(), "reports"),
RunHistoryPath: filepath.Join(t.TempDir(), "runs.json"),
},
}
orch := &Orchestrator{
cfg: cfg,
runner: &execx.Runner{},
store: state.New(cfg.State.RunHistoryPath),
log: log.New(io.Discard, "", 0),
}
endpointChecks := 0
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
joined := name + " " + strings.Join(args, " ")
if strings.Contains(joined, "get endpoints victoria-metrics-single-server") {
endpointChecks++
if endpointChecks == 1 {
return "", nil
}
return "10.42.0.10\n", nil
}
if strings.Contains(joined, "scale deployment victoria-metrics-single-server") {
return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`)
}
if strings.Contains(joined, "scale statefulset victoria-metrics-single-server") {
return "", nil
}
if strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server") {
return "statefulset rolled out", nil
}
return "", nil
}
orch.runOverride = dispatch
orch.runSensitiveOverride = dispatch
if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil {
t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err)
}
if endpointChecks < 2 {
t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks)
}
}

View File

@ -7,7 +7,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
@ -465,43 +464,3 @@ func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) {
}
return key, nil
}
// workloadReady runs one orchestration or CLI step.
// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
out, err := o.kubectl(
ctx,
20*time.Second,
"-n",
w.Namespace,
"get",
w.Kind,
w.Name,
"-o",
"jsonpath={.status.readyReplicas}",
)
if err != nil {
return false, err
}
raw := strings.TrimSpace(out)
if raw == "" || raw == "<no value>" {
return false, nil
}
n, err := strconv.Atoi(raw)
if err != nil {
return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
}
return n >= 1, nil
}
// isNotFoundErr runs one orchestration or CLI step.
// Signature: isNotFoundErr(err error) bool.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func isNotFoundErr(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)")
}

View File

@ -446,75 +446,3 @@ func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions)
}
return nil
}
// Shutdown runs one orchestration or CLI step.
// Signature: (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
if err != nil {
return err
}
defer unlock()
if invErr := o.validateNodeInventory(); invErr != nil {
return invErr
}
record := state.RunRecord{
ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
Action: "shutdown",
Reason: opts.Reason,
DryRun: o.runner.DryRun,
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
if !o.runner.DryRun {
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
return fmt.Errorf("set shutdown intent: %w", writeErr)
}
defer func() {
final := state.IntentShuttingDown
if err == nil {
final = state.IntentShutdownComplete
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
}
}()
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
return err
}
o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
o.reportFluxSource(ctx, "")
skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
if !skipEtcd {
o.bestEffort("etcd snapshot", func() error {
return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
})
}
o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
if !skipDrain {
o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
}
shutdownMode := strings.TrimSpace(opts.Mode)
effectiveMode, modeErr := normalizeShutdownMode(shutdownMode)
if modeErr != nil {
return modeErr
}
o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode)
o.stopWorkers(ctx, workers)
o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
o.log.Printf("shutdown flow complete")
return nil
}

View File

@ -0,0 +1,254 @@
package cluster
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)
// repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step.
// Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node
// rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot
// mount encrypted PVCs; installing the missing host tool and recycling the
// controller-owned pod lets kubelet retry the same volume safely.
func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err)
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
podsByKey[ns+"/"+name] = pod
}
if len(podsByKey) == 0 {
return map[string]string{}, nil
}
repairedNodes := map[string]bool{}
reasons := map[string]string{}
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedMount" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[key]
if !ok {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
continue
}
message := strings.ToLower(strings.TrimSpace(event.Message))
if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") {
continue
}
node := strings.TrimSpace(pod.Spec.NodeName)
if node == "" || !o.sshManaged(node) {
o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key)
continue
}
if repaired, ok := repairedNodes[node]; ok {
if repaired {
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
}
continue
}
if err := o.ensureHostCryptsetup(ctx, node); err != nil {
repairedNodes[node] = false
o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err)
if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr)
continue
}
reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node
continue
}
repairedNodes[node] = true
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
}
return reasons, nil
}
// ensureHostCryptsetup runs one orchestration or CLI step.
// Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error.
// Why: kubelet's encrypted Longhorn mount helper shells into the host namespace,
// so the package must exist on the node host, not merely inside a workload pod.
func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error {
command := strings.Join([]string{
"set -eu",
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi",
"if ! command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi",
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update",
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin",
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi",
"echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__",
"exit 43",
}, "; ")
out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute)
if err != nil {
return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out))
}
trimmed := strings.TrimSpace(out)
o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, trimmed)
if strings.Contains(trimmed, "__ANANKE_CRYPTSETUP_INSTALLED__") {
o.noteStartupAutoHeal(fmt.Sprintf("installed cryptsetup on %s", node))
}
return nil
}
// cordonNodeForMissingCryptsetup runs one orchestration or CLI step.
// Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error.
// Why: when host package repair is not permitted, cordoning is the safest
// automatic fallback: it prevents new encrypted-volume pods from landing on a
// node kubelet cannot mount from, while leaving existing workloads and storage
// objects untouched.
func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error {
if err := o.cordonNodeWithLease(ctx, node, cordonReasonMissingCryptsetup, "encrypted Longhorn volume mount failed because host cryptsetup is missing"); err != nil {
return err
}
o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node)
o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node))
return nil
}
// longhornAttachBlockedPodReasons runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a
// node Longhorn still marks unready. Recycling the unattached Pending pod lets
// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane
// objects.
func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
unreadyNodes, err := o.longhornUnreadyNodes(ctx)
if err != nil {
return nil, err
}
if len(unreadyNodes) == 0 {
return map[string]string{}, nil
}
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err)
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
if _, unready := unreadyNodes[node]; !unready {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
podsByKey[ns+"/"+name] = pod
}
if len(podsByKey) == 0 {
return map[string]string{}, nil
}
reasons := map[string]string{}
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedAttachVolume" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[key]
if !ok {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
continue
}
node := strings.TrimSpace(pod.Spec.NodeName)
message := strings.ToLower(strings.TrimSpace(event.Message))
if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") {
continue
}
if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") {
continue
}
reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node
}
return reasons, nil
}
// longhornUnreadyNodes runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error).
// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes
// node readiness; attach recovery must use Longhorn's view for safety.
func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) {
out, err := o.kubectl(ctx, 30*time.Second,
"-n", "longhorn-system",
"get", "nodes.longhorn.io",
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}",
)
if err != nil {
if isNotFoundErr(err) {
return map[string]struct{}{}, nil
}
return nil, fmt.Errorf("query longhorn node readiness: %w", err)
}
unready := map[string]struct{}{}
for _, line := range lines(out) {
fields := strings.Fields(line)
if len(fields) < 2 {
continue
}
if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") {
unready[strings.TrimSpace(fields[0])] = struct{}{}
}
}
return unready, nil
}

View File

@ -0,0 +1,139 @@
package cluster
import (
"context"
"strings"
"time"
)
// staleControllerPodReasons runs one orchestration or CLI step.
// Signature: (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: after node or kubelet recovery, controller-owned pods can stay in
// terminal or unknown status even though the node is Ready and a replacement may
// already be healthy. A normal pod delete lets Kubernetes clean the stale status
// without touching storage objects or forcing deletion on a partitioned node.
func (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
unavailable, err := o.unavailableNodeSet(ctx)
if err != nil {
return nil, err
}
reasons := map[string]string{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
phase := strings.TrimSpace(pod.Status.Phase)
if !strings.EqualFold(phase, "Unknown") && !strings.EqualFold(phase, "Failed") {
continue
}
if _, badNode := unavailable[node]; badNode {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
reasons[ns+"/"+name] = "StaleControllerPodOnReadyNode:" + node + ":" + phase
}
return reasons, nil
}
// staleControllerPodForceDeleteSafe runs one orchestration or CLI step.
// Signature: staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool.
// Why: a stale pod already marked for deletion may need force removal after a
// node outage. Keep that fallback away from PVC-bearing pods so Ananke never
// risks duplicating a storage writer.
func staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool {
if pod.Metadata.DeletionTimestamp == nil {
return false
}
if time.Since(*pod.Metadata.DeletionTimestamp) < grace {
return false
}
if podUsesPersistentVolumeClaim(pod) {
return false
}
return true
}
// podUsesPersistentVolumeClaim runs one orchestration or CLI step.
// Signature: podUsesPersistentVolumeClaim(pod podResource) bool.
// Why: force-delete recovery is deliberately disallowed for pods with PVCs; the
// scheduler and storage controller need to settle those normally.
func podUsesPersistentVolumeClaim(pod podResource) bool {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil && strings.TrimSpace(volume.PersistentVolumeClaim.ClaimName) != "" {
return true
}
}
return false
}
// podControllerOwned runs one orchestration or CLI step.
// Signature: podControllerOwned(p podResource) bool.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func podControllerOwned(p podResource) bool {
for _, owner := range p.Metadata.OwnerReferences {
switch strings.TrimSpace(owner.Kind) {
case "ReplicaSet", "StatefulSet", "DaemonSet", "Job":
return true
}
}
return false
}
// stuckContainerReason runs one orchestration or CLI step.
// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func stuckContainerReason(p podResource, reasons map[string]struct{}) string {
check := func(statuses []podContainerStatus) string {
for _, st := range statuses {
if st.State.Waiting == nil {
continue
}
reason := strings.TrimSpace(st.State.Waiting.Reason)
if reason == "" {
continue
}
if _, ok := reasons[reason]; ok {
return reason
}
}
return ""
}
if reason := check(p.Status.InitContainerStatuses); reason != "" {
return reason
}
return check(p.Status.ContainerStatuses)
}
// stuckVaultInitReason runs one orchestration or CLI step.
// Signature: stuckVaultInitReason(p podResource, grace time.Duration) string.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func stuckVaultInitReason(p podResource, grace time.Duration) string {
if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") {
return ""
}
if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") {
return ""
}
for _, st := range p.Status.InitContainerStatuses {
if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil {
continue
}
startedAt := st.State.Running.StartedAt
if startedAt.IsZero() {
continue
}
if time.Since(startedAt) < grace {
return ""
}
return "VaultInitStuck"
}
return ""
}

View File

@ -0,0 +1,82 @@
package cluster
import (
"context"
"fmt"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// Shutdown runs one orchestration or CLI step.
// Signature: (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
if err != nil {
return err
}
defer unlock()
if invErr := o.validateNodeInventory(); invErr != nil {
return invErr
}
record := state.RunRecord{
ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
Action: "shutdown",
Reason: opts.Reason,
DryRun: o.runner.DryRun,
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
if !o.runner.DryRun {
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
return fmt.Errorf("set shutdown intent: %w", writeErr)
}
defer func() {
final := state.IntentShuttingDown
if err == nil {
final = state.IntentShutdownComplete
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
}
}()
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
return err
}
o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
o.reportFluxSource(ctx, "")
skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
if !skipEtcd {
o.bestEffort("etcd snapshot", func() error {
return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
})
}
o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
if !skipDrain {
o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
}
shutdownMode := strings.TrimSpace(opts.Mode)
effectiveMode, modeErr := normalizeShutdownMode(shutdownMode)
if modeErr != nil {
return modeErr
}
o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode)
o.stopWorkers(ctx, workers)
o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
o.log.Printf("shutdown flow complete")
return nil
}

View File

@ -141,453 +141,6 @@ func TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T) {
}
}
// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T).
// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be
// rescheduled without mutating Longhorn volume, replica, or disk objects.
func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}`
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-0b"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !deleted {
t.Fatalf("expected longhorn attach-blocked pending pod to be recycled")
}
}
// TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T).
// Why: encrypted Longhorn PVC recovery should repair missing host cryptsetup and
// then recycle the blocked pod without touching Longhorn data-plane objects.
func TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}`
installed := false
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if name != "ssh" || !strings.Contains(strings.Join(args, " "), "apt-get install -y --no-install-recommends cryptsetup-bin") {
return false
}
installed = true
return true
},
out: "__ANANKE_CRYPTSETUP_INSTALLED__",
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !installed {
t.Fatalf("expected missing host cryptsetup to be installed")
}
if !deleted {
t.Fatalf("expected encrypted-volume blocked pod to be recycled")
}
}
// TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T).
// Why: when host package repair is blocked by sudo policy, Ananke should avoid
// the bad node and retry the controller-owned pod elsewhere.
func TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}`
cordoned := false
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: matchContains("ssh", "apt-get install -y --no-install-recommends cryptsetup-bin"),
err: errors.New("sudo: a password is required"),
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "cordon", "titan-19")(name, args) {
return false
}
cordoned = true
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !cordoned {
t.Fatalf("expected cryptsetup-missing node to be cordoned")
}
if !deleted {
t.Fatalf("expected encrypted-volume blocked pod to be recycled")
}
}
// TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T).
// Why: post-outage controller pods can remain Unknown or Failed after their
// node recovers; deletion clears stale status while force deletion stays away
// from PVC-backed storage.
func TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T) {
old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
recent := time.Now().Add(-30 * time.Second).UTC().Format(time.RFC3339)
pods := `{"items":[` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-old","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"secret"}]},"status":{"phase":"Failed"}},` +
`{"metadata":{"namespace":"logging","name":"oauth2-proxy-terminating","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy-logs"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"secret"}]},"status":{"phase":"Running"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"pvc-backed-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"pvc-backed"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"data","persistentVolumeClaim":{"claimName":"data"}}]},"status":{"phase":"Failed"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-fresh","creationTimestamp":"` + recent + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"maintenance","name":"stale-on-bad-node","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"maintenance"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"default","name":"bare-pod","creationTimestamp":"` + old + `"},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}}]}`
deleted := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-12\tTrue\ntitan-22\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: `{"items":[]}`},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-12"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-22"},"status":{"conditions":[{"type":"Ready","status":"False"}]}}]}`},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-old", "--wait=false")(name, args) {
return false
}
deleted = append(deleted, "longhorn-vault-sync-old")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-failed", "--wait=false", "--grace-period=0", "--force")(name, args) {
return false
}
deleted = append(deleted, "longhorn-vault-sync-failed")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "logging", "delete", "pod", "oauth2-proxy-terminating", "--wait=false", "--grace-period=0", "--force")(name, args) {
return false
}
deleted = append(deleted, "oauth2-proxy-terminating")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "pvc-backed-failed", "--wait=false")(name, args) {
return false
}
if strings.Contains(strings.Join(args, " "), "--force") {
t.Fatalf("pvc-backed stale pod must not be force deleted")
}
deleted = append(deleted, "pvc-backed-failed")
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if strings.Join(deleted, ",") != "longhorn-vault-sync-old,longhorn-vault-sync-failed,oauth2-proxy-terminating,pvc-backed-failed" {
t.Fatalf("expected only stale controller pods on Ready node to be recycled, got %#v", deleted)
}
}
// TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T).
// Why: a Ready node with a wedged container runtime can trap replacement pods
// indefinitely; startup should cordon that scheduler target without draining it
// or touching Longhorn data-plane objects.
func TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T) {
old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[` +
`{"metadata":{"namespace":"logging","name":"oauth2-proxy-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"oauth2-proxy","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"monitoring","name":"suite-probe-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"suite-probe"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"probe","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"sso","name":"secret-ensure-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"secret-ensure"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","initContainerStatuses":[{"name":"init","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"finance","name":"single-node-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"single"}]},"spec":{"nodeName":"titan-19","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"app","state":{"waiting":{"reason":"CreateContainerError"}}}]}}]}`
events := `{"items":[` +
`{"metadata":{"namespace":"logging","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"logging","name":"oauth2-proxy-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{oauth2-proxy}: Error: failed to reserve container name oauth2-proxy_logging","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"suite-probe-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{probe}: Error: context deadline exceeded","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"sso","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"sso","name":"secret-ensure-bad"},"type":"Warning","reason":"Failed","message":"spec.initContainers{init}: Error: failed to reserve container name init_sso","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"single-node-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{app}: Error: failed to reserve container name app_finance","lastTimestamp":"` + lastSeen + `"}]}`
cordoned := []string{}
deleted := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: ""},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-18"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "cordon")(name, args) {
return false
}
cordoned = append(cordoned, args[len(args)-1])
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "delete", "pod", "--wait=false")(name, args) {
return false
}
joined := strings.Join(args, " ")
if strings.Contains(joined, "--force") {
t.Fatalf("container-runtime wedge recycle must not force-delete fresh pods")
}
if len(args) >= 5 {
deleted = append(deleted, args[4])
}
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if strings.Join(cordoned, ",") != "titan-18" {
t.Fatalf("expected only titan-18 to be cordoned, got %#v", cordoned)
}
if strings.Join(deleted, ",") != "oauth2-proxy-bad,suite-probe-bad,secret-ensure-bad,single-node-bad" {
t.Fatalf("expected runtime-wedged pods to be recycled, got %#v", deleted)
}
}
// TestEffectiveWorkersFiltersIgnoredUnavailableNodes runs one orchestration or CLI step.
// Signature: TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T).
// Why: ignored unavailable nodes should be excluded before startup tries SSH,
// k3s-agent start, or uncordon operations against intentionally absent hosts.
func TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T) {
cfg := config.Config{
Workers: []string{" titan-08 ", "titan-09", "titan-10", "titan-11"},
Startup: config.Startup{
IgnoreUnavailableNodes: []string{"titan-09", "titan-10"},
},
}
orch := buildOrchestratorWithStubs(t, cfg, nil)
got, err := orch.effectiveWorkers(context.Background())
if err != nil {
t.Fatalf("effectiveWorkers failed: %v", err)
}
want := []string{"titan-08", "titan-11"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Fatalf("effectiveWorkers mismatch got=%v want=%v", got, want)
}
}
// TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers runs one orchestration or CLI step.
// Signature: TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T).
// Why: startup must not uncordon Longhorn workers that cannot mount encrypted
// PVCs; cordoning those nodes is safe and avoids repeating the post-outage
// mount deadlock.
func TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T) {
cordoned := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
SSHManagedNodes: []string{"titan-04", "titan-19"},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-19\ntitan-23\n"},
{
match: matchContains("ssh", "titan-04", "command -v cryptsetup"),
out: "__ANANKE_CRYPTSETUP_PRESENT__",
},
{
match: matchContains("ssh", "titan-19", "apt-get install -y --no-install-recommends cryptsetup-bin"),
err: errors.New("sudo: a password is required"),
},
{
match: func(name string, args []string) bool {
if name != "kubectl" || len(args) == 0 || args[0] != "cordon" {
return false
}
if len(args) > 1 {
cordoned = append(cordoned, args[len(args)-1])
}
return true
},
},
})
got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04", "titan-19", "titan-20"})
if err != nil {
t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err)
}
want := []string{"titan-04", "titan-20"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Fatalf("guarded workers mismatch got=%v want=%v", got, want)
}
if strings.Join(cordoned, ",") != "titan-19,titan-23" {
t.Fatalf("expected unsafe longhorn hosts to be cordoned, got %v", cordoned)
}
}
// TestLonghornCryptsetupExemptNodesAreNotQuarantined runs one orchestration or CLI step.
// Signature: TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T).
// Why: Veles/Oceanus uses titan-23 as a Longhorn host for unencrypted local
// volumes; startup should uncordon that policy-exempt node without requiring
// host SSH or weakening encrypted-volume safety on other workers.
func TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T) {
cordoned := []string{}
uncordoned := []string{}
sshTitan23 := false
orch := buildOrchestratorWithStubs(t, config.Config{
SSHManagedNodes: []string{"titan-04"},
Startup: config.Startup{
LonghornCryptsetupExemptNodes: []string{"titan-23"},
},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-23\n"},
{
match: matchContains("ssh", "titan-04", "command -v cryptsetup"),
out: "__ANANKE_CRYPTSETUP_PRESENT__",
},
{
match: func(name string, args []string) bool {
if name == "ssh" && strings.Contains(strings.Join(args, " "), "titan-23") {
sshTitan23 = true
return true
}
return false
},
},
{
match: func(name string, args []string) bool {
if name != "kubectl" || len(args) == 0 || args[0] != "cordon" {
return false
}
if len(args) > 1 {
cordoned = append(cordoned, args[len(args)-1])
}
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "uncordon")(name, args) {
return false
}
if len(args) > 1 {
uncordoned = append(uncordoned, args[len(args)-1])
}
return true
},
},
})
got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04"})
if err != nil {
t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err)
}
if strings.Join(got, ",") != "titan-04" {
t.Fatalf("guarded workers mismatch got=%v", got)
}
if err := orch.uncordonLonghornCryptsetupExemptNodes(context.Background()); err != nil {
t.Fatalf("uncordonLonghornCryptsetupExemptNodes failed: %v", err)
}
if sshTitan23 {
t.Fatalf("did not expect cryptsetup SSH check for exempt titan-23")
}
if len(cordoned) != 0 {
t.Fatalf("did not expect exempt node to be cordoned, got %v", cordoned)
}
if strings.Join(uncordoned, ",") != "titan-23" {
t.Fatalf("expected exempt titan-23 to be uncordoned, got %v", uncordoned)
}
}
// TestLonghornHostNodesFallsBackToConfiguredLabels runs one orchestration or CLI step.
// Signature: TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T).
// Why: bootstrap caches or minimal test clusters can lack live labels; the
// static startup inventory should still protect configured storage workers.
func TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T) {
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{
RequiredNodeLabels: map[string]map[string]string{
"titan-04": {"longhorn-host": "true"},
"titan-20": {"node-role.kubernetes.io/worker": "true"},
},
},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: ""},
})
got, err := orch.longhornHostNodes(context.Background())
if err != nil {
t.Fatalf("longhornHostNodes failed: %v", err)
}
if _, ok := got["titan-04"]; !ok || len(got) != 1 {
t.Fatalf("expected configured longhorn host fallback, got %v", got)
}
}
// TestNewConstructsOrchestrator runs one orchestration or CLI step.
// Signature: TestNewConstructsOrchestrator(t *testing.T).
// Why: covers constructor path in orchestrator core module.
@ -913,86 +466,3 @@ func TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T) {
t.Fatalf("expected runSudoK3S failure when all candidates fail")
}
}
// TestCriticalEndpointHelpers runs one orchestration or CLI step.
// Signature: TestCriticalEndpointHelpers(t *testing.T).
// Why: covers critical endpoint parsing and readiness checks that gate startup completion.
func TestCriticalEndpointHelpers(t *testing.T) {
cfg := config.Config{
Startup: config.Startup{
CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"},
},
}
orch := buildOrchestratorWithStubs(t, cfg, []commandStub{
{match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"},
})
ok, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background())
if err != nil || !ok {
t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ok, detail, ns, svc, err)
}
if detail != "services=1" {
t.Fatalf("unexpected readiness detail: %q", detail)
}
gotNS, gotSvc, err := parseCriticalServiceEndpoint("monitoring/victoria-metrics-single-server")
if err != nil || gotNS != "monitoring" || gotSvc != "victoria-metrics-single-server" {
t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", gotNS, gotSvc, err)
}
if _, _, err := parseCriticalServiceEndpoint("invalid"); err == nil {
t.Fatalf("expected parseCriticalServiceEndpoint error")
}
}
// TestCriticalEndpointAutoHealWorkflow runs one orchestration or CLI step.
// Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T).
// Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding.
func TestCriticalEndpointAutoHealWorkflow(t *testing.T) {
cfg := config.Config{
Startup: config.Startup{
CriticalServiceEndpointWaitSec: 2,
CriticalServiceEndpointPollSec: 1,
CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"},
},
State: config.State{
Dir: t.TempDir(),
ReportsDir: filepath.Join(t.TempDir(), "reports"),
RunHistoryPath: filepath.Join(t.TempDir(), "runs.json"),
},
}
orch := &Orchestrator{
cfg: cfg,
runner: &execx.Runner{},
store: state.New(cfg.State.RunHistoryPath),
log: log.New(io.Discard, "", 0),
}
endpointChecks := 0
dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
joined := name + " " + strings.Join(args, " ")
if strings.Contains(joined, "get endpoints victoria-metrics-single-server") {
endpointChecks++
if endpointChecks == 1 {
return "", nil
}
return "10.42.0.10\n", nil
}
if strings.Contains(joined, "scale deployment victoria-metrics-single-server") {
return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`)
}
if strings.Contains(joined, "scale statefulset victoria-metrics-single-server") {
return "", nil
}
if strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server") {
return "statefulset rolled out", nil
}
return "", nil
}
orch.runOverride = dispatch
orch.runSensitiveOverride = dispatch
if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil {
t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err)
}
if endpointChecks < 2 {
t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks)
}
}

View File

@ -417,7 +417,8 @@ func (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context,
continue
}
sort.Strings(keys)
if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil {
detail := fmt.Sprintf("pods=%d %s", len(keys), joinLimited(keys, 8))
if err := o.cordonNodeWithLease(ctx, node, cordonReasonRuntimeWedge, detail); err != nil {
o.log.Printf("warning: cordon container-runtime-wedged node %s failed: %v", node, err)
continue
}
@ -438,380 +439,3 @@ func (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context,
}
return nodes
}
// staleControllerPodReasons runs one orchestration or CLI step.
// Signature: (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: after node or kubelet recovery, controller-owned pods can stay in
// terminal or unknown status even though the node is Ready and a replacement may
// already be healthy. A normal pod delete lets Kubernetes clean the stale status
// without touching storage objects or forcing deletion on a partitioned node.
func (o *Orchestrator) staleControllerPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
unavailable, err := o.unavailableNodeSet(ctx)
if err != nil {
return nil, err
}
reasons := map[string]string{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
phase := strings.TrimSpace(pod.Status.Phase)
if !strings.EqualFold(phase, "Unknown") && !strings.EqualFold(phase, "Failed") {
continue
}
if _, badNode := unavailable[node]; badNode {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
reasons[ns+"/"+name] = "StaleControllerPodOnReadyNode:" + node + ":" + phase
}
return reasons, nil
}
// staleControllerPodForceDeleteSafe runs one orchestration or CLI step.
// Signature: staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool.
// Why: a stale pod already marked for deletion may need force removal after a
// node outage. Keep that fallback away from PVC-bearing pods so Ananke never
// risks duplicating a storage writer.
func staleControllerPodForceDeleteSafe(pod podResource, grace time.Duration) bool {
if pod.Metadata.DeletionTimestamp == nil {
return false
}
if time.Since(*pod.Metadata.DeletionTimestamp) < grace {
return false
}
if podUsesPersistentVolumeClaim(pod) {
return false
}
return true
}
// podUsesPersistentVolumeClaim runs one orchestration or CLI step.
// Signature: podUsesPersistentVolumeClaim(pod podResource) bool.
// Why: force-delete recovery is deliberately disallowed for pods with PVCs; the
// scheduler and storage controller need to settle those normally.
func podUsesPersistentVolumeClaim(pod podResource) bool {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil && strings.TrimSpace(volume.PersistentVolumeClaim.ClaimName) != "" {
return true
}
}
return false
}
// repairEncryptedVolumeMountPrereqs runs one orchestration or CLI step.
// Signature: (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: encrypted Longhorn volume mounts depend on host cryptsetup. After node
// rebuilds or partial OS recovery, Kubernetes may be ready while kubelet cannot
// mount encrypted PVCs; installing the missing host tool and recycling the
// controller-owned pod lets kubelet retry the same volume safely.
func (o *Orchestrator) repairEncryptedVolumeMountPrereqs(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query events for encrypted volume mount scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return nil, fmt.Errorf("decode events for encrypted volume mount scan: %w", err)
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
podsByKey[ns+"/"+name] = pod
}
if len(podsByKey) == 0 {
return map[string]string{}, nil
}
repairedNodes := map[string]bool{}
reasons := map[string]string{}
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedMount" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[key]
if !ok {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
continue
}
message := strings.ToLower(strings.TrimSpace(event.Message))
if !strings.Contains(message, "cryptsetup") || !strings.Contains(message, "no such file or directory") {
continue
}
node := strings.TrimSpace(pod.Spec.NodeName)
if node == "" || !o.sshManaged(node) {
o.log.Printf("warning: encrypted volume mount blocked on unmanaged node %s for pod %s", node, key)
continue
}
if repaired, ok := repairedNodes[node]; ok {
if repaired {
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
}
continue
}
if err := o.ensureHostCryptsetup(ctx, node); err != nil {
repairedNodes[node] = false
o.log.Printf("warning: cryptsetup prerequisite repair failed on %s for pod %s: %v", node, key, err)
if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
o.log.Printf("warning: cordon failed after cryptsetup repair failure on %s for pod %s: %v", node, key, cordonErr)
continue
}
reasons[key] = "EncryptedVolumeCryptsetupNodeCordoned:" + node
continue
}
repairedNodes[node] = true
reasons[key] = "EncryptedVolumeCryptsetupRepaired:" + node
}
return reasons, nil
}
// ensureHostCryptsetup runs one orchestration or CLI step.
// Signature: (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error.
// Why: kubelet's encrypted Longhorn mount helper shells into the host namespace,
// so the package must exist on the node host, not merely inside a workload pod.
func (o *Orchestrator) ensureHostCryptsetup(ctx context.Context, node string) error {
command := strings.Join([]string{
"set -eu",
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_PRESENT__; exit 0; fi",
"if ! command -v apt-get >/dev/null 2>&1; then echo __ANANKE_CRYPTSETUP_NO_APT__; exit 42; fi",
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get update",
"sudo -n env DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends cryptsetup-bin",
"if command -v cryptsetup >/dev/null 2>&1 || [ -x /usr/sbin/cryptsetup ] || [ -x /usr/bin/cryptsetup ]; then echo __ANANKE_CRYPTSETUP_INSTALLED__; exit 0; fi",
"echo __ANANKE_CRYPTSETUP_INSTALL_FAILED__",
"exit 43",
}, "; ")
out, err := o.sshWithTimeout(ctx, node, command, 5*time.Minute)
if err != nil {
return fmt.Errorf("install cryptsetup-bin: %w (output=%s)", err, strings.TrimSpace(out))
}
trimmed := strings.TrimSpace(out)
o.log.Printf("ensured cryptsetup prerequisite on %s: %s", node, trimmed)
if strings.Contains(trimmed, "__ANANKE_CRYPTSETUP_INSTALLED__") {
o.noteStartupAutoHeal(fmt.Sprintf("installed cryptsetup on %s", node))
}
return nil
}
// cordonNodeForMissingCryptsetup runs one orchestration or CLI step.
// Signature: (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error.
// Why: when host package repair is not permitted, cordoning is the safest
// automatic fallback: it prevents new encrypted-volume pods from landing on a
// node kubelet cannot mount from, while leaving existing workloads and storage
// objects untouched.
func (o *Orchestrator) cordonNodeForMissingCryptsetup(ctx context.Context, node string) error {
if _, err := o.kubectl(ctx, 30*time.Second, "cordon", node); err != nil {
return err
}
o.log.Printf("cordoned node %s after encrypted volume cryptsetup prerequisite failure", node)
o.noteStartupAutoHeal(fmt.Sprintf("cordoned %s after missing cryptsetup blocked encrypted volume mount", node))
return nil
}
// longhornAttachBlockedPodReasons runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error).
// Why: after a power event, Kubernetes can schedule a Longhorn-backed pod onto a
// node Longhorn still marks unready. Recycling the unattached Pending pod lets
// the scheduler pick a Longhorn-ready node without touching Longhorn data-plane
// objects.
func (o *Orchestrator) longhornAttachBlockedPodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) {
unreadyNodes, err := o.longhornUnreadyNodes(ctx)
if err != nil {
return nil, err
}
if len(unreadyNodes) == 0 {
return map[string]string{}, nil
}
eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query events for longhorn attach-blocked pod scan: %w", err)
}
var events eventList
if err := json.Unmarshal([]byte(eventsOut), &events); err != nil {
return nil, fmt.Errorf("decode events for longhorn attach-blocked pod scan: %w", err)
}
podsByKey := map[string]podResource{}
for _, pod := range pods.Items {
ns := strings.TrimSpace(pod.Metadata.Namespace)
name := strings.TrimSpace(pod.Metadata.Name)
node := strings.TrimSpace(pod.Spec.NodeName)
if ns == "" || name == "" || node == "" {
continue
}
if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") {
continue
}
if _, unready := unreadyNodes[node]; !unready {
continue
}
if !podControllerOwned(pod) {
continue
}
if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace {
continue
}
podsByKey[ns+"/"+name] = pod
}
if len(podsByKey) == 0 {
return map[string]string{}, nil
}
reasons := map[string]string{}
for _, event := range events.Items {
if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") {
continue
}
if strings.TrimSpace(event.Reason) != "FailedAttachVolume" {
continue
}
if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") {
continue
}
key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name)
pod, ok := podsByKey[key]
if !ok {
continue
}
lastSeen := eventLastObservedAt(event)
if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) {
continue
}
node := strings.TrimSpace(pod.Spec.NodeName)
message := strings.ToLower(strings.TrimSpace(event.Message))
if !strings.Contains(message, "longhorn-backend") || !strings.Contains(message, "failed for volume") {
continue
}
if !strings.Contains(message, "node "+strings.ToLower(node)+" is not ready") {
continue
}
reasons[key] = "LonghornAttachBlockedOnUnreadyNode:" + node
}
return reasons, nil
}
// longhornUnreadyNodes runs one orchestration or CLI step.
// Signature: (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error).
// Why: Longhorn node readiness can lag or intentionally differ from Kubernetes
// node readiness; attach recovery must use Longhorn's view for safety.
func (o *Orchestrator) longhornUnreadyNodes(ctx context.Context) (map[string]struct{}, error) {
out, err := o.kubectl(ctx, 30*time.Second,
"-n", "longhorn-system",
"get", "nodes.longhorn.io",
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{range .status.conditions[?(@.type==\"Ready\")]}{.status}{end}{'\\n'}{end}",
)
if err != nil {
if isNotFoundErr(err) {
return map[string]struct{}{}, nil
}
return nil, fmt.Errorf("query longhorn node readiness: %w", err)
}
unready := map[string]struct{}{}
for _, line := range lines(out) {
fields := strings.Fields(line)
if len(fields) < 2 {
continue
}
if !strings.EqualFold(strings.TrimSpace(fields[1]), "True") {
unready[strings.TrimSpace(fields[0])] = struct{}{}
}
}
return unready, nil
}
// podControllerOwned runs one orchestration or CLI step.
// Signature: podControllerOwned(p podResource) bool.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func podControllerOwned(p podResource) bool {
for _, owner := range p.Metadata.OwnerReferences {
switch strings.TrimSpace(owner.Kind) {
case "ReplicaSet", "StatefulSet", "DaemonSet", "Job":
return true
}
}
return false
}
// stuckContainerReason runs one orchestration or CLI step.
// Signature: stuckContainerReason(p podResource, reasons map[string]struct{}) string.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func stuckContainerReason(p podResource, reasons map[string]struct{}) string {
check := func(statuses []podContainerStatus) string {
for _, st := range statuses {
if st.State.Waiting == nil {
continue
}
reason := strings.TrimSpace(st.State.Waiting.Reason)
if reason == "" {
continue
}
if _, ok := reasons[reason]; ok {
return reason
}
}
return ""
}
if reason := check(p.Status.InitContainerStatuses); reason != "" {
return reason
}
return check(p.Status.ContainerStatuses)
}
// stuckVaultInitReason runs one orchestration or CLI step.
// Signature: stuckVaultInitReason(p podResource, grace time.Duration) string.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func stuckVaultInitReason(p podResource, grace time.Duration) string {
if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") {
return ""
}
if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") {
return ""
}
for _, st := range p.Status.InitContainerStatuses {
if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil {
continue
}
startedAt := st.State.Running.StartedAt
if startedAt.IsZero() {
continue
}
if time.Since(startedAt) < grace {
return ""
}
return "VaultInitStuck"
}
return ""
}

View File

@ -0,0 +1,458 @@
package cluster
import (
"context"
"errors"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
)
// TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T).
// Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be
// rescheduled without mutating Longhorn volume, replica, or disk objects.
func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}`
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-0b"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !deleted {
t.Fatalf("expected longhorn attach-blocked pending pod to be recycled")
}
}
// TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T).
// Why: encrypted Longhorn PVC recovery should repair missing host cryptsetup and
// then recycle the blocked pod without touching Longhorn data-plane objects.
func TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}`
installed := false
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if name != "ssh" || !strings.Contains(strings.Join(args, " "), "apt-get install -y --no-install-recommends cryptsetup-bin") {
return false
}
installed = true
return true
},
out: "__ANANKE_CRYPTSETUP_INSTALLED__",
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !installed {
t.Fatalf("expected missing host cryptsetup to be installed")
}
if !deleted {
t.Fatalf("expected encrypted-volume blocked pod to be recycled")
}
}
// TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T).
// Why: when host package repair is blocked by sudo policy, Ananke should avoid
// the bad node and retry the controller-owned pod elsewhere.
func TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T) {
created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}`
events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}`
cordoned := false
deleted := false
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: matchContains("ssh", "apt-get install -y --no-install-recommends cryptsetup-bin"),
err: errors.New("sudo: a password is required"),
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "cordon", "titan-19")(name, args) {
return false
}
cordoned = true
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) {
return false
}
deleted = true
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if !cordoned {
t.Fatalf("expected cryptsetup-missing node to be cordoned")
}
if !deleted {
t.Fatalf("expected encrypted-volume blocked pod to be recycled")
}
}
// TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T).
// Why: post-outage controller pods can remain Unknown or Failed after their
// node recovers; deletion clears stale status while force deletion stays away
// from PVC-backed storage.
func TestRecycleStuckControllerPodsHandlesStalePodsOnReadyNodes(t *testing.T) {
old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
recent := time.Now().Add(-30 * time.Second).UTC().Format(time.RFC3339)
pods := `{"items":[` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-old","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"secret"}]},"status":{"phase":"Failed"}},` +
`{"metadata":{"namespace":"logging","name":"oauth2-proxy-terminating","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy-logs"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"secret"}]},"status":{"phase":"Running"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"pvc-backed-failed","creationTimestamp":"` + old + `","deletionTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"pvc-backed"}]},"spec":{"nodeName":"titan-12","volumes":[{"name":"data","persistentVolumeClaim":{"claimName":"data"}}]},"status":{"phase":"Failed"}},` +
`{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-fresh","creationTimestamp":"` + recent + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"maintenance","name":"stale-on-bad-node","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"maintenance"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Unknown"}},` +
`{"metadata":{"namespace":"default","name":"bare-pod","creationTimestamp":"` + old + `"},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}}]}`
deleted := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-12\tTrue\ntitan-22\tTrue\n"},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: `{"items":[]}`},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-12"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-22"},"status":{"conditions":[{"type":"Ready","status":"False"}]}}]}`},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-old", "--wait=false")(name, args) {
return false
}
deleted = append(deleted, "longhorn-vault-sync-old")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-failed", "--wait=false", "--grace-period=0", "--force")(name, args) {
return false
}
deleted = append(deleted, "longhorn-vault-sync-failed")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "logging", "delete", "pod", "oauth2-proxy-terminating", "--wait=false", "--grace-period=0", "--force")(name, args) {
return false
}
deleted = append(deleted, "oauth2-proxy-terminating")
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "pvc-backed-failed", "--wait=false")(name, args) {
return false
}
if strings.Contains(strings.Join(args, " "), "--force") {
t.Fatalf("pvc-backed stale pod must not be force deleted")
}
deleted = append(deleted, "pvc-backed-failed")
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if strings.Join(deleted, ",") != "longhorn-vault-sync-old,longhorn-vault-sync-failed,oauth2-proxy-terminating,pvc-backed-failed" {
t.Fatalf("expected only stale controller pods on Ready node to be recycled, got %#v", deleted)
}
}
// TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode runs one orchestration or CLI step.
// Signature: TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T).
// Why: a Ready node with a wedged container runtime can trap replacement pods
// indefinitely; startup should cordon that scheduler target without draining it
// or touching Longhorn data-plane objects.
func TestRecycleStuckControllerPodsCordonsContainerRuntimeWedgeNode(t *testing.T) {
old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339)
lastSeen := time.Now().UTC().Format(time.RFC3339)
pods := `{"items":[` +
`{"metadata":{"namespace":"logging","name":"oauth2-proxy-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"oauth2-proxy"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"oauth2-proxy","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"monitoring","name":"suite-probe-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"suite-probe"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"probe","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"sso","name":"secret-ensure-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"Job","name":"secret-ensure"}]},"spec":{"nodeName":"titan-18","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","initContainerStatuses":[{"name":"init","state":{"waiting":{"reason":"CreateContainerError"}}}]}},` +
`{"metadata":{"namespace":"finance","name":"single-node-bad","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"single"}]},"spec":{"nodeName":"titan-19","volumes":[{"name":"scratch"}]},"status":{"phase":"Pending","containerStatuses":[{"name":"app","state":{"waiting":{"reason":"CreateContainerError"}}}]}}]}`
events := `{"items":[` +
`{"metadata":{"namespace":"logging","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"logging","name":"oauth2-proxy-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{oauth2-proxy}: Error: failed to reserve container name oauth2-proxy_logging","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"suite-probe-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{probe}: Error: context deadline exceeded","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"sso","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"sso","name":"secret-ensure-bad"},"type":"Warning","reason":"Failed","message":"spec.initContainers{init}: Error: failed to reserve container name init_sso","lastTimestamp":"` + lastSeen + `"},` +
`{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"single-node-bad"},"type":"Warning","reason":"Failed","message":"spec.containers{app}: Error: failed to reserve container name app_finance","lastTimestamp":"` + lastSeen + `"}]}`
cordoned := []string{}
deleted := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{StuckPodGraceSeconds: 180},
}, []commandStub{
{match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods},
{match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: ""},
{match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events},
{match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-18"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`},
{
match: func(name string, args []string) bool {
if name != "kubectl" || len(args) == 0 || args[0] != "cordon" {
return false
}
cordoned = append(cordoned, args[len(args)-1])
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "delete", "pod", "--wait=false")(name, args) {
return false
}
joined := strings.Join(args, " ")
if strings.Contains(joined, "--force") {
t.Fatalf("container-runtime wedge recycle must not force-delete fresh pods")
}
if len(args) >= 5 {
deleted = append(deleted, args[4])
}
return true
},
},
})
if err := orch.recycleStuckControllerPods(context.Background()); err != nil {
t.Fatalf("recycleStuckControllerPods failed: %v", err)
}
if strings.Join(cordoned, ",") != "titan-18" {
t.Fatalf("expected only titan-18 to be cordoned, got %#v", cordoned)
}
if strings.Join(deleted, ",") != "oauth2-proxy-bad,suite-probe-bad,secret-ensure-bad,single-node-bad" {
t.Fatalf("expected runtime-wedged pods to be recycled, got %#v", deleted)
}
}
// TestEffectiveWorkersFiltersIgnoredUnavailableNodes runs one orchestration or CLI step.
// Signature: TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T).
// Why: ignored unavailable nodes should be excluded before startup tries SSH,
// k3s-agent start, or uncordon operations against intentionally absent hosts.
func TestEffectiveWorkersFiltersIgnoredUnavailableNodes(t *testing.T) {
cfg := config.Config{
Workers: []string{" titan-08 ", "titan-09", "titan-10", "titan-11"},
Startup: config.Startup{
IgnoreUnavailableNodes: []string{"titan-09", "titan-10"},
},
}
orch := buildOrchestratorWithStubs(t, cfg, nil)
got, err := orch.effectiveWorkers(context.Background())
if err != nil {
t.Fatalf("effectiveWorkers failed: %v", err)
}
want := []string{"titan-08", "titan-11"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Fatalf("effectiveWorkers mismatch got=%v want=%v", got, want)
}
}
// TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers runs one orchestration or CLI step.
// Signature: TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T).
// Why: startup must not uncordon Longhorn workers that cannot mount encrypted
// PVCs; cordoning those nodes is safe and avoids repeating the post-outage
// mount deadlock.
func TestEnsureLonghornEncryptedHostPrereqsFiltersUnsafeWorkers(t *testing.T) {
cordoned := []string{}
orch := buildOrchestratorWithStubs(t, config.Config{
SSHManagedNodes: []string{"titan-04", "titan-19"},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-19\ntitan-23\n"},
{
match: matchContains("ssh", "titan-04", "command -v cryptsetup"),
out: "__ANANKE_CRYPTSETUP_PRESENT__",
},
{
match: matchContains("ssh", "titan-19", "apt-get install -y --no-install-recommends cryptsetup-bin"),
err: errors.New("sudo: a password is required"),
},
{
match: func(name string, args []string) bool {
if name != "kubectl" || len(args) == 0 || args[0] != "cordon" {
return false
}
if len(args) > 1 {
cordoned = append(cordoned, args[len(args)-1])
}
return true
},
},
})
got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04", "titan-19", "titan-20"})
if err != nil {
t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err)
}
want := []string{"titan-04", "titan-20"}
if strings.Join(got, ",") != strings.Join(want, ",") {
t.Fatalf("guarded workers mismatch got=%v want=%v", got, want)
}
if strings.Join(cordoned, ",") != "titan-19,titan-23" {
t.Fatalf("expected unsafe longhorn hosts to be cordoned, got %v", cordoned)
}
}
// TestLonghornCryptsetupExemptNodesAreNotQuarantined runs one orchestration or CLI step.
// Signature: TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T).
// Why: Veles/Oceanus uses titan-23 as a Longhorn host for unencrypted local
// volumes; startup should uncordon that policy-exempt node without requiring
// host SSH or weakening encrypted-volume safety on other workers.
func TestLonghornCryptsetupExemptNodesAreNotQuarantined(t *testing.T) {
cordoned := []string{}
uncordoned := []string{}
sshTitan23 := false
orch := buildOrchestratorWithStubs(t, config.Config{
SSHManagedNodes: []string{"titan-04"},
Startup: config.Startup{
LonghornCryptsetupExemptNodes: []string{"titan-23"},
},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: "titan-04\ntitan-23\n"},
{
match: matchContains("ssh", "titan-04", "command -v cryptsetup"),
out: "__ANANKE_CRYPTSETUP_PRESENT__",
},
{
match: func(name string, args []string) bool {
if name == "ssh" && strings.Contains(strings.Join(args, " "), "titan-23") {
sshTitan23 = true
return true
}
return false
},
},
{
match: func(name string, args []string) bool {
if name != "kubectl" || len(args) == 0 || args[0] != "cordon" {
return false
}
if len(args) > 1 {
cordoned = append(cordoned, args[len(args)-1])
}
return true
},
},
{
match: func(name string, args []string) bool {
if !matchContains("kubectl", "uncordon")(name, args) {
return false
}
if len(args) > 1 {
uncordoned = append(uncordoned, args[len(args)-1])
}
return true
},
},
})
got, err := orch.ensureLonghornEncryptedHostPrereqs(context.Background(), []string{"titan-04"})
if err != nil {
t.Fatalf("ensureLonghornEncryptedHostPrereqs failed: %v", err)
}
if strings.Join(got, ",") != "titan-04" {
t.Fatalf("guarded workers mismatch got=%v", got)
}
if err := orch.uncordonLonghornCryptsetupExemptNodes(context.Background()); err != nil {
t.Fatalf("uncordonLonghornCryptsetupExemptNodes failed: %v", err)
}
if sshTitan23 {
t.Fatalf("did not expect cryptsetup SSH check for exempt titan-23")
}
if len(cordoned) != 0 {
t.Fatalf("did not expect exempt node to be cordoned, got %v", cordoned)
}
if strings.Join(uncordoned, ",") != "titan-23" {
t.Fatalf("expected exempt titan-23 to be uncordoned, got %v", uncordoned)
}
}
// TestLonghornHostNodesFallsBackToConfiguredLabels runs one orchestration or CLI step.
// Signature: TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T).
// Why: bootstrap caches or minimal test clusters can lack live labels; the
// static startup inventory should still protect configured storage workers.
func TestLonghornHostNodesFallsBackToConfiguredLabels(t *testing.T) {
orch := buildOrchestratorWithStubs(t, config.Config{
Startup: config.Startup{
RequiredNodeLabels: map[string]map[string]string{
"titan-04": {"longhorn-host": "true"},
"titan-20": {"node-role.kubernetes.io/worker": "true"},
},
},
}, []commandStub{
{match: matchContains("kubectl", "get", "nodes", "-l", "longhorn-host=true"), out: ""},
})
got, err := orch.longhornHostNodes(context.Background())
if err != nil {
t.Fatalf("longhornHostNodes failed: %v", err)
}
if _, ok := got["titan-04"]; !ok || len(got) != 1 {
t.Fatalf("expected configured longhorn host fallback, got %v", got)
}
}

View File

@ -0,0 +1,49 @@
package cluster
import (
"context"
"fmt"
"strconv"
"strings"
"time"
)
// workloadReady runs one orchestration or CLI step.
// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
out, err := o.kubectl(
ctx,
20*time.Second,
"-n",
w.Namespace,
"get",
w.Kind,
w.Name,
"-o",
"jsonpath={.status.readyReplicas}",
)
if err != nil {
return false, err
}
raw := strings.TrimSpace(out)
if raw == "" || raw == "<no value>" {
return false, nil
}
n, err := strconv.Atoi(raw)
if err != nil {
return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
}
return n >= 1, nil
}
// isNotFoundErr runs one orchestration or CLI step.
// Signature: isNotFoundErr(err error) bool.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func isNotFoundErr(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)")
}

View File

@ -201,6 +201,9 @@ func (c *Config) applyDefaults() {
if c.Startup.PostStartAutoHealSeconds <= 0 {
c.Startup.PostStartAutoHealSeconds = 60
}
if c.Startup.RecoveryCordonMaxSeconds <= 0 {
c.Startup.RecoveryCordonMaxSeconds = 3600
}
if c.Startup.DeadNodeCleanupGraceSeconds <= 0 {
c.Startup.DeadNodeCleanupGraceSeconds = 300
}

View File

@ -121,6 +121,7 @@ func defaults() Config {
IgnoreUnavailableNodes: []string{},
AutoRecycleStuckPods: true,
StuckPodGraceSeconds: 180,
RecoveryCordonMaxSeconds: 3600,
VaultUnsealKeyFile: "/var/lib/ananke/vault-unseal.key",
VaultUnsealBreakglassTimeout: 15,
},

View File

@ -94,6 +94,7 @@ type Startup struct {
SchedulingStormWindowSeconds int `yaml:"scheduling_storm_window_seconds"`
StuckPodGraceSeconds int `yaml:"stuck_pod_grace_seconds"`
PostStartAutoHealSeconds int `yaml:"post_start_auto_heal_seconds"`
RecoveryCordonMaxSeconds int `yaml:"recovery_cordon_max_seconds"`
DeadNodeCleanupGraceSeconds int `yaml:"dead_node_cleanup_grace_seconds"`
VaultUnsealKeyFile string `yaml:"vault_unseal_key_file"`
VaultUnsealBreakglassCommand string `yaml:"vault_unseal_breakglass_command"`

View File

@ -280,6 +280,9 @@ func (c Config) Validate() error {
if c.Startup.PostStartAutoHealSeconds <= 0 {
return fmt.Errorf("config.startup.post_start_auto_heal_seconds must be > 0")
}
if c.Startup.RecoveryCordonMaxSeconds <= 0 {
return fmt.Errorf("config.startup.recovery_cordon_max_seconds must be > 0")
}
if c.Startup.DeadNodeCleanupGraceSeconds <= 0 {
return fmt.Errorf("config.startup.dead_node_cleanup_grace_seconds must be > 0")
}

View File

@ -16,8 +16,11 @@ internal/cluster/orchestrator_report_test.go
internal/cluster/orchestrator_autorepair_test.go
internal/cluster/orchestrator_autorepair_cleanup_test.go
internal/cluster/orchestrator_autorepair_proxy_test.go
internal/cluster/orchestrator_critical_endpoint_additional_test.go
internal/cluster/orchestrator_cordon_lease_test.go
internal/cluster/orchestrator_test.go
internal/cluster/orchestrator_unit_additional_test.go
internal/cluster/orchestrator_workload_recovery_test.go
internal/cluster/orchestrator_vault_test.go
internal/config/config_test.go
internal/config/load_additional_test.go