from __future__ import annotations

import base64
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import httpx

from . import settings

# Helpers for inviting users to Vaultwarden through its admin endpoints.
# The admin token and the fallback pod IP are looked up through the in-cluster
# Kubernetes API using the pod's mounted service-account credentials.

_K8S_BASE_URL = "https://kubernetes.default.svc"
_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount")

# Lower-case substrings of a 400/409 response body that mean the user is
# already known to Vaultwarden, so the invite is treated as a success.
_ALREADY_PRESENT_MARKERS = (
    "already invited",
    "already exists",
    "already registered",
    "user already exists",
)


def _read_service_account() -> tuple[str, str]:
    """Return the service-account bearer token and the CA bundle path.

    Returns:
        A ``(token, ca_path)`` tuple; ``ca_path`` is a string path to ``ca.crt``.

    Raises:
        RuntimeError: if the token or CA file is absent, or the token is blank.
    """
    token_path = _SA_PATH / "token"
    ca_path = _SA_PATH / "ca.crt"
    if not token_path.exists() or not ca_path.exists():
        raise RuntimeError("kubernetes service account token missing")
    token = token_path.read_text().strip()
    if not token:
        raise RuntimeError("kubernetes service account token empty")
    return token, str(ca_path)


def _k8s_get_json(path: str, params: dict[str, str] | None = None) -> dict[str, Any]:
    """GET ``path`` from the in-cluster Kubernetes API and return the JSON object.

    Args:
        path: API path beginning with ``/`` (appended to the cluster base URL).
        params: Optional query parameters; passed to httpx so that values are
            URL-encoded rather than spliced into the path by hand.

    Raises:
        RuntimeError: if credentials are unavailable or the body is not a JSON object.
        httpx.HTTPError: on transport failures or non-2xx responses.
    """
    token, ca_path = _read_service_account()
    with httpx.Client(
        verify=ca_path,
        timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
        headers={"Authorization": f"Bearer {token}"},
    ) as client:
        resp = client.get(f"{_K8S_BASE_URL}{path}", params=params)
        resp.raise_for_status()
        data = resp.json()
    if not isinstance(data, dict):
        raise RuntimeError("unexpected kubernetes response")
    return data


def _pod_ready(pod: dict[str, Any]) -> bool:
    """Return True if ``pod`` is Running, has an IP, and is not reported un-Ready.

    A pod with no ``Ready`` condition at all is accepted; the first ``Ready``
    condition found decides the outcome otherwise.
    """
    status = pod.get("status")
    if not isinstance(status, dict):
        status = {}
    if status.get("phase") != "Running":
        return False
    ip = status.get("podIP")
    if not isinstance(ip, str) or not ip:
        return False
    conditions = status.get("conditions")
    if not isinstance(conditions, list):
        conditions = []
    for cond in conditions:
        if isinstance(cond, dict) and cond.get("type") == "Ready":
            return cond.get("status") == "True"
    return True


def _k8s_find_pod_ip(namespace: str, label_selector: str) -> str:
    """Return the IP of a pod in ``namespace`` matching ``label_selector``.

    Ready pods are preferred; if none is ready, the first listed pod is used.

    Raises:
        RuntimeError: if no pod matches or the chosen pod has no IP.
        httpx.HTTPError: if the Kubernetes API request fails.
    """
    # The selector goes through ``params`` so characters such as ``=``, ``,``,
    # ``!`` or spaces in set-based selectors are percent-encoded correctly.
    data = _k8s_get_json(
        f"/api/v1/namespaces/{namespace}/pods",
        params={"labelSelector": label_selector},
    )
    items = data.get("items") or []
    if not isinstance(items, list) or not items:
        raise RuntimeError("no vaultwarden pods found")

    pods = [p for p in items if isinstance(p, dict)]
    candidates = [p for p in pods if _pod_ready(p)] or pods
    if not candidates:
        # Every entry was malformed; report it like an empty list instead of
        # failing with an IndexError below.
        raise RuntimeError("no vaultwarden pods found")

    status = candidates[0].get("status") or {}
    ip = status.get("podIP") if isinstance(status, dict) else None
    if not isinstance(ip, str) or not ip:
        raise RuntimeError("vaultwarden pod has no IP")
    return ip


def _k8s_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Fetch secret ``name`` in ``namespace`` and return the decoded value of ``key``.

    The value is base64-decoded, interpreted as UTF-8, and stripped.

    Raises:
        RuntimeError: if the key is missing, cannot be decoded, or decodes to
            an empty string.
        httpx.HTTPError: if the Kubernetes API request fails.
    """
    data = _k8s_get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
    blob = data.get("data")
    if not isinstance(blob, dict):
        blob = {}
    raw = blob.get(key)
    if not isinstance(raw, str) or not raw:
        raise RuntimeError("secret key missing")
    try:
        decoded = base64.b64decode(raw).decode("utf-8").strip()
    except ValueError as exc:
        # binascii.Error (bad base64) and UnicodeDecodeError are both ValueError.
        raise RuntimeError("failed to decode secret") from exc
    if not decoded:
        raise RuntimeError("secret value empty")
    return decoded


@dataclass(frozen=True)
class VaultwardenInvite:
    """Outcome of an invite attempt returned by ``invite_user``."""

    # True when the user was invited or is already present.
    ok: bool
    # One of the status strings set in this module: "invited", "already_present",
    # "invalid_email", "rate_limited" or "error".
    status: str
    # Human-readable detail; for errors this is the last failure seen.
    detail: str = ""


# Module-wide cache of the authenticated admin client. All fields below are
# read and written by ``_admin_session`` while holding ``_ADMIN_LOCK``.
_ADMIN_LOCK = threading.Lock()
_ADMIN_SESSION: httpx.Client | None = None
_ADMIN_SESSION_EXPIRES_AT: float = 0.0
_ADMIN_SESSION_BASE_URL: str = ""
# Epoch seconds until which admin requests are refused locally after a 429.
_ADMIN_RATE_LIMITED_UNTIL: float = 0.0


def _admin_session(base_url: str) -> httpx.Client:
    """Return an authenticated admin client for ``base_url``, reusing the cache.

    The cached client is reused while it has not expired and was created for
    the same ``base_url``; otherwise it is closed and a new login is performed
    with the admin token read from the configured Kubernetes secret.

    Raises:
        RuntimeError: if a rate-limit backoff is active, the login returns 429,
            or the admin token cannot be read.
        httpx.HTTPError: if the login request fails or returns an error status.
    """
    global _ADMIN_SESSION, _ADMIN_SESSION_EXPIRES_AT
    global _ADMIN_SESSION_BASE_URL, _ADMIN_RATE_LIMITED_UNTIL

    with _ADMIN_LOCK:
        # Sample the clock after acquiring the lock: another thread may have
        # held it for a full login round-trip while we were waiting.
        now = time.time()
        if _ADMIN_RATE_LIMITED_UNTIL and now < _ADMIN_RATE_LIMITED_UNTIL:
            raise RuntimeError("vaultwarden rate limited")
        if (
            _ADMIN_SESSION is not None
            and now < _ADMIN_SESSION_EXPIRES_AT
            and _ADMIN_SESSION_BASE_URL == base_url
        ):
            return _ADMIN_SESSION

        if _ADMIN_SESSION is not None:
            # NOTE(review): another thread may still hold a reference to this
            # client from an earlier call; confirm that is acceptable.
            try:
                _ADMIN_SESSION.close()
            except Exception:
                # Best-effort cleanup: a failure to close the stale client
                # must not prevent establishing a new session.
                pass
            _ADMIN_SESSION = None

        token = _k8s_get_secret_value(
            settings.VAULTWARDEN_NAMESPACE,
            settings.VAULTWARDEN_ADMIN_SECRET_NAME,
            settings.VAULTWARDEN_ADMIN_SECRET_KEY,
        )
        client = httpx.Client(
            base_url=base_url,
            timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
            follow_redirects=True,
            headers={"User-Agent": "atlas-portal/1"},
        )
        # Authenticate to the admin UI to establish a session cookie.
        # Vaultwarden can rate-limit admin login attempts, so keep this
        # session cached briefly.
        try:
            resp = client.post("/admin", data={"token": token})
            if resp.status_code == 429:
                _ADMIN_RATE_LIMITED_UNTIL = now + float(
                    settings.VAULTWARDEN_ADMIN_RATE_LIMIT_BACKOFF_SEC
                )
                raise RuntimeError("vaultwarden rate limited")
            resp.raise_for_status()
        except Exception:
            # The client is not cached on failure, so close it here or its
            # connection pool would leak.
            client.close()
            raise

        _ADMIN_SESSION = client
        _ADMIN_SESSION_BASE_URL = base_url
        _ADMIN_SESSION_EXPIRES_AT = now + float(settings.VAULTWARDEN_ADMIN_SESSION_TTL_SEC)
        return client


def _rate_limited_result() -> VaultwardenInvite:
    """Build the result returned whenever Vaultwarden is rate limiting us."""
    return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited")


def _candidate_base_urls() -> list[str]:
    """Return the admin base URLs to try, in order of preference."""
    # Prefer the service name when it works; fall back to pod IP because the
    # Service can be misconfigured.
    urls = [f"http://{settings.VAULTWARDEN_SERVICE_HOST}"]
    try:
        pod_ip = _k8s_find_pod_ip(settings.VAULTWARDEN_NAMESPACE, settings.VAULTWARDEN_POD_LABEL)
    except Exception:
        # Pod discovery is optional; without it only the service URL is tried.
        return urls
    urls.append(f"http://{pod_ip}:{settings.VAULTWARDEN_POD_PORT}")
    return urls


def invite_user(email: str) -> VaultwardenInvite:
    """Invite ``email`` to Vaultwarden via the admin API.

    The service URL is tried first, then the pod IP if one can be discovered.
    A response saying the user already exists or is already invited counts as
    success. This function does not raise for request failures; they are
    reported in the returned ``VaultwardenInvite``.

    Args:
        email: Address to invite; surrounding whitespace is ignored.

    Returns:
        A ``VaultwardenInvite`` whose ``status`` is "invited", "already_present",
        "invalid_email", "rate_limited" or "error".
    """
    global _ADMIN_RATE_LIMITED_UNTIL

    email = (email or "").strip()
    if not email or "@" not in email:
        return VaultwardenInvite(ok=False, status="invalid_email", detail="email invalid")

    if _ADMIN_RATE_LIMITED_UNTIL and time.time() < _ADMIN_RATE_LIMITED_UNTIL:
        return _rate_limited_result()

    last_error = ""
    for candidate in _candidate_base_urls():
        try:
            session = _admin_session(candidate)
            resp = session.post("/admin/invite", json={"email": email})
            if resp.status_code == 429:
                _ADMIN_RATE_LIMITED_UNTIL = time.time() + float(
                    settings.VAULTWARDEN_ADMIN_RATE_LIMIT_BACKOFF_SEC
                )
                return _rate_limited_result()

            if resp.status_code in {200, 201, 204}:
                return VaultwardenInvite(ok=True, status="invited", detail="invite created")

            # Treat "already exists/invited" as success for idempotency.
            try:
                body = resp.text or ""
            except Exception:
                body = ""
            if resp.status_code in {400, 409}:
                lowered = body.lower()
                if any(marker in lowered for marker in _ALREADY_PRESENT_MARKERS):
                    return VaultwardenInvite(
                        ok=True, status="already_present", detail="user already present"
                    )

            last_error = f"status {resp.status_code}"
        except Exception as exc:
            last_error = str(exc)
            # _admin_session signals an active backoff via this message; do
            # not hammer the next candidate while rate limited.
            if "rate limited" in last_error.lower():
                return _rate_limited_result()

    return VaultwardenInvite(ok=False, status="error", detail=last_error or "failed to invite")