390 lines
18 KiB
Python
390 lines
18 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from flask import Flask
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from atlas_portal.routes import access_request_state as state
|
||
|
|
|
||
|
|
|
||
|
|
class DummyResult:
|
||
|
|
def __init__(self, row: dict[str, Any] | None = None, rows: list[dict[str, Any]] | None = None) -> None:
|
||
|
|
self.row = row
|
||
|
|
self.rows = rows or []
|
||
|
|
|
||
|
|
def fetchone(self) -> dict[str, Any] | None:
|
||
|
|
return self.row
|
||
|
|
|
||
|
|
def fetchall(self) -> list[dict[str, Any]]:
|
||
|
|
return self.rows
|
||
|
|
|
||
|
|
|
||
|
|
class DummyConn:
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
*,
|
||
|
|
rows_by_query: dict[str, dict[str, Any] | None] | None = None,
|
||
|
|
many_by_query: dict[str, list[dict[str, Any]]] | None = None,
|
||
|
|
) -> None:
|
||
|
|
self.rows_by_query = rows_by_query or {}
|
||
|
|
self.many_by_query = many_by_query or {}
|
||
|
|
self.executed: list[tuple[str, object | None]] = []
|
||
|
|
|
||
|
|
def execute(self, query: str, params: object | None = None) -> DummyResult:
|
||
|
|
self.executed.append((query, params))
|
||
|
|
for key, rows in self.many_by_query.items():
|
||
|
|
if key in query:
|
||
|
|
return DummyResult(rows=rows)
|
||
|
|
for key, row in self.rows_by_query.items():
|
||
|
|
if key in query:
|
||
|
|
return DummyResult(row=row)
|
||
|
|
return DummyResult()
|
||
|
|
|
||
|
|
|
||
|
|
class DummyAdmin:
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
*,
|
||
|
|
ready: bool = True,
|
||
|
|
user: dict[str, Any] | None = None,
|
||
|
|
full: dict[str, Any] | None = None,
|
||
|
|
groups: list[str] | None = None,
|
||
|
|
fail_find: bool = False,
|
||
|
|
fail_get: bool = False,
|
||
|
|
fail_update: bool = False,
|
||
|
|
) -> None:
|
||
|
|
self._ready = ready
|
||
|
|
self.user = user if user is not None else {"id": "user-1"}
|
||
|
|
self.full = full if full is not None else {}
|
||
|
|
self.groups = groups or []
|
||
|
|
self.fail_find = fail_find
|
||
|
|
self.fail_get = fail_get
|
||
|
|
self.fail_update = fail_update
|
||
|
|
self.updated: list[tuple[str, dict[str, Any]]] = []
|
||
|
|
|
||
|
|
def ready(self) -> bool:
|
||
|
|
return self._ready
|
||
|
|
|
||
|
|
def find_user(self, username: str) -> dict[str, Any] | None:
|
||
|
|
if self.fail_find:
|
||
|
|
raise RuntimeError("lookup failed")
|
||
|
|
return self.user
|
||
|
|
|
||
|
|
def list_user_groups(self, user_id: str) -> list[str]:
|
||
|
|
return self.groups
|
||
|
|
|
||
|
|
def get_user(self, user_id: str) -> dict[str, Any]:
|
||
|
|
if self.fail_get:
|
||
|
|
raise RuntimeError("get failed")
|
||
|
|
return self.full
|
||
|
|
|
||
|
|
def update_user_safe(self, user_id: str, payload: dict[str, Any]) -> None:
|
||
|
|
if self.fail_update:
|
||
|
|
raise RuntimeError("update failed")
|
||
|
|
self.updated.append((user_id, payload))
|
||
|
|
|
||
|
|
|
||
|
|
def test_request_payload_names_username_and_client_ip(monkeypatch) -> None:
|
||
|
|
app = Flask(__name__)
|
||
|
|
monkeypatch.setattr(state.secrets, "choice", lambda alphabet: "A")
|
||
|
|
|
||
|
|
with app.test_request_context(
|
||
|
|
"/request",
|
||
|
|
json={
|
||
|
|
"username": " alice ",
|
||
|
|
"email": " alice@example.com ",
|
||
|
|
"note": " hello ",
|
||
|
|
"first_name": " Alice ",
|
||
|
|
"last_name": " Atlas ",
|
||
|
|
},
|
||
|
|
headers={"X-Forwarded-For": "203.0.113.10, 10.0.0.1"},
|
||
|
|
):
|
||
|
|
assert state._extract_request_payload() == (
|
||
|
|
"alice",
|
||
|
|
"alice@example.com",
|
||
|
|
"hello",
|
||
|
|
"Alice",
|
||
|
|
"Atlas",
|
||
|
|
)
|
||
|
|
assert state._client_ip() == "203.0.113.10"
|
||
|
|
|
||
|
|
with app.test_request_context("/request", headers={"X-Real-IP": "198.51.100.5"}):
|
||
|
|
assert state._client_ip() == "198.51.100.5"
|
||
|
|
with app.test_request_context("/request", environ_base={"REMOTE_ADDR": "192.0.2.10"}):
|
||
|
|
assert state._client_ip() == "192.0.2.10"
|
||
|
|
|
||
|
|
assert state._normalize_name(" Alice Atlas ") == "Alice Atlas"
|
||
|
|
assert state._validate_name("", label="last name", required=True) == "last name is required"
|
||
|
|
assert state._validate_name("A" * 81, label="last name", required=True) == "last name must be 1-80 characters"
|
||
|
|
assert state._validate_name("Alice\tAtlas", label="last name", required=True) == "last name contains invalid whitespace"
|
||
|
|
assert state._validate_name("", label="first name", required=False) is None
|
||
|
|
assert state._validate_name("Alice Atlas", label="last name", required=True) is None
|
||
|
|
assert state._validate_username("") == "username is required"
|
||
|
|
assert state._validate_username("ab") == "username must be 3-32 characters"
|
||
|
|
assert state._validate_username("bad name") == "username contains invalid characters"
|
||
|
|
assert state._validate_username("alice_ok-1") is None
|
||
|
|
assert state._random_request_code("alice") == "alice~AAAAAAAAAA"
|
||
|
|
|
||
|
|
|
||
|
|
def test_verification_url_email_and_error_paths(monkeypatch) -> None:
|
||
|
|
sent: dict[str, str] = {}
|
||
|
|
|
||
|
|
def fake_send_text_email(*, to_addr: str, subject: str, body: str) -> None:
|
||
|
|
sent.update({"to": to_addr, "subject": subject, "body": body})
|
||
|
|
|
||
|
|
monkeypatch.setattr(state.settings, "PORTAL_PUBLIC_BASE_URL", "https://portal.example.dev/")
|
||
|
|
monkeypatch.setattr(state, "send_text_email", fake_send_text_email)
|
||
|
|
state._send_verification_email(request_code="alice CODE", email="alice@example.dev", token="tok/1")
|
||
|
|
|
||
|
|
assert sent["to"] == "alice@example.dev"
|
||
|
|
assert "confirm your email" in sent["subject"]
|
||
|
|
assert "alice%20CODE" in sent["body"]
|
||
|
|
assert "tok/1" not in sent["body"]
|
||
|
|
|
||
|
|
with pytest.raises(state.VerificationError) as missing:
|
||
|
|
state._verify_request(DummyConn(), "missing", "tok")
|
||
|
|
assert missing.value.status_code == 404
|
||
|
|
|
||
|
|
non_pending = DummyConn(rows_by_query={"SELECT status": {"status": "approved"}})
|
||
|
|
assert state._verify_request(non_pending, "code", "tok") == "accounts_building"
|
||
|
|
|
||
|
|
no_hash = DummyConn(rows_by_query={"SELECT status": {"status": state.EMAIL_VERIFY_PENDING_STATUS}})
|
||
|
|
with pytest.raises(state.VerificationError) as no_token:
|
||
|
|
state._verify_request(no_hash, "code", "tok")
|
||
|
|
assert no_token.value.status_code == 409
|
||
|
|
|
||
|
|
bad_hash = DummyConn(
|
||
|
|
rows_by_query={
|
||
|
|
"SELECT status": {
|
||
|
|
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||
|
|
"email_verification_token_hash": state._hash_verification_token("other"),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
)
|
||
|
|
with pytest.raises(state.VerificationError) as invalid:
|
||
|
|
state._verify_request(bad_hash, "code", "tok")
|
||
|
|
assert invalid.value.status_code == 401
|
||
|
|
|
||
|
|
expired_at = datetime.now() - timedelta(seconds=state.settings.ACCESS_REQUEST_EMAIL_VERIFY_TTL_SEC + 5)
|
||
|
|
expired = DummyConn(
|
||
|
|
rows_by_query={
|
||
|
|
"SELECT status": {
|
||
|
|
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||
|
|
"email_verification_token_hash": state._hash_verification_token("tok"),
|
||
|
|
"email_verification_sent_at": expired_at,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
)
|
||
|
|
with pytest.raises(state.VerificationError) as expired_error:
|
||
|
|
state._verify_request(expired, "code", "tok")
|
||
|
|
assert expired_error.value.status_code == 410
|
||
|
|
|
||
|
|
success = DummyConn(
|
||
|
|
rows_by_query={
|
||
|
|
"SELECT status": {
|
||
|
|
"status": state.EMAIL_VERIFY_PENDING_STATUS,
|
||
|
|
"email_verification_token_hash": state._hash_verification_token("tok"),
|
||
|
|
"email_verification_sent_at": datetime.now(timezone.utc),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
)
|
||
|
|
assert state._verify_request(success, "code", "tok") == "pending"
|
||
|
|
assert any("UPDATE access_requests" in query for query, _ in success.executed)
|
||
|
|
|
||
|
|
|
||
|
|
def test_onboarding_flags_groups_and_recovery_email(monkeypatch) -> None:
|
||
|
|
conn = DummyConn(
|
||
|
|
rows_by_query={
|
||
|
|
"SELECT approval_flags": {
|
||
|
|
"approval_flags": [state.VAULTWARDEN_GRANDFATHERED_FLAG, "", 7],
|
||
|
|
"contact_email": " contact@example.dev ",
|
||
|
|
}
|
||
|
|
},
|
||
|
|
many_by_query={
|
||
|
|
"SELECT step FROM access_request_onboarding_steps": [
|
||
|
|
{"step": "keycloak_password_rotated"},
|
||
|
|
{"step": ""},
|
||
|
|
{"step": 7},
|
||
|
|
]
|
||
|
|
},
|
||
|
|
)
|
||
|
|
|
||
|
|
assert state._fetch_completed_onboarding_steps(conn, "code") == {"keycloak_password_rotated"}
|
||
|
|
assert state._normalize_flag_list("one") == {"one"}
|
||
|
|
assert state._normalize_flag_list(["one", "", 2]) == {"one"}
|
||
|
|
assert state._normalize_flag_list(None) == set()
|
||
|
|
assert state._fetch_request_flags_and_email(conn, "code") == (
|
||
|
|
{state.VAULTWARDEN_GRANDFATHERED_FLAG},
|
||
|
|
"contact@example.dev",
|
||
|
|
)
|
||
|
|
assert state._fetch_request_flags_and_email(DummyConn(), "missing") == (set(), "")
|
||
|
|
assert state._vaultwarden_grandfathered(conn, "code", "alice") == (True, "contact@example.dev")
|
||
|
|
|
||
|
|
admin = DummyAdmin(groups=[state.VAULTWARDEN_GRANDFATHERED_FLAG], full={"email": "real@example.dev"})
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||
|
|
|
||
|
|
assert state._user_in_group("", "group") is False
|
||
|
|
assert state._user_in_group("alice", "") is False
|
||
|
|
assert state._user_in_group("alice", state.VAULTWARDEN_GRANDFATHERED_FLAG) is True
|
||
|
|
assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (True, "")
|
||
|
|
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "real@example.dev"
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||
|
|
assert state._user_in_group("alice", "group") is False
|
||
|
|
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||
|
|
assert state._user_in_group("alice", "group") is False
|
||
|
|
assert state._vaultwarden_grandfathered(DummyConn(), "missing", "alice") == (False, "")
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||
|
|
assert state._user_in_group("alice", "group") is False
|
||
|
|
assert state._resolve_recovery_email("alice", "fallback@example.dev") == "fallback@example.dev"
|
||
|
|
|
||
|
|
|
||
|
|
def test_keycloak_rotation_and_auto_completed_steps(monkeypatch) -> None:
|
||
|
|
conn = DummyConn(rows_by_query={"SELECT 1": {"exists": True}})
|
||
|
|
admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {"mailu_app_password": ["pw"]}})
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||
|
|
|
||
|
|
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||
|
|
assert admin.updated == [("user-1", {"requiredActions": ["CONFIGURE_TOTP", "UPDATE_PASSWORD"]})]
|
||
|
|
assert any("INSERT INTO access_request_onboarding_artifacts" in query for query, _ in conn.executed)
|
||
|
|
assert state._password_rotation_requested(conn, "code") is True
|
||
|
|
|
||
|
|
with pytest.raises(ValueError):
|
||
|
|
state._request_keycloak_password_rotation(conn, "code", "")
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||
|
|
with pytest.raises(RuntimeError):
|
||
|
|
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
|
||
|
|
with pytest.raises(RuntimeError):
|
||
|
|
state._request_keycloak_password_rotation(conn, "code", "alice")
|
||
|
|
|
||
|
|
attrs = {
|
||
|
|
"vaultwarden_status": ["active"],
|
||
|
|
"nextcloud_mail_synced_at": ["now"],
|
||
|
|
"firefly_password_rotated_at": "now",
|
||
|
|
"wger_password_rotated_at": "now",
|
||
|
|
}
|
||
|
|
assert state._extract_attr({"key": ["", "value"]}, "key") == "value"
|
||
|
|
assert state._extract_attr({"key": "value"}, "key") == "value"
|
||
|
|
assert state._extract_attr({"key": ["", 7]}, "key") == ""
|
||
|
|
assert state._extract_attr([], "key") == ""
|
||
|
|
assert state._auto_completed_service_steps(attrs) == {
|
||
|
|
"vaultwarden_master_password",
|
||
|
|
"nextcloud_mail_integration",
|
||
|
|
"firefly_password_rotated",
|
||
|
|
"wger_password_rotated",
|
||
|
|
}
|
||
|
|
assert state._auto_completed_service_steps([]) == set()
|
||
|
|
|
||
|
|
admin = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": attrs})
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: admin)
|
||
|
|
completed = state._auto_completed_keycloak_steps(conn, "code", "alice")
|
||
|
|
assert "keycloak_password_rotated" in completed
|
||
|
|
assert admin.updated[-1] == ("user-1", {"requiredActions": []})
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||
|
|
|
||
|
|
fallback_admin = DummyAdmin(
|
||
|
|
user={
|
||
|
|
"id": "user-1",
|
||
|
|
"requiredActions": [],
|
||
|
|
"attributes": {"vaultwarden_master_password_set_at": ["now"]},
|
||
|
|
},
|
||
|
|
fail_get=True,
|
||
|
|
)
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: fallback_admin)
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {
|
||
|
|
"keycloak_password_rotated",
|
||
|
|
"vaultwarden_master_password",
|
||
|
|
}
|
||
|
|
|
||
|
|
update_fails = DummyAdmin(full={"requiredActions": ["CONFIGURE_TOTP"], "attributes": {}}, fail_update=True)
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: update_fails)
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == {"keycloak_password_rotated"}
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "alice") == set()
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "", "alice") == set()
|
||
|
|
assert state._auto_completed_keycloak_steps(conn, "code", "") == set()
|
||
|
|
|
||
|
|
|
||
|
|
def test_vaultwarden_status_and_automation_readiness(monkeypatch) -> None:
|
||
|
|
ready_admin = DummyAdmin(full={"attributes": {"vaultwarden_status": ["grandfathered"], "mailu_app_password": ["pw"]}})
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
|
||
|
|
|
||
|
|
assert state._vaultwarden_status_for_user("") == ""
|
||
|
|
assert state._vaultwarden_status_for_user("alice") == "grandfathered"
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is True
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(ready=False))
|
||
|
|
assert state._vaultwarden_status_for_user("alice") == ""
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "") is False
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: ready_admin)
|
||
|
|
task_conn = DummyConn(rows_by_query={"SELECT 1 FROM access_request_tasks": {"exists": True}})
|
||
|
|
monkeypatch.setattr(state, "provision_tasks_complete", lambda conn, code: True)
|
||
|
|
assert state._automation_ready(task_conn, "code", "alice") is True
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user=None))
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={}))
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(user={"id": ""}))
|
||
|
|
assert state._vaultwarden_status_for_user("alice") == ""
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(full={"attributes": "bad"}))
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
monkeypatch.setattr(state, "admin_client", lambda: DummyAdmin(fail_find=True))
|
||
|
|
assert state._vaultwarden_status_for_user("alice") == ""
|
||
|
|
assert state._automation_ready(DummyConn(), "code", "alice") is False
|
||
|
|
|
||
|
|
|
||
|
|
def test_status_transitions_and_payload(monkeypatch) -> None:
|
||
|
|
conn = DummyConn(rows_by_query={"SELECT approval_flags": {"approval_flags": [], "contact_email": "owner@example.dev"}})
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "_automation_ready", lambda conn, code, username: True)
|
||
|
|
assert state._advance_status(conn, "code", "alice", "approved") == "awaiting_onboarding"
|
||
|
|
assert state._advance_status(conn, "code", "alice", "pending") == "pending"
|
||
|
|
|
||
|
|
required = set(state.ONBOARDING_REQUIRED_STEPS)
|
||
|
|
monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required)
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
|
||
|
|
assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"
|
||
|
|
|
||
|
|
required_with_vault = required | {"vaultwarden_store_temp_password"}
|
||
|
|
monkeypatch.setattr(state, "_completed_onboarding_steps", lambda conn, code, username: required_with_vault)
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
|
||
|
|
assert state._advance_status(conn, "code", "alice", "awaiting_onboarding") == "ready"
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "_password_rotation_requested", lambda conn, code: True)
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (False, "owner@example.dev"))
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "")
|
||
|
|
payload = state._onboarding_payload(conn, "code", "alice")
|
||
|
|
assert payload["keycloak"]["password_rotation_requested"] is True
|
||
|
|
assert payload["vaultwarden"]["grandfathered"] is False
|
||
|
|
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_grandfathered", lambda conn, code, username: (True, "owner@example.dev"))
|
||
|
|
monkeypatch.setattr(state, "_vaultwarden_status_for_user", lambda username: "grandfathered")
|
||
|
|
monkeypatch.setattr(state, "_resolve_recovery_email", lambda username, fallback: fallback)
|
||
|
|
payload = state._onboarding_payload(conn, "code", "alice")
|
||
|
|
assert "vaultwarden_store_temp_password" in payload["required_steps"]
|
||
|
|
assert payload["vaultwarden"]["matched"] is True
|
||
|
|
|
||
|
|
|
||
|
|
def test_completed_onboarding_steps_merges_manual_and_auto(monkeypatch) -> None:
|
||
|
|
manual_conn = DummyConn(many_by_query={"SELECT step FROM access_request_onboarding_steps": [{"step": "manual"}]})
|
||
|
|
monkeypatch.setattr(state, "_auto_completed_keycloak_steps", lambda conn, code, username: {"auto"})
|
||
|
|
|
||
|
|
assert state._completed_onboarding_steps(manual_conn, "code", "alice") == {"manual", "auto"}
|