234 lines
8.1 KiB
Python
234 lines
8.1 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import types
|
||
|
|
|
||
|
|
from ariadne.services import nextcloud as nextcloud_module
|
||
|
|
from ariadne.services.nextcloud import NextcloudService, _parse_mail_export
|
||
|
|
|
||
|
|
|
||
|
|
def test_parse_mail_export() -> None:
    """Check that ``_parse_mail_export`` yields one (id, e-mail) pair per account."""
    export_lines = [
        "Account 12:",
        " - Name: Alice",
        " - E-mail: alice@bstein.dev",
        "Account 13:",
        " - E-mail: extra@bstein.dev",
    ]
    expected = [("12", "alice@bstein.dev"), ("13", "extra@bstein.dev")]

    parsed = _parse_mail_export("\n".join(export_lines))

    assert parsed == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_nextcloud_sync_mail_create(monkeypatch) -> None:
    """Expect ``sync_mail`` to report one created account and issue a create command.

    The stubbed account listing returns an empty list on its first call and a
    single ``alice@bstein.dev`` account on its second call.
    """
    settings_fields = {
        "nextcloud_namespace": "nextcloud",
        "nextcloud_mail_sync_cronjob": "nextcloud-mail-sync",
        "nextcloud_mail_sync_wait_timeout_sec": 90.0,
        "nextcloud_mail_sync_job_ttl_sec": 3600,
        "nextcloud_pod_label": "app=nextcloud",
        "nextcloud_container": "nextcloud",
        "nextcloud_exec_timeout_sec": 30.0,
        "nextcloud_db_host": "",
        "nextcloud_db_port": 5432,
        "nextcloud_db_name": "nextcloud",
        "nextcloud_db_user": "nextcloud",
        "nextcloud_db_password": "",
        "mailu_domain": "bstein.dev",
        "mailu_host": "mail.bstein.dev",
    }
    # Settings are patched before the service is constructed, as in the other tests.
    monkeypatch.setattr(nextcloud_module, "settings", types.SimpleNamespace(**settings_fields))

    keycloak_user = {
        "id": "uid-1",
        "username": "alice",
        "enabled": True,
        "attributes": {
            "mailu_app_password": ["pw"],
            "mailu_email": ["alice@bstein.dev"],
        },
    }
    keycloak = nextcloud_module.keycloak_admin
    monkeypatch.setattr(keycloak, "ready", lambda: True)
    monkeypatch.setattr(keycloak, "iter_users", lambda **_kwargs: [keycloak_user])
    monkeypatch.setattr(keycloak, "get_user", lambda *_args, **_kwargs: keycloak_user)

    recorded_occ: list[list[str]] = []
    # Listings are handed out in order, one per call to the stub.
    pending_listings = [[], [("42", "alice@bstein.dev")]]

    def record_occ(args):
        recorded_occ.append(args)
        return ""

    def next_listing(*_args, **_kwargs):
        return pending_listings.pop(0)

    def ignore(*_args, **_kwargs):
        return None

    service = NextcloudService()
    monkeypatch.setattr(service, "_occ", record_occ)
    monkeypatch.setattr(service, "_list_mail_accounts", next_listing)
    monkeypatch.setattr(service, "_set_editor_mode_richtext", ignore)
    monkeypatch.setattr(service, "_set_user_mail_meta", ignore)

    outcome = service.sync_mail()

    assert outcome["status"] == "ok"
    stats = outcome["summary"]
    assert stats.created == 1
    assert any("mail:account:create" in occ_args[0] for occ_args in recorded_occ)
|
||
|
|
|
||
|
|
|
||
|
|
def test_nextcloud_sync_mail_keycloak_not_ready(monkeypatch) -> None:
    """Expect ``sync_mail`` to return an error status when Keycloak reports not ready."""
    settings_fields = {
        "nextcloud_namespace": "nextcloud",
        "nextcloud_mail_sync_cronjob": "nextcloud-mail-sync",
        "nextcloud_mail_sync_wait_timeout_sec": 90.0,
        "nextcloud_mail_sync_job_ttl_sec": 3600,
        "nextcloud_pod_label": "app=nextcloud",
        "nextcloud_container": "nextcloud",
        "nextcloud_exec_timeout_sec": 30.0,
        "nextcloud_db_host": "",
        "nextcloud_db_port": 5432,
        "nextcloud_db_name": "nextcloud",
        "nextcloud_db_user": "nextcloud",
        "nextcloud_db_password": "",
        "mailu_domain": "bstein.dev",
        "mailu_host": "mail.bstein.dev",
    }
    monkeypatch.setattr(nextcloud_module, "settings", types.SimpleNamespace(**settings_fields))
    # The readiness probe is forced to fail; nothing else on keycloak_admin is stubbed.
    monkeypatch.setattr(nextcloud_module.keycloak_admin, "ready", lambda: False)

    service = NextcloudService()
    outcome = service.sync_mail()

    assert outcome["status"] == "error"
|
||
|
|
|
||
|
|
|
||
|
|
def test_nextcloud_sync_mail_update_and_delete(monkeypatch) -> None:
    """Expect ``sync_mail`` to report one update and one deletion.

    The stubbed account listing first returns two accounts (``alice`` and
    ``extra``), then only the ``alice`` account on the following call.
    """
    settings_fields = {
        "nextcloud_namespace": "nextcloud",
        "nextcloud_mail_sync_cronjob": "nextcloud-mail-sync",
        "nextcloud_mail_sync_wait_timeout_sec": 90.0,
        "nextcloud_mail_sync_job_ttl_sec": 3600,
        "nextcloud_pod_label": "app=nextcloud",
        "nextcloud_container": "nextcloud",
        "nextcloud_exec_timeout_sec": 30.0,
        "nextcloud_db_host": "",
        "nextcloud_db_port": 5432,
        "nextcloud_db_name": "nextcloud",
        "nextcloud_db_user": "nextcloud",
        "nextcloud_db_password": "",
        "mailu_domain": "bstein.dev",
        "mailu_host": "mail.bstein.dev",
    }
    monkeypatch.setattr(nextcloud_module, "settings", types.SimpleNamespace(**settings_fields))

    keycloak_user = {
        "id": "uid-1",
        "username": "alice",
        "enabled": True,
        "attributes": {
            "mailu_app_password": ["pw"],
            "mailu_email": ["alice@bstein.dev"],
        },
    }
    keycloak = nextcloud_module.keycloak_admin
    monkeypatch.setattr(keycloak, "ready", lambda: True)
    monkeypatch.setattr(keycloak, "iter_users", lambda **_kwargs: [keycloak_user])
    monkeypatch.setattr(keycloak, "get_user", lambda *_args, **_kwargs: keycloak_user)

    recorded_occ: list[list[str]] = []
    # Listings are handed out in order, one per call to the stub.
    pending_listings = [
        [("1", "alice@bstein.dev"), ("2", "extra@bstein.dev")],
        [("1", "alice@bstein.dev")],
    ]

    def record_occ(args):
        recorded_occ.append(args)
        return ""

    def next_listing(*_args, **_kwargs):
        return pending_listings.pop(0)

    def ignore(*_args, **_kwargs):
        return None

    service = NextcloudService()
    monkeypatch.setattr(service, "_occ", record_occ)
    monkeypatch.setattr(service, "_list_mail_accounts", next_listing)
    monkeypatch.setattr(service, "_set_editor_mode_richtext", ignore)
    monkeypatch.setattr(service, "_set_user_mail_meta", ignore)

    outcome = service.sync_mail()

    assert outcome["status"] == "ok"
    stats = outcome["summary"]
    assert stats.updated == 1
    assert stats.deleted == 1
    assert any("mail:account:update" in occ_args[0] for occ_args in recorded_occ)
|
||
|
|
|
||
|
|
|
||
|
|
def test_nextcloud_run_cron(monkeypatch) -> None:
    """Expect ``run_cron`` to return an ok status when the executor call succeeds."""
    settings_fields = {
        "nextcloud_namespace": "nextcloud",
        "nextcloud_mail_sync_cronjob": "nextcloud-mail-sync",
        "nextcloud_mail_sync_wait_timeout_sec": 90.0,
        "nextcloud_mail_sync_job_ttl_sec": 3600,
        "nextcloud_pod_label": "app=nextcloud",
        "nextcloud_container": "nextcloud",
        "nextcloud_exec_timeout_sec": 30.0,
        "nextcloud_db_host": "",
        "nextcloud_db_port": 5432,
        "nextcloud_db_name": "nextcloud",
        "nextcloud_db_user": "nextcloud",
        "nextcloud_db_password": "",
        "mailu_domain": "bstein.dev",
        "mailu_host": "mail.bstein.dev",
    }
    monkeypatch.setattr(nextcloud_module, "settings", types.SimpleNamespace(**settings_fields))

    service = NextcloudService()

    def successful_exec(*_args, **_kwargs):
        # Stand-in result object exposing the fields set by the original stub.
        return types.SimpleNamespace(stdout="ok", stderr="", exit_code=0, ok=True)

    monkeypatch.setattr(service._executor, "exec", successful_exec)

    outcome = service.run_cron()

    assert outcome["status"] == "ok"
|
||
|
|
|
||
|
|
|
||
|
|
def test_nextcloud_run_maintenance(monkeypatch) -> None:
    """Expect ``run_maintenance`` to succeed, touch theming via occ and issue a DELETE.

    The occ runner, the executor and the external API are all replaced with
    recording stubs; the API stub answers GET requests with a single item
    whose id is 1 and every other method with an empty dict.
    """
    settings_fields = {
        "nextcloud_namespace": "nextcloud",
        "nextcloud_mail_sync_cronjob": "nextcloud-mail-sync",
        "nextcloud_mail_sync_wait_timeout_sec": 90.0,
        "nextcloud_mail_sync_job_ttl_sec": 3600,
        "nextcloud_pod_label": "app=nextcloud",
        "nextcloud_container": "nextcloud",
        "nextcloud_exec_timeout_sec": 30.0,
        "nextcloud_db_host": "",
        "nextcloud_db_port": 5432,
        "nextcloud_db_name": "nextcloud",
        "nextcloud_db_user": "nextcloud",
        "nextcloud_db_password": "",
        "nextcloud_url": "https://cloud.bstein.dev",
        "nextcloud_admin_user": "admin",
        "nextcloud_admin_password": "secret",
        "mailu_domain": "bstein.dev",
        "mailu_host": "mail.bstein.dev",
    }
    monkeypatch.setattr(nextcloud_module, "settings", types.SimpleNamespace(**settings_fields))

    service = NextcloudService()

    recorded_occ: list[list[str]] = []

    def record_occ(args):
        recorded_occ.append(args)
        return ""

    monkeypatch.setattr(service, "_occ", record_occ)

    # Positional arguments of executor calls are recorded but not asserted on.
    recorded_exec = []

    def successful_exec(*_args, **_kwargs):
        recorded_exec.append(_args)
        return types.SimpleNamespace(stdout="ok", stderr="", exit_code=0, ok=True)

    monkeypatch.setattr(service._executor, "exec", successful_exec)

    recorded_api: list[tuple[str, str]] = []

    def api_stub(method: str, path: str, data=None):
        recorded_api.append((method, path))
        return {"ocs": {"data": [{"id": 1}]}} if method == "GET" else {}

    monkeypatch.setattr(service, "_external_api", api_stub)

    outcome = service.run_maintenance()

    assert outcome["status"] == "ok"
    assert any(
        len(occ_args) > 1 and occ_args[1] == "theming" for occ_args in recorded_occ
    )
    assert any(verb == "DELETE" for verb, _ in recorded_api)
|