fix: expose rotation checks and nextcloud summary

This commit is contained in:
Brad Stein 2026-01-23 18:22:45 -03:00
parent cd7a5c66e0
commit e9b5f32ef4
6 changed files with 186 additions and 4 deletions

View File

@ -778,6 +778,40 @@ def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONRes
return _run_password_reset(request)
@app.post("/api/account/firefly/rotation/check")
def firefly_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.firefly_rotation_check"):
result = firefly.check_rotation_for_user(username)
if result.get("status") == "error":
raise HTTPException(status_code=502, detail=result.get("detail") or "firefly rotation check failed")
return JSONResponse(result)
@app.post("/api/account/wger/rotation/check")
def wger_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
with task_context("account.wger_rotation_check"):
result = wger.check_rotation_for_user(username)
if result.get("status") == "error":
raise HTTPException(status_code=502, detail=result.get("detail") or "wger rotation check failed")
return JSONResponse(result)
@app.post("/api/account/nextcloud/mail/sync")
async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)

View File

@ -470,6 +470,31 @@ class FireflyService:
settings.firefly_container,
)
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
username = (username or "").strip()
if not username:
return {"status": "error", "detail": "missing username"}
if not keycloak_admin.ready():
return {"status": "error", "detail": "keycloak admin not configured"}
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
return {"status": "error", "detail": "user not found"}
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
return {"status": "error", "detail": "missing user id"}
full = keycloak_admin.get_user(user_id)
prepared = _build_sync_input(full)
if isinstance(prepared, UserSyncOutcome):
return {"status": prepared.status, "detail": prepared.detail or ""}
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
return {"status": "ok", "rotated": True}
if outcome.status == "skipped":
return {"status": "ok", "rotated": False}
return {"status": "error", "detail": outcome.detail or "rotation check failed"}
def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]:
email = (email or "").strip()
if not email:

View File

@ -531,6 +531,15 @@ class NextcloudService:
self._sync_user_mail(user, counters)
summary = counters.summary()
summary_payload = {
"processed": summary.processed,
"created": summary.created,
"updated": summary.updated,
"deleted": summary.deleted,
"skipped": summary.skipped,
"failures": summary.failures,
"detail": summary.detail,
}
logger.info(
"nextcloud mail sync finished",
@ -547,7 +556,7 @@ class NextcloudService:
},
)
return {"status": counters.status(), "summary": summary, "detail": summary.detail}
return {"status": counters.status(), "summary": summary_payload, "detail": summary.detail}
def _run_shell(self, script: str, check: bool = True) -> None:
self._executor.exec(

View File

@ -429,6 +429,31 @@ class WgerService:
settings.wger_container,
)
def check_rotation_for_user(self, username: str) -> dict[str, Any]:
username = (username or "").strip()
if not username:
return {"status": "error", "detail": "missing username"}
if not keycloak_admin.ready():
return {"status": "error", "detail": "keycloak admin not configured"}
user = keycloak_admin.find_user(username)
if not isinstance(user, dict):
return {"status": "error", "detail": "user not found"}
user_id = user.get("id") if isinstance(user.get("id"), str) else ""
if not user_id:
return {"status": "error", "detail": "missing user id"}
full = keycloak_admin.get_user(user_id)
prepared = _build_sync_input(full)
if isinstance(prepared, UserSyncOutcome):
return {"status": prepared.status, "detail": prepared.detail or ""}
outcome = self._rotation_outcome(prepared)
if outcome.status == "synced":
return {"status": "ok", "rotated": True}
if outcome.status == "skipped":
return {"status": "ok", "rotated": False}
return {"status": "error", "detail": outcome.detail or "rotation check failed"}
def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
username = (username or "").strip()
if not username:

View File

@ -72,7 +72,7 @@ def test_nextcloud_sync_mail_create(monkeypatch) -> None:
result = svc.sync_mail()
assert result["status"] == "ok"
summary = result["summary"]
assert summary.created == 1
assert summary["created"] == 1
assert any("mail:account:create" in call[0] for call in occ_calls)
@ -153,8 +153,8 @@ def test_nextcloud_sync_mail_update_and_delete(monkeypatch) -> None:
result = svc.sync_mail()
assert result["status"] == "ok"
summary = result["summary"]
assert summary.updated == 1
assert summary.deleted == 1
assert summary["updated"] == 1
assert summary["deleted"] == 1
assert any("mail:account:update" in call[0] for call in occ_calls)

View File

@ -81,6 +81,49 @@ def test_nextcloud_sync_mail_no_user(monkeypatch) -> None:
assert result["status"] == "ok"
def test_firefly_check_rotation_marks_rotated(monkeypatch) -> None:
dummy = types.SimpleNamespace(
firefly_namespace="finance",
firefly_pod_label="app=firefly",
firefly_container="firefly",
firefly_user_sync_wait_timeout_sec=60.0,
mailu_domain="bstein.dev",
)
monkeypatch.setattr("ariadne.services.firefly.settings", dummy)
calls: list[tuple[str, str, str]] = []
class DummyAdmin:
def ready(self) -> bool:
return True
def find_user(self, username: str):
return {"id": "1", "username": username, "attributes": {}}
def get_user(self, user_id: str):
return {
"id": user_id,
"username": "alice",
"attributes": {
"mailu_email": ["alice@bstein.dev"],
"firefly_password": ["pw"],
},
}
def set_user_attribute(self, username: str, key: str, value: str) -> None:
calls.append((username, key, value))
monkeypatch.setattr("ariadne.services.firefly.keycloak_admin", DummyAdmin())
svc = FireflyService()
monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"})
result = svc.check_rotation_for_user("alice")
assert result["status"] == "ok"
assert result["rotated"] is True
assert any(key == "firefly_password_rotated_at" for _user, key, _value in calls)
def test_wger_sync_user_exec(monkeypatch) -> None:
dummy = types.SimpleNamespace(
wger_namespace="health",
@ -186,6 +229,52 @@ def test_wger_sync_users(monkeypatch) -> None:
assert any(key == "wger_password_updated_at" for _user, key, _value in calls)
def test_wger_check_rotation_marks_rotated(monkeypatch) -> None:
dummy = types.SimpleNamespace(
wger_namespace="health",
wger_user_sync_wait_timeout_sec=60.0,
wger_pod_label="app=wger",
wger_container="wger",
wger_admin_username="admin",
wger_admin_password="pw",
wger_admin_email="admin@bstein.dev",
mailu_domain="bstein.dev",
)
monkeypatch.setattr("ariadne.services.wger.settings", dummy)
calls: list[tuple[str, str, str]] = []
class DummyAdmin:
def ready(self) -> bool:
return True
def find_user(self, username: str):
return {"id": "1", "username": username, "attributes": {}}
def get_user(self, user_id: str):
return {
"id": user_id,
"username": "alice",
"attributes": {
"mailu_email": ["alice@bstein.dev"],
"wger_password": ["pw"],
},
}
def set_user_attribute(self, username: str, key: str, value: str) -> None:
calls.append((username, key, value))
monkeypatch.setattr("ariadne.services.wger.keycloak_admin", DummyAdmin())
svc = WgerService()
monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"})
result = svc.check_rotation_for_user("alice")
assert result["status"] == "ok"
assert result["rotated"] is True
assert any(key == "wger_password_rotated_at" for _user, key, _value in calls)
def test_wger_sync_marks_rotated(monkeypatch) -> None:
dummy = types.SimpleNamespace(
wger_namespace="health",