// Package service coordinates Metis image builds, flash runs, sentinel
// snapshots, and the web UI/API state.
package service
import (
"bufio"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"metis/pkg/facts"
"metis/pkg/image"
"metis/pkg/inventory"
"metis/pkg/plan"
"metis/pkg/sentinel"
"metis/pkg/writer"
)
// JobStatus is the lifecycle state of a Job.
type JobStatus string

const (
	// JobQueued means the job has been registered but not yet started.
	JobQueued JobStatus = "queued"
	// JobRunning means the job's pipeline is currently executing.
	JobRunning JobStatus = "running"
	// JobDone means the job finished successfully.
	JobDone JobStatus = "done"
	// JobError means the job failed; Job.Error holds the detail.
	JobError JobStatus = "error"
)
// Device describes a flashable block device as reported by lsblk.
type Device struct {
	Name      string `json:"name"`                // kernel device name, e.g. "sda" or "mmcblk0"
	Path      string `json:"path"`                // full device node path, e.g. "/dev/sda"
	Model     string `json:"model,omitempty"`     // hardware model string, trimmed
	Transport string `json:"transport,omitempty"` // bus transport from lsblk TRAN, e.g. "usb"
	Type      string `json:"type,omitempty"`      // lsblk TYPE; only "disk" entries are listed
	Removable bool   `json:"removable"`           // lsblk RM flag
	Hotplug   bool   `json:"hotplug"`             // lsblk HOTPLUG flag
	SizeBytes int64  `json:"size_bytes"`          // capacity in bytes
}
// Job is a long-running Metis action visible in the UI.
type Job struct {
	ID          string    `json:"id"`                      // nanosecond-timestamp identifier assigned by newJob
	Kind        string    `json:"kind"`                    // "build" or "replace"
	Node        string    `json:"node,omitempty"`          // inventory node the job operates on
	Host        string    `json:"host,omitempty"`          // flash host (replace jobs only)
	Device      string    `json:"device,omitempty"`        // target block device path (replace jobs only)
	Status      JobStatus `json:"status"`                  // queued / running / done / error
	Stage       string    `json:"stage,omitempty"`         // current pipeline stage, e.g. "download", "flash"
	Message     string    `json:"message,omitempty"`       // human-readable progress text
	Artifact    string    `json:"artifact,omitempty"`      // path of the built image, once known
	ProgressPct float64   `json:"progress_pct"`            // overall progress, 0-100
	Written     int64     `json:"written_bytes,omitempty"` // bytes flashed so far
	Total       int64     `json:"total_bytes,omitempty"`   // total bytes to flash
	Error       string    `json:"error,omitempty"`         // failure detail when Status is JobError
	StartedAt   time.Time `json:"started_at"`
	// FinishedAt is zero until completeJob stamps it. Note: omitempty has no
	// effect on time.Time (a struct), so a zero time is still serialized.
	FinishedAt time.Time `json:"finished_at,omitempty"`
}
// Event is a user-facing activity item for recent changes and runs.
// Events are persisted as JSON lines in the history file (see appendEvent).
type Event struct {
	Time    time.Time      `json:"time"`              // when the event occurred (UTC)
	Kind    string         `json:"kind"`              // dotted category, e.g. "image.build", "sentinel.watch"
	Summary string         `json:"summary"`           // one-line human-readable description
	Details map[string]any `json:"details,omitempty"` // structured extras, varies by Kind
}
// SnapshotRecord stores the last fact snapshot pushed by a node sentinel.
type SnapshotRecord struct {
	Node        string            `json:"node"`         // node name; StoreSnapshot falls back to the snapshot hostname
	CollectedAt time.Time         `json:"collected_at"` // capture time; defaulted to now (UTC) when unset
	Snapshot    sentinel.Snapshot `json:"snapshot"`     // raw facts as reported by the sentinel
}
// PageState is the UI/API view model returned by App.State.
type PageState struct {
	LocalHost        string                     `json:"local_host"`                 // host this Metis instance runs on
	DefaultFlashHost string                     `json:"default_flash_host"`         // configured default flash target host
	SelectedHost     string                     `json:"selected_host"`              // host whose devices are listed below
	FlashHosts       []string                   `json:"flash_hosts"`                // all selectable flash hosts, default first
	Nodes            []inventory.NodeSpec       `json:"nodes"`                      // inventory nodes (copied slice)
	Jobs             []*Job                     `json:"jobs"`                       // job copies, newest first
	Devices          []Device                   `json:"devices"`                    // flash candidates on SelectedHost, best first
	PreferredDevice  string                     `json:"preferred_device,omitempty"` // path of the top-ranked device
	DeviceError      string                     `json:"device_error,omitempty"`     // device enumeration error, if any
	Events           []Event                    `json:"events"`                     // recent activity, newest first
	Snapshots        []SnapshotRecord           `json:"snapshots"`                  // sentinel snapshots, sorted by node
	Targets          map[string]facts.Targets   `json:"targets"`                    // recommended targets per class
	Artifacts        map[string]ArtifactSummary `json:"artifacts"`                  // latest built image per node
}
// ArtifactSummary describes the latest built image for a node.
type ArtifactSummary struct {
	Path      string    `json:"path"`       // artifact file path (see artifactPath)
	UpdatedAt time.Time `json:"updated_at"` // file modification time (UTC)
	SizeBytes int64     `json:"size_bytes"` // artifact file size in bytes
}
// App coordinates builds, flashes, sentinel snapshots, and the web UI state.
type App struct {
	settings  Settings
	inventory *inventory.Inventory
	metrics   *Metrics
	// mu guards the three maps below; jobs are mutated by background
	// goroutines (runBuild) while State reads them.
	mu        sync.RWMutex
	jobs      map[string]*Job
	snapshots map[string]SnapshotRecord
	targets   map[string]facts.Targets
}
// NewApp creates a Metis service app instance.
//
// It ensures the cache, artifact, and history directories exist, loads the
// inventory, and best-effort restores persisted snapshots and targets
// (absent files on first run are not an error).
func NewApp(settings Settings) (*App, error) {
	required := []string{
		settings.CacheDir,
		settings.ArtifactDir,
		filepath.Dir(settings.HistoryPath),
	}
	for _, dir := range required {
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return nil, err
		}
	}
	inv, err := inventory.Load(settings.InventoryPath)
	if err != nil {
		return nil, err
	}
	app := &App{
		settings:  settings,
		inventory: inv,
		metrics:   NewMetrics(),
		jobs:      make(map[string]*Job),
		snapshots: make(map[string]SnapshotRecord),
		targets:   make(map[string]facts.Targets),
	}
	// Persisted state is optional; ignore load errors (e.g. first run).
	_ = app.loadSnapshots()
	_ = app.loadTargets()
	return app, nil
}
// State returns the current UI/API snapshot.
//
// deviceHost selects which flash host to enumerate removable media on; an
// empty value falls back to the configured default flash host. Mutable
// state (jobs, snapshots, targets) is copied out under the read lock so
// callers never observe concurrent mutation.
func (a *App) State(deviceHost string) PageState {
	if strings.TrimSpace(deviceHost) == "" {
		deviceHost = a.settings.DefaultFlashHost
	}
	a.mu.RLock()
	// Copy each job value so the returned pointers do not alias the live map.
	jobs := make([]*Job, 0, len(a.jobs))
	for _, job := range a.jobs {
		copyJob := *job
		jobs = append(jobs, &copyJob)
	}
	// Newest jobs first.
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].StartedAt.After(jobs[j].StartedAt)
	})
	snaps := make([]SnapshotRecord, 0, len(a.snapshots))
	for _, snap := range a.snapshots {
		snaps = append(snaps, snap)
	}
	aTargets := map[string]facts.Targets{}
	for key, value := range a.targets {
		aTargets[key] = value
	}
	a.mu.RUnlock()
	// Device probing shells out to lsblk, so it runs outside the lock.
	sort.Slice(snaps, func(i, j int) bool {
		return snaps[i].Node < snaps[j].Node
	})
	flashHosts := a.flashHosts()
	devices, deviceErr := a.ListDevices(deviceHost)
	preferredDevice := preferredDevice(devices)
	return PageState{
		LocalHost:        a.settings.LocalHost,
		DefaultFlashHost: a.settings.DefaultFlashHost,
		SelectedHost:     deviceHost,
		FlashHosts:       flashHosts,
		Nodes:            append([]inventory.NodeSpec{}, a.inventory.Nodes...),
		Jobs:             jobs,
		Devices:          devices,
		PreferredDevice:  preferredDevice,
		DeviceError:      errorString(deviceErr),
		Events:           a.recentEvents(40),
		Snapshots:        snaps,
		Targets:          aTargets,
		Artifacts:        a.artifacts(),
	}
}
// Build starts a background image build for a node.
//
// The node is validated against the inventory before the job is queued; the
// actual pipeline runs asynchronously in runBuild.
func (a *App) Build(node string) (*Job, error) {
	_, _, err := a.inventory.FindNode(node)
	if err != nil {
		return nil, err
	}
	job := a.newJob("build", node, "", "")
	go a.runBuild(job, false)
	return job, nil
}
// Replace starts a background build+flash workflow for a node.
//
// An empty host falls back to the default flash host. The node and target
// device are validated up front so obvious mistakes fail before any
// background work starts.
func (a *App) Replace(node, host, device string) (*Job, error) {
	if host == "" {
		host = a.settings.DefaultFlashHost
	}
	_, _, err := a.inventory.FindNode(node)
	if err != nil {
		return nil, err
	}
	if _, err = a.ensureDevice(host, device); err != nil {
		return nil, err
	}
	job := a.newJob("replace", node, host, device)
	go a.runBuild(job, true)
	return job, nil
}
// StoreSnapshot records a pushed sentinel snapshot, persists the snapshot
// map to disk, and logs an activity event.
//
// A missing node name falls back to the snapshot's hostname, and a zero
// collection time defaults to now (UTC).
func (a *App) StoreSnapshot(record SnapshotRecord) error {
	if record.Node == "" {
		record.Node = record.Snapshot.Hostname
	}
	if record.CollectedAt.IsZero() {
		record.CollectedAt = time.Now().UTC()
	}
	if strings.TrimSpace(record.Node) == "" {
		return fmt.Errorf("snapshot node required")
	}
	a.mu.Lock()
	a.snapshots[record.Node] = record
	a.mu.Unlock()
	if err := a.persistSnapshots(); err != nil {
		return err
	}
	a.metrics.RecordSnapshot(record.Node, "ok", record.CollectedAt)
	details := map[string]any{
		"node":        record.Node,
		"kernel":      record.Snapshot.Kernel,
		"k3s_version": record.Snapshot.K3sVersion,
	}
	a.appendEvent(Event{
		Time:    record.CollectedAt,
		Kind:    "sentinel.snapshot",
		Summary: fmt.Sprintf("Captured sentinel snapshot for %s", record.Node),
		Details: details,
	})
	return nil
}
// WatchSentinel recomputes class targets and logs meaningful drift.
//
// It converts stored sentinel snapshots into facts, asks the facts package
// for recommended per-class targets, swaps in the new targets, and persists
// them. The returned event summarizes how many classes changed.
func (a *App) WatchSentinel() (*Event, error) {
	a.mu.RLock()
	snaps := make([]facts.Snapshot, 0, len(a.snapshots))
	for _, snap := range a.snapshots {
		snaps = append(snaps, facts.Snapshot{
			Hostname: snap.Node,
			Kernel:   snap.Snapshot.Kernel,
			OSImage:  snap.Snapshot.OSImage,
			// Version fields can be multi-line command output; keep the first line.
			K3sVersion:    firstLine(snap.Snapshot.K3sVersion),
			Containerd:    firstLine(snap.Snapshot.Containerd),
			PackageSample: snap.Snapshot.PackageSample,
			DropInsSample: snap.Snapshot.DropInsSample,
		})
	}
	// Copy the current targets so diffing can happen outside the lock.
	prevTargets := map[string]facts.Targets{}
	for key, value := range a.targets {
		prevTargets[key] = value
	}
	a.mu.RUnlock()
	nextTargets := facts.RecommendTargets(a.inventory, snaps)
	changes := diffTargets(prevTargets, nextTargets)
	a.mu.Lock()
	a.targets = nextTargets
	a.mu.Unlock()
	if err := a.persistTargets(); err != nil {
		return nil, err
	}
	event := &Event{
		Time:    time.Now().UTC(),
		Kind:    "sentinel.watch",
		Summary: "Metis sentinel watch completed with no template changes",
		Details: map[string]any{
			"classes": len(nextTargets),
			"changes": 0,
		},
	}
	if len(changes) > 0 {
		event.Summary = fmt.Sprintf("Metis sentinel watch detected %d template change(s)", len(changes))
		// Replace the zero change count with the list of changed class names.
		event.Details["changes"] = changes
	}
	a.appendEvent(*event)
	a.metrics.RecordWatch("ok")
	a.metrics.SetDriftTargets(nextTargets, len(changes))
	return event, nil
}
// ListDevices returns locally attached removable media that are safe candidates for flashing.
//
// Only whole disks (lsblk TYPE=disk) are considered, and a device must look
// hot-attachable (USB transport, removable, or hotplug) and fit under the
// configured size cap, so large fixed system drives are never offered as
// flash targets. Results are ordered best-candidate-first.
func (a *App) ListDevices(host string) ([]Device, error) {
	if host == "" {
		host = a.settings.DefaultFlashHost
	}
	// Device enumeration runs lsblk locally, so only hosts this instance
	// considers "local" are supported.
	if !a.supportsLocalMedia(host) {
		return nil, fmt.Errorf("flash host %s is listed for planning, but this Metis instance only has direct removable-media access on %s", host, a.settings.LocalHost)
	}
	cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
	out, err := cmd.Output()
	if err != nil {
		return nil, err
	}
	var payload struct {
		Blockdevices []struct {
			Name    string `json:"name"`
			Path    string `json:"path"`
			RM      bool   `json:"rm"`
			Hotplug bool   `json:"hotplug"`
			// SIZE is decoded loosely (`any`) and normalized below; the
			// switch handles both string and numeric encodings of the value.
			Size  any    `json:"size"`
			Model string `json:"model"`
			Tran  string `json:"tran"`
			Type  string `json:"type"`
		} `json:"blockdevices"`
	}
	if err := json.Unmarshal(out, &payload); err != nil {
		return nil, err
	}
	devices := make([]Device, 0)
	for _, dev := range payload.Blockdevices {
		if dev.Type != "disk" {
			// Skip partitions, loop devices, and other non-disk entries.
			continue
		}
		size := int64(0)
		switch value := dev.Size.(type) {
		case string:
			size, _ = strconv.ParseInt(value, 10, 64)
		case float64:
			size = int64(value)
		}
		// Size cap excludes implausibly large (likely fixed) disks.
		if size <= 0 || size > a.settings.MaxDeviceBytes {
			continue
		}
		// Require at least one hot-attachable signal.
		if dev.Tran != "usb" && !dev.RM && !dev.Hotplug {
			continue
		}
		devices = append(devices, Device{
			Name:      dev.Name,
			Path:      dev.Path,
			Model:     strings.TrimSpace(dev.Model),
			Transport: dev.Tran,
			Type:      dev.Type,
			Removable: dev.RM,
			Hotplug:   dev.Hotplug,
			SizeBytes: size,
		})
	}
	// Rank: higher heuristic score first, then smaller media, then path for
	// a stable order.
	sort.Slice(devices, func(i, j int) bool {
		left := deviceScore(devices[i])
		right := deviceScore(devices[j])
		if left != right {
			return left > right
		}
		if devices[i].SizeBytes != devices[j].SizeBytes {
			return devices[i].SizeBytes < devices[j].SizeBytes
		}
		return devices[i].Path < devices[j].Path
	})
	return devices, nil
}
// runBuild executes the build (and, when flash is true, flash) pipeline for
// job in a background goroutine, updating the job's stage and progress.
//
// Stages: download -> copy -> inject, then for replace jobs
// preflight -> flash. Any failure marks the job errored and records an
// error metric for the failing subsystem.
func (a *App) runBuild(job *Job, flash bool) {
	a.setJob(job.ID, func(j *Job) {
		j.Status = JobRunning
		j.Stage = "download"
		j.Message = "Fetching and verifying base image"
		j.ProgressPct = 5
	})
	output := a.artifactPath(job.Node)
	cacheDir := a.settings.CacheDir
	planData, err := plan.Build(a.inventory, job.Node, output, cacheDir)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	_, class, err := a.inventory.FindNode(job.Node)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// Download the base image into the cache, verified against the class
	// checksum; subsequent builds reuse the cached copy.
	cacheImage := filepath.Join(cacheDir, cachedImageName(planData.Image))
	cacheImage, err = image.DownloadAndVerify(planData.Image, cacheImage, class.Checksum)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "copy"
		j.Message = "Copying base image into artifact"
		j.ProgressPct = 24
	})
	if err := writer.WriteImage(context.Background(), cacheImage, output); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	files, err := plan.Files(a.inventory, job.Node)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "inject"
		j.Message = "Injecting node-specific rootfs config"
		j.ProgressPct = 70
	})
	if err := image.InjectRootFS(output, files); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.metrics.RecordBuild(job.Node, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.build",
		Summary: fmt.Sprintf("Built replacement image for %s", job.Node),
		Details: map[string]any{"node": job.Node, "artifact": output},
	})
	if !flash {
		a.completeJob(job.ID, func(j *Job) {
			j.Stage = "complete"
			j.Message = "Image build complete"
			j.ProgressPct = 100
			j.Artifact = output
		})
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "preflight"
		j.Message = "Validating device and deleting stale node object"
		j.ProgressPct = 78
		j.Artifact = output
	})
	// Re-validate the device right before writing: the media may have been
	// unplugged since the job was accepted.
	if _, err := a.ensureDevice(job.Host, job.Device); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	// Deleting the stale Kubernetes node object is best-effort: warn via an
	// event but continue flashing on failure.
	if err := deleteNodeObject(job.Node); err != nil {
		a.appendEvent(Event{
			Time:    time.Now().UTC(),
			Kind:    "node.delete.warning",
			Summary: fmt.Sprintf("Could not delete stale Kubernetes node object for %s", job.Node),
			Details: map[string]any{"node": job.Node, "error": err.Error()},
		})
	}
	if err := a.flashArtifact(job.ID, output); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	a.metrics.RecordFlash(job.Node, job.Host, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.flash",
		Summary: fmt.Sprintf("Flashed %s image to %s on %s", job.Node, job.Device, job.Host),
		Details: map[string]any{"node": job.Node, "device": job.Device, "host": job.Host},
	})
	a.completeJob(job.ID, func(j *Job) {
		j.Stage = "complete"
		j.Message = fmt.Sprintf("Flash complete. Move the card into %s and power-cycle it.", j.Node)
		j.ProgressPct = 100
		j.Artifact = output
	})
}
// flashArtifact writes the built artifact image onto the job's target
// device, streaming progress into the job record.
//
// Progress maps into the 82-99% band of the overall job, matching the stage
// percentages used by runBuild.
func (a *App) flashArtifact(jobID, artifact string) error {
	// a.job returns nil for unknown IDs; fail fast instead of dereferencing
	// it below when reading the target device.
	job := a.job(jobID)
	if job == nil {
		return fmt.Errorf("unknown job %s", jobID)
	}
	info, err := os.Stat(artifact)
	if err != nil {
		return err
	}
	a.setJob(jobID, func(j *Job) {
		j.Stage = "flash"
		j.Message = "Writing image to removable media"
		j.ProgressPct = 82
		j.Total = info.Size()
	})
	return writer.WriteImageWithProgress(context.Background(), artifact, job.Device, func(written, total int64) {
		pct := 82.0
		if total > 0 {
			pct = 82.0 + (float64(written)/float64(total))*17.0
		}
		a.setJob(jobID, func(j *Job) {
			j.Written = written
			j.Total = total
			j.ProgressPct = pct
			j.Message = fmt.Sprintf("Flashing %s of %s", humanBytes(written), humanBytes(total))
		})
	})
}
// ensureDevice verifies that path names a currently attached removable
// flash candidate on host and returns its device record.
func (a *App) ensureDevice(host, path string) (*Device, error) {
	if strings.TrimSpace(path) == "" {
		return nil, fmt.Errorf("select removable media before starting a flash run")
	}
	devices, err := a.ListDevices(host)
	if err != nil {
		return nil, err
	}
	for idx := range devices {
		if devices[idx].Path == path {
			return &devices[idx], nil
		}
	}
	return nil, fmt.Errorf("device %s is not a current removable flash candidate", path)
}
// newJob registers a freshly queued job and returns it. The job ID is the
// creation timestamp in nanoseconds.
func (a *App) newJob(kind, node, host, device string) *Job {
	now := time.Now().UTC()
	job := &Job{
		ID:        strconv.FormatInt(now.UnixNano(), 10),
		Kind:      kind,
		Node:      node,
		Host:      host,
		Device:    device,
		Status:    JobQueued,
		StartedAt: now,
	}
	a.mu.Lock()
	a.jobs[job.ID] = job
	a.mu.Unlock()
	return job
}
// job returns the job with the given ID, or nil if it does not exist.
func (a *App) job(id string) *Job {
	a.mu.RLock()
	found := a.jobs[id]
	a.mu.RUnlock()
	return found
}
// setJob applies update to the job with the given ID under the write lock;
// unknown IDs are silently ignored.
func (a *App) setJob(id string, update func(*Job)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if job := a.jobs[id]; job != nil {
		update(job)
	}
}
// failJob finalizes the job as errored, mirroring the error text into the
// user-facing message.
func (a *App) failJob(id string, err error) {
	msg := err.Error()
	a.completeJob(id, func(j *Job) {
		j.Status = JobError
		j.Error = msg
		j.Message = msg
	})
}
// completeJob applies update to the job, finalizes its status, and stamps
// the finish time. Jobs that update did not mark as errored become done.
func (a *App) completeJob(id string, update func(*Job)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if job := a.jobs[id]; job != nil {
		update(job)
		if job.Status != JobError {
			job.Status = JobDone
		}
		job.FinishedAt = time.Now().UTC()
	}
}
// appendEvent best-effort appends an event as one JSON line to the history
// file. Failures are deliberately swallowed so activity logging can never
// block or fail the work being logged.
func (a *App) appendEvent(event Event) {
	encoded, err := json.Marshal(event)
	if err != nil {
		return
	}
	file, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return
	}
	defer file.Close()
	// Single write keeps the line plus newline together under O_APPEND.
	_, _ = file.Write(append(encoded, '\n'))
}
// recentEvents reads the JSONL history file and returns up to limit events,
// newest first. Missing history files and malformed lines are skipped
// best-effort, matching appendEvent's fire-and-forget behavior.
func (a *App) recentEvents(limit int) []Event {
	// Guard: make([]Event, 0, limit) would panic on a negative capacity.
	if limit <= 0 {
		return nil
	}
	f, err := os.Open(a.settings.HistoryPath)
	if err != nil {
		return nil
	}
	defer f.Close()
	events := make([]Event, 0, limit)
	scanner := bufio.NewScanner(f)
	// Events with large Details payloads can exceed bufio.Scanner's default
	// 64 KiB token limit, which would silently stop the scan; allow up to
	// 1 MiB per line.
	scanner.Buffer(make([]byte, 0, 64*1024), 1<<20)
	for scanner.Scan() {
		var event Event
		if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
			continue
		}
		events = append(events, event)
	}
	// The file is append-only (oldest first); keep only the newest entries.
	if len(events) > limit {
		events = events[len(events)-limit:]
	}
	// Reverse in place so the newest event comes first.
	for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 {
		events[i], events[j] = events[j], events[i]
	}
	return events
}
// artifacts reports the latest built image per node. Nodes without an
// existing artifact file are simply omitted.
func (a *App) artifacts() map[string]ArtifactSummary {
	summaries := make(map[string]ArtifactSummary, len(a.inventory.Nodes))
	for _, node := range a.inventory.Nodes {
		path := a.artifactPath(node.Name)
		if info, err := os.Stat(path); err == nil {
			summaries[node.Name] = ArtifactSummary{
				Path:      path,
				UpdatedAt: info.ModTime().UTC(),
				SizeBytes: info.Size(),
			}
		}
	}
	return summaries
}
// artifactPath returns the image path for node inside the artifact directory.
func (a *App) artifactPath(node string) string {
	return filepath.Join(a.settings.ArtifactDir, node+".img")
}
// cachedImageName derives the cache file name for a source image URL or
// path: the base name with any trailing ".xz" compression suffix removed.
func cachedImageName(source string) string {
	base := filepath.Base(source)
	return strings.TrimSuffix(base, ".xz")
}
// flashHosts returns the deduplicated, sorted set of hosts offered as flash
// targets — configured hosts, the default and local hosts, and any cluster
// node names — with the default flash host moved to the front.
func (a *App) flashHosts() []string {
	seen := map[string]struct{}{}
	add := func(host string) {
		if trimmed := strings.TrimSpace(host); trimmed != "" {
			seen[trimmed] = struct{}{}
		}
	}
	for _, host := range a.settings.FlashHosts {
		add(host)
	}
	add(a.settings.DefaultFlashHost)
	add(a.settings.LocalHost)
	for _, host := range clusterNodeNames() {
		seen[host] = struct{}{}
	}
	hosts := make([]string, 0, len(seen))
	for host := range seen {
		hosts = append(hosts, host)
	}
	sort.Strings(hosts)
	if a.settings.DefaultFlashHost == "" {
		return hosts
	}
	return moveToFront(hosts, a.settings.DefaultFlashHost)
}
// loadSnapshots restores persisted snapshot records from disk and replays
// them into the metrics. Callers treat errors as best-effort (the file is
// absent on first run).
func (a *App) loadSnapshots() error {
	data, err := os.ReadFile(a.settings.SnapshotsPath)
	if err != nil {
		return err
	}
	var snapshots map[string]SnapshotRecord
	if err := json.Unmarshal(data, &snapshots); err != nil {
		return err
	}
	if snapshots == nil {
		// A file containing JSON "null" decodes into a nil map; keep the
		// field non-nil so StoreSnapshot can write into it without panicking.
		snapshots = map[string]SnapshotRecord{}
	}
	a.mu.Lock()
	a.snapshots = snapshots
	a.mu.Unlock()
	for _, snap := range snapshots {
		a.metrics.RecordSnapshot(snap.Node, "ok", snap.CollectedAt)
	}
	return nil
}
// persistSnapshots writes the snapshot map to disk as indented JSON,
// creating the parent directory when needed. Marshaling happens under the
// read lock; the file write does not need it.
func (a *App) persistSnapshots() error {
	a.mu.RLock()
	data, err := json.MarshalIndent(a.snapshots, "", " ")
	a.mu.RUnlock()
	if err != nil {
		return err
	}
	dir := filepath.Dir(a.settings.SnapshotsPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(a.settings.SnapshotsPath, data, 0o644)
}
// loadTargets restores persisted class targets from disk and seeds the
// drift metrics. Callers treat errors as best-effort (the file is absent on
// first run).
func (a *App) loadTargets() error {
	data, err := os.ReadFile(a.settings.TargetsPath)
	if err != nil {
		return err
	}
	var targets map[string]facts.Targets
	if err := json.Unmarshal(data, &targets); err != nil {
		return err
	}
	if targets == nil {
		// A file containing JSON "null" decodes into a nil map; normalize so
		// the field always holds a usable map (consistent with loadSnapshots).
		targets = map[string]facts.Targets{}
	}
	a.mu.Lock()
	a.targets = targets
	a.mu.Unlock()
	a.metrics.SetDriftTargets(targets, 0)
	return nil
}
// persistTargets writes the class target map to disk as indented JSON,
// creating the parent directory when needed. Marshaling happens under the
// read lock; the file write does not need it.
func (a *App) persistTargets() error {
	a.mu.RLock()
	data, err := json.MarshalIndent(a.targets, "", " ")
	a.mu.RUnlock()
	if err != nil {
		return err
	}
	dir := filepath.Dir(a.settings.TargetsPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(a.settings.TargetsPath, data, 0o644)
}
// diffTargets returns the sorted class names whose targets differ between
// prev and next. Classes present in only one map count as changed when
// their targets differ from the zero value.
func diffTargets(prev, next map[string]facts.Targets) []string {
	seen := map[string]struct{}{}
	changed := []string{}
	for _, m := range []map[string]facts.Targets{prev, next} {
		for class := range m {
			if _, done := seen[class]; done {
				continue
			}
			seen[class] = struct{}{}
			if !targetsEqual(prev[class], next[class]) {
				changed = append(changed, class)
			}
		}
	}
	sort.Strings(changed)
	return changed
}
// targetsEqual reports whether two class target sets match, including their
// package pin maps.
func targetsEqual(a, b facts.Targets) bool {
	switch {
	case a.Kernel != b.Kernel,
		a.OSImage != b.OSImage,
		a.Containerd != b.Containerd,
		a.K3sVersion != b.K3sVersion,
		len(a.Packages) != len(b.Packages):
		return false
	}
	for name, version := range a.Packages {
		if b.Packages[name] != version {
			return false
		}
	}
	return true
}
// humanBytes renders a byte count using binary (1024-based) units, e.g.
// "512 B", "1.5 KiB", "2.0 GiB".
func humanBytes(value int64) string {
	const unit = 1024
	if value < unit {
		return fmt.Sprintf("%d B", value)
	}
	const suffixes = "KMGTPE"
	// Dividing by powers of two is exact in float64, so repeated division
	// matches a single division by unit^(idx+1).
	scaled := float64(value) / unit
	idx := 0
	for scaled >= unit && idx < len(suffixes)-1 {
		scaled /= unit
		idx++
	}
	return fmt.Sprintf("%.1f %ciB", scaled, suffixes[idx])
}
// firstLine trims value and returns only its first line, also trimmed —
// useful for normalizing multi-line command output into a single token.
func firstLine(value string) string {
	trimmed := strings.TrimSpace(value)
	head, _, found := strings.Cut(trimmed, "\n")
	if !found {
		return trimmed
	}
	return strings.TrimSpace(head)
}
// preferredDevice returns the path of the top-ranked device, or "" when the
// list is empty. Devices arrive pre-sorted best-candidate-first.
func preferredDevice(devices []Device) string {
	if len(devices) > 0 {
		return devices[0].Path
	}
	return ""
}
func errorString(err error) string {
if err == nil {
return ""
}
return err.Error()
}
// supportsLocalMedia reports whether this instance can enumerate removable
// media for host directly: an empty host, the local host, or the default
// flash host.
func (a *App) supportsLocalMedia(host string) bool {
	switch strings.TrimSpace(host) {
	case "", a.settings.LocalHost, a.settings.DefaultFlashHost:
		return true
	}
	return false
}
// deviceScore ranks flash candidates heuristically; higher scores are more
// likely to be the SD card the operator intends to write. Only the first
// matching model keyword tier contributes.
func deviceScore(device Device) int {
	model := strings.ToLower(strings.TrimSpace(device.Model))
	score := 0
	if strings.Contains(model, "microsd") || strings.Contains(model, "micro sd") {
		score += 60
	} else if strings.Contains(model, "sdxc") || strings.Contains(model, "sdhc") || strings.Contains(model, "sd ") {
		score += 50
	} else if strings.Contains(model, "card") || strings.Contains(model, "reader") {
		score += 40
	}
	if device.Removable {
		score += 20
	}
	if device.Hotplug {
		score += 10
	}
	if device.Transport == "usb" {
		score += 5
	}
	if strings.HasPrefix(device.Name, "mmcblk") {
		score += 25
	}
	return score
}
// moveToFront returns values with the first occurrence of preferred moved
// to index 0, preserving the relative order of everything else. The input
// slice is never mutated; a copy is returned when any work is needed.
func moveToFront(values []string, preferred string) []string {
	if preferred == "" || len(values) < 2 {
		return values
	}
	rest := make([]string, 0, len(values))
	moved := false
	for _, value := range values {
		if !moved && value == preferred {
			moved = true
			continue
		}
		rest = append(rest, value)
	}
	if !moved {
		return rest
	}
	return append([]string{preferred}, rest...)
}
// deleteNodeObject removes the stale Kubernetes node object, preferring the
// in-cluster API and falling back to the kubectl CLI when that is not
// available or fails.
func deleteNodeObject(node string) error {
	if deleteNodeObjectInCluster(node) == nil {
		return nil
	}
	out, err := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found").CombinedOutput()
	if err != nil {
		return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out)))
	}
	return nil
}
// deleteNodeObjectInCluster deletes the node via the in-cluster Kubernetes
// API using the pod's mounted service-account credentials.
//
// It returns an error immediately when the standard in-cluster environment
// (KUBERNETES_SERVICE_* vars plus the mounted token/CA files) is absent, so
// the caller can fall back to kubectl.
func deleteNodeObjectInCluster(node string) error {
	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		return errors.New("not running in cluster")
	}
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return err
	}
	caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	if err != nil {
		return err
	}
	// Trust only the cluster CA for the API server connection.
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return errors.New("append kubernetes CA")
	}
	client := &http.Client{
		Timeout: 15 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}
	req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("https://%s:%s/api/v1/nodes/%s", host, port, node), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// 404 counts as success: the stale node object is already gone.
	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted {
		return nil
	}
	// Include a bounded slice of the response body for diagnostics.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
	return fmt.Errorf("delete node %s failed: %s: %s", node, resp.Status, strings.TrimSpace(string(body)))
}
// clusterNodeNames lists Kubernetes node names via the in-cluster API,
// sorted alphabetically.
//
// This is a purely best-effort helper for populating the flash-host picker:
// every failure path (not in cluster, missing credentials, request or
// decode error, non-200 response) returns nil rather than an error.
func clusterNodeNames() []string {
	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		return nil
	}
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return nil
	}
	caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	if err != nil {
		return nil
	}
	// Trust only the cluster CA for the API server connection.
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil
	}
	client := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://%s:%s/api/v1/nodes", host, port), nil)
	if err != nil {
		return nil
	}
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))
	resp, err := client.Do(req)
	if err != nil {
		return nil
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil
	}
	// Only the node names are needed; decode a minimal shape and cap the
	// response read at 1 MiB.
	var payload struct {
		Items []struct {
			Metadata struct {
				Name string `json:"name"`
			} `json:"metadata"`
		} `json:"items"`
	}
	if err := json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(&payload); err != nil {
		return nil
	}
	names := make([]string, 0, len(payload.Items))
	for _, item := range payload.Items {
		if name := strings.TrimSpace(item.Metadata.Name); name != "" {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}