metis: upstream remote workflow resilience fixes
This commit is contained in:
parent
09f5cd7dac
commit
34ac4cc0f5
13
Jenkinsfile
vendored
13
Jenkinsfile
vendored
@ -210,11 +210,16 @@ PY
|
|||||||
quality_rc=0
|
quality_rc=0
|
||||||
if [ "${test_rc}" -eq 0 ]; then
|
if [ "${test_rc}" -eq 0 ]; then
|
||||||
set +e
|
set +e
|
||||||
cd testing
|
if [ -d testing ]; then
|
||||||
METIS_USE_EXISTING_COVERAGE=1 go test -v ./...
|
cd testing
|
||||||
quality_rc=$?
|
METIS_USE_EXISTING_COVERAGE=1 go test -v ./...
|
||||||
|
quality_rc=$?
|
||||||
|
cd "${WORKSPACE}"
|
||||||
|
else
|
||||||
|
echo "No testing/ directory present; skipping secondary quality suite."
|
||||||
|
quality_rc=0
|
||||||
|
fi
|
||||||
set -e
|
set -e
|
||||||
cd "${WORKSPACE}"
|
|
||||||
else
|
else
|
||||||
quality_rc=1
|
quality_rc=1
|
||||||
fi
|
fi
|
||||||
|
|||||||
@ -12,8 +12,10 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"metis/pkg/image"
|
||||||
"metis/pkg/plan"
|
"metis/pkg/plan"
|
||||||
"metis/pkg/service"
|
"metis/pkg/service"
|
||||||
"metis/pkg/writer"
|
"metis/pkg/writer"
|
||||||
@ -64,9 +66,45 @@ func remoteBuildCmd(args []string) {
|
|||||||
}
|
}
|
||||||
output := filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
|
output := filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
|
||||||
inv := loadInventory(*invPath)
|
inv := loadInventory(*invPath)
|
||||||
if err := plan.BuildImageFile(context.Background(), inv, *node, *cacheDir, output); err != nil {
|
emitStageProgress("build", 12, fmt.Sprintf("Resolving the replacement build plan for %s", *node))
|
||||||
log.Fatalf("build image: %v", err)
|
p, err := plan.Build(inv, *node, output, *cacheDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("build plan: %v", err)
|
||||||
}
|
}
|
||||||
|
_, class, err := inv.FindNode(*node)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("load node class: %v", err)
|
||||||
|
}
|
||||||
|
cacheImage := filepath.Join(*cacheDir, strings.TrimSuffix(filepath.Base(p.Image), ".xz"))
|
||||||
|
emitStageProgress("build", 16, fmt.Sprintf("Downloading and verifying the base image for %s", *node))
|
||||||
|
cacheImage, err = image.DownloadAndVerify(p.Image, cacheImage, class.Checksum)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("download image: %v", err)
|
||||||
|
}
|
||||||
|
copyEmitter := newProgressEmitter("build", 20, 34, fmt.Sprintf("Copying the verified base image for %s", *node), false)
|
||||||
|
if err := writer.WriteImageWithProgress(context.Background(), cacheImage, output, copyEmitter); err != nil {
|
||||||
|
log.Fatalf("copy base image: %v", err)
|
||||||
|
}
|
||||||
|
emitStageProgress("build", 36, fmt.Sprintf("Preparing node-specific injected files for %s", *node))
|
||||||
|
files, err := plan.Files(inv, *node)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("resolve files: %v", err)
|
||||||
|
}
|
||||||
|
rootfsProgress := map[string]service.RemoteProgressUpdate{
|
||||||
|
image.RootFSProgressFindingPartition: {Stage: "build", ProgressPct: 40, Message: fmt.Sprintf("Finding the Linux root partition for %s", *node)},
|
||||||
|
image.RootFSProgressExtracting: {Stage: "build", ProgressPct: 44, Message: fmt.Sprintf("Extracting the Linux root partition for %s", *node)},
|
||||||
|
image.RootFSProgressWritingFiles: {Stage: "build", ProgressPct: 50, Message: fmt.Sprintf("Injecting node-specific files into the root filesystem for %s", *node)},
|
||||||
|
image.RootFSProgressReplacing: {Stage: "build", ProgressPct: 56, Message: fmt.Sprintf("Replacing the root partition inside the replacement image for %s", *node)},
|
||||||
|
}
|
||||||
|
if err := image.InjectRootFSWithProgress(output, files, func(step string) {
|
||||||
|
if update, ok := rootfsProgress[step]; ok {
|
||||||
|
emitProgress(update)
|
||||||
|
}
|
||||||
|
}); err != nil {
|
||||||
|
log.Fatalf("inject rootfs: %v", err)
|
||||||
|
}
|
||||||
|
emitStageProgress("build", 58, fmt.Sprintf("Built the replacement image filesystem for %s", *node))
|
||||||
|
emitStageProgress("build", 60, fmt.Sprintf("Compressing the replacement image for %s before upload", *node))
|
||||||
if err := exec.Command("xz", "-T0", "-z", "-f", output).Run(); err != nil {
|
if err := exec.Command("xz", "-T0", "-z", "-f", output).Run(); err != nil {
|
||||||
log.Fatalf("xz compress: %v", err)
|
log.Fatalf("xz compress: %v", err)
|
||||||
}
|
}
|
||||||
@ -93,13 +131,17 @@ func remoteBuildCmd(args []string) {
|
|||||||
if err := os.WriteFile(metadataPath, metaBytes, 0o644); err != nil {
|
if err := os.WriteFile(metadataPath, metaBytes, 0o644); err != nil {
|
||||||
log.Fatalf("write metadata: %v", err)
|
log.Fatalf("write metadata: %v", err)
|
||||||
}
|
}
|
||||||
|
emitStageProgress("build", 68, fmt.Sprintf("Compression complete for %s; preparing the Harbor upload", *node))
|
||||||
|
emitStageProgress("build", 70, fmt.Sprintf("Authenticating to Harbor for %s", *node))
|
||||||
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
|
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
|
||||||
log.Fatalf("oras login: %v", err)
|
log.Fatalf("oras login: %v", err)
|
||||||
}
|
}
|
||||||
taggedRef := fmt.Sprintf("%s:%s", *artifactRef, *buildTag)
|
taggedRef := fmt.Sprintf("%s:%s", *artifactRef, *buildTag)
|
||||||
|
emitStageProgress("build", 72, fmt.Sprintf("Uploading %s to Harbor", filepath.Base(compressedPath)))
|
||||||
if err := orasPush(taggedRef, compressedPath, metadataPath); err != nil {
|
if err := orasPush(taggedRef, compressedPath, metadataPath); err != nil {
|
||||||
log.Fatalf("oras push: %v", err)
|
log.Fatalf("oras push: %v", err)
|
||||||
}
|
}
|
||||||
|
emitStageProgress("build", 76, fmt.Sprintf("Refreshing the latest Harbor tag for %s", *node))
|
||||||
if err := orasTag(taggedRef, "latest"); err != nil {
|
if err := orasTag(taggedRef, "latest"); err != nil {
|
||||||
log.Fatalf("oras tag latest: %v", err)
|
log.Fatalf("oras tag latest: %v", err)
|
||||||
}
|
}
|
||||||
@ -134,18 +176,21 @@ func remoteFlashCmd(args []string) {
|
|||||||
if err := os.MkdirAll(*workDir, 0o755); err != nil {
|
if err := os.MkdirAll(*workDir, 0o755); err != nil {
|
||||||
log.Fatalf("mkdir workdir: %v", err)
|
log.Fatalf("mkdir workdir: %v", err)
|
||||||
}
|
}
|
||||||
|
emitStageProgress("flash", 84, fmt.Sprintf("Pulling the latest Harbor artifact for %s", *node))
|
||||||
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
|
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
|
||||||
log.Fatalf("oras login: %v", err)
|
log.Fatalf("oras login: %v", err)
|
||||||
}
|
}
|
||||||
if err := orasPull(fmt.Sprintf("%s:latest", *artifactRef), *workDir); err != nil {
|
if err := orasPull(fmt.Sprintf("%s:latest", *artifactRef), *workDir); err != nil {
|
||||||
log.Fatalf("oras pull: %v", err)
|
log.Fatalf("oras pull: %v", err)
|
||||||
}
|
}
|
||||||
|
emitStageProgress("flash", 88, fmt.Sprintf("Preparing the downloaded image for %s", *node))
|
||||||
imagePath, compressed, err := resolvePulledArtifact(*workDir)
|
imagePath, compressed, err := resolvePulledArtifact(*workDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("resolve artifact: %v", err)
|
log.Fatalf("resolve artifact: %v", err)
|
||||||
}
|
}
|
||||||
rawImage := imagePath
|
rawImage := imagePath
|
||||||
if compressed {
|
if compressed {
|
||||||
|
emitStageProgress("flash", 90, fmt.Sprintf("Decompressing the image for %s before writing", *node))
|
||||||
rawImage = filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
|
rawImage = filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
|
||||||
cmd := exec.Command("sh", "-lc", fmt.Sprintf("xz -dc '%s' > '%s'", imagePath, rawImage))
|
cmd := exec.Command("sh", "-lc", fmt.Sprintf("xz -dc '%s' > '%s'", imagePath, rawImage))
|
||||||
if out, err := cmd.CombinedOutput(); err != nil {
|
if out, err := cmd.CombinedOutput(); err != nil {
|
||||||
@ -160,9 +205,12 @@ func remoteFlashCmd(args []string) {
|
|||||||
}
|
}
|
||||||
destPath = filepath.Join(*hostTmpDir, fmt.Sprintf("%s.img", *node))
|
destPath = filepath.Join(*hostTmpDir, fmt.Sprintf("%s.img", *node))
|
||||||
}
|
}
|
||||||
if err := writer.WriteImage(context.Background(), rawImage, destPath); err != nil {
|
emitStageProgress("flash", 92, fmt.Sprintf("Writing the latest image for %s to %s", *node, destPath))
|
||||||
|
writeEmitter := newProgressEmitter("flash", 92, 98, fmt.Sprintf("Writing the latest image for %s", *node), true)
|
||||||
|
if err := writer.WriteImageWithProgress(context.Background(), rawImage, destPath, writeEmitter); err != nil {
|
||||||
log.Fatalf("write image: %v", err)
|
log.Fatalf("write image: %v", err)
|
||||||
}
|
}
|
||||||
|
emitStageProgress("flash", 99, fmt.Sprintf("Flushing the finished image for %s", *node))
|
||||||
_ = exec.Command("sync").Run()
|
_ = exec.Command("sync").Run()
|
||||||
if strings.HasPrefix(destPath, "/dev/") {
|
if strings.HasPrefix(destPath, "/dev/") {
|
||||||
_ = exec.Command("blockdev", "--flushbufs", destPath).Run()
|
_ = exec.Command("blockdev", "--flushbufs", destPath).Run()
|
||||||
@ -193,6 +241,55 @@ func writeStructuredResult(payload any) {
|
|||||||
_ = os.WriteFile("/dev/termination-log", data, 0o644)
|
_ = os.WriteFile("/dev/termination-log", data, 0o644)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func emitStageProgress(stage string, progress float64, message string) {
|
||||||
|
emitProgress(service.RemoteProgressUpdate{
|
||||||
|
Stage: stage,
|
||||||
|
ProgressPct: progress,
|
||||||
|
Message: message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func emitProgress(update service.RemoteProgressUpdate) {
|
||||||
|
line := service.ProgressLogLine(update)
|
||||||
|
if strings.TrimSpace(line) == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Fprintln(os.Stdout, line)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProgressEmitter(stage string, minPct, maxPct float64, message string, includeBytes bool) writer.ProgressFunc {
|
||||||
|
var mu sync.Mutex
|
||||||
|
lastPct := minPct
|
||||||
|
lastEmit := time.Time{}
|
||||||
|
return func(written, total int64) {
|
||||||
|
if total <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pct := minPct + (float64(written)/float64(total))*(maxPct-minPct)
|
||||||
|
if pct > maxPct {
|
||||||
|
pct = maxPct
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
now := time.Now()
|
||||||
|
if pct-lastPct < 0.5 && now.Sub(lastEmit) < time.Second {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
update := service.RemoteProgressUpdate{
|
||||||
|
Stage: stage,
|
||||||
|
ProgressPct: pct,
|
||||||
|
Message: message,
|
||||||
|
}
|
||||||
|
if includeBytes {
|
||||||
|
update.WrittenBytes = written
|
||||||
|
update.TotalBytes = total
|
||||||
|
}
|
||||||
|
emitProgress(update)
|
||||||
|
lastPct = pct
|
||||||
|
lastEmit = now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
|
func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
|
||||||
cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
|
cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
|
||||||
out, err := cmd.Output()
|
out, err := cmd.Output()
|
||||||
|
|||||||
@ -1,6 +1,12 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "testing"
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
func TestOrasPushInvocationUsesRelativeWorkspacePaths(t *testing.T) {
|
func TestOrasPushInvocationUsesRelativeWorkspacePaths(t *testing.T) {
|
||||||
dir, args, err := orasPushInvocation("registry.bstein.dev/metis/titan-13:20260331t235724z", "/workspace/build/titan-13.img.xz", "/workspace/build/metadata.json")
|
dir, args, err := orasPushInvocation("registry.bstein.dev/metis/titan-13:20260331t235724z", "/workspace/build/titan-13.img.xz", "/workspace/build/metadata.json")
|
||||||
@ -32,3 +38,36 @@ func TestHumanHostPathMapsMountedTmpBackToHostTmp(t *testing.T) {
|
|||||||
t.Fatalf("expected /tmp/metis-flash-test, got %q", got)
|
t.Fatalf("expected /tmp/metis-flash-test, got %q", got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewProgressEmitterWritesStructuredMarker(t *testing.T) {
|
||||||
|
origStdout := os.Stdout
|
||||||
|
reader, writer, err := os.Pipe()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("pipe: %v", err)
|
||||||
|
}
|
||||||
|
os.Stdout = writer
|
||||||
|
defer func() {
|
||||||
|
os.Stdout = origStdout
|
||||||
|
}()
|
||||||
|
|
||||||
|
emitter := newProgressEmitter("flash", 92, 98, "Writing the latest image for titan-12", true)
|
||||||
|
emitter(1024, 2048)
|
||||||
|
|
||||||
|
if err := writer.Close(); err != nil {
|
||||||
|
t.Fatalf("close writer: %v", err)
|
||||||
|
}
|
||||||
|
var output bytes.Buffer
|
||||||
|
if _, err := io.Copy(&output, reader); err != nil {
|
||||||
|
t.Fatalf("read progress output: %v", err)
|
||||||
|
}
|
||||||
|
got := output.String()
|
||||||
|
if !strings.Contains(got, "METIS_PROGRESS ") {
|
||||||
|
t.Fatalf("expected structured progress prefix, got %q", got)
|
||||||
|
}
|
||||||
|
if !strings.Contains(got, `"stage":"flash"`) {
|
||||||
|
t.Fatalf("expected flash stage marker, got %q", got)
|
||||||
|
}
|
||||||
|
if !strings.Contains(got, `"written_bytes":1024`) || !strings.Contains(got, `"total_bytes":2048`) {
|
||||||
|
t.Fatalf("expected byte counters in progress marker, got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -29,9 +29,24 @@ type partitionTablePart struct {
|
|||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RootFSProgressFunc func(step string)
|
||||||
|
|
||||||
|
const (
|
||||||
|
RootFSProgressFindingPartition = "finding-partition"
|
||||||
|
RootFSProgressExtracting = "extracting-partition"
|
||||||
|
RootFSProgressWritingFiles = "writing-rootfs-files"
|
||||||
|
RootFSProgressReplacing = "replacing-partition"
|
||||||
|
)
|
||||||
|
|
||||||
// InjectRootFS rewrites the Linux root partition inside a raw image file without
|
// InjectRootFS rewrites the Linux root partition inside a raw image file without
|
||||||
// requiring block-device mounts. Only rootfs-targeted files are written.
|
// requiring block-device mounts. Only rootfs-targeted files are written.
|
||||||
func InjectRootFS(imagePath string, files []inject.FileSpec) error {
|
func InjectRootFS(imagePath string, files []inject.FileSpec) error {
|
||||||
|
return InjectRootFSWithProgress(imagePath, files, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InjectRootFSWithProgress emits coarse rootfs rewrite milestones for callers
|
||||||
|
// that want to surface build-stage progress in real time.
|
||||||
|
func InjectRootFSWithProgress(imagePath string, files []inject.FileSpec, progress RootFSProgressFunc) error {
|
||||||
rootFiles := make([]inject.FileSpec, 0, len(files))
|
rootFiles := make([]inject.FileSpec, 0, len(files))
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
if f.RootFS {
|
if f.RootFS {
|
||||||
@ -42,6 +57,7 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
emitRootFSProgress(progress, RootFSProgressFindingPartition)
|
||||||
part, sectorSize, err := findLinuxPartition(imagePath)
|
part, sectorSize, err := findLinuxPartition(imagePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -54,15 +70,24 @@ func InjectRootFS(imagePath string, files []inject.FileSpec) error {
|
|||||||
defer os.RemoveAll(workDir)
|
defer os.RemoveAll(workDir)
|
||||||
|
|
||||||
rootImage := filepath.Join(workDir, "root.ext4")
|
rootImage := filepath.Join(workDir, "root.ext4")
|
||||||
|
emitRootFSProgress(progress, RootFSProgressExtracting)
|
||||||
if err := extractPartition(imagePath, rootImage, part, sectorSize); err != nil {
|
if err := extractPartition(imagePath, rootImage, part, sectorSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
emitRootFSProgress(progress, RootFSProgressWritingFiles)
|
||||||
if err := writeExt4Files(rootImage, rootFiles); err != nil {
|
if err := writeExt4Files(rootImage, rootFiles); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
emitRootFSProgress(progress, RootFSProgressReplacing)
|
||||||
return replacePartition(imagePath, rootImage, part, sectorSize)
|
return replacePartition(imagePath, rootImage, part, sectorSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func emitRootFSProgress(progress RootFSProgressFunc, step string) {
|
||||||
|
if progress != nil {
|
||||||
|
progress(step)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func findLinuxPartition(imagePath string) (partitionTablePart, uint64, error) {
|
func findLinuxPartition(imagePath string) (partitionTablePart, uint64, error) {
|
||||||
out, err := exec.Command("sfdisk", "-J", imagePath).Output()
|
out, err := exec.Command("sfdisk", "-J", imagePath).Output()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -66,3 +66,23 @@ func TestParentDirs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInjectRootFSWithProgressSkipsWhenNoRootFiles(t *testing.T) {
|
||||||
|
steps := make([]string, 0)
|
||||||
|
err := InjectRootFSWithProgress("unused.img", []inject.FileSpec{
|
||||||
|
{
|
||||||
|
Path: "boot/env.txt",
|
||||||
|
Content: []byte("ignored"),
|
||||||
|
Mode: 0o644,
|
||||||
|
RootFS: false,
|
||||||
|
},
|
||||||
|
}, func(step string) {
|
||||||
|
steps = append(steps, step)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error when there are no rootfs files, got %v", err)
|
||||||
|
}
|
||||||
|
if len(steps) != 0 {
|
||||||
|
t.Fatalf("expected no progress callbacks, got %#v", steps)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -212,7 +212,10 @@ func (a *App) Build(node string) (*Job, error) {
|
|||||||
if err := a.ensureReplacementReady(node); err != nil {
|
if err := a.ensureReplacementReady(node); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
job := a.newJob("build", node, "", "")
|
job, err := a.reserveJob("build", node, "", "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
go a.runBuild(job, false)
|
go a.runBuild(job, false)
|
||||||
return job, nil
|
return job, nil
|
||||||
}
|
}
|
||||||
@ -228,7 +231,10 @@ func (a *App) Replace(node, host, device string) (*Job, error) {
|
|||||||
if _, err := a.ensureDevice(host, device); err != nil {
|
if _, err := a.ensureDevice(host, device); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
job := a.newJob("replace", node, host, device)
|
job, err := a.reserveJob("replace", node, host, device)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
go a.runBuild(job, true)
|
go a.runBuild(job, true)
|
||||||
return job, nil
|
return job, nil
|
||||||
}
|
}
|
||||||
@ -332,6 +338,70 @@ func (a *App) newJob(kind, node, host, device string) *Job {
|
|||||||
return job
|
return job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type activeNodeJobError struct {
|
||||||
|
Node string
|
||||||
|
Kind string
|
||||||
|
JobID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *activeNodeJobError) Error() string {
|
||||||
|
if e == nil {
|
||||||
|
return "node already has an active metis job"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("node %s already has an active %s job (%s)", e.Node, e.Kind, e.JobID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) activeJobForNodeLocked(node string) *Job {
|
||||||
|
node = strings.TrimSpace(node)
|
||||||
|
if node == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var active *Job
|
||||||
|
for _, job := range a.jobs {
|
||||||
|
if job == nil || strings.TrimSpace(job.Node) != node {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if job.Status != JobQueued && job.Status != JobRunning {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch job.Kind {
|
||||||
|
case "build", "replace":
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if active == nil || job.StartedAt.Before(active.StartedAt) {
|
||||||
|
active = job
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if active == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
copyJob := *active
|
||||||
|
return ©Job
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) reserveJob(kind, node, host, device string) (*Job, error) {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
if active := a.activeJobForNodeLocked(node); active != nil {
|
||||||
|
return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID}
|
||||||
|
}
|
||||||
|
now := time.Now().UTC()
|
||||||
|
job := &Job{
|
||||||
|
ID: fmt.Sprintf("%d", now.UnixNano()),
|
||||||
|
Kind: kind,
|
||||||
|
Node: node,
|
||||||
|
Host: host,
|
||||||
|
Device: device,
|
||||||
|
Status: JobQueued,
|
||||||
|
ProgressPct: 0,
|
||||||
|
StartedAt: now,
|
||||||
|
UpdatedAt: now,
|
||||||
|
}
|
||||||
|
a.jobs[job.ID] = job
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) job(id string) *Job {
|
func (a *App) job(id string) *Job {
|
||||||
a.mu.RLock()
|
a.mu.RLock()
|
||||||
defer a.mu.RUnlock()
|
defer a.mu.RUnlock()
|
||||||
|
|||||||
27
pkg/service/app_job_test.go
Normal file
27
pkg/service/app_job_test.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReserveJobRejectsDuplicateActiveNodeJobs(t *testing.T) {
|
||||||
|
app := newTestApp(t)
|
||||||
|
active := app.newJob("replace", "titan-15", "titan-22", "/dev/sdk")
|
||||||
|
|
||||||
|
_, err := app.reserveJob("build", "titan-15", "", "")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected duplicate job reservation to fail")
|
||||||
|
}
|
||||||
|
var activeErr *activeNodeJobError
|
||||||
|
if !errors.As(err, &activeErr) {
|
||||||
|
t.Fatalf("expected activeNodeJobError, got %T", err)
|
||||||
|
}
|
||||||
|
if activeErr.JobID != active.ID || activeErr.Kind != "replace" {
|
||||||
|
t.Fatalf("unexpected active job conflict: %#v", activeErr)
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), active.ID) {
|
||||||
|
t.Fatalf("expected error to mention active job id %s, got %q", active.ID, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -37,6 +37,8 @@ type kubeClient struct {
|
|||||||
client *http.Client
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var kubeClientFactory = inClusterKubeClient
|
||||||
|
|
||||||
func inClusterKubeClient() (*kubeClient, error) {
|
func inClusterKubeClient() (*kubeClient, error) {
|
||||||
host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
|
host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
|
||||||
port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
|
port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
|
||||||
@ -118,7 +120,7 @@ func (k *kubeClient) deleteRequest(path string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func clusterNodes() []clusterNode {
|
func clusterNodes() []clusterNode {
|
||||||
kube, err := inClusterKubeClient()
|
kube, err := kubeClientFactory()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -152,6 +154,54 @@ func clusterNodes() []clusterNode {
|
|||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func clusterActiveRemotePodLoads(namespace, run string) map[string]int {
|
||||||
|
kube, err := kubeClientFactory()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ns := url.PathEscape(strings.TrimSpace(namespace))
|
||||||
|
if ns == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
selector := "app=metis-remote"
|
||||||
|
if value := strings.TrimSpace(run); value != "" {
|
||||||
|
selector += ",metis-run=" + value
|
||||||
|
}
|
||||||
|
path := fmt.Sprintf("/api/v1/namespaces/%s/pods?labelSelector=%s", ns, url.QueryEscape(selector))
|
||||||
|
var payload struct {
|
||||||
|
Items []struct {
|
||||||
|
Metadata struct {
|
||||||
|
Labels map[string]string `json:"labels"`
|
||||||
|
} `json:"metadata"`
|
||||||
|
Spec struct {
|
||||||
|
NodeName string `json:"nodeName"`
|
||||||
|
} `json:"spec"`
|
||||||
|
Status struct {
|
||||||
|
Phase string `json:"phase"`
|
||||||
|
} `json:"status"`
|
||||||
|
} `json:"items"`
|
||||||
|
}
|
||||||
|
if err := kube.jsonRequest(http.MethodGet, path, nil, &payload); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
loads := map[string]int{}
|
||||||
|
for _, item := range payload.Items {
|
||||||
|
phase := strings.TrimSpace(item.Status.Phase)
|
||||||
|
if phase == "Succeeded" || phase == "Failed" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if value := strings.TrimSpace(run); value != "" && strings.TrimSpace(item.Metadata.Labels["metis-run"]) != value {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nodeName := strings.TrimSpace(item.Spec.NodeName)
|
||||||
|
if nodeName == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
loads[nodeName]++
|
||||||
|
}
|
||||||
|
return loads
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) podImageForArch(arch string) string {
|
func (a *App) podImageForArch(arch string) string {
|
||||||
switch strings.TrimSpace(arch) {
|
switch strings.TrimSpace(arch) {
|
||||||
case "arm64":
|
case "arm64":
|
||||||
@ -164,7 +214,7 @@ func (a *App) podImageForArch(arch string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (string, error) {
|
func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (string, error) {
|
||||||
kube, err := inClusterKubeClient()
|
kube, err := kubeClientFactory()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -177,7 +227,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
deadline := time.Now().Add(12 * time.Minute)
|
timeout := time.Duration(a.settings.RemotePodTimeout) * time.Second
|
||||||
|
if timeout < 5*time.Minute {
|
||||||
|
timeout = 5 * time.Minute
|
||||||
|
}
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
lastState := podState{Name: podName}
|
lastState := podState{Name: podName}
|
||||||
for time.Now().Before(deadline) {
|
for time.Now().Before(deadline) {
|
||||||
state, err := a.remotePodState(kube, podName)
|
state, err := a.remotePodState(kube, podName)
|
||||||
@ -186,6 +240,11 @@ func (a *App) runRemotePod(jobID, podName string, podSpec map[string]any) (strin
|
|||||||
}
|
}
|
||||||
lastState = state
|
lastState = state
|
||||||
if strings.TrimSpace(jobID) != "" {
|
if strings.TrimSpace(jobID) != "" {
|
||||||
|
if logs, logErr := a.remotePodLogs(kube, podName); logErr == nil {
|
||||||
|
if update, ok := parseRemoteProgressLogs(logs); ok {
|
||||||
|
a.applyRemoteProgress(jobID, update)
|
||||||
|
}
|
||||||
|
}
|
||||||
a.heartbeatRemoteJob(jobID)
|
a.heartbeatRemoteJob(jobID)
|
||||||
}
|
}
|
||||||
switch state.Phase {
|
switch state.Phase {
|
||||||
|
|||||||
142
pkg/service/cluster_test.go
Normal file
142
pkg/service/cluster_test.go
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func kubeClientFactoryForURL(baseURL string, client *http.Client) *kubeClient {
|
||||||
|
return &kubeClient{
|
||||||
|
baseURL: baseURL,
|
||||||
|
token: "test-token",
|
||||||
|
client: client,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func installKubeFactory(t *testing.T, baseURL string, client *http.Client) {
|
||||||
|
t.Helper()
|
||||||
|
origFactory := kubeClientFactory
|
||||||
|
kubeClientFactory = func() (*kubeClient, error) {
|
||||||
|
return kubeClientFactoryForURL(baseURL, client), nil
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
kubeClientFactory = origFactory
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClusterActiveRemotePodLoadsCountsOnlyLivePods(t *testing.T) {
|
||||||
|
kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodGet || r.URL.Path != "/api/v1/namespaces/maintenance/pods" {
|
||||||
|
http.NotFound(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if got := r.URL.Query().Get("labelSelector"); got != "app=metis-remote,metis-run=build" {
|
||||||
|
t.Fatalf("unexpected labelSelector %q", got)
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"items": []any{
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
|
||||||
|
"spec": map[string]any{"nodeName": "titan-04"},
|
||||||
|
"status": map[string]any{"phase": "Running"},
|
||||||
|
},
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
|
||||||
|
"spec": map[string]any{"nodeName": "titan-04"},
|
||||||
|
"status": map[string]any{"phase": "Pending"},
|
||||||
|
},
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
|
||||||
|
"spec": map[string]any{"nodeName": "titan-05"},
|
||||||
|
"status": map[string]any{"phase": "Succeeded"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer kube.Close()
|
||||||
|
|
||||||
|
installKubeFactory(t, kube.URL, kube.Client())
|
||||||
|
|
||||||
|
loads := clusterActiveRemotePodLoads("maintenance", "build")
|
||||||
|
if loads["titan-04"] != 2 {
|
||||||
|
t.Fatalf("expected titan-04 load 2, got %#v", loads)
|
||||||
|
}
|
||||||
|
if _, ok := loads["titan-05"]; ok {
|
||||||
|
t.Fatalf("expected succeeded pod to be ignored, got %#v", loads)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSelectBuilderHostAvoidsBusyBuilderWhenPeersAreFree(t *testing.T) {
|
||||||
|
kube := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch {
|
||||||
|
case r.Method == http.MethodGet && r.URL.Path == "/api/v1/nodes":
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"items": []any{
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{
|
||||||
|
"name": "titan-04",
|
||||||
|
"labels": map[string]string{
|
||||||
|
"kubernetes.io/arch": "arm64",
|
||||||
|
"hardware": "rpi5",
|
||||||
|
"node-role.kubernetes.io/worker": "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"spec": map[string]any{"unschedulable": false},
|
||||||
|
},
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{
|
||||||
|
"name": "titan-05",
|
||||||
|
"labels": map[string]string{
|
||||||
|
"kubernetes.io/arch": "arm64",
|
||||||
|
"hardware": "rpi5",
|
||||||
|
"node-role.kubernetes.io/worker": "true",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"spec": map[string]any{"unschedulable": false},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
case r.Method == http.MethodGet && r.URL.Path == "/api/v1/namespaces/maintenance/pods":
|
||||||
|
selector := r.URL.Query().Get("labelSelector")
|
||||||
|
var items []any
|
||||||
|
switch selector {
|
||||||
|
case "app=metis-remote,metis-run=build":
|
||||||
|
items = []any{
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
|
||||||
|
"spec": map[string]any{"nodeName": "titan-04"},
|
||||||
|
"status": map[string]any{"phase": "Running"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
case "app=metis-remote":
|
||||||
|
items = []any{
|
||||||
|
map[string]any{
|
||||||
|
"metadata": map[string]any{"labels": map[string]string{"app": "metis-remote", "metis-run": "build"}},
|
||||||
|
"spec": map[string]any{"nodeName": "titan-04"},
|
||||||
|
"status": map[string]any{"phase": "Running"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected labelSelector %q", selector)
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{"items": items})
|
||||||
|
default:
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer kube.Close()
|
||||||
|
|
||||||
|
installKubeFactory(t, kube.URL, kube.Client())
|
||||||
|
|
||||||
|
app := newTestApp(t)
|
||||||
|
app.settings.Namespace = "maintenance"
|
||||||
|
node, err := app.selectBuilderHost("arm64", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("selectBuilderHost: %v", err)
|
||||||
|
}
|
||||||
|
if node.Name != "titan-05" {
|
||||||
|
t.Fatalf("expected titan-05 builder, got %s", node.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -270,6 +270,17 @@ func (a *App) heartbeatRemoteJob(jobID string) {
|
|||||||
}
|
}
|
||||||
j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host)
|
j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host)
|
||||||
case "flash":
|
case "flash":
|
||||||
|
if j.Total > 0 && j.Written > 0 {
|
||||||
|
actual := 88 + (float64(j.Written)/float64(j.Total))*10
|
||||||
|
if actual > 98 {
|
||||||
|
actual = 98
|
||||||
|
}
|
||||||
|
if actual > j.ProgressPct {
|
||||||
|
j.ProgressPct = actual
|
||||||
|
}
|
||||||
|
j.Message = fmt.Sprintf("Writing %s of %s on %s", humanBytes(j.Written), humanBytes(j.Total), j.Host)
|
||||||
|
return
|
||||||
|
}
|
||||||
progress, message := flashStageHeartbeat(j.Host, j.Artifact, elapsed)
|
progress, message := flashStageHeartbeat(j.Host, j.Artifact, elapsed)
|
||||||
if progress > j.ProgressPct {
|
if progress > j.ProgressPct {
|
||||||
j.ProgressPct = progress
|
j.ProgressPct = progress
|
||||||
@ -351,6 +362,8 @@ func (a *App) ensureDevice(host, path string) (*Device, error) {
|
|||||||
|
|
||||||
func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) {
|
func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) {
|
||||||
nodes := clusterNodes()
|
nodes := clusterNodes()
|
||||||
|
activeBuilds := clusterActiveRemotePodLoads(a.settings.Namespace, "build")
|
||||||
|
activeRemotePods := clusterActiveRemotePodLoads(a.settings.Namespace, "")
|
||||||
storageNodes := map[string]struct{}{}
|
storageNodes := map[string]struct{}{}
|
||||||
for _, node := range a.inventory.Nodes {
|
for _, node := range a.inventory.Nodes {
|
||||||
if len(node.LonghornDisks) > 0 {
|
if len(node.LonghornDisks) > 0 {
|
||||||
@ -389,6 +402,12 @@ func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) {
|
|||||||
if flashHost != "" && node.Name == flashHost {
|
if flashHost != "" && node.Name == flashHost {
|
||||||
score += 5
|
score += 5
|
||||||
}
|
}
|
||||||
|
if count := activeBuilds[node.Name]; count > 0 {
|
||||||
|
score -= 100 * count
|
||||||
|
}
|
||||||
|
if count := activeRemotePods[node.Name]; count > 0 {
|
||||||
|
score -= 15 * count
|
||||||
|
}
|
||||||
candidates = append(candidates, scored{node: node, score: score})
|
candidates = append(candidates, scored{node: node, score: score})
|
||||||
}
|
}
|
||||||
sort.Slice(candidates, func(i, j int) bool {
|
sort.Slice(candidates, func(i, j int) bool {
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -43,3 +44,76 @@ func TestFlashStageHeartbeatProgresses(t *testing.T) {
|
|||||||
t.Fatalf("expected flushing message, got %q", m3)
|
t.Fatalf("expected flushing message, got %q", m3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseRemoteProgressLogsFindsLatestMarker(t *testing.T) {
|
||||||
|
logs := strings.Join([]string{
|
||||||
|
"plain log line",
|
||||||
|
ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 44, Message: "extracting"}),
|
||||||
|
ProgressLogLine(RemoteProgressUpdate{Stage: "build", ProgressPct: 72, Message: "uploading"}),
|
||||||
|
}, "\n")
|
||||||
|
|
||||||
|
update, ok := parseRemoteProgressLogs(logs)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected to parse remote progress logs")
|
||||||
|
}
|
||||||
|
if update.Stage != "build" || update.ProgressPct != 72 || update.Message != "uploading" {
|
||||||
|
t.Fatalf("unexpected update: %#v", update)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyRemoteProgressUpdatesRunningJob(t *testing.T) {
|
||||||
|
app := newTestApp(t)
|
||||||
|
job := app.newJob("build", "titan-15", "", "")
|
||||||
|
app.setJob(job.ID, func(j *Job) {
|
||||||
|
j.Status = JobRunning
|
||||||
|
j.Stage = "build"
|
||||||
|
j.StageStartedAt = time.Now().Add(-10 * time.Second)
|
||||||
|
j.ProgressPct = 30
|
||||||
|
})
|
||||||
|
|
||||||
|
app.applyRemoteProgress(job.ID, RemoteProgressUpdate{
|
||||||
|
Stage: "flash",
|
||||||
|
ProgressPct: 95,
|
||||||
|
Message: "writing image",
|
||||||
|
WrittenBytes: 1024,
|
||||||
|
TotalBytes: 2048,
|
||||||
|
})
|
||||||
|
|
||||||
|
got := app.job(job.ID)
|
||||||
|
if got.Stage != "flash" {
|
||||||
|
t.Fatalf("expected stage flash, got %q", got.Stage)
|
||||||
|
}
|
||||||
|
if got.ProgressPct != 95 {
|
||||||
|
t.Fatalf("expected progress 95, got %v", got.ProgressPct)
|
||||||
|
}
|
||||||
|
if got.Message != "writing image" {
|
||||||
|
t.Fatalf("expected progress message to update, got %q", got.Message)
|
||||||
|
}
|
||||||
|
if got.Written != 1024 || got.Total != 2048 {
|
||||||
|
t.Fatalf("expected byte counters to update, got written=%d total=%d", got.Written, got.Total)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatRemoteJobUsesActualFlashBytes(t *testing.T) {
|
||||||
|
app := newTestApp(t)
|
||||||
|
job := app.newJob("replace", "titan-15", "titan-22", "/dev/sdk")
|
||||||
|
app.setJob(job.ID, func(j *Job) {
|
||||||
|
j.Status = JobRunning
|
||||||
|
j.Stage = "flash"
|
||||||
|
j.StageStartedAt = time.Now().Add(-15 * time.Second)
|
||||||
|
j.ProgressPct = 88
|
||||||
|
j.Written = 512
|
||||||
|
j.Total = 1024
|
||||||
|
})
|
||||||
|
|
||||||
|
app.heartbeatRemoteJob(job.ID)
|
||||||
|
|
||||||
|
got := app.job(job.ID)
|
||||||
|
if got.ProgressPct <= 92 || got.ProgressPct > 98 {
|
||||||
|
t.Fatalf("expected actual write progress between 92 and 98, got %v", got.ProgressPct)
|
||||||
|
}
|
||||||
|
expected := fmt.Sprintf("Writing %s of %s on %s", humanBytes(512), humanBytes(1024), "titan-22")
|
||||||
|
if got.Message != expected {
|
||||||
|
t.Fatalf("expected %q, got %q", expected, got.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
77
pkg/service/remote_status.go
Normal file
77
pkg/service/remote_status.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// progressLogPrefix marks stdout lines carrying a machine-readable progress
// payload emitted by a remote worker.
const progressLogPrefix = "METIS_PROGRESS "

// RemoteProgressUpdate is emitted by remote workers so the UI can show
// concrete stage transitions instead of relying only on elapsed-time guesses.
type RemoteProgressUpdate struct {
	Stage        string  `json:"stage,omitempty"`
	ProgressPct  float64 `json:"progress_pct,omitempty"`
	Message      string  `json:"message,omitempty"`
	WrittenBytes int64   `json:"written_bytes,omitempty"`
	TotalBytes   int64   `json:"total_bytes,omitempty"`
}

// ProgressLogLine formats a progress update for remote worker stdout.
// It returns the empty string when the update cannot be serialized.
func ProgressLogLine(update RemoteProgressUpdate) string {
	payload, err := json.Marshal(update)
	if err != nil {
		return ""
	}
	return progressLogPrefix + string(payload)
}

// parseRemoteProgressLogs scans worker log output for progress marker lines
// and returns the last well-formed one, reporting whether any was found.
// Malformed markers are skipped so a corrupted line never hides earlier
// valid progress.
func parseRemoteProgressLogs(logs string) (RemoteProgressUpdate, bool) {
	var (
		latest RemoteProgressUpdate
		found  bool
	)
	sc := bufio.NewScanner(strings.NewReader(logs))
	sc.Buffer(make([]byte, 0, 4096), 1<<20) // allow log lines up to 1 MiB
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, progressLogPrefix) {
			continue
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, progressLogPrefix))
		var update RemoteProgressUpdate
		if err := json.Unmarshal([]byte(payload), &update); err != nil {
			continue // tolerate malformed markers; keep scanning
		}
		latest, found = update, true
	}
	return latest, found
}
|
||||||
|
|
||||||
|
func (a *App) applyRemoteProgress(jobID string, update RemoteProgressUpdate) {
|
||||||
|
if strings.TrimSpace(jobID) == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.setJob(jobID, func(j *Job) {
|
||||||
|
if j == nil || j.Status != JobRunning {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if stage := strings.TrimSpace(update.Stage); stage != "" && stage != j.Stage {
|
||||||
|
j.Stage = stage
|
||||||
|
j.StageStartedAt = time.Now().UTC()
|
||||||
|
}
|
||||||
|
if update.ProgressPct > j.ProgressPct {
|
||||||
|
j.ProgressPct = update.ProgressPct
|
||||||
|
}
|
||||||
|
if message := strings.TrimSpace(update.Message); message != "" {
|
||||||
|
j.Message = message
|
||||||
|
}
|
||||||
|
if update.WrittenBytes > 0 {
|
||||||
|
j.Written = update.WrittenBytes
|
||||||
|
}
|
||||||
|
if update.TotalBytes > 0 {
|
||||||
|
j.Total = update.TotalBytes
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"html/template"
|
"html/template"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
@ -99,7 +100,7 @@ func (a *App) handleBuild(w http.ResponseWriter, r *http.Request) {
|
|||||||
node := values["node"]
|
node := values["node"]
|
||||||
job, err := a.Build(node)
|
job, err := a.Build(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), actionErrorStatus(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusAccepted, job)
|
writeJSON(w, http.StatusAccepted, job)
|
||||||
@ -116,12 +117,20 @@ func (a *App) handleReplace(w http.ResponseWriter, r *http.Request) {
|
|||||||
device := values["device"]
|
device := values["device"]
|
||||||
job, err := a.Replace(node, host, device)
|
job, err := a.Replace(node, host, device)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
http.Error(w, err.Error(), actionErrorStatus(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusAccepted, job)
|
writeJSON(w, http.StatusAccepted, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func actionErrorStatus(err error) int {
|
||||||
|
var activeErr *activeNodeJobError
|
||||||
|
if errors.As(err, &activeErr) {
|
||||||
|
return http.StatusConflict
|
||||||
|
}
|
||||||
|
return http.StatusBadRequest
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) handleWatch(w http.ResponseWriter, r *http.Request) {
|
func (a *App) handleWatch(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != http.MethodPost {
|
if r.Method != http.MethodPost {
|
||||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
|||||||
@ -206,6 +206,26 @@ func TestRequestValuesJSONBody(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHandleBuildReturnsConflictForActiveNodeJob(t *testing.T) {
|
||||||
|
app := newTestApp(t)
|
||||||
|
app.newJob("replace", "titan-15", "titan-22", "/dev/sdk")
|
||||||
|
handler := app.Handler()
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/api/jobs/build", strings.NewReader(`{"node":"titan-15"}`))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("X-Auth-Request-User", "brad")
|
||||||
|
req.Header.Set("X-Auth-Request-Groups", "admin")
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(resp, req)
|
||||||
|
|
||||||
|
if resp.Code != http.StatusConflict {
|
||||||
|
t.Fatalf("expected conflict, got %d: %s", resp.Code, resp.Body.String())
|
||||||
|
}
|
||||||
|
if !strings.Contains(resp.Body.String(), "already has an active replace job") {
|
||||||
|
t.Fatalf("expected active job message, got %q", resp.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newTestApp(t *testing.T) *App {
|
func newTestApp(t *testing.T) *App {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|||||||
@ -31,6 +31,7 @@ type Settings struct {
|
|||||||
HarborUsername string
|
HarborUsername string
|
||||||
HarborPassword string
|
HarborPassword string
|
||||||
HostTmpDir string
|
HostTmpDir string
|
||||||
|
RemotePodTimeout int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromEnv builds service settings with sensible defaults for local dev and in-cluster use.
|
// FromEnv builds service settings with sensible defaults for local dev and in-cluster use.
|
||||||
@ -62,6 +63,7 @@ func FromEnv() Settings {
|
|||||||
HarborUsername: getenvDefault("METIS_HARBOR_USERNAME", ""),
|
HarborUsername: getenvDefault("METIS_HARBOR_USERNAME", ""),
|
||||||
HarborPassword: getenvDefault("METIS_HARBOR_PASSWORD", ""),
|
HarborPassword: getenvDefault("METIS_HARBOR_PASSWORD", ""),
|
||||||
HostTmpDir: getenvDefault("METIS_HOST_TMP_DIR", "/tmp/metis-flash-test"),
|
HostTmpDir: getenvDefault("METIS_HOST_TMP_DIR", "/tmp/metis-flash-test"),
|
||||||
|
RemotePodTimeout: getenvInt64("METIS_REMOTE_POD_TIMEOUT_SEC", 1800),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
30
pkg/service/settings_test.go
Normal file
30
pkg/service/settings_test.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFromEnvIncludesRemotePodTimeout(t *testing.T) {
|
||||||
|
dataDir := filepath.Join(t.TempDir(), "data")
|
||||||
|
t.Setenv("METIS_DATA_DIR", dataDir)
|
||||||
|
t.Setenv("METIS_FLASH_HOSTS", "titan-22, titan-24")
|
||||||
|
t.Setenv("METIS_REMOTE_POD_TIMEOUT_SEC", "1800")
|
||||||
|
t.Setenv("METIS_DEFAULT_FLASH_HOST", "titan-22")
|
||||||
|
t.Setenv("METIS_LOCAL_HOST", "titan-iac")
|
||||||
|
|
||||||
|
settings := FromEnv()
|
||||||
|
if settings.CacheDir != filepath.Join(dataDir, "cache") {
|
||||||
|
t.Fatalf("expected cache dir under data dir, got %q", settings.CacheDir)
|
||||||
|
}
|
||||||
|
if settings.RemotePodTimeout != 1800 {
|
||||||
|
t.Fatalf("expected RemotePodTimeout=1800, got %d", settings.RemotePodTimeout)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(settings.FlashHosts, []string{"titan-22", "titan-24"}) {
|
||||||
|
t.Fatalf("unexpected flash hosts: %#v", settings.FlashHosts)
|
||||||
|
}
|
||||||
|
if settings.DefaultFlashHost != "titan-22" || settings.LocalHost != "titan-iac" {
|
||||||
|
t.Fatalf("unexpected local/default host settings: %+v", settings)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user