# bstein-dev-home/backend/tests/test_access_request_state.py
#
# Listing metadata from the code viewer this file was copied from:
# 390 lines, 18 KiB, Python.

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from flask import Flask
import pytest
from atlas_portal.routes import access_request_state as state
class DummyResult:
    """Canned query result exposing the two fetch methods these tests rely on."""

    def __init__(self, row: dict[str, Any] | None = None, rows: list[dict[str, Any]] | None = None) -> None:
        # ``row`` backs fetchone(); ``rows`` backs fetchall() and is never None.
        self.row = row
        self.rows = rows if rows else []

    def fetchone(self) -> dict[str, Any] | None:
        """Return the single canned row, or None when none was supplied."""
        return self.row

    def fetchall(self) -> list[dict[str, Any]]:
        """Return the canned row list (empty when none was supplied)."""
        return self.rows
class DummyConn:
    """In-memory stand-in for a database connection.

    Canned results are selected by substring: the first configured key that
    occurs inside the executed query text wins. Multi-row keys are consulted
    before single-row keys. Every call is recorded in ``executed``.
    """

    def __init__(
        self,
        *,
        rows_by_query: dict[str, dict[str, Any] | None] | None = None,
        many_by_query: dict[str, list[dict[str, Any]]] | None = None,
    ) -> None:
        self.rows_by_query = rows_by_query if rows_by_query else {}
        self.many_by_query = many_by_query if many_by_query else {}
        # (query, params) pairs in the order they were executed.
        self.executed: list[tuple[str, object | None]] = []

    def execute(self, query: str, params: object | None = None) -> DummyResult:
        """Record the call and return the first canned result whose key is in ``query``."""
        self.executed.append((query, params))

        # Multi-row fixtures take precedence over single-row fixtures.
        many_key = next((fragment for fragment in self.many_by_query if fragment in query), None)
        if many_key is not None:
            return DummyResult(rows=self.many_by_query[many_key])

        single_key = next((fragment for fragment in self.rows_by_query if fragment in query), None)
        if single_key is not None:
            return DummyResult(row=self.rows_by_query[single_key])

        # Nothing configured for this query: empty result.
        return DummyResult()
class DummyAdmin:
    """Configurable stand-in for the admin client used by the code under test.

    Keyword arguments:
        ready: value reported by ``ready()``.
        user: value returned by ``find_user``. When omitted, a default user
            ``{"id": "user-1"}`` is used. Passing ``None`` explicitly makes
            ``find_user`` return ``None`` so a missing user can be simulated.
        full: value returned by ``get_user`` (defaults to an empty dict).
        groups: value returned by ``list_user_groups`` (defaults to an empty list).
        fail_find / fail_get / fail_update: make the matching method raise
            ``RuntimeError`` instead of returning.
    """

    # Sentinel distinguishing "user not passed" from an explicit ``user=None``.
    _UNSET: Any = object()

    def __init__(
        self,
        *,
        ready: bool = True,
        user: dict[str, Any] | None = _UNSET,
        full: dict[str, Any] | None = None,
        groups: list[str] | None = None,
        fail_find: bool = False,
        fail_get: bool = False,
        fail_update: bool = False,
    ) -> None:
        self._ready = ready
        # Only fall back to the default user when the argument was omitted;
        # an explicit None must survive so find_user() can report "not found".
        self.user = {"id": "user-1"} if user is DummyAdmin._UNSET else user
        self.full = full if full is not None else {}
        self.groups = groups or []
        self.fail_find = fail_find
        self.fail_get = fail_get
        self.fail_update = fail_update
        # (user_id, payload) pairs passed to update_user_safe, in call order.
        self.updated: list[tuple[str, dict[str, Any]]] = []

    def ready(self) -> bool:
        """Return the configured readiness flag."""
        return self._ready

    def find_user(self, username: str) -> dict[str, Any] | None:
        """Return the configured user; raise RuntimeError when ``fail_find`` is set."""
        if self.fail_find:
            raise RuntimeError("lookup failed")
        return self.user

    def list_user_groups(self, user_id: str) -> list[str]:
        """Return the configured group list regardless of ``user_id``."""
        return self.groups

    def get_user(self, user_id: str) -> dict[str, Any]:
        """Return the configured full record; raise RuntimeError when ``fail_get`` is set."""
        if self.fail_get:
            raise RuntimeError("get failed")
        return self.full

    def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
        """Record the update; raise RuntimeError when ``fail_update`` is set."""
        if self.fail_update:
            raise RuntimeError("update failed")
        self.updated.append((user_id, payload))
def test_request_payload_names_username_and_client_ip(monkeypatch) -> None:
    """Cover payload extraction, client-IP lookup, name/username validation and code generation."""
    flask_app = Flask(__name__)
    # Pin secrets.choice so the generated request code asserted at the end is deterministic.
    monkeypatch.setattr(state.secrets, "choice", lambda alphabet: "A")

    padded_form = {
        "username": " alice ",
        "email": " alice@example.com ",
        "note": " hello ",
        "first_name": " Alice ",
        "last_name": " Atlas ",
    }
    forwarded_headers = {"X-Forwarded-For": "203.0.113.10, 10.0.0.1"}
    with flask_app.test_request_context("/request", json=padded_form, headers=forwarded_headers):
        stripped_fields = ("alice", "alice@example.com", "hello", "Alice", "Atlas")
        assert state._extract_request_payload() == stripped_fields
        # The first entry of the forwarded chain is reported.
        assert state._client_ip() == "203.0.113.10"

    with flask_app.test_request_context("/request", headers={"X-Real-IP": "198.51.100.5"}):
        assert state._client_ip() == "198.51.100.5"

    with flask_app.test_request_context("/request", environ_base={"REMOTE_ADDR": "192.0.2.10"}):
        assert state._client_ip() == "192.0.2.10"

    assert state._normalize_name(" Alice Atlas ") == "Alice Atlas"

    # (value, label, required, expected error message)
    rejected_names = [
        ("", "last name", True, "last name is required"),
        ("A" * 81, "last name", True, "last name must be 1-80 characters"),
        ("Alice\tAtlas", "last name", True, "last name contains invalid whitespace"),
    ]
    for value, label, required, message in rejected_names:
        assert state._validate_name(value, label=label, required=required) == message
    assert state._validate_name("", label="first name", required=False) is None
    assert state._validate_name("Alice Atlas", label="last name", required=True) is None

    rejected_usernames = [
        ("", "username is required"),
        ("ab", "username must be 3-32 characters"),
        ("bad name", "username contains invalid characters"),
    ]
    for candidate, message in rejected_usernames:
        assert state._validate_username(candidate) == message
    assert state._validate_username("alice_ok-1") is None

    assert state._random_request_code("alice") == "alice~AAAAAAAAAA"
def test_verification_url_email_and_error_paths(monkeypatch) -> None:
    """Cover the verification email contents and each outcome of ``_verify_request``."""
    captured: dict[str, str] = {}

    def fake_send_text_email(*, to_addr: str, subject: str, body: str) -> None:
        captured["to"] = to_addr
        captured["subject"] = subject
        captured["body"] = body

    def pending_conn(**extra: Any) -> DummyConn:
        # Connection whose status lookup returns a pending-verification row plus ``extra`` columns.
        row = {"status": state.EMAIL_VERIFY_PENDING_STATUS, **extra}
        return DummyConn(rows_by_query={"SELECT status": row})

    def rejection_status(conn: DummyConn, request_code: str) -> int:
        # Run verification expecting VerificationError and hand back its HTTP status.
        with pytest.raises(state.VerificationError) as caught:
            state._verify_request(conn, request_code, "tok")
        return caught.value.status_code

    monkeypatch.setattr(state.settings, "PORTAL_PUBLIC_BASE_URL", "https://portal.example.dev/")
    monkeypatch.setattr(state, "send_text_email", fake_send_text_email)

    state._send_verification_email(request_code="alice CODE", email="alice@example.dev", token="tok/1")
    assert captured["to"] == "alice@example.dev"
    assert "confirm your email" in captured["subject"]
    # The request code shows up percent-encoded; the raw token text must not appear.
    assert "alice%20CODE" in captured["body"]
    assert "tok/1" not in captured["body"]

    # No matching row at all.
    assert rejection_status(DummyConn(), "missing") == 404

    # A row that is no longer pending verification.
    already_approved = DummyConn(rows_by_query={"SELECT status": {"status": "approved"}})
    assert state._verify_request(already_approved, "code", "tok") == "accounts_building"

    # Pending row without a stored token hash.
    assert rejection_status(pending_conn(), "code") == 409

    # Pending row whose stored hash belongs to a different token.
    wrong_hash = pending_conn(email_verification_token_hash=state._hash_verification_token("other"))
    assert rejection_status(wrong_hash, "code") == 401

    # Matching hash, but sent longer ago than the configured TTL.
    # NOTE(review): this timestamp is naive local time while the success case below
    # is timezone-aware UTC — presumably intentional to exercise naive values;
    # confirm the result does not depend on the machine's UTC offset.
    expired_at = datetime.now() - timedelta(seconds=state.settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC + 5)
    stale = pending_conn(
        email_verification_token_hash=state._hash_verification_token("tok"),
        email_verification_sent_at=expired_at,
    )
    assert rejection_status(stale, "code") == 410

    # Matching hash and a fresh timestamp: verification succeeds and writes back.
    fresh = pending_conn(
        email_verification_token_hash=state._hash_verification_token("tok"),
        email_verification_sent_at=datetime.now(timezone.utc),
    )
    assert state._verify_request(fresh, "code", "tok") == "pending"
    issued_queries = [sql for sql, _ in fresh.executed]
    assert any("UPDATE access_requests" in sql for sql in issued_queries)
def test_onboarding_flags_groups_and_recovery_email(monkeypatch) -> None:
    """Cover step/flag normalization, group membership checks and recovery-email resolution."""
    # Fixture rows mix valid values with blank and non-string entries.
    flags_row = {
        "approval_flags": [state.VAULTWARDEN_GRANDFATHERED_FLAG, "", 7],
        "contact_email": " contact@example.dev ",
    }
    step_rows = [
        {"step": "keycloak_password_rotated"},
        {"step": ""},
        {"step": 7},
    ]
    seeded_conn = DummyConn(
        rows_by_query={"SELECT approval_flags": flags_row},
        many_by_query={"SELECT step FROM access_request_onboarding_steps": step_rows},
    )

    assert state._fetch_completed_onboarding_steps(seeded_conn, "code") == {"keycloak_password_rotated"}

    assert state._normalize_flag_list("one") == {"one"}
    assert state._normalize_flag_list(["one", "", 2]) == {"one"}
    assert state._normalize_flag_list(None) == set()

    expected_flags_and_email = ({state.VAULTWARDEN_GRANDFATHERED_FLAG}, "contact@example.dev")
    assert state._fetch_request_flags_and_email(seeded_conn, "code") == expected_flags_and_email
    assert state._fetch_request_flags_and_email(DummyConn(), "missing") == (set(), "")
    assert state._vaultwarden_grandfathered(seeded_conn, "code", "alice") == (True, "contact@example.dev")

    # Ready admin whose user is in the grandfathered group and has a real email on record.
    group_admin = DummyAdmin(groups=[state.VAULTWARDEN_GRANDFATHERED_FLAG], full={"email": "real@example.dev"})
    monkeypatch.setattr(state, "admin_client", lambda: group_admin)
    assert state._user_in_group("", "group") is False
    assert state._user_in_group("alice", "") is False
    assert state._user_in_group("alice", state.VAULTWARDEN_GRANDFATHERED_FLAG) is True
    assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (True, "")
    assert state._resolve_recovery_email("alice", "fallback@example.dev") == "real@example.dev"

    # Admin client not ready.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
    assert state._user_in_group("alice", "group") is False
    assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"

    # User record carries an empty id.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
    assert state._user_in_group("alice", "group") is False
    assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (False, "")

    # User lookup raises.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
    assert state._user_in_group("alice", "group") is False
    assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"
def test_keycloak_rotation_and_auto_completed_steps(monkeypatch) -> None:
    """Cover password-rotation requests, attribute extraction and auto-completed step detection."""
    conn = DummyConn(rows_by_query={"SELECT 1": {"exists": True}})

    # --- requesting a password rotation -------------------------------------
    rotation_admin = DummyAdmin(
        full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {"mailu_app_password": ["pw"]}}
    )
    monkeypatch.setattr(state, "admin_client", lambda: rotation_admin)
    state._request_keycloak_password_rotation(conn, "code", "alice")
    # UPDATE_PASSWORD is appended to the existing required actions.
    expected_update = ("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})
    assert rotation_admin.updated == [expected_update]
    executed_sql = [sql for sql, _ in conn.executed]
    assert any("INSERT INTO access_request_onboarding_artifacts" in sql for sql in executed_sql)
    assert state._password_rotation_requested(conn, "code") is True

    # Empty username is rejected.
    with pytest.raises(ValueError):
        state._request_keycloak_password_rotation(conn, "code", "")

    # Admin client not ready.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
    with pytest.raises(RuntimeError):
        state._request_keycloak_password_rotation(conn, "code", "alice")

    # User record without an id.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
    with pytest.raises(RuntimeError):
        state._request_keycloak_password_rotation(conn, "code", "alice")

    # --- attribute helpers ---------------------------------------------------
    attrs = {
        "vaultwarden_status": ["active"],
        "nextcloud_mail_synced_at": ["now"],
        "firefly_password_rotated_at": "now",
        "wger_password_rotated_at": "now",
    }
    assert state._extract_attr({"key": ["", "value"]}, "key") == "value"
    assert state._extract_attr({"key": "value"}, "key") == "value"
    assert state._extract_attr({"key": ["", 7]}, "key") == ""
    assert state._extract_attr([], "key") == ""

    all_service_steps = {
        "vaultwarden_master_password",
        "nextcloud_mail_integration",
        "firefly_password_rotated",
        "wger_password_rotated",
    }
    assert state._auto_completed_service_steps(attrs) == all_service_steps
    assert state._auto_completed_service_steps([]) == set()

    # --- auto-completed keycloak steps ---------------------------------------
    totp_admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": attrs})
    monkeypatch.setattr(state, "admin_client", lambda: totp_admin)
    completed = state._auto_completed_keycloak_steps(conn, "code", "alice")
    assert "keycloak_password_rotated" in completed
    assert totp_admin.updated[-1] == ("user-1", {"requiredActions": []})

    # User record carries an empty id.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
    assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()

    # get_user raises, so only the record returned by find_user is available.
    fallback_admin = DummyAdmin(
        user={
            "id": "user-1",
            "requiredActions": [],
            "attributes": {"vaultwarden_master_password_set_at": ["now"]},
        },
        fail_get=True,
    )
    monkeypatch.setattr(state, "admin_client", lambda: fallback_admin)
    fallback_steps = {"keycloak_password_rotated", "vaultwarden_master_password"}
    assert state._auto_completed_keycloak_steps(conn, "code", "alice") == fallback_steps

    # update_user_safe raises; the step is still reported.
    update_fails = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {}}, fail_update=True)
    monkeypatch.setattr(state, "admin_client", lambda: update_fails)
    assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {"keycloak_password_rotated"}

    # Admin client not ready.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
    assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()

    # User lookup raises; blank request code or username also yield nothing.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
    assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
    assert state._auto_completed_keycloak_steps(conn, "", "alice") == set()
    assert state._auto_completed_keycloak_steps(conn, "code", "") == set()
def test_vaultwarden_status_and_automation_readiness(monkeypatch) -> None:
    """Cover ``_vaultwarden_status_for_user`` and ``_automation_ready`` across admin-client states."""
    healthy_attrs = {"vaultwarden_status": ["grandfathered"], "mailu_app_password": ["pw"]}
    ready_admin = DummyAdmin(full={"attributes": healthy_attrs})

    # Ready admin with both attributes present.
    monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
    assert state._vaultwarden_status_for_user("") == ""
    assert state._vaultwarden_status_for_user("alice") == "grandfathered"
    assert state._automation_ready(DummyConn(), "code", "alice") is True

    # Admin client not ready; a blank username is also not ready.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
    assert state._vaultwarden_status_for_user("alice") == ""
    assert state._automation_ready(DummyConn(), "code", "alice") is False
    assert state._automation_ready(DummyConn(), "code", "") is False

    # Provisioning tasks exist and are reported complete.
    monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
    task_conn = DummyConn(rows_by_query={"SELECT 1 FROM access_request_tasks": {"exists": True}})
    monkeypatch.setattr(state, "provision_tasks_complete", lambda conn, code: True)
    assert state._automation_ready(task_conn, "code", "alice") is True

    # Unusable user records.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user=None))
    assert state._automation_ready(DummyConn(), "code", "alice") is False

    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
    assert state._automation_ready(DummyConn(), "code", "alice") is False

    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
    assert state._vaultwarden_status_for_user("alice") == ""
    assert state._automation_ready(DummyConn(), "code", "alice") is False

    # Attributes value that is not a mapping.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(full={"attributes": "bad"}))
    assert state._automation_ready(DummyConn(), "code", "alice") is False

    # User lookup raises.
    monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
    assert state._vaultwarden_status_for_user("alice") == ""
    assert state._automation_ready(DummyConn(), "code", "alice") is False
def test_status_transitions_and_payload(monkeypatch) -> None:
    """Cover ``_advance_status`` transitions and the shape of ``_onboarding_payload``."""
    owner_row = {"approval_flags": [], "contact_email": "owner@example.dev"}
    conn = DummyConn(rows_by_query={"SELECT approval_flags": owner_row})

    # With automation reported ready, "approved" advances; "pending" is left as is.
    monkeypatch.setattr(state, "_automation_ready", lambda conn, code, username: True)
    assert state._advance_status(conn, "code", "alice", "approved") == "awaiting_onboarding"
    assert state._advance_status(conn, "code", "alice", "pending") == "pending"

    # All required steps done, user not grandfathered.
    required = set(state.ONBOARDING_REQUIRED_STEPS)
    monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required)
    monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
    monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
    assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"

    # Grandfathered user who additionally completed the temp-password step.
    required_with_vault = required | {"vaultwarden_store_temp_password"}
    monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required_with_vault)
    monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
    monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
    assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"

    # Payload for a non-grandfathered user with a rotation already requested.
    monkeypatch.setattr(state, "_password_rotation_requested", lambda conn, code: True)
    monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
    monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
    plain_payload = state._onboarding_payload(conn, "code", "alice")
    assert plain_payload["keycloak"]["password_rotation_requested"] is True
    assert plain_payload["vaultwarden"]["grandfathered"] is False

    # Payload for a grandfathered user; the recovery email resolver echoes its fallback.
    monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
    monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
    monkeypatch.setattr(state, "_resolve_recovery_email", lambda username, fallback: fallback)
    grandfathered_payload = state._onboarding_payload(conn, "code", "alice")
    assert "vaultwarden_store_temp_password" in grandfathered_payload["required_steps"]
    assert grandfathered_payload["vaultwarden"]["matched"] is True
def test_completed_onboarding_steps_merges_manual_and_auto(monkeypatch) -> None:
    """The completed-step set is the union of stored steps and auto-detected ones."""
    stored_steps = {"SELECT step FROM access_request_onboarding_steps": [{"step": "manual"}]}
    manual_conn = DummyConn(many_by_query=stored_steps)
    # Replace auto-detection with a fixed single-step answer.
    monkeypatch.setattr(state, "_auto_completed_keycloak_steps", lambda conn, code, username: {"auto"})

    merged = state._completed_onboarding_steps(manual_conn, "code", "alice")
    assert merged == {"manual", "auto"}