diff --git a/backend/atlas_portal/migrate.py b/backend/atlas_portal/migrate.py index 4945cb8..7db86ee 100644 --- a/backend/atlas_portal/migrate.py +++ b/backend/atlas_portal/migrate.py @@ -9,5 +9,5 @@ def main() -> None: run_migrations() -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover - CLI guard is exercised by direct invocation. main() diff --git a/backend/tests/test_platform_helpers.py b/backend/tests/test_platform_helpers.py new file mode 100644 index 0000000..4a69663 --- /dev/null +++ b/backend/tests/test_platform_helpers.py @@ -0,0 +1,353 @@ +from __future__ import annotations + +"""Coverage for backend integration helper modules.""" + +from contextlib import contextmanager +from types import SimpleNamespace + +import httpx +import pytest + +from atlas_portal import ariadne_client, db, k8s, mailer, migrate +from atlas_portal.app_factory import create_app + + +class DummyResponse: + """Small httpx-like response for helper tests.""" + + def __init__(self, payload=None, *, status_code: int = 200, text: str = "", headers=None) -> None: + self._payload = payload if payload is not None else {} + self.status_code = status_code + self.text = text + self.headers = headers or {} + + def json(self): + """Return the configured JSON payload.""" + + if isinstance(self._payload, BaseException): + raise self._payload + return self._payload + + def raise_for_status(self) -> None: + """Raise for non-success responses like httpx does.""" + + if self.status_code >= 400: + raise httpx.HTTPStatusError("bad status", request=None, response=None) + + +def test_migrate_main_delegates_to_db(monkeypatch) -> None: + calls: list[str] = [] + monkeypatch.setattr(migrate, "run_migrations", lambda: calls.append("run")) + + migrate.main() + + assert calls == ["run"] + + +def test_mailer_validates_configuration_and_sends(monkeypatch) -> None: + sent: list[tuple[str, str]] = [] + + class DummySMTP: + def __init__(self, host, port, timeout): + self.host = host + self.port = port + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def starttls(self) -> None: + sent.append(("starttls", "")) + + def login(self, username, password) -> None: + sent.append(("login", f"{username}:{password}")) + + def send_message(self, message) -> None: + sent.append(("send", message["To"])) + + monkeypatch.setattr(mailer.settings, "SMTP_HOST", "") + with pytest.raises(mailer.MailerError): + mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body") + + monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev") + monkeypatch.setattr(mailer.settings, "SMTP_PORT", 587) + monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", False) + monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", True) + monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "user") + monkeypatch.setattr(mailer.settings, "SMTP_PASSWORD", "pw") + monkeypatch.setattr(mailer.smtplib, "SMTP", DummySMTP) + + mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body") + + assert ("starttls", "") in sent + assert ("login", "user:pw") in sent + assert ("send", "a@example.dev") in sent + body = mailer.access_request_verification_body(request_code="REQ", verify_url="https://verify.example.dev") + assert "REQ" in body and "https://verify.example.dev" in body + + +def test_mailer_reports_missing_recipient_and_send_errors(monkeypatch) -> None: + class FailingSMTP: + def __init__(self, *args, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def send_message(self, message) -> None: + raise OSError("offline") + + with pytest.raises(mailer.MailerError, match="missing recipient"): + mailer.send_text_email(to_addr="", subject="Subject", body="Body") + + monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev") + monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", True) + monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", False) + monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "") + monkeypatch.setattr(mailer.smtplib, "SMTP_SSL", FailingSMTP) + + with pytest.raises(mailer.MailerError, match="failed to send email"): + mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body") + + +def test_k8s_get_and_post_json(monkeypatch) -> None: + calls: list[tuple[str, str, object]] = [] + + class DummyClient: + def __init__(self, **kwargs): + calls.append(("init", "", kwargs)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, url): + calls.append(("get", url, None)) + return DummyResponse({"kind": "Pod"}) + + def post(self, url, json=None): + calls.append(("post", url, json)) + return DummyResponse({"kind": "Job"}) + + monkeypatch.setattr(k8s, "_read_service_account", lambda: ("token", "/ca.crt")) + monkeypatch.setattr(k8s.httpx, "Client", DummyClient) + + assert k8s.get_json("/api/v1/pods") == {"kind": "Pod"} + assert k8s.post_json("/apis/batch/v1/jobs", {"metadata": {"name": "job"}}) == {"kind": "Job"} + assert calls[1][0] == "get" + assert calls[2][0] == "init" + assert calls[3][0] == "post" + + +def test_k8s_service_account_and_bad_json(monkeypatch, tmp_path) -> None: + sa_path = tmp_path / "sa" + sa_path.mkdir() + monkeypatch.setattr(k8s, "_SA_PATH", sa_path) + with pytest.raises(RuntimeError, match="token missing"): + k8s._read_service_account() + + (sa_path / "token").write_text(" ") + (sa_path / "ca.crt").write_text("ca") + with pytest.raises(RuntimeError, match="token empty"): + k8s._read_service_account() + + (sa_path / "token").write_text("token") + assert k8s._read_service_account() == ("token", str(sa_path / "ca.crt")) + + class BadClient: + def __init__(self, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, url): + return DummyResponse([]) + + def post(self, url, json=None): + return DummyResponse([]) + + monkeypatch.setattr(k8s.httpx, "Client", BadClient) + with pytest.raises(RuntimeError, match="unexpected kubernetes response"): + k8s.get_json("/api/v1/pods") + with pytest.raises(RuntimeError, match="unexpected kubernetes response"): + k8s.post_json("/api/v1/pods", {}) + + +def test_ariadne_proxy_paths(monkeypatch) -> None: + monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "") + assert not ariadne_client.enabled() + with pytest.raises(ariadne_client.AriadneError): + ariadne_client.request_raw("GET", "/health") + + class DummyClient: + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def request(self, method, url, headers=None, json=None, params=None): + assert headers == {"Authorization": "Bearer token"} + return DummyResponse({"ok": True}) + + monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev") + monkeypatch.setattr(ariadne_client.httpx, "Client", DummyClient) + + app = create_app() + with app.test_request_context(headers={"Authorization": "Bearer token"}): + response = ariadne_client.request_raw("POST", "/path", payload={"a": 1}) + assert response.json() == {"ok": True} + flask_response, status = ariadne_client.proxy("POST", "/path") + assert status == 200 + assert flask_response.get_json() == {"ok": True} + + +def test_ariadne_error_and_proxy_fallback_paths(monkeypatch) -> None: + class ServerErrorClient: + def __init__(self, timeout): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def request(self, method, url, headers=None, json=None, params=None): + return DummyResponse({"error": "upstream"}, status_code=503) + + monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev") + monkeypatch.setattr(ariadne_client.httpx, "Client", ServerErrorClient) + app = create_app() + with app.test_request_context(): + assert ariadne_client.request_raw("GET", "/health").status_code == 503 + + attempts = {"count": 0} + + class FailingClient: + def __init__(self, timeout): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def request(self, method, url, headers=None, json=None, params=None): + attempts["count"] += 1 + raise httpx.RequestError("offline") + + monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_COUNT", 2) + monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_BACKOFF_SEC", 0) + monkeypatch.setattr(ariadne_client.httpx, "Client", FailingClient) + with app.test_request_context(): + with pytest.raises(ariadne_client.AriadneError): + ariadne_client.request_raw("GET", "/health") + assert attempts["count"] == 2 + + with app.test_request_context(): + monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: (_ for _ in ()).throw(ariadne_client.AriadneError("down", 504))) + response, status = ariadne_client.proxy("GET", "/health") + assert status == 504 + assert response.get_json()["error"] == "down" + + monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: DummyResponse(ValueError("bad json"), text="plain", status_code=502)) + response, status = ariadne_client.proxy("GET", "/health") + assert status == 502 + assert response.get_json()["error"] == "plain" + + +def test_db_pool_and_migration_paths(monkeypatch) -> None: + executed: list[tuple[str, object]] = [] + + class DummyConn: + row_factory = None + + def execute(self, query, params=None): + executed.append((str(query), params)) + if "pg_try_advisory_lock" in str(query): + return SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": True}) + return SimpleNamespace(fetchone=lambda: None) + + @contextmanager + def fake_connect(): + yield DummyConn() + + monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "") + assert not db.configured() + with pytest.raises(RuntimeError): + db._get_pool() + with pytest.raises(RuntimeError): + with db.connect(): + pass + + monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal") + monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True) + monkeypatch.setattr(db, "connect", fake_connect) + monkeypatch.setattr(db, "_release_advisory_lock", lambda conn, lock_id: executed.append(("release", lock_id))) + + db.run_migrations() + db.ensure_schema() + + assert any("CREATE TABLE IF NOT EXISTS access_requests" in query for query, _ in executed) + assert ("release", db.MIGRATION_LOCK_ID) in executed + + +def test_db_pool_connect_and_lock_edge_paths(monkeypatch) -> None: + class DummyPool: + def __init__(self, **kwargs): + self.kwargs = kwargs + + @contextmanager + def connection(self): + yield SimpleNamespace(row_factory=None) + + monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal") + monkeypatch.setattr(db, "ConnectionPool", DummyPool) + monkeypatch.setattr(db, "_pool", None) + + assert db.configured() + pool = db._get_pool() + assert pool.kwargs["conninfo"] == "postgres://portal" + assert "statement_timeout" in db._pool_kwargs()["options"] + with db.connect() as conn: + assert conn.row_factory is db.dict_row + assert db._get_pool() is pool + + tuple_conn = SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: (0,))) + assert not db._try_advisory_lock(tuple_conn, 1) + + class BadConn: + def execute(self, *args, **kwargs): + raise RuntimeError("ignore") + + db._release_advisory_lock(BadConn(), 1) + + +def test_db_migration_lock_skip(monkeypatch) -> None: + @contextmanager + def fake_connect(): + yield SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": False})) + + monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal") + monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True) + monkeypatch.setattr(db, "connect", fake_connect) + + db.run_migrations() + diff --git a/backend/tests/test_sync_job_helpers.py b/backend/tests/test_sync_job_helpers.py new file mode 100644 index 0000000..ba38726 --- /dev/null +++ b/backend/tests/test_sync_job_helpers.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +"""Tests for per-user Kubernetes sync Job adapters.""" + +import pytest + +from atlas_portal import firefly_user_sync, nextcloud_mail_sync, wger_user_sync + + +def _cronjob_template() -> dict: + """Build a CronJob payload shaped like the templates used in the cluster.""" + + return { + "spec": { + "jobTemplate": { + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": "worker", + "env": [ + {"name": "ONLY_USERNAME", "value": "old"}, + {"name": "FIREFLY_USER_EMAIL", "value": "old"}, + {"name": "WGER_USERNAME", "value": "old"}, + ], + } + ] + } + } + } + } + } + } + + +@pytest.mark.parametrize( + ("module", "namespace_attr", "cronjob_attr", "timeout_attr", "args", "expected_env"), + [ + ( + nextcloud_mail_sync, + "NEXTCLOUD_NAMESPACE", + "NEXTCLOUD_MAIL_SYNC_CRONJOB", + "NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC", + ("alice",), + {"ONLY_USERNAME": "alice"}, + ), + ( + firefly_user_sync, + "FIREFLY_NAMESPACE", + "FIREFLY_USER_SYNC_CRONJOB", + "FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC", + ("alice", "alice@example.dev", "pw"), + {"FIREFLY_USER_EMAIL": "alice@example.dev", "FIREFLY_USER_PASSWORD": "pw"}, + ), + ( + wger_user_sync, + "WGER_NAMESPACE", + "WGER_USER_SYNC_CRONJOB", + "WGER_USER_SYNC_WAIT_TIMEOUT_SEC", + ("alice", "alice@example.dev", "pw"), + {"WGER_USERNAME": "alice", "WGER_EMAIL": "alice@example.dev", "WGER_PASSWORD": "pw"}, + ), + ], +) +def test_user_sync_modules_render_jobs_and_trigger(monkeypatch, module, namespace_attr, cronjob_attr, timeout_attr, args, expected_env) -> None: + monkeypatch.setattr(module.settings, namespace_attr, "apps") + monkeypatch.setattr(module.settings, cronjob_attr, "sync-cron") + monkeypatch.setattr(module.settings, timeout_attr, 0) + monkeypatch.setattr(module.time, "time", lambda: 1000) + + posted: list[dict] = [] + + def fake_get_json(path: str) -> dict: + if "cronjobs" in path: + return _cronjob_template() + return {"status": {"conditions": [{"type": "Complete", "status": "True"}]}} + + def fake_post_json(path: str, payload: dict) -> dict: + posted.append(payload) + return {"metadata": {"name": payload["metadata"]["name"]}} + + monkeypatch.setattr(module, "get_json", fake_get_json) + monkeypatch.setattr(module, "post_json", fake_post_json) + + result = module.trigger(*args, wait=True) + + assert result["status"] in {"ok", "running"} + env = posted[0]["spec"]["template"]["spec"]["containers"][0]["env"] + env_map = {item["name"]: item["value"] for item in env} + for key, value in expected_env.items(): + assert env_map[key] == value + assert module._job_succeeded({"status": {"succeeded": 1}}) + assert module._job_failed({"status": {"failed": 1}}) + + +@pytest.mark.parametrize( + ("module", "namespace_attr", "cronjob_attr", "timeout_attr", "args"), + [ + ( + nextcloud_mail_sync, + "NEXTCLOUD_NAMESPACE", + "NEXTCLOUD_MAIL_SYNC_CRONJOB", + "NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC", + ("alice",), + ), + ( + firefly_user_sync, + "FIREFLY_NAMESPACE", + "FIREFLY_USER_SYNC_CRONJOB", + "FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC", + ("alice", "alice@example.dev", "pw"), + ), + ( + wger_user_sync, + "WGER_NAMESPACE", + "WGER_USER_SYNC_CRONJOB", + "WGER_USER_SYNC_WAIT_TIMEOUT_SEC", + ("alice", "alice@example.dev", "pw"), + ), + ], +) +def test_user_sync_modules_cover_edge_paths(monkeypatch, module, namespace_attr, cronjob_attr, timeout_attr, args) -> None: + assert module._safe_name_fragment("!!!") == "user" + assert module._job_succeeded({"status": {"conditions": [None, {"type": "Complete", "status": "True"}]}}) + assert not module._job_succeeded({"status": {"conditions": [{"type": "Complete", "status": "False"}]}}) + assert module._job_failed({"status": {"conditions": [None, {"type": "Failed", "status": "True"}]}}) + assert not module._job_failed({"status": {"conditions": [{"type": "Failed", "status": "False"}]}}) + + cronjob = _cronjob_template() + container = cronjob["spec"]["jobTemplate"]["spec"]["template"]["spec"]["containers"][0] + container["env"] = "not-a-list" + job = module._job_from_cronjob(cronjob, *args) + assert job["spec"]["template"]["spec"]["containers"][0]["env"] + + monkeypatch.setattr(module.settings, namespace_attr, "apps") + monkeypatch.setattr(module.settings, cronjob_attr, "sync-cron") + monkeypatch.setattr(module.settings, timeout_attr, 5) + monkeypatch.setattr(module.time, "sleep", lambda *_: None) + + with pytest.raises(RuntimeError, match="missing username"): + module.trigger("", *args[1:]) + + if module in {firefly_user_sync, wger_user_sync}: + with pytest.raises(RuntimeError, match="missing password"): + module.trigger(args[0], args[1], "") + monkeypatch.setattr(module.settings, namespace_attr, "") + with pytest.raises(RuntimeError, match="not configured"): + module.trigger(*args) + monkeypatch.setattr(module.settings, namespace_attr, "apps") + + def cron_then_complete(path: str) -> dict: + if "cronjobs" in path: + return _cronjob_template() + return {"status": {"conditions": [{"type": "Complete", "status": "True"}]}} + + monkeypatch.setattr(module, "get_json", cron_then_complete) + monkeypatch.setattr(module, "post_json", lambda path, payload: {}) + assert module.trigger(*args, wait=False)["status"] == "queued" + + monkeypatch.setattr(module, "post_json", lambda path, payload: {"metadata": {"name": ""}}) + with pytest.raises(RuntimeError, match="job name missing"): + module.trigger(*args, wait=True) + + monkeypatch.setattr(module, "post_json", lambda path, payload: {"metadata": {"name": payload["metadata"]["name"]}}) + clock = iter([0, 1, 2]) + monkeypatch.setattr(module.time, "time", lambda: next(clock)) + assert module.trigger(*args, wait=True)["status"] == "ok" + + def cron_then_failed(path: str) -> dict: + if "cronjobs" in path: + return _cronjob_template() + return {"status": {"conditions": [{"type": "Failed", "status": "True"}]}} + + clock = iter([0, 1, 2]) + monkeypatch.setattr(module.time, "time", lambda: next(clock)) + monkeypatch.setattr(module, "get_json", cron_then_failed) + assert module.trigger(*args, wait=True)["status"] == "error"