ariadne/tests/unit/manager/test_provisioning_run_loop.py

293 lines
12 KiB
Python

from tests.unit.manager.provisioning_helpers import *
def test_provisioning_run_loop_waits_for_admin(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: False)
class DB:
def fetchall(self, *_args, **_kwargs):
return []
class Storage:
def list_provision_candidates(self):
raise AssertionError("should not list candidates")
manager = prov.ProvisioningManager(DB(), Storage())
def fake_sleep(_):
manager._stop_event.set()
monkeypatch.setattr(prov.time, "sleep", fake_sleep)
manager._stop_event.clear()
manager._run_loop()
def test_provisioning_initial_password_missing(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
mailu_domain="bstein.dev",
mailu_sync_url="",
mailu_mailbox_wait_timeout_sec=1.0,
nextcloud_namespace="",
nextcloud_mail_sync_cronjob="",
provision_retry_cooldown_sec=0.0,
default_user_groups=["dev"],
allowed_flag_groups=[],
welcome_email_enabled=False,
portal_public_base_url="https://bstein.dev",
)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
admin = DummyAdmin()
monkeypatch.setattr(prov, "keycloak_admin", admin)
monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True)
monkeypatch.setattr(prov.wger, "sync_user", lambda username, email, password, wait=True: {"status": "ok"})
monkeypatch.setattr(prov.firefly, "sync_user", lambda email, password, wait=True: {"status": "ok"})
monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited"))
monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False)
row = {
"username": "alice",
"contact_email": "alice@example.com",
"email_verified_at": datetime.now(timezone.utc),
"status": "awaiting_onboarding",
"initial_password": None,
"initial_password_revealed_at": None,
"provision_attempted_at": None,
"approval_flags": [],
}
db = DummyDB(row)
storage = DummyStorage()
manager = prov.ProvisioningManager(db, storage)
outcome = manager.provision_access_request("REQ201")
assert outcome.status == "awaiting_onboarding"
def test_provisioning_group_and_mailu_errors(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
mailu_domain="bstein.dev",
mailu_sync_url="http://mailu",
mailu_mailbox_wait_timeout_sec=1.0,
nextcloud_namespace="nextcloud",
nextcloud_mail_sync_cronjob="nextcloud-mail-sync",
provision_retry_cooldown_sec=0.0,
default_user_groups=["dev"],
allowed_flag_groups=[],
welcome_email_enabled=False,
portal_public_base_url="https://bstein.dev",
)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
class Admin(DummyAdmin):
def get_group_id(self, group_name: str):
return None
def set_user_attribute(self, username: str, key: str, value: str):
if key == "mailu_app_password":
raise RuntimeError("fail")
return None
admin = Admin()
monkeypatch.setattr(prov, "keycloak_admin", admin)
monkeypatch.setattr(prov.mailu, "sync", lambda reason, force=False: None)
monkeypatch.setattr(prov.mailu, "wait_for_mailbox", lambda email, timeout: True)
monkeypatch.setattr(prov.nextcloud, "sync_mail", lambda username, wait=True: (_ for _ in ()).throw(RuntimeError("fail")))
monkeypatch.setattr(prov.wger, "sync_user", lambda username, email, password, wait=True: {"status": "ok"})
monkeypatch.setattr(prov.firefly, "sync_user", lambda email, password, wait=True: {"status": "ok"})
monkeypatch.setattr(prov.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited"))
monkeypatch.setattr(prov.ProvisioningManager, "_all_tasks_ok", lambda *args, **kwargs: False)
row = {
"username": "alice",
"contact_email": "alice@example.com",
"email_verified_at": datetime.now(timezone.utc),
"status": "accounts_building",
"initial_password": "temp",
"initial_password_revealed_at": None,
"provision_attempted_at": None,
"approval_flags": [],
}
db = DummyDB(row)
storage = DummyStorage()
manager = prov.ProvisioningManager(db, storage)
outcome = manager.provision_access_request("REQ202")
assert outcome.status == "accounts_building"
def test_provisioning_task_helpers() -> None:
class Conn:
def execute(self, *_args, **_kwargs):
class Result:
def fetchall(self):
return [
{"task": "a", "status": "ok"},
{"task": "b", "status": "error"},
"bad",
]
return Result()
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
statuses = manager._task_statuses(Conn(), "REQ")
assert statuses == {"a": "ok", "b": "error"}
assert manager._all_tasks_ok(Conn(), "REQ", ["a"]) is True
assert manager._all_tasks_ok(Conn(), "REQ", ["b"]) is False
def test_provisioning_retryable_detail_detection() -> None:
manager = prov.ProvisioningManager(DummyDB({}, locked=True), DummyStorage())
assert manager._is_retryable_detail("") is False
assert manager._is_retryable_detail("timeout") is True
assert manager._is_retryable_detail("http 503: service unavailable") is True
assert manager._is_retryable_detail("mailbox not ready") is True
assert manager._is_retryable_detail("invalid credentials") is False
assert manager._retryable_detail("") == "retryable: temporary failure"
assert manager._retryable_detail("timeout").startswith("retryable:")
def test_provisioning_retry_at_parsing_and_due_state() -> None:
manager = prov.ProvisioningManager(DummyDB({}, locked=True), DummyStorage())
assert manager._parse_retry_at("") is None
assert manager._parse_retry_at("not a retry") is None
assert manager._parse_retry_at("rate limited until not-a-date") is None
assert manager._parse_retry_at("rate limited until 2099-01-01T00:00:00+0000").tzinfo is not None
class Conn:
def __init__(self, row):
self.row = row
def execute(self, *_args, **_kwargs):
return types.SimpleNamespace(fetchone=lambda: self.row)
assert manager._vaultwarden_retry_due(Conn({"status": "ok", "detail": ""}), "REQ") is True
assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": "not a retry"}), "REQ") is True
future = "rate limited until 2099-01-01T00:00:00Z"
assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": future}), "REQ") is False
past = "rate limited until 2000-01-01T00:00:00Z"
assert manager._vaultwarden_retry_due(Conn({"status": "pending", "detail": past}), "REQ") is True
def test_provisioning_ensure_task_rows_empty() -> None:
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
manager._ensure_task_rows(DummyConn({}, locked=True), "REQ", [])
def test_provisioning_record_task_ignores_storage_errors(monkeypatch) -> None:
class Storage:
def record_event(self, *args, **kwargs):
raise RuntimeError("fail")
def record_task_run(self, *args, **kwargs):
raise RuntimeError("fail")
db = DummyDB({})
manager = prov.ProvisioningManager(db, Storage())
manager._record_task("REQ", "task", "ok", None, datetime.now(timezone.utc))
def test_provisioning_send_welcome_email_variants(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
welcome_email_enabled=False,
portal_public_base_url="https://bstein.dev",
)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
manager._send_welcome_email("REQ", "alice", "alice@example.com")
dummy_settings = types.SimpleNamespace(
welcome_email_enabled=True,
portal_public_base_url="https://bstein.dev",
)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
manager._send_welcome_email("REQ", "alice", "")
class DB(DummyDB):
def fetchone(self, *_args, **_kwargs):
return None
class Storage(DummyStorage):
def mark_welcome_sent(self, *args, **kwargs):
raise AssertionError("should not be called")
manager = prov.ProvisioningManager(DB({}), Storage())
monkeypatch.setattr(
prov.mailer,
"send_welcome",
lambda *args, **kwargs: (_ for _ in ()).throw(prov.MailerError("fail")),
)
manager._send_welcome_email("REQ", "alice", "alice@example.com")
def test_provisioning_start_stop(monkeypatch) -> None:
class DummyThread:
def __init__(self, target=None, name=None, daemon=None):
self.started = False
self.joined = False
def is_alive(self) -> bool:
return False
def start(self) -> None:
self.started = True
def join(self, timeout=None) -> None:
self.joined = True
monkeypatch.setattr(prov.threading, "Thread", DummyThread)
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
manager.start()
assert manager._thread.started is True
manager.stop()
assert manager._thread.joined is True
def test_provisioning_start_skips_when_running() -> None:
class LiveThread:
def __init__(self):
self.started = False
def is_alive(self) -> bool:
return True
def start(self) -> None:
self.started = True
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
manager._thread = LiveThread()
manager.start()
assert manager._thread.started is False
def test_provisioning_run_loop_skips_when_admin_not_ready(monkeypatch) -> None:
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: False)
monkeypatch.setattr(manager, "_sync_status_metrics", lambda: (_ for _ in ()).throw(RuntimeError("fail")))
monkeypatch.setattr(
prov.time,
"sleep",
lambda *_args, **_kwargs: manager._stop_event.set(),
)
manager._run_loop()
def test_provisioning_run_loop_processes_candidates(monkeypatch) -> None:
manager = prov.ProvisioningManager(DummyDB({}), DummyStorage())
monkeypatch.setattr(prov.keycloak_admin, "ready", lambda: True)
monkeypatch.setattr(manager._storage, "list_provision_candidates", lambda: [types.SimpleNamespace(request_code="REQ")])
calls: list[str] = []
monkeypatch.setattr(manager, "provision_access_request", lambda code: calls.append(code))
monkeypatch.setattr(
prov.time,
"sleep",
lambda *_args, **_kwargs: manager._stop_event.set(),
)
manager._run_loop()
assert calls == ["REQ"]
def test_provisioning_sync_status_metrics(monkeypatch) -> None:
db = DummyDB({})
db.fetchall = lambda *_args, **_kwargs: [{"status": "pending", "count": 2}]
manager = prov.ProvisioningManager(db, DummyStorage())
captured = {}
monkeypatch.setattr(prov, "set_access_request_counts", lambda payload: captured.update(payload))
manager._sync_status_metrics()
assert captured["pending"] == 2