test(bstein-home): cover access request onboarding
This commit is contained in:
parent
e1dde0bf43
commit
4f132a44cf
356
backend/tests/test_access_request_onboarding.py
Normal file
356
backend/tests/test_access_request_onboarding.py
Normal file
@ -0,0 +1,356 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
from flask import Flask
|
||||
|
||||
from atlas_portal.routes.access_request_onboarding import register_access_request_onboarding
|
||||
|
||||
|
||||
class DummyResult:
|
||||
def __init__(self, row: dict[str, Any] | None = None) -> None:
|
||||
self.row = row
|
||||
|
||||
def fetchone(self) -> dict[str, Any] | None:
|
||||
return self.row
|
||||
|
||||
|
||||
class DummyConn:
|
||||
def __init__(self, row: dict[str, Any] | None = None, *, fail: bool = False) -> None:
|
||||
self.row = row
|
||||
self.fail = fail
|
||||
self.executed: list[tuple[str, object | None]] = []
|
||||
|
||||
def execute(self, query: str, params: object | None = None) -> DummyResult:
|
||||
self.executed.append((query, params))
|
||||
if self.fail:
|
||||
raise RuntimeError("database failed")
|
||||
return DummyResult(self.row)
|
||||
|
||||
|
||||
class DummyOidc:
|
||||
def __init__(self, *, fail: bool = False, claims: dict[str, Any] | None = None) -> None:
|
||||
self.fail = fail
|
||||
self.claims = claims or {"preferred_username": "alice", "groups": ["/vaultwarden_grandfathered"]}
|
||||
|
||||
def verify(self, token: str) -> dict[str, Any]:
|
||||
if self.fail:
|
||||
raise RuntimeError("bad token")
|
||||
return self.claims
|
||||
|
||||
|
||||
class DummyAdmin:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
ready: bool = True,
|
||||
user: dict[str, Any] | None = None,
|
||||
full: dict[str, Any] | None = None,
|
||||
fail_attrs: bool = False,
|
||||
) -> None:
|
||||
self._ready = ready
|
||||
self.user = user if user is not None else {"id": "user-1"}
|
||||
self.full = full if full is not None else {"requiredActions": []}
|
||||
self.fail_attrs = fail_attrs
|
||||
self.attributes: list[tuple[str, str, str]] = []
|
||||
self.updated: list[tuple[str, dict[str, Any]]] = []
|
||||
|
||||
def ready(self) -> bool:
|
||||
return self._ready
|
||||
|
||||
def set_user_attribute(self, username: str, key: str, value: str) -> None:
|
||||
if self.fail_attrs:
|
||||
raise RuntimeError("attribute update failed")
|
||||
self.attributes.append((username, key, value))
|
||||
|
||||
def find_user(self, username: str) -> dict[str, Any] | None:
|
||||
return self.user
|
||||
|
||||
def get_user(self, user_id: str) -> dict[str, Any]:
|
||||
return self.full
|
||||
|
||||
def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
|
||||
self.updated.append((user_id, payload))
|
||||
|
||||
|
||||
class DummyDeps:
|
||||
ONBOARDING_STEPS = {
|
||||
"profile_reviewed",
|
||||
"vaultwarden_master_password",
|
||||
"vaultwarden_store_temp_password",
|
||||
"keycloak_password_rotated",
|
||||
}
|
||||
KEYCLOAK_MANAGED_STEPS = {"keycloak_password_rotated"}
|
||||
ONBOARDING_STEP_PREREQUISITES = {
|
||||
"vaultwarden_master_password": {"profile_reviewed"},
|
||||
"keycloak_password_rotated": {"profile_reviewed"},
|
||||
}
|
||||
VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered"
|
||||
_KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT = "keycloak_password_rotation_requested"
|
||||
|
||||
def __init__(self, conn: DummyConn | None = None) -> None:
|
||||
self.configured_value = True
|
||||
self.conn = conn or DummyConn(self.request_row())
|
||||
self.oidc = DummyOidc()
|
||||
self.admin = DummyAdmin()
|
||||
self.completed_steps: set[str] = {"profile_reviewed"}
|
||||
self.rotation_requested = True
|
||||
self.request_rotation_fails = False
|
||||
self.user_in_group = False
|
||||
self.recovery_email = "alice@example.dev"
|
||||
self.advanced_status = "ready"
|
||||
|
||||
def request_row(self, **overrides: Any) -> dict[str, Any]:
|
||||
row = {
|
||||
"username": "alice",
|
||||
"status": "awaiting_onboarding",
|
||||
"approval_flags": [],
|
||||
"contact_email": "alice@example.dev",
|
||||
}
|
||||
row.update(overrides)
|
||||
return row
|
||||
|
||||
def configured(self) -> bool:
|
||||
return self.configured_value
|
||||
|
||||
@contextmanager
|
||||
def connect(self):
|
||||
yield self.conn
|
||||
|
||||
def oidc_client(self) -> DummyOidc:
|
||||
return self.oidc
|
||||
|
||||
def admin_client(self) -> DummyAdmin:
|
||||
return self.admin
|
||||
|
||||
def _normalize_status(self, status: str) -> str:
|
||||
return "accounts_building" if status == "approved" else (status or "unknown")
|
||||
|
||||
def _normalize_flag_list(self, raw: Any) -> set[str]:
|
||||
return {item for item in raw if isinstance(item, str)} if isinstance(raw, list) else set()
|
||||
|
||||
def _completed_onboarding_steps(self, conn: DummyConn, code: str, username: str) -> set[str]:
|
||||
return self.completed_steps
|
||||
|
||||
def _password_rotation_requested(self, conn: DummyConn, code: str) -> bool:
|
||||
return self.rotation_requested
|
||||
|
||||
def _request_keycloak_password_rotation(self, conn: DummyConn, code: str, username: str) -> None:
|
||||
if self.request_rotation_fails:
|
||||
raise RuntimeError("rotation request failed")
|
||||
self.rotation_requested = True
|
||||
|
||||
def _user_in_group(self, username: str, group: str) -> bool:
|
||||
return self.user_in_group
|
||||
|
||||
def _resolve_recovery_email(self, username: str, fallback: str) -> str:
|
||||
return self.recovery_email or fallback
|
||||
|
||||
def _advance_status(self, conn: DummyConn, code: str, username: str, status: str) -> str:
|
||||
return self.advanced_status
|
||||
|
||||
def _onboarding_payload(self, conn: DummyConn, code: str, username: str) -> dict[str, str]:
|
||||
return {"code": code, "username": username}
|
||||
|
||||
|
||||
def make_client(deps: DummyDeps):
|
||||
app = Flask(__name__)
|
||||
register_access_request_onboarding(app, deps)
|
||||
return app.test_client()
|
||||
|
||||
|
||||
def test_attest_preflight_token_and_lookup_paths() -> None:
|
||||
deps = DummyDeps()
|
||||
client = make_client(deps)
|
||||
|
||||
deps.configured_value = False
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 503
|
||||
deps.configured_value = True
|
||||
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"step": "profile_reviewed"}).status_code == 400
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "missing"}).status_code == 400
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "keycloak_password_rotated"}).status_code == 400
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "profile_reviewed"},
|
||||
headers={"Authorization": "bad"},
|
||||
).status_code == 401
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "profile_reviewed"},
|
||||
headers={"Authorization": "Bearer "},
|
||||
).status_code == 401
|
||||
deps.oidc = DummyOidc(fail=True)
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "profile_reviewed"},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 401
|
||||
deps.oidc = DummyOidc(claims={"preferred_username": "other", "groups": []})
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "profile_reviewed"},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 403
|
||||
|
||||
deps.oidc = DummyOidc()
|
||||
deps.conn = DummyConn(None)
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "missing", "step": "profile_reviewed"}).status_code == 404
|
||||
|
||||
deps.conn = DummyConn(deps.request_row(status="pending"))
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 409
|
||||
|
||||
|
||||
def test_attest_prerequisites_rotation_and_manual_clear_paths() -> None:
|
||||
deps = DummyDeps()
|
||||
client = make_client(deps)
|
||||
|
||||
deps.completed_steps = set()
|
||||
response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_master_password"})
|
||||
assert response.status_code == 409
|
||||
assert response.get_json()["blocked_by"] == ["profile_reviewed"]
|
||||
|
||||
deps.completed_steps = {"profile_reviewed"}
|
||||
deps.rotation_requested = False
|
||||
deps.request_rotation_fails = True
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_store_temp_password"}).status_code == 502
|
||||
|
||||
deps.request_rotation_fails = False
|
||||
response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed", "completed": False})
|
||||
assert response.status_code == 200
|
||||
assert any("DELETE FROM access_request_onboarding_steps" in query for query, _ in deps.conn.executed)
|
||||
|
||||
deps.conn = DummyConn(deps.request_row(), fail=True)
|
||||
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 502
|
||||
|
||||
|
||||
def test_attest_vaultwarden_claim_and_attribute_paths() -> None:
|
||||
deps = DummyDeps()
|
||||
client = make_client(deps)
|
||||
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
|
||||
).status_code == 401
|
||||
|
||||
deps.oidc = DummyOidc(claims={"preferred_username": "alice", "groups": []})
|
||||
deps.completed_steps = {"profile_reviewed"}
|
||||
deps.user_in_group = False
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 403
|
||||
|
||||
deps.conn = DummyConn(deps.request_row(approval_flags=[deps.VAULTWARDEN_GRANDFATHERED_FLAG]))
|
||||
deps.admin = DummyAdmin(ready=False)
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 503
|
||||
|
||||
deps.admin = DummyAdmin()
|
||||
deps.recovery_email = ""
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 200
|
||||
|
||||
deps.recovery_email = "recovery@example.dev"
|
||||
response = client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert ("alice", "vaultwarden_email", "recovery@example.dev") in deps.admin.attributes
|
||||
assert any("INSERT INTO access_request_onboarding_steps" in query for query, _ in deps.conn.executed)
|
||||
|
||||
deps.admin = DummyAdmin()
|
||||
response = client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert ("alice", "vaultwarden_status", "already_present") in deps.admin.attributes
|
||||
|
||||
deps.admin = DummyAdmin(fail_attrs=True)
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/attest",
|
||||
json={"code": "code", "step": "vaultwarden_master_password"},
|
||||
).status_code == 502
|
||||
|
||||
|
||||
def test_keycloak_rotate_preflight_and_lookup_paths() -> None:
|
||||
deps = DummyDeps()
|
||||
client = make_client(deps)
|
||||
|
||||
deps.configured_value = False
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503
|
||||
deps.configured_value = True
|
||||
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={}).status_code == 400
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/keycloak-password-rotate",
|
||||
json={"code": "code"},
|
||||
headers={"Authorization": "bad"},
|
||||
).status_code == 401
|
||||
deps.oidc = DummyOidc(fail=True)
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/keycloak-password-rotate",
|
||||
json={"code": "code"},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 401
|
||||
deps.oidc = DummyOidc()
|
||||
|
||||
deps.admin = DummyAdmin(ready=False)
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503
|
||||
deps.admin = DummyAdmin()
|
||||
|
||||
deps.conn = DummyConn(None)
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "missing"}).status_code == 404
|
||||
|
||||
deps.conn = DummyConn(deps.request_row())
|
||||
deps.oidc = DummyOidc(claims={"preferred_username": "other"})
|
||||
assert client.post(
|
||||
"/api/access/request/onboarding/keycloak-password-rotate",
|
||||
json={"code": "code"},
|
||||
headers={"Authorization": "Bearer token"},
|
||||
).status_code == 403
|
||||
|
||||
deps.oidc = DummyOidc()
|
||||
deps.conn = DummyConn(deps.request_row(status="pending"))
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
|
||||
|
||||
deps.conn = DummyConn(deps.request_row())
|
||||
deps.completed_steps = set()
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
|
||||
|
||||
|
||||
def test_keycloak_rotate_success_and_error_paths() -> None:
|
||||
deps = DummyDeps()
|
||||
deps.rotation_requested = False
|
||||
deps.completed_steps = {"profile_reviewed"}
|
||||
deps.admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"]})
|
||||
client = make_client(deps)
|
||||
|
||||
response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"})
|
||||
assert response.status_code == 200
|
||||
assert deps.admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})]
|
||||
assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in deps.conn.executed)
|
||||
|
||||
deps.rotation_requested = True
|
||||
deps.admin = DummyAdmin(full={"requiredActions": []})
|
||||
response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"})
|
||||
assert response.status_code == 200
|
||||
assert deps.admin.updated == []
|
||||
|
||||
deps.admin = DummyAdmin(user={})
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
|
||||
|
||||
deps.conn = DummyConn(deps.request_row(), fail=True)
|
||||
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 502
|
||||
Loading…
x
Reference in New Issue
Block a user