portal: support verification resend #9

Merged
bstein merged 1 commits from feature/ariadne-integration-portal into master 2026-01-21 23:22:02 +00:00
3 changed files with 294 additions and 12 deletions

View File

@ -84,6 +84,15 @@ def _verify_url(request_code: str, token: str) -> str:
return f"{base}/request-access?code={quote(request_code)}&verify={quote(token)}"
def _send_verification_email(*, request_code: str, email: str, token: str) -> None:
verify_url = _verify_url(request_code, token)
send_text_email(
to_addr=email,
subject="Atlas: confirm your email",
body=access_request_verification_body(request_code=request_code, verify_url=verify_url),
)
ONBOARDING_STEPS: tuple[str, ...] = (
"vaultwarden_master_password",
"keycloak_password_rotated",
@ -461,13 +470,8 @@ def register(app) -> None:
),
)
verify_url = _verify_url(request_code, token)
try:
send_text_email(
to_addr=email,
subject="Atlas: confirm your email",
body=access_request_verification_body(request_code=request_code, verify_url=verify_url),
)
_send_verification_email(request_code=request_code, email=email, token=token)
except MailerError:
return (
jsonify({"error": "failed to send verification email", "request_code": request_code}),
@ -514,13 +518,8 @@ def register(app) -> None:
raise
return jsonify({"ok": True, "request_code": existing["request_code"], "status": existing["status"]})
verify_url = _verify_url(request_code, token)
try:
send_text_email(
to_addr=email,
subject="Atlas: confirm your email",
body=access_request_verification_body(request_code=request_code, verify_url=verify_url),
)
_send_verification_email(request_code=request_code, email=email, token=token)
except MailerError:
return jsonify({"error": "failed to send verification email", "request_code": request_code}), 502
except Exception:
@ -608,6 +607,76 @@ def register(app) -> None:
except Exception:
return jsonify({"error": "failed to verify"}), 502
@app.route("/api/access/request/resend", methods=["POST"])
def request_access_resend() -> Any:
if not settings.ACCESS_REQUEST_ENABLED:
return jsonify({"error": "request access disabled"}), 503
if not configured():
return jsonify({"error": "server not configured"}), 503
ip = _client_ip()
if not rate_limit_allow(
ip,
key="access_request_resend",
limit=30,
window_sec=60,
):
return jsonify({"error": "rate limited"}), 429
payload = request.get_json(silent=True) or {}
code = (payload.get("request_code") or payload.get("code") or "").strip()
if not code:
return jsonify({"error": "request_code is required"}), 400
if not rate_limit_allow(
f"{ip}:{code}",
key="access_request_resend_code",
limit=10,
window_sec=300,
):
return jsonify({"error": "rate limited"}), 429
try:
with connect() as conn:
row = conn.execute(
"""
SELECT status, contact_email
FROM access_requests
WHERE request_code = %s
""",
(code,),
).fetchone()
if not row:
return jsonify({"error": "not found"}), 404
status = _normalize_status(row.get("status") or "")
if status != EMAIL_VERIFY_PENDING_STATUS:
return jsonify({"ok": True, "status": status})
email = str(row.get("contact_email") or "").strip()
if not email:
return jsonify({"error": "missing email"}), 409
token = secrets.token_urlsafe(24)
token_hash = _hash_verification_token(token)
conn.execute(
"""
UPDATE access_requests
SET email_verification_token_hash = %s,
email_verification_sent_at = NOW()
WHERE request_code = %s AND status = %s
""",
(token_hash, code, EMAIL_VERIFY_PENDING_STATUS),
)
try:
_send_verification_email(request_code=code, email=email, token=token)
except MailerError:
return jsonify({"error": "failed to send verification email", "request_code": code}), 502
return jsonify({"ok": True, "status": EMAIL_VERIFY_PENDING_STATUS})
except Exception:
return jsonify({"error": "failed to resend verification"}), 502
@app.route("/api/access/request/status", methods=["POST"])
def request_access_status() -> Any:
if not settings.ACCESS_REQUEST_ENABLED:

View File

@ -0,0 +1,181 @@
from __future__ import annotations
import json
from contextlib import contextmanager
from unittest import TestCase, mock
from atlas_portal.app_factory import create_app
from atlas_portal.routes import access_requests as ar
class DummyResult:
def __init__(self, row=None):
self._row = row
def fetchone(self):
return self._row
def fetchall(self):
return []
class DummyConn:
def __init__(self, rows_by_query=None):
self._rows_by_query = rows_by_query or {}
self.executed = []
def execute(self, query, params=None):
self.executed.append((query, params))
for key, row in self._rows_by_query.items():
if key in query:
return DummyResult(row)
return DummyResult()
class DummyAdmin:
def ready(self):
return False
def find_user(self, username):
return None
def find_user_by_email(self, email):
return None
@contextmanager
def dummy_connect(rows_by_query=None):
yield DummyConn(rows_by_query=rows_by_query)
class AccessRequestTests(TestCase):
@classmethod
def setUpClass(cls):
cls.schema_patch = mock.patch("atlas_portal.app_factory.ensure_schema", lambda: None)
cls.schema_patch.start()
cls.app = create_app()
cls.client = cls.app.test_client()
@classmethod
def tearDownClass(cls):
cls.schema_patch.stop()
def setUp(self):
self.configured_patch = mock.patch.object(ar, "configured", lambda: True)
self.rate_patch = mock.patch.object(ar, "rate_limit_allow", lambda *args, **kwargs: True)
self.admin_patch = mock.patch.object(ar, "admin_client", lambda: DummyAdmin())
self.configured_patch.start()
self.rate_patch.start()
self.admin_patch.start()
def tearDown(self):
self.configured_patch.stop()
self.rate_patch.stop()
self.admin_patch.stop()
def test_request_access_requires_last_name(self):
with mock.patch.object(ar, "connect", lambda: dummy_connect()):
resp = self.client.post(
"/api/access/request",
data=json.dumps(
{
"username": "alice",
"email": "alice@example.com",
"first_name": "Alice",
"last_name": "",
"note": "",
}
),
content_type="application/json",
)
data = resp.get_json()
self.assertEqual(resp.status_code, 400)
self.assertIn("last name is required", data.get("error", ""))
def test_request_access_sends_verification_email(self):
sent = {}
def fake_send_email(*, request_code, email, token):
sent["request_code"] = request_code
sent["email"] = email
with (
mock.patch.object(ar, "_random_request_code", lambda username: f"{username}~CODE123"),
mock.patch.object(ar, "_send_verification_email", fake_send_email),
mock.patch.object(ar, "connect", lambda: dummy_connect()),
):
resp = self.client.post(
"/api/access/request",
data=json.dumps(
{
"username": "alice",
"email": "alice@example.com",
"first_name": "Alice",
"last_name": "Atlas",
"note": "",
}
),
content_type="application/json",
)
data = resp.get_json()
self.assertEqual(resp.status_code, 200)
self.assertEqual(data.get("request_code"), "alice~CODE123")
self.assertEqual(data.get("status"), "pending_email_verification")
self.assertEqual(sent.get("request_code"), "alice~CODE123")
self.assertEqual(sent.get("email"), "alice@example.com")
def test_request_access_email_failure_returns_request_code(self):
def fake_send_email(*, request_code, email, token):
raise ar.MailerError("failed")
with (
mock.patch.object(ar, "_random_request_code", lambda username: f"{username}~CODE123"),
mock.patch.object(ar, "_send_verification_email", fake_send_email),
mock.patch.object(ar, "connect", lambda: dummy_connect()),
):
resp = self.client.post(
"/api/access/request",
data=json.dumps(
{
"username": "alice",
"email": "alice@example.com",
"first_name": "Alice",
"last_name": "Atlas",
"note": "",
}
),
content_type="application/json",
)
data = resp.get_json()
self.assertEqual(resp.status_code, 502)
self.assertEqual(data.get("request_code"), "alice~CODE123")
self.assertIn("failed to send verification email", data.get("error", ""))
def test_request_access_resend_sends_email(self):
sent = {}
def fake_send_email(*, request_code, email, token):
sent["request_code"] = request_code
sent["email"] = email
rows = {
"SELECT status, contact_email": {
"status": "pending_email_verification",
"contact_email": "alice@example.com",
}
}
with (
mock.patch.object(ar, "_send_verification_email", fake_send_email),
mock.patch.object(ar, "connect", lambda: dummy_connect(rows)),
):
resp = self.client.post(
"/api/access/request/resend",
data=json.dumps({"request_code": "alice~CODE123"}),
content_type="application/json",
)
data = resp.get_json()
self.assertEqual(resp.status_code, 200)
self.assertEqual(data.get("status"), "pending_email_verification")
self.assertEqual(sent.get("request_code"), "alice~CODE123")
self.assertEqual(sent.get("email"), "alice@example.com")

View File

@ -148,6 +148,13 @@
Verifying email
</div>
<div v-if="status === 'pending_email_verification'" class="actions" style="margin-top: 10px;">
<button class="pill mono" type="button" :disabled="resending" @click="resendVerification">
{{ resending ? "Resending..." : "Resend verification email" }}
</button>
<span v-if="resendMessage" class="hint mono">{{ resendMessage }}</span>
</div>
<div v-if="tasks.length" class="task-box">
<div class="module-head" style="margin-bottom: 10px;">
<h2>Automation</h2>
@ -248,6 +255,8 @@ const status = ref("");
const onboardingUrl = ref("");
const tasks = ref([]);
const blocked = ref(false);
const resending = ref(false);
const resendMessage = ref("");
function taskPillClass(status) {
const key = (status || "").trim();
@ -480,6 +489,29 @@ async function verifyFromLink(code, token) {
}
}
async function resendVerification() {
if (resending.value) return;
const code = statusForm.request_code.trim();
if (!code) return;
resending.value = true;
resendMessage.value = "";
try {
const resp = await fetch("/api/access/request/resend", {
method: "POST",
headers: { "Content-Type": "application/json" },
cache: "no-store",
body: JSON.stringify({ request_code: code }),
});
const data = await resp.json().catch(() => ({}));
if (!resp.ok) throw new Error(data.error || resp.statusText || `status ${resp.status}`);
resendMessage.value = "Verification email sent.";
} catch (err) {
resendMessage.value = err?.message || "Failed to resend verification email.";
} finally {
resending.value = false;
}
}
onMounted(async () => {
const code = typeof route.query.code === "string" ? route.query.code.trim() : "";
const token = typeof route.query.verify === "string" ? route.query.verify.trim() : "";