test(ariadne): cover database migration edges

This commit is contained in:
codex 2026-04-21 04:37:47 -03:00
parent c41ff7ab3b
commit 77fc16a7cb
2 changed files with 60 additions and 7 deletions

View File

@ -94,13 +94,7 @@ class Database:
except Exception:
pass
def migrate(
self,
lock_id: int,
*,
include_ariadne_tables: bool = True,
include_access_requests: bool = True,
) -> None:
def migrate(self, lock_id: int, *, include_ariadne_tables: bool = True, include_access_requests: bool = True) -> None:
with self.connection() as conn:
self._configure_timeouts(conn)
if not self._try_advisory_lock(conn, lock_id):

View File

@ -98,6 +98,25 @@ def test_migrate_ignores_timeout_errors(monkeypatch) -> None:
db.migrate(lock_id=123)
def test_migrate_stops_when_dict_lock_is_unavailable(monkeypatch) -> None:
class DictLockConn(DummyConn):
def execute(self, query, params=None):
if "pg_try_advisory_lock" in query:
return DummyResult(row={"pg_try_advisory_lock": False})
return super().execute(query, params)
class DictLockPool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = DictLockConn()
monkeypatch.setattr(db_module, "ConnectionPool", DictLockPool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123)
assert not any("pg_advisory_unlock" in query for query, _params in db._pool.conn.executed)
def test_migrate_handles_lock_on_alter(monkeypatch) -> None:
class LockConn(DummyConn):
def execute(self, query, params=None):
@ -114,6 +133,46 @@ def test_migrate_handles_lock_on_alter(monkeypatch) -> None:
db.migrate(lock_id=123)
def test_migrate_skips_missing_access_request_table(monkeypatch) -> None:
class MissingAccessRequestsConn(DummyConn):
def execute(self, query, params=None):
if "ALTER TABLE access_requests" in query:
self.executed.append((query, params))
raise db_module.psycopg.errors.UndefinedTable()
return super().execute(query, params)
class MissingAccessRequestsPool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = MissingAccessRequestsConn()
monkeypatch.setattr(db_module, "ConnectionPool", MissingAccessRequestsPool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123, include_ariadne_tables=False)
assert any("ALTER TABLE access_requests" in query for query, _params in db._pool.conn.executed)
def test_migrate_ignores_unlock_failures(monkeypatch) -> None:
class UnlockFailureConn(DummyConn):
def execute(self, query, params=None):
if "pg_advisory_unlock" in query:
self.executed.append((query, params))
raise RuntimeError("unlock connection closed")
return super().execute(query, params)
class UnlockFailurePool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = UnlockFailureConn()
monkeypatch.setattr(db_module, "ConnectionPool", UnlockFailurePool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123, include_ariadne_tables=False, include_access_requests=False)
assert any("pg_advisory_unlock" in query for query, _params in db._pool.conn.executed)
def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None:
class RowConn(DummyConn):
def execute(self, query, params=None):