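"""Unit tests for ``ariadne.services.vault``.

Every test replaces the module-level ``settings`` object and the outbound
HTTP layer (``VaultClient.request`` / ``httpx.post``) with in-memory fakes,
so no real Vault is contacted.  The settings fixture is built once in
``_BASE_SETTINGS`` and specialised per test via ``_install_settings``
instead of being copied verbatim into each test.
"""

from __future__ import annotations

import types

from ariadne.services import vault as vault_module
from ariadne.services.vault import VaultService, _build_policy


class DummyResponse:
    """Minimal stand-in for an HTTP response as consumed by ``VaultService``.

    Only the members the service is observed to use are implemented:
    ``status_code``, ``text``, ``json()`` and ``raise_for_status()``.
    """

    def __init__(self, payload=None, status_code: int = 200) -> None:
        # A fresh dict per instance; never a shared mutable default.
        self._payload = payload or {}
        self.status_code = status_code
        self.text = ""

    def json(self):
        """Return the canned JSON body."""
        return self._payload

    def raise_for_status(self) -> None:
        """Mimic an HTTP client: raise on any 4xx/5xx status."""
        if self.status_code >= 400:
            raise RuntimeError("status error")


# Canonical "Vault is up and unsealed" body returned by /v1/sys/health.
_HEALTHY: dict[str, bool] = {"initialized": True, "sealed": False}

# Baseline settings: static token configured, Kubernetes auth configured,
# OIDC left blank.  Tests override only the fields they care about.
_BASE_SETTINGS: dict[str, object] = {
    "vault_addr": "http://vault",
    "vault_token": "token",
    "vault_k8s_role": "vault",
    "vault_k8s_role_ttl": "1h",
    "vault_k8s_token_reviewer_jwt": "jwt",
    "vault_k8s_token_reviewer_jwt_file": "",
    "vault_oidc_discovery_url": "",
    "vault_oidc_client_id": "",
    "vault_oidc_client_secret": "",
    "vault_oidc_default_role": "admin",
    "vault_oidc_scopes": "",
    "vault_oidc_user_claim": "",
    "vault_oidc_groups_claim": "",
    "vault_oidc_token_policies": "",
    "vault_oidc_admin_group": "",
    "vault_oidc_admin_policies": "",
    "vault_oidc_dev_group": "",
    "vault_oidc_dev_policies": "",
    "vault_oidc_user_group": "",
    "vault_oidc_user_policies": "",
    "vault_oidc_redirect_uris": "",
    "vault_oidc_bound_audiences": "",
    "vault_oidc_bound_claims_type": "",
    "k8s_api_timeout_sec": 5.0,
}

# Overrides that describe a fully configured OIDC auth method.
_OIDC_SETTINGS: dict[str, object] = {
    "vault_oidc_discovery_url": "http://oidc",
    "vault_oidc_client_id": "client",
    "vault_oidc_client_secret": "secret",
    "vault_oidc_scopes": "openid profile",
    "vault_oidc_user_claim": "preferred_username",
    "vault_oidc_groups_claim": "groups",
    "vault_oidc_admin_group": "admin",
    "vault_oidc_admin_policies": "default,vault-admin",
    "vault_oidc_dev_group": "dev",
    "vault_oidc_dev_policies": "default,dev-kv",
    "vault_oidc_redirect_uris": "https://secret.bstein.dev/ui/vault/auth/oidc/oidc/callback",
    "vault_oidc_bound_claims_type": "string",
}


def _make_settings(**overrides) -> types.SimpleNamespace:
    """Return a settings namespace: ``_BASE_SETTINGS`` updated with *overrides*."""
    return types.SimpleNamespace(**{**_BASE_SETTINGS, **overrides})


def _install_settings(monkeypatch, **overrides) -> types.SimpleNamespace:
    """Build settings from *overrides* and patch them into the vault module.

    Returns the namespace so a test can inspect or tweak it further.
    """
    settings = _make_settings(**overrides)
    monkeypatch.setattr(vault_module, "settings", settings)
    return settings


def _make_fake_request(calls: list[tuple[str, str]] | None = None):
    """Build a ``VaultClient.request`` replacement.

    The fake answers ``/v1/sys/health`` with a healthy, unsealed body and
    every other path with an empty 200 body.  When *calls* is given, each
    ``(method, path)`` pair is appended to it so tests can assert which
    Vault endpoints were touched.
    """

    def fake_request(self, method: str, path: str, json=None):
        if calls is not None:
            calls.append((method, path))
        if path == "/v1/sys/health":
            return DummyResponse(dict(_HEALTHY))
        return DummyResponse({})

    return fake_request


def test_build_policy() -> None:
    """Each pattern must be scoped under the ``kv/data/atlas/`` prefix."""
    policy = _build_policy("foo/*", "bar/*")
    assert "kv/data/atlas/foo/*" in policy
    assert "kv/data/atlas/bar/*" in policy


def test_vault_sync_k8s_auth_success(monkeypatch) -> None:
    """With a healthy Vault, syncing k8s auth reports ok and writes the config."""
    _install_settings(monkeypatch)
    calls: list[tuple[str, str]] = []
    monkeypatch.setattr(vault_module.VaultClient, "request", _make_fake_request(calls))

    result = VaultService().sync_k8s_auth()

    assert result["status"] == "ok"
    # The kubernetes auth backend must actually be configured, not just probed.
    assert any(path == "/v1/auth/kubernetes/config" for _, path in calls)


def test_vault_sync_oidc_success(monkeypatch) -> None:
    """A complete OIDC configuration syncs cleanly against a healthy Vault."""
    _install_settings(monkeypatch, **_OIDC_SETTINGS)
    monkeypatch.setattr(vault_module.VaultClient, "request", _make_fake_request())

    result = VaultService().sync_oidc()

    assert result["status"] == "ok"


def test_vault_sync_oidc_missing_discovery(monkeypatch) -> None:
    """An empty discovery URL must make the OIDC sync fail rather than half-apply."""
    _install_settings(monkeypatch, **_OIDC_SETTINGS, vault_oidc_discovery_url="")
    # Every request, whatever the path, looks like a healthy Vault so the
    # error can only come from the missing discovery URL.
    monkeypatch.setattr(
        vault_module.VaultClient,
        "request",
        lambda *args, **kwargs: DummyResponse(dict(_HEALTHY)),
    )

    result = VaultService().sync_oidc()

    assert result["status"] == "error"


def test_vault_ensure_token_login(monkeypatch) -> None:
    """With no static token, the service logs in over httpx and uses the issued token.

    NOTE: unlike the original fixture this one carries ``vault_k8s_role_ttl``
    (inherited from ``_BASE_SETTINGS``); the service is not observed to
    depend on it here.
    """
    _install_settings(monkeypatch, vault_token="")

    def fake_post(_url, json=None, timeout=None):
        # NOTE(review): the login URL and body are not inspected here; a
        # stricter test would assert the reviewer JWT is only ever posted to
        # ``vault_addr`` -- confirm against VaultService._ensure_token.
        return DummyResponse({"auth": {"client_token": "tok"}})

    monkeypatch.setattr(vault_module.httpx, "post", fake_post)

    svc = VaultService()
    assert svc._ensure_token() == "tok"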