ariadne/ariadne/services/vaultwarden_sync.py

218 lines
6.7 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import time
from typing import Any
from ..settings import settings
from ..utils.logging import get_logger
from .keycloak_admin import keycloak_admin
from .mailu import mailu
from .vaultwarden import vaultwarden
VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email"
VAULTWARDEN_STATUS_ATTR = "vaultwarden_status"
VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at"
logger = get_logger(__name__)
@dataclass(frozen=True)
class VaultwardenSyncSummary:
processed: int
created_or_present: int
skipped: int
failures: int
detail: str = ""
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
def _parse_synced_at(value: str) -> float | None:
value = (value or "").strip()
if not value:
return None
for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"):
try:
parsed = datetime.strptime(value, fmt)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.timestamp()
except ValueError:
continue
return None
def _vaultwarden_email_for_user(user: dict[str, Any]) -> str:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
return ""
attrs = user.get("attributes")
vaultwarden_email = _extract_attr(attrs, VAULTWARDEN_EMAIL_ATTR)
if vaultwarden_email:
return vaultwarden_email
mailu_email = _extract_attr(attrs, "mailu_email")
if mailu_email:
return mailu_email
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
email = email.strip()
if email and email.lower().endswith(f"@{settings.mailu_domain.lower()}"):
return email
return ""
def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
existing = _extract_attr(user.get("attributes"), key)
if existing:
return
keycloak_admin.set_user_attribute(username, key, value)
def _set_user_attribute(username: str, key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
keycloak_admin.set_user_attribute(username, key, value)
def run_vaultwarden_sync() -> VaultwardenSyncSummary:
processed = 0
created = 0
skipped = 0
failures = 0
consecutive_failures = 0
if not keycloak_admin.ready():
summary = VaultwardenSyncSummary(0, 0, 0, 1, detail="keycloak admin not configured")
logger.info(
"vaultwarden sync skipped",
extra={"event": "vaultwarden_sync", "status": "error", "detail": summary.detail},
)
return summary
users = keycloak_admin.iter_users(page_size=200, brief=False)
for user in users:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
skipped += 1
continue
enabled = user.get("enabled")
if enabled is False:
skipped += 1
continue
if user.get("serviceAccountClientId") or username.startswith("service-account-"):
skipped += 1
continue
user_id = (user.get("id") if isinstance(user.get("id"), str) else "") or ""
full_user = user
if user_id:
try:
full_user = keycloak_admin.get_user(user_id)
except Exception:
full_user = user
current_status = _extract_attr(full_user.get("attributes"), VAULTWARDEN_STATUS_ATTR)
current_synced_at = _extract_attr(full_user.get("attributes"), VAULTWARDEN_SYNCED_AT_ATTR)
current_synced_ts = _parse_synced_at(current_synced_at)
if current_status in {"rate_limited", "error"} and current_synced_ts:
if time.time() - current_synced_ts < settings.vaultwarden_retry_cooldown_sec:
skipped += 1
continue
email = _vaultwarden_email_for_user(full_user)
if not email:
skipped += 1
continue
if not mailu.mailbox_exists(email):
skipped += 1
continue
try:
_set_user_attribute_if_missing(username, full_user, VAULTWARDEN_EMAIL_ATTR, email)
except Exception:
pass
if current_status in {"invited", "already_present"}:
if not current_synced_at:
try:
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
skipped += 1
continue
processed += 1
result = vaultwarden.invite_user(email)
if result.ok:
created += 1
consecutive_failures = 0
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
else:
failures += 1
if result.status in {"rate_limited", "error"}:
consecutive_failures += 1
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
if consecutive_failures >= settings.vaultwarden_failure_bailout:
break
summary = VaultwardenSyncSummary(processed, created, skipped, failures)
logger.info(
"vaultwarden sync finished",
extra={
"event": "vaultwarden_sync",
"status": "ok" if failures == 0 else "error",
"processed": processed,
"created_or_present": created,
"skipped": skipped,
"failures": failures,
},
)
return summary