ci(test): add ratcheting quality gate

Brad Stein 2026-04-10 13:57:33 -03:00
parent 06759399fb
commit ff3339fafc
24 changed files with 1493 additions and 223 deletions

Jenkinsfile (vendored, 124 lines changed)
View File

@@ -76,8 +76,8 @@ spec:
IMAGE = "${REGISTRY}/ariadne"
VERSION_TAG = 'dev'
SEMVER = 'dev'
COVERAGE_MIN = '99'
COVERAGE_JSON = 'build/coverage.json'
QUALITY_GATE_JSON = 'build/quality-gate.json'
JUNIT_XML = 'build/junit.xml'
SUITE_NAME = 'ariadne'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
@@ -102,30 +102,20 @@ spec:
set -euo pipefail
mkdir -p build
python -m pip install --no-cache-dir -r requirements.txt -r requirements-dev.txt
python -m ruff check ariadne --select PLR
python -m ruff check ariadne tests scripts
python -m slipcover \
--json \
--out "${COVERAGE_JSON}" \
--source ariadne \
--fail-under "${COVERAGE_MIN}" \
-m pytest -ra -vv --durations=20 --junitxml "${JUNIT_XML}"
python scripts/check_quality_gate.py --coverage-json "${COVERAGE_JSON}" --output "${QUALITY_GATE_JSON}"
python -c "import json; payload=json.load(open('build/coverage.json', encoding='utf-8')); percent=(payload.get('summary') or {}).get('percent_covered'); print(f'Coverage summary: {percent:.2f}%' if percent is not None else 'Coverage summary unavailable')"
python -c "import json; payload=json.load(open('build/quality-gate.json', encoding='utf-8')); summary=payload.get('summary') or {}; print(f\"Quality gate: {payload.get('status')} violations={summary.get('violations_total', 0)} coverage_targets={summary.get('coverage_targets', 0)}\")"
'''.stripIndent())
}
}
}
stage('Publish test metrics') {
steps {
container('tester') {
sh '''
set -euo pipefail
python scripts/publish_test_metrics.py
'''
}
}
}
stage('Prep toolchain') {
steps {
container('builder') {
@@ -202,97 +192,21 @@ python -c "import json; payload=json.load(open('build/coverage.json', encoding='
}
post {
success {
container('tester') {
sh '''
set -euo pipefail
export QUALITY_STATUS=ok
python - <<'PY'
import os
import re
import urllib.request
suite = os.environ.get("SUITE_NAME", "ariadne")
status = os.environ.get("QUALITY_STATUS", "failed")
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
def counter(name: str) -> float:
pattern = re.compile(
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
re.M,
)
match = pattern.search(text)
return float(match.group(1)) if match else 0.0
ok = counter("ok")
failed = counter("failed")
if status == "ok":
ok += 1
else:
failed += 1
payload = (
"# TYPE platform_quality_gate_runs_total counter\\n"
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
)
req = urllib.request.Request(
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
data=payload.encode("utf-8"),
method="POST",
headers={"Content-Type": "text/plain"},
)
urllib.request.urlopen(req, timeout=10).read()
PY
'''
}
}
failure {
container('tester') {
sh '''
set -euo pipefail
export QUALITY_STATUS=failed
python - <<'PY'
import os
import re
import urllib.request
suite = os.environ.get("SUITE_NAME", "ariadne")
status = os.environ.get("QUALITY_STATUS", "failed")
gateway = os.environ.get("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091").rstrip("/")
text = urllib.request.urlopen(f"{gateway}/metrics", timeout=10).read().decode("utf-8", errors="replace")
def counter(name: str) -> float:
pattern = re.compile(
rf'^platform_quality_gate_runs_total\\{{[^}}]*job="platform-quality-ci"[^}}]*suite="{re.escape(suite)}"[^}}]*status="{name}"[^}}]*\\}}\\s+([0-9]+(?:\\.[0-9]+)?)$',
re.M,
)
match = pattern.search(text)
return float(match.group(1)) if match else 0.0
ok = counter("ok")
failed = counter("failed")
if status == "ok":
ok += 1
else:
failed += 1
payload = (
"# TYPE platform_quality_gate_runs_total counter\\n"
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {int(ok)}\\n'
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {int(failed)}\\n'
)
req = urllib.request.Request(
f"{gateway}/metrics/job/platform-quality-ci/suite/{suite}",
data=payload.encode("utf-8"),
method="POST",
headers={"Content-Type": "text/plain"},
)
urllib.request.urlopen(req, timeout=10).read()
PY
'''
}
}
always {
script {
env.QUALITY_STATUS = currentBuild.currentResult == 'SUCCESS' ? 'ok' : 'failed'
}
container('tester') {
sh '''
set +e
python scripts/publish_test_metrics.py
status=$?
set -e
if [ "$status" -ne 0 ]; then
echo "test metric publication failed with status $status"
fi
'''
}
script {
if (fileExists('build/junit.xml')) {
try {
@@ -302,7 +216,7 @@ PY
}
}
}
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json', allowEmptyArchive: true, fingerprint: true
archiveArtifacts artifacts: 'build/junit.xml,build/coverage.json,build/quality-gate.json', allowEmptyArchive: true, fingerprint: true
script {
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]
echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}"

View File

@@ -12,6 +12,13 @@ from ..settings import settings
@dataclass(frozen=True)
class AuthContext:
"""Authenticated user details returned by the OIDC verifier.
Inputs: normalized claims extracted from a validated bearer token.
Outputs: a compact object that downstream handlers can trust without
repeating token parsing logic.
"""
username: str
email: str
groups: list[str]
@@ -19,6 +26,13 @@ class AuthContext:
class KeycloakOIDC:
"""Validate Keycloak-issued access tokens for Ariadne API requests.
Inputs: the JWKS URL, expected issuer, and client identifier.
Outputs: verified token claims after signature and audience checks so the
API can make authorization decisions safely.
"""
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
self._jwks_url = jwks_url
self._issuer = issuer
@@ -97,6 +111,13 @@ class KeycloakOIDC:
class Authenticator:
"""Convert bearer tokens into normalized Ariadne auth contexts.
Inputs: raw bearer tokens from incoming API requests.
Outputs: an `AuthContext` with cleaned usernames, emails, and groups so
endpoint handlers can stay focused on business logic.
"""
def __init__(self) -> None:
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)

View File

@@ -15,6 +15,13 @@ logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class DatabaseConfig:
"""Connection-pool and timeout settings for a database client.
Inputs: pool sizing and timeout values supplied by application settings.
Outputs: a single immutable config object so database construction remains
explicit and easy to test.
"""
pool_min: int = 0
pool_max: int = 5
connect_timeout_sec: int = 5
@@ -25,6 +32,13 @@ class DatabaseConfig:
class Database:
"""Thin wrapper around a psycopg connection pool for Ariadne storage.
Inputs: a Postgres DSN plus optional pool and timeout configuration.
Outputs: helper methods for migrations and common query patterns while
centralizing timeout, locking, and row-format behavior.
"""
def __init__(self, dsn: str, config: DatabaseConfig | None = None) -> None:
if not dsn:
raise RuntimeError("database URL is required")

View File

@@ -56,6 +56,13 @@ def delete_json(path: str) -> dict[str, Any]:
def get_secret_value(namespace: str, name: str, key: str) -> str:
"""Read and decode a string value from a Kubernetes Secret.
Inputs: a namespace, secret name, and key inside the secret data map.
Outputs: the decoded UTF-8 value so callers can consume cluster-managed
credentials without duplicating base64 and validation logic.
"""
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
raw = blob.get(key)
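The hunk cuts off before the decode step. A plausible completion under the docstring's contract, labeled hypothetical since the real tail is not shown:

import base64

def _decode_secret_entry(raw: object, key: str) -> str:
    # Hypothetical sketch of what the elided remainder of get_secret_value
    # must do: reject missing keys and base64-decode the value to UTF-8.
    if not isinstance(raw, str) or not raw:
        raise RuntimeError(f"secret value for key {key!r} missing")
    return base64.b64decode(raw).decode("utf-8")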

View File

@@ -16,7 +16,7 @@ except Exception as exc: # pragma: no cover - import checked at runtime
else:
_IMPORT_ERROR = None
from .pods import PodSelectionError, select_pod
from .pods import select_pod
from ..utils.logging import get_logger
@@ -26,6 +26,14 @@ _CORE_API = None
@dataclass(frozen=True)
class ExecResult:
"""Container for pod exec output captured from the Kubernetes stream API.
Inputs: stdout, stderr, and the observed exit code from a command run in a
pod container.
Outputs: a small immutable result object that callers can inspect or raise
on without juggling stream state.
"""
stdout: str
stderr: str
exit_code: int | None
@@ -65,6 +73,13 @@ def _build_command(command: list[str] | str, env: dict[str, str] | None) -> list
class PodExecutor:
"""Run shell commands in the most suitable pod for a workload selector.
Inputs: a namespace, label selector, and optional container name.
Outputs: structured `ExecResult` objects or `ExecError`/`TimeoutError`
exceptions so service code can react consistently to pod command results.
"""
def __init__(self, namespace: str, label_selector: str, container: str | None = None) -> None:
self._namespace = namespace
self._label_selector = label_selector
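The exec tests below exercise the call shape; condensed into a usage sketch:

from ariadne.k8s.exec import PodExecutor

executor = PodExecutor("demo", "app=nextcloud", container=None)
# With check=False a non-zero exit comes back as data instead of raising ExecError.
result = executor.exec(["sh", "-c", "echo hello"], check=False)
print(result.ok, result.exit_code, result.stdout, result.stderr)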

View File

@@ -10,6 +10,13 @@ from .client import get_json
@dataclass(frozen=True)
class PodRef:
"""Reference to a Kubernetes pod chosen for a follow-up operation.
Inputs: the pod name, namespace, and optional node name.
Outputs: a stable value object that higher-level helpers can pass around
without carrying the full Kubernetes payload.
"""
name: str
namespace: str
node: str | None = None
@@ -47,6 +54,13 @@ def _is_ready(pod: dict[str, Any]) -> bool:
def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
"""Fetch pods for a namespace and selector from the Kubernetes API.
Inputs: the target namespace and a label-selector string.
Outputs: only dictionary pod objects so later selection logic can assume a
predictable payload shape.
"""
namespace = (namespace or "").strip()
if not namespace:
raise PodSelectionError("pod namespace missing")
@@ -58,6 +72,13 @@ def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
def select_pod(namespace: str, label_selector: str) -> PodRef:
"""Pick the newest ready pod for a namespace and label selector.
Inputs: the namespace and selector used to query Kubernetes pods.
Outputs: a `PodRef` for the most recently started ready pod, or a
`PodSelectionError` when no safe candidate exists.
"""
pods = list_pods(namespace, label_selector)
candidates: list[tuple[float, PodRef]] = []
for pod in pods:
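select_pod's contract, shown by the pod tests below, condensed into a usage sketch:

from ariadne.k8s.pods import PodSelectionError, select_pod

try:
    pod = select_pod("demo", "app=nextcloud")
    print(pod.name, pod.namespace, pod.node)
except PodSelectionError:
    pass  # no ready, non-deleted pod with a usable name matched the selector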

View File

@@ -84,6 +84,14 @@ def record_schedule_state(
next_run_ts: float | None,
ok: bool | None,
) -> None:
"""Record scheduler timing and status gauges for a task.
Inputs: the task name plus timestamps for the last run, last success, next
run, and an optional success flag.
Outputs: updated Prometheus gauges that make scheduler health visible in
dashboards and alerts.
"""
if last_run_ts:
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
if last_success_ts:
@@ -108,6 +116,13 @@ def set_cluster_state_metrics(
pods_running: float | None,
kustomizations_not_ready: int | None,
) -> None:
"""Publish the latest cluster-state summary to Prometheus gauges.
Inputs: the collection timestamp and aggregate node, pod, and Flux counts.
Outputs: refreshed gauges so Grafana panels can render the current cluster
snapshot without parsing raw service logs.
"""
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
if nodes_total is not None:
CLUSTER_STATE_NODES_TOTAL.set(nodes_total)
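The metric tests below double as usage documentation; both helpers tolerate missing values:

from datetime import datetime, timezone
from ariadne.metrics.metrics import record_schedule_state, set_cluster_state_metrics

record_schedule_state("demo", 10.0, 8.0, 20.0, True)    # all gauges updated
record_schedule_state("demo", 11.0, None, None, False)  # None timestamps skipped
set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1)
set_cluster_state_metrics(datetime.now(timezone.utc), None, None, None, None)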

View File

@@ -24,6 +24,14 @@ def _build_db(dsn: str, application_name: str) -> Database:
def main() -> None:
"""Run Ariadne and portal schema migrations when enabled in settings.
Inputs: process-wide application settings for database URLs and migration
toggles.
Outputs: applied migrations and closed pools so CLI and container startup
paths can bootstrap storage safely.
"""
if not settings.ariadne_run_migrations:
return
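tests/test_migrate.py below locks in the behavior; driving it directly is a two-liner:

from ariadne.migrate import main

main()  # no-op when settings.ariadne_run_migrations is false; otherwise migrates Ariadne then portal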

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import re
import time
from typing import Any
@@ -9,7 +8,7 @@ from typing import Any
import httpx
import psycopg
from ..k8s.exec import ExecError, PodExecutor
from ..k8s.exec import ExecError, ExecResult, PodExecutor
from ..k8s.pods import PodSelectionError
from ..settings import settings
from ..utils.logging import get_logger

quality_gate.toml (new file, 68 lines)
View File

@@ -0,0 +1,68 @@
[files]
roots = ["ariadne", "tests", "scripts"]
max_lines = 500
[docstrings]
roots = ["ariadne", "scripts"]
non_trivial_min_lines = 6
[coverage]
roots = ["ariadne"]
threshold = 95.0
targets = [
"ariadne/auth/keycloak.py",
"ariadne/db/database.py",
"ariadne/k8s/client.py",
"ariadne/k8s/pods.py",
"ariadne/metrics/metrics.py",
"ariadne/migrate.py",
"ariadne/utils/errors.py",
"ariadne/utils/http.py",
"ariadne/utils/logging.py",
"ariadne/utils/name_generator.py",
"ariadne/utils/passwords.py",
]
[legacy.line_count]
"ariadne/app.py" = 996
"ariadne/manager/provisioning.py" = 933
"ariadne/services/cluster_state.py" = 3705
"ariadne/services/comms.py" = 943
"ariadne/services/firefly.py" = 667
"ariadne/services/nextcloud.py" = 698
"ariadne/services/vault.py" = 558
"ariadne/services/wger.py" = 624
"ariadne/settings.py" = 590
"tests/test_app.py" = 1001
"tests/test_keycloak_admin.py" = 679
"tests/test_provisioning.py" = 1762
"tests/test_services.py" = 1483
[legacy.docstrings]
"ariadne/app.py" = 15
"ariadne/db/storage.py" = 4
"ariadne/manager/provisioning.py" = 2
"ariadne/scheduler/cron.py" = 1
"ariadne/services/cluster_state.py" = 4
"ariadne/services/comms.py" = 2
"ariadne/services/firefly.py" = 4
"ariadne/services/image_sweeper.py" = 1
"ariadne/services/keycloak_admin.py" = 1
"ariadne/services/keycloak_profile.py" = 2
"ariadne/services/mailer.py" = 1
"ariadne/services/mailu.py" = 5
"ariadne/services/mailu_events.py" = 1
"ariadne/services/metis.py" = 1
"ariadne/services/nextcloud.py" = 3
"ariadne/services/opensearch_prune.py" = 2
"ariadne/services/pod_cleaner.py" = 1
"ariadne/services/soteria.py" = 2
"ariadne/services/vault.py" = 2
"ariadne/services/vaultwarden.py" = 1
"ariadne/services/vaultwarden_sync.py" = 4
"ariadne/services/wger.py" = 4
"ariadne/settings.py" = 1
"ariadne/utils/errors.py" = 1
"ariadne/utils/http.py" = 1
"ariadne/utils/logging.py" = 3
"ariadne/utils/name_generator.py" = 1

scripts/check_quality_gate.py (new executable file, 407 lines)
View File

@@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""Enforce Ariadne's ratcheting test-quality gate.
Inputs: repository Python files, optional coverage JSON, and a TOML config that
captures the current legacy exceptions.
Outputs: a JSON report plus a non-zero exit code when file-size, docstring, or
coverage requirements regress, so CI can block quality drift while allowing
incremental cleanup.
"""
from __future__ import annotations
import argparse
import ast
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import tomllib
@dataclass(frozen=True)
class DefinitionFinding:
"""Describe a public definition missing a required docstring.
Inputs: the symbol kind, name, source line, and logical size.
Outputs: a compact record that makes docstring failures actionable in CLI
output and in the JSON quality report.
"""
kind: str
name: str
lineno: int
length: int
@dataclass(frozen=True)
class Violation:
"""Represent a single quality-gate violation.
Inputs: the violated check name, file path, and a human-readable message.
Outputs: a normalized record for console rendering and JSON serialization so
Jenkins and local runs report the same facts.
"""
check: str
path: str
message: str
@dataclass(frozen=True)
class QualityConfig:
"""Typed quality-gate settings loaded from TOML.
Inputs: parsed configuration sections for file-size, docstrings, and
coverage enforcement.
Outputs: one immutable object so the gate logic stays deterministic and easy
to test.
"""
line_roots: tuple[str, ...]
max_lines: int
legacy_max_lines: dict[str, int]
docstring_roots: tuple[str, ...]
non_trivial_min_lines: int
legacy_missing_docstrings: dict[str, int]
coverage_roots: tuple[str, ...]
coverage_targets: tuple[str, ...]
coverage_threshold: float
def _load_config(path: Path) -> QualityConfig:
"""Load the quality-gate config from TOML.
Inputs: the path to the repository-local TOML config file.
Outputs: validated `QualityConfig` values used by every gate check so local
runs and Jenkins share the same policy.
"""
payload = tomllib.loads(path.read_text(encoding="utf-8"))
files = payload.get("files") or {}
docstrings = payload.get("docstrings") or {}
coverage = payload.get("coverage") or {}
legacy = payload.get("legacy") or {}
return QualityConfig(
line_roots=tuple(str(item) for item in files.get("roots") or ("ariadne", "tests", "scripts")),
max_lines=int(files.get("max_lines", 500)),
legacy_max_lines={str(key): int(value) for key, value in (legacy.get("line_count") or {}).items()},
docstring_roots=tuple(str(item) for item in docstrings.get("roots") or ("ariadne", "scripts")),
non_trivial_min_lines=int(docstrings.get("non_trivial_min_lines", 6)),
legacy_missing_docstrings={
str(key): int(value) for key, value in (legacy.get("docstrings") or {}).items()
},
coverage_roots=tuple(str(item) for item in coverage.get("roots") or ("ariadne",)),
coverage_targets=tuple(str(item) for item in coverage.get("targets") or ()),
coverage_threshold=float(coverage.get("threshold", 95.0)),
)
def _iter_python_files(repo_root: Path, roots: tuple[str, ...]) -> list[Path]:
"""Collect Python files under the configured roots.
Inputs: the repository root plus the roots to scan.
Outputs: sorted Python paths so the gate produces stable results and diffs.
"""
files: list[Path] = []
for root in roots:
base = repo_root / root
if not base.exists():
continue
files.extend(sorted(base.rglob("*.py")))
return sorted({path for path in files})
def _relative(path: Path, repo_root: Path) -> str:
return path.relative_to(repo_root).as_posix()
def _line_count(path: Path) -> int:
return len(path.read_text(encoding="utf-8").splitlines())
def _definition_length(node: ast.AST) -> int:
end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0)
return max(end_lineno - getattr(node, "lineno", 0) + 1, 1)
def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]:
"""Find public top-level definitions missing required docstrings.
Inputs: a Python file path and the minimum logical size considered
non-trivial.
Outputs: missing-docstring findings so the gate can ratchet legacy files
while blocking new undocumented public APIs.
"""
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
findings: list[DefinitionFinding] = []
for node in module.body:
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
continue
if node.name.startswith("_"):
continue
length = _definition_length(node)
if length < min_lines:
continue
if ast.get_docstring(node) is not None:
continue
findings.append(
DefinitionFinding(
kind=type(node).__name__,
name=node.name,
lineno=getattr(node, "lineno", 1),
length=length,
)
)
return findings
def _excluded_coverage_lines(path: Path) -> set[int]:
"""Collect non-executable lines that Slipcover still reports as missing.
Inputs: a Python source file path.
Outputs: line numbers for multiline definition headers and docstring blocks
so adjusted per-file coverage tracks executable logic rather than syntax
scaffolding required for readability.
"""
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
excluded: set[int] = set()
def visit(node: ast.AST) -> None:
if isinstance(node, ast.Module):
for child in node.body:
visit(child)
return
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
return
if node.body:
body_start = node.body[0].lineno
excluded.update(range(node.lineno + 1, body_start))
first = node.body[0]
if (
isinstance(first, ast.Expr)
and isinstance(first.value, ast.Constant)
and isinstance(first.value.value, str)
):
excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1))
for child in node.body:
visit(child)
visit(module)
return excluded
def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]:
"""Read per-file coverage details from Slipcover JSON output.
Inputs: the optional coverage artifact path produced by the test run.
Outputs: raw and adjusted per-file coverage details so the gate can enforce
realistic thresholds even when Slipcover counts docstrings and wrapped
signatures as missing lines.
"""
if path is None or not path.exists():
return {}
payload = json.loads(path.read_text(encoding="utf-8"))
files = payload.get("files") or {}
coverage: dict[str, dict[str, Any]] = {}
for name, data in files.items():
if not isinstance(data, dict):
continue
summary = data.get("summary") or {}
percent = summary.get("percent_covered")
executed_lines = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)}
missing_lines = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)}
relative = str(name)
source_path = repo_root / relative
excluded_lines = _excluded_coverage_lines(source_path) if source_path.exists() else set()
adjusted_missing = missing_lines - excluded_lines
adjusted_total = len(executed_lines) + len(adjusted_missing)
adjusted_percent = 100.0 if adjusted_total == 0 else (len(executed_lines) / adjusted_total) * 100.0
coverage[relative] = {
"raw_percent": float(percent) if isinstance(percent, (int, float)) else None,
"adjusted_percent": adjusted_percent,
"excluded_lines": sorted(excluded_lines),
}
return coverage
def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]:
return [{"check": item.check, "path": item.path, "message": item.message} for item in violations]
def _build_report(
repo_root: Path,
config: QualityConfig,
coverage: dict[str, dict[str, Any]],
coverage_artifact_present: bool,
) -> dict[str, Any]:
"""Run all configured checks and build a JSON-serializable report.
Inputs: repository paths, quality-gate config, and per-file coverage data.
Outputs: a complete report for CI artifacts, metrics publication, and local
debugging when the gate fails.
"""
violations: list[Violation] = []
files_report: dict[str, dict[str, Any]] = {}
for path in _iter_python_files(repo_root, config.line_roots):
relative = _relative(path, repo_root)
lines = _line_count(path)
entry = files_report.setdefault(relative, {})
entry["lines"] = lines
entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines)
entry["line_limit_legacy"] = relative in config.legacy_max_lines
if lines > entry["line_limit"]:
violations.append(
Violation(
"line_count",
relative,
f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}",
)
)
for path in _iter_python_files(repo_root, config.docstring_roots):
relative = _relative(path, repo_root)
findings = _missing_docstrings(path, config.non_trivial_min_lines)
entry = files_report.setdefault(relative, {})
entry["missing_docstrings"] = len(findings)
entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0)
entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings
entry["missing_docstring_symbols"] = [
{
"kind": finding.kind,
"name": finding.name,
"lineno": finding.lineno,
"length": finding.length,
}
for finding in findings
]
if len(findings) > entry["missing_docstrings_allowed"]:
excess = findings[entry["missing_docstrings_allowed"] :]
for finding in excess:
violations.append(
Violation(
"docstrings",
relative,
f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}",
)
)
coverage_target_set = set(config.coverage_targets)
coverage_root_files = {
_relative(path, repo_root)
for path in _iter_python_files(repo_root, config.coverage_roots)
}
for relative in sorted(coverage_root_files):
entry = files_report.setdefault(relative, {})
details = coverage.get(relative) or {}
value = details.get("adjusted_percent")
entry["coverage_percent"] = value
entry["coverage_raw_percent"] = details.get("raw_percent")
entry["coverage_excluded_lines"] = details.get("excluded_lines") or []
entry["coverage_enforced"] = relative in coverage_target_set
if relative not in coverage_target_set:
continue
entry["coverage_target"] = config.coverage_threshold
if value is None:
violations.append(
Violation("coverage", relative, f"missing coverage data for {relative}")
)
continue
if value < config.coverage_threshold:
violations.append(
Violation(
"coverage",
relative,
f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%",
)
)
if coverage_target_set and not coverage_artifact_present:
violations.append(
Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets")
)
summary = {
"violations_total": len(violations),
"line_count_violations": sum(item.check == "line_count" for item in violations),
"docstring_violations": sum(item.check == "docstrings" for item in violations),
"coverage_violations": sum(item.check == "coverage" for item in violations),
"legacy_line_count_files": len(config.legacy_max_lines),
"legacy_docstring_files": len(config.legacy_missing_docstrings),
"coverage_targets": len(coverage_target_set),
"coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0),
}
return {
"status": "ok" if not violations else "failed",
"rules": {
"max_lines": config.max_lines,
"docstring_non_trivial_min_lines": config.non_trivial_min_lines,
"coverage_threshold": config.coverage_threshold,
},
"summary": summary,
"violations": _serialize_violations(violations),
"files": dict(sorted(files_report.items())),
}
def _print_report(report: dict[str, Any]) -> None:
"""Render a concise CLI summary for local and Jenkins logs.
Inputs: the JSON-ready report produced by the gate.
Outputs: human-readable lines that point directly at each violation so a
failing build is easy to fix.
"""
print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True))
for violation in report.get("violations") or []:
print(f"[{violation['check']}] {violation['path']}: {violation['message']}")
def parse_args() -> argparse.Namespace:
"""Parse CLI arguments for the quality gate.
Inputs: command-line flags supplied by Jenkins or a local developer.
Outputs: normalized paths and options so the gate stays scriptable and
predictable in every environment.
"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--config", default="quality_gate.toml", help="path to the quality-gate TOML config")
parser.add_argument("--coverage-json", default="build/coverage.json", help="path to Slipcover JSON output")
parser.add_argument("--output", default="build/quality-gate.json", help="path to write the JSON report")
return parser.parse_args()
def main() -> int:
"""Run the Ariadne quality gate and write its JSON report.
Inputs: CLI arguments naming the config and optional coverage artifact.
Outputs: a persisted JSON report and a process exit code that Jenkins can
use to enforce quality rules.
"""
args = parse_args()
repo_root = Path.cwd()
config_path = repo_root / args.config
output_path = repo_root / args.output
coverage_path = repo_root / args.coverage_json
config = _load_config(config_path)
coverage_present = coverage_path.exists()
coverage = _load_coverage(coverage_path if coverage_present else None, repo_root)
report = _build_report(repo_root, config, coverage, coverage_present)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
_print_report(report)
return 0 if report["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())
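The subtle piece above is the adjusted-coverage arithmetic in _excluded_coverage_lines and _load_coverage: Slipcover reports docstring bodies and wrapped definition headers as missing lines, so the gate recomputes the percentage over executable lines only. A worked example of that arithmetic, with made-up line sets:

executed = {1, 2, 5, 6}   # lines the tests actually ran
missing = {3, 4, 7}       # lines Slipcover flags as uncovered
excluded = {3, 4}         # docstring/header lines found via ast

adjusted_missing = missing - excluded                      # {7}
adjusted_total = len(executed) + len(adjusted_missing)     # 5
percent = 100.0 if adjusted_total == 0 else len(executed) / adjusted_total * 100.0
print(f"{percent:.2f}%")  # 80.00%, versus a raw 4/7 of roughly 57.14%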

scripts/publish_test_metrics.py (mode changed to executable, 284 lines changed)
View File

@@ -1,35 +1,74 @@
#!/usr/bin/env python3
"""Publish Ariadne quality-gate test metrics to Pushgateway (Prometheus ingest)."""
"""Publish Ariadne test and quality-gate metrics to Pushgateway.
Inputs: build artifacts such as JUnit XML, Slipcover coverage JSON, optional
quality-gate JSON, and standard Jenkins metadata from the environment.
Outputs: Prometheus metric lines pushed to Pushgateway so Grafana can chart test
health, quality-gate drift, and build context even when a build fails early.
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from typing import Any
import urllib.request
import xml.etree.ElementTree as ET
def _escape_label(value: str) -> str:
"""Escape a Prometheus label value.
Inputs: an arbitrary label string from build metadata.
Outputs: a safely escaped label fragment so pushed metric lines remain valid
Prometheus exposition format.
"""
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
def _label_str(labels: dict[str, str]) -> str:
"""Build a Prometheus label set from non-empty values.
Inputs: a dictionary of label keys and raw string values.
Outputs: a `{...}` label fragment or an empty string so callers can compose
metrics without repeating label-filtering logic.
"""
parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
return "{" + ",".join(parts) + "}" if parts else ""
def _load_coverage(path: str) -> float:
with open(path, "r", encoding="utf-8") as handle:
payload = json.load(handle)
def _load_coverage(path: Path) -> float | None:
"""Read the overall Slipcover percentage if the artifact exists.
Inputs: the expected coverage artifact path.
Outputs: the percent covered value, or `None` when the artifact is missing or
malformed so failed builds can still publish partial metrics.
"""
if not path.exists():
return None
payload = json.loads(path.read_text(encoding="utf-8"))
summary = payload.get("summary") or {}
percent = summary.get("percent_covered")
if isinstance(percent, (int, float)):
return float(percent)
raise RuntimeError("coverage summary missing percent_covered")
return None
def _load_junit(path: str) -> dict[str, int]:
def _load_junit(path: Path) -> dict[str, int] | None:
"""Aggregate JUnit totals if the artifact exists.
Inputs: the expected JUnit XML path.
Outputs: test totals by outcome, or `None` when the artifact is absent so the
publisher can still emit build and gate status metrics.
"""
if not path.exists():
return None
tree = ET.parse(path)
root = tree.getroot()
@@ -57,6 +96,20 @@ def _load_junit(path: str) -> dict[str, int]:
return totals
def _load_quality_gate(path: Path) -> dict[str, Any] | None:
"""Load the optional quality-gate JSON report.
Inputs: the expected report path from `scripts/check_quality_gate.py`.
Outputs: the parsed report when present so metric publication can include
violation counts and per-file gate state.
"""
if not path.exists():
return None
payload = json.loads(path.read_text(encoding="utf-8"))
return payload if isinstance(payload, dict) else None
def _read_http(url: str) -> str:
try:
with urllib.request.urlopen(url, timeout=10) as resp:
@@ -78,6 +131,13 @@ def _post_text(url: str, payload: str) -> None:
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
"""Fetch the current counter value for a metric series.
Inputs: the Pushgateway base URL plus the metric name and exact labels.
Outputs: the last published counter value so reruns can increment rather than
overwrite the total.
"""
text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
if not text:
return 0.0
@@ -85,7 +145,7 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
for line in text.splitlines():
if not line.startswith(metric + "{"):
continue
if any(f'{k}="{v}"' not in line for k, v in labels.items()):
if any(f'{key}="{value}"' not in line for key, value in labels.items()):
continue
parts = line.split()
if len(parts) < 2:
@@ -97,9 +157,22 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
return 0.0
def main() -> int:
coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json")
junit_path = os.getenv("JUNIT_XML", "build/junit.xml")
def _metric_line(name: str, value: int | float, labels: dict[str, str] | None = None) -> str:
label_text = _label_str(labels or {})
return f"{name}{label_text} {value}"
def _build_payload() -> tuple[str, dict[str, Any]]:
"""Assemble the Pushgateway payload and a summary object.
Inputs: environment variables and any available build artifacts.
Outputs: metric lines ready for Pushgateway plus a summary dict that local
tests and Jenkins logs can inspect.
"""
coverage_path = Path(os.getenv("COVERAGE_JSON", "build/coverage.json"))
junit_path = Path(os.getenv("JUNIT_XML", "build/junit.xml"))
quality_gate_path = Path(os.getenv("QUALITY_GATE_JSON", "build/quality-gate.json"))
pushgateway_url = os.getenv(
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
).strip()
@@ -107,19 +180,17 @@ def main() -> int:
branch = os.getenv("BRANCH_NAME", "")
build_number = os.getenv("BUILD_NUMBER", "")
commit = os.getenv("GIT_COMMIT", "")
if not os.path.exists(coverage_path):
raise RuntimeError(f"missing coverage file {coverage_path}")
if not os.path.exists(junit_path):
raise RuntimeError(f"missing junit file {junit_path}")
outcome = os.getenv("QUALITY_STATUS", "failed").strip() or "failed"
coverage = _load_coverage(coverage_path)
totals = _load_junit(junit_path)
passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)
quality_gate = _load_quality_gate(quality_gate_path)
outcome = "ok"
if totals["tests"] <= 0 or totals["failures"] > 0 or totals["errors"] > 0:
outcome = "failed"
tests_total = totals["tests"] if totals else 0
tests_failed = totals["failures"] if totals else 0
tests_errors = totals["errors"] if totals else 0
tests_skipped = totals["skipped"] if totals else 0
tests_passed = max(tests_total - tests_failed - tests_errors - tests_skipped, 0)
job_name = "platform-quality-ci"
ok_count = _fetch_existing_counter(
@@ -137,52 +208,149 @@ def main() -> int:
else:
failed_count += 1
labels = {
build_labels = {
"suite": suite,
"branch": branch,
"build_number": build_number,
"commit": commit,
}
payload_lines = [
"# TYPE platform_quality_gate_runs_total counter",
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}',
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}',
"# TYPE ariadne_quality_gate_tests_total gauge",
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}',
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}',
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="error"}} {totals["errors"]}',
f'ariadne_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {totals["skipped"]}',
"# TYPE ariadne_quality_gate_coverage_percent gauge",
f'ariadne_quality_gate_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
"# TYPE ariadne_quality_gate_build_info gauge",
f"ariadne_quality_gate_build_info{_label_str(labels)} 1",
]
payload = "\n".join(payload_lines) + "\n"
_post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}", payload)
suite_labels = {"suite": suite}
print(
json.dumps(
{
"suite": suite,
"outcome": outcome,
"tests_total": totals["tests"],
"tests_passed": passed,
"tests_failed": totals["failures"],
"tests_errors": totals["errors"],
"tests_skipped": totals["skipped"],
"coverage_percent": round(coverage, 3),
"ok_counter": ok_count,
"failed_counter": failed_count,
},
indent=2,
summary = (quality_gate or {}).get("summary") or {}
files = (quality_gate or {}).get("files") or {}
metric_lines = [
"# TYPE platform_quality_gate_runs_total counter",
_metric_line("platform_quality_gate_runs_total", int(ok_count), {"suite": suite, "status": "ok"}),
_metric_line("platform_quality_gate_runs_total", int(failed_count), {"suite": suite, "status": "failed"}),
"# TYPE ariadne_quality_gate_status gauge",
_metric_line("ariadne_quality_gate_status", 1 if outcome == "ok" else 0, suite_labels),
"# TYPE ariadne_quality_gate_artifact_present gauge",
_metric_line(
"ariadne_quality_gate_artifact_present",
1 if coverage_path.exists() else 0,
{"suite": suite, "artifact": "coverage_json"},
),
_metric_line(
"ariadne_quality_gate_artifact_present",
1 if junit_path.exists() else 0,
{"suite": suite, "artifact": "junit_xml"},
),
_metric_line(
"ariadne_quality_gate_artifact_present",
1 if quality_gate_path.exists() else 0,
{"suite": suite, "artifact": "quality_gate_json"},
),
"# TYPE ariadne_quality_gate_tests_total gauge",
_metric_line("ariadne_quality_gate_tests_total", tests_passed, {"suite": suite, "result": "passed"}),
_metric_line("ariadne_quality_gate_tests_total", tests_failed, {"suite": suite, "result": "failed"}),
_metric_line("ariadne_quality_gate_tests_total", tests_errors, {"suite": suite, "result": "error"}),
_metric_line("ariadne_quality_gate_tests_total", tests_skipped, {"suite": suite, "result": "skipped"}),
"# TYPE ariadne_quality_gate_coverage_percent gauge",
_metric_line("ariadne_quality_gate_coverage_percent", round(coverage or 0.0, 3), suite_labels),
"# TYPE ariadne_quality_gate_violation_total gauge",
_metric_line(
"ariadne_quality_gate_violation_total",
int(summary.get("line_count_violations", 0)),
{"suite": suite, "check": "line_count"},
),
_metric_line(
"ariadne_quality_gate_violation_total",
int(summary.get("docstring_violations", 0)),
{"suite": suite, "check": "docstrings"},
),
_metric_line(
"ariadne_quality_gate_violation_total",
int(summary.get("coverage_violations", 0)),
{"suite": suite, "check": "coverage"},
),
"# TYPE ariadne_quality_gate_legacy_exception_total gauge",
_metric_line(
"ariadne_quality_gate_legacy_exception_total",
int(summary.get("legacy_line_count_files", 0)),
{"suite": suite, "check": "line_count"},
),
_metric_line(
"ariadne_quality_gate_legacy_exception_total",
int(summary.get("legacy_docstring_files", 0)),
{"suite": suite, "check": "docstrings"},
),
_metric_line(
"ariadne_quality_gate_legacy_exception_total",
int(summary.get("coverage_exemptions", 0)),
{"suite": suite, "check": "coverage"},
),
"# TYPE ariadne_quality_gate_build_info gauge",
f"ariadne_quality_gate_build_info{_label_str(build_labels)} 1",
]
if quality_gate:
metric_lines.extend(
[
"# TYPE ariadne_quality_gate_file_lines gauge",
"# TYPE ariadne_quality_gate_file_missing_docstrings gauge",
"# TYPE ariadne_quality_gate_file_coverage_percent gauge",
]
)
)
for path, data in sorted(files.items()):
file_labels = {"suite": suite, "file": path}
if isinstance(data.get("lines"), int):
metric_lines.append(_metric_line("ariadne_quality_gate_file_lines", data["lines"], file_labels))
if isinstance(data.get("missing_docstrings"), int):
metric_lines.append(
_metric_line(
"ariadne_quality_gate_file_missing_docstrings",
data["missing_docstrings"],
file_labels,
)
)
coverage_percent = data.get("coverage_percent")
if isinstance(coverage_percent, (int, float)):
metric_lines.append(
_metric_line(
"ariadne_quality_gate_file_coverage_percent",
round(float(coverage_percent), 3),
file_labels,
)
)
payload = "\n".join(metric_lines) + "\n"
result = {
"suite": suite,
"outcome": outcome,
"tests_total": tests_total,
"tests_passed": tests_passed,
"tests_failed": tests_failed,
"tests_errors": tests_errors,
"tests_skipped": tests_skipped,
"coverage_percent": round(coverage or 0.0, 3),
"quality_gate_present": quality_gate is not None,
"quality_gate_status": (quality_gate or {}).get("status") or "missing",
"quality_gate_summary": summary,
"ok_counter": ok_count,
"failed_counter": failed_count,
"payload": payload,
"pushgateway_url": pushgateway_url,
"job_name": job_name,
}
return payload, result
def main() -> int:
"""Publish Ariadne quality metrics to Pushgateway.
Inputs: environment variables plus any available build artifacts.
Outputs: a POST to Pushgateway and a JSON summary printed to stdout so local
runs and Jenkins logs can confirm exactly what was emitted.
"""
payload, result = _build_payload()
target = f"{result['pushgateway_url'].rstrip('/')}/metrics/job/{result['job_name']}/suite/{result['suite']}"
_post_text(target, payload)
printable = dict(result)
printable.pop("payload", None)
print(json.dumps(printable, indent=2, sort_keys=True))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"metrics push failed: {exc}")
raise
raise SystemExit(main())
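_fetch_existing_counter exists because a push to Pushgateway overwrites the previously published value rather than accumulating it, so counters must be read back and re-pushed with the increment. A condensed sketch of that read-increment cycle; the script above matches labels by substring, but a regex expresses the same idea:

import re

def next_counter(scrape_text: str, suite: str, status: str) -> float:
    # Locate the previously pushed sample for this suite/status series.
    pattern = re.compile(
        rf'^platform_quality_gate_runs_total\{{[^}}]*suite="{re.escape(suite)}"[^}}]*status="{status}"[^}}]*\}}\s+([0-9.]+)$',
        re.M,
    )
    match = pattern.search(scrape_text)
    return (float(match.group(1)) if match else 0.0) + 1.0

sample = 'platform_quality_gate_runs_total{suite="ariadne",status="ok"} 4'
print(next_counter(sample, "ariadne", "ok"))  # 5.0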

View File

@@ -20,11 +20,10 @@ def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
lambda *args, **kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
kc,
"_decode_claims",
lambda *_args, **_kwargs: {"azp": "portal", "preferred_username": "alice", "groups": ["/admin"]},
)
claims = kc.verify(token)
@@ -36,12 +35,7 @@ def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
lambda *args, **kwargs: {"azp": "other", "aud": ["other"]},
)
monkeypatch.setattr(kc, "_decode_claims", lambda *_args, **_kwargs: {"azp": "other", "aud": ["other"]})
with pytest.raises(ValueError):
kc.verify(token)
@@ -73,11 +67,10 @@ def test_keycloak_verify_refreshes_jwks(monkeypatch) -> None:
return {"keys": [{"kid": "test"}]}
monkeypatch.setattr(kc, "_get_jwks", fake_get_jwks)
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
lambda *args, **kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
kc,
"_decode_claims",
lambda *_args, **_kwargs: {"azp": "other", "aud": "portal", "preferred_username": "alice"},
)
claims = kc.verify(token)
@@ -94,6 +87,30 @@ def test_keycloak_verify_kid_not_found(monkeypatch) -> None:
kc.verify(token)
def test_keycloak_decode_claims_uses_expected_decoder_arguments(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
captured = {}
monkeypatch.setattr(
jwt.algorithms,
"RSAAlgorithm",
type("DummyRSA", (), {"from_jwk": staticmethod(lambda key: "parsed-key")}),
raising=False,
)
def fake_decode(token, **kwargs):
captured["token"] = token
captured["kwargs"] = kwargs
return {"preferred_username": "alice"}
monkeypatch.setattr(jwt, "decode", fake_decode)
claims = kc._decode_claims("header.payload.sig", {"kid": "test"})
assert claims["preferred_username"] == "alice"
assert captured["kwargs"]["key"] == "parsed-key"
assert captured["kwargs"]["issuer"] == "https://issuer"
def test_authenticator_normalizes_groups(monkeypatch) -> None:
token = _make_token()
auth = Authenticator()

View File

@@ -240,55 +240,68 @@ def test_comms_guest_name_randomizer(monkeypatch) -> None:
)
monkeypatch.setattr(comms_module, "settings", dummy_settings)
responses = {
("POST", "http://mas/token"): DummyResponse({"access_token": "admintoken"}),
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"): DummyResponse(
{"data": {"id": "seed"}}
responses = [
(("POST", "http://mas/token"), DummyResponse({"access_token": "admintoken"})),
(
("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"),
DummyResponse({"data": {"id": "seed"}}),
),
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
{"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}
(
("POST", "http://mas/api/admin/v1/personal-sessions"),
DummyResponse({"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}),
),
("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"): DummyResponse({}),
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"): DummyResponse(
{"room_id": "room1"}
(("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"), DummyResponse({})),
(
("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"),
DummyResponse({"room_id": "room1"}),
),
("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"): DummyResponse(
{"chunk": []}
(("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"), DummyResponse({"chunk": []})),
(
("GET", "http://mas/api/admin/v1/users?page[size]=100"),
DummyResponse(
{
"data": [
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
]
}
),
),
("GET", "http://mas/api/admin/v1/users?page[size]=100"): DummyResponse(
{
"data": [
{"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
]
}
(
("POST", "http://mas/api/admin/v1/personal-sessions"),
DummyResponse({"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}),
),
("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse(
{"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}
(
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"),
DummyResponse({"displayname": None}),
),
("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"): DummyResponse(
{"displayname": None}
(("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"), DummyResponse({})),
(
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"),
DummyResponse(
{
"users": [
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
]
}
),
),
("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"): DummyResponse({}),
("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse(
{
"users": [
{"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
]
}
(("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
(
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"),
DummyResponse({"displayname": None}),
),
("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse(
{"displayname": None}
),
("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"): DummyResponse({}),
}
(("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"), DummyResponse({})),
(("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"), DummyResponse({})),
]
response_queues: dict[tuple[str, str], list[DummyResponse]] = {}
for key, response in responses:
response_queues.setdefault(key, []).append(response)
def handler(method, url, **_kwargs):
resp = responses.get((method, url))
if resp is None:
queue = response_queues.get((method, url))
if not queue:
return DummyResponse({})
return resp
return queue.pop(0)
svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))
monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0)

View File

@@ -129,3 +129,95 @@ def test_fetchone_and_fetchall_return_dicts(monkeypatch) -> None:
db = Database("postgresql://user:pass@localhost/db")
assert db.fetchone("fetchone") == {"id": 1}
assert db.fetchall("fetchall") == [{"id": 1}, {"id": 2}]
def test_database_passes_config_to_pool(monkeypatch) -> None:
captured = {}
class CapturePool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
captured.update(
{
"conninfo": conninfo,
"min_size": min_size,
"max_size": max_size,
"kwargs": kwargs,
}
)
super().__init__(conninfo=conninfo, min_size=min_size, max_size=max_size, kwargs=kwargs)
monkeypatch.setattr(db_module, "ConnectionPool", CapturePool)
config = db_module.DatabaseConfig(
pool_min=1,
pool_max=7,
connect_timeout_sec=9,
lock_timeout_sec=11,
statement_timeout_sec=13,
idle_in_tx_timeout_sec=15,
application_name="ariadne-test",
)
Database("postgresql://user:pass@localhost/db", config)
assert captured["conninfo"] == "postgresql://user:pass@localhost/db"
assert captured["min_size"] == 1
assert captured["max_size"] == 7
assert captured["kwargs"]["connect_timeout"] == 9
assert captured["kwargs"]["application_name"] == "ariadne-test"
assert "lock_timeout=11s" in captured["kwargs"]["options"]
def test_migrate_returns_when_advisory_lock_is_unavailable(monkeypatch) -> None:
class NoLockConn(DummyConn):
def execute(self, query, params=None):
if "pg_try_advisory_lock" in query:
return DummyResult(row=(False,))
return super().execute(query, params)
class NoLockPool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = NoLockConn()
monkeypatch.setattr(db_module, "ConnectionPool", NoLockPool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123)
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
def test_migrate_handles_missing_access_requests_table(monkeypatch) -> None:
class UndefinedTableConn(DummyConn):
def execute(self, query, params=None):
if "ALTER TABLE access_requests" in query:
raise db_module.psycopg.errors.UndefinedTable()
return super().execute(query, params)
class UndefinedTablePool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = UndefinedTableConn()
monkeypatch.setattr(db_module, "ConnectionPool", UndefinedTablePool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123)
def test_migrate_skip_flags(monkeypatch) -> None:
monkeypatch.setattr(db_module, "ConnectionPool", DummyPool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123, include_ariadne_tables=False, include_access_requests=False)
assert not any("CREATE TABLE" in query for query, _ in db._pool.conn.executed)
assert not any("ALTER TABLE access_requests" in query for query, _ in db._pool.conn.executed)
def test_unlock_swallows_errors(monkeypatch) -> None:
class UnlockConn(DummyConn):
def execute(self, query, params=None):
if "pg_advisory_unlock" in query:
raise RuntimeError("boom")
return super().execute(query, params)
class UnlockPool(DummyPool):
def __init__(self, conninfo=None, min_size=None, max_size=None, kwargs=None):
self.conn = UnlockConn()
monkeypatch.setattr(db_module, "ConnectionPool", UnlockPool)
db = Database("postgresql://user:pass@localhost/db")
db.migrate(lock_id=123)

View File

@@ -63,6 +63,10 @@ def test_build_command_wraps_env() -> None:
assert "export FOO=bar" in cmd[2]
def test_build_command_accepts_string_without_env() -> None:
assert _build_command("echo hello", None) == ["/bin/sh", "-c", "echo hello"]
def test_exec_returns_output(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
@@ -84,6 +88,41 @@ def test_exec_raises_on_failure(monkeypatch) -> None:
executor.exec(["false"], check=True)
def test_exec_returns_failed_result_when_check_disabled(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: DummyStream(stderr="bad", exit_code=2))
executor = PodExecutor("ns", "app=test", None)
result = executor.exec(["false"], check=False)
assert result.ok is False
assert result.exit_code == 2
def test_exec_uses_returncode_when_exit_code_never_arrives(monkeypatch) -> None:
class ReturncodeOnlyStream(DummyStream):
def __init__(self):
super().__init__(stdout="ok", exit_code=7)
self._exit_code_ready = False
self._updated = False
def update(self, timeout: int = 1) -> None:
self._updated = True
self._open = False
def peek_exit_code(self) -> bool:
return False
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
monkeypatch.setattr(exec_module, "stream", lambda *args, **kwargs: ReturncodeOnlyStream())
executor = PodExecutor("ns", "app=test", None)
result = executor.exec(["echo", "ok"], check=False)
assert result.exit_code == 7
assert result.ok is False
def test_exec_times_out(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "select_pod", lambda *_args, **_kwargs: PodRef("pod", "ns"))
monkeypatch.setattr(exec_module, "_ensure_client", lambda: types.SimpleNamespace(connect_get_namespaced_pod_exec=None))
@@ -115,3 +154,18 @@ def test_ensure_client_fallback(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "client", types.SimpleNamespace(CoreV1Api=lambda: dummy_api))
assert exec_module._ensure_client() is dummy_api
def test_ensure_client_raises_when_kubernetes_import_failed(monkeypatch) -> None:
monkeypatch.setattr(exec_module, "_CORE_API", None)
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", RuntimeError("missing"))
with pytest.raises(RuntimeError):
exec_module._ensure_client()
def test_ensure_client_reuses_cached_client(monkeypatch) -> None:
cached = object()
monkeypatch.setattr(exec_module, "_CORE_API", cached)
monkeypatch.setattr(exec_module, "_IMPORT_ERROR", None)
assert exec_module._ensure_client() is cached

View File

@@ -5,6 +5,32 @@ import pytest
from ariadne.k8s import pods as pods_module
def test_parse_start_time_handles_missing_and_invalid_values() -> None:
assert pods_module._parse_start_time(None) == 0.0
assert pods_module._parse_start_time("not-a-date") == 0.0
def test_parse_start_time_defaults_naive_value_to_utc() -> None:
assert pods_module._parse_start_time("2026-01-20T00:00:00") > 0
def test_is_ready_handles_missing_conditions_and_non_ready_states() -> None:
assert pods_module._is_ready({"status": {"phase": "Pending"}}) is False
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": "bad"}}) is False
assert pods_module._is_ready({"status": {"phase": "Running", "conditions": ["bad"]}}) is False
assert (
pods_module._is_ready(
{"status": {"phase": "Running", "conditions": [{"type": "Initialized", "status": "True"}]}}
)
is False
)
def test_list_pods_requires_namespace() -> None:
with pytest.raises(pods_module.PodSelectionError):
pods_module.list_pods("", "app=test")
def test_list_pods_encodes_selector(monkeypatch) -> None:
captured = {}
@@ -17,6 +43,11 @@ def test_list_pods_encodes_selector(monkeypatch) -> None:
assert "labelSelector=app%3Dnextcloud" in captured["path"]
def test_list_pods_filters_non_dict_items(monkeypatch) -> None:
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: {"items": [{}, "bad", 1]})
assert pods_module.list_pods("demo", "app=test") == [{}]
def test_select_pod_picks_ready_latest(monkeypatch) -> None:
payload = {
"items": [
@@ -57,3 +88,49 @@ def test_select_pod_ignores_non_ready(monkeypatch) -> None:
with pytest.raises(pods_module.PodSelectionError):
pods_module.select_pod("demo", "app=test")
def test_select_pod_skips_deleted_and_invalid_entries(monkeypatch) -> None:
payload = {
"items": [
{
"metadata": {"name": "deleting", "deletionTimestamp": "2026-01-20T00:00:00Z"},
"status": {
"phase": "Running",
"conditions": [{"type": "Ready", "status": "True"}],
},
},
{
"metadata": {"name": "chosen"},
"status": {
"phase": "Running",
"nodeName": "titan-24",
"startTime": "2026-01-20T00:00:00Z",
"conditions": [{"type": "Ready", "status": "True"}],
},
},
]
}
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
pod = pods_module.select_pod("demo", "app=test")
assert pod.name == "chosen"
assert pod.node == "titan-24"
def test_select_pod_skips_blank_names(monkeypatch) -> None:
payload = {
"items": [
{
"metadata": {"name": " "},
"status": {
"phase": "Running",
"conditions": [{"type": "Ready", "status": "True"}],
},
}
]
}
monkeypatch.setattr(pods_module, "get_json", lambda *_args, **_kwargs: payload)
with pytest.raises(pods_module.PodSelectionError):
pods_module.select_pod("demo", "app=test")

View File

@@ -244,7 +244,7 @@ def test_find_user_by_email_skips_non_dict(monkeypatch) -> None:
assert client.find_user_by_email("alice@example.com") is None
def test_get_user_invalid_payload(monkeypatch) -> None:
def test_get_user_invalid_payload_duplicate(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
keycloak_admin_url="http://kc",
keycloak_admin_realm="atlas",
@ -299,7 +299,7 @@ def test_update_user_safe_handles_bad_attrs(monkeypatch) -> None:
assert captured["payload"]["attributes"] == {"new": ["item"]}
def test_set_user_attribute_user_id_missing(monkeypatch) -> None:
def test_set_user_attribute_user_id_missing_duplicate(monkeypatch) -> None:
client = KeycloakAdminClient()
def fake_find_user(username: str) -> dict[str, Any]:

View File

@ -2,7 +2,25 @@ from __future__ import annotations
from datetime import datetime, timezone
from ariadne.metrics.metrics import set_access_request_counts, set_cluster_state_metrics
from ariadne.metrics.metrics import (
record_schedule_state,
record_task_run,
set_access_request_counts,
set_cluster_state_metrics,
)
def test_record_task_run_accepts_missing_duration() -> None:
record_task_run("demo", "ok", None)
def test_record_task_run_records_duration() -> None:
record_task_run("demo", "ok", 1.5)
def test_record_schedule_state_tracks_success_and_failure() -> None:
record_schedule_state("demo", 10.0, 8.0, 20.0, True)
record_schedule_state("demo", 11.0, None, None, False)
def test_set_access_request_counts() -> None:
@ -11,3 +29,7 @@ def test_set_access_request_counts() -> None:
def test_set_cluster_state_metrics() -> None:
set_cluster_state_metrics(datetime.now(timezone.utc), 4, 3, 12.0, 1)
def test_set_cluster_state_metrics_allows_missing_values() -> None:
set_cluster_state_metrics(datetime.now(timezone.utc), None, None, None, None)
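The common thread in these tests is that every metrics setter treats None as "no sample" rather than an error. A minimal sketch of that guard, assuming prometheus_client instruments; the metric names and label sets are illustrative, not taken from ariadne.metrics.metrics:

from prometheus_client import Gauge, Histogram

TASK_DURATION = Histogram("ariadne_task_duration_seconds", "Task run duration", ["task", "status"])
CLUSTER_NODES = Gauge("ariadne_cluster_nodes", "Known node count")


def record_task_run(task, status, duration_sec):
    # Observe a duration only when one was actually measured.
    if duration_sec is not None:
        TASK_DURATION.labels(task=task, status=status).observe(duration_sec)


def set_cluster_nodes(count):
    if count is not None:
        CLUSTER_NODES.set(count)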

79
tests/test_migrate.py Normal file
View File

@ -0,0 +1,79 @@
from __future__ import annotations
import types
import ariadne.migrate as migrate_module
class DummyDatabase:
def __init__(self, name: str):
self.name = name
self.calls: list[tuple[int, bool, bool]] = []
self.closed = False
def migrate(self, lock_id: int, *, include_ariadne_tables: bool, include_access_requests: bool) -> None:
self.calls.append((lock_id, include_ariadne_tables, include_access_requests))
def close(self) -> None:
self.closed = True
def test_build_db_uses_settings(monkeypatch) -> None:
captured = {}
monkeypatch.setattr(
migrate_module,
"settings",
types.SimpleNamespace(
ariadne_db_pool_min=1,
ariadne_db_pool_max=3,
ariadne_db_connect_timeout_sec=5,
ariadne_db_lock_timeout_sec=7,
ariadne_db_statement_timeout_sec=11,
ariadne_db_idle_in_tx_timeout_sec=13,
),
)
def fake_database(dsn, config):
captured["dsn"] = dsn
captured["config"] = config
return DummyDatabase("ariadne")
monkeypatch.setattr(migrate_module, "Database", fake_database)
migrate_module._build_db("postgresql://db", "ariadne_migrate")
assert captured["dsn"] == "postgresql://db"
assert captured["config"].application_name == "ariadne_migrate"
assert captured["config"].lock_timeout_sec == 7
def test_main_returns_when_migrations_disabled(monkeypatch) -> None:
monkeypatch.setattr(
migrate_module,
"settings",
types.SimpleNamespace(ariadne_run_migrations=False),
)
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("should not build")))
migrate_module.main()
def test_main_runs_ariadne_and_portal_migrations(monkeypatch) -> None:
ariadne_db = DummyDatabase("ariadne")
portal_db = DummyDatabase("portal")
databases = [ariadne_db, portal_db]
monkeypatch.setattr(
migrate_module,
"settings",
types.SimpleNamespace(
ariadne_run_migrations=True,
ariadne_database_url="postgresql://ariadne",
portal_database_url="postgresql://portal",
),
)
monkeypatch.setattr(migrate_module, "_build_db", lambda *_args, **_kwargs: databases.pop(0))
migrate_module.main()
assert ariadne_db.calls == [(migrate_module.ARIADNE_MIGRATION_LOCK_ID, True, False)]
assert portal_db.calls == [(migrate_module.PORTAL_MIGRATION_LOCK_ID, False, True)]
assert ariadne_db.closed is True
assert portal_db.closed is True
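The two main() tests fix the orchestration: return before building anything when migrations are disabled, otherwise run the ariadne and portal migrations with their own advisory-lock IDs and flag combinations, closing each database afterwards. A sketch consistent with those assertions (the try/finally and the portal application_name are assumptions; the tests pin only the happy path and the "ariadne_migrate" name for the first database):

def main() -> None:
    if not settings.ariadne_run_migrations:
        return  # never builds a Database at all
    db = _build_db(settings.ariadne_database_url, "ariadne_migrate")
    try:
        db.migrate(ARIADNE_MIGRATION_LOCK_ID, include_ariadne_tables=True, include_access_requests=False)
    finally:
        db.close()
    portal = _build_db(settings.portal_database_url, "ariadne_migrate")
    try:
        portal.migrate(PORTAL_MIGRATION_LOCK_ID, include_ariadne_tables=False, include_access_requests=True)
    finally:
        portal.close()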

View File

@ -595,7 +595,7 @@ def test_provisioning_sync_errors(monkeypatch) -> None:
assert outcome.status == "accounts_building"
def test_provisioning_run_loop_processes_candidates(monkeypatch) -> None:
def test_provisioning_run_loop_processes_candidates_duplicate(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(provision_poll_interval_sec=0.0)
monkeypatch.setattr(prov, "settings", dummy_settings)
_patch_mailu_ready(monkeypatch, dummy_settings)
@ -652,7 +652,7 @@ def test_provisioning_run_loop_waits_for_admin(monkeypatch) -> None:
manager._run_loop()
def test_provisioning_missing_verified_email(monkeypatch) -> None:
def test_provisioning_missing_verified_email_duplicate(monkeypatch) -> None:
dummy_settings = types.SimpleNamespace(
mailu_domain="bstein.dev",
mailu_sync_url="",

View File

@ -0,0 +1,104 @@
from __future__ import annotations
import json
from pathlib import Path
import urllib.request
import scripts.publish_test_metrics as metrics_script
class DummyHTTPResponse:
def __init__(self, body: str, status: int = 200):
self._body = body
self.status = status
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def read(self) -> bytes:
return self._body.encode("utf-8")
def test_build_payload_includes_quality_gate_metrics(monkeypatch, tmp_path: Path) -> None:
coverage_path = tmp_path / "coverage.json"
coverage_path.write_text(json.dumps({"summary": {"percent_covered": 97.5}}), encoding="utf-8")
junit_path = tmp_path / "junit.xml"
junit_path.write_text('<testsuite tests="5" failures="1" errors="1" skipped="1"/>', encoding="utf-8")
quality_gate_path = tmp_path / "quality-gate.json"
quality_gate_path.write_text(
json.dumps(
{
"status": "ok",
"summary": {
"line_count_violations": 0,
"docstring_violations": 1,
"coverage_violations": 0,
"legacy_line_count_files": 2,
"legacy_docstring_files": 3,
"coverage_exemptions": 4,
},
"files": {
"ariadne/auth/keycloak.py": {
"lines": 120,
"missing_docstrings": 0,
"coverage_percent": 99.1,
}
},
}
),
encoding="utf-8",
)
monkeypatch.setenv("COVERAGE_JSON", str(coverage_path))
monkeypatch.setenv("JUNIT_XML", str(junit_path))
monkeypatch.setenv("QUALITY_GATE_JSON", str(quality_gate_path))
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
monkeypatch.setenv("SUITE_NAME", "ariadne")
monkeypatch.setenv("QUALITY_STATUS", "ok")
monkeypatch.setenv("BUILD_NUMBER", "42")
monkeypatch.setenv("BRANCH_NAME", "main")
monkeypatch.setenv("GIT_COMMIT", "abc123")
monkeypatch.setattr(
metrics_script,
"_read_http",
lambda _url: 'platform_quality_gate_runs_total{job="platform-quality-ci",suite="ariadne",status="ok"} 5\n',
)
payload, result = metrics_script._build_payload()
assert result["tests_total"] == 5
assert result["tests_passed"] == 2
assert result["coverage_percent"] == 97.5
assert result["ok_counter"] == 6.0
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="quality_gate_json"} 1' in payload
assert 'ariadne_quality_gate_violation_total{suite="ariadne",check="docstrings"} 1' in payload
assert 'ariadne_quality_gate_file_coverage_percent{suite="ariadne",file="ariadne/auth/keycloak.py"} 99.1' in payload
def test_main_posts_payload_for_failed_build_with_missing_artifacts(monkeypatch, capsys) -> None:
requests: list[tuple[str, str]] = []
missing_base = Path("/tmp/ariadne-metrics-missing")
    def fake_urlopen(target, timeout=10):
        # POSTs arrive as Request objects whose payload is captured; bare URL
        # reads get the same empty 200 response either way.
        if isinstance(target, urllib.request.Request):
            requests.append((target.full_url, target.data.decode("utf-8")))
        return DummyHTTPResponse("", status=200)
monkeypatch.setenv("PUSHGATEWAY_URL", "http://pushgateway.example")
monkeypatch.setenv("SUITE_NAME", "ariadne")
monkeypatch.setenv("QUALITY_STATUS", "failed")
monkeypatch.setenv("COVERAGE_JSON", str(missing_base / "coverage.json"))
monkeypatch.setenv("JUNIT_XML", str(missing_base / "junit.xml"))
monkeypatch.setenv("QUALITY_GATE_JSON", str(missing_base / "quality-gate.json"))
monkeypatch.setattr(metrics_script.urllib.request, "urlopen", fake_urlopen)
assert metrics_script.main() == 0
assert requests
assert 'ariadne_quality_gate_artifact_present{suite="ariadne",artifact="coverage_json"} 0' in requests[0][1]
assert 'ariadne_quality_gate_status{suite="ariadne"} 0' in requests[0][1]
output = capsys.readouterr().out
assert '"quality_gate_status": "missing"' in output

117
tests/test_quality_gate.py Normal file
View File

@ -0,0 +1,117 @@
from __future__ import annotations
import json
from pathlib import Path
import sys
import scripts.check_quality_gate as quality_gate
def _write_quality_config(path: Path) -> None:
path.write_text(
"\n".join(
[
"[files]",
'roots = ["pkg"]',
"max_lines = 10",
"",
"[docstrings]",
'roots = ["pkg"]',
"non_trivial_min_lines = 4",
"",
"[coverage]",
'roots = ["pkg"]',
"threshold = 95.0",
'targets = ["pkg/good.py"]',
"",
"[legacy.line_count]",
'"pkg/legacy.py" = 12',
"",
"[legacy.docstrings]",
'"pkg/legacy.py" = 1',
]
)
+ "\n",
encoding="utf-8",
)
def test_build_report_accepts_legacy_exceptions(tmp_path: Path) -> None:
pkg = tmp_path / "pkg"
pkg.mkdir()
(pkg / "good.py").write_text(
'\n'.join(
[
'def documented(value: int) -> int:',
' """Return a simple incremented value.',
"",
" Inputs: an integer to increment.",
" Outputs: the incremented integer so the module stays fully documented.",
' """',
" return value + 1",
]
)
+ "\n",
encoding="utf-8",
)
(pkg / "legacy.py").write_text(
"\n".join(
[
"class Legacy:",
" pass",
]
+ ["# filler"] * 10
)
+ "\n",
encoding="utf-8",
)
coverage_path = tmp_path / "coverage.json"
coverage_path.write_text(
json.dumps({"files": {"pkg/good.py": {"summary": {"percent_covered": 99.0}}}}),
encoding="utf-8",
)
config_path = tmp_path / "quality_gate.toml"
_write_quality_config(config_path)
config = quality_gate._load_config(config_path)
report = quality_gate._build_report(
tmp_path,
config,
{"pkg/good.py": {"adjusted_percent": 99.0, "raw_percent": 99.0, "excluded_lines": []}},
True,
)
assert report["status"] == "ok"
assert report["summary"]["legacy_line_count_files"] == 1
assert report["files"]["pkg/good.py"]["coverage_enforced"] is True
def test_main_fails_when_new_violations_appear(monkeypatch, tmp_path: Path) -> None:
pkg = tmp_path / "pkg"
pkg.mkdir()
(pkg / "good.py").write_text(
"\n".join(
[
"def undocumented(value):",
" total = value + 1",
" total += 1",
" total += 1",
" return total",
]
)
+ "\n",
encoding="utf-8",
)
config_path = tmp_path / "quality_gate.toml"
_write_quality_config(config_path)
monkeypatch.chdir(tmp_path)
monkeypatch.setattr(
sys,
"argv",
["check_quality_gate.py", "--config", "quality_gate.toml", "--coverage-json", "missing.json", "--output", "out.json"],
)
assert quality_gate.main() == 1
report = json.loads((tmp_path / "out.json").read_text(encoding="utf-8"))
assert report["summary"]["docstring_violations"] >= 1
assert report["summary"]["coverage_violations"] >= 1

View File

@ -7,6 +7,7 @@ import httpx
from ariadne.services.mailu import MailuService
from ariadne.utils.errors import safe_error_detail
from ariadne.utils.http import extract_bearer_token
from ariadne.utils.name_generator import NameGenerator
from ariadne.utils.passwords import random_password
@ -81,6 +82,22 @@ def test_safe_error_detail_timeout() -> None:
assert safe_error_detail(exc, "fallback") == "timeout"
def test_safe_error_detail_runtime_blank_uses_fallback() -> None:
assert safe_error_detail(RuntimeError(" "), "fallback") == "fallback"
def test_safe_error_detail_http_status_without_body() -> None:
request = httpx.Request("GET", "https://example.com")
response = httpx.Response(403, request=request)
exc = httpx.HTTPStatusError("bad", request=request, response=response)
assert safe_error_detail(exc, "fallback") == "http 403"
def test_safe_error_detail_unknown_exception_uses_fallback() -> None:
assert safe_error_detail(ValueError("boom"), "fallback") == "fallback"
def test_extract_bearer_token() -> None:
request = DummyRequest({"Authorization": "Bearer token123"})
assert extract_bearer_token(request) == "token123"
@ -94,3 +111,24 @@ def test_extract_bearer_token_invalid() -> None:
def test_extract_bearer_token_missing_parts() -> None:
request = DummyRequest({"Authorization": "Bearer"})
assert extract_bearer_token(request) is None
def test_name_generator_unique_adds_candidate(monkeypatch) -> None:
generator = NameGenerator(max_attempts=2)
monkeypatch.setattr(NameGenerator, "generate", lambda self: "atlas-guest")
existing = {"taken"}
assert generator.unique(existing) == "atlas-guest"
assert "atlas-guest" in existing
def test_name_generator_unique_returns_none_after_retries(monkeypatch) -> None:
generator = NameGenerator(max_attempts=2)
monkeypatch.setattr(NameGenerator, "generate", lambda self: "taken")
assert generator.unique({"taken"}) is None
def test_name_generator_generate_normalizes_words(monkeypatch) -> None:
monkeypatch.setattr("ariadne.utils.name_generator.coolname.generate", lambda words: [" Atlas ", "", "Guest"])
generator = NameGenerator(words=3)
assert generator.generate() == "atlas-guest"
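The NameGenerator tests pin two behaviours: unique() retries generate() at most max_attempts times, reserving a winning name in the caller's set and returning None once every attempt collides, while generate() strips, lowercases, and drops empty words before hyphen-joining. A sketch of both methods under those contracts (attribute names beyond max_attempts and words are guesses):

def unique(self, existing):
    for _ in range(self.max_attempts):
        candidate = self.generate()
        if candidate not in existing:
            existing.add(candidate)  # reserve it so repeat callers skip it
            return candidate
    return None


def generate(self) -> str:
    words = coolname.generate(self.words)
    return "-".join(word.strip().lower() for word in words if word.strip())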