ariadne/tests/test_scheduler.py

48 lines
1.3 KiB
Python
Raw Normal View History

2026-01-19 19:01:32 -03:00
from __future__ import annotations
from datetime import datetime, timezone
2026-01-19 19:01:32 -03:00
from ariadne.scheduler.cron import CronScheduler, CronTask
class DummyStorage:
def __init__(self) -> None:
self.task_runs = []
self.schedule_states = []
def record_task_run(self, *args, **kwargs):
self.task_runs.append((args, kwargs))
def update_schedule_state(self, *args, **kwargs):
self.schedule_states.append((args, kwargs))
def test_execute_task_records_failure() -> None:
storage = DummyStorage()
scheduler = CronScheduler(storage, tick_sec=0.1)
def runner():
raise RuntimeError("boom")
task = CronTask(name="test", cron_expr="*/5 * * * *", runner=runner)
scheduler._next_run["test"] = datetime.now(timezone.utc)
scheduler._execute_task(task)
assert storage.task_runs
assert storage.schedule_states
def test_execute_task_records_success() -> None:
storage = DummyStorage()
scheduler = CronScheduler(storage, tick_sec=0.1)
def runner():
return None
task = CronTask(name="ok-task", cron_expr="*/5 * * * *", runner=runner)
scheduler._next_run["ok-task"] = datetime.now(timezone.utc)
2026-01-19 19:01:32 -03:00
scheduler._execute_task(task)
assert storage.task_runs
assert storage.schedule_states