test: expand Ariadne unit coverage

This commit is contained in:
Brad Stein 2026-01-19 19:01:32 -03:00
parent ee532ac215
commit 952c48e55f
12 changed files with 541 additions and 0 deletions

19
Jenkinsfile vendored
View File

@ -44,6 +44,13 @@ spec:
mountPath: /root/.docker mountPath: /root/.docker
- name: harbor-config - name: harbor-config
mountPath: /docker-config mountPath: /docker-config
- name: tester
image: python:3.12-slim
command: ["cat"]
tty: true
volumeMounts:
- name: workspace-volume
mountPath: /home/jenkins/agent
volumes: volumes:
- name: workspace-volume - name: workspace-volume
emptyDir: {} emptyDir: {}
@ -79,6 +86,18 @@ spec:
} }
} }
stage('Unit tests') {
steps {
container('tester') {
sh '''
set -euo pipefail
python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
pytest -q
'''
}
}
}
stage('Prep toolchain') { stage('Prep toolchain') {
steps { steps {
container('builder') { container('builder') {

66
tests/test_auth.py Normal file
View File

@ -0,0 +1,66 @@
from __future__ import annotations
import jwt
import pytest
from ariadne.auth.keycloak import Authenticator, KeycloakOIDC
def _make_token(kid: str = "test") -> str:
return jwt.encode(
{"sub": "user"},
"secret",
algorithm="HS256",
headers={"kid": kid},
)
def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
token = _make_token()
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
lambda *args, **kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
)
claims = kc.verify(token)
assert claims["preferred_username"] == "alice"
def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
token = _make_token()
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
lambda *args, **kwargs: {"azp": "other", "aud": ["other"]},
)
with pytest.raises(ValueError):
kc.verify(token)
def test_keycloak_verify_missing_kid(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(jwt, "get_unverified_header", lambda token: {})
with pytest.raises(ValueError):
kc.verify("header.payload.sig")
def test_authenticator_normalizes_groups(monkeypatch) -> None:
token = _make_token()
auth = Authenticator()
monkeypatch.setattr(auth._oidc, "verify", lambda token: {"preferred_username": "bob", "groups": ["/admin", "dev"]})
ctx = auth.authenticate(token)
assert ctx.username == "bob"
assert ctx.groups == ["admin", "dev"]

47
tests/test_database.py Normal file
View File

@ -0,0 +1,47 @@
from __future__ import annotations
from contextlib import contextmanager
import ariadne.db.database as db_module
from ariadne.db.database import Database
class DummyResult:
def __init__(self, row=None, rows=None):
self._row = row
self._rows = rows or []
def fetchone(self):
return self._row
def fetchall(self):
return self._rows
class DummyConn:
def __init__(self):
self.row_factory = None
self.executed = []
def execute(self, query, params=None):
self.executed.append((query, params))
return DummyResult()
class DummyPool:
def __init__(self, conninfo=None, max_size=None):
self.conn = DummyConn()
@contextmanager
def connection(self):
yield self.conn
def close(self):
return None
def test_ensure_schema_runs(monkeypatch) -> None:
monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
db = Database("postgresql://user:pass@localhost/db")
db.ensure_schema()
assert db._pool.conn.executed

41
tests/test_k8s_client.py Normal file
View File

@ -0,0 +1,41 @@
from __future__ import annotations
import types
import ariadne.k8s.client as k8s_client
class DummyResponse:
def __init__(self, payload):
self._payload = payload
def raise_for_status(self):
return None
def json(self):
return self._payload
class DummyClient:
def __init__(self, *args, **kwargs):
self.calls = []
def request(self, method, url, json=None):
self.calls.append((method, url, json))
return DummyResponse({"ok": True})
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def test_get_json_builds_url(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(k8s_api_timeout_sec=5.0)
monkeypatch.setattr(k8s_client, "settings", dummy_settings)
monkeypatch.setattr(k8s_client, "_read_service_account", lambda: ("token", "/tmp/ca"))
monkeypatch.setattr(k8s_client.httpx, "Client", DummyClient)
result = k8s_client.get_json("/api/test")
assert result == {"ok": True}

42
tests/test_k8s_jobs.py Normal file
View File

@ -0,0 +1,42 @@
from __future__ import annotations
from ariadne.k8s.jobs import JobSpawner
def test_job_from_cronjob_applies_env_and_ttl() -> None:
cronjob = {
"spec": {
"jobTemplate": {
"spec": {
"template": {
"spec": {
"containers": [
{"name": "sync", "env": [{"name": "FOO", "value": "1"}]}
]
}
}
}
}
}
}
spawner = JobSpawner("ns", "cron")
job = spawner._job_from_cronjob(
cronjob,
"User@Name",
env_overrides=[{"name": "FOO", "value": "2"}, {"name": "BAR", "value": "3"}],
job_ttl_seconds=3600,
)
assert job["spec"]["ttlSecondsAfterFinished"] == 3600
labels = job["metadata"]["labels"]
assert labels["atlas.bstein.dev/trigger"] == "ariadne"
env = job["spec"]["template"]["spec"]["containers"][0]["env"]
env_map = {item["name"]: item["value"] for item in env}
assert env_map["FOO"] == "2"
assert env_map["BAR"] == "3"
def test_safe_name_fragment() -> None:
assert JobSpawner._safe_name_fragment("User@Name") == "user-name"

51
tests/test_mailer.py Normal file
View File

@ -0,0 +1,51 @@
from __future__ import annotations
import types
import pytest
from ariadne.services.mailer import Mailer, MailerError
def test_mailer_requires_host(monkeypatch) -> None:
dummy = types.SimpleNamespace(
smtp_host="",
smtp_port=25,
smtp_username="",
smtp_password="",
smtp_from="test@bstein.dev",
smtp_starttls=False,
smtp_use_tls=False,
smtp_timeout_sec=5.0,
)
monkeypatch.setattr("ariadne.services.mailer.settings", dummy)
svc = Mailer()
with pytest.raises(MailerError):
svc.send("subject", ["a@bstein.dev"], "body")
def test_send_welcome_calls_send(monkeypatch) -> None:
dummy = types.SimpleNamespace(
smtp_host="smtp",
smtp_port=25,
smtp_username="",
smtp_password="",
smtp_from="test@bstein.dev",
smtp_starttls=False,
smtp_use_tls=False,
smtp_timeout_sec=5.0,
)
monkeypatch.setattr("ariadne.services.mailer.settings", dummy)
svc = Mailer()
called = {}
def _send(subject, to_addrs, text_body, html_body=None):
called["subject"] = subject
called["to"] = to_addrs
return types.SimpleNamespace(ok=True, detail="sent")
monkeypatch.setattr(svc, "send", _send)
svc.send_welcome("user@bstein.dev", "CODE", "https://bstein.dev/onboarding?code=CODE", username="user")
assert called["subject"] == "Welcome to Titan Lab"

17
tests/test_metrics.py Normal file
View File

@ -0,0 +1,17 @@
from __future__ import annotations
from prometheus_client import generate_latest
from ariadne.metrics.metrics import record_task_run, record_schedule_state
def test_metrics_include_task_run() -> None:
record_task_run("unit", "ok", 0.2)
payload = generate_latest()
assert b"ariadne_task_runs_total" in payload
def test_metrics_include_schedule() -> None:
record_schedule_state("sched", 1, 1, 2, True)
payload = generate_latest()
assert b"ariadne_schedule_last_status" in payload

32
tests/test_scheduler.py Normal file
View File

@ -0,0 +1,32 @@
from __future__ import annotations
from datetime import datetime
from ariadne.scheduler.cron import CronScheduler, CronTask
class DummyStorage:
def __init__(self) -> None:
self.task_runs = []
self.schedule_states = []
def record_task_run(self, *args, **kwargs):
self.task_runs.append((args, kwargs))
def update_schedule_state(self, *args, **kwargs):
self.schedule_states.append((args, kwargs))
def test_execute_task_records_failure() -> None:
storage = DummyStorage()
scheduler = CronScheduler(storage, tick_sec=0.1)
def runner():
raise RuntimeError("boom")
task = CronTask(name="test", cron_expr="*/5 * * * *", runner=runner)
scheduler._next_run["test"] = datetime.utcnow()
scheduler._execute_task(task)
assert storage.task_runs
assert storage.schedule_states

101
tests/test_services.py Normal file
View File

@ -0,0 +1,101 @@
from __future__ import annotations
import types
import pytest
from ariadne.services.firefly import FireflyService
from ariadne.services.nextcloud import NextcloudService
from ariadne.services.wger import WgerService
class DummySpawner:
def __init__(self, namespace, cronjob):
self.namespace = namespace
self.cronjob = cronjob
self.calls = []
def trigger_and_wait(self, label_suffix, env_overrides, timeout_sec, job_ttl_seconds=None):
self.calls.append((label_suffix, env_overrides, timeout_sec, job_ttl_seconds))
return {"job": "test", "status": "ok"}
def trigger(self, label_suffix, env_overrides, job_ttl_seconds=None):
self.calls.append((label_suffix, env_overrides, job_ttl_seconds))
return {"job": "test", "status": "queued"}
def test_nextcloud_sync_mail_builds_env(monkeypatch) -> None:
dummy = types.SimpleNamespace(
nextcloud_namespace="nextcloud",
nextcloud_mail_sync_cronjob="nextcloud-mail-sync",
nextcloud_mail_sync_wait_timeout_sec=90.0,
nextcloud_mail_sync_job_ttl_sec=3600,
)
monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)
monkeypatch.setattr("ariadne.services.nextcloud.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
svc = NextcloudService()
result = svc.sync_mail("alice", wait=True)
assert result["status"] == "ok"
spawner = svc._spawner
assert spawner.calls
label, env, timeout, ttl = spawner.calls[0]
assert label == "alice"
assert {item["name"]: item["value"] for item in env}["ONLY_USERNAME"] == "alice"
assert ttl == 3600
def test_wger_sync_user_env(monkeypatch) -> None:
dummy = types.SimpleNamespace(
wger_namespace="health",
wger_user_sync_cronjob="wger-user-sync",
wger_admin_cronjob="wger-admin-ensure",
wger_user_sync_wait_timeout_sec=60.0,
)
monkeypatch.setattr("ariadne.services.wger.settings", dummy)
monkeypatch.setattr("ariadne.services.wger.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
svc = WgerService()
result = svc.sync_user("alice", "alice@bstein.dev", "pw", wait=True)
assert result["status"] == "ok"
user_spawner = svc._user_spawner
label, env, _, _ = user_spawner.calls[0]
assert label == "alice"
env_map = {item["name"]: item["value"] for item in env}
assert env_map["WGER_USERNAME"] == "alice"
assert env_map["WGER_EMAIL"] == "alice@bstein.dev"
def test_firefly_sync_user_env(monkeypatch) -> None:
dummy = types.SimpleNamespace(
firefly_namespace="finance",
firefly_user_sync_cronjob="firefly-user-sync",
firefly_user_sync_wait_timeout_sec=60.0,
)
monkeypatch.setattr("ariadne.services.firefly.settings", dummy)
monkeypatch.setattr("ariadne.services.firefly.JobSpawner", lambda ns, cj: DummySpawner(ns, cj))
svc = FireflyService()
result = svc.sync_user("alice@bstein.dev", "pw", wait=True)
assert result["status"] == "ok"
spawner = svc._spawner
label, env, _, _ = spawner.calls[0]
assert label == "alice"
env_map = {item["name"]: item["value"] for item in env}
assert env_map["FIREFLY_USER_EMAIL"] == "alice@bstein.dev"
def test_nextcloud_missing_config(monkeypatch) -> None:
dummy = types.SimpleNamespace(
nextcloud_namespace="",
nextcloud_mail_sync_cronjob="",
nextcloud_mail_sync_wait_timeout_sec=90.0,
nextcloud_mail_sync_job_ttl_sec=3600,
)
monkeypatch.setattr("ariadne.services.nextcloud.settings", dummy)
svc = NextcloudService()
with pytest.raises(RuntimeError):
svc.sync_mail("alice")

50
tests/test_storage.py Normal file
View File

@ -0,0 +1,50 @@
from __future__ import annotations
from datetime import datetime
from ariadne.db.storage import Storage
class DummyDB:
def __init__(self) -> None:
self.rows = []
def fetchall(self, query, params=None):
return self.rows
def fetchone(self, query, params=None):
return None
def execute(self, query, params=None):
return None
def test_task_statuses_and_complete() -> None:
db = DummyDB()
db.rows = [{"task": "one", "status": "ok"}, {"task": "two", "status": "error"}]
storage = Storage(db)
statuses = storage.task_statuses("req")
assert statuses == {"one": "ok", "two": "error"}
assert storage.tasks_complete("req", ["one"]) is True
assert storage.tasks_complete("req", ["one", "two"]) is False
def test_row_to_request_flags() -> None:
row = {
"request_code": "abc",
"username": "alice",
"contact_email": "a@example.com",
"status": "pending",
"email_verified_at": None,
"initial_password": None,
"initial_password_revealed_at": None,
"provision_attempted_at": None,
"approval_flags": ["demo", 1, "test"],
"approval_note": "note",
"denial_note": None,
}
req = Storage._row_to_request(row)
assert req.request_code == "abc"
assert req.approval_flags == ["demo", "1", "test"]

View File

@ -3,6 +3,12 @@ from __future__ import annotations
import re import re
from ariadne.services.mailu import MailuService from ariadne.services.mailu import MailuService
from ariadne.utils.http import extract_bearer_token
class DummyRequest:
def __init__(self, headers):
self.headers = headers
from ariadne.utils.errors import safe_error_detail from ariadne.utils.errors import safe_error_detail
from ariadne.utils.passwords import random_password from ariadne.utils.passwords import random_password
@ -24,3 +30,8 @@ def test_mailu_resolve_email_default() -> None:
def test_safe_error_detail_runtime() -> None: def test_safe_error_detail_runtime() -> None:
assert safe_error_detail(RuntimeError("boom"), "fallback") == "boom" assert safe_error_detail(RuntimeError("boom"), "fallback") == "boom"
def test_extract_bearer_token() -> None:
request = DummyRequest({"Authorization": "Bearer token123"})
assert extract_bearer_token(request) == "token123"

View File

@ -0,0 +1,64 @@
from __future__ import annotations
from dataclasses import dataclass
from ariadne.services.vaultwarden import VaultwardenInvite
from ariadne.services import vaultwarden_sync
@dataclass
class DummyAdmin:
ready_value: bool = True
users: list[dict] = None
attrs: dict[str, dict] = None
set_calls: list[tuple[str, str, str]] = None
def ready(self) -> bool:
return self.ready_value
def iter_users(self, page_size: int = 200, brief: bool = False):
return self.users or []
def get_user(self, user_id: str):
return self.attrs.get(user_id, {}) if self.attrs else {}
def set_user_attribute(self, username: str, key: str, value: str) -> None:
if self.set_calls is None:
self.set_calls = []
self.set_calls.append((username, key, value))
def test_vaultwarden_sync_requires_admin(monkeypatch) -> None:
dummy = DummyAdmin(ready_value=False)
monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
summary = vaultwarden_sync.run_vaultwarden_sync()
assert summary.failures == 1
assert summary.detail == "keycloak admin not configured"
def test_vaultwarden_sync_skips_when_missing_mailbox(monkeypatch) -> None:
dummy = DummyAdmin(
users=[{"id": "1", "username": "alice", "enabled": True, "attributes": {"mailu_email": ["alice@bstein.dev"]}}],
attrs={"1": {"id": "1", "username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}},
)
monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: False)
summary = vaultwarden_sync.run_vaultwarden_sync()
assert summary.skipped == 1
assert summary.processed == 0
def test_vaultwarden_sync_invites(monkeypatch) -> None:
dummy = DummyAdmin(
users=[{"id": "1", "username": "alice", "enabled": True, "attributes": {"mailu_email": ["alice@bstein.dev"]}}],
attrs={"1": {"id": "1", "username": "alice", "attributes": {"mailu_email": ["alice@bstein.dev"]}}},
)
monkeypatch.setattr(vaultwarden_sync, "keycloak_admin", dummy)
monkeypatch.setattr(vaultwarden_sync.mailu, "mailbox_exists", lambda email: True)
monkeypatch.setattr(vaultwarden_sync.vaultwarden, "invite_user", lambda email: VaultwardenInvite(True, "invited"))
summary = vaultwarden_sync.run_vaultwarden_sync()
assert summary.created_or_present == 1
assert dummy.set_calls