test(bstein-home): cover backend platform helpers
This commit is contained in:
parent
2f703005fc
commit
54a9fbde49
@ -9,5 +9,5 @@ def main() -> None:
|
||||
run_migrations()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == "__main__": # pragma: no cover - CLI guard is exercised by direct invocation.
|
||||
main()
|
||||
|
||||
353
backend/tests/test_platform_helpers.py
Normal file
353
backend/tests/test_platform_helpers.py
Normal file
@ -0,0 +1,353 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""Coverage for backend integration helper modules."""
|
||||
|
||||
from contextlib import contextmanager
|
||||
from types import SimpleNamespace
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from atlas_portal import ariadne_client, db, k8s, mailer, migrate
|
||||
from atlas_portal.app_factory import create_app
|
||||
|
||||
|
||||
class DummyResponse:
|
||||
"""Small httpx-like response for helper tests."""
|
||||
|
||||
def __init__(self, payload=None, *, status_code: int = 200, text: str = "", headers=None) -> None:
|
||||
self._payload = payload if payload is not None else {}
|
||||
self.status_code = status_code
|
||||
self.text = text
|
||||
self.headers = headers or {}
|
||||
|
||||
def json(self):
|
||||
"""Return the configured JSON payload."""
|
||||
|
||||
if isinstance(self._payload, BaseException):
|
||||
raise self._payload
|
||||
return self._payload
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
"""Raise for non-success responses like httpx does."""
|
||||
|
||||
if self.status_code >= 400:
|
||||
raise httpx.HTTPStatusError("bad status", request=None, response=None)
|
||||
|
||||
|
||||
def test_migrate_main_delegates_to_db(monkeypatch) -> None:
|
||||
calls: list[str] = []
|
||||
monkeypatch.setattr(migrate, "run_migrations", lambda: calls.append("run"))
|
||||
|
||||
migrate.main()
|
||||
|
||||
assert calls == ["run"]
|
||||
|
||||
|
||||
def test_mailer_validates_configuration_and_sends(monkeypatch) -> None:
|
||||
sent: list[tuple[str, str]] = []
|
||||
|
||||
class DummySMTP:
|
||||
def __init__(self, host, port, timeout):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def starttls(self) -> None:
|
||||
sent.append(("starttls", ""))
|
||||
|
||||
def login(self, username, password) -> None:
|
||||
sent.append(("login", f"{username}:{password}"))
|
||||
|
||||
def send_message(self, message) -> None:
|
||||
sent.append(("send", message["To"]))
|
||||
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "")
|
||||
with pytest.raises(mailer.MailerError):
|
||||
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
||||
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev")
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_PORT", 587)
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", False)
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", True)
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "user")
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_PASSWORD", "pw")
|
||||
monkeypatch.setattr(mailer.smtplib, "SMTP", DummySMTP)
|
||||
|
||||
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
||||
|
||||
assert ("starttls", "") in sent
|
||||
assert ("login", "user:pw") in sent
|
||||
assert ("send", "a@example.dev") in sent
|
||||
body = mailer.access_request_verification_body(request_code="REQ", verify_url="https://verify.example.dev")
|
||||
assert "REQ" in body and "https://verify.example.dev" in body
|
||||
|
||||
|
||||
def test_mailer_reports_missing_recipient_and_send_errors(monkeypatch) -> None:
|
||||
class FailingSMTP:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def send_message(self, message) -> None:
|
||||
raise OSError("offline")
|
||||
|
||||
with pytest.raises(mailer.MailerError, match="missing recipient"):
|
||||
mailer.send_text_email(to_addr="", subject="Subject", body="Body")
|
||||
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_HOST", "smtp.example.dev")
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_USE_TLS", True)
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_STARTTLS", False)
|
||||
monkeypatch.setattr(mailer.settings, "SMTP_USERNAME", "")
|
||||
monkeypatch.setattr(mailer.smtplib, "SMTP_SSL", FailingSMTP)
|
||||
|
||||
with pytest.raises(mailer.MailerError, match="failed to send email"):
|
||||
mailer.send_text_email(to_addr="a@example.dev", subject="Subject", body="Body")
|
||||
|
||||
|
||||
def test_k8s_get_and_post_json(monkeypatch) -> None:
|
||||
calls: list[tuple[str, str, object]] = []
|
||||
|
||||
class DummyClient:
|
||||
def __init__(self, **kwargs):
|
||||
calls.append(("init", "", kwargs))
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def get(self, url):
|
||||
calls.append(("get", url, None))
|
||||
return DummyResponse({"kind": "Pod"})
|
||||
|
||||
def post(self, url, json=None):
|
||||
calls.append(("post", url, json))
|
||||
return DummyResponse({"kind": "Job"})
|
||||
|
||||
monkeypatch.setattr(k8s, "_read_service_account", lambda: ("token", "/ca.crt"))
|
||||
monkeypatch.setattr(k8s.httpx, "Client", DummyClient)
|
||||
|
||||
assert k8s.get_json("/api/v1/pods") == {"kind": "Pod"}
|
||||
assert k8s.post_json("/apis/batch/v1/jobs", {"metadata": {"name": "job"}}) == {"kind": "Job"}
|
||||
assert calls[1][0] == "get"
|
||||
assert calls[2][0] == "init"
|
||||
assert calls[3][0] == "post"
|
||||
|
||||
|
||||
def test_k8s_service_account_and_bad_json(monkeypatch, tmp_path) -> None:
|
||||
sa_path = tmp_path / "sa"
|
||||
sa_path.mkdir()
|
||||
monkeypatch.setattr(k8s, "_SA_PATH", sa_path)
|
||||
with pytest.raises(RuntimeError, match="token missing"):
|
||||
k8s._read_service_account()
|
||||
|
||||
(sa_path / "token").write_text(" ")
|
||||
(sa_path / "ca.crt").write_text("ca")
|
||||
with pytest.raises(RuntimeError, match="token empty"):
|
||||
k8s._read_service_account()
|
||||
|
||||
(sa_path / "token").write_text("token")
|
||||
assert k8s._read_service_account() == ("token", str(sa_path / "ca.crt"))
|
||||
|
||||
class BadClient:
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def get(self, url):
|
||||
return DummyResponse([])
|
||||
|
||||
def post(self, url, json=None):
|
||||
return DummyResponse([])
|
||||
|
||||
monkeypatch.setattr(k8s.httpx, "Client", BadClient)
|
||||
with pytest.raises(RuntimeError, match="unexpected kubernetes response"):
|
||||
k8s.get_json("/api/v1/pods")
|
||||
with pytest.raises(RuntimeError, match="unexpected kubernetes response"):
|
||||
k8s.post_json("/api/v1/pods", {})
|
||||
|
||||
|
||||
def test_ariadne_proxy_paths(monkeypatch) -> None:
|
||||
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "")
|
||||
assert not ariadne_client.enabled()
|
||||
with pytest.raises(ariadne_client.AriadneError):
|
||||
ariadne_client.request_raw("GET", "/health")
|
||||
|
||||
class DummyClient:
|
||||
def __init__(self, timeout):
|
||||
self.timeout = timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def request(self, method, url, headers=None, json=None, params=None):
|
||||
assert headers == {"Authorization": "Bearer token"}
|
||||
return DummyResponse({"ok": True})
|
||||
|
||||
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev")
|
||||
monkeypatch.setattr(ariadne_client.httpx, "Client", DummyClient)
|
||||
|
||||
app = create_app()
|
||||
with app.test_request_context(headers={"Authorization": "Bearer token"}):
|
||||
response = ariadne_client.request_raw("POST", "/path", payload={"a": 1})
|
||||
assert response.json() == {"ok": True}
|
||||
flask_response, status = ariadne_client.proxy("POST", "/path")
|
||||
assert status == 200
|
||||
assert flask_response.get_json() == {"ok": True}
|
||||
|
||||
|
||||
def test_ariadne_error_and_proxy_fallback_paths(monkeypatch) -> None:
|
||||
class ServerErrorClient:
|
||||
def __init__(self, timeout):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def request(self, method, url, headers=None, json=None, params=None):
|
||||
return DummyResponse({"error": "upstream"}, status_code=503)
|
||||
|
||||
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_URL", "https://ariadne.example.dev")
|
||||
monkeypatch.setattr(ariadne_client.httpx, "Client", ServerErrorClient)
|
||||
app = create_app()
|
||||
with app.test_request_context():
|
||||
assert ariadne_client.request_raw("GET", "/health").status_code == 503
|
||||
|
||||
attempts = {"count": 0}
|
||||
|
||||
class FailingClient:
|
||||
def __init__(self, timeout):
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def request(self, method, url, headers=None, json=None, params=None):
|
||||
attempts["count"] += 1
|
||||
raise httpx.RequestError("offline")
|
||||
|
||||
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_COUNT", 2)
|
||||
monkeypatch.setattr(ariadne_client.settings, "ARIADNE_RETRY_BACKOFF_SEC", 0)
|
||||
monkeypatch.setattr(ariadne_client.httpx, "Client", FailingClient)
|
||||
with app.test_request_context():
|
||||
with pytest.raises(ariadne_client.AriadneError):
|
||||
ariadne_client.request_raw("GET", "/health")
|
||||
assert attempts["count"] == 2
|
||||
|
||||
with app.test_request_context():
|
||||
monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: (_ for _ in ()).throw(ariadne_client.AriadneError("down", 504)))
|
||||
response, status = ariadne_client.proxy("GET", "/health")
|
||||
assert status == 504
|
||||
assert response.get_json()["error"] == "down"
|
||||
|
||||
monkeypatch.setattr(ariadne_client, "request_raw", lambda *a, **k: DummyResponse(ValueError("bad json"), text="plain", status_code=502))
|
||||
response, status = ariadne_client.proxy("GET", "/health")
|
||||
assert status == 502
|
||||
assert response.get_json()["error"] == "plain"
|
||||
|
||||
|
||||
def test_db_pool_and_migration_paths(monkeypatch) -> None:
|
||||
executed: list[tuple[str, object]] = []
|
||||
|
||||
class DummyConn:
|
||||
row_factory = None
|
||||
|
||||
def execute(self, query, params=None):
|
||||
executed.append((str(query), params))
|
||||
if "pg_try_advisory_lock" in str(query):
|
||||
return SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": True})
|
||||
return SimpleNamespace(fetchone=lambda: None)
|
||||
|
||||
@contextmanager
|
||||
def fake_connect():
|
||||
yield DummyConn()
|
||||
|
||||
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "")
|
||||
assert not db.configured()
|
||||
with pytest.raises(RuntimeError):
|
||||
db._get_pool()
|
||||
with pytest.raises(RuntimeError):
|
||||
with db.connect():
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
||||
monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True)
|
||||
monkeypatch.setattr(db, "connect", fake_connect)
|
||||
monkeypatch.setattr(db, "_release_advisory_lock", lambda conn, lock_id: executed.append(("release", lock_id)))
|
||||
|
||||
db.run_migrations()
|
||||
db.ensure_schema()
|
||||
|
||||
assert any("CREATE TABLE IF NOT EXISTS access_requests" in query for query, _ in executed)
|
||||
assert ("release", db.MIGRATION_LOCK_ID) in executed
|
||||
|
||||
|
||||
def test_db_pool_connect_and_lock_edge_paths(monkeypatch) -> None:
|
||||
class DummyPool:
|
||||
def __init__(self, **kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
@contextmanager
|
||||
def connection(self):
|
||||
yield SimpleNamespace(row_factory=None)
|
||||
|
||||
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
||||
monkeypatch.setattr(db, "ConnectionPool", DummyPool)
|
||||
monkeypatch.setattr(db, "_pool", None)
|
||||
|
||||
assert db.configured()
|
||||
pool = db._get_pool()
|
||||
assert pool.kwargs["conninfo"] == "postgres://portal"
|
||||
assert "statement_timeout" in db._pool_kwargs()["options"]
|
||||
with db.connect() as conn:
|
||||
assert conn.row_factory is db.dict_row
|
||||
assert db._get_pool() is pool
|
||||
|
||||
tuple_conn = SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: (0,)))
|
||||
assert not db._try_advisory_lock(tuple_conn, 1)
|
||||
|
||||
class BadConn:
|
||||
def execute(self, *args, **kwargs):
|
||||
raise RuntimeError("ignore")
|
||||
|
||||
db._release_advisory_lock(BadConn(), 1)
|
||||
|
||||
|
||||
def test_db_migration_lock_skip(monkeypatch) -> None:
|
||||
@contextmanager
|
||||
def fake_connect():
|
||||
yield SimpleNamespace(execute=lambda *a, **k: SimpleNamespace(fetchone=lambda: {"pg_try_advisory_lock": False}))
|
||||
|
||||
monkeypatch.setattr(db.settings, "PORTAL_DATABASE_URL", "postgres://portal")
|
||||
monkeypatch.setattr(db.settings, "PORTAL_RUN_MIGRATIONS", True)
|
||||
monkeypatch.setattr(db, "connect", fake_connect)
|
||||
|
||||
db.run_migrations()
|
||||
|
||||
178
backend/tests/test_sync_job_helpers.py
Normal file
178
backend/tests/test_sync_job_helpers.py
Normal file
@ -0,0 +1,178 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""Tests for per-user Kubernetes sync Job adapters."""
|
||||
|
||||
import pytest
|
||||
|
||||
from atlas_portal import firefly_user_sync, nextcloud_mail_sync, wger_user_sync
|
||||
|
||||
|
||||
def _cronjob_template() -> dict:
|
||||
"""Build a CronJob payload shaped like the templates used in the cluster."""
|
||||
|
||||
return {
|
||||
"spec": {
|
||||
"jobTemplate": {
|
||||
"spec": {
|
||||
"template": {
|
||||
"spec": {
|
||||
"containers": [
|
||||
{
|
||||
"name": "worker",
|
||||
"env": [
|
||||
{"name": "ONLY_USERNAME", "value": "old"},
|
||||
{"name": "FIREFLY_USER_EMAIL", "value": "old"},
|
||||
{"name": "WGER_USERNAME", "value": "old"},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("module", "namespace_attr", "cronjob_attr", "timeout_attr", "args", "expected_env"),
|
||||
[
|
||||
(
|
||||
nextcloud_mail_sync,
|
||||
"NEXTCLOUD_NAMESPACE",
|
||||
"NEXTCLOUD_MAIL_SYNC_CRONJOB",
|
||||
"NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice",),
|
||||
{"ONLY_USERNAME": "alice"},
|
||||
),
|
||||
(
|
||||
firefly_user_sync,
|
||||
"FIREFLY_NAMESPACE",
|
||||
"FIREFLY_USER_SYNC_CRONJOB",
|
||||
"FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice", "alice@example.dev", "pw"),
|
||||
{"FIREFLY_USER_EMAIL": "alice@example.dev", "FIREFLY_USER_PASSWORD": "pw"},
|
||||
),
|
||||
(
|
||||
wger_user_sync,
|
||||
"WGER_NAMESPACE",
|
||||
"WGER_USER_SYNC_CRONJOB",
|
||||
"WGER_USER_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice", "alice@example.dev", "pw"),
|
||||
{"WGER_USERNAME": "alice", "WGER_EMAIL": "alice@example.dev", "WGER_PASSWORD": "pw"},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_user_sync_modules_render_jobs_and_trigger(monkeypatch, module, namespace_attr, cronjob_attr, timeout_attr, args, expected_env) -> None:
|
||||
monkeypatch.setattr(module.settings, namespace_attr, "apps")
|
||||
monkeypatch.setattr(module.settings, cronjob_attr, "sync-cron")
|
||||
monkeypatch.setattr(module.settings, timeout_attr, 0)
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1000)
|
||||
|
||||
posted: list[dict] = []
|
||||
|
||||
def fake_get_json(path: str) -> dict:
|
||||
if "cronjobs" in path:
|
||||
return _cronjob_template()
|
||||
return {"status": {"conditions": [{"type": "Complete", "status": "True"}]}}
|
||||
|
||||
def fake_post_json(path: str, payload: dict) -> dict:
|
||||
posted.append(payload)
|
||||
return {"metadata": {"name": payload["metadata"]["name"]}}
|
||||
|
||||
monkeypatch.setattr(module, "get_json", fake_get_json)
|
||||
monkeypatch.setattr(module, "post_json", fake_post_json)
|
||||
|
||||
result = module.trigger(*args, wait=True)
|
||||
|
||||
assert result["status"] in {"ok", "running"}
|
||||
env = posted[0]["spec"]["template"]["spec"]["containers"][0]["env"]
|
||||
env_map = {item["name"]: item["value"] for item in env}
|
||||
for key, value in expected_env.items():
|
||||
assert env_map[key] == value
|
||||
assert module._job_succeeded({"status": {"succeeded": 1}})
|
||||
assert module._job_failed({"status": {"failed": 1}})
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("module", "namespace_attr", "cronjob_attr", "timeout_attr", "args"),
|
||||
[
|
||||
(
|
||||
nextcloud_mail_sync,
|
||||
"NEXTCLOUD_NAMESPACE",
|
||||
"NEXTCLOUD_MAIL_SYNC_CRONJOB",
|
||||
"NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice",),
|
||||
),
|
||||
(
|
||||
firefly_user_sync,
|
||||
"FIREFLY_NAMESPACE",
|
||||
"FIREFLY_USER_SYNC_CRONJOB",
|
||||
"FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice", "alice@example.dev", "pw"),
|
||||
),
|
||||
(
|
||||
wger_user_sync,
|
||||
"WGER_NAMESPACE",
|
||||
"WGER_USER_SYNC_CRONJOB",
|
||||
"WGER_USER_SYNC_WAIT_TIMEOUT_SEC",
|
||||
("alice", "alice@example.dev", "pw"),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_user_sync_modules_cover_edge_paths(monkeypatch, module, namespace_attr, cronjob_attr, timeout_attr, args) -> None:
|
||||
assert module._safe_name_fragment("!!!") == "user"
|
||||
assert module._job_succeeded({"status": {"conditions": [None, {"type": "Complete", "status": "True"}]}})
|
||||
assert not module._job_succeeded({"status": {"conditions": [{"type": "Complete", "status": "False"}]}})
|
||||
assert module._job_failed({"status": {"conditions": [None, {"type": "Failed", "status": "True"}]}})
|
||||
assert not module._job_failed({"status": {"conditions": [{"type": "Failed", "status": "False"}]}})
|
||||
|
||||
cronjob = _cronjob_template()
|
||||
container = cronjob["spec"]["jobTemplate"]["spec"]["template"]["spec"]["containers"][0]
|
||||
container["env"] = "not-a-list"
|
||||
job = module._job_from_cronjob(cronjob, *args)
|
||||
assert job["spec"]["template"]["spec"]["containers"][0]["env"]
|
||||
|
||||
monkeypatch.setattr(module.settings, namespace_attr, "apps")
|
||||
monkeypatch.setattr(module.settings, cronjob_attr, "sync-cron")
|
||||
monkeypatch.setattr(module.settings, timeout_attr, 5)
|
||||
monkeypatch.setattr(module.time, "sleep", lambda *_: None)
|
||||
|
||||
with pytest.raises(RuntimeError, match="missing username"):
|
||||
module.trigger("", *args[1:])
|
||||
|
||||
if module in {firefly_user_sync, wger_user_sync}:
|
||||
with pytest.raises(RuntimeError, match="missing password"):
|
||||
module.trigger(args[0], args[1], "")
|
||||
monkeypatch.setattr(module.settings, namespace_attr, "")
|
||||
with pytest.raises(RuntimeError, match="not configured"):
|
||||
module.trigger(*args)
|
||||
monkeypatch.setattr(module.settings, namespace_attr, "apps")
|
||||
|
||||
def cron_then_complete(path: str) -> dict:
|
||||
if "cronjobs" in path:
|
||||
return _cronjob_template()
|
||||
return {"status": {"conditions": [{"type": "Complete", "status": "True"}]}}
|
||||
|
||||
monkeypatch.setattr(module, "get_json", cron_then_complete)
|
||||
monkeypatch.setattr(module, "post_json", lambda path, payload: {})
|
||||
assert module.trigger(*args, wait=False)["status"] == "queued"
|
||||
|
||||
monkeypatch.setattr(module, "post_json", lambda path, payload: {"metadata": {"name": ""}})
|
||||
with pytest.raises(RuntimeError, match="job name missing"):
|
||||
module.trigger(*args, wait=True)
|
||||
|
||||
monkeypatch.setattr(module, "post_json", lambda path, payload: {"metadata": {"name": payload["metadata"]["name"]}})
|
||||
clock = iter([0, 1, 2])
|
||||
monkeypatch.setattr(module.time, "time", lambda: next(clock))
|
||||
assert module.trigger(*args, wait=True)["status"] == "ok"
|
||||
|
||||
def cron_then_failed(path: str) -> dict:
|
||||
if "cronjobs" in path:
|
||||
return _cronjob_template()
|
||||
return {"status": {"conditions": [{"type": "Failed", "status": "True"}]}}
|
||||
|
||||
clock = iter([0, 1, 2])
|
||||
monkeypatch.setattr(module.time, "time", lambda: next(clock))
|
||||
monkeypatch.setattr(module, "get_json", cron_then_failed)
|
||||
assert module.trigger(*args, wait=True)["status"] == "error"
|
||||
Loading…
x
Reference in New Issue
Block a user