from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timezone import time from ariadne.db.storage import ScheduleState from ariadne.scheduler.cron import CronScheduler, CronTask class DummyStorage: def __init__(self) -> None: self.task_runs = [] self.schedule_states = [] self.events = [] def record_task_run(self, *args, **kwargs): self.task_runs.append((args, kwargs)) def update_schedule_state(self, *args, **kwargs): self.schedule_states.append((args, kwargs)) def record_event(self, *args, **kwargs): self.events.append((args, kwargs)) def list_schedule_states(self): return [] def test_execute_task_records_failure() -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.1) def runner(): raise RuntimeError("boom") task = CronTask(name="test", cron_expr="*/5 * * * *", runner=runner) scheduler._next_run["test"] = datetime.now(timezone.utc) scheduler._execute_task(task) assert storage.task_runs assert storage.schedule_states def test_execute_task_records_success() -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.1) def runner(): return None task = CronTask(name="ok-task", cron_expr="*/5 * * * *", runner=runner) scheduler._next_run["ok-task"] = datetime.now(timezone.utc) scheduler._execute_task(task) assert storage.task_runs assert storage.schedule_states def test_scheduler_start_stop() -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.01) scheduler.add_task("noop", "* * * * *", lambda: None) scheduler.start() time.sleep(0.02) scheduler.stop() assert scheduler._thread is not None def test_scheduler_start_skips_when_running() -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.01) class DummyThread: def __init__(self) -> None: self.started = False def is_alive(self) -> bool: return True def start(self) -> None: self.started = True def join(self, timeout=None) -> None: return None scheduler._thread = DummyThread() scheduler.start() assert scheduler._thread.started is False def test_scheduler_start_hydrates_persisted_schedule_metrics(monkeypatch) -> None: class HydratingStorage(DummyStorage): def list_schedule_states(self): finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc) return [ ScheduleState( task_name="nightly", cron_expr="30 4 * * *", last_started_at=finished, last_finished_at=finished, last_status="ok", last_error=None, last_duration_ms=100, next_run_at=None, ), ScheduleState( task_name="unknown", cron_expr="* * * * *", last_started_at=finished, last_finished_at=finished, last_status="ok", last_error=None, last_duration_ms=100, next_run_at=None, ), ] recorded = [] monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args)) scheduler = CronScheduler(HydratingStorage(), tick_sec=0.01) scheduler.add_task("nightly", "30 4 * * *", lambda: None) scheduler.start() scheduler.stop() assert any(item[0] == "nightly" and item[4] is True for item in recorded) assert not any(item[0] == "unknown" for item in recorded) def test_scheduler_hydration_ignores_storage_without_state_listing() -> None: class MinimalStorage: pass scheduler = CronScheduler(MinimalStorage(), tick_sec=0.01) scheduler._hydrate_schedule_metrics() def test_scheduler_hydration_logs_storage_errors(monkeypatch) -> None: class BrokenStorage(DummyStorage): def list_schedule_states(self): raise RuntimeError("storage offline") warnings = [] scheduler = CronScheduler(BrokenStorage(), tick_sec=0.01) monkeypatch.setattr(scheduler._logger, "warning", lambda *args, **kwargs: warnings.append((args, kwargs))) scheduler._hydrate_schedule_metrics() assert warnings assert warnings[0][1]["extra"]["detail"] == "storage offline" def test_scheduler_hydration_records_error_and_unknown_statuses(monkeypatch) -> None: finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc) class StatusStorage(DummyStorage): def list_schedule_states(self): return [ ScheduleState( task_name="failed-task", cron_expr="*/5 * * * *", last_started_at=finished, last_finished_at=None, last_status="error", last_error="boom", last_duration_ms=100, next_run_at=None, ), ScheduleState( task_name="pending-task", cron_expr="*/10 * * * *", last_started_at=finished, last_finished_at=None, last_status="running", last_error=None, last_duration_ms=100, next_run_at=None, ), ] recorded = [] monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args)) scheduler = CronScheduler(StatusStorage(), tick_sec=0.01) scheduler.add_task("failed-task", "*/5 * * * *", lambda: None) scheduler.add_task("pending-task", "*/10 * * * *", lambda: None) scheduler._next_run.pop("pending-task") scheduler._hydrate_schedule_metrics() failed = next(item for item in recorded if item[0] == "failed-task") pending = next(item for item in recorded if item[0] == "pending-task") assert failed[2] is None assert failed[4] is False assert pending[3] is None assert pending[4] is None def test_compute_next_handles_naive_timestamp() -> None: scheduler = CronScheduler(DummyStorage(), tick_sec=0.1) base = datetime(2024, 1, 1, 12, 0, 0) next_time = scheduler._compute_next("* * * * *", base) assert next_time.tzinfo is not None def test_run_loop_skips_running_task(monkeypatch) -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.01) scheduler._tasks["test"] = CronTask(name="test", cron_expr="* * * * *", runner=lambda: None) scheduler._next_run["test"] = datetime.now(timezone.utc) scheduler._running.add("test") monkeypatch.setattr( "ariadne.scheduler.cron.time.sleep", lambda *_args, **_kwargs: scheduler._stop_event.set(), ) scheduler._run_loop() def test_run_loop_spawns_thread(monkeypatch) -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.01) scheduler._tasks["test"] = CronTask(name="test", cron_expr="* * * * *", runner=lambda: None) scheduler._next_run["test"] = datetime.now(timezone.utc) started = {"value": False} class DummyThread: def __init__(self, target=None, args=(), name=None, daemon=None): self.target = target self.args = args self.name = name self.daemon = daemon def start(self) -> None: started["value"] = True monkeypatch.setattr("ariadne.scheduler.cron.threading.Thread", DummyThread) monkeypatch.setattr( "ariadne.scheduler.cron.time.sleep", lambda *_args, **_kwargs: scheduler._stop_event.set(), ) scheduler._run_loop() assert started["value"] is True def test_execute_task_records_result_payload() -> None: storage = DummyStorage() scheduler = CronScheduler(storage, tick_sec=0.1) def runner(): return {"status": "ok", "count": 2} task = CronTask(name="result-task", cron_expr="*/5 * * * *", runner=runner) scheduler._next_run["result-task"] = datetime.now(timezone.utc) scheduler._execute_task(task) assert storage.events event_args, _event_kwargs = storage.events[0] assert event_args[0] == "schedule_task" assert event_args[1]["result"]["status"] == "ok" def test_execute_task_handles_storage_errors() -> None: class FailingStorage(DummyStorage): def record_event(self, *args, **kwargs): raise RuntimeError("fail") def record_task_run(self, *args, **kwargs): raise RuntimeError("fail") def update_schedule_state(self, *args, **kwargs): raise RuntimeError("fail") scheduler = CronScheduler(FailingStorage(), tick_sec=0.1) @dataclass(frozen=True) class Summary: ok: bool task = CronTask(name="ok-task", cron_expr="*/5 * * * *", runner=lambda: Summary(ok=True)) scheduler._next_run["ok-task"] = datetime.now(timezone.utc) scheduler._execute_task(task) def test_format_result_string() -> None: detail, payload = CronScheduler._format_result("ok") assert detail == "ok" assert payload == "ok"