ariadne/tests/unit/manager/test_provisioning_failure_modes.py

337 lines
12 KiB
Python
Raw Permalink Normal View History

from tests.unit.manager.provisioning_helpers import *
def test_provisioning_missing_verified_email(monkeypatch) -> None:
    """Expect ``accounts_building`` when the row has no ``email_verified_at``.

    The Keycloak admin stub reports no existing user for any username, and
    the request row carries ``email_verified_at=None``.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class NoSuchUserAdmin(DummyAdmin):
        """Admin stub whose username lookup always returns ``None``."""

        def find_user(self, username):
            return None

    monkeypatch.setattr(prov, "keycloak_admin", NoSuchUserAdmin())

    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=None,  # the condition under test
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_MISSING")
    assert result.status == "accounts_building"
def test_provisioning_email_conflict(monkeypatch) -> None:
    """Expect ``accounts_building`` when the email lookup returns another user.

    The admin stub finds nobody by username but answers the email lookup
    with a record whose username is ``"other"``.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class EmailTakenAdmin(DummyAdmin):
        """Admin stub: no match by username, a different user by email."""

        def find_user(self, username):
            return None

        def find_user_by_email(self, email):
            return {"username": "other"}

    monkeypatch.setattr(prov, "keycloak_admin", EmailTakenAdmin())

    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=datetime.now(timezone.utc),
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_CONFLICT")
    assert result.status == "accounts_building"
def test_provisioning_missing_contact_email(monkeypatch) -> None:
    """Expect ``accounts_building`` when the row's ``contact_email`` is empty.

    The email is marked verified, but the contact address itself is the
    empty string; the admin stub reports no existing user.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class NoSuchUserAdmin(DummyAdmin):
        """Admin stub whose username lookup always returns ``None``."""

        def find_user(self, username):
            return None

    monkeypatch.setattr(prov, "keycloak_admin", NoSuchUserAdmin())

    request_row = dict(
        username="alice",
        contact_email="",  # the condition under test
        email_verified_at=datetime.now(timezone.utc),
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_EMPTY")
    assert result.status == "accounts_building"
def test_provisioning_user_id_missing(monkeypatch) -> None:
    """Expect ``accounts_building`` when the found user has an empty ``id``.

    The admin stub returns a user record for every username, but that
    record's ``"id"`` value is the empty string.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class BlankIdAdmin(DummyAdmin):
        """Admin stub whose user lookup yields a record with a blank id."""

        def find_user(self, username):
            return {"id": ""}

    monkeypatch.setattr(prov, "keycloak_admin", BlankIdAdmin())

    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=datetime.now(timezone.utc),
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_ID")
    assert result.status == "accounts_building"
def test_provisioning_initial_password_revealed(monkeypatch) -> None:
    """Expect ``accounts_building`` for a row whose password was already revealed.

    The row has ``initial_password=None`` and a set
    ``initial_password_revealed_at``. Downstream service calls are stubbed
    to succeed, while ``_all_tasks_ok`` is forced to return ``False``.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )

    def _mailbox_ready(email, timeout):
        return True

    def _sync_ok(*args, **kwargs):
        return {"status": "ok"}

    def _invite_ok(email):
        return VaultwardenInvite(True, "invited")

    def _tasks_not_ok(*args, **kwargs):
        return False

    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)
    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.wger, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.firefly, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_ok)
    monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", _tasks_not_ok)

    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=datetime.now(timezone.utc),
        status="accounts_building",
        initial_password=None,
        initial_password_revealed_at=datetime.now(timezone.utc),
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_REVEALED")
    assert result.status == "accounts_building"
def test_provisioning_vaultwarden_attribute_failure(monkeypatch) -> None:
    """Expect ``accounts_building`` when ``set_user_attribute`` raises.

    The admin stub raises ``RuntimeError("fail")`` from every
    ``set_user_attribute`` call. Mailbox, wger, firefly and vaultwarden
    calls are stubbed to succeed, and ``_all_tasks_ok`` is forced to
    return ``False``.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "nextcloud_namespace": "",
            "nextcloud_mail_sync_cronjob": "",
            "provision_retry_cooldown_sec": 0.0,
            "default_user_groups": ["dev"],
            "allowed_flag_groups": [],
            "welcome_email_enabled": False,
            "portal_public_base_url": "https://bstein.dev",
        }
    )
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class AttributeFailAdmin(DummyAdmin):
        """Admin stub whose attribute writes always fail."""

        def set_user_attribute(self, username, key, value):
            raise RuntimeError("fail")

    def _mailbox_ready(email, timeout):
        return True

    def _sync_ok(*args, **kwargs):
        return {"status": "ok"}

    def _invite_ok(email):
        return VaultwardenInvite(True, "invited")

    def _tasks_not_ok(*args, **kwargs):
        return False

    monkeypatch.setattr(prov, "keycloak_admin", AttributeFailAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.wger, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.firefly, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_ok)
    monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", _tasks_not_ok)

    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=datetime.now(timezone.utc),
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    result = manager.provision_access_request("REQ_VAULT")
    assert result.status == "accounts_building"
def test_provisioning_vaultwarden_rate_limited(monkeypatch) -> None:
    """Expect a ``pending`` task insert when the vaultwarden invite is rate limited.

    ``invite_user`` is stubbed to return a failed ``rate_limited`` invite.
    ``_ensure_vaultwarden_invite`` is called directly, and the statements
    recorded on the dummy connection must include an
    ``access_request_tasks`` insert whose third parameter is ``"pending"``
    and whose fourth contains ``"rate limited until"``.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "vaultwarden_admin_rate_limit_backoff_sec": 600.0,
            "provision_retry_cooldown_sec": 0.0,
        }
    )

    def _mailbox_ready(*_args, **_kwargs):
        return True

    def _invite_rate_limited(_email):
        return VaultwardenInvite(False, "rate_limited", "vaultwarden rate limited")

    monkeypatch.setattr(prov, "settings", settings_stub)
    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_rate_limited)

    verified_at = datetime.now(timezone.utc)
    request_row = dict(
        username="alice",
        contact_email="alice@example.com",
        email_verified_at=verified_at,
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=[],
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    ctx = prov.RequestContext(
        request_code="REQ_RATE",
        username="alice",
        first_name="",
        last_name="",
        contact_email="alice@example.com",
        email_verified_at=verified_at,
        status="accounts_building",
        initial_password="temp",
        revealed_at=None,
        attempted_at=None,
        approval_flags=[],
        user_id="1",
        mailu_email="alice@bstein.dev",
    )
    conn = DummyConn(request_row)
    manager._ensure_vaultwarden_invite(conn, ctx)

    # Keep only the task-table inserts that carry at least four parameters.
    task_inserts = []
    for sql, args in conn.executed:
        if "INSERT INTO access_request_tasks" not in sql:
            continue
        if isinstance(args, tuple) and len(args) >= 4:
            task_inserts.append(args)

    assert any(
        args[2] == "pending" and "rate limited until" in (args[3] or "")
        for args in task_inserts
    )
def test_provisioning_vaultwarden_grandfathered(monkeypatch) -> None:
    """Expect an ``ok``/``grandfathered`` task insert and no invite call.

    The request carries ``VAULTWARDEN_GRANDFATHERED_FLAG`` and the
    vaultwarden email lookup is stubbed to report the user as present.
    ``invite_user`` is replaced with a stub that raises, so the test fails
    if an invite is attempted.
    """
    settings_stub = types.SimpleNamespace(
        **{
            "mailu_domain": "bstein.dev",
            "mailu_sync_url": "",
            "mailu_mailbox_wait_timeout_sec": 1.0,
            "vaultwarden_admin_rate_limit_backoff_sec": 600.0,
            "provision_retry_cooldown_sec": 0.0,
        }
    )

    def _lookup_present(_email):
        return VaultwardenLookup(True, "present", "user found")

    def _invite_forbidden(_email):
        raise RuntimeError("invite should not run")

    monkeypatch.setattr(prov, "settings", settings_stub)
    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.vaultwarden, "find_user_by_email", _lookup_present)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_forbidden)

    verified_at = datetime.now(timezone.utc)
    # The same list object is shared by the row and the request context.
    flags = [prov.VAULTWARDEN_GRANDFATHERED_FLAG]
    request_row = dict(
        username="legacy",
        contact_email="legacy@example.com",
        email_verified_at=verified_at,
        status="accounts_building",
        initial_password="temp",
        initial_password_revealed_at=None,
        provision_attempted_at=None,
        approval_flags=flags,
    )
    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    ctx = prov.RequestContext(
        request_code="REQ_GRANDFATHER",
        username="legacy",
        first_name="",
        last_name="",
        contact_email="legacy@example.com",
        email_verified_at=verified_at,
        status="accounts_building",
        initial_password="temp",
        revealed_at=None,
        attempted_at=None,
        approval_flags=flags,
        user_id="1",
        mailu_email="legacy@bstein.dev",
    )
    conn = DummyConn(request_row)
    manager._ensure_vaultwarden_invite(conn, ctx)

    # Keep only the task-table inserts that carry at least four parameters.
    task_inserts = []
    for sql, args in conn.executed:
        if "INSERT INTO access_request_tasks" not in sql:
            continue
        if isinstance(args, tuple) and len(args) >= 4:
            task_inserts.append(args)

    assert any(
        args[2] == "ok" and args[3] == "grandfathered" for args in task_inserts
    )