package service import ( "encoding/json" "encoding/pem" "errors" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "testing" "time" ) type roundTripFunc func(*http.Request) (*http.Response, error) func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } func TestInClusterKubeClientMissingEnv(t *testing.T) { t.Setenv("KUBERNETES_SERVICE_HOST", "") t.Setenv("KUBERNETES_SERVICE_PORT", "") if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected inClusterKubeClient error without env") } } func TestInClusterKubeClientFileBranches(t *testing.T) { origTokenPath := kubeServiceAccountTokenPath origCAPath := kubeServiceAccountCAPath t.Cleanup(func() { kubeServiceAccountTokenPath = origTokenPath kubeServiceAccountCAPath = origCAPath }) dir := t.TempDir() tokenPath := filepath.Join(dir, "token") caPath := filepath.Join(dir, "ca.crt") kubeServiceAccountTokenPath = tokenPath kubeServiceAccountCAPath = caPath t.Setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1") t.Setenv("KUBERNETES_SERVICE_PORT", "6443") if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected token read error") } if err := os.WriteFile(tokenPath, []byte(" token \n"), 0o600); err != nil { t.Fatal(err) } if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected CA read error") } if err := os.WriteFile(caPath, []byte("not a certificate"), 0o600); err != nil { t.Fatal(err) } if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected invalid CA error") } tlsServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) })) defer tlsServer.Close() cert := tlsServer.Certificate() caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw}) if err := os.WriteFile(caPath, caPEM, 0o600); err != nil { t.Fatal(err) } client, err := inClusterKubeClient() if err != nil { t.Fatalf("inClusterKubeClient: %v", err) } if client.baseURL != "https://10.0.0.1:6443" || client.token != "token" || client.client == nil { t.Fatalf("unexpected kube client: %#v", client) } } func TestKubeClientRequestErrorBranches(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.URL.Path == "/ok": w.WriteHeader(http.StatusNoContent) case r.URL.Path == "/bad-json": _, _ = w.Write([]byte("{")) case r.Method == http.MethodDelete && r.URL.Path == "/fail": http.Error(w, "delete denied", http.StatusInternalServerError) default: http.NotFound(w, r) } })) defer kube.Close() client := kubeClientFactoryForURL(kube.URL, kube.Client()) if err := client.jsonRequest(http.MethodPost, "/ok", map[string]any{"bad": func() {}}, nil); err == nil { t.Fatal("expected JSON marshal error") } if err := client.jsonRequest(http.MethodGet, "/ok", nil, nil); err != nil { t.Fatalf("expected nil out request to pass: %v", err) } if err := client.jsonRequest(http.MethodGet, "/bad-json", nil, &map[string]any{}); err == nil { t.Fatal("expected JSON decode error") } if err := client.deleteRequest("/fail"); err == nil || !strings.Contains(err.Error(), "delete denied") { t.Fatalf("expected delete failure body, got %v", err) } badURLClient := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient} if err := badURLClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil { t.Fatal("expected bad URL jsonRequest error") } if err := badURLClient.deleteRequest("/ok"); err == nil { t.Fatal("expected bad URL deleteRequest error") } transportErrClient := &kubeClient{ baseURL: "http://example.test", token: "tok", client: &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { return nil, errors.New("transport down") })}, } if err := transportErrClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil || !strings.Contains(err.Error(), "transport down") { t.Fatalf("expected transport jsonRequest error, got %v", err) } if err := transportErrClient.deleteRequest("/ok"); err == nil || !strings.Contains(err.Error(), "transport down") { t.Fatalf("expected transport deleteRequest error, got %v", err) } } func TestKubeClientAndPodHelpers(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && r.URL.Path == "/api/v1/nodes": _ = json.NewEncoder(w).Encode(map[string]any{ "items": []any{ map[string]any{ "metadata": map[string]any{"name": "b", "labels": map[string]string{"kubernetes.io/arch": "arm64", "node-role.kubernetes.io/worker": "true"}}, "spec": map[string]any{"unschedulable": false}, }, map[string]any{ "metadata": map[string]any{"name": "a", "labels": map[string]string{"kubernetes.io/arch": "arm64", "node-role.kubernetes.io/worker": "true"}}, "spec": map[string]any{"unschedulable": false}, }, }, }) case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): w.WriteHeader(http.StatusCreated) case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/nodes/"): w.WriteHeader(http.StatusNotFound) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"): http.Error(w, "proxy error from 127.0.0.1:6443", http.StatusBadGateway) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): _ = json.NewEncoder(w).Encode(map[string]any{ "metadata": map[string]any{"name": filepath.Base(r.URL.Path)}, "status": map[string]any{ "phase": "Failed", "reason": "CrashLoopBackOff", "message": "boom", "containerStatuses": []any{ map[string]any{ "state": map[string]any{ "waiting": map[string]any{"reason": "ImagePullBackOff", "message": "pulling"}, "terminated": map[string]any{"reason": "Completed", "message": "done"}, }, }, }, }, }) default: http.NotFound(w, r) } })) defer kube.Close() client := kubeClientFactoryForURL(kube.URL, kube.Client()) if err := client.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &map[string]any{}); err != nil { t.Fatalf("jsonRequest: %v", err) } if err := client.deleteRequest("/api/v1/nodes/a"); err != nil { t.Fatalf("deleteRequest 404 should be nil: %v", err) } if err := client.jsonRequest(http.MethodGet, "/missing", nil, &map[string]any{}); err == nil { t.Fatal("expected jsonRequest failure on 404") } origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return client, nil } t.Cleanup(func() { kubeClientFactory = origFactory }) nodes := clusterNodes() if len(nodes) != 2 || nodes[0].Name != "a" { t.Fatalf("clusterNodes sort mismatch: %#v", nodes) } app := newTestApp(t) app.settings.Namespace = "maintenance" app.settings.RunnerImageARM64 = "runner:arm64" state, err := app.remotePodState(client, "metis-build-test") if err != nil { t.Fatalf("remotePodState: %v", err) } if state.Reason != "Completed" || state.Message != "done" { t.Fatalf("expected terminated state override, got %#v", state) } if _, err := app.remotePodLogs(client, "metis-build-test"); err == nil || !strings.Contains(err.Error(), "could not reach the node kubelet log endpoint") { t.Fatalf("expected kubelet log endpoint error, got %v", err) } if _, err := app.runRemotePod("job-1", "metis-fail-test", map[string]any{}); err == nil { t.Fatal("expected runRemotePod failure") } if _, err := app.ensureDevice("titan-22", "missing"); err == nil { t.Fatal("expected ensureDevice missing target to fail") } } func TestClusterListErrorBranches(t *testing.T) { origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } if nodes := clusterNodes(); nodes != nil { t.Fatalf("expected nil nodes on factory error, got %#v", nodes) } if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { t.Fatalf("expected nil loads on factory error, got %#v", loads) } kubeClientFactory = origFactory if loads := clusterActiveRemotePodLoads(" ", "build"); loads != nil { t.Fatalf("expected nil loads for empty namespace, got %#v", loads) } kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, "api unavailable", http.StatusServiceUnavailable) })) defer kube.Close() kubeClientFactory = func() (*kubeClient, error) { return kubeClientFactoryForURL(kube.URL, kube.Client()), nil } t.Cleanup(func() { kubeClientFactory = origFactory }) if nodes := clusterNodes(); nodes != nil { t.Fatalf("expected nil nodes on list error, got %#v", nodes) } if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { t.Fatalf("expected nil loads on list error, got %#v", loads) } } func TestDeleteNodeObjectFallback(t *testing.T) { tmp := t.TempDir() kubectl := filepath.Join(tmp, "kubectl") if err := os.WriteFile(kubectl, []byte("#!/usr/bin/env bash\nset -eu\nprintf '%s' \"$*\" > \""+filepath.Join(tmp, "kubectl.args")+"\"\n"), 0o755); err != nil { t.Fatal(err) } t.Setenv("PATH", tmp+string(os.PathListSeparator)+os.Getenv("PATH")) origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } t.Cleanup(func() { kubeClientFactory = origFactory }) if err := deleteNodeObject("titan-15"); err != nil { t.Fatalf("deleteNodeObject fallback: %v", err) } } func TestClusterActiveRemotePodLoadsCountsOnlyLivePods(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": if got := r.URL.Query().Get("labelSelector"); got != "app=metis-remote,metis-run=build" { t.Fatalf("unexpected labelSelector %q", got) } _ = json.NewEncoder(w).Encode(map[string]any{ "items": []any{ map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": "titan-04"}, "status": map[string]any{"phase": "Running"}, }, map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": "titan-04"}, "status": map[string]any{"phase": "Pending"}, }, map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": "titan-05"}, "status": map[string]any{"phase": "Succeeded"}, }, }, }) default: http.NotFound(w, r) } })) defer kube.Close() origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return kubeClientFactoryForURL(kube.URL, kube.Client()), nil } t.Cleanup(func() { kubeClientFactory = origFactory }) loads := clusterActiveRemotePodLoads("maintenance", "build") if loads["titan-04"] != 2 { t.Fatalf("expected titan-04 load 2, got %#v", loads) } if _, ok := loads["titan-05"]; ok { t.Fatalf("expected succeeded pod to be ignored, got %#v", loads) } } func TestClusterActiveRemotePodLoadsFiltersEdges(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": _ = json.NewEncoder(w).Encode(map[string]any{ "items": []any{ map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "flash"}}, "spec": map[string]any{"nodeName": "wrong-run"}, "status": map[string]any{"phase": "Running"}, }, map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": ""}, "status": map[string]any{"phase": "Running"}, }, map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": "failed"}, "status": map[string]any{"phase": "Failed"}, }, map[string]any{ "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, "spec": map[string]any{"nodeName": "good"}, "status": map[string]any{"phase": "Pending"}, }, }, }) default: http.NotFound(w, r) } })) defer kube.Close() origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return kubeClientFactoryForURL(kube.URL, kube.Client()), nil } t.Cleanup(func() { kubeClientFactory = origFactory }) loads := clusterActiveRemotePodLoads("maintenance", "build") if len(loads) != 1 || loads["good"] != 1 { t.Fatalf("unexpected filtered loads: %#v", loads) } } func TestRunRemotePodTerminalEdgeStates(t *testing.T) { cases := []struct { name string phase string reason string message string logStatus int logBody string want string wantErr string }{ {name: "success uses logs when message empty", phase: "Succeeded", logStatus: http.StatusOK, logBody: "fallback payload", want: "fallback payload"}, {name: "success reports missing payload and log error", phase: "Succeeded", logStatus: http.StatusInternalServerError, logBody: "no logs", wantErr: "logs unavailable"}, {name: "success reports missing payload", phase: "Succeeded", logStatus: http.StatusOK, logBody: "", wantErr: "did not return a result payload"}, {name: "failed uses logs when message empty", phase: "Failed", logStatus: http.StatusOK, logBody: "worker failed", wantErr: "worker failed"}, {name: "failed falls back to default reason", phase: "Failed", logStatus: http.StatusOK, logBody: "", wantErr: "remote worker failed before reporting details"}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { kube := remotePodStateServer(t, tc.phase, tc.reason, tc.message, tc.logStatus, tc.logBody) installKubeFactory(t, kube) app := newTestApp(t) app.settings.Namespace = "maintenance" got, err := app.runRemotePod("", "metis-case", map[string]any{}) if tc.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("expected error containing %q, got result=%q err=%v", tc.wantErr, got, err) } return } if err != nil || got != tc.want { t.Fatalf("runRemotePod = %q err=%v", got, err) } }) } } func TestRunRemotePodProgressAndRequestFailures(t *testing.T) { origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } app := newTestApp(t) if _, err := app.runRemotePod("", "metis-offline", map[string]any{}); err == nil { t.Fatal("expected factory error") } kubeClientFactory = origFactory postFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodPost { http.Error(w, "post denied", http.StatusForbidden) return } w.WriteHeader(http.StatusOK) })) defer postFail.Close() installKubeFactory(t, postFail) app = newTestApp(t) app.settings.Namespace = "maintenance" if _, err := app.runRemotePod("", "metis-post-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "post denied") { t.Fatalf("expected post failure, got %v", err) } stateFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodPost: w.WriteHeader(http.StatusCreated) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): http.Error(w, "state denied", http.StatusForbidden) default: w.WriteHeader(http.StatusOK) } })) defer stateFail.Close() installKubeFactory(t, stateFail) app = newTestApp(t) app.settings.Namespace = "maintenance" if _, err := app.runRemotePod("", "metis-state-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "state denied") { t.Fatalf("expected state failure, got %v", err) } progressLog := ProgressLogLine(RemoteProgressUpdate{Stage: "flash", ProgressPct: 92, Message: "writing", WrittenBytes: 10, TotalBytes: 20}) progressServer := remotePodStateServer(t, "Succeeded", "Completed", `{"ok":true}`, http.StatusOK, progressLog) installKubeFactory(t, progressServer) app = newTestApp(t) app.settings.Namespace = "maintenance" job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz") app.setJob(job.ID, func(j *Job) { j.Status = JobRunning j.Stage = "build" j.StageStartedAt = time.Now() }) if got, err := app.runRemotePod(job.ID, "metis-progress", map[string]any{}); err != nil || got != `{"ok":true}` { t.Fatalf("runRemotePod progress = %q err=%v", got, err) } if got := app.job(job.ID); got == nil || got.Written != 10 || got.Total != 20 { t.Fatalf("expected progress update, got %#v", got) } } func TestRemotePodStateAndLogErrorBranches(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/state-fail"): http.Error(w, "state unavailable", http.StatusServiceUnavailable) case r.Method == http.MethodGet && strings.HasSuffix(r.URL.Path, "/log"): http.Error(w, "plain log failure", http.StatusInternalServerError) default: http.NotFound(w, r) } })) defer kube.Close() app := newTestApp(t) app.settings.Namespace = "maintenance" client := kubeClientFactoryForURL(kube.URL, kube.Client()) if _, err := app.remotePodState(client, "state-fail"); err == nil || !strings.Contains(err.Error(), "state unavailable") { t.Fatalf("expected state error, got %v", err) } if _, err := app.remotePodLogs(client, "log-fail"); err == nil || !strings.Contains(err.Error(), "plain log failure") { t.Fatalf("expected plain log error, got %v", err) } badURL := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient} if _, err := app.remotePodLogs(badURL, "log-fail"); err == nil { t.Fatal("expected log request creation error") } transportErr := &kubeClient{ baseURL: "http://example.test", token: "tok", client: &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { return nil, errors.New("logs transport down") })}, } if _, err := app.remotePodLogs(transportErr, "log-fail"); err == nil || !strings.Contains(err.Error(), "logs transport down") { t.Fatalf("expected log transport error, got %v", err) } } func remotePodStateServer(t *testing.T, phase, reason, message string, logStatus int, logBody string) *httptest.Server { t.Helper() return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): w.WriteHeader(http.StatusCreated) case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/pods/"): w.WriteHeader(http.StatusOK) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"): w.WriteHeader(logStatus) _, _ = w.Write([]byte(logBody)) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): _ = json.NewEncoder(w).Encode(map[string]any{ "metadata": map[string]any{"name": filepath.Base(r.URL.Path)}, "status": map[string]any{ "phase": phase, "reason": reason, "message": message, }, }) default: http.NotFound(w, r) } })) }