maintenance(jenkins): export build weather metrics for grafana

This commit is contained in:
Brad Stein 2026-04-13 00:25:15 -03:00
parent 27788d307f
commit 1094323f1a
6 changed files with 684 additions and 0 deletions

View File

@ -25,6 +25,7 @@ from .services.mailu import mailu
from .services.mailu_events import mailu_events from .services.mailu_events import mailu_events
from .services.nextcloud import nextcloud from .services.nextcloud import nextcloud
from .services.image_sweeper import image_sweeper from .services.image_sweeper import image_sweeper
from .services.jenkins_build_weather import collect_jenkins_build_weather
from .services.jenkins_workspace_cleanup import cleanup_jenkins_workspace_storage from .services.jenkins_workspace_cleanup import cleanup_jenkins_workspace_storage
from .services.metis import metis from .services.metis import metis
from .services.metis_token_sync import metis_token_sync from .services.metis_token_sync import metis_token_sync
@ -328,6 +329,11 @@ def _startup() -> None:
settings.platform_quality_suite_probe_cron, settings.platform_quality_suite_probe_cron,
lambda: platform_quality_probe.run(wait=True), lambda: platform_quality_probe.run(wait=True),
) )
scheduler.add_task(
"schedule.jenkins_build_weather",
settings.jenkins_build_weather_cron,
collect_jenkins_build_weather,
)
scheduler.add_task( scheduler.add_task(
"schedule.jenkins_workspace_cleanup", "schedule.jenkins_workspace_cleanup",
settings.jenkins_workspace_cleanup_cron, settings.jenkins_workspace_cleanup_cron,
@ -388,6 +394,8 @@ def _startup() -> None:
"metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron, "metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron,
"metis_k3s_token_sync_cron": settings.metis_k3s_token_sync_cron, "metis_k3s_token_sync_cron": settings.metis_k3s_token_sync_cron,
"platform_quality_suite_probe_cron": settings.platform_quality_suite_probe_cron, "platform_quality_suite_probe_cron": settings.platform_quality_suite_probe_cron,
"jenkins_build_weather_cron": settings.jenkins_build_weather_cron,
"jenkins_base_url": settings.jenkins_base_url,
"jenkins_workspace_cleanup_cron": settings.jenkins_workspace_cleanup_cron, "jenkins_workspace_cleanup_cron": settings.jenkins_workspace_cleanup_cron,
"jenkins_workspace_cleanup_dry_run": settings.jenkins_workspace_cleanup_dry_run, "jenkins_workspace_cleanup_dry_run": settings.jenkins_workspace_cleanup_dry_run,
"jenkins_workspace_cleanup_max_deletions_per_run": settings.jenkins_workspace_cleanup_max_deletions_per_run, "jenkins_workspace_cleanup_max_deletions_per_run": settings.jenkins_workspace_cleanup_max_deletions_per_run,

View File

@ -0,0 +1,413 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import threading
from typing import Any
import httpx
from prometheus_client import Counter, Gauge
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)
JENKINS_BUILD_WEATHER_RUNS_TOTAL = Counter(
"ariadne_jenkins_build_weather_runs_total",
"Jenkins build weather collector runs by status",
["status"],
)
JENKINS_BUILD_WEATHER_LAST_RUN_TS = Gauge(
"ariadne_jenkins_build_weather_last_run_timestamp_seconds",
"Last Jenkins build weather collection timestamp",
)
JENKINS_BUILD_WEATHER_LAST_SUCCESS_TS = Gauge(
"ariadne_jenkins_build_weather_last_success_timestamp_seconds",
"Last successful Jenkins build weather collection timestamp",
)
JENKINS_BUILD_WEATHER_LAST_FAILURE_TS = Gauge(
"ariadne_jenkins_build_weather_last_failure_timestamp_seconds",
"Last failed Jenkins build weather collection timestamp",
)
JENKINS_BUILD_WEATHER_JOBS_TOTAL = Gauge(
"ariadne_jenkins_build_weather_jobs_total",
"Jenkins jobs observed in the latest weather collection",
["status"],
)
JENKINS_BUILD_WEATHER_JOB_LAST_RUN_TS = Gauge(
"ariadne_jenkins_build_weather_job_last_run_timestamp_seconds",
"Jenkins job last run timestamp",
["job", "job_url", "weather_icon"],
)
JENKINS_BUILD_WEATHER_JOB_LAST_SUCCESS_TS = Gauge(
"ariadne_jenkins_build_weather_job_last_success_timestamp_seconds",
"Jenkins job last success timestamp",
["job", "job_url", "weather_icon"],
)
JENKINS_BUILD_WEATHER_JOB_LAST_FAILURE_TS = Gauge(
"ariadne_jenkins_build_weather_job_last_failure_timestamp_seconds",
"Jenkins job last failure timestamp",
["job", "job_url", "weather_icon"],
)
JENKINS_BUILD_WEATHER_JOB_LAST_DURATION_SECONDS = Gauge(
"ariadne_jenkins_build_weather_job_last_duration_seconds",
"Jenkins job last build duration in seconds",
["job", "job_url", "weather_icon"],
)
JENKINS_BUILD_WEATHER_JOB_LAST_STATUS = Gauge(
"ariadne_jenkins_build_weather_job_last_status",
"Jenkins job last build status (1=success,0=failure,2=running,-1=unknown)",
["job", "job_url", "weather_icon"],
)
JENKINS_BUILD_WEATHER_JOB_HEALTH_SCORE = Gauge(
"ariadne_jenkins_build_weather_job_health_score",
"Jenkins job weather health score (0-100)",
["job", "job_url", "weather_icon"],
)
_JENKINS_JOBS_TREE = (
"jobs[name,url,color,healthReport[score],lastBuild[result,timestamp,duration],"
"lastSuccessfulBuild[timestamp],lastFailedBuild[timestamp],"
"jobs[name,url,color,healthReport[score],lastBuild[result,timestamp,duration],"
"lastSuccessfulBuild[timestamp],lastFailedBuild[timestamp]]]"
)
_STATUS_VALUES = {
"success": 1.0,
"failure": 0.0,
"running": 2.0,
"unknown": -1.0,
}
_JOB_SERIES: set[tuple[str, str, str]] = set()
_JOB_SERIES_LOCK = threading.Lock()
_JOB_METRICS = (
JENKINS_BUILD_WEATHER_JOB_LAST_RUN_TS,
JENKINS_BUILD_WEATHER_JOB_LAST_SUCCESS_TS,
JENKINS_BUILD_WEATHER_JOB_LAST_FAILURE_TS,
JENKINS_BUILD_WEATHER_JOB_LAST_DURATION_SECONDS,
JENKINS_BUILD_WEATHER_JOB_LAST_STATUS,
JENKINS_BUILD_WEATHER_JOB_HEALTH_SCORE,
)
@dataclass(frozen=True)
class JenkinsBuildWeatherJob:
job: str
job_url: str
weather_icon: str
status: str
last_run_ts: float
last_success_ts: float
last_failure_ts: float
last_duration_seconds: float
health_score: float
@property
def series_key(self) -> tuple[str, str, str]:
return (self.job, self.job_url, self.weather_icon)
@dataclass(frozen=True)
class JenkinsBuildWeatherSummary:
jobs_total: int
success_total: int
failure_total: int
running_total: int
unknown_total: int
def _metric_number(value: Any) -> float:
if isinstance(value, bool):
return 0.0
if isinstance(value, (int, float)):
return float(value)
return 0.0
def _millis_to_seconds(value: Any) -> float:
raw = _metric_number(value)
if raw <= 0:
return 0.0
return raw / 1000.0
def _jenkins_auth() -> tuple[str, str] | None:
username = settings.jenkins_api_user.strip()
token = settings.jenkins_api_token.strip()
if username and token:
return (username, token)
return None
def _jenkins_status(job: dict[str, Any]) -> str:
last_build = job.get("lastBuild") if isinstance(job.get("lastBuild"), dict) else {}
result = str(last_build.get("result") or "").upper().strip()
color = str(job.get("color") or "").lower().strip()
if color.endswith("_anime"):
return "running"
if result == "SUCCESS":
return "success"
if result in {"FAILURE", "ABORTED", "UNSTABLE", "NOT_BUILT"}:
return "failure"
if color.startswith(("blue", "green")):
return "success"
if color.startswith(("red", "yellow")):
return "failure"
return "unknown"
def _health_score(job: dict[str, Any], status: str) -> float:
reports = job.get("healthReport")
if isinstance(reports, list):
for report in reports:
if not isinstance(report, dict):
continue
score = _metric_number(report.get("score"))
if score >= 0:
return max(0.0, min(score, 100.0))
if status == "success":
return 100.0
if status == "running":
return 60.0
if status == "failure":
return 10.0
return -1.0
def _weather_icon(score: float) -> str:
if score < 0:
return ""
if score >= 80:
return "☀️"
if score >= 60:
return ""
if score >= 40:
return "☁️"
if score >= 20:
return "🌧️"
return "⛈️"
def _flatten_jobs(items: list[Any], prefix: str = "") -> list[dict[str, Any]]:
flattened: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
name = item.get("name")
if not isinstance(name, str) or not name.strip():
continue
full_name = f"{prefix}/{name}" if prefix else name
nested_jobs = item.get("jobs") if isinstance(item.get("jobs"), list) else []
if nested_jobs:
flattened.extend(_flatten_jobs(nested_jobs, prefix=full_name))
last_build = item.get("lastBuild")
if nested_jobs and not isinstance(last_build, dict):
continue
payload = dict(item)
payload["name"] = full_name
flattened.append(payload)
return flattened
def _parse_job(raw: dict[str, Any]) -> JenkinsBuildWeatherJob | None:
job = str(raw.get("name") or "").strip()
job_url = str(raw.get("url") or "").strip()
if not job or not job_url:
return None
status = _jenkins_status(raw)
score = _health_score(raw, status)
weather_icon = _weather_icon(score)
last_build = raw.get("lastBuild") if isinstance(raw.get("lastBuild"), dict) else {}
last_success = raw.get("lastSuccessfulBuild") if isinstance(raw.get("lastSuccessfulBuild"), dict) else {}
last_failure = raw.get("lastFailedBuild") if isinstance(raw.get("lastFailedBuild"), dict) else {}
return JenkinsBuildWeatherJob(
job=job,
job_url=job_url,
weather_icon=weather_icon,
status=status if status in _STATUS_VALUES else "unknown",
last_run_ts=_millis_to_seconds(last_build.get("timestamp")),
last_success_ts=_millis_to_seconds(last_success.get("timestamp")),
last_failure_ts=_millis_to_seconds(last_failure.get("timestamp")),
last_duration_seconds=_metric_number(last_build.get("duration")) / 1000.0,
health_score=score,
)
def _fetch_jobs() -> list[JenkinsBuildWeatherJob]:
base_url = settings.jenkins_base_url.strip().rstrip("/")
if not base_url:
return []
client_kwargs: dict[str, Any] = {
"timeout": settings.jenkins_api_timeout_sec,
"follow_redirects": True,
}
auth = _jenkins_auth()
if auth is not None:
client_kwargs["auth"] = auth
with httpx.Client(**client_kwargs) as client:
response = client.get(
f"{base_url}/api/json",
params={"tree": _JENKINS_JOBS_TREE},
)
response.raise_for_status()
payload = response.json()
if not isinstance(payload, dict):
raise ValueError("jenkins API returned a non-object payload")
items = payload.get("jobs") if isinstance(payload.get("jobs"), list) else []
jobs: list[JenkinsBuildWeatherJob] = []
for raw in _flatten_jobs(items):
parsed = _parse_job(raw)
if parsed is None:
continue
jobs.append(parsed)
jobs.sort(key=lambda row: row.last_run_ts, reverse=True)
return jobs
def _remove_missing_series(current_series: set[tuple[str, str, str]]) -> None:
global _JOB_SERIES
with _JOB_SERIES_LOCK:
removed = _JOB_SERIES - current_series
if removed:
for labels in removed:
for metric in _JOB_METRICS:
try:
metric.remove(*labels)
except KeyError:
pass
_JOB_SERIES = set(current_series)
def _record_jobs(jobs: list[JenkinsBuildWeatherJob]) -> JenkinsBuildWeatherSummary:
counts = {
"success": 0,
"failure": 0,
"running": 0,
"unknown": 0,
}
series: set[tuple[str, str, str]] = set()
for job in jobs:
series.add(job.series_key)
counts[job.status] = counts.get(job.status, 0) + 1
JENKINS_BUILD_WEATHER_JOB_LAST_RUN_TS.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(job.last_run_ts)
JENKINS_BUILD_WEATHER_JOB_LAST_SUCCESS_TS.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(job.last_success_ts)
JENKINS_BUILD_WEATHER_JOB_LAST_FAILURE_TS.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(job.last_failure_ts)
JENKINS_BUILD_WEATHER_JOB_LAST_DURATION_SECONDS.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(max(job.last_duration_seconds, 0.0))
JENKINS_BUILD_WEATHER_JOB_LAST_STATUS.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(_STATUS_VALUES.get(job.status, _STATUS_VALUES["unknown"]))
JENKINS_BUILD_WEATHER_JOB_HEALTH_SCORE.labels(
job=job.job,
job_url=job.job_url,
weather_icon=job.weather_icon,
).set(job.health_score)
_remove_missing_series(series)
for status in ("success", "failure", "running", "unknown"):
JENKINS_BUILD_WEATHER_JOBS_TOTAL.labels(status=status).set(counts.get(status, 0))
return JenkinsBuildWeatherSummary(
jobs_total=len(jobs),
success_total=counts.get("success", 0),
failure_total=counts.get("failure", 0),
running_total=counts.get("running", 0),
unknown_total=counts.get("unknown", 0),
)
def collect_jenkins_build_weather() -> JenkinsBuildWeatherSummary:
"""Collect Jenkins homepage job weather/status into Prometheus gauges."""
now_ts = datetime.now(timezone.utc).timestamp()
JENKINS_BUILD_WEATHER_LAST_RUN_TS.set(now_ts)
if not settings.jenkins_base_url.strip():
JENKINS_BUILD_WEATHER_RUNS_TOTAL.labels(status="skipped").inc()
summary = JenkinsBuildWeatherSummary(
jobs_total=0,
success_total=0,
failure_total=0,
running_total=0,
unknown_total=0,
)
logger.info(
"jenkins build weather skipped",
extra={
"event": "jenkins_build_weather",
"status": "skipped",
"detail": "jenkins base url is empty",
},
)
return summary
try:
jobs = _fetch_jobs()
summary = _record_jobs(jobs)
except Exception as exc:
JENKINS_BUILD_WEATHER_RUNS_TOTAL.labels(status="error").inc()
JENKINS_BUILD_WEATHER_LAST_FAILURE_TS.set(now_ts)
logger.exception(
"jenkins build weather collection failed",
extra={
"event": "jenkins_build_weather",
"status": "error",
"detail": str(exc),
},
)
raise
JENKINS_BUILD_WEATHER_RUNS_TOTAL.labels(status="ok").inc()
JENKINS_BUILD_WEATHER_LAST_SUCCESS_TS.set(now_ts)
logger.info(
"jenkins build weather collection finished",
extra={
"event": "jenkins_build_weather",
"status": "ok",
"jobs_total": summary.jobs_total,
"success_total": summary.success_total,
"failure_total": summary.failure_total,
"running_total": summary.running_total,
"unknown_total": summary.unknown_total,
},
)
return summary

View File

@ -168,6 +168,10 @@ class Settings:
platform_quality_probe_wait_timeout_sec: float platform_quality_probe_wait_timeout_sec: float
platform_quality_probe_pushgateway_url: str platform_quality_probe_pushgateway_url: str
platform_quality_probe_http_timeout_sec: int platform_quality_probe_http_timeout_sec: int
jenkins_base_url: str
jenkins_api_user: str
jenkins_api_token: str
jenkins_api_timeout_sec: float
jenkins_workspace_namespace: str jenkins_workspace_namespace: str
jenkins_workspace_pvc_prefix: str jenkins_workspace_pvc_prefix: str
jenkins_workspace_cleanup_min_age_hours: float jenkins_workspace_cleanup_min_age_hours: float
@ -239,6 +243,7 @@ class Settings:
metis_token_sync_vault_k8s_role: str metis_token_sync_vault_k8s_role: str
metis_k3s_token_sync_cron: str metis_k3s_token_sync_cron: str
platform_quality_suite_probe_cron: str platform_quality_suite_probe_cron: str
jenkins_build_weather_cron: str
jenkins_workspace_cleanup_cron: str jenkins_workspace_cleanup_cron: str
opensearch_url: str opensearch_url: str
@ -465,6 +470,15 @@ class Settings:
"platform_quality_probe_http_timeout_sec": _env_int("PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12), "platform_quality_probe_http_timeout_sec": _env_int("PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12),
} }
@classmethod
def _jenkins_build_weather_config(cls) -> dict[str, Any]:
return {
"jenkins_base_url": _env("JENKINS_BASE_URL", "https://ci.bstein.dev").rstrip("/"),
"jenkins_api_user": _env("JENKINS_API_USER", ""),
"jenkins_api_token": _env("JENKINS_API_TOKEN", ""),
"jenkins_api_timeout_sec": _env_float("JENKINS_API_TIMEOUT_SEC", 10.0),
}
@classmethod @classmethod
def _jenkins_workspace_cleanup_config(cls) -> dict[str, Any]: def _jenkins_workspace_cleanup_config(cls) -> dict[str, Any]:
return { return {
@ -524,6 +538,10 @@ class Settings:
"ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE", "ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE",
"*/15 * * * *", "*/15 * * * *",
), ),
"jenkins_build_weather_cron": _env(
"ARIADNE_SCHEDULE_JENKINS_BUILD_WEATHER",
"*/10 * * * *",
),
"jenkins_workspace_cleanup_cron": _env( "jenkins_workspace_cleanup_cron": _env(
"ARIADNE_SCHEDULE_JENKINS_WORKSPACE_CLEANUP", "ARIADNE_SCHEDULE_JENKINS_WORKSPACE_CLEANUP",
"45 */6 * * *", "45 */6 * * *",
@ -588,6 +606,7 @@ class Settings:
comms_cfg = cls._comms_config() comms_cfg = cls._comms_config()
image_cfg = cls._image_sweeper_config() image_cfg = cls._image_sweeper_config()
platform_quality_probe_cfg = cls._platform_quality_probe_config() platform_quality_probe_cfg = cls._platform_quality_probe_config()
jenkins_build_weather_cfg = cls._jenkins_build_weather_config()
jenkins_workspace_cleanup_cfg = cls._jenkins_workspace_cleanup_config() jenkins_workspace_cleanup_cfg = cls._jenkins_workspace_cleanup_config()
vaultwarden_cfg = cls._vaultwarden_config() vaultwarden_cfg = cls._vaultwarden_config()
schedule_cfg = cls._schedule_config() schedule_cfg = cls._schedule_config()
@ -629,6 +648,7 @@ class Settings:
**comms_cfg, **comms_cfg,
**image_cfg, **image_cfg,
**platform_quality_probe_cfg, **platform_quality_probe_cfg,
**jenkins_build_weather_cfg,
**jenkins_workspace_cleanup_cfg, **jenkins_workspace_cleanup_cfg,
**vaultwarden_cfg, **vaultwarden_cfg,
**schedule_cfg, **schedule_cfg,

View File

@ -64,6 +64,7 @@ def test_startup_registers_metis_watch(monkeypatch) -> None:
assert any(name == "schedule.metis_sentinel_watch" for name, _cron in tasks) assert any(name == "schedule.metis_sentinel_watch" for name, _cron in tasks)
assert any(name == "schedule.metis_k3s_token_sync" for name, _cron in tasks) assert any(name == "schedule.metis_k3s_token_sync" for name, _cron in tasks)
assert any(name == "schedule.platform_quality_suite_probe" for name, _cron in tasks) assert any(name == "schedule.platform_quality_suite_probe" for name, _cron in tasks)
assert any(name == "schedule.jenkins_build_weather" for name, _cron in tasks)
assert any(name == "schedule.jenkins_workspace_cleanup" for name, _cron in tasks) assert any(name == "schedule.jenkins_workspace_cleanup" for name, _cron in tasks)

View File

@ -0,0 +1,227 @@
from __future__ import annotations
from datetime import datetime, timezone
import types
import httpx
from prometheus_client import REGISTRY
from ariadne.services import jenkins_build_weather as weather_module
class _DummyResponse:
def __init__(self, payload: dict[str, object], status_code: int = 200) -> None:
self._payload = payload
self.status_code = status_code
def raise_for_status(self) -> None:
if self.status_code >= 400:
request = httpx.Request("GET", "https://ci.bstein.dev/api/json")
response = httpx.Response(self.status_code, request=request)
raise httpx.HTTPStatusError("boom", request=request, response=response)
def json(self) -> dict[str, object]:
return self._payload
class _DummyClient:
def __init__(self, payload: dict[str, object]) -> None:
self._payload = payload
self.called = False
def __enter__(self) -> _DummyClient:
return self
def __exit__(self, exc_type, exc, tb) -> bool:
return False
def get(self, url: str, params: dict[str, str] | None = None) -> _DummyResponse:
self.called = True
assert url == "https://ci.bstein.dev/api/json"
assert isinstance(params, dict)
assert "tree" in params
return _DummyResponse(self._payload)
def _metric_value(name: str, labels: dict[str, str] | None = None) -> float | None:
value = REGISTRY.get_sample_value(name, labels or {})
return float(value) if value is not None else None
def _dummy_settings(base_url: str = "https://ci.bstein.dev") -> types.SimpleNamespace:
return types.SimpleNamespace(
jenkins_base_url=base_url,
jenkins_api_user="",
jenkins_api_token="",
jenkins_api_timeout_sec=5.0,
)
def test_collect_jenkins_build_weather_records_metrics(monkeypatch) -> None:
weather_module._JOB_SERIES = set()
monkeypatch.setattr(weather_module, "settings", _dummy_settings())
payload = {
"jobs": [
{
"name": "ariadne",
"url": "https://ci.bstein.dev/job/ariadne/",
"color": "blue",
"healthReport": [{"score": 93}],
"lastBuild": {"result": "SUCCESS", "timestamp": 1713000000000, "duration": 186000},
"lastSuccessfulBuild": {"timestamp": 1713000000000},
"lastFailedBuild": {"timestamp": 1712000000000},
},
{
"name": "titan-iac",
"url": "https://ci.bstein.dev/job/titan-iac/",
"color": "red",
"healthReport": [{"score": 11}],
"lastBuild": {"result": "FAILURE", "timestamp": 1712990000000, "duration": 126000},
"lastSuccessfulBuild": {"timestamp": 1711000000000},
"lastFailedBuild": {"timestamp": 1712990000000},
},
]
}
monkeypatch.setattr(weather_module.httpx, "Client", lambda **_kwargs: _DummyClient(payload))
before = _metric_value("ariadne_jenkins_build_weather_runs_total", {"status": "ok"}) or 0.0
summary = weather_module.collect_jenkins_build_weather()
assert summary.jobs_total == 2
assert summary.success_total == 1
assert summary.failure_total == 1
assert summary.running_total == 0
assert summary.unknown_total == 0
assert (_metric_value("ariadne_jenkins_build_weather_runs_total", {"status": "ok"}) or 0.0) == before + 1
assert _metric_value(
"ariadne_jenkins_build_weather_job_last_status",
{
"job": "ariadne",
"job_url": "https://ci.bstein.dev/job/ariadne/",
"weather_icon": "☀️",
},
) == 1.0
assert _metric_value(
"ariadne_jenkins_build_weather_job_last_status",
{
"job": "titan-iac",
"job_url": "https://ci.bstein.dev/job/titan-iac/",
"weather_icon": "⛈️",
},
) == 0.0
assert _metric_value(
"ariadne_jenkins_build_weather_job_last_duration_seconds",
{
"job": "ariadne",
"job_url": "https://ci.bstein.dev/job/ariadne/",
"weather_icon": "☀️",
},
) == 186.0
def test_collect_jenkins_build_weather_removes_deleted_job_series(monkeypatch) -> None:
weather_module._JOB_SERIES = set()
monkeypatch.setattr(weather_module, "settings", _dummy_settings())
first_payload = {
"jobs": [
{
"name": "ariadne",
"url": "https://ci.bstein.dev/job/ariadne/",
"color": "blue",
"healthReport": [{"score": 90}],
"lastBuild": {"result": "SUCCESS", "timestamp": 1713000000000, "duration": 186000},
"lastSuccessfulBuild": {"timestamp": 1713000000000},
"lastFailedBuild": {"timestamp": 1712000000000},
},
{
"name": "pegasus",
"url": "https://ci.bstein.dev/job/pegasus/",
"color": "yellow",
"healthReport": [{"score": 50}],
"lastBuild": {"result": "FAILURE", "timestamp": 1712980000000, "duration": 120000},
"lastSuccessfulBuild": {"timestamp": 1710000000000},
"lastFailedBuild": {"timestamp": 1712980000000},
},
]
}
second_payload = {
"jobs": [
{
"name": "ariadne",
"url": "https://ci.bstein.dev/job/ariadne/",
"color": "blue",
"healthReport": [{"score": 90}],
"lastBuild": {"result": "SUCCESS", "timestamp": 1713010000000, "duration": 184000},
"lastSuccessfulBuild": {"timestamp": 1713010000000},
"lastFailedBuild": {"timestamp": 1712000000000},
}
]
}
payloads = [first_payload, second_payload]
monkeypatch.setattr(
weather_module.httpx,
"Client",
lambda **_kwargs: _DummyClient(payloads.pop(0)),
)
weather_module.collect_jenkins_build_weather()
weather_module.collect_jenkins_build_weather()
assert _metric_value(
"ariadne_jenkins_build_weather_job_last_status",
{
"job": "pegasus",
"job_url": "https://ci.bstein.dev/job/pegasus/",
"weather_icon": "☁️",
},
) is None
def test_collect_jenkins_build_weather_skips_when_base_url_empty(monkeypatch) -> None:
weather_module._JOB_SERIES = set()
monkeypatch.setattr(weather_module, "settings", _dummy_settings(base_url=""))
before = _metric_value("ariadne_jenkins_build_weather_runs_total", {"status": "skipped"}) or 0.0
summary = weather_module.collect_jenkins_build_weather()
assert summary.jobs_total == 0
assert (_metric_value("ariadne_jenkins_build_weather_runs_total", {"status": "skipped"}) or 0.0) == before + 1
def test_fetch_jobs_flattens_folder_jobs(monkeypatch) -> None:
weather_module._JOB_SERIES = set()
monkeypatch.setattr(weather_module, "settings", _dummy_settings())
payload = {
"jobs": [
{
"name": "folder",
"url": "https://ci.bstein.dev/job/folder/",
"jobs": [
{
"name": "child",
"url": "https://ci.bstein.dev/job/folder/job/child/",
"color": "blue",
"healthReport": [{"score": 100}],
"lastBuild": {"result": "SUCCESS", "timestamp": 1713000000000, "duration": 1000},
"lastSuccessfulBuild": {"timestamp": 1713000000000},
"lastFailedBuild": {"timestamp": 1712000000000},
}
],
}
]
}
monkeypatch.setattr(weather_module.httpx, "Client", lambda **_kwargs: _DummyClient(payload))
jobs = weather_module._fetch_jobs()
assert len(jobs) == 1
assert jobs[0].job == "folder/child"
assert jobs[0].status == "success"
assert jobs[0].last_duration_seconds == 1.0
assert datetime.fromtimestamp(jobs[0].last_run_ts, tz=timezone.utc).year == 2024

View File

@ -25,3 +25,18 @@ def test_from_env_includes_metis_settings(monkeypatch) -> None:
assert cfg.metis_watch_url == "http://metis.example/internal/sentinel/watch" assert cfg.metis_watch_url == "http://metis.example/internal/sentinel/watch"
assert cfg.metis_timeout_sec == 9.5 assert cfg.metis_timeout_sec == 9.5
assert cfg.metis_sentinel_watch_cron == "*/7 * * * *" assert cfg.metis_sentinel_watch_cron == "*/7 * * * *"
def test_from_env_includes_jenkins_weather_settings(monkeypatch) -> None:
monkeypatch.setenv("JENKINS_BASE_URL", "https://ci.bstein.dev/")
monkeypatch.setenv("JENKINS_API_USER", "ariadne")
monkeypatch.setenv("JENKINS_API_TOKEN", "token")
monkeypatch.setenv("JENKINS_API_TIMEOUT_SEC", "8.5")
monkeypatch.setenv("ARIADNE_SCHEDULE_JENKINS_BUILD_WEATHER", "*/9 * * * *")
cfg = Settings.from_env()
assert cfg.jenkins_base_url == "https://ci.bstein.dev"
assert cfg.jenkins_api_user == "ariadne"
assert cfg.jenkins_api_token == "token"
assert cfg.jenkins_api_timeout_sec == 8.5
assert cfg.jenkins_build_weather_cron == "*/9 * * * *"