195 lines
6.7 KiB
Python
195 lines
6.7 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from . import settings
|
|
|
|
|
|
# Base URL every Kubernetes API request in this module is issued against.
_K8S_BASE_URL = "https://kubernetes.default.svc"
# Directory holding the service-account files read below ("token" and "ca.crt").
_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount")
|
|
|
|
|
|
def _read_service_account() -> tuple[str, str]:
    """Load the service-account bearer token and the CA bundle location.

    Returns:
        A ``(token, ca_path)`` pair: the whitespace-stripped contents of the
        ``token`` file and the path of ``ca.crt`` as a string.

    Raises:
        RuntimeError: if either file is absent, or the token file is blank
            once stripped.
    """
    token_file, ca_file = _SA_PATH / "token", _SA_PATH / "ca.crt"
    if not (token_file.exists() and ca_file.exists()):
        raise RuntimeError("kubernetes service account token missing")

    bearer = token_file.read_text().strip()
    if bearer:
        return bearer, str(ca_file)
    raise RuntimeError("kubernetes service account token empty")
|
|
|
|
|
|
def _k8s_get_json(path: str) -> dict[str, Any]:
    """GET ``path`` from the Kubernetes API and return the JSON object.

    The request is authenticated with the service-account bearer token and
    verified against the service-account CA bundle.

    Args:
        path: API path (including any query string) appended to
            ``_K8S_BASE_URL``.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: if the service-account files are unusable or the body
            is not a JSON object.
        httpx.HTTPStatusError: if the API answers with an error status.
    """
    bearer, ca_bundle = _read_service_account()
    auth_headers = {"Authorization": f"Bearer {bearer}"}
    with httpx.Client(
        verify=ca_bundle,
        timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
        headers=auth_headers,
    ) as http:
        response = http.get(f"{_K8S_BASE_URL}{path}")
        response.raise_for_status()
        payload = response.json()

    if isinstance(payload, dict):
        return payload
    raise RuntimeError("unexpected kubernetes response")
|
|
|
|
|
|
def _k8s_find_pod_ip(namespace: str, label_selector: str) -> str:
    """Return the IP of a pod matching ``label_selector`` in ``namespace``.

    Pods that are Running, have an IP and are not reporting ``Ready != True``
    are preferred. If no pod qualifies, any listed pod that has an IP is
    used instead.

    Args:
        namespace: Namespace whose pods are listed.
        label_selector: Value passed as the ``labelSelector`` query parameter.

    Returns:
        The ``status.podIP`` of the selected pod.

    Raises:
        RuntimeError: if the listing contains no usable pod objects
            ("no vaultwarden pods found") or none of them has an IP
            ("vaultwarden pod has no IP").
    """
    data = _k8s_get_json(f"/api/v1/namespaces/{namespace}/pods?labelSelector={label_selector}")
    items = data.get("items") or []
    # Drop malformed (non-dict) entries up front so a listing made only of junk
    # is reported as "no pods" instead of failing later with an IndexError.
    pods = [p for p in items if isinstance(p, dict)] if isinstance(items, list) else []
    if not pods:
        raise RuntimeError("no vaultwarden pods found")

    def _pod_ip(pod: dict[str, Any]) -> str | None:
        """Return the pod's non-empty ``status.podIP`` string, else None."""
        status = pod.get("status")
        ip = status.get("podIP") if isinstance(status, dict) else None
        return ip if isinstance(ip, str) and ip else None

    def _pod_ready(pod: dict[str, Any]) -> bool:
        """Report whether the pod is Running, has an IP and is not un-Ready."""
        status = pod.get("status")
        if not isinstance(status, dict) or status.get("phase") != "Running":
            return False
        if _pod_ip(pod) is None:
            return False
        conditions = status.get("conditions")
        if isinstance(conditions, list):
            for cond in conditions:
                if isinstance(cond, dict) and cond.get("type") == "Ready":
                    return cond.get("status") == "True"
        # No Ready condition reported: a Running pod with an IP counts as ready.
        return True

    ready = [p for p in pods if _pod_ready(p)]
    # Fall back to every listed pod when none is ready, and take the first one
    # that actually has an IP rather than only inspecting the first entry.
    for pod in ready or pods:
        ip = _pod_ip(pod)
        if ip is not None:
            return ip
    raise RuntimeError("vaultwarden pod has no IP")
|
|
|
|
|
|
def _k8s_get_secret_value(namespace: str, name: str, key: str) -> str:
    """Fetch one key of a Kubernetes Secret and return it as decoded text.

    Args:
        namespace: Namespace of the Secret.
        name: Name of the Secret.
        key: Entry to read from the Secret's ``data`` mapping.

    Returns:
        The base64-decoded, UTF-8 decoded and stripped value.

    Raises:
        RuntimeError: if the key is absent or not a non-empty string
            ("secret key missing"), cannot be decoded ("failed to decode
            secret"), or decodes to an empty string ("secret value empty").
    """
    secret = _k8s_get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
    entries = secret.get("data")
    encoded = entries.get(key) if isinstance(entries, dict) else None
    if not (isinstance(encoded, str) and encoded):
        raise RuntimeError("secret key missing")

    try:
        value = base64.b64decode(encoded).decode("utf-8").strip()
    except Exception as exc:
        raise RuntimeError("failed to decode secret") from exc

    if value:
        return value
    raise RuntimeError("secret value empty")
|
|
|
|
|
|
@dataclass(frozen=True)
class VaultwardenInvite:
    """Immutable outcome of an invite attempt."""

    # True when the invite was created or the user was already present.
    ok: bool
    # Short machine-readable outcome code (e.g. "invited", "rate_limited", "error").
    status: str
    # Optional human-readable explanation; empty when not supplied.
    detail: str = ""
|
|
|
|
|
|
# Cached admin session state, shared across calls and guarded by _ADMIN_LOCK.
_ADMIN_LOCK = threading.Lock()
# Logged-in client, or None when no session is cached.
_ADMIN_SESSION: httpx.Client | None = None
# time.time() value after which the cached session must be re-created.
_ADMIN_SESSION_EXPIRES_AT: float = 0.0
# Base URL the cached session was created for.
_ADMIN_SESSION_BASE_URL: str = ""
|
|
|
|
|
|
def _admin_session(base_url: str) -> httpx.Client:
    """Return a logged-in admin client for ``base_url``, reusing a cached one.

    The cached client is reused while it has not expired and was created for
    the same ``base_url``. Otherwise it is closed and a new client logs in by
    posting the admin token (read from the configured Kubernetes Secret) to
    ``/admin``.

    Args:
        base_url: Base URL of the Vaultwarden instance to talk to.

    Returns:
        An ``httpx.Client`` that has posted the admin token to ``/admin``.

    Raises:
        RuntimeError: if the admin token cannot be read, or the login is
            answered with HTTP 429 ("vaultwarden rate limited").
        httpx.HTTPError: if the login request fails or returns an error status.
    """
    global _ADMIN_SESSION, _ADMIN_SESSION_EXPIRES_AT, _ADMIN_SESSION_BASE_URL
    with _ADMIN_LOCK:
        # Sample the clock after acquiring the lock so time spent waiting for
        # another thread does not make an expired session look fresh.
        now = time.time()
        if (
            _ADMIN_SESSION is not None
            and now < _ADMIN_SESSION_EXPIRES_AT
            and _ADMIN_SESSION_BASE_URL == base_url
        ):
            return _ADMIN_SESSION

        if _ADMIN_SESSION is not None:
            try:
                _ADMIN_SESSION.close()
            except Exception:
                # Best effort: a failure to close the stale client must not
                # prevent creating its replacement.
                pass
            _ADMIN_SESSION = None

        token = _k8s_get_secret_value(
            settings.VAULTWARDEN_NAMESPACE,
            settings.VAULTWARDEN_ADMIN_SECRET_NAME,
            settings.VAULTWARDEN_ADMIN_SECRET_KEY,
        )

        client = httpx.Client(
            base_url=base_url,
            timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
            follow_redirects=True,
            headers={"User-Agent": "atlas-portal/1"},
        )

        # Authenticate to the admin UI to establish a session cookie.
        # Vaultwarden can rate-limit admin login attempts, so keep this session cached briefly.
        try:
            resp = client.post("/admin", data={"token": token})
            if resp.status_code == 429:
                raise RuntimeError("vaultwarden rate limited")
            resp.raise_for_status()
        except BaseException:
            # The client is not cached on failure, so release its connections
            # here instead of leaking them.
            client.close()
            raise

        _ADMIN_SESSION = client
        _ADMIN_SESSION_BASE_URL = base_url
        _ADMIN_SESSION_EXPIRES_AT = now + float(settings.VAULTWARDEN_ADMIN_SESSION_TTL_SEC)
        return client
|
|
|
|
|
|
# Response-body fragments (lower-case) that mean the user is already known.
_ALREADY_PRESENT_MARKERS = (
    "already invited",
    "already exists",
    "already registered",
    "user already exists",
)


def _invite_base_urls():
    """Yield candidate Vaultwarden base URLs (str), most preferred first.

    The Service URL comes first. The pod-IP URL is resolved only if the
    caller asks for a second candidate, so the Kubernetes API is not queried
    when the Service works. A failed pod lookup simply ends the sequence.
    """
    yield f"http://{settings.VAULTWARDEN_SERVICE_HOST}"
    try:
        pod_ip = _k8s_find_pod_ip(settings.VAULTWARDEN_NAMESPACE, settings.VAULTWARDEN_POD_LABEL)
    except Exception:
        # Best effort: without a pod IP there is no fallback to offer.
        return
    yield f"http://{pod_ip}:{settings.VAULTWARDEN_POD_PORT}"


def invite_user(email: str) -> VaultwardenInvite:
    """Invite ``email`` through the Vaultwarden admin API.

    Args:
        email: Address to invite; surrounding whitespace is ignored.

    Returns:
        A ``VaultwardenInvite`` whose ``status`` is one of:
        ``"invalid_email"`` (empty or missing ``@``), ``"invited"``
        (HTTP 200/201/204), ``"already_present"`` (HTTP 400/409 whose body
        says the user already exists or is invited), ``"rate_limited"``
        (HTTP 429 on the invite request) or ``"error"`` (every candidate URL
        failed; ``detail`` holds the last error). This function does not
        raise for request failures.
    """
    email = (email or "").strip()
    if not email or "@" not in email:
        return VaultwardenInvite(ok=False, status="invalid_email", detail="email invalid")

    # Prefer the service name when it works; fall back to pod IP because the Service can be misconfigured.
    last_error = ""
    for candidate in _invite_base_urls():
        try:
            session = _admin_session(candidate)
            resp = session.post("/admin/invite", json={"email": email})
        except Exception as exc:
            # Any login or transport failure: remember it and try the next URL.
            last_error = str(exc)
            continue

        if resp.status_code == 429:
            return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited")

        if resp.status_code in {200, 201, 204}:
            return VaultwardenInvite(ok=True, status="invited", detail="invite created")

        # Treat "already exists/invited" as success for idempotency.
        if resp.status_code in {400, 409}:
            try:
                body = (resp.text or "").lower()
            except Exception:
                # An unreadable body is treated the same as an empty one.
                body = ""
            if any(marker in body for marker in _ALREADY_PRESENT_MARKERS):
                return VaultwardenInvite(ok=True, status="already_present", detail="user already present")

        last_error = f"status {resp.status_code}"

    return VaultwardenInvite(ok=False, status="error", detail=last_error or "failed to invite")
|
|
|