ariadne/tests/test_scheduler.py

183 lines
5.2 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import time
from ariadne.scheduler.cron import CronScheduler, CronTask
class DummyStorage:
    """In-memory storage double that remembers every call made to it.

    Each recording method appends the ``(args, kwargs)`` pair it received to
    the list named after it, so a test can inspect what was written.
    """

    def __init__(self) -> None:
        # One call log per storage method.
        self.task_runs, self.schedule_states, self.events = [], [], []

    def record_task_run(self, *args, **kwargs):
        """Log one ``record_task_run`` call."""
        call = (args, kwargs)
        self.task_runs.append(call)

    def update_schedule_state(self, *args, **kwargs):
        """Log one ``update_schedule_state`` call."""
        call = (args, kwargs)
        self.schedule_states.append(call)

    def record_event(self, *args, **kwargs):
        """Log one ``record_event`` call."""
        call = (args, kwargs)
        self.events.append(call)
def test_execute_task_records_failure() -> None:
    """Check that a raising runner still leaves storage records behind.

    The runner raises ``RuntimeError``. Once ``_execute_task`` has returned,
    the dummy storage must hold at least one task-run call and at least one
    schedule-state call.
    """

    def runner():
        raise RuntimeError("boom")

    task_name = "test"
    store = DummyStorage()
    cron = CronScheduler(store, tick_sec=0.1)
    failing_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    cron._next_run[task_name] = datetime.now(timezone.utc)

    cron._execute_task(failing_task)

    assert store.task_runs
    assert store.schedule_states
def test_execute_task_records_success() -> None:
    """Check that a runner returning ``None`` produces storage records.

    After ``_execute_task`` the dummy storage must hold at least one task-run
    call and at least one schedule-state call.
    """

    def runner():
        return None

    task_name = "ok-task"
    store = DummyStorage()
    cron = CronScheduler(store, tick_sec=0.1)
    ok_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    cron._next_run[task_name] = datetime.now(timezone.utc)

    cron._execute_task(ok_task)

    assert store.task_runs
    assert store.schedule_states
def test_scheduler_start_stop() -> None:
    """Start and stop a scheduler with one no-op task registered.

    Passes when ``_thread`` is set to something other than ``None`` after the
    start/stop cycle.
    """
    cron = CronScheduler(DummyStorage(), tick_sec=0.01)
    cron.add_task("noop", "* * * * *", lambda: None)

    cron.start()
    # Wait for twice the configured tick interval before stopping.
    time.sleep(0.02)
    cron.stop()

    assert cron._thread is not None
def test_scheduler_start_skips_when_running() -> None:
    """Check that ``start()`` does not start a thread that reports itself alive.

    A fake thread whose ``is_alive()`` always returns ``True`` is placed on
    ``_thread``. After ``start()`` its ``started`` flag must still be ``False``.
    """

    class AliveThread:
        """Thread stand-in that always claims to be running."""

        # Flipped to True on the instance only if start() is ever called.
        started = False

        def is_alive(self) -> bool:
            return True

        def start(self) -> None:
            self.started = True

        def join(self, timeout=None) -> None:
            return None

    cron = CronScheduler(DummyStorage(), tick_sec=0.01)
    cron._thread = AliveThread()

    cron.start()

    # Read the flag through the scheduler so a replaced thread object is noticed.
    assert cron._thread.started is False
def test_compute_next_handles_naive_timestamp() -> None:
    """Check that ``_compute_next`` returns an aware datetime for a naive base."""
    # No tzinfo is supplied, so this base timestamp is naive.
    naive_base = datetime(year=2024, month=1, day=1, hour=12, minute=0, second=0)
    cron = CronScheduler(DummyStorage(), tick_sec=0.1)

    upcoming = cron._compute_next("* * * * *", naive_base)

    assert upcoming.tzinfo is not None
def test_run_loop_skips_running_task(monkeypatch) -> None:
    """Check that a due task already marked as running is not dispatched again.

    The task is registered, made due immediately, and added to ``_running``
    before the loop runs. ``threading.Thread`` inside the cron module is
    replaced with a recording fake so that any dispatch attempt is observable
    and no real background thread is created by the test.

    Args:
        monkeypatch: pytest fixture used to patch ``threading.Thread`` and
            ``time.sleep`` in ``ariadne.scheduler.cron``.
    """
    storage = DummyStorage()
    scheduler = CronScheduler(storage, tick_sec=0.01)
    scheduler._tasks["test"] = CronTask(name="test", cron_expr="* * * * *", runner=lambda: None)
    scheduler._next_run["test"] = datetime.now(timezone.utc)
    scheduler._running.add("test")

    started_threads = []

    class DummyThread:
        """Thread stand-in that records ``start()`` calls instead of running."""

        def __init__(self, *args, **kwargs) -> None:
            # Accept whatever the scheduler passes; only start() matters here.
            self.args = args
            self.kwargs = kwargs

        def start(self) -> None:
            started_threads.append(self)

    monkeypatch.setattr("ariadne.scheduler.cron.threading.Thread", DummyThread)
    # Sleeping sets the stop event instead; presumably this ends the loop
    # after its first pass (the loop body is not visible from this file).
    monkeypatch.setattr(
        "ariadne.scheduler.cron.time.sleep",
        lambda *_args, **_kwargs: scheduler._stop_event.set(),
    )

    scheduler._run_loop()

    # Previously the test asserted nothing, so it could not detect a
    # regression where a running task gets dispatched a second time.
    assert started_threads == []
def test_run_loop_spawns_thread(monkeypatch) -> None:
    """Check that the run loop starts a thread for a task that is due.

    ``threading.Thread`` inside the cron module is swapped for a fake that
    records ``start()`` calls. The test passes when at least one fake thread
    was started.
    """
    start_calls = []

    class RecordingThread:
        """Thread stand-in that notes when it is started."""

        def __init__(self, target=None, args=(), name=None, daemon=None):
            self.target, self.args = target, args
            self.name, self.daemon = name, daemon

        def start(self) -> None:
            start_calls.append(self)

    cron = CronScheduler(DummyStorage(), tick_sec=0.01)
    cron._tasks["test"] = CronTask(name="test", cron_expr="* * * * *", runner=lambda: None)
    cron._next_run["test"] = datetime.now(timezone.utc)

    def stop_instead_of_sleeping(*_args, **_kwargs):
        # Returns what Event.set() returns, as the original lambda did.
        return cron._stop_event.set()

    monkeypatch.setattr("ariadne.scheduler.cron.threading.Thread", RecordingThread)
    monkeypatch.setattr("ariadne.scheduler.cron.time.sleep", stop_instead_of_sleeping)

    cron._run_loop()

    assert start_calls
def test_execute_task_records_result_payload() -> None:
    """Check that the runner's dict result appears in the first recorded event.

    The first ``record_event`` call must have ``"schedule_task"`` as its first
    positional argument, and its second positional argument must expose the
    runner's ``status`` under the ``"result"`` key.
    """

    def runner():
        return {"status": "ok", "count": 2}

    task_name = "result-task"
    store = DummyStorage()
    cron = CronScheduler(store, tick_sec=0.1)
    result_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    cron._next_run[task_name] = datetime.now(timezone.utc)

    cron._execute_task(result_task)

    assert store.events
    # Each stored entry is an (args, kwargs) pair; only the args are checked.
    first_args, _ignored_kwargs = store.events[0]
    event_kind = first_args[0]
    event_body = first_args[1]
    assert event_kind == "schedule_task"
    assert event_body["result"]["status"] == "ok"
def test_execute_task_handles_storage_errors() -> None:
    """Run a task against storage whose every recording method raises.

    There is no explicit assertion: the test passes as long as
    ``_execute_task`` does not let an exception escape. The runner returns a
    frozen dataclass instance rather than a plain value.
    """

    @dataclass(frozen=True)
    class Summary:
        ok: bool

    class FailingStorage(DummyStorage):
        """Storage double whose recording methods all raise."""

        def _fail(self, *args, **kwargs):
            raise RuntimeError("fail")

        # All three recording methods share the same raising implementation.
        record_event = _fail
        record_task_run = _fail
        update_schedule_state = _fail

    task_name = "ok-task"
    cron = CronScheduler(FailingStorage(), tick_sec=0.1)
    summary_task = CronTask(
        name=task_name,
        cron_expr="*/5 * * * *",
        runner=lambda: Summary(ok=True),
    )
    cron._next_run[task_name] = datetime.now(timezone.utc)

    cron._execute_task(summary_task)
def test_format_result_string() -> None:
    """Check that ``_format_result("ok")`` gives ``"ok"`` for both returned items."""
    formatted = CronScheduler._format_result("ok")
    # Unpack into exactly two values, as the call is expected to return a pair.
    detail, payload = formatted
    assert (detail, payload) == ("ok", "ok")