from __future__ import annotations import types import pytest from ariadne.services.firefly import FireflyService from ariadne.services.mailu import MailuService from ariadne.services.nextcloud import NextcloudService from ariadne.services.wger import WgerService class DummySpawner: def __init__(self, namespace, cronjob): self.namespace = namespace self.cronjob = cronjob self.calls = [] def trigger_and_wait(self, label_suffix, env_overrides, timeout_sec, job_ttl_seconds=None): self.calls.append((label_suffix, env_overrides, timeout_sec, job_ttl_seconds)) return {"job": "test", "status": "ok"} def trigger(self, label_suffix, env_overrides, job_ttl_seconds=None): self.calls.append((label_suffix, env_overrides, job_ttl_seconds)) return {"job": "test", "status": "queued"} class DummyClient: def __init__(self): self.url = "" self.payload = None self.status_code = 200 def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False def post(self, url, json=None): self.url = url self.payload = json return types.SimpleNamespace(status_code=self.status_code) def test_nextcloud_sync_mail_builds_env(monkeypatch) -> None: dummy = types.SimpleNamespace( nextcloud_namespace="nextcloud", nextcloud_mail_sync_cronjob="nextcloud-mail-sync", nextcloud_mail_sync_wait_timeout_sec=90.0, nextcloud_mail_sync_job_ttl_sec=3600, ) monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy) monkeypatch.setattr("ariadne.services.nextcloud.JobSpawner", lambda ns, cj: DummySpawner(ns, cj)) svc = NextcloudService() result = svc.sync_mail("alice", wait=True) assert result["status"] == "ok" spawner = svc._spawner assert spawner.calls label, env, timeout, ttl = spawner.calls[0] assert label == "alice" assert {item["name"]: item["value"] for item in env}["ONLY_USERNAME"] == "alice" assert ttl == 3600 def test_wger_sync_user_env(monkeypatch) -> None: dummy = types.SimpleNamespace( wger_namespace="health", wger_user_sync_cronjob="wger-user-sync", wger_admin_cronjob="wger-admin-ensure", wger_user_sync_wait_timeout_sec=60.0, ) monkeypatch.setattr("ariadne.services.wger.settings", dummy) monkeypatch.setattr("ariadne.services.wger.JobSpawner", lambda ns, cj: DummySpawner(ns, cj)) svc = WgerService() result = svc.sync_user("alice", "alice@bstein.dev", "pw", wait=True) assert result["status"] == "ok" user_spawner = svc._user_spawner label, env, _, _ = user_spawner.calls[0] assert label == "alice" env_map = {item["name"]: item["value"] for item in env} assert env_map["WGER_USERNAME"] == "alice" assert env_map["WGER_EMAIL"] == "alice@bstein.dev" def test_firefly_sync_user_env(monkeypatch) -> None: dummy = types.SimpleNamespace( firefly_namespace="finance", firefly_user_sync_cronjob="firefly-user-sync", firefly_user_sync_wait_timeout_sec=60.0, ) monkeypatch.setattr("ariadne.services.firefly.settings", dummy) monkeypatch.setattr("ariadne.services.firefly.JobSpawner", lambda ns, cj: DummySpawner(ns, cj)) svc = FireflyService() result = svc.sync_user("alice@bstein.dev", "pw", wait=True) assert result["status"] == "ok" spawner = svc._spawner label, env, _, _ = spawner.calls[0] assert label == "alice" env_map = {item["name"]: item["value"] for item in env} assert env_map["FIREFLY_USER_EMAIL"] == "alice@bstein.dev" def test_mailu_sync_includes_force(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( mailu_sync_url="http://mailu", mailu_sync_wait_timeout_sec=10.0, mailu_db_host="localhost", mailu_db_port=5432, mailu_db_name="mailu", mailu_db_user="mailu", mailu_db_password="secret", ) client = DummyClient() monkeypatch.setattr("ariadne.services.mailu.settings", dummy_settings) monkeypatch.setattr("ariadne.services.mailu.httpx.Client", lambda *args, **kwargs: client) svc = MailuService() svc.sync("provision", force=True) assert client.url == "http://mailu" assert client.payload["wait"] is True assert client.payload["force"] is True def test_nextcloud_missing_config(monkeypatch) -> None: dummy = types.SimpleNamespace( nextcloud_namespace="", nextcloud_mail_sync_cronjob="", nextcloud_mail_sync_wait_timeout_sec=90.0, nextcloud_mail_sync_job_ttl_sec=3600, ) monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy) svc = NextcloudService() with pytest.raises(RuntimeError): svc.sync_mail("alice")