Compare commits

..

No commits in common. "63a64661ecce28455c51bd8bb747da0526ba060a" and "f95c51e7f535a575eacc2b354be83fe0c0ff77b6" have entirely different histories.

35 changed files with 33 additions and 188 deletions

View File

@ -422,23 +422,17 @@ def _shutdown() -> None:
@app.get("/health")
def health() -> dict[str, Any]:
"""Return a minimal liveness response for probes and operators."""
return {"ok": True}
@app.get(settings.metrics_path)
def metrics() -> Response:
"""Expose Prometheus metrics generated by Ariadne runtime tasks."""
payload = generate_latest()
return Response(payload, media_type=CONTENT_TYPE_LATEST)
@app.get("/api/admin/access/requests")
def list_access_requests(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Return pending access requests for authenticated administrators."""
_require_admin(ctx)
logger.info(
"list access requests",
@ -469,8 +463,6 @@ def list_access_requests(ctx: AuthContext = Depends(_require_auth)) -> JSONRespo
@app.get("/api/admin/access/flags")
def list_access_flags(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Return Keycloak groups that can be applied as access-request flags."""
_require_admin(ctx)
flags = settings.allowed_flag_groups
if keycloak_admin.ready():
@ -487,8 +479,6 @@ def list_audit_events(
event_type: str | None = None,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
"""Return recent audit events with optional type filtering."""
_require_admin(ctx)
try:
rows = storage.list_events(limit=limit, event_type=event_type)
@ -516,8 +506,6 @@ def list_audit_task_runs(
task: str | None = None,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
"""Return recorded background task runs for admin audit views."""
_require_admin(ctx)
try:
rows = storage.list_task_runs(limit=limit, request_code=request_code, task=task)
@ -545,8 +533,6 @@ def list_audit_task_runs(
@app.get("/api/admin/cluster/state")
def get_cluster_state(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Return the latest cluster-state snapshot to authenticated administrators."""
_require_admin(ctx)
snapshot = storage.latest_cluster_state()
if not snapshot:
@ -556,8 +542,6 @@ def get_cluster_state(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse
@app.get("/api/internal/cluster/state")
def get_cluster_state_internal() -> JSONResponse:
"""Return the latest cluster-state snapshot for trusted internal callers."""
snapshot = storage.latest_cluster_state()
if not snapshot:
raise HTTPException(status_code=404, detail="cluster state unavailable")
@ -570,8 +554,6 @@ async def approve_access_request(
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
"""Approve a verified access request and start account provisioning."""
_require_admin(ctx)
with task_context("admin.access.approve"):
payload = await _read_json_payload(request)
@ -650,8 +632,6 @@ async def deny_access_request(
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
"""Deny a pending access request and record the administrator decision."""
_require_admin(ctx)
with task_context("admin.access.deny"):
payload = await _read_json_payload(request)
@ -712,8 +692,6 @@ async def deny_access_request(
@app.post("/api/access/requests/{request_code}/retry")
def retry_access_request(request_code: str) -> JSONResponse:
"""Reset failed provisioning tasks so an approved request can retry."""
code = (request_code or "").strip()
if not code:
raise HTTPException(status_code=400, detail="request_code is required")
@ -770,8 +748,6 @@ def retry_access_request(request_code: str) -> JSONResponse:
@app.post("/api/account/mailu/rotate")
def rotate_mailu_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Rotate the caller's Mailu app password and trigger dependent syncs."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -868,8 +844,6 @@ def rotate_mailu_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResp
@app.post("/api/account/wger/reset")
def reset_wger_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Reset the caller's Wger password and synchronize the service account."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -897,8 +871,6 @@ def reset_wger_password(ctx: AuthContext = Depends(_require_auth)) -> JSONRespon
@app.post("/api/account/firefly/reset")
def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Reset the caller's Firefly password and synchronize the service account."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -926,8 +898,6 @@ def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONRes
@app.post("/api/account/firefly/rotation/check")
def firefly_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Check whether the caller's Firefly password rotation is healthy."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -945,8 +915,6 @@ def firefly_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONRes
@app.post("/api/account/wger/rotation/check")
def wger_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Check whether the caller's Wger password rotation is healthy."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -964,8 +932,6 @@ def wger_rotation_check(ctx: AuthContext = Depends(_require_auth)) -> JSONRespon
@app.post("/api/account/nextcloud/mail/sync")
async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
"""Synchronize the caller's Mailu address into Nextcloud mail settings."""
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
@ -1042,7 +1008,5 @@ async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_requ
@app.post("/events")
def mailu_event_listener(payload: dict[str, Any] | None = Body(default=None)) -> Response:
"""Accept Mailu webhook events and dispatch mapped account actions."""
status_code, response = mailu_events.handle_event(payload)
return JSONResponse(response, status_code=status_code)

View File

@ -19,8 +19,6 @@ class AuthContext:
class KeycloakOIDC:
"""Validate Keycloak-issued OIDC tokens and return trusted claims."""
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
self._jwks_url = jwks_url
self._issuer = issuer
@ -57,18 +55,12 @@ class KeycloakOIDC:
def _decode_claims(self, token: str, key: dict[str, Any]) -> dict[str, Any]:
return jwt.decode(
token,
key=self._key_from_jwk(key),
key=jwt.algorithms.RSAAlgorithm.from_jwk(key),
algorithms=["RS256"],
options={"verify_aud": False},
issuer=self._issuer,
)
def _key_from_jwk(self, key: dict[str, Any]) -> Any:
algorithm = getattr(jwt.algorithms, "RSAAlgorithm", None)
if algorithm and hasattr(algorithm, "from_jwk"):
return algorithm.from_jwk(key)
return jwt.PyJWK.from_dict(key).key
def _validate_audience(self, claims: dict[str, Any]) -> None:
azp = claims.get("azp")
aud = claims.get("aud")
@ -105,8 +97,6 @@ class KeycloakOIDC:
class Authenticator:
"""Translate bearer tokens into Ariadne authorization context."""
def __init__(self) -> None:
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)

View File

@ -25,8 +25,6 @@ class DatabaseConfig:
class Database:
"""Small Postgres wrapper with migration and query helpers."""
def __init__(self, dsn: str, config: DatabaseConfig | None = None) -> None:
if not dsn:
raise RuntimeError("database URL is required")

View File

@ -62,8 +62,6 @@ class ScheduleState:
class Storage:
"""Persist Ariadne access requests, task state, and audit data."""
def __init__(self, db: Database, portal_db: Database | None = None) -> None:
self._db = db
self._portal_db = portal_db or db

View File

@ -35,8 +35,6 @@ def _k8s_request(method: str, path: str, payload: dict[str, Any] | None = None)
def get_json(path: str) -> dict[str, Any]:
"""Fetch a Kubernetes API path and return its JSON object payload."""
payload = _k8s_request("GET", path)
if not isinstance(payload, dict):
raise RuntimeError("unexpected kubernetes response")
@ -44,8 +42,6 @@ def get_json(path: str) -> dict[str, Any]:
def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
"""Post a JSON payload to the Kubernetes API and return the response."""
data = _k8s_request("POST", path, payload)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
@ -53,8 +49,6 @@ def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
def delete_json(path: str) -> dict[str, Any]:
"""Delete a Kubernetes API resource and return the response payload."""
data = _k8s_request("DELETE", path)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
@ -62,8 +56,6 @@ def delete_json(path: str) -> dict[str, Any]:
def get_secret_value(namespace: str, name: str, key: str) -> str:
"""Read and decode one string value from a Kubernetes Secret."""
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
raw = blob.get(key)

View File

@ -65,8 +65,6 @@ def _build_command(command: list[str] | str, env: dict[str, str] | None) -> list
class PodExecutor:
"""Run shell commands inside the freshest ready pod matching a selector."""
def __init__(self, namespace: str, label_selector: str, container: str | None = None) -> None:
self._namespace = namespace
self._label_selector = label_selector

View File

@ -47,8 +47,6 @@ def _is_ready(pod: dict[str, Any]) -> bool:
def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
"""List Kubernetes pods for a namespace and label selector."""
namespace = (namespace or "").strip()
if not namespace:
raise PodSelectionError("pod namespace missing")
@ -60,8 +58,6 @@ def list_pods(namespace: str, label_selector: str) -> list[dict[str, Any]]:
def select_pod(namespace: str, label_selector: str) -> PodRef:
"""Select the newest ready pod matching a namespace and label selector."""
pods = list_pods(namespace, label_selector)
candidates: list[tuple[float, PodRef]] = []
for pod in pods:

View File

@ -94,8 +94,6 @@ def _extract_attr(attrs: Any, key: str) -> str:
class ProvisioningManager:
"""Coordinate approved access requests across identity and app services."""
def __init__(self, db: Database, storage: Storage) -> None:
self._db = db
self._storage = storage

View File

@ -72,8 +72,6 @@ CLUSTER_STATE_KUSTOMIZATIONS_NOT_READY = Gauge(
def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
"""Increment task counters and duration histograms for one run."""
TASK_RUNS_TOTAL.labels(task=task, status=status).inc()
if duration_sec is not None:
TASK_DURATION_SECONDS.labels(task=task, status=status).observe(duration_sec)
@ -86,8 +84,6 @@ def record_schedule_state(
next_run_ts: float | None,
ok: bool | None,
) -> None:
"""Publish the latest scheduler timestamps and status for a task."""
if last_run_ts:
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
if last_success_ts:
@ -101,8 +97,6 @@ def record_schedule_state(
def set_access_request_counts(counts: dict[str, int]) -> None:
"""Set access-request gauges grouped by lifecycle status."""
for status, count in counts.items():
ACCESS_REQUESTS.labels(status=status).set(count)
@ -114,8 +108,6 @@ def set_cluster_state_metrics(
pods_running: float | None,
kustomizations_not_ready: int | None,
) -> None:
"""Set cluster-state gauges from the most recent collector snapshot."""
CLUSTER_STATE_LAST_TS.set(collected_at.timestamp())
if nodes_total is not None:
CLUSTER_STATE_NODES_TOTAL.set(nodes_total)

View File

@ -24,8 +24,6 @@ def _build_db(dsn: str, application_name: str) -> Database:
def main() -> None:
"""Run configured Ariadne and portal database migrations."""
if not settings.ariadne_run_migrations:
return

View File

@ -22,8 +22,6 @@ class CronTask:
class CronScheduler:
"""Run named cron tasks while recording schedule state and outcomes."""
def __init__(self, storage: Storage, tick_sec: float = 5.0) -> None:
self._storage = storage
self._tick_sec = tick_sec

View File

@ -3521,8 +3521,6 @@ def _build_attention_ranked(
def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
"""Collect Kubernetes, Flux, Longhorn, and metric context into one snapshot."""
errors: list[str] = []
collected_at = datetime.now(timezone.utc)
@ -3695,8 +3693,6 @@ def collect_cluster_state() -> tuple[dict[str, Any], ClusterStateSummary]:
def run_cluster_state(storage: Storage) -> ClusterStateSummary:
"""Collect cluster state, persist it, and prune old stored snapshots."""
snapshot, summary = collect_cluster_state()
try:
storage.record_cluster_state(snapshot)

View File

@ -88,8 +88,6 @@ def _needs_rename_display(display: str | None) -> bool:
class CommsService:
"""Maintain Matrix/MAS guest naming and pruning hygiene."""
def __init__(
self,
client_factory: type[httpx.Client] = httpx.Client,

View File

@ -498,8 +498,6 @@ def _rotation_check_input(username: str) -> tuple[FireflySyncInput | UserSyncOut
class FireflyService:
"""Synchronize Keycloak users and password rotations into Firefly."""
def __init__(self) -> None:
self._executor = PodExecutor(
settings.firefly_namespace,

View File

@ -107,8 +107,6 @@ sleep infinity
class ImageSweeperService:
"""Create Kubernetes cleanup jobs that prune stale node images."""
def _job_payload(self, job_name: str) -> dict[str, Any]:
job: dict[str, Any] = {
"apiVersion": "batch/v1",

View File

@ -9,8 +9,6 @@ from ..settings import settings
class KeycloakAdminClient:
"""Call the Keycloak admin API for user, group, and attribute updates."""
def __init__(self) -> None:
self._token: str = ""
self._expires_at: float = 0.0

View File

@ -29,8 +29,6 @@ def _profile_complete(user: dict[str, Any]) -> bool:
def run_profile_sync() -> ProfileSyncSummary:
"""Clear completed Keycloak profile actions once required fields exist."""
if not keycloak_admin.ready():
summary = ProfileSyncSummary(0, 0, 0, 1, detail="keycloak admin not configured")
logger.info(

View File

@ -19,8 +19,6 @@ class SentEmail:
class Mailer:
"""Send onboarding and notification email through configured SMTP."""
def __init__(self) -> None:
self._host = settings.smtp_host
self._port = settings.smtp_port

View File

@ -115,8 +115,6 @@ def _password_too_long(password: str) -> bool:
class MailuService:
"""Synchronize Keycloak user mail settings into Mailu storage."""
def __init__(self) -> None:
self._db_config = {
"host": settings.mailu_db_host,

View File

@ -54,8 +54,6 @@ def _event_context(payload: dict[str, Any] | None) -> dict[str, Any]:
class MailuEventRunner:
"""Debounce Keycloak events into Mailu synchronization runs."""
def __init__(
self,
min_interval_sec: float,

View File

@ -39,8 +39,6 @@ def _normalize_payload(payload: Any) -> dict[str, Any]:
class MetisService:
"""Trigger Metis sentinel watch runs and normalize their response."""
def ready(self) -> bool:
return bool(_watch_url())

View File

@ -106,8 +106,6 @@ class MailSyncCounters:
class NextcloudService:
"""Synchronize user mail configuration inside the Nextcloud pod."""
def __init__(self) -> None:
self._executor = PodExecutor(
settings.nextcloud_namespace,

View File

@ -24,8 +24,6 @@ HTTP_NOT_FOUND = 404
def parse_size(value: str) -> int:
"""Convert OpenSearch CAT index size text into bytes."""
if not value:
return 0
text = value.strip().lower()
@ -67,8 +65,6 @@ def _delete_index(client: httpx.Client, index: str) -> None:
def prune_indices() -> OpensearchPruneSummary:
"""Delete old OpenSearch indices until usage is under the configured limit."""
patterns = [p.strip() for p in settings.opensearch_index_patterns.split(",") if p.strip()]
if not patterns:
return OpensearchPruneSummary(0, 0, 0, detail="no patterns configured")

View File

@ -28,8 +28,6 @@ def _delete_pod(namespace: str, name: str) -> None:
def clean_finished_pods() -> PodCleanerSummary:
"""Delete succeeded and failed pods across namespaces."""
deleted = 0
skipped = 0
failures = 0

View File

@ -303,8 +303,6 @@ path "kv/data/atlas/shared/*" {
class VaultClient:
"""Minimal HTTP client for Vault API requests."""
def __init__(self, base_url: str, token: str | None = None) -> None:
self._base_url = base_url.rstrip("/")
self._token = token
@ -323,8 +321,6 @@ class VaultClient:
class VaultService:
"""Ensure Vault is initialized, unsealed, and configured for Atlas access."""
def __init__(self) -> None:
self._token: str | None = None

View File

@ -33,8 +33,6 @@ class VaultwardenLookup:
class VaultwardenService:
"""Invite eligible users to Vaultwarden through the admin interface."""
def __init__(self) -> None:
self._admin_lock = threading.Lock()
self._admin_client: httpx.Client | None = None

View File

@ -297,8 +297,6 @@ def _sync_user(
def run_vaultwarden_sync() -> VaultwardenSyncSummary:
"""Process pending Vaultwarden invite failures until the queue is healthy."""
consecutive_failures = 0
counters = VaultwardenSyncCounters()

View File

@ -446,8 +446,6 @@ def _rotation_check_input(username: str) -> tuple[WgerSyncInput | UserSyncOutcom
class WgerService:
"""Synchronize Keycloak users and password rotations into Wger."""
def __init__(self) -> None:
self._executor = PodExecutor(
settings.wger_namespace,

View File

@ -39,8 +39,6 @@ def _http_error_detail(exc: httpx.HTTPStatusError) -> str:
def safe_error_detail(exc: Exception, fallback: str) -> str:
"""Return a user-safe error message without leaking noisy exception internals."""
runtime_detail = _runtime_error_detail(exc)
if runtime_detail:
return runtime_detail

View File

@ -7,8 +7,6 @@ _BEARER_PARTS = 2
def extract_bearer_token(request: Request) -> str | None:
"""Extract a Bearer token from a FastAPI request if one is present."""
header = request.headers.get("Authorization", "")
if not header:
return None

View File

@ -42,8 +42,6 @@ class LogConfig:
class JsonFormatter(logging.Formatter):
"""Format log records as structured JSON with Ariadne task context."""
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
@ -89,8 +87,6 @@ class _ContextFilter(logging.Filter):
def configure_logging(config: LogConfig | None = None) -> None:
"""Configure process-wide JSON logging once for Ariadne services."""
global _LOGGING_CONFIGURED
if _LOGGING_CONFIGURED:
return
@ -113,15 +109,11 @@ def configure_logging(config: LogConfig | None = None) -> None:
def get_logger(name: str) -> logging.Logger:
"""Return a named logger using the shared Ariadne logging configuration."""
return logging.getLogger(name)
@contextmanager
def task_context(name: str | None) -> Any:
"""Attach a task name to log records emitted inside the context."""
token = _TASK_NAME.set(name)
try:
yield

View File

@ -5,7 +5,5 @@ import string
def random_password(length: int = 32) -> str:
"""Generate a random alphanumeric password with the requested length."""
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))

View File

@ -8,45 +8,29 @@ import ast
from pathlib import Path
def _is_dataclass_class(node: ast.ClassDef) -> bool:
"""Return whether a class uses the dataclass decorator."""
return any(
(isinstance(dec, ast.Name) and dec.id == "dataclass")
or (isinstance(dec, ast.Call) and isinstance(dec.func, ast.Name) and dec.func.id == "dataclass")
for dec in node.decorator_list
)
def _base_names(node: ast.ClassDef) -> set[str]:
"""Return simple base class names used by a class definition."""
return {base.id for base in node.bases if isinstance(base, ast.Name)}
def _needs_function_docstring(node: ast.FunctionDef | ast.AsyncFunctionDef, parent_class: str | None) -> bool:
"""Return whether a public function-like node needs a docstring."""
if node.name.startswith("_") and node.name != "__init__":
return False
return not (parent_class and node.name.startswith("_"))
def _needs_class_docstring(node: ast.ClassDef) -> bool:
"""Return whether a public class-like node needs a docstring."""
bases = _base_names(node)
skipped_bases = {"Exception", "RuntimeError", "BaseException", "BaseModel"}
return not (node.name.startswith("_") or _is_dataclass_class(node) or bool(bases.intersection(skipped_bases)))
def _needs_docstring(node: ast.AST, *, parent_class: str | None = None) -> bool:
"""Return whether `node` should carry an API contract docstring."""
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
return _needs_function_docstring(node, parent_class)
name = node.name
if name.startswith("_") and name != "__init__":
return False
return not (parent_class and name.startswith("_"))
if isinstance(node, ast.ClassDef):
return _needs_class_docstring(node)
if node.name.startswith("_"):
return False
if any(
(isinstance(dec, ast.Name) and dec.id == "dataclass")
or (isinstance(dec, ast.Call) and isinstance(dec.func, ast.Name) and dec.func.id == "dataclass")
for dec in node.decorator_list
):
return False
if any(
isinstance(base, ast.Name) and base.id in {"Exception", "RuntimeError", "BaseException"}
for base in node.bases
):
return False
return not any(isinstance(base, ast.Name) and base.id == "BaseModel" for base in node.bases)
return False

View File

@ -204,11 +204,19 @@ def _supply_chain_check_status(build_dir: Path) -> str:
return "failed"
def _resolve_artifact_paths(repo_root: Path) -> tuple[Path, Path]:
"""Find coverage and JUnit artifacts even when a test runner uses fallback names."""
def main() -> int:
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
coverage_path = Path(os.getenv("COVERAGE_JSON", "build/coverage.json"))
junit_path = Path(os.getenv("JUNIT_XML", "build/junit.xml"))
pushgateway_url = os.getenv(
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
).strip()
suite = os.getenv("SUITE_NAME", "ariadne")
branch = os.getenv("BRANCH_NAME", "")
build_number = os.getenv("BUILD_NUMBER", "")
commit = os.getenv("GIT_COMMIT", "")
if not coverage_path.exists():
for candidate in (
repo_root / "build" / "coverage.json",
@ -222,21 +230,6 @@ def _resolve_artifact_paths(repo_root: Path) -> tuple[Path, Path]:
junit_candidates = sorted((repo_root / "build").glob("junit*.xml"))
if junit_candidates:
junit_path = junit_candidates[0]
return coverage_path, junit_path
def main() -> int:
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
coverage_path, junit_path = _resolve_artifact_paths(repo_root)
pushgateway_url = os.getenv(
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
).strip()
suite = os.getenv("SUITE_NAME", "ariadne")
branch = os.getenv("BRANCH_NAME", "")
build_number = os.getenv("BUILD_NUMBER", "")
commit = os.getenv("GIT_COMMIT", "")
print(f"[metrics] coverage_path={coverage_path} exists={coverage_path.exists()}")
print(f"[metrics] junit_path={junit_path} exists={junit_path.exists()}")

View File

@ -20,7 +20,7 @@ def test_keycloak_verify_accepts_matching_audience(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(kc, "_key_from_jwk", lambda key: "dummy")
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
@ -36,7 +36,7 @@ def test_keycloak_verify_rejects_wrong_audience(monkeypatch) -> None:
kc = KeycloakOIDC("https://jwks", "https://issuer", "portal")
monkeypatch.setattr(kc, "_get_jwks", lambda force=False: {"keys": [{"kid": "test"}]})
monkeypatch.setattr(kc, "_key_from_jwk", lambda key: "dummy")
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",
@ -73,7 +73,7 @@ def test_keycloak_verify_refreshes_jwks(monkeypatch) -> None:
return {"keys": [{"kid": "test"}]}
monkeypatch.setattr(kc, "_get_jwks", fake_get_jwks)
monkeypatch.setattr(kc, "_key_from_jwk", lambda key: "dummy")
monkeypatch.setattr(jwt.algorithms.RSAAlgorithm, "from_jwk", lambda key: "dummy")
monkeypatch.setattr(
jwt,
"decode",