#!/usr/bin/env python3
"""Generate OpenSearch Observability seed objects and render them into ConfigMaps.

Usage:
  scripts/logging_render_observability.py --build   # rebuild JSON + ConfigMap
  scripts/logging_render_observability.py           # re-render ConfigMap from JSON
"""

from __future__ import annotations

import argparse
import json
import textwrap
from dataclasses import dataclass
from pathlib import Path

# Repository root: this script lives one directory below it.
ROOT = Path(__file__).resolve().parents[1]
OBS_DIR = ROOT / "services" / "logging" / "observability"
APPS_PATH = OBS_DIR / "applications.json"
QUERIES_PATH = OBS_DIR / "saved_queries.json"
VIS_PATH = OBS_DIR / "saved_visualizations.json"
CONFIG_PATH = ROOT / "services" / "logging" / "opensearch-observability-objects.yaml"

# Prefix applied to every payload line so it nests inside the YAML literal
# block scalars below; it must be deeper than the two-space key indent.
PAYLOAD_INDENT = "    "

# ConfigMap skeleton. The three payload placeholders sit at column 0 because
# indent_payload() has already indented their content.
CONFIG_TEMPLATE = textwrap.dedent(
    """# {relative_path}
# Generated by scripts/logging_render_observability.py --build
apiVersion: v1
kind: ConfigMap
metadata:
  name: opensearch-observability-objects
  namespace: logging
data:
  applications.json: |
{applications}
  saved_queries.json: |
{queries}
  saved_visualizations.json: |
{visualizations}
"""
)

# Defaults attached to every saved query and visualization.
DEFAULT_RANGE = {"start": "now-24h", "end": "now", "text": ""}
DEFAULT_TIMESTAMP = {"name": "@timestamp", "type": "timestamp"}
DEFAULT_FIELDS = {"text": "", "tokens": []}


@dataclass(frozen=True)
class AppSpec:
    """An application entry and the base query that selects its logs."""

    name: str
    base_query: str
    # "kube" or "journald"; picks which error filter build_objects() applies.
    kind: str = "kube"
    description: str = ""


@dataclass(frozen=True)
class QuerySpec:
    """Name, query text and description of one saved query."""

    name: str
    query: str
    description: str = ""


@dataclass(frozen=True)
class VisualizationSpec:
    """Name, query text, chart type and description of one saved visualization."""

    name: str
    query: str
    vis_type: str
    description: str = ""


def source_query(index: str, where: str | None = None) -> str:
    """Return ``source = <index>``, followed by ``| where <where>`` when given."""
    query = f"source = {index}"
    if where:
        query += f" | where {where}"
    return query


def error_filter(fields: list[str]) -> str:
    """Return an ``or``-joined ``match(...)`` clause over *fields*."""
    parts = [f"match({field}, 'error|exception|fail')" for field in fields]
    return " or ".join(parts)


def saved_query(spec: QuerySpec) -> dict:
    """Build the saved-query payload for *spec* using the module defaults."""
    return {
        "name": spec.name,
        "description": spec.description,
        "query": spec.query,
        "selected_date_range": DEFAULT_RANGE,
        "selected_timestamp": DEFAULT_TIMESTAMP,
        "selected_fields": DEFAULT_FIELDS,
    }


def saved_visualization(spec: VisualizationSpec) -> dict:
    """Build the saved-visualization payload for *spec* using the module defaults."""
    return {
        "name": spec.name,
        "description": spec.description,
        "query": spec.query,
        "type": spec.vis_type,
        "selected_date_range": DEFAULT_RANGE,
        "selected_timestamp": DEFAULT_TIMESTAMP,
        "selected_fields": DEFAULT_FIELDS,
    }


def build_objects() -> tuple[list[dict], list[dict], list[dict]]:
    """Return ``(applications, queries, visualizations)`` payload lists.

    Each application contributes two saved queries ("<name> logs" and
    "<name> errors") in addition to the four index-wide queries.
    """
    kube_error = error_filter(["log", "message"])
    journald_error = error_filter(["MESSAGE"])

    apps = [
        AppSpec(
            "bstein-dev-home",
            source_query("kube-*", "kubernetes.namespace_name = 'bstein-dev-home'"),
        ),
        AppSpec(
            "pegasus",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'jellyfin' and kubernetes.labels.app = 'pegasus'",
            ),
        ),
        AppSpec(
            "jellyfin",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'jellyfin' and kubernetes.labels.app = 'jellyfin'",
            ),
        ),
        AppSpec("vaultwarden", source_query("kube-*", "kubernetes.namespace_name = 'vaultwarden'")),
        AppSpec("mailu", source_query("kube-*", "kubernetes.namespace_name = 'mailu-mailserver'")),
        AppSpec("nextcloud", source_query("kube-*", "kubernetes.namespace_name = 'nextcloud'")),
        AppSpec("gitea", source_query("kube-*", "kubernetes.namespace_name = 'gitea'")),
        AppSpec("jenkins", source_query("kube-*", "kubernetes.namespace_name = 'jenkins'")),
        AppSpec("harbor", source_query("kube-*", "kubernetes.namespace_name = 'harbor'")),
        AppSpec("vault", source_query("kube-*", "kubernetes.namespace_name = 'vault'")),
        AppSpec("keycloak", source_query("kube-*", "kubernetes.namespace_name = 'sso'")),
        AppSpec("flux-system", source_query("kube-*", "kubernetes.namespace_name = 'flux-system'")),
        AppSpec("comms", source_query("kube-*", "kubernetes.namespace_name = 'comms'")),
        AppSpec(
            "element-web",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'comms' and kubernetes.container_name = 'element-web'",
            ),
        ),
        AppSpec(
            "element-call",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'comms' and kubernetes.labels.app = 'element-call'",
            ),
        ),
        AppSpec(
            "matrix-synapse",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'comms' and kubernetes.container_name = 'synapse'",
            ),
        ),
        AppSpec(
            "livekit",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'comms' and kubernetes.labels.app = 'livekit'",
            ),
        ),
        AppSpec(
            "coturn",
            source_query(
                "kube-*",
                "kubernetes.namespace_name = 'comms' and kubernetes.labels.app = 'coturn'",
            ),
        ),
        AppSpec(
            "lesavka",
            source_query("journald-*", "_HOSTNAME = 'titan-jh'"),
            kind="journald",
        ),
    ]

    applications = [
        {
            "name": app.name,
            "description": app.description,
            "baseQuery": app.base_query,
            "servicesEntities": [],
            "traceGroups": [],
        }
        for app in apps
    ]

    queries = [
        saved_query(QuerySpec("kube logs", source_query("kube-*"))),
        saved_query(QuerySpec("kube errors", f"{source_query('kube-*')} | where {kube_error}")),
        saved_query(QuerySpec("journald logs", source_query("journald-*"))),
        saved_query(
            QuerySpec("journald errors", f"{source_query('journald-*')} | where {journald_error}")
        ),
    ]
    for app in apps:
        query_base = app.base_query
        # journald records are filtered on MESSAGE; kube records on log/message.
        error_clause = journald_error if app.kind == "journald" else kube_error
        queries.append(saved_query(QuerySpec(f"{app.name} logs", query_base)))
        queries.append(
            saved_query(QuerySpec(f"{app.name} errors", f"{query_base} | where {error_clause}"))
        )

    visualizations = [
        saved_visualization(
            VisualizationSpec(
                "[Kube] Logs per hour",
                "source = kube-* | stats count() as log_count by span(`@timestamp`, 1h)",
                "line",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Errors per hour",
                f"source = kube-* | where {kube_error} | stats count() as error_count by span(`@timestamp`, 1h)",
                "line",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Top namespaces",
                "source = kube-* | stats count() as log_count by kubernetes.namespace_name | sort - log_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Top error namespaces",
                f"source = kube-* | where {kube_error} | stats count() as error_count by kubernetes.namespace_name | sort - error_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Top pods",
                "source = kube-* | stats count() as log_count by kubernetes.pod_name | sort - log_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Top error pods",
                f"source = kube-* | where {kube_error} | stats count() as error_count by kubernetes.pod_name | sort - error_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Kube] Top nodes",
                "source = kube-* | stats count() as log_count by kubernetes.node_name | sort - log_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Journald] Top units",
                "source = journald-* | stats count() as log_count by _SYSTEMD_UNIT | sort - log_count",
                "bar",
            )
        ),
        saved_visualization(
            VisualizationSpec(
                "[Journald] Top error units",
                f"source = journald-* | where {journald_error} | stats count() as error_count by _SYSTEMD_UNIT | sort - error_count",
                "bar",
            )
        ),
    ]

    return applications, queries, visualizations


def write_json(payload: list[dict], path: Path) -> None:
    """Write *payload* to *path* as 2-space-indented JSON with a trailing newline.

    Missing parent directories are created. The file is written as UTF-8 so
    the output does not depend on the locale's default encoding.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")


def render_configmap(apps_path: Path, queries_path: Path, vis_path: Path, output_path: Path) -> None:
    """Render the three JSON payload files into the ConfigMap at *output_path*.

    Raises:
        ValueError: if *output_path* is not located under ``ROOT`` (the header
            comment records the path relative to the repository root).
    """
    relative_path = output_path.relative_to(ROOT)
    applications = indent_payload(apps_path)
    queries = indent_payload(queries_path)
    visualizations = indent_payload(vis_path)
    output_path.write_text(
        CONFIG_TEMPLATE.format(
            relative_path=relative_path,
            applications=applications,
            queries=queries,
            visualizations=visualizations,
        ),
        encoding="utf-8",
    )


def indent_payload(path: Path) -> str:
    """Return the text of *path* with every line prefixed by ``PAYLOAD_INDENT``.

    The result has no trailing newline; the template supplies the line break
    after each placeholder.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    return "\n".join(PAYLOAD_INDENT + line for line in lines)


def main() -> None:
    """Optionally rebuild the JSON payloads, then render the ConfigMap."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--build", action="store_true", help="Regenerate JSON payloads and ConfigMap")
    args = parser.parse_args()

    if args.build:
        applications, queries, visualizations = build_objects()
        write_json(applications, APPS_PATH)
        write_json(queries, QUERIES_PATH)
        write_json(visualizations, VIS_PATH)

    # Without --build the payloads must already exist from an earlier run.
    if not (APPS_PATH.exists() and QUERIES_PATH.exists() and VIS_PATH.exists()):
        raise SystemExit("Missing observability JSON payloads. Run with --build first.")

    render_configmap(APPS_PATH, QUERIES_PATH, VIS_PATH, CONFIG_PATH)


if __name__ == "__main__":
    main()