ci(titan-iac): harden quality metric publisher

This commit is contained in:
jenkins 2026-04-21 12:24:18 -03:00
parent ae2356de6a
commit d119f838e9
3 changed files with 42 additions and 2 deletions

View File

@ -6,10 +6,14 @@ from __future__ import annotations
import json
import os
from glob import glob
from pathlib import Path
import sys
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
from ci.scripts import publish_test_metrics_quality as _quality_helpers
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS

View File

@ -28,11 +28,29 @@ def _query(promql: str) -> list[dict]:
def _expected_tasks() -> list[dict]:
cfg = _load_config()
tasks = cfg.get("ariadne_schedule_tasks", [])
tasks = [
_normalize_task(item, cfg)
for item in cfg.get("ariadne_schedule_tasks", [])
]
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def _normalize_task(item: object, cfg: dict) -> dict:
if isinstance(item, str):
return {
"task": item,
"check_last_success": True,
"max_success_age_hours": cfg.get("max_success_age_hours", 48),
}
if isinstance(item, dict):
normalized = dict(item)
normalized.setdefault("check_last_success", True)
normalized.setdefault("max_success_age_hours", cfg.get("max_success_age_hours", 48))
return normalized
raise TypeError(f"Unsupported Ariadne schedule task config entry: {item!r}")
def _tracked_tasks(tasks: list[dict]) -> list[dict]:
tracked = [item for item in tasks if item.get("check_last_success")]
assert tracked, "No Ariadne schedule tasks are marked for success tracking"

View File

@ -27,11 +27,29 @@ def _query(promql: str) -> list[dict]:
def _expected_tasks() -> list[dict]:
cfg = _load_config()
tasks = cfg.get("ariadne_schedule_tasks", [])
tasks = [
_normalize_task(item, cfg)
for item in cfg.get("ariadne_schedule_tasks", [])
]
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def _normalize_task(item: object, cfg: dict) -> dict:
if isinstance(item, str):
return {
"task": item,
"check_last_success": True,
"max_success_age_hours": cfg.get("max_success_age_hours", 48),
}
if isinstance(item, dict):
normalized = dict(item)
normalized.setdefault("check_last_success", True)
normalized.setdefault("max_success_age_hours", cfg.get("max_success_age_hours", 48))
return normalized
raise TypeError(f"Unsupported Ariadne schedule task config entry: {item!r}")
def _tracked_tasks(tasks: list[dict]) -> list[dict]:
tracked = [item for item in tasks if item.get("check_last_success")]
assert tracked, "No Ariadne schedule tasks are marked for success tracking"