package service

import (
	"context"
	"encoding/json"
	"fmt"
	"os/exec"
	"sort"
	"strings"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/metrics"
)

// gitOpsKubectlOutput is the command runner that gitOpsKubectlJSON calls to
// execute kubectl. It defaults to runGitOpsKubectlOutput.
// NOTE(review): presumably a package variable so tests can substitute a fake
// runner — confirm against the test files.
var gitOpsKubectlOutput = runGitOpsKubectlOutput

// gitOpsResourceList is the decode target for a `kubectl get ... -o json`
// list response; only the items array is read.
type gitOpsResourceList struct {
	Items []gitOpsResource `json:"items"`
}

// gitOpsResource is a single decode shape shared by all three Flux kinds this
// file parses (GitRepository, Kustomization, HelmRelease). Each parser reads
// only the subset of fields relevant to its kind; fields absent from the JSON
// stay at their zero value.
type gitOpsResource struct {
	// Metadata identifies the object; parsers fall back to "default" when the
	// namespace is empty.
	Metadata struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	} `json:"metadata"`
	Spec struct {
		// Suspend is read by all three parsers.
		Suspend bool `json:"suspend"`
		// Ref and URL are read by the GitRepository parser (see sourceBranch).
		Ref struct {
			Branch string `json:"branch"`
			Tag    string `json:"tag"`
			SemVer string `json:"semver"`
		} `json:"ref"`
		URL string `json:"url"`
		// Path and SourceRef are read by the Kustomization parser.
		Path      string `json:"path"`
		SourceRef struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"sourceRef"`
		// Chart is read by the HelmRelease parser as the declared chart
		// name/version, used when status history has nothing better.
		Chart struct {
			Spec struct {
				Chart   string `json:"chart"`
				Version string `json:"version"`
			} `json:"spec"`
		} `json:"chart"`
	} `json:"spec"`
	Status struct {
		// Artifact.Revision is reported as the GitRepository revision.
		Artifact struct {
			Revision string `json:"revision"`
		} `json:"artifact"`
		// The Kustomization and HelmRelease parsers prefer the applied
		// revision and fall back to the attempted one.
		LastAppliedRevision   string `json:"lastAppliedRevision"`
		LastAttemptedRevision string `json:"lastAttemptedRevision"`
		// Conditions is scanned by fluxReady for the "Ready" entry.
		Conditions []struct {
			Type   string `json:"type"`
			Status string `json:"status"`
			Reason string `json:"reason"`
		} `json:"conditions"`
		// History is read by the HelmRelease parser, which uses only the
		// first entry.
		History []struct {
			ChartName    string `json:"chartName"`
			ChartVersion string `json:"chartVersion"`
			AppVersion   string `json:"appVersion"`
			Digest       string `json:"digest"`
		} `json:"history"`
	} `json:"status"`
}

// maybeStartGitOpsSnapshot starts a bounded background scrape when the GitOps
// sample interval has elapsed.
// Signature: (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool.
// Why: Kubernetes API reads must not block UPS polling, especially during
// emergency power events when the daemon's first job is still safe shutdown.
func (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool { if !d.cfg.GitOps.Enabled || d.exporter == nil || running { return running } interval := time.Duration(d.cfg.GitOps.PollSeconds) * time.Second if interval <= 0 { interval = 60 * time.Second } now := time.Now() if lastRun != nil && !lastRun.IsZero() && now.Sub(*lastRun) < interval { return running } if lastRun != nil { *lastRun = now } cfg := d.cfg exporter := d.exporter logger := d.log go func() { defer func() { select { case done <- struct{}{}: default: } }() scrapeCtx, cancel := context.WithTimeout(ctx, 25*time.Second) defer cancel() snapshot := collectGitOpsSnapshot(scrapeCtx, cfg) exporter.UpdateGitOpsSnapshot(snapshot) if !snapshot.ScrapeSuccess && logger != nil { logger.Printf("warning: gitops metrics scrape partial: %s", gitOpsErrorSummary(snapshot.Errors)) } }() return true } // collectGitOpsSnapshot queries Flux custom resources and converts them into // Prometheus-safe metric state. // Signature: collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot. // Why: the Grafana overview needs current branch/readiness/suspend state, which // is object status rather than controller operational telemetry. 
func collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot { snapshot := metrics.GitOpsSnapshot{ UpdatedAt: time.Now().UTC(), ScrapeSuccess: true, Errors: map[string]string{}, } if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "gitrepositories.source.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil { snapshot.Errors["gitrepository"] = err.Error() } else if sources, err := parseGitRepositories(raw); err != nil { snapshot.Errors["gitrepository"] = err.Error() } else { snapshot.FluxSources = sources } if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil { snapshot.Errors["kustomization"] = err.Error() } else if kustomizations, err := parseKustomizations(raw); err != nil { snapshot.Errors["kustomization"] = err.Error() } else { snapshot.Kustomizations = kustomizations } if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil { snapshot.Errors["helmrelease"] = err.Error() } else if releases, err := parseHelmReleases(raw); err != nil { snapshot.Errors["helmrelease"] = err.Error() } else { snapshot.HelmReleases = releases } snapshot.ScrapeSuccess = len(snapshot.Errors) == 0 return snapshot } // parseGitRepositories extracts Git source branch/revision/readiness from Flux // GitRepository JSON. // Signature: parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error). // Why: keeping status parsing isolated makes it testable without a live Flux // API and protects the daemon loop from brittle JSON traversal code. 
func parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error) { var list gitOpsResourceList if err := json.Unmarshal(raw, &list); err != nil { return nil, fmt.Errorf("decode gitrepositories: %w", err) } out := make([]metrics.GitOpsFluxSource, 0, len(list.Items)) for _, item := range list.Items { ready, reason := fluxReady(item.Status.Conditions) out = append(out, metrics.GitOpsFluxSource{ Namespace: defaultString(item.Metadata.Namespace, "default"), Name: item.Metadata.Name, URL: item.Spec.URL, Branch: sourceBranch(item), Revision: item.Status.Artifact.Revision, Ready: ready, Reason: reason, Suspended: item.Spec.Suspend, }) } sort.Slice(out, func(i, j int) bool { return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name }) return out, nil } // parseKustomizations extracts Kustomization health and source metadata from // Flux Kustomization JSON. // Signature: parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error). // Why: Kustomization status drives both the overview summary and the detailed // GitOps table, so the parser keeps the metric contract in one place. 
func parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error) { var list gitOpsResourceList if err := json.Unmarshal(raw, &list); err != nil { return nil, fmt.Errorf("decode kustomizations: %w", err) } out := make([]metrics.GitOpsKustomization, 0, len(list.Items)) for _, item := range list.Items { ready, reason := fluxReady(item.Status.Conditions) namespace := defaultString(item.Metadata.Namespace, "default") out = append(out, metrics.GitOpsKustomization{ Namespace: namespace, Name: item.Metadata.Name, Path: item.Spec.Path, SourceNamespace: defaultString(item.Spec.SourceRef.Namespace, namespace), SourceName: item.Spec.SourceRef.Name, Revision: firstNonEmpty(item.Status.LastAppliedRevision, item.Status.LastAttemptedRevision), Ready: ready, Reason: reason, Suspended: item.Spec.Suspend, }) } sort.Slice(out, func(i, j int) bool { return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name }) return out, nil } // parseHelmReleases extracts HelmRelease health and chart metadata from Flux // HelmRelease JSON. // Signature: parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error). // Why: HelmRelease status has different chart fields than Kustomizations, and // isolating it keeps dashboard metrics stable as Flux payloads evolve. 
func parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error) { var list gitOpsResourceList if err := json.Unmarshal(raw, &list); err != nil { return nil, fmt.Errorf("decode helmreleases: %w", err) } out := make([]metrics.GitOpsHelmRelease, 0, len(list.Items)) for _, item := range list.Items { ready, reason := fluxReady(item.Status.Conditions) chart := item.Spec.Chart.Spec.Chart version := item.Spec.Chart.Spec.Version appVersion := "" revision := firstNonEmpty(item.Status.LastAppliedRevision, item.Status.LastAttemptedRevision) if len(item.Status.History) > 0 { latest := item.Status.History[0] chart = firstNonEmpty(latest.ChartName, chart) version = firstNonEmpty(latest.ChartVersion, version) appVersion = latest.AppVersion revision = firstNonEmpty(latest.Digest, revision) } out = append(out, metrics.GitOpsHelmRelease{ Namespace: defaultString(item.Metadata.Namespace, "default"), Name: item.Metadata.Name, Chart: chart, Version: version, AppVersion: appVersion, Revision: revision, Ready: ready, Reason: reason, Suspended: item.Spec.Suspend, }) } sort.Slice(out, func(i, j int) bool { return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name }) return out, nil } // gitOpsKubectlJSON runs kubectl with the configured kubeconfig and a short // request timeout. // Signature: gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error). // Why: every GitOps scrape should fail quickly instead of stealing time from // UPS safety polling during outage recovery. func gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error) { finalArgs := []string{} if strings.TrimSpace(cfg.Kubeconfig) != "" { finalArgs = append(finalArgs, "--kubeconfig", cfg.Kubeconfig) } finalArgs = append(finalArgs, "--request-timeout=8s") finalArgs = append(finalArgs, args...) return gitOpsKubectlOutput(ctx, "kubectl", finalArgs...) 
} // runGitOpsKubectlOutput is the production command runner for GitOps snapshot // collection. // Signature: runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error). // Why: preserving stderr in errors makes live diagnosis possible when Flux CRD // reads fail on a recovering control plane. func runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error) { cmd := exec.CommandContext(ctx, name, args...) out, err := cmd.CombinedOutput() if err != nil { trimmed := strings.TrimSpace(string(out)) if trimmed == "" { return nil, err } return nil, fmt.Errorf("%w: %s", err, trimmed) } return out, nil } // fluxReady resolves a Flux Ready condition into a boolean and reason. // Signature: fluxReady(conditions []struct { Type string; Status string; Reason string }) (bool, string). // Why: Flux resources all use Ready conditions, so one helper keeps status // interpretation consistent across GitRepository, Kustomization, and HelmRelease. func fluxReady(conditions []struct { Type string `json:"type"` Status string `json:"status"` Reason string `json:"reason"` }) (bool, string) { for _, condition := range conditions { if condition.Type == "Ready" { return strings.EqualFold(condition.Status, "True"), defaultString(condition.Reason, "ReadyUnknown") } } return false, "ReadyMissing" } // sourceBranch returns the most useful source reference label for a // GitRepository. // Signature: sourceBranch(item gitOpsResource) string. // Why: branch is preferred for Atlas, but tags and semver refs should still // render clearly if Flux is ever pinned differently. func sourceBranch(item gitOpsResource) string { return firstNonEmpty(item.Spec.Ref.Branch, item.Spec.Ref.Tag, item.Spec.Ref.SemVer, "unknown") } // defaultString trims a value and returns a fallback when it is empty. // Signature: defaultString(value string, fallback string) string. 
// Why: Flux omits some optional fields; dashboards are easier to read with
// explicit fallback labels.
func defaultString(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}

// firstNonEmpty scans its arguments in order and returns the first one that is
// non-empty after trimming surrounding whitespace (the trimmed form is what is
// returned). It yields "" when every argument is blank or none are given.
// Signature: firstNonEmpty(values ...string) string.
// Why: callers list Flux revision fields in preference order and let this
// helper pick whichever one the resource actually populated.
func firstNonEmpty(values ...string) string {
	for idx := range values {
		if candidate := strings.TrimSpace(values[idx]); candidate != "" {
			return candidate
		}
	}
	return ""
}

// gitOpsErrorSummary formats per-family scrape errors as "key=message" pairs,
// sorted by key and joined with "; ". An empty or nil map renders as "none".
// Signature: gitOpsErrorSummary(errors map[string]string) string.
// Why: map iteration order is random, so sorting keeps repeated warnings
// identical from one log line to the next.
func gitOpsErrorSummary(errors map[string]string) string {
	if len(errors) == 0 {
		return "none"
	}

	families := make([]string, 0, len(errors))
	for family := range errors {
		families = append(families, family)
	}
	sort.Strings(families)

	var summary strings.Builder
	for idx, family := range families {
		if idx > 0 {
			summary.WriteString("; ")
		}
		summary.WriteString(family)
		summary.WriteByte('=')
		summary.WriteString(errors[family])
	}
	return summary.String()
}