diff --git a/pkg/service/app.go b/pkg/service/app.go index 2f8a868..6717e1f 100644 --- a/pkg/service/app.go +++ b/pkg/service/app.go @@ -210,7 +210,13 @@ func (a *App) Build(node string) (*Job, error) { if err := a.ensureReplacementReady(node); err != nil { return nil, err } - job := a.newJob("build", node, "", "") + if active := a.activeJobForNode(node); active != nil { + return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID} + } + job, err := a.reserveJob("build", node, "", "") + if err != nil { + return nil, err + } go a.runBuild(job, false) return job, nil } @@ -223,10 +229,16 @@ func (a *App) Replace(node, host, device string) (*Job, error) { if err := a.ensureReplacementReady(node); err != nil { return nil, err } + if active := a.activeJobForNode(node); active != nil { + return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID} + } if _, err := a.ensureDevice(host, device); err != nil { return nil, err } - job := a.newJob("replace", node, host, device) + job, err := a.reserveJob("replace", node, host, device) + if err != nil { + return nil, err + } go a.runBuild(job, true) return job, nil } diff --git a/pkg/service/app_helpers.go b/pkg/service/app_helpers.go index 7081221..613e2a1 100644 --- a/pkg/service/app_helpers.go +++ b/pkg/service/app_helpers.go @@ -34,6 +34,76 @@ func (a *App) newJob(kind, node, host, device string) *Job { return job } +type activeNodeJobError struct { + Node string + Kind string + JobID string +} + +func (e *activeNodeJobError) Error() string { + if e == nil { + return "node already has an active metis job" + } + return fmt.Sprintf("node %s already has an active %s job (%s)", e.Node, e.Kind, e.JobID) +} + +func (a *App) activeJobForNode(node string) *Job { + a.mu.RLock() + defer a.mu.RUnlock() + return a.activeJobForNodeLocked(node) +} + +func (a *App) activeJobForNodeLocked(node string) *Job { + node = strings.TrimSpace(node) + if node == "" { + return nil + } + var active *Job + for 
_, job := range a.jobs { + if job == nil || strings.TrimSpace(job.Node) != node { + continue + } + if job.Status != JobQueued && job.Status != JobRunning { + continue + } + switch job.Kind { + case "build", "replace": + default: + continue + } + if active == nil || job.StartedAt.Before(active.StartedAt) { + active = job + } + } + if active == nil { + return nil + } + copyJob := *active + return &copyJob +} + +func (a *App) reserveJob(kind, node, host, device string) (*Job, error) { + a.mu.Lock() + defer a.mu.Unlock() + if active := a.activeJobForNodeLocked(node); active != nil { + return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID} + } + now := time.Now().UTC() + job := &Job{ + ID: fmt.Sprintf("%d", now.UnixNano()), + Kind: kind, + Node: node, + Host: host, + Device: device, + Status: JobQueued, + ProgressPct: 0, + StartedAt: now, + UpdatedAt: now, + } + a.jobs[job.ID] = job + return job, nil +} + func (a *App) job(id string) *Job { a.mu.RLock() defer a.mu.RUnlock() diff --git a/pkg/service/cluster.go b/pkg/service/cluster.go index e1612b7..5561068 100644 --- a/pkg/service/cluster.go +++ b/pkg/service/cluster.go @@ -158,6 +158,54 @@ func clusterNodes() []clusterNode { return nodes } +func clusterActiveRemotePodLoads(namespace, run string) map[string]int { + kube, err := kubeClientFactory() + if err != nil { + return nil + } + ns := url.PathEscape(strings.TrimSpace(namespace)) + if ns == "" { + return nil + } + selector := "app=metis-remote" + if value := strings.TrimSpace(run); value != "" { + selector += ",metis-run=" + value + } + path := fmt.Sprintf("/api/v1/namespaces/%s/pods?labelSelector=%s", ns, url.QueryEscape(selector)) + var payload struct { + Items []struct { + Metadata struct { + Labels map[string]string `json:"labels"` + } `json:"metadata"` + Spec struct { + NodeName string `json:"nodeName"` + } `json:"spec"` + Status struct { + Phase string `json:"phase"` + } `json:"status"` + } `json:"items"` + } + if err := 
kube.jsonRequest(http.MethodGet, path, nil, &payload); err != nil { + return nil + } + loads := map[string]int{} + for _, item := range payload.Items { + phase := strings.TrimSpace(item.Status.Phase) + if phase == "Succeeded" || phase == "Failed" { + continue + } + if value := strings.TrimSpace(run); value != "" && strings.TrimSpace(item.Metadata.Labels["metis-run"]) != value { + continue + } + nodeName := strings.TrimSpace(item.Spec.NodeName) + if nodeName == "" { + continue + } + loads[nodeName]++ + } + return loads +} + func (a *App) podImageForArch(arch string) string { switch strings.TrimSpace(arch) { case "arm64": diff --git a/pkg/service/cluster_test.go b/pkg/service/cluster_test.go index 0ca3c5d..b3e7ba6 100644 --- a/pkg/service/cluster_test.go +++ b/pkg/service/cluster_test.go @@ -121,3 +121,50 @@ func TestDeleteNodeObjectFallback(t *testing.T) { t.Fatalf("deleteNodeObject fallback: %v", err) } } + +func TestClusterActiveRemotePodLoadsCountsOnlyLivePods(t *testing.T) { + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": + if got := r.URL.Query().Get("labelSelector"); got != "app=metis-remote,metis-run=build" { + t.Fatalf("unexpected labelSelector %q", got) + } + _ = json.NewEncoder(w).Encode(map[string]any{ + "items": []any{ + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": "titan-04"}, + "status": map[string]any{"phase": "Running"}, + }, + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": "titan-04"}, + "status": map[string]any{"phase": "Pending"}, + }, + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": 
map[string]any{"nodeName": "titan-05"}, + "status": map[string]any{"phase": "Succeeded"}, + }, + }, + }) + default: + http.NotFound(w, r) + } + })) + defer kube.Close() + + origFactory := kubeClientFactory + kubeClientFactory = func() (*kubeClient, error) { + return kubeClientFactoryForURL(kube.URL, kube.Client()), nil + } + t.Cleanup(func() { kubeClientFactory = origFactory }) + + loads := clusterActiveRemotePodLoads("maintenance", "build") + if loads["titan-04"] != 2 { + t.Fatalf("expected titan-04 load 2, got %#v", loads) + } + if _, ok := loads["titan-05"]; ok { + t.Fatalf("expected succeeded pod to be ignored, got %#v", loads) + } +} diff --git a/pkg/service/helpers_test.go b/pkg/service/helpers_test.go index c62bf3f..a6e6a57 100644 --- a/pkg/service/helpers_test.go +++ b/pkg/service/helpers_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" @@ -125,6 +126,17 @@ func TestAppJobDeviceAndStateHelpers(t *testing.T) { } } +func TestBuildAndReplaceRejectDuplicateActiveNodeJobs(t *testing.T) { + app := newTestApp(t) + active := app.newJob("replace", "titan-15", "titan-22", "/dev/sdz") + if _, err := app.Build("titan-15"); err == nil || !strings.Contains(err.Error(), active.ID) { + t.Fatalf("expected build conflict mentioning %s, got %v", active.ID, err) + } + if _, err := app.Replace("titan-15", "titan-22", "/dev/sdz"); err == nil || !strings.Contains(err.Error(), "active replace job") { + t.Fatalf("expected replace conflict, got %v", err) + } +} + func TestAppPersistenceAndTargets(t *testing.T) { dir := t.TempDir() invPath := filepath.Join(dir, "inventory.yaml") diff --git a/pkg/service/remote_helpers.go b/pkg/service/remote_helpers.go index 6361408..c2e9e41 100644 --- a/pkg/service/remote_helpers.go +++ b/pkg/service/remote_helpers.go @@ -81,6 +81,8 @@ func (a *App) ensureDevice(host, path string) (*Device, error) { func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) { nodes := clusterNodes() + 
activeBuilds := clusterActiveRemotePodLoads(a.settings.Namespace, "build") + activeRemotePods := clusterActiveRemotePodLoads(a.settings.Namespace, "") storageNodes := map[string]struct{}{} for _, node := range a.inventory.Nodes { if len(node.LonghornDisks) > 0 { @@ -119,6 +121,12 @@ func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) { if flashHost != "" && node.Name == flashHost { score += 5 } + if count := activeBuilds[node.Name]; count > 0 { + score -= 100 * count + } + if count := activeRemotePods[node.Name]; count > 0 { + score -= 15 * count + } candidates = append(candidates, scored{node: node, score: score}) } sort.Slice(candidates, func(i, j int) bool { diff --git a/pkg/service/remote_helpers_test.go b/pkg/service/remote_helpers_test.go index ceeb922..f7a9557 100644 --- a/pkg/service/remote_helpers_test.go +++ b/pkg/service/remote_helpers_test.go @@ -1,6 +1,9 @@ package service import ( + "encoding/json" + "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -66,3 +69,60 @@ func TestSelectBuilderHostErrorBranch(t *testing.T) { t.Fatal("expected selectBuilderHost error") } } + +func TestSelectBuilderHostAvoidsBusyBuilderWhenPeersAreFree(t *testing.T) { + kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/nodes": + _ = json.NewEncoder(w).Encode(map[string]any{ + "items": []any{ + map[string]any{ + "metadata": map[string]any{ + "name": "titan-04", + "labels": map[string]string{ + "kubernetes.io/arch": "arm64", + "hardware": "rpi5", + "node-role.kubernetes.io/worker": "true", + }, + }, + "spec": map[string]any{"unschedulable": false}, + }, + map[string]any{ + "metadata": map[string]any{ + "name": "titan-05", + "labels": map[string]string{ + "kubernetes.io/arch": "arm64", + "hardware": "rpi5", + "node-role.kubernetes.io/worker": "true", + }, + }, + "spec": map[string]any{"unschedulable": false}, + }, + }, + }) + case 
r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": + _ = json.NewEncoder(w).Encode(map[string]any{ + "items": []any{ + map[string]any{ + "metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}}, + "spec": map[string]any{"nodeName": "titan-04"}, + "status": map[string]any{"phase": "Running"}, + }, + }, + }) + default: + http.NotFound(w, r) + } + })) + defer kube.Close() + installKubeFactory(t, kube) + app := newTestApp(t) + app.settings.Namespace = "maintenance" + node, err := app.selectBuilderHost("arm64", "") + if err != nil { + t.Fatalf("selectBuilderHost: %v", err) + } + if node.Name != "titan-05" { + t.Fatalf("expected titan-05 builder, got %s", node.Name) + } +} diff --git a/pkg/service/server.go b/pkg/service/server.go index a8e335a..6f603f6 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -2,6 +2,7 @@ package service import ( "encoding/json" + "errors" "html/template" "net/http" "strings" @@ -99,7 +100,7 @@ func (a *App) handleBuild(w http.ResponseWriter, r *http.Request) { node := values["node"] job, err := a.Build(node) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), statusForJobError(err)) return } writeJSON(w, http.StatusAccepted, job) @@ -116,12 +117,20 @@ func (a *App) handleReplace(w http.ResponseWriter, r *http.Request) { device := values["device"] job, err := a.Replace(node, host, device) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, err.Error(), statusForJobError(err)) return } writeJSON(w, http.StatusAccepted, job) } +func statusForJobError(err error) int { + var conflict *activeNodeJobError + if errors.As(err, &conflict) { + return http.StatusConflict + } + return http.StatusBadRequest +} + func (a *App) handleWatch(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) diff --git 
a/pkg/service/server_test.go b/pkg/service/server_test.go index a14fb8e..edfbd59 100644 --- a/pkg/service/server_test.go +++ b/pkg/service/server_test.go @@ -229,6 +229,21 @@ func TestRequestValuesFormAndAuthHelpers(t *testing.T) { } } +func TestHandleBuildReturnsConflictForDuplicateActiveNodeJob(t *testing.T) { + app := newTestApp(t) + app.newJob("build", "titan-15", "", "") + handler := app.Handler() + req := httptest.NewRequest(http.MethodPost, "/api/jobs/build", strings.NewReader(`{"node":"titan-15"}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Auth-Request-User", "brad") + req.Header.Set("X-Auth-Request-Groups", "admin") + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + if resp.Code != http.StatusConflict { + t.Fatalf("expected conflict, got %d: %s", resp.Code, resp.Body.String()) + } +} + func TestHTTPHandlersExerciseErrorBranches(t *testing.T) { kube := fakeKubeServer(t) installKubeFactory(t, kube) diff --git a/pkg/service/workflow_test.go b/pkg/service/workflow_test.go index d2733b5..83dd4c4 100644 --- a/pkg/service/workflow_test.go +++ b/pkg/service/workflow_test.go @@ -166,6 +166,8 @@ func fakeKubeServer(t *testing.T) *httptest.Server { }, }, }) + case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods": + _ = json.NewEncoder(w).Encode(map[string]any{"items": []any{}}) case r.Method == http.MethodPost && strings.Contains(r.URL.Path, "/pods"): w.WriteHeader(http.StatusCreated) case r.Method == http.MethodDelete && strings.Contains(r.URL.Path, "/pods/"):