from tests.unit.app.app_route_helpers import * def test_forbidden_admin(monkeypatch) -> None: ctx = AuthContext(username="alice", email="", groups=["dev"], claims={}) client = _client(monkeypatch, ctx) resp = client.get( "/api/admin/access/requests", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 403 def test_list_access_requests(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) now = datetime.now(timezone.utc) monkeypatch.setattr( app_module.storage, "list_pending_requests", lambda: [ { "request_code": "REQ1", "username": "alice", "contact_email": "alice@example.com", "note": "hello", "status": "pending", "created_at": now, } ], ) resp = client.get( "/api/admin/access/requests", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 payload = resp.json() assert payload["requests"][0]["username"] == "alice" def test_list_access_requests_error(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.storage, "list_pending_requests", lambda: (_ for _ in ()).throw(RuntimeError("fail"))) resp = client.get( "/api/admin/access/requests", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 502 def test_list_audit_events(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) now = datetime.now(timezone.utc) monkeypatch.setattr( app_module.storage, "list_events", lambda **kwargs: [ { "id": 1, "event_type": "mailu_rotate", "detail": '{"status":"ok"}', "created_at": now, } ], ) resp = client.get( "/api/admin/audit/events", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 payload = resp.json() assert payload["events"][0]["detail"]["status"] == "ok" def test_list_audit_events_error(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.storage, "list_events", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("fail"))) resp = client.get( "/api/admin/audit/events", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 502 def test_list_audit_task_runs(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) now = datetime.now(timezone.utc) monkeypatch.setattr( app_module.storage, "list_task_runs", lambda **kwargs: [ { "id": 1, "request_code": "REQ1", "task": "mailu_sync", "status": "ok", "detail": "done", "started_at": now, "finished_at": now, "duration_ms": 120, } ], ) resp = client.get( "/api/admin/audit/task-runs", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 payload = resp.json() assert payload["task_runs"][0]["task"] == "mailu_sync" def test_list_audit_task_runs_error(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.storage, "list_task_runs", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("fail"))) resp = client.get( "/api/admin/audit/task-runs", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 502 def test_access_flags_from_keycloak(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True) monkeypatch.setattr(app_module.keycloak_admin, "list_group_names", lambda **kwargs: ["demo", "test"]) resp = client.get( "/api/admin/access/flags", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 assert resp.json()["flags"] == ["demo", "test"] def test_access_flags_fallback(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: False) monkeypatch.setattr( app_module, "settings", dataclasses.replace(app_module.settings, allowed_flag_groups=["demo"]), ) resp = client.get( "/api/admin/access/flags", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 assert resp.json()["flags"] == ["demo"] def test_access_flags_fallback_when_keycloak_listing_fails(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True) monkeypatch.setattr( app_module.keycloak_admin, "list_group_names", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("keycloak unavailable")), ) monkeypatch.setattr( app_module, "settings", dataclasses.replace(app_module.settings, allowed_flag_groups=["fallback"]), ) resp = client.get( "/api/admin/access/flags", headers={"Authorization": "Bearer token"}, ) assert resp.status_code == 200 assert resp.json()["flags"] == ["fallback"] def test_cluster_state_admin_and_internal_routes(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.storage, "latest_cluster_state", lambda: {"nodes": [{"name": "titan-01"}]}) admin_resp = client.get( "/api/admin/cluster/state", headers={"Authorization": "Bearer token"}, ) internal_resp = client.get("/api/internal/cluster/state") assert admin_resp.status_code == 200 assert admin_resp.json()["nodes"][0]["name"] == "titan-01" assert internal_resp.status_code == 200 assert internal_resp.json()["nodes"][0]["name"] == "titan-01" def test_cluster_state_routes_report_unavailable(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.storage, "latest_cluster_state", lambda: None) admin_resp = client.get( "/api/admin/cluster/state", headers={"Authorization": "Bearer token"}, ) internal_resp = client.get("/api/internal/cluster/state") assert admin_resp.status_code == 404 assert internal_resp.status_code == 404 def test_access_request_approve(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) captured = {} def fake_fetchone(_query, params): captured["flags"] = params[1] return {"request_code": "REQ1"} monkeypatch.setattr(app_module.portal_db, "fetchone", fake_fetchone) monkeypatch.setattr(app_module.provisioning, "provision_access_request", lambda code: None) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True) monkeypatch.setattr(app_module.keycloak_admin, "list_group_names", lambda **kwargs: ["demo"]) resp = client.post( "/api/admin/access/requests/alice/approve", headers={"Authorization": "Bearer token"}, json={"flags": ["demo", "test", "admin"], "note": "ok"}, ) assert resp.status_code == 200 assert resp.json()["request_code"] == "REQ1" assert captured["flags"] == ["demo"] def test_access_request_approve_bad_json(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ1"}) resp = client.post( "/api/admin/access/requests/alice/approve", headers={"Authorization": "Bearer token", "Content-Type": "application/json"}, data="{bad}", ) assert resp.status_code == 200 def test_access_request_approve_db_error(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr( app_module.portal_db, "fetchone", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")), ) resp = client.post( "/api/admin/access/requests/alice/approve", headers={"Authorization": "Bearer token"}, json={}, ) assert resp.status_code == 502 def test_access_request_approve_skipped(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: None) resp = client.post( "/api/admin/access/requests/alice/approve", headers={"Authorization": "Bearer token"}, json={"flags": ["demo"]}, ) assert resp.status_code == 200 assert resp.json()["request_code"] == "" def test_access_request_deny(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ2"}) resp = client.post( "/api/admin/access/requests/alice/deny", headers={"Authorization": "Bearer token"}, json={"note": "no"}, ) assert resp.status_code == 200 assert resp.json()["request_code"] == "REQ2" def test_access_request_deny_db_error(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr( app_module.portal_db, "fetchone", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")), ) resp = client.post( "/api/admin/access/requests/alice/deny", headers={"Authorization": "Bearer token"}, json={}, ) assert resp.status_code == 502 def test_access_request_deny_skipped(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: None) resp = client.post( "/api/admin/access/requests/alice/deny", headers={"Authorization": "Bearer token"}, json={"note": "no"}, ) assert resp.status_code == 200 assert resp.json()["request_code"] == "" def test_require_admin_allows_group() -> None: ctx = AuthContext(username="alice", email="", groups=["admin"], claims={}) app_module._require_admin(ctx) def test_access_request_deny_bad_json(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ2"}) resp = client.post( "/api/admin/access/requests/alice/deny", headers={"Authorization": "Bearer token", "Content-Type": "application/json"}, data="{bad}", ) assert resp.status_code == 200 def test_retry_access_request_rejects_invalid_and_unready(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) blank_resp = client.post("/api/access/requests/%20/retry") monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: False) unready_resp = client.post("/api/access/requests/REQ1/retry") assert blank_resp.status_code == 400 assert unready_resp.status_code == 503 def test_retry_access_request_reports_load_failure(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True) monkeypatch.setattr( app_module.portal_db, "fetchone", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("database unavailable")), ) resp = client.post("/api/access/requests/REQ1/retry") assert resp.status_code == 502 def test_retry_access_request_reports_update_failure(monkeypatch) -> None: ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={}) client = _client(monkeypatch, ctx) monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True) monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"status": "approved"}) monkeypatch.setattr( app_module.portal_db, "execute", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("database unavailable")), ) resp = client.post("/api/access/requests/REQ1/retry") assert resp.status_code == 502