ariadne/ariadne/app.py

951 lines
33 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import json
import threading
from typing import Any, Callable
from fastapi import Body, Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from .auth.keycloak import AuthContext, authenticator
from .db.database import Database, DatabaseConfig
from .db.storage import Storage, TaskRunRecord
from .manager.provisioning import ProvisioningManager
from .metrics.metrics import record_task_run
from .scheduler.cron import CronScheduler
from .services.comms import comms
from .services.firefly import firefly
from .services.keycloak_admin import keycloak_admin
from .services.keycloak_profile import run_profile_sync
from .services.mailu import mailu
from .services.mailu_events import mailu_events
from .services.nextcloud import nextcloud
from .services.image_sweeper import image_sweeper
from .services.opensearch_prune import prune_indices
from .services.pod_cleaner import clean_finished_pods
from .services.vaultwarden_sync import run_vaultwarden_sync
from .services.vault import vault
from .services.wger import wger
from .settings import settings
from .utils.errors import safe_error_detail
from .utils.http import extract_bearer_token
from .utils.logging import LogConfig, configure_logging, get_logger, task_context
from .utils.passwords import random_password
configure_logging(LogConfig(level=settings.log_level))
logger = get_logger(__name__)
@dataclass(frozen=True)
class AccountTaskContext:
task_name: str
username: str
started: datetime
extra: dict[str, Any] | None = None
@dataclass(frozen=True)
class PasswordResetRequest:
task_name: str
service_label: str
username: str
mailu_email: str
password: str
sync_fn: Callable[[], dict[str, Any]]
password_attr: str
updated_attr: str
error_hint: str
portal_db = Database(
settings.portal_database_url,
DatabaseConfig(
pool_min=settings.ariadne_db_pool_min,
pool_max=settings.ariadne_db_pool_max,
connect_timeout_sec=settings.ariadne_db_connect_timeout_sec,
lock_timeout_sec=settings.ariadne_db_lock_timeout_sec,
statement_timeout_sec=settings.ariadne_db_statement_timeout_sec,
idle_in_tx_timeout_sec=settings.ariadne_db_idle_in_tx_timeout_sec,
application_name="ariadne_portal",
),
)
ariadne_db = Database(
settings.ariadne_database_url,
DatabaseConfig(
pool_min=settings.ariadne_db_pool_min,
pool_max=settings.ariadne_db_pool_max,
connect_timeout_sec=settings.ariadne_db_connect_timeout_sec,
lock_timeout_sec=settings.ariadne_db_lock_timeout_sec,
statement_timeout_sec=settings.ariadne_db_statement_timeout_sec,
idle_in_tx_timeout_sec=settings.ariadne_db_idle_in_tx_timeout_sec,
application_name="ariadne",
),
)
storage = Storage(ariadne_db, portal_db)
provisioning = ProvisioningManager(portal_db, storage)
scheduler = CronScheduler(storage, settings.schedule_tick_sec)
def _record_event(event_type: str, detail: dict[str, Any] | str | None) -> None:
try:
storage.record_event(event_type, detail)
except Exception:
pass
def _parse_event_detail(detail: str | None) -> Any:
if not isinstance(detail, str) or not detail:
return ""
try:
return json.loads(detail)
except Exception:
return detail
app = FastAPI(title=settings.app_name)
def _require_auth(request: Request) -> AuthContext:
token = extract_bearer_token(request)
if not token:
raise HTTPException(status_code=401, detail="missing bearer token")
try:
return authenticator.authenticate(token)
except Exception:
raise HTTPException(status_code=401, detail="invalid token")
def _require_admin(ctx: AuthContext) -> None:
if ctx.username and ctx.username in settings.portal_admin_users:
return
if settings.portal_admin_groups and set(ctx.groups).intersection(settings.portal_admin_groups):
return
raise HTTPException(status_code=403, detail="forbidden")
def _require_account_access(ctx: AuthContext) -> None:
if not settings.account_allowed_groups:
return
if not ctx.groups:
return
if set(ctx.groups).intersection(settings.account_allowed_groups):
return
raise HTTPException(status_code=403, detail="forbidden")
async def _read_json_payload(request: Request) -> dict[str, Any]:
try:
payload = await request.json()
except Exception:
return {}
return payload if isinstance(payload, dict) else {}
def _note_from_payload(payload: dict[str, Any]) -> str | None:
note = payload.get("note") if isinstance(payload, dict) else None
return str(note).strip() if isinstance(note, str) and note.strip() else None
def _flags_from_payload(payload: dict[str, Any]) -> list[str]:
flags_raw = payload.get("flags") if isinstance(payload, dict) else None
return [flag for flag in flags_raw if isinstance(flag, str)] if isinstance(flags_raw, list) else []
def _allowed_flag_groups() -> list[str]:
if not keycloak_admin.ready():
return settings.allowed_flag_groups
try:
return keycloak_admin.list_group_names(exclude={"admin"})
except Exception:
return settings.allowed_flag_groups
def _resolve_mailu_email(username: str) -> str:
mailu_email = f"{username}@{settings.mailu_domain}"
try:
user = keycloak_admin.find_user(username) or {}
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
return str(raw_mailu[0])
if isinstance(raw_mailu, str) and raw_mailu:
return raw_mailu
except Exception:
return mailu_email
return mailu_email
def _record_account_task(ctx: AccountTaskContext, status: str, error_detail: str) -> None:
finished = datetime.now(timezone.utc)
duration_sec = (finished - ctx.started).total_seconds()
record_task_run(ctx.task_name, status, duration_sec)
try:
storage.record_task_run(
TaskRunRecord(
request_code=None,
task=ctx.task_name,
status=status,
detail=error_detail or None,
started_at=ctx.started,
finished_at=finished,
duration_ms=int(duration_sec * 1000),
)
)
except Exception:
pass
detail = {"username": ctx.username, "status": status, "error": error_detail}
if ctx.extra:
detail.update(ctx.extra)
_record_event(ctx.task_name, detail)
def _run_password_reset(request: PasswordResetRequest) -> JSONResponse:
started = datetime.now(timezone.utc)
task_ctx = AccountTaskContext(
task_name=request.task_name,
username=request.username,
started=started,
extra={"mailu_email": request.mailu_email},
)
status = "ok"
error_detail = ""
logger.info(
f"{request.service_label} password reset requested",
extra={"event": request.task_name, "username": request.username},
)
try:
result = request.sync_fn()
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"{request.service_label} sync {status_val}")
keycloak_admin.set_user_attribute(
request.username,
request.password_attr,
request.password,
)
keycloak_admin.set_user_attribute(
request.username,
request.updated_attr,
datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
)
logger.info(
f"{request.service_label} password reset completed",
extra={"event": request.task_name, "username": request.username},
)
return JSONResponse({"status": "ok", "password": request.password})
except HTTPException as exc:
status = "error"
error_detail = str(exc.detail)
raise
except Exception as exc:
status = "error"
error_detail = safe_error_detail(exc, request.error_hint)
raise HTTPException(status_code=502, detail=error_detail)
finally:
_record_account_task(task_ctx, status, error_detail)
@app.on_event("startup")
def _startup() -> None:
provisioning.start()
scheduler.add_task("schedule.mailu_sync", settings.mailu_sync_cron, lambda: mailu.sync("ariadne_schedule"))
scheduler.add_task(
"schedule.nextcloud_sync",
settings.nextcloud_sync_cron,
lambda: nextcloud.sync_mail(wait=False),
)
scheduler.add_task(
"schedule.nextcloud_cron",
settings.nextcloud_cron,
lambda: nextcloud.run_cron(),
)
scheduler.add_task(
"schedule.nextcloud_maintenance",
settings.nextcloud_maintenance_cron,
lambda: nextcloud.run_maintenance(),
)
scheduler.add_task("schedule.vaultwarden_sync", settings.vaultwarden_sync_cron, run_vaultwarden_sync)
scheduler.add_task(
"schedule.keycloak_profile",
settings.keycloak_profile_cron,
run_profile_sync,
)
scheduler.add_task(
"schedule.wger_user_sync",
settings.wger_user_sync_cron,
lambda: wger.sync_users(),
)
scheduler.add_task("schedule.wger_admin", settings.wger_admin_cron, lambda: wger.ensure_admin(wait=False))
scheduler.add_task(
"schedule.firefly_user_sync",
settings.firefly_user_sync_cron,
lambda: firefly.sync_users(),
)
scheduler.add_task(
"schedule.firefly_cron",
settings.firefly_cron,
lambda: firefly.run_cron(),
)
scheduler.add_task(
"schedule.pod_cleaner",
settings.pod_cleaner_cron,
clean_finished_pods,
)
scheduler.add_task(
"schedule.opensearch_prune",
settings.opensearch_prune_cron,
prune_indices,
)
scheduler.add_task(
"schedule.image_sweeper",
settings.image_sweeper_cron,
lambda: image_sweeper.run(wait=True),
)
scheduler.add_task(
"schedule.vault_k8s_auth",
settings.vault_k8s_auth_cron,
lambda: vault.sync_k8s_auth(wait=True),
)
scheduler.add_task(
"schedule.vault_oidc",
settings.vault_oidc_cron,
lambda: vault.sync_oidc(wait=True),
)
scheduler.add_task(
"schedule.comms_guest_name",
settings.comms_guest_name_cron,
lambda: comms.run_guest_name_randomizer(wait=True),
)
scheduler.add_task(
"schedule.comms_pin_invite",
settings.comms_pin_invite_cron,
lambda: comms.run_pin_invite(wait=True),
)
scheduler.add_task(
"schedule.comms_reset_room",
settings.comms_reset_room_cron,
lambda: comms.run_reset_room(wait=True),
)
scheduler.add_task(
"schedule.comms_seed_room",
settings.comms_seed_room_cron,
lambda: comms.run_seed_room(wait=True),
)
scheduler.start()
logger.info(
"ariadne started",
extra={
"event": "startup",
"mailu_cron": settings.mailu_sync_cron,
"nextcloud_mail_cron": settings.nextcloud_sync_cron,
"nextcloud_cron": settings.nextcloud_cron,
"nextcloud_maintenance_cron": settings.nextcloud_maintenance_cron,
"vaultwarden_cron": settings.vaultwarden_sync_cron,
"wger_user_sync_cron": settings.wger_user_sync_cron,
"wger_admin_cron": settings.wger_admin_cron,
"firefly_user_sync_cron": settings.firefly_user_sync_cron,
"firefly_cron": settings.firefly_cron,
"pod_cleaner_cron": settings.pod_cleaner_cron,
"opensearch_prune_cron": settings.opensearch_prune_cron,
"image_sweeper_cron": settings.image_sweeper_cron,
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
"vault_oidc_cron": settings.vault_oidc_cron,
"comms_guest_name_cron": settings.comms_guest_name_cron,
"comms_pin_invite_cron": settings.comms_pin_invite_cron,
"comms_reset_room_cron": settings.comms_reset_room_cron,
"comms_seed_room_cron": settings.comms_seed_room_cron,
"keycloak_profile_cron": settings.keycloak_profile_cron,
},
)
@app.on_event("shutdown")
def _shutdown() -> None:
scheduler.stop()
provisioning.stop()
portal_db.close()
ariadne_db.close()
logger.info("ariadne stopped", extra={"event": "shutdown"})
@app.get("/health")
def health() -> dict[str, Any]:
return {"ok": True}
@app.get(settings.metrics_path)
def metrics() -> Response:
payload = generate_latest()
return Response(payload, media_type=CONTENT_TYPE_LATEST)
@app.get("/api/admin/access/requests")
def list_access_requests(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_admin(ctx)
logger.info(
"list access requests",
extra={"event": "access_requests_list", "actor": ctx.username or ""},
)
try:
rows = storage.list_pending_requests()
except Exception:
raise HTTPException(status_code=502, detail="failed to load requests")
output: list[dict[str, Any]] = []
for row in rows:
created_at = row.get("created_at")
output.append(
{
"id": row.get("request_code"),
"username": row.get("username"),
"email": row.get("contact_email") or "",
"first_name": row.get("first_name") or "",
"last_name": row.get("last_name") or "",
"request_code": row.get("request_code"),
"created_at": created_at.isoformat() if isinstance(created_at, datetime) else "",
"note": row.get("note") or "",
}
)
return JSONResponse({"requests": output})
@app.get("/api/admin/access/flags")
def list_access_flags(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_admin(ctx)
flags = settings.allowed_flag_groups
if keycloak_admin.ready():
try:
flags = keycloak_admin.list_group_names(exclude={"admin"})
except Exception:
flags = settings.allowed_flag_groups
return JSONResponse({"flags": flags})
@app.get("/api/admin/audit/events")
def list_audit_events(
limit: int = 200,
event_type: str | None = None,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
try:
rows = storage.list_events(limit=limit, event_type=event_type)
except Exception:
raise HTTPException(status_code=502, detail="failed to load audit events")
output: list[dict[str, Any]] = []
for row in rows:
created_at = row.get("created_at")
output.append(
{
"id": row.get("id"),
"event_type": row.get("event_type"),
"detail": _parse_event_detail(row.get("detail")),
"created_at": created_at.isoformat() if isinstance(created_at, datetime) else "",
}
)
return JSONResponse({"events": output})
@app.get("/api/admin/audit/task-runs")
def list_audit_task_runs(
limit: int = 200,
request_code: str | None = None,
task: str | None = None,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
try:
rows = storage.list_task_runs(limit=limit, request_code=request_code, task=task)
except Exception:
raise HTTPException(status_code=502, detail="failed to load task runs")
output: list[dict[str, Any]] = []
for row in rows:
started_at = row.get("started_at")
finished_at = row.get("finished_at")
output.append(
{
"id": row.get("id"),
"request_code": row.get("request_code") or "",
"task": row.get("task") or "",
"status": row.get("status") or "",
"detail": _parse_event_detail(row.get("detail")),
"started_at": started_at.isoformat() if isinstance(started_at, datetime) else "",
"finished_at": finished_at.isoformat() if isinstance(finished_at, datetime) else "",
"duration_ms": row.get("duration_ms"),
}
)
return JSONResponse({"task_runs": output})
@app.post("/api/admin/access/requests/{username}/approve")
async def approve_access_request(
username: str,
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
with task_context("admin.access.approve"):
payload = await _read_json_payload(request)
allowed_flags = _allowed_flag_groups()
flags = [flag for flag in _flags_from_payload(payload) if flag in allowed_flags]
note = _note_from_payload(payload)
decided_by = ctx.username or ""
try:
row = portal_db.fetchone(
"""
UPDATE access_requests
SET status = 'approved',
decided_at = NOW(),
decided_by = %s,
approval_flags = %s,
approval_note = %s
WHERE username = %s
AND status = 'pending'
AND email_verified_at IS NOT NULL
RETURNING request_code
""",
(decided_by or None, flags or None, note, username),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to approve request")
if not row:
logger.info(
"access request approval ignored",
extra={"event": "access_request_approve", "actor": decided_by, "username": username, "status": "skipped"},
)
_record_event(
"access_request_approve",
{
"actor": decided_by,
"username": username,
"status": "skipped",
},
)
return JSONResponse({"ok": True, "request_code": ""})
request_code = row.get("request_code") or ""
if request_code:
threading.Thread(
target=provisioning.provision_access_request,
args=(request_code,),
daemon=True,
).start()
logger.info(
"access request approved",
extra={
"event": "access_request_approve",
"actor": decided_by,
"username": username,
"request_code": request_code,
},
)
_record_event(
"access_request_approve",
{
"actor": decided_by,
"username": username,
"request_code": request_code,
"status": "ok",
"flags": flags,
"note": note or "",
},
)
return JSONResponse({"ok": True, "request_code": request_code})
@app.post("/api/admin/access/requests/{username}/deny")
async def deny_access_request(
username: str,
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
with task_context("admin.access.deny"):
payload = await _read_json_payload(request)
note = _note_from_payload(payload)
decided_by = ctx.username or ""
try:
row = portal_db.fetchone(
"""
UPDATE access_requests
SET status = 'denied',
decided_at = NOW(),
decided_by = %s,
denial_note = %s
WHERE username = %s AND status = 'pending'
RETURNING request_code
""",
(decided_by or None, note, username),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to deny request")
if not row:
logger.info(
"access request denial ignored",
extra={"event": "access_request_deny", "actor": decided_by, "username": username, "status": "skipped"},
)
_record_event(
"access_request_deny",
{
"actor": decided_by,
"username": username,
"status": "skipped",
},
)
return JSONResponse({"ok": True, "request_code": ""})
logger.info(
"access request denied",
extra={
"event": "access_request_deny",
"actor": decided_by,
"username": username,
"request_code": row.get("request_code") or "",
},
)
_record_event(
"access_request_deny",
{
"actor": decided_by,
"username": username,
"request_code": row.get("request_code") or "",
"status": "ok",
"note": note or "",
},
)
return JSONResponse({"ok": True, "request_code": row.get("request_code")})
@app.post("/api/access/requests/{request_code}/retry")
def retry_access_request(request_code: str) -> JSONResponse:
code = (request_code or "").strip()
if not code:
raise HTTPException(status_code=400, detail="request_code is required")
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
try:
row = portal_db.fetchone(
"SELECT status FROM access_requests WHERE request_code = %s",
(code,),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to load request")
if not row:
raise HTTPException(status_code=404, detail="not found")
status = (row.get("status") or "").strip()
if status not in {"accounts_building", "approved"}:
raise HTTPException(status_code=409, detail="request not retryable")
try:
portal_db.execute(
"UPDATE access_requests SET provision_attempted_at = NULL WHERE request_code = %s",
(code,),
)
portal_db.execute(
"""
UPDATE access_request_tasks
SET status = 'pending',
detail = 'retry requested',
updated_at = NOW()
WHERE request_code = %s AND status = 'error'
""",
(code,),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to update retry state")
threading.Thread(
target=provisioning.provision_access_request,
args=(code,),
daemon=True,
).start()
_record_event(
"access_request_retry",
{
"request_code": code,
"status": "ok",
},
)
return JSONResponse({"ok": True, "request_code": code})
@app.post("/api/account/mailu/rotate")
def rotate_mailu_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.mailu_rotate"):
started = datetime.now(timezone.utc)
status = "ok"
error_detail = ""
sync_enabled = mailu.ready()
sync_ok = False
sync_error = ""
nextcloud_sync: dict[str, Any] = {"status": "skipped"}
logger.info(
"mailu password rotate requested",
extra={"event": "mailu_rotate", "username": username},
)
try:
password = random_password()
keycloak_admin.set_user_attribute(username, "mailu_app_password", password)
if sync_enabled:
try:
mailu.sync("ariadne_mailu_rotate")
sync_ok = True
except Exception as exc:
sync_error = safe_error_detail(exc, "sync request failed")
try:
nextcloud_sync = nextcloud.sync_mail(username, wait=True)
except Exception as exc:
nextcloud_sync = {"status": "error", "detail": safe_error_detail(exc, "failed to sync nextcloud")}
logger.info(
"mailu password rotate completed",
extra={
"event": "mailu_rotate",
"username": username,
"sync_enabled": sync_enabled,
"sync_ok": sync_ok,
"nextcloud_status": nextcloud_sync.get("status") if isinstance(nextcloud_sync, dict) else "",
},
)
return JSONResponse(
{
"password": password,
"sync_enabled": sync_enabled,
"sync_ok": sync_ok,
"sync_error": sync_error,
"nextcloud_sync": nextcloud_sync,
}
)
except HTTPException as exc:
status = "error"
error_detail = str(exc.detail)
raise
except Exception as exc:
status = "error"
error_detail = safe_error_detail(exc, "mailu rotate failed")
raise HTTPException(status_code=502, detail=error_detail)
finally:
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
record_task_run("mailu_rotate", status, duration_sec)
try:
storage.record_task_run(
TaskRunRecord(
request_code=None,
task="mailu_rotate",
status=status,
detail=error_detail or None,
started_at=started,
finished_at=finished,
duration_ms=int(duration_sec * 1000),
)
)
except Exception:
pass
_record_event(
"mailu_rotate",
{
"username": username,
"status": status,
"sync_enabled": sync_enabled,
"sync_ok": sync_ok,
"nextcloud_status": nextcloud_sync.get("status") if isinstance(nextcloud_sync, dict) else "",
"error": error_detail,
},
)
@app.post("/api/account/wger/reset")
def reset_wger_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.wger_reset"):
mailu_email = _resolve_mailu_email(username)
password = random_password()
request = PasswordResetRequest(
task_name="wger_reset",
service_label="wger",
username=username,
mailu_email=mailu_email,
password=password,
sync_fn=lambda: wger.sync_user(username, mailu_email, password, wait=True),
password_attr="wger_password",
updated_attr="wger_password_updated_at",
error_hint="wger sync failed",
)
return _run_password_reset(request)
@app.post("/api/account/firefly/reset")
def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.firefly_reset"):
mailu_email = _resolve_mailu_email(username)
password = random_password(24)
request = PasswordResetRequest(
task_name="firefly_reset",
service_label="firefly",
username=username,
mailu_email=mailu_email,
password=password,
sync_fn=lambda: firefly.sync_user(mailu_email, password, wait=True),
password_attr="firefly_password",
updated_attr="firefly_password_updated_at",
error_hint="firefly sync failed",
)
return _run_password_reset(request)
@app.post("/api/account/firefly/rotation/check")
def firefly_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.firefly_rotation_check"):
result = firefly.check_rotation_for_user(username)
if result.get("status") == "error":
raise HTTPException(status_code=502, detail=result.get("detail") or "firefly rotation check failed")
return JSONResponse(result)
@app.post("/api/account/wger/rotation/check")
def wger_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.wger_rotation_check"):
result = wger.check_rotation_for_user(username)
if result.get("status") == "error":
raise HTTPException(status_code=502, detail=result.get("detail") or "wger rotation check failed")
return JSONResponse(result)
@app.post("/api/account/nextcloud/mail/sync")
async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.nextcloud_sync"):
try:
payload = await request.json()
except Exception:
payload = {}
wait = bool(payload.get("wait", True)) if isinstance(payload, dict) else True
started = datetime.now(timezone.utc)
status = "ok"
error_detail = ""
logger.info(
"nextcloud mail sync requested",
extra={"event": "nextcloud_sync", "username": username, "wait": wait},
)
try:
result = nextcloud.sync_mail(username, wait=wait)
logger.info(
"nextcloud mail sync completed",
extra={
"event": "nextcloud_sync",
"username": username,
"status": result.get("status") if isinstance(result, dict) else "",
},
)
return JSONResponse(result)
except HTTPException as exc:
status = "error"
error_detail = str(exc.detail)
raise
except Exception as exc:
status = "error"
error_detail = safe_error_detail(exc, "failed to sync nextcloud mail")
logger.info(
"nextcloud mail sync failed",
extra={"event": "nextcloud_sync", "username": username, "error": error_detail},
)
raise HTTPException(status_code=502, detail=error_detail)
finally:
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
record_task_run("nextcloud_sync", status, duration_sec)
try:
storage.record_task_run(
TaskRunRecord(
request_code=None,
task="nextcloud_sync",
status=status,
detail=error_detail or None,
started_at=started,
finished_at=finished,
duration_ms=int(duration_sec * 1000),
)
)
except Exception:
pass
_record_event(
"nextcloud_sync",
{
"username": username,
"status": status,
"wait": wait,
"error": error_detail,
},
)
@app.post("/events")
def mailu_event_listener(payload: dict[str, Any] | None = Body(default=None)) -> Response:
status_code, response = mailu_events.handle_event(payload)
return JSONResponse(response, status_code=status_code)