ariadne/tests/test_scheduler.py

33 lines
856 B
Python

from __future__ import annotations
from datetime import datetime
from ariadne.scheduler.cron import CronScheduler, CronTask
class DummyStorage:
def __init__(self) -> None:
self.task_runs = []
self.schedule_states = []
def record_task_run(self, *args, **kwargs):
self.task_runs.append((args, kwargs))
def update_schedule_state(self, *args, **kwargs):
self.schedule_states.append((args, kwargs))
def test_execute_task_records_failure() -> None:
storage = DummyStorage()
scheduler = CronScheduler(storage, tick_sec=0.1)
def runner():
raise RuntimeError("boom")
task = CronTask(name="test", cron_expr="*/5 * * * *", runner=runner)
scheduler._next_run["test"] = datetime.utcnow()
scheduler._execute_task(task)
assert storage.task_runs
assert storage.schedule_states