# ariadne/tests/test_database.py
from __future__ import annotations
from contextlib import contextmanager
import pytest
import ariadne.db.database as db_module
from ariadne.db.database import Database
class DummyResult:
    """Stand-in for a psycopg query result holding canned row data."""

    def __init__(self, row=None, rows=None):
        # Single row returned by fetchone(); list of rows for fetchall().
        self._row = row
        self._rows = rows or []

    def fetchone(self):
        """Return the canned single row (None when unset)."""
        return self._row

    def fetchall(self):
        """Return the canned row list (empty when unset)."""
        return self._rows
class DummyConn:
    """Fake database connection that records every statement it runs."""

    def __init__(self):
        self.row_factory = None
        # (query, params) tuples in execution order, for test inspection.
        self.executed = []

    def execute(self, query, params=None):
        """Record the call; advisory-lock probes always report success."""
        self.executed.append((query, params))
        lock_probe = "pg_try_advisory_lock" in query
        return DummyResult(row=(True,)) if lock_probe else DummyResult()
class DummyPool:
    """Fake psycopg ConnectionPool exposing one shared DummyConn."""

    def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
        # Same signature as the real pool so it can be monkeypatched in.
        self.conn = DummyConn()

    @contextmanager
    def connection(self):
        # Always hand out the same connection so tests can inspect it.
        yield self.conn

    def close(self):
        # Nothing to release in the fake.
        return None
def test_migrate_runs(monkeypatch) -> None:
    """migrate() should push SQL through the (patched) connection pool."""
    monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
    database = Database("postgresql://user:pass@localhost/db")
    database.migrate(lock_id=123)
    # At least one statement must have reached the fake connection.
    assert database._pool.conn.executed
def test_fetch_and_execute(monkeypatch) -> None:
    """execute/fetchone/fetchall round-trips all hit the connection."""
    monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
    database = Database("postgresql://user:pass@localhost/db")
    for call in (database.execute, database.fetchone, database.fetchall):
        call("SELECT 1")
    database.close()
    assert database._pool.conn.executed
def test_database_requires_dsn() -> None:
    """Constructing a Database with an empty DSN must raise RuntimeError."""
    with pytest.raises(RuntimeError):
        Database("")
def test_migrate_handles_lock(monkeypatch) -> None:
    """A LockNotAvailable during DDL must not escape migrate()."""

    class LockFailConn(DummyConn):
        def execute(self, query, params=None):
            # Simulate a concurrent session holding the DDL lock.
            if "CREATE TABLE" in query:
                raise db_module.psycopg.errors.LockNotAvailable()
            return super().execute(query, params)

    class LockFailPool(DummyPool):
        def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
            self.conn = LockFailConn()

    monkeypatch.setattr(db_module, "ConnectionPool", LockFailPool)
    database = Database("postgresql://user:pass@localhost/db")
    database.migrate(lock_id=123)
def test_migrate_ignores_timeout_errors(monkeypatch) -> None:
    """Failures while setting session timeouts must not abort migrate()."""

    class TimeoutFailConn(DummyConn):
        def execute(self, query, params=None):
            # Blow up only on the timeout-tuning statements.
            if query.startswith(("SET lock_timeout", "SET statement_timeout")):
                raise RuntimeError("boom")
            return super().execute(query, params)

    class TimeoutFailPool(DummyPool):
        def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
            self.conn = TimeoutFailConn()

    monkeypatch.setattr(db_module, "ConnectionPool", TimeoutFailPool)
    database = Database("postgresql://user:pass@localhost/db")
    database.migrate(lock_id=123)
def test_migrate_handles_lock_on_alter(monkeypatch) -> None:
    """A QueryCanceled raised by an ALTER statement must be tolerated."""

    class AlterFailConn(DummyConn):
        def execute(self, query, params=None):
            # Simulate the ALTER being cancelled (e.g. lock timeout fired).
            if "ALTER TABLE access_requests" in query:
                raise db_module.psycopg.errors.QueryCanceled()
            return super().execute(query, params)

    class AlterFailPool(DummyPool):
        def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
            self.conn = AlterFailConn()

    monkeypatch.setattr(db_module, "ConnectionPool", AlterFailPool)
    database = Database("postgresql://user:pass@localhost/db")
    database.migrate(lock_id=123)
def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None:
    """fetchone()/fetchall() should surface the connection's dict rows as-is."""

    class DictRowConn(DummyConn):
        def execute(self, query, params=None):
            # Route by a marker in the query text; note this override
            # deliberately does not record into `executed`.
            if "fetchone" in query:
                return DummyResult(row={"id": 1})
            return DummyResult(row=None, rows=[{"id": 1}, {"id": 2}])

    class DictRowPool(DummyPool):
        def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
            self.conn = DictRowConn()

    monkeypatch.setattr(db_module, "ConnectionPool", DictRowPool)
    database = Database("postgresql://user:pass@localhost/db")
    assert database.fetchone("fetchone") == {"id": 1}
    assert database.fetchall("fetchall") == [{"id": 1}, {"id": 2}]