265 lines
12 KiB
Python
265 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
from types import SimpleNamespace
|
|
from typing import Any
|
|
|
|
from flask import Flask, g, jsonify
|
|
|
|
from atlas_portal.routes import account_actions as actions
|
|
|
|
|
|
class DummyAriadne:
|
|
def __init__(self, enabled: bool = False) -> None:
|
|
self._enabled = enabled
|
|
self.calls: list[tuple[str, str, object | None]] = []
|
|
|
|
def enabled(self) -> bool:
|
|
return self._enabled
|
|
|
|
def proxy(self, method: str, path: str, payload: object | None = None):
|
|
self.calls.append((method, path, payload))
|
|
return jsonify({"proxied": True, "path": path, "payload": payload})
|
|
|
|
|
|
class DummyAdmin:
|
|
def __init__(
|
|
self,
|
|
*,
|
|
ready: bool = True,
|
|
user: dict[str, Any] | None = None,
|
|
fail_find: bool = False,
|
|
fail_set: bool = False,
|
|
) -> None:
|
|
self._ready = ready
|
|
self.user = user if user is not None else {"attributes": {}}
|
|
self.fail_find = fail_find
|
|
self.fail_set = fail_set
|
|
self.attributes: list[tuple[str, str, str]] = []
|
|
|
|
def ready(self) -> bool:
|
|
return self._ready
|
|
|
|
def find_user(self, username: str) -> dict[str, Any] | None:
|
|
if self.fail_find:
|
|
raise RuntimeError("lookup failed")
|
|
return self.user
|
|
|
|
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
|
if self.fail_set:
|
|
raise RuntimeError("write failed")
|
|
self.attributes.append((username, key, value))
|
|
|
|
|
|
class MailuClient:
|
|
status_code = 200
|
|
raises = False
|
|
|
|
def __init__(self, *, timeout: int) -> None:
|
|
self.timeout = timeout
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def post(self, url: str, json: dict[str, Any] | None = None):
|
|
if self.raises:
|
|
raise RuntimeError("mailu unavailable")
|
|
return SimpleNamespace(status_code=self.status_code)
|
|
|
|
|
|
def make_client(
|
|
monkeypatch,
|
|
*,
|
|
admin: DummyAdmin | None = None,
|
|
ariadne: DummyAriadne | None = None,
|
|
account_ok: bool = True,
|
|
username: str = "alice",
|
|
email: str = "alice@example.dev",
|
|
):
|
|
app = Flask(__name__)
|
|
active_admin = admin or DummyAdmin()
|
|
active_ariadne = ariadne or DummyAriadne()
|
|
|
|
monkeypatch.setattr(actions, "require_auth", lambda fn: fn)
|
|
monkeypatch.setattr(
|
|
actions,
|
|
"require_account_access",
|
|
lambda: (True, None) if account_ok else (False, (jsonify({"error": "forbidden"}), 403)),
|
|
)
|
|
monkeypatch.setattr(actions, "admin_client", lambda: active_admin)
|
|
monkeypatch.setattr(actions, "ariadne_client", active_ariadne)
|
|
monkeypatch.setattr(actions, "random_password", lambda length=16: f"pw-{length}")
|
|
monkeypatch.setattr(actions.settings, "MAILU_DOMAIN", "bstein.dev")
|
|
monkeypatch.setattr(actions.settings, "MAILU_SYNC_URL", "")
|
|
monkeypatch.setattr(actions, "trigger_nextcloud_mail_sync", lambda user, wait=True: {"status": "ok", "wait": wait})
|
|
monkeypatch.setattr(actions, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "ok"})
|
|
monkeypatch.setattr(actions, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "ok"})
|
|
|
|
@app.before_request
|
|
def set_user() -> None:
|
|
g.keycloak_username = username
|
|
g.keycloak_email = email
|
|
|
|
actions.register_account_actions(app)
|
|
return app.test_client(), active_admin, active_ariadne
|
|
|
|
|
|
def test_tcp_check_success_and_failure(monkeypatch) -> None:
|
|
class SocketContext:
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
monkeypatch.setattr(actions.socket, "create_connection", lambda *args, **kwargs: SocketContext())
|
|
assert actions._tcp_check("host", 443, 1) is True
|
|
assert actions._tcp_check("", 443, 1) is False
|
|
assert actions._tcp_check("host", 0, 1) is False
|
|
monkeypatch.setattr(actions.socket, "create_connection", lambda *args, **kwargs: (_ for _ in ()).throw(OSError()))
|
|
assert actions._tcp_check("host", 443, 1) is False
|
|
|
|
|
|
def test_mailu_rotate_paths(monkeypatch) -> None:
|
|
client, _admin, _ariadne = make_client(monkeypatch, account_ok=False)
|
|
assert client.post("/api/account/mailu/rotate").status_code == 403
|
|
|
|
ariadne = DummyAriadne(enabled=True)
|
|
client, _admin, proxied = make_client(monkeypatch, ariadne=ariadne)
|
|
assert client.post("/api/account/mailu/rotate").get_json()["proxied"] is True
|
|
assert proxied.calls == [("POST", "/api/account/mailu/rotate", None)]
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(ready=False))
|
|
assert client.post("/api/account/mailu/rotate").status_code == 503
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, username="")
|
|
assert client.post("/api/account/mailu/rotate").status_code == 400
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(fail_set=True))
|
|
assert client.post("/api/account/mailu/rotate").status_code == 502
|
|
|
|
client, admin, _ariadne = make_client(monkeypatch)
|
|
data = client.post("/api/account/mailu/rotate").get_json()
|
|
assert data["password"] == "pw-16"
|
|
assert data["sync_enabled"] is False
|
|
assert data["nextcloud_sync"] == {"status": "ok", "wait": True}
|
|
assert admin.attributes[0] == ("alice", "mailu_app_password", "pw-16")
|
|
|
|
for status_code, expected_error in ((200, ""), (503, "sync status 503")):
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev")
|
|
MailuClient.status_code = status_code
|
|
MailuClient.raises = False
|
|
monkeypatch.setattr(actions, "httpx", SimpleNamespace(Client=MailuClient))
|
|
data = client.post("/api/account/mailu/rotate").get_json()
|
|
assert data["sync_enabled"] is True
|
|
assert data["sync_error"] == expected_error
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions.settings, "MAILU_SYNC_URL", "https://mailu-sync.example.dev")
|
|
MailuClient.raises = True
|
|
monkeypatch.setattr(actions, "httpx", SimpleNamespace(Client=MailuClient))
|
|
assert client.post("/api/account/mailu/rotate").get_json()["sync_error"] == "sync request failed"
|
|
MailuClient.raises = False
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError()))
|
|
assert client.post("/api/account/mailu/rotate").get_json()["nextcloud_sync"] == {"status": "error"}
|
|
|
|
|
|
def test_wger_reset_and_rotation_check_paths(monkeypatch) -> None:
|
|
client, _admin, _ariadne = make_client(monkeypatch, account_ok=False)
|
|
assert client.post("/api/account/wger/reset").status_code == 403
|
|
|
|
ariadne = DummyAriadne(enabled=True)
|
|
client, _admin, proxied = make_client(monkeypatch, ariadne=ariadne)
|
|
assert client.post("/api/account/wger/reset").get_json()["path"] == "/api/account/wger/reset"
|
|
assert client.post("/api/account/wger/rotation/check").get_json()["path"] == "/api/account/wger/rotation/check"
|
|
assert len(proxied.calls) == 2
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(ready=False))
|
|
assert client.post("/api/account/wger/reset").status_code == 503
|
|
client, _admin, _ariadne = make_client(monkeypatch, username="")
|
|
assert client.post("/api/account/wger/reset").status_code == 400
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(user={"attributes": {"mailu_email": ["mail@example.dev"]}}))
|
|
assert client.post("/api/account/wger/reset").get_json()["password"] == "pw-16"
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(user={"attributes": {"mailu_email": "mail@example.dev"}}))
|
|
assert client.post("/api/account/wger/reset").status_code == 200
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(fail_find=True))
|
|
assert client.post("/api/account/wger/reset").status_code == 200
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions, "trigger_wger_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
|
assert client.post("/api/account/wger/reset").status_code == 502
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(fail_set=True))
|
|
assert client.post("/api/account/wger/reset").status_code == 502
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
assert client.post("/api/account/wger/rotation/check").status_code == 503
|
|
|
|
|
|
def test_firefly_reset_and_rotation_check_paths(monkeypatch) -> None:
|
|
client, _admin, _ariadne = make_client(monkeypatch, account_ok=False)
|
|
assert client.post("/api/account/firefly/reset").status_code == 403
|
|
|
|
ariadne = DummyAriadne(enabled=True)
|
|
client, _admin, proxied = make_client(monkeypatch, ariadne=ariadne)
|
|
assert client.post("/api/account/firefly/reset").get_json()["path"] == "/api/account/firefly/reset"
|
|
assert client.post("/api/account/firefly/rotation/check").get_json()["path"] == "/api/account/firefly/rotation/check"
|
|
assert len(proxied.calls) == 2
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(ready=False))
|
|
assert client.post("/api/account/firefly/reset").status_code == 503
|
|
client, _admin, _ariadne = make_client(monkeypatch, username="")
|
|
assert client.post("/api/account/firefly/reset").status_code == 400
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(user={"attributes": {"mailu_email": ["mail@example.dev"]}}))
|
|
assert client.post("/api/account/firefly/reset").get_json()["password"] == "pw-24"
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(user={"attributes": {"mailu_email": "mail@example.dev"}}))
|
|
assert client.post("/api/account/firefly/reset").status_code == 200
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(fail_find=True))
|
|
assert client.post("/api/account/firefly/reset").status_code == 200
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions, "trigger_firefly_user_sync", lambda *args, **kwargs: {"status": "failed"})
|
|
assert client.post("/api/account/firefly/reset").status_code == 502
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(fail_set=True))
|
|
assert client.post("/api/account/firefly/reset").status_code == 502
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
assert client.post("/api/account/firefly/rotation/check").status_code == 503
|
|
|
|
|
|
def test_nextcloud_mail_sync_paths(monkeypatch) -> None:
|
|
client, _admin, _ariadne = make_client(monkeypatch, account_ok=False)
|
|
assert client.post("/api/account/nextcloud/mail/sync").status_code == 403
|
|
|
|
ariadne = DummyAriadne(enabled=True)
|
|
client, _admin, proxied = make_client(monkeypatch, ariadne=ariadne)
|
|
assert client.post("/api/account/nextcloud/mail/sync", json={"wait": False}).get_json()["proxied"] is True
|
|
assert proxied.calls == [("POST", "/api/account/nextcloud/mail/sync", {"wait": False})]
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch, admin=DummyAdmin(ready=False))
|
|
assert client.post("/api/account/nextcloud/mail/sync").status_code == 503
|
|
client, _admin, _ariadne = make_client(monkeypatch, username="")
|
|
assert client.post("/api/account/nextcloud/mail/sync").status_code == 400
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
assert client.post("/api/account/nextcloud/mail/sync", json={"wait": False}).get_json() == {
|
|
"status": "ok",
|
|
"wait": False,
|
|
}
|
|
|
|
client, _admin, _ariadne = make_client(monkeypatch)
|
|
monkeypatch.setattr(actions, "trigger_nextcloud_mail_sync", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("sync failed")))
|
|
assert client.post("/api/account/nextcloud/mail/sync").status_code == 502
|