From e9b5f32ef4bd6f7c61840196001a25c188c03dff Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 23 Jan 2026 18:22:45 -0300 Subject: [PATCH] fix: expose rotation checks and nextcloud summary --- ariadne/app.py | 34 +++++++++++++ ariadne/services/firefly.py | 25 ++++++++++ ariadne/services/nextcloud.py | 11 ++++- ariadne/services/wger.py | 25 ++++++++++ tests/test_nextcloud_sync.py | 6 +-- tests/test_services.py | 89 +++++++++++++++++++++++++++++++++++ 6 files changed, 186 insertions(+), 4 deletions(-) diff --git a/ariadne/app.py b/ariadne/app.py index a3c3ea7..04f1df1 100644 --- a/ariadne/app.py +++ b/ariadne/app.py @@ -778,6 +778,40 @@ def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONRes return _run_password_reset(request) +@app.post("/api/account/firefly/rotation/check") +def firefly_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + with task_context("account.firefly_rotation_check"): + result = firefly.check_rotation_for_user(username) + if result.get("status") == "error": + raise HTTPException(status_code=502, detail=result.get("detail") or "firefly rotation check failed") + return JSONResponse(result) + + +@app.post("/api/account/wger/rotation/check") +def wger_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + with task_context("account.wger_rotation_check"): + result = wger.check_rotation_for_user(username) + if result.get("status") == "error": + raise HTTPException(status_code=502, detail=result.get("detail") or "wger rotation check failed") + return JSONResponse(result) + + @app.post("/api/account/nextcloud/mail/sync") async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: _require_account_access(ctx) diff --git a/ariadne/services/firefly.py b/ariadne/services/firefly.py index 047729d..075609c 100644 --- a/ariadne/services/firefly.py +++ b/ariadne/services/firefly.py @@ -470,6 +470,31 @@ class FireflyService: settings.firefly_container, ) + def check_rotation_for_user(self, username: str) -> dict[str, Any]: + username = (username or "").strip() + if not username: + return {"status": "error", "detail": "missing username"} + if not keycloak_admin.ready(): + return {"status": "error", "detail": "keycloak admin not configured"} + + user = keycloak_admin.find_user(username) + if not isinstance(user, dict): + return {"status": "error", "detail": "user not found"} + user_id = user.get("id") if isinstance(user.get("id"), str) else "" + if not user_id: + return {"status": "error", "detail": "missing user id"} + full = keycloak_admin.get_user(user_id) + prepared = _build_sync_input(full) + if isinstance(prepared, UserSyncOutcome): + return {"status": prepared.status, "detail": prepared.detail or ""} + + outcome = self._rotation_outcome(prepared) + if outcome.status == "synced": + return {"status": "ok", "rotated": True} + if outcome.status == "skipped": + return {"status": "ok", "rotated": False} + return {"status": "error", "detail": outcome.detail or "rotation check failed"} + def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]: email = (email or "").strip() if not email: diff --git a/ariadne/services/nextcloud.py b/ariadne/services/nextcloud.py index 996bd68..2ae8b3d 100644 --- a/ariadne/services/nextcloud.py +++ b/ariadne/services/nextcloud.py @@ -531,6 +531,15 @@ class NextcloudService: self._sync_user_mail(user, counters) summary = counters.summary() + summary_payload = { + "processed": summary.processed, + "created": summary.created, + "updated": summary.updated, + "deleted": summary.deleted, + "skipped": summary.skipped, + "failures": summary.failures, + "detail": summary.detail, + } logger.info( "nextcloud mail sync finished", @@ -547,7 +556,7 @@ class NextcloudService: }, ) - return {"status": counters.status(), "summary": summary, "detail": summary.detail} + return {"status": counters.status(), "summary": summary_payload, "detail": summary.detail} def _run_shell(self, script: str, check: bool = True) -> None: self._executor.exec( diff --git a/ariadne/services/wger.py b/ariadne/services/wger.py index 220eaec..d329a59 100644 --- a/ariadne/services/wger.py +++ b/ariadne/services/wger.py @@ -429,6 +429,31 @@ class WgerService: settings.wger_container, ) + def check_rotation_for_user(self, username: str) -> dict[str, Any]: + username = (username or "").strip() + if not username: + return {"status": "error", "detail": "missing username"} + if not keycloak_admin.ready(): + return {"status": "error", "detail": "keycloak admin not configured"} + + user = keycloak_admin.find_user(username) + if not isinstance(user, dict): + return {"status": "error", "detail": "user not found"} + user_id = user.get("id") if isinstance(user.get("id"), str) else "" + if not user_id: + return {"status": "error", "detail": "missing user id"} + full = keycloak_admin.get_user(user_id) + prepared = _build_sync_input(full) + if isinstance(prepared, UserSyncOutcome): + return {"status": prepared.status, "detail": prepared.detail or ""} + + outcome = self._rotation_outcome(prepared) + if outcome.status == "synced": + return {"status": "ok", "rotated": True} + if outcome.status == "skipped": + return {"status": "ok", "rotated": False} + return {"status": "error", "detail": outcome.detail or "rotation check failed"} + def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]: username = (username or "").strip() if not username: diff --git a/tests/test_nextcloud_sync.py b/tests/test_nextcloud_sync.py index 4adad04..3519814 100644 --- a/tests/test_nextcloud_sync.py +++ b/tests/test_nextcloud_sync.py @@ -72,7 +72,7 @@ def test_nextcloud_sync_mail_create(monkeypatch) -> None: result = svc.sync_mail() assert result["status"] == "ok" summary = result["summary"] - assert summary.created == 1 + assert summary["created"] == 1 assert any("mail:account:create" in call[0] for call in occ_calls) @@ -153,8 +153,8 @@ def test_nextcloud_sync_mail_update_and_delete(monkeypatch) -> None: result = svc.sync_mail() assert result["status"] == "ok" summary = result["summary"] - assert summary.updated == 1 - assert summary.deleted == 1 + assert summary["updated"] == 1 + assert summary["deleted"] == 1 assert any("mail:account:update" in call[0] for call in occ_calls) diff --git a/tests/test_services.py b/tests/test_services.py index bd3a68c..e87bbe8 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -81,6 +81,49 @@ def test_nextcloud_sync_mail_no_user(monkeypatch) -> None: assert result["status"] == "ok" +def test_firefly_check_rotation_marks_rotated(monkeypatch) -> None: + dummy = types.SimpleNamespace( + firefly_namespace="finance", + firefly_pod_label="app=firefly", + firefly_container="firefly", + firefly_user_sync_wait_timeout_sec=60.0, + mailu_domain="bstein.dev", + ) + monkeypatch.setattr("ariadne.services.firefly.settings", dummy) + + calls: list[tuple[str, str, str]] = [] + + class DummyAdmin: + def ready(self) -> bool: + return True + + def find_user(self, username: str): + return {"id": "1", "username": username, "attributes": {}} + + def get_user(self, user_id: str): + return { + "id": user_id, + "username": "alice", + "attributes": { + "mailu_email": ["alice@bstein.dev"], + "firefly_password": ["pw"], + }, + } + + def set_user_attribute(self, username: str, key: str, value: str) -> None: + calls.append((username, key, value)) + + monkeypatch.setattr("ariadne.services.firefly.keycloak_admin", DummyAdmin()) + + svc = FireflyService() + monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"}) + + result = svc.check_rotation_for_user("alice") + assert result["status"] == "ok" + assert result["rotated"] is True + assert any(key == "firefly_password_rotated_at" for _user, key, _value in calls) + + def test_wger_sync_user_exec(monkeypatch) -> None: dummy = types.SimpleNamespace( wger_namespace="health", @@ -186,6 +229,52 @@ def test_wger_sync_users(monkeypatch) -> None: assert any(key == "wger_password_updated_at" for _user, key, _value in calls) +def test_wger_check_rotation_marks_rotated(monkeypatch) -> None: + dummy = types.SimpleNamespace( + wger_namespace="health", + wger_user_sync_wait_timeout_sec=60.0, + wger_pod_label="app=wger", + wger_container="wger", + wger_admin_username="admin", + wger_admin_password="pw", + wger_admin_email="admin@bstein.dev", + mailu_domain="bstein.dev", + ) + monkeypatch.setattr("ariadne.services.wger.settings", dummy) + + calls: list[tuple[str, str, str]] = [] + + class DummyAdmin: + def ready(self) -> bool: + return True + + def find_user(self, username: str): + return {"id": "1", "username": username, "attributes": {}} + + def get_user(self, user_id: str): + return { + "id": user_id, + "username": "alice", + "attributes": { + "mailu_email": ["alice@bstein.dev"], + "wger_password": ["pw"], + }, + } + + def set_user_attribute(self, username: str, key: str, value: str) -> None: + calls.append((username, key, value)) + + monkeypatch.setattr("ariadne.services.wger.keycloak_admin", DummyAdmin()) + + svc = WgerService() + monkeypatch.setattr(svc, "check_password", lambda *_args, **_kwargs: {"status": "mismatch"}) + + result = svc.check_rotation_for_user("alice") + assert result["status"] == "ok" + assert result["rotated"] is True + assert any(key == "wger_password_rotated_at" for _user, key, _value in calls) + + def test_wger_sync_marks_rotated(monkeypatch) -> None: dummy = types.SimpleNamespace( wger_namespace="health",