# Code-viewer metadata (not part of the module): 183 lines, 5.2 KiB, Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
import time
|
|
|
|
from ariadne.scheduler.cron import CronScheduler, CronTask
|
|
|
|
|
|
class DummyStorage:
    """In-memory stand-in for the storage object handed to the scheduler.

    Each method appends the call's ``(args, kwargs)`` pair to a dedicated
    list so tests can inspect what was recorded, in call order.
    """

    def __init__(self) -> None:
        # One capture list per storage method.
        self.task_runs: list[tuple[tuple, dict]] = []
        self.schedule_states: list[tuple[tuple, dict]] = []
        self.events: list[tuple[tuple, dict]] = []

    def record_task_run(self, *args, **kwargs):
        """Capture a ``record_task_run`` call."""
        call = (args, kwargs)
        self.task_runs.append(call)

    def update_schedule_state(self, *args, **kwargs):
        """Capture an ``update_schedule_state`` call."""
        call = (args, kwargs)
        self.schedule_states.append(call)

    def record_event(self, *args, **kwargs):
        """Capture a ``record_event`` call."""
        call = (args, kwargs)
        self.events.append(call)
|
|
|
|
|
|
def test_execute_task_records_failure() -> None:
    """A runner that raises still produces a task-run record and a schedule-state update."""

    def runner():
        raise RuntimeError("boom")

    storage = DummyStorage()
    scheduler = CronScheduler(storage, tick_sec=0.1)

    task_name = "test"
    failing_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    scheduler._next_run[task_name] = datetime.now(timezone.utc)

    # Must not propagate the RuntimeError raised by the runner.
    scheduler._execute_task(failing_task)

    assert storage.task_runs
    assert storage.schedule_states
|
|
|
|
|
|
def test_execute_task_records_success() -> None:
    """A runner returning ``None`` produces a task-run record and a schedule-state update."""

    def runner():
        return None

    storage = DummyStorage()
    scheduler = CronScheduler(storage, tick_sec=0.1)

    task_name = "ok-task"
    ok_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    scheduler._next_run[task_name] = datetime.now(timezone.utc)

    scheduler._execute_task(ok_task)

    assert storage.task_runs
    assert storage.schedule_states
|
|
|
|
|
|
def test_scheduler_start_stop() -> None:
    """Start the scheduler with one task, wait 20 ms, stop it, and check a thread was set."""
    scheduler = CronScheduler(DummyStorage(), tick_sec=0.01)
    scheduler.add_task("noop", "* * * * *", lambda: None)

    scheduler.start()
    time.sleep(0.02)  # twice the configured tick_sec
    scheduler.stop()

    assert scheduler._thread is not None
|
|
|
|
|
|
def test_scheduler_start_skips_when_running() -> None:
    """``start()`` must not start the thread again when the existing one reports alive."""

    class DummyThread:
        """Thread stand-in that always reports alive and remembers ``start()`` calls."""

        # Flipped to True on the instance if start() is ever invoked.
        started = False

        def is_alive(self) -> bool:
            return True

        def start(self) -> None:
            self.started = True

        def join(self, timeout=None) -> None:
            return None

    scheduler = CronScheduler(DummyStorage(), tick_sec=0.01)
    scheduler._thread = DummyThread()

    scheduler.start()

    assert scheduler._thread.started is False
|
|
|
|
|
|
def test_compute_next_handles_naive_timestamp() -> None:
    """``_compute_next`` given a naive datetime must return a timezone-aware one."""
    naive_base = datetime(2024, 1, 1, hour=12)  # no tzinfo on purpose
    scheduler = CronScheduler(DummyStorage(), tick_sec=0.1)

    upcoming = scheduler._compute_next("* * * * *", naive_base)

    assert upcoming.tzinfo is not None
|
|
|
|
|
|
def test_run_loop_skips_running_task(monkeypatch) -> None:
    """Run ``_run_loop`` while the only registered task is already marked running.

    There are no explicit assertions: the test passes when ``_run_loop``
    returns without raising.
    """
    scheduler = CronScheduler(DummyStorage(), tick_sec=0.01)

    task_name = "test"
    scheduler._tasks[task_name] = CronTask(name=task_name, cron_expr="* * * * *", runner=lambda: None)
    scheduler._next_run[task_name] = datetime.now(timezone.utc)
    scheduler._running.add(task_name)

    def stop_on_sleep(*_args, **_kwargs):
        # Replacement for sleep: request loop shutdown instead of waiting.
        return scheduler._stop_event.set()

    monkeypatch.setattr("ariadne.scheduler.cron.time.sleep", stop_on_sleep)

    scheduler._run_loop()
|
|
|
|
|
|
def test_run_loop_spawns_thread(monkeypatch) -> None:
    """``_run_loop`` must call ``start()`` on a thread object for a due task.

    ``threading.Thread`` in the cron module is swapped for a stub that only
    records that ``start()`` was called, and ``time.sleep`` is swapped for a
    function that sets the stop event.
    """
    scheduler = CronScheduler(DummyStorage(), tick_sec=0.01)

    task_name = "test"
    scheduler._tasks[task_name] = CronTask(name=task_name, cron_expr="* * * * *", runner=lambda: None)
    scheduler._next_run[task_name] = datetime.now(timezone.utc)

    thread_started = False

    class DummyThread:
        """Stub accepting the ``Thread`` keyword arguments without running anything."""

        def __init__(self, target=None, args=(), name=None, daemon=None):
            self.target, self.args = target, args
            self.name, self.daemon = name, daemon

        def start(self) -> None:
            nonlocal thread_started
            thread_started = True

    def stop_on_sleep(*_args, **_kwargs):
        # Replacement for sleep: request loop shutdown instead of waiting.
        return scheduler._stop_event.set()

    monkeypatch.setattr("ariadne.scheduler.cron.threading.Thread", DummyThread)
    monkeypatch.setattr("ariadne.scheduler.cron.time.sleep", stop_on_sleep)

    scheduler._run_loop()

    assert thread_started is True
|
|
|
|
|
|
def test_execute_task_records_result_payload() -> None:
    """The runner's dict result must appear under ``"result"`` in the first recorded event."""

    def runner():
        return {"status": "ok", "count": 2}

    storage = DummyStorage()
    scheduler = CronScheduler(storage, tick_sec=0.1)

    task_name = "result-task"
    result_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=runner)
    scheduler._next_run[task_name] = datetime.now(timezone.utc)

    scheduler._execute_task(result_task)

    assert storage.events
    # DummyStorage stores each call as an (args, kwargs) pair.
    positional, _keyword = storage.events[0]
    event_kind = positional[0]
    assert event_kind == "schedule_task"
    event_payload = positional[1]
    assert event_payload["result"]["status"] == "ok"
|
|
|
|
|
|
def test_execute_task_handles_storage_errors() -> None:
    """``_execute_task`` must not propagate errors raised by every storage method.

    The runner returns a frozen dataclass instance. There are no explicit
    assertions: the test passes when ``_execute_task`` returns without raising.
    """

    class FailingStorage(DummyStorage):
        """Storage whose three recording methods all raise."""

        def _always_fail(self, *args, **kwargs):
            raise RuntimeError("fail")

        record_event = _always_fail
        record_task_run = _always_fail
        update_schedule_state = _always_fail

    @dataclass(frozen=True)
    class Summary:
        ok: bool

    scheduler = CronScheduler(FailingStorage(), tick_sec=0.1)

    task_name = "ok-task"
    summary_task = CronTask(name=task_name, cron_expr="*/5 * * * *", runner=lambda: Summary(ok=True))
    scheduler._next_run[task_name] = datetime.now(timezone.utc)

    scheduler._execute_task(summary_task)
|
|
|
|
|
|
def test_format_result_string() -> None:
    """``_format_result`` on a plain string returns it as both detail and payload."""
    formatted = CronScheduler._format_result("ok")

    detail, payload = formatted
    assert (detail, payload) == ("ok", "ok")
|