bstein-dev-home/backend/tests/test_access_request_onboarding.py

357 lines
14 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any
from flask import Flask
from atlas_portal.routes.access_request_onboarding import register_access_request_onboarding
class DummyResult:
def __init__(self, row: dict[str, Any] | None = None) -> None:
self.row = row
def fetchone(self) -> dict[str, Any] | None:
return self.row
class DummyConn:
def __init__(self, row: dict[str, Any] | None = None, *, fail: bool = False) -> None:
self.row = row
self.fail = fail
self.executed: list[tuple[str, object | None]] = []
def execute(self, query: str, params: object | None = None) -> DummyResult:
self.executed.append((query, params))
if self.fail:
raise RuntimeError("database failed")
return DummyResult(self.row)
class DummyOidc:
def __init__(self, *, fail: bool = False, claims: dict[str, Any] | None = None) -> None:
self.fail = fail
self.claims = claims or {"preferred_username": "alice", "groups": ["/vaultwarden_grandfathered"]}
def verify(self, token: str) -> dict[str, Any]:
if self.fail:
raise RuntimeError("bad token")
return self.claims
class DummyAdmin:
def __init__(
self,
*,
ready: bool = True,
user: dict[str, Any] | None = None,
full: dict[str, Any] | None = None,
fail_attrs: bool = False,
) -> None:
self._ready = ready
self.user = user if user is not None else {"id": "user-1"}
self.full = full if full is not None else {"requiredActions": []}
self.fail_attrs = fail_attrs
self.attributes: list[tuple[str, str, str]] = []
self.updated: list[tuple[str, dict[str, Any]]] = []
def ready(self) -> bool:
return self._ready
def set_user_attribute(self, username: str, key: str, value: str) -> None:
if self.fail_attrs:
raise RuntimeError("attribute update failed")
self.attributes.append((username, key, value))
def find_user(self, username: str) -> dict[str, Any] | None:
return self.user
def get_user(self, user_id: str) -> dict[str, Any]:
return self.full
def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
self.updated.append((user_id, payload))
class DummyDeps:
ONBOARDING_STEPS = {
"profile_reviewed",
"vaultwarden_master_password",
"vaultwarden_store_temp_password",
"keycloak_password_rotated",
}
KEYCLOAK_MANAGED_STEPS = {"keycloak_password_rotated"}
ONBOARDING_STEP_PREREQUISITES = {
"vaultwarden_master_password": {"profile_reviewed"},
"keycloak_password_rotated": {"profile_reviewed"},
}
VAULTWARDEN_GRANDFATHERED_FLAG = "vaultwarden_grandfathered"
_KEYCLOAK_PASSWORD_ROTATION_REQUESTED_ARTIFACT = "keycloak_password_rotation_requested"
def __init__(self, conn: DummyConn | None = None) -> None:
self.configured_value = True
self.conn = conn or DummyConn(self.request_row())
self.oidc = DummyOidc()
self.admin = DummyAdmin()
self.completed_steps: set[str] = {"profile_reviewed"}
self.rotation_requested = True
self.request_rotation_fails = False
self.user_in_group = False
self.recovery_email = "alice@example.dev"
self.advanced_status = "ready"
def request_row(self, **overrides: Any) -> dict[str, Any]:
row = {
"username": "alice",
"status": "awaiting_onboarding",
"approval_flags": [],
"contact_email": "alice@example.dev",
}
row.update(overrides)
return row
def configured(self) -> bool:
return self.configured_value
@contextmanager
def connect(self):
yield self.conn
def oidc_client(self) -> DummyOidc:
return self.oidc
def admin_client(self) -> DummyAdmin:
return self.admin
def _normalize_status(self, status: str) -> str:
return "accounts_building" if status == "approved" else (status or "unknown")
def _normalize_flag_list(self, raw: Any) -> set[str]:
return {item for item in raw if isinstance(item, str)} if isinstance(raw, list) else set()
def _completed_onboarding_steps(self, conn: DummyConn, code: str, username: str) -> set[str]:
return self.completed_steps
def _password_rotation_requested(self, conn: DummyConn, code: str) -> bool:
return self.rotation_requested
def _request_keycloak_password_rotation(self, conn: DummyConn, code: str, username: str) -> None:
if self.request_rotation_fails:
raise RuntimeError("rotation request failed")
self.rotation_requested = True
def _user_in_group(self, username: str, group: str) -> bool:
return self.user_in_group
def _resolve_recovery_email(self, username: str, fallback: str) -> str:
return self.recovery_email or fallback
def _advance_status(self, conn: DummyConn, code: str, username: str, status: str) -> str:
return self.advanced_status
def _onboarding_payload(self, conn: DummyConn, code: str, username: str) -> dict[str, str]:
return {"code": code, "username": username}
def make_client(deps: DummyDeps):
app = Flask(__name__)
register_access_request_onboarding(app, deps)
return app.test_client()
def test_attest_preflight_token_and_lookup_paths() -> None:
deps = DummyDeps()
client = make_client(deps)
deps.configured_value = False
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 503
deps.configured_value = True
assert client.post("/api/access/request/onboarding/attest", json={"step": "profile_reviewed"}).status_code == 400
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "missing"}).status_code == 400
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "keycloak_password_rotated"}).status_code == 400
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "profile_reviewed"},
headers={"Authorization": "bad"},
).status_code == 401
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "profile_reviewed"},
headers={"Authorization": "Bearer "},
).status_code == 401
deps.oidc = DummyOidc(fail=True)
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "profile_reviewed"},
headers={"Authorization": "Bearer token"},
).status_code == 401
deps.oidc = DummyOidc(claims={"preferred_username": "other", "groups": []})
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "profile_reviewed"},
headers={"Authorization": "Bearer token"},
).status_code == 403
deps.oidc = DummyOidc()
deps.conn = DummyConn(None)
assert client.post("/api/access/request/onboarding/attest", json={"code": "missing", "step": "profile_reviewed"}).status_code == 404
deps.conn = DummyConn(deps.request_row(status="pending"))
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 409
def test_attest_prerequisites_rotation_and_manual_clear_paths() -> None:
deps = DummyDeps()
client = make_client(deps)
deps.completed_steps = set()
response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_master_password"})
assert response.status_code == 409
assert response.get_json()["blocked_by"] == ["profile_reviewed"]
deps.completed_steps = {"profile_reviewed"}
deps.rotation_requested = False
deps.request_rotation_fails = True
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "vaultwarden_store_temp_password"}).status_code == 502
deps.request_rotation_fails = False
response = client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed", "completed": False})
assert response.status_code == 200
assert any("DELETE FROM access_request_onboarding_steps" in query for query, _ in deps.conn.executed)
deps.conn = DummyConn(deps.request_row(), fail=True)
assert client.post("/api/access/request/onboarding/attest", json={"code": "code", "step": "profile_reviewed"}).status_code == 502
def test_attest_vaultwarden_claim_and_attribute_paths() -> None:
deps = DummyDeps()
client = make_client(deps)
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
).status_code == 401
deps.oidc = DummyOidc(claims={"preferred_username": "alice", "groups": []})
deps.completed_steps = {"profile_reviewed"}
deps.user_in_group = False
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
headers={"Authorization": "Bearer token"},
).status_code == 403
deps.conn = DummyConn(deps.request_row(approval_flags=[deps.VAULTWARDEN_GRANDFATHERED_FLAG]))
deps.admin = DummyAdmin(ready=False)
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
headers={"Authorization": "Bearer token"},
).status_code == 503
deps.admin = DummyAdmin()
deps.recovery_email = ""
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
headers={"Authorization": "Bearer token"},
).status_code == 200
deps.recovery_email = "recovery@example.dev"
response = client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password", "vaultwarden_claim": True},
headers={"Authorization": "Bearer token"},
)
assert response.status_code == 200
assert ("alice", "vaultwarden_email", "recovery@example.dev") in deps.admin.attributes
assert any("INSERT INTO access_request_onboarding_steps" in query for query, _ in deps.conn.executed)
deps.admin = DummyAdmin()
response = client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password"},
)
assert response.status_code == 200
assert ("alice", "vaultwarden_status", "already_present") in deps.admin.attributes
deps.admin = DummyAdmin(fail_attrs=True)
assert client.post(
"/api/access/request/onboarding/attest",
json={"code": "code", "step": "vaultwarden_master_password"},
).status_code == 502
def test_keycloak_rotate_preflight_and_lookup_paths() -> None:
deps = DummyDeps()
client = make_client(deps)
deps.configured_value = False
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503
deps.configured_value = True
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={}).status_code == 400
assert client.post(
"/api/access/request/onboarding/keycloak-password-rotate",
json={"code": "code"},
headers={"Authorization": "bad"},
).status_code == 401
deps.oidc = DummyOidc(fail=True)
assert client.post(
"/api/access/request/onboarding/keycloak-password-rotate",
json={"code": "code"},
headers={"Authorization": "Bearer token"},
).status_code == 401
deps.oidc = DummyOidc()
deps.admin = DummyAdmin(ready=False)
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 503
deps.admin = DummyAdmin()
deps.conn = DummyConn(None)
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "missing"}).status_code == 404
deps.conn = DummyConn(deps.request_row())
deps.oidc = DummyOidc(claims={"preferred_username": "other"})
assert client.post(
"/api/access/request/onboarding/keycloak-password-rotate",
json={"code": "code"},
headers={"Authorization": "Bearer token"},
).status_code == 403
deps.oidc = DummyOidc()
deps.conn = DummyConn(deps.request_row(status="pending"))
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
deps.conn = DummyConn(deps.request_row())
deps.completed_steps = set()
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
def test_keycloak_rotate_success_and_error_paths() -> None:
deps = DummyDeps()
deps.rotation_requested = False
deps.completed_steps = {"profile_reviewed"}
deps.admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"]})
client = make_client(deps)
response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"})
assert response.status_code == 200
assert deps.admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})]
assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in deps.conn.executed)
deps.rotation_requested = True
deps.admin = DummyAdmin(full={"requiredActions": []})
response = client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"})
assert response.status_code == 200
assert deps.admin.updated == []
deps.admin = DummyAdmin(user={})
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 409
deps.conn = DummyConn(deps.request_row(), fail=True)
assert client.post("/api/access/request/onboarding/keycloak-password-rotate", json={"code": "code"}).status_code == 502