337 lines
12 KiB
Python
337 lines
12 KiB
Python
from tests.unit.manager.provisioning_helpers import *
|
|
|
|
|
|
def test_provisioning_missing_verified_email(monkeypatch) -> None:
    """Provision a request row whose ``email_verified_at`` is ``None``.

    ``prov.keycloak_admin.find_user`` is stubbed to return ``None``. The only
    expectation is that the outcome status is ``"accounts_building"``.
    """
    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class NoSuchUserAdmin(DummyAdmin):
        """Admin stub whose username lookup never finds anything."""

        def find_user(self, username):
            return None

    monkeypatch.setattr(prov, "keycloak_admin", NoSuchUserAdmin())

    # The email has not been verified: email_verified_at is None.
    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": None,
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_MISSING")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_email_conflict(monkeypatch) -> None:
    """Provision a request whose contact email resolves to another user.

    The admin stub returns ``None`` for ``find_user`` and
    ``{"username": "other"}`` for ``find_user_by_email``. The only
    expectation is that the outcome status is ``"accounts_building"``.
    """
    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class ConflictingEmailAdmin(DummyAdmin):
        """Admin stub: no user by username, a different user by email."""

        def find_user(self, username):
            return None

        def find_user_by_email(self, email):
            return {"username": "other"}

    monkeypatch.setattr(prov, "keycloak_admin", ConflictingEmailAdmin())

    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_CONFLICT")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_missing_contact_email(monkeypatch) -> None:
    """Provision a request row whose ``contact_email`` is an empty string.

    ``prov.keycloak_admin.find_user`` is stubbed to return ``None``. The only
    expectation is that the outcome status is ``"accounts_building"``.
    """
    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class NoSuchUserAdmin(DummyAdmin):
        """Admin stub whose username lookup never finds anything."""

        def find_user(self, username):
            return None

    monkeypatch.setattr(prov, "keycloak_admin", NoSuchUserAdmin())

    # Verified timestamp is present, but there is no contact email.
    request_row = {
        "username": "alice",
        "contact_email": "",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_EMPTY")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_user_id_missing(monkeypatch) -> None:
    """Provision a request when the found user record has an empty ``id``.

    ``prov.keycloak_admin.find_user`` is stubbed to return ``{"id": ""}``.
    The only expectation is that the outcome status is
    ``"accounts_building"``.
    """
    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class BlankIdAdmin(DummyAdmin):
        """Admin stub whose lookup yields a record with an empty id."""

        def find_user(self, username):
            return {"id": ""}

    monkeypatch.setattr(prov, "keycloak_admin", BlankIdAdmin())

    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_ID")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_initial_password_revealed(monkeypatch) -> None:
    """Provision a request whose initial password was already revealed.

    The row has ``initial_password`` set to ``None`` and a non-null
    ``initial_password_revealed_at``. Mailbox wait, wger/firefly sync and the
    Vaultwarden invite are stubbed to succeed, and ``_all_tasks_ok`` is
    forced to ``False``. The only expectation is that the outcome status is
    ``"accounts_building"``.
    """

    def _mailbox_ready(email, timeout):
        return True

    def _sync_ok(*args, **kwargs):
        return {"status": "ok"}

    def _invite_ok(email):
        return VaultwardenInvite(True, "invited")

    def _tasks_not_ok(*args, **kwargs):
        return False

    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.wger, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.firefly, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_ok)
    monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", _tasks_not_ok)

    # No stored password any more; it has already been revealed.
    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": None,
        "initial_password_revealed_at": datetime.now(timezone.utc),
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_REVEALED")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_vaultwarden_attribute_failure(monkeypatch) -> None:
    """Provision a request when ``set_user_attribute`` raises.

    The admin stub raises ``RuntimeError("fail")`` from
    ``set_user_attribute``. Mailbox wait, wger/firefly sync and the
    Vaultwarden invite are stubbed to succeed, and ``_all_tasks_ok`` is
    forced to ``False``. The only expectation is that the outcome status is
    ``"accounts_building"``.
    """

    def _mailbox_ready(email, timeout):
        return True

    def _sync_ok(*args, **kwargs):
        return {"status": "ok"}

    def _invite_ok(email):
        return VaultwardenInvite(True, "invited")

    def _tasks_not_ok(*args, **kwargs):
        return False

    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "nextcloud_namespace": "",
        "nextcloud_mail_sync_cronjob": "",
        "provision_retry_cooldown_sec": 0.0,
        "default_user_groups": ["dev"],
        "allowed_flag_groups": [],
        "welcome_email_enabled": False,
        "portal_public_base_url": "https://bstein.dev",
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    _patch_mailu_ready(monkeypatch, settings_stub)

    class AttributeFailAdmin(DummyAdmin):
        """Admin stub whose attribute writes always fail."""

        def set_user_attribute(self, username, key, value):
            raise RuntimeError("fail")

    monkeypatch.setattr(prov, "keycloak_admin", AttributeFailAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.wger, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.firefly, "sync_user", _sync_ok)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_ok)
    monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", _tasks_not_ok)

    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }

    manager = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    outcome = manager.provision_access_request("REQ_VAULT")

    assert outcome.status == "accounts_building"
|
|
|
|
def test_provisioning_vaultwarden_rate_limited(monkeypatch) -> None:
    """Call ``_ensure_vaultwarden_invite`` with a rate-limited invite stub.

    ``prov.vaultwarden.invite_user`` is stubbed to return
    ``VaultwardenInvite(False, "rate_limited", "vaultwarden rate limited")``.
    The test then inspects the statements recorded on the ``DummyConn`` and
    expects at least one ``INSERT INTO access_request_tasks`` whose params
    have ``"pending"`` at index 2 and text containing
    ``"rate limited until"`` at index 3.
    """

    def _mailbox_ready(*_args, **_kwargs):
        return True

    def _invite_rate_limited(_email):
        return VaultwardenInvite(False, "rate_limited", "vaultwarden rate limited")

    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "vaultwarden_admin_rate_limit_backoff_sec": 600.0,
        "provision_retry_cooldown_sec": 0.0,
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.mailu, "wait_for_mailbox", _mailbox_ready)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_rate_limited)

    # One timestamp object is shared by the row and the request context.
    verified_at = datetime.now(timezone.utc)
    request_row = {
        "username": "alice",
        "contact_email": "alice@example.com",
        "email_verified_at": verified_at,
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [],
    }
    provisioner = prov.ProvisioningManager(DummyDB(request_row), DummyStorage())
    request_ctx = prov.RequestContext(
        request_code="REQ_RATE",
        username="alice",
        first_name="",
        last_name="",
        contact_email="alice@example.com",
        email_verified_at=verified_at,
        status="accounts_building",
        initial_password="temp",
        revealed_at=None,
        attempted_at=None,
        approval_flags=[],
        user_id="1",
        mailu_email="alice@bstein.dev",
    )
    connection = DummyConn(request_row)
    provisioner._ensure_vaultwarden_invite(connection, request_ctx)

    # Collect the parameter tuples of every recorded task insert.
    task_inserts = []
    for query, params in connection.executed:
        if "INSERT INTO access_request_tasks" not in query:
            continue
        if isinstance(params, tuple) and len(params) >= 4:
            task_inserts.append(params)

    assert any(
        entry[2] == "pending" and "rate limited until" in (entry[3] or "")
        for entry in task_inserts
    )
|
|
|
|
def test_provisioning_vaultwarden_grandfathered(monkeypatch) -> None:
    """Call ``_ensure_vaultwarden_invite`` for a grandfathered request.

    The request carries ``prov.VAULTWARDEN_GRANDFATHERED_FLAG`` in its
    approval flags. ``find_user_by_email`` is stubbed to return
    ``VaultwardenLookup(True, "present", "user found")`` and ``invite_user``
    is stubbed to raise if it is ever called. The test expects at least one
    recorded ``INSERT INTO access_request_tasks`` whose params have ``"ok"``
    at index 2 and ``"grandfathered"`` at index 3.
    """

    def _lookup_present(_email):
        return VaultwardenLookup(True, "present", "user found")

    def _invite_must_not_run(_email):
        raise RuntimeError("invite should not run")

    settings_fields = {
        "mailu_domain": "bstein.dev",
        "mailu_sync_url": "",
        "mailu_mailbox_wait_timeout_sec": 1.0,
        "vaultwarden_admin_rate_limit_backoff_sec": 600.0,
        "provision_retry_cooldown_sec": 0.0,
    }
    settings_stub = types.SimpleNamespace(**settings_fields)
    monkeypatch.setattr(prov, "settings", settings_stub)
    monkeypatch.setattr(prov, "keycloak_admin", DummyAdmin())
    monkeypatch.setattr(prov.vaultwarden, "find_user_by_email", _lookup_present)
    monkeypatch.setattr(prov.vaultwarden, "invite_user", _invite_must_not_run)

    row = {
        "username": "legacy",
        "contact_email": "legacy@example.com",
        "email_verified_at": datetime.now(timezone.utc),
        "status": "accounts_building",
        "initial_password": "temp",
        "initial_password_revealed_at": None,
        "provision_attempted_at": None,
        "approval_flags": [prov.VAULTWARDEN_GRANDFATHERED_FLAG],
    }
    manager = prov.ProvisioningManager(DummyDB(row), DummyStorage())
    # The context reuses the row's timestamp and flag list objects.
    ctx = prov.RequestContext(
        request_code="REQ_GRANDFATHER",
        username="legacy",
        first_name="",
        last_name="",
        contact_email="legacy@example.com",
        email_verified_at=row["email_verified_at"],
        status="accounts_building",
        initial_password="temp",
        revealed_at=None,
        attempted_at=None,
        approval_flags=row["approval_flags"],
        user_id="1",
        mailu_email="legacy@bstein.dev",
    )
    conn = DummyConn(row)
    manager._ensure_vaultwarden_invite(conn, ctx)

    # Collect the parameter tuples of every recorded task insert.
    inserts = []
    for query, params in conn.executed:
        if "INSERT INTO access_request_tasks" not in query:
            continue
        if isinstance(params, tuple) and len(params) >= 4:
            inserts.append(params)

    assert any(
        entry[2] == "ok" and entry[3] == "grandfathered" for entry in inserts
    )
|