ariadne/tests/test_k8s_exec.py

118 lines
3.9 KiB
Python
Raw Normal View History

from __future__ import annotations
import types
import pytest
from ariadne.k8s.exec import ExecError, PodExecutor, _build_command
from ariadne.k8s.pods import PodRef
import ariadne.k8s.exec as exec_module
class DummyStream:
def __init__(self, stdout: str = "", stderr: str = "", exit_code: int = 0):
self._open = True
self._stdout = [stdout] if stdout else []
self._stderr = [stderr] if stderr else []
self._exit_code_ready = True
self._exit_code = exit_code
self.returncode = exit_code
def is_open(self) -> bool:
return self._open
def update(self, timeout: int = 1) -> None:
return None
def peek_stdout(self) -> bool:
return bool(self._stdout)
def read_stdout(self) -> str:
return self._stdout.pop(0)
def peek_stderr(self) -> bool:
return bool(self._stderr)
def read_stderr(self) -> str:
return self._stderr.pop(0)
def peek_exit_code(self) -> bool:
return self._exit_code_ready
def read_exit_code(self) -> int:
self._exit_code_ready = False
self._open = False
return self._exit_code
def close(self) -> None:
self._open = False
class HangingStream(DummyStream):
def __init__(self):
super().__init__(stdout="", stderr="", exit_code=0)
self._exit_code_ready = False
def peek_exit_code(self) -> bool:
return False
def test_build_command_wraps_env() -> None:
cmd = _build_command(["echo", "hello"], {"FOO": "bar"})
assert cmd[0] == "/bin/sh"
2026-01-21 21:56:28 -03:00
assert "export FOO=bar" in cmd[2]
def test_exec_returns_output(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: DummyStream(stdout="ok\n", exit_code=0))
executor = PodExecutor("ns", "app=test", "container")
result = executor.exec(["echo", "ok"], check=True)
assert result.stdout == "ok\n"
assert result.ok
def test_exec_raises_on_failure(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: DummyStream(stderr="bad", exit_code=2))
executor = PodExecutor("ns", "app=test", None)
with pytest.raises(ExecError):
executor.exec(["false"], check=True)
def test_exec_times_out(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: HangingStream())
executor = PodExecutor("ns", "app=test", None)
with pytest.raises(TimeoutError):
executor.exec(["sleep", "10"], timeout_sec=0.0, check=False)
def test_ensure_client_fallback(monkeypatch) -> None:
dummy_api = object()
monkeypatch.setattr(exec_module, "_CORE_API", None)
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
class DummyConfig:
def __init__(self):
self.calls = []
def load_incluster_config(self):
self.calls.append("incluster")
raise RuntimeError("no in-cluster")
def load_kube_config(self):
self.calls.append("kubeconfig")
dummy_config = DummyConfig()
monkeypatch.setattr(exec_module, "config", dummy_config)
monkeypatch.setattr(exec_module, "client", types.SimpleNamespace(CoreV1Api=lambda: dummy_api))
assert exec_module._ensure_client() is dummy_api