test(bstein-home): cover access request state
This commit is contained in:
parent
2dfbf86a93
commit
c675ff9787
@ -44,13 +44,13 @@ def _normalize_name(value: str) -> str:
|
||||
|
||||
|
||||
def _validate_name(value: str, *, label: str, required: bool) -> str | None:
|
||||
if any(ch in "\r\n\t" for ch in value):
|
||||
return f"{label} contains invalid whitespace"
|
||||
cleaned = _normalize_name(value)
|
||||
if not cleaned:
|
||||
return f"{label} is required" if required else None
|
||||
if len(cleaned) > 80:
|
||||
return f"{label} must be 1-80 characters"
|
||||
if any(ch in "\r\n\t" for ch in cleaned):
|
||||
return f"{label} contains invalid whitespace"
|
||||
return None
|
||||
|
||||
|
||||
@ -88,7 +88,7 @@ def _hash_verification_token(token: str) -> str:
|
||||
|
||||
def _verify_url(request_code: str, token: str) -> str:
|
||||
base = settings.PORTAL_PUBLIC_BASE_URL.rstrip("/")
|
||||
return f"{base}/api/access/request/verify-link?code={quote(request_code)}&token={quote(token)}"
|
||||
return f"{base}/api/access/request/verify-link?code={quote(request_code, safe='')}&token={quote(token, safe='')}"
|
||||
|
||||
|
||||
def _send_verification_email(*, request_code: str, email: str, token: str) -> None:
|
||||
|
||||
389
backend/tests/test_access_request_state.py
Normal file
389
backend/tests/test_access_request_state.py
Normal file
@ -0,0 +1,389 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from flask import Flask
|
||||
import pytest
|
||||
|
||||
from atlas_portal.routes import access_request_state as state
|
||||
|
||||
|
||||
class DummyResult:
|
||||
def __init__(self, row: dict[str, Any] | None = None, rows: list[dict[str, Any]] | None = None) -> None:
|
||||
self.row = row
|
||||
self.rows = rows or []
|
||||
|
||||
def fetchone(self) -> dict[str, Any] | None:
|
||||
return self.row
|
||||
|
||||
def fetchall(self) -> list[dict[str, Any]]:
|
||||
return self.rows
|
||||
|
||||
|
||||
class DummyConn:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
rows_by_query: dict[str, dict[str, Any] | None] | None = None,
|
||||
many_by_query: dict[str, list[dict[str, Any]]] | None = None,
|
||||
) -> None:
|
||||
self.rows_by_query = rows_by_query or {}
|
||||
self.many_by_query = many_by_query or {}
|
||||
self.executed: list[tuple[str, object | None]] = []
|
||||
|
||||
def execute(self, query: str, params: object | None = None) -> DummyResult:
|
||||
self.executed.append((query, params))
|
||||
for key, rows in self.many_by_query.items():
|
||||
if key in query:
|
||||
return DummyResult(rows=rows)
|
||||
for key, row in self.rows_by_query.items():
|
||||
if key in query:
|
||||
return DummyResult(row=row)
|
||||
return DummyResult()
|
||||
|
||||
|
||||
class DummyAdmin:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
ready: bool = True,
|
||||
user: dict[str, Any] | None = None,
|
||||
full: dict[str, Any] | None = None,
|
||||
groups: list[str] | None = None,
|
||||
fail_find: bool = False,
|
||||
fail_get: bool = False,
|
||||
fail_update: bool = False,
|
||||
) -> None:
|
||||
self._ready = ready
|
||||
self.user = user if user is not None else {"id": "user-1"}
|
||||
self.full = full if full is not None else {}
|
||||
self.groups = groups or []
|
||||
self.fail_find = fail_find
|
||||
self.fail_get = fail_get
|
||||
self.fail_update = fail_update
|
||||
self.updated: list[tuple[str, dict[str, Any]]] = []
|
||||
|
||||
def ready(self) -> bool:
|
||||
return self._ready
|
||||
|
||||
def find_user(self, username: str) -> dict[str, Any] | None:
|
||||
if self.fail_find:
|
||||
raise RuntimeError("lookup failed")
|
||||
return self.user
|
||||
|
||||
def list_user_groups(self, user_id: str) -> list[str]:
|
||||
return self.groups
|
||||
|
||||
def get_user(self, user_id: str) -> dict[str, Any]:
|
||||
if self.fail_get:
|
||||
raise RuntimeError("get failed")
|
||||
return self.full
|
||||
|
||||
def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
|
||||
if self.fail_update:
|
||||
raise RuntimeError("update failed")
|
||||
self.updated.append((user_id, payload))
|
||||
|
||||
|
||||
def test_request_payload_names_username_and_client_ip(monkeypatch) -> None:
|
||||
app = Flask(__name__)
|
||||
monkeypatch.setattr(state.secrets, "choice", lambda alphabet: "A")
|
||||
|
||||
with app.test_request_context(
|
||||
"/request",
|
||||
json={
|
||||
"username": " alice ",
|
||||
"email": " alice@example.com ",
|
||||
"note": " hello ",
|
||||
"first_name": " Alice ",
|
||||
"last_name": " Atlas ",
|
||||
},
|
||||
headers={"X-Forwarded-For": "203.0.113.10, 10.0.0.1"},
|
||||
):
|
||||
assert state._extract_request_payload() == (
|
||||
"alice",
|
||||
"alice@example.com",
|
||||
"hello",
|
||||
"Alice",
|
||||
"Atlas",
|
||||
)
|
||||
assert state._client_ip() == "203.0.113.10"
|
||||
|
||||
with app.test_request_context("/request", headers={"X-Real-IP": "198.51.100.5"}):
|
||||
assert state._client_ip() == "198.51.100.5"
|
||||
with app.test_request_context("/request", environ_base={"REMOTE_ADDR": "192.0.2.10"}):
|
||||
assert state._client_ip() == "192.0.2.10"
|
||||
|
||||
assert state._normalize_name(" Alice Atlas ") == "Alice Atlas"
|
||||
assert state._validate_name("", label="last name", required=True) == "last name is required"
|
||||
assert state._validate_name("A" * 81, label="last name", required=True) == "last name must be 1-80 characters"
|
||||
assert state._validate_name("Alice\tAtlas", label="last name", required=True) == "last name contains invalid whitespace"
|
||||
assert state._validate_name("", label="first name", required=False) is None
|
||||
assert state._validate_name("Alice Atlas", label="last name", required=True) is None
|
||||
assert state._validate_username("") == "username is required"
|
||||
assert state._validate_username("ab") == "username must be 3-32 characters"
|
||||
assert state._validate_username("bad name") == "username contains invalid characters"
|
||||
assert state._validate_username("alice_ok-1") is None
|
||||
assert state._random_request_code("alice") == "alice~AAAAAAAAAA"
|
||||
|
||||
|
||||
def test_verification_url_email_and_error_paths(monkeypatch) -> None:
|
||||
sent: dict[str, str] = {}
|
||||
|
||||
def fake_send_text_email(*, to_addr: str, subject: str, body: str) -> None:
|
||||
sent.update({"to": to_addr, "subject": subject, "body": body})
|
||||
|
||||
monkeypatch.setattr(state.settings, "PORTAL_PUBLIC_BASE_URL", "https://portal.example.dev/")
|
||||
monkeypatch.setattr(state, "send_text_email", fake_send_text_email)
|
||||
state._send_verification_email(request_code="alice CODE", email="alice@example.dev", token="tok/1")
|
||||
|
||||
assert sent["to"] == "alice@example.dev"
|
||||
assert "confirm your email" in sent["subject"]
|
||||
assert "alice%20CODE" in sent["body"]
|
||||
assert "tok/1" not in sent["body"]
|
||||
|
||||
with pytest.raises(state.VerificationError) as missing:
|
||||
state._verify_request(DummyConn(), "missing", "tok")
|
||||
assert missing.value.status_code == 404
|
||||
|
||||
non_pending = DummyConn(rows_by_query={"SELECT status": {"status": "approved"}})
|
||||
assert state._verify_request(non_pending, "code", "tok") == "accounts_building"
|
||||
|
||||
no_hash = DummyConn(rows_by_query={"SELECT status": {"status": state.EMAIL_VERIFY_PENDING_STATUS}})
|
||||
with pytest.raises(state.VerificationError) as no_token:
|
||||
state._verify_request(no_hash, "code", "tok")
|
||||
assert no_token.value.status_code == 409
|
||||
|
||||
bad_hash = DummyConn(
|
||||
rows_by_query={
|
||||
"SELECT status": {
|
||||
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||||
"email_verification_token_hash": state._hash_verification_token("other"),
|
||||
}
|
||||
}
|
||||
)
|
||||
with pytest.raises(state.VerificationError) as invalid:
|
||||
state._verify_request(bad_hash, "code", "tok")
|
||||
assert invalid.value.status_code == 401
|
||||
|
||||
expired_at = datetime.now() - timedelta(seconds=state.settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC + 5)
|
||||
expired = DummyConn(
|
||||
rows_by_query={
|
||||
"SELECT status": {
|
||||
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||||
"email_verification_token_hash": state._hash_verification_token("tok"),
|
||||
"email_verification_sent_at": expired_at,
|
||||
}
|
||||
}
|
||||
)
|
||||
with pytest.raises(state.VerificationError) as expired_error:
|
||||
state._verify_request(expired, "code", "tok")
|
||||
assert expired_error.value.status_code == 410
|
||||
|
||||
success = DummyConn(
|
||||
rows_by_query={
|
||||
"SELECT status": {
|
||||
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||||
"email_verification_token_hash": state._hash_verification_token("tok"),
|
||||
"email_verification_sent_at": datetime.now(timezone.utc),
|
||||
}
|
||||
}
|
||||
)
|
||||
assert state._verify_request(success, "code", "tok") == "pending"
|
||||
assert any("UPDATE access_requests" in query for query, _ in success.executed)
|
||||
|
||||
|
||||
def test_onboarding_flags_groups_and_recovery_email(monkeypatch) -> None:
|
||||
conn = DummyConn(
|
||||
rows_by_query={
|
||||
"SELECT approval_flags": {
|
||||
"approval_flags": [state.VAULTWARDEN_GRANDFATHERED_FLAG, "", 7],
|
||||
"contact_email": " contact@example.dev ",
|
||||
}
|
||||
},
|
||||
many_by_query={
|
||||
"SELECT step FROM access_request_onboarding_steps": [
|
||||
{"step": "keycloak_password_rotated"},
|
||||
{"step": ""},
|
||||
{"step": 7},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
assert state._fetch_completed_onboarding_steps(conn, "code") == {"keycloak_password_rotated"}
|
||||
assert state._normalize_flag_list("one") == {"one"}
|
||||
assert state._normalize_flag_list(["one", "", 2]) == {"one"}
|
||||
assert state._normalize_flag_list(None) == set()
|
||||
assert state._fetch_request_flags_and_email(conn, "code") == (
|
||||
{state.VAULTWARDEN_GRANDFATHERED_FLAG},
|
||||
"contact@example.dev",
|
||||
)
|
||||
assert state._fetch_request_flags_and_email(DummyConn(), "missing") == (set(), "")
|
||||
assert state._vaultwarden_grandfathered(conn, "code", "alice") == (True, "contact@example.dev")
|
||||
|
||||
admin = DummyAdmin(groups=[state.VAULTWARDEN_GRANDFATHERED_FLAG], full={"email": "real@example.dev"})
|
||||
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||||
|
||||
assert state._user_in_group("", "group") is False
|
||||
assert state._user_in_group("alice", "") is False
|
||||
assert state._user_in_group("alice", state.VAULTWARDEN_GRANDFATHERED_FLAG) is True
|
||||
assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (True, "")
|
||||
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "real@example.dev"
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||||
assert state._user_in_group("alice", "group") is False
|
||||
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||||
assert state._user_in_group("alice", "group") is False
|
||||
assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (False, "")
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||||
assert state._user_in_group("alice", "group") is False
|
||||
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"
|
||||
|
||||
|
||||
def test_keycloak_rotation_and_auto_completed_steps(monkeypatch) -> None:
|
||||
conn = DummyConn(rows_by_query={"SELECT 1": {"exists": True}})
|
||||
admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {"mailu_app_password": ["pw"]}})
|
||||
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||||
|
||||
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||||
assert admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})]
|
||||
assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in conn.executed)
|
||||
assert state._password_rotation_requested(conn, "code") is True
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
state._request_keycloak_password_rotation(conn, "code", "")
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||||
with pytest.raises(RuntimeError):
|
||||
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
|
||||
with pytest.raises(RuntimeError):
|
||||
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||||
|
||||
attrs = {
|
||||
"vaultwarden_status": ["active"],
|
||||
"nextcloud_mail_synced_at": ["now"],
|
||||
"firefly_password_rotated_at": "now",
|
||||
"wger_password_rotated_at": "now",
|
||||
}
|
||||
assert state._extract_attr({"key": ["", "value"]}, "key") == "value"
|
||||
assert state._extract_attr({"key": "value"}, "key") == "value"
|
||||
assert state._extract_attr({"key": ["", 7]}, "key") == ""
|
||||
assert state._extract_attr([], "key") == ""
|
||||
assert state._auto_completed_service_steps(attrs) == {
|
||||
"vaultwarden_master_password",
|
||||
"nextcloud_mail_integration",
|
||||
"firefly_password_rotated",
|
||||
"wger_password_rotated",
|
||||
}
|
||||
assert state._auto_completed_service_steps([]) == set()
|
||||
|
||||
admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": attrs})
|
||||
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||||
completed = state._auto_completed_keycloak_steps(conn, "code", "alice")
|
||||
assert "keycloak_password_rotated" in completed
|
||||
assert admin.updated[-1] == ("user-1", {"requiredActions": []})
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||||
|
||||
fallback_admin = DummyAdmin(
|
||||
user={
|
||||
"id": "user-1",
|
||||
"requiredActions": [],
|
||||
"attributes": {"vaultwarden_master_password_set_at": ["now"]},
|
||||
},
|
||||
fail_get=True,
|
||||
)
|
||||
monkeypatch.setattr(state, "admin_client", lambda: fallback_admin)
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {
|
||||
"keycloak_password_rotated",
|
||||
"vaultwarden_master_password",
|
||||
}
|
||||
|
||||
update_fails = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {}}, fail_update=True)
|
||||
monkeypatch.setattr(state, "admin_client", lambda: update_fails)
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {"keycloak_password_rotated"}
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||||
assert state._auto_completed_keycloak_steps(conn, "", "alice") == set()
|
||||
assert state._auto_completed_keycloak_steps(conn, "code", "") == set()
|
||||
|
||||
|
||||
def test_vaultwarden_status_and_automation_readiness(monkeypatch) -> None:
|
||||
ready_admin = DummyAdmin(full={"attributes": {"vaultwarden_status": ["grandfathered"], "mailu_app_password": ["pw"]}})
|
||||
monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
|
||||
|
||||
assert state._vaultwarden_status_for_user("") == ""
|
||||
assert state._vaultwarden_status_for_user("alice") == "grandfathered"
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is True
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||||
assert state._vaultwarden_status_for_user("alice") == ""
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
assert state._automation_ready(DummyConn(), "code", "") is False
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
|
||||
task_conn = DummyConn(rows_by_query={"SELECT 1 FROM access_request_tasks": {"exists": True}})
|
||||
monkeypatch.setattr(state, "provision_tasks_complete", lambda conn, code: True)
|
||||
assert state._automation_ready(task_conn, "code", "alice") is True
|
||||
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user=None))
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||||
assert state._vaultwarden_status_for_user("alice") == ""
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(full={"attributes": "bad"}))
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||||
assert state._vaultwarden_status_for_user("alice") == ""
|
||||
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||||
|
||||
|
||||
def test_status_transitions_and_payload(monkeypatch) -> None:
|
||||
conn = DummyConn(rows_by_query={"SELECT approval_flags": {"approval_flags": [], "contact_email": "owner@example.dev"}})
|
||||
|
||||
monkeypatch.setattr(state, "_automation_ready", lambda conn, code, username: True)
|
||||
assert state._advance_status(conn, "code", "alice", "approved") == "awaiting_onboarding"
|
||||
assert state._advance_status(conn, "code", "alice", "pending") == "pending"
|
||||
|
||||
required = set(state.ONBOARDING_REQUIRED_STEPS)
|
||||
monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required)
|
||||
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
|
||||
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
|
||||
assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"
|
||||
|
||||
required_with_vault = required | {"vaultwarden_store_temp_password"}
|
||||
monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required_with_vault)
|
||||
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
|
||||
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
|
||||
assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"
|
||||
|
||||
monkeypatch.setattr(state, "_password_rotation_requested", lambda conn, code: True)
|
||||
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
|
||||
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
|
||||
payload = state._onboarding_payload(conn, "code", "alice")
|
||||
assert payload["keycloak"]["password_rotation_requested"] is True
|
||||
assert payload["vaultwarden"]["grandfathered"] is False
|
||||
|
||||
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
|
||||
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
|
||||
monkeypatch.setattr(state, "_resolve_recovery_email", lambda username, fallback: fallback)
|
||||
payload = state._onboarding_payload(conn, "code", "alice")
|
||||
assert "vaultwarden_store_temp_password" in payload["required_steps"]
|
||||
assert payload["vaultwarden"]["matched"] is True
|
||||
|
||||
|
||||
def test_completed_onboarding_steps_merges_manual_and_auto(monkeypatch) -> None:
|
||||
manual_conn = DummyConn(many_by_query={"SELECT step FROM access_request_onboarding_steps": [{"step": "manual"}]})
|
||||
monkeypatch.setattr(state, "_auto_completed_keycloak_steps", lambda conn, code, username: {"auto"})
|
||||
|
||||
assert state._completed_onboarding_steps(manual_conn, "code", "alice") == {"manual", "auto"}
|
||||
Loading…
x
Reference in New Issue
Block a user