"""Unit tests for ``ariadne.k8s.exec``.

Every test swaps the module-level collaborators of ``ariadne.k8s.exec``
(``select_pod``, ``_ensure_client``, ``stream``, ``config``, ``client``) for
in-memory fakes via ``monkeypatch``; the fakes defined here never open a
connection.
"""

from __future__ import annotations

import builtins
import importlib.util
import sys
import types

import pytest

import ariadne.k8s.exec as exec_module
from ariadne.k8s.exec import ExecError, PodExecutor, _build_command
from ariadne.k8s.pods import PodRef


class DummyStream:
    """In-memory stand-in for the object returned by ``exec_module.stream``.

    Holds at most one stdout chunk and one stderr chunk, plus an exit code
    that is reported as ready until it has been read once.
    """

    def __init__(self, stdout: str = "", stderr: str = "", exit_code: int = 0):
        self._open = True
        # An empty string means "no output": the queue stays empty so the
        # matching peek_* method reports False.
        self._stdout = []
        if stdout:
            self._stdout.append(stdout)
        self._stderr = []
        if stderr:
            self._stderr.append(stderr)
        self._exit_code_ready = True
        self._exit_code = exit_code
        self.returncode = exit_code

    def is_open(self) -> bool:
        """Report whether the fake stream is still open."""
        return self._open

    def update(self, timeout: int = 1) -> None:
        """Accept a poll request and do nothing; all data is preloaded."""

    def peek_stdout(self) -> bool:
        """Return True while an unread stdout chunk is queued."""
        return len(self._stdout) > 0

    def read_stdout(self) -> str:
        """Remove and return the oldest queued stdout chunk."""
        return self._stdout.pop(0)

    def peek_stderr(self) -> bool:
        """Return True while an unread stderr chunk is queued."""
        return len(self._stderr) > 0

    def read_stderr(self) -> str:
        """Remove and return the oldest queued stderr chunk."""
        return self._stderr.pop(0)

    def peek_exit_code(self) -> bool:
        """Return True until the exit code has been consumed."""
        return self._exit_code_ready

    def read_exit_code(self) -> int:
        """Return the exit code and mark the stream finished and closed."""
        self._open = False
        self._exit_code_ready = False
        return self._exit_code

    def close(self) -> None:
        """Mark the fake stream as closed."""
        self._open = False


class HangingStream(DummyStream):
    """Stream that stays open, yields no output and never has an exit code."""

    def __init__(self):
        super().__init__(stdout="", stderr="", exit_code=0)
        self._exit_code_ready = False

    def peek_exit_code(self) -> bool:
        """Always report that no exit code is available."""
        return False


class ReturnCodeStream(DummyStream):
    """Stream that reports itself closed and exposes only ``returncode``.

    ``returncode`` is 7 while the stored exit code is 0, so a test can tell
    which of the two the executor used.
    """

    def __init__(self):
        super().__init__(stdout="fallback", stderr="", exit_code=0)
        self.returncode = 7

    def is_open(self) -> bool:
        """Always report the stream as closed."""
        return False

    def peek_exit_code(self):
        """Fail the test if the executor polls for an exit code."""
        raise AssertionError("closed streams should not read exit code")


def _patch_exec_backend(monkeypatch, make_stream) -> None:
    """Replace pod selection, client creation and streaming with fakes.

    ``make_stream`` is a zero-argument callable; it is invoked on every
    ``exec_module.stream`` call so each call receives a fresh stream object.
    """
    monkeypatch.setattr(
        exec_module,
        "select_pod",
        lambda *_args, **_kwargs: PodRef("pod", "ns"),
    )
    monkeypatch.setattr(
        exec_module,
        "_ensure_client",
        lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None),
    )
    monkeypatch.setattr(
        exec_module,
        "stream",
        lambda *args, **kwargs: make_stream(),
    )


def test_build_command_wraps_env() -> None:
    """Env vars become ``export`` lines in a shell wrapper; strings run via sh -c."""
    with_env = _build_command(["echo", "hello"], {"FOO": "bar"})
    assert with_env[0] == "/bin/sh"
    assert "export FOO=bar" in with_env[2]

    without_env = _build_command("echo hello", None)
    assert without_env == ["/bin/sh", "-c", "echo hello"]


def test_exec_returns_output(monkeypatch) -> None:
    """A zero exit code yields a result carrying the captured stdout."""
    _patch_exec_backend(
        monkeypatch,
        lambda: DummyStream(stdout="ok\n", exit_code=0),
    )

    outcome = PodExecutor("ns", "app=test", "container").exec(
        ["echo", "ok"], check=True
    )

    assert outcome.stdout == "ok\n"
    assert outcome.ok


def test_exec_raises_on_failure(monkeypatch) -> None:
    """With ``check=True`` a non-zero exit code raises ``ExecError``."""
    _patch_exec_backend(
        monkeypatch,
        lambda: DummyStream(stderr="bad", exit_code=2),
    )
    runner = PodExecutor("ns", "app=test", None)

    with pytest.raises(ExecError):
        runner.exec(["false"], check=True)


def test_exec_times_out(monkeypatch) -> None:
    """A stream that never finishes raises ``TimeoutError`` at a zero timeout."""
    _patch_exec_backend(monkeypatch, HangingStream)
    runner = PodExecutor("ns", "app=test", None)

    with pytest.raises(TimeoutError):
        runner.exec(["sleep", "10"], timeout_sec=0.0, check=False)


def test_exec_uses_returncode_when_stream_has_no_exit_code(monkeypatch) -> None:
    """An already-closed stream falls back to its ``returncode`` attribute."""
    _patch_exec_backend(monkeypatch, ReturnCodeStream)

    outcome = PodExecutor("ns", "app=test", None).exec("echo ok", check=False)

    assert outcome.exit_code == 7
    assert outcome.ok is False


def test_ensure_client_fallback(monkeypatch) -> None:
    """``_ensure_client`` still returns an API object when in-cluster config fails."""

    class DummyConfig:
        """Config fake whose in-cluster loader raises; each call is recorded."""

        def __init__(self):
            # Recorded for inspection only; this test does not assert on it.
            self.calls = []

        def load_incluster_config(self):
            self.calls.append("incluster")
            raise RuntimeError("no in-cluster")

        def load_kube_config(self):
            self.calls.append("kubeconfig")

    dummy_api = object()
    dummy_config = DummyConfig()
    fake_client = types.SimpleNamespace(CoreV1Api=lambda: dummy_api)

    # Start with no cached client and no recorded import failure.
    monkeypatch.setattr(exec_module, "_CORE_API", None)
    monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
    monkeypatch.setattr(exec_module, "config", dummy_config)
    monkeypatch.setattr(exec_module, "client", fake_client)

    assert exec_module._ensure_client() is dummy_api


def test_ensure_client_cached_and_import_error(monkeypatch) -> None:
    """A cached client is returned as-is; a stored import error is surfaced."""
    # Phase 1: a cached client short-circuits.
    cached = object()
    monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
    monkeypatch.setattr(exec_module, "_CORE_API", cached)
    assert exec_module._ensure_client() is cached

    # Phase 2: nothing cached and a recorded import failure.
    error = RuntimeError("missing kubernetes")
    monkeypatch.setattr(exec_module, "_IMPORT_ERROR", error)
    monkeypatch.setattr(exec_module, "_CORE_API", None)
    with pytest.raises(RuntimeError, match="kubernetes client missing"):
        exec_module._ensure_client()


def test_exec_module_import_error_fallback(monkeypatch) -> None:
    """Loading the module while ``kubernetes`` imports fail leaves None placeholders."""
    real_import = builtins.__import__

    def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
        # Parameter names mirror builtins.__import__.
        top_level_package = name.split(".", 1)[0]
        if top_level_package != "kubernetes":
            return real_import(name, globals, locals, fromlist, level)
        raise RuntimeError("kubernetes unavailable")

    # Execute the source file a second time under a probe name so the
    # already-imported ``exec_module`` object is left alone.
    module_name = "ariadne.k8s.exec_import_failure_probe"
    spec = importlib.util.spec_from_file_location(module_name, exec_module.__file__)
    assert spec
    assert spec.loader
    module = importlib.util.module_from_spec(spec)

    # The import hook is swapped only after the spec and module exist.
    monkeypatch.setattr(builtins, "__import__", fake_import)
    monkeypatch.setitem(sys.modules, module_name, module)
    spec.loader.exec_module(module)

    for placeholder in ("client", "config", "stream"):
        assert getattr(module, placeholder) is None
    assert isinstance(module._IMPORT_ERROR, RuntimeError)