# ariadne/tests/test_vault.py
#
# Repository viewer metadata captured with this file:
# 189 lines, 6.5 KiB, Python.

from __future__ import annotations
import types
from ariadne.services import vault as vault_module
from ariadne.services.vault import VaultService, _build_policy
class DummyResponse:
    """Minimal stand-in for an HTTP response object used by the Vault tests.

    Exposes only ``status_code``, ``text``, ``json()`` and
    ``raise_for_status()``.
    """

    def __init__(self, payload=None, status_code=200):
        """Store the canned JSON payload and the status code to report.

        Args:
            payload: Object returned by ``json()``. ``None`` yields an empty
                dict.
            status_code: Status code exposed on the instance; defaults to 200.
        """
        # Test against None rather than truthiness so that a deliberately
        # falsy payload (e.g. an empty list) is handed back unchanged.
        self._payload = {} if payload is None else payload
        self.status_code = status_code
        self.text = ""

    def json(self):
        """Return the payload supplied at construction time."""
        return self._payload

    def raise_for_status(self):
        """Raise ``RuntimeError`` when the status code is 400 or above.

        Raises:
            RuntimeError: If ``status_code >= 400``.
        """
        if self.status_code >= 400:
            raise RuntimeError("status error")
def test_build_policy() -> None:
    """Check that each path passed to ``_build_policy`` appears in the result.

    Both inputs are expected under the ``kv/data/atlas/`` prefix.
    """
    policy = _build_policy("foo/*", "bar/*")
    assert "kv/data/atlas/foo/*" in policy
    assert "kv/data/atlas/bar/*" in policy
def test_vault_sync_k8s_auth_success(monkeypatch) -> None:
    """``sync_k8s_auth`` reports ``ok`` and touches the Kubernetes auth config.

    The module-level ``settings`` object and ``VaultClient.request`` are both
    replaced, so no real HTTP traffic is issued.
    """
    dummy_settings = types.SimpleNamespace(
        vault_addr="http://vault",
        vault_token="token",
        vault_k8s_role="vault",
        vault_k8s_role_ttl="1h",
        vault_k8s_token_reviewer_jwt="jwt",
        vault_k8s_token_reviewer_jwt_file="",
        vault_oidc_discovery_url="",
        vault_oidc_client_id="",
        vault_oidc_client_secret="",
        vault_oidc_default_role="admin",
        vault_oidc_scopes="",
        vault_oidc_user_claim="",
        vault_oidc_groups_claim="",
        vault_oidc_token_policies="",
        vault_oidc_admin_group="",
        vault_oidc_admin_policies="",
        vault_oidc_dev_group="",
        vault_oidc_dev_policies="",
        vault_oidc_user_group="",
        vault_oidc_user_policies="",
        vault_oidc_redirect_uris="",
        vault_oidc_bound_audiences="",
        vault_oidc_bound_claims_type="",
        k8s_api_timeout_sec=5.0,
    )
    monkeypatch.setattr(vault_module, "settings", dummy_settings)

    # Every (method, path) pair the service issues is recorded here so the
    # test can assert on which endpoints were hit.
    calls: list[tuple[str, str]] = []

    def fake_request(self, method: str, path: str, json=None):
        """Record the call and answer with a canned response."""
        calls.append((method, path))
        if path == "/v1/sys/health":
            return DummyResponse({"initialized": True, "sealed": False})
        # Every other endpoint, "/v1/sys/auth" included, answers with an
        # empty JSON object.
        return DummyResponse({})

    monkeypatch.setattr(vault_module.VaultClient, "request", fake_request)

    svc = VaultService()
    result = svc.sync_k8s_auth()

    assert result["status"] == "ok"
    assert any(path == "/v1/auth/kubernetes/config" for _, path in calls)
def test_vault_sync_oidc_success(monkeypatch) -> None:
    """``sync_oidc`` reports ``ok`` when the OIDC settings are populated.

    The module-level ``settings`` object and ``VaultClient.request`` are both
    replaced, so no real HTTP traffic is issued.
    """
    dummy_settings = types.SimpleNamespace(
        vault_addr="http://vault",
        vault_token="token",
        vault_k8s_role="vault",
        vault_k8s_role_ttl="1h",
        vault_k8s_token_reviewer_jwt="jwt",
        vault_k8s_token_reviewer_jwt_file="",
        vault_oidc_discovery_url="http://oidc",
        vault_oidc_client_id="client",
        vault_oidc_client_secret="secret",
        vault_oidc_default_role="admin",
        vault_oidc_scopes="openid profile",
        vault_oidc_user_claim="preferred_username",
        vault_oidc_groups_claim="groups",
        vault_oidc_token_policies="",
        vault_oidc_admin_group="admin",
        vault_oidc_admin_policies="default,vault-admin",
        vault_oidc_dev_group="dev",
        vault_oidc_dev_policies="default,dev-kv",
        vault_oidc_user_group="",
        vault_oidc_user_policies="",
        vault_oidc_redirect_uris="https://secret.bstein.dev/ui/vault/auth/oidc/oidc/callback",
        vault_oidc_bound_audiences="",
        vault_oidc_bound_claims_type="string",
        k8s_api_timeout_sec=5.0,
    )
    monkeypatch.setattr(vault_module, "settings", dummy_settings)

    def fake_request(self, method: str, path: str, json=None):
        """Answer the health probe as initialized/unsealed, else empty JSON."""
        if path == "/v1/sys/health":
            return DummyResponse({"initialized": True, "sealed": False})
        # Every other endpoint, "/v1/sys/auth" included, answers with an
        # empty JSON object.
        return DummyResponse({})

    monkeypatch.setattr(vault_module.VaultClient, "request", fake_request)

    svc = VaultService()
    result = svc.sync_oidc()

    assert result["status"] == "ok"
def test_vault_sync_oidc_missing_discovery(monkeypatch) -> None:
    """``sync_oidc`` reports ``error`` when the discovery URL is empty.

    All other OIDC settings are populated; only
    ``vault_oidc_discovery_url`` is left blank.
    """
    dummy_settings = types.SimpleNamespace(
        vault_addr="http://vault",
        vault_token="token",
        vault_k8s_role="vault",
        vault_k8s_role_ttl="1h",
        vault_k8s_token_reviewer_jwt="jwt",
        vault_k8s_token_reviewer_jwt_file="",
        vault_oidc_discovery_url="",
        vault_oidc_client_id="client",
        vault_oidc_client_secret="secret",
        vault_oidc_default_role="admin",
        vault_oidc_scopes="openid profile",
        vault_oidc_user_claim="preferred_username",
        vault_oidc_groups_claim="groups",
        vault_oidc_token_policies="",
        vault_oidc_admin_group="admin",
        vault_oidc_admin_policies="default,vault-admin",
        vault_oidc_dev_group="dev",
        vault_oidc_dev_policies="default,dev-kv",
        vault_oidc_user_group="",
        vault_oidc_user_policies="",
        vault_oidc_redirect_uris="https://secret.bstein.dev/ui/vault/auth/oidc/oidc/callback",
        vault_oidc_bound_audiences="",
        vault_oidc_bound_claims_type="string",
        k8s_api_timeout_sec=5.0,
    )
    monkeypatch.setattr(vault_module, "settings", dummy_settings)

    # Any request, whatever its arguments, gets the "initialized and
    # unsealed" payload back.
    monkeypatch.setattr(
        vault_module.VaultClient,
        "request",
        lambda *args, **kwargs: DummyResponse({"initialized": True, "sealed": False}),
    )

    svc = VaultService()
    result = svc.sync_oidc()

    assert result["status"] == "error"
def test_vault_ensure_token_login(monkeypatch) -> None:
    """``_ensure_token`` returns the client token from the stubbed POST.

    ``vault_token`` is empty in the patched settings, and ``httpx.post`` (as
    referenced from the vault module) is replaced with a stub whose response
    carries ``auth.client_token == "tok"``.
    """
    # NOTE(review): unlike the sibling tests in this file, this namespace does
    # not define ``vault_k8s_role_ttl`` -- confirm the code path under test
    # never reads it.
    dummy_settings = types.SimpleNamespace(
        vault_addr="http://vault",
        vault_token="",
        vault_k8s_role="vault",
        vault_k8s_token_reviewer_jwt="jwt",
        vault_k8s_token_reviewer_jwt_file="",
        vault_oidc_discovery_url="",
        vault_oidc_client_id="",
        vault_oidc_client_secret="",
        vault_oidc_default_role="admin",
        vault_oidc_scopes="",
        vault_oidc_user_claim="",
        vault_oidc_groups_claim="",
        vault_oidc_token_policies="",
        vault_oidc_admin_group="",
        vault_oidc_admin_policies="",
        vault_oidc_dev_group="",
        vault_oidc_dev_policies="",
        vault_oidc_user_group="",
        vault_oidc_user_policies="",
        vault_oidc_redirect_uris="",
        vault_oidc_bound_audiences="",
        vault_oidc_bound_claims_type="",
        k8s_api_timeout_sec=5.0,
    )
    monkeypatch.setattr(vault_module, "settings", dummy_settings)

    def fake_post(_url, json=None, timeout=None):
        """Return a canned response carrying the client token ``tok``."""
        return DummyResponse({"auth": {"client_token": "tok"}})

    monkeypatch.setattr(vault_module.httpx, "post", fake_post)

    svc = VaultService()

    assert svc._ensure_token() == "tok"