titan-iac/services/logging/scripts/opensearch_observability_seed.py

141 lines
4.1 KiB
Python
Raw Permalink Normal View History

2026-01-13 09:59:39 -03:00
import json
import os
import time
import urllib.error
import urllib.request
OSD_URL = os.environ.get(
"OSD_URL",
"http://opensearch-dashboards.logging.svc.cluster.local:5601",
).rstrip("/")
OBJECT_DIR = "/config"
def request_json(method, path, payload=None):
url = f"{OSD_URL}{path}"
data = None
headers = {"osd-xsrf": "true"}
if payload is not None:
data = json.dumps(payload).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=data, method=method)
for key, value in headers.items():
req.add_header(key, value)
try:
with urllib.request.urlopen(req, timeout=30) as response:
body = response.read().decode("utf-8")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8")
raise SystemExit(f"{method} {path} failed: {exc.code} {detail}")
if not body:
return {}
return json.loads(body)
def wait_ready():
for _ in range(60):
try:
request_json("GET", "/api/status")
return
except Exception:
time.sleep(5)
raise SystemExit("OpenSearch Dashboards did not become ready in time")
def load_payload(name):
path = os.path.join(OBJECT_DIR, name)
with open(path, "r", encoding="utf-8") as handle:
return json.load(handle)
def index_by_name(items, key):
lookup = {}
for item in items:
obj = item.get(key, {})
name = obj.get("name")
if not name:
continue
lookup.setdefault(name, item)
return lookup
def ensure_applications(apps):
existing = request_json("GET", "/api/observability/application/").get("data", [])
existing_by_name = {app.get("name"): app for app in existing if app.get("name")}
for app in apps:
name = app.get("name")
if not name:
continue
current = existing_by_name.get(name)
if not current:
request_json("POST", "/api/observability/application/", app)
print(f"created application: {name}")
continue
if app.get("baseQuery") != current.get("baseQuery"):
print(f"baseQuery differs for {name}; skipping update")
update_body = {}
for key in ("description", "servicesEntities", "traceGroups"):
if app.get(key, "") != current.get(key, ""):
update_body[key] = app.get(key, "")
if update_body:
request_json(
"PUT",
"/api/observability/application/",
{"appId": current["id"], "updateBody": update_body},
)
print(f"updated application: {name}")
def ensure_saved_objects(objects, object_type, endpoint):
existing = request_json(
"GET",
f"/api/observability/event_analytics/saved_objects?objectType={object_type}",
).get("observabilityObjectList", [])
key = "savedQuery" if object_type == "savedQuery" else "savedVisualization"
existing_by_name = index_by_name(existing, key)
for obj in objects:
name = obj.get("name")
if not name:
continue
current = existing_by_name.get(name)
if not current:
request_json("POST", endpoint, {"object": obj})
print(f"created {object_type}: {name}")
continue
current_body = current.get(key, {})
if current_body != obj:
request_json(
"PUT",
endpoint,
{"object_id": current["objectId"], "object": obj},
)
print(f"updated {object_type}: {name}")
def main():
wait_ready()
applications = load_payload("applications.json")
queries = load_payload("saved_queries.json")
visualizations = load_payload("saved_visualizations.json")
ensure_applications(applications)
ensure_saved_objects(queries, "savedQuery", "/api/observability/event_analytics/saved_objects/query")
ensure_saved_objects(
visualizations,
"savedVisualization",
"/api/observability/event_analytics/saved_objects/vis",
)
if __name__ == "__main__":
main()