diff --git a/Jenkinsfile b/Jenkinsfile
index 5c7574b..c2625a1 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -44,6 +44,13 @@ spec:
           mountPath: /root/.docker
         - name: harbor-config
          mountPath: /docker-config
+    - name: tester
+      image: python:3.12-slim
+      command: ["cat"]
+      tty: true
+      volumeMounts:
+        - name: workspace-volume
+          mountPath: /home/jenkins/agent
   volumes:
     - name: workspace-volume
       emptyDir: {}
@@ -79,6 +86,18 @@ spec:
             }
         }
 
+        stage('Unit tests') {
+            steps {
+                container('tester') {
+                    sh '''
+                        set -euo pipefail
+                        python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
+                        pytest -q
+                    '''
+                }
+            }
+        }
+
         stage('Prep toolchain') {
             steps {
                 container('builder') {
diff --git a/tests/test_auth.py b/tests/test_auth.py
new file mode 100644
index 0000000..609e71b
--- /dev/null
+++ b/tests/test_auth.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import jwt
+import pytest
+
+from ariadne.auth.keycloak import Authenticator, KeycloakOIDC
+
+
+def _make_token(kid: str = "test") -> str:
+    return jwt.encode(
+        {"sub": "user"},
+        "secret",
+        algorithm="HS256",
+        headers={"kid": kid},
+    )
+
+
+def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
+    token = _make_token()
+    kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
+
+    monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
+    monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
+    monkeypatch.setattr(
+        jwt,
+        "decode",
+        lambda *args, **kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
+    )
+
+    claims = kc.verify(token)
+    assert claims["preferred_username"] == "alice"
+
+
+def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
+    token = _make_token()
+    kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
+
+    monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
+    monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
+    monkeypatch.setattr(
+        jwt,
+        "decode",
+        lambda *args, **kwargs: {"azp": "other", "aud": ["other"]},
+    )
+
+    with pytest.raises(ValueError):
+        kc.verify(token)
+
+
+def test_keycloak_verify_missing_kid(monkeypatch) -> None:
+    kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
+    monkeypatch.setattr(jwt, "get_unverified_header", lambda token: {})
+
+    with pytest.raises(ValueError):
+        kc.verify("header.payload.sig")
+
+
+def test_authenticator_normalizes_groups(monkeypatch) -> None:
+    token = _make_token()
+    auth = Authenticator()
+
+    monkeypatch.setattr(auth._oidc, "verify", lambda token: {"preferred_username": "bob", "groups": ["/admin", "dev"]})
+
+    ctx = auth.authenticate(token)
+    assert ctx.username == "bob"
+    assert ctx.groups == ["admin", "dev"]
diff --git a/tests/test_database.py b/tests/test_database.py
new file mode 100644
index 0000000..f23e5b0
--- /dev/null
+++ b/tests/test_database.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from contextlib import contextmanager
+
+import ariadne.db.database as db_module
+from ariadne.db.database import Database
+
+
+class DummyResult:
+    def __init__(self, row=None, rows=None):
+        self._row = row
+        self._rows = rows or []
+
+    def fetchone(self):
+        return self._row
+
+    def fetchall(self):
+        return self._rows
+
+
+class DummyConn:
+    def __init__(self):
+        self.row_factory = None
+        self.executed = []
+
+    def execute(self, query, params=None):
+        self.executed.append((query, params))
+        return DummyResult()
+
+
+class DummyPool:
+    def __init__(self, conninfo=None, max_size=None):
+        self.conn = DummyConn()
+
+    @contextmanager
+    def connection(self):
+        yield self.conn
+
+    def close(self):
+        return None
+
+
+def test_ensure_schema_runs(monkeypatch) -> None:
+    monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
+    db = Database("postgresql://user:pass@localhost/db")
+    db.ensure_schema()
+    assert db._pool.conn.executed
diff --git a/tests/test_k8s_client.py b/tests/test_k8s_client.py
new file mode 100644
index 0000000..7783164
--- /dev/null
+++ b/tests/test_k8s_client.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+import types
+
+import ariadne.k8s.client as k8s_client
+
+
+class DummyResponse:
+    def __init__(self, payload):
+        self._payload = payload
+
+    def raise_for_status(self):
+        return None
+
+    def json(self):
+        return self._payload
+
+
+class DummyClient:
+    def __init__(self, *args, **kwargs):
+        self.calls = []
+
+    def request(self, method, url, json=None):
+        self.calls.append((method, url, json))
+        return DummyResponse({"ok": True})
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        return False
+
+
+def test_get_json_builds_url(monkeypatch) -> None:
+    dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0)
+    monkeypatch.setattr(k8s_client, "settings", dummy_settings)
+    monkeypatch.setattr(k8s_client, "_read_service_account", lambda: ("token", "/tmp/ca"))
+    monkeypatch.setattr(k8s_client.httpx, "Client", DummyClient)
+
+    result = k8s_client.get_json("/api/test")
+    assert result == {"ok": True}
diff --git a/tests/test_k8s_jobs.py b/tests/test_k8s_jobs.py
new file mode 100644
index 0000000..01b5215
--- /dev/null
+++ b/tests/test_k8s_jobs.py
@@ -0,0 +1,42 @@
+from __future__ import annotations
+
+from ariadne.k8s.jobs import JobSpawner
+
+
+def test_job_from_cronjob_applies_env_and_ttl() -> None:
+    cronjob = {
+        "spec": {
+            "jobTemplate": {
+                "spec": {
+                    "template": {
+                        "spec": {
+                            "containers": [
+                                {"name": "sync", "env": [{"name": "FOO", "value": "1"}]}
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    spawner = JobSpawner("ns", "cron")
+    job = spawner._job_from_cronjob(
+        cronjob,
+        "User@Name",
+        env_overrides=[{"name": "FOO", "value": "2"}, {"name": "BAR", "value": "3"}],
+        job_ttl_seconds=3600,
+    )
+
+    assert job["spec"]["ttlSecondsAfterFinished"] == 3600
+    labels = job["metadata"]["labels"]
+    assert labels["atlas.bstein.dev/trigger"] == "ariadne"
+
+    env = job["spec"]["template"]["spec"]["containers"][0]["env"]
+    env_map = {item["name"]: item["value"] for item in env}
+    assert env_map["FOO"] == "2"
+    assert env_map["BAR"] == "3"
+
+
+def test_safe_name_fragment() -> None:
+    assert JobSpawner._safe_name_fragment("User@Name") == "user-name"
diff --git a/tests/test_mailer.py b/tests/test_mailer.py
new file mode 100644
index 0000000..2731f20
--- /dev/null
+++ b/tests/test_mailer.py
@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+import types
+
+import pytest
+
+from ariadne.services.mailer import Mailer, MailerError
+
+
+def test_mailer_requires_host(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        smtp_host="",
+        smtp_port=25,
+        smtp_username="",
+        smtp_password="",
+        smtp_from="test@bstein.dev",
+        smtp_starttls=False,
+        smtp_use_tls=False,
+        smtp_timeout_sec=5.0,
+    )
+    monkeypatch.setattr("ariadne.services.mailer.settings", dummy)
+
+    svc = Mailer()
+    with pytest.raises(MailerError):
+        svc.send("subject", ["a@bstein.dev"], "body")
+
+
+def test_send_welcome_calls_send(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        smtp_host="smtp",
+        smtp_port=25,
+        smtp_username="",
+        smtp_password="",
+        smtp_from="test@bstein.dev",
+        smtp_starttls=False,
+        smtp_use_tls=False,
+        smtp_timeout_sec=5.0,
+    )
+    monkeypatch.setattr("ariadne.services.mailer.settings", dummy)
+
+    svc = Mailer()
+    called = {}
+
+    def _send(subject, to_addrs, text_body, html_body=None):
+        called["subject"] = subject
+        called["to"] = to_addrs
+        return types.SimpleNamespace(ok=True, detail="sent")
+
+    monkeypatch.setattr(svc, "send", _send)
+    svc.send_welcome("user@bstein.dev", "CODE", "https://bstein.dev/onboarding?code=CODE", username="user")
+    assert called["subject"] == "Welcome to Titan Lab"
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
new file mode 100644
index 0000000..535b072
--- /dev/null
+++ b/tests/test_metrics.py
@@ -0,0 +1,17 @@
+from __future__ import annotations
+
+from prometheus_client import generate_latest
+
+from ariadne.metrics.metrics import record_task_run, record_schedule_state
+
+
+def test_metrics_include_task_run() -> None:
+    record_task_run("unit", "ok", 0.2)
+    payload = generate_latest()
+    assert b"ariadne_task_runs_total" in payload
+
+
+def test_metrics_include_schedule() -> None:
+    record_schedule_state("sched", 1, 1, 2, True)
+    payload = generate_latest()
+    assert b"ariadne_schedule_last_status" in payload
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
new file mode 100644
index 0000000..9e0795c
--- /dev/null
+++ b/tests/test_scheduler.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+from datetime import datetime
+
+from ariadne.scheduler.cron import CronScheduler, CronTask
+
+
+class DummyStorage:
+    def __init__(self) -> None:
+        self.task_runs = []
+        self.schedule_states = []
+
+    def record_task_run(self, *args, **kwargs):
+        self.task_runs.append((args, kwargs))
+
+    def update_schedule_state(self, *args, **kwargs):
+        self.schedule_states.append((args, kwargs))
+
+
+def test_execute_task_records_failure() -> None:
+    storage = DummyStorage()
+    scheduler = CronScheduler(storage, tick_sec=0.1)
+
+    def runner():
+        raise RuntimeError("boom")
+
+    task = CronTask(name="test", cron_expr="*/5 * * * *", runner=runner)
+    scheduler._next_run["test"] = datetime.utcnow()
+    scheduler._execute_task(task)
+
+    assert storage.task_runs
+    assert storage.schedule_states
diff --git a/tests/test_services.py b/tests/test_services.py
new file mode 100644
index 0000000..eb55dec
--- /dev/null
+++ b/tests/test_services.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+import types
+
+import pytest
+
+from ariadne.services.firefly import FireflyService
+from ariadne.services.nextcloud import NextcloudService
+from ariadne.services.wger import WgerService
+
+
+class DummySpawner:
+    def __init__(self, namespace, cronjob):
+        self.namespace = namespace
+        self.cronjob = cronjob
+        self.calls = []
+
+    def trigger_and_wait(self, label_suffix, env_overrides, timeout_sec, job_ttl_seconds=None):
+        self.calls.append((label_suffix, env_overrides, timeout_sec, job_ttl_seconds))
+        return {"job": "test", "status": "ok"}
+
+    def trigger(self, label_suffix, env_overrides, job_ttl_seconds=None):
+        self.calls.append((label_suffix, env_overrides, job_ttl_seconds))
+        return {"job": "test", "status": "queued"}
+
+
+def test_nextcloud_sync_mail_builds_env(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        nextcloud_namespace="nextcloud",
+        nextcloud_mail_sync_cronjob="nextcloud-mail-sync",
+        nextcloud_mail_sync_wait_timeout_sec=90.0,
+        nextcloud_mail_sync_job_ttl_sec=3600,
+    )
+    monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)
+    monkeypatch.setattr("ariadne.services.nextcloud.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
+
+    svc = NextcloudService()
+    result = svc.sync_mail("alice", wait=True)
+
+    assert result["status"] == "ok"
+    spawner = svc._spawner
+    assert spawner.calls
+    label, env, timeout, ttl = spawner.calls[0]
+    assert label == "alice"
+    assert {item["name"]: item["value"] for item in env}["ONLY_USERNAME"] == "alice"
+    assert ttl == 3600
+
+
+def test_wger_sync_user_env(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        wger_namespace="health",
+        wger_user_sync_cronjob="wger-user-sync",
+        wger_admin_cronjob="wger-admin-ensure",
+        wger_user_sync_wait_timeout_sec=60.0,
+    )
+    monkeypatch.setattr("ariadne.services.wger.settings", dummy)
+    monkeypatch.setattr("ariadne.services.wger.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
+
+    svc = WgerService()
+    result = svc.sync_user("alice", "alice@bstein.dev", "pw", wait=True)
+
+    assert result["status"] == "ok"
+    user_spawner = svc._user_spawner
+    label, env, _, _ = user_spawner.calls[0]
+    assert label == "alice"
+    env_map = {item["name"]: item["value"] for item in env}
+    assert env_map["WGER_USERNAME"] == "alice"
+    assert env_map["WGER_EMAIL"] == "alice@bstein.dev"
+
+
+def test_firefly_sync_user_env(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        firefly_namespace="finance",
+        firefly_user_sync_cronjob="firefly-user-sync",
+        firefly_user_sync_wait_timeout_sec=60.0,
+    )
+    monkeypatch.setattr("ariadne.services.firefly.settings", dummy)
+    monkeypatch.setattr("ariadne.services.firefly.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
+
+    svc = FireflyService()
+    result = svc.sync_user("alice@bstein.dev", "pw", wait=True)
+
+    assert result["status"] == "ok"
+    spawner = svc._spawner
+    label, env, _, _ = spawner.calls[0]
+    assert label == "alice"
+    env_map = {item["name"]: item["value"] for item in env}
+    assert env_map["FIREFLY_USER_EMAIL"] == "alice@bstein.dev"
+
+
+def test_nextcloud_missing_config(monkeypatch) -> None:
+    dummy = types.SimpleNamespace(
+        nextcloud_namespace="",
+        nextcloud_mail_sync_cronjob="",
+        nextcloud_mail_sync_wait_timeout_sec=90.0,
+        nextcloud_mail_sync_job_ttl_sec=3600,
+    )
+    monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)
+    svc = NextcloudService()
+    with pytest.raises(RuntimeError):
+        svc.sync_mail("alice")
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 0000000..416beaf
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1,50 @@
+from __future__ import annotations
+
+from datetime import datetime
+
+from ariadne.db.storage import Storage
+
+
+class DummyDB:
+    def __init__(self) -> None:
+        self.rows = []
+
+    def fetchall(self, query, params=None):
+        return self.rows
+
+    def fetchone(self, query, params=None):
+        return None
+
+    def execute(self, query, params=None):
+        return None
+
+
+def test_task_statuses_and_complete() -> None:
+    db = DummyDB()
+    db.rows = [{"task": "one", "status": "ok"}, {"task": "two", "status": "error"}]
+    storage = Storage(db)
+
+    statuses = storage.task_statuses("req")
+    assert statuses == {"one": "ok", "two": "error"}
+    assert storage.tasks_complete("req", ["one"]) is True
+    assert storage.tasks_complete("req", ["one", "two"]) is False
+
+
+def test_row_to_request_flags() -> None:
+    row = {
+        "request_code": "abc",
+        "username": "alice",
+        "contact_email": "a@example.com",
+        "status": "pending",
+        "email_verified_at": None,
+        "initial_password": None,
+        "initial_password_revealed_at": None,
+        "provision_attempted_at": None,
+        "approval_flags": ["demo", 1, "test"],
+        "approval_note": "note",
+        "denial_note": None,
+    }
+
+    req = Storage._row_to_request(row)
+    assert req.request_code == "abc"
+    assert req.approval_flags == ["demo", "1", "test"]
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f0ecd1c..baf4d20 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,6 +3,12 @@ from __future__ import annotations
 import re
 
 from ariadne.services.mailu import MailuService
+from ariadne.utils.http import extract_bearer_token
+
+
+class DummyRequest:
+    def __init__(self, headers):
+        self.headers = headers
 from ariadne.utils.errors import safe_error_detail
 from ariadne.utils.passwords import random_password
 
@@ -24,3 +30,8 @@ def test_mailu_resolve_email_default() -> None:
 
 def test_safe_error_detail_runtime() -> None:
     assert safe_error_detail(RuntimeError("boom"), "fallback") == "boom"
+
+
+def test_extract_bearer_token() -> None:
+    request = DummyRequest({"Authorization": "Bearer token123"})
+    assert extract_bearer_token(request) == "token123"
diff --git a/tests/test_vaultwarden_sync.py b/tests/test_vaultwarden_sync.py
new file mode 100644
index 0000000..e401573
--- /dev/null
+++ b/tests/test_vaultwarden_sync.py
@@ -0,0 +1,64 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from ariadne.services.vaultwarden import VaultwardenInvite
+from ariadne.services import vaultwarden_sync
+
+
+@dataclass
+class DummyAdmin:
+    ready_value: bool = True
+    users: list[dict] = None
+    attrs: dict[str, dict] = None
+    set_calls: list[tuple[str, str, str]] = None
+
+    def ready(self) -> bool:
+        return self.ready_value
+
+    def iter_users(self, page_size: int = 200, brief: bool = False):
+        return self.users or []
+
+    def get_user(self, user_id: str):
+        return self.attrs.get(user_id, {}) if self.attrs else {}
+
+    def set_user_attribute(self, username: str, key: str, value: str) -> None:
+        if self.set_calls is None:
+            self.set_calls = []
+        self.set_calls.append((username, key, value))
+
+
+def test_vaultwarden_sync_requires_admin(monkeypatch) -> None:
+    dummy = DummyAdmin(ready_value=False)
+    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
+
+    summary = vaultwarden_sync.run_vaultwarden_sync()
+    assert summary.failures == 1
+    assert summary.detail == "keycloak admin not configured"
+
+
+def test_vaultwarden_sync_skips_when_missing_mailbox(monkeypatch) -> None:
+    dummy = DummyAdmin(
+        users=[{"id": "1", "username": "alice", "enabled": True, "attributes": {"mailu_email": ["alice@bstein.dev"]}}],
+        attrs={"1": {"id": "1", "username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}},
+    )
+    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
+    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: False)
+
+    summary = vaultwarden_sync.run_vaultwarden_sync()
+    assert summary.skipped == 1
+    assert summary.processed == 0
+
+
+def test_vaultwarden_sync_invites(monkeypatch) -> None:
+    dummy = DummyAdmin(
+        users=[{"id": "1", "username": "alice", "enabled": True, "attributes": {"mailu_email": ["alice@bstein.dev"]}}],
+        attrs={"1": {"id": "1", "username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}},
+    )
+    monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
+    monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
+    monkeypatch.setattr(vaultwarden_sync.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited"))
+
+    summary = vaultwarden_sync.run_vaultwarden_sync()
+    assert summary.created_or_present == 1
+    assert dummy.set_calls