diff --git a/pkg/service/cluster_test.go b/pkg/service/cluster_test.go index b3e7ba6..2f85403 100644 --- a/pkg/service/cluster_test.go +++ b/pkg/service/cluster_test.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "encoding/pem" "errors" "net/http" "net/http/httptest" @@ -9,8 +10,15 @@ import ( "path/filepath" "strings" "testing" + "time" ) +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + func TestInClusterKubeClientMissingEnv(t *testing.T) { t.Setenv("KUBERNETES_SERVICE_HOST", "") t.Setenv("KUBERNETES_SERVICE_PORT", "") @@ -19,6 +27,107 @@ func TestInClusterKubeClientMissingEnv(t *testing.T) { } } +func TestInClusterKubeClientFileBranches(t *testing.T) { + origTokenPath := kubeServiceAccountTokenPath + origCAPath := kubeServiceAccountCAPath + t.Cleanup(func() { + kubeServiceAccountTokenPath = origTokenPath + kubeServiceAccountCAPath = origCAPath + }) + dir := t.TempDir() + tokenPath := filepath.Join(dir, "token") + caPath := filepath.Join(dir, "ca.crt") + kubeServiceAccountTokenPath = tokenPath + kubeServiceAccountCAPath = caPath + t.Setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1") + t.Setenv("KUBERNETES_SERVICE_PORT", "6443") + + if _, err := inClusterKubeClient(); err == nil { + t.Fatal("expected token read error") + } + if err := os.WriteFile(tokenPath, []byte(" token \n"), 0o600); err != nil { + t.Fatal(err) + } + if _, err := inClusterKubeClient(); err == nil { + t.Fatal("expected CA read error") + } + if err := os.WriteFile(caPath, []byte("not a certificate"), 0o600); err != nil { + t.Fatal(err) + } + if _, err := inClusterKubeClient(); err == nil { + t.Fatal("expected invalid CA error") + } + + tlsServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer tlsServer.Close() + cert := tlsServer.Certificate() + caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw}) + if err := os.WriteFile(caPath, caPEM, 0o600); err != nil { + t.Fatal(err) + } + client, err := inClusterKubeClient() + if err != nil { + t.Fatalf("inClusterKubeClient: %v", err) + } + if client.baseURL != "https://10.0.0.1:6443" || client.token != "token" || client.client == nil { + t.Fatalf("unexpected kube client: %#v", client) + } +} + +func TestKubeClientRequestErrorBranches(t *testing.T) { + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == "/ok": + w.WriteHeader(http.StatusNoContent) + case r.URL.Path == "/bad-json": + _, _ = w.Write([]byte("{")) + case r.Method == http.MethodDelete && r.URL.Path == "/fail": + http.Error(w, "delete denied", http.StatusInternalServerError) + default: + http.NotFound(w, r) + } + })) + defer kube.Close() + client := kubeClientFactoryForURL(kube.URL, kube.Client()) + + if err := client.jsonRequest(http.MethodPost, "/ok", map[string]any{"bad": func() {}}, nil); err == nil { + t.Fatal("expected JSON marshal error") + } + if err := client.jsonRequest(http.MethodGet, "/ok", nil, nil); err != nil { + t.Fatalf("expected nil out request to pass: %v", err) + } + if err := client.jsonRequest(http.MethodGet, "/bad-json", nil, &map[string]any{}); err == nil { + t.Fatal("expected JSON decode error") + } + if err := client.deleteRequest("/fail"); err == nil || !strings.Contains(err.Error(), "delete denied") { + t.Fatalf("expected delete failure body, got %v", err) + } + + badURLClient := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient} + if err := badURLClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil { + t.Fatal("expected bad URL jsonRequest error") + } + if err := badURLClient.deleteRequest("/ok"); err == nil { + t.Fatal("expected bad URL deleteRequest error") + } + + transportErrClient := &kubeClient{ + baseURL: "http://example.test", + token: "tok", + client: &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("transport down") + })}, + } + if err := transportErrClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil || !strings.Contains(err.Error(), "transport down") { + t.Fatalf("expected transport jsonRequest error, got %v", err) + } + if err := transportErrClient.deleteRequest("/ok"); err == nil || !strings.Contains(err.Error(), "transport down") { + t.Fatalf("expected transport deleteRequest error, got %v", err) + } +} + func TestKubeClientAndPodHelpers(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { @@ -107,6 +216,37 @@ func TestKubeClientAndPodHelpers(t *testing.T) { } } +func TestClusterListErrorBranches(t *testing.T) { + origFactory := kubeClientFactory + kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } + if nodes := clusterNodes(); nodes != nil { + t.Fatalf("expected nil nodes on factory error, got %#v", nodes) + } + if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { + t.Fatalf("expected nil loads on factory error, got %#v", loads) + } + kubeClientFactory = origFactory + + if loads := clusterActiveRemotePodLoads(" ", "build"); loads != nil { + t.Fatalf("expected nil loads for empty namespace, got %#v", loads) + } + + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "api unavailable", http.StatusServiceUnavailable) + })) + defer kube.Close() + kubeClientFactory = func() (*kubeClient, error) { + return kubeClientFactoryForURL(kube.URL, kube.Client()), nil + } + t.Cleanup(func() { kubeClientFactory = origFactory }) + if nodes := clusterNodes(); nodes != nil { + t.Fatalf("expected nil nodes on list error, got %#v", nodes) + } + if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { + t.Fatalf("expected nil loads on list error, got %#v", loads) + } +} + func TestDeleteNodeObjectFallback(t *testing.T) { tmp := t.TempDir() kubectl := filepath.Join(tmp, "kubectl") @@ -168,3 +308,213 @@ func TestClusterActiveRemotePodLoadsCountsOnlyLivePods(t *testing.T) { t.Fatalf("expected succeeded pod to be ignored, got %#v", loads) } } + +func TestClusterActiveRemotePodLoadsFiltersEdges(t *testing.T) { + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": + _ = json.NewEncoder(w).Encode(map[string]any{ + "items": []any{ + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "flash"}}, + "spec": map[string]any{"nodeName": "wrong-run"}, + "status": map[string]any{"phase": "Running"}, + }, + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": ""}, + "status": map[string]any{"phase": "Running"}, + }, + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": "failed"}, + "status": map[string]any{"phase": "Failed"}, + }, + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": "good"}, + "status": map[string]any{"phase": "Pending"}, + }, + }, + }) + default: + http.NotFound(w, r) + } + })) + defer kube.Close() + + origFactory := kubeClientFactory + kubeClientFactory = func() (*kubeClient, error) { + return kubeClientFactoryForURL(kube.URL, kube.Client()), nil + } + t.Cleanup(func() { kubeClientFactory = origFactory }) + + loads := clusterActiveRemotePodLoads("maintenance", "build") + if len(loads) != 1 || loads["good"] != 1 { + t.Fatalf("unexpected filtered loads: %#v", loads) + } +} + +func TestRunRemotePodTerminalEdgeStates(t *testing.T) { + cases := []struct { + name string + phase string + reason string + message string + logStatus int + logBody string + want string + wantErr string + }{ + {name: "success uses logs when message empty", phase: "Succeeded", logStatus: http.StatusOK, logBody: "fallback payload", want: "fallback payload"}, + {name: "success reports missing payload and log error", phase: "Succeeded", logStatus: http.StatusInternalServerError, logBody: "no logs", wantErr: "logs unavailable"}, + {name: "success reports missing payload", phase: "Succeeded", logStatus: http.StatusOK, logBody: "", wantErr: "did not return a result payload"}, + {name: "failed uses logs when message empty", phase: "Failed", logStatus: http.StatusOK, logBody: "worker failed", wantErr: "worker failed"}, + {name: "failed falls back to default reason", phase: "Failed", logStatus: http.StatusOK, logBody: "", wantErr: "remote worker failed before reporting details"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + kube := remotePodStateServer(t, tc.phase, tc.reason, tc.message, tc.logStatus, tc.logBody) + installKubeFactory(t, kube) + app := newTestApp(t) + app.settings.Namespace = "maintenance" + + got, err := app.runRemotePod("", "metis-case", map[string]any{}) + if tc.wantErr != "" { + if err == nil || !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("expected error containing %q, got result=%q err=%v", tc.wantErr, got, err) + } + return + } + if err != nil || got != tc.want { + t.Fatalf("runRemotePod = %q err=%v", got, err) + } + }) + } +} + +func TestRunRemotePodProgressAndRequestFailures(t *testing.T) { + origFactory := kubeClientFactory + kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } + app := newTestApp(t) + if _, err := app.runRemotePod("", "metis-offline", map[string]any{}); err == nil { + t.Fatal("expected factory error") + } + kubeClientFactory = origFactory + + postFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost { + http.Error(w, "post denied", http.StatusForbidden) + return + } + w.WriteHeader(http.StatusOK) + })) + defer postFail.Close() + installKubeFactory(t, postFail) + app = newTestApp(t) + app.settings.Namespace = "maintenance" + if _, err := app.runRemotePod("", "metis-post-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "post denied") { + t.Fatalf("expected post failure, got %v", err) + } + + stateFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost: + w.WriteHeader(http.StatusCreated) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): + http.Error(w, "state denied", http.StatusForbidden) + default: + w.WriteHeader(http.StatusOK) + } + })) + defer stateFail.Close() + installKubeFactory(t, stateFail) + app = newTestApp(t) + app.settings.Namespace = "maintenance" + if _, err := app.runRemotePod("", "metis-state-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "state denied") { + t.Fatalf("expected state failure, got %v", err) + } + + progressLog := ProgressLogLine(RemoteProgressUpdate{Stage: "flash", ProgressPct: 92, Message: "writing", WrittenBytes: 10, TotalBytes: 20}) + progressServer := remotePodStateServer(t, "Succeeded", "Completed", `{"ok":true}`, http.StatusOK, progressLog) + installKubeFactory(t, progressServer) + app = newTestApp(t) + app.settings.Namespace = "maintenance" + job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz") + app.setJob(job.ID, func(j *Job) { + j.Status = JobRunning + j.Stage = "build" + j.StageStartedAt = time.Now() + }) + if got, err := app.runRemotePod(job.ID, "metis-progress", map[string]any{}); err != nil || got != `{"ok":true}` { + t.Fatalf("runRemotePod progress = %q err=%v", got, err) + } + if got := app.job(job.ID); got == nil || got.Written != 10 || got.Total != 20 { + t.Fatalf("expected progress update, got %#v", got) + } +} + +func TestRemotePodStateAndLogErrorBranches(t *testing.T) { + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/state-fail"): + http.Error(w, "state unavailable", http.StatusServiceUnavailable) + case r.Method == http.MethodGet && strings.HasSuffix(r.URL.Path, "/log"): + http.Error(w, "plain log failure", http.StatusInternalServerError) + default: + http.NotFound(w, r) + } + })) + defer kube.Close() + app := newTestApp(t) + app.settings.Namespace = "maintenance" + client := kubeClientFactoryForURL(kube.URL, kube.Client()) + + if _, err := app.remotePodState(client, "state-fail"); err == nil || !strings.Contains(err.Error(), "state unavailable") { + t.Fatalf("expected state error, got %v", err) + } + if _, err := app.remotePodLogs(client, "log-fail"); err == nil || !strings.Contains(err.Error(), "plain log failure") { + t.Fatalf("expected plain log error, got %v", err) + } + + badURL := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient} + if _, err := app.remotePodLogs(badURL, "log-fail"); err == nil { + t.Fatal("expected log request creation error") + } + transportErr := &kubeClient{ + baseURL: "http://example.test", + token: "tok", + client: &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("logs transport down") + })}, + } + if _, err := app.remotePodLogs(transportErr, "log-fail"); err == nil || !strings.Contains(err.Error(), "logs transport down") { + t.Fatalf("expected log transport error, got %v", err) + } +} + +func remotePodStateServer(t *testing.T, phase, reason, message string, logStatus int, logBody string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): + w.WriteHeader(http.StatusCreated) + case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/pods/"): + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"): + w.WriteHeader(logStatus) + _, _ = w.Write([]byte(logBody)) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): + _ = json.NewEncoder(w).Encode(map[string]any{ + "metadata": map[string]any{"name": filepath.Base(r.URL.Path)}, + "status": map[string]any{ + "phase": phase, + "reason": reason, + "message": message, + }, + }) + default: + http.NotFound(w, r) + } + })) +} diff --git a/pkg/service/remote_error_test.go b/pkg/service/remote_error_test.go index b20ad70..9ab9615 100644 --- a/pkg/service/remote_error_test.go +++ b/pkg/service/remote_error_test.go @@ -1,7 +1,12 @@ package service import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" "path/filepath" + "strings" "testing" "time" ) @@ -59,3 +64,360 @@ func TestRemoteWorkflowMissingRunnerImageBranch(t *testing.T) { t.Fatalf("expected build job error, got %#v", got) } } + +func TestRefreshDevicesDefaultSortAndErrorBranches(t *testing.T) { + t.Run("default host and deterministic sorting", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{ + deviceMessage: `{"devices":[{"name":"sdc","path":"/dev/sdc","model":"Micro SD","transport":"usb","type":"disk","removable":true,"hotplug":true,"size_bytes":64000000000},{"name":"sdb","path":"/dev/sdb","model":"Micro SD","transport":"usb","type":"disk","removable":true,"hotplug":true,"size_bytes":32000000000},{"name":"sda","path":"/dev/sda","model":"Micro SD","transport":"usb","type":"disk","removable":true,"hotplug":true,"size_bytes":32000000000}]}`, + }) + installKubeFactory(t, kube) + app := remoteTestApp(t, nil) + + devices, err := app.RefreshDevices("") + if err != nil { + t.Fatalf("RefreshDevices: %v", err) + } + if len(devices) != 3 || devices[0].Path != "/dev/sda" || devices[1].Path != "/dev/sdb" { + t.Fatalf("unexpected sorted devices: %#v", devices) + } + }) + + t.Run("remote pod failure records device error", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{devicePhase: "Failed", deviceMessage: "device scan failed"}) + installKubeFactory(t, kube) + app := remoteTestApp(t, nil) + + if _, err := app.RefreshDevices("titan-22"); err == nil || !strings.Contains(err.Error(), "device scan failed") { + t.Fatalf("expected device scan failure, got %v", err) + } + if _, err := app.cachedDevices("titan-22"); err == nil || !strings.Contains(err.Error(), "device scan failed") { + t.Fatalf("expected cached device error, got %v", err) + } + }) + + t.Run("malformed device payload records decode error", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{deviceMessage: "{"}) + installKubeFactory(t, kube) + app := remoteTestApp(t, nil) + + if _, err := app.RefreshDevices("titan-22"); err == nil || !strings.Contains(err.Error(), "decode remote devices") { + t.Fatalf("expected device decode failure, got %v", err) + } + }) +} + +func TestRunBuildAdditionalRemoteBranches(t *testing.T) { + t.Run("missing inventory node", func(t *testing.T) { + app := remoteTestApp(t, nil) + job := app.newJob("build", "missing-node", "", "") + app.runBuild(job, false) + if got := app.job(job.ID); got == nil || got.Status != JobError { + t.Fatalf("expected missing-node job error, got %#v", got) + } + }) + + t.Run("no eligible builder", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{nodes: []map[string]any{}}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("build", "titan-15", "", "") + app.runBuild(job, false) + if got := app.job(job.ID); got == nil || got.Status != JobError || !strings.Contains(got.Error, "no build host") { + t.Fatalf("expected builder selection error, got %#v", got) + } + }) + + t.Run("build pod failure", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{buildPhase: "Failed", buildMessage: "build crashed"}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("build", "titan-15", "", "") + app.runBuild(job, false) + if got := app.job(job.ID); got == nil || got.Status != JobError || !strings.Contains(got.Error, "build crashed") { + t.Fatalf("expected build pod error, got %#v", got) + } + }) + + t.Run("build output decode failure", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{buildMessage: "{"}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("build", "titan-15", "", "") + app.runBuild(job, false) + if got := app.job(job.ID); got == nil || got.Status != JobError || !strings.Contains(got.Error, "decode remote build output") { + t.Fatalf("expected build decode error, got %#v", got) + } + }) + + t.Run("artifact persistence failure", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + app.settings.ArtifactStatePath = t.TempDir() + + job := app.newJob("build", "titan-15", "", "") + app.runBuild(job, false) + if got := app.job(job.ID); got == nil || got.Status != JobError { + t.Fatalf("expected artifact persist error, got %#v", got) + } + }) + + t.Run("prune warning still completes build", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{}) + harbor := harborPruneFailureServer(t) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("build", "titan-15", "", "") + app.runBuild(job, false) + got := app.job(job.ID) + if got == nil || got.Status != JobDone { + t.Fatalf("expected build to finish despite prune warning, got %#v", got) + } + if events := app.recentEvents(5); len(events) == 0 || events[0].Kind != "image.build" { + t.Fatalf("expected image build event, got %#v", events) + } + }) + + t.Run("flash preflight rejects stale device", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("replace", "titan-15", "titan-22", "/dev/sda") + app.runBuild(job, true) + if got := app.job(job.ID); got == nil || got.Status != JobError || !strings.Contains(got.Error, "not a current flash candidate") { + t.Fatalf("expected stale device error, got %#v", got) + } + }) + + t.Run("flash pod failure", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{flashPhase: "Failed", flashMessage: "flash failed"}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz") + app.runBuild(job, true) + if got := app.job(job.ID); got == nil || got.Status != JobError || !strings.Contains(got.Error, "flash failed") { + t.Fatalf("expected flash pod error, got %#v", got) + } + }) + + t.Run("host tmp flash completion message", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + app := remoteTestApp(t, harbor) + + job := app.newJob("replace", "titan-15", "titan-22", hostTmpDevicePath) + app.runBuild(job, true) + if got := app.job(job.ID); got == nil || got.Status != JobDone || !strings.Contains(got.Message, "host /tmp") { + t.Fatalf("expected hosttmp completion, got %#v", got) + } + }) + + t.Run("node delete warning still flashes", func(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{deleteNodeStatus: http.StatusInternalServerError}) + harbor := fakeHarborServer(t, true) + installKubeFactory(t, kube) + tmp := t.TempDir() + kubectl := filepath.Join(tmp, "kubectl") + if err := os.WriteFile(kubectl, []byte("#!/usr/bin/env sh\nprintf 'delete denied' >&2\nexit 1\n"), 0o755); err != nil { + t.Fatal(err) + } + t.Setenv("PATH", tmp+string(os.PathListSeparator)+os.Getenv("PATH")) + app := remoteTestApp(t, harbor) + + job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz") + app.runBuild(job, true) + if got := app.job(job.ID); got == nil || got.Status != JobDone { + t.Fatalf("expected flash success despite delete warning, got %#v", got) + } + found := false + for _, event := range app.recentEvents(10) { + if event.Kind == "node.delete.warning" { + found = true + } + } + if !found { + t.Fatalf("expected node.delete.warning event, got %#v", app.recentEvents(10)) + } + }) +} + +func TestFlashArtifactAndHeartbeatBranches(t *testing.T) { + kube := remoteWorkflowKubeServer(t, remoteKubeOptions{}) + installKubeFactory(t, kube) + app := remoteTestApp(t, nil) + + job := app.newJob("replace", "titan-15", "missing-host", "/dev/sdz") + if err := app.flashArtifact(job.ID, "registry.example/metis/titan-15"); err == nil || !strings.Contains(err.Error(), "not a current cluster node") { + t.Fatalf("expected missing host flashArtifact error, got %v", err) + } + + app.heartbeatRemoteJob("") + app.heartbeatRemoteJob(job.ID) + if got := app.job(job.ID); got == nil || got.ProgressPct != 0 { + t.Fatalf("queued heartbeat should be a no-op, got %#v", got) + } + + app.setJob(job.ID, func(j *Job) { + j.Status = JobRunning + j.Stage = "preflight" + j.Device = "/dev/sdz" + j.Host = "titan-22" + j.ProgressPct = 10 + }) + app.heartbeatRemoteJob(job.ID) + if got := app.job(job.ID); got == nil || got.ProgressPct != 80 || !strings.Contains(got.Message, "Validating") { + t.Fatalf("preflight heartbeat = %#v", got) + } + + app.setJob(job.ID, func(j *Job) { + j.Stage = "flash" + j.ProgressPct = 80 + j.Written = 120 + j.Total = 100 + }) + app.heartbeatRemoteJob(job.ID) + if got := app.job(job.ID); got == nil || got.ProgressPct != 98 || !strings.Contains(got.Message, "Writing") { + t.Fatalf("flash byte heartbeat = %#v", got) + } + + app.setJob(job.ID, func(j *Job) { + j.Stage = "flash" + j.StageStartedAt = time.Time{} + j.StartedAt = time.Now().Add(-20 * time.Second) + j.ProgressPct = 80 + j.Written = 0 + j.Total = 0 + }) + app.heartbeatRemoteJob(job.ID) + if got := app.job(job.ID); got == nil || got.ProgressPct <= 80 || !strings.Contains(got.Message, "Writing") { + t.Fatalf("flash elapsed heartbeat = %#v", got) + } +} + +type remoteKubeOptions struct { + nodes []map[string]any + devicePhase string + deviceMessage string + buildPhase string + buildMessage string + flashPhase string + flashMessage string + deleteNodeStatus int +} + +func remoteTestApp(t *testing.T, harbor *httptest.Server) *App { + t.Helper() + app := newTestApp(t) + app.settings.Namespace = "maintenance" + app.settings.RunnerImageARM64 = "runner:arm64" + app.settings.HarborProject = "metis" + app.settings.HarborRegistry = "registry.example" + app.settings.ArtifactStatePath = filepath.Join(t.TempDir(), "artifacts.json") + if harbor != nil { + app.settings.HarborAPIBase = harbor.URL + "/api/v2.0" + app.settings.HarborUsername = "admin" + app.settings.HarborPassword = "pw" + } + return app +} + +func remoteWorkflowKubeServer(t *testing.T, opts remoteKubeOptions) *httptest.Server { + t.Helper() + devicePhase := defaultString(opts.devicePhase, "Succeeded") + deviceMessage := defaultString(opts.deviceMessage, `{"devices":[{"name":"sdz","path":"/dev/sdz","model":"Micro SD","transport":"usb","type":"disk","removable":true,"hotplug":true,"size_bytes":32000000000},{"name":"tmp","path":"hosttmp:///tmp","model":"Host /tmp","transport":"test","type":"file","note":"Test-only host write target under /tmp","size_bytes":1}]}`) + buildPhase := defaultString(opts.buildPhase, "Succeeded") + buildMessage := defaultString(opts.buildMessage, `{"local_path":"/workspace/build/titan-15.img.xz","compressed":true,"size_bytes":1234,"build_tag":"build-1"}`) + flashPhase := defaultString(opts.flashPhase, "Succeeded") + flashMessage := defaultString(opts.flashMessage, `{"dest_path":"/tmp/metis-flash-test/titan-15.img"}`) + nodes := opts.nodes + if nodes == nil { + nodes = []map[string]any{ + { + "metadata": map[string]any{ + "name": "titan-22", + "labels": map[string]string{ + "kubernetes.io/arch": "arm64", + "hardware": "rpi5", + "node-role.kubernetes.io/worker": "true", + }, + }, + "spec": map[string]any{"unschedulable": false}, + }, + } + } + deleteNodeStatus := opts.deleteNodeStatus + if deleteNodeStatus == 0 { + deleteNodeStatus = http.StatusOK + } + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/nodes": + _ = json.NewEncoder(w).Encode(map[string]any{"items": nodes}) + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": + _ = json.NewEncoder(w).Encode(map[string]any{"items": []any{}}) + case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): + w.WriteHeader(http.StatusCreated) + case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/nodes/"): + w.WriteHeader(deleteNodeStatus) + case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/pods/"): + w.WriteHeader(http.StatusOK) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"): + _, _ = w.Write([]byte("remote logs")) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): + podName := filepath.Base(r.URL.Path) + phase, message := "Succeeded", "{}" + switch { + case strings.Contains(podName, "devices"): + phase, message = devicePhase, deviceMessage + case strings.Contains(podName, "build"): + phase, message = buildPhase, buildMessage + case strings.Contains(podName, "flash"): + phase, message = flashPhase, flashMessage + } + _ = json.NewEncoder(w).Encode(map[string]any{ + "metadata": map[string]any{"name": podName}, + "status": map[string]any{ + "phase": phase, + "reason": "Completed", + "message": message, + }, + }) + default: + http.NotFound(w, r) + } + })) +} + +func harborPruneFailureServer(t *testing.T) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && strings.HasPrefix(r.URL.Path, "/api/v2.0/projects"): + _ = json.NewEncoder(w).Encode([]map[string]string{{"name": "metis"}}) + case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/artifacts"): + http.Error(w, "artifact list failed", http.StatusInternalServerError) + default: + http.NotFound(w, r) + } + })) +} + +func defaultString(value, fallback string) string { + if strings.TrimSpace(value) == "" { + return fallback + } + return value +}