// metis/cmd/metis/remote_cmd.go
//
// 363 lines
// 11 KiB
// Go

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"metis/pkg/image"
"metis/pkg/plan"
"metis/pkg/service"
"metis/pkg/writer"
)
// remoteDevicesCmd implements the "remote-devices" sub-command. It parses the
// --max-device-bytes and --host-tmp-dir flags, collects the candidate targets
// from localFlashDevices, ranks them, and writes the ranked list as a
// {"devices": [...]} structured result.
//
// Ranking: higher localDeviceScore first, then smaller SizeBytes, then
// lexicographically smaller Path.
func remoteDevicesCmd(args []string) {
	var (
		sizeLimit  int64
		scratchDir string
	)
	flags := flag.NewFlagSet("remote-devices", flag.ExitOnError)
	flags.Int64Var(&sizeLimit, "max-device-bytes", 300000000000, "max real removable device size")
	flags.StringVar(&scratchDir, "host-tmp-dir", "/var/tmp/metis-flash-test", "host tmp dir for test writes")
	flags.Parse(args)

	found, err := localFlashDevices(sizeLimit, scratchDir)
	if err != nil {
		fatalf("remote devices: %v", err)
	}

	sort.Slice(found, func(a, b int) bool {
		scoreA, scoreB := localDeviceScore(found[a]), localDeviceScore(found[b])
		switch {
		case scoreA != scoreB:
			// Best-scoring candidates come first.
			return scoreA > scoreB
		case found[a].SizeBytes != found[b].SizeBytes:
			// Among equal scores, prefer the smaller device.
			return found[a].SizeBytes < found[b].SizeBytes
		default:
			// Path is the final tie-breaker.
			return found[a].Path < found[b].Path
		}
	})

	writeStructuredResult(map[string]any{"devices": found})
}
// remoteBuildCmd implements the "remote-build" sub-command. In order, it:
//
//  1. parses flags (--node, --artifact-ref and --build-tag are required),
//  2. builds a plan for the node and downloads/verifies the base image,
//  3. copies the base image to <work-dir>/<node>.img,
//  4. injects the node's files into the image (root filesystem, then boot files),
//  5. compresses the image with xz and writes <work-dir>/metadata.json,
//  6. logs in to the registry, pushes the artifact under <artifact-ref>:<build-tag>
//     and tags it "latest" via the oras* helpers,
//  7. writes a service.ArtifactSummary as the structured result.
//
// Progress lines for stage "build" are emitted throughout, with percentages
// running from 12 to 80.
//
// NOTE(review): fatalf is defined outside this block. Results are used right
// after each fatalf call without a return, so it presumably terminates the
// process — confirm.
func remoteBuildCmd(args []string) {
fs := flag.NewFlagSet("remote-build", flag.ExitOnError)
invPath := fs.String("inventory", "inventory.yaml", "inventory file")
node := fs.String("node", "", "target node")
cacheDir := fs.String("cache", filepath.Join(os.TempDir(), "metis-cache"), "image cache dir")
workDir := fs.String("work-dir", filepath.Join(os.TempDir(), "metis-work"), "working directory")
artifactRef := fs.String("artifact-ref", "", "harbor artifact ref without tag")
buildTag := fs.String("build-tag", "", "artifact build tag")
// The Harbor flag defaults come from METIS_HARBOR_* environment variables.
harborRegistry := fs.String("harbor-registry", getenvOr("METIS_HARBOR_REGISTRY", "registry.bstein.dev"), "harbor registry host")
harborUsername := fs.String("harbor-username", getenvOr("METIS_HARBOR_USERNAME", ""), "harbor username")
harborPassword := fs.String("harbor-password", getenvOr("METIS_HARBOR_PASSWORD", ""), "harbor password")
fs.Parse(args)
if *node == "" || *artifactRef == "" || *buildTag == "" {
fatalf("--node, --artifact-ref, and --build-tag are required")
}
if err := os.MkdirAll(*workDir, 0o755); err != nil {
fatalf("mkdir workdir: %v", err)
}
// The uncompressed image is built at <work-dir>/<node>.img.
output := filepath.Join(*workDir, fmt.Sprintf("%s.img", *node))
inv := loadInventory(*invPath)
emitStageProgress("build", 12, fmt.Sprintf("Resolving the replacement build plan for %s", *node))
p, err := plan.Build(inv, *node, output, *cacheDir)
if err != nil {
fatalf("build plan: %v", err)
}
// Only the node's class is needed here; its Checksum verifies the download.
_, class, err := inv.FindNode(*node)
if err != nil {
fatalf("load node class: %v", err)
}
// Cache target: the base name of p.Image with any ".xz" suffix removed.
cacheImage := filepath.Join(*cacheDir, strings.TrimSuffix(filepath.Base(p.Image), ".xz"))
emitStageProgress("build", 16, fmt.Sprintf("Downloading and verifying the base image for %s", *node))
// DownloadAndVerify returns the path to use from here on, replacing the guess above.
cacheImage, err = image.DownloadAndVerify(p.Image, cacheImage, class.Checksum)
if err != nil {
fatalf("download image: %v", err)
}
// Copy progress is mapped onto 20-34% and reported without byte counts.
copyEmitter := newProgressEmitter("build", 20, 34, fmt.Sprintf("Copying the verified base image for %s", *node), false)
if err := writer.WriteImageWithProgress(context.Background(), cacheImage, output, copyEmitter); err != nil {
fatalf("copy base image: %v", err)
}
emitStageProgress("build", 36, fmt.Sprintf("Preparing node-specific injected files for %s", *node))
files, err := plan.Files(inv, *node)
if err != nil {
fatalf("resolve files: %v", err)
}
// Fixed progress update to emit for each root-filesystem injection step.
rootfsProgress := map[string]service.RemoteProgressUpdate{
image.RootFSProgressFindingPartition: {Stage: "build", ProgressPct: 40, Message: fmt.Sprintf("Finding the Linux root partition for %s", *node)},
image.RootFSProgressExtracting: {Stage: "build", ProgressPct: 44, Message: fmt.Sprintf("Extracting the Linux root partition for %s", *node)},
image.RootFSProgressWritingFiles: {Stage: "build", ProgressPct: 50, Message: fmt.Sprintf("Injecting node-specific files into the root filesystem for %s", *node)},
image.RootFSProgressReplacing: {Stage: "build", ProgressPct: 56, Message: fmt.Sprintf("Replacing the root partition inside the replacement image for %s", *node)},
}
// Byte-level progress for the replace step is mapped onto 56-58% and includes byte counts.
replaceEmitter := newProgressEmitter("build", 56, 58, fmt.Sprintf("Replacing the root partition inside the replacement image for %s", *node), true)
if err := image.InjectRootFSWithProgress(output, files, func(update image.RootFSProgressUpdate) {
// A replace-step update with a known total goes to the throttled byte emitter;
// every other update falls back to the fixed per-step table above.
if update.Step == image.RootFSProgressReplacing && update.TotalBytes > 0 {
replaceEmitter(update.WrittenBytes, update.TotalBytes)
return
}
if progressUpdate, ok := rootfsProgress[update.Step]; ok {
emitProgress(progressUpdate)
}
}); err != nil {
fatalf("inject rootfs: %v", err)
}
emitStageProgress("build", 58, fmt.Sprintf("Built the replacement image filesystem for %s", *node))
emitStageProgress("build", 59, fmt.Sprintf("Injecting boot-partition recovery files for %s", *node))
if err := image.InjectBootFiles(output, files); err != nil {
fatalf("inject boot files: %v", err)
}
emitStageProgress("build", 60, fmt.Sprintf("Compressing the replacement image for %s before upload", *node))
// The command's stdout/stderr are not captured, so a failure reports only the exit error.
if err := exec.Command("xz", "-T0", "-z", "-f", output).Run(); err != nil {
fatalf("xz compress: %v", err)
}
// The code expects xz to have left its result at <output>.xz; the Stat below checks that.
compressedPath := output + ".xz"
info, err := os.Stat(compressedPath)
if err != nil {
fatalf("stat compressed image: %v", err)
}
metadataPath := filepath.Join(*workDir, "metadata.json")
// One timestamp is shared by the metadata file and the returned summary.
builtAt := time.Now().UTC()
meta := map[string]any{
"node": *node,
"artifact_ref": *artifactRef,
"build_tag": *buildTag,
"built_at": builtAt.Format(time.RFC3339),
"size_bytes": info.Size(),
"compressed": true,
}
metaBytes, err := json.MarshalIndent(meta, "", " ")
if err != nil {
fatalf("encode metadata: %v", err)
}
if err := os.WriteFile(metadataPath, metaBytes, 0o644); err != nil {
fatalf("write metadata: %v", err)
}
emitStageProgress("build", 68, fmt.Sprintf("Compression complete for %s; preparing the Harbor upload", *node))
emitStageProgress("build", 70, fmt.Sprintf("Authenticating to Harbor for %s", *node))
if err := orasLogin(*harborRegistry, *harborUsername, *harborPassword); err != nil {
fatalf("oras login: %v", err)
}
// The compressed image and its metadata are pushed under <artifact-ref>:<build-tag>.
taggedRef := fmt.Sprintf("%s:%s", *artifactRef, *buildTag)
emitStageProgress("build", 72, fmt.Sprintf("Uploading %s to Harbor", filepath.Base(compressedPath)))
if err := orasPush(taggedRef, compressedPath, metadataPath); err != nil {
fatalf("oras push: %v", err)
}
emitStageProgress("build", 76, fmt.Sprintf("Refreshing the latest Harbor tag for %s", *node))
if err := orasTag(taggedRef, "latest"); err != nil {
fatalf("oras tag latest: %v", err)
}
emitStageProgress("build", 80, fmt.Sprintf("Published the replacement image for %s to Harbor", *node))
// The summary points at the ":latest" ref and keeps the concrete build tag in BuildTag.
summary := service.ArtifactSummary{
Node: *node,
Ref: fmt.Sprintf("%s:latest", *artifactRef),
BuildTag: *buildTag,
LocalPath: compressedPath,
Compressed: true,
UpdatedAt: builtAt,
SizeBytes: info.Size(),
}
writeStructuredResult(summary)
}
// writeStructuredResult JSON-encodes payload and writes it to stdout as a
// single newline-terminated line. Encoding and stdout failures are reported
// through fatalf.
//
// The same JSON (without the trailing newline) is also written to
// /dev/termination-log on a best-effort basis.
func writeStructuredResult(payload any) {
	encoded, err := json.Marshal(payload)
	if err != nil {
		fatalf("encode result: %v", err)
	}

	line := make([]byte, 0, len(encoded)+1)
	line = append(line, encoded...)
	line = append(line, '\n')
	if _, writeErr := os.Stdout.Write(line); writeErr != nil {
		fatalf("write stdout result: %v", writeErr)
	}

	// Keep the result available in pod status so Metis does not depend on the
	// kubelet log endpoint for successful worker runs. The error is ignored on
	// purpose: the stdout copy above has already been written.
	_ = os.WriteFile("/dev/termination-log", encoded, 0o644)
}
func emitStageProgress(stage string, progress float64, message string) {
emitProgress(service.RemoteProgressUpdate{
Stage: stage,
ProgressPct: progress,
Message: message,
})
}
// emitProgress renders update with service.ProgressLogLine and prints the
// result as one line on stdout. Nothing is printed when the rendered line is
// empty or whitespace-only.
func emitProgress(update service.RemoteProgressUpdate) {
	if rendered := service.ProgressLogLine(update); strings.TrimSpace(rendered) != "" {
		fmt.Fprintln(os.Stdout, rendered)
	}
}
// newProgressEmitter returns a writer.ProgressFunc that converts
// (written, total) byte counts into throttled progress updates for stage.
//
// The written/total ratio is mapped linearly onto [minPct, maxPct] and capped
// at maxPct. Calls with total <= 0 are ignored. An update is suppressed only
// when it is both less than 0.5 percentage points above the last emitted value
// and less than one second after the last emission, so the first call always
// emits. When includeBytes is true the update also carries the raw byte
// counts. The throttle state is guarded by a mutex.
func newProgressEmitter(stage string, minPct, maxPct float64, message string, includeBytes bool) writer.ProgressFunc {
	var (
		guard       sync.Mutex
		reportedPct = minPct
		reportedAt  time.Time // zero value: nothing emitted yet
	)
	span := maxPct - minPct

	return func(written, total int64) {
		if total <= 0 {
			return
		}

		current := minPct + (float64(written)/float64(total))*span
		if current > maxPct {
			current = maxPct
		}

		guard.Lock()
		defer guard.Unlock()

		now := time.Now()
		smallStep := current-reportedPct < 0.5
		tooSoon := now.Sub(reportedAt) < time.Second
		if smallStep && tooSoon {
			return
		}

		update := service.RemoteProgressUpdate{Stage: stage, ProgressPct: current, Message: message}
		if includeBytes {
			update.WrittenBytes, update.TotalBytes = written, total
		}
		emitProgress(update)

		reportedPct, reportedAt = current, now
	}
}
// localFlashDevices returns the candidate flash targets reported by lsblk,
// followed by one synthetic "host-tmp" test target.
//
// A block device is kept only when it is a whole disk (TYPE "disk"), reports a
// size in (0, maxBytes], is attached over USB or flagged removable, and neither
// it nor any of its direct children has a mountpoint. hostTmpDir is passed
// through humanHostPath for display in the synthetic entry's path and note; a
// blank result falls back to /var/tmp/metis-flash-test.
//
// An error is returned when lsblk cannot be run or its JSON output cannot be
// decoded.
func localFlashDevices(maxBytes int64, hostTmpDir string) ([]service.Device, error) {
	// MOUNTPOINT has to be requested explicitly: lsblk only prints the columns
	// named in -o, and without it every "mountpoint" field decodes as empty, so
	// the mounted-device filters below would never reject anything.
	out, err := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE,MOUNTPOINT").Output()
	if err != nil {
		return nil, fmt.Errorf("run lsblk: %w", err)
	}

	var payload struct {
		Blockdevices []struct {
			Name       string `json:"name"`
			Path       string `json:"path"`
			RM         any    `json:"rm"`
			Hotplug    any    `json:"hotplug"`
			Size       any    `json:"size"`
			Model      string `json:"model"`
			Tran       string `json:"tran"`
			Type       string `json:"type"`
			Mountpoint string `json:"mountpoint"`
			Children   []struct {
				Mountpoint string `json:"mountpoint"`
			} `json:"children"`
		} `json:"blockdevices"`
	}
	if err := json.Unmarshal(out, &payload); err != nil {
		return nil, fmt.Errorf("decode lsblk output: %w", err)
	}

	// asFlag gives the RM/HOTPLUG columns the same tolerance SIZE already gets:
	// a JSON boolean, a quoted "0"/"1"/"true"/"false", or a number are all
	// accepted, so one quoted flag no longer fails the whole decode.
	asFlag := func(raw any) bool {
		switch value := raw.(type) {
		case bool:
			return value
		case string:
			// An unrecognised string is treated as "not set".
			parsed, _ := strconv.ParseBool(strings.TrimSpace(value))
			return parsed
		case float64:
			return value != 0
		}
		return false
	}

	// +1 leaves room for the synthetic host-tmp entry appended at the end.
	devices := make([]service.Device, 0, len(payload.Blockdevices)+1)
	for _, dev := range payload.Blockdevices {
		if dev.Type != "disk" {
			continue
		}

		var size int64
		switch value := dev.Size.(type) {
		case string:
			// A size that does not parse stays 0 and is rejected just below.
			if parsed, parseErr := strconv.ParseInt(strings.TrimSpace(value), 10, 64); parseErr == nil {
				size = parsed
			}
		case float64:
			size = int64(value)
		}
		if size <= 0 || size > maxBytes {
			continue
		}

		removable := asFlag(dev.RM)
		if dev.Tran != "usb" && !removable {
			continue
		}
		// Never offer a disk that is mounted, directly or through a child.
		if strings.TrimSpace(dev.Mountpoint) != "" || hasMountedChildren(dev.Children) {
			continue
		}

		devices = append(devices, service.Device{
			Name:      dev.Name,
			Path:      dev.Path,
			Model:     strings.TrimSpace(dev.Model),
			Transport: dev.Tran,
			Type:      dev.Type,
			Removable: removable,
			Hotplug:   asFlag(dev.Hotplug),
			SizeBytes: size,
		})
	}

	displayPath := humanHostPath(hostTmpDir)
	if strings.TrimSpace(displayPath) == "" {
		displayPath = "/var/tmp/metis-flash-test"
	}
	// Synthetic file-backed target for test writes; always listed last here.
	devices = append(devices, service.Device{
		Name:      "host-tmp",
		Path:      "hosttmp://" + displayPath,
		Model:     "Host scratch",
		Transport: "test",
		Type:      "file",
		Note:      fmt.Sprintf("Test-only host write target under %s", displayPath),
		Removable: false,
		Hotplug:   false,
		SizeBytes: 1,
	})
	return devices, nil
}
// localDeviceScore ranks a device as a flash target; higher is better.
//
// Any device whose path starts with "hosttmp://" scores -100. Otherwise the
// score is the sum of the bonuses that apply: +50 for USB transport, +30 for
// removable, +20 for hotplug, and +10 when the lower-cased model contains "sd".
func localDeviceScore(device service.Device) int {
	if strings.HasPrefix(device.Path, "hosttmp://") {
		return -100
	}

	bonuses := []struct {
		applies bool
		points  int
	}{
		{device.Transport == "usb", 50},
		{device.Removable, 30},
		{device.Hotplug, 20},
		{strings.Contains(strings.ToLower(device.Model), "sd"), 10},
	}

	total := 0
	for _, bonus := range bonuses {
		if bonus.applies {
			total += bonus.points
		}
	}
	return total
}
// hasMountedChildren reports whether at least one entry in children has a
// mountpoint that is non-empty after trimming whitespace. A nil or empty slice
// yields false.
func hasMountedChildren(children []struct {
	Mountpoint string `json:"mountpoint"`
}) bool {
	for idx := range children {
		if strings.TrimSpace(children[idx].Mountpoint) == "" {
			continue
		}
		return true
	}
	return false
}
// humanHostPath rewrites a path under the "/host-tmp" prefix to the
// corresponding path under "/tmp". Surrounding whitespace is trimmed first.
// "/host-tmp" and "/host-tmp/" both become "/tmp"; "/host-tmp/<rest>" becomes
// "/tmp/<rest>"; any other path is returned trimmed but otherwise unchanged.
func humanHostPath(path string) string {
	const mountRoot = "/host-tmp"

	cleaned := strings.TrimSpace(path)
	switch {
	case cleaned == mountRoot:
		return "/tmp"
	case strings.HasPrefix(cleaned, mountRoot+"/"):
		rest := cleaned[len(mountRoot)+1:]
		if rest == "" {
			return "/tmp"
		}
		return "/tmp/" + rest
	default:
		return cleaned
	}
}
// getenvOr returns the whitespace-trimmed value of the environment variable
// key, or fallback when the variable is unset, empty, or whitespace-only.
func getenvOr(key, fallback string) string {
	if trimmed := strings.TrimSpace(os.Getenv(key)); trimmed != "" {
		return trimmed
	}
	return fallback
}