fix(ariadne): hydrate schedule metrics after restart
This commit is contained in:
parent
949ef2c6ad
commit
1b0137d984
@ -264,6 +264,36 @@ class Storage:
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def list_schedule_states(self) -> list[ScheduleState]:
|
||||||
|
"""Return persisted scheduler state so metrics survive process restarts."""
|
||||||
|
|
||||||
|
rows = self._db.fetchall(
|
||||||
|
"""
|
||||||
|
SELECT task_name, cron_expr, last_started_at, last_finished_at, last_status,
|
||||||
|
last_error, last_duration_ms, next_run_at
|
||||||
|
FROM ariadne_schedule_state
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
states: list[ScheduleState] = []
|
||||||
|
for row in rows:
|
||||||
|
task_name = row.get("task_name")
|
||||||
|
cron_expr = row.get("cron_expr")
|
||||||
|
if not isinstance(task_name, str) or not isinstance(cron_expr, str):
|
||||||
|
continue
|
||||||
|
states.append(
|
||||||
|
ScheduleState(
|
||||||
|
task_name=task_name,
|
||||||
|
cron_expr=cron_expr,
|
||||||
|
last_started_at=row.get("last_started_at"),
|
||||||
|
last_finished_at=row.get("last_finished_at"),
|
||||||
|
last_status=row.get("last_status"),
|
||||||
|
last_error=row.get("last_error"),
|
||||||
|
last_duration_ms=row.get("last_duration_ms"),
|
||||||
|
next_run_at=row.get("next_run_at"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return states
|
||||||
|
|
||||||
def record_cluster_state(self, snapshot: dict[str, Any]) -> None:
|
def record_cluster_state(self, snapshot: dict[str, Any]) -> None:
|
||||||
payload = json.dumps(snapshot, ensure_ascii=True)
|
payload = json.dumps(snapshot, ensure_ascii=True)
|
||||||
self._db.execute(
|
self._db.execute(
|
||||||
|
|||||||
@ -43,6 +43,7 @@ class CronScheduler:
|
|||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
if self._thread and self._thread.is_alive():
|
if self._thread and self._thread.is_alive():
|
||||||
return
|
return
|
||||||
|
self._hydrate_schedule_metrics()
|
||||||
self._stop_event.clear()
|
self._stop_event.clear()
|
||||||
self._thread = threading.Thread(target=self._run_loop, name="ariadne-scheduler", daemon=True)
|
self._thread = threading.Thread(target=self._run_loop, name="ariadne-scheduler", daemon=True)
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
@ -85,6 +86,40 @@ class CronScheduler:
|
|||||||
)
|
)
|
||||||
time.sleep(self._tick_sec)
|
time.sleep(self._tick_sec)
|
||||||
|
|
||||||
|
def _hydrate_schedule_metrics(self) -> None:
|
||||||
|
try:
|
||||||
|
states = self._storage.list_schedule_states()
|
||||||
|
except AttributeError:
|
||||||
|
return
|
||||||
|
except Exception as exc:
|
||||||
|
self._logger.warning(
|
||||||
|
"schedule metric hydration failed",
|
||||||
|
extra={"event": "schedule_hydration_error", "detail": str(exc)},
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
known_tasks = set(self._tasks)
|
||||||
|
for state in states:
|
||||||
|
if state.task_name not in known_tasks:
|
||||||
|
continue
|
||||||
|
last_finished = state.last_finished_at or state.last_started_at
|
||||||
|
last_success = last_finished if state.last_status == "ok" else None
|
||||||
|
if state.last_status == "ok":
|
||||||
|
ok: bool | None = True
|
||||||
|
elif state.last_status == "error":
|
||||||
|
ok = False
|
||||||
|
else:
|
||||||
|
ok = None
|
||||||
|
record_schedule_state(
|
||||||
|
state.task_name,
|
||||||
|
state.last_started_at.timestamp() if state.last_started_at else None,
|
||||||
|
last_success.timestamp() if last_success else None,
|
||||||
|
self._next_run.get(state.task_name).timestamp()
|
||||||
|
if self._next_run.get(state.task_name)
|
||||||
|
else None,
|
||||||
|
ok,
|
||||||
|
)
|
||||||
|
|
||||||
def _execute_task(self, task: CronTask) -> None:
|
def _execute_task(self, task: CronTask) -> None:
|
||||||
started = datetime.now(timezone.utc)
|
started = datetime.now(timezone.utc)
|
||||||
status = "ok"
|
status = "ok"
|
||||||
|
|||||||
@ -4,6 +4,7 @@ from dataclasses import dataclass
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from ariadne.db.storage import ScheduleState
|
||||||
from ariadne.scheduler.cron import CronScheduler, CronTask
|
from ariadne.scheduler.cron import CronScheduler, CronTask
|
||||||
|
|
||||||
|
|
||||||
@ -22,6 +23,9 @@ class DummyStorage:
|
|||||||
def record_event(self, *args, **kwargs):
|
def record_event(self, *args, **kwargs):
|
||||||
self.events.append((args, kwargs))
|
self.events.append((args, kwargs))
|
||||||
|
|
||||||
|
def list_schedule_states(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
def test_execute_task_records_failure() -> None:
|
def test_execute_task_records_failure() -> None:
|
||||||
storage = DummyStorage()
|
storage = DummyStorage()
|
||||||
@ -87,6 +91,45 @@ def test_scheduler_start_skips_when_running() -> None:
|
|||||||
assert scheduler._thread.started is False
|
assert scheduler._thread.started is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_scheduler_start_hydrates_persisted_schedule_metrics(monkeypatch) -> None:
|
||||||
|
class HydratingStorage(DummyStorage):
|
||||||
|
def list_schedule_states(self):
|
||||||
|
finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
|
||||||
|
return [
|
||||||
|
ScheduleState(
|
||||||
|
task_name="nightly",
|
||||||
|
cron_expr="30 4 * * *",
|
||||||
|
last_started_at=finished,
|
||||||
|
last_finished_at=finished,
|
||||||
|
last_status="ok",
|
||||||
|
last_error=None,
|
||||||
|
last_duration_ms=100,
|
||||||
|
next_run_at=None,
|
||||||
|
),
|
||||||
|
ScheduleState(
|
||||||
|
task_name="unknown",
|
||||||
|
cron_expr="* * * * *",
|
||||||
|
last_started_at=finished,
|
||||||
|
last_finished_at=finished,
|
||||||
|
last_status="ok",
|
||||||
|
last_error=None,
|
||||||
|
last_duration_ms=100,
|
||||||
|
next_run_at=None,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
recorded = []
|
||||||
|
monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args))
|
||||||
|
|
||||||
|
scheduler = CronScheduler(HydratingStorage(), tick_sec=0.01)
|
||||||
|
scheduler.add_task("nightly", "30 4 * * *", lambda: None)
|
||||||
|
scheduler.start()
|
||||||
|
scheduler.stop()
|
||||||
|
|
||||||
|
assert any(item[0] == "nightly" and item[4] is True for item in recorded)
|
||||||
|
assert not any(item[0] == "unknown" for item in recorded)
|
||||||
|
|
||||||
|
|
||||||
def test_compute_next_handles_naive_timestamp() -> None:
|
def test_compute_next_handles_naive_timestamp() -> None:
|
||||||
scheduler = CronScheduler(DummyStorage(), tick_sec=0.1)
|
scheduler = CronScheduler(DummyStorage(), tick_sec=0.1)
|
||||||
base = datetime(2024, 1, 1, 12, 0, 0)
|
base = datetime(2024, 1, 1, 12, 0, 0)
|
||||||
|
|||||||
@ -345,6 +345,31 @@ def test_update_schedule_state_executes() -> None:
|
|||||||
assert db.executed
|
assert db.executed
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_schedule_states_returns_valid_rows() -> None:
|
||||||
|
db = DummyDB()
|
||||||
|
now = datetime.now()
|
||||||
|
db.rows = [
|
||||||
|
{
|
||||||
|
"task_name": "schedule.nightly",
|
||||||
|
"cron_expr": "30 4 * * *",
|
||||||
|
"last_started_at": now,
|
||||||
|
"last_finished_at": now,
|
||||||
|
"last_status": "ok",
|
||||||
|
"last_error": None,
|
||||||
|
"last_duration_ms": 10,
|
||||||
|
"next_run_at": None,
|
||||||
|
},
|
||||||
|
{"task_name": None, "cron_expr": "bad"},
|
||||||
|
]
|
||||||
|
storage = Storage(db)
|
||||||
|
|
||||||
|
states = storage.list_schedule_states()
|
||||||
|
|
||||||
|
assert len(states) == 1
|
||||||
|
assert states[0].task_name == "schedule.nightly"
|
||||||
|
assert states[0].last_status == "ok"
|
||||||
|
|
||||||
|
|
||||||
def test_record_cluster_state_executes() -> None:
|
def test_record_cluster_state_executes() -> None:
|
||||||
db = DummyDB()
|
db = DummyDB()
|
||||||
storage = Storage(db)
|
storage = Storage(db)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user