package service

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"metis/pkg/facts"
	"metis/pkg/inventory"
	"metis/pkg/sentinel"
)

// JobStatus is the lifecycle state of a Job. It is a string type, so it is
// serialized to JSON as the plain string values declared below.
type JobStatus string

// Job lifecycle states. newJob creates jobs as JobQueued; completeJob leaves
// a job in JobError if the update callback set it, and otherwise forces
// JobDone.
const (
	JobQueued  JobStatus = "queued"
	JobRunning JobStatus = "running"
	JobDone    JobStatus = "done"
	JobError   JobStatus = "error"
)

// Device describes a flashable block device.
//
// Path is the value preferredDevice hands back for the first device in a
// list. Model, Removable, Hotplug, Transport and Name are the inputs that
// deviceScore ranks on.
type Device struct {
	Name      string `json:"name"`
	Path      string `json:"path"`
	Model     string `json:"model,omitempty"`
	Transport string `json:"transport,omitempty"`
	Type      string `json:"type,omitempty"`
	Note      string `json:"note,omitempty"`
	Removable bool   `json:"removable"`
	Hotplug   bool   `json:"hotplug"`
	SizeBytes int64  `json:"size_bytes"`
}

// Job is a long-running Metis action visible in the UI.
//
// Jobs are created by newJob (Kind is "build" or "replace" in this file) and
// are mutated in place through setJob, completeJob and failJob, each of which
// refreshes UpdatedAt. FinishedAt is only set by completeJob.
//
// NOTE(review): encoding/json never treats a struct value as empty, so the
// omitempty option on the time.Time fields (StageStartedAt, UpdatedAt,
// FinishedAt) has no effect; zero times are still emitted.
type Job struct {
	ID             string    `json:"id"`
	Kind           string    `json:"kind"`
	Node           string    `json:"node,omitempty"`
	Host           string    `json:"host,omitempty"`
	Builder        string    `json:"builder,omitempty"`
	Device         string    `json:"device,omitempty"`
	Status         JobStatus `json:"status"`
	Stage          string    `json:"stage,omitempty"`
	StageStartedAt time.Time `json:"stage_started_at,omitempty"`
	Message        string    `json:"message,omitempty"`
	Artifact       string    `json:"artifact,omitempty"`
	ProgressPct    float64   `json:"progress_pct"`
	Written        int64     `json:"written_bytes,omitempty"`
	Total          int64     `json:"total_bytes,omitempty"`
	Error          string    `json:"error,omitempty"`
	StartedAt      time.Time `json:"started_at"`
	UpdatedAt      time.Time `json:"updated_at,omitempty"`
	FinishedAt     time.Time `json:"finished_at,omitempty"`
}

// Event is a user-facing activity item for recent changes and runs.
//
// appendEvent writes each Event as one JSON line to the history file and
// recentEvents reads them back, so the JSON tags below are also the on-disk
// format.
type Event struct {
	Time    time.Time      `json:"time"`
	Kind    string         `json:"kind"`
	Summary string         `json:"summary"`
	Details map[string]any `json:"details,omitempty"`
}

// SnapshotRecord stores the last fact snapshot pushed by a node sentinel.
type SnapshotRecord struct {
	Node        string            `json:"node"`
	CollectedAt time.Time         `json:"collected_at"`
	Snapshot    sentinel.Snapshot `json:"snapshot"`
}

// PageState is the UI/API view model.
//
// App.State builds one value per call. Jobs holds copies of the tracked jobs,
// Snapshots is sorted by node name, and DeviceError carries the text of any
// error returned while listing devices on SelectedHost.
type PageState struct {
	LocalHost        string                     `json:"local_host"`
	DefaultFlashHost string                     `json:"default_flash_host"`
	SelectedHost     string                     `json:"selected_host"`
	FlashHosts       []string                   `json:"flash_hosts"`
	Nodes            []inventory.NodeSpec       `json:"nodes"`
	Jobs             []*Job                     `json:"jobs"`
	Devices          []Device                   `json:"devices"`
	PreferredDevice  string                     `json:"preferred_device,omitempty"`
	DeviceError      string                     `json:"device_error,omitempty"`
	Events           []Event                    `json:"events"`
	Snapshots        []SnapshotRecord           `json:"snapshots"`
	Targets          map[string]facts.Targets   `json:"targets"`
	Artifacts        map[string]ArtifactSummary `json:"artifacts"`
}

// ArtifactSummary describes the latest built image for a node.
type ArtifactSummary struct {
	Node        string    `json:"node,omitempty"`
	Ref         string    `json:"ref,omitempty"`
	BuildTag    string    `json:"build_tag,omitempty"`
	LocalPath   string    `json:"local_path,omitempty"`
	HostPath    string    `json:"host_path,omitempty"`
	BuilderHost string    `json:"builder_host,omitempty"`
	Compressed  bool      `json:"compressed,omitempty"`
	UpdatedAt   time.Time `json:"updated_at"`
	SizeBytes   int64     `json:"size_bytes"`
}

// App coordinates builds, flashes, sentinel snapshots, and the web UI state.
//
// mu is held around every access to jobs, snapshots and targets in the
// methods of this file. NOTE(review): artifactStore is only touched by
// methods defined elsewhere; presumably it is guarded by mu as well, confirm
// before relying on it.
//
// App contains a sync.RWMutex and must therefore be used through a pointer
// and never copied.
type App struct {
	settings      Settings
	inventory     *inventory.Inventory
	metrics       *Metrics
	mu            sync.RWMutex
	jobs          map[string]*Job
	snapshots     map[string]SnapshotRecord
	targets       map[string]facts.Targets
	artifactStore map[string]ArtifactSummary
}

// NewApp creates a Metis service app instance.
func NewApp(settings Settings) (*App, error) { if err := os.MkdirAll(settings.CacheDir, 0o755); err != nil { return nil, err } if err := os.MkdirAll(settings.ArtifactDir, 0o755); err != nil { return nil, err } if err := os.MkdirAll(filepath.Dir(settings.HistoryPath), 0o755); err != nil { return nil, err } inv, err := inventory.Load(settings.InventoryPath) if err != nil { return nil, err } app := &App{ settings: settings, inventory: inv, metrics: NewMetrics(), jobs: map[string]*Job{}, snapshots: map[string]SnapshotRecord{}, targets: map[string]facts.Targets{}, artifactStore: map[string]ArtifactSummary{}, } _ = app.loadSnapshots() _ = app.loadTargets() _ = app.loadArtifacts() return app, nil } // State returns the current UI/API snapshot. func (a *App) State(deviceHost string) PageState { if strings.TrimSpace(deviceHost) == "" { deviceHost = a.settings.DefaultFlashHost } a.mu.RLock() jobs := make([]*Job, 0, len(a.jobs)) for _, job := range a.jobs { copyJob := *job jobs = append(jobs, ©Job) } sort.Slice(jobs, func(i, j int) bool { return jobs[i].StartedAt.After(jobs[j].StartedAt) }) snaps := make([]SnapshotRecord, 0, len(a.snapshots)) for _, snap := range a.snapshots { snaps = append(snaps, snap) } aTargets := map[string]facts.Targets{} for key, value := range a.targets { aTargets[key] = value } a.mu.RUnlock() sort.Slice(snaps, func(i, j int) bool { return snaps[i].Node < snaps[j].Node }) flashHosts := a.flashHosts() devices, deviceErr := a.ListDevices(deviceHost) preferredDevice := preferredDevice(devices) return PageState{ LocalHost: a.settings.LocalHost, DefaultFlashHost: a.settings.DefaultFlashHost, SelectedHost: deviceHost, FlashHosts: flashHosts, Nodes: append([]inventory.NodeSpec{}, a.inventory.Nodes...), Jobs: jobs, Devices: devices, PreferredDevice: preferredDevice, DeviceError: errorString(deviceErr), Events: a.recentEvents(40), Snapshots: snaps, Targets: aTargets, Artifacts: a.artifacts(), } } // Build starts a background image build for a node. 
func (a *App) Build(node string) (*Job, error) { if _, _, err := a.inventory.FindNode(node); err != nil { return nil, err } job := a.newJob("build", node, "", "") go a.runBuild(job, false) return job, nil } // Replace starts a background build+flash workflow for a node. func (a *App) Replace(node, host, device string) (*Job, error) { if host == "" { host = a.settings.DefaultFlashHost } if _, _, err := a.inventory.FindNode(node); err != nil { return nil, err } if _, err := a.ensureDevice(host, device); err != nil { return nil, err } job := a.newJob("replace", node, host, device) go a.runBuild(job, true) return job, nil } // StoreSnapshot records a pushed sentinel snapshot. func (a *App) StoreSnapshot(record SnapshotRecord) error { if record.Node == "" { record.Node = record.Snapshot.Hostname } if record.CollectedAt.IsZero() { record.CollectedAt = time.Now().UTC() } if strings.TrimSpace(record.Node) == "" { return fmt.Errorf("snapshot node required") } a.mu.Lock() a.snapshots[record.Node] = record a.mu.Unlock() if err := a.persistSnapshots(); err != nil { return err } a.metrics.RecordSnapshot(record.Node, "ok", record.CollectedAt) a.appendEvent(Event{ Time: record.CollectedAt, Kind: "sentinel.snapshot", Summary: fmt.Sprintf("Captured sentinel snapshot for %s", record.Node), Details: map[string]any{ "node": record.Node, "kernel": record.Snapshot.Kernel, "k3s_version": record.Snapshot.K3sVersion, }, }) return nil } // WatchSentinel recomputes class targets and logs meaningful drift. 
func (a *App) WatchSentinel() (*Event, error) { a.mu.RLock() snaps := make([]facts.Snapshot, 0, len(a.snapshots)) for _, snap := range a.snapshots { snaps = append(snaps, facts.Snapshot{ Hostname: snap.Node, Kernel: snap.Snapshot.Kernel, OSImage: snap.Snapshot.OSImage, K3sVersion: firstLine(snap.Snapshot.K3sVersion), Containerd: firstLine(snap.Snapshot.Containerd), PackageSample: snap.Snapshot.PackageSample, DropInsSample: snap.Snapshot.DropInsSample, }) } prevTargets := map[string]facts.Targets{} for key, value := range a.targets { prevTargets[key] = value } a.mu.RUnlock() nextTargets := facts.RecommendTargets(a.inventory, snaps) changes := diffTargets(prevTargets, nextTargets) a.mu.Lock() a.targets = nextTargets a.mu.Unlock() if err := a.persistTargets(); err != nil { return nil, err } event := &Event{ Time: time.Now().UTC(), Kind: "sentinel.watch", Summary: "Metis sentinel watch completed with no template changes", Details: map[string]any{ "classes": len(nextTargets), "changes": 0, }, } if len(changes) > 0 { event.Summary = fmt.Sprintf("Metis sentinel watch detected %d template change(s)", len(changes)) event.Details["changes"] = changes } a.appendEvent(*event) a.metrics.RecordWatch("ok") a.metrics.SetDriftTargets(nextTargets, len(changes)) return event, nil } func (a *App) newJob(kind, node, host, device string) *Job { job := &Job{ ID: fmt.Sprintf("%d", time.Now().UTC().UnixNano()), Kind: kind, Node: node, Host: host, Device: device, Status: JobQueued, ProgressPct: 0, StartedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), } a.mu.Lock() a.jobs[job.ID] = job a.mu.Unlock() return job } func (a *App) job(id string) *Job { a.mu.RLock() defer a.mu.RUnlock() return a.jobs[id] } func (a *App) setJob(id string, update func(*Job)) { a.mu.Lock() defer a.mu.Unlock() job := a.jobs[id] if job == nil { return } update(job) job.UpdatedAt = time.Now().UTC() } func (a *App) failJob(id string, err error) { a.completeJob(id, func(j *Job) { j.Status = JobError j.Error = 
err.Error() j.Message = err.Error() }) } func (a *App) completeJob(id string, update func(*Job)) { a.mu.Lock() defer a.mu.Unlock() job := a.jobs[id] if job == nil { return } update(job) if job.Status != JobError { job.Status = JobDone } job.UpdatedAt = time.Now().UTC() job.FinishedAt = time.Now().UTC() } func (a *App) appendEvent(event Event) { line, err := json.Marshal(event) if err != nil { return } f, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) if err != nil { return } defer f.Close() _, _ = f.Write(append(line, '\n')) } func (a *App) recentEvents(limit int) []Event { f, err := os.Open(a.settings.HistoryPath) if err != nil { return nil } defer f.Close() events := make([]Event, 0, limit) scanner := bufio.NewScanner(f) for scanner.Scan() { var event Event if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { continue } events = append(events, event) } if len(events) > limit { events = events[len(events)-limit:] } for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 { events[i], events[j] = events[j], events[i] } return events } func cachedImageName(source string) string { return strings.TrimSuffix(filepath.Base(source), ".xz") } func (a *App) flashHosts() []string { hosts := map[string]struct{}{} for _, host := range a.settings.FlashHosts { if value := strings.TrimSpace(host); value != "" { hosts[value] = struct{}{} } } for _, host := range []string{a.settings.DefaultFlashHost, a.settings.LocalHost} { if value := strings.TrimSpace(host); value != "" { hosts[value] = struct{}{} } } for _, node := range clusterNodes() { if value := strings.TrimSpace(node.Name); value != "" { hosts[value] = struct{}{} } } out := make([]string, 0, len(hosts)) for host := range hosts { out = append(out, host) } sort.Strings(out) if a.settings.DefaultFlashHost == "" { return out } return moveToFront(out, a.settings.DefaultFlashHost) } func (a *App) loadSnapshots() error { data, err := os.ReadFile(a.settings.SnapshotsPath) if err != 
nil { return err } var snapshots map[string]SnapshotRecord if err := json.Unmarshal(data, &snapshots); err != nil { return err } a.mu.Lock() a.snapshots = snapshots a.mu.Unlock() for _, snap := range snapshots { a.metrics.RecordSnapshot(snap.Node, "ok", snap.CollectedAt) } return nil } func (a *App) persistSnapshots() error { a.mu.RLock() data, err := json.MarshalIndent(a.snapshots, "", " ") a.mu.RUnlock() if err != nil { return err } if err := os.MkdirAll(filepath.Dir(a.settings.SnapshotsPath), 0o755); err != nil { return err } return os.WriteFile(a.settings.SnapshotsPath, data, 0o644) } func (a *App) loadTargets() error { data, err := os.ReadFile(a.settings.TargetsPath) if err != nil { return err } var targets map[string]facts.Targets if err := json.Unmarshal(data, &targets); err != nil { return err } a.mu.Lock() a.targets = targets a.mu.Unlock() a.metrics.SetDriftTargets(targets, 0) return nil } func (a *App) persistTargets() error { a.mu.RLock() data, err := json.MarshalIndent(a.targets, "", " ") a.mu.RUnlock() if err != nil { return err } if err := os.MkdirAll(filepath.Dir(a.settings.TargetsPath), 0o755); err != nil { return err } return os.WriteFile(a.settings.TargetsPath, data, 0o644) } func diffTargets(prev, next map[string]facts.Targets) []string { classes := map[string]struct{}{} for class := range prev { classes[class] = struct{}{} } for class := range next { classes[class] = struct{}{} } out := make([]string, 0) for class := range classes { if !targetsEqual(prev[class], next[class]) { out = append(out, class) } } sort.Strings(out) return out } func targetsEqual(a, b facts.Targets) bool { if a.Kernel != b.Kernel || a.OSImage != b.OSImage || a.Containerd != b.Containerd || a.K3sVersion != b.K3sVersion { return false } if len(a.Packages) != len(b.Packages) { return false } for key, value := range a.Packages { if b.Packages[key] != value { return false } } return true } func humanBytes(value int64) string { const unit = 1024 if value < unit { return 
fmt.Sprintf("%d B", value) } div, exp := int64(unit), 0 for n := value / unit; n >= unit; n /= unit { div *= unit exp++ } return fmt.Sprintf("%.1f %ciB", float64(value)/float64(div), "KMGTPE"[exp]) } func firstLine(value string) string { value = strings.TrimSpace(value) if idx := strings.IndexByte(value, '\n'); idx >= 0 { return strings.TrimSpace(value[:idx]) } return value } func preferredDevice(devices []Device) string { if len(devices) == 0 { return "" } return devices[0].Path } func errorString(err error) string { if err == nil { return "" } return err.Error() } func deviceScore(device Device) int { score := 0 model := strings.ToLower(strings.TrimSpace(device.Model)) switch { case strings.Contains(model, "microsd"), strings.Contains(model, "micro sd"): score += 60 case strings.Contains(model, "sdxc"), strings.Contains(model, "sdhc"), strings.Contains(model, "sd "): score += 50 case strings.Contains(model, "card"), strings.Contains(model, "reader"): score += 40 } if device.Removable { score += 20 } if device.Hotplug { score += 10 } if device.Transport == "usb" { score += 5 } if strings.HasPrefix(device.Name, "mmcblk") { score += 25 } return score } func moveToFront(values []string, preferred string) []string { if preferred == "" || len(values) < 2 { return values } out := append([]string{}, values...) 
for idx, value := range out { if value != preferred { continue } copy(out[1:idx+1], out[:idx]) out[0] = preferred return out } return out } func deleteNodeObject(node string) error { if err := deleteNodeObjectInCluster(node); err == nil { return nil } cmd := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found") if out, err := cmd.CombinedOutput(); err != nil { return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out))) } return nil } func deleteNodeObjectInCluster(node string) error { kube, err := inClusterKubeClient() if err != nil { return errors.New("not running in cluster") } return kube.deleteRequest(fmt.Sprintf("/api/v1/nodes/%s", node)) }