261 lines
10 KiB
Python
261 lines
10 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
"""Tests for Vaultwarden invite and Kubernetes discovery helpers."""
|
||
|
|
|
||
|
|
import base64
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from atlas_portal import vaultwarden
|
||
|
|
|
||
|
|
|
||
|
|
class DummyResponse:
|
||
|
|
"""HTTP response double used by Vaultwarden tests."""
|
||
|
|
|
||
|
|
def __init__(self, payload=None, *, status_code: int = 200, text: str = "") -> None:
|
||
|
|
self._payload = payload if payload is not None else {}
|
||
|
|
self.status_code = status_code
|
||
|
|
self.text = text
|
||
|
|
|
||
|
|
def json(self):
|
||
|
|
"""Return configured JSON payload."""
|
||
|
|
|
||
|
|
return self._payload
|
||
|
|
|
||
|
|
def raise_for_status(self) -> None:
|
||
|
|
"""Raise for HTTP failure statuses."""
|
||
|
|
|
||
|
|
if self.status_code >= 400:
|
||
|
|
raise RuntimeError("bad status")
|
||
|
|
|
||
|
|
|
||
|
|
class DummyClient:
|
||
|
|
"""httpx.Client replacement with queued responses."""
|
||
|
|
|
||
|
|
responses: list[DummyResponse] = []
|
||
|
|
calls: list[tuple[str, str, dict]] = []
|
||
|
|
closed = 0
|
||
|
|
|
||
|
|
def __init__(self, **kwargs):
|
||
|
|
self.kwargs = kwargs
|
||
|
|
|
||
|
|
def __enter__(self):
|
||
|
|
return self
|
||
|
|
|
||
|
|
def __exit__(self, exc_type, exc, tb):
|
||
|
|
return False
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def reset(cls, *responses: DummyResponse) -> None:
|
||
|
|
"""Replace queued responses and clear captured calls."""
|
||
|
|
|
||
|
|
cls.responses = list(responses)
|
||
|
|
cls.calls = []
|
||
|
|
cls.closed = 0
|
||
|
|
|
||
|
|
def close(self) -> None:
|
||
|
|
"""Record cache eviction closing behavior."""
|
||
|
|
|
||
|
|
type(self).closed += 1
|
||
|
|
|
||
|
|
def get(self, url):
|
||
|
|
self.calls.append(("GET", url, {}))
|
||
|
|
return self.responses.pop(0)
|
||
|
|
|
||
|
|
def post(self, url, **kwargs):
|
||
|
|
self.calls.append(("POST", url, kwargs))
|
||
|
|
return self.responses.pop(0)
|
||
|
|
|
||
|
|
|
||
|
|
def _reset_admin_state(monkeypatch) -> None:
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION", None)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION_EXPIRES_AT", 0.0)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION_BASE_URL", "")
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_RATE_LIMITED_UNTIL", 0.0)
|
||
|
|
|
||
|
|
|
||
|
|
def test_service_account_and_k8s_json_paths(monkeypatch, tmp_path: Path) -> None:
|
||
|
|
sa_path = tmp_path / "sa"
|
||
|
|
sa_path.mkdir()
|
||
|
|
monkeypatch.setattr(vaultwarden, "_SA_PATH", sa_path)
|
||
|
|
|
||
|
|
with pytest.raises(RuntimeError, match="token missing"):
|
||
|
|
vaultwarden._read_service_account()
|
||
|
|
|
||
|
|
(sa_path / "token").write_text(" ")
|
||
|
|
(sa_path / "ca.crt").write_text("ca")
|
||
|
|
with pytest.raises(RuntimeError, match="token empty"):
|
||
|
|
vaultwarden._read_service_account()
|
||
|
|
|
||
|
|
(sa_path / "token").write_text("token")
|
||
|
|
assert vaultwarden._read_service_account() == ("token", str(sa_path / "ca.crt"))
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_read_service_account", lambda: ("token", "/ca.crt"))
|
||
|
|
monkeypatch.setattr(vaultwarden.httpx, "Client", DummyClient)
|
||
|
|
DummyClient.reset(DummyResponse({"items": []}), DummyResponse([]))
|
||
|
|
|
||
|
|
assert vaultwarden._k8s_get_json("/api") == {"items": []}
|
||
|
|
with pytest.raises(RuntimeError, match="unexpected kubernetes response"):
|
||
|
|
vaultwarden._k8s_get_json("/api")
|
||
|
|
|
||
|
|
|
||
|
|
def test_k8s_pod_and_secret_helpers(monkeypatch) -> None:
|
||
|
|
encoded = base64.b64encode(b"admin-token").decode("ascii")
|
||
|
|
pods = {
|
||
|
|
"items": [
|
||
|
|
{"status": {"phase": "Pending", "podIP": "10.0.0.1"}},
|
||
|
|
{
|
||
|
|
"status": {
|
||
|
|
"phase": "Running",
|
||
|
|
"podIP": "10.0.0.2",
|
||
|
|
"conditions": [{"type": "Ready", "status": "True"}],
|
||
|
|
}
|
||
|
|
},
|
||
|
|
]
|
||
|
|
}
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: pods if "pods" in path else {"data": {"token": encoded}})
|
||
|
|
|
||
|
|
assert vaultwarden._k8s_find_pod_ip("apps", "app=vaultwarden") == "10.0.0.2"
|
||
|
|
assert vaultwarden._k8s_get_secret_value("apps", "secret", "token") == "admin-token"
|
||
|
|
|
||
|
|
no_condition_pod = {"items": [{"status": {"phase": "Running", "podIP": "10.0.0.3", "conditions": [None]}}]}
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: no_condition_pod)
|
||
|
|
assert vaultwarden._k8s_find_pod_ip("apps", "app=vaultwarden") == "10.0.0.3"
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: {"items": []})
|
||
|
|
with pytest.raises(RuntimeError, match="no vaultwarden pods"):
|
||
|
|
vaultwarden._k8s_find_pod_ip("apps", "app=vaultwarden")
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: {"items": [{"status": {"phase": "Running"}}]})
|
||
|
|
with pytest.raises(RuntimeError, match="no IP"):
|
||
|
|
vaultwarden._k8s_find_pod_ip("apps", "app=vaultwarden")
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: {"data": {}})
|
||
|
|
with pytest.raises(RuntimeError, match="secret key missing"):
|
||
|
|
vaultwarden._k8s_get_secret_value("apps", "secret", "token")
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: {"data": {"token": "bad-base64"}})
|
||
|
|
with pytest.raises(RuntimeError, match="failed to decode"):
|
||
|
|
vaultwarden._k8s_get_secret_value("apps", "secret", "token")
|
||
|
|
|
||
|
|
empty = base64.b64encode(b" ").decode("ascii")
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_json", lambda path: {"data": {"token": empty}})
|
||
|
|
with pytest.raises(RuntimeError, match="value empty"):
|
||
|
|
vaultwarden._k8s_get_secret_value("apps", "secret", "token")
|
||
|
|
|
||
|
|
|
||
|
|
def test_admin_session_cache_and_rate_limit(monkeypatch) -> None:
|
||
|
|
_reset_admin_state(monkeypatch)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_get_secret_value", lambda *args: "admin-token")
|
||
|
|
monkeypatch.setattr(vaultwarden.httpx, "Client", DummyClient)
|
||
|
|
monkeypatch.setattr(vaultwarden.time, "time", lambda: 100.0)
|
||
|
|
DummyClient.reset(DummyResponse({}, status_code=200))
|
||
|
|
|
||
|
|
session = vaultwarden._admin_session("http://vaultwarden")
|
||
|
|
assert vaultwarden._admin_session("http://vaultwarden") is session
|
||
|
|
|
||
|
|
DummyClient.reset(DummyResponse({}, status_code=200))
|
||
|
|
vaultwarden._admin_session("http://other")
|
||
|
|
assert DummyClient.closed == 1
|
||
|
|
|
||
|
|
class BadCloseSession:
|
||
|
|
def close(self) -> None:
|
||
|
|
raise RuntimeError("ignore")
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION", BadCloseSession())
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION_EXPIRES_AT", 999.0)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_SESSION_BASE_URL", "http://old")
|
||
|
|
DummyClient.reset(DummyResponse({}, status_code=200))
|
||
|
|
vaultwarden._admin_session("http://new")
|
||
|
|
|
||
|
|
_reset_admin_state(monkeypatch)
|
||
|
|
DummyClient.reset(DummyResponse({}, status_code=429))
|
||
|
|
with pytest.raises(RuntimeError, match="rate limited"):
|
||
|
|
vaultwarden._admin_session("http://vaultwarden")
|
||
|
|
with pytest.raises(RuntimeError, match="rate limited"):
|
||
|
|
vaultwarden._admin_session("http://vaultwarden")
|
||
|
|
|
||
|
|
|
||
|
|
def test_invite_user_success_and_idempotent_paths(monkeypatch) -> None:
|
||
|
|
_reset_admin_state(monkeypatch)
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_SERVICE_HOST", "vaultwarden.apps.svc:80")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_NAMESPACE", "apps")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_POD_LABEL", "app=vaultwarden")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_POD_PORT", 8080)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_find_pod_ip", lambda *args: "10.0.0.2")
|
||
|
|
|
||
|
|
class InviteSession:
|
||
|
|
def __init__(self, response):
|
||
|
|
self.response = response
|
||
|
|
|
||
|
|
def post(self, path, json=None):
|
||
|
|
return self.response
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: InviteSession(DummyResponse(status_code=200)))
|
||
|
|
assert vaultwarden.invite_user("alice@example.dev").status == "invited"
|
||
|
|
|
||
|
|
monkeypatch.setattr(
|
||
|
|
vaultwarden,
|
||
|
|
"_admin_session",
|
||
|
|
lambda base_url: InviteSession(DummyResponse(status_code=409, text="user already exists")),
|
||
|
|
)
|
||
|
|
result = vaultwarden.invite_user("alice@example.dev")
|
||
|
|
assert result.ok and result.status == "already_present"
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_RATE_LIMITED_UNTIL", 9999999999.0)
|
||
|
|
assert vaultwarden.invite_user("alice@example.dev").status == "rate_limited"
|
||
|
|
assert vaultwarden.invite_user("not-email").status == "invalid_email"
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_ADMIN_RATE_LIMITED_UNTIL", 0.0)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: InviteSession(DummyResponse(status_code=429)))
|
||
|
|
assert vaultwarden.invite_user("alice@example.dev").status == "rate_limited"
|
||
|
|
|
||
|
|
|
||
|
|
def test_invite_user_fallback_and_error_paths(monkeypatch) -> None:
|
||
|
|
_reset_admin_state(monkeypatch)
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_SERVICE_HOST", "vaultwarden.apps.svc:80")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_NAMESPACE", "apps")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_POD_LABEL", "app=vaultwarden")
|
||
|
|
monkeypatch.setattr(vaultwarden.settings, "VAULTWARDEN_POD_PORT", 8080)
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_find_pod_ip", lambda *args: "10.0.0.2")
|
||
|
|
|
||
|
|
class SequenceSession:
|
||
|
|
def __init__(self):
|
||
|
|
self.calls = 0
|
||
|
|
|
||
|
|
def post(self, path, json=None):
|
||
|
|
self.calls += 1
|
||
|
|
if self.calls == 1:
|
||
|
|
raise RuntimeError("service offline")
|
||
|
|
return DummyResponse(status_code=201)
|
||
|
|
|
||
|
|
session = SequenceSession()
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: session)
|
||
|
|
assert vaultwarden.invite_user("alice@example.dev").status == "invited"
|
||
|
|
|
||
|
|
class BadTextResponse:
|
||
|
|
status_code = 500
|
||
|
|
|
||
|
|
@property
|
||
|
|
def text(self):
|
||
|
|
raise RuntimeError("no body")
|
||
|
|
|
||
|
|
class BadTextSession:
|
||
|
|
def post(self, path, json=None):
|
||
|
|
return BadTextResponse()
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: BadTextSession())
|
||
|
|
result = vaultwarden.invite_user("alice@example.dev")
|
||
|
|
assert result.status == "error"
|
||
|
|
assert "status 500" in result.detail
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_k8s_find_pod_ip", lambda *args: (_ for _ in ()).throw(RuntimeError("no pod")))
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: (_ for _ in ()).throw(RuntimeError("rate limited")))
|
||
|
|
assert vaultwarden.invite_user("alice@example.dev").status == "rate_limited"
|
||
|
|
|
||
|
|
monkeypatch.setattr(vaultwarden, "_admin_session", lambda base_url: (_ for _ in ()).throw(RuntimeError("offline")))
|
||
|
|
result = vaultwarden.invite_user("alice@example.dev")
|
||
|
|
assert not result.ok
|
||
|
|
assert result.status == "error"
|