package service import ( "encoding/json" "encoding/pem" "errors" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "testing" ) type roundTripFunc func(*http.Request) (*http.Response, error) func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } func TestInClusterKubeClientMissingEnv(t *testing.T) { t.Setenv("KUBERNETES_SERVICE_HOST", "") t.Setenv("KUBERNETES_SERVICE_PORT", "") if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected inClusterKubeClient error without env") } } func TestInClusterKubeClientFileBranches(t *testing.T) { origTokenPath := kubeServiceAccountTokenPath origCAPath := kubeServiceAccountCAPath t.Cleanup(func() { kubeServiceAccountTokenPath = origTokenPath kubeServiceAccountCAPath = origCAPath }) dir := t.TempDir() tokenPath := filepath.Join(dir, "token") caPath := filepath.Join(dir, "ca.crt") kubeServiceAccountTokenPath = tokenPath kubeServiceAccountCAPath = caPath t.Setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1") t.Setenv("KUBERNETES_SERVICE_PORT", "6443") if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected token read error") } if err := os.WriteFile(tokenPath, []byte(" token \n"), 0o600); err != nil { t.Fatal(err) } if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected CA read error") } if err := os.WriteFile(caPath, []byte("not a certificate"), 0o600); err != nil { t.Fatal(err) } if _, err := inClusterKubeClient(); err == nil { t.Fatal("expected invalid CA error") } tlsServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) })) defer tlsServer.Close() cert := tlsServer.Certificate() caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw}) if err := os.WriteFile(caPath, caPEM, 0o600); err != nil { t.Fatal(err) } client, err := inClusterKubeClient() if err != nil { t.Fatalf("inClusterKubeClient: %v", err) } if client.baseURL != "https://10.0.0.1:6443" || client.token != "token" || client.client == nil { t.Fatalf("unexpected kube client: %#v", client) } } func TestKubeClientRequestErrorBranches(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.URL.Path == "/ok": w.WriteHeader(http.StatusNoContent) case r.URL.Path == "/bad-json": _, _ = w.Write([]byte("{")) case r.Method == http.MethodDelete && r.URL.Path == "/fail": http.Error(w, "delete denied", http.StatusInternalServerError) default: http.NotFound(w, r) } })) defer kube.Close() client := kubeClientFactoryForURL(kube.URL, kube.Client()) if err := client.jsonRequest(http.MethodPost, "/ok", map[string]any{"bad": func() {}}, nil); err == nil { t.Fatal("expected JSON marshal error") } if err := client.jsonRequest(http.MethodGet, "/ok", nil, nil); err != nil { t.Fatalf("expected nil out request to pass: %v", err) } if err := client.jsonRequest(http.MethodGet, "/bad-json", nil, &map[string]any{}); err == nil { t.Fatal("expected JSON decode error") } if err := client.deleteRequest("/fail"); err == nil || !strings.Contains(err.Error(), "delete denied") { t.Fatalf("expected delete failure body, got %v", err) } badURLClient := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient} if err := badURLClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil { t.Fatal("expected bad URL jsonRequest error") } if err := badURLClient.deleteRequest("/ok"); err == nil { t.Fatal("expected bad URL deleteRequest error") } transportErrClient := &kubeClient{ baseURL: "http://example.test", token: "tok", client: &http.Client{Transport: roundTripFunc(func(*http.Request) (*http.Response, error) { return nil, errors.New("transport down") })}, } if err := transportErrClient.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil || !strings.Contains(err.Error(), "transport down") { t.Fatalf("expected transport jsonRequest error, got %v", err) } if err := transportErrClient.deleteRequest("/ok"); err == nil || !strings.Contains(err.Error(), "transport down") { t.Fatalf("expected transport deleteRequest error, got %v", err) } } func TestKubeClientAndPodHelpers(t *testing.T) { kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodGet && r.URL.Path == "/api/v1/nodes": _ = json.NewEncoder(w).Encode(map[string]any{ "items": []any{ map[string]any{ "metadata": map[string]any{"name": "b", "labels": map[string]string{"kubernetes.io/arch": "arm64", "node-role.kubernetes.io/worker": "true"}}, "spec": map[string]any{"unschedulable": false}, }, map[string]any{ "metadata": map[string]any{"name": "a", "labels": map[string]string{"kubernetes.io/arch": "arm64", "node-role.kubernetes.io/worker": "true"}}, "spec": map[string]any{"unschedulable": false}, }, }, }) case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): w.WriteHeader(http.StatusCreated) case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/nodes/"): w.WriteHeader(http.StatusNotFound) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"): http.Error(w, "proxy error from 127.0.0.1:6443", http.StatusBadGateway) case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"): _ = json.NewEncoder(w).Encode(map[string]any{ "metadata": map[string]any{"name": filepath.Base(r.URL.Path)}, "status": map[string]any{ "phase": "Failed", "reason": "CrashLoopBackOff", "message": "boom", "containerStatuses": []any{ map[string]any{ "state": map[string]any{ "waiting": map[string]any{"reason": "ImagePullBackOff", "message": "pulling"}, "terminated": map[string]any{"reason": "Completed", "message": "done"}, }, }, }, }, }) default: http.NotFound(w, r) } })) defer kube.Close() client := kubeClientFactoryForURL(kube.URL, kube.Client()) if err := client.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &map[string]any{}); err != nil { t.Fatalf("jsonRequest: %v", err) } if err := client.deleteRequest("/api/v1/nodes/a"); err != nil { t.Fatalf("deleteRequest 404 should be nil: %v", err) } if err := client.jsonRequest(http.MethodGet, "/missing", nil, &map[string]any{}); err == nil { t.Fatal("expected jsonRequest failure on 404") } origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return client, nil } t.Cleanup(func() { kubeClientFactory = origFactory }) nodes := clusterNodes() if len(nodes) != 2 || nodes[0].Name != "a" { t.Fatalf("clusterNodes sort mismatch: %#v", nodes) } app := newTestApp(t) app.settings.Namespace = "maintenance" app.settings.RunnerImageARM64 = "runner:arm64" state, err := app.remotePodState(client, "metis-build-test") if err != nil { t.Fatalf("remotePodState: %v", err) } if state.Reason != "Completed" || state.Message != "done" { t.Fatalf("expected terminated state override, got %#v", state) } if _, err := app.remotePodLogs(client, "metis-build-test"); err == nil || !strings.Contains(err.Error(), "could not reach the node kubelet log endpoint") { t.Fatalf("expected kubelet log endpoint error, got %v", err) } if _, err := app.runRemotePod("job-1", "metis-fail-test", map[string]any{}); err == nil { t.Fatal("expected runRemotePod failure") } if _, err := app.ensureDevice("titan-22", "missing"); err == nil { t.Fatal("expected ensureDevice missing target to fail") } } func TestClusterListErrorBranches(t *testing.T) { origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } if nodes := clusterNodes(); nodes != nil { t.Fatalf("expected nil nodes on factory error, got %#v", nodes) } if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { t.Fatalf("expected nil loads on factory error, got %#v", loads) } kubeClientFactory = origFactory if loads := clusterActiveRemotePodLoads(" ", "build"); loads != nil { t.Fatalf("expected nil loads for empty namespace, got %#v", loads) } kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, "api unavailable", http.StatusServiceUnavailable) })) defer kube.Close() kubeClientFactory = func() (*kubeClient, error) { return kubeClientFactoryForURL(kube.URL, kube.Client()), nil } t.Cleanup(func() { kubeClientFactory = origFactory }) if nodes := clusterNodes(); nodes != nil { t.Fatalf("expected nil nodes on list error, got %#v", nodes) } if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil { t.Fatalf("expected nil loads on list error, got %#v", loads) } } func TestDeleteNodeObjectFallback(t *testing.T) { tmp := t.TempDir() kubectl := filepath.Join(tmp, "kubectl") if err := os.WriteFile(kubectl, []byte("#!/usr/bin/env bash\nset -eu\nprintf '%s' \"$*\" > \""+filepath.Join(tmp, "kubectl.args")+"\"\n"), 0o755); err != nil { t.Fatal(err) } t.Setenv("PATH", tmp+string(os.PathListSeparator)+os.Getenv("PATH")) origFactory := kubeClientFactory kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") } t.Cleanup(func() { kubeClientFactory = origFactory }) if err := deleteNodeObject("titan-15"); err != nil { t.Fatalf("deleteNodeObject fallback: %v", err) } }