from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any from flask import Flask import pytest from atlas_portal.routes import access_request_state as state class DummyResult: def __init__(self, row: dict[str, Any] | None = None, rows: list[dict[str, Any]] | None = None) -> None: self.row = row self.rows = rows or [] def fetchone(self) -> dict[str, Any] | None: return self.row def fetchall(self) -> list[dict[str, Any]]: return self.rows class DummyConn: def __init__( self, *, rows_by_query: dict[str, dict[str, Any] | None] | None = None, many_by_query: dict[str, list[dict[str, Any]]] | None = None, ) -> None: self.rows_by_query = rows_by_query or {} self.many_by_query = many_by_query or {} self.executed: list[tuple[str, object | None]] = [] def execute(self, query: str, params: object | None = None) -> DummyResult: self.executed.append((query, params)) for key, rows in self.many_by_query.items(): if key in query: return DummyResult(rows=rows) for key, row in self.rows_by_query.items(): if key in query: return DummyResult(row=row) return DummyResult() class DummyAdmin: def __init__( self, *, ready: bool = True, user: dict[str, Any] | None = None, full: dict[str, Any] | None = None, groups: list[str] | None = None, fail_find: bool = False, fail_get: bool = False, fail_update: bool = False, ) -> None: self._ready = ready self.user = user if user is not None else {"id": "user-1"} self.full = full if full is not None else {} self.groups = groups or [] self.fail_find = fail_find self.fail_get = fail_get self.fail_update = fail_update self.updated: list[tuple[str, dict[str, Any]]] = [] def ready(self) -> bool: return self._ready def find_user(self, username: str) -> dict[str, Any] | None: if self.fail_find: raise RuntimeError("lookup failed") return self.user def list_user_groups(self, user_id: str) -> list[str]: return self.groups def get_user(self, user_id: str) -> dict[str, Any]: if self.fail_get: raise RuntimeError("get failed") return self.full def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None: if self.fail_update: raise RuntimeError("update failed") self.updated.append((user_id, payload)) def test_request_payload_names_username_and_client_ip(monkeypatch) -> None: app = Flask(__name__) monkeypatch.setattr(state.secrets, "choice", lambda alphabet: "A") with app.test_request_context( "/request", json={ "username": " alice ", "email": " alice@example.com ", "note": " hello ", "first_name": " Alice ", "last_name": " Atlas ", }, headers={"X-Forwarded-For": "203.0.113.10, 10.0.0.1"}, ): assert state._extract_request_payload() == ( "alice", "alice@example.com", "hello", "Alice", "Atlas", ) assert state._client_ip() == "203.0.113.10" with app.test_request_context("/request", headers={"X-Real-IP": "198.51.100.5"}): assert state._client_ip() == "198.51.100.5" with app.test_request_context("/request", environ_base={"REMOTE_ADDR": "192.0.2.10"}): assert state._client_ip() == "192.0.2.10" assert state._normalize_name(" Alice Atlas ") == "Alice Atlas" assert state._validate_name("", label="last name", required=True) == "last name is required" assert state._validate_name("A" * 81, label="last name", required=True) == "last name must be 1-80 characters" assert state._validate_name("Alice\tAtlas", label="last name", required=True) == "last name contains invalid whitespace" assert state._validate_name("", label="first name", required=False) is None assert state._validate_name("Alice Atlas", label="last name", required=True) is None assert state._validate_username("") == "username is required" assert state._validate_username("ab") == "username must be 3-32 characters" assert state._validate_username("bad name") == "username contains invalid characters" assert state._validate_username("alice_ok-1") is None assert state._random_request_code("alice") == "alice~AAAAAAAAAA" def test_verification_url_email_and_error_paths(monkeypatch) -> None: sent: dict[str, str] = {} def fake_send_text_email(*, to_addr: str, subject: str, body: str) -> None: sent.update({"to": to_addr, "subject": subject, "body": body}) monkeypatch.setattr(state.settings, "PORTAL_PUBLIC_BASE_URL", "https://portal.example.dev/") monkeypatch.setattr(state, "send_text_email", fake_send_text_email) state._send_verification_email(request_code="alice CODE", email="alice@example.dev", token="tok/1") assert sent["to"] == "alice@example.dev" assert "confirm your email" in sent["subject"] assert "alice%20CODE" in sent["body"] assert "tok/1" not in sent["body"] with pytest.raises(state.VerificationError) as missing: state._verify_request(DummyConn(), "missing", "tok") assert missing.value.status_code == 404 non_pending = DummyConn(rows_by_query={"SELECT status": {"status": "approved"}}) assert state._verify_request(non_pending, "code", "tok") == "accounts_building" no_hash = DummyConn(rows_by_query={"SELECT status": {"status": state.EMAIL_VERIFY_PENDING_STATUS}}) with pytest.raises(state.VerificationError) as no_token: state._verify_request(no_hash, "code", "tok") assert no_token.value.status_code == 409 bad_hash = DummyConn( rows_by_query={ "SELECT status": { "status": state.EMAIL_VERIFY_PENDING_STATUS, "email_verification_token_hash": state._hash_verification_token("other"), } } ) with pytest.raises(state.VerificationError) as invalid: state._verify_request(bad_hash, "code", "tok") assert invalid.value.status_code == 401 expired_at = datetime.now() - timedelta(seconds=state.settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC + 5) expired = DummyConn( rows_by_query={ "SELECT status": { "status": state.EMAIL_VERIFY_PENDING_STATUS, "email_verification_token_hash": state._hash_verification_token("tok"), "email_verification_sent_at": expired_at, } } ) with pytest.raises(state.VerificationError) as expired_error: state._verify_request(expired, "code", "tok") assert expired_error.value.status_code == 410 success = DummyConn( rows_by_query={ "SELECT status": { "status": state.EMAIL_VERIFY_PENDING_STATUS, "email_verification_token_hash": state._hash_verification_token("tok"), "email_verification_sent_at": datetime.now(timezone.utc), } } ) assert state._verify_request(success, "code", "tok") == "pending" assert any("UPDATE access_requests" in query for query, _ in success.executed) def test_onboarding_flags_groups_and_recovery_email(monkeypatch) -> None: conn = DummyConn( rows_by_query={ "SELECT approval_flags": { "approval_flags": [state.VAULTWARDEN_GRANDFATHERED_FLAG, "", 7], "contact_email": " contact@example.dev ", } }, many_by_query={ "SELECT step FROM access_request_onboarding_steps": [ {"step": "keycloak_password_rotated"}, {"step": ""}, {"step": 7}, ] }, ) assert state._fetch_completed_onboarding_steps(conn, "code") == {"keycloak_password_rotated"} assert state._normalize_flag_list("one") == {"one"} assert state._normalize_flag_list(["one", "", 2]) == {"one"} assert state._normalize_flag_list(None) == set() assert state._fetch_request_flags_and_email(conn, "code") == ( {state.VAULTWARDEN_GRANDFATHERED_FLAG}, "contact@example.dev", ) assert state._fetch_request_flags_and_email(DummyConn(), "missing") == (set(), "") assert state._vaultwarden_grandfathered(conn, "code", "alice") == (True, "contact@example.dev") admin = DummyAdmin(groups=[state.VAULTWARDEN_GRANDFATHERED_FLAG], full={"email": "real@example.dev"}) monkeypatch.setattr(state, "admin_client", lambda: admin) assert state._user_in_group("", "group") is False assert state._user_in_group("alice", "") is False assert state._user_in_group("alice", state.VAULTWARDEN_GRANDFATHERED_FLAG) is True assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (True, "") assert state._resolve_recovery_email("alice", "fallback@example.dev") == "real@example.dev" monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) assert state._user_in_group("alice", "group") is False assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev" monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) assert state._user_in_group("alice", "group") is False assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (False, "") monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) assert state._user_in_group("alice", "group") is False assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev" def test_keycloak_rotation_and_auto_completed_steps(monkeypatch) -> None: conn = DummyConn(rows_by_query={"SELECT 1": {"exists": True}}) admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {"mailu_app_password": ["pw"]}}) monkeypatch.setattr(state, "admin_client", lambda: admin) state._request_keycloak_password_rotation(conn, "code", "alice") assert admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})] assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in conn.executed) assert state._password_rotation_requested(conn, "code") is True with pytest.raises(ValueError): state._request_keycloak_password_rotation(conn, "code", "") monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) with pytest.raises(RuntimeError): state._request_keycloak_password_rotation(conn, "code", "alice") monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={})) with pytest.raises(RuntimeError): state._request_keycloak_password_rotation(conn, "code", "alice") attrs = { "vaultwarden_status": ["active"], "nextcloud_mail_synced_at": ["now"], "firefly_password_rotated_at": "now", "wger_password_rotated_at": "now", } assert state._extract_attr({"key": ["", "value"]}, "key") == "value" assert state._extract_attr({"key": "value"}, "key") == "value" assert state._extract_attr({"key": ["", 7]}, "key") == "" assert state._extract_attr([], "key") == "" assert state._auto_completed_service_steps(attrs) == { "vaultwarden_master_password", "nextcloud_mail_integration", "firefly_password_rotated", "wger_password_rotated", } assert state._auto_completed_service_steps([]) == set() admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": attrs}) monkeypatch.setattr(state, "admin_client", lambda: admin) completed = state._auto_completed_keycloak_steps(conn, "code", "alice") assert "keycloak_password_rotated" in completed assert admin.updated[-1] == ("user-1", {"requiredActions": []}) monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() fallback_admin = DummyAdmin( user={ "id": "user-1", "requiredActions": [], "attributes": {"vaultwarden_master_password_set_at": ["now"]}, }, fail_get=True, ) monkeypatch.setattr(state, "admin_client", lambda: fallback_admin) assert state._auto_completed_keycloak_steps(conn, "code", "alice") == { "keycloak_password_rotated", "vaultwarden_master_password", } update_fails = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {}}, fail_update=True) monkeypatch.setattr(state, "admin_client", lambda: update_fails) assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {"keycloak_password_rotated"} monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set() assert state._auto_completed_keycloak_steps(conn, "", "alice") == set() assert state._auto_completed_keycloak_steps(conn, "code", "") == set() def test_vaultwarden_status_and_automation_readiness(monkeypatch) -> None: ready_admin = DummyAdmin(full={"attributes": {"vaultwarden_status": ["grandfathered"], "mailu_app_password": ["pw"]}}) monkeypatch.setattr(state, "admin_client", lambda: ready_admin) assert state._vaultwarden_status_for_user("") == "" assert state._vaultwarden_status_for_user("alice") == "grandfathered" assert state._automation_ready(DummyConn(), "code", "alice") is True monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False)) assert state._vaultwarden_status_for_user("alice") == "" assert state._automation_ready(DummyConn(), "code", "alice") is False assert state._automation_ready(DummyConn(), "code", "") is False monkeypatch.setattr(state, "admin_client", lambda: ready_admin) task_conn = DummyConn(rows_by_query={"SELECT 1 FROM access_request_tasks": {"exists": True}}) monkeypatch.setattr(state, "provision_tasks_complete", lambda conn, code: True) assert state._automation_ready(task_conn, "code", "alice") is True monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user=None)) assert state._automation_ready(DummyConn(), "code", "alice") is False monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={})) assert state._automation_ready(DummyConn(), "code", "alice") is False monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""})) assert state._vaultwarden_status_for_user("alice") == "" assert state._automation_ready(DummyConn(), "code", "alice") is False monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(full={"attributes": "bad"})) assert state._automation_ready(DummyConn(), "code", "alice") is False monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True)) assert state._vaultwarden_status_for_user("alice") == "" assert state._automation_ready(DummyConn(), "code", "alice") is False def test_status_transitions_and_payload(monkeypatch) -> None: conn = DummyConn(rows_by_query={"SELECT approval_flags": {"approval_flags": [], "contact_email": "owner@example.dev"}}) monkeypatch.setattr(state, "_automation_ready", lambda conn, code, username: True) assert state._advance_status(conn, "code", "alice", "approved") == "awaiting_onboarding" assert state._advance_status(conn, "code", "alice", "pending") == "pending" required = set(state.ONBOARDING_REQUIRED_STEPS) monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required) monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev")) monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "") assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready" required_with_vault = required | {"vaultwarden_store_temp_password"} monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required_with_vault) monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev")) monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered") assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready" monkeypatch.setattr(state, "_password_rotation_requested", lambda conn, code: True) monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev")) monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "") payload = state._onboarding_payload(conn, "code", "alice") assert payload["keycloak"]["password_rotation_requested"] is True assert payload["vaultwarden"]["grandfathered"] is False monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev")) monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered") monkeypatch.setattr(state, "_resolve_recovery_email", lambda username, fallback: fallback) payload = state._onboarding_payload(conn, "code", "alice") assert "vaultwarden_store_temp_password" in payload["required_steps"] assert payload["vaultwarden"]["matched"] is True def test_completed_onboarding_steps_merges_manual_and_auto(monkeypatch) -> None: manual_conn = DummyConn(many_by_query={"SELECT step FROM access_request_onboarding_steps": [{"step": "manual"}]}) monkeypatch.setattr(state, "_auto_completed_keycloak_steps", lambda conn, code, username: {"auto"}) assert state._completed_onboarding_steps(manual_conn, "code", "alice") == {"manual", "auto"}