From 77fc16a7cbe7770b1e08bb635b1240f5d4c9e1b1 Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 04:37:47 -0300 Subject: [PATCH] test(ariadne): cover database migration edges --- ariadne/db/database.py | 8 +----- tests/test_database.py | 59 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/ariadne/db/database.py b/ariadne/db/database.py index bc53132..868fc16 100644 --- a/ariadne/db/database.py +++ b/ariadne/db/database.py @@ -94,13 +94,7 @@ class Database: except Exception: pass - def migrate( - self, - lock_id: int, - *, - include_ariadne_tables: bool = True, - include_access_requests: bool = True, - ) -> None: + def migrate(self, lock_id: int, *, include_ariadne_tables: bool = True, include_access_requests: bool = True) -> None: with self.connection() as conn: self._configure_timeouts(conn) if not self._try_advisory_lock(conn, lock_id): diff --git a/tests/test_database.py b/tests/test_database.py index fd3dc08..14ad7c2 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -98,6 +98,25 @@ def test_migrate_ignores_timeout_errors(monkeypatch) -> None: db.migrate(lock_id=123) +def test_migrate_stops_when_dict_lock_is_unavailable(monkeypatch) -> None: + class DictLockConn(DummyConn): + def execute(self, query, params=None): + if "pg_try_advisory_lock" in query: + return DummyResult(row={"pg_try_advisory_lock": False}) + return super().execute(query, params) + + class DictLockPool(DummyPool): + def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None): + self.conn = DictLockConn() + + monkeypatch.setattr(db_module, "ConnectionPool", DictLockPool) + db = Database("postgresql://user:pass@localhost/db") + + db.migrate(lock_id=123) + + assert not any("pg_advisory_unlock" in query for query, _params in db._pool.conn.executed) + + def test_migrate_handles_lock_on_alter(monkeypatch) -> None: class LockConn(DummyConn): def execute(self, query, params=None): @@ -114,6 +133,46 @@ def test_migrate_handles_lock_on_alter(monkeypatch) -> None: db.migrate(lock_id=123) +def test_migrate_skips_missing_access_request_table(monkeypatch) -> None: + class MissingAccessRequestsConn(DummyConn): + def execute(self, query, params=None): + if "ALTER TABLE access_requests" in query: + self.executed.append((query, params)) + raise db_module.psycopg.errors.UndefinedTable() + return super().execute(query, params) + + class MissingAccessRequestsPool(DummyPool): + def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None): + self.conn = MissingAccessRequestsConn() + + monkeypatch.setattr(db_module, "ConnectionPool", MissingAccessRequestsPool) + db = Database("postgresql://user:pass@localhost/db") + + db.migrate(lock_id=123, include_ariadne_tables=False) + + assert any("ALTER TABLE access_requests" in query for query, _params in db._pool.conn.executed) + + +def test_migrate_ignores_unlock_failures(monkeypatch) -> None: + class UnlockFailureConn(DummyConn): + def execute(self, query, params=None): + if "pg_advisory_unlock" in query: + self.executed.append((query, params)) + raise RuntimeError("unlock connection closed") + return super().execute(query, params) + + class UnlockFailurePool(DummyPool): + def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None): + self.conn = UnlockFailureConn() + + monkeypatch.setattr(db_module, "ConnectionPool", UnlockFailurePool) + db = Database("postgresql://user:pass@localhost/db") + + db.migrate(lock_id=123, include_ariadne_tables=False, include_access_requests=False) + + assert any("pg_advisory_unlock" in query for query, _params in db._pool.conn.executed) + + def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None: class RowConn(DummyConn): def execute(self, query, params=None):