"""Synchronise Keycloak users into the Mailu ``"user"`` table.

For every eligible Keycloak user the service makes sure the Mailu-related
user attributes exist (mailbox address, enabled flag, app password) and then
upserts a matching row into the Mailu PostgreSQL database.  Optional system
mailboxes listed in the settings are ensured as well.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
import time
from typing import Any

import psycopg
from passlib.hash import bcrypt_sha256

from ..settings import settings
from ..utils.logging import get_logger
from ..utils.passwords import random_password
from .keycloak_admin import keycloak_admin

logger = get_logger(__name__)

# Names of the Keycloak user attributes managed by this module.
MAILU_ENABLED_ATTR = "mailu_enabled"
MAILU_EMAIL_ATTR = "mailu_email"
MAILU_APP_PASSWORD_ATTR = "mailu_app_password"

# Passwords whose UTF-8 encoding is longer than this are rejected/rotated.
BCRYPT_MAX_PASSWORD_BYTES = 72

# Length passed to ``random_password`` whenever an app password is generated.
_APP_PASSWORD_LENGTH = 24

# Pause between two ``mailbox_exists`` probes in ``wait_for_mailbox``.
_MAILBOX_POLL_INTERVAL_SEC = 2.0

# Upsert of a single mailbox row; on conflict only the password, the enabled
# flag and ``updated_at`` are refreshed.
_UPSERT_MAILBOX_SQL = """
    INSERT INTO "user" (
        email, localpart, domain_name, password,
        quota_bytes, quota_bytes_used,
        global_admin, enabled, enable_imap, enable_pop, allow_spoofing,
        forward_enabled, forward_destination, forward_keep,
        reply_enabled, reply_subject, reply_body,
        reply_startdate, reply_enddate,
        displayed_name, spam_enabled, spam_mark_as_read, spam_threshold,
        change_pw_next_login, created_at, updated_at, comment
    )
    VALUES (
        %(email)s, %(localpart)s, %(domain)s, %(password)s,
        %(quota)s, 0,
        false, true, true, true, false,
        false, '', true,
        false, NULL, NULL,
        DATE '1900-01-01', DATE '2999-12-31',
        %(display)s, true, true, 80,
        false, CURRENT_DATE, %(now)s, ''
    )
    ON CONFLICT (email) DO UPDATE
    SET password = EXCLUDED.password,
        enabled = true,
        updated_at = EXCLUDED.updated_at
"""


@dataclass(frozen=True)
class MailuSyncSummary:
    """Immutable totals reported at the end of one ``MailuService.sync`` run."""

    processed: int  # users whose mailbox was ensured
    updated: int  # users whose Keycloak attributes were written
    skipped: int  # users that were not synced
    failures: int  # users for which an error was recorded
    mailboxes: int  # user mailboxes ensured in the database
    system_mailboxes: int  # system mailboxes ensured in the database
    detail: str = ""  # free-form text; not populated anywhere in this module


@dataclass(frozen=True)
class MailuUserSyncResult:
    """Per-user contribution to the counters; every field defaults to zero."""

    processed: int = 0
    updated: int = 0
    skipped: int = 0
    failures: int = 0
    mailboxes: int = 0


@dataclass(frozen=True)
class MailuSyncContext:
    """Everything needed to create or refresh one user's mailbox."""

    username: str
    user_id: str
    mailu_email: str
    app_password: str
    updated: int  # 1 when attribute updates were applied for this user, else 0
    display_name: str


class PasswordTooLongError(RuntimeError):
    """Raised when a mailbox password exceeds ``BCRYPT_MAX_PASSWORD_BYTES``."""


@dataclass
class MailuSyncCounters:
    """Mutable accumulator for the per-user results of a sync run."""

    processed: int = 0
    updated: int = 0
    skipped: int = 0
    failures: int = 0
    mailboxes: int = 0
    system_mailboxes: int = 0

    def add(self, result: MailuUserSyncResult) -> None:
        """Add one user's result to the running totals."""
        self.processed += result.processed
        self.updated += result.updated
        self.skipped += result.skipped
        self.failures += result.failures
        self.mailboxes += result.mailboxes

    def summary(self) -> MailuSyncSummary:
        """Return the current totals as an immutable ``MailuSyncSummary``."""
        return MailuSyncSummary(
            processed=self.processed,
            updated=self.updated,
            skipped=self.skipped,
            failures=self.failures,
            mailboxes=self.mailboxes,
            system_mailboxes=self.system_mailboxes,
        )


def _extract_attr(attrs: Any, key: str) -> str | None:
    """Return the first non-blank string stored under ``key``, stripped.

    The value may be a plain string or a list; for a list the first element
    that is a non-blank string wins.  ``None`` is returned when ``attrs`` is
    not a dict or no usable value is found.
    """
    if not isinstance(attrs, dict):
        return None
    raw = attrs.get(key)
    if isinstance(raw, list):
        for item in raw:
            if isinstance(item, str) and item.strip():
                return item.strip()
        return None
    if isinstance(raw, str) and raw.strip():
        return raw.strip()
    return None


def _display_name(user: dict[str, Any]) -> str:
    """Join the non-blank ``firstName`` and ``lastName`` values with a space."""
    parts = []
    for key in ("firstName", "lastName"):
        value = user.get(key)
        if isinstance(value, str) and value.strip():
            parts.append(value.strip())
    return " ".join(parts)


def _domain_matches(email: str) -> bool:
    """Return True when ``email`` ends with ``@<mailu_domain>`` (case-insensitive)."""
    return email.lower().endswith(f"@{settings.mailu_domain.lower()}")


def _password_too_long(password: str) -> bool:
    """Return True when the UTF-8 encoding exceeds ``BCRYPT_MAX_PASSWORD_BYTES``."""
    return len(password.encode("utf-8")) > BCRYPT_MAX_PASSWORD_BYTES


class MailuService:
    """Creates and refreshes Mailu mailboxes from Keycloak user data."""

    def __init__(self) -> None:
        # Connection parameters are captured once, at construction time.
        self._db_config = {
            "host": settings.mailu_db_host,
            "port": settings.mailu_db_port,
            "dbname": settings.mailu_db_name,
            "user": settings.mailu_db_user,
            "password": settings.mailu_db_password,
        }

    def _connect(self) -> psycopg.Connection:
        """Open a new database connection using the stored configuration."""
        return psycopg.connect(**self._db_config)

    def ready(self) -> bool:
        """Return True when host, database name, user and password are all set."""
        return bool(
            settings.mailu_db_host
            and settings.mailu_db_name
            and settings.mailu_db_user
            and settings.mailu_db_password
        )

    @staticmethod
    def resolve_mailu_email(
        username: str,
        attributes: dict[str, Any] | None,
        fallback_email: str = "",
    ) -> str:
        """Pick the mailbox address for a user.

        Precedence: an explicit ``mailu_email`` attribute, then
        ``fallback_email`` if it already lives in the Mailu domain, and
        finally ``<username>@<mailu_domain>``.
        """
        attrs = attributes or {}
        explicit = _extract_attr(attrs, MAILU_EMAIL_ATTR)
        if explicit:
            return explicit
        if fallback_email and _domain_matches(fallback_email):
            return fallback_email
        return f"{username}@{settings.mailu_domain}"

    def _mailu_enabled(self, attrs: dict[str, Any], updates: dict[str, list[str]]) -> bool:
        """Return whether the user is enabled for Mailu.

        When the attribute is missing the user is treated as enabled and
        ``updates`` is mutated so that ``"true"`` gets written back.
        """
        raw = _extract_attr(attrs, MAILU_ENABLED_ATTR)
        if raw is None:
            updates[MAILU_ENABLED_ATTR] = ["true"]
            return True
        # ``_extract_attr`` already returns a stripped value.
        return raw.lower() in {"1", "true", "yes", "y", "on"}

    @staticmethod
    def _username(user: dict[str, Any]) -> str:
        """Return ``user["username"]`` when it is a string, otherwise ``""``."""
        value = user.get("username")
        return value if isinstance(value, str) else ""

    @staticmethod
    def _user_id(user: dict[str, Any]) -> str:
        """Return ``user["id"]`` when it is a string, otherwise ``""``."""
        value = user.get("id")
        return value if isinstance(value, str) else ""

    @staticmethod
    def _is_service_account(user: dict[str, Any], username: str) -> bool:
        """Return True for users carrying ``serviceAccountClientId`` or the
        ``service-account-`` username prefix."""
        return bool(
            user.get("serviceAccountClientId") or username.startswith("service-account-")
        )

    @staticmethod
    def _log_sync_error(username: str, detail: str) -> None:
        """Emit a structured ``mailu_sync`` log entry with status ``error``."""
        logger.info(
            "mailu sync error",
            extra={
                "event": "mailu_sync",
                "status": "error",
                "detail": detail,
                "username": username,
            },
        )

    @staticmethod
    def _log_password_rotated(username: str) -> None:
        """Emit the structured log entry recording an app-password rotation."""
        logger.info(
            "mailu app password rotated",
            extra={
                "event": "mailu_sync",
                "status": "updated",
                "detail": "app password exceeded bcrypt limit",
                "username": username,
            },
        )

    def _prepare_updates(
        self,
        username: str,
        attrs: dict[str, Any],
        mailu_email: str,
    ) -> tuple[bool, dict[str, list[str]], str]:
        """Work out which Mailu attributes must be written for a user.

        Returns ``(enabled, updates, app_password)`` where ``updates`` maps
        attribute names to the values to write and ``app_password`` is the
        password to use for the mailbox (existing or newly generated).
        """
        updates: dict[str, list[str]] = {}
        if not _extract_attr(attrs, MAILU_EMAIL_ATTR):
            updates[MAILU_EMAIL_ATTR] = [mailu_email]
        enabled = self._mailu_enabled(attrs, updates)

        app_password = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR)
        if not app_password:
            app_password = random_password(_APP_PASSWORD_LENGTH)
            updates[MAILU_APP_PASSWORD_ATTR] = [app_password]
        elif _password_too_long(app_password):
            # A stored password over the byte limit is replaced, not truncated.
            app_password = random_password(_APP_PASSWORD_LENGTH)
            updates[MAILU_APP_PASSWORD_ATTR] = [app_password]
            self._log_password_rotated(username)
        return enabled, updates, app_password

    def _apply_updates(self, user_id: str, updates: dict[str, list[str]], username: str) -> bool:
        """Write ``updates`` to Keycloak; return False (after logging) on error.

        An empty ``updates`` mapping is a no-op that counts as success.
        """
        if not updates:
            return True
        try:
            keycloak_admin.update_user_safe(user_id, {"attributes": updates})
        except Exception as exc:
            self._log_sync_error(username, str(exc))
            return False
        return True

    def _should_skip_user(self, user: dict[str, Any], username: str) -> bool:
        """Return True for users without a username, explicitly disabled users
        (``enabled is False``) and service accounts."""
        if not username or user.get("enabled") is False:
            return True
        return self._is_service_account(user, username)

    def _build_sync_context(
        self,
        user: dict[str, Any],
    ) -> tuple[MailuSyncContext | None, MailuUserSyncResult | None]:
        """Validate a user and apply the Keycloak attribute updates.

        Exactly one element of the returned pair is set: either the context
        needed to ensure the mailbox, or an early result (skipped / failed).
        """
        username = self._username(user)
        if self._should_skip_user(user, username):
            return None, MailuUserSyncResult(skipped=1)

        user_id = self._user_id(user)
        if not user_id:
            return None, MailuUserSyncResult(failures=1)

        attrs = user.get("attributes")
        if not isinstance(attrs, dict):
            attrs = {}

        email_value = user.get("email")
        mailu_email = self.resolve_mailu_email(
            username,
            attrs,
            email_value if isinstance(email_value, str) else "",
        )

        enabled, updates, app_password = self._prepare_updates(username, attrs, mailu_email)
        if not enabled:
            # Pending updates are discarded for users that opted out.
            return None, MailuUserSyncResult(skipped=1)
        if not self._apply_updates(user_id, updates, username):
            return None, MailuUserSyncResult(failures=1)

        context = MailuSyncContext(
            username=username,
            user_id=user_id,
            mailu_email=mailu_email,
            app_password=app_password,
            updated=1 if updates else 0,
            display_name=_display_name(user),
        )
        return context, None

    def _ensure_mailbox_with_retry(
        self,
        conn: psycopg.Connection,
        ctx: MailuSyncContext,
    ) -> tuple[bool, bool, bool]:
        """Ensure the mailbox, rotating the app password once if it is too long.

        Returns ``(mailbox_ok, failed, rotated)``.  Errors are logged and
        reported through ``failed``; they are never raised to the caller.
        """
        mailbox_ok = False
        rotated = False
        failed = False
        try:
            mailbox_ok = self._ensure_mailbox(
                conn, ctx.mailu_email, ctx.app_password, ctx.display_name
            )
        except PasswordTooLongError:
            rotated = True
            app_password = random_password(_APP_PASSWORD_LENGTH)
            try:
                keycloak_admin.set_user_attribute(
                    ctx.username, MAILU_APP_PASSWORD_ATTR, app_password
                )
                self._log_password_rotated(ctx.username)
                mailbox_ok = self._ensure_mailbox(
                    conn, ctx.mailu_email, app_password, ctx.display_name
                )
            except Exception as retry_exc:
                self._log_sync_error(ctx.username, str(retry_exc))
                failed = True
        except Exception as exc:
            self._log_sync_error(ctx.username, str(exc))
            failed = True
        return mailbox_ok, failed, rotated

    @staticmethod
    def _build_sync_result(
        updated: int,
        mailbox_ok: bool,
        failed: bool,
        rotated: bool,
    ) -> MailuUserSyncResult:
        """Translate the mailbox outcome flags into a per-user result.

        ``failed`` takes precedence over ``mailbox_ok``; a rotation without a
        mailbox is reported as skipped with ``updated`` of at least 1.
        """
        if failed:
            return MailuUserSyncResult(failures=1, updated=updated)
        if mailbox_ok:
            return MailuUserSyncResult(processed=1, updated=updated, mailboxes=1)
        if rotated:
            return MailuUserSyncResult(skipped=1, updated=max(updated, 1))
        return MailuUserSyncResult(skipped=1, updated=updated)

    def _sync_user(self, conn: psycopg.Connection, user: dict[str, Any]) -> MailuUserSyncResult:
        """Run the full sync for a single Keycloak user record."""
        ctx, early = self._build_sync_context(user)
        if early is not None:
            return early
        mailbox_ok, failed, rotated = self._ensure_mailbox_with_retry(conn, ctx)
        return self._build_sync_result(ctx.updated, mailbox_ok, failed, rotated)

    def _ensure_mailbox(
        self,
        conn: psycopg.Connection,
        email: str,
        password: str,
        display_name: str,
    ) -> bool:
        """Insert the mailbox row, or refresh password/enabled if it exists.

        Returns False without touching the database when ``email`` is blank,
        has no ``@`` or is outside the Mailu domain; True after the upsert.

        Raises:
            PasswordTooLongError: the password exceeds the byte limit.
        """
        email = (email or "").strip()
        if not email or "@" not in email:
            return False
        if not _domain_matches(email):
            return False
        if _password_too_long(password):
            raise PasswordTooLongError("mailu password exceeds bcrypt limit")

        localpart, domain = email.split("@", 1)
        try:
            hashed = bcrypt_sha256.hash(password)
        except ValueError as exc:
            # Map the hashing library's length complaint onto our own error so
            # callers can react to it by rotating the password.
            if "password cannot be longer than 72 bytes" in str(exc):
                raise PasswordTooLongError(str(exc)) from exc
            raise

        now = datetime.now(timezone.utc)
        # Run the upsert in its own transaction block (a savepoint when a
        # transaction is already open).  A failed statement is then rolled
        # back here instead of leaving the shared connection in an aborted
        # state, which would make every later mailbox in the same run fail.
        with conn.transaction(), conn.cursor() as cur:
            cur.execute(
                _UPSERT_MAILBOX_SQL,
                {
                    "email": email,
                    "localpart": localpart,
                    "domain": domain,
                    "password": hashed,
                    "quota": settings.mailu_default_quota,
                    "display": display_name or localpart,
                    "now": now,
                },
            )
        return True

    def _ensure_system_mailboxes(self, conn: psycopg.Connection) -> int:
        """Ensure the configured system mailboxes; return how many were ensured.

        Returns 0 (after logging) when no system users are configured, the
        system password is missing, or it exceeds the byte limit.  Database
        errors raised by ``_ensure_mailbox`` are not caught here.
        """
        if not settings.mailu_system_users:
            return 0
        if not settings.mailu_system_password:
            logger.info(
                "mailu system users configured but password missing",
                extra={
                    "event": "mailu_sync",
                    "status": "error",
                    "detail": "system password missing",
                },
            )
            return 0
        if _password_too_long(settings.mailu_system_password):
            logger.info(
                "mailu system password too long",
                extra={
                    "event": "mailu_sync",
                    "status": "error",
                    "detail": "system password exceeds bcrypt limit",
                },
            )
            return 0

        ensured = 0
        for email in settings.mailu_system_users:
            if not email:
                continue
            # The local part doubles as the display name for system mailboxes.
            if self._ensure_mailbox(
                conn, email, settings.mailu_system_password, email.split("@")[0]
            ):
                ensured += 1
        return ensured

    def sync(self, reason: str, force: bool = False) -> MailuSyncSummary:
        """Sync every Keycloak user and the system mailboxes into Mailu.

        ``reason`` and ``force`` are only recorded in the final log entry.

        Raises:
            RuntimeError: the Keycloak admin client or the Mailu database is
                not configured.
        """
        if not keycloak_admin.ready():
            raise RuntimeError("keycloak admin client not configured")
        if not self.ready():
            raise RuntimeError("mailu database not configured")

        counters = MailuSyncCounters()
        users = keycloak_admin.iter_users(page_size=200, brief=False)
        with self._connect() as conn:
            for user in users:
                counters.add(self._sync_user(conn, user))
            counters.system_mailboxes = self._ensure_system_mailboxes(conn)

        summary = counters.summary()
        logger.info(
            "mailu sync finished",
            extra={
                "event": "mailu_sync",
                "status": "ok" if summary.failures == 0 else "error",
                "processed": summary.processed,
                "updated": summary.updated,
                "skipped": summary.skipped,
                "failures": summary.failures,
                "mailboxes": summary.mailboxes,
                "system_mailboxes": summary.system_mailboxes,
                "reason": reason,
                "force": force,
            },
        )
        return summary

    def mailbox_exists(self, email: str) -> bool:
        """Return True when a row with this exact ``email`` exists.

        Any exception (connection or query) is swallowed and reported as
        False, so callers cannot tell "absent" from "lookup failed".
        """
        email = (email or "").strip()
        if not email:
            return False
        try:
            with self._connect() as conn:
                with conn.cursor() as cur:
                    cur.execute('SELECT 1 FROM "user" WHERE email = %s LIMIT 1', (email,))
                    return cur.fetchone() is not None
        except Exception:
            return False

    def wait_for_mailbox(self, email: str, timeout_sec: float = 60.0) -> bool:
        """Poll ``mailbox_exists`` until it succeeds or ``timeout_sec`` elapses.

        Returns True as soon as the mailbox is seen, False on timeout.  A
        non-positive timeout returns False without probing.
        """
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            if self.mailbox_exists(email):
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(_MAILBOX_POLL_INTERVAL_SEC, remaining))
        return False


mailu = MailuService()