354 lines
12 KiB
Python
354 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
"""Coverage for backend integration helper modules."""
|
|
|
|
from contextlib import contextmanager
|
|
from types import SimpleNamespace
|
|
|
|
import httpx
|
|
import pytest
|
|
|
|
from atlas_portal import ariadne_client, db, k8s, mailer, migrate
|
|
from atlas_portal.app_factory import create_app
|
|
|
|
|
|
class DummyResponse:
|
|
"""Small httpx-like response for helper tests."""
|
|
|
|
def __init__(self, payload=None, *, status_code: int = 200, text: str = "", headers=None) -> None:
|
|
self._payload = payload if payload is not None else {}
|
|
self.status_code = status_code
|
|
self.text = text
|
|
self.headers = headers or {}
|
|
|
|
def json(self):
|
|
"""Return the configured JSON payload."""
|
|
|
|
if isinstance(self._payload, BaseException):
|
|
raise self._payload
|
|
return self._payload
|
|
|
|
def raise_for_status(self) -> None:
|
|
"""Raise for non-success responses like httpx does."""
|
|
|
|
if self.status_code >= 400:
|
|
raise httpx.HTTPStatusError("bad status", request=None, response=None)
|
|
|
|
|
|
def test_migrate_main_delegates_to_db(monkeypatch) -> None:
|
|
calls: list[str] = []
|
|
monkeypatch.setattr(migrate, "run_migrations", lambda: calls.append("run"))
|
|
|
|
migrate.main()
|
|
|
|
assert calls == ["run"]
|
|
|
|
|
|
def test_mailer_validates_configuration_and_sends(monkeypatch) -> None:
|
|
sent: list[tuple[str, str]] = []
|
|
|
|
class DummySMTP:
|
|
def __init__(self, host, port, timeout):
|
|
self.host = host
|
|
self.port = port
|
|
self.timeout = timeout
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def starttls(self) -> None:
|
|
sent.append(("starttls", ""))
|
|
|
|
def login(self, username, password) -> None:
|
|
sent.append(("login", f"{username}:{password}"))
|
|
|
|
def send_message(self, message) -> None:
|
|
sent.append(("send", message["To"]))
|
|
|
|
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "")
|
|
with pytest.raises(mailer.MailerError):
|
|
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
|
|
|
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev")
|
|
monkeypatch.setattr(mailer.settings, "SMTP_PORT", 587)
|
|
monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", False)
|
|
monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", True)
|
|
monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "user")
|
|
monkeypatch.setattr(mailer.settings, "SMTP_PASSWORD", "pw")
|
|
monkeypatch.setattr(mailer.smtplib, "SMTP", DummySMTP)
|
|
|
|
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
|
|
|
assert ("starttls", "") in sent
|
|
assert ("login", "user:pw") in sent
|
|
assert ("send", "a@example.dev") in sent
|
|
body = mailer.access_request_verification_body(request_code="REQ", verify_url="https://verify.example.dev")
|
|
assert "REQ" in body and "https://verify.example.dev" in body
|
|
|
|
|
|
def test_mailer_reports_missing_recipient_and_send_errors(monkeypatch) -> None:
|
|
class FailingSMTP:
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def send_message(self, message) -> None:
|
|
raise OSError("offline")
|
|
|
|
with pytest.raises(mailer.MailerError, match="missing recipient"):
|
|
mailer.send_text_email(to_addr="", subject="Subject", body="Body")
|
|
|
|
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev")
|
|
monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", True)
|
|
monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", False)
|
|
monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "")
|
|
monkeypatch.setattr(mailer.smtplib, "SMTP_SSL", FailingSMTP)
|
|
|
|
with pytest.raises(mailer.MailerError, match="failed to send email"):
|
|
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
|
|
|
|
|
def test_k8s_get_and_post_json(monkeypatch) -> None:
|
|
calls: list[tuple[str, str, object]] = []
|
|
|
|
class DummyClient:
|
|
def __init__(self, **kwargs):
|
|
calls.append(("init", "", kwargs))
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def get(self, url):
|
|
calls.append(("get", url, None))
|
|
return DummyResponse({"kind": "Pod"})
|
|
|
|
def post(self, url, json=None):
|
|
calls.append(("post", url, json))
|
|
return DummyResponse({"kind": "Job"})
|
|
|
|
monkeypatch.setattr(k8s, "_read_service_account", lambda: ("token", "/ca.crt"))
|
|
monkeypatch.setattr(k8s.httpx, "Client", DummyClient)
|
|
|
|
assert k8s.get_json("/api/v1/pods") == {"kind": "Pod"}
|
|
assert k8s.post_json("/apis/batch/v1/jobs", {"metadata": {"name": "job"}}) == {"kind": "Job"}
|
|
assert calls[1][0] == "get"
|
|
assert calls[2][0] == "init"
|
|
assert calls[3][0] == "post"
|
|
|
|
|
|
def test_k8s_service_account_and_bad_json(monkeypatch, tmp_path) -> None:
|
|
sa_path = tmp_path / "sa"
|
|
sa_path.mkdir()
|
|
monkeypatch.setattr(k8s, "_SA_PATH", sa_path)
|
|
with pytest.raises(RuntimeError, match="token missing"):
|
|
k8s._read_service_account()
|
|
|
|
(sa_path / "token").write_text(" ")
|
|
(sa_path / "ca.crt").write_text("ca")
|
|
with pytest.raises(RuntimeError, match="token empty"):
|
|
k8s._read_service_account()
|
|
|
|
(sa_path / "token").write_text("token")
|
|
assert k8s._read_service_account() == ("token", str(sa_path / "ca.crt"))
|
|
|
|
class BadClient:
|
|
def __init__(self, **kwargs):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def get(self, url):
|
|
return DummyResponse([])
|
|
|
|
def post(self, url, json=None):
|
|
return DummyResponse([])
|
|
|
|
monkeypatch.setattr(k8s.httpx, "Client", BadClient)
|
|
with pytest.raises(RuntimeError, match="unexpected kubernetes response"):
|
|
k8s.get_json("/api/v1/pods")
|
|
with pytest.raises(RuntimeError, match="unexpected kubernetes response"):
|
|
k8s.post_json("/api/v1/pods", {})
|
|
|
|
|
|
def test_ariadne_proxy_paths(monkeypatch) -> None:
|
|
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "")
|
|
assert not ariadne_client.enabled()
|
|
with pytest.raises(ariadne_client.AriadneError):
|
|
ariadne_client.request_raw("GET", "/health")
|
|
|
|
class DummyClient:
|
|
def __init__(self, timeout):
|
|
self.timeout = timeout
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def request(self, method, url, headers=None, json=None, params=None):
|
|
assert headers == {"Authorization": "Bearer token"}
|
|
return DummyResponse({"ok": True})
|
|
|
|
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev")
|
|
monkeypatch.setattr(ariadne_client.httpx, "Client", DummyClient)
|
|
|
|
app = create_app()
|
|
with app.test_request_context(headers={"Authorization": "Bearer token"}):
|
|
response = ariadne_client.request_raw("POST", "/path", payload={"a": 1})
|
|
assert response.json() == {"ok": True}
|
|
flask_response, status = ariadne_client.proxy("POST", "/path")
|
|
assert status == 200
|
|
assert flask_response.get_json() == {"ok": True}
|
|
|
|
|
|
def test_ariadne_error_and_proxy_fallback_paths(monkeypatch) -> None:
|
|
class ServerErrorClient:
|
|
def __init__(self, timeout):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def request(self, method, url, headers=None, json=None, params=None):
|
|
return DummyResponse({"error": "upstream"}, status_code=503)
|
|
|
|
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev")
|
|
monkeypatch.setattr(ariadne_client.httpx, "Client", ServerErrorClient)
|
|
app = create_app()
|
|
with app.test_request_context():
|
|
assert ariadne_client.request_raw("GET", "/health").status_code == 503
|
|
|
|
attempts = {"count": 0}
|
|
|
|
class FailingClient:
|
|
def __init__(self, timeout):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb):
|
|
return False
|
|
|
|
def request(self, method, url, headers=None, json=None, params=None):
|
|
attempts["count"] += 1
|
|
raise httpx.RequestError("offline")
|
|
|
|
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_COUNT", 2)
|
|
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_BACKOFF_SEC", 0)
|
|
monkeypatch.setattr(ariadne_client.httpx, "Client", FailingClient)
|
|
with app.test_request_context():
|
|
with pytest.raises(ariadne_client.AriadneError):
|
|
ariadne_client.request_raw("GET", "/health")
|
|
assert attempts["count"] == 2
|
|
|
|
with app.test_request_context():
|
|
monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: (_ for _ in ()).throw(ariadne_client.AriadneError("down", 504)))
|
|
response, status = ariadne_client.proxy("GET", "/health")
|
|
assert status == 504
|
|
assert response.get_json()["error"] == "down"
|
|
|
|
monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: DummyResponse(ValueError("bad json"), text="plain", status_code=502))
|
|
response, status = ariadne_client.proxy("GET", "/health")
|
|
assert status == 502
|
|
assert response.get_json()["error"] == "plain"
|
|
|
|
|
|
def test_db_pool_and_migration_paths(monkeypatch) -> None:
|
|
executed: list[tuple[str, object]] = []
|
|
|
|
class DummyConn:
|
|
row_factory = None
|
|
|
|
def execute(self, query, params=None):
|
|
executed.append((str(query), params))
|
|
if "pg_try_advisory_lock" in str(query):
|
|
return SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": True})
|
|
return SimpleNamespace(fetchone=lambda: None)
|
|
|
|
@contextmanager
|
|
def fake_connect():
|
|
yield DummyConn()
|
|
|
|
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "")
|
|
assert not db.configured()
|
|
with pytest.raises(RuntimeError):
|
|
db._get_pool()
|
|
with pytest.raises(RuntimeError):
|
|
with db.connect():
|
|
pass
|
|
|
|
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
|
monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True)
|
|
monkeypatch.setattr(db, "connect", fake_connect)
|
|
monkeypatch.setattr(db, "_release_advisory_lock", lambda conn, lock_id: executed.append(("release", lock_id)))
|
|
|
|
db.run_migrations()
|
|
db.ensure_schema()
|
|
|
|
assert any("CREATE TABLE IF NOT EXISTS access_requests" in query for query, _ in executed)
|
|
assert ("release", db.MIGRATION_LOCK_ID) in executed
|
|
|
|
|
|
def test_db_pool_connect_and_lock_edge_paths(monkeypatch) -> None:
|
|
class DummyPool:
|
|
def __init__(self, **kwargs):
|
|
self.kwargs = kwargs
|
|
|
|
@contextmanager
|
|
def connection(self):
|
|
yield SimpleNamespace(row_factory=None)
|
|
|
|
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
|
monkeypatch.setattr(db, "ConnectionPool", DummyPool)
|
|
monkeypatch.setattr(db, "_pool", None)
|
|
|
|
assert db.configured()
|
|
pool = db._get_pool()
|
|
assert pool.kwargs["conninfo"] == "postgres://portal"
|
|
assert "statement_timeout" in db._pool_kwargs()["options"]
|
|
with db.connect() as conn:
|
|
assert conn.row_factory is db.dict_row
|
|
assert db._get_pool() is pool
|
|
|
|
tuple_conn = SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: (0,)))
|
|
assert not db._try_advisory_lock(tuple_conn, 1)
|
|
|
|
class BadConn:
|
|
def execute(self, *args, **kwargs):
|
|
raise RuntimeError("ignore")
|
|
|
|
db._release_advisory_lock(BadConn(), 1)
|
|
|
|
|
|
def test_db_migration_lock_skip(monkeypatch) -> None:
|
|
@contextmanager
|
|
def fake_connect():
|
|
yield SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": False}))
|
|
|
|
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
|
monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True)
|
|
monkeypatch.setattr(db, "connect", fake_connect)
|
|
|
|
db.run_migrations()
|
|
|