From 4f132a44cf6f35a407ef9dc27290f40d1ab7f5ce Mon Sep 17 00:00:00 2001 From: codex Date: Tue, 21 Apr 2026 08:10:56 -0300 Subject: [PATCH] test(bstein-home): cover access request onboarding --- .../tests/test_access_request_onboarding.py | 356 ++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 backend/tests/test_access_request_onboarding.py diff --git a/backend/tests/test_access_request_onboarding.py b/backend/tests/test_access_request_onboarding.py new file mode 100644 index 0000000..1628099 --- /dev/null +++ b/backend/tests/test_access_request_onboarding.py @@ -0,0 +1,356 @@ +from __future__ import annotations + +from contextlib import contextmanager +from types import SimpleNamespace +from typing import Any + +from flask import Flask + +from atlas_portal.routes.access_request_onboarding import register_access_request_onboarding + + +class DummyResult: + def __init__(self, row: dict[str, Any] | None = None) -> None: + self.row = row + + def fetchone(self) -> dict[str, Any] | None: + return self.row + + +class DummyConn: + def __init__(self, row: dict[str, Any] | None = None, *, fail: bool = False) -> None: + self.row = row + self.fail = fail + self.executed: list[tuple[str, object | None]] = [] + + def execute(self, query: str, params: object | None = None) -> DummyResult: + self.executed.append((query, params)) + if self.fail: + raise RuntimeError("database failed") + return DummyResult(self.row) + + +class DummyOidc: + def __init__(self, *, fail: bool = False, claims: dict[str, Any] | None = None) -> None: + self.fail = fail + self.claims = claims or {"preferred_username": "alice", "groups": ["/vaultwarden_grandfathered"]} + + def verify(self, token: str) -> dict[str, Any]: + if self.fail: + raise RuntimeError("bad token") + return self.claims + + +class DummyAdmin: + def __init__( + self, + *, + ready: bool = True, + user: dict[str, Any] | None = None, + full: dict[str, Any] | None = None, + fail_attrs: bool = False, + ) -> None: + self._ready = ready + self.user = user if user is not None else {"id": "user-1"} + self.full = full if full is not None else {"requiredActions": []} + self.fail_attrs = fail_attrs + self.attributes: list[tuple[str, str, str]] = [] + self.updated: list[tuple[str, dict[str, Any]]] = [] + + def ready(self) -> bool: + return self._ready + + def set_user_attribute(self, username: str, key: str, value: str) -> None: + if self.fail_attrs: + raise RuntimeError("attribute update failed") + self.attributes.append((username, key, value)) + + def find_user(self, username: str) -> dict[str, Any] | None: + return self.user + + def get_user(self, user_id: str) -> dict[str, Any]: + return self.full + + def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None: + self.updated.append((user_id, payload)) + + +class DummyDeps: + ONBOARDING_STEPS = { + "profile_reviewed", + "vaultwarden_master_password", + "vaultwarden_store_temp_password", + "keycloak_password_rotated", + } + KEYCLOAK_MANAGED_STEPS = {"keycloak_password_rotated"} + ONBOARDING_STEP_PREREQUISITES = { + "vaultwarden_master_password": {"profile_reviewed"}, + "keycloak_password_rotated": {"profile_reviewed"}, + } + VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered" + _KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT = "keycloak_password_rotation_requested" + + def __init__(self, conn: DummyConn | None = None) -> None: + self.configured_value = True + self.conn = conn or DummyConn(self.request_row()) + self.oidc = DummyOidc() + self.admin = DummyAdmin() + self.completed_steps: set[str] = {"profile_reviewed"} + self.rotation_requested = True + self.request_rotation_fails = False + self.user_in_group = False + self.recovery_email = "alice@example.dev" + self.advanced_status = "ready" + + def request_row(self, **overrides: Any) -> dict[str, Any]: + row = { + "username": "alice", + "status": "awaiting_onboarding", + "approval_flags": [], + "contact_email": "alice@example.dev", + } + row.update(overrides) + return row + + def configured(self) -> bool: + return self.configured_value + + @contextmanager + def connect(self): + yield self.conn + + def oidc_client(self) -> DummyOidc: + return self.oidc + + def admin_client(self) -> DummyAdmin: + return self.admin + + def _normalize_status(self, status: str) -> str: + return "accounts_building" if status == "approved" else (status or "unknown") + + def _normalize_flag_list(self, raw: Any) -> set[str]: + return {item for item in raw if isinstance(item, str)} if isinstance(raw, list) else set() + + def _completed_onboarding_steps(self, conn: DummyConn, code: str, username: str) -> set[str]: + return self.completed_steps + + def _password_rotation_requested(self, conn: DummyConn, code: str) -> bool: + return self.rotation_requested + + def _request_keycloak_password_rotation(self, conn: DummyConn, code: str, username: str) -> None: + if self.request_rotation_fails: + raise RuntimeError("rotation request failed") + self.rotation_requested = True + + def _user_in_group(self, username: str, group: str) -> bool: + return self.user_in_group + + def _resolve_recovery_email(self, username: str, fallback: str) -> str: + return self.recovery_email or fallback + + def _advance_status(self, conn: DummyConn, code: str, username: str, status: str) -> str: + return self.advanced_status + + def _onboarding_payload(self, conn: DummyConn, code: str, username: str) -> dict[str, str]: + return {"code": code, "username": username} + + +def make_client(deps: DummyDeps): + app = Flask(__name__) + register_access_request_onboarding(app, deps) + return app.test_client() + + +def test_attest_preflight_token_and_lookup_paths() -> None: + deps = DummyDeps() + client = make_client(deps) + + deps.configured_value = False + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 503 + deps.configured_value = True + + assert client.post("/api/access/request/onboarding/attest", json={"step": "profile_reviewed"}).status_code == 400 + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "missing"}).status_code == 400 + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "keycloak_password_rotated"}).status_code == 400 + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "profile_reviewed"}, + headers={"Authorization": "bad"}, + ).status_code == 401 + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "profile_reviewed"}, + headers={"Authorization": "Bearer "}, + ).status_code == 401 + deps.oidc = DummyOidc(fail=True) + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "profile_reviewed"}, + headers={"Authorization": "Bearer token"}, + ).status_code == 401 + deps.oidc = DummyOidc(claims={"preferred_username": "other", "groups": []}) + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "profile_reviewed"}, + headers={"Authorization": "Bearer token"}, + ).status_code == 403 + + deps.oidc = DummyOidc() + deps.conn = DummyConn(None) + assert client.post("/api/access/request/onboarding/attest", json={"code": "missing", "step": "profile_reviewed"}).status_code == 404 + + deps.conn = DummyConn(deps.request_row(status="pending")) + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 409 + + +def test_attest_prerequisites_rotation_and_manual_clear_paths() -> None: + deps = DummyDeps() + client = make_client(deps) + + deps.completed_steps = set() + response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_master_password"}) + assert response.status_code == 409 + assert response.get_json()["blocked_by"] == ["profile_reviewed"] + + deps.completed_steps = {"profile_reviewed"} + deps.rotation_requested = False + deps.request_rotation_fails = True + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_store_temp_password"}).status_code == 502 + + deps.request_rotation_fails = False + response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed", "completed": False}) + assert response.status_code == 200 + assert any("DELETE FROM access_request_onboarding_steps" in query for query, _ in deps.conn.executed) + + deps.conn = DummyConn(deps.request_row(), fail=True) + assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 502 + + +def test_attest_vaultwarden_claim_and_attribute_paths() -> None: + deps = DummyDeps() + client = make_client(deps) + + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True}, + ).status_code == 401 + + deps.oidc = DummyOidc(claims={"preferred_username": "alice", "groups": []}) + deps.completed_steps = {"profile_reviewed"} + deps.user_in_group = False + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True}, + headers={"Authorization": "Bearer token"}, + ).status_code == 403 + + deps.conn = DummyConn(deps.request_row(approval_flags=[deps.VAULTWARDEN_GRANDFATHERED_FLAG])) + deps.admin = DummyAdmin(ready=False) + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True}, + headers={"Authorization": "Bearer token"}, + ).status_code == 503 + + deps.admin = DummyAdmin() + deps.recovery_email = "" + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True}, + headers={"Authorization": "Bearer token"}, + ).status_code == 200 + + deps.recovery_email = "recovery@example.dev" + response = client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True}, + headers={"Authorization": "Bearer token"}, + ) + assert response.status_code == 200 + assert ("alice", "vaultwarden_email", "recovery@example.dev") in deps.admin.attributes + assert any("INSERT INTO access_request_onboarding_steps" in query for query, _ in deps.conn.executed) + + deps.admin = DummyAdmin() + response = client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password"}, + ) + assert response.status_code == 200 + assert ("alice", "vaultwarden_status", "already_present") in deps.admin.attributes + + deps.admin = DummyAdmin(fail_attrs=True) + assert client.post( + "/api/access/request/onboarding/attest", + json={"code": "code", "step": "vaultwarden_master_password"}, + ).status_code == 502 + + +def test_keycloak_rotate_preflight_and_lookup_paths() -> None: + deps = DummyDeps() + client = make_client(deps) + + deps.configured_value = False + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503 + deps.configured_value = True + + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={}).status_code == 400 + assert client.post( + "/api/access/request/onboarding/keycloak-password-rotate", + json={"code": "code"}, + headers={"Authorization": "bad"}, + ).status_code == 401 + deps.oidc = DummyOidc(fail=True) + assert client.post( + "/api/access/request/onboarding/keycloak-password-rotate", + json={"code": "code"}, + headers={"Authorization": "Bearer token"}, + ).status_code == 401 + deps.oidc = DummyOidc() + + deps.admin = DummyAdmin(ready=False) + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503 + deps.admin = DummyAdmin() + + deps.conn = DummyConn(None) + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "missing"}).status_code == 404 + + deps.conn = DummyConn(deps.request_row()) + deps.oidc = DummyOidc(claims={"preferred_username": "other"}) + assert client.post( + "/api/access/request/onboarding/keycloak-password-rotate", + json={"code": "code"}, + headers={"Authorization": "Bearer token"}, + ).status_code == 403 + + deps.oidc = DummyOidc() + deps.conn = DummyConn(deps.request_row(status="pending")) + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409 + + deps.conn = DummyConn(deps.request_row()) + deps.completed_steps = set() + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409 + + +def test_keycloak_rotate_success_and_error_paths() -> None: + deps = DummyDeps() + deps.rotation_requested = False + deps.completed_steps = {"profile_reviewed"} + deps.admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"]}) + client = make_client(deps) + + response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}) + assert response.status_code == 200 + assert deps.admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})] + assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in deps.conn.executed) + + deps.rotation_requested = True + deps.admin = DummyAdmin(full={"requiredActions": []}) + response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}) + assert response.status_code == 200 + assert deps.admin.updated == [] + + deps.admin = DummyAdmin(user={}) + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409 + + deps.conn = DummyConn(deps.request_row(), fail=True) + assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 502