Compare commits
1 Commits
master
...
feature/qu
| Author | SHA1 | Date | |
|---|---|---|---|
| ff3339fafc |
124
Jenkinsfile
vendored
124
Jenkinsfile
vendored
@ -76,8 +76,8 @@ spec:
|
|||||||
IMAGE = "${REGISTRY}/ariadne"
|
IMAGE = "${REGISTRY}/ariadne"
|
||||||
VERSION_TAG = 'dev'
|
VERSION_TAG = 'dev'
|
||||||
SEMVER = 'dev'
|
SEMVER = 'dev'
|
||||||
COVERAGE_MIN = '99'
|
|
||||||
COVERAGE_JSON = 'build/coverage.json'
|
COVERAGE_JSON = 'build/coverage.json'
|
||||||
|
QUALITY_GATE_JSON = 'build/quality-gate.json'
|
||||||
JUNIT_XML = 'build/junit.xml'
|
JUNIT_XML = 'build/junit.xml'
|
||||||
SUITE_NAME = 'ariadne'
|
SUITE_NAME = 'ariadne'
|
||||||
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
|
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
|
||||||
@ -102,30 +102,20 @@ spec:
|
|||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
mkdir -p build
|
mkdir -p build
|
||||||
python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
|
python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
|
||||||
python -m ruff check ariadne --select PLR
|
python -m ruff check ariadne tests scripts
|
||||||
python -m slipcover \
|
python -m slipcover \
|
||||||
--json \
|
--json \
|
||||||
--out "${COVERAGE_JSON}" \
|
--out "${COVERAGE_JSON}" \
|
||||||
--source ariadne \
|
--source ariadne \
|
||||||
--fail-under "${COVERAGE_MIN}" \
|
|
||||||
-m pytest -ra -vv --durations=20 --junitxml "${JUNIT_XML}"
|
-m pytest -ra -vv --durations=20 --junitxml "${JUNIT_XML}"
|
||||||
|
python scripts/check_quality_gate.py --coverage-json "${COVERAGE_JSON}" --output "${QUALITY_GATE_JSON}"
|
||||||
python -c "import json; payload=json.load(open('build/coverage.json', encoding='utf-8')); percent=(payload.get('summary') or {}).get('percent_covered'); print(f'Coverage summary: {percent:.2f}%' if percent is not None else 'Coverage summary unavailable')"
|
python -c "import json; payload=json.load(open('build/coverage.json', encoding='utf-8')); percent=(payload.get('summary') or {}).get('percent_covered'); print(f'Coverage summary: {percent:.2f}%' if percent is not None else 'Coverage summary unavailable')"
|
||||||
|
python -c "import json; payload=json.load(open('build/quality-gate.json', encoding='utf-8')); summary=payload.get('summary') or {}; print(f\"Quality gate: {payload.get('status')} violations={summary.get('violations_total', 0)} coverage_targets={summary.get('coverage_targets', 0)}\")"
|
||||||
'''.stripIndent())
|
'''.stripIndent())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stage('Publish test metrics') {
|
|
||||||
steps {
|
|
||||||
container('tester') {
|
|
||||||
sh '''
|
|
||||||
set -euo pipefail
|
|
||||||
python scripts/publish_test_metrics.py
|
|
||||||
'''
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stage('Prep toolchain') {
|
stage('Prep toolchain') {
|
||||||
steps {
|
steps {
|
||||||
container('builder') {
|
container('builder') {
|
||||||
@ -202,97 +192,21 @@ python -c "import json; payload=json.load(open('build/coverage.json', encoding='
|
|||||||
}
|
}
|
||||||
|
|
||||||
post {
|
post {
|
||||||
success {
|
|
||||||
container('tester') {
|
|
||||||
sh '''
|
|
||||||
set -euo pipefail
|
|
||||||
export QUALITY_STATUS=ok
|
|
||||||
python - <<'PY'
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import urllib.request
|
|
||||||
|
|
||||||
suite = os.environ.get("SUITE_NAME", "ariadne")
|
|
||||||
status = os.environ.get("QUALITY_STATUS", "failed")
|
|
||||||
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
|
|
||||||
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
|
|
||||||
|
|
||||||
def counter(name: str) -> float:
|
|
||||||
pattern = re.compile(
|
|
||||||
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
|
|
||||||
re.M,
|
|
||||||
)
|
|
||||||
match = pattern.search(text)
|
|
||||||
return float(match.group(1)) if match else 0.0
|
|
||||||
|
|
||||||
ok = counter("ok")
|
|
||||||
failed = counter("failed")
|
|
||||||
if status == "ok":
|
|
||||||
ok += 1
|
|
||||||
else:
|
|
||||||
failed += 1
|
|
||||||
payload = (
|
|
||||||
"# TYPE platform_quality_gate_runs_total counter\\n"
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
|
|
||||||
)
|
|
||||||
req = urllib.request.Request(
|
|
||||||
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
|
|
||||||
data=payload.encode("utf-8"),
|
|
||||||
method="POST",
|
|
||||||
headers={"Content-Type": "text/plain"},
|
|
||||||
)
|
|
||||||
urllib.request.urlopen(req, timeout=10).read()
|
|
||||||
PY
|
|
||||||
'''
|
|
||||||
}
|
|
||||||
}
|
|
||||||
failure {
|
|
||||||
container('tester') {
|
|
||||||
sh '''
|
|
||||||
set -euo pipefail
|
|
||||||
export QUALITY_STATUS=failed
|
|
||||||
python - <<'PY'
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import urllib.request
|
|
||||||
|
|
||||||
suite = os.environ.get("SUITE_NAME", "ariadne")
|
|
||||||
status = os.environ.get("QUALITY_STATUS", "failed")
|
|
||||||
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
|
|
||||||
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
|
|
||||||
|
|
||||||
def counter(name: str) -> float:
|
|
||||||
pattern = re.compile(
|
|
||||||
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
|
|
||||||
re.M,
|
|
||||||
)
|
|
||||||
match = pattern.search(text)
|
|
||||||
return float(match.group(1)) if match else 0.0
|
|
||||||
|
|
||||||
ok = counter("ok")
|
|
||||||
failed = counter("failed")
|
|
||||||
if status == "ok":
|
|
||||||
ok += 1
|
|
||||||
else:
|
|
||||||
failed += 1
|
|
||||||
payload = (
|
|
||||||
"# TYPE platform_quality_gate_runs_total counter\\n"
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
|
|
||||||
)
|
|
||||||
req = urllib.request.Request(
|
|
||||||
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
|
|
||||||
data=payload.encode("utf-8"),
|
|
||||||
method="POST",
|
|
||||||
headers={"Content-Type": "text/plain"},
|
|
||||||
)
|
|
||||||
urllib.request.urlopen(req, timeout=10).read()
|
|
||||||
PY
|
|
||||||
'''
|
|
||||||
}
|
|
||||||
}
|
|
||||||
always {
|
always {
|
||||||
|
script {
|
||||||
|
env.QUALITY_STATUS = currentBuild.currentResult == 'SUCCESS' ? 'ok' : 'failed'
|
||||||
|
}
|
||||||
|
container('tester') {
|
||||||
|
sh '''
|
||||||
|
set +e
|
||||||
|
python scripts/publish_test_metrics.py
|
||||||
|
status=$?
|
||||||
|
set -e
|
||||||
|
if [ "$status" -ne 0 ]; then
|
||||||
|
echo "test metric publication failed with status $status"
|
||||||
|
fi
|
||||||
|
'''
|
||||||
|
}
|
||||||
script {
|
script {
|
||||||
if (fileExists('build/junit.xml')) {
|
if (fileExists('build/junit.xml')) {
|
||||||
try {
|
try {
|
||||||
@ -302,7 +216,7 @@ PY
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json', allowEmptyArchive: true, fingerprint: true
|
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json,build/quality-gate.json', allowEmptyArchive: true, fingerprint: true
|
||||||
script {
|
script {
|
||||||
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]
|
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]
|
||||||
echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}"
|
echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}"
|
||||||
|
|||||||
@ -12,6 +12,13 @@ from ..settings import settings
|
|||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class AuthContext:
|
class AuthContext:
|
||||||
|
"""Authenticated user details returned by the OIDC verifier.
|
||||||
|
|
||||||
|
Inputs: normalized claims extracted from a validated bearer token.
|
||||||
|
Outputs: a compact object that downstream handlers can trust without
|
||||||
|
repeating token parsing logic.
|
||||||
|
"""
|
||||||
|
|
||||||
username: str
|
username: str
|
||||||
email: str
|
email: str
|
||||||
groups: list[str]
|
groups: list[str]
|
||||||
@ -19,6 +26,13 @@ class AuthContext:
|
|||||||
|
|
||||||
|
|
||||||
class KeycloakOIDC:
|
class KeycloakOIDC:
|
||||||
|
"""Validate Keycloak-issued access tokens for Ariadne API requests.
|
||||||
|
|
||||||
|
Inputs: the JWKS URL, expected issuer, and client identifier.
|
||||||
|
Outputs: verified token claims after signature and audience checks so the
|
||||||
|
API can make authorization decisions safely.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
|
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
|
||||||
self._jwks_url = jwks_url
|
self._jwks_url = jwks_url
|
||||||
self._issuer = issuer
|
self._issuer = issuer
|
||||||
@ -97,6 +111,13 @@ class KeycloakOIDC:
|
|||||||
|
|
||||||
|
|
||||||
class Authenticator:
|
class Authenticator:
|
||||||
|
"""Convert bearer tokens into normalized Ariadne auth contexts.
|
||||||
|
|
||||||
|
Inputs: raw bearer tokens from incoming API requests.
|
||||||
|
Outputs: an `AuthContext` with cleaned usernames, emails, and groups so
|
||||||
|
endpoint handlers can stay focused on business logic.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)
|
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,13 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class DatabaseConfig:
|
class DatabaseConfig:
|
||||||
|
"""Connection-pool and timeout settings for a database client.
|
||||||
|
|
||||||
|
Inputs: pool sizing and timeout values supplied by application settings.
|
||||||
|
Outputs: a single immutable config object so database construction remains
|
||||||
|
explicit and easy to test.
|
||||||
|
"""
|
||||||
|
|
||||||
pool_min: int = 0
|
pool_min: int = 0
|
||||||
pool_max: int = 5
|
pool_max: int = 5
|
||||||
connect_timeout_sec: int = 5
|
connect_timeout_sec: int = 5
|
||||||
@ -25,6 +32,13 @@ class DatabaseConfig:
|
|||||||
|
|
||||||
|
|
||||||
class Database:
|
class Database:
|
||||||
|
"""Thin wrapper around a psycopg connection pool for Ariadne storage.
|
||||||
|
|
||||||
|
Inputs: a Postgres DSN plus optional pool and timeout configuration.
|
||||||
|
Outputs: helper methods for migrations and common query patterns while
|
||||||
|
centralizing timeout, locking, and row-format behavior.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, dsn: str, config: DatabaseConfig | None = None) -> None:
|
def __init__(self, dsn: str, config: DatabaseConfig | None = None) -> None:
|
||||||
if not dsn:
|
if not dsn:
|
||||||
raise RuntimeError("database URL is required")
|
raise RuntimeError("database URL is required")
|
||||||
|
|||||||
@ -56,6 +56,13 @@ def delete_json(path: str) -> dict[str, Any]:
|
|||||||
|
|
||||||
|
|
||||||
def get_secret_value(namespace: str, name: str, key: str) -> str:
|
def get_secret_value(namespace: str, name: str, key: str) -> str:
|
||||||
|
"""Read and decode a string value from a Kubernetes Secret.
|
||||||
|
|
||||||
|
Inputs: a namespace, secret name, and key inside the secret data map.
|
||||||
|
Outputs: the decoded UTF-8 value so callers can consume cluster-managed
|
||||||
|
credentials without duplicating base64 and validation logic.
|
||||||
|
"""
|
||||||
|
|
||||||
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
|
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
|
||||||
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
|
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
|
||||||
raw = blob.get(key)
|
raw = blob.get(key)
|
||||||
|
|||||||
@ -16,7 +16,7 @@ except Exception as exc: # pragma: no cover - import checked at runtime
|
|||||||
else:
|
else:
|
||||||
_IMPORT_ERROR = None
|
_IMPORT_ERROR = None
|
||||||
|
|
||||||
from .pods import PodSelectionError, select_pod
|
from .pods import select_pod
|
||||||
from ..utils.logging import get_logger
|
from ..utils.logging import get_logger
|
||||||
|
|
||||||
|
|
||||||
@ -26,6 +26,14 @@ _CORE_API = None
|
|||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class ExecResult:
|
class ExecResult:
|
||||||
|
"""Container for pod exec output captured from the Kubernetes stream API.
|
||||||
|
|
||||||
|
Inputs: stdout, stderr, and the observed exit code from a command run in a
|
||||||
|
pod container.
|
||||||
|
Outputs: a small immutable result object that callers can inspect or raise
|
||||||
|
on without juggling stream state.
|
||||||
|
"""
|
||||||
|
|
||||||
stdout: str
|
stdout: str
|
||||||
stderr: str
|
stderr: str
|
||||||
exit_code: int | None
|
exit_code: int | None
|
||||||
@ -65,6 +73,13 @@ def _build_command(command: list[str] | str, env: dict[str, str] | None) -> list
|
|||||||
|
|
||||||
|
|
||||||
class PodExecutor:
|
class PodExecutor:
|
||||||
|
"""Run shell commands in the most suitable pod for a workload selector.
|
||||||
|
|
||||||
|
Inputs: a namespace, label selector, and optional container name.
|
||||||
|
Outputs: structured `ExecResult` objects or `ExecError`/`TimeoutError`
|
||||||
|
exceptions so service code can react consistently to pod command results.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, namespace: str, label_selector: str, container: str | None = None) -> None:
|
def __init__(self, namespace: str, label_selector: str, container: str | None = None) -> None:
|
||||||
self._namespace = namespace
|
self._namespace = namespace
|
||||||
self._label_selector = label_selector
|
self._label_selector = label_selector
|
||||||
|
|||||||
@ -10,6 +10,13 @@ from .client import get_json
|
|||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class PodRef:
|
class PodRef:
|
||||||
|
"""Reference to a Kubernetes pod chosen for a follow-up operation.
|
||||||
|
|
||||||
|
Inputs: the pod name, namespace, and optional node name.
|
||||||
|
Outputs: a stable value object that higher-level helpers can pass around
|
||||||
|
without carrying the full Kubernetes payload.
|
||||||
|
"""
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
namespace: str
|
namespace: str
|
||||||
node: str | None = None
|
node: str | None = None
|
||||||
@ -47,6 +54,13 @@ def _is_ready(pod: dict[str, Any]) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
|
def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
|
||||||
|
"""Fetch pods for a namespace and selector from the Kubernetes API.
|
||||||
|
|
||||||
|
Inputs: the target namespace and a label-selector string.
|
||||||
|
Outputs: only dictionary pod objects so later selection logic can assume a
|
||||||
|
predictable payload shape.
|
||||||
|
"""
|
||||||
|
|
||||||
namespace = (namespace or "").strip()
|
namespace = (namespace or "").strip()
|
||||||
if not namespace:
|
if not namespace:
|
||||||
raise PodSelectionError("pod namespace missing")
|
raise PodSelectionError("pod namespace missing")
|
||||||
@ -58,6 +72,13 @@ def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
|
|||||||
|
|
||||||
|
|
||||||
def select_pod(namespace: str, label_selector: str) -> PodRef:
|
def select_pod(namespace: str, label_selector: str) -> PodRef:
|
||||||
|
"""Pick the newest ready pod for a namespace and label selector.
|
||||||
|
|
||||||
|
Inputs: the namespace and selector used to query Kubernetes pods.
|
||||||
|
Outputs: a `PodRef` for the most recently started ready pod, or a
|
||||||
|
`PodSelectionError` when no safe candidate exists.
|
||||||
|
"""
|
||||||
|
|
||||||
pods = list_pods(namespace, label_selector)
|
pods = list_pods(namespace, label_selector)
|
||||||
candidates: list[tuple[float, PodRef]] = []
|
candidates: list[tuple[float, PodRef]] = []
|
||||||
for pod in pods:
|
for pod in pods:
|
||||||
|
|||||||
@ -84,6 +84,14 @@ def record_schedule_state(
|
|||||||
next_run_ts: float | None,
|
next_run_ts: float | None,
|
||||||
ok: bool | None,
|
ok: bool | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Record scheduler timing and status gauges for a task.
|
||||||
|
|
||||||
|
Inputs: the task name plus timestamps for the last run, last success, next
|
||||||
|
run, and an optional success flag.
|
||||||
|
Outputs: updated Prometheus gauges that make scheduler health visible in
|
||||||
|
dashboards and alerts.
|
||||||
|
"""
|
||||||
|
|
||||||
if last_run_ts:
|
if last_run_ts:
|
||||||
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
|
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
|
||||||
if last_success_ts:
|
if last_success_ts:
|
||||||
@ -108,6 +116,13 @@ def set_cluster_state_metrics(
|
|||||||
pods_running: float | None,
|
pods_running: float | None,
|
||||||
kustomizations_not_ready: int | None,
|
kustomizations_not_ready: int | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Publish the latest cluster-state summary to Prometheus gauges.
|
||||||
|
|
||||||
|
Inputs: the collection timestamp and aggregate node, pod, and Flux counts.
|
||||||
|
Outputs: refreshed gauges so Grafana panels can render the current cluster
|
||||||
|
snapshot without parsing raw service logs.
|
||||||
|
"""
|
||||||
|
|
||||||
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
|
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
|
||||||
if nodes_total is not None:
|
if nodes_total is not None:
|
||||||
CLUSTER_STATE_NODES_TOTAL.set(nodes_total)
|
CLUSTER_STATE_NODES_TOTAL.set(nodes_total)
|
||||||
|
|||||||
@ -24,6 +24,14 @@ def _build_db(dsn: str, application_name: str) -> Database:
|
|||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
"""Run Ariadne and portal schema migrations when enabled in settings.
|
||||||
|
|
||||||
|
Inputs: process-wide application settings for database URLs and migration
|
||||||
|
toggles.
|
||||||
|
Outputs: applied migrations and closed pools so CLI and container startup
|
||||||
|
paths can bootstrap storage safely.
|
||||||
|
"""
|
||||||
|
|
||||||
if not settings.ariadne_run_migrations:
|
if not settings.ariadne_run_migrations:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone
|
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
@ -9,7 +8,7 @@ from typing import Any
|
|||||||
import httpx
|
import httpx
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
from ..k8s.exec import ExecError, PodExecutor
|
from ..k8s.exec import ExecError, ExecResult, PodExecutor
|
||||||
from ..k8s.pods import PodSelectionError
|
from ..k8s.pods import PodSelectionError
|
||||||
from ..settings import settings
|
from ..settings import settings
|
||||||
from ..utils.logging import get_logger
|
from ..utils.logging import get_logger
|
||||||
|
|||||||
68
quality_gate.toml
Normal file
68
quality_gate.toml
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
[files]
|
||||||
|
roots = ["ariadne", "tests", "scripts"]
|
||||||
|
max_lines = 500
|
||||||
|
|
||||||
|
[docstrings]
|
||||||
|
roots = ["ariadne", "scripts"]
|
||||||
|
non_trivial_min_lines = 6
|
||||||
|
|
||||||
|
[coverage]
|
||||||
|
roots = ["ariadne"]
|
||||||
|
threshold = 95.0
|
||||||
|
targets = [
|
||||||
|
"ariadne/auth/keycloak.py",
|
||||||
|
"ariadne/db/database.py",
|
||||||
|
"ariadne/k8s/client.py",
|
||||||
|
"ariadne/k8s/pods.py",
|
||||||
|
"ariadne/metrics/metrics.py",
|
||||||
|
"ariadne/migrate.py",
|
||||||
|
"ariadne/utils/errors.py",
|
||||||
|
"ariadne/utils/http.py",
|
||||||
|
"ariadne/utils/logging.py",
|
||||||
|
"ariadne/utils/name_generator.py",
|
||||||
|
"ariadne/utils/passwords.py",
|
||||||
|
]
|
||||||
|
|
||||||
|
[legacy.line_count]
|
||||||
|
"ariadne/app.py" = 996
|
||||||
|
"ariadne/manager/provisioning.py" = 933
|
||||||
|
"ariadne/services/cluster_state.py" = 3705
|
||||||
|
"ariadne/services/comms.py" = 943
|
||||||
|
"ariadne/services/firefly.py" = 667
|
||||||
|
"ariadne/services/nextcloud.py" = 698
|
||||||
|
"ariadne/services/vault.py" = 558
|
||||||
|
"ariadne/services/wger.py" = 624
|
||||||
|
"ariadne/settings.py" = 590
|
||||||
|
"tests/test_app.py" = 1001
|
||||||
|
"tests/test_keycloak_admin.py" = 679
|
||||||
|
"tests/test_provisioning.py" = 1762
|
||||||
|
"tests/test_services.py" = 1483
|
||||||
|
|
||||||
|
[legacy.docstrings]
|
||||||
|
"ariadne/app.py" = 15
|
||||||
|
"ariadne/db/storage.py" = 4
|
||||||
|
"ariadne/manager/provisioning.py" = 2
|
||||||
|
"ariadne/scheduler/cron.py" = 1
|
||||||
|
"ariadne/services/cluster_state.py" = 4
|
||||||
|
"ariadne/services/comms.py" = 2
|
||||||
|
"ariadne/services/firefly.py" = 4
|
||||||
|
"ariadne/services/image_sweeper.py" = 1
|
||||||
|
"ariadne/services/keycloak_admin.py" = 1
|
||||||
|
"ariadne/services/keycloak_profile.py" = 2
|
||||||
|
"ariadne/services/mailer.py" = 1
|
||||||
|
"ariadne/services/mailu.py" = 5
|
||||||
|
"ariadne/services/mailu_events.py" = 1
|
||||||
|
"ariadne/services/metis.py" = 1
|
||||||
|
"ariadne/services/nextcloud.py" = 3
|
||||||
|
"ariadne/services/opensearch_prune.py" = 2
|
||||||
|
"ariadne/services/pod_cleaner.py" = 1
|
||||||
|
"ariadne/services/soteria.py" = 2
|
||||||
|
"ariadne/services/vault.py" = 2
|
||||||
|
"ariadne/services/vaultwarden.py" = 1
|
||||||
|
"ariadne/services/vaultwarden_sync.py" = 4
|
||||||
|
"ariadne/services/wger.py" = 4
|
||||||
|
"ariadne/settings.py" = 1
|
||||||
|
"ariadne/utils/errors.py" = 1
|
||||||
|
"ariadne/utils/http.py" = 1
|
||||||
|
"ariadne/utils/logging.py" = 3
|
||||||
|
"ariadne/utils/name_generator.py" = 1
|
||||||
407
scripts/check_quality_gate.py
Executable file
407
scripts/check_quality_gate.py
Executable file
@ -0,0 +1,407 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Enforce Ariadne's ratcheting test-quality gate.
|
||||||
|
|
||||||
|
Inputs: repository Python files, optional coverage JSON, and a TOML config that
|
||||||
|
captures the current legacy exceptions.
|
||||||
|
Outputs: a JSON report plus a non-zero exit code when file-size, docstring, or
|
||||||
|
coverage requirements regress, so CI can block quality drift while allowing
|
||||||
|
incremental cleanup.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import ast
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
import tomllib
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class DefinitionFinding:
|
||||||
|
"""Describe a public definition missing a required docstring.
|
||||||
|
|
||||||
|
Inputs: the symbol kind, name, source line, and logical size.
|
||||||
|
Outputs: a compact record that makes docstring failures actionable in CLI
|
||||||
|
output and in the JSON quality report.
|
||||||
|
"""
|
||||||
|
|
||||||
|
kind: str
|
||||||
|
name: str
|
||||||
|
lineno: int
|
||||||
|
length: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Violation:
|
||||||
|
"""Represent a single quality-gate violation.
|
||||||
|
|
||||||
|
Inputs: the violated check name, file path, and a human-readable message.
|
||||||
|
Outputs: a normalized record for console rendering and JSON serialization so
|
||||||
|
Jenkins and local runs report the same facts.
|
||||||
|
"""
|
||||||
|
|
||||||
|
check: str
|
||||||
|
path: str
|
||||||
|
message: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class QualityConfig:
|
||||||
|
"""Typed quality-gate settings loaded from TOML.
|
||||||
|
|
||||||
|
Inputs: parsed configuration sections for file-size, docstrings, and
|
||||||
|
coverage enforcement.
|
||||||
|
Outputs: one immutable object so the gate logic stays deterministic and easy
|
||||||
|
to test.
|
||||||
|
"""
|
||||||
|
|
||||||
|
line_roots: tuple[str, ...]
|
||||||
|
max_lines: int
|
||||||
|
legacy_max_lines: dict[str, int]
|
||||||
|
docstring_roots: tuple[str, ...]
|
||||||
|
non_trivial_min_lines: int
|
||||||
|
legacy_missing_docstrings: dict[str, int]
|
||||||
|
coverage_roots: tuple[str, ...]
|
||||||
|
coverage_targets: tuple[str, ...]
|
||||||
|
coverage_threshold: float
|
||||||
|
|
||||||
|
|
||||||
|
def _load_config(path: Path) -> QualityConfig:
|
||||||
|
"""Load the quality-gate config from TOML.
|
||||||
|
|
||||||
|
Inputs: the path to the repository-local TOML config file.
|
||||||
|
Outputs: validated `QualityConfig` values used by every gate check so local
|
||||||
|
runs and Jenkins share the same policy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
payload = tomllib.loads(path.read_text(encoding="utf-8"))
|
||||||
|
files = payload.get("files") or {}
|
||||||
|
docstrings = payload.get("docstrings") or {}
|
||||||
|
coverage = payload.get("coverage") or {}
|
||||||
|
legacy = payload.get("legacy") or {}
|
||||||
|
return QualityConfig(
|
||||||
|
line_roots=tuple(str(item) for item in files.get("roots") or ("ariadne", "tests", "scripts")),
|
||||||
|
max_lines=int(files.get("max_lines", 500)),
|
||||||
|
legacy_max_lines={str(key): int(value) for key, value in (legacy.get("line_count") or {}).items()},
|
||||||
|
docstring_roots=tuple(str(item) for item in docstrings.get("roots") or ("ariadne", "scripts")),
|
||||||
|
non_trivial_min_lines=int(docstrings.get("non_trivial_min_lines", 6)),
|
||||||
|
legacy_missing_docstrings={
|
||||||
|
str(key): int(value) for key, value in (legacy.get("docstrings") or {}).items()
|
||||||
|
},
|
||||||
|
coverage_roots=tuple(str(item) for item in coverage.get("roots") or ("ariadne",)),
|
||||||
|
coverage_targets=tuple(str(item) for item in coverage.get("targets") or ()),
|
||||||
|
coverage_threshold=float(coverage.get("threshold", 95.0)),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _iter_python_files(repo_root: Path, roots: tuple[str, ...]) -> list[Path]:
|
||||||
|
"""Collect Python files under the configured roots.
|
||||||
|
|
||||||
|
Inputs: the repository root plus the roots to scan.
|
||||||
|
Outputs: sorted Python paths so the gate produces stable results and diffs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
files: list[Path] = []
|
||||||
|
for root in roots:
|
||||||
|
base = repo_root / root
|
||||||
|
if not base.exists():
|
||||||
|
continue
|
||||||
|
files.extend(sorted(base.rglob("*.py")))
|
||||||
|
return sorted({path for path in files})
|
||||||
|
|
||||||
|
|
||||||
|
def _relative(path: Path, repo_root: Path) -> str:
|
||||||
|
return path.relative_to(repo_root).as_posix()
|
||||||
|
|
||||||
|
|
||||||
|
def _line_count(path: Path) -> int:
|
||||||
|
return len(path.read_text(encoding="utf-8").splitlines())
|
||||||
|
|
||||||
|
|
||||||
|
def _definition_length(node: ast.AST) -> int:
|
||||||
|
end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0)
|
||||||
|
return max(end_lineno - getattr(node, "lineno", 0) + 1, 1)
|
||||||
|
|
||||||
|
|
||||||
|
def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]:
|
||||||
|
"""Find public top-level definitions missing required docstrings.
|
||||||
|
|
||||||
|
Inputs: a Python file path and the minimum logical size considered
|
||||||
|
non-trivial.
|
||||||
|
Outputs: missing-docstring findings so the gate can ratchet legacy files
|
||||||
|
while blocking new undocumented public APIs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||||||
|
findings: list[DefinitionFinding] = []
|
||||||
|
for node in module.body:
|
||||||
|
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
|
||||||
|
continue
|
||||||
|
if node.name.startswith("_"):
|
||||||
|
continue
|
||||||
|
length = _definition_length(node)
|
||||||
|
if length < min_lines:
|
||||||
|
continue
|
||||||
|
if ast.get_docstring(node) is not None:
|
||||||
|
continue
|
||||||
|
findings.append(
|
||||||
|
DefinitionFinding(
|
||||||
|
kind=type(node).__name__,
|
||||||
|
name=node.name,
|
||||||
|
lineno=getattr(node, "lineno", 1),
|
||||||
|
length=length,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return findings
|
||||||
|
|
||||||
|
|
||||||
|
def _excluded_coverage_lines(path: Path) -> set[int]:
|
||||||
|
"""Collect non-executable lines that Slipcover still reports as missing.
|
||||||
|
|
||||||
|
Inputs: a Python source file path.
|
||||||
|
Outputs: line numbers for multiline definition headers and docstring blocks
|
||||||
|
so adjusted per-file coverage tracks executable logic rather than syntax
|
||||||
|
scaffolding required for readability.
|
||||||
|
"""
|
||||||
|
|
||||||
|
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||||||
|
excluded: set[int] = set()
|
||||||
|
|
||||||
|
def visit(node: ast.AST) -> None:
|
||||||
|
if isinstance(node, ast.Module):
|
||||||
|
for child in node.body:
|
||||||
|
visit(child)
|
||||||
|
return
|
||||||
|
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
|
||||||
|
return
|
||||||
|
if node.body:
|
||||||
|
body_start = node.body[0].lineno
|
||||||
|
excluded.update(range(node.lineno + 1, body_start))
|
||||||
|
first = node.body[0]
|
||||||
|
if (
|
||||||
|
isinstance(first, ast.Expr)
|
||||||
|
and isinstance(first.value, ast.Constant)
|
||||||
|
and isinstance(first.value.value, str)
|
||||||
|
):
|
||||||
|
excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1))
|
||||||
|
for child in node.body:
|
||||||
|
visit(child)
|
||||||
|
|
||||||
|
visit(module)
|
||||||
|
return excluded
|
||||||
|
|
||||||
|
|
||||||
|
def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]:
|
||||||
|
"""Read per-file coverage details from Slipcover JSON output.
|
||||||
|
|
||||||
|
Inputs: the optional coverage artifact path produced by the test run.
|
||||||
|
Outputs: raw and adjusted per-file coverage details so the gate can enforce
|
||||||
|
realistic thresholds even when Slipcover counts docstrings and wrapped
|
||||||
|
signatures as missing lines.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if path is None or not path.exists():
|
||||||
|
return {}
|
||||||
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
files = payload.get("files") or {}
|
||||||
|
coverage: dict[str, dict[str, Any]] = {}
|
||||||
|
for name, data in files.items():
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
continue
|
||||||
|
summary = data.get("summary") or {}
|
||||||
|
percent = summary.get("percent_covered")
|
||||||
|
executed_lines = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)}
|
||||||
|
missing_lines = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)}
|
||||||
|
relative = str(name)
|
||||||
|
source_path = repo_root / relative
|
||||||
|
excluded_lines = _excluded_coverage_lines(source_path) if source_path.exists() else set()
|
||||||
|
adjusted_missing = missing_lines - excluded_lines
|
||||||
|
adjusted_total = len(executed_lines) + len(adjusted_missing)
|
||||||
|
adjusted_percent = 100.0 if adjusted_total == 0 else (len(executed_lines) / adjusted_total) * 100.0
|
||||||
|
coverage[relative] = {
|
||||||
|
"raw_percent": float(percent) if isinstance(percent, (int, float)) else None,
|
||||||
|
"adjusted_percent": adjusted_percent,
|
||||||
|
"excluded_lines": sorted(excluded_lines),
|
||||||
|
}
|
||||||
|
return coverage
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]:
|
||||||
|
return [{"check": item.check, "path": item.path, "message": item.message} for item in violations]
|
||||||
|
|
||||||
|
|
||||||
|
def _build_report(
|
||||||
|
repo_root: Path,
|
||||||
|
config: QualityConfig,
|
||||||
|
coverage: dict[str, dict[str, Any]],
|
||||||
|
coverage_artifact_present: bool,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Run all configured checks and build a JSON-serializable report.
|
||||||
|
|
||||||
|
Inputs: repository paths, quality-gate config, and per-file coverage data.
|
||||||
|
Outputs: a complete report for CI artifacts, metrics publication, and local
|
||||||
|
debugging when the gate fails.
|
||||||
|
"""
|
||||||
|
|
||||||
|
violations: list[Violation] = []
|
||||||
|
files_report: dict[str, dict[str, Any]] = {}
|
||||||
|
|
||||||
|
for path in _iter_python_files(repo_root, config.line_roots):
|
||||||
|
relative = _relative(path, repo_root)
|
||||||
|
lines = _line_count(path)
|
||||||
|
entry = files_report.setdefault(relative, {})
|
||||||
|
entry["lines"] = lines
|
||||||
|
entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines)
|
||||||
|
entry["line_limit_legacy"] = relative in config.legacy_max_lines
|
||||||
|
if lines > entry["line_limit"]:
|
||||||
|
violations.append(
|
||||||
|
Violation(
|
||||||
|
"line_count",
|
||||||
|
relative,
|
||||||
|
f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
for path in _iter_python_files(repo_root, config.docstring_roots):
|
||||||
|
relative = _relative(path, repo_root)
|
||||||
|
findings = _missing_docstrings(path, config.non_trivial_min_lines)
|
||||||
|
entry = files_report.setdefault(relative, {})
|
||||||
|
entry["missing_docstrings"] = len(findings)
|
||||||
|
entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0)
|
||||||
|
entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings
|
||||||
|
entry["missing_docstring_symbols"] = [
|
||||||
|
{
|
||||||
|
"kind": finding.kind,
|
||||||
|
"name": finding.name,
|
||||||
|
"lineno": finding.lineno,
|
||||||
|
"length": finding.length,
|
||||||
|
}
|
||||||
|
for finding in findings
|
||||||
|
]
|
||||||
|
if len(findings) > entry["missing_docstrings_allowed"]:
|
||||||
|
excess = findings[entry["missing_docstrings_allowed"] :]
|
||||||
|
for finding in excess:
|
||||||
|
violations.append(
|
||||||
|
Violation(
|
||||||
|
"docstrings",
|
||||||
|
relative,
|
||||||
|
f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
coverage_target_set = set(config.coverage_targets)
|
||||||
|
coverage_root_files = {
|
||||||
|
_relative(path, repo_root)
|
||||||
|
for path in _iter_python_files(repo_root, config.coverage_roots)
|
||||||
|
}
|
||||||
|
for relative in sorted(coverage_root_files):
|
||||||
|
entry = files_report.setdefault(relative, {})
|
||||||
|
details = coverage.get(relative) or {}
|
||||||
|
value = details.get("adjusted_percent")
|
||||||
|
entry["coverage_percent"] = value
|
||||||
|
entry["coverage_raw_percent"] = details.get("raw_percent")
|
||||||
|
entry["coverage_excluded_lines"] = details.get("excluded_lines") or []
|
||||||
|
entry["coverage_enforced"] = relative in coverage_target_set
|
||||||
|
if relative not in coverage_target_set:
|
||||||
|
continue
|
||||||
|
entry["coverage_target"] = config.coverage_threshold
|
||||||
|
if value is None:
|
||||||
|
violations.append(
|
||||||
|
Violation("coverage", relative, f"missing coverage data for {relative}")
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
if value < config.coverage_threshold:
|
||||||
|
violations.append(
|
||||||
|
Violation(
|
||||||
|
"coverage",
|
||||||
|
relative,
|
||||||
|
f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if coverage_target_set and not coverage_artifact_present:
|
||||||
|
violations.append(
|
||||||
|
Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets")
|
||||||
|
)
|
||||||
|
|
||||||
|
summary = {
|
||||||
|
"violations_total": len(violations),
|
||||||
|
"line_count_violations": sum(item.check == "line_count" for item in violations),
|
||||||
|
"docstring_violations": sum(item.check == "docstrings" for item in violations),
|
||||||
|
"coverage_violations": sum(item.check == "coverage" for item in violations),
|
||||||
|
"legacy_line_count_files": len(config.legacy_max_lines),
|
||||||
|
"legacy_docstring_files": len(config.legacy_missing_docstrings),
|
||||||
|
"coverage_targets": len(coverage_target_set),
|
||||||
|
"coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0),
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"status": "ok" if not violations else "failed",
|
||||||
|
"rules": {
|
||||||
|
"max_lines": config.max_lines,
|
||||||
|
"docstring_non_trivial_min_lines": config.non_trivial_min_lines,
|
||||||
|
"coverage_threshold": config.coverage_threshold,
|
||||||
|
},
|
||||||
|
"summary": summary,
|
||||||
|
"violations": _serialize_violations(violations),
|
||||||
|
"files": dict(sorted(files_report.items())),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _print_report(report: dict[str, Any]) -> None:
|
||||||
|
"""Render a concise CLI summary for local and Jenkins logs.
|
||||||
|
|
||||||
|
Inputs: the JSON-ready report produced by the gate.
|
||||||
|
Outputs: human-readable lines that point directly at each violation so a
|
||||||
|
failing build is easy to fix.
|
||||||
|
"""
|
||||||
|
|
||||||
|
print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True))
|
||||||
|
for violation in report.get("violations") or []:
|
||||||
|
print(f"[{violation['check']}] {violation['path']}: {violation['message']}")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
"""Parse CLI arguments for the quality gate.
|
||||||
|
|
||||||
|
Inputs: command-line flags supplied by Jenkins or a local developer.
|
||||||
|
Outputs: normalized paths and options so the gate stays scriptable and
|
||||||
|
predictable in every environment.
|
||||||
|
"""
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description=__doc__)
|
||||||
|
parser.add_argument("--config", default="quality_gate.toml", help="path to the quality-gate TOML config")
|
||||||
|
parser.add_argument("--coverage-json", default="build/coverage.json", help="path to Slipcover JSON output")
|
||||||
|
parser.add_argument("--output", default="build/quality-gate.json", help="path to write the JSON report")
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
"""Run the Ariadne quality gate and write its JSON report.
|
||||||
|
|
||||||
|
Inputs: CLI arguments naming the config and optional coverage artifact.
|
||||||
|
Outputs: a persisted JSON report and a process exit code that Jenkins can
|
||||||
|
use to enforce quality rules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
args = parse_args()
|
||||||
|
repo_root = Path.cwd()
|
||||||
|
config_path = repo_root / args.config
|
||||||
|
output_path = repo_root / args.output
|
||||||
|
coverage_path = repo_root / args.coverage_json
|
||||||
|
|
||||||
|
config = _load_config(config_path)
|
||||||
|
coverage_present = coverage_path.exists()
|
||||||
|
coverage = _load_coverage(coverage_path if coverage_present else None, repo_root)
|
||||||
|
report = _build_report(repo_root, config, coverage, coverage_present)
|
||||||
|
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
||||||
|
_print_report(report)
|
||||||
|
return 0 if report["status"] == "ok" else 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
284
scripts/publish_test_metrics.py
Normal file → Executable file
284
scripts/publish_test_metrics.py
Normal file → Executable file
@ -1,35 +1,74 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Publish Ariadne quality-gate test metrics to Pushgateway (Prometheus ingest)."""
|
"""Publish Ariadne test and quality-gate metrics to Pushgateway.
|
||||||
|
|
||||||
|
Inputs: build artifacts such as JUnit XML, Slipcover coverage JSON, optional
|
||||||
|
quality-gate JSON, and standard Jenkins metadata from the environment.
|
||||||
|
Outputs: Prometheus metric lines pushed to Pushgateway so Grafana can chart test
|
||||||
|
health, quality-gate drift, and build context even when a build fails early.
|
||||||
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
|
|
||||||
def _escape_label(value: str) -> str:
|
def _escape_label(value: str) -> str:
|
||||||
|
"""Escape a Prometheus label value.
|
||||||
|
|
||||||
|
Inputs: an arbitrary label string from build metadata.
|
||||||
|
Outputs: a safely escaped label fragment so pushed metric lines remain valid
|
||||||
|
Prometheus exposition format.
|
||||||
|
"""
|
||||||
|
|
||||||
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
|
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
|
||||||
|
|
||||||
|
|
||||||
def _label_str(labels: dict[str, str]) -> str:
|
def _label_str(labels: dict[str, str]) -> str:
|
||||||
|
"""Build a Prometheus label set from non-empty values.
|
||||||
|
|
||||||
|
Inputs: a dictionary of label keys and raw string values.
|
||||||
|
Outputs: a `{...}` label fragment or an empty string so callers can compose
|
||||||
|
metrics without repeating label-filtering logic.
|
||||||
|
"""
|
||||||
|
|
||||||
parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
|
parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
|
||||||
return "{" + ",".join(parts) + "}" if parts else ""
|
return "{" + ",".join(parts) + "}" if parts else ""
|
||||||
|
|
||||||
|
|
||||||
def _load_coverage(path: str) -> float:
|
def _load_coverage(path: Path) -> float | None:
|
||||||
with open(path, "r", encoding="utf-8") as handle:
|
"""Read the overall Slipcover percentage if the artifact exists.
|
||||||
payload = json.load(handle)
|
|
||||||
|
Inputs: the expected coverage artifact path.
|
||||||
|
Outputs: the percent covered value, or `None` when the artifact is missing or
|
||||||
|
malformed so failed builds can still publish partial metrics.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
|
return None
|
||||||
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||||
summary = payload.get("summary") or {}
|
summary = payload.get("summary") or {}
|
||||||
percent = summary.get("percent_covered")
|
percent = summary.get("percent_covered")
|
||||||
if isinstance(percent, (int, float)):
|
if isinstance(percent, (int, float)):
|
||||||
return float(percent)
|
return float(percent)
|
||||||
raise RuntimeError("coverage summary missing percent_covered")
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _load_junit(path: str) -> dict[str, int]:
|
def _load_junit(path: Path) -> dict[str, int] | None:
|
||||||
|
"""Aggregate JUnit totals if the artifact exists.
|
||||||
|
|
||||||
|
Inputs: the expected JUnit XML path.
|
||||||
|
Outputs: test totals by outcome, or `None` when the artifact is absent so the
|
||||||
|
publisher can still emit build and gate status metrics.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
|
return None
|
||||||
|
|
||||||
tree = ET.parse(path)
|
tree = ET.parse(path)
|
||||||
root = tree.getroot()
|
root = tree.getroot()
|
||||||
|
|
||||||
@ -57,6 +96,20 @@ def _load_junit(path: str) -> dict[str, int]:
|
|||||||
return totals
|
return totals
|
||||||
|
|
||||||
|
|
||||||
|
def _load_quality_gate(path: Path) -> dict[str, Any] | None:
|
||||||
|
"""Load the optional quality-gate JSON report.
|
||||||
|
|
||||||
|
Inputs: the expected report path from `scripts/check_quality_gate.py`.
|
||||||
|
Outputs: the parsed report when present so metric publication can include
|
||||||
|
violation counts and per-file gate state.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
|
return None
|
||||||
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||||
|
return payload if isinstance(payload, dict) else None
|
||||||
|
|
||||||
|
|
||||||
def _read_http(url: str) -> str:
|
def _read_http(url: str) -> str:
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(url, timeout=10) as resp:
|
with urllib.request.urlopen(url, timeout=10) as resp:
|
||||||
@ -78,6 +131,13 @@ def _post_text(url: str, payload: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
|
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
|
||||||
|
"""Fetch the current counter value for a metric series.
|
||||||
|
|
||||||
|
Inputs: the Pushgateway base URL plus the metric name and exact labels.
|
||||||
|
Outputs: the last published counter value so reruns can increment rather than
|
||||||
|
overwrite the total.
|
||||||
|
"""
|
||||||
|
|
||||||
text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
|
text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
|
||||||
if not text:
|
if not text:
|
||||||
return 0.0
|
return 0.0
|
||||||
@ -85,7 +145,7 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
|
|||||||
for line in text.splitlines():
|
for line in text.splitlines():
|
||||||
if not line.startswith(metric + "{"):
|
if not line.startswith(metric + "{"):
|
||||||
continue
|
continue
|
||||||
if any(f'{k}="{v}"' not in line for k, v in labels.items()):
|
if any(f'{key}="{value}"' not in line for key, value in labels.items()):
|
||||||
continue
|
continue
|
||||||
parts = line.split()
|
parts = line.split()
|
||||||
if len(parts) < 2:
|
if len(parts) < 2:
|
||||||
@ -97,9 +157,22 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
|
|||||||
return 0.0
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
def main() -> int:
|
def _metric_line(name: str, value: int | float, labels: dict[str, str] | None = None) -> str:
|
||||||
coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json")
|
label_text = _label_str(labels or {})
|
||||||
junit_path = os.getenv("JUNIT_XML", "build/junit.xml")
|
return f"{name}{label_text} {value}"
|
||||||
|
|
||||||
|
|
||||||
|
def _build_payload() -> tuple[str, dict[str, Any]]:
|
||||||
|
"""Assemble the Pushgateway payload and a summary object.
|
||||||
|
|
||||||
|
Inputs: environment variables and any available build artifacts.
|
||||||
|
Outputs: metric lines ready for Pushgateway plus a summary dict that local
|
||||||
|
tests and Jenkins logs can inspect.
|
||||||
|
"""
|
||||||
|
|
||||||
|
coverage_path = Path(os.getenv("COVERAGE_JSON", "build/coverage.json"))
|
||||||
|
junit_path = Path(os.getenv("JUNIT_XML", "build/junit.xml"))
|
||||||
|
quality_gate_path = Path(os.getenv("QUALITY_GATE_JSON", "build/quality-gate.json"))
|
||||||
pushgateway_url = os.getenv(
|
pushgateway_url = os.getenv(
|
||||||
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
|
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
|
||||||
).strip()
|
).strip()
|
||||||
@ -107,19 +180,17 @@ def main() -> int:
|
|||||||
branch = os.getenv("BRANCH_NAME", "")
|
branch = os.getenv("BRANCH_NAME", "")
|
||||||
build_number = os.getenv("BUILD_NUMBER", "")
|
build_number = os.getenv("BUILD_NUMBER", "")
|
||||||
commit = os.getenv("GIT_COMMIT", "")
|
commit = os.getenv("GIT_COMMIT", "")
|
||||||
|
outcome = os.getenv("QUALITY_STATUS", "failed").strip() or "failed"
|
||||||
if not os.path.exists(coverage_path):
|
|
||||||
raise RuntimeError(f"missing coverage file {coverage_path}")
|
|
||||||
if not os.path.exists(junit_path):
|
|
||||||
raise RuntimeError(f"missing junit file {junit_path}")
|
|
||||||
|
|
||||||
coverage = _load_coverage(coverage_path)
|
coverage = _load_coverage(coverage_path)
|
||||||
totals = _load_junit(junit_path)
|
totals = _load_junit(junit_path)
|
||||||
passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)
|
quality_gate = _load_quality_gate(quality_gate_path)
|
||||||
|
|
||||||
outcome = "ok"
|
tests_total = totals["tests"] if totals else 0
|
||||||
if totals["tests"] <= 0 or totals["failures"] > 0 or totals["errors"] > 0:
|
tests_failed = totals["failures"] if totals else 0
|
||||||
outcome = "failed"
|
tests_errors = totals["errors"] if totals else 0
|
||||||
|
tests_skipped = totals["skipped"] if totals else 0
|
||||||
|
tests_passed = max(tests_total - tests_failed - tests_errors - tests_skipped, 0)
|
||||||
|
|
||||||
job_name = "platform-quality-ci"
|
job_name = "platform-quality-ci"
|
||||||
ok_count = _fetch_existing_counter(
|
ok_count = _fetch_existing_counter(
|
||||||
@ -137,52 +208,149 @@ def main() -> int:
|
|||||||
else:
|
else:
|
||||||
failed_count += 1
|
failed_count += 1
|
||||||
|
|
||||||
labels = {
|
build_labels = {
|
||||||
"suite": suite,
|
"suite": suite,
|
||||||
"branch": branch,
|
"branch": branch,
|
||||||
"build_number": build_number,
|
"build_number": build_number,
|
||||||
"commit": commit,
|
"commit": commit,
|
||||||
}
|
}
|
||||||
payload_lines = [
|
suite_labels = {"suite": suite}
|
||||||
"# TYPE platform_quality_gate_runs_total counter",
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}',
|
|
||||||
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}',
|
|
||||||
"# TYPE ariadne_quality_gate_tests_total gauge",
|
|
||||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}',
|
|
||||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}',
|
|
||||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="error"}} {totals["errors"]}',
|
|
||||||
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {totals["skipped"]}',
|
|
||||||
"# TYPE ariadne_quality_gate_coverage_percent gauge",
|
|
||||||
f'ariadne_quality_gate_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
|
|
||||||
"# TYPE ariadne_quality_gate_build_info gauge",
|
|
||||||
f"ariadne_quality_gate_build_info{_label_str(labels)} 1",
|
|
||||||
]
|
|
||||||
payload = "\n".join(payload_lines) + "\n"
|
|
||||||
_post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}", payload)
|
|
||||||
|
|
||||||
print(
|
summary = (quality_gate or {}).get("summary") or {}
|
||||||
json.dumps(
|
files = (quality_gate or {}).get("files") or {}
|
||||||
{
|
metric_lines = [
|
||||||
"suite": suite,
|
"# TYPE platform_quality_gate_runs_total counter",
|
||||||
"outcome": outcome,
|
_metric_line("platform_quality_gate_runs_total", int(ok_count), {"suite": suite, "status": "ok"}),
|
||||||
"tests_total": totals["tests"],
|
_metric_line("platform_quality_gate_runs_total", int(failed_count), {"suite": suite, "status": "failed"}),
|
||||||
"tests_passed": passed,
|
"# TYPE ariadne_quality_gate_status gauge",
|
||||||
"tests_failed": totals["failures"],
|
_metric_line("ariadne_quality_gate_status", 1 if outcome == "ok" else 0, suite_labels),
|
||||||
"tests_errors": totals["errors"],
|
"# TYPE ariadne_quality_gate_artifact_present gauge",
|
||||||
"tests_skipped": totals["skipped"],
|
_metric_line(
|
||||||
"coverage_percent": round(coverage, 3),
|
"ariadne_quality_gate_artifact_present",
|
||||||
"ok_counter": ok_count,
|
1 if coverage_path.exists() else 0,
|
||||||
"failed_counter": failed_count,
|
{"suite": suite, "artifact": "coverage_json"},
|
||||||
},
|
),
|
||||||
indent=2,
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_artifact_present",
|
||||||
|
1 if junit_path.exists() else 0,
|
||||||
|
{"suite": suite, "artifact": "junit_xml"},
|
||||||
|
),
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_artifact_present",
|
||||||
|
1 if quality_gate_path.exists() else 0,
|
||||||
|
{"suite": suite, "artifact": "quality_gate_json"},
|
||||||
|
),
|
||||||
|
"# TYPE ariadne_quality_gate_tests_total gauge",
|
||||||
|
_metric_line("ariadne_quality_gate_tests_total", tests_passed, {"suite": suite, "result": "passed"}),
|
||||||
|
_metric_line("ariadne_quality_gate_tests_total", tests_failed, {"suite": suite, "result": "failed"}),
|
||||||
|
_metric_line("ariadne_quality_gate_tests_total", tests_errors, {"suite": suite, "result": "error"}),
|
||||||
|
_metric_line("ariadne_quality_gate_tests_total", tests_skipped, {"suite": suite, "result": "skipped"}),
|
||||||
|
"# TYPE ariadne_quality_gate_coverage_percent gauge",
|
||||||
|
_metric_line("ariadne_quality_gate_coverage_percent", round(coverage or 0.0, 3), suite_labels),
|
||||||
|
"# TYPE ariadne_quality_gate_violation_total gauge",
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_violation_total",
|
||||||
|
int(summary.get("line_count_violations", 0)),
|
||||||
|
{"suite": suite, "check": "line_count"},
|
||||||
|
),
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_violation_total",
|
||||||
|
int(summary.get("docstring_violations", 0)),
|
||||||
|
{"suite": suite, "check": "docstrings"},
|
||||||
|
),
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_violation_total",
|
||||||
|
int(summary.get("coverage_violations", 0)),
|
||||||
|
{"suite": suite, "check": "coverage"},
|
||||||
|
),
|
||||||
|
"# TYPE ariadne_quality_gate_legacy_exception_total gauge",
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_legacy_exception_total",
|
||||||
|
int(summary.get("legacy_line_count_files", 0)),
|
||||||
|
{"suite": suite, "check": "line_count"},
|
||||||
|
),
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_legacy_exception_total",
|
||||||
|
int(summary.get("legacy_docstring_files", 0)),
|
||||||
|
{"suite": suite, "check": "docstrings"},
|
||||||
|
),
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_legacy_exception_total",
|
||||||
|
int(summary.get("coverage_exemptions", 0)),
|
||||||
|
{"suite": suite, "check": "coverage"},
|
||||||
|
),
|
||||||
|
"# TYPE ariadne_quality_gate_build_info gauge",
|
||||||
|
f"ariadne_quality_gate_build_info{_label_str(build_labels)} 1",
|
||||||
|
]
|
||||||
|
|
||||||
|
if quality_gate:
|
||||||
|
metric_lines.extend(
|
||||||
|
[
|
||||||
|
"# TYPE ariadne_quality_gate_file_lines gauge",
|
||||||
|
"# TYPE ariadne_quality_gate_file_missing_docstrings gauge",
|
||||||
|
"# TYPE ariadne_quality_gate_file_coverage_percent gauge",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
)
|
for path, data in sorted(files.items()):
|
||||||
|
file_labels = {"suite": suite, "file": path}
|
||||||
|
if isinstance(data.get("lines"), int):
|
||||||
|
metric_lines.append(_metric_line("ariadne_quality_gate_file_lines", data["lines"], file_labels))
|
||||||
|
if isinstance(data.get("missing_docstrings"), int):
|
||||||
|
metric_lines.append(
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_file_missing_docstrings",
|
||||||
|
data["missing_docstrings"],
|
||||||
|
file_labels,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
coverage_percent = data.get("coverage_percent")
|
||||||
|
if isinstance(coverage_percent, (int, float)):
|
||||||
|
metric_lines.append(
|
||||||
|
_metric_line(
|
||||||
|
"ariadne_quality_gate_file_coverage_percent",
|
||||||
|
round(float(coverage_percent), 3),
|
||||||
|
file_labels,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
payload = "\n".join(metric_lines) + "\n"
|
||||||
|
result = {
|
||||||
|
"suite": suite,
|
||||||
|
"outcome": outcome,
|
||||||
|
"tests_total": tests_total,
|
||||||
|
"tests_passed": tests_passed,
|
||||||
|
"tests_failed": tests_failed,
|
||||||
|
"tests_errors": tests_errors,
|
||||||
|
"tests_skipped": tests_skipped,
|
||||||
|
"coverage_percent": round(coverage or 0.0, 3),
|
||||||
|
"quality_gate_present": quality_gate is not None,
|
||||||
|
"quality_gate_status": (quality_gate or {}).get("status") or "missing",
|
||||||
|
"quality_gate_summary": summary,
|
||||||
|
"ok_counter": ok_count,
|
||||||
|
"failed_counter": failed_count,
|
||||||
|
"payload": payload,
|
||||||
|
"pushgateway_url": pushgateway_url,
|
||||||
|
"job_name": job_name,
|
||||||
|
}
|
||||||
|
return payload, result
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
"""Publish Ariadne quality metrics to Pushgateway.
|
||||||
|
|
||||||
|
Inputs: environment variables plus any available build artifacts.
|
||||||
|
Outputs: a POST to Pushgateway and a JSON summary printed to stdout so local
|
||||||
|
runs and Jenkins logs can confirm exactly what was emitted.
|
||||||
|
"""
|
||||||
|
|
||||||
|
payload, result = _build_payload()
|
||||||
|
target = f"{result['pushgateway_url'].rstrip('/')}/metrics/job/{result['job_name']}/suite/{result['suite']}"
|
||||||
|
_post_text(target, payload)
|
||||||
|
printable = dict(result)
|
||||||
|
printable.pop("payload", None)
|
||||||
|
print(json.dumps(printable, indent=2, sort_keys=True))
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
raise SystemExit(main())
|
||||||
raise SystemExit(main())
|
|
||||||
except Exception as exc:
|
|
||||||
print(f"metrics push failed: {exc}")
|
|
||||||
raise
|
|
||||||
|
|||||||
@ -20,11 +20,10 @@ def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
|
|||||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||||
|
|
||||||
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
||||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
jwt,
|
kc,
|
||||||
"decode",
|
"_decode_claims",
|
||||||
lambda *args, **kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
|
lambda *_args, **_kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
|
||||||
)
|
)
|
||||||
|
|
||||||
claims = kc.verify(token)
|
claims = kc.verify(token)
|
||||||
@ -36,12 +35,7 @@ def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
|
|||||||
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||||
|
|
||||||
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
|
||||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
monkeypatch.setattr(kc, "_decode_claims", lambda *_args, **_kwargs: {"azp": "other", "aud": ["other"]})
|
||||||
monkeypatch.setattr(
|
|
||||||
jwt,
|
|
||||||
"decode",
|
|
||||||
lambda *args, **kwargs: {"azp": "other", "aud": ["other"]},
|
|
||||||
)
|
|
||||||
|
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
kc.verify(token)
|
kc.verify(token)
|
||||||
@ -73,11 +67,10 @@ def test_keycloak_verify_refreshes_jwks(monkeypatch) -> None:
|
|||||||
return {"keys": [{"kid": "test"}]}
|
return {"keys": [{"kid": "test"}]}
|
||||||
|
|
||||||
monkeypatch.setattr(kc, "_get_jwks", fake_get_jwks)
|
monkeypatch.setattr(kc, "_get_jwks", fake_get_jwks)
|
||||||
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
|
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
jwt,
|
kc,
|
||||||
"decode",
|
"_decode_claims",
|
||||||
lambda *args, **kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
|
lambda *_args, **_kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
|
||||||
)
|
)
|
||||||
|
|
||||||
claims = kc.verify(token)
|
claims = kc.verify(token)
|
||||||
@ -94,6 +87,30 @@ def test_keycloak_verify_kid_not_found(monkeypatch) -> None:
|
|||||||
kc.verify(token)
|
kc.verify(token)
|
||||||
|
|
||||||
|
|
||||||
|
def test_keycloak_decode_claims_uses_expected_decoder_arguments(monkeypatch) -> None:
|
||||||
|
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
jwt.algorithms,
|
||||||
|
"RSAAlgorithm",
|
||||||
|
type("DummyRSA", (), {"from_jwk": staticmethod(lambda key: "parsed-key")}),
|
||||||
|
raising=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_decode(token, **kwargs):
|
||||||
|
captured["token"] = token
|
||||||
|
captured["kwargs"] = kwargs
|
||||||
|
return {"preferred_username": "alice"}
|
||||||
|
|
||||||
|
monkeypatch.setattr(jwt, "decode", fake_decode)
|
||||||
|
|
||||||
|
claims = kc._decode_claims("header.payload.sig", {"kid": "test"})
|
||||||
|
assert claims["preferred_username"] == "alice"
|
||||||
|
assert captured["kwargs"]["key"] == "parsed-key"
|
||||||
|
assert captured["kwargs"]["issuer"] == "https://issuer"
|
||||||
|
|
||||||
|
|
||||||
def test_authenticator_normalizes_groups(monkeypatch) -> None:
|
def test_authenticator_normalizes_groups(monkeypatch) -> None:
|
||||||
token = _make_token()
|
token = _make_token()
|
||||||
auth = Authenticator()
|
auth = Authenticator()
|
||||||
|
|||||||
@ -240,55 +240,68 @@ def test_comms_guest_name_randomizer(monkeypatch) -> None:
|
|||||||
)
|
)
|
||||||
monkeypatch.setattr(comms_module, "settings", dummy_settings)
|
monkeypatch.setattr(comms_module, "settings", dummy_settings)
|
||||||
|
|
||||||
responses = {
|
responses = [
|
||||||
("POST", "http://mas/token"): DummyResponse({"access_token": "admintoken"}),
|
(("POST", "http://mas/token"), DummyResponse({"access_token": "admintoken"})),
|
||||||
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"): DummyResponse(
|
(
|
||||||
{"data": {"id": "seed"}}
|
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"),
|
||||||
|
DummyResponse({"data": {"id": "seed"}}),
|
||||||
),
|
),
|
||||||
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
|
(
|
||||||
{"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}
|
("POST", "http://mas/api/admin/v1/personal-sessions"),
|
||||||
|
DummyResponse({"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}),
|
||||||
),
|
),
|
||||||
("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"): DummyResponse({}),
|
(("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"), DummyResponse({})),
|
||||||
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"): DummyResponse(
|
(
|
||||||
{"room_id": "room1"}
|
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"),
|
||||||
|
DummyResponse({"room_id": "room1"}),
|
||||||
),
|
),
|
||||||
("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"): DummyResponse(
|
(("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"), DummyResponse({"chunk": []})),
|
||||||
{"chunk": []}
|
(
|
||||||
|
("GET", "http://mas/api/admin/v1/users?page[size]=100"),
|
||||||
|
DummyResponse(
|
||||||
|
{
|
||||||
|
"data": [
|
||||||
|
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
),
|
||||||
),
|
),
|
||||||
("GET", "http://mas/api/admin/v1/users?page[size]=100"): DummyResponse(
|
(
|
||||||
{
|
("POST", "http://mas/api/admin/v1/personal-sessions"),
|
||||||
"data": [
|
DummyResponse({"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}),
|
||||||
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
|
|
||||||
]
|
|
||||||
}
|
|
||||||
),
|
),
|
||||||
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
|
(
|
||||||
{"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}
|
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"),
|
||||||
|
DummyResponse({"displayname": None}),
|
||||||
),
|
),
|
||||||
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"): DummyResponse(
|
(("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"), DummyResponse({})),
|
||||||
{"displayname": None}
|
(
|
||||||
|
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"),
|
||||||
|
DummyResponse(
|
||||||
|
{
|
||||||
|
"users": [
|
||||||
|
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
),
|
||||||
),
|
),
|
||||||
("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"): DummyResponse({}),
|
(("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
|
||||||
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse(
|
(
|
||||||
{
|
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"),
|
||||||
"users": [
|
DummyResponse({"displayname": None}),
|
||||||
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
|
|
||||||
]
|
|
||||||
}
|
|
||||||
),
|
),
|
||||||
("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
|
(("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
|
||||||
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse(
|
(("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"), DummyResponse({})),
|
||||||
{"displayname": None}
|
]
|
||||||
),
|
response_queues: dict[tuple[str, str], list[DummyResponse]] = {}
|
||||||
("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
|
for key, response in responses:
|
||||||
("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"): DummyResponse({}),
|
response_queues.setdefault(key, []).append(response)
|
||||||
}
|
|
||||||
|
|
||||||
def handler(method, url, **_kwargs):
|
def handler(method, url, **_kwargs):
|
||||||
resp = responses.get((method, url))
|
queue = response_queues.get((method, url))
|
||||||
if resp is None:
|
if not queue:
|
||||||
return DummyResponse({})
|
return DummyResponse({})
|
||||||
return resp
|
return queue.pop(0)
|
||||||
|
|
||||||
svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))
|
svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))
|
||||||
monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0)
|
monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0)
|
||||||
|
|||||||
@ -129,3 +129,95 @@ def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None:
|
|||||||
db = Database("postgresql://user:pass@localhost/db")
|
db = Database("postgresql://user:pass@localhost/db")
|
||||||
assert db.fetchone("fetchone") == {"id": 1}
|
assert db.fetchone("fetchone") == {"id": 1}
|
||||||
assert db.fetchall("fetchall") == [{"id": 1}, {"id": 2}]
|
assert db.fetchall("fetchall") == [{"id": 1}, {"id": 2}]
|
||||||
|
|
||||||
|
|
||||||
|
def test_database_passes_config_to_pool(monkeypatch) -> None:
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
class CapturePool(DummyPool):
|
||||||
|
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||||
|
captured.update(
|
||||||
|
{
|
||||||
|
"conninfo": conninfo,
|
||||||
|
"min_size": min_size,
|
||||||
|
"max_size": max_size,
|
||||||
|
"kwargs": kwargs,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
super().__init__(conninfo=conninfo, min_size=min_size, max_size=max_size, kwargs=kwargs)
|
||||||
|
|
||||||
|
monkeypatch.setattr(db_module, "ConnectionPool", CapturePool)
|
||||||
|
config = db_module.DatabaseConfig(
|
||||||
|
pool_min=1,
|
||||||
|
pool_max=7,
|
||||||
|
connect_timeout_sec=9,
|
||||||
|
lock_timeout_sec=11,
|
||||||
|
statement_timeout_sec=13,
|
||||||
|
idle_in_tx_timeout_sec=15,
|
||||||
|
application_name="ariadne-test",
|
||||||
|
)
|
||||||
|
Database("postgresql://user:pass@localhost/db", config)
|
||||||
|
|
||||||
|
assert captured["conninfo"] == "postgresql://user:pass@localhost/db"
|
||||||
|
assert captured["min_size"] == 1
|
||||||
|
assert captured["max_size"] == 7
|
||||||
|
assert captured["kwargs"]["connect_timeout"] == 9
|
||||||
|
assert captured["kwargs"]["application_name"] == "ariadne-test"
|
||||||
|
assert "lock_timeout=11s" in captured["kwargs"]["options"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_migrate_returns_when_advisory_lock_is_unavailable(monkeypatch) -> None:
|
||||||
|
class NoLockConn(DummyConn):
|
||||||
|
def execute(self, query, params=None):
|
||||||
|
if "pg_try_advisory_lock" in query:
|
||||||
|
return DummyResult(row=(False,))
|
||||||
|
return super().execute(query, params)
|
||||||
|
|
||||||
|
class NoLockPool(DummyPool):
|
||||||
|
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||||
|
self.conn = NoLockConn()
|
||||||
|
|
||||||
|
monkeypatch.setattr(db_module, "ConnectionPool", NoLockPool)
|
||||||
|
db = Database("postgresql://user:pass@localhost/db")
|
||||||
|
db.migrate(lock_id=123)
|
||||||
|
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
|
||||||
|
|
||||||
|
|
||||||
|
def test_migrate_handles_missing_access_requests_table(monkeypatch) -> None:
|
||||||
|
class UndefinedTableConn(DummyConn):
|
||||||
|
def execute(self, query, params=None):
|
||||||
|
if "ALTER TABLE access_requests" in query:
|
||||||
|
raise db_module.psycopg.errors.UndefinedTable()
|
||||||
|
return super().execute(query, params)
|
||||||
|
|
||||||
|
class UndefinedTablePool(DummyPool):
|
||||||
|
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||||
|
self.conn = UndefinedTableConn()
|
||||||
|
|
||||||
|
monkeypatch.setattr(db_module, "ConnectionPool", UndefinedTablePool)
|
||||||
|
db = Database("postgresql://user:pass@localhost/db")
|
||||||
|
db.migrate(lock_id=123)
|
||||||
|
|
||||||
|
|
||||||
|
def test_migrate_skip_flags(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
|
||||||
|
db = Database("postgresql://user:pass@localhost/db")
|
||||||
|
db.migrate(lock_id=123, include_ariadne_tables=False, include_access_requests=False)
|
||||||
|
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
|
||||||
|
assert not any("ALTER TABLE access_requests" in query for query, _ in db._pool.conn.executed)
|
||||||
|
|
||||||
|
|
||||||
|
def test_unlock_swallows_errors(monkeypatch) -> None:
|
||||||
|
class UnlockConn(DummyConn):
|
||||||
|
def execute(self, query, params=None):
|
||||||
|
if "pg_advisory_unlock" in query:
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
return super().execute(query, params)
|
||||||
|
|
||||||
|
class UnlockPool(DummyPool):
|
||||||
|
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
|
||||||
|
self.conn = UnlockConn()
|
||||||
|
|
||||||
|
monkeypatch.setattr(db_module, "ConnectionPool", UnlockPool)
|
||||||
|
db = Database("postgresql://user:pass@localhost/db")
|
||||||
|
db.migrate(lock_id=123)
|
||||||
|
|||||||
@ -63,6 +63,10 @@ def test_build_command_wraps_env() -> None:
|
|||||||
assert "export FOO=bar" in cmd[2]
|
assert "export FOO=bar" in cmd[2]
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_command_accepts_string_without_env() -> None:
|
||||||
|
assert _build_command("echo hello", None) == ["/bin/sh", "-c", "echo hello"]
|
||||||
|
|
||||||
|
|
||||||
def test_exec_returns_output(monkeypatch) -> None:
|
def test_exec_returns_output(monkeypatch) -> None:
|
||||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||||
@ -84,6 +88,41 @@ def test_exec_raises_on_failure(monkeypatch) -> None:
|
|||||||
executor.exec(["false"], check=True)
|
executor.exec(["false"], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_exec_returns_failed_result_when_check_disabled(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||||
|
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||||
|
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: DummyStream(stderr="bad", exit_code=2))
|
||||||
|
|
||||||
|
executor = PodExecutor("ns", "app=test", None)
|
||||||
|
result = executor.exec(["false"], check=False)
|
||||||
|
assert result.ok is False
|
||||||
|
assert result.exit_code == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_exec_uses_returncode_when_exit_code_never_arrives(monkeypatch) -> None:
|
||||||
|
class ReturncodeOnlyStream(DummyStream):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(stdout="ok", exit_code=7)
|
||||||
|
self._exit_code_ready = False
|
||||||
|
self._updated = False
|
||||||
|
|
||||||
|
def update(self, timeout: int = 1) -> None:
|
||||||
|
self._updated = True
|
||||||
|
self._open = False
|
||||||
|
|
||||||
|
def peek_exit_code(self) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||||
|
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||||
|
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: ReturncodeOnlyStream())
|
||||||
|
|
||||||
|
executor = PodExecutor("ns", "app=test", None)
|
||||||
|
result = executor.exec(["echo", "ok"], check=False)
|
||||||
|
assert result.exit_code == 7
|
||||||
|
assert result.ok is False
|
||||||
|
|
||||||
|
|
||||||
def test_exec_times_out(monkeypatch) -> None:
|
def test_exec_times_out(monkeypatch) -> None:
|
||||||
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
|
||||||
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
|
||||||
@ -115,3 +154,18 @@ def test_ensure_client_fallback(monkeypatch) -> None:
|
|||||||
monkeypatch.setattr(exec_module, "client", types.SimpleNamespace(CoreV1Api=lambda: dummy_api))
|
monkeypatch.setattr(exec_module, "client", types.SimpleNamespace(CoreV1Api=lambda: dummy_api))
|
||||||
|
|
||||||
assert exec_module._ensure_client() is dummy_api
|
assert exec_module._ensure_client() is dummy_api
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_client_raises_when_kubernetes_import_failed(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(exec_module, "_CORE_API", None)
|
||||||
|
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", RuntimeError("missing"))
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
exec_module._ensure_client()
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_client_reuses_cached_client(monkeypatch) -> None:
|
||||||
|
cached = object()
|
||||||
|
monkeypatch.setattr(exec_module, "_CORE_API", cached)
|
||||||
|
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
|
||||||
|
assert exec_module._ensure_client() is cached
|
||||||
|
|||||||
@ -5,6 +5,32 @@ import pytest
|
|||||||
from ariadne.k8s import pods as pods_module
|
from ariadne.k8s import pods as pods_module
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_start_time_handles_missing_and_invalid_values() -> None:
|
||||||
|
assert pods_module._parse_start_time(None) == 0.0
|
||||||
|
assert pods_module._parse_start_time("not-a-date") == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_start_time_defaults_naive_value_to_utc() -> None:
|
||||||
|
assert pods_module._parse_start_time("2026-01-20T00:00:00") > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_ready_handles_missing_conditions_and_non_ready_states() -> None:
|
||||||
|
assert pods_module._is_ready({"status": {"phase": "Pending"}}) is False
|
||||||
|
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": "bad"}}) is False
|
||||||
|
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": ["bad"]}}) is False
|
||||||
|
assert (
|
||||||
|
pods_module._is_ready(
|
||||||
|
{"status": {"phase": "Running", "conditions": [{"type": "Initialized", "status": "True"}]}}
|
||||||
|
)
|
||||||
|
is False
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_pods_requires_namespace() -> None:
|
||||||
|
with pytest.raises(pods_module.PodSelectionError):
|
||||||
|
pods_module.list_pods("", "app=test")
|
||||||
|
|
||||||
|
|
||||||
def test_list_pods_encodes_selector(monkeypatch) -> None:
|
def test_list_pods_encodes_selector(monkeypatch) -> None:
|
||||||
captured = {}
|
captured = {}
|
||||||
|
|
||||||
@ -17,6 +43,11 @@ def test_list_pods_encodes_selector(monkeypatch) -> None:
|
|||||||
assert "labelSelector=app%3Dnextcloud" in captured["path"]
|
assert "labelSelector=app%3Dnextcloud" in captured["path"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_pods_filters_non_dict_items(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: {"items": [{}, "bad", 1]})
|
||||||
|
assert pods_module.list_pods("demo", "app=test") == [{}]
|
||||||
|
|
||||||
|
|
||||||
def test_select_pod_picks_ready_latest(monkeypatch) -> None:
|
def test_select_pod_picks_ready_latest(monkeypatch) -> None:
|
||||||
payload = {
|
payload = {
|
||||||
"items": [
|
"items": [
|
||||||
@ -57,3 +88,49 @@ def test_select_pod_ignores_non_ready(monkeypatch) -> None:
|
|||||||
|
|
||||||
with pytest.raises(pods_module.PodSelectionError):
|
with pytest.raises(pods_module.PodSelectionError):
|
||||||
pods_module.select_pod("demo", "app=test")
|
pods_module.select_pod("demo", "app=test")
|
||||||
|
|
||||||
|
|
||||||
|
def test_select_pod_skips_deleted_and_invalid_entries(monkeypatch) -> None:
|
||||||
|
payload = {
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"metadata": {"name": "deleting", "deletionTimestamp": "2026-01-20T00:00:00Z"},
|
||||||
|
"status": {
|
||||||
|
"phase": "Running",
|
||||||
|
"conditions": [{"type": "Ready", "status": "True"}],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"metadata": {"name": "chosen"},
|
||||||
|
"status": {
|
||||||
|
"phase": "Running",
|
||||||
|
"nodeName": "titan-24",
|
||||||
|
"startTime": "2026-01-20T00:00:00Z",
|
||||||
|
"conditions": [{"type": "Ready", "status": "True"}],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
|
||||||
|
|
||||||
|
pod = pods_module.select_pod("demo", "app=test")
|
||||||
|
assert pod.name == "chosen"
|
||||||
|
assert pod.node == "titan-24"
|
||||||
|
|
||||||
|
|
||||||
|
def test_select_pod_skips_blank_names(monkeypatch) -> None:
|
||||||
|
payload = {
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"metadata": {"name": " "},
|
||||||
|
"status": {
|
||||||
|
"phase": "Running",
|
||||||
|
"conditions": [{"type": "Ready", "status": "True"}],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
|
||||||
|
|
||||||
|
with pytest.raises(pods_module.PodSelectionError):
|
||||||
|
pods_module.select_pod("demo", "app=test")
|
||||||
|
|||||||
@ -244,7 +244,7 @@ def test_find_user_by_email_skips_non_dict(monkeypatch) -> None:
|
|||||||
assert client.find_user_by_email("alice@example.com") is None
|
assert client.find_user_by_email("alice@example.com") is None
|
||||||
|
|
||||||
|
|
||||||
def test_get_user_invalid_payload(monkeypatch) -> None:
|
def test_get_user_invalid_payload_duplicate(monkeypatch) -> None:
|
||||||
dummy_settings = types.SimpleNamespace(
|
dummy_settings = types.SimpleNamespace(
|
||||||
keycloak_admin_url="http://kc",
|
keycloak_admin_url="http://kc",
|
||||||
keycloak_admin_realm="atlas",
|
keycloak_admin_realm="atlas",
|
||||||
@ -299,7 +299,7 @@ def test_update_user_safe_handles_bad_attrs(monkeypatch) -> None:
|
|||||||
assert captured["payload"]["attributes"] == {"new": ["item"]}
|
assert captured["payload"]["attributes"] == {"new": ["item"]}
|
||||||
|
|
||||||
|
|
||||||
def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
|
def test_set_user_attribute_user_id_missing_duplicate(monkeypatch) -> None:
|
||||||
client = KeycloakAdminClient()
|
client = KeycloakAdminClient()
|
||||||
|
|
||||||
def fake_find_user(username: str) -> dict[str, Any]:
|
def fake_find_user(username: str) -> dict[str, Any]:
|
||||||
|
|||||||
@ -2,7 +2,25 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
from ariadne.metrics.metrics import set_access_request_counts, set_cluster_state_metrics
|
from ariadne.metrics.metrics import (
|
||||||
|
record_schedule_state,
|
||||||
|
record_task_run,
|
||||||
|
set_access_request_counts,
|
||||||
|
set_cluster_state_metrics,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_task_run_accepts_missing_duration() -> None:
|
||||||
|
record_task_run("demo", "ok", None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_task_run_records_duration() -> None:
|
||||||
|
record_task_run("demo", "ok", 1.5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_schedule_state_tracks_success_and_failure() -> None:
|
||||||
|
record_schedule_state("demo", 10.0, 8.0, 20.0, True)
|
||||||
|
record_schedule_state("demo", 11.0, None, None, False)
|
||||||
|
|
||||||
|
|
||||||
def test_set_access_request_counts() -> None:
|
def test_set_access_request_counts() -> None:
|
||||||
@ -11,3 +29,7 @@ def test_set_access_request_counts() -> None:
|
|||||||
|
|
||||||
def test_set_cluster_state_metrics() -> None:
|
def test_set_cluster_state_metrics() -> None:
|
||||||
set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1)
|
set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_cluster_state_metrics_allows_missing_values() -> None:
|
||||||
|
set_cluster_state_metrics(datetime.now(timezone.utc), None, None, None, None)
|
||||||
|
|||||||
79
tests/test_migrate.py
Normal file
79
tests/test_migrate.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import types
|
||||||
|
|
||||||
|
import ariadne.migrate as migrate_module
|
||||||
|
|
||||||
|
|
||||||
|
class DummyDatabase:
|
||||||
|
def __init__(self, name: str):
|
||||||
|
self.name = name
|
||||||
|
self.calls: list[tuple[int, bool, bool]] = []
|
||||||
|
self.closed = False
|
||||||
|
|
||||||
|
def migrate(self, lock_id: int, *, include_ariadne_tables: bool, include_access_requests: bool) -> None:
|
||||||
|
self.calls.append((lock_id, include_ariadne_tables, include_access_requests))
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self.closed = True
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_db_uses_settings(monkeypatch) -> None:
|
||||||
|
captured = {}
|
||||||
|
monkeypatch.setattr(
|
||||||
|
migrate_module,
|
||||||
|
"settings",
|
||||||
|
types.SimpleNamespace(
|
||||||
|
ariadne_db_pool_min=1,
|
||||||
|
ariadne_db_pool_max=3,
|
||||||
|
ariadne_db_connect_timeout_sec=5,
|
||||||
|
ariadne_db_lock_timeout_sec=7,
|
||||||
|
ariadne_db_statement_timeout_sec=11,
|
||||||
|
ariadne_db_idle_in_tx_timeout_sec=13,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def fake_database(dsn, config):
|
||||||
|
captured["dsn"] = dsn
|
||||||
|
captured["config"] = config
|
||||||
|
return DummyDatabase("ariadne")
|
||||||
|
|
||||||
|
monkeypatch.setattr(migrate_module, "Database", fake_database)
|
||||||
|
migrate_module._build_db("postgresql://db", "ariadne_migrate")
|
||||||
|
|
||||||
|
assert captured["dsn"] == "postgresql://db"
|
||||||
|
assert captured["config"].application_name == "ariadne_migrate"
|
||||||
|
assert captured["config"].lock_timeout_sec == 7
|
||||||
|
|
||||||
|
|
||||||
|
def test_main_returns_when_migrations_disabled(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr(
|
||||||
|
migrate_module,
|
||||||
|
"settings",
|
||||||
|
types.SimpleNamespace(ariadne_run_migrations=False),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("should not build")))
|
||||||
|
migrate_module.main()
|
||||||
|
|
||||||
|
|
||||||
|
def test_main_runs_ariadne_and_portal_migrations(monkeypatch) -> None:
|
||||||
|
ariadne_db = DummyDatabase("ariadne")
|
||||||
|
portal_db = DummyDatabase("portal")
|
||||||
|
databases = [ariadne_db, portal_db]
|
||||||
|
monkeypatch.setattr(
|
||||||
|
migrate_module,
|
||||||
|
"settings",
|
||||||
|
types.SimpleNamespace(
|
||||||
|
ariadne_run_migrations=True,
|
||||||
|
ariadne_database_url="postgresql://ariadne",
|
||||||
|
portal_database_url="postgresql://portal",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: databases.pop(0))
|
||||||
|
|
||||||
|
migrate_module.main()
|
||||||
|
|
||||||
|
assert ariadne_db.calls == [(migrate_module.ARIADNE_MIGRATION_LOCK_ID, True, False)]
|
||||||
|
assert portal_db.calls == [(migrate_module.PORTAL_MIGRATION_LOCK_ID, False, True)]
|
||||||
|
assert ariadne_db.closed is True
|
||||||
|
assert portal_db.closed is True
|
||||||
@ -595,7 +595,7 @@ def test_provisioning_sync_errors(monkeypatch) -> None:
|
|||||||
assert outcome.status == "accounts_building"
|
assert outcome.status == "accounts_building"
|
||||||
|
|
||||||
|
|
||||||
def test_provisioning_run_loop_processes_candidates(monkeypatch) -> None:
|
def test_provisioning_run_loop_processes_candidates_duplicate(monkeypatch) -> None:
|
||||||
dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0)
|
dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0)
|
||||||
monkeypatch.setattr(prov, "settings", dummy_settings)
|
monkeypatch.setattr(prov, "settings", dummy_settings)
|
||||||
_patch_mailu_ready(monkeypatch, dummy_settings)
|
_patch_mailu_ready(monkeypatch, dummy_settings)
|
||||||
@ -652,7 +652,7 @@ def test_provisioning_run_loop_waits_for_admin(monkeypatch) -> None:
|
|||||||
manager._run_loop()
|
manager._run_loop()
|
||||||
|
|
||||||
|
|
||||||
def test_provisioning_missing_verified_email(monkeypatch) -> None:
|
def test_provisioning_missing_verified_email_duplicate(monkeypatch) -> None:
|
||||||
dummy_settings = types.SimpleNamespace(
|
dummy_settings = types.SimpleNamespace(
|
||||||
mailu_domain="bstein.dev",
|
mailu_domain="bstein.dev",
|
||||||
mailu_sync_url="",
|
mailu_sync_url="",
|
||||||
|
|||||||
104
tests/test_publish_test_metrics.py
Normal file
104
tests/test_publish_test_metrics.py
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
import scripts.publish_test_metrics as metrics_script
|
||||||
|
|
||||||
|
|
||||||
|
class DummyHTTPResponse:
|
||||||
|
def __init__(self, body: str, status: int = 200):
|
||||||
|
self._body = body
|
||||||
|
self.status = status
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def read(self) -> bytes:
|
||||||
|
return self._body.encode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_payload_includes_quality_gate_metrics(monkeypatch, tmp_path: Path) -> None:
|
||||||
|
coverage_path = tmp_path / "coverage.json"
|
||||||
|
coverage_path.write_text(json.dumps({"summary": {"percent_covered": 97.5}}), encoding="utf-8")
|
||||||
|
junit_path = tmp_path / "junit.xml"
|
||||||
|
junit_path.write_text('<testsuite tests="5" failures="1" errors="1" skipped="1"/>', encoding="utf-8")
|
||||||
|
quality_gate_path = tmp_path / "quality-gate.json"
|
||||||
|
quality_gate_path.write_text(
|
||||||
|
json.dumps(
|
||||||
|
{
|
||||||
|
"status": "ok",
|
||||||
|
"summary": {
|
||||||
|
"line_count_violations": 0,
|
||||||
|
"docstring_violations": 1,
|
||||||
|
"coverage_violations": 0,
|
||||||
|
"legacy_line_count_files": 2,
|
||||||
|
"legacy_docstring_files": 3,
|
||||||
|
"coverage_exemptions": 4,
|
||||||
|
},
|
||||||
|
"files": {
|
||||||
|
"ariadne/auth/keycloak.py": {
|
||||||
|
"lines": 120,
|
||||||
|
"missing_docstrings": 0,
|
||||||
|
"coverage_percent": 99.1,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
monkeypatch.setenv("COVERAGE_JSON", str(coverage_path))
|
||||||
|
monkeypatch.setenv("JUNIT_XML", str(junit_path))
|
||||||
|
monkeypatch.setenv("QUALITY_GATE_JSON", str(quality_gate_path))
|
||||||
|
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
|
||||||
|
monkeypatch.setenv("SUITE_NAME", "ariadne")
|
||||||
|
monkeypatch.setenv("QUALITY_STATUS", "ok")
|
||||||
|
monkeypatch.setenv("BUILD_NUMBER", "42")
|
||||||
|
monkeypatch.setenv("BRANCH_NAME", "main")
|
||||||
|
monkeypatch.setenv("GIT_COMMIT", "abc123")
|
||||||
|
monkeypatch.setattr(
|
||||||
|
metrics_script,
|
||||||
|
"_read_http",
|
||||||
|
lambda _url: 'platform_quality_gate_runs_total{job="platform-quality-ci",suite="ariadne",status="ok"} 5\n',
|
||||||
|
)
|
||||||
|
|
||||||
|
payload, result = metrics_script._build_payload()
|
||||||
|
|
||||||
|
assert result["tests_total"] == 5
|
||||||
|
assert result["tests_passed"] == 2
|
||||||
|
assert result["coverage_percent"] == 97.5
|
||||||
|
assert result["ok_counter"] == 6.0
|
||||||
|
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="quality_gate_json"} 1' in payload
|
||||||
|
assert 'ariadne_quality_gate_violation_total{suite="ariadne",check="docstrings"} 1' in payload
|
||||||
|
assert 'ariadne_quality_gate_file_coverage_percent{suite="ariadne",file="ariadne/auth/keycloak.py"} 99.1' in payload
|
||||||
|
|
||||||
|
|
||||||
|
def test_main_posts_payload_for_failed_build_with_missing_artifacts(monkeypatch, capsys) -> None:
|
||||||
|
requests: list[tuple[str, str]] = []
|
||||||
|
missing_base = Path("/tmp/ariadne-metrics-missing")
|
||||||
|
|
||||||
|
def fake_urlopen(target, timeout=10):
|
||||||
|
if isinstance(target, urllib.request.Request):
|
||||||
|
requests.append((target.full_url, target.data.decode("utf-8")))
|
||||||
|
return DummyHTTPResponse("", status=200)
|
||||||
|
return DummyHTTPResponse("", status=200)
|
||||||
|
|
||||||
|
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
|
||||||
|
monkeypatch.setenv("SUITE_NAME", "ariadne")
|
||||||
|
monkeypatch.setenv("QUALITY_STATUS", "failed")
|
||||||
|
monkeypatch.setenv("COVERAGE_JSON", str(missing_base / "coverage.json"))
|
||||||
|
monkeypatch.setenv("JUNIT_XML", str(missing_base / "junit.xml"))
|
||||||
|
monkeypatch.setenv("QUALITY_GATE_JSON", str(missing_base / "quality-gate.json"))
|
||||||
|
monkeypatch.setattr(metrics_script.urllib.request, "urlopen", fake_urlopen)
|
||||||
|
|
||||||
|
assert metrics_script.main() == 0
|
||||||
|
assert requests
|
||||||
|
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="coverage_json"} 0' in requests[0][1]
|
||||||
|
assert 'ariadne_quality_gate_status{suite="ariadne"} 0' in requests[0][1]
|
||||||
|
output = capsys.readouterr().out
|
||||||
|
assert '"quality_gate_status": "missing"' in output
|
||||||
117
tests/test_quality_gate.py
Normal file
117
tests/test_quality_gate.py
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import scripts.check_quality_gate as quality_gate
|
||||||
|
|
||||||
|
|
||||||
|
def _write_quality_config(path: Path) -> None:
|
||||||
|
path.write_text(
|
||||||
|
"\n".join(
|
||||||
|
[
|
||||||
|
"[files]",
|
||||||
|
'roots = ["pkg"]',
|
||||||
|
"max_lines = 10",
|
||||||
|
"",
|
||||||
|
"[docstrings]",
|
||||||
|
'roots = ["pkg"]',
|
||||||
|
"non_trivial_min_lines = 4",
|
||||||
|
"",
|
||||||
|
"[coverage]",
|
||||||
|
'roots = ["pkg"]',
|
||||||
|
"threshold = 95.0",
|
||||||
|
'targets = ["pkg/good.py"]',
|
||||||
|
"",
|
||||||
|
"[legacy.line_count]",
|
||||||
|
'"pkg/legacy.py" = 12',
|
||||||
|
"",
|
||||||
|
"[legacy.docstrings]",
|
||||||
|
'"pkg/legacy.py" = 1',
|
||||||
|
]
|
||||||
|
)
|
||||||
|
+ "\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_report_accepts_legacy_exceptions(tmp_path: Path) -> None:
|
||||||
|
pkg = tmp_path / "pkg"
|
||||||
|
pkg.mkdir()
|
||||||
|
(pkg / "good.py").write_text(
|
||||||
|
'\n'.join(
|
||||||
|
[
|
||||||
|
'def documented(value: int) -> int:',
|
||||||
|
' """Return a simple incremented value.',
|
||||||
|
"",
|
||||||
|
" Inputs: an integer to increment.",
|
||||||
|
" Outputs: the incremented integer so the module stays fully documented.",
|
||||||
|
' """',
|
||||||
|
" return value + 1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
+ "\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
(pkg / "legacy.py").write_text(
|
||||||
|
"\n".join(
|
||||||
|
[
|
||||||
|
"class Legacy:",
|
||||||
|
" pass",
|
||||||
|
]
|
||||||
|
+ ["# filler"] * 10
|
||||||
|
)
|
||||||
|
+ "\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
coverage_path = tmp_path / "coverage.json"
|
||||||
|
coverage_path.write_text(
|
||||||
|
json.dumps({"files": {"pkg/good.py": {"summary": {"percent_covered": 99.0}}}}),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
config_path = tmp_path / "quality_gate.toml"
|
||||||
|
_write_quality_config(config_path)
|
||||||
|
|
||||||
|
config = quality_gate._load_config(config_path)
|
||||||
|
report = quality_gate._build_report(
|
||||||
|
tmp_path,
|
||||||
|
config,
|
||||||
|
{"pkg/good.py": {"adjusted_percent": 99.0, "raw_percent": 99.0, "excluded_lines": []}},
|
||||||
|
True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert report["status"] == "ok"
|
||||||
|
assert report["summary"]["legacy_line_count_files"] == 1
|
||||||
|
assert report["files"]["pkg/good.py"]["coverage_enforced"] is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_main_fails_when_new_violations_appear(monkeypatch, tmp_path: Path) -> None:
|
||||||
|
pkg = tmp_path / "pkg"
|
||||||
|
pkg.mkdir()
|
||||||
|
(pkg / "good.py").write_text(
|
||||||
|
"\n".join(
|
||||||
|
[
|
||||||
|
"def undocumented(value):",
|
||||||
|
" total = value + 1",
|
||||||
|
" total += 1",
|
||||||
|
" total += 1",
|
||||||
|
" return total",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
+ "\n",
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
config_path = tmp_path / "quality_gate.toml"
|
||||||
|
_write_quality_config(config_path)
|
||||||
|
monkeypatch.chdir(tmp_path)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
sys,
|
||||||
|
"argv",
|
||||||
|
["check_quality_gate.py", "--config", "quality_gate.toml", "--coverage-json", "missing.json", "--output", "out.json"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert quality_gate.main() == 1
|
||||||
|
report = json.loads((tmp_path / "out.json").read_text(encoding="utf-8"))
|
||||||
|
assert report["summary"]["docstring_violations"] >= 1
|
||||||
|
assert report["summary"]["coverage_violations"] >= 1
|
||||||
@ -7,6 +7,7 @@ import httpx
|
|||||||
from ariadne.services.mailu import MailuService
|
from ariadne.services.mailu import MailuService
|
||||||
from ariadne.utils.errors import safe_error_detail
|
from ariadne.utils.errors import safe_error_detail
|
||||||
from ariadne.utils.http import extract_bearer_token
|
from ariadne.utils.http import extract_bearer_token
|
||||||
|
from ariadne.utils.name_generator import NameGenerator
|
||||||
from ariadne.utils.passwords import random_password
|
from ariadne.utils.passwords import random_password
|
||||||
|
|
||||||
|
|
||||||
@ -81,6 +82,22 @@ def test_safe_error_detail_timeout() -> None:
|
|||||||
assert safe_error_detail(exc, "fallback") == "timeout"
|
assert safe_error_detail(exc, "fallback") == "timeout"
|
||||||
|
|
||||||
|
|
||||||
|
def test_safe_error_detail_runtime_blank_uses_fallback() -> None:
|
||||||
|
assert safe_error_detail(RuntimeError(" "), "fallback") == "fallback"
|
||||||
|
|
||||||
|
|
||||||
|
def test_safe_error_detail_http_status_without_body() -> None:
|
||||||
|
request = httpx.Request("GET", "https://example.com")
|
||||||
|
response = httpx.Response(403, request=request)
|
||||||
|
exc = httpx.HTTPStatusError("bad", request=request, response=response)
|
||||||
|
|
||||||
|
assert safe_error_detail(exc, "fallback") == "http 403"
|
||||||
|
|
||||||
|
|
||||||
|
def test_safe_error_detail_unknown_exception_uses_fallback() -> None:
|
||||||
|
assert safe_error_detail(ValueError("boom"), "fallback") == "fallback"
|
||||||
|
|
||||||
|
|
||||||
def test_extract_bearer_token() -> None:
|
def test_extract_bearer_token() -> None:
|
||||||
request = DummyRequest({"Authorization": "Bearer token123"})
|
request = DummyRequest({"Authorization": "Bearer token123"})
|
||||||
assert extract_bearer_token(request) == "token123"
|
assert extract_bearer_token(request) == "token123"
|
||||||
@ -94,3 +111,24 @@ def test_extract_bearer_token_invalid() -> None:
|
|||||||
def test_extract_bearer_token_missing_parts() -> None:
|
def test_extract_bearer_token_missing_parts() -> None:
|
||||||
request = DummyRequest({"Authorization": "Bearer"})
|
request = DummyRequest({"Authorization": "Bearer"})
|
||||||
assert extract_bearer_token(request) is None
|
assert extract_bearer_token(request) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_name_generator_unique_adds_candidate(monkeypatch) -> None:
|
||||||
|
generator = NameGenerator(max_attempts=2)
|
||||||
|
monkeypatch.setattr(NameGenerator, "generate", lambda self: "atlas-guest")
|
||||||
|
existing = {"taken"}
|
||||||
|
|
||||||
|
assert generator.unique(existing) == "atlas-guest"
|
||||||
|
assert "atlas-guest" in existing
|
||||||
|
|
||||||
|
|
||||||
|
def test_name_generator_unique_returns_none_after_retries(monkeypatch) -> None:
|
||||||
|
generator = NameGenerator(max_attempts=2)
|
||||||
|
monkeypatch.setattr(NameGenerator, "generate", lambda self: "taken")
|
||||||
|
assert generator.unique({"taken"}) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_name_generator_generate_normalizes_words(monkeypatch) -> None:
|
||||||
|
monkeypatch.setattr("ariadne.utils.name_generator.coolname.generate", lambda words: [" Atlas ", "", "Guest"])
|
||||||
|
generator = NameGenerator(words=3)
|
||||||
|
assert generator.generate() == "atlas-guest"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user