From 1b0137d984e54529028892654b1a8e39dc380d9d Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 12:39:26 -0300 Subject: [PATCH] fix(ariadne): hydrate schedule metrics after restart --- ariadne/db/storage.py | 30 +++++++++++++++++++++++++++ ariadne/scheduler/cron.py | 35 +++++++++++++++++++++++++++++++ tests/test_scheduler.py | 43 +++++++++++++++++++++++++++++++++++++++ tests/test_storage.py | 25 +++++++++++++++++++++++ 4 files changed, 133 insertions(+) diff --git a/ariadne/db/storage.py b/ariadne/db/storage.py index e526a87..3a7b191 100644 --- a/ariadne/db/storage.py +++ b/ariadne/db/storage.py @@ -264,6 +264,36 @@ class Storage: ), ) + def list_schedule_states(self) -> list[ScheduleState]: + """Return persisted scheduler state so metrics survive process restarts.""" + + rows = self._db.fetchall( + """ + SELECT task_name, cron_expr, last_started_at, last_finished_at, last_status, + last_error, last_duration_ms, next_run_at + FROM ariadne_schedule_state + """ + ) + states: list[ScheduleState] = [] + for row in rows: + task_name = row.get("task_name") + cron_expr = row.get("cron_expr") + if not isinstance(task_name, str) or not isinstance(cron_expr, str): + continue + states.append( + ScheduleState( + task_name=task_name, + cron_expr=cron_expr, + last_started_at=row.get("last_started_at"), + last_finished_at=row.get("last_finished_at"), + last_status=row.get("last_status"), + last_error=row.get("last_error"), + last_duration_ms=row.get("last_duration_ms"), + next_run_at=row.get("next_run_at"), + ) + ) + return states + def record_cluster_state(self, snapshot: dict[str, Any]) -> None: payload = json.dumps(snapshot, ensure_ascii=True) self._db.execute( diff --git a/ariadne/scheduler/cron.py b/ariadne/scheduler/cron.py index 8fa9cee..d942280 100644 --- a/ariadne/scheduler/cron.py +++ b/ariadne/scheduler/cron.py @@ -43,6 +43,7 @@ class CronScheduler: def start(self) -> None: if self._thread and self._thread.is_alive(): return + self._hydrate_schedule_metrics() self._stop_event.clear() self._thread = threading.Thread(target=self._run_loop, name="ariadne-scheduler", daemon=True) self._thread.start() @@ -85,6 +86,40 @@ class CronScheduler: ) time.sleep(self._tick_sec) + def _hydrate_schedule_metrics(self) -> None: + try: + states = self._storage.list_schedule_states() + except AttributeError: + return + except Exception as exc: + self._logger.warning( + "schedule metric hydration failed", + extra={"event": "schedule_hydration_error", "detail": str(exc)}, + ) + return + + known_tasks = set(self._tasks) + for state in states: + if state.task_name not in known_tasks: + continue + last_finished = state.last_finished_at or state.last_started_at + last_success = last_finished if state.last_status == "ok" else None + if state.last_status == "ok": + ok: bool | None = True + elif state.last_status == "error": + ok = False + else: + ok = None + record_schedule_state( + state.task_name, + state.last_started_at.timestamp() if state.last_started_at else None, + last_success.timestamp() if last_success else None, + self._next_run.get(state.task_name).timestamp() + if self._next_run.get(state.task_name) + else None, + ok, + ) + def _execute_task(self, task: CronTask) -> None: started = datetime.now(timezone.utc) status = "ok" diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 56d5583..a22d902 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from datetime import datetime, timezone import time +from ariadne.db.storage import ScheduleState from ariadne.scheduler.cron import CronScheduler, CronTask @@ -22,6 +23,9 @@ class DummyStorage: def record_event(self, *args, **kwargs): self.events.append((args, kwargs)) + def list_schedule_states(self): + return [] + def test_execute_task_records_failure() -> None: storage = DummyStorage() @@ -87,6 +91,45 @@ def test_scheduler_start_skips_when_running() -> None: assert scheduler._thread.started is False +def test_scheduler_start_hydrates_persisted_schedule_metrics(monkeypatch) -> None: + class HydratingStorage(DummyStorage): + def list_schedule_states(self): + finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc) + return [ + ScheduleState( + task_name="nightly", + cron_expr="30 4 * * *", + last_started_at=finished, + last_finished_at=finished, + last_status="ok", + last_error=None, + last_duration_ms=100, + next_run_at=None, + ), + ScheduleState( + task_name="unknown", + cron_expr="* * * * *", + last_started_at=finished, + last_finished_at=finished, + last_status="ok", + last_error=None, + last_duration_ms=100, + next_run_at=None, + ), + ] + + recorded = [] + monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args)) + + scheduler = CronScheduler(HydratingStorage(), tick_sec=0.01) + scheduler.add_task("nightly", "30 4 * * *", lambda: None) + scheduler.start() + scheduler.stop() + + assert any(item[0] == "nightly" and item[4] is True for item in recorded) + assert not any(item[0] == "unknown" for item in recorded) + + def test_compute_next_handles_naive_timestamp() -> None: scheduler = CronScheduler(DummyStorage(), tick_sec=0.1) base = datetime(2024, 1, 1, 12, 0, 0) diff --git a/tests/test_storage.py b/tests/test_storage.py index 69dd086..4208892 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -345,6 +345,31 @@ def test_update_schedule_state_executes() -> None: assert db.executed +def test_list_schedule_states_returns_valid_rows() -> None: + db = DummyDB() + now = datetime.now() + db.rows = [ + { + "task_name": "schedule.nightly", + "cron_expr": "30 4 * * *", + "last_started_at": now, + "last_finished_at": now, + "last_status": "ok", + "last_error": None, + "last_duration_ms": 10, + "next_run_at": None, + }, + {"task_name": None, "cron_expr": "bad"}, + ] + storage = Storage(db) + + states = storage.list_schedule_states() + + assert len(states) == 1 + assert states[0].task_name == "schedule.nightly" + assert states[0].last_status == "ok" + + def test_record_cluster_state_executes() -> None: db = DummyDB() storage = Storage(db)