fix(ariadne): hydrate schedule success history
This commit is contained in:
parent
69d543b963
commit
42acfcc436
@ -294,6 +294,26 @@ class Storage:
|
|||||||
)
|
)
|
||||||
return states
|
return states
|
||||||
|
|
||||||
|
def latest_successful_task_run_at(self, task: str) -> datetime | None:
|
||||||
|
"""Return the latest successful finish time for a recorded task."""
|
||||||
|
|
||||||
|
row = self._db.fetchone(
|
||||||
|
"""
|
||||||
|
SELECT finished_at
|
||||||
|
FROM ariadne_task_runs
|
||||||
|
WHERE task = %s
|
||||||
|
AND status = 'ok'
|
||||||
|
AND finished_at IS NOT NULL
|
||||||
|
ORDER BY finished_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
""",
|
||||||
|
(task,),
|
||||||
|
)
|
||||||
|
if not row:
|
||||||
|
return None
|
||||||
|
finished_at = row.get("finished_at")
|
||||||
|
return finished_at if isinstance(finished_at, datetime) else None
|
||||||
|
|
||||||
def record_cluster_state(self, snapshot: dict[str, Any]) -> None:
|
def record_cluster_state(self, snapshot: dict[str, Any]) -> None:
|
||||||
payload = json.dumps(snapshot, ensure_ascii=True)
|
payload = json.dumps(snapshot, ensure_ascii=True)
|
||||||
self._db.execute(
|
self._db.execute(
|
||||||
|
|||||||
@ -103,7 +103,11 @@ class CronScheduler:
|
|||||||
if state.task_name not in known_tasks:
|
if state.task_name not in known_tasks:
|
||||||
continue
|
continue
|
||||||
last_finished = state.last_finished_at or state.last_started_at
|
last_finished = state.last_finished_at or state.last_started_at
|
||||||
last_success = last_finished if state.last_status == "ok" else None
|
last_success = (
|
||||||
|
last_finished
|
||||||
|
if state.last_status == "ok"
|
||||||
|
else self._latest_successful_task_run_at(state.task_name)
|
||||||
|
)
|
||||||
if state.last_status == "ok":
|
if state.last_status == "ok":
|
||||||
ok: bool | None = True
|
ok: bool | None = True
|
||||||
elif state.last_status == "error":
|
elif state.last_status == "error":
|
||||||
@ -120,6 +124,18 @@ class CronScheduler:
|
|||||||
ok,
|
ok,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _latest_successful_task_run_at(self, task_name: str) -> datetime | None:
|
||||||
|
try:
|
||||||
|
return self._storage.latest_successful_task_run_at(task_name)
|
||||||
|
except AttributeError:
|
||||||
|
return None
|
||||||
|
except Exception as exc:
|
||||||
|
self._logger.warning(
|
||||||
|
"schedule success hydration failed",
|
||||||
|
extra={"event": "schedule_success_hydration_error", "task": task_name, "detail": str(exc)},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
def _execute_task(self, task: CronTask) -> None:
|
def _execute_task(self, task: CronTask) -> None:
|
||||||
started = datetime.now(timezone.utc)
|
started = datetime.now(timezone.utc)
|
||||||
status = "ok"
|
status = "ok"
|
||||||
|
|||||||
@ -156,6 +156,7 @@ def test_scheduler_hydration_logs_storage_errors(monkeypatch) -> None:
|
|||||||
|
|
||||||
def test_scheduler_hydration_records_error_and_unknown_statuses(monkeypatch) -> None:
|
def test_scheduler_hydration_records_error_and_unknown_statuses(monkeypatch) -> None:
|
||||||
finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
|
finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
|
||||||
|
earlier_success = datetime(2025, 12, 31, 12, 0, tzinfo=timezone.utc)
|
||||||
|
|
||||||
class StatusStorage(DummyStorage):
|
class StatusStorage(DummyStorage):
|
||||||
def list_schedule_states(self):
|
def list_schedule_states(self):
|
||||||
@ -182,6 +183,11 @@ def test_scheduler_hydration_records_error_and_unknown_statuses(monkeypatch) ->
|
|||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
def latest_successful_task_run_at(self, task_name):
|
||||||
|
if task_name == "failed-task":
|
||||||
|
return earlier_success
|
||||||
|
return None
|
||||||
|
|
||||||
recorded = []
|
recorded = []
|
||||||
monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args))
|
monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args))
|
||||||
|
|
||||||
@ -194,12 +200,48 @@ def test_scheduler_hydration_records_error_and_unknown_statuses(monkeypatch) ->
|
|||||||
|
|
||||||
failed = next(item for item in recorded if item[0] == "failed-task")
|
failed = next(item for item in recorded if item[0] == "failed-task")
|
||||||
pending = next(item for item in recorded if item[0] == "pending-task")
|
pending = next(item for item in recorded if item[0] == "pending-task")
|
||||||
assert failed[2] is None
|
assert failed[2] == earlier_success.timestamp()
|
||||||
assert failed[4] is False
|
assert failed[4] is False
|
||||||
assert pending[3] is None
|
assert pending[3] is None
|
||||||
assert pending[4] is None
|
assert pending[4] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_scheduler_hydration_logs_success_lookup_errors(monkeypatch) -> None:
|
||||||
|
finished = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
|
||||||
|
|
||||||
|
class BrokenSuccessStorage(DummyStorage):
|
||||||
|
def list_schedule_states(self):
|
||||||
|
return [
|
||||||
|
ScheduleState(
|
||||||
|
task_name="failed-task",
|
||||||
|
cron_expr="*/5 * * * *",
|
||||||
|
last_started_at=finished,
|
||||||
|
last_finished_at=finished,
|
||||||
|
last_status="error",
|
||||||
|
last_error="boom",
|
||||||
|
last_duration_ms=100,
|
||||||
|
next_run_at=None,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
def latest_successful_task_run_at(self, task_name):
|
||||||
|
raise RuntimeError(f"{task_name} lookup failed")
|
||||||
|
|
||||||
|
warnings = []
|
||||||
|
recorded = []
|
||||||
|
scheduler = CronScheduler(BrokenSuccessStorage(), tick_sec=0.01)
|
||||||
|
scheduler.add_task("failed-task", "*/5 * * * *", lambda: None)
|
||||||
|
monkeypatch.setattr(scheduler._logger, "warning", lambda *args, **kwargs: warnings.append((args, kwargs)))
|
||||||
|
monkeypatch.setattr("ariadne.scheduler.cron.record_schedule_state", lambda *args: recorded.append(args))
|
||||||
|
|
||||||
|
scheduler._hydrate_schedule_metrics()
|
||||||
|
|
||||||
|
assert warnings
|
||||||
|
assert warnings[0][1]["extra"]["event"] == "schedule_success_hydration_error"
|
||||||
|
failed = next(item for item in recorded if item[0] == "failed-task")
|
||||||
|
assert failed[2] is None
|
||||||
|
|
||||||
|
|
||||||
def test_compute_next_handles_naive_timestamp() -> None:
|
def test_compute_next_handles_naive_timestamp() -> None:
|
||||||
scheduler = CronScheduler(DummyStorage(), tick_sec=0.1)
|
scheduler = CronScheduler(DummyStorage(), tick_sec=0.1)
|
||||||
base = datetime(2024, 1, 1, 12, 0, 0)
|
base = datetime(2024, 1, 1, 12, 0, 0)
|
||||||
|
|||||||
@ -370,6 +370,24 @@ def test_list_schedule_states_returns_valid_rows() -> None:
|
|||||||
assert states[0].last_status == "ok"
|
assert states[0].last_status == "ok"
|
||||||
|
|
||||||
|
|
||||||
|
def test_latest_successful_task_run_at_returns_finished_time() -> None:
|
||||||
|
now = datetime.now()
|
||||||
|
db = DummyDB(row={"finished_at": now})
|
||||||
|
storage = Storage(db)
|
||||||
|
|
||||||
|
assert storage.latest_successful_task_run_at("schedule.nightly") == now
|
||||||
|
|
||||||
|
|
||||||
|
def test_latest_successful_task_run_at_handles_missing_or_invalid_row() -> None:
|
||||||
|
db = DummyDB()
|
||||||
|
storage = Storage(db)
|
||||||
|
|
||||||
|
assert storage.latest_successful_task_run_at("schedule.nightly") is None
|
||||||
|
|
||||||
|
db.row = {"finished_at": "not a datetime"}
|
||||||
|
assert storage.latest_successful_task_run_at("schedule.nightly") is None
|
||||||
|
|
||||||
|
|
||||||
def test_record_cluster_state_executes() -> None:
|
def test_record_cluster_state_executes() -> None:
|
||||||
db = DummyDB()
|
db = DummyDB()
|
||||||
storage = Storage(db)
|
storage = Storage(db)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user