ariadne/tests/test_services.py

102 lines
3.6 KiB
Python

from __future__ import annotations
import types
import pytest
from ariadne.services.firefly import FireflyService
from ariadne.services.nextcloud import NextcloudService
from ariadne.services.wger import WgerService
class DummySpawner:
    """Test double patched in place of ``JobSpawner``; records calls, runs nothing.

    ``namespace`` and ``cronjob`` are stored exactly as passed to the
    constructor. ``calls`` collects one tuple per ``trigger_and_wait`` /
    ``trigger`` invocation, in call order, so tests can inspect what a service
    asked the spawner to do.
    """

    def __init__(self, namespace, cronjob) -> None:
        self.namespace = namespace
        self.cronjob = cronjob
        # One entry per spawner call; tuple arity differs by method (see below).
        self.calls = []

    def trigger_and_wait(self, label_suffix, env_overrides, timeout_sec, job_ttl_seconds=None) -> dict[str, str]:
        """Record ``(label_suffix, env_overrides, timeout_sec, job_ttl_seconds)``.

        Returns a canned result whose ``status`` is ``"ok"``.
        """
        self.calls.append((label_suffix, env_overrides, timeout_sec, job_ttl_seconds))
        return {"job": "test", "status": "ok"}

    def trigger(self, label_suffix, env_overrides, job_ttl_seconds=None) -> dict[str, str]:
        """Record ``(label_suffix, env_overrides, job_ttl_seconds)``.

        Note the recorded tuple has three items (no timeout), unlike
        ``trigger_and_wait``. Returns a canned result whose ``status`` is
        ``"queued"``.
        """
        self.calls.append((label_suffix, env_overrides, job_ttl_seconds))
        return {"job": "test", "status": "queued"}
def test_nextcloud_sync_mail_builds_env(monkeypatch) -> None:
    """``sync_mail("alice", wait=True)`` passes the username and job TTL to the spawner.

    The service's ``settings`` and ``JobSpawner`` are patched so that no real
    job is created; the recorded ``DummySpawner`` call is inspected instead.
    """
    dummy = types.SimpleNamespace(
        nextcloud_namespace="nextcloud",
        nextcloud_mail_sync_cronjob="nextcloud-mail-sync",
        nextcloud_mail_sync_wait_timeout_sec=90.0,
        nextcloud_mail_sync_job_ttl_sec=3600,
    )
    monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)
    monkeypatch.setattr("ariadne.services.nextcloud.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))

    svc = NextcloudService()
    result = svc.sync_mail("alice", wait=True)

    assert result["status"] == "ok"
    spawner = svc._spawner
    assert spawner.calls
    # The timeout slot is not asserted on, so it is discarded rather than named.
    label, env, _, ttl = spawner.calls[0]
    assert label == "alice"
    # env is a list of {"name": ..., "value": ...} items; index it by name.
    assert {item["name"]: item["value"] for item in env}["ONLY_USERNAME"] == "alice"
    assert ttl == 3600
def test_wger_sync_user_env(monkeypatch) -> None:
    """``sync_user(..., wait=True)`` passes the username and e-mail as job env vars.

    The service's ``settings`` and ``JobSpawner`` are patched so that no real
    job is created; the call recorded on ``svc._user_spawner`` is inspected.
    """
    dummy = types.SimpleNamespace(
        wger_namespace="health",
        wger_user_sync_cronjob="wger-user-sync",
        wger_admin_cronjob="wger-admin-ensure",
        wger_user_sync_wait_timeout_sec=60.0,
    )
    monkeypatch.setattr("ariadne.services.wger.settings", dummy)
    monkeypatch.setattr("ariadne.services.wger.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))

    svc = WgerService()
    result = svc.sync_user("alice", "alice@bstein.dev", "pw", wait=True)

    assert result["status"] == "ok"
    user_spawner = svc._user_spawner
    # Fail with a clear assertion (not an IndexError) if nothing was triggered.
    assert user_spawner.calls
    label, env, _, _ = user_spawner.calls[0]
    assert label == "alice"
    # env is a list of {"name": ..., "value": ...} items; index it by name.
    env_map = {item["name"]: item["value"] for item in env}
    assert env_map["WGER_USERNAME"] == "alice"
    assert env_map["WGER_EMAIL"] == "alice@bstein.dev"
def test_firefly_sync_user_env(monkeypatch) -> None:
    """``sync_user(..., wait=True)`` passes the user's e-mail as a job env var.

    The service's ``settings`` and ``JobSpawner`` are patched so that no real
    job is created; the call recorded on ``svc._spawner`` is inspected.
    """
    dummy = types.SimpleNamespace(
        firefly_namespace="finance",
        firefly_user_sync_cronjob="firefly-user-sync",
        firefly_user_sync_wait_timeout_sec=60.0,
    )
    monkeypatch.setattr("ariadne.services.firefly.settings", dummy)
    monkeypatch.setattr("ariadne.services.firefly.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))

    svc = FireflyService()
    result = svc.sync_user("alice@bstein.dev", "pw", wait=True)

    assert result["status"] == "ok"
    spawner = svc._spawner
    # Fail with a clear assertion (not an IndexError) if nothing was triggered.
    assert spawner.calls
    label, env, _, _ = spawner.calls[0]
    # Only an e-mail is passed in, yet the job label is expected to be "alice".
    assert label == "alice"
    # env is a list of {"name": ..., "value": ...} items; index it by name.
    env_map = {item["name"]: item["value"] for item in env}
    assert env_map["FIREFLY_USER_EMAIL"] == "alice@bstein.dev"
def test_nextcloud_missing_config(monkeypatch) -> None:
    """``sync_mail`` raises ``RuntimeError`` when namespace and cronjob are empty strings."""
    dummy = types.SimpleNamespace(
        nextcloud_namespace="",
        nextcloud_mail_sync_cronjob="",
        nextcloud_mail_sync_wait_timeout_sec=90.0,
        nextcloud_mail_sync_job_ttl_sec=3600,
    )
    # Only settings is patched here; JobSpawner is left as the service imports it.
    monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)

    svc = NextcloudService()
    with pytest.raises(RuntimeError):
        svc.sync_mail("alice")