235 lines
9.3 KiB
Python
235 lines
9.3 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from contextlib import contextmanager
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from flask import Flask, g, jsonify
|
||
|
|
|
||
|
|
from atlas_portal.routes import account_overview as overview
|
||
|
|
|
||
|
|
|
||
|
|
class DummyResult:
    """Stand-in for a database result object that holds at most one row."""

    def __init__(self, row: dict[str, Any] | None = None) -> None:
        """Store the canned row; ``None`` represents "no row"."""
        self.row = row

    def fetchone(self) -> dict[str, Any] | None:
        """Return the canned row exactly as it was supplied to the constructor."""
        canned = self.row
        return canned
|
||
|
|
|
||
|
|
|
||
|
|
class DummyConn:
    """Fake database connection that records queries and answers with canned rows."""

    def __init__(self, *, request_code: str = "alice~CODE", step_done: bool = True, fail: bool = False) -> None:
        """Configure the canned answers.

        Args:
            request_code: Value returned for queries against ``access_requests``;
                a falsy value makes that query return no row.
            step_done: Whether queries against ``access_request_onboarding_steps``
                return a row.
            fail: When true, every ``execute`` call raises ``RuntimeError``.
        """
        # Every (query, params) pair passed to execute(), in call order.
        self.executed: list[tuple[str, object | None]] = []
        self.fail = fail
        self.step_done = step_done
        self.request_code = request_code

    def execute(self, query: str, params: object | None = None) -> DummyResult:
        """Record the call and return a ``DummyResult`` chosen by the query text.

        Raises:
            RuntimeError: If the connection was built with ``fail=True``. The
                call is still recorded in ``executed`` before raising.
        """
        self.executed.append((query, params))
        if self.fail:
            raise RuntimeError("database failed")

        # The onboarding-steps check takes priority over the access_requests check.
        if "access_request_onboarding_steps" in query:
            row = {"exists": 1} if self.step_done else None
        elif "FROM access_requests" in query:
            row = {"request_code": self.request_code} if self.request_code else None
        else:
            row = None
        return DummyResult(row)
|
||
|
|
|
||
|
|
|
||
|
|
class DummyAdmin:
    """Fake Keycloak admin client returning preconfigured user records."""

    def __init__(
        self,
        *,
        ready: bool = True,
        user: dict[str, Any] | None = None,
        full: dict[str, Any] | None = None,
        fail_find: bool = False,
    ) -> None:
        """Configure the fake.

        Args:
            ready: Value reported by ``ready()``.
            user: Record returned by ``find_user``; a default record is used
                only when this is ``None`` (an empty dict is kept as given).
            full: Record returned by ``get_user``; defaulted the same way.
            fail_find: When true, ``find_user`` raises ``RuntimeError``.
        """
        if user is None:
            user = {"id": "user-1", "email": "alice@idp.dev", "attributes": {}}
        if full is None:
            full = {"email": "full@idp.dev", "attributes": {}}

        self._ready = ready
        self.user = user
        self.full = full
        self.fail_find = fail_find

    def ready(self) -> bool:
        """Return the readiness flag given at construction."""
        return self._ready

    def find_user(self, username: str) -> dict[str, Any] | None:
        """Return the configured user record; ``username`` is ignored.

        Raises:
            RuntimeError: If the fake was built with ``fail_find=True``.
        """
        if not self.fail_find:
            return self.user
        raise RuntimeError("keycloak failed")

    def get_user(self, user_id: str) -> dict[str, Any]:
        """Return the configured full user record; ``user_id`` is ignored."""
        return self.full
|
||
|
|
|
||
|
|
|
||
|
|
class DummyAriadne:
    """Fake Ariadne client that always reports itself as disabled."""

    def enabled(self) -> bool:
        """Return ``False`` unconditionally."""
        return False
|
||
|
|
|
||
|
|
|
||
|
|
def make_client(
    monkeypatch,
    *,
    admin: DummyAdmin | None = None,
    conn: DummyConn | None = None,
    account_ok: bool = True,
    username: str = "alice",
    email: str = "",
):
    """Build a Flask test client with the overview module's collaborators faked.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture used for every patch.
        admin: Fake admin client; a default ``DummyAdmin`` is used when omitted.
        conn: Fake database connection; a default ``DummyConn`` when omitted.
        account_ok: Whether the patched ``require_account_access`` grants access;
            when false it returns a 403 JSON error response instead.
        username: Value stored on ``g.keycloak_username`` before each request.
        email: Value stored on ``g.keycloak_email`` before each request.

    Returns:
        A ``(test_client, connection)`` tuple so callers can inspect the
        connection that the patched ``connect`` yields.
    """
    app = Flask(__name__)
    active_admin = admin if admin else DummyAdmin()
    active_conn = conn if conn else DummyConn()

    def account_access():
        # Evaluated per call: jsonify is only invoked when access is denied.
        if account_ok:
            return True, None
        return False, (jsonify({"error": "forbidden"}), 403)

    module_overrides = {
        "require_auth": lambda fn: fn,  # pass the wrapped function through unchanged
        "require_account_access": account_access,
        "admin_client": lambda: active_admin,
        "ariadne_client": DummyAriadne(),
    }
    for attr_name, replacement in module_overrides.items():
        monkeypatch.setattr(overview, attr_name, replacement)

    settings_overrides = {
        "MAILU_DOMAIN": "bstein.dev",
        "PORTAL_DATABASE_URL": "postgres://portal",
        "PORTAL_PUBLIC_BASE_URL": "https://portal.example.dev",
        "JELLYFIN_LDAP_HOST": "ldap.example.dev",
        "JELLYFIN_LDAP_PORT": 389,
        "JELLYFIN_LDAP_CHECK_TIMEOUT_SEC": 1,
    }
    for setting_name, setting_value in settings_overrides.items():
        monkeypatch.setattr(overview.settings, setting_name, setting_value)

    @contextmanager
    def connect():
        # Always hand out the same fake connection.
        yield active_conn

    monkeypatch.setattr(overview, "connect", connect)

    def set_user() -> None:
        g.keycloak_username = username
        g.keycloak_email = email
        g.keycloak_groups = ["dev"]

    app.before_request(set_user)

    overview.register_account_overview(app)
    return app.test_client(), active_conn
|
||
|
|
|
||
|
|
|
||
|
|
def full_attrs(prefix: str = "") -> dict[str, Any]:
    """Return a complete set of service attributes with plain string values.

    Every value except ``nextcloud_mail_account_count`` (always ``"2"``) is
    prefixed with ``prefix`` so tests can tell different attribute sources apart.
    """

    def tagged(tail: str) -> str:
        return f"{prefix}{tail}"

    # Mailu and Nextcloud Mail share the same mailbox address.
    mailbox = tagged("mail@example.dev")

    return dict(
        mailu_email=mailbox,
        mailu_app_password=tagged("mail-pw"),
        nextcloud_mail_primary_email=mailbox,
        nextcloud_mail_account_count="2",
        nextcloud_mail_synced_at=tagged("synced"),
        wger_password=tagged("wger-pw"),
        wger_password_updated_at=tagged("wger-updated"),
        firefly_password=tagged("firefly-pw"),
        firefly_password_updated_at=tagged("firefly-updated"),
        vaultwarden_email=tagged("vault@example.dev"),
        vaultwarden_status=tagged("active"),
        vaultwarden_synced_at=tagged("vault-synced"),
        vaultwarden_master_password_set_at=tagged("vault-master"),
    )
|
||
|
|
|
||
|
|
|
||
|
|
def list_attrs(attrs: dict[str, Any]) -> dict[str, list[str]]:
    """Return a copy of ``attrs`` with each value stringified and wrapped in a one-item list."""
    wrapped: dict[str, list[str]] = {}
    for name, raw in attrs.items():
        wrapped[name] = [str(raw)]
    return wrapped
|
||
|
|
|
||
|
|
|
||
|
|
def test_tcp_check_paths(monkeypatch) -> None:
    """Cover ``_tcp_check``: successful connect, blank host, zero port, and ``OSError``."""

    class SocketContext:
        """Context manager standing in for a connected socket."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

    def connect_ok(*args, **kwargs):
        return SocketContext()

    def connect_fails(*args, **kwargs):
        raise OSError()

    monkeypatch.setattr(overview.socket, "create_connection", connect_ok)
    assert overview._tcp_check("host", 443, 1) is True
    # An empty host or a zero port is reported as unreachable even though
    # the patched create_connection would succeed.
    assert overview._tcp_check("", 443, 1) is False
    assert overview._tcp_check("host", 0, 1) is False

    monkeypatch.setattr(overview.socket, "create_connection", connect_fails)
    assert overview._tcp_check("host", 443, 1) is False
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_preflight_and_admin_unavailable(monkeypatch) -> None:
    """Denied account access yields 403; a not-ready admin client yields "not configured" statuses."""
    denied_client, _ = make_client(monkeypatch, account_ok=False)
    denied_response = denied_client.get("/api/account/overview")
    assert denied_response.status_code == 403

    not_ready_admin = DummyAdmin(ready=False)
    client, _ = make_client(monkeypatch, admin=not_ready_admin)
    payload = client.get("/api/account/overview").get_json()
    assert payload["mailu"]["status"] == "server not configured"
    assert payload["jellyfin"]["sync_detail"] == "keycloak admin not configured"
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_reads_list_attributes_and_reports_ldap_ok(monkeypatch) -> None:
    """List-valued attributes on an LDAP-linked user are read and the LDAP check reports ok."""
    ldap_user = {
        "id": "user-1",
        "email": "alice@idp.dev",
        "federationLink": "ldap",
        "attributes": list_attrs(full_attrs()),
    }
    client, _ = make_client(monkeypatch, admin=DummyAdmin(user=ldap_user))
    # Pretend the LDAP host is reachable.
    monkeypatch.setattr(overview, "_tcp_check", lambda *args, **kwargs: True)

    payload = client.get("/api/account/overview").get_json()

    assert payload["user"] == {"username": "alice", "email": "alice@idp.dev", "groups": ["dev"]}
    expected_fields = (
        ("mailu", "app_password", "mail-pw"),
        ("nextcloud_mail", "status", "ready"),
        ("wger", "password", "wger-pw"),
        ("firefly", "password", "firefly-pw"),
        ("vaultwarden", "status", "ready"),
        ("jellyfin", "sync_status", "ok"),
    )
    for section, field, expected in expected_fields:
        assert payload[section][field] == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_reads_string_attributes_and_database_confirmed_step(monkeypatch) -> None:
    """Plain-string attributes are read; the expected statuses hold with the LDAP check failing."""
    attrs = full_attrs("str-")
    attrs.update(
        {
            "vaultwarden_master_password_set_at": "",
            "vaultwarden_status": "invited",
            "nextcloud_mail_account_count": "not-a-number",
        }
    )
    string_user = {"id": "user-1", "email": "alice@idp.dev", "attributes": attrs}
    client, _ = make_client(monkeypatch, admin=DummyAdmin(user=string_user))
    # Pretend the LDAP host cannot be reached.
    monkeypatch.setattr(overview, "_tcp_check", lambda *args, **kwargs: False)

    payload = client.get("/api/account/overview").get_json()

    assert payload["nextcloud_mail"]["status"] == "needs sync"
    # NOTE(review): "ready" despite the "invited" attribute presumably comes from
    # the default DummyConn reporting the onboarding step as done - confirm.
    assert payload["vaultwarden"]["status"] == "ready"
    jellyfin = payload["jellyfin"]
    assert jellyfin["sync_status"] == "degraded"
    assert jellyfin["sync_detail"] == "LDAP unreachable"
    assert payload["onboarding_url"] == "https://portal.example.dev/onboarding?code=alice~CODE"
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_falls_back_to_full_user_list_attributes(monkeypatch) -> None:
    """With an attribute-less found user, list-valued data from the full record is used."""
    sparse_user = {"id": "user-1", "attributes": {}}
    full_record = {
        "email": "full@example.dev",
        "attributes": list_attrs(full_attrs("full-")),
    }
    fake_admin = DummyAdmin(user=sparse_user, full=full_record)
    client, _ = make_client(monkeypatch, admin=fake_admin, email="")
    monkeypatch.setattr(overview, "_tcp_check", lambda *args, **kwargs: True)

    payload = client.get("/api/account/overview").get_json()

    assert payload["user"]["email"] == "full@example.dev"
    assert payload["mailu"]["username"] == "full-mail@example.dev"
    assert payload["nextcloud_mail"]["primary_email"] == "full-mail@example.dev"
    assert payload["vaultwarden"]["username"] == "full-vault@example.dev"
    # Neither record carries a federationLink, so the user is not LDAP-backed.
    jellyfin = payload["jellyfin"]
    assert jellyfin["sync_status"] == "degraded"
    assert jellyfin["sync_detail"] == "Keycloak user is not LDAP-backed"
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_falls_back_to_full_user_string_attributes(monkeypatch) -> None:
    """With an attribute-less found user, plain-string data from the full record is used."""
    sparse_user = {"id": "user-1", "attributes": {}}
    full_record = {"email": "full@example.dev", "attributes": full_attrs("full-str-")}
    fake_admin = DummyAdmin(user=sparse_user, full=full_record)
    client, _ = make_client(monkeypatch, admin=fake_admin, email="")
    monkeypatch.setattr(overview, "_tcp_check", lambda *args, **kwargs: True)

    payload = client.get("/api/account/overview").get_json()

    expected_fields = (
        ("mailu", "app_password", "full-str-mail-pw"),
        ("wger", "password_updated_at", "full-str-wger-updated"),
        ("firefly", "password_updated_at", "full-str-firefly-updated"),
        ("vaultwarden", "synced_at", "full-str-vault-synced"),
    )
    for section, field, expected in expected_fields:
        assert payload[section][field] == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_overview_handles_keycloak_and_database_failures(monkeypatch) -> None:
    """When the user lookup and the database both raise, every service reports "unavailable"."""
    failing_admin = DummyAdmin(fail_find=True)
    failing_conn = DummyConn(fail=True)
    client, _ = make_client(monkeypatch, admin=failing_admin, conn=failing_conn)

    payload = client.get("/api/account/overview").get_json()

    for service in ("mailu", "nextcloud_mail", "wger", "firefly", "vaultwarden"):
        assert payload[service]["status"] == "unavailable"
    assert payload["jellyfin"]["sync_detail"] == "unavailable"
|