# ariadne/tests/test_storage.py
#
# 367 lines
# 9.5 KiB
# Python

from __future__ import annotations
from datetime import datetime
from ariadne.db.storage import ScheduleState, Storage, TaskRunRecord
class DummyDB:
    """In-memory database stand-in: serves canned results and logs writes."""

    def __init__(self, row=None) -> None:
        # Canned value handed back by fetchone().
        self.row = row
        # Canned value handed back by fetchall(); tests assign to it directly.
        self.rows = []
        # Every (query, params) pair passed to execute(), in call order.
        self.executed = []

    def fetchall(self, query, params=None):
        """Return the canned ``rows``; the query and params are ignored."""
        return self.rows

    def fetchone(self, query, params=None):
        """Return the canned ``row``; the query and params are ignored."""
        return self.row

    def execute(self, query, params=None):
        """Log the call as a ``(query, params)`` tuple and return ``None``."""
        call = (query, params)
        self.executed.append(call)
def test_task_statuses_and_complete() -> None:
    """task_statuses maps task name to status; tasks_complete is checked on two task lists."""
    fake_db = DummyDB()
    fake_db.rows = [{"task": "one", "status": "ok"}, {"task": "two", "status": "error"}]
    store = Storage(fake_db)

    assert store.task_statuses("req") == {"one": "ok", "two": "error"}

    # Only "one" has status "ok" in the canned rows.
    only_ok = store.tasks_complete("req", ["one"])
    assert only_ok is True
    with_error = store.tasks_complete("req", ["one", "two"])
    assert with_error is False
def test_row_to_request_flags() -> None:
    """_row_to_request keeps the request code and yields approval flags as strings."""
    # The flags list deliberately mixes str and int entries.
    raw_row = {
        "request_code": "abc",
        "username": "alice",
        "first_name": "Alice",
        "last_name": "Atlas",
        "contact_email": "a@example.com",
        "status": "pending",
        "email_verified_at": None,
        "initial_password": None,
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": ["demo", 1, "test"],
        "approval_note": "note",
        "denial_note": None,
    }

    parsed = Storage._row_to_request(raw_row)

    assert parsed.request_code == "abc"
    assert parsed.approval_flags == ["demo", "1", "test"]
def test_access_requests_use_portal_db() -> None:
    """With a second DB passed to Storage, access-request reads return that DB's canned data."""
    # The second ("portal") fake holds all the data; the primary fake stays empty.
    portal = DummyDB(
        row={
            "request_code": "req",
            "username": "alice",
            "first_name": "Alice",
            "last_name": "Atlas",
            "contact_email": "a@example.com",
            "status": "pending",
            "email_verified_at": None,
            "initial_password": None,
            "initial_password_revealed_at": None,
            "provision_attempted_at": None,
            "approval_flags": [],
            "approval_note": None,
            "denial_note": None,
        }
    )
    portal.rows = [{"request_code": "req"}]
    primary = DummyDB()
    store = Storage(primary, portal)

    assert store.list_pending_requests() == portal.rows

    fetched = store.fetch_access_request("req")
    assert fetched is not None
    assert fetched.request_code == "req"
def test_record_event_serializes_dict() -> None:
    """record_event executes with the event type first and the dict detail rendered as text."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.record_event("mailu_rotate", {"status": "ok"})

    assert fake_db.executed
    # Each logged call is a (query, params) pair; only the params matter here.
    params = fake_db.executed[-1][1]
    assert params[0] == "mailu_rotate"
    assert "\"status\"" in params[1]
def test_list_events_filters() -> None:
    """list_events called with an event_type filter hands back the fake DB's row."""
    fake_db = DummyDB()
    event_row = {"id": 1, "event_type": "foo", "detail": "bar", "created_at": datetime.now()}
    fake_db.rows = [event_row]
    store = Storage(fake_db)

    events = store.list_events(limit=1, event_type="foo")

    assert events[0]["event_type"] == "foo"
def test_list_events_without_filter() -> None:
    """list_events called with only a limit hands back the fake DB's row."""
    fake_db = DummyDB()
    event_row = {"id": 1, "event_type": "foo", "detail": "bar", "created_at": datetime.now()}
    fake_db.rows = [event_row]
    store = Storage(fake_db)

    events = store.list_events(limit=1)

    assert events[0]["event_type"] == "foo"
def test_list_task_runs_filters() -> None:
    """list_task_runs filtered by request_code hands back the fake DB's run row."""
    fake_db = DummyDB()
    run_row = {
        "id": 1,
        "request_code": "REQ1",
        "task": "mailu_sync",
        "status": "ok",
        "detail": "done",
        "started_at": datetime.now(),
        "finished_at": datetime.now(),
        "duration_ms": 10,
    }
    fake_db.rows = [run_row]
    store = Storage(fake_db)

    runs = store.list_task_runs(limit=1, request_code="REQ1")

    assert runs[0]["task"] == "mailu_sync"
def test_list_task_runs_filters_task_and_request() -> None:
    """list_task_runs filtered by both request_code and task hands back the fake DB's row."""
    fake_db = DummyDB()
    run_row = {"id": 1, "request_code": "REQ1", "task": "mailu_sync", "status": "ok"}
    fake_db.rows = [run_row]
    store = Storage(fake_db)

    runs = store.list_task_runs(limit=1, request_code="REQ1", task="mailu_sync")

    assert runs[0]["task"] == "mailu_sync"
def test_list_task_runs_filters_task_only() -> None:
    """list_task_runs filtered by task alone hands back the fake DB's row."""
    fake_db = DummyDB()
    run_row = {"id": 1, "request_code": "REQ1", "task": "mailu_sync", "status": "ok"}
    fake_db.rows = [run_row]
    store = Storage(fake_db)

    runs = store.list_task_runs(limit=1, task="mailu_sync")

    assert runs[0]["task"] == "mailu_sync"
def test_list_task_runs_default() -> None:
    """list_task_runs with only a limit hands back the fake DB's row."""
    fake_db = DummyDB()
    run_row = {"id": 1, "request_code": "REQ1", "task": "mailu_sync", "status": "ok"}
    fake_db.rows = [run_row]
    store = Storage(fake_db)

    runs = store.list_task_runs(limit=1)

    assert runs[0]["task"] == "mailu_sync"
def test_fetch_access_request(monkeypatch) -> None:
    """fetch_access_request turns the fake DB's single row into a request object."""
    # NOTE: the monkeypatch fixture is requested but never used in this test.
    fake_db = DummyDB(
        row={
            "request_code": "REQ1",
            "username": "alice",
            "contact_email": "alice@example.com",
            "status": "pending",
            "email_verified_at": None,
            "initial_password": None,
            "initial_password_revealed_at": None,
            "provision_attempted_at": None,
            "approval_flags": [],
            "approval_note": None,
            "denial_note": None,
        }
    )
    store = Storage(fake_db)

    fetched = store.fetch_access_request("REQ1")

    assert fetched
    assert fetched.username == "alice"
def test_fetch_access_request_missing() -> None:
    """fetch_access_request returns None when the fake DB has no row."""
    empty_db = DummyDB(row=None)
    store = Storage(empty_db)

    result = store.fetch_access_request("REQ1")

    assert result is None
def test_find_access_request_by_username() -> None:
    """find_access_request_by_username turns the fake DB's single row into a request object."""
    fake_db = DummyDB(
        row={
            "request_code": "REQ1",
            "username": "alice",
            "contact_email": "alice@example.com",
            "status": "pending",
            "email_verified_at": None,
            "initial_password": None,
            "initial_password_revealed_at": None,
            "provision_attempted_at": None,
            "approval_flags": [],
            "approval_note": None,
            "denial_note": None,
        }
    )
    store = Storage(fake_db)

    found = store.find_access_request_by_username("alice")

    assert found
    assert found.username == "alice"
def test_update_status_executes() -> None:
    """update_status sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.update_status("REQ1", "approved")

    assert fake_db.executed
def test_update_approval_executes() -> None:
    """update_approval sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.update_approval("REQ1", "approved", "admin", ["demo"], "ok")

    assert fake_db.executed
def test_list_provision_candidates() -> None:
    """list_provision_candidates converts the fake DB's rows into request objects."""
    fake_db = DummyDB()
    fake_db.rows = [
        {
            "request_code": "REQ1",
            "username": "alice",
            "contact_email": "alice@example.com",
            "status": "approved",
            "email_verified_at": None,
            "initial_password": None,
            "initial_password_revealed_at": None,
            "provision_attempted_at": None,
            "approval_flags": [],
            "approval_note": None,
            "denial_note": None,
        }
    ]
    store = Storage(fake_db)

    found = store.list_provision_candidates()

    assert found[0].username == "alice"
def test_mark_provision_attempted_executes() -> None:
    """mark_provision_attempted sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.mark_provision_attempted("REQ1")

    assert fake_db.executed
def test_set_initial_password_executes() -> None:
    """set_initial_password sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.set_initial_password("REQ1", "pw")

    assert fake_db.executed
def test_mark_welcome_sent_executes() -> None:
    """mark_welcome_sent sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.mark_welcome_sent("REQ1")

    assert fake_db.executed
def test_ensure_task_rows_empty() -> None:
    """ensure_task_rows with an empty task list executes nothing."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.ensure_task_rows("REQ1", [])

    # The execute() log must still be empty after the call.
    assert fake_db.executed == []
def test_ensure_task_rows_executes() -> None:
    """ensure_task_rows with one task sends at least one statement through execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.ensure_task_rows("REQ1", ["task"])

    assert fake_db.executed
def test_update_task_executes() -> None:
    """update_task sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.update_task("REQ1", "task", "ok", None)

    assert fake_db.executed
def test_find_access_request_by_username_missing() -> None:
    """find_access_request_by_username returns None when the fake DB has no row."""
    empty_db = DummyDB(row=None)
    store = Storage(empty_db)

    result = store.find_access_request_by_username("alice")

    assert result is None
def test_list_pending_requests() -> None:
    """list_pending_requests hands back the fake DB's rows."""
    fake_db = DummyDB()
    fake_db.rows = [{"request_code": "REQ1"}]
    store = Storage(fake_db)

    pending = store.list_pending_requests()

    assert pending[0]["request_code"] == "REQ1"
def test_record_task_run_executes() -> None:
    """record_task_run sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)
    run = TaskRunRecord(
        request_code="REQ1",
        task="task",
        status="ok",
        detail=None,
        started_at=datetime.now(),
        finished_at=datetime.now(),
        duration_ms=5,
    )

    store.record_task_run(run)

    assert fake_db.executed
def test_update_schedule_state_executes() -> None:
    """update_schedule_state sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)
    state = ScheduleState(
        task_name="task",
        cron_expr="* * * * *",
        last_started_at=None,
        last_finished_at=None,
        last_status="ok",
        last_error=None,
        last_duration_ms=None,
        next_run_at=None,
    )

    store.update_schedule_state(state)

    assert fake_db.executed
def test_record_cluster_state_executes() -> None:
    """record_cluster_state sends at least one statement through the DB's execute()."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.record_cluster_state({"ok": True})

    assert fake_db.executed
def test_prune_cluster_state_skips_zero() -> None:
    """prune_cluster_state(0) executes nothing."""
    fake_db = DummyDB()
    store = Storage(fake_db)

    store.prune_cluster_state(0)

    # The execute() log must still be empty after the call.
    assert fake_db.executed == []
def test_latest_cluster_state_parses_json() -> None:
    """latest_cluster_state decodes the row's JSON ``snapshot`` text into a dict."""
    stored_row = {"snapshot": "{\"ok\": true}", "created_at": datetime.now()}
    fake_db = DummyDB(row=stored_row)
    store = Storage(fake_db)

    decoded = store.latest_cluster_state()

    assert decoded == {"ok": True}