Compare commits
2 Commits
master
...
codex/loca
| Author | SHA1 | Date | |
|---|---|---|---|
| 855d59c380 | |||
| ff3339fafc |
124
Jenkinsfile
vendored
124
Jenkinsfile
vendored
@ -76,8 +76,8 @@ spec:
|
||||
IMAGE = "${REGISTRY}/ariadne"
|
||||
VERSION_TAG = 'dev'
|
||||
SEMVER = 'dev'
|
||||
COVERAGE_MIN = '99'
|
||||
COVERAGE_JSON = 'build/coverage.json'
|
||||
QUALITY_GATE_JSON = 'build/quality-gate.json'
|
||||
JUNIT_XML = 'build/junit.xml'
|
||||
SUITE_NAME = 'ariadne'
|
||||
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
|
||||
@ -102,30 +102,20 @@ spec:
|
||||
set -euo pipefail
|
||||
mkdir -p build
|
||||
python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
|
||||
python -m ruff check ariadne --select PLR
|
||||
python -m ruff check ariadne tests scripts
|
||||
python -m slipcover \
|
||||
--json \
|
||||
--out "${COVERAGE_JSON}" \
|
||||
--source ariadne \
|
||||
--fail-under "${COVERAGE_MIN}" \
|
||||
-m pytest -ra -vv --durations=20 --junitxml "${JUNIT_XML}"
|
||||
python scripts/check_quality_gate.py --coverage-json "${COVERAGE_JSON}" --output "${QUALITY_GATE_JSON}"
|
||||
python -c "import json; payload=json.load(open('build/coverage.json', encoding='utf-8')); percent=(payload.get('summary') or {}).get('percent_covered'); print(f'Coverage summary: {percent:.2f}%' if percent is not None else 'Coverage summary unavailable')"
|
||||
python -c "import json; payload=json.load(open('build/quality-gate.json', encoding='utf-8')); summary=payload.get('summary') or {}; print(f\"Quality gate: {payload.get('status')} violations={summary.get('violations_total', 0)} coverage_targets={summary.get('coverage_targets', 0)}\")"
|
||||
'''.stripIndent())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage('Publish test metrics') {
|
||||
steps {
|
||||
container('tester') {
|
||||
sh '''
|
||||
set -euo pipefail
|
||||
python scripts/publish_test_metrics.py
|
||||
'''
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage('Prep toolchain') {
|
||||
steps {
|
||||
container('builder') {
|
||||
@ -202,97 +192,21 @@ python -c "import json; payload=json.load(open('build/coverage.json', encoding='
|
||||
}
|
||||
|
||||
post {
|
||||
success {
|
||||
container('tester') {
|
||||
sh '''
|
||||
set -euo pipefail
|
||||
export QUALITY_STATUS=ok
|
||||
python - <<'PY'
|
||||
import os
|
||||
import re
|
||||
import urllib.request
|
||||
|
||||
suite = os.environ.get("SUITE_NAME", "ariadne")
|
||||
status = os.environ.get("QUALITY_STATUS", "failed")
|
||||
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
|
||||
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
|
||||
|
||||
def counter(name: str) -> float:
|
||||
pattern = re.compile(
|
||||
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
|
||||
re.M,
|
||||
)
|
||||
match = pattern.search(text)
|
||||
return float(match.group(1)) if match else 0.0
|
||||
|
||||
ok = counter("ok")
|
||||
failed = counter("failed")
|
||||
if status == "ok":
|
||||
ok += 1
|
||||
else:
|
||||
failed += 1
|
||||
payload = (
|
||||
"# TYPE platform_quality_gate_runs_total counter\\n"
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
|
||||
)
|
||||
req = urllib.request.Request(
|
||||
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
|
||||
data=payload.encode("utf-8"),
|
||||
method="POST",
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
urllib.request.urlopen(req, timeout=10).read()
|
||||
PY
|
||||
'''
|
||||
}
|
||||
}
|
||||
failure {
|
||||
container('tester') {
|
||||
sh '''
|
||||
set -euo pipefail
|
||||
export QUALITY_STATUS=failed
|
||||
python - <<'PY'
|
||||
import os
|
||||
import re
|
||||
import urllib.request
|
||||
|
||||
suite = os.environ.get("SUITE_NAME", "ariadne")
|
||||
status = os.environ.get("QUALITY_STATUS", "failed")
|
||||
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
|
||||
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
|
||||
|
||||
def counter(name: str) -> float:
|
||||
pattern = re.compile(
|
||||
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
|
||||
re.M,
|
||||
)
|
||||
match = pattern.search(text)
|
||||
return float(match.group(1)) if match else 0.0
|
||||
|
||||
ok = counter("ok")
|
||||
failed = counter("failed")
|
||||
if status == "ok":
|
||||
ok += 1
|
||||
else:
|
||||
failed += 1
|
||||
payload = (
|
||||
"# TYPE platform_quality_gate_runs_total counter\\n"
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
|
||||
)
|
||||
req = urllib.request.Request(
|
||||
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
|
||||
data=payload.encode("utf-8"),
|
||||
method="POST",
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
urllib.request.urlopen(req, timeout=10).read()
|
||||
PY
|
||||
'''
|
||||
}
|
||||
}
|
||||
always {
|
||||
script {
|
||||
env.QUALITY_STATUS = currentBuild.currentResult == 'SUCCESS' ? 'ok' : 'failed'
|
||||
}
|
||||
container('tester') {
|
||||
sh '''
|
||||
set +e
|
||||
python scripts/publish_test_metrics.py
|
||||
status=$?
|
||||
set -e
|
||||
if [ "$status" -ne 0 ]; then
|
||||
echo "test metric publication failed with status $status"
|
||||
fi
|
||||
'''
|
||||
}
|
||||
script {
|
||||
if (fileExists('build/junit.xml')) {
|
||||
try {
|
||||
@ -302,7 +216,7 @@ PY
|
||||
}
|
||||
}
|
||||
}
|
||||
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json', allowEmptyArchive: true, fingerprint: true
|
||||
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json,build/quality-gate.json', allowEmptyArchive: true, fingerprint: true
|
||||
script {
|
||||
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]
|
||||
echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}"
|
||||
|
||||
@ -26,7 +26,10 @@ from .services.mailu_events import mailu_events
|
||||
from .services.nextcloud import nextcloud
|
||||
from .services.image_sweeper import image_sweeper
|
||||
from .services.metis import metis
|
||||
from .services.metis_token_sync import metis_token_sync
|
||||
from .services.soteria import soteria
|
||||
from .services.opensearch_prune import prune_indices
|
||||
from .services.platform_quality_probe import platform_quality_probe
|
||||
from .services.pod_cleaner import clean_finished_pods
|
||||
from .services.vaultwarden_sync import run_vaultwarden_sync
|
||||
from .services.vault import vault
|
||||
@ -315,6 +318,28 @@ def _startup() -> None:
|
||||
settings.metis_sentinel_watch_cron,
|
||||
lambda: metis.watch_sentinel(),
|
||||
)
|
||||
scheduler.add_task(
|
||||
"schedule.metis_k3s_token_sync",
|
||||
settings.metis_k3s_token_sync_cron,
|
||||
lambda: metis_token_sync.run(wait=True),
|
||||
)
|
||||
scheduler.add_task(
|
||||
"schedule.platform_quality_suite_probe",
|
||||
settings.platform_quality_suite_probe_cron,
|
||||
lambda: platform_quality_probe.run(wait=True),
|
||||
)
|
||||
if soteria.ready_for_backup():
|
||||
scheduler.add_task(
|
||||
"schedule.soteria_backup",
|
||||
settings.soteria_backup_cron,
|
||||
lambda: soteria.run_scheduled_backups(),
|
||||
)
|
||||
if soteria.ready_for_restore_tests():
|
||||
scheduler.add_task(
|
||||
"schedule.soteria_restore_test",
|
||||
settings.soteria_restore_test_cron,
|
||||
lambda: soteria.run_scheduled_restore_tests(),
|
||||
)
|
||||
scheduler.add_task(
|
||||
"schedule.vault_k8s_auth",
|
||||
settings.vault_k8s_auth_cron,
|
||||
@ -368,6 +393,10 @@ def _startup() -> None:
|
||||
"opensearch_prune_cron": settings.opensearch_prune_cron,
|
||||
"image_sweeper_cron": settings.image_sweeper_cron,
|
||||
"metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron,
|
||||
"metis_k3s_token_sync_cron": settings.metis_k3s_token_sync_cron,
|
||||
"platform_quality_suite_probe_cron": settings.platform_quality_suite_probe_cron,
|
||||
"soteria_backup_cron": settings.soteria_backup_cron if soteria.ready_for_backup() else "",
|
||||
"soteria_restore_test_cron": settings.soteria_restore_test_cron if soteria.ready_for_restore_tests() else "",
|
||||
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
|
||||
"vault_oidc_cron": settings.vault_oidc_cron,
|
||||
"comms_guest_name_cron": settings.comms_guest_name_cron,
|
||||
|
||||
@ -12,6 +12,13 @@ from ..settings import settings
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AuthContext:
|
||||
"""Authenticated user details returned by the OIDC verifier.
|
||||
|
||||
Inputs: normalized claims extracted from a validated bearer token.
|
||||
Outputs: a compact object that downstream handlers can trust without
|
||||
repeating token parsing logic.
|
||||
"""
|
||||
|
||||
username: str
|
||||
email: str
|
||||
groups: list[str]
|
||||
@ -19,6 +26,13 @@ class AuthContext:
|
||||
|
||||
|
||||
class KeycloakOIDC:
|
||||
"""Validate Keycloak-issued access tokens for Ariadne API requests.
|
||||
|
||||
Inputs: the JWKS URL, expected issuer, and client identifier.
|
||||
Outputs: verified token claims after signature and audience checks so the
|
||||
API can make authorization decisions safely.
|
||||
"""
|
||||
|
||||
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
|
||||
self._jwks_url = jwks_url
|
||||
self._issuer = issuer
|
||||
@ -97,6 +111,13 @@ class KeycloakOIDC:
|
||||
|
||||
|
||||
class Authenticator:
|
||||
"""Convert bearer tokens into normalized Ariadne auth contexts.
|
||||
|
||||
Inputs: raw bearer tokens from incoming API requests.
|
||||
Outputs: an `AuthContext` with cleaned usernames, emails, and groups so
|
||||
endpoint handlers can stay focused on business logic.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)
|
||||
|
||||
|
||||
@ -15,6 +15,13 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DatabaseConfig:
|
||||
"""Connection-pool and timeout settings for a database client.
|
||||
|
||||
Inputs: pool sizing and timeout values supplied by application settings.
|
||||
Outputs: a single immutable config object so database construction remains
|
||||
explicit and easy to test.
|
||||
"""
|
||||
|
||||
pool_min: int = 0
|
||||
pool_max: int = 5
|
||||
connect_timeout_sec: int = 5
|
||||
@ -25,6 +32,13 @@ class DatabaseConfig:
|
||||
|
||||
|
||||
class Database:
|
||||
"""Thin wrapper around a psycopg connection pool for Ariadne storage.
|
||||
|
||||
Inputs: a Postgres DSN plus optional pool and timeout configuration.
|
||||
Outputs: helper methods for migrations and common query patterns while
|
||||
centralizing timeout, locking, and row-format behavior.
|
||||
"""
|
||||
|
||||
def __init__(self, dsn: str, config: DatabaseConfig | None = None) -> None:
|
||||
if not dsn:
|
||||
raise RuntimeError("database URL is required")
|
||||
|
||||
@ -56,6 +56,13 @@ def delete_json(path: str) -> dict[str, Any]:
|
||||
|
||||
|
||||
def get_secret_value(namespace: str, name: str, key: str) -> str:
|
||||
"""Read and decode a string value from a Kubernetes Secret.
|
||||
|
||||
Inputs: a namespace, secret name, and key inside the secret data map.
|
||||
Outputs: the decoded UTF-8 value so callers can consume cluster-managed
|
||||
credentials without duplicating base64 and validation logic.
|
||||
"""
|
||||
|
||||
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
|
||||
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
|
||||
raw = blob.get(key)
|
||||
|
||||
@ -16,7 +16,7 @@ except Exception as exc: # pragma: no cover - import checked at runtime
|
||||
else:
|
||||
_IMPORT_ERROR = None
|
||||
|
||||
from .pods import PodSelectionError, select_pod
|
||||
from .pods import select_pod
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
|
||||
@ -26,6 +26,14 @@ _CORE_API = None
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ExecResult:
|
||||
"""Container for pod exec output captured from the Kubernetes stream API.
|
||||
|
||||
Inputs: stdout, stderr, and the observed exit code from a command run in a
|
||||
pod container.
|
||||
Outputs: a small immutable result object that callers can inspect or raise
|
||||
on without juggling stream state.
|
||||
"""
|
||||
|
||||
stdout: str
|
||||
stderr: str
|
||||
exit_code: int | None
|
||||
@ -65,6 +73,13 @@ def _build_command(command: list[str] | str, env: dict[str, str] | None) -> list
|
||||
|
||||
|
||||
class PodExecutor:
|
||||
"""Run shell commands in the most suitable pod for a workload selector.
|
||||
|
||||
Inputs: a namespace, label selector, and optional container name.
|
||||
Outputs: structured `ExecResult` objects or `ExecError`/`TimeoutError`
|
||||
exceptions so service code can react consistently to pod command results.
|
||||
"""
|
||||
|
||||
def __init__(self, namespace: str, label_selector: str, container: str | None = None) -> None:
|
||||
self._namespace = namespace
|
||||
self._label_selector = label_selector
|
||||
|
||||
@ -10,6 +10,13 @@ from .client import get_json
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PodRef:
|
||||
"""Reference to a Kubernetes pod chosen for a follow-up operation.
|
||||
|
||||
Inputs: the pod name, namespace, and optional node name.
|
||||
Outputs: a stable value object that higher-level helpers can pass around
|
||||
without carrying the full Kubernetes payload.
|
||||
"""
|
||||
|
||||
name: str
|
||||
namespace: str
|
||||
node: str | None = None
|
||||
@ -47,6 +54,13 @@ def _is_ready(pod: dict[str, Any]) -> bool:
|
||||
|
||||
|
||||
def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
|
||||
"""Fetch pods for a namespace and selector from the Kubernetes API.
|
||||
|
||||
Inputs: the target namespace and a label-selector string.
|
||||
Outputs: only dictionary pod objects so later selection logic can assume a
|
||||
predictable payload shape.
|
||||
"""
|
||||
|
||||
namespace = (namespace or "").strip()
|
||||
if not namespace:
|
||||
raise PodSelectionError("pod namespace missing")
|
||||
@ -58,6 +72,13 @@ def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
|
||||
|
||||
|
||||
def select_pod(namespace: str, label_selector: str) -> PodRef:
|
||||
"""Pick the newest ready pod for a namespace and label selector.
|
||||
|
||||
Inputs: the namespace and selector used to query Kubernetes pods.
|
||||
Outputs: a `PodRef` for the most recently started ready pod, or a
|
||||
`PodSelectionError` when no safe candidate exists.
|
||||
"""
|
||||
|
||||
pods = list_pods(namespace, label_selector)
|
||||
candidates: list[tuple[float, PodRef]] = []
|
||||
for pod in pods:
|
||||
|
||||
@ -84,6 +84,14 @@ def record_schedule_state(
|
||||
next_run_ts: float | None,
|
||||
ok: bool | None,
|
||||
) -> None:
|
||||
"""Record scheduler timing and status gauges for a task.
|
||||
|
||||
Inputs: the task name plus timestamps for the last run, last success, next
|
||||
run, and an optional success flag.
|
||||
Outputs: updated Prometheus gauges that make scheduler health visible in
|
||||
dashboards and alerts.
|
||||
"""
|
||||
|
||||
if last_run_ts:
|
||||
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
|
||||
if last_success_ts:
|
||||
@ -108,6 +116,13 @@ def set_cluster_state_metrics(
|
||||
pods_running: float | None,
|
||||
kustomizations_not_ready: int | None,
|
||||
) -> None:
|
||||
"""Publish the latest cluster-state summary to Prometheus gauges.
|
||||
|
||||
Inputs: the collection timestamp and aggregate node, pod, and Flux counts.
|
||||
Outputs: refreshed gauges so Grafana panels can render the current cluster
|
||||
snapshot without parsing raw service logs.
|
||||
"""
|
||||
|
||||
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
|
||||
if nodes_total is not None:
|
||||
CLUSTER_STATE_NODES_TOTAL.set(nodes_total)
|
||||
|
||||
@ -24,6 +24,14 @@ def _build_db(dsn: str, application_name: str) -> Database:
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Run Ariadne and portal schema migrations when enabled in settings.
|
||||
|
||||
Inputs: process-wide application settings for database URLs and migration
|
||||
toggles.
|
||||
Outputs: applied migrations and closed pools so CLI and container startup
|
||||
paths can bootstrap storage safely.
|
||||
"""
|
||||
|
||||
if not settings.ariadne_run_migrations:
|
||||
return
|
||||
|
||||
|
||||
159
ariadne/services/metis_token_sync.py
Normal file
159
ariadne/services/metis_token_sync.py
Normal file
@ -0,0 +1,159 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from ..k8s.client import get_json, post_json
|
||||
from ..settings import settings
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_SYNC_SCRIPT = """
|
||||
set -eu
|
||||
token="$(tr -d '\n' < /host/var/lib/rancher/k3s/server/token)"
|
||||
jwt="$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
|
||||
VAULT_TOKEN="$(vault write -field=token auth/kubernetes/login role="${VAULT_K8S_ROLE}" jwt="${jwt}")"
|
||||
export VAULT_TOKEN
|
||||
vault kv put kv/atlas/maintenance/metis-runtime k3s_token="${token}"
|
||||
""".strip()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MetisTokenSyncResult:
|
||||
"""Represent a single metis token-sync execution outcome.
|
||||
|
||||
Inputs: job metadata and completion status gathered from the Kubernetes Job API.
|
||||
Outputs: a stable result shape used by scheduler logs/metrics so operators can
|
||||
quickly confirm whether token sync completed, is still running, or failed.
|
||||
"""
|
||||
|
||||
job: str
|
||||
status: str
|
||||
|
||||
|
||||
class MetisTokenSyncService:
|
||||
"""Run metis token synchronization via one-shot Kubernetes Jobs.
|
||||
|
||||
Inputs: scheduler invocations and runtime settings for namespace, role, and
|
||||
node placement.
|
||||
Outputs: per-run status that confirms whether Ariadne successfully synced
|
||||
the k3s server token into Vault.
|
||||
"""
|
||||
|
||||
def _job_payload(self, job_name: str) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {
|
||||
"apiVersion": "batch/v1",
|
||||
"kind": "Job",
|
||||
"metadata": {
|
||||
"name": job_name,
|
||||
"namespace": settings.metis_token_sync_namespace,
|
||||
"labels": {
|
||||
"app": "metis-k3s-token-sync",
|
||||
"atlas.bstein.dev/trigger": "ariadne",
|
||||
},
|
||||
},
|
||||
"spec": {
|
||||
"backoffLimit": 1,
|
||||
"ttlSecondsAfterFinished": settings.metis_token_sync_job_ttl_sec,
|
||||
"template": {
|
||||
"spec": {
|
||||
"serviceAccountName": settings.metis_token_sync_service_account,
|
||||
"restartPolicy": "OnFailure",
|
||||
"nodeName": settings.metis_token_sync_node_name,
|
||||
"tolerations": [
|
||||
{
|
||||
"key": "node-role.kubernetes.io/control-plane",
|
||||
"operator": "Exists",
|
||||
"effect": "NoSchedule",
|
||||
},
|
||||
{
|
||||
"key": "node-role.kubernetes.io/master",
|
||||
"operator": "Exists",
|
||||
"effect": "NoSchedule",
|
||||
},
|
||||
],
|
||||
"containers": [
|
||||
{
|
||||
"name": "sync",
|
||||
"image": settings.metis_token_sync_image,
|
||||
"imagePullPolicy": "IfNotPresent",
|
||||
"command": ["/bin/sh", "-c"],
|
||||
"args": [_SYNC_SCRIPT],
|
||||
"env": [
|
||||
{"name": "VAULT_ADDR", "value": settings.metis_token_sync_vault_addr},
|
||||
{
|
||||
"name": "VAULT_K8S_ROLE",
|
||||
"value": settings.metis_token_sync_vault_k8s_role,
|
||||
},
|
||||
],
|
||||
"securityContext": {"runAsUser": 0},
|
||||
"volumeMounts": [
|
||||
{
|
||||
"name": "k3s-server",
|
||||
"mountPath": "/host/var/lib/rancher/k3s/server",
|
||||
"readOnly": True,
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
"volumes": [
|
||||
{
|
||||
"name": "k3s-server",
|
||||
"hostPath": {"path": "/var/lib/rancher/k3s/server"},
|
||||
}
|
||||
],
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
return payload
|
||||
|
||||
def _wait_for_completion(self, job_name: str, timeout_sec: float) -> MetisTokenSyncResult:
|
||||
deadline = time.time() + timeout_sec
|
||||
while time.time() < deadline:
|
||||
job = get_json(
|
||||
f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs/{job_name}"
|
||||
)
|
||||
status = job.get("status") if isinstance(job.get("status"), dict) else {}
|
||||
if int(status.get("succeeded") or 0) > 0:
|
||||
return MetisTokenSyncResult(job=job_name, status="ok")
|
||||
if int(status.get("failed") or 0) > 0:
|
||||
return MetisTokenSyncResult(job=job_name, status="error")
|
||||
time.sleep(2)
|
||||
return MetisTokenSyncResult(job=job_name, status="running")
|
||||
|
||||
def run(self, wait: bool = True) -> dict[str, Any]:
|
||||
"""Launch and optionally wait on a metis token-sync job.
|
||||
|
||||
Inputs: `wait` to control synchronous verification.
|
||||
Outputs: a JSON-serializable status payload that the scheduler records in
|
||||
metrics/event history for operator visibility.
|
||||
"""
|
||||
|
||||
job_name = f"metis-k3s-token-sync-{int(time.time())}"
|
||||
created = post_json(
|
||||
f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs",
|
||||
self._job_payload(job_name),
|
||||
)
|
||||
name = created.get("metadata", {}).get("name", job_name)
|
||||
logger.info(
|
||||
"metis token sync job triggered",
|
||||
extra={"event": "metis_token_sync_trigger", "job": name},
|
||||
)
|
||||
if not wait:
|
||||
return {"job": name, "status": "queued"}
|
||||
|
||||
result = self._wait_for_completion(name, settings.metis_token_sync_wait_timeout_sec)
|
||||
if result.status != "ok":
|
||||
logger.error(
|
||||
"metis token sync job incomplete",
|
||||
extra={"event": "metis_token_sync_incomplete", "job": name, "status": result.status},
|
||||
)
|
||||
raise RuntimeError(f"metis token sync job {name} {result.status}")
|
||||
return {"job": result.job, "status": result.status}
|
||||
|
||||
|
||||
metis_token_sync = MetisTokenSyncService()
|
||||
@ -1,7 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
import re
|
||||
import time
|
||||
from typing import Any
|
||||
@ -9,7 +8,7 @@ from typing import Any
|
||||
import httpx
|
||||
import psycopg
|
||||
|
||||
from ..k8s.exec import ExecError, PodExecutor
|
||||
from ..k8s.exec import ExecError, ExecResult, PodExecutor
|
||||
from ..k8s.pods import PodSelectionError
|
||||
from ..settings import settings
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
136
ariadne/services/platform_quality_probe.py
Normal file
136
ariadne/services/platform_quality_probe.py
Normal file
@ -0,0 +1,136 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from ..k8s.client import get_json, post_json
|
||||
from ..settings import settings
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PlatformQualityProbeResult:
|
||||
"""Represent one platform-quality probe execution.
|
||||
|
||||
Inputs: Kubernetes Job completion details gathered from API polling.
|
||||
Outputs: a stable status payload for scheduler logs and metrics.
|
||||
"""
|
||||
|
||||
job: str
|
||||
status: str
|
||||
|
||||
|
||||
class PlatformQualityProbeService:
|
||||
"""Run the platform quality-suite probe as an Ariadne-owned one-shot Job.
|
||||
|
||||
Inputs: scheduler invocations plus settings that define namespace, image,
|
||||
probe script ConfigMap, and Pushgateway endpoint.
|
||||
Outputs: structured run status so operators can verify probe freshness
|
||||
without relying on standalone CronJob ownership.
|
||||
"""
|
||||
|
||||
def _job_payload(self, job_name: str) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {
|
||||
"apiVersion": "batch/v1",
|
||||
"kind": "Job",
|
||||
"metadata": {
|
||||
"name": job_name,
|
||||
"namespace": settings.platform_quality_probe_namespace,
|
||||
"labels": {
|
||||
"app": "platform-quality-suite-probe",
|
||||
"atlas.bstein.dev/trigger": "ariadne",
|
||||
},
|
||||
},
|
||||
"spec": {
|
||||
"backoffLimit": 0,
|
||||
"ttlSecondsAfterFinished": settings.platform_quality_probe_job_ttl_sec,
|
||||
"template": {
|
||||
"metadata": {"labels": {"app": "platform-quality-suite-probe"}},
|
||||
"spec": {
|
||||
"restartPolicy": "Never",
|
||||
"containers": [
|
||||
{
|
||||
"name": "probe",
|
||||
"image": settings.platform_quality_probe_image,
|
||||
"imagePullPolicy": "IfNotPresent",
|
||||
"command": ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"],
|
||||
"env": [
|
||||
{
|
||||
"name": "PUSHGATEWAY_URL",
|
||||
"value": settings.platform_quality_probe_pushgateway_url,
|
||||
},
|
||||
{
|
||||
"name": "HTTP_TIMEOUT_SECONDS",
|
||||
"value": str(settings.platform_quality_probe_http_timeout_sec),
|
||||
},
|
||||
],
|
||||
"volumeMounts": [
|
||||
{"name": "probe-script", "mountPath": "/scripts", "readOnly": True},
|
||||
],
|
||||
}
|
||||
],
|
||||
"volumes": [
|
||||
{
|
||||
"name": "probe-script",
|
||||
"configMap": {
|
||||
"name": settings.platform_quality_probe_script_configmap,
|
||||
"defaultMode": 365,
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return payload
|
||||
|
||||
def _wait_for_completion(self, job_name: str, timeout_sec: float) -> PlatformQualityProbeResult:
|
||||
deadline = time.time() + timeout_sec
|
||||
while time.time() < deadline:
|
||||
job = get_json(
|
||||
f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs/{job_name}"
|
||||
)
|
||||
status = job.get("status") if isinstance(job.get("status"), dict) else {}
|
||||
if int(status.get("succeeded") or 0) > 0:
|
||||
return PlatformQualityProbeResult(job=job_name, status="ok")
|
||||
if int(status.get("failed") or 0) > 0:
|
||||
return PlatformQualityProbeResult(job=job_name, status="error")
|
||||
time.sleep(2)
|
||||
return PlatformQualityProbeResult(job=job_name, status="running")
|
||||
|
||||
def run(self, wait: bool = True) -> dict[str, Any]:
|
||||
"""Launch and optionally wait on the quality-suite probe job.
|
||||
|
||||
Inputs: `wait` controls whether the scheduler blocks until completion.
|
||||
Outputs: job identity and status for metrics/events so Grafana can report
|
||||
the latest probe outcome.
|
||||
"""
|
||||
|
||||
job_name = f"platform-quality-suite-probe-{int(time.time())}"
|
||||
created = post_json(
|
||||
f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs",
|
||||
self._job_payload(job_name),
|
||||
)
|
||||
name = created.get("metadata", {}).get("name", job_name)
|
||||
logger.info(
|
||||
"platform quality probe job triggered",
|
||||
extra={"event": "platform_quality_probe_trigger", "job": name},
|
||||
)
|
||||
if not wait:
|
||||
return {"job": name, "status": "queued"}
|
||||
|
||||
result = self._wait_for_completion(name, settings.platform_quality_probe_wait_timeout_sec)
|
||||
if result.status != "ok":
|
||||
logger.error(
|
||||
"platform quality probe incomplete",
|
||||
extra={"event": "platform_quality_probe_incomplete", "job": name, "status": result.status},
|
||||
)
|
||||
raise RuntimeError(f"platform quality probe job {name} {result.status}")
|
||||
return {"job": result.job, "status": result.status}
|
||||
|
||||
|
||||
platform_quality_probe = PlatformQualityProbeService()
|
||||
233
ariadne/services/soteria.py
Normal file
233
ariadne/services/soteria.py
Normal file
@ -0,0 +1,233 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from ..settings import settings
|
||||
from ..utils.logging import get_logger
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SoteriaRunSummary:
|
||||
action: str
|
||||
status: str
|
||||
endpoint: str
|
||||
attempted: int
|
||||
succeeded: int
|
||||
failed: int
|
||||
skipped: int
|
||||
detail: str = ""
|
||||
results: list[dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
|
||||
def _parse_backup_target(raw: str) -> tuple[str, str] | None:
|
||||
value = raw.strip()
|
||||
if not value:
|
||||
return None
|
||||
namespace, sep, pvc = value.partition("/")
|
||||
namespace = namespace.strip()
|
||||
pvc = pvc.strip()
|
||||
if sep != "/" or not namespace or not pvc:
|
||||
return None
|
||||
return namespace, pvc
|
||||
|
||||
|
||||
def _parse_restore_target(raw: str) -> tuple[str, str, str] | None:
|
||||
value = raw.strip()
|
||||
if not value:
|
||||
return None
|
||||
source, sep, target_pvc = value.partition("=")
|
||||
source = source.strip()
|
||||
target_pvc = target_pvc.strip()
|
||||
if sep != "=" or not source or not target_pvc:
|
||||
return None
|
||||
parsed = _parse_backup_target(source)
|
||||
if parsed is None:
|
||||
return None
|
||||
namespace, pvc = parsed
|
||||
return namespace, pvc, target_pvc
|
||||
|
||||
|
||||
class SoteriaService:
|
||||
def _backup_url(self) -> str:
|
||||
if settings.soteria_backup_url:
|
||||
return settings.soteria_backup_url
|
||||
if settings.soteria_base_url:
|
||||
return f"{settings.soteria_base_url}/v1/backup"
|
||||
return ""
|
||||
|
||||
def _restore_url(self) -> str:
|
||||
if settings.soteria_restore_test_url:
|
||||
return settings.soteria_restore_test_url
|
||||
if settings.soteria_base_url:
|
||||
return f"{settings.soteria_base_url}/v1/restore-test"
|
||||
return ""
|
||||
|
||||
def ready_for_backup(self) -> bool:
|
||||
return bool(self._backup_url() and settings.soteria_backup_targets)
|
||||
|
||||
def ready_for_restore_tests(self) -> bool:
|
||||
return bool(self._restore_url() and settings.soteria_restore_test_targets)
|
||||
|
||||
def _finish(
|
||||
self,
|
||||
action: str,
|
||||
status: str,
|
||||
endpoint: str,
|
||||
attempted: int,
|
||||
succeeded: int,
|
||||
failed: int,
|
||||
skipped: int,
|
||||
detail: str,
|
||||
results: list[dict[str, Any]],
|
||||
) -> SoteriaRunSummary:
|
||||
summary = SoteriaRunSummary(
|
||||
action=action,
|
||||
status=status,
|
||||
endpoint=endpoint,
|
||||
attempted=attempted,
|
||||
succeeded=succeeded,
|
||||
failed=failed,
|
||||
skipped=skipped,
|
||||
detail=detail,
|
||||
results=results,
|
||||
)
|
||||
logger.info(
|
||||
"soteria schedule finished",
|
||||
extra={
|
||||
"event": f"soteria_{action}",
|
||||
"status": summary.status,
|
||||
"attempted": summary.attempted,
|
||||
"succeeded": summary.succeeded,
|
||||
"failed": summary.failed,
|
||||
"skipped": summary.skipped,
|
||||
"detail": summary.detail,
|
||||
},
|
||||
)
|
||||
return summary
|
||||
|
||||
def run_scheduled_backups(self) -> SoteriaRunSummary:
|
||||
endpoint = self._backup_url()
|
||||
if not endpoint:
|
||||
return self._finish("backup", "skipped", "", 0, 0, 0, 0, "soteria backup url not configured", [])
|
||||
if not settings.soteria_backup_targets:
|
||||
return self._finish("backup", "skipped", endpoint, 0, 0, 0, 0, "no soteria backup targets configured", [])
|
||||
|
||||
attempted = 0
|
||||
succeeded = 0
|
||||
failed = 0
|
||||
skipped = 0
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
for target in settings.soteria_backup_targets:
|
||||
parsed = _parse_backup_target(target)
|
||||
if parsed is None:
|
||||
skipped += 1
|
||||
results.append({"target": target, "status": "skipped", "detail": "invalid target format"})
|
||||
continue
|
||||
namespace, pvc = parsed
|
||||
payload = {
|
||||
"namespace": namespace,
|
||||
"pvc": pvc,
|
||||
"tags": ["trigger=ariadne", "reason=schedule_backup"],
|
||||
"dry_run": False,
|
||||
}
|
||||
attempted += 1
|
||||
try:
|
||||
with httpx.Client(timeout=settings.soteria_timeout_sec, follow_redirects=True) as client:
|
||||
response = client.post(endpoint, json=payload)
|
||||
response.raise_for_status()
|
||||
body = response.json()
|
||||
succeeded += 1
|
||||
results.append(
|
||||
{
|
||||
"target": target,
|
||||
"status": "ok",
|
||||
"backup": body.get("backup", ""),
|
||||
"volume": body.get("volume", ""),
|
||||
}
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
failed += 1
|
||||
results.append({"target": target, "status": "error", "detail": str(exc).strip() or "backup failed"})
|
||||
|
||||
status = "ok"
|
||||
detail = ""
|
||||
if failed > 0:
|
||||
status = "error"
|
||||
detail = f"{failed} backup target(s) failed"
|
||||
elif succeeded == 0:
|
||||
status = "skipped"
|
||||
detail = "no valid backup targets processed"
|
||||
|
||||
return self._finish("backup", status, endpoint, attempted, succeeded, failed, skipped, detail, results)
|
||||
|
||||
def run_scheduled_restore_tests(self) -> SoteriaRunSummary:
|
||||
endpoint = self._restore_url()
|
||||
if not endpoint:
|
||||
return self._finish(
|
||||
"restore_test", "skipped", "", 0, 0, 0, 0, "soteria restore-test url not configured", []
|
||||
)
|
||||
if not settings.soteria_restore_test_targets:
|
||||
return self._finish(
|
||||
"restore_test", "skipped", endpoint, 0, 0, 0, 0, "no soteria restore-test targets configured", []
|
||||
)
|
||||
|
||||
attempted = 0
|
||||
succeeded = 0
|
||||
failed = 0
|
||||
skipped = 0
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
for target in settings.soteria_restore_test_targets:
|
||||
parsed = _parse_restore_target(target)
|
||||
if parsed is None:
|
||||
skipped += 1
|
||||
results.append({"target": target, "status": "skipped", "detail": "invalid target format"})
|
||||
continue
|
||||
namespace, pvc, target_pvc = parsed
|
||||
payload = {
|
||||
"namespace": namespace,
|
||||
"pvc": pvc,
|
||||
"target_pvc": target_pvc,
|
||||
"snapshot": "latest",
|
||||
"dry_run": False,
|
||||
}
|
||||
attempted += 1
|
||||
try:
|
||||
with httpx.Client(timeout=settings.soteria_timeout_sec, follow_redirects=True) as client:
|
||||
response = client.post(endpoint, json=payload)
|
||||
response.raise_for_status()
|
||||
body = response.json()
|
||||
succeeded += 1
|
||||
results.append(
|
||||
{
|
||||
"target": target,
|
||||
"status": "ok",
|
||||
"volume": body.get("volume", ""),
|
||||
"backup_url": body.get("backup_url", ""),
|
||||
}
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
failed += 1
|
||||
results.append({"target": target, "status": "error", "detail": str(exc).strip() or "restore test failed"})
|
||||
|
||||
status = "ok"
|
||||
detail = ""
|
||||
if failed > 0:
|
||||
status = "error"
|
||||
detail = f"{failed} restore target(s) failed"
|
||||
elif succeeded == 0:
|
||||
status = "skipped"
|
||||
detail = "no valid restore targets processed"
|
||||
|
||||
return self._finish("restore_test", status, endpoint, attempted, succeeded, failed, skipped, detail, results)
|
||||
|
||||
|
||||
soteria = SoteriaService()
|
||||
|
||||
@ -161,6 +161,13 @@ class Settings:
|
||||
image_sweeper_service_account: str
|
||||
image_sweeper_job_ttl_sec: int
|
||||
image_sweeper_wait_timeout_sec: float
|
||||
platform_quality_probe_namespace: str
|
||||
platform_quality_probe_script_configmap: str
|
||||
platform_quality_probe_image: str
|
||||
platform_quality_probe_job_ttl_sec: int
|
||||
platform_quality_probe_wait_timeout_sec: float
|
||||
platform_quality_probe_pushgateway_url: str
|
||||
platform_quality_probe_http_timeout_sec: int
|
||||
|
||||
vaultwarden_namespace: str
|
||||
vaultwarden_pod_label: str
|
||||
@ -217,6 +224,24 @@ class Settings:
|
||||
metis_watch_url: str
|
||||
metis_timeout_sec: float
|
||||
metis_sentinel_watch_cron: str
|
||||
metis_token_sync_namespace: str
|
||||
metis_token_sync_service_account: str
|
||||
metis_token_sync_node_name: str
|
||||
metis_token_sync_image: str
|
||||
metis_token_sync_job_ttl_sec: int
|
||||
metis_token_sync_wait_timeout_sec: float
|
||||
metis_token_sync_vault_addr: str
|
||||
metis_token_sync_vault_k8s_role: str
|
||||
metis_k3s_token_sync_cron: str
|
||||
platform_quality_suite_probe_cron: str
|
||||
soteria_base_url: str
|
||||
soteria_backup_url: str
|
||||
soteria_restore_test_url: str
|
||||
soteria_timeout_sec: float
|
||||
soteria_backup_targets: list[str]
|
||||
soteria_restore_test_targets: list[str]
|
||||
soteria_backup_cron: str
|
||||
soteria_restore_test_cron: str
|
||||
|
||||
opensearch_url: str
|
||||
opensearch_limit_bytes: int
|
||||
@ -424,6 +449,24 @@ class Settings:
|
||||
"image_sweeper_wait_timeout_sec": _env_float("IMAGE_SWEEPER_WAIT_TIMEOUT_SEC", 1200.0),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _platform_quality_probe_config(cls) -> dict[str, Any]:
|
||||
return {
|
||||
"platform_quality_probe_namespace": _env("PLATFORM_QUALITY_PROBE_NAMESPACE", "monitoring"),
|
||||
"platform_quality_probe_script_configmap": _env(
|
||||
"PLATFORM_QUALITY_PROBE_SCRIPT_CONFIGMAP",
|
||||
"platform-quality-suite-probe-script",
|
||||
),
|
||||
"platform_quality_probe_image": _env("PLATFORM_QUALITY_PROBE_IMAGE", "curlimages/curl:8.12.1"),
|
||||
"platform_quality_probe_job_ttl_sec": _env_int("PLATFORM_QUALITY_PROBE_JOB_TTL_SEC", 1800),
|
||||
"platform_quality_probe_wait_timeout_sec": _env_float("PLATFORM_QUALITY_PROBE_WAIT_TIMEOUT_SEC", 180.0),
|
||||
"platform_quality_probe_pushgateway_url": _env(
|
||||
"PLATFORM_QUALITY_PROBE_PUSHGATEWAY_URL",
|
||||
"http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
|
||||
).rstrip("/"),
|
||||
"platform_quality_probe_http_timeout_sec": _env_int("PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _vaultwarden_config(cls) -> dict[str, Any]:
|
||||
return {
|
||||
@ -465,6 +508,11 @@ class Settings:
|
||||
"comms_reset_room_cron": _env("ARIADNE_SCHEDULE_COMMS_RESET_ROOM", "0 0 1 1 *"),
|
||||
"comms_seed_room_cron": _env("ARIADNE_SCHEDULE_COMMS_SEED_ROOM", "*/10 * * * *"),
|
||||
"keycloak_profile_cron": _env("ARIADNE_SCHEDULE_KEYCLOAK_PROFILE", "0 */6 * * *"),
|
||||
"metis_k3s_token_sync_cron": _env("ARIADNE_SCHEDULE_METIS_K3S_TOKEN_SYNC", "11 */6 * * *"),
|
||||
"platform_quality_suite_probe_cron": _env(
|
||||
"ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE",
|
||||
"*/15 * * * *",
|
||||
),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
@ -487,6 +535,34 @@ class Settings:
|
||||
"metis_watch_url": _env("METIS_WATCH_URL", "").rstrip("/"),
|
||||
"metis_timeout_sec": _env_float("METIS_TIMEOUT_SEC", 10.0),
|
||||
"metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"),
|
||||
"metis_token_sync_namespace": _env("METIS_TOKEN_SYNC_NAMESPACE", "maintenance"),
|
||||
"metis_token_sync_service_account": _env("METIS_TOKEN_SYNC_SERVICE_ACCOUNT", "metis-token-sync"),
|
||||
"metis_token_sync_node_name": _env("METIS_TOKEN_SYNC_NODE_NAME", "titan-0a"),
|
||||
"metis_token_sync_image": _env("METIS_TOKEN_SYNC_IMAGE", "hashicorp/vault:1.17.6"),
|
||||
"metis_token_sync_job_ttl_sec": _env_int("METIS_TOKEN_SYNC_JOB_TTL_SEC", 1800),
|
||||
"metis_token_sync_wait_timeout_sec": _env_float("METIS_TOKEN_SYNC_WAIT_TIMEOUT_SEC", 180.0),
|
||||
"metis_token_sync_vault_addr": _env(
|
||||
"METIS_TOKEN_SYNC_VAULT_ADDR",
|
||||
"http://vault.vault.svc.cluster.local:8200",
|
||||
).rstrip("/"),
|
||||
"metis_token_sync_vault_k8s_role": _env("METIS_TOKEN_SYNC_VAULT_K8S_ROLE", "maintenance-metis-token-sync"),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _soteria_config(cls) -> dict[str, Any]:
|
||||
backup_targets = [value.strip() for value in _env("SOTERIA_BACKUP_TARGETS", "").split(",") if value.strip()]
|
||||
restore_targets = [
|
||||
value.strip() for value in _env("SOTERIA_RESTORE_TEST_TARGETS", "").split(",") if value.strip()
|
||||
]
|
||||
return {
|
||||
"soteria_base_url": _env("SOTERIA_BASE_URL", "http://soteria.maintenance.svc.cluster.local").rstrip("/"),
|
||||
"soteria_backup_url": _env("SOTERIA_BACKUP_URL", "").rstrip("/"),
|
||||
"soteria_restore_test_url": _env("SOTERIA_RESTORE_TEST_URL", "").rstrip("/"),
|
||||
"soteria_timeout_sec": _env_float("SOTERIA_TIMEOUT_SEC", 15.0),
|
||||
"soteria_backup_targets": backup_targets,
|
||||
"soteria_restore_test_targets": restore_targets,
|
||||
"soteria_backup_cron": _env("ARIADNE_SCHEDULE_SOTERIA_BACKUP", "45 2 * * *"),
|
||||
"soteria_restore_test_cron": _env("ARIADNE_SCHEDULE_SOTERIA_RESTORE_TEST", "15 4 * * 0"),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
@ -513,10 +589,12 @@ class Settings:
|
||||
vault_cfg = cls._vault_config()
|
||||
comms_cfg = cls._comms_config()
|
||||
image_cfg = cls._image_sweeper_config()
|
||||
platform_quality_probe_cfg = cls._platform_quality_probe_config()
|
||||
vaultwarden_cfg = cls._vaultwarden_config()
|
||||
schedule_cfg = cls._schedule_config()
|
||||
cluster_cfg = cls._cluster_state_config()
|
||||
metis_cfg = cls._metis_config()
|
||||
soteria_cfg = cls._soteria_config()
|
||||
opensearch_cfg = cls._opensearch_config()
|
||||
|
||||
portal_db = _env("PORTAL_DATABASE_URL", "")
|
||||
@ -552,10 +630,12 @@ class Settings:
|
||||
**vault_cfg,
|
||||
**comms_cfg,
|
||||
**image_cfg,
|
||||
**platform_quality_probe_cfg,
|
||||
**vaultwarden_cfg,
|
||||
**schedule_cfg,
|
||||
**cluster_cfg,
|
||||
**metis_cfg,
|
||||
**soteria_cfg,
|
||||
**opensearch_cfg,
|
||||
)
|
||||
|
||||
|
||||
68
quality_gate.toml
Normal file
68
quality_gate.toml
Normal file
@ -0,0 +1,68 @@
|
||||
[files]
|
||||
roots = ["ariadne", "tests", "scripts"]
|
||||
max_lines = 500
|
||||
|
||||
[docstrings]
|
||||
roots = ["ariadne", "scripts"]
|
||||
non_trivial_min_lines = 6
|
||||
|
||||
[coverage]
|
||||
roots = ["ariadne"]
|
||||
threshold = 95.0
|
||||
targets = [
|
||||
"ariadne/auth/keycloak.py",
|
||||
"ariadne/db/database.py",
|
||||
"ariadne/k8s/client.py",
|
||||
"ariadne/k8s/pods.py",
|
||||
"ariadne/metrics/metrics.py",
|
||||
"ariadne/migrate.py",
|
||||
"ariadne/utils/errors.py",
|
||||
"ariadne/utils/http.py",
|
||||
"ariadne/utils/logging.py",
|
||||
"ariadne/utils/name_generator.py",
|
||||
"ariadne/utils/passwords.py",
|
||||
]
|
||||
|
||||
[legacy.line_count]
|
||||
"ariadne/app.py" = 996
|
||||
"ariadne/manager/provisioning.py" = 933
|
||||
"ariadne/services/cluster_state.py" = 3705
|
||||
"ariadne/services/comms.py" = 943
|
||||
"ariadne/services/firefly.py" = 667
|
||||
"ariadne/services/nextcloud.py" = 698
|
||||
"ariadne/services/vault.py" = 558
|
||||
"ariadne/services/wger.py" = 624
|
||||
"ariadne/settings.py" = 590
|
||||
"tests/test_app.py" = 1001
|
||||
"tests/test_keycloak_admin.py" = 679
|
||||
"tests/test_provisioning.py" = 1762
|
||||
"tests/test_services.py" = 1483
|
||||
|
||||
[legacy.docstrings]
|
||||
"ariadne/app.py" = 15
|
||||
"ariadne/db/storage.py" = 4
|
||||
"ariadne/manager/provisioning.py" = 2
|
||||
"ariadne/scheduler/cron.py" = 1
|
||||
"ariadne/services/cluster_state.py" = 4
|
||||
"ariadne/services/comms.py" = 2
|
||||
"ariadne/services/firefly.py" = 4
|
||||
"ariadne/services/image_sweeper.py" = 1
|
||||
"ariadne/services/keycloak_admin.py" = 1
|
||||
"ariadne/services/keycloak_profile.py" = 2
|
||||
"ariadne/services/mailer.py" = 1
|
||||
"ariadne/services/mailu.py" = 5
|
||||
"ariadne/services/mailu_events.py" = 1
|
||||
"ariadne/services/metis.py" = 1
|
||||
"ariadne/services/nextcloud.py" = 3
|
||||
"ariadne/services/opensearch_prune.py" = 2
|
||||
"ariadne/services/pod_cleaner.py" = 1
|
||||
"ariadne/services/soteria.py" = 2
|
||||
"ariadne/services/vault.py" = 2
|
||||
"ariadne/services/vaultwarden.py" = 1
|
||||
"ariadne/services/vaultwarden_sync.py" = 4
|
||||
"ariadne/services/wger.py" = 4
|
||||
"ariadne/settings.py" = 1
|
||||
"ariadne/utils/errors.py" = 1
|
||||
"ariadne/utils/http.py" = 1
|
||||
"ariadne/utils/logging.py" = 3
|
||||
"ariadne/utils/name_generator.py" = 1
|
||||
407
scripts/check_quality_gate.py
Executable file
407
scripts/check_quality_gate.py
Executable file
@ -0,0 +1,407 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Enforce Ariadne's ratcheting test-quality gate.
|
||||
|
||||
Inputs: repository Python files, optional coverage JSON, and a TOML config that
|
||||
captures the current legacy exceptions.
|
||||
Outputs: a JSON report plus a non-zero exit code when file-size, docstring, or
|
||||
coverage requirements regress, so CI can block quality drift while allowing
|
||||
incremental cleanup.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import ast
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
import tomllib
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DefinitionFinding:
|
||||
"""Describe a public definition missing a required docstring.
|
||||
|
||||
Inputs: the symbol kind, name, source line, and logical size.
|
||||
Outputs: a compact record that makes docstring failures actionable in CLI
|
||||
output and in the JSON quality report.
|
||||
"""
|
||||
|
||||
kind: str
|
||||
name: str
|
||||
lineno: int
|
||||
length: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Violation:
|
||||
"""Represent a single quality-gate violation.
|
||||
|
||||
Inputs: the violated check name, file path, and a human-readable message.
|
||||
Outputs: a normalized record for console rendering and JSON serialization so
|
||||
Jenkins and local runs report the same facts.
|
||||
"""
|
||||
|
||||
check: str
|
||||
path: str
|
||||
message: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class QualityConfig:
|
||||
"""Typed quality-gate settings loaded from TOML.
|
||||
|
||||
Inputs: parsed configuration sections for file-size, docstrings, and
|
||||
coverage enforcement.
|
||||
Outputs: one immutable object so the gate logic stays deterministic and easy
|
||||
to test.
|
||||
"""
|
||||
|
||||
line_roots: tuple[str, ...]
|
||||
max_lines: int
|
||||
legacy_max_lines: dict[str, int]
|
||||
docstring_roots: tuple[str, ...]
|
||||
non_trivial_min_lines: int
|
||||
legacy_missing_docstrings: dict[str, int]
|
||||
coverage_roots: tuple[str, ...]
|
||||
coverage_targets: tuple[str, ...]
|
||||
coverage_threshold: float
|
||||
|
||||
|
||||
def _load_config(path: Path) -> QualityConfig:
|
||||
"""Load the quality-gate config from TOML.
|
||||
|
||||
Inputs: the path to the repository-local TOML config file.
|
||||
Outputs: validated `QualityConfig` values used by every gate check so local
|
||||
runs and Jenkins share the same policy.
|
||||
"""
|
||||
|
||||
payload = tomllib.loads(path.read_text(encoding="utf-8"))
|
||||
files = payload.get("files") or {}
|
||||
docstrings = payload.get("docstrings") or {}
|
||||
coverage = payload.get("coverage") or {}
|
||||
legacy = payload.get("legacy") or {}
|
||||
return QualityConfig(
|
||||
line_roots=tuple(str(item) for item in files.get("roots") or ("ariadne", "tests", "scripts")),
|
||||
max_lines=int(files.get("max_lines", 500)),
|
||||
legacy_max_lines={str(key): int(value) for key, value in (legacy.get("line_count") or {}).items()},
|
||||
docstring_roots=tuple(str(item) for item in docstrings.get("roots") or ("ariadne", "scripts")),
|
||||
non_trivial_min_lines=int(docstrings.get("non_trivial_min_lines", 6)),
|
||||
legacy_missing_docstrings={
|
||||
str(key): int(value) for key, value in (legacy.get("docstrings") or {}).items()
|
||||
},
|
||||
coverage_roots=tuple(str(item) for item in coverage.get("roots") or ("ariadne",)),
|
||||
coverage_targets=tuple(str(item) for item in coverage.get("targets") or ()),
|
||||
coverage_threshold=float(coverage.get("threshold", 95.0)),
|
||||
)
|
||||
|
||||
|
||||
def _iter_python_files(repo_root: Path, roots: tuple[str, ...]) -> list[Path]:
|
||||
"""Collect Python files under the configured roots.
|
||||
|
||||
Inputs: the repository root plus the roots to scan.
|
||||
Outputs: sorted Python paths so the gate produces stable results and diffs.
|
||||
"""
|
||||
|
||||
files: list[Path] = []
|
||||
for root in roots:
|
||||
base = repo_root / root
|
||||
if not base.exists():
|
||||
continue
|
||||
files.extend(sorted(base.rglob("*.py")))
|
||||
return sorted({path for path in files})
|
||||
|
||||
|
||||
def _relative(path: Path, repo_root: Path) -> str:
|
||||
return path.relative_to(repo_root).as_posix()
|
||||
|
||||
|
||||
def _line_count(path: Path) -> int:
|
||||
return len(path.read_text(encoding="utf-8").splitlines())
|
||||
|
||||
|
||||
def _definition_length(node: ast.AST) -> int:
|
||||
end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0)
|
||||
return max(end_lineno - getattr(node, "lineno", 0) + 1, 1)
|
||||
|
||||
|
||||
def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]:
|
||||
"""Find public top-level definitions missing required docstrings.
|
||||
|
||||
Inputs: a Python file path and the minimum logical size considered
|
||||
non-trivial.
|
||||
Outputs: missing-docstring findings so the gate can ratchet legacy files
|
||||
while blocking new undocumented public APIs.
|
||||
"""
|
||||
|
||||
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||||
findings: list[DefinitionFinding] = []
|
||||
for node in module.body:
|
||||
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
|
||||
continue
|
||||
if node.name.startswith("_"):
|
||||
continue
|
||||
length = _definition_length(node)
|
||||
if length < min_lines:
|
||||
continue
|
||||
if ast.get_docstring(node) is not None:
|
||||
continue
|
||||
findings.append(
|
||||
DefinitionFinding(
|
||||
kind=type(node).__name__,
|
||||
name=node.name,
|
||||
lineno=getattr(node, "lineno", 1),
|
||||
length=length,
|
||||
)
|
||||
)
|
||||
return findings
|
||||
|
||||
|
||||
def _excluded_coverage_lines(path: Path) -> set[int]:
|
||||
"""Collect non-executable lines that Slipcover still reports as missing.
|
||||
|
||||
Inputs: a Python source file path.
|
||||
Outputs: line numbers for multiline definition headers and docstring blocks
|
||||
so adjusted per-file coverage tracks executable logic rather than syntax
|
||||
scaffolding required for readability.
|
||||
"""
|
||||
|
||||
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||||
excluded: set[int] = set()
|
||||
|
||||
def visit(node: ast.AST) -> None:
|
||||
if isinstance(node, ast.Module):
|
||||
for child in node.body:
|
||||
visit(child)
|
||||
return
|
||||
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
|
||||
return
|
||||
if node.body:
|
||||
body_start = node.body[0].lineno
|
||||
excluded.update(range(node.lineno + 1, body_start))
|
||||
first = node.body[0]
|
||||
if (
|
||||
isinstance(first, ast.Expr)
|
||||
and isinstance(first.value, ast.Constant)
|
||||
and isinstance(first.value.value, str)
|
||||
):
|
||||
excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1))
|
||||
for child in node.body:
|
||||
visit(child)
|
||||
|
||||
visit(module)
|
||||
return excluded
|
||||
|
||||
|
||||
def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]:
|
||||
"""Read per-file coverage details from Slipcover JSON output.
|
||||
|
||||
Inputs: the optional coverage artifact path produced by the test run.
|
||||
Outputs: raw and adjusted per-file coverage details so the gate can enforce
|
||||
realistic thresholds even when Slipcover counts docstrings and wrapped
|
||||
signatures as missing lines.
|
||||
"""
|
||||
|
||||
if path is None or not path.exists():
|
||||
return {}
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
files = payload.get("files") or {}
|
||||
coverage: dict[str, dict[str, Any]] = {}
|
||||
for name, data in files.items():
|
||||
if not isinstance(data, dict):
|
||||
continue
|
||||
summary = data.get("summary") or {}
|
||||
percent = summary.get("percent_covered")
|
||||
executed_lines = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)}
|
||||
missing_lines = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)}
|
||||
relative = str(name)
|
||||
source_path = repo_root / relative
|
||||
excluded_lines = _excluded_coverage_lines(source_path) if source_path.exists() else set()
|
||||
adjusted_missing = missing_lines - excluded_lines
|
||||
adjusted_total = len(executed_lines) + len(adjusted_missing)
|
||||
adjusted_percent = 100.0 if adjusted_total == 0 else (len(executed_lines) / adjusted_total) * 100.0
|
||||
coverage[relative] = {
|
||||
"raw_percent": float(percent) if isinstance(percent, (int, float)) else None,
|
||||
"adjusted_percent": adjusted_percent,
|
||||
"excluded_lines": sorted(excluded_lines),
|
||||
}
|
||||
return coverage
|
||||
|
||||
|
||||
def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]:
|
||||
return [{"check": item.check, "path": item.path, "message": item.message} for item in violations]
|
||||
|
||||
|
||||
def _build_report(
|
||||
repo_root: Path,
|
||||
config: QualityConfig,
|
||||
coverage: dict[str, dict[str, Any]],
|
||||
coverage_artifact_present: bool,
|
||||
) -> dict[str, Any]:
|
||||
"""Run all configured checks and build a JSON-serializable report.
|
||||
|
||||
Inputs: repository paths, quality-gate config, and per-file coverage data.
|
||||
Outputs: a complete report for CI artifacts, metrics publication, and local
|
||||
debugging when the gate fails.
|
||||
"""
|
||||
|
||||
violations: list[Violation] = []
|
||||
files_report: dict[str, dict[str, Any]] = {}
|
||||
|
||||
for path in _iter_python_files(repo_root, config.line_roots):
|
||||
relative = _relative(path, repo_root)
|
||||
lines = _line_count(path)
|
||||
entry = files_report.setdefault(relative, {})
|
||||
entry["lines"] = lines
|
||||
entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines)
|
||||
entry["line_limit_legacy"] = relative in config.legacy_max_lines
|
||||
if lines > entry["line_limit"]:
|
||||
violations.append(
|
||||
Violation(
|
||||
"line_count",
|
||||
relative,
|
||||
f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}",
|
||||
)
|
||||
)
|
||||
|
||||
for path in _iter_python_files(repo_root, config.docstring_roots):
|
||||
relative = _relative(path, repo_root)
|
||||
findings = _missing_docstrings(path, config.non_trivial_min_lines)
|
||||
entry = files_report.setdefault(relative, {})
|
||||
entry["missing_docstrings"] = len(findings)
|
||||
entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0)
|
||||
entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings
|
||||
entry["missing_docstring_symbols"] = [
|
||||
{
|
||||
"kind": finding.kind,
|
||||
"name": finding.name,
|
||||
"lineno": finding.lineno,
|
||||
"length": finding.length,
|
||||
}
|
||||
for finding in findings
|
||||
]
|
||||
if len(findings) > entry["missing_docstrings_allowed"]:
|
||||
excess = findings[entry["missing_docstrings_allowed"] :]
|
||||
for finding in excess:
|
||||
violations.append(
|
||||
Violation(
|
||||
"docstrings",
|
||||
relative,
|
||||
f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}",
|
||||
)
|
||||
)
|
||||
|
||||
coverage_target_set = set(config.coverage_targets)
|
||||
coverage_root_files = {
|
||||
_relative(path, repo_root)
|
||||
for path in _iter_python_files(repo_root, config.coverage_roots)
|
||||
}
|
||||
for relative in sorted(coverage_root_files):
|
||||
entry = files_report.setdefault(relative, {})
|
||||
details = coverage.get(relative) or {}
|
||||
value = details.get("adjusted_percent")
|
||||
entry["coverage_percent"] = value
|
||||
entry["coverage_raw_percent"] = details.get("raw_percent")
|
||||
entry["coverage_excluded_lines"] = details.get("excluded_lines") or []
|
||||
entry["coverage_enforced"] = relative in coverage_target_set
|
||||
if relative not in coverage_target_set:
|
||||
continue
|
||||
entry["coverage_target"] = config.coverage_threshold
|
||||
if value is None:
|
||||
violations.append(
|
||||
Violation("coverage", relative, f"missing coverage data for {relative}")
|
||||
)
|
||||
continue
|
||||
if value < config.coverage_threshold:
|
||||
violations.append(
|
||||
Violation(
|
||||
"coverage",
|
||||
relative,
|
||||
f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%",
|
||||
)
|
||||
)
|
||||
|
||||
if coverage_target_set and not coverage_artifact_present:
|
||||
violations.append(
|
||||
Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets")
|
||||
)
|
||||
|
||||
summary = {
|
||||
"violations_total": len(violations),
|
||||
"line_count_violations": sum(item.check == "line_count" for item in violations),
|
||||
"docstring_violations": sum(item.check == "docstrings" for item in violations),
|
||||
"coverage_violations": sum(item.check == "coverage" for item in violations),
|
||||
"legacy_line_count_files": len(config.legacy_max_lines),
|
||||
"legacy_docstring_files": len(config.legacy_missing_docstrings),
|
||||
"coverage_targets": len(coverage_target_set),
|
||||
"coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0),
|
||||
}
|
||||
return {
|
||||
"status": "ok" if not violations else "failed",
|
||||
"rules": {
|
||||
"max_lines": config.max_lines,
|
||||
"docstring_non_trivial_min_lines": config.non_trivial_min_lines,
|
||||
"coverage_threshold": config.coverage_threshold,
|
||||
},
|
||||
"summary": summary,
|
||||
"violations": _serialize_violations(violations),
|
||||
"files": dict(sorted(files_report.items())),
|
||||
}
|
||||
|
||||
|
||||
def _print_report(report: dict[str, Any]) -> None:
|
||||
"""Render a concise CLI summary for local and Jenkins logs.
|
||||
|
||||
Inputs: the JSON-ready report produced by the gate.
|
||||
Outputs: human-readable lines that point directly at each violation so a
|
||||
failing build is easy to fix.
|
||||
"""
|
||||
|
||||
print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True))
|
||||
for violation in report.get("violations") or []:
|
||||
print(f"[{violation['check']}] {violation['path']}: {violation['message']}")
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
"""Parse CLI arguments for the quality gate.
|
||||
|
||||
Inputs: command-line flags supplied by Jenkins or a local developer.
|
||||
Outputs: normalized paths and options so the gate stays scriptable and
|
||||
predictable in every environment.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--config", default="quality_gate.toml", help="path to the quality-gate TOML config")
|
||||
parser.add_argument("--coverage-json", default="build/coverage.json", help="path to Slipcover JSON output")
|
||||
parser.add_argument("--output", default="build/quality-gate.json", help="path to write the JSON report")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run the Ariadne quality gate and write its JSON report.
|
||||
|
||||
Inputs: CLI arguments naming the config and optional coverage artifact.
|
||||
Outputs: a persisted JSON report and a process exit code that Jenkins can
|
||||
use to enforce quality rules.
|
||||
"""
|
||||
|
||||
args = parse_args()
|
||||
repo_root = Path.cwd()
|
||||
config_path = repo_root / args.config
|
||||
output_path = repo_root / args.output
|
||||
coverage_path = repo_root / args.coverage_json
|
||||
|
||||
config = _load_config(config_path)
|
||||
coverage_present = coverage_path.exists()
|
||||
coverage = _load_coverage(coverage_path if coverage_present else None, repo_root)
|
||||
report = _build_report(repo_root, config, coverage, coverage_present)
|
||||
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
||||
_print_report(report)
|
||||
return 0 if report["status"] == "ok" else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
284
scripts/publish_test_metrics.py
Normal file → Executable file
284
scripts/publish_test_metrics.py
Normal file → Executable file
@ -1,35 +1,74 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Publish Ariadne quality-gate test metrics to Pushgateway (Prometheus ingest)."""
|
||||
"""Publish Ariadne test and quality-gate metrics to Pushgateway.
|
||||
|
||||
Inputs: build artifacts such as JUnit XML, Slipcover coverage JSON, optional
|
||||
quality-gate JSON, and standard Jenkins metadata from the environment.
|
||||
Outputs: Prometheus metric lines pushed to Pushgateway so Grafana can chart test
|
||||
health, quality-gate drift, and build context even when a build fails early.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
import urllib.request
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
|
||||
def _escape_label(value: str) -> str:
|
||||
"""Escape a Prometheus label value.
|
||||
|
||||
Inputs: an arbitrary label string from build metadata.
|
||||
Outputs: a safely escaped label fragment so pushed metric lines remain valid
|
||||
Prometheus exposition format.
|
||||
"""
|
||||
|
||||
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
|
||||
|
||||
|
||||
def _label_str(labels: dict[str, str]) -> str:
|
||||
"""Build a Prometheus label set from non-empty values.
|
||||
|
||||
Inputs: a dictionary of label keys and raw string values.
|
||||
Outputs: a `{...}` label fragment or an empty string so callers can compose
|
||||
metrics without repeating label-filtering logic.
|
||||
"""
|
||||
|
||||
parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
|
||||
return "{" + ",".join(parts) + "}" if parts else ""
|
||||
|
||||
|
||||
def _load_coverage(path: str) -> float:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
payload = json.load(handle)
|
||||
def _load_coverage(path: Path) -> float | None:
|
||||
"""Read the overall Slipcover percentage if the artifact exists.
|
||||
|
||||
Inputs: the expected coverage artifact path.
|
||||
Outputs: the percent covered value, or `None` when the artifact is missing or
|
||||
malformed so failed builds can still publish partial metrics.
|
||||
"""
|
||||
|
||||
if not path.exists():
|
||||
return None
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
summary = payload.get("summary") or {}
|
||||
percent = summary.get("percent_covered")
|
||||
if isinstance(percent, (int, float)):
|
||||
return float(percent)
|
||||
raise RuntimeError("coverage summary missing percent_covered")
|
||||
return None
|
||||
|
||||
|
||||
def _load_junit(path: str) -> dict[str, int]:
|
||||
def _load_junit(path: Path) -> dict[str, int] | None:
|
||||
"""Aggregate JUnit totals if the artifact exists.
|
||||
|
||||
Inputs: the expected JUnit XML path.
|
||||
Outputs: test totals by outcome, or `None` when the artifact is absent so the
|
||||
publisher can still emit build and gate status metrics.
|
||||
"""
|
||||
|
||||
if not path.exists():
|
||||
return None
|
||||
|
||||
tree = ET.parse(path)
|
||||
root = tree.getroot()
|
||||
|
||||
@ -57,6 +96,20 @@ def _load_junit(path: str) -> dict[str, int]:
|
||||
return totals
|
||||
|
||||
|
||||
def _load_quality_gate(path: Path) -> dict[str, Any] | None:
|
||||
"""Load the optional quality-gate JSON report.
|
||||
|
||||
Inputs: the expected report path from `scripts/check_quality_gate.py`.
|
||||
Outputs: the parsed report when present so metric publication can include
|
||||
violation counts and per-file gate state.
|
||||
"""
|
||||
|
||||
if not path.exists():
|
||||
return None
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
return payload if isinstance(payload, dict) else None
|
||||
|
||||
|
||||
def _read_http(url: str) -> str:
|
||||
try:
|
||||
with urllib.request.urlopen(url, timeout=10) as resp:
|
||||
@ -78,6 +131,13 @@ def _post_text(url: str, payload: str) -> None:
|
||||
|
||||
|
||||
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
|
||||
"""Fetch the current counter value for a metric series.
|
||||
|
||||
Inputs: the Pushgateway base URL plus the metric name and exact labels.
|
||||
Outputs: the last published counter value so reruns can increment rather than
|
||||
overwrite the total.
|
||||
"""
|
||||
|
||||
text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
|
||||
if not text:
|
||||
return 0.0
|
||||
@ -85,7 +145,7 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
|
||||
for line in text.splitlines():
|
||||
if not line.startswith(metric + "{"):
|
||||
continue
|
||||
if any(f'{k}="{v}"' not in line for k, v in labels.items()):
|
||||
if any(f'{key}="{value}"' not in line for key, value in labels.items()):
|
||||
continue
|
||||
parts = line.split()
|
||||
if len(parts) < 2:
|
||||
@ -97,9 +157,22 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
|
||||
return 0.0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json")
|
||||
junit_path = os.getenv("JUNIT_XML", "build/junit.xml")
|
||||
def _metric_line(name: str, value: int | float, labels: dict[str, str] | None = None) -> str:
|
||||
label_text = _label_str(labels or {})
|
||||
return f"{name}{label_text} {value}"
|
||||
|
||||
|
||||
def _build_payload() -> tuple[str, dict[str, Any]]:
|
||||
"""Assemble the Pushgateway payload and a summary object.
|
||||
|
||||
Inputs: environment variables and any available build artifacts.
|
||||
Outputs: metric lines ready for Pushgateway plus a summary dict that local
|
||||
tests and Jenkins logs can inspect.
|
||||
"""
|
||||
|
||||
coverage_path = Path(os.getenv("COVERAGE_JSON", "build/coverage.json"))
|
||||
junit_path = Path(os.getenv("JUNIT_XML", "build/junit.xml"))
|
||||
quality_gate_path = Path(os.getenv("QUALITY_GATE_JSON", "build/quality-gate.json"))
|
||||
pushgateway_url = os.getenv(
|
||||
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
|
||||
).strip()
|
||||
@ -107,19 +180,17 @@ def main() -> int:
|
||||
branch = os.getenv("BRANCH_NAME", "")
|
||||
build_number = os.getenv("BUILD_NUMBER", "")
|
||||
commit = os.getenv("GIT_COMMIT", "")
|
||||
|
||||
if not os.path.exists(coverage_path):
|
||||
raise RuntimeError(f"missing coverage file {coverage_path}")
|
||||
if not os.path.exists(junit_path):
|
||||
raise RuntimeError(f"missing junit file {junit_path}")
|
||||
outcome = os.getenv("QUALITY_STATUS", "failed").strip() or "failed"
|
||||
|
||||
coverage = _load_coverage(coverage_path)
|
||||
totals = _load_junit(junit_path)
|
||||
passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)
|
||||
quality_gate = _load_quality_gate(quality_gate_path)
|
||||
|
||||
outcome = "ok"
|
||||
if totals["tests"] <= 0 or totals["failures"] > 0 or totals["errors"] > 0:
|
||||
outcome = "failed"
|
||||
tests_total = totals["tests"] if totals else 0
|
||||
tests_failed = totals["failures"] if totals else 0
|
||||
tests_errors = totals["errors"] if totals else 0
|
||||
tests_skipped = totals["skipped"] if totals else 0
|
||||
tests_passed = max(tests_total - tests_failed - tests_errors - tests_skipped, 0)
|
||||
|
||||
job_name = "platform-quality-ci"
|
||||
ok_count = _fetch_existing_counter(
|
||||
@ -137,52 +208,149 @@ def main() -> int:
|
||||
else:
|
||||
failed_count += 1
|
||||
|
||||
labels = {
|
||||
build_labels = {
|
||||
"suite": suite,
|
||||
"branch": branch,
|
||||
"build_number": build_number,
|
||||
"commit": commit,
|
||||
}
|
||||
payload_lines = [
|
||||
"# TYPE platform_quality_gate_runs_total counter",
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}',
|
||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}',
|
||||
"# TYPE ariadne_quality_gate_tests_total gauge",
|
||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}',
|
||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}',
|
||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="error"}} {totals["errors"]}',
|
||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {totals["skipped"]}',
|
||||
"# TYPE ariadne_quality_gate_coverage_percent gauge",
|
||||
f'ariadne_quality_gate_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
|
||||
"# TYPE ariadne_quality_gate_build_info gauge",
|
||||
f"ariadne_quality_gate_build_info{_label_str(labels)} 1",
|
||||
]
|
||||
payload = "\n".join(payload_lines) + "\n"
|
||||
_post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}", payload)
|
||||
suite_labels = {"suite": suite}
|
||||
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"suite": suite,
|
||||
"outcome": outcome,
|
||||
"tests_total": totals["tests"],
|
||||
"tests_passed": passed,
|
||||
"tests_failed": totals["failures"],
|
||||
"tests_errors": totals["errors"],
|
||||
"tests_skipped": totals["skipped"],
|
||||
"coverage_percent": round(coverage, 3),
|
||||
"ok_counter": ok_count,
|
||||
"failed_counter": failed_count,
|
||||
},
|
||||
indent=2,
|
||||
summary = (quality_gate or {}).get("summary") or {}
|
||||
files = (quality_gate or {}).get("files") or {}
|
||||
metric_lines = [
|
||||
"# TYPE platform_quality_gate_runs_total counter",
|
||||
_metric_line("platform_quality_gate_runs_total", int(ok_count), {"suite": suite, "status": "ok"}),
|
||||
_metric_line("platform_quality_gate_runs_total", int(failed_count), {"suite": suite, "status": "failed"}),
|
||||
"# TYPE ariadne_quality_gate_status gauge",
|
||||
_metric_line("ariadne_quality_gate_status", 1 if outcome == "ok" else 0, suite_labels),
|
||||
"# TYPE ariadne_quality_gate_artifact_present gauge",
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_artifact_present",
|
||||
1 if coverage_path.exists() else 0,
|
||||
{"suite": suite, "artifact": "coverage_json"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_artifact_present",
|
||||
1 if junit_path.exists() else 0,
|
||||
{"suite": suite, "artifact": "junit_xml"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_artifact_present",
|
||||
1 if quality_gate_path.exists() else 0,
|
||||
{"suite": suite, "artifact": "quality_gate_json"},
|
||||
),
|
||||
"# TYPE ariadne_quality_gate_tests_total gauge",
|
||||
_metric_line("ariadne_quality_gate_tests_total", tests_passed, {"suite": suite, "result": "passed"}),
|
||||
_metric_line("ariadne_quality_gate_tests_total", tests_failed, {"suite": suite, "result": "failed"}),
|
||||
_metric_line("ariadne_quality_gate_tests_total", tests_errors, {"suite": suite, "result": "error"}),
|
||||
_metric_line("ariadne_quality_gate_tests_total", tests_skipped, {"suite": suite, "result": "skipped"}),
|
||||
"# TYPE ariadne_quality_gate_coverage_percent gauge",
|
||||
_metric_line("ariadne_quality_gate_coverage_percent", round(coverage or 0.0, 3), suite_labels),
|
||||
"# TYPE ariadne_quality_gate_violation_total gauge",
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_violation_total",
|
||||
int(summary.get("line_count_violations", 0)),
|
||||
{"suite": suite, "check": "line_count"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_violation_total",
|
||||
int(summary.get("docstring_violations", 0)),
|
||||
{"suite": suite, "check": "docstrings"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_violation_total",
|
||||
int(summary.get("coverage_violations", 0)),
|
||||
{"suite": suite, "check": "coverage"},
|
||||
),
|
||||
"# TYPE ariadne_quality_gate_legacy_exception_total gauge",
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_legacy_exception_total",
|
||||
int(summary.get("legacy_line_count_files", 0)),
|
||||
{"suite": suite, "check": "line_count"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_legacy_exception_total",
|
||||
int(summary.get("legacy_docstring_files", 0)),
|
||||
{"suite": suite, "check": "docstrings"},
|
||||
),
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_legacy_exception_total",
|
||||
int(summary.get("coverage_exemptions", 0)),
|
||||
{"suite": suite, "check": "coverage"},
|
||||
),
|
||||
"# TYPE ariadne_quality_gate_build_info gauge",
|
||||
f"ariadne_quality_gate_build_info{_label_str(build_labels)} 1",
|
||||
]
|
||||
|
||||
if quality_gate:
|
||||
metric_lines.extend(
|
||||
[
|
||||
"# TYPE ariadne_quality_gate_file_lines gauge",
|
||||
"# TYPE ariadne_quality_gate_file_missing_docstrings gauge",
|
||||
"# TYPE ariadne_quality_gate_file_coverage_percent gauge",
|
||||
]
|
||||
)
|
||||
)
|
||||
for path, data in sorted(files.items()):
|
||||
file_labels = {"suite": suite, "file": path}
|
||||
if isinstance(data.get("lines"), int):
|
||||
metric_lines.append(_metric_line("ariadne_quality_gate_file_lines", data["lines"], file_labels))
|
||||
if isinstance(data.get("missing_docstrings"), int):
|
||||
metric_lines.append(
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_file_missing_docstrings",
|
||||
data["missing_docstrings"],
|
||||
file_labels,
|
||||
)
|
||||
)
|
||||
coverage_percent = data.get("coverage_percent")
|
||||
if isinstance(coverage_percent, (int, float)):
|
||||
metric_lines.append(
|
||||
_metric_line(
|
||||
"ariadne_quality_gate_file_coverage_percent",
|
||||
round(float(coverage_percent), 3),
|
||||
file_labels,
|
||||
)
|
||||
)
|
||||
|
||||
payload = "\n".join(metric_lines) + "\n"
|
||||
result = {
|
||||
"suite": suite,
|
||||
"outcome": outcome,
|
||||
"tests_total": tests_total,
|
||||
"tests_passed": tests_passed,
|
||||
"tests_failed": tests_failed,
|
||||
"tests_errors": tests_errors,
|
||||
"tests_skipped": tests_skipped,
|
||||
"coverage_percent": round(coverage or 0.0, 3),
|
||||
"quality_gate_present": quality_gate is not None,
|
||||
"quality_gate_status": (quality_gate or {}).get("status") or "missing",
|
||||
"quality_gate_summary": summary,
|
||||
"ok_counter": ok_count,
|
||||
"failed_counter": failed_count,
|
||||
"payload": payload,
|
||||
"pushgateway_url": pushgateway_url,
|
||||
"job_name": job_name,
|
||||
}
|
||||
return payload, result
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Publish Ariadne quality metrics to Pushgateway.
|
||||
|
||||
Inputs: environment variables plus any available build artifacts.
|
||||
Outputs: a POST to Pushgateway and a JSON summary printed to stdout so local
|
||||
runs and Jenkins logs can confirm exactly what was emitted.
|
||||
"""
|
||||
|
||||
payload, result = _build_payload()
|
||||
target = f"{result['pushgateway_url'].rstrip('/')}/metrics/job/{result['job_name']}/suite/{result['suite']}"
|
||||
_post_text(target, payload)
|
||||
printable = dict(result)
|
||||
printable.pop("payload", None)
|
||||
print(json.dumps(printable, indent=2, sort_keys=True))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
raise SystemExit(main())
|
||||
except Exception as exc:
|
||||
print(f"metrics push failed: {exc}")
|
||||
raise
|
||||
raise SystemExit(main())
|
||||
|
||||
@ -62,6 +62,8 @@ def test_startup_registers_metis_watch(monkeypatch) -> None:
|
||||
app_module._startup()
|
||||
|
||||
assert any(name == "schedule.metis_sentinel_watch" for name, _cron in tasks)
|
||||
assert any(name == "schedule.metis_k3s_token_sync" for name, _cron in tasks)
|
||||
assert any(name == "schedule.platform_quality_suite_probe" for name, _cron in tasks)
|
||||
|
||||
|
||||
def test_record_event_handles_exception(monkeypatch) -> None:
|
||||
|
||||
@ -20,11 +20,10 @@ def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
|
||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||
|
||||
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
||||
monkeypatch.setattr(
|
||||
jwt,
|
||||
"decode",
|
||||
lambda *args, **kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
|
||||
kc,
|
||||
"_decode_claims",
|
||||
lambda *_args, **_kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
|
||||
)
|
||||
|
||||
claims = kc.verify(token)
|
||||
@ -36,12 +35,7 @@ def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
|
||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||
|
||||
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
||||
monkeypatch.setattr(
|
||||
jwt,
|
||||
"decode",
|
||||
lambda *args, **kwargs: {"azp": "other", "aud": ["other"]},
|
||||
)
|
||||
monkeypatch.setattr(kc, "_decode_claims", lambda *_args, **_kwargs: {"azp": "other", "aud": ["other"]})
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
kc.verify(token)
|
||||
@ -73,11 +67,10 @@ def test_keycloak_verify_refreshes_jwks(monkeypatch) -> None:
|
||||
return {"keys": [{"kid": "test"}]}
|
||||
|
||||
monkeypatch.setattr(kc, "_get_jwks", fake_get_jwks)
|
||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
||||
monkeypatch.setattr(
|
||||
jwt,
|
||||
"decode",
|
||||
lambda *args, **kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
|
||||
kc,
|
||||
"_decode_claims",
|
||||
lambda *_args, **_kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
|
||||
)
|
||||
|
||||
claims = kc.verify(token)
|
||||
@ -94,6 +87,30 @@ def test_keycloak_verify_kid_not_found(monkeypatch) -> None:
|
||||
kc.verify(token)
|
||||
|
||||
|
||||
def test_keycloak_decode_claims_uses_expected_decoder_arguments(monkeypatch) -> None:
|
||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||
captured = {}
|
||||
|
||||
monkeypatch.setattr(
|
||||
jwt.algorithms,
|
||||
"RSAAlgorithm",
|
||||
type("DummyRSA", (), {"from_jwk": staticmethod(lambda key: "parsed-key")}),
|
||||
raising=False,
|
||||
)
|
||||
|
||||
def fake_decode(token, **kwargs):
|
||||
captured["token"] = token
|
||||
captured["kwargs"] = kwargs
|
||||
return {"preferred_username": "alice"}
|
||||
|
||||
monkeypatch.setattr(jwt, "decode", fake_decode)
|
||||
|
||||
claims = kc._decode_claims("header.payload.sig", {"kid": "test"})
|
||||
assert claims["preferred_username"] == "alice"
|
||||
assert captured["kwargs"]["key"] == "parsed-key"
|
||||
assert captured["kwargs"]["issuer"] == "https://issuer"
|
||||
|
||||
|
||||
def test_authenticator_normalizes_groups(monkeypatch) -> None:
|
||||
token = _make_token()
|
||||
auth = Authenticator()
|
||||
|
||||
@ -240,55 +240,68 @@ def test_comms_guest_name_randomizer(monkeypatch) -> None:
|
||||
)
|
||||
monkeypatch.setattr(comms_module, "settings", dummy_settings)
|
||||
|
||||
responses = {
|
||||
("POST", "http://mas/token"): DummyResponse({"access_token": "admintoken"}),
|
||||
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"): DummyResponse(
|
||||
{"data": {"id": "seed"}}
|
||||
responses = [
|
||||
(("POST", "http://mas/token"), DummyResponse({"access_token": "admintoken"})),
|
||||
(
|
||||
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"),
|
||||
DummyResponse({"data": {"id": "seed"}}),
|
||||
),
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
|
||||
{"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}
|
||||
(
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions"),
|
||||
DummyResponse({"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}),
|
||||
),
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"): DummyResponse({}),
|
||||
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"): DummyResponse(
|
||||
{"room_id": "room1"}
|
||||
(("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"), DummyResponse({})),
|
||||
(
|
||||
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"),
|
||||
DummyResponse({"room_id": "room1"}),
|
||||
),
|
||||
("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"): DummyResponse(
|
||||
{"chunk": []}
|
||||
(("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"), DummyResponse({"chunk": []})),
|
||||
(
|
||||
("GET", "http://mas/api/admin/v1/users?page[size]=100"),
|
||||
DummyResponse(
|
||||
{
|
||||
"data": [
|
||||
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
|
||||
]
|
||||
}
|
||||
),
|
||||
),
|
||||
("GET", "http://mas/api/admin/v1/users?page[size]=100"): DummyResponse(
|
||||
{
|
||||
"data": [
|
||||
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
|
||||
]
|
||||
}
|
||||
(
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions"),
|
||||
DummyResponse({"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}),
|
||||
),
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
|
||||
{"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}
|
||||
(
|
||||
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"),
|
||||
DummyResponse({"displayname": None}),
|
||||
),
|
||||
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"): DummyResponse(
|
||||
{"displayname": None}
|
||||
(("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"), DummyResponse({})),
|
||||
(
|
||||
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"),
|
||||
DummyResponse(
|
||||
{
|
||||
"users": [
|
||||
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
|
||||
]
|
||||
}
|
||||
),
|
||||
),
|
||||
("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"): DummyResponse({}),
|
||||
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse(
|
||||
{
|
||||
"users": [
|
||||
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
|
||||
]
|
||||
}
|
||||
(("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
|
||||
(
|
||||
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"),
|
||||
DummyResponse({"displayname": None}),
|
||||
),
|
||||
("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
|
||||
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse(
|
||||
{"displayname": None}
|
||||
),
|
||||
("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
|
||||
("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"): DummyResponse({}),
|
||||
}
|
||||
(("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
|
||||
(("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"), DummyResponse({})),
|
||||
]
|
||||
response_queues: dict[tuple[str, str], list[DummyResponse]] = {}
|
||||
for key, response in responses:
|
||||
response_queues.setdefault(key, []).append(response)
|
||||
|
||||
def handler(method, url, **_kwargs):
|
||||
resp = responses.get((method, url))
|
||||
if resp is None:
|
||||
queue = response_queues.get((method, url))
|
||||
if not queue:
|
||||
return DummyResponse({})
|
||||
return resp
|
||||
return queue.pop(0)
|
||||
|
||||
svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))
|
||||
monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0)
|
||||
|
||||
@ -129,3 +129,95 @@ def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None:
|
||||
db = Database("postgresql://user:pass@localhost/db")
|
||||
assert db.fetchone("fetchone") == {"id": 1}
|
||||
assert db.fetchall("fetchall") == [{"id": 1}, {"id": 2}]
|
||||
|
||||
|
||||
def test_database_passes_config_to_pool(monkeypatch) -> None:
|
||||
captured = {}
|
||||
|
||||
class CapturePool(DummyPool):
|
||||
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||
captured.update(
|
||||
{
|
||||
"conninfo": conninfo,
|
||||
"min_size": min_size,
|
||||
"max_size": max_size,
|
||||
"kwargs": kwargs,
|
||||
}
|
||||
)
|
||||
super().__init__(conninfo=conninfo, min_size=min_size, max_size=max_size, kwargs=kwargs)
|
||||
|
||||
monkeypatch.setattr(db_module, "ConnectionPool", CapturePool)
|
||||
config = db_module.DatabaseConfig(
|
||||
pool_min=1,
|
||||
pool_max=7,
|
||||
connect_timeout_sec=9,
|
||||
lock_timeout_sec=11,
|
||||
statement_timeout_sec=13,
|
||||
idle_in_tx_timeout_sec=15,
|
||||
application_name="ariadne-test",
|
||||
)
|
||||
Database("postgresql://user:pass@localhost/db", config)
|
||||
|
||||
assert captured["conninfo"] == "postgresql://user:pass@localhost/db"
|
||||
assert captured["min_size"] == 1
|
||||
assert captured["max_size"] == 7
|
||||
assert captured["kwargs"]["connect_timeout"] == 9
|
||||
assert captured["kwargs"]["application_name"] == "ariadne-test"
|
||||
assert "lock_timeout=11s" in captured["kwargs"]["options"]
|
||||
|
||||
|
||||
def test_migrate_returns_when_advisory_lock_is_unavailable(monkeypatch) -> None:
|
||||
class NoLockConn(DummyConn):
|
||||
def execute(self, query, params=None):
|
||||
if "pg_try_advisory_lock" in query:
|
||||
return DummyResult(row=(False,))
|
||||
return super().execute(query, params)
|
||||
|
||||
class NoLockPool(DummyPool):
|
||||
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||
self.conn = NoLockConn()
|
||||
|
||||
monkeypatch.setattr(db_module, "ConnectionPool", NoLockPool)
|
||||
db = Database("postgresql://user:pass@localhost/db")
|
||||
db.migrate(lock_id=123)
|
||||
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
|
||||
|
||||
|
||||
def test_migrate_handles_missing_access_requests_table(monkeypatch) -> None:
|
||||
class UndefinedTableConn(DummyConn):
|
||||
def execute(self, query, params=None):
|
||||
if "ALTER TABLE access_requests" in query:
|
||||
raise db_module.psycopg.errors.UndefinedTable()
|
||||
return super().execute(query, params)
|
||||
|
||||
class UndefinedTablePool(DummyPool):
|
||||
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||
self.conn = UndefinedTableConn()
|
||||
|
||||
monkeypatch.setattr(db_module, "ConnectionPool", UndefinedTablePool)
|
||||
db = Database("postgresql://user:pass@localhost/db")
|
||||
db.migrate(lock_id=123)
|
||||
|
||||
|
||||
def test_migrate_skip_flags(monkeypatch) -> None:
|
||||
monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
|
||||
db = Database("postgresql://user:pass@localhost/db")
|
||||
db.migrate(lock_id=123, include_ariadne_tables=False, include_access_requests=False)
|
||||
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
|
||||
assert not any("ALTER TABLE access_requests" in query for query, _ in db._pool.conn.executed)
|
||||
|
||||
|
||||
def test_unlock_swallows_errors(monkeypatch) -> None:
|
||||
class UnlockConn(DummyConn):
|
||||
def execute(self, query, params=None):
|
||||
if "pg_advisory_unlock" in query:
|
||||
raise RuntimeError("boom")
|
||||
return super().execute(query, params)
|
||||
|
||||
class UnlockPool(DummyPool):
|
||||
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||
self.conn = UnlockConn()
|
||||
|
||||
monkeypatch.setattr(db_module, "ConnectionPool", UnlockPool)
|
||||
db = Database("postgresql://user:pass@localhost/db")
|
||||
db.migrate(lock_id=123)
|
||||
|
||||
@ -63,6 +63,10 @@ def test_build_command_wraps_env() -> None:
|
||||
assert "export FOO=bar" in cmd[2]
|
||||
|
||||
|
||||
def test_build_command_accepts_string_without_env() -> None:
|
||||
assert _build_command("echo hello", None) == ["/bin/sh", "-c", "echo hello"]
|
||||
|
||||
|
||||
def test_exec_returns_output(monkeypatch) -> None:
|
||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||
@ -84,6 +88,41 @@ def test_exec_raises_on_failure(monkeypatch) -> None:
|
||||
executor.exec(["false"], check=True)
|
||||
|
||||
|
||||
def test_exec_returns_failed_result_when_check_disabled(monkeypatch) -> None:
|
||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: DummyStream(stderr="bad", exit_code=2))
|
||||
|
||||
executor = PodExecutor("ns", "app=test", None)
|
||||
result = executor.exec(["false"], check=False)
|
||||
assert result.ok is False
|
||||
assert result.exit_code == 2
|
||||
|
||||
|
||||
def test_exec_uses_returncode_when_exit_code_never_arrives(monkeypatch) -> None:
|
||||
class ReturncodeOnlyStream(DummyStream):
|
||||
def __init__(self):
|
||||
super().__init__(stdout="ok", exit_code=7)
|
||||
self._exit_code_ready = False
|
||||
self._updated = False
|
||||
|
||||
def update(self, timeout: int = 1) -> None:
|
||||
self._updated = True
|
||||
self._open = False
|
||||
|
||||
def peek_exit_code(self) -> bool:
|
||||
return False
|
||||
|
||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: ReturncodeOnlyStream())
|
||||
|
||||
executor = PodExecutor("ns", "app=test", None)
|
||||
result = executor.exec(["echo", "ok"], check=False)
|
||||
assert result.exit_code == 7
|
||||
assert result.ok is False
|
||||
|
||||
|
||||
def test_exec_times_out(monkeypatch) -> None:
|
||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||
@ -115,3 +154,18 @@ def test_ensure_client_fallback(monkeypatch) -> None:
|
||||
monkeypatch.setattr(exec_module, "client", types.SimpleNamespace(CoreV1Api=lambda: dummy_api))
|
||||
|
||||
assert exec_module._ensure_client() is dummy_api
|
||||
|
||||
|
||||
def test_ensure_client_raises_when_kubernetes_import_failed(monkeypatch) -> None:
|
||||
monkeypatch.setattr(exec_module, "_CORE_API", None)
|
||||
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", RuntimeError("missing"))
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
exec_module._ensure_client()
|
||||
|
||||
|
||||
def test_ensure_client_reuses_cached_client(monkeypatch) -> None:
|
||||
cached = object()
|
||||
monkeypatch.setattr(exec_module, "_CORE_API", cached)
|
||||
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
|
||||
assert exec_module._ensure_client() is cached
|
||||
|
||||
@ -5,6 +5,32 @@ import pytest
|
||||
from ariadne.k8s import pods as pods_module
|
||||
|
||||
|
||||
def test_parse_start_time_handles_missing_and_invalid_values() -> None:
|
||||
assert pods_module._parse_start_time(None) == 0.0
|
||||
assert pods_module._parse_start_time("not-a-date") == 0.0
|
||||
|
||||
|
||||
def test_parse_start_time_defaults_naive_value_to_utc() -> None:
|
||||
assert pods_module._parse_start_time("2026-01-20T00:00:00") > 0
|
||||
|
||||
|
||||
def test_is_ready_handles_missing_conditions_and_non_ready_states() -> None:
|
||||
assert pods_module._is_ready({"status": {"phase": "Pending"}}) is False
|
||||
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": "bad"}}) is False
|
||||
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": ["bad"]}}) is False
|
||||
assert (
|
||||
pods_module._is_ready(
|
||||
{"status": {"phase": "Running", "conditions": [{"type": "Initialized", "status": "True"}]}}
|
||||
)
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
def test_list_pods_requires_namespace() -> None:
|
||||
with pytest.raises(pods_module.PodSelectionError):
|
||||
pods_module.list_pods("", "app=test")
|
||||
|
||||
|
||||
def test_list_pods_encodes_selector(monkeypatch) -> None:
|
||||
captured = {}
|
||||
|
||||
@ -17,6 +43,11 @@ def test_list_pods_encodes_selector(monkeypatch) -> None:
|
||||
assert "labelSelector=app%3Dnextcloud" in captured["path"]
|
||||
|
||||
|
||||
def test_list_pods_filters_non_dict_items(monkeypatch) -> None:
|
||||
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: {"items": [{}, "bad", 1]})
|
||||
assert pods_module.list_pods("demo", "app=test") == [{}]
|
||||
|
||||
|
||||
def test_select_pod_picks_ready_latest(monkeypatch) -> None:
|
||||
payload = {
|
||||
"items": [
|
||||
@ -57,3 +88,49 @@ def test_select_pod_ignores_non_ready(monkeypatch) -> None:
|
||||
|
||||
with pytest.raises(pods_module.PodSelectionError):
|
||||
pods_module.select_pod("demo", "app=test")
|
||||
|
||||
|
||||
def test_select_pod_skips_deleted_and_invalid_entries(monkeypatch) -> None:
|
||||
payload = {
|
||||
"items": [
|
||||
{
|
||||
"metadata": {"name": "deleting", "deletionTimestamp": "2026-01-20T00:00:00Z"},
|
||||
"status": {
|
||||
"phase": "Running",
|
||||
"conditions": [{"type": "Ready", "status": "True"}],
|
||||
},
|
||||
},
|
||||
{
|
||||
"metadata": {"name": "chosen"},
|
||||
"status": {
|
||||
"phase": "Running",
|
||||
"nodeName": "titan-24",
|
||||
"startTime": "2026-01-20T00:00:00Z",
|
||||
"conditions": [{"type": "Ready", "status": "True"}],
|
||||
},
|
||||
},
|
||||
]
|
||||
}
|
||||
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
|
||||
|
||||
pod = pods_module.select_pod("demo", "app=test")
|
||||
assert pod.name == "chosen"
|
||||
assert pod.node == "titan-24"
|
||||
|
||||
|
||||
def test_select_pod_skips_blank_names(monkeypatch) -> None:
|
||||
payload = {
|
||||
"items": [
|
||||
{
|
||||
"metadata": {"name": " "},
|
||||
"status": {
|
||||
"phase": "Running",
|
||||
"conditions": [{"type": "Ready", "status": "True"}],
|
||||
},
|
||||
}
|
||||
]
|
||||
}
|
||||
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
|
||||
|
||||
with pytest.raises(pods_module.PodSelectionError):
|
||||
pods_module.select_pod("demo", "app=test")
|
||||
|
||||
@ -244,7 +244,7 @@ def test_find_user_by_email_skips_non_dict(monkeypatch) -> None:
|
||||
assert client.find_user_by_email("alice@example.com") is None
|
||||
|
||||
|
||||
def test_get_user_invalid_payload(monkeypatch) -> None:
|
||||
def test_get_user_invalid_payload_duplicate(monkeypatch) -> None:
|
||||
dummy_settings = types.SimpleNamespace(
|
||||
keycloak_admin_url="http://kc",
|
||||
keycloak_admin_realm="atlas",
|
||||
@ -299,7 +299,7 @@ def test_update_user_safe_handles_bad_attrs(monkeypatch) -> None:
|
||||
assert captured["payload"]["attributes"] == {"new": ["item"]}
|
||||
|
||||
|
||||
def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
|
||||
def test_set_user_attribute_user_id_missing_duplicate(monkeypatch) -> None:
|
||||
client = KeycloakAdminClient()
|
||||
|
||||
def fake_find_user(username: str) -> dict[str, Any]:
|
||||
|
||||
82
tests/test_metis_token_sync.py
Normal file
82
tests/test_metis_token_sync.py
Normal file
@ -0,0 +1,82 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from ariadne.services import metis_token_sync as module
|
||||
|
||||
|
||||
def _settings() -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
metis_token_sync_namespace="maintenance",
|
||||
metis_token_sync_service_account="metis-token-sync",
|
||||
metis_token_sync_node_name="titan-0a",
|
||||
metis_token_sync_image="hashicorp/vault:1.17.6",
|
||||
metis_token_sync_job_ttl_sec=1800,
|
||||
metis_token_sync_wait_timeout_sec=10.0,
|
||||
metis_token_sync_vault_addr="http://vault.vault.svc.cluster.local:8200",
|
||||
metis_token_sync_vault_k8s_role="maintenance-metis-token-sync",
|
||||
)
|
||||
|
||||
|
||||
def test_payload_matches_expected_contract(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
payload = module.MetisTokenSyncService()._job_payload("sync-job")
|
||||
|
||||
assert payload["metadata"]["namespace"] == "maintenance"
|
||||
assert payload["metadata"]["labels"]["atlas.bstein.dev/trigger"] == "ariadne"
|
||||
spec = payload["spec"]["template"]["spec"]
|
||||
assert spec["serviceAccountName"] == "metis-token-sync"
|
||||
assert spec["nodeName"] == "titan-0a"
|
||||
assert spec["containers"][0]["image"] == "hashicorp/vault:1.17.6"
|
||||
assert spec["containers"][0]["volumeMounts"][0]["mountPath"] == "/host/var/lib/rancher/k3s/server"
|
||||
|
||||
|
||||
def test_run_wait_success(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1710000000)
|
||||
|
||||
posted: dict[str, object] = {}
|
||||
|
||||
def fake_post(path: str, payload: dict[str, object]) -> dict[str, object]:
|
||||
posted["path"] = path
|
||||
posted["payload"] = payload
|
||||
return {"metadata": {"name": "sync-job-1"}}
|
||||
|
||||
calls = iter(
|
||||
[
|
||||
{"status": {"active": 1}},
|
||||
{"status": {"succeeded": 1}},
|
||||
]
|
||||
)
|
||||
|
||||
monkeypatch.setattr(module, "post_json", fake_post)
|
||||
monkeypatch.setattr(module, "get_json", lambda _path: next(calls))
|
||||
monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
|
||||
|
||||
result = module.MetisTokenSyncService().run(wait=True)
|
||||
|
||||
assert posted["path"] == "/apis/batch/v1/namespaces/maintenance/jobs"
|
||||
assert result == {"job": "sync-job-1", "status": "ok"}
|
||||
|
||||
|
||||
def test_run_wait_failure_raises(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1710000000)
|
||||
monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "sync-job-2"}})
|
||||
monkeypatch.setattr(module, "get_json", lambda _path: {"status": {"failed": 1}})
|
||||
monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
|
||||
|
||||
with pytest.raises(RuntimeError, match="metis token sync job sync-job-2 error"):
|
||||
module.MetisTokenSyncService().run(wait=True)
|
||||
|
||||
|
||||
def test_run_without_wait_queues(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1710000000)
|
||||
monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "sync-job-3"}})
|
||||
|
||||
result = module.MetisTokenSyncService().run(wait=False)
|
||||
|
||||
assert result == {"job": "sync-job-3", "status": "queued"}
|
||||
@ -2,7 +2,25 @@ from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from ariadne.metrics.metrics import set_access_request_counts, set_cluster_state_metrics
|
||||
from ariadne.metrics.metrics import (
|
||||
record_schedule_state,
|
||||
record_task_run,
|
||||
set_access_request_counts,
|
||||
set_cluster_state_metrics,
|
||||
)
|
||||
|
||||
|
||||
def test_record_task_run_accepts_missing_duration() -> None:
|
||||
record_task_run("demo", "ok", None)
|
||||
|
||||
|
||||
def test_record_task_run_records_duration() -> None:
|
||||
record_task_run("demo", "ok", 1.5)
|
||||
|
||||
|
||||
def test_record_schedule_state_tracks_success_and_failure() -> None:
|
||||
record_schedule_state("demo", 10.0, 8.0, 20.0, True)
|
||||
record_schedule_state("demo", 11.0, None, None, False)
|
||||
|
||||
|
||||
def test_set_access_request_counts() -> None:
|
||||
@ -11,3 +29,7 @@ def test_set_access_request_counts() -> None:
|
||||
|
||||
def test_set_cluster_state_metrics() -> None:
|
||||
set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1)
|
||||
|
||||
|
||||
def test_set_cluster_state_metrics_allows_missing_values() -> None:
|
||||
set_cluster_state_metrics(datetime.now(timezone.utc), None, None, None, None)
|
||||
|
||||
79
tests/test_migrate.py
Normal file
79
tests/test_migrate.py
Normal file
@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import types
|
||||
|
||||
import ariadne.migrate as migrate_module
|
||||
|
||||
|
||||
class DummyDatabase:
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
self.calls: list[tuple[int, bool, bool]] = []
|
||||
self.closed = False
|
||||
|
||||
def migrate(self, lock_id: int, *, include_ariadne_tables: bool, include_access_requests: bool) -> None:
|
||||
self.calls.append((lock_id, include_ariadne_tables, include_access_requests))
|
||||
|
||||
def close(self) -> None:
|
||||
self.closed = True
|
||||
|
||||
|
||||
def test_build_db_uses_settings(monkeypatch) -> None:
|
||||
captured = {}
|
||||
monkeypatch.setattr(
|
||||
migrate_module,
|
||||
"settings",
|
||||
types.SimpleNamespace(
|
||||
ariadne_db_pool_min=1,
|
||||
ariadne_db_pool_max=3,
|
||||
ariadne_db_connect_timeout_sec=5,
|
||||
ariadne_db_lock_timeout_sec=7,
|
||||
ariadne_db_statement_timeout_sec=11,
|
||||
ariadne_db_idle_in_tx_timeout_sec=13,
|
||||
),
|
||||
)
|
||||
|
||||
def fake_database(dsn, config):
|
||||
captured["dsn"] = dsn
|
||||
captured["config"] = config
|
||||
return DummyDatabase("ariadne")
|
||||
|
||||
monkeypatch.setattr(migrate_module, "Database", fake_database)
|
||||
migrate_module._build_db("postgresql://db", "ariadne_migrate")
|
||||
|
||||
assert captured["dsn"] == "postgresql://db"
|
||||
assert captured["config"].application_name == "ariadne_migrate"
|
||||
assert captured["config"].lock_timeout_sec == 7
|
||||
|
||||
|
||||
def test_main_returns_when_migrations_disabled(monkeypatch) -> None:
|
||||
monkeypatch.setattr(
|
||||
migrate_module,
|
||||
"settings",
|
||||
types.SimpleNamespace(ariadne_run_migrations=False),
|
||||
)
|
||||
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("should not build")))
|
||||
migrate_module.main()
|
||||
|
||||
|
||||
def test_main_runs_ariadne_and_portal_migrations(monkeypatch) -> None:
|
||||
ariadne_db = DummyDatabase("ariadne")
|
||||
portal_db = DummyDatabase("portal")
|
||||
databases = [ariadne_db, portal_db]
|
||||
monkeypatch.setattr(
|
||||
migrate_module,
|
||||
"settings",
|
||||
types.SimpleNamespace(
|
||||
ariadne_run_migrations=True,
|
||||
ariadne_database_url="postgresql://ariadne",
|
||||
portal_database_url="postgresql://portal",
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: databases.pop(0))
|
||||
|
||||
migrate_module.main()
|
||||
|
||||
assert ariadne_db.calls == [(migrate_module.ARIADNE_MIGRATION_LOCK_ID, True, False)]
|
||||
assert portal_db.calls == [(migrate_module.PORTAL_MIGRATION_LOCK_ID, False, True)]
|
||||
assert ariadne_db.closed is True
|
||||
assert portal_db.closed is True
|
||||
80
tests/test_platform_quality_probe.py
Normal file
80
tests/test_platform_quality_probe.py
Normal file
@ -0,0 +1,80 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
from ariadne.services import platform_quality_probe as module
|
||||
|
||||
|
||||
def _settings() -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
platform_quality_probe_namespace="monitoring",
|
||||
platform_quality_probe_script_configmap="platform-quality-suite-probe-script",
|
||||
platform_quality_probe_image="curlimages/curl:8.12.1",
|
||||
platform_quality_probe_job_ttl_sec=1800,
|
||||
platform_quality_probe_wait_timeout_sec=20.0,
|
||||
platform_quality_probe_pushgateway_url="http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
|
||||
platform_quality_probe_http_timeout_sec=12,
|
||||
)
|
||||
|
||||
|
||||
def test_payload_matches_expected_contract(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
payload = module.PlatformQualityProbeService()._job_payload("probe-job")
|
||||
|
||||
assert payload["metadata"]["namespace"] == "monitoring"
|
||||
assert payload["metadata"]["labels"]["atlas.bstein.dev/trigger"] == "ariadne"
|
||||
container = payload["spec"]["template"]["spec"]["containers"][0]
|
||||
assert container["image"] == "curlimages/curl:8.12.1"
|
||||
assert container["command"] == ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"]
|
||||
assert payload["spec"]["template"]["spec"]["volumes"][0]["configMap"]["name"] == "platform-quality-suite-probe-script"
|
||||
|
||||
|
||||
def test_run_wait_success(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1720000000)
|
||||
|
||||
posted: dict[str, object] = {}
|
||||
|
||||
def fake_post(path: str, payload: dict[str, object]) -> dict[str, object]:
|
||||
posted["path"] = path
|
||||
posted["payload"] = payload
|
||||
return {"metadata": {"name": "probe-job-1"}}
|
||||
|
||||
calls = iter(
|
||||
[
|
||||
{"status": {"active": 1}},
|
||||
{"status": {"succeeded": 1}},
|
||||
]
|
||||
)
|
||||
|
||||
monkeypatch.setattr(module, "post_json", fake_post)
|
||||
monkeypatch.setattr(module, "get_json", lambda _path: next(calls))
|
||||
monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
|
||||
|
||||
result = module.PlatformQualityProbeService().run(wait=True)
|
||||
|
||||
assert posted["path"] == "/apis/batch/v1/namespaces/monitoring/jobs"
|
||||
assert result == {"job": "probe-job-1", "status": "ok"}
|
||||
|
||||
|
||||
def test_run_wait_failure_raises(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1720000000)
|
||||
monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "probe-job-2"}})
|
||||
monkeypatch.setattr(module, "get_json", lambda _path: {"status": {"failed": 1}})
|
||||
monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
|
||||
|
||||
with pytest.raises(RuntimeError, match="platform quality probe job probe-job-2 error"):
|
||||
module.PlatformQualityProbeService().run(wait=True)
|
||||
|
||||
|
||||
def test_run_without_wait_queues(monkeypatch) -> None:
|
||||
monkeypatch.setattr(module, "settings", _settings())
|
||||
monkeypatch.setattr(module.time, "time", lambda: 1720000000)
|
||||
monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "probe-job-3"}})
|
||||
|
||||
result = module.PlatformQualityProbeService().run(wait=False)
|
||||
|
||||
assert result == {"job": "probe-job-3", "status": "queued"}
|
||||
@ -595,7 +595,7 @@ def test_provisioning_sync_errors(monkeypatch) -> None:
|
||||
assert outcome.status == "accounts_building"
|
||||
|
||||
|
||||
def test_provisioning_run_loop_processes_candidates(monkeypatch) -> None:
|
||||
def test_provisioning_run_loop_processes_candidates_duplicate(monkeypatch) -> None:
|
||||
dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0)
|
||||
monkeypatch.setattr(prov, "settings", dummy_settings)
|
||||
_patch_mailu_ready(monkeypatch, dummy_settings)
|
||||
@ -652,7 +652,7 @@ def test_provisioning_run_loop_waits_for_admin(monkeypatch) -> None:
|
||||
manager._run_loop()
|
||||
|
||||
|
||||
def test_provisioning_missing_verified_email(monkeypatch) -> None:
|
||||
def test_provisioning_missing_verified_email_duplicate(monkeypatch) -> None:
|
||||
dummy_settings = types.SimpleNamespace(
|
||||
mailu_domain="bstein.dev",
|
||||
mailu_sync_url="",
|
||||
|
||||
104
tests/test_publish_test_metrics.py
Normal file
104
tests/test_publish_test_metrics.py
Normal file
@ -0,0 +1,104 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import urllib.request
|
||||
|
||||
import scripts.publish_test_metrics as metrics_script
|
||||
|
||||
|
||||
class DummyHTTPResponse:
|
||||
def __init__(self, body: str, status: int = 200):
|
||||
self._body = body
|
||||
self.status = status
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def read(self) -> bytes:
|
||||
return self._body.encode("utf-8")
|
||||
|
||||
|
||||
def test_build_payload_includes_quality_gate_metrics(monkeypatch, tmp_path: Path) -> None:
|
||||
coverage_path = tmp_path / "coverage.json"
|
||||
coverage_path.write_text(json.dumps({"summary": {"percent_covered": 97.5}}), encoding="utf-8")
|
||||
junit_path = tmp_path / "junit.xml"
|
||||
junit_path.write_text('<testsuite tests="5" failures="1" errors="1" skipped="1"/>', encoding="utf-8")
|
||||
quality_gate_path = tmp_path / "quality-gate.json"
|
||||
quality_gate_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"status": "ok",
|
||||
"summary": {
|
||||
"line_count_violations": 0,
|
||||
"docstring_violations": 1,
|
||||
"coverage_violations": 0,
|
||||
"legacy_line_count_files": 2,
|
||||
"legacy_docstring_files": 3,
|
||||
"coverage_exemptions": 4,
|
||||
},
|
||||
"files": {
|
||||
"ariadne/auth/keycloak.py": {
|
||||
"lines": 120,
|
||||
"missing_docstrings": 0,
|
||||
"coverage_percent": 99.1,
|
||||
}
|
||||
},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
monkeypatch.setenv("COVERAGE_JSON", str(coverage_path))
|
||||
monkeypatch.setenv("JUNIT_XML", str(junit_path))
|
||||
monkeypatch.setenv("QUALITY_GATE_JSON", str(quality_gate_path))
|
||||
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
|
||||
monkeypatch.setenv("SUITE_NAME", "ariadne")
|
||||
monkeypatch.setenv("QUALITY_STATUS", "ok")
|
||||
monkeypatch.setenv("BUILD_NUMBER", "42")
|
||||
monkeypatch.setenv("BRANCH_NAME", "main")
|
||||
monkeypatch.setenv("GIT_COMMIT", "abc123")
|
||||
monkeypatch.setattr(
|
||||
metrics_script,
|
||||
"_read_http",
|
||||
lambda _url: 'platform_quality_gate_runs_total{job="platform-quality-ci",suite="ariadne",status="ok"} 5\n',
|
||||
)
|
||||
|
||||
payload, result = metrics_script._build_payload()
|
||||
|
||||
assert result["tests_total"] == 5
|
||||
assert result["tests_passed"] == 2
|
||||
assert result["coverage_percent"] == 97.5
|
||||
assert result["ok_counter"] == 6.0
|
||||
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="quality_gate_json"} 1' in payload
|
||||
assert 'ariadne_quality_gate_violation_total{suite="ariadne",check="docstrings"} 1' in payload
|
||||
assert 'ariadne_quality_gate_file_coverage_percent{suite="ariadne",file="ariadne/auth/keycloak.py"} 99.1' in payload
|
||||
|
||||
|
||||
def test_main_posts_payload_for_failed_build_with_missing_artifacts(monkeypatch, capsys) -> None:
|
||||
requests: list[tuple[str, str]] = []
|
||||
missing_base = Path("/tmp/ariadne-metrics-missing")
|
||||
|
||||
def fake_urlopen(target, timeout=10):
|
||||
if isinstance(target, urllib.request.Request):
|
||||
requests.append((target.full_url, target.data.decode("utf-8")))
|
||||
return DummyHTTPResponse("", status=200)
|
||||
return DummyHTTPResponse("", status=200)
|
||||
|
||||
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
|
||||
monkeypatch.setenv("SUITE_NAME", "ariadne")
|
||||
monkeypatch.setenv("QUALITY_STATUS", "failed")
|
||||
monkeypatch.setenv("COVERAGE_JSON", str(missing_base / "coverage.json"))
|
||||
monkeypatch.setenv("JUNIT_XML", str(missing_base / "junit.xml"))
|
||||
monkeypatch.setenv("QUALITY_GATE_JSON", str(missing_base / "quality-gate.json"))
|
||||
monkeypatch.setattr(metrics_script.urllib.request, "urlopen", fake_urlopen)
|
||||
|
||||
assert metrics_script.main() == 0
|
||||
assert requests
|
||||
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="coverage_json"} 0' in requests[0][1]
|
||||
assert 'ariadne_quality_gate_status{suite="ariadne"} 0' in requests[0][1]
|
||||
output = capsys.readouterr().out
|
||||
assert '"quality_gate_status": "missing"' in output
|
||||
117
tests/test_quality_gate.py
Normal file
117
tests/test_quality_gate.py
Normal file
@ -0,0 +1,117 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import scripts.check_quality_gate as quality_gate
|
||||
|
||||
|
||||
def _write_quality_config(path: Path) -> None:
|
||||
path.write_text(
|
||||
"\n".join(
|
||||
[
|
||||
"[files]",
|
||||
'roots = ["pkg"]',
|
||||
"max_lines = 10",
|
||||
"",
|
||||
"[docstrings]",
|
||||
'roots = ["pkg"]',
|
||||
"non_trivial_min_lines = 4",
|
||||
"",
|
||||
"[coverage]",
|
||||
'roots = ["pkg"]',
|
||||
"threshold = 95.0",
|
||||
'targets = ["pkg/good.py"]',
|
||||
"",
|
||||
"[legacy.line_count]",
|
||||
'"pkg/legacy.py" = 12',
|
||||
"",
|
||||
"[legacy.docstrings]",
|
||||
'"pkg/legacy.py" = 1',
|
||||
]
|
||||
)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
|
||||
def test_build_report_accepts_legacy_exceptions(tmp_path: Path) -> None:
|
||||
pkg = tmp_path / "pkg"
|
||||
pkg.mkdir()
|
||||
(pkg / "good.py").write_text(
|
||||
'\n'.join(
|
||||
[
|
||||
'def documented(value: int) -> int:',
|
||||
' """Return a simple incremented value.',
|
||||
"",
|
||||
" Inputs: an integer to increment.",
|
||||
" Outputs: the incremented integer so the module stays fully documented.",
|
||||
' """',
|
||||
" return value + 1",
|
||||
]
|
||||
)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
(pkg / "legacy.py").write_text(
|
||||
"\n".join(
|
||||
[
|
||||
"class Legacy:",
|
||||
" pass",
|
||||
]
|
||||
+ ["# filler"] * 10
|
||||
)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
coverage_path = tmp_path / "coverage.json"
|
||||
coverage_path.write_text(
|
||||
json.dumps({"files": {"pkg/good.py": {"summary": {"percent_covered": 99.0}}}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
config_path = tmp_path / "quality_gate.toml"
|
||||
_write_quality_config(config_path)
|
||||
|
||||
config = quality_gate._load_config(config_path)
|
||||
report = quality_gate._build_report(
|
||||
tmp_path,
|
||||
config,
|
||||
{"pkg/good.py": {"adjusted_percent": 99.0, "raw_percent": 99.0, "excluded_lines": []}},
|
||||
True,
|
||||
)
|
||||
|
||||
assert report["status"] == "ok"
|
||||
assert report["summary"]["legacy_line_count_files"] == 1
|
||||
assert report["files"]["pkg/good.py"]["coverage_enforced"] is True
|
||||
|
||||
|
||||
def test_main_fails_when_new_violations_appear(monkeypatch, tmp_path: Path) -> None:
|
||||
pkg = tmp_path / "pkg"
|
||||
pkg.mkdir()
|
||||
(pkg / "good.py").write_text(
|
||||
"\n".join(
|
||||
[
|
||||
"def undocumented(value):",
|
||||
" total = value + 1",
|
||||
" total += 1",
|
||||
" total += 1",
|
||||
" return total",
|
||||
]
|
||||
)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
config_path = tmp_path / "quality_gate.toml"
|
||||
_write_quality_config(config_path)
|
||||
monkeypatch.chdir(tmp_path)
|
||||
monkeypatch.setattr(
|
||||
sys,
|
||||
"argv",
|
||||
["check_quality_gate.py", "--config", "quality_gate.toml", "--coverage-json", "missing.json", "--output", "out.json"],
|
||||
)
|
||||
|
||||
assert quality_gate.main() == 1
|
||||
report = json.loads((tmp_path / "out.json").read_text(encoding="utf-8"))
|
||||
assert report["summary"]["docstring_violations"] >= 1
|
||||
assert report["summary"]["coverage_violations"] >= 1
|
||||
135
tests/test_soteria.py
Normal file
135
tests/test_soteria.py
Normal file
@ -0,0 +1,135 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
|
||||
import httpx
|
||||
|
||||
from ariadne.services import soteria as soteria_module
|
||||
|
||||
|
||||
class DummyResponse:
|
||||
def __init__(self, status_code: int = 200, payload: object | None = None) -> None:
|
||||
self.status_code = status_code
|
||||
self._payload = payload or {}
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
if self.status_code >= 400:
|
||||
request = httpx.Request("POST", "http://example.test")
|
||||
raise httpx.HTTPStatusError("boom", request=request, response=self)
|
||||
|
||||
def json(self):
|
||||
return self._payload
|
||||
|
||||
|
||||
class DummyClient:
|
||||
def __init__(self, responses: list[DummyResponse]) -> None:
|
||||
self._responses = list(responses)
|
||||
self.calls: list[tuple[str, dict[str, object] | None]] = []
|
||||
self.kwargs = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def post(self, url: str, json: dict[str, object] | None = None):
|
||||
self.calls.append((url, json))
|
||||
if not self._responses:
|
||||
return DummyResponse(payload={})
|
||||
return self._responses.pop(0)
|
||||
|
||||
|
||||
def test_scheduled_backup_posts_for_targets(monkeypatch) -> None:
|
||||
dummy = SimpleNamespace(
|
||||
soteria_base_url="http://soteria.maintenance.svc.cluster.local",
|
||||
soteria_backup_url="",
|
||||
soteria_restore_test_url="",
|
||||
soteria_timeout_sec=7.5,
|
||||
soteria_backup_targets=["jenkins/jenkins", "postgres/postgres-data-postgres-0"],
|
||||
soteria_restore_test_targets=[],
|
||||
)
|
||||
monkeypatch.setattr("ariadne.services.soteria.settings", dummy)
|
||||
client = DummyClient(
|
||||
[
|
||||
DummyResponse(payload={"status": "ok", "backup": "first"}),
|
||||
DummyResponse(payload={"status": "ok", "backup": "second"}),
|
||||
]
|
||||
)
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
def factory(**kwargs):
|
||||
captured.update(kwargs)
|
||||
return client
|
||||
|
||||
monkeypatch.setattr(soteria_module.httpx, "Client", factory)
|
||||
|
||||
summary = soteria_module.SoteriaService().run_scheduled_backups()
|
||||
|
||||
assert summary.status == "ok"
|
||||
assert summary.attempted == 2
|
||||
assert summary.succeeded == 2
|
||||
assert summary.failed == 0
|
||||
assert captured["timeout"] == 7.5
|
||||
assert client.calls[0][0].endswith("/v1/backup")
|
||||
|
||||
|
||||
def test_scheduled_backup_skips_when_unconfigured(monkeypatch) -> None:
|
||||
dummy = SimpleNamespace(
|
||||
soteria_base_url="",
|
||||
soteria_backup_url="",
|
||||
soteria_restore_test_url="",
|
||||
soteria_timeout_sec=7.5,
|
||||
soteria_backup_targets=[],
|
||||
soteria_restore_test_targets=[],
|
||||
)
|
||||
monkeypatch.setattr("ariadne.services.soteria.settings", dummy)
|
||||
|
||||
summary = soteria_module.SoteriaService().run_scheduled_backups()
|
||||
|
||||
assert summary.status == "skipped"
|
||||
assert summary.attempted == 0
|
||||
assert summary.detail == "soteria backup url not configured"
|
||||
|
||||
|
||||
def test_scheduled_backup_handles_mixed_results(monkeypatch) -> None:
|
||||
dummy = SimpleNamespace(
|
||||
soteria_base_url="http://soteria.maintenance.svc.cluster.local",
|
||||
soteria_backup_url="",
|
||||
soteria_restore_test_url="",
|
||||
soteria_timeout_sec=7.5,
|
||||
soteria_backup_targets=["bad-target", "gitea/gitea-data"],
|
||||
soteria_restore_test_targets=[],
|
||||
)
|
||||
monkeypatch.setattr("ariadne.services.soteria.settings", dummy)
|
||||
client = DummyClient([DummyResponse(status_code=502, payload={"detail": "upstream fail"})])
|
||||
monkeypatch.setattr(soteria_module.httpx, "Client", lambda **kwargs: client)
|
||||
|
||||
summary = soteria_module.SoteriaService().run_scheduled_backups()
|
||||
|
||||
assert summary.status == "error"
|
||||
assert summary.attempted == 1
|
||||
assert summary.failed == 1
|
||||
assert summary.skipped == 1
|
||||
|
||||
|
||||
def test_scheduled_restore_posts_for_targets(monkeypatch) -> None:
|
||||
dummy = SimpleNamespace(
|
||||
soteria_base_url="http://soteria.maintenance.svc.cluster.local",
|
||||
soteria_backup_url="",
|
||||
soteria_restore_test_url="",
|
||||
soteria_timeout_sec=7.5,
|
||||
soteria_backup_targets=[],
|
||||
soteria_restore_test_targets=["jenkins/jenkins=soteria-restore-smoke"],
|
||||
)
|
||||
monkeypatch.setattr("ariadne.services.soteria.settings", dummy)
|
||||
client = DummyClient([DummyResponse(payload={"status": "ok", "volume": "restore-vol"})])
|
||||
monkeypatch.setattr(soteria_module.httpx, "Client", lambda **kwargs: client)
|
||||
|
||||
summary = soteria_module.SoteriaService().run_scheduled_restore_tests()
|
||||
|
||||
assert summary.status == "ok"
|
||||
assert summary.attempted == 1
|
||||
assert summary.succeeded == 1
|
||||
assert client.calls[0][0].endswith("/v1/restore-test")
|
||||
|
||||
@ -7,6 +7,7 @@ import httpx
|
||||
from ariadne.services.mailu import MailuService
|
||||
from ariadne.utils.errors import safe_error_detail
|
||||
from ariadne.utils.http import extract_bearer_token
|
||||
from ariadne.utils.name_generator import NameGenerator
|
||||
from ariadne.utils.passwords import random_password
|
||||
|
||||
|
||||
@ -81,6 +82,22 @@ def test_safe_error_detail_timeout() -> None:
|
||||
assert safe_error_detail(exc, "fallback") == "timeout"
|
||||
|
||||
|
||||
def test_safe_error_detail_runtime_blank_uses_fallback() -> None:
|
||||
assert safe_error_detail(RuntimeError(" "), "fallback") == "fallback"
|
||||
|
||||
|
||||
def test_safe_error_detail_http_status_without_body() -> None:
|
||||
request = httpx.Request("GET", "https://example.com")
|
||||
response = httpx.Response(403, request=request)
|
||||
exc = httpx.HTTPStatusError("bad", request=request, response=response)
|
||||
|
||||
assert safe_error_detail(exc, "fallback") == "http 403"
|
||||
|
||||
|
||||
def test_safe_error_detail_unknown_exception_uses_fallback() -> None:
|
||||
assert safe_error_detail(ValueError("boom"), "fallback") == "fallback"
|
||||
|
||||
|
||||
def test_extract_bearer_token() -> None:
|
||||
request = DummyRequest({"Authorization": "Bearer token123"})
|
||||
assert extract_bearer_token(request) == "token123"
|
||||
@ -94,3 +111,24 @@ def test_extract_bearer_token_invalid() -> None:
|
||||
def test_extract_bearer_token_missing_parts() -> None:
|
||||
request = DummyRequest({"Authorization": "Bearer"})
|
||||
assert extract_bearer_token(request) is None
|
||||
|
||||
|
||||
def test_name_generator_unique_adds_candidate(monkeypatch) -> None:
|
||||
generator = NameGenerator(max_attempts=2)
|
||||
monkeypatch.setattr(NameGenerator, "generate", lambda self: "atlas-guest")
|
||||
existing = {"taken"}
|
||||
|
||||
assert generator.unique(existing) == "atlas-guest"
|
||||
assert "atlas-guest" in existing
|
||||
|
||||
|
||||
def test_name_generator_unique_returns_none_after_retries(monkeypatch) -> None:
|
||||
generator = NameGenerator(max_attempts=2)
|
||||
monkeypatch.setattr(NameGenerator, "generate", lambda self: "taken")
|
||||
assert generator.unique({"taken"}) is None
|
||||
|
||||
|
||||
def test_name_generator_generate_normalizes_words(monkeypatch) -> None:
|
||||
monkeypatch.setattr("ariadne.utils.name_generator.coolname.generate", lambda words: [" Atlas ", "", "Guest"])
|
||||
generator = NameGenerator(words=3)
|
||||
assert generator.generate() == "atlas-guest"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user