# chat-ai-gateway: minimal authenticating reverse proxy in front of the chat backend.
import json
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import request, error

UPSTREAM = os.environ.get("UPSTREAM_URL", "http://bstein-dev-home-backend/api/chat")
KEY_MATRIX = os.environ.get("CHAT_KEY_MATRIX", "")
KEY_HOMEPAGE = os.environ.get("CHAT_KEY_HOMEPAGE", "")

# Only non-empty keys participate; with no keys configured every POST is rejected.
ALLOWED = {k for k in (KEY_MATRIX, KEY_HOMEPAGE) if k}

# Hop-by-hop / recomputed headers that must not be forwarded verbatim.
_SKIPPED_HEADERS = ("content-length", "connection", "server", "date")


class Handler(BaseHTTPRequestHandler):
    """Answers health probes on GET; forwards authenticated POSTs to UPSTREAM."""

    def _send_json(self, code: int, payload: dict):
        """Serialize *payload* and emit it with HTTP status *code*."""
        encoded = json.dumps(payload).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)

    def do_GET(self):  # noqa: N802
        """Health endpoint: 200 on "/" or "/healthz", 404 otherwise."""
        if self.path not in ("/healthz", "/"):
            return self._send_json(404, {"error": "not_found"})
        return self._send_json(200, {"ok": True})

    def do_POST(self):  # noqa: N802
        """Check the x-api-key header, then proxy the request body to UPSTREAM."""
        if self.path != "/":
            return self._send_json(404, {"error": "not_found"})

        presented = self.headers.get("x-api-key", "")
        if not presented or presented not in ALLOWED:
            return self._send_json(401, {"error": "unauthorized"})

        size = int(self.headers.get("content-length", "0") or "0")
        payload = self.rfile.read(size) if size else b"{}"

        try:
            proxied = request.Request(
                UPSTREAM,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with request.urlopen(proxied, timeout=90) as upstream:
                body = upstream.read()
                self.send_response(upstream.status)
                for name, value in upstream.headers.items():
                    if name.lower() not in _SKIPPED_HEADERS:
                        self.send_header(name, value)
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
        except error.HTTPError as exc:
            # Relay upstream HTTP errors with their original status and body.
            body = exc.read() if hasattr(exc, "read") else b""
            self.send_response(exc.code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except Exception:
            # Network/parse failures surface as a generic 502.
            return self._send_json(502, {"error": "bad_gateway"})


def main():
    """Serve on PORT (default 8080) until the process is killed."""
    port = int(os.environ.get("PORT", "8080"))
    httpd = HTTPServer(("0.0.0.0", port), Handler)
    httpd.serve_forever()


if __name__ == "__main__":
    main()
/dev/null +++ b/services/bstein-dev-home/scripts/gateway.py @@ -0,0 +1,70 @@ +import json +import os +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib import request, error + +UPSTREAM = os.environ.get("UPSTREAM_URL", "http://bstein-dev-home-backend/api/chat") +KEY_MATRIX = os.environ.get("CHAT_KEY_MATRIX", "") +KEY_HOMEPAGE = os.environ.get("CHAT_KEY_HOMEPAGE", "") + +ALLOWED = {k for k in (KEY_MATRIX, KEY_HOMEPAGE) if k} + +class Handler(BaseHTTPRequestHandler): + def _send_json(self, code: int, payload: dict): + body = json.dumps(payload).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self): # noqa: N802 + if self.path in ("/healthz", "/"): + return self._send_json(200, {"ok": True}) + return self._send_json(404, {"error": "not_found"}) + + def do_POST(self): # noqa: N802 + if self.path != "/": + return self._send_json(404, {"error": "not_found"}) + + key = self.headers.get("x-api-key", "") + if not key or key not in ALLOWED: + return self._send_json(401, {"error": "unauthorized"}) + + length = int(self.headers.get("content-length", "0") or "0") + raw = self.rfile.read(length) if length else b"{}" + + try: + upstream_req = request.Request( + UPSTREAM, + data=raw, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(upstream_req, timeout=90) as resp: + data = resp.read() + self.send_response(resp.status) + for k, v in resp.headers.items(): + if k.lower() in ("content-length", "connection", "server", "date"): + continue + self.send_header(k, v) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + except error.HTTPError as e: + data = e.read() if hasattr(e, "read") else b"" + self.send_response(e.code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", 
str(len(data))) + self.end_headers() + self.wfile.write(data) + except Exception: + return self._send_json(502, {"error": "bad_gateway"}) + +def main(): + port = int(os.environ.get("PORT", "8080")) + httpd = HTTPServer(("0.0.0.0", port), Handler) + httpd.serve_forever() + +if __name__ == "__main__": + main() diff --git a/scripts/tests/test_portal_onboarding_flow.py b/services/bstein-dev-home/scripts/test_portal_onboarding_flow.py similarity index 100% rename from scripts/tests/test_portal_onboarding_flow.py rename to services/bstein-dev-home/scripts/test_portal_onboarding_flow.py diff --git a/scripts/vaultwarden_cred_sync.py b/services/bstein-dev-home/scripts/vaultwarden_cred_sync.py similarity index 100% rename from scripts/vaultwarden_cred_sync.py rename to services/bstein-dev-home/scripts/vaultwarden_cred_sync.py diff --git a/services/comms/atlasbot-configmap.yaml b/services/comms/atlasbot-configmap.yaml deleted file mode 100644 index be9640e..0000000 --- a/services/comms/atlasbot-configmap.yaml +++ /dev/null @@ -1,629 +0,0 @@ -# services/comms/atlasbot-configmap.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: atlasbot -data: - bot.py: | - import collections - import json - import os - import re - import ssl - import time - from typing import Any - from urllib import error, parse, request - - BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008") - AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080") - USER = os.environ["BOT_USER"] - PASSWORD = os.environ["BOT_PASS"] - ROOM_ALIAS = "#othrys:live.bstein.dev" - - OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/") - MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0") - API_KEY = os.environ.get("CHAT_API_KEY", "") - - KB_DIR = os.environ.get("KB_DIR", "") - VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428") - - BOT_MENTIONS = 
# atlasbot bot.py — module header, tokenization, mention detection, Matrix HTTP
# helpers, and the Atlas knowledge-base retrieval layer.
#
# NOTE(review): the imports through BOT_MENTIONS are restored verbatim from the
# module top visible earlier in this dump so this section reads as a whole.
import collections
import json
import os
import re
import ssl
import time
from typing import Any
from urllib import error, parse, request

BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"

OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")

KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")

BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")

MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))

TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
# Fix: the dump showed a doubled backslash ("\\.") which, inside a raw string,
# requires a literal backslash in the text and would never match a hostname.
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
STOPWORDS = {
    "the", "and", "for", "with", "this", "that", "from", "into",
    "what", "how", "why", "when", "where", "which", "who",
    "can", "could", "should", "would", "please", "help",
    "atlas", "othrys",
}

# Words that suggest the user is asking about live health/metrics.
METRIC_HINT_WORDS = {
    "health", "status", "down", "slow", "error", "unknown_error", "timeout",
    "crash", "crashloop", "restart", "restarts", "pending", "unreachable",
    "latency",
}


def _tokens(text: str) -> list[str]:
    """Lowercased query tokens with stopwords and single chars removed."""
    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]


# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
# NOTE(review): the original pattern was corrupted in the source dump
# ('MENTION_RE = re.compile( r"(? str:' fragment). Reconstructed from intent:
# match any configured localpart, optionally prefixed with "@", on word
# boundaries, case-insensitively. Confirm against the pre-migration ConfigMap.
MENTION_RE = re.compile(
    r"(?<![\w@])@?(?:" + "|".join(re.escape(lp) for lp in MENTION_LOCALPARTS) + r")(?![\w-])",
    re.IGNORECASE,
)


def normalize_user_id(token: str) -> str:
    """Normalize a mention token to a full Matrix ID (@local:server)."""
    t = token.strip()
    if not t:
        return ""
    if t.startswith("@") and ":" in t:
        return t
    t = t.lstrip("@")
    if ":" in t:
        return f"@{t}"
    return f"@{t}:{SERVER_NAME}"


MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}


def is_mentioned(content: dict, body: str) -> bool:
    """True when the message body or m.mentions targets the bot."""
    if MENTION_RE.search(body or "") is not None:
        return True
    mentions = content.get("m.mentions", {})
    user_ids = mentions.get("user_ids", [])
    if not isinstance(user_ids, list):
        return False
    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)


# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
    """JSON request against the Matrix (or MAS) API; returns decoded body."""
    url = (base or BASE) + path
    data = None
    headers = {}
    if body is not None:
        data = json.dumps(body).encode()
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    r = request.Request(url, data=data, headers=headers, method=method)
    with request.urlopen(r, timeout=timeout) as resp:
        raw = resp.read()
        return json.loads(raw.decode()) if raw else {}


def login() -> str:
    """Password login through MAS; returns an access token."""
    login_user = normalize_user_id(USER)
    payload = {
        "type": "m.login.password",
        "identifier": {"type": "m.id.user", "user": login_user},
        "password": PASSWORD,
    }
    res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
    return res["access_token"]


def resolve_alias(token: str, alias: str) -> str:
    """Resolve a room alias (e.g. #othrys:...) to a room id."""
    enc = parse.quote(alias)
    res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
    return res["room_id"]


def join_room(token: str, room: str):
    """Join a room by id or alias."""
    req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})


def send_msg(token: str, room: str, text: str):
    """Send a plain m.text message to a room."""
    path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
    req("POST", path, token, body={"msgtype": "m.text", "body": text})


# Atlas KB loader (no external deps; files are pre-rendered JSON via
# scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()


def _load_json_file(path: str) -> Any | None:
    """Best-effort JSON file read; None on any failure."""
    try:
        with open(path, "rb") as f:
            return json.loads(f.read().decode("utf-8"))
    except Exception:
        return None


def load_kb():
    """Load catalog + runbooks from KB_DIR and build host/name indexes."""
    global KB, _HOST_INDEX, _NAME_INDEX
    if not KB_DIR:
        return
    catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
    runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
    KB = {"catalog": catalog, "runbooks": runbooks}

    host_index: dict[str, list[dict]] = collections.defaultdict(list)
    for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
        host = (ep.get("host") or "").lower()
        if host:
            host_index[host].append(ep)
    _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}

    names: set[str] = set()
    for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
        if isinstance(s, dict) and s.get("name"):
            names.add(str(s["name"]).lower())
    for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
        if isinstance(w, dict) and w.get("name"):
            names.add(str(w["name"]).lower())
    _NAME_INDEX = names


def kb_retrieve(query: str, *, limit: int = 3) -> str:
    """Keyword-score runbooks against *query*; return a bounded context blob."""
    q = (query or "").strip()
    if not q or not KB.get("runbooks"):
        return ""
    ql = q.lower()
    q_tokens = _tokens(q)
    if not q_tokens:
        return ""

    scored: list[tuple[int, dict]] = []
    for doc in KB.get("runbooks", []):
        if not isinstance(doc, dict):
            continue
        title = str(doc.get("title") or "")
        body = str(doc.get("body") or "")
        tags = doc.get("tags") or []
        entrypoints = doc.get("entrypoints") or []
        hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
        score = 0
        for t in set(q_tokens):
            if t in hay:
                # Title hits weigh more than body hits.
                score += 3 if t in title.lower() else 1
        for h in entrypoints:
            if isinstance(h, str) and h.lower() in ql:
                score += 4
        if score:
            scored.append((score, doc))

    scored.sort(key=lambda x: x[0], reverse=True)
    picked = [d for _, d in scored[:limit]]
    if not picked:
        return ""

    parts: list[str] = ["Atlas KB (retrieved):"]
    used = 0
    for d in picked:
        path = d.get("path") or ""
        title = d.get("title") or path
        body = (d.get("body") or "").strip()
        snippet = body[:900].strip()
        chunk = f"- {title} ({path})\n{snippet}"
        if used + len(chunk) > MAX_KB_CHARS:
            break
        parts.append(chunk)
        used += len(chunk)
    return "\n".join(parts).strip()


def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
    """Map hostnames/service names in *query* to ingress→service→workload lines.

    Returns (rendered text, [(namespace, workload_name), ...] edges).
    """
    q = (query or "").strip()
    if not q or not KB.get("catalog"):
        return "", []
    ql = q.lower()
    hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}

    # Also match by known workload/service names.
    for t in _tokens(ql):
        if t in _NAME_INDEX:
            hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}

    edges: list[tuple[str, str]] = []
    lines: list[str] = []
    for host in sorted(hosts):
        for ep in _HOST_INDEX.get(host, []):
            backend = ep.get("backend") or {}
            ns = backend.get("namespace") or ""
            svc = backend.get("service") or ""
            path = ep.get("path") or "/"
            if not svc:
                continue
            wk = backend.get("workloads") or []
            wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
            lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}")
            for w in wk:
                if isinstance(w, dict) and w.get("name"):
                    edges.append((ns, str(w["name"])))
    if not lines:
        return "", []
    return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.

# Lazily-cached in-cluster credentials (ServiceAccount token + CA context).
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None

def _k8s_context() -> ssl.SSLContext:
    """Return (and cache) an SSL context trusting the in-cluster CA."""
    global _K8S_CTX
    if _K8S_CTX is not None:
        return _K8S_CTX
    ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    ctx = ssl.create_default_context(cafile=ca_path)
    _K8S_CTX = ctx
    return ctx

def _k8s_token() -> str:
    """Return (and cache) the mounted ServiceAccount bearer token."""
    global _K8S_TOKEN
    if _K8S_TOKEN:
        return _K8S_TOKEN
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    with open(token_path, "r", encoding="utf-8") as f:
        _K8S_TOKEN = f.read().strip()
    return _K8S_TOKEN

def k8s_get(path: str, timeout: int = 8) -> dict:
    """GET *path* from the in-cluster API server; returns decoded JSON ({} if empty)."""
    host = os.environ.get("KUBERNETES_SERVICE_HOST")
    port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
    if not host:
        raise RuntimeError("k8s host missing")
    url = f"https://{host}:{port}{path}"
    headers = {"Authorization": f"Bearer {_k8s_token()}"}
    r = request.Request(url, headers=headers, method="GET")
    with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
        raw = resp.read()
        return json.loads(raw.decode()) if raw else {}

def k8s_pods(namespace: str) -> list[dict]:
    """List pods in *namespace* (capped at 500 by the API query)."""
    data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
    items = data.get("items") or []
    return items if isinstance(items, list) else []

def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
    """One line per pod (phase, ready count, restarts, waiting reason).

    *prefixes* optionally restricts output to pods whose names match.
    NOTE(review): `name.startswith(pref)` subsumes the first two tests in the
    `any(...)` — kept as written; behavior is unchanged either way.
    Best-effort: returns "" if the API call fails.
    """
    try:
        pods = k8s_pods(namespace)
    except Exception:
        return ""
    out: list[str] = []
    for p in pods:
        md = p.get("metadata") or {}
        st = p.get("status") or {}
        name = md.get("name") or ""
        if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes):
            continue
        phase = st.get("phase") or "?"
        cs = st.get("containerStatuses") or []
        restarts = 0
        ready = 0
        total = 0
        reason = st.get("reason") or ""
        for c in cs if isinstance(cs, list) else []:
            if not isinstance(c, dict):
                continue
            total += 1
            restarts += int(c.get("restartCount") or 0)
            if c.get("ready"):
                ready += 1
            state = c.get("state") or {}
            # Surface the first container waiting reason (e.g. CrashLoopBackOff).
            if not reason and isinstance(state, dict):
                waiting = state.get("waiting") or {}
                if isinstance(waiting, dict) and waiting.get("reason"):
                    reason = waiting.get("reason")
        extra = f" ({reason})" if reason else ""
        out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
    return "\n".join(out[:20])

def flux_not_ready() -> str:
    """List Flux Kustomizations whose Ready condition is not True ("" on error)."""
    try:
        data = k8s_get(
            "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
        )
    except Exception:
        return ""
    items = data.get("items") or []
    bad: list[str] = []
    for it in items if isinstance(items, list) else []:
        md = it.get("metadata") or {}
        st = it.get("status") or {}
        name = md.get("name") or ""
        conds = st.get("conditions") or []
        ready = None
        msg = ""
        for c in conds if isinstance(conds, list) else []:
            if isinstance(c, dict) and c.get("type") == "Ready":
                ready = c.get("status")
                msg = c.get("message") or ""
        if ready not in ("True", True):
            bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
    return "\n".join(bad[:10])


# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
    """Run an instant PromQL query; None on any failure."""
    try:
        url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
        with request.urlopen(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except Exception:
        return None

def _vm_value_series(res: dict) -> list[dict]:
    """Extract the result series list from a successful VM response."""
    if not res or (res.get("status") != "success"):
        return []
    data = res.get("data") or {}
    result = data.get("result") or []
    return result if isinstance(result, list) else []

def vm_render_result(res: dict | None, limit: int = 12) -> str:
    """Render VM series as "- label=..., ...: value" lines (up to *limit*)."""
    if not res:
        return ""
    series = _vm_value_series(res)
    if not series:
        return ""
    out: list[str] = []
    for r in series[:limit]:
        if not isinstance(r, dict):
            continue
        metric = r.get("metric") or {}
        value = r.get("value") or []
        val = value[1] if isinstance(value, list) and len(value) > 1 else ""
        # Prefer common labels if present.
        label_parts = []
        for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"):
            if isinstance(metric, dict) and metric.get(k):
                label_parts.append(f"{k}={metric.get(k)}")
        if not label_parts and isinstance(metric, dict):
            for k in sorted(metric.keys()):
                if k.startswith("__"):
                    continue
                label_parts.append(f"{k}={metric.get(k)}")
                if len(label_parts) >= 4:
                    break
        labels = ", ".join(label_parts) if label_parts else "series"
        out.append(f"- {labels}: {val}")
    return "\n".join(out)

def vm_top_restarts(hours: int = 1) -> str:
    """Top 5 pods by container restarts over the last *hours* hours."""
    q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
    res = vm_query(q)
    if not res or (res.get("status") != "success"):
        return ""
    out: list[str] = []
    for r in (res.get("data") or {}).get("result") or []:
        if not isinstance(r, dict):
            continue
        m = r.get("metric") or {}
        v = r.get("value") or []
        ns = (m.get("namespace") or "").strip()
        pod = (m.get("pod") or "").strip()
        val = v[1] if isinstance(v, list) and len(v) > 1 else ""
        if pod:
            out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
    return "\n".join(out)

def vm_cluster_snapshot() -> str:
    """Coarse cluster view: node readiness + pod phase counts (best-effort)."""
    parts: list[str] = []
    # Node readiness (kube-state-metrics).
    ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})')
    not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})')
    if ready and not_ready:
        try:
            r = _vm_value_series(ready)[0]["value"][1]
            nr = _vm_value_series(not_ready)[0]["value"][1]
            parts.append(f"- nodes ready: {r} (not ready: {nr})")
        except Exception:
            pass

    phases = vm_query("sum by (phase) (kube_pod_status_phase)")
    pr = vm_render_result(phases, limit=8)
    if pr:
        parts.append("Pod phases:")
        parts.append(pr)
    return "\n".join(parts).strip()


# Conversation state.
history = collections.defaultdict(list)  # (room_id, sender|None) -> list[str] (short transcript)

def key_for(room_id: str, sender: str, is_dm: bool):
    """History key: per-room in DMs, per-(room, sender) in shared rooms."""
    return (room_id, None) if is_dm else (room_id, sender)

def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str:
    """Assemble the grounded context blob: KB docs, endpoint map, live tool output.

    Live cluster/metrics sections are only added when *allow_tools* is True
    (DMs only — see sync_loop).
    """
    parts: list[str] = []

    kb = kb_retrieve(prompt)
    if kb:
        parts.append(kb)

    endpoints, edges = catalog_hints(prompt)
    if endpoints:
        parts.append(endpoints)

    if allow_tools:
        # Scope pod summaries to relevant namespaces/workloads when possible.
        prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
        for ns, name in (targets or []) + (edges or []):
            if ns and name:
                prefixes_by_ns[ns].add(name)
        pod_lines: list[str] = []
        for ns in sorted(prefixes_by_ns.keys()):
            summary = summarize_pods(ns, prefixes_by_ns[ns])
            if summary:
                pod_lines.append(f"Pods (live):\n{summary}")
        if pod_lines:
            parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])

        flux_bad = flux_not_ready()
        if flux_bad:
            parts.append("Flux (not ready):\n" + flux_bad)

        p_l = (prompt or "").lower()
        if any(w in p_l for w in METRIC_HINT_WORDS):
            restarts = vm_top_restarts(1)
            if restarts:
                parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
            snap = vm_cluster_snapshot()
            if snap:
                parts.append("VictoriaMetrics (cluster snapshot):\n" + snap)

    return "\n\n".join([p for p in parts if p]).strip()

def ollama_reply(hist_key, prompt: str, *, context: str) -> str:
    """Build the transcript, call the chat gateway, and record the reply.

    Returns a canned apology string on any failure (network, JSON, etc.).
    """
    try:
        system = (
            "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
            "Be helpful, direct, and concise. "
            "Prefer answering with exact repo paths and Kubernetes resource names. "
            "Never include or request secret values."
        )
        transcript_parts = [system]
        if context:
            transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
        transcript_parts.extend(history[hist_key][-24:])
        transcript_parts.append(f"User: {prompt}")
        transcript = "\n".join(transcript_parts)

        payload = {"model": MODEL, "message": transcript}
        headers = {"Content-Type": "application/json"}
        if API_KEY:
            headers["x-api-key"] = API_KEY
        r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
        with request.urlopen(r, timeout=20) as resp:
            data = json.loads(resp.read().decode())
        # Accept any of the response field names used by gateway/backend versions.
        reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help."
        history[hist_key].append(f"Atlas: {reply}")
        return reply
    except Exception:
        return "I’m here — but I couldn’t reach the model backend."

def sync_loop(token: str, room_id: str):
    """Long-poll /sync forever: auto-join invites, answer DMs and mentions."""
    since = None
    # Initial sync with timeout=0 just to obtain a next_batch cursor.
    try:
        res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
        since = res.get("next_batch")
    except Exception:
        pass

    while True:
        params = {"timeout": 30000}
        if since:
            params["since"] = since
        query = parse.urlencode(params)
        try:
            res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
        except Exception:
            time.sleep(5)
            continue
        since = res.get("next_batch", since)

        # invites
        for rid, data in res.get("rooms", {}).get("invite", {}).items():
            try:
                join_room(token, rid)
            except Exception:
                pass

        # messages
        for rid, data in res.get("rooms", {}).get("join", {}).items():
            timeline = data.get("timeline", {}).get("events", [])
            joined_count = data.get("summary", {}).get("m.joined_member_count")
            # Heuristic: a 2-member room is treated as a DM.
            is_dm = joined_count is not None and joined_count <= 2

            for ev in timeline:
                if ev.get("type") != "m.room.message":
                    continue
                content = ev.get("content", {})
                body = (content.get("body", "") or "").strip()
                if not body:
                    continue
                sender = ev.get("sender", "")
                # NOTE(review): domain is hardcoded here while SERVER_NAME exists
                # above — verify these never diverge in deployment.
                if sender == f"@{USER}:live.bstein.dev":
                    continue

                mentioned = is_mentioned(content, body)
                hist_key = key_for(rid, sender, is_dm)
                history[hist_key].append(f"{sender}: {body}")
                history[hist_key] = history[hist_key][-80:]

                if not (is_dm or mentioned):
                    continue

                # Only do live cluster/metrics introspection in DMs.
                allow_tools = is_dm

                promql = ""
                if allow_tools:
                    # NOTE(review): the dump shows doubled backslashes ("\\s");
                    # kept verbatim, but as written this requires literal
                    # backslashes in the message — likely an embedding artifact,
                    # verify against the deployed ConfigMap.
                    m = re.match(r"(?is)^\\s*promql\\s*(?:\\:|\\s)\\s*(.+?)\\s*$", body)
                    if m:
                        promql = m.group(1).strip()

                # Attempt to scope tools to the most likely workloads when
                # hostnames are mentioned.
                targets: list[tuple[str, str]] = []
                for m in HOST_RE.finditer(body.lower()):
                    host = m.group(1).lower()
                    for ep in _HOST_INDEX.get(host, []):
                        backend = ep.get("backend") or {}
                        ns = backend.get("namespace") or ""
                        for w in backend.get("workloads") or []:
                            if isinstance(w, dict) and w.get("name"):
                                targets.append((ns, str(w["name"])))

                context = build_context(body, allow_tools=allow_tools, targets=targets)
                if allow_tools and promql:
                    res = vm_query(promql, timeout=20)
                    rendered = vm_render_result(res, limit=15) or "(no results)"
                    extra = "VictoriaMetrics (PromQL result):\n" + rendered
                    context = (context + "\n\n" + extra).strip() if context else extra
                reply = ollama_reply(hist_key, body, context=context)
                send_msg(token, rid, reply)

def login_with_retry():
    """Login with exponential backoff (up to 10 attempts, 30s cap)."""
    last_err = None
    for attempt in range(10):
        try:
            return login()
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            time.sleep(min(30, 2 ** attempt))
    raise last_err

def main():
    """Load the KB, log in, best-effort join the home room, then sync forever."""
    load_kb()
    token = login_with_retry()
    try:
        room_id = resolve_alias(token, ROOM_ALIAS)
        join_room(token, room_id)
    except Exception:
        # Alias resolution/join is optional; the sync loop still handles invites.
        room_id = None
    sync_loop(token, room_id)

if __name__ == "__main__":
    main()
"http://matrix-authentication-service:8081/api/admin/v1").rstrip("/") - SYNAPSE_BASE = os.environ.get("SYNAPSE_BASE", "http://othrys-synapse-matrix-synapse:8008").rstrip("/") - SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev") - - MAS_ADMIN_CLIENT_ID = os.environ["MAS_ADMIN_CLIENT_ID"] - MAS_ADMIN_CLIENT_SECRET_FILE = os.environ.get("MAS_ADMIN_CLIENT_SECRET_FILE", "/etc/mas/admin-client/client_secret") - MAS_ADMIN_SCOPE = os.environ.get("MAS_ADMIN_SCOPE", "urn:mas:admin") - RATE_WINDOW_SEC = int(os.environ.get("RATE_WINDOW_SEC", "60")) - RATE_MAX = int(os.environ.get("RATE_MAX", "30")) - _rate = {} # ip -> [window_start, count] - - ADJ = [ - "brisk","calm","eager","gentle","merry","nifty","rapid","sunny","witty","zesty", - "amber","bold","bright","crisp","daring","frosty","glad","jolly","lively","mellow", - "quiet","ripe","serene","spry","tidy","vivid","warm","wild","clever","kind", - ] - NOUN = [ - "otter","falcon","comet","ember","grove","harbor","meadow","raven","river","summit", - "breeze","cedar","cinder","cove","delta","forest","glade","lark","marsh","peak", - "pine","quartz","reef","ridge","sable","sage","shore","thunder","vale","zephyr", - ] - - def _json(method, url, *, headers=None, body=None, timeout=20): - hdrs = {"Content-Type": "application/json"} - if headers: - hdrs.update(headers) - data = None - if body is not None: - data = json.dumps(body).encode() - req = request.Request(url, data=data, headers=hdrs, method=method) - try: - with request.urlopen(req, timeout=timeout) as resp: - raw = resp.read() - payload = json.loads(raw.decode()) if raw else {} - return resp.status, payload - except error.HTTPError as e: - raw = e.read() - try: - payload = json.loads(raw.decode()) if raw else {} - except Exception: - payload = {} - return e.code, payload - - def _form(method, url, *, headers=None, fields=None, timeout=20): - hdrs = {"Content-Type": "application/x-www-form-urlencoded"} - if headers: - hdrs.update(headers) - data = 
parse.urlencode(fields or {}).encode() - req = request.Request(url, data=data, headers=hdrs, method=method) - try: - with request.urlopen(req, timeout=timeout) as resp: - raw = resp.read() - payload = json.loads(raw.decode()) if raw else {} - return resp.status, payload - except error.HTTPError as e: - raw = e.read() - try: - payload = json.loads(raw.decode()) if raw else {} - except Exception: - payload = {} - return e.code, payload - - _admin_token = None - _admin_token_at = 0.0 - - def _mas_admin_access_token(now): - global _admin_token, _admin_token_at - if _admin_token and (now - _admin_token_at) < 300: - return _admin_token - - with open(MAS_ADMIN_CLIENT_SECRET_FILE, encoding="utf-8") as fh: - client_secret = fh.read().strip() - basic = base64.b64encode(f"{MAS_ADMIN_CLIENT_ID}:{client_secret}".encode()).decode() - - status, payload = _form( - "POST", - f"{MAS_BASE}/oauth2/token", - headers={"Authorization": f"Basic {basic}"}, - fields={"grant_type": "client_credentials", "scope": MAS_ADMIN_SCOPE}, - timeout=20, - ) - if status != 200 or "access_token" not in payload: - raise RuntimeError("mas_admin_token_failed") - - _admin_token = payload["access_token"] - _admin_token_at = now - return _admin_token - - def _generate_localpart(): - return "guest-" + secrets.token_hex(6) - - def _generate_displayname(): - return f"{random.choice(ADJ)}-{random.choice(NOUN)}" - - def _admin_api(admin_token, method, path, body=None): - return _json( - method, - f"{MAS_ADMIN_API_BASE}{path}", - headers={"Authorization": f"Bearer {admin_token}"}, - body=body, - timeout=20, - ) - - def _create_user(admin_token, username): - status, payload = _admin_api(admin_token, "POST", "/users", {"username": username}) - if status != 201: - return status, None - user = payload.get("data") or {} - return status, user.get("id") - - def _set_password(admin_token, user_id, password): - status, _payload = _admin_api( - admin_token, - "POST", - f"/users/{parse.quote(user_id)}/set-password", - 
{"password": password}, - ) - return status in (200, 204) - - def _login_password(username, password): - payload = { - "type": "m.login.password", - "identifier": {"type": "m.id.user", "user": f"@{username}:{SERVER_NAME}"}, - "password": password, - } - status, data = _json( - "POST", - f"{MAS_BASE}/_matrix/client/v3/login", - body=payload, - timeout=20, - ) - if status != 200: - return None, None - return data.get("access_token"), data.get("device_id") - - def _set_display_name(access_token, user_id, displayname): - _json( - "PUT", - f"{SYNAPSE_BASE}/_matrix/client/v3/profile/{parse.quote(user_id, safe='')}/displayname", - headers={"Authorization": f"Bearer {access_token}"}, - body={"displayname": displayname}, - timeout=20, - ) - - def _rate_check(ip, now): - win, cnt = _rate.get(ip, (now, 0)) - if now - win > RATE_WINDOW_SEC: - _rate[ip] = (now, 1) - return True - if cnt >= RATE_MAX: - return False - _rate[ip] = (win, cnt + 1) - return True - - class Handler(BaseHTTPRequestHandler): - server_version = "matrix-guest-register" - - def _send_json(self, code, payload): - body = json.dumps(payload).encode() - self.send_response(code) - self.send_header("Content-Type", "application/json") - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") - self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - def do_OPTIONS(self): # noqa: N802 - self.send_response(204) - self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") - self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") - self.end_headers() - - def do_GET(self): # noqa: N802 - parsed = parse.urlparse(self.path) - if parsed.path in ("/healthz", "/"): - return self._send_json(200, {"ok": 
True}) - if parsed.path in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"): - return self._send_json(200, {"flows": [{"stages": []}]}) - return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"}) - - def do_POST(self): # noqa: N802 - parsed = parse.urlparse(self.path) - if parsed.path not in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"): - return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"}) - - qs = parse.parse_qs(parsed.query) - kind = (qs.get("kind") or ["user"])[0] - if kind != "guest": - return self._send_json( - 403, - { - "errcode": "M_FORBIDDEN", - "error": "Registration is disabled; use https://bstein.dev/request-access for accounts.", - }, - ) - - xfwd = self.headers.get("x-forwarded-for", "") - ip = (xfwd.split(",")[0].strip() if xfwd else "") or self.client_address[0] - now = __import__("time").time() - if not _rate_check(ip, now): - return self._send_json(429, {"errcode": "M_LIMIT_EXCEEDED", "error": "rate_limited"}) - - length = int(self.headers.get("content-length", "0") or "0") - raw = self.rfile.read(length) if length else b"{}" - try: - body = json.loads(raw.decode()) if raw else {} - if not isinstance(body, dict): - body = {} - except Exception: - body = {} - try: - admin_token = _mas_admin_access_token(now) - displayname = _generate_displayname() - - localpart = None - mas_user_id = None - for _ in range(5): - localpart = _generate_localpart() - status, mas_user_id = _create_user(admin_token, localpart) - if status == 201 and mas_user_id: - break - mas_user_id = None - if not mas_user_id or not localpart: - raise RuntimeError("add_user_failed") - - password = secrets.token_urlsafe(18) - if not _set_password(admin_token, mas_user_id, password): - raise RuntimeError("set_password_failed") - access_token, device_id = _login_password(localpart, password) - if not access_token: - raise RuntimeError("login_failed") - try: - _set_display_name(access_token, 
f"@{localpart}:{SERVER_NAME}", displayname) - except Exception: - pass - except Exception: - return self._send_json(502, {"errcode": "M_UNKNOWN", "error": "guest_provision_failed"}) - - resp = { - "user_id": f"@{localpart}:{SERVER_NAME}", - "access_token": access_token, - "device_id": device_id or "guest_device", - "home_server": SERVER_NAME, - } - return self._send_json(200, resp) - - def main(): - port = int(os.environ.get("PORT", "8080")) - HTTPServer(("0.0.0.0", port), Handler).serve_forever() - - if __name__ == "__main__": - main() diff --git a/services/comms/kustomization.yaml b/services/comms/kustomization.yaml index 6b69c1e..2008843 100644 --- a/services/comms/kustomization.yaml +++ b/services/comms/kustomization.yaml @@ -9,10 +9,8 @@ resources: - livekit-config.yaml - element-call-config.yaml - element-call-deployment.yaml - - guest-register-configmap.yaml - guest-register-deployment.yaml - guest-register-service.yaml - - atlasbot-configmap.yaml - atlasbot-deployment.yaml - wellknown.yaml - atlasbot-rbac.yaml @@ -45,6 +43,36 @@ patches: - path: synapse-deployment-strategy-patch.yaml configMapGenerator: + - name: matrix-guest-register + files: + - server.py=scripts/guest-register/server.py + options: + disableNameSuffixHash: true + - name: atlasbot + files: + - bot.py=scripts/atlasbot/bot.py + options: + disableNameSuffixHash: true + - name: othrys-synapse-redis-health + files: + - ping_readiness_local.sh=scripts/synapse/redis/ping_readiness_local.sh + - ping_liveness_local.sh=scripts/synapse/redis/ping_liveness_local.sh + - ping_readiness_master.sh=scripts/synapse/redis/ping_readiness_master.sh + - ping_liveness_master.sh=scripts/synapse/redis/ping_liveness_master.sh + - ping_readiness_local_and_master.sh=scripts/synapse/redis/ping_readiness_local_and_master.sh + - ping_liveness_local_and_master.sh=scripts/synapse/redis/ping_liveness_local_and_master.sh + options: + disableNameSuffixHash: true + - name: othrys-synapse-redis-scripts + files: + - 
start-master.sh=scripts/synapse/redis/start-master.sh
+    options:
+      disableNameSuffixHash: true
+  - name: othrys-synapse-matrix-synapse-scripts
+    files:
+      - signing-key.sh=scripts/synapse/signing-key.sh
+    options:
+      disableNameSuffixHash: true
   - name: atlas-kb
     files:
       - INDEX.md=knowledge/INDEX.md
diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
new file mode 100644
index 0000000..e8bd1a8
--- /dev/null
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -0,0 +1,622 @@
+import collections
+import json
+import os
+import re
+import ssl
+import time
+from typing import Any
+from urllib import error, parse, request
+
+BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
+AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
+USER = os.environ["BOT_USER"]
+PASSWORD = os.environ["BOT_PASS"]
+ROOM_ALIAS = "#othrys:live.bstein.dev"
+
+OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
+MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
+API_KEY = os.environ.get("CHAT_API_KEY", "")
+
+KB_DIR = os.environ.get("KB_DIR", "")
+VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
+
+BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
+SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
+
+MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
+MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
+
+TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
+HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
+STOPWORDS = {
+    "the",
+    "and",
+    "for",
+    "with",
+    "this",
+    "that",
+    "from",
+    "into",
+    "what",
+    "how",
+    "why",
+    "when",
+    "where",
+    "which",
+    "who",
+    "can",
+    "could",
+    "should",
+    "would",
+    "please",
+    "help",
+    "atlas",
+    "othrys",
+}
+
+METRIC_HINT_WORDS = {
+    "health",
+    "status",
+    "down",
+    "slow",
+    "error",
+    "unknown_error",
+    "timeout",
+    "crash",
+    "crashloop",
+    "restart",
+    "restarts",
+    "pending",
+    "unreachable",
+    "latency",
+}
+
+def _tokens(text: str) -> list[str]:
+    toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
+    return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
+
+
+# Mention detection (Matrix rich mentions + plain @atlas).
+MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
+MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
+MENTION_RE = re.compile(
+    r"(?<!\w)@(" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")\b",
+    re.IGNORECASE,
+)
+
+def normalize_user_id(token: str) -> str:
+    t = token.strip()
+    if not t:
+        return ""
+    if t.startswith("@") and ":" in t:
+        return t
+    t = t.lstrip("@")
+    if ":" in t:
+        return f"@{t}"
+    return f"@{t}:{SERVER_NAME}"
+
+MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
+
+def is_mentioned(content: dict, body: str) -> bool:
+    if MENTION_RE.search(body or "") is not None:
+        return True
+    mentions = content.get("m.mentions", {})
+    user_ids = mentions.get("user_ids", [])
+    if not isinstance(user_ids, list):
+        return False
+    return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
+
+
+# Matrix HTTP helper.
+def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None): + url = (base or BASE) + path + data = None + headers = {} + if body is not None: + data = json.dumps(body).encode() + headers["Content-Type"] = "application/json" + if token: + headers["Authorization"] = f"Bearer {token}" + r = request.Request(url, data=data, headers=headers, method=method) + with request.urlopen(r, timeout=timeout) as resp: + raw = resp.read() + return json.loads(raw.decode()) if raw else {} + +def login() -> str: + login_user = normalize_user_id(USER) + payload = { + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": login_user}, + "password": PASSWORD, + } + res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE) + return res["access_token"] + +def resolve_alias(token: str, alias: str) -> str: + enc = parse.quote(alias) + res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token) + return res["room_id"] + +def join_room(token: str, room: str): + req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={}) + +def send_msg(token: str, room: str, text: str): + path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message" + req("POST", path, token, body={"msgtype": "m.text", "body": text}) + + +# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py). 
+KB = {"catalog": {}, "runbooks": []} +_HOST_INDEX: dict[str, list[dict]] = {} +_NAME_INDEX: set[str] = set() + +def _load_json_file(path: str) -> Any | None: + try: + with open(path, "rb") as f: + return json.loads(f.read().decode("utf-8")) + except Exception: + return None + +def load_kb(): + global KB, _HOST_INDEX, _NAME_INDEX + if not KB_DIR: + return + catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {} + runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or [] + KB = {"catalog": catalog, "runbooks": runbooks} + + host_index: dict[str, list[dict]] = collections.defaultdict(list) + for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []: + host = (ep.get("host") or "").lower() + if host: + host_index[host].append(ep) + _HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())} + + names: set[str] = set() + for s in catalog.get("services", []) if isinstance(catalog, dict) else []: + if isinstance(s, dict) and s.get("name"): + names.add(str(s["name"]).lower()) + for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []: + if isinstance(w, dict) and w.get("name"): + names.add(str(w["name"]).lower()) + _NAME_INDEX = names + +def kb_retrieve(query: str, *, limit: int = 3) -> str: + q = (query or "").strip() + if not q or not KB.get("runbooks"): + return "" + ql = q.lower() + q_tokens = _tokens(q) + if not q_tokens: + return "" + + scored: list[tuple[int, dict]] = [] + for doc in KB.get("runbooks", []): + if not isinstance(doc, dict): + continue + title = str(doc.get("title") or "") + body = str(doc.get("body") or "") + tags = doc.get("tags") or [] + entrypoints = doc.get("entrypoints") or [] + hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower() + score = 0 + for t in set(q_tokens): + if t in hay: + score += 3 if t in title.lower() else 1 + for h in entrypoints: + if isinstance(h, str) and h.lower() in ql: + score += 4 + if 
score: + scored.append((score, doc)) + + scored.sort(key=lambda x: x[0], reverse=True) + picked = [d for _, d in scored[:limit]] + if not picked: + return "" + + parts: list[str] = ["Atlas KB (retrieved):"] + used = 0 + for d in picked: + path = d.get("path") or "" + title = d.get("title") or path + body = (d.get("body") or "").strip() + snippet = body[:900].strip() + chunk = f"- {title} ({path})\n{snippet}" + if used + len(chunk) > MAX_KB_CHARS: + break + parts.append(chunk) + used += len(chunk) + return "\n".join(parts).strip() + +def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]: + q = (query or "").strip() + if not q or not KB.get("catalog"): + return "", [] + ql = q.lower() + hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")} + + # Also match by known workload/service names. + for t in _tokens(ql): + if t in _NAME_INDEX: + hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t} + + edges: list[tuple[str, str]] = [] + lines: list[str] = [] + for host in sorted(hosts): + for ep in _HOST_INDEX.get(host, []): + backend = ep.get("backend") or {} + ns = backend.get("namespace") or "" + svc = backend.get("service") or "" + path = ep.get("path") or "/" + if not svc: + continue + wk = backend.get("workloads") or [] + wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown" + lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}") + for w in wk: + if isinstance(w, dict) and w.get("name"): + edges.append((ns, str(w["name"]))) + if not lines: + return "", [] + return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges + + +# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot. 
+_K8S_TOKEN: str | None = None +_K8S_CTX: ssl.SSLContext | None = None + +def _k8s_context() -> ssl.SSLContext: + global _K8S_CTX + if _K8S_CTX is not None: + return _K8S_CTX + ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + ctx = ssl.create_default_context(cafile=ca_path) + _K8S_CTX = ctx + return ctx + +def _k8s_token() -> str: + global _K8S_TOKEN + if _K8S_TOKEN: + return _K8S_TOKEN + token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" + with open(token_path, "r", encoding="utf-8") as f: + _K8S_TOKEN = f.read().strip() + return _K8S_TOKEN + +def k8s_get(path: str, timeout: int = 8) -> dict: + host = os.environ.get("KUBERNETES_SERVICE_HOST") + port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443" + if not host: + raise RuntimeError("k8s host missing") + url = f"https://{host}:{port}{path}" + headers = {"Authorization": f"Bearer {_k8s_token()}"} + r = request.Request(url, headers=headers, method="GET") + with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp: + raw = resp.read() + return json.loads(raw.decode()) if raw else {} + +def k8s_pods(namespace: str) -> list[dict]: + data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500") + items = data.get("items") or [] + return items if isinstance(items, list) else [] + +def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str: + try: + pods = k8s_pods(namespace) + except Exception: + return "" + out: list[str] = [] + for p in pods: + md = p.get("metadata") or {} + st = p.get("status") or {} + name = md.get("name") or "" + if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes): + continue + phase = st.get("phase") or "?" 
+ cs = st.get("containerStatuses") or [] + restarts = 0 + ready = 0 + total = 0 + reason = st.get("reason") or "" + for c in cs if isinstance(cs, list) else []: + if not isinstance(c, dict): + continue + total += 1 + restarts += int(c.get("restartCount") or 0) + if c.get("ready"): + ready += 1 + state = c.get("state") or {} + if not reason and isinstance(state, dict): + waiting = state.get("waiting") or {} + if isinstance(waiting, dict) and waiting.get("reason"): + reason = waiting.get("reason") + extra = f" ({reason})" if reason else "" + out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}") + return "\n".join(out[:20]) + +def flux_not_ready() -> str: + try: + data = k8s_get( + "/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200" + ) + except Exception: + return "" + items = data.get("items") or [] + bad: list[str] = [] + for it in items if isinstance(items, list) else []: + md = it.get("metadata") or {} + st = it.get("status") or {} + name = md.get("name") or "" + conds = st.get("conditions") or [] + ready = None + msg = "" + for c in conds if isinstance(conds, list) else []: + if isinstance(c, dict) and c.get("type") == "Ready": + ready = c.get("status") + msg = c.get("message") or "" + if ready not in ("True", True): + bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip()) + return "\n".join(bad[:10]) + + +# VictoriaMetrics (PromQL) helpers. +def vm_query(query: str, timeout: int = 8) -> dict | None: + try: + url = VM_URL.rstrip("/") + "/api/v1/query?" 
+ parse.urlencode({"query": query}) + with request.urlopen(url, timeout=timeout) as resp: + return json.loads(resp.read().decode()) + except Exception: + return None + +def _vm_value_series(res: dict) -> list[dict]: + if not res or (res.get("status") != "success"): + return [] + data = res.get("data") or {} + result = data.get("result") or [] + return result if isinstance(result, list) else [] + +def vm_render_result(res: dict | None, limit: int = 12) -> str: + if not res: + return "" + series = _vm_value_series(res) + if not series: + return "" + out: list[str] = [] + for r in series[:limit]: + if not isinstance(r, dict): + continue + metric = r.get("metric") or {} + value = r.get("value") or [] + val = value[1] if isinstance(value, list) and len(value) > 1 else "" + # Prefer common labels if present. + label_parts = [] + for k in ("namespace", "pod", "container", "node", "instance", "job", "phase"): + if isinstance(metric, dict) and metric.get(k): + label_parts.append(f"{k}={metric.get(k)}") + if not label_parts and isinstance(metric, dict): + for k in sorted(metric.keys()): + if k.startswith("__"): + continue + label_parts.append(f"{k}={metric.get(k)}") + if len(label_parts) >= 4: + break + labels = ", ".join(label_parts) if label_parts else "series" + out.append(f"- {labels}: {val}") + return "\n".join(out) + +def vm_top_restarts(hours: int = 1) -> str: + q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))" + res = vm_query(q) + if not res or (res.get("status") != "success"): + return "" + out: list[str] = [] + for r in (res.get("data") or {}).get("result") or []: + if not isinstance(r, dict): + continue + m = r.get("metric") or {} + v = r.get("value") or [] + ns = (m.get("namespace") or "").strip() + pod = (m.get("pod") or "").strip() + val = v[1] if isinstance(v, list) and len(v) > 1 else "" + if pod: + out.append(f"- restarts({hours}h): {ns}/{pod} = {val}") + return "\n".join(out) + +def vm_cluster_snapshot() 
-> str: + parts: list[str] = [] + # Node readiness (kube-state-metrics). + ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="true"})') + not_ready = vm_query('sum(kube_node_status_condition{condition="Ready",status="false"})') + if ready and not_ready: + try: + r = _vm_value_series(ready)[0]["value"][1] + nr = _vm_value_series(not_ready)[0]["value"][1] + parts.append(f"- nodes ready: {r} (not ready: {nr})") + except Exception: + pass + + phases = vm_query("sum by (phase) (kube_pod_status_phase)") + pr = vm_render_result(phases, limit=8) + if pr: + parts.append("Pod phases:") + parts.append(pr) + return "\n".join(parts).strip() + + +# Conversation state. +history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript) + +def key_for(room_id: str, sender: str, is_dm: bool): + return (room_id, None) if is_dm else (room_id, sender) + +def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str: + parts: list[str] = [] + + kb = kb_retrieve(prompt) + if kb: + parts.append(kb) + + endpoints, edges = catalog_hints(prompt) + if endpoints: + parts.append(endpoints) + + if allow_tools: + # Scope pod summaries to relevant namespaces/workloads when possible. 
+ prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set) + for ns, name in (targets or []) + (edges or []): + if ns and name: + prefixes_by_ns[ns].add(name) + pod_lines: list[str] = [] + for ns in sorted(prefixes_by_ns.keys()): + summary = summarize_pods(ns, prefixes_by_ns[ns]) + if summary: + pod_lines.append(f"Pods (live):\n{summary}") + if pod_lines: + parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS]) + + flux_bad = flux_not_ready() + if flux_bad: + parts.append("Flux (not ready):\n" + flux_bad) + + p_l = (prompt or "").lower() + if any(w in p_l for w in METRIC_HINT_WORDS): + restarts = vm_top_restarts(1) + if restarts: + parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts) + snap = vm_cluster_snapshot() + if snap: + parts.append("VictoriaMetrics (cluster snapshot):\n" + snap) + + return "\n\n".join([p for p in parts if p]).strip() + +def ollama_reply(hist_key, prompt: str, *, context: str) -> str: + try: + system = ( + "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. " + "Be helpful, direct, and concise. " + "Prefer answering with exact repo paths and Kubernetes resource names. " + "Never include or request secret values." + ) + transcript_parts = [system] + if context: + transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS]) + transcript_parts.extend(history[hist_key][-24:]) + transcript_parts.append(f"User: {prompt}") + transcript = "\n".join(transcript_parts) + + payload = {"model": MODEL, "message": transcript} + headers = {"Content-Type": "application/json"} + if API_KEY: + headers["x-api-key"] = API_KEY + r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers) + with request.urlopen(r, timeout=20) as resp: + data = json.loads(resp.read().decode()) + reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help." 
+        history[hist_key].append(f"Atlas: {reply}")
+        return reply
+    except Exception:
+        return "I’m here — but I couldn’t reach the model backend."
+
+def sync_loop(token: str, room_id: str):
+    since = None
+    try:
+        res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
+        since = res.get("next_batch")
+    except Exception:
+        pass
+
+    while True:
+        params = {"timeout": 30000}
+        if since:
+            params["since"] = since
+        query = parse.urlencode(params)
+        try:
+            res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
+        except Exception:
+            time.sleep(5)
+            continue
+        since = res.get("next_batch", since)
+
+        # invites
+        for rid, data in res.get("rooms", {}).get("invite", {}).items():
+            try:
+                join_room(token, rid)
+            except Exception:
+                pass
+
+        # messages
+        for rid, data in res.get("rooms", {}).get("join", {}).items():
+            timeline = data.get("timeline", {}).get("events", [])
+            joined_count = data.get("summary", {}).get("m.joined_member_count")
+            is_dm = joined_count is not None and joined_count <= 2
+
+            for ev in timeline:
+                if ev.get("type") != "m.room.message":
+                    continue
+                content = ev.get("content", {})
+                body = (content.get("body", "") or "").strip()
+                if not body:
+                    continue
+                sender = ev.get("sender", "")
+                if sender == f"@{USER}:live.bstein.dev":
+                    continue
+
+                mentioned = is_mentioned(content, body)
+                hist_key = key_for(rid, sender, is_dm)
+                history[hist_key].append(f"{sender}: {body}")
+                history[hist_key] = history[hist_key][-80:]
+
+                if not (is_dm or mentioned):
+                    continue
+
+                # Only do live cluster/metrics introspection in DMs.
+                allow_tools = is_dm
+
+                promql = ""
+                if allow_tools:
+                    m = re.match(r"(?is)^\s*promql\s*(?::|\s)\s*(.+?)\s*$", body)
+                    if m:
+                        promql = m.group(1).strip()
+
+                # Attempt to scope tools to the most likely workloads when hostnames are mentioned.
+ targets: list[tuple[str, str]] = [] + for m in HOST_RE.finditer(body.lower()): + host = m.group(1).lower() + for ep in _HOST_INDEX.get(host, []): + backend = ep.get("backend") or {} + ns = backend.get("namespace") or "" + for w in backend.get("workloads") or []: + if isinstance(w, dict) and w.get("name"): + targets.append((ns, str(w["name"]))) + + context = build_context(body, allow_tools=allow_tools, targets=targets) + if allow_tools and promql: + res = vm_query(promql, timeout=20) + rendered = vm_render_result(res, limit=15) or "(no results)" + extra = "VictoriaMetrics (PromQL result):\n" + rendered + context = (context + "\n\n" + extra).strip() if context else extra + reply = ollama_reply(hist_key, body, context=context) + send_msg(token, rid, reply) + +def login_with_retry(): + last_err = None + for attempt in range(10): + try: + return login() + except Exception as exc: # noqa: BLE001 + last_err = exc + time.sleep(min(30, 2 ** attempt)) + raise last_err + +def main(): + load_kb() + token = login_with_retry() + try: + room_id = resolve_alias(token, ROOM_ALIAS) + join_room(token, room_id) + except Exception: + room_id = None + sync_loop(token, room_id) + +if __name__ == "__main__": + main() diff --git a/services/comms/scripts/guest-register/server.py b/services/comms/scripts/guest-register/server.py new file mode 100644 index 0000000..0e1fb4c --- /dev/null +++ b/services/comms/scripts/guest-register/server.py @@ -0,0 +1,264 @@ +import base64 +import json +import os +import random +import secrets +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib import error, parse, request + +MAS_BASE = os.environ.get("MAS_BASE", "http://matrix-authentication-service:8080").rstrip("/") +MAS_ADMIN_API_BASE = os.environ.get("MAS_ADMIN_API_BASE", "http://matrix-authentication-service:8081/api/admin/v1").rstrip("/") +SYNAPSE_BASE = os.environ.get("SYNAPSE_BASE", "http://othrys-synapse-matrix-synapse:8008").rstrip("/") +SERVER_NAME = 
os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev") + +MAS_ADMIN_CLIENT_ID = os.environ["MAS_ADMIN_CLIENT_ID"] +MAS_ADMIN_CLIENT_SECRET_FILE = os.environ.get("MAS_ADMIN_CLIENT_SECRET_FILE", "/etc/mas/admin-client/client_secret") +MAS_ADMIN_SCOPE = os.environ.get("MAS_ADMIN_SCOPE", "urn:mas:admin") +RATE_WINDOW_SEC = int(os.environ.get("RATE_WINDOW_SEC", "60")) +RATE_MAX = int(os.environ.get("RATE_MAX", "30")) +_rate = {} # ip -> [window_start, count] + +ADJ = [ + "brisk","calm","eager","gentle","merry","nifty","rapid","sunny","witty","zesty", + "amber","bold","bright","crisp","daring","frosty","glad","jolly","lively","mellow", + "quiet","ripe","serene","spry","tidy","vivid","warm","wild","clever","kind", +] +NOUN = [ + "otter","falcon","comet","ember","grove","harbor","meadow","raven","river","summit", + "breeze","cedar","cinder","cove","delta","forest","glade","lark","marsh","peak", + "pine","quartz","reef","ridge","sable","sage","shore","thunder","vale","zephyr", +] + +def _json(method, url, *, headers=None, body=None, timeout=20): + hdrs = {"Content-Type": "application/json"} + if headers: + hdrs.update(headers) + data = None + if body is not None: + data = json.dumps(body).encode() + req = request.Request(url, data=data, headers=hdrs, method=method) + try: + with request.urlopen(req, timeout=timeout) as resp: + raw = resp.read() + payload = json.loads(raw.decode()) if raw else {} + return resp.status, payload + except error.HTTPError as e: + raw = e.read() + try: + payload = json.loads(raw.decode()) if raw else {} + except Exception: + payload = {} + return e.code, payload + +def _form(method, url, *, headers=None, fields=None, timeout=20): + hdrs = {"Content-Type": "application/x-www-form-urlencoded"} + if headers: + hdrs.update(headers) + data = parse.urlencode(fields or {}).encode() + req = request.Request(url, data=data, headers=hdrs, method=method) + try: + with request.urlopen(req, timeout=timeout) as resp: + raw = resp.read() + payload = 
json.loads(raw.decode()) if raw else {} + return resp.status, payload + except error.HTTPError as e: + raw = e.read() + try: + payload = json.loads(raw.decode()) if raw else {} + except Exception: + payload = {} + return e.code, payload + +_admin_token = None +_admin_token_at = 0.0 + +def _mas_admin_access_token(now): + global _admin_token, _admin_token_at + if _admin_token and (now - _admin_token_at) < 300: + return _admin_token + + with open(MAS_ADMIN_CLIENT_SECRET_FILE, encoding="utf-8") as fh: + client_secret = fh.read().strip() + basic = base64.b64encode(f"{MAS_ADMIN_CLIENT_ID}:{client_secret}".encode()).decode() + + status, payload = _form( + "POST", + f"{MAS_BASE}/oauth2/token", + headers={"Authorization": f"Basic {basic}"}, + fields={"grant_type": "client_credentials", "scope": MAS_ADMIN_SCOPE}, + timeout=20, + ) + if status != 200 or "access_token" not in payload: + raise RuntimeError("mas_admin_token_failed") + + _admin_token = payload["access_token"] + _admin_token_at = now + return _admin_token + +def _generate_localpart(): + return "guest-" + secrets.token_hex(6) + +def _generate_displayname(): + return f"{random.choice(ADJ)}-{random.choice(NOUN)}" + +def _admin_api(admin_token, method, path, body=None): + return _json( + method, + f"{MAS_ADMIN_API_BASE}{path}", + headers={"Authorization": f"Bearer {admin_token}"}, + body=body, + timeout=20, + ) + +def _create_user(admin_token, username): + status, payload = _admin_api(admin_token, "POST", "/users", {"username": username}) + if status != 201: + return status, None + user = payload.get("data") or {} + return status, user.get("id") + +def _set_password(admin_token, user_id, password): + status, _payload = _admin_api( + admin_token, + "POST", + f"/users/{parse.quote(user_id)}/set-password", + {"password": password}, + ) + return status in (200, 204) + +def _login_password(username, password): + payload = { + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": 
f"@{username}:{SERVER_NAME}"}, + "password": password, + } + status, data = _json( + "POST", + f"{MAS_BASE}/_matrix/client/v3/login", + body=payload, + timeout=20, + ) + if status != 200: + return None, None + return data.get("access_token"), data.get("device_id") + +def _set_display_name(access_token, user_id, displayname): + _json( + "PUT", + f"{SYNAPSE_BASE}/_matrix/client/v3/profile/{parse.quote(user_id, safe='')}/displayname", + headers={"Authorization": f"Bearer {access_token}"}, + body={"displayname": displayname}, + timeout=20, + ) + +def _rate_check(ip, now): + win, cnt = _rate.get(ip, (now, 0)) + if now - win > RATE_WINDOW_SEC: + _rate[ip] = (now, 1) + return True + if cnt >= RATE_MAX: + return False + _rate[ip] = (win, cnt + 1) + return True + +class Handler(BaseHTTPRequestHandler): + server_version = "matrix-guest-register" + + def _send_json(self, code, payload): + body = json.dumps(payload).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_OPTIONS(self): # noqa: N802 + self.send_response(204) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With") + self.end_headers() + + def do_GET(self): # noqa: N802 + parsed = parse.urlparse(self.path) + if parsed.path in ("/healthz", "/"): + return self._send_json(200, {"ok": True}) + if parsed.path in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"): + return self._send_json(200, {"flows": [{"stages": []}]}) + return self._send_json(404, {"errcode": 
"M_NOT_FOUND", "error": "not_found"}) + + def do_POST(self): # noqa: N802 + parsed = parse.urlparse(self.path) + if parsed.path not in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"): + return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"}) + + qs = parse.parse_qs(parsed.query) + kind = (qs.get("kind") or ["user"])[0] + if kind != "guest": + return self._send_json( + 403, + { + "errcode": "M_FORBIDDEN", + "error": "Registration is disabled; use https://bstein.dev/request-access for accounts.", + }, + ) + + xfwd = self.headers.get("x-forwarded-for", "") + ip = (xfwd.split(",")[0].strip() if xfwd else "") or self.client_address[0] + now = __import__("time").time() + if not _rate_check(ip, now): + return self._send_json(429, {"errcode": "M_LIMIT_EXCEEDED", "error": "rate_limited"}) + + length = int(self.headers.get("content-length", "0") or "0") + raw = self.rfile.read(length) if length else b"{}" + try: + body = json.loads(raw.decode()) if raw else {} + if not isinstance(body, dict): + body = {} + except Exception: + body = {} + try: + admin_token = _mas_admin_access_token(now) + displayname = _generate_displayname() + + localpart = None + mas_user_id = None + for _ in range(5): + localpart = _generate_localpart() + status, mas_user_id = _create_user(admin_token, localpart) + if status == 201 and mas_user_id: + break + mas_user_id = None + if not mas_user_id or not localpart: + raise RuntimeError("add_user_failed") + + password = secrets.token_urlsafe(18) + if not _set_password(admin_token, mas_user_id, password): + raise RuntimeError("set_password_failed") + access_token, device_id = _login_password(localpart, password) + if not access_token: + raise RuntimeError("login_failed") + try: + _set_display_name(access_token, f"@{localpart}:{SERVER_NAME}", displayname) + except Exception: + pass + except Exception: + return self._send_json(502, {"errcode": "M_UNKNOWN", "error": "guest_provision_failed"}) + + resp = { + "user_id": 
f"@{localpart}:{SERVER_NAME}", + "access_token": access_token, + "device_id": device_id or "guest_device", + "home_server": SERVER_NAME, + } + return self._send_json(200, resp) + +def main(): + port = int(os.environ.get("PORT", "8080")) + HTTPServer(("0.0.0.0", port), Handler).serve_forever() + +if __name__ == "__main__": + main() diff --git a/services/comms/scripts/synapse/redis/ping_liveness_local.sh b/services/comms/scripts/synapse/redis/ping_liveness_local.sh new file mode 100644 index 0000000..964e552 --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_liveness_local.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +[[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" +[[ -n "$REDIS_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_PASSWORD" +response=$( + timeout -s 15 $1 \ + redis-cli \ + -h localhost \ + -p $REDIS_PORT \ + ping +) +if [ "$?" -eq "124" ]; then + echo "Timed out" + exit 1 +fi +responseFirstWord=$(echo $response | head -n1 | awk '{print $1;}') +if [ "$response" != "PONG" ] && [ "$responseFirstWord" != "LOADING" ] && [ "$responseFirstWord" != "MASTERDOWN" ]; then + echo "$response" + exit 1 +fi \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/ping_liveness_local_and_master.sh b/services/comms/scripts/synapse/redis/ping_liveness_local_and_master.sh new file mode 100644 index 0000000..c343f82 --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_liveness_local_and_master.sh @@ -0,0 +1,5 @@ +script_dir="$(dirname "$0")" +exit_status=0 +"$script_dir/ping_liveness_local.sh" $1 || exit_status=$? +"$script_dir/ping_liveness_master.sh" $1 || exit_status=$? 
+exit $exit_status \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/ping_liveness_master.sh b/services/comms/scripts/synapse/redis/ping_liveness_master.sh new file mode 100644 index 0000000..849982a --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_liveness_master.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +[[ -f $REDIS_MASTER_PASSWORD_FILE ]] && export REDIS_MASTER_PASSWORD="$(< "${REDIS_MASTER_PASSWORD_FILE}")" +[[ -n "$REDIS_MASTER_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_MASTER_PASSWORD" +response=$( + timeout -s 15 $1 \ + redis-cli \ + -h $REDIS_MASTER_HOST \ + -p $REDIS_MASTER_PORT_NUMBER \ + ping +) +if [ "$?" -eq "124" ]; then + echo "Timed out" + exit 1 +fi +responseFirstWord=$(echo $response | head -n1 | awk '{print $1;}') +if [ "$response" != "PONG" ] && [ "$responseFirstWord" != "LOADING" ]; then + echo "$response" + exit 1 +fi \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/ping_readiness_local.sh b/services/comms/scripts/synapse/redis/ping_readiness_local.sh new file mode 100644 index 0000000..080273f --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_readiness_local.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +[[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" +[[ -n "$REDIS_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_PASSWORD" +response=$( + timeout -s 15 $1 \ + redis-cli \ + -h localhost \ + -p $REDIS_PORT \ + ping +) +if [ "$?" 
-eq "124" ]; then + echo "Timed out" + exit 1 +fi +if [ "$response" != "PONG" ]; then + echo "$response" + exit 1 +fi \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/ping_readiness_local_and_master.sh b/services/comms/scripts/synapse/redis/ping_readiness_local_and_master.sh new file mode 100644 index 0000000..0ba63cc --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_readiness_local_and_master.sh @@ -0,0 +1,5 @@ +script_dir="$(dirname "$0")" +exit_status=0 +"$script_dir/ping_readiness_local.sh" $1 || exit_status=$? +"$script_dir/ping_readiness_master.sh" $1 || exit_status=$? +exit $exit_status \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/ping_readiness_master.sh b/services/comms/scripts/synapse/redis/ping_readiness_master.sh new file mode 100644 index 0000000..95ced76 --- /dev/null +++ b/services/comms/scripts/synapse/redis/ping_readiness_master.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +[[ -f $REDIS_MASTER_PASSWORD_FILE ]] && export REDIS_MASTER_PASSWORD="$(< "${REDIS_MASTER_PASSWORD_FILE}")" +[[ -n "$REDIS_MASTER_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_MASTER_PASSWORD" +response=$( + timeout -s 15 $1 \ + redis-cli \ + -h $REDIS_MASTER_HOST \ + -p $REDIS_MASTER_PORT_NUMBER \ + ping +) +if [ "$?" 
-eq "124" ]; then + echo "Timed out" + exit 1 +fi +if [ "$response" != "PONG" ]; then + echo "$response" + exit 1 +fi \ No newline at end of file diff --git a/services/comms/scripts/synapse/redis/start-master.sh b/services/comms/scripts/synapse/redis/start-master.sh new file mode 100644 index 0000000..4284839 --- /dev/null +++ b/services/comms/scripts/synapse/redis/start-master.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +[[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" +if [[ -f /opt/bitnami/redis/mounted-etc/master.conf ]];then + cp /opt/bitnami/redis/mounted-etc/master.conf /opt/bitnami/redis/etc/master.conf +fi +if [[ -f /opt/bitnami/redis/mounted-etc/redis.conf ]];then + cp /opt/bitnami/redis/mounted-etc/redis.conf /opt/bitnami/redis/etc/redis.conf +fi +ARGS=("--port" "${REDIS_PORT}") +ARGS+=("--requirepass" "${REDIS_PASSWORD}") +ARGS+=("--masterauth" "${REDIS_PASSWORD}") +ARGS+=("--include" "/opt/bitnami/redis/etc/redis.conf") +ARGS+=("--include" "/opt/bitnami/redis/etc/master.conf") +exec redis-server "${ARGS[@]}" diff --git a/services/comms/scripts/synapse/signing-key.sh b/services/comms/scripts/synapse/signing-key.sh new file mode 100644 index 0000000..5d1b941 --- /dev/null +++ b/services/comms/scripts/synapse/signing-key.sh @@ -0,0 +1,41 @@ +#!/bin/sh + +set -eu + +check_key() { + set +e + + echo "Checking for existing signing key..." + key="$(kubectl get secret "$SECRET_NAME" -o jsonpath="{.data['signing\.key']}" 2> /dev/null)" + [ $? -ne 0 ] && return 1 + [ -z "$key" ] && return 2 + return 0 +} + +create_key() { + echo "Waiting for new signing key to be generated..." + begin=$(date +%s) + end=$((begin + 300)) # 5 minutes + while true; do + [ -f /synapse/keys/signing.key ] && return 0 + [ "$(date +%s)" -gt $end ] && return 1 + sleep 5 + done +} + +store_key() { + echo "Storing signing key in Kubernetes secret..." 
+ kubectl patch secret "$SECRET_NAME" -p "{\"data\":{\"signing.key\":\"$(base64 /synapse/keys/signing.key | tr -d '\n')\"}}" +} + +if check_key; then + echo "Key already in place, exiting." + exit +fi + +if ! create_key; then + echo "Timed out waiting for a signing key to appear." + exit 1 +fi + +store_key diff --git a/services/comms/synapse-rendered.yaml b/services/comms/synapse-rendered.yaml index 097189a..83fce79 100644 --- a/services/comms/synapse-rendered.yaml +++ b/services/comms/synapse-rendered.yaml @@ -82,140 +82,6 @@ data: rename-command FLUSHALL "" # End of replica configuration --- -# Source: matrix-synapse/charts/redis/templates/health-configmap.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: othrys-synapse-redis-health - labels: - app.kubernetes.io/instance: othrys-synapse - app.kubernetes.io/managed-by: Helm - app.kubernetes.io/name: redis - helm.sh/chart: redis-17.17.1 -data: - ping_readiness_local.sh: |- - #!/bin/bash - - [[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" - [[ -n "$REDIS_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_PASSWORD" - response=$( - timeout -s 15 $1 \ - redis-cli \ - -h localhost \ - -p $REDIS_PORT \ - ping - ) - if [ "$?" -eq "124" ]; then - echo "Timed out" - exit 1 - fi - if [ "$response" != "PONG" ]; then - echo "$response" - exit 1 - fi - ping_liveness_local.sh: |- - #!/bin/bash - - [[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" - [[ -n "$REDIS_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_PASSWORD" - response=$( - timeout -s 15 $1 \ - redis-cli \ - -h localhost \ - -p $REDIS_PORT \ - ping - ) - if [ "$?" 
-eq "124" ]; then - echo "Timed out" - exit 1 - fi - responseFirstWord=$(echo $response | head -n1 | awk '{print $1;}') - if [ "$response" != "PONG" ] && [ "$responseFirstWord" != "LOADING" ] && [ "$responseFirstWord" != "MASTERDOWN" ]; then - echo "$response" - exit 1 - fi - ping_readiness_master.sh: |- - #!/bin/bash - - [[ -f $REDIS_MASTER_PASSWORD_FILE ]] && export REDIS_MASTER_PASSWORD="$(< "${REDIS_MASTER_PASSWORD_FILE}")" - [[ -n "$REDIS_MASTER_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_MASTER_PASSWORD" - response=$( - timeout -s 15 $1 \ - redis-cli \ - -h $REDIS_MASTER_HOST \ - -p $REDIS_MASTER_PORT_NUMBER \ - ping - ) - if [ "$?" -eq "124" ]; then - echo "Timed out" - exit 1 - fi - if [ "$response" != "PONG" ]; then - echo "$response" - exit 1 - fi - ping_liveness_master.sh: |- - #!/bin/bash - - [[ -f $REDIS_MASTER_PASSWORD_FILE ]] && export REDIS_MASTER_PASSWORD="$(< "${REDIS_MASTER_PASSWORD_FILE}")" - [[ -n "$REDIS_MASTER_PASSWORD" ]] && export REDISCLI_AUTH="$REDIS_MASTER_PASSWORD" - response=$( - timeout -s 15 $1 \ - redis-cli \ - -h $REDIS_MASTER_HOST \ - -p $REDIS_MASTER_PORT_NUMBER \ - ping - ) - if [ "$?" -eq "124" ]; then - echo "Timed out" - exit 1 - fi - responseFirstWord=$(echo $response | head -n1 | awk '{print $1;}') - if [ "$response" != "PONG" ] && [ "$responseFirstWord" != "LOADING" ]; then - echo "$response" - exit 1 - fi - ping_readiness_local_and_master.sh: |- - script_dir="$(dirname "$0")" - exit_status=0 - "$script_dir/ping_readiness_local.sh" $1 || exit_status=$? - "$script_dir/ping_readiness_master.sh" $1 || exit_status=$? - exit $exit_status - ping_liveness_local_and_master.sh: |- - script_dir="$(dirname "$0")" - exit_status=0 - "$script_dir/ping_liveness_local.sh" $1 || exit_status=$? - "$script_dir/ping_liveness_master.sh" $1 || exit_status=$? 
- exit $exit_status ---- -# Source: matrix-synapse/charts/redis/templates/scripts-configmap.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: othrys-synapse-redis-scripts - labels: - app.kubernetes.io/instance: othrys-synapse - app.kubernetes.io/managed-by: Helm - app.kubernetes.io/name: redis - helm.sh/chart: redis-17.17.1 -data: - start-master.sh: | - #!/bin/bash - - [[ -f $REDIS_PASSWORD_FILE ]] && export REDIS_PASSWORD="$(< "${REDIS_PASSWORD_FILE}")" - if [[ -f /opt/bitnami/redis/mounted-etc/master.conf ]];then - cp /opt/bitnami/redis/mounted-etc/master.conf /opt/bitnami/redis/etc/master.conf - fi - if [[ -f /opt/bitnami/redis/mounted-etc/redis.conf ]];then - cp /opt/bitnami/redis/mounted-etc/redis.conf /opt/bitnami/redis/etc/redis.conf - fi - ARGS=("--port" "${REDIS_PORT}") - ARGS+=("--requirepass" "${REDIS_PASSWORD}") - ARGS+=("--masterauth" "${REDIS_PASSWORD}") - ARGS+=("--include" "/opt/bitnami/redis/etc/redis.conf") - ARGS+=("--include" "/opt/bitnami/redis/etc/master.conf") - exec redis-server "${ARGS[@]}" ---- # Source: matrix-synapse/templates/configuration.yaml apiVersion: v1 kind: ConfigMap @@ -870,64 +736,6 @@ metadata: app.kubernetes.io/component: signingkey-job --- # Source: matrix-synapse/templates/signing-key-job.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: othrys-synapse-matrix-synapse-scripts - labels: - helm.sh/chart: matrix-synapse-3.12.17 - app.kubernetes.io/name: matrix-synapse - app.kubernetes.io/instance: othrys-synapse - app.kubernetes.io/version: "1.144.0" - app.kubernetes.io/managed-by: Helm - annotations: - helm.sh/hook: pre-install - helm.sh/hook-delete-policy: hook-succeeded -data: - signing-key.sh: | - #!/bin/sh - - set -eu - - check_key() { - set +e - - echo "Checking for existing signing key..." - key="$(kubectl get secret "$SECRET_NAME" -o jsonpath="{.data['signing\.key']}" 2> /dev/null)" - [ $? 
-ne 0 ] && return 1 - [ -z "$key" ] && return 2 - return 0 - } - - create_key() { - echo "Waiting for new signing key to be generated..." - begin=$(date +%s) - end=$((begin + 300)) # 5 minutes - while true; do - [ -f /synapse/keys/signing.key ] && return 0 - [ "$(date +%s)" -gt $end ] && return 1 - sleep 5 - done - } - - store_key() { - echo "Storing signing key in Kubernetes secret..." - kubectl patch secret "$SECRET_NAME" -p "{\"data\":{\"signing.key\":\"$(base64 /synapse/keys/signing.key | tr -d '\n')\"}}" - } - - if check_key; then - echo "Key already in place, exiting." - exit - fi - - if ! create_key; then - echo "Timed out waiting for a signing key to appear." - exit 1 - fi - - store_key ---- -# Source: matrix-synapse/templates/signing-key-job.yaml apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: diff --git a/services/jenkins/configmap-init-scripts.yaml b/services/jenkins/configmap-init-scripts.yaml deleted file mode 100644 index ed87720..0000000 --- a/services/jenkins/configmap-init-scripts.yaml +++ /dev/null @@ -1,24 +0,0 @@ -# services/jenkins/configmap-init-scripts.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: jenkins-init-scripts - namespace: jenkins -data: - theme.groovy: | - import jenkins.model.Jenkins - import org.codefirst.SimpleThemeDecorator - - def instance = Jenkins.get() - def decorators = instance.getExtensionList(SimpleThemeDecorator.class) - - if (decorators?.size() > 0) { - def theme = decorators[0] - theme.setCssUrl("https://jenkins-contrib-themes.github.io/jenkins-material-theme/dist/material-ocean.css") - theme.setJsUrl("") - theme.setTheme("") - instance.save() - println("Applied simple-theme-plugin dark theme") - } else { - println("simple-theme-plugin not installed; skipping theme configuration") - } diff --git a/services/jenkins/kustomization.yaml b/services/jenkins/kustomization.yaml index c183a4f..acb6fb4 100644 --- a/services/jenkins/kustomization.yaml +++ b/services/jenkins/kustomization.yaml @@ -7,8 
+7,15 @@ resources: - serviceaccount.yaml - pvc.yaml - configmap-jcasc.yaml - - configmap-init-scripts.yaml - configmap-plugins.yaml - deployment.yaml - service.yaml - ingress.yaml + +configMapGenerator: + - name: jenkins-init-scripts + namespace: jenkins + files: + - theme.groovy=scripts/theme.groovy + options: + disableNameSuffixHash: true diff --git a/services/jenkins/scripts/theme.groovy b/services/jenkins/scripts/theme.groovy new file mode 100644 index 0000000..cf171f7 --- /dev/null +++ b/services/jenkins/scripts/theme.groovy @@ -0,0 +1,16 @@ +import jenkins.model.Jenkins +import org.codefirst.SimpleThemeDecorator + +def instance = Jenkins.get() +def decorators = instance.getExtensionList(SimpleThemeDecorator.class) + +if (decorators?.size() > 0) { + def theme = decorators[0] + theme.setCssUrl("https://jenkins-contrib-themes.github.io/jenkins-material-theme/dist/material-ocean.css") + theme.setJsUrl("") + theme.setTheme("") + instance.save() + println("Applied simple-theme-plugin dark theme") +} else { + println("simple-theme-plugin not installed; skipping theme configuration") +} diff --git a/services/logging/kustomization.yaml b/services/logging/kustomization.yaml index 3c40da2..fe010f6 100644 --- a/services/logging/kustomization.yaml +++ b/services/logging/kustomization.yaml @@ -6,11 +6,8 @@ resources: - opensearch-dashboards-objects.yaml - opensearch-observability-objects.yaml - node-log-rotation-serviceaccount.yaml - - node-log-rotation-script.yaml - node-image-gc-rpi4-serviceaccount.yaml - - node-image-gc-rpi4-script.yaml - node-image-prune-rpi5-serviceaccount.yaml - - node-image-prune-rpi5-script.yaml - opensearch-pvc.yaml - opensearch-helmrelease.yaml - opensearch-dashboards-helmrelease.yaml @@ -26,3 +23,35 @@ resources: - node-image-prune-rpi5-daemonset.yaml - oauth2-proxy.yaml - ingress.yaml + +configMapGenerator: + - name: node-log-rotation-script + namespace: logging + files: + - node_log_rotation.sh=scripts/node_log_rotation.sh + options: + 
disableNameSuffixHash: true + - name: node-image-gc-rpi4-script + namespace: logging + files: + - node_image_gc_rpi4.sh=scripts/node_image_gc_rpi4.sh + options: + disableNameSuffixHash: true + - name: node-image-prune-rpi5-script + namespace: logging + files: + - node_image_prune_rpi5.sh=scripts/node_image_prune_rpi5.sh + options: + disableNameSuffixHash: true + - name: opensearch-prune-script + namespace: logging + files: + - prune.py=scripts/opensearch_prune.py + options: + disableNameSuffixHash: true + - name: opensearch-observability-script + namespace: logging + files: + - seed.py=scripts/opensearch_observability_seed.py + options: + disableNameSuffixHash: true diff --git a/services/logging/node-image-gc-rpi4-script.yaml b/services/logging/node-image-gc-rpi4-script.yaml deleted file mode 100644 index 44c4c16..0000000 --- a/services/logging/node-image-gc-rpi4-script.yaml +++ /dev/null @@ -1,44 +0,0 @@ -# services/logging/node-image-gc-rpi4-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: node-image-gc-rpi4-script - namespace: logging -data: - node_image_gc_rpi4.sh: | - #!/usr/bin/env bash - set -euo pipefail - - changed=0 - k3s_changed=0 - k3s_agent_changed=0 - - k3s_dropin="/host/etc/systemd/system/k3s.service.d/98-image-gc.conf" - k3s_agent_dropin="/host/etc/systemd/system/k3s-agent.service.d/98-image-gc.conf" - - if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_dropin}" ]; then - mkdir -p "$(dirname "${k3s_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_dropin}" - changed=1 - k3s_changed=1 - fi - - if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! 
-f "${k3s_agent_dropin}" ]; then - mkdir -p "$(dirname "${k3s_agent_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_agent_dropin}" - changed=1 - k3s_agent_changed=1 - fi - - if [ "${changed}" -eq 1 ]; then - sleep "$(( (RANDOM % 300) + 10 ))" - chroot /host /bin/systemctl daemon-reload - if [ "${k3s_changed}" -eq 1 ]; then - chroot /host /bin/systemctl restart k3s - fi - if [ "${k3s_agent_changed}" -eq 1 ]; then - chroot /host /bin/systemctl restart k3s-agent - fi - fi - - sleep infinity diff --git a/services/logging/node-image-prune-rpi5-script.yaml b/services/logging/node-image-prune-rpi5-script.yaml deleted file mode 100644 index ae79ce3..0000000 --- a/services/logging/node-image-prune-rpi5-script.yaml +++ /dev/null @@ -1,34 +0,0 @@ -# services/logging/node-image-prune-rpi5-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: node-image-prune-rpi5-script - namespace: logging -data: - node_image_prune_rpi5.sh: | - #!/usr/bin/env bash - set -euo pipefail - - threshold=70 - - sleep "$(( (RANDOM % 300) + 10 ))" - - while true; do - usage=$(df -P /host | awk 'NR==2 {gsub(/%/,"",$5); print $5}') - if [ -z "${usage}" ]; then - sleep 1800 - continue - fi - - if [ "${usage}" -ge "${threshold}" ]; then - chroot /host /bin/sh -c ' - if command -v crictl >/dev/null 2>&1; then - crictl --runtime-endpoint=unix:///run/k3s/containerd/containerd.sock rmi --prune || true - elif [ -x /usr/local/bin/crictl ]; then - /usr/local/bin/crictl --runtime-endpoint=unix:///run/k3s/containerd/containerd.sock rmi --prune || true - fi - ' - fi - - sleep 21600 - done diff --git a/services/logging/node-log-rotation-script.yaml b/services/logging/node-log-rotation-script.yaml deleted file mode 100644 index 7926e0d..0000000 --- a/services/logging/node-log-rotation-script.yaml +++ /dev/null @@ -1,72 +0,0 @@ -# 
services/logging/node-log-rotation-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: node-log-rotation-script - namespace: logging -data: - node_log_rotation.sh: | - #!/usr/bin/env bash - set -euo pipefail - - changed=0 - journald_changed=0 - k3s_changed=0 - k3s_agent_changed=0 - - journald_dropin="/host/etc/systemd/journald.conf.d/99-logging.conf" - k3s_dropin="/host/etc/systemd/system/k3s.service.d/99-logging.conf" - k3s_agent_dropin="/host/etc/systemd/system/k3s-agent.service.d/99-logging.conf" - k3s_image_gc_dropin="/host/etc/systemd/system/k3s.service.d/98-image-gc.conf" - k3s_agent_image_gc_dropin="/host/etc/systemd/system/k3s-agent.service.d/98-image-gc.conf" - - if [ ! -f "${journald_dropin}" ]; then - mkdir -p "$(dirname "${journald_dropin}")" - printf "[Journal]\nStorage=volatile\nRuntimeMaxUse=200M\nRuntimeKeepFree=512M\nMaxFileSec=1h\n" > "${journald_dropin}" - changed=1 - journald_changed=1 - fi - - if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_dropin}" ]; then - mkdir -p "$(dirname "${k3s_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-size=10Mi\"\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-files=2\"\n" > "${k3s_dropin}" - changed=1 - k3s_changed=1 - fi - - if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_image_gc_dropin}" ]; then - mkdir -p "$(dirname "${k3s_image_gc_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_image_gc_dropin}" - changed=1 - k3s_changed=1 - fi - - if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! 
-f "${k3s_agent_dropin}" ]; then - mkdir -p "$(dirname "${k3s_agent_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-size=10Mi\"\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-files=2\"\n" > "${k3s_agent_dropin}" - changed=1 - k3s_agent_changed=1 - fi - - if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! -f "${k3s_agent_image_gc_dropin}" ]; then - mkdir -p "$(dirname "${k3s_agent_image_gc_dropin}")" - printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_agent_image_gc_dropin}" - changed=1 - k3s_agent_changed=1 - fi - - if [ "${changed}" -eq 1 ]; then - sleep "$(( (RANDOM % 300) + 10 ))" - chroot /host /bin/systemctl daemon-reload - if [ "${journald_changed}" -eq 1 ]; then - chroot /host /bin/systemctl restart systemd-journald - fi - if [ "${k3s_changed}" -eq 1 ]; then - chroot /host /bin/systemctl restart k3s - fi - if [ "${k3s_agent_changed}" -eq 1 ]; then - chroot /host /bin/systemctl restart k3s-agent - fi - fi - - sleep infinity diff --git a/services/logging/opensearch-observability-setup-job.yaml b/services/logging/opensearch-observability-setup-job.yaml index 75e65b2..e4590fb 100644 --- a/services/logging/opensearch-observability-setup-job.yaml +++ b/services/logging/opensearch-observability-setup-job.yaml @@ -1,152 +1,4 @@ # services/logging/opensearch-observability-setup-job.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: opensearch-observability-script - namespace: logging -data: - seed.py: | - import json - import os - import time - import urllib.error - import urllib.request - - OSD_URL = os.environ.get( - "OSD_URL", - "http://opensearch-dashboards.logging.svc.cluster.local:5601", - ).rstrip("/") - OBJECT_DIR = "/config" - - def request_json(method, path, payload=None): - url = f"{OSD_URL}{path}" - data = None - headers = {"osd-xsrf": "true"} - 
if payload is not None: - data = json.dumps(payload).encode("utf-8") - headers["Content-Type"] = "application/json" - - req = urllib.request.Request(url, data=data, method=method) - for key, value in headers.items(): - req.add_header(key, value) - - try: - with urllib.request.urlopen(req, timeout=30) as response: - body = response.read().decode("utf-8") - except urllib.error.HTTPError as exc: - detail = exc.read().decode("utf-8") - raise SystemExit(f"{method} {path} failed: {exc.code} {detail}") - - if not body: - return {} - return json.loads(body) - - - def wait_ready(): - for _ in range(60): - try: - request_json("GET", "/api/status") - return - except Exception: - time.sleep(5) - raise SystemExit("OpenSearch Dashboards did not become ready in time") - - - def load_payload(name): - path = os.path.join(OBJECT_DIR, name) - with open(path, "r", encoding="utf-8") as handle: - return json.load(handle) - - - def index_by_name(items, key): - lookup = {} - for item in items: - obj = item.get(key, {}) - name = obj.get("name") - if not name: - continue - lookup.setdefault(name, item) - return lookup - - - def ensure_applications(apps): - existing = request_json("GET", "/api/observability/application/").get("data", []) - existing_by_name = {app.get("name"): app for app in existing if app.get("name")} - - for app in apps: - name = app.get("name") - if not name: - continue - current = existing_by_name.get(name) - if not current: - request_json("POST", "/api/observability/application/", app) - print(f"created application: {name}") - continue - - if app.get("baseQuery") != current.get("baseQuery"): - print(f"baseQuery differs for {name}; skipping update") - - update_body = {} - for key in ("description", "servicesEntities", "traceGroups"): - if app.get(key, "") != current.get(key, ""): - update_body[key] = app.get(key, "") - - if update_body: - request_json( - "PUT", - "/api/observability/application/", - {"appId": current["id"], "updateBody": update_body}, - ) - 
print(f"updated application: {name}") - - - def ensure_saved_objects(objects, object_type, endpoint): - existing = request_json( - "GET", - f"/api/observability/event_analytics/saved_objects?objectType={object_type}", - ).get("observabilityObjectList", []) - key = "savedQuery" if object_type == "savedQuery" else "savedVisualization" - existing_by_name = index_by_name(existing, key) - - for obj in objects: - name = obj.get("name") - if not name: - continue - current = existing_by_name.get(name) - if not current: - request_json("POST", endpoint, {"object": obj}) - print(f"created {object_type}: {name}") - continue - - current_body = current.get(key, {}) - if current_body != obj: - request_json( - "PUT", - endpoint, - {"object_id": current["objectId"], "object": obj}, - ) - print(f"updated {object_type}: {name}") - - - def main(): - wait_ready() - - applications = load_payload("applications.json") - queries = load_payload("saved_queries.json") - visualizations = load_payload("saved_visualizations.json") - - ensure_applications(applications) - ensure_saved_objects(queries, "savedQuery", "/api/observability/event_analytics/saved_objects/query") - ensure_saved_objects( - visualizations, - "savedVisualization", - "/api/observability/event_analytics/saved_objects/vis", - ) - - - if __name__ == "__main__": - main() ---- apiVersion: batch/v1 kind: Job metadata: diff --git a/services/logging/opensearch-prune-cronjob.yaml b/services/logging/opensearch-prune-cronjob.yaml index 83aee1a..75e72db 100644 --- a/services/logging/opensearch-prune-cronjob.yaml +++ b/services/logging/opensearch-prune-cronjob.yaml @@ -1,89 +1,4 @@ # services/logging/opensearch-prune-cronjob.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: opensearch-prune-script - namespace: logging -data: - prune.py: | - import json - import os - import re - import sys - import urllib.error - import urllib.request - - os_url = os.environ.get("OPENSEARCH_URL", 
"http://opensearch-master.logging.svc.cluster.local:9200").rstrip("/") - limit_bytes = int(os.environ.get("LOG_LIMIT_BYTES", str(1024**4))) - patterns = [p.strip() for p in os.environ.get("LOG_INDEX_PATTERNS", "kube-*,journald-*").split(",") if p.strip()] - - UNITS = { - "b": 1, - "kb": 1024, - "mb": 1024**2, - "gb": 1024**3, - "tb": 1024**4, - } - - def parse_size(value: str) -> int: - if not value: - return 0 - text = value.strip().lower() - if text in ("-", "0"): - return 0 - match = re.match(r"^([0-9.]+)([a-z]+)$", text) - if not match: - return 0 - number = float(match.group(1)) - unit = match.group(2) - if unit not in UNITS: - return 0 - return int(number * UNITS[unit]) - - def request_json(path: str): - url = f"{os_url}{path}" - with urllib.request.urlopen(url, timeout=30) as response: - payload = response.read().decode("utf-8") - return json.loads(payload) - - def delete_index(index: str) -> None: - url = f"{os_url}/{index}" - req = urllib.request.Request(url, method="DELETE") - with urllib.request.urlopen(req, timeout=30) as response: - _ = response.read() - print(f"deleted {index}") - - indices = [] - for pattern in patterns: - try: - data = request_json(f"/_cat/indices/{pattern}?format=json&h=index,store.size,creation.date") - except urllib.error.HTTPError as exc: - if exc.code == 404: - continue - raise - for item in data: - index = item.get("index") - if not index or index.startswith("."): - continue - size = parse_size(item.get("store.size", "")) - created = int(item.get("creation.date", "0") or 0) - indices.append({"index": index, "size": size, "created": created}) - - total = sum(item["size"] for item in indices) - print(f"total_log_bytes={total}") - if total <= limit_bytes: - print("within limit") - sys.exit(0) - - indices.sort(key=lambda item: item["created"]) - for item in indices: - if total <= limit_bytes: - break - delete_index(item["index"]) - total -= item["size"] - - print(f"remaining_log_bytes={total}") ---- apiVersion: batch/v1 kind: 
CronJob metadata: diff --git a/services/logging/scripts/node_image_gc_rpi4.sh b/services/logging/scripts/node_image_gc_rpi4.sh new file mode 100644 index 0000000..81f27b1 --- /dev/null +++ b/services/logging/scripts/node_image_gc_rpi4.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -euo pipefail + +changed=0 +k3s_changed=0 +k3s_agent_changed=0 + +k3s_dropin="/host/etc/systemd/system/k3s.service.d/98-image-gc.conf" +k3s_agent_dropin="/host/etc/systemd/system/k3s-agent.service.d/98-image-gc.conf" + +if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_dropin}" ]; then + mkdir -p "$(dirname "${k3s_dropin}")" + printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_dropin}" + changed=1 + k3s_changed=1 +fi + +if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! -f "${k3s_agent_dropin}" ]; then + mkdir -p "$(dirname "${k3s_agent_dropin}")" + printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_agent_dropin}" + changed=1 + k3s_agent_changed=1 +fi + +if [ "${changed}" -eq 1 ]; then + sleep "$(( (RANDOM % 300) + 10 ))" + chroot /host /bin/systemctl daemon-reload + if [ "${k3s_changed}" -eq 1 ]; then + chroot /host /bin/systemctl restart k3s + fi + if [ "${k3s_agent_changed}" -eq 1 ]; then + chroot /host /bin/systemctl restart k3s-agent + fi +fi + +sleep infinity diff --git a/services/logging/scripts/node_image_prune_rpi5.sh b/services/logging/scripts/node_image_prune_rpi5.sh new file mode 100644 index 0000000..eb54b77 --- /dev/null +++ b/services/logging/scripts/node_image_prune_rpi5.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +set -euo pipefail + +threshold=70 + +sleep "$(( (RANDOM % 300) + 10 ))" + +while true; do + usage=$(df -P /host | awk 
'NR==2 {gsub(/%/,"",$5); print $5}') + if [ -z "${usage}" ]; then + sleep 1800 + continue + fi + + if [ "${usage}" -ge "${threshold}" ]; then + chroot /host /bin/sh -c ' + if command -v crictl >/dev/null 2>&1; then + crictl --runtime-endpoint=unix:///run/k3s/containerd/containerd.sock rmi --prune || true + elif [ -x /usr/local/bin/crictl ]; then + /usr/local/bin/crictl --runtime-endpoint=unix:///run/k3s/containerd/containerd.sock rmi --prune || true + fi + ' + fi + + sleep 21600 +done diff --git a/services/logging/scripts/node_log_rotation.sh b/services/logging/scripts/node_log_rotation.sh new file mode 100644 index 0000000..534806f --- /dev/null +++ b/services/logging/scripts/node_log_rotation.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +set -euo pipefail + +changed=0 +journald_changed=0 +k3s_changed=0 +k3s_agent_changed=0 + +journald_dropin="/host/etc/systemd/journald.conf.d/99-logging.conf" +k3s_dropin="/host/etc/systemd/system/k3s.service.d/99-logging.conf" +k3s_agent_dropin="/host/etc/systemd/system/k3s-agent.service.d/99-logging.conf" +k3s_image_gc_dropin="/host/etc/systemd/system/k3s.service.d/98-image-gc.conf" +k3s_agent_image_gc_dropin="/host/etc/systemd/system/k3s-agent.service.d/98-image-gc.conf" + +if [ ! -f "${journald_dropin}" ]; then + mkdir -p "$(dirname "${journald_dropin}")" + printf "[Journal]\nStorage=volatile\nRuntimeMaxUse=200M\nRuntimeKeepFree=512M\nMaxFileSec=1h\n" > "${journald_dropin}" + changed=1 + journald_changed=1 +fi + +if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_dropin}" ]; then + mkdir -p "$(dirname "${k3s_dropin}")" + printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-size=10Mi\"\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-files=2\"\n" > "${k3s_dropin}" + changed=1 + k3s_changed=1 +fi + +if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! 
# services/logging/scripts/node_log_rotation.sh
#
# One-shot host configurator (privileged pod, node root at /host): caps
# journald disk usage, caps per-container log size/count via kubelet args,
# and installs the same image-GC drop-ins as node_image_gc_rpi4.sh, then
# restarts the affected units and parks forever.
#!/usr/bin/env bash
set -euo pipefail

changed=0
journald_changed=0
k3s_changed=0
k3s_agent_changed=0

journald_dropin="/host/etc/systemd/journald.conf.d/99-logging.conf"
k3s_dropin="/host/etc/systemd/system/k3s.service.d/99-logging.conf"
k3s_agent_dropin="/host/etc/systemd/system/k3s-agent.service.d/99-logging.conf"
# NOTE(review): these two 98-image-gc.conf paths are also written by
# node_image_gc_rpi4.sh — the scripts overlap; consider consolidating.
k3s_image_gc_dropin="/host/etc/systemd/system/k3s.service.d/98-image-gc.conf"
k3s_agent_image_gc_dropin="/host/etc/systemd/system/k3s-agent.service.d/98-image-gc.conf"

# Volatile journald storage bounded to 200M keeps logs off the SD card.
if [ ! -f "${journald_dropin}" ]; then
  mkdir -p "$(dirname "${journald_dropin}")"
  printf "[Journal]\nStorage=volatile\nRuntimeMaxUse=200M\nRuntimeKeepFree=512M\nMaxFileSec=1h\n" > "${journald_dropin}"
  changed=1
  journald_changed=1
fi

# NOTE(review): every printf below emits multiple Environment="K3S_KUBELET_ARG=…"
# lines; systemd keeps only the last assignment per variable name, so only one
# kubelet arg per drop-in actually survives — and K3S_KUBELET_ARG is not a
# documented k3s environment variable. Confirm the intended mechanism.
if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_dropin}" ]; then
  mkdir -p "$(dirname "${k3s_dropin}")"
  printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-size=10Mi\"\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-files=2\"\n" > "${k3s_dropin}"
  changed=1
  k3s_changed=1
fi

if [ -f "/host/etc/systemd/system/k3s.service" ] && [ ! -f "${k3s_image_gc_dropin}" ]; then
  mkdir -p "$(dirname "${k3s_image_gc_dropin}")"
  printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_image_gc_dropin}"
  changed=1
  k3s_changed=1
fi

if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! -f "${k3s_agent_dropin}" ]; then
  mkdir -p "$(dirname "${k3s_agent_dropin}")"
  printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-size=10Mi\"\nEnvironment=\"K3S_KUBELET_ARG=container-log-max-files=2\"\n" > "${k3s_agent_dropin}"
  changed=1
  k3s_agent_changed=1
fi

if [ -f "/host/etc/systemd/system/k3s-agent.service" ] && [ ! -f "${k3s_agent_image_gc_dropin}" ]; then
  mkdir -p "$(dirname "${k3s_agent_image_gc_dropin}")"
  printf "[Service]\nEnvironment=\"K3S_KUBELET_ARG=image-gc-high-threshold=70\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-low-threshold=60\"\nEnvironment=\"K3S_KUBELET_ARG=image-gc-minimum-available=5Gi\"\n" > "${k3s_agent_image_gc_dropin}"
  changed=1
  k3s_agent_changed=1
fi

if [ "${changed}" -eq 1 ]; then
  # Jitter so all nodes don't restart k3s at once; restarting the server
  # unit is disruptive, so only restart what actually changed.
  sleep "$(( (RANDOM % 300) + 10 ))"
  chroot /host /bin/systemctl daemon-reload
  if [ "${journald_changed}" -eq 1 ]; then
    chroot /host /bin/systemctl restart systemd-journald
  fi
  if [ "${k3s_changed}" -eq 1 ]; then
    chroot /host /bin/systemctl restart k3s
  fi
  if [ "${k3s_agent_changed}" -eq 1 ]; then
    chroot /host /bin/systemctl restart k3s-agent
  fi
fi

# Keep the DaemonSet pod alive after the one-shot configuration.
sleep infinity
"http://opensearch-dashboards.logging.svc.cluster.local:5601", +).rstrip("/") +OBJECT_DIR = "/config" + +def request_json(method, path, payload=None): + url = f"{OSD_URL}{path}" + data = None + headers = {"osd-xsrf": "true"} + if payload is not None: + data = json.dumps(payload).encode("utf-8") + headers["Content-Type"] = "application/json" + + req = urllib.request.Request(url, data=data, method=method) + for key, value in headers.items(): + req.add_header(key, value) + + try: + with urllib.request.urlopen(req, timeout=30) as response: + body = response.read().decode("utf-8") + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8") + raise SystemExit(f"{method} {path} failed: {exc.code} {detail}") + + if not body: + return {} + return json.loads(body) + + +def wait_ready(): + for _ in range(60): + try: + request_json("GET", "/api/status") + return + except Exception: + time.sleep(5) + raise SystemExit("OpenSearch Dashboards did not become ready in time") + + +def load_payload(name): + path = os.path.join(OBJECT_DIR, name) + with open(path, "r", encoding="utf-8") as handle: + return json.load(handle) + + +def index_by_name(items, key): + lookup = {} + for item in items: + obj = item.get(key, {}) + name = obj.get("name") + if not name: + continue + lookup.setdefault(name, item) + return lookup + + +def ensure_applications(apps): + existing = request_json("GET", "/api/observability/application/").get("data", []) + existing_by_name = {app.get("name"): app for app in existing if app.get("name")} + + for app in apps: + name = app.get("name") + if not name: + continue + current = existing_by_name.get(name) + if not current: + request_json("POST", "/api/observability/application/", app) + print(f"created application: {name}") + continue + + if app.get("baseQuery") != current.get("baseQuery"): + print(f"baseQuery differs for {name}; skipping update") + + update_body = {} + for key in ("description", "servicesEntities", "traceGroups"): + if app.get(key, 
"") != current.get(key, ""): + update_body[key] = app.get(key, "") + + if update_body: + request_json( + "PUT", + "/api/observability/application/", + {"appId": current["id"], "updateBody": update_body}, + ) + print(f"updated application: {name}") + + +def ensure_saved_objects(objects, object_type, endpoint): + existing = request_json( + "GET", + f"/api/observability/event_analytics/saved_objects?objectType={object_type}", + ).get("observabilityObjectList", []) + key = "savedQuery" if object_type == "savedQuery" else "savedVisualization" + existing_by_name = index_by_name(existing, key) + + for obj in objects: + name = obj.get("name") + if not name: + continue + current = existing_by_name.get(name) + if not current: + request_json("POST", endpoint, {"object": obj}) + print(f"created {object_type}: {name}") + continue + + current_body = current.get(key, {}) + if current_body != obj: + request_json( + "PUT", + endpoint, + {"object_id": current["objectId"], "object": obj}, + ) + print(f"updated {object_type}: {name}") + + +def main(): + wait_ready() + + applications = load_payload("applications.json") + queries = load_payload("saved_queries.json") + visualizations = load_payload("saved_visualizations.json") + + ensure_applications(applications) + ensure_saved_objects(queries, "savedQuery", "/api/observability/event_analytics/saved_objects/query") + ensure_saved_objects( + visualizations, + "savedVisualization", + "/api/observability/event_analytics/saved_objects/vis", + ) + + +if __name__ == "__main__": + main() diff --git a/services/logging/scripts/opensearch_prune.py b/services/logging/scripts/opensearch_prune.py new file mode 100644 index 0000000..ad84d5b --- /dev/null +++ b/services/logging/scripts/opensearch_prune.py @@ -0,0 +1,77 @@ +import json +import os +import re +import sys +import urllib.error +import urllib.request + +os_url = os.environ.get("OPENSEARCH_URL", "http://opensearch-master.logging.svc.cluster.local:9200").rstrip("/") +limit_bytes = 
# OpenSearch endpoint and pruning policy, all overridable via environment.
os_url = os.environ.get("OPENSEARCH_URL", "http://opensearch-master.logging.svc.cluster.local:9200").rstrip("/")
# Total bytes of log indices to keep; default 1 TiB.
limit_bytes = int(os.environ.get("LOG_LIMIT_BYTES", str(1024**4)))
patterns = [p.strip() for p in os.environ.get("LOG_INDEX_PATTERNS", "kube-*,journald-*").split(",") if p.strip()]

# Unit multipliers for the human-readable sizes _cat/indices reports.
UNITS = {
    "b": 1,
    "kb": 1024,
    "mb": 1024**2,
    "gb": 1024**3,
    "tb": 1024**4,
}


def parse_size(value: str) -> int:
    """Convert a _cat size string ("1.5gb", "10b") to bytes; 0 if unparseable."""
    if not value:
        return 0
    text = value.strip().lower()
    if text in ("-", "0"):
        return 0
    match = re.match(r"^([0-9.]+)([a-z]+)$", text)
    if not match:
        return 0
    number = float(match.group(1))
    unit = match.group(2)
    if unit not in UNITS:
        return 0
    return int(number * UNITS[unit])


def request_json(path: str):
    """GET a JSON document from the OpenSearch REST API."""
    url = f"{os_url}{path}"
    with urllib.request.urlopen(url, timeout=30) as response:
        payload = response.read().decode("utf-8")
    return json.loads(payload)


def delete_index(index: str) -> None:
    """Delete one index and log the action."""
    url = f"{os_url}/{index}"
    req = urllib.request.Request(url, method="DELETE")
    with urllib.request.urlopen(req, timeout=30) as response:
        _ = response.read()
    print(f"deleted {index}")


def main() -> None:
    """Delete oldest matching log indices until total size fits the limit.

    IDIOM FIX: the original ran this flat at module level; wrapping it in
    main() behind a __main__ guard makes the module import-safe without
    changing behavior when executed as a script.
    """
    indices = []
    for pattern in patterns:
        try:
            data = request_json(f"/_cat/indices/{pattern}?format=json&h=index,store.size,creation.date")
        except urllib.error.HTTPError as exc:
            # No index matches this pattern yet — not an error.
            if exc.code == 404:
                continue
            raise
        for item in data:
            index = item.get("index")
            # Skip hidden/system indices (names starting with a dot).
            if not index or index.startswith("."):
                continue
            # NOTE(review): store.size includes replica shards — confirm the
            # limit is meant to be total (not primary-only) bytes.
            size = parse_size(item.get("store.size", ""))
            created = int(item.get("creation.date", "0") or 0)
            indices.append({"index": index, "size": size, "created": created})

    total = sum(item["size"] for item in indices)
    print(f"total_log_bytes={total}")
    if total <= limit_bytes:
        print("within limit")
        return

    # Oldest first: creation.date is epoch millis, ascending sort.
    indices.sort(key=lambda item: item["created"])
    for item in indices:
        if total <= limit_bytes:
            break
        delete_index(item["index"])
        total -= item["size"]

    print(f"remaining_log_bytes={total}")


if __name__ == "__main__":
    main()
a/services/maintenance/kustomization.yaml +++ b/services/maintenance/kustomization.yaml @@ -5,11 +5,28 @@ resources: - namespace.yaml - node-nofile-serviceaccount.yaml - pod-cleaner-rbac.yaml - - node-nofile-script.yaml - - pod-cleaner-script.yaml - node-nofile-daemonset.yaml - pod-cleaner-cronjob.yaml - node-image-sweeper-serviceaccount.yaml - - node-image-sweeper-script.yaml - node-image-sweeper-daemonset.yaml - image-sweeper-cronjob.yaml + +configMapGenerator: + - name: node-nofile-script + namespace: maintenance + files: + - node_nofile.sh=scripts/node_nofile.sh + options: + disableNameSuffixHash: true + - name: pod-cleaner-script + namespace: maintenance + files: + - pod_cleaner.sh=scripts/pod_cleaner.sh + options: + disableNameSuffixHash: true + - name: node-image-sweeper-script + namespace: maintenance + files: + - node_image_sweeper.sh=scripts/node_image_sweeper.sh + options: + disableNameSuffixHash: true diff --git a/services/maintenance/node-image-sweeper-script.yaml b/services/maintenance/node-image-sweeper-script.yaml deleted file mode 100644 index 6e3b02c..0000000 --- a/services/maintenance/node-image-sweeper-script.yaml +++ /dev/null @@ -1,100 +0,0 @@ -# services/maintenance/node-image-sweeper-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: node-image-sweeper-script - namespace: maintenance -data: - node_image_sweeper.sh: | - #!/bin/sh - set -eu - - ONE_SHOT=${ONE_SHOT:-false} - THRESHOLD_DAYS=14 - - usage=$(df -P /host | awk 'NR==2 {gsub(/%/,"",$5); print $5}') || usage="" - if [ -n "${usage}" ] && [ "${usage}" -ge 70 ]; then - THRESHOLD_DAYS=3 - fi - - cutoff=$(python3 - <<'PY' - import time, os - print(int(time.time()) - int(os.environ.get("THRESHOLD_DAYS", "14")) * 86400) - PY - ) - - RUNNING=$(chroot /host /bin/sh -c "crictl ps -a --quiet 2>/dev/null" | tr -s ' ' '\n' | sort -u | tr '\n' ' ') - IMAGES_JSON=$(chroot /host /bin/sh -c "crictl images -o json 2>/dev/null" || echo '{}') - - SKIP="registry.k8s.io/pause k8s.gcr.io/pause 
rancher/mirrored-pause" - - prune_list=$(printf "%s" "${IMAGES_JSON}" | CUTOFF="${cutoff}" RUNNING="${RUNNING}" SKIP="${SKIP}" python3 - <<'PY' - import json, os, sys, time - - try: - data = json.load(sys.stdin) - except Exception: - print("", end="") - sys.exit(0) - - cutoff = int(os.environ.get("CUTOFF", "0")) - running = set(os.environ.get("RUNNING", "").split()) - skip = os.environ.get("SKIP", "").split() - now = int(time.time()) - prune = [] - - - def is_skip(tags): - if not tags: - return False - for t in tags: - for prefix in skip: - if prefix and t.startswith(prefix): - return True - return False - - - for img in data.get("images", []): - image_id = img.get("id", "") - if not image_id: - continue - if image_id in running: - continue - tags = img.get("repoTags") or [] - if is_skip(tags): - continue - created = img.get("createdAt") or 0 - try: - created = int(str(created)) // 1000000000 - except Exception: - created = 0 - if created and created > now: - created = now - if cutoff and created and created < cutoff: - prune.append(image_id) - - seen = set() - for p in prune: - if p in seen: - continue - seen.add(p) - print(p) - PY - ) - - if [ -n "${prune_list}" ]; then - printf "%s" "${prune_list}" | while read -r image_id; do - if [ -n "${image_id}" ]; then - chroot /host /bin/sh -c "crictl rmi --prune ${image_id}" || true - fi - done - fi - - find /host/var/lib/rancher/k3s/agent/images -type f -name "*.tar" -mtime +7 -print -delete 2>/dev/null || true - find /host/var/lib/rancher/k3s/agent/containerd -maxdepth 1 -type f -mtime +7 -print -delete 2>/dev/null || true - - if [ "${ONE_SHOT}" = "true" ]; then - exit 0 - fi - - sleep infinity diff --git a/services/maintenance/node-nofile-script.yaml b/services/maintenance/node-nofile-script.yaml deleted file mode 100644 index 2e2b440..0000000 --- a/services/maintenance/node-nofile-script.yaml +++ /dev/null @@ -1,38 +0,0 @@ -# services/maintenance/node-nofile-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - 
name: node-nofile-script - namespace: maintenance -data: - node_nofile.sh: | - #!/usr/bin/env bash - set -euo pipefail - - limit_line="LimitNOFILE=1048576" - changed=0 - - for unit in k3s k3s-agent; do - unit_file="/host/etc/systemd/system/${unit}.service" - if [ -f "${unit_file}" ]; then - dropin_dir="/host/etc/systemd/system/${unit}.service.d" - dropin_file="${dropin_dir}/99-nofile.conf" - if [ ! -f "${dropin_file}" ] || ! grep -q "${limit_line}" "${dropin_file}"; then - mkdir -p "${dropin_dir}" - printf "[Service]\n%s\n" "${limit_line}" > "${dropin_file}" - changed=1 - fi - fi - done - - if [ "${changed}" -eq 1 ]; then - sleep "$(( (RANDOM % 300) + 10 ))" - chroot /host /bin/systemctl daemon-reload - for unit in k3s k3s-agent; do - if [ -f "/host/etc/systemd/system/${unit}.service" ]; then - chroot /host /bin/systemctl restart "${unit}" - fi - done - fi - - sleep infinity diff --git a/services/maintenance/pod-cleaner-script.yaml b/services/maintenance/pod-cleaner-script.yaml deleted file mode 100644 index 909a37c..0000000 --- a/services/maintenance/pod-cleaner-script.yaml +++ /dev/null @@ -1,20 +0,0 @@ -# services/maintenance/pod-cleaner-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: pod-cleaner-script - namespace: maintenance -data: - pod_cleaner.sh: | - #!/usr/bin/env bash - set -euo pipefail - - for phase in Succeeded Failed; do - kubectl get pods -A --field-selector="status.phase=${phase}" \ - -o jsonpath='{range .items[*]}{.metadata.namespace}{" "}{.metadata.name}{"\n"}{end}' \ - | while read -r namespace name; do - if [ -n "${namespace}" ] && [ -n "${name}" ]; then - kubectl delete pod -n "${namespace}" "${name}" --ignore-not-found --grace-period=0 --wait=false - fi - done - done diff --git a/services/maintenance/scripts/node_image_sweeper.sh b/services/maintenance/scripts/node_image_sweeper.sh new file mode 100644 index 0000000..2ad7b47 --- /dev/null +++ b/services/maintenance/scripts/node_image_sweeper.sh @@ -0,0 +1,92 @@ +#!/bin/sh +set 
# services/maintenance/scripts/node_image_sweeper.sh
#
# Age-based image sweeper (privileged pod, node root at /host): prunes
# container images older than THRESHOLD_DAYS that no container references,
# plus stale k3s image tarballs. Threshold tightens from 14 to 3 days when
# the root filesystem is >= 70% full.
#!/bin/sh
set -eu

ONE_SHOT=${ONE_SHOT:-false}
THRESHOLD_DAYS=14

# Column 5 of `df -P` is "Use%"; keep going with an empty value if df fails.
usage=$(df -P /host | awk 'NR==2 {gsub(/%/,"",$5); print $5}') || usage=""
if [ -n "${usage}" ] && [ "${usage}" -ge 70 ]; then
  THRESHOLD_DAYS=3
fi

# BUG FIX: THRESHOLD_DAYS was a plain shell variable and never exported, so
# os.environ.get() in the heredoc always fell back to its default "14" and
# the low-disk escalation to 3 days never took effect. Pass it explicitly
# into the child process environment.
cutoff=$(THRESHOLD_DAYS="${THRESHOLD_DAYS}" python3 - <<'PY'
import time, os
print(int(time.time()) - int(os.environ.get("THRESHOLD_DAYS", "14")) * 86400)
PY
)

# IDs of every container (any state) — images in use must never be pruned.
RUNNING=$(chroot /host /bin/sh -c "crictl ps -a --quiet 2>/dev/null" | tr -s ' ' '\n' | sort -u | tr '\n' ' ')
IMAGES_JSON=$(chroot /host /bin/sh -c "crictl images -o json 2>/dev/null" || echo '{}')

# Image repo prefixes that must survive (pause images break pods if removed).
SKIP="registry.k8s.io/pause k8s.gcr.io/pause rancher/mirrored-pause"

# Select prune candidates: not referenced by a container, not a protected
# prefix, and created before the cutoff. Output is one image ID per line.
prune_list=$(printf "%s" "${IMAGES_JSON}" | CUTOFF="${cutoff}" RUNNING="${RUNNING}" SKIP="${SKIP}" python3 - <<'PY'
import json, os, sys, time

try:
    data = json.load(sys.stdin)
except Exception:
    # Unparseable crictl output -> nothing to prune.
    print("", end="")
    sys.exit(0)

cutoff = int(os.environ.get("CUTOFF", "0"))
running = set(os.environ.get("RUNNING", "").split())
skip = os.environ.get("SKIP", "").split()
now = int(time.time())
prune = []


def is_skip(tags):
    if not tags:
        return False
    for t in tags:
        for prefix in skip:
            if prefix and t.startswith(prefix):
                return True
    return False


for img in data.get("images", []):
    image_id = img.get("id", "")
    if not image_id:
        continue
    if image_id in running:
        continue
    tags = img.get("repoTags") or []
    if is_skip(tags):
        continue
    created = img.get("createdAt") or 0
    # NOTE(review): assumes createdAt is an integer in nanoseconds; newer
    # crictl versions emit RFC3339 strings, which fall to 0 here and are
    # then never pruned — confirm against the deployed crictl.
    try:
        created = int(str(created)) // 1000000000
    except Exception:
        created = 0
    # Clamp clock skew: a creation time in the future counts as "now".
    if created and created > now:
        created = now
    if cutoff and created and created < cutoff:
        prune.append(image_id)

seen = set()
for p in prune:
    if p in seen:
        continue
    seen.add(p)
    print(p)
PY
)

if [ -n "${prune_list}" ]; then
  printf "%s" "${prune_list}" | while read -r image_id; do
    if [ -n "${image_id}" ]; then
      # Best-effort removal; an image that became referenced meanwhile fails harmlessly.
      chroot /host /bin/sh -c "crictl rmi --prune ${image_id}" || true
    fi
  done
fi

# Also drop week-old k3s airgap tarballs and stray containerd files.
find /host/var/lib/rancher/k3s/agent/images -type f -name "*.tar" -mtime +7 -print -delete 2>/dev/null || true
find /host/var/lib/rancher/k3s/agent/containerd -maxdepth 1 -type f -mtime +7 -print -delete 2>/dev/null || true

# ONE_SHOT=true is the CronJob mode; otherwise park for the DaemonSet.
if [ "${ONE_SHOT}" = "true" ]; then
  exit 0
fi

sleep infinity
-name "*.tar" -mtime +7 -print -delete 2>/dev/null || true +find /host/var/lib/rancher/k3s/agent/containerd -maxdepth 1 -type f -mtime +7 -print -delete 2>/dev/null || true + +if [ "${ONE_SHOT}" = "true" ]; then + exit 0 +fi + +sleep infinity diff --git a/services/maintenance/scripts/node_nofile.sh b/services/maintenance/scripts/node_nofile.sh new file mode 100644 index 0000000..cf6c5d9 --- /dev/null +++ b/services/maintenance/scripts/node_nofile.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +limit_line="LimitNOFILE=1048576" +changed=0 + +for unit in k3s k3s-agent; do + unit_file="/host/etc/systemd/system/${unit}.service" + if [ -f "${unit_file}" ]; then + dropin_dir="/host/etc/systemd/system/${unit}.service.d" + dropin_file="${dropin_dir}/99-nofile.conf" + if [ ! -f "${dropin_file}" ] || ! grep -q "${limit_line}" "${dropin_file}"; then + mkdir -p "${dropin_dir}" + printf "[Service]\n%s\n" "${limit_line}" > "${dropin_file}" + changed=1 + fi + fi +done + +if [ "${changed}" -eq 1 ]; then + sleep "$(( (RANDOM % 300) + 10 ))" + chroot /host /bin/systemctl daemon-reload + for unit in k3s k3s-agent; do + if [ -f "/host/etc/systemd/system/${unit}.service" ]; then + chroot /host /bin/systemctl restart "${unit}" + fi + done +fi + +sleep infinity diff --git a/services/maintenance/scripts/pod_cleaner.sh b/services/maintenance/scripts/pod_cleaner.sh new file mode 100644 index 0000000..2ec043e --- /dev/null +++ b/services/maintenance/scripts/pod_cleaner.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +for phase in Succeeded Failed; do + kubectl get pods -A --field-selector="status.phase=${phase}" \ + -o jsonpath='{range .items[*]}{.metadata.namespace}{" "}{.metadata.name}{"\n"}{end}' \ + | while read -r namespace name; do + if [ -n "${namespace}" ] && [ -n "${name}" ]; then + kubectl delete pod -n "${namespace}" "${name}" --ignore-not-found --grace-period=0 --wait=false + fi + done +done diff --git 
a/services/monitoring/grafana-smtp-sync-script.yaml b/services/monitoring/grafana-smtp-sync-script.yaml deleted file mode 100644 index 0a58a3c..0000000 --- a/services/monitoring/grafana-smtp-sync-script.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# services/monitoring/grafana-smtp-sync-script.yaml -apiVersion: v1 -kind: ConfigMap -metadata: - name: grafana-smtp-sync-script - namespace: monitoring -data: - sync.sh: | - #!/bin/sh - set -euo pipefail - - SOURCE_NS=${SOURCE_NS:-mailu-mailserver} - SOURCE_SECRET=${SOURCE_SECRET:-mailu-postmark-relay} - TARGET_NS=${TARGET_NS:-monitoring} - TARGET_SECRET=${TARGET_SECRET:-grafana-smtp} - - tmp=$(mktemp) - cleanup() { rm -f "$tmp"; } - trap cleanup EXIT - - kubectl -n "$SOURCE_NS" get secret "$SOURCE_SECRET" -o json > "$tmp" - - pass=$(jq -r '.data["relay-password"]' "$tmp") - user=$pass - - if [ -z "$user" ] || [ -z "$pass" ] || [ "$user" = "null" ] || [ "$pass" = "null" ]; then - echo "missing credentials from $SOURCE_NS/$SOURCE_SECRET" >&2 - exit 1 - fi - - cat < dict: - today = dt.date.today() - fromdate = today - dt.timedelta(days=window.days) - params = {"fromdate": fromdate.isoformat(), "todate": today.isoformat()} - headers = { - "Accept": "application/json", - "X-Postmark-Server-Token": token, - } - response = requests.get( - f"{API_BASE}/stats/outbound", - headers=headers, - params=params, - timeout=15, - ) - response.raise_for_status() - return response.json() - - - def update_metrics(token: str) -> None: - sent_by_window = {} - for window in WINDOWS: - data = fetch_outbound_stats(token, window) - sent = int(data.get("Sent", 0) or 0) - bounced = int(data.get("Bounced", 0) or 0) - rate = (bounced / sent * 100.0) if sent else 0.0 - sent_by_window[window.label] = sent - POSTMARK_OUTBOUND_SENT.labels(window=window.label).set(sent) - POSTMARK_OUTBOUND_BOUNCED.labels(window=window.label).set(bounced) - POSTMARK_OUTBOUND_BOUNCE_RATE.labels(window=window.label).set(rate) - - POSTMARK_SENDING_LIMIT_GAUGE.set(SENDING_LIMIT) - 
limit_window_sent = sent_by_window.get(LIMIT_WINDOW, 0) - POSTMARK_SENDING_LIMIT_USED.set(limit_window_sent) - if SENDING_LIMIT: - POSTMARK_SENDING_LIMIT_USED_PERCENT.set(limit_window_sent / SENDING_LIMIT * 100.0) - else: - POSTMARK_SENDING_LIMIT_USED_PERCENT.set(0.0) - - - def main() -> None: - if not PRIMARY_TOKEN and not FALLBACK_TOKEN: - raise SystemExit("POSTMARK_SERVER_TOKEN or POSTMARK_SERVER_TOKEN_FALLBACK is required") - - start_http_server(LISTEN_PORT, addr=LISTEN_ADDRESS) - - tokens = [token for token in (PRIMARY_TOKEN, FALLBACK_TOKEN) if token] - token_index = 0 - - while True: - token = tokens[token_index % len(tokens)] - token_index += 1 - try: - update_metrics(token) - POSTMARK_API_UP.set(1) - POSTMARK_LAST_SUCCESS.set(time.time()) - except Exception as exc: # noqa: BLE001 - POSTMARK_API_UP.set(0) - POSTMARK_REQUEST_ERRORS.inc() - print(f"postmark_exporter: refresh failed: {exc}", flush=True) - time.sleep(POLL_INTERVAL_SECONDS) - - - if __name__ == "__main__": - main() diff --git a/services/monitoring/scripts/grafana_smtp_sync.sh b/services/monitoring/scripts/grafana_smtp_sync.sh new file mode 100644 index 0000000..c8207ad --- /dev/null +++ b/services/monitoring/scripts/grafana_smtp_sync.sh @@ -0,0 +1,31 @@ +#!/bin/sh +set -euo pipefail + +SOURCE_NS=${SOURCE_NS:-mailu-mailserver} +SOURCE_SECRET=${SOURCE_SECRET:-mailu-postmark-relay} +TARGET_NS=${TARGET_NS:-monitoring} +TARGET_SECRET=${TARGET_SECRET:-grafana-smtp} + +tmp=$(mktemp) +cleanup() { rm -f "$tmp"; } +trap cleanup EXIT + +kubectl -n "$SOURCE_NS" get secret "$SOURCE_SECRET" -o json > "$tmp" + +pass=$(jq -r '.data["relay-password"]' "$tmp") +user=$pass + +if [ -z "$user" ] || [ -z "$pass" ] || [ "$user" = "null" ] || [ "$pass" = "null" ]; then + echo "missing credentials from $SOURCE_NS/$SOURCE_SECRET" >&2 + exit 1 +fi + +cat < dict: + today = dt.date.today() + fromdate = today - dt.timedelta(days=window.days) + params = {"fromdate": fromdate.isoformat(), "todate": today.isoformat()} + headers 
def fetch_outbound_stats(token: str, window) -> dict:
    """Fetch outbound send/bounce stats from Postmark for one time window.

    NOTE(review): the original `def` line was lost in the mangled diff; this
    signature is reconstructed from the call sites — `window` must expose
    `.days` and `.label` (presumably a small namedtuple from the swallowed
    module header). Confirm against the committed file.
    """
    today = dt.date.today()
    fromdate = today - dt.timedelta(days=window.days)
    params = {"fromdate": fromdate.isoformat(), "todate": today.isoformat()}
    headers = {
        "Accept": "application/json",
        "X-Postmark-Server-Token": token,
    }
    response = requests.get(
        f"{API_BASE}/stats/outbound",
        headers=headers,
        params=params,
        timeout=15,
    )
    response.raise_for_status()
    return response.json()


def update_metrics(token: str) -> None:
    """Refresh all Prometheus gauges from the Postmark stats API.

    Relies on module-level gauges (POSTMARK_*), WINDOWS, LIMIT_WINDOW and
    SENDING_LIMIT defined in the (not visible here) module header.
    """
    sent_by_window = {}
    for window in WINDOWS:
        data = fetch_outbound_stats(token, window)
        # Guard against null/absent fields from the API ("or 0").
        sent = int(data.get("Sent", 0) or 0)
        bounced = int(data.get("Bounced", 0) or 0)
        # Bounce rate as a percentage; 0 when nothing was sent.
        rate = (bounced / sent * 100.0) if sent else 0.0
        sent_by_window[window.label] = sent
        POSTMARK_OUTBOUND_SENT.labels(window=window.label).set(sent)
        POSTMARK_OUTBOUND_BOUNCED.labels(window=window.label).set(bounced)
        POSTMARK_OUTBOUND_BOUNCE_RATE.labels(window=window.label).set(rate)

    POSTMARK_SENDING_LIMIT_GAUGE.set(SENDING_LIMIT)
    limit_window_sent = sent_by_window.get(LIMIT_WINDOW, 0)
    POSTMARK_SENDING_LIMIT_USED.set(limit_window_sent)
    # Avoid division by zero when no sending limit is configured.
    if SENDING_LIMIT:
        POSTMARK_SENDING_LIMIT_USED_PERCENT.set(limit_window_sent / SENDING_LIMIT * 100.0)
    else:
        POSTMARK_SENDING_LIMIT_USED_PERCENT.set(0.0)


def main() -> None:
    """Serve Prometheus metrics and poll the Postmark API forever.

    Requires at least one of the two server tokens; exposes the metrics
    endpoint first so scrapes work even before the first successful poll.
    """
    if not PRIMARY_TOKEN and not FALLBACK_TOKEN:
        raise SystemExit("POSTMARK_SERVER_TOKEN or POSTMARK_SERVER_TOKEN_FALLBACK is required")

    start_http_server(LISTEN_PORT, addr=LISTEN_ADDRESS)

    tokens = [token for token in (PRIMARY_TOKEN, FALLBACK_TOKEN) if token]
    token_index = 0

    while True:
        # NOTE(review): this rotates primary/fallback on EVERY poll cycle,
        # not only after a failure — confirm round-robin is intended rather
        # than failover.
        token = tokens[token_index % len(tokens)]
        token_index += 1
        try:
            update_metrics(token)
            POSTMARK_API_UP.set(1)
            POSTMARK_LAST_SUCCESS.set(time.time())
        except Exception as exc:  # noqa: BLE001
            # Broad catch is deliberate: the exporter must keep serving
            # metrics (with postmark_api_up=0) through transient API outages.
            POSTMARK_API_UP.set(0)
            POSTMARK_REQUEST_ERRORS.inc()
            print(f"postmark_exporter: refresh failed: {exc}", flush=True)
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()