metis/pkg/service/cluster_test.go

521 lines
19 KiB
Go

package service
import (
"encoding/json"
"encoding/pem"
"errors"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
)
// roundTripFunc adapts a plain function to the http.RoundTripper interface so
// a test can supply an http.Client transport inline.
type roundTripFunc func(*http.Request) (*http.Response, error)

// RoundTrip satisfies http.RoundTripper by handing the request to the wrapped
// function and passing its response and error back unchanged.
func (fn roundTripFunc) RoundTrip(request *http.Request) (*http.Response, error) {
	response, err := fn(request)
	return response, err
}
// TestInClusterKubeClientMissingEnv checks that inClusterKubeClient returns an
// error when the Kubernetes service host and port variables are both empty.
func TestInClusterKubeClientMissingEnv(t *testing.T) {
	for _, name := range []string{"KUBERNETES_SERVICE_HOST", "KUBERNETES_SERVICE_PORT"} {
		t.Setenv(name, "")
	}

	_, err := inClusterKubeClient()
	if err != nil {
		return
	}
	t.Fatal("expected inClusterKubeClient error without env")
}
// TestInClusterKubeClientFileBranches points the service-account token and CA
// paths at a temp directory and walks inClusterKubeClient through each stage:
// no files, token only, token plus a non-certificate CA, and finally a real
// PEM certificate, after which the returned client's fields are checked.
func TestInClusterKubeClientFileBranches(t *testing.T) {
	savedTokenPath, savedCAPath := kubeServiceAccountTokenPath, kubeServiceAccountCAPath
	t.Cleanup(func() {
		kubeServiceAccountTokenPath, kubeServiceAccountCAPath = savedTokenPath, savedCAPath
	})

	scratch := t.TempDir()
	tokenFile := filepath.Join(scratch, "token")
	caFile := filepath.Join(scratch, "ca.crt")
	kubeServiceAccountTokenPath, kubeServiceAccountCAPath = tokenFile, caFile
	t.Setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1")
	t.Setenv("KUBERNETES_SERVICE_PORT", "6443")

	// write stores a fixture file and aborts the test if that fails.
	write := func(path string, content []byte) {
		t.Helper()
		if err := os.WriteFile(path, content, 0o600); err != nil {
			t.Fatal(err)
		}
	}
	// expectFailure aborts with the given message when the client is built
	// without an error.
	expectFailure := func(failure string) {
		t.Helper()
		if _, err := inClusterKubeClient(); err == nil {
			t.Fatal(failure)
		}
	}

	// Neither file exists yet.
	expectFailure("expected token read error")

	// Token present (padded with whitespace), CA still missing.
	write(tokenFile, []byte(" token \n"))
	expectFailure("expected CA read error")

	// CA file present but not PEM.
	write(caFile, []byte("not a certificate"))
	expectFailure("expected invalid CA error")

	// Borrow a genuine certificate from a throwaway TLS server.
	srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()
	certBlock := &pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw}
	write(caFile, pem.EncodeToMemory(certBlock))

	kc, err := inClusterKubeClient()
	if err != nil {
		t.Fatalf("inClusterKubeClient: %v", err)
	}
	// The token is expected back without its surrounding whitespace.
	asExpected := kc.baseURL == "https://10.0.0.1:6443" && kc.token == "token" && kc.client != nil
	if !asExpected {
		t.Fatalf("unexpected kube client: %#v", kc)
	}
}
// TestKubeClientRequestErrorBranches exercises the failure paths of
// kubeClient.jsonRequest and kubeClient.deleteRequest: an unmarshalable
// request body, an undecodable response body, a non-success DELETE, a base URL
// that cannot be parsed, and a transport that always errors.
func TestKubeClientRequestErrorBranches(t *testing.T) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/ok" {
			w.WriteHeader(http.StatusNoContent)
			return
		}
		if r.URL.Path == "/bad-json" {
			// Truncated JSON document.
			_, _ = w.Write([]byte("{"))
			return
		}
		if r.Method == http.MethodDelete && r.URL.Path == "/fail" {
			http.Error(w, "delete denied", http.StatusInternalServerError)
			return
		}
		http.NotFound(w, r)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	live := kubeClientFactoryForURL(srv.URL, srv.Client())

	// A func value cannot be encoded as JSON.
	unencodable := map[string]any{"bad": func() {}}
	if err := live.jsonRequest(http.MethodPost, "/ok", unencodable, nil); err == nil {
		t.Fatal("expected JSON marshal error")
	}
	if err := live.jsonRequest(http.MethodGet, "/ok", nil, nil); err != nil {
		t.Fatalf("expected nil out request to pass: %v", err)
	}
	if err := live.jsonRequest(http.MethodGet, "/bad-json", nil, &map[string]any{}); err == nil {
		t.Fatal("expected JSON decode error")
	}
	err := live.deleteRequest("/fail")
	if err == nil || !strings.Contains(err.Error(), "delete denied") {
		t.Fatalf("expected delete failure body, got %v", err)
	}

	// "%zz" is an invalid percent-escape.
	malformed := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient}
	if err := malformed.jsonRequest(http.MethodGet, "/ok", nil, nil); err == nil {
		t.Fatal("expected bad URL jsonRequest error")
	}
	if err := malformed.deleteRequest("/ok"); err == nil {
		t.Fatal("expected bad URL deleteRequest error")
	}

	alwaysDown := roundTripFunc(func(*http.Request) (*http.Response, error) {
		return nil, errors.New("transport down")
	})
	unreachable := &kubeClient{
		baseURL: "http://example.test",
		token:   "tok",
		client:  &http.Client{Transport: alwaysDown},
	}
	err = unreachable.jsonRequest(http.MethodGet, "/ok", nil, nil)
	if err == nil || !strings.Contains(err.Error(), "transport down") {
		t.Fatalf("expected transport jsonRequest error, got %v", err)
	}
	err = unreachable.deleteRequest("/ok")
	if err == nil || !strings.Contains(err.Error(), "transport down") {
		t.Fatalf("expected transport deleteRequest error, got %v", err)
	}
}
// TestKubeClientAndPodHelpers runs the kube client and the pod helpers against
// a fake API server that lists two worker nodes, accepts pod creation, answers
// node deletes with 404, fails pod log reads with a proxy error, and reports
// every pod as Failed with both waiting and terminated container states.
func TestKubeClientAndPodHelpers(t *testing.T) {
	// workerNode builds one schedulable arm64 worker entry for the node list.
	workerNode := func(name string) map[string]any {
		return map[string]any{
			"metadata": map[string]any{
				"name":   name,
				"labels": map[string]string{"kubernetes.io/arch": "arm64", "node-role.kubernetes.io/worker": "true"},
			},
			"spec": map[string]any{"unschedulable": false},
		}
	}

	handler := func(w http.ResponseWriter, r *http.Request) {
		method, path := r.Method, r.URL.Path
		isGet := method == http.MethodGet
		underPod := strings.Contains(path, "/pods/")
		switch {
		case isGet && path == "/api/v1/nodes":
			// Listed out of order on purpose: "b" before "a".
			_ = json.NewEncoder(w).Encode(map[string]any{
				"items": []any{workerNode("b"), workerNode("a")},
			})
		case method == http.MethodPost && strings.Contains(path, "/pods"):
			w.WriteHeader(http.StatusCreated)
		case method == http.MethodDelete && strings.Contains(path, "/nodes/"):
			w.WriteHeader(http.StatusNotFound)
		case isGet && underPod && strings.HasSuffix(path, "/log"):
			http.Error(w, "proxy error from 127.0.0.1:6443", http.StatusBadGateway)
		case isGet && underPod:
			containerState := map[string]any{
				"waiting":    map[string]any{"reason": "ImagePullBackOff", "message": "pulling"},
				"terminated": map[string]any{"reason": "Completed", "message": "done"},
			}
			_ = json.NewEncoder(w).Encode(map[string]any{
				"metadata": map[string]any{"name": filepath.Base(path)},
				"status": map[string]any{
					"phase":   "Failed",
					"reason":  "CrashLoopBackOff",
					"message": "boom",
					"containerStatuses": []any{
						map[string]any{"state": containerState},
					},
				},
			})
		default:
			http.NotFound(w, r)
		}
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	kc := kubeClientFactoryForURL(srv.URL, srv.Client())

	// Raw request helpers.
	if err := kc.jsonRequest(http.MethodGet, "/api/v1/nodes", nil, &map[string]any{}); err != nil {
		t.Fatalf("jsonRequest: %v", err)
	}
	if err := kc.deleteRequest("/api/v1/nodes/a"); err != nil {
		t.Fatalf("deleteRequest 404 should be nil: %v", err)
	}
	if err := kc.jsonRequest(http.MethodGet, "/missing", nil, &map[string]any{}); err == nil {
		t.Fatal("expected jsonRequest failure on 404")
	}

	// Route the package-level factory to the fake server for the helpers below.
	previousFactory := kubeClientFactory
	kubeClientFactory = func() (*kubeClient, error) {
		return kc, nil
	}
	t.Cleanup(func() { kubeClientFactory = previousFactory })

	// Both nodes are expected back, with "a" first.
	listed := clusterNodes()
	if len(listed) != 2 || listed[0].Name != "a" {
		t.Fatalf("clusterNodes sort mismatch: %#v", listed)
	}

	app := newTestApp(t)
	app.settings.Namespace = "maintenance"
	app.settings.RunnerImageARM64 = "runner:arm64"

	podState, err := app.remotePodState(kc, "metis-build-test")
	if err != nil {
		t.Fatalf("remotePodState: %v", err)
	}
	// The terminated container state is expected to win over the other
	// reason/message pairs in the payload.
	if podState.Reason != "Completed" || podState.Message != "done" {
		t.Fatalf("expected terminated state override, got %#v", podState)
	}

	_, err = app.remotePodLogs(kc, "metis-build-test")
	if err == nil || !strings.Contains(err.Error(), "could not reach the node kubelet log endpoint") {
		t.Fatalf("expected kubelet log endpoint error, got %v", err)
	}
	if _, err := app.runRemotePod("job-1", "metis-fail-test", map[string]any{}); err == nil {
		t.Fatal("expected runRemotePod failure")
	}
	if _, err := app.ensureDevice("titan-22", "missing"); err == nil {
		t.Fatal("expected ensureDevice missing target to fail")
	}
}
// TestClusterListErrorBranches checks that clusterNodes and
// clusterActiveRemotePodLoads return nil when the kube client factory fails,
// when the namespace is blank, and when the API server answers the list
// request with an error status.
func TestClusterListErrorBranches(t *testing.T) {
	origFactory := kubeClientFactory
	// Register the restore before the first override so that a failed
	// assertion below cannot leave a stub factory installed for later tests.
	t.Cleanup(func() { kubeClientFactory = origFactory })

	// Factory itself fails.
	kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") }
	if nodes := clusterNodes(); nodes != nil {
		t.Fatalf("expected nil nodes on factory error, got %#v", nodes)
	}
	if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil {
		t.Fatalf("expected nil loads on factory error, got %#v", loads)
	}

	// Whitespace-only namespace, checked with the original factory in place.
	kubeClientFactory = origFactory
	if loads := clusterActiveRemotePodLoads(" ", "build"); loads != nil {
		t.Fatalf("expected nil loads for empty namespace, got %#v", loads)
	}

	// Factory succeeds but every API call is answered with 503.
	kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "api unavailable", http.StatusServiceUnavailable)
	}))
	defer kube.Close()
	kubeClientFactory = func() (*kubeClient, error) {
		return kubeClientFactoryForURL(kube.URL, kube.Client()), nil
	}
	if nodes := clusterNodes(); nodes != nil {
		t.Fatalf("expected nil nodes on list error, got %#v", nodes)
	}
	if loads := clusterActiveRemotePodLoads("maintenance", "build"); loads != nil {
		t.Fatalf("expected nil loads on list error, got %#v", loads)
	}
}
// TestDeleteNodeObjectFallback puts a stub kubectl script first on PATH, makes
// the kube client factory fail, and checks that deleteNodeObject still returns
// nil for the node name.
func TestDeleteNodeObjectFallback(t *testing.T) {
	binDir := t.TempDir()
	argsFile := filepath.Join(binDir, "kubectl.args")
	// The stub writes its argument list to argsFile.
	// NOTE(review): nothing in this test reads argsFile back.
	script := "#!/usr/bin/env bash\nset -eu\nprintf '%s' \"$*\" > \"" + argsFile + "\"\n"
	if err := os.WriteFile(filepath.Join(binDir, "kubectl"), []byte(script), 0o755); err != nil {
		t.Fatal(err)
	}
	t.Setenv("PATH", binDir+string(os.PathListSeparator)+os.Getenv("PATH"))

	previousFactory := kubeClientFactory
	kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") }
	t.Cleanup(func() { kubeClientFactory = previousFactory })

	err := deleteNodeObject("titan-15")
	if err != nil {
		t.Fatalf("deleteNodeObject fallback: %v", err)
	}
}
// TestClusterActiveRemotePodLoadsCountsOnlyLivePods serves three pods for the
// "build" run (Running and Pending on titan-04, Succeeded on titan-05) and
// expects clusterActiveRemotePodLoads to report a load of 2 for titan-04 and
// no entry for titan-05. The handler also verifies the label selector sent
// with the list request.
func TestClusterActiveRemotePodLoadsCountsOnlyLivePods(t *testing.T) {
	kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods":
			if got := r.URL.Query().Get("labelSelector"); got != "app=metis-remote,metis-run=build" {
				// This handler runs on a server goroutine, where t.Fatalf
				// (FailNow) must not be called. Record the failure with
				// Errorf and reject the request instead.
				t.Errorf("unexpected labelSelector %q", got)
				http.Error(w, "unexpected labelSelector", http.StatusBadRequest)
				return
			}
			_ = json.NewEncoder(w).Encode(map[string]any{
				"items": []any{
					map[string]any{
						"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
						"spec":     map[string]any{"nodeName": "titan-04"},
						"status":   map[string]any{"phase": "Running"},
					},
					map[string]any{
						"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
						"spec":     map[string]any{"nodeName": "titan-04"},
						"status":   map[string]any{"phase": "Pending"},
					},
					map[string]any{
						"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
						"spec":     map[string]any{"nodeName": "titan-05"},
						"status":   map[string]any{"phase": "Succeeded"},
					},
				},
			})
		default:
			http.NotFound(w, r)
		}
	}))
	defer kube.Close()

	origFactory := kubeClientFactory
	kubeClientFactory = func() (*kubeClient, error) {
		return kubeClientFactoryForURL(kube.URL, kube.Client()), nil
	}
	t.Cleanup(func() { kubeClientFactory = origFactory })

	loads := clusterActiveRemotePodLoads("maintenance", "build")
	if loads["titan-04"] != 2 {
		t.Fatalf("expected titan-04 load 2, got %#v", loads)
	}
	if _, ok := loads["titan-05"]; ok {
		t.Fatalf("expected succeeded pod to be ignored, got %#v", loads)
	}
}
// TestClusterActiveRemotePodLoadsFiltersEdges serves four pods (one labelled
// for a different run, one with an empty node name, one Failed, and one
// Pending on node "good") and expects clusterActiveRemotePodLoads to report
// only the last of them.
func TestClusterActiveRemotePodLoadsFiltersEdges(t *testing.T) {
	// remotePod builds one pod list entry with the given run label, node and phase.
	remotePod := func(run, node, phase string) map[string]any {
		return map[string]any{
			"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": run}},
			"spec":     map[string]any{"nodeName": node},
			"status":   map[string]any{"phase": phase},
		}
	}

	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet || r.URL.Path != "/api/v1/namespaces/maintenance/pods" {
			http.NotFound(w, r)
			return
		}
		_ = json.NewEncoder(w).Encode(map[string]any{
			"items": []any{
				remotePod("flash", "wrong-run", "Running"),
				remotePod("build", "", "Running"),
				remotePod("build", "failed", "Failed"),
				remotePod("build", "good", "Pending"),
			},
		})
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	previousFactory := kubeClientFactory
	kubeClientFactory = func() (*kubeClient, error) {
		return kubeClientFactoryForURL(srv.URL, srv.Client()), nil
	}
	t.Cleanup(func() { kubeClientFactory = previousFactory })

	perNode := clusterActiveRemotePodLoads("maintenance", "build")
	if len(perNode) != 1 || perNode["good"] != 1 {
		t.Fatalf("unexpected filtered loads: %#v", perNode)
	}
}
// TestRunRemotePodTerminalEdgeStates is a table test for runRemotePod when the
// pod is already in a terminal phase and its status carries no message. Each
// row sets the phase plus the log endpoint's status and body, and states
// either the expected result or a substring of the expected error.
func TestRunRemotePodTerminalEdgeStates(t *testing.T) {
	scenarios := []struct {
		name      string
		phase     string
		reason    string
		message   string
		logStatus int
		logBody   string
		want      string
		wantErr   string
	}{
		{
			name:      "success uses logs when message empty",
			phase:     "Succeeded",
			logStatus: http.StatusOK,
			logBody:   "fallback payload",
			want:      "fallback payload",
		},
		{
			name:      "success reports missing payload and log error",
			phase:     "Succeeded",
			logStatus: http.StatusInternalServerError,
			logBody:   "no logs",
			wantErr:   "logs unavailable",
		},
		{
			name:      "success reports missing payload",
			phase:     "Succeeded",
			logStatus: http.StatusOK,
			logBody:   "",
			wantErr:   "did not return a result payload",
		},
		{
			name:      "failed uses logs when message empty",
			phase:     "Failed",
			logStatus: http.StatusOK,
			logBody:   "worker failed",
			wantErr:   "worker failed",
		},
		{
			name:      "failed falls back to default reason",
			phase:     "Failed",
			logStatus: http.StatusOK,
			logBody:   "",
			wantErr:   "remote worker failed before reporting details",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			srv := remotePodStateServer(t, sc.phase, sc.reason, sc.message, sc.logStatus, sc.logBody)
			installKubeFactory(t, srv)

			app := newTestApp(t)
			app.settings.Namespace = "maintenance"

			got, err := app.runRemotePod("", "metis-case", map[string]any{})
			switch {
			case sc.wantErr != "":
				// Error rows only check the error text.
				if err == nil || !strings.Contains(err.Error(), sc.wantErr) {
					t.Fatalf("expected error containing %q, got result=%q err=%v", sc.wantErr, got, err)
				}
			case err != nil || got != sc.want:
				t.Fatalf("runRemotePod = %q err=%v", got, err)
			}
		})
	}
}
// TestRunRemotePodProgressAndRequestFailures covers four runRemotePod
// scenarios in sequence: the kube client factory fails, the pod POST is
// rejected, the pod state GET is rejected, and a successful run whose log
// carries a progress line that must be reflected on the job's Written and
// Total fields.
func TestRunRemotePodProgressAndRequestFailures(t *testing.T) {
	origFactory := kubeClientFactory
	// Register the restore before the first override so that a failed
	// assertion below cannot leave the "offline" stub installed for later tests.
	t.Cleanup(func() { kubeClientFactory = origFactory })

	// Factory failure.
	kubeClientFactory = func() (*kubeClient, error) { return nil, errors.New("offline") }
	app := newTestApp(t)
	if _, err := app.runRemotePod("", "metis-offline", map[string]any{}); err == nil {
		t.Fatal("expected factory error")
	}
	kubeClientFactory = origFactory

	// Pod creation is refused.
	postFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodPost {
			http.Error(w, "post denied", http.StatusForbidden)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer postFail.Close()
	installKubeFactory(t, postFail)
	app = newTestApp(t)
	app.settings.Namespace = "maintenance"
	if _, err := app.runRemotePod("", "metis-post-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "post denied") {
		t.Fatalf("expected post failure, got %v", err)
	}

	// Pod creation succeeds but reading the pod state is refused.
	stateFail := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodPost:
			w.WriteHeader(http.StatusCreated)
		case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"):
			http.Error(w, "state denied", http.StatusForbidden)
		default:
			w.WriteHeader(http.StatusOK)
		}
	}))
	defer stateFail.Close()
	installKubeFactory(t, stateFail)
	app = newTestApp(t)
	app.settings.Namespace = "maintenance"
	if _, err := app.runRemotePod("", "metis-state-fail", map[string]any{}); err == nil || !strings.Contains(err.Error(), "state denied") {
		t.Fatalf("expected state failure, got %v", err)
	}

	// Successful run: the status message is the result payload and the log
	// body is a single progress line.
	progressLog := ProgressLogLine(RemoteProgressUpdate{Stage: "flash", ProgressPct: 92, Message: "writing", WrittenBytes: 10, TotalBytes: 20})
	progressServer := remotePodStateServer(t, "Succeeded", "Completed", `{"ok":true}`, http.StatusOK, progressLog)
	// Closed explicitly, like the two servers above; Close is safe to call
	// more than once should the helper also arrange it.
	defer progressServer.Close()
	installKubeFactory(t, progressServer)
	app = newTestApp(t)
	app.settings.Namespace = "maintenance"
	job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz")
	app.setJob(job.ID, func(j *Job) {
		j.Status = JobRunning
		j.Stage = "build"
		j.StageStartedAt = time.Now()
	})
	if got, err := app.runRemotePod(job.ID, "metis-progress", map[string]any{}); err != nil || got != `{"ok":true}` {
		t.Fatalf("runRemotePod progress = %q err=%v", got, err)
	}
	if got := app.job(job.ID); got == nil || got.Written != 10 || got.Total != 20 {
		t.Fatalf("expected progress update, got %#v", got)
	}
}
// TestRemotePodStateAndLogErrorBranches checks the error paths of
// remotePodState and remotePodLogs: an error status from the state endpoint,
// an error status from the log endpoint, a base URL that cannot be parsed,
// and a transport that always fails.
func TestRemotePodStateAndLogErrorBranches(t *testing.T) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.NotFound(w, r)
			return
		}
		if strings.Contains(r.URL.Path, "/pods/state-fail") {
			http.Error(w, "state unavailable", http.StatusServiceUnavailable)
			return
		}
		if strings.HasSuffix(r.URL.Path, "/log") {
			http.Error(w, "plain log failure", http.StatusInternalServerError)
			return
		}
		http.NotFound(w, r)
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	app := newTestApp(t)
	app.settings.Namespace = "maintenance"
	live := kubeClientFactoryForURL(srv.URL, srv.Client())

	_, err := app.remotePodState(live, "state-fail")
	if err == nil || !strings.Contains(err.Error(), "state unavailable") {
		t.Fatalf("expected state error, got %v", err)
	}
	_, err = app.remotePodLogs(live, "log-fail")
	if err == nil || !strings.Contains(err.Error(), "plain log failure") {
		t.Fatalf("expected plain log error, got %v", err)
	}

	// "%zz" is an invalid percent-escape.
	malformed := &kubeClient{baseURL: "http://%zz", token: "tok", client: http.DefaultClient}
	if _, err := app.remotePodLogs(malformed, "log-fail"); err == nil {
		t.Fatal("expected log request creation error")
	}

	alwaysDown := roundTripFunc(func(*http.Request) (*http.Response, error) {
		return nil, errors.New("logs transport down")
	})
	unreachable := &kubeClient{
		baseURL: "http://example.test",
		token:   "tok",
		client:  &http.Client{Transport: alwaysDown},
	}
	_, err = app.remotePodLogs(unreachable, "log-fail")
	if err == nil || !strings.Contains(err.Error(), "logs transport down") {
		t.Fatalf("expected log transport error, got %v", err)
	}
}
// remotePodStateServer starts a fake Kubernetes API server for pod lifecycle
// tests and returns it.
//
// The server answers:
//   - POST on a path containing "/pods" with 201 Created;
//   - DELETE on a path containing "/pods/" with 200 OK;
//   - GET on a "/pods/" path ending in "/log" with logStatus and logBody;
//   - any other GET on a "/pods/" path with a JSON pod whose name is the last
//     path element and whose status carries phase, reason and message;
//   - everything else with 404.
//
// The server is closed automatically when t and its subtests finish, so
// callers need not close it. Calling Close themselves is still safe because
// httptest.Server.Close tolerates repeated calls.
func remotePodStateServer(t *testing.T, phase, reason, message string, logStatus int, logBody string) *httptest.Server {
	t.Helper()
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch {
		case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"):
			w.WriteHeader(http.StatusCreated)
		case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/pods/"):
			w.WriteHeader(http.StatusOK)
		case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/") && strings.HasSuffix(r.URL.Path, "/log"):
			// Must be matched before the generic pod GET below, since log
			// paths also contain "/pods/".
			w.WriteHeader(logStatus)
			_, _ = w.Write([]byte(logBody))
		case r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/pods/"):
			_ = json.NewEncoder(w).Encode(map[string]any{
				"metadata": map[string]any{"name": filepath.Base(r.URL.Path)},
				"status": map[string]any{
					"phase":   phase,
					"reason":  reason,
					"message": message,
				},
			})
		default:
			http.NotFound(w, r)
		}
	}))
	// Tie the listener's lifetime to the test so callers that never close the
	// server do not leak it.
	t.Cleanup(srv.Close)
	return srv
}