ariadne/tests/unit/app/test_app_admin_routes.py

287 lines
9.9 KiB
Python
Raw Normal View History

from tests.unit.app.app_route_helpers import *
def test_forbidden_admin(monkeypatch) -> None:
ctx = AuthContext(username="alice", email="", groups=["dev"], claims={})
client = _client(monkeypatch, ctx)
resp = client.get(
"/api/admin/access/requests",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 403
def test_list_access_requests(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
now = datetime.now(timezone.utc)
monkeypatch.setattr(
app_module.storage,
"list_pending_requests",
lambda: [
{
"request_code": "REQ1",
"username": "alice",
"contact_email": "alice@example.com",
"note": "hello",
"status": "pending",
"created_at": now,
}
],
)
resp = client.get(
"/api/admin/access/requests",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
payload = resp.json()
assert payload["requests"][0]["username"] == "alice"
def test_list_access_requests_error(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.storage, "list_pending_requests", lambda: (_ for _ in ()).throw(RuntimeError("fail")))
resp = client.get(
"/api/admin/access/requests",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 502
def test_list_audit_events(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
now = datetime.now(timezone.utc)
monkeypatch.setattr(
app_module.storage,
"list_events",
lambda **kwargs: [
{
"id": 1,
"event_type": "mailu_rotate",
"detail": '{"status":"ok"}',
"created_at": now,
}
],
)
resp = client.get(
"/api/admin/audit/events",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
payload = resp.json()
assert payload["events"][0]["detail"]["status"] == "ok"
def test_list_audit_events_error(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.storage, "list_events", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("fail")))
resp = client.get(
"/api/admin/audit/events",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 502
def test_list_audit_task_runs(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
now = datetime.now(timezone.utc)
monkeypatch.setattr(
app_module.storage,
"list_task_runs",
lambda **kwargs: [
{
"id": 1,
"request_code": "REQ1",
"task": "mailu_sync",
"status": "ok",
"detail": "done",
"started_at": now,
"finished_at": now,
"duration_ms": 120,
}
],
)
resp = client.get(
"/api/admin/audit/task-runs",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
payload = resp.json()
assert payload["task_runs"][0]["task"] == "mailu_sync"
def test_list_audit_task_runs_error(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.storage, "list_task_runs", lambda **kwargs: (_ for _ in ()).throw(RuntimeError("fail")))
resp = client.get(
"/api/admin/audit/task-runs",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 502
def test_access_flags_from_keycloak(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True)
monkeypatch.setattr(app_module.keycloak_admin, "list_group_names", lambda **kwargs: ["demo", "test"])
resp = client.get(
"/api/admin/access/flags",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
assert resp.json()["flags"] == ["demo", "test"]
def test_access_flags_fallback(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: False)
monkeypatch.setattr(
app_module,
"settings",
dataclasses.replace(app_module.settings, allowed_flag_groups=["demo"]),
)
resp = client.get(
"/api/admin/access/flags",
headers={"Authorization": "Bearer token"},
)
assert resp.status_code == 200
assert resp.json()["flags"] == ["demo"]
def test_access_request_approve(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
captured = {}
def fake_fetchone(_query, params):
captured["flags"] = params[1]
return {"request_code": "REQ1"}
monkeypatch.setattr(app_module.portal_db, "fetchone", fake_fetchone)
monkeypatch.setattr(app_module.provisioning, "provision_access_request", lambda code: None)
monkeypatch.setattr(app_module.keycloak_admin, "ready", lambda: True)
monkeypatch.setattr(app_module.keycloak_admin, "list_group_names", lambda **kwargs: ["demo"])
resp = client.post(
"/api/admin/access/requests/alice/approve",
headers={"Authorization": "Bearer token"},
json={"flags": ["demo", "test", "admin"], "note": "ok"},
)
assert resp.status_code == 200
assert resp.json()["request_code"] == "REQ1"
assert captured["flags"] == ["demo"]
def test_access_request_approve_bad_json(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ1"})
resp = client.post(
"/api/admin/access/requests/alice/approve",
headers={"Authorization": "Bearer token", "Content-Type": "application/json"},
data="{bad}",
)
assert resp.status_code == 200
def test_access_request_approve_db_error(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(
app_module.portal_db,
"fetchone",
lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")),
)
resp = client.post(
"/api/admin/access/requests/alice/approve",
headers={"Authorization": "Bearer token"},
json={},
)
assert resp.status_code == 502
def test_access_request_approve_skipped(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: None)
resp = client.post(
"/api/admin/access/requests/alice/approve",
headers={"Authorization": "Bearer token"},
json={"flags": ["demo"]},
)
assert resp.status_code == 200
assert resp.json()["request_code"] == ""
def test_access_request_deny(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ2"})
resp = client.post(
"/api/admin/access/requests/alice/deny",
headers={"Authorization": "Bearer token"},
json={"note": "no"},
)
assert resp.status_code == 200
assert resp.json()["request_code"] == "REQ2"
def test_access_request_deny_db_error(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(
app_module.portal_db,
"fetchone",
lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")),
)
resp = client.post(
"/api/admin/access/requests/alice/deny",
headers={"Authorization": "Bearer token"},
json={},
)
assert resp.status_code == 502
def test_access_request_deny_skipped(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: None)
resp = client.post(
"/api/admin/access/requests/alice/deny",
headers={"Authorization": "Bearer token"},
json={"note": "no"},
)
assert resp.status_code == 200
assert resp.json()["request_code"] == ""
def test_require_admin_allows_group() -> None:
ctx = AuthContext(username="alice", email="", groups=["admin"], claims={})
app_module._require_admin(ctx)
def test_access_request_deny_bad_json(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module.portal_db, "fetchone", lambda *args, **kwargs: {"request_code": "REQ2"})
resp = client.post(
"/api/admin/access/requests/alice/deny",
headers={"Authorization": "Bearer token", "Content-Type": "application/json"},
data="{bad}",
)
assert resp.status_code == 200