diff --git a/backend/atlas_portal/routes/access_request_state.py b/backend/atlas_portal/routes/access_request_state.py index 546a818..fb70eeb 100644 --- a/backend/atlas_portal/routes/access_request_state.py +++ b/backend/atlas_portal/routes/access_request_state.py @@ -44,13 +44,13 @@ def _normalize_name(value: str) -> str: def _validate_name(value: str, *, label: str, required: bool) -> str | None: + if any(ch in "\r\n\t" for ch in value): + return f"{label} contains invalid whitespace" cleaned = _normalize_name(value) if not cleaned: return f"{label} is required" if required else None if len(cleaned) > 80: return f"{label} must be 1-80 characters" - if any(ch in "\r\n\t" for ch in cleaned): - return f"{label} contains invalid whitespace" return None @@ -88,7 +88,7 @@ def _hash_verification_token(token: str) -> str: def _verify_url(request_code: str, token: str) -> str: base = settings.PORTAL_PUBLIC_BASE_URL.rstrip("/") - return f"{base}/api/access/request/verify-link?code={quote(request_code)}&token={quote(token)}" + return f"{base}/api/access/request/verify-link?code={quote(request_code, safe='')}&token={quote(token, safe='')}" def _send_verification_email(*, request_code: str, email: str, token: str) -> None: diff --git a/backend/tests/test_access_request_state.py b/backend/tests/test_access_request_state.py new file mode 100644 index 0000000..fc38802 --- /dev/null +++ b/backend/tests/test_access_request_state.py @@ -0,0 +1,389 @@ +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Any + +from flask import Flask +import pytest + +from atlas_portal.routes import access_request_state as state + + +class DummyResult: + def __init__(self, row: dict[str, Any] | None = None, rows: list[dict[str, Any]] | None = None) -> None: + self.row = row + self.rows = rows or [] + + def fetchone(self) -> dict[str, Any] | None: + return self.row + + def fetchall(self) -> list[dict[str, Any]]: + return self.rows + + +class DummyConn: + def __init__( + self, + *, + rows_by_query: dict[str, dict[str, Any] | None] | None = None, + many_by_query: dict[str, list[dict[str, Any]]] | None = None, + ) -> None: + self.rows_by_query = rows_by_query or {} + self.many_by_query = many_by_query or {} + self.executed: list[tuple[str, object | None]] = [] + + def execute(self, query: str, params: object | None = None) -> DummyResult: + self.executed.append((query, params)) + for key, rows in self.many_by_query.items(): + if key in query: + return DummyResult(rows=rows) + for key, row in self.rows_by_query.items(): + if key in query: + return DummyResult(row=row) + return DummyResult() + + +class DummyAdmin: + def __init__( + self, + *, + ready: bool = True, + user: dict[str, Any] | None = None, + full: dict[str, Any] | None = None, + groups: list[str] | None = None, + fail_find: bool = False, + fail_get: bool = False, + fail_update: bool = False, + ) -> None: + self._ready = ready + self.user = user if user is not None else {"id": "user-1"} + self.full = full if full is not None else {} + self.groups = groups or [] + self.fail_find = fail_find + self.fail_get = fail_get + self.fail_update = fail_update + self.updated: list[tuple[str, dict[str, Any]]] = [] + + def ready(self) -> bool: + return self._ready + + def find_user(self, username: str) -> dict[str, Any] | None: + if self.fail_find: + raise RuntimeError("lookup failed") + return self.user + + def list_user_groups(self, user_id: str) -> list[str]: + return self.groups + + def get_user(self, user_id: str) -> dict[str, Any]: + if self.fail_get: + raise RuntimeError("get failed") + return self.full + + def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None: + if self.fail_update: + raise RuntimeError("update failed") + self.updated.append((user_id, payload)) + + +def test_request_payload_names_username_and_client_ip(monkeypatch) -> None: + app = Flask(__name__) + monkeypatch.setattr(state.secrets, "choice", lambda alphabet: "A") + + with app.test_request_context( + "/request", + json={ + "username": " alice ", + "email": " alice@example.com ", + "note": " hello ", + "first_name": " Alice ", + "last_name": " Atlas ", + }, + headers={"X-Forwarded-For": "203.0.113.10, 10.0.0.1"}, + ): + assert state._extract_request_payload() == ( + "alice", + "alice@example.com", + "hello", + "Alice", + "Atlas", + ) + assert state._client_ip() == "203.0.113.10" + + with app.test_request_context("/request", headers={"X-Real-IP": "198.51.100.5"}): + assert state._client_ip() == "198.51.100.5" + with app.test_request_context("/request", environ_base={"REMOTE_ADDR": "192.0.2.10"}): + assert state._client_ip() == "192.0.2.10" + + assert state._normalize_name(" Alice Atlas ") == "Alice Atlas" + assert state._validate_name("", label="last name", required=True) == "last name is required" + assert state._validate_name("A" * 81, label="last name", required=True) == "last name must be 1-80 characters" + assert state._validate_name("Alice\tAtlas", label="last name", required=True) == "last name contains invalid whitespace" + assert state._validate_name("", label="first name", required=False) is None + assert state._validate_name("Alice Atlas", label="last name", required=True) is None + assert state._validate_username("") == "username is required" + assert state._validate_username("ab") == "username must be 3-32 characters" + assert state._validate_username("bad name") == "username contains invalid characters" + assert state._validate_username("alice_ok-1") is None + assert state._random_request_code("alice") == "alice~AAAAAAAAAA" + + +def test_verification_url_email_and_error_paths(monkeypatch) -> None: + sent: dict[str, str] = {} + + def fake_send_text_email(*, to_addr: str, subject: str, body: str) -> None: + sent.update({"to": to_addr, "subject": subject, "body": body}) + + monkeypatch.setattr(state.settings, "PORTAL_PUBLIC_BASE_URL", "https://portal.example.dev/") + monkeypatch.setattr(state, "send_text_email", fake_send_text_email) + state._send_verification_email(request_code="alice CODE", email="alice@example.dev", token="tok/1") + + assert sent["to"] == "alice@example.dev" + assert "confirm your email" in sent["subject"] + assert "alice%20CODE" in sent["body"] + assert "tok/1" not in sent["body"] + + with pytest.raises(state.VerificationError) as missing: + state._verify_request(DummyConn(), "missing", "tok") + assert missing.value.status_code == 404 + + non_pending = DummyConn(rows_by_query={"SELECT status": {"status": "approved"}}) + assert state._verify_request(non_pending, "code", "tok") == "accounts_building" + + no_hash = DummyConn(rows_by_query={"SELECT status": {"status": state.EMAIL_VERIFY_PENDING_STATUS}}) + with pytest.raises(state.VerificationError) as no_token: + state._verify_request(no_hash, "code", "tok") + assert no_token.value.status_code == 409 + + bad_hash = DummyConn( + rows_by_query={ + "SELECT status": { + "status": state.EMAIL_VERIFY_PENDING_STATUS, + "email_verification_token_hash": state._hash_verification_token("other"), + } + } + ) + with pytest.raises(state.VerificationError) as invalid: + state._verify_request(bad_hash, "code", "tok") + assert invalid.value.status_code == 401 + + expired_at = datetime.now() - timedelta(seconds=state.settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC + 5) + expired = DummyConn( + rows_by_query={ + "SELECT status": { + "status": state.EMAIL_VERIFY_PENDING_STATUS, + "email_verification_token_hash": state._hash_verification_token("tok"), + "email_verification_sent_at": expired_at, + } + } + ) + with pytest.raises(state.VerificationError) as expired_error: + state._verify_request(expired, "code", "tok") + assert expired_error.value.status_code == 410 + + success = DummyConn( + rows_by_query={ + "SELECT status": { + "status": state.EMAIL_VERIFY_PENDING_STATUS, + "email_verification_token_hash": state._hash_verification_token("tok"), + "email_verification_sent_at": datetime.now(timezone.utc), + } + } + ) + assert state._verify_request(success, "code", "tok") == "pending" + assert any("UPDATE access_requests" in query for query, _ in success.executed) + + +def test_onboarding_flags_groups_and_recovery_email(monkeypatch) -> None: + conn = DummyConn( + rows_by_query={ + "SELECT approval_flags": { + "approval_flags": [state.VAULTWARDEN_GRANDFATHERED_FLAG, "", 7], + "contact_email": " contact@example.dev ", + } + }, + many_by_query={ + "SELECT step FROM access_request_onboarding_steps": [ + {"step": "keycloak_password_rotated"}, + {"step": ""}, + {"step": 7}, + ] + }, + ) + + assert state._fetch_completed_onboarding_steps(conn, "code") == {"keycloak_password_rotated"} + assert state._normalize_flag_list("one") == {"one"} + assert state._normalize_flag_list(["one", "", 2]) == {"one"} + assert state._normalize_flag_list(None) == set() + assert state._fetch_request_flags_and_email(conn, "code") == ( + {state.VAULTWARDEN_GRANDFATHERED_FLAG}, + "contact@example.dev", + ) + assert state._fetch_request_flags_and_email(DummyConn(), "missing") == (set(), "") + assert state._vaultwarden_grandfathered(conn, "code", "alice") == (True, "contact@example.dev") + + admin = DummyAdmin(groups=[state.VAULTWARDEN_GRANDFATHERED_FLAG], full={"email": "real@example.dev"}) + monkeypatch.setattr(state, "admin_client", lambda: admin) + + assert state._user_in_group("", "group") is False + assert state._user_in_group("alice", "") is False + assert state._user_in_group("alice", state.VAULTWARDEN_GRANDFATHERED_FLAG) is True + assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (True, "") + assert state._resolve_recovery_email("alice", "fallback@example.dev") == "real@example.dev" + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) + assert state._user_in_group("alice", "group") is False + assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev" + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) + assert state._user_in_group("alice", "group") is False + assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (False, "") + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) + assert state._user_in_group("alice", "group") is False + assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev" + + +def test_keycloak_rotation_and_auto_completed_steps(monkeypatch) -> None: + conn = DummyConn(rows_by_query={"SELECT 1": {"exists": True}}) + admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {"mailu_app_password": ["pw"]}}) + monkeypatch.setattr(state, "admin_client", lambda: admin) + + state._request_keycloak_password_rotation(conn, "code", "alice") + assert admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})] + assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in conn.executed) + assert state._password_rotation_requested(conn, "code") is True + + with pytest.raises(ValueError): + state._request_keycloak_password_rotation(conn, "code", "") + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) + with pytest.raises(RuntimeError): + state._request_keycloak_password_rotation(conn, "code", "alice") + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={})) + with pytest.raises(RuntimeError): + state._request_keycloak_password_rotation(conn, "code", "alice") + + attrs = { + "vaultwarden_status": ["active"], + "nextcloud_mail_synced_at": ["now"], + "firefly_password_rotated_at": "now", + "wger_password_rotated_at": "now", + } + assert state._extract_attr({"key": ["", "value"]}, "key") == "value" + assert state._extract_attr({"key": "value"}, "key") == "value" + assert state._extract_attr({"key": ["", 7]}, "key") == "" + assert state._extract_attr([], "key") == "" + assert state._auto_completed_service_steps(attrs) == { + "vaultwarden_master_password", + "nextcloud_mail_integration", + "firefly_password_rotated", + "wger_password_rotated", + } + assert state._auto_completed_service_steps([]) == set() + + admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": attrs}) + monkeypatch.setattr(state, "admin_client", lambda: admin) + completed = state._auto_completed_keycloak_steps(conn, "code", "alice") + assert "keycloak_password_rotated" in completed + assert admin.updated[-1] == ("user-1", {"requiredActions": []}) + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) + assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() + + fallback_admin = DummyAdmin( + user={ + "id": "user-1", + "requiredActions": [], + "attributes": {"vaultwarden_master_password_set_at": ["now"]}, + }, + fail_get=True, + ) + monkeypatch.setattr(state, "admin_client", lambda: fallback_admin) + assert state._auto_completed_keycloak_steps(conn, "code", "alice") == { + "keycloak_password_rotated", + "vaultwarden_master_password", + } + + update_fails = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {}}, fail_update=True) + monkeypatch.setattr(state, "admin_client", lambda: update_fails) + assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {"keycloak_password_rotated"} + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) + assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) + assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() + assert state._auto_completed_keycloak_steps(conn, "", "alice") == set() + assert state._auto_completed_keycloak_steps(conn, "code", "") == set() + + +def test_vaultwarden_status_and_automation_readiness(monkeypatch) -> None: + ready_admin = DummyAdmin(full={"attributes": {"vaultwarden_status": ["grandfathered"], "mailu_app_password": ["pw"]}}) + monkeypatch.setattr(state, "admin_client", lambda: ready_admin) + + assert state._vaultwarden_status_for_user("") == "" + assert state._vaultwarden_status_for_user("alice") == "grandfathered" + assert state._automation_ready(DummyConn(), "code", "alice") is True + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) + assert state._vaultwarden_status_for_user("alice") == "" + assert state._automation_ready(DummyConn(), "code", "alice") is False + assert state._automation_ready(DummyConn(), "code", "") is False + + monkeypatch.setattr(state, "admin_client", lambda: ready_admin) + task_conn = DummyConn(rows_by_query={"SELECT 1 FROM access_request_tasks": {"exists": True}}) + monkeypatch.setattr(state, "provision_tasks_complete", lambda conn, code: True) + assert state._automation_ready(task_conn, "code", "alice") is True + + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user=None)) + assert state._automation_ready(DummyConn(), "code", "alice") is False + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={})) + assert state._automation_ready(DummyConn(), "code", "alice") is False + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) + assert state._vaultwarden_status_for_user("alice") == "" + assert state._automation_ready(DummyConn(), "code", "alice") is False + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(full={"attributes": "bad"})) + assert state._automation_ready(DummyConn(), "code", "alice") is False + monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) + assert state._vaultwarden_status_for_user("alice") == "" + assert state._automation_ready(DummyConn(), "code", "alice") is False + + +def test_status_transitions_and_payload(monkeypatch) -> None: + conn = DummyConn(rows_by_query={"SELECT approval_flags": {"approval_flags": [], "contact_email": "owner@example.dev"}}) + + monkeypatch.setattr(state, "_automation_ready", lambda conn, code, username: True) + assert state._advance_status(conn, "code", "alice", "approved") == "awaiting_onboarding" + assert state._advance_status(conn, "code", "alice", "pending") == "pending" + + required = set(state.ONBOARDING_REQUIRED_STEPS) + monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required) + monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev")) + monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "") + assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready" + + required_with_vault = required | {"vaultwarden_store_temp_password"} + monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required_with_vault) + monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev")) + monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered") + assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready" + + monkeypatch.setattr(state, "_password_rotation_requested", lambda conn, code: True) + monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev")) + monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "") + payload = state._onboarding_payload(conn, "code", "alice") + assert payload["keycloak"]["password_rotation_requested"] is True + assert payload["vaultwarden"]["grandfathered"] is False + + monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev")) + monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered") + monkeypatch.setattr(state, "_resolve_recovery_email", lambda username, fallback: fallback) + payload = state._onboarding_payload(conn, "code", "alice") + assert "vaultwarden_store_temp_password" in payload["required_steps"] + assert payload["vaultwarden"]["matched"] is True + + +def test_completed_onboarding_steps_merges_manual_and_auto(monkeypatch) -> None: + manual_conn = DummyConn(many_by_query={"SELECT step FROM access_request_onboarding_steps": [{"step": "manual"}]}) + monkeypatch.setattr(state, "_auto_completed_keycloak_steps", lambda conn, code, username: {"auto"}) + + assert state._completed_onboarding_steps(manual_conn, "code", "alice") == {"manual", "auto"}