testing: add ariadne triage evidence bundle

This commit is contained in:
codex 2026-05-19 23:16:47 -03:00
parent e92e616fd9
commit 233d86ebe1
9 changed files with 627 additions and 0 deletions

View File

@ -32,6 +32,7 @@ from .services.nextcloud import nextcloud
from .services.opensearch_prune import prune_indices
from .services.platform_quality_probe import platform_quality_probe
from .services.pod_cleaner import clean_finished_pods
from .services.testing_triage import TRIAGE_EVENT_TYPE, collect_testing_triage, latest_testing_triage_bundle, run_testing_triage
from .services.vault import vault
from .services.vaultwarden_sync import run_vaultwarden_sync
from .services.wger import wger
@ -175,6 +176,7 @@ def _startup() -> None:
)
scheduler.add_task("schedule.jenkins_build_weather", settings.jenkins_build_weather_cron, collect_jenkins_build_weather)
scheduler.add_task("schedule.jenkins_workspace_cleanup", settings.jenkins_workspace_cleanup_cron, cleanup_jenkins_workspace_storage)
scheduler.add_task("schedule.testing_triage", settings.testing_triage_cron, lambda: run_testing_triage(storage))
scheduler.add_task("schedule.vault_k8s_auth", settings.vault_k8s_auth_cron, lambda: vault.sync_k8s_auth(wait=True))
scheduler.add_task("schedule.vault_oidc", settings.vault_oidc_cron, lambda: vault.sync_oidc(wait=True))
scheduler.add_task("schedule.comms_guest_name", settings.comms_guest_name_cron, lambda: comms.run_guest_name_randomizer(wait=True))
@ -207,6 +209,7 @@ def _startup() -> None:
"jenkins_workspace_cleanup_cron": settings.jenkins_workspace_cleanup_cron,
"jenkins_workspace_cleanup_dry_run": settings.jenkins_workspace_cleanup_dry_run,
"jenkins_workspace_cleanup_max_deletions_per_run": settings.jenkins_workspace_cleanup_max_deletions_per_run,
"testing_triage_cron": settings.testing_triage_cron,
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
"vault_oidc_cron": settings.vault_oidc_cron,
"comms_guest_name_cron": settings.comms_guest_name_cron,

View File

@ -141,6 +141,46 @@ def _register_admin_routes(app: FastAPI, require_auth: Callable, deps: Callable[
raise HTTPException(status_code=404, detail="cluster state unavailable")
return JSONResponse(snapshot)
@app.get("/api/admin/testing/triage/latest")
def get_testing_triage(ctx: AuthContext = Depends(require_auth)) -> JSONResponse:
"""Return the latest OpenClaw-ready testing triage bundle."""
module = deps()
module._require_admin(ctx)
bundle = module.latest_testing_triage_bundle(module.storage)
if not bundle:
raise HTTPException(status_code=404, detail="testing triage unavailable")
return JSONResponse(bundle)
@app.get("/api/internal/testing/triage/latest")
def get_testing_triage_internal() -> JSONResponse:
"""Return the latest testing triage bundle for trusted internal callers."""
module = deps()
bundle = module.latest_testing_triage_bundle(module.storage)
if not bundle:
raise HTTPException(status_code=404, detail="testing triage unavailable")
return JSONResponse(bundle)
@app.post("/api/admin/testing/triage/collect")
def collect_testing_triage(ctx: AuthContext = Depends(require_auth)) -> JSONResponse:
"""Collect, store, and return a fresh testing triage evidence bundle."""
module = deps()
module._require_admin(ctx)
bundle = module.collect_testing_triage(module.storage)
module.storage.record_event(module.TRIAGE_EVENT_TYPE, bundle)
return JSONResponse(bundle)
@app.post("/api/internal/testing/triage/collect")
def collect_testing_triage_internal() -> JSONResponse:
"""Collect, store, and return a fresh bundle for trusted internal callers."""
module = deps()
bundle = module.collect_testing_triage(module.storage)
module.storage.record_event(module.TRIAGE_EVENT_TYPE, bundle)
return JSONResponse(bundle)
@app.post("/api/admin/access/requests/{username}/approve")
async def approve_access_request(
username: str,

View File

@ -0,0 +1,461 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import json
from typing import Any
import httpx
from ..db.storage import Storage
from ..settings import settings
from ..utils.logging import get_logger
from .cluster_state import collect_cluster_state
logger = get_logger(__name__)
TRIAGE_EVENT_TYPE = "testing_triage_bundle"
_SUCCESS_STATUS = "ok|passed|success|not_applicable|skipped|na|n/a"
_JENKINS_TREE = (
"jobs[name,url,color,lastBuild[number,result,timestamp,duration,url],"
"lastFailedBuild[number,timestamp,url],jobs[name,url,color,"
"lastBuild[number,result,timestamp,duration,url],lastFailedBuild[number,timestamp,url]]]"
)
_MAX_JENKINS_LOG_LINES = 80
_MAX_JENKINS_LOG_CHARS = 12000
_MAX_EVIDENCE_ITEMS = 12
@dataclass(frozen=True)
class TestingTriageSummary:
"""Represent one stored testing triage bundle.
Inputs: bounded evidence counts gathered from Ariadne collectors.
Outputs: compact scheduler details for metrics and audit records.
"""
status: str
problem_count: int
failed_suites: list[str]
def latest_testing_triage_bundle(storage: Storage) -> dict[str, Any] | None:
"""Return the most recent stored testing triage bundle, if present."""
rows = storage.list_events(limit=1, event_type=TRIAGE_EVENT_TYPE)
if not rows:
return None
detail = rows[0].get("detail")
if isinstance(detail, dict):
return detail
if isinstance(detail, str):
try:
payload = json.loads(detail)
except json.JSONDecodeError:
return None
return payload if isinstance(payload, dict) else None
return None
def run_testing_triage(storage: Storage) -> TestingTriageSummary:
"""Collect and store an OpenClaw-ready testing triage evidence bundle."""
bundle = collect_testing_triage(storage)
storage.record_event(TRIAGE_EVENT_TYPE, bundle)
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
result = TestingTriageSummary(
status=str(summary.get("status") or "unknown"),
problem_count=int(summary.get("problem_count") or 0),
failed_suites=[str(item) for item in summary.get("failed_suites") or []],
)
logger.info(
"testing triage bundle stored",
extra={
"event": "testing_triage",
"status": result.status,
"problem_count": result.problem_count,
"failed_suites": ",".join(result.failed_suites),
},
)
return result
def collect_testing_triage(storage: Storage | None = None) -> dict[str, Any]:
"""Build a bounded evidence bundle for agentic testing triage.
Inputs: latest persisted cluster state when available, plus deterministic
VictoriaMetrics and Jenkins API reads.
Outputs: JSON and Markdown evidence that OpenClaw can summarize without
discovering cluster state from scratch.
"""
errors: list[str] = []
generated_at = datetime.now(timezone.utc).isoformat()
snapshot = _latest_cluster_snapshot(storage, errors)
quality = _quality_signals(errors)
jenkins = _jenkins_signals(errors)
cluster = _cluster_evidence(snapshot)
summary = _summary(cluster, quality, jenkins, errors)
bundle: dict[str, Any] = {
"kind": "testing_triage_bundle",
"generated_at": generated_at,
"summary": summary,
"evidence": {
"cluster": cluster,
"quality": quality,
"jenkins": jenkins,
},
"openclaw": {
"ariadne_latest_url": "http://ariadne.maintenance.svc.cluster.local/api/internal/testing/triage/latest",
"instructions": [
"Treat this bundle as the primary evidence source.",
"Summarize root cause, blast radius, and smallest Flux/IaC change.",
"Do not read Kubernetes Secrets or run mutating kubectl commands.",
"Only run extra read-only commands when the bundle is stale or ambiguous.",
],
},
"unknowns": errors,
}
bundle["markdown"] = _render_markdown(bundle)
return bundle
def _latest_cluster_snapshot(storage: Storage | None, errors: list[str]) -> dict[str, Any]:
if storage is not None:
try:
snapshot = storage.latest_cluster_state()
if isinstance(snapshot, dict) and snapshot:
return snapshot
except Exception as exc:
errors.append(f"cluster_state_latest: {exc}")
try:
snapshot, _summary = collect_cluster_state()
return snapshot
except Exception as exc:
errors.append(f"cluster_state_collect: {exc}")
return {}
def _cluster_evidence(snapshot: dict[str, Any]) -> dict[str, Any]:
summary = snapshot.get("summary") if isinstance(snapshot.get("summary"), dict) else {}
flux = snapshot.get("flux") if isinstance(snapshot.get("flux"), dict) else {}
pod_issues = snapshot.get("pod_issues") if isinstance(snapshot.get("pod_issues"), dict) else {}
jobs = snapshot.get("jobs") if isinstance(snapshot.get("jobs"), dict) else {}
events = snapshot.get("events") if isinstance(snapshot.get("events"), dict) else {}
nodes = snapshot.get("nodes_summary") if isinstance(snapshot.get("nodes_summary"), dict) else {}
return {
"collected_at": snapshot.get("collected_at") or "",
"health_bullets": _limit(summary.get("health_bullets")),
"attention_ranked": _limit(summary.get("attention_ranked")),
"nodes": {
"total": nodes.get("total"),
"ready": nodes.get("ready"),
"not_ready": nodes.get("not_ready"),
"not_ready_names": nodes.get("not_ready_names") or [],
},
"flux_not_ready": _limit(flux.get("items")),
"pod_issues": _limit(pod_issues.get("items")),
"pending_oldest": _limit(pod_issues.get("pending_oldest")),
"jobs_failing": _limit(jobs.get("failing")),
"jobs_active_oldest": _limit(jobs.get("active_oldest")),
"events_recent": _limit(events.get("warnings_recent")),
}
def _quality_signals(errors: list[str]) -> dict[str, Any]:
queries = {
"failed_runs_24h": (
'topk(12, sum by (suite) (increase(platform_quality_gate_runs_total'
f'{{exported_job="platform-quality-ci",status!~"{_SUCCESS_STATUS}"}}[24h])))'
),
"failing_checks_24h": (
'topk(20, sum by (suite,check,status) (increase({__name__=~".*_quality_gate_checks_total",'
f'exported_job="platform-quality-ci",status!~"{_SUCCESS_STATUS}"}}[24h])))'
),
"problem_tests_24h": (
'topk(20, sum by (suite,test,status) (increase(platform_quality_gate_test_case_result'
'{exported_job="platform-quality-ci",test!="",test!="__no_test_cases__",status="failed"}[24h])))'
),
"jenkins_weather_failures": (
"topk(12, max by (exported_job,job_url,weather_icon) "
"(ariadne_jenkins_build_weather_job_last_status != 1))"
),
}
return {
name: {
"query": query,
"items": _vm_items(query, errors),
}
for name, query in queries.items()
}
def _vm_items(query: str, errors: list[str]) -> list[dict[str, Any]]:
base_url = settings.vm_url.strip().rstrip("/")
if not base_url:
return []
try:
with httpx.Client(timeout=settings.cluster_state_vm_timeout_sec) as client:
response = client.get(f"{base_url}/api/v1/query", params={"query": query})
response.raise_for_status()
payload = response.json()
except Exception as exc:
errors.append(f"victoria_metrics: {exc}")
return []
if payload.get("status") != "success":
errors.append("victoria_metrics: query failed")
return []
result = payload.get("data", {}).get("result")
rows = result if isinstance(result, list) else []
return [_vm_item(row) for row in rows[:_MAX_EVIDENCE_ITEMS] if isinstance(row, dict)]
def _vm_item(row: dict[str, Any]) -> dict[str, Any]:
metric = row.get("metric") if isinstance(row.get("metric"), dict) else {}
value = row.get("value") if isinstance(row.get("value"), list) else []
labels = {key: value for key, value in metric.items() if not key.startswith("__")}
return {
"labels": labels,
"value": _float_value(value[1] if len(value) > 1 else None),
}
def _jenkins_signals(errors: list[str]) -> dict[str, Any]:
base_url = settings.jenkins_base_url.strip().rstrip("/")
if not base_url:
return {"failed_builds": []}
try:
jobs = _fetch_jenkins_jobs(base_url)
except Exception as exc:
errors.append(f"jenkins: {exc}")
return {"failed_builds": []}
failed = [job for job in jobs if job.get("status") in {"failure", "running", "unknown"}]
failed.sort(key=lambda item: -(item.get("last_run_ts") or 0))
for job in failed[:3]:
_attach_jenkins_log_tail(job, errors)
return {"failed_builds": failed[:_MAX_EVIDENCE_ITEMS]}
def _fetch_jenkins_jobs(base_url: str) -> list[dict[str, Any]]:
auth = _jenkins_auth()
kwargs: dict[str, Any] = {"timeout": settings.jenkins_api_timeout_sec, "follow_redirects": True}
if auth is not None:
kwargs["auth"] = auth
with httpx.Client(**kwargs) as client:
response = client.get(f"{base_url}/api/json", params={"tree": _JENKINS_TREE})
response.raise_for_status()
payload = response.json()
items = payload.get("jobs") if isinstance(payload, dict) and isinstance(payload.get("jobs"), list) else []
jobs: list[dict[str, Any]] = []
for row in _flatten_jobs(items):
job = _jenkins_job(row)
if job is not None:
jobs.append(job)
return jobs
def _flatten_jobs(items: list[Any], prefix: str = "") -> list[dict[str, Any]]:
output: list[dict[str, Any]] = []
for item in items:
if not isinstance(item, dict):
continue
name = item.get("name")
if not isinstance(name, str) or not name:
continue
full_name = f"{prefix}/{name}" if prefix else name
children = item.get("jobs") if isinstance(item.get("jobs"), list) else []
if children:
output.extend(_flatten_jobs(children, full_name))
if isinstance(item.get("lastBuild"), dict):
entry = dict(item)
entry["name"] = full_name
output.append(entry)
return output
def _jenkins_job(raw: dict[str, Any]) -> dict[str, Any] | None:
name = raw.get("name")
url = raw.get("url")
if not isinstance(name, str) or not isinstance(url, str):
return None
last_build = raw.get("lastBuild") if isinstance(raw.get("lastBuild"), dict) else {}
result = str(last_build.get("result") or "").upper()
status = _jenkins_status(raw, result)
return {
"job": name,
"job_url": url,
"status": status,
"result": result or "UNKNOWN",
"last_build_number": last_build.get("number"),
"last_run_ts": _millis_to_seconds(last_build.get("timestamp")),
"last_duration_seconds": _millis_to_seconds(last_build.get("duration")),
"console_url": str(last_build.get("url") or url).rstrip("/") + "/consoleText",
}
def _jenkins_status(raw: dict[str, Any], result: str) -> str:
color = str(raw.get("color") or "").lower()
if color.endswith("_anime"):
return "running"
if result == "SUCCESS":
return "success"
if result in {"FAILURE", "ABORTED", "UNSTABLE", "NOT_BUILT"}:
return "failure"
if color.startswith(("blue", "green")):
return "success"
if color.startswith(("red", "yellow")):
return "failure"
return "unknown"
def _attach_jenkins_log_tail(job: dict[str, Any], errors: list[str]) -> None:
url = job.get("console_url")
if not isinstance(url, str) or not url:
return
auth = _jenkins_auth()
kwargs: dict[str, Any] = {"timeout": settings.jenkins_api_timeout_sec, "follow_redirects": True}
if auth is not None:
kwargs["auth"] = auth
try:
with httpx.Client(**kwargs) as client:
response = client.get(url)
response.raise_for_status()
job["log_tail"] = _tail_text(response.text)
except Exception as exc:
errors.append(f"jenkins_log:{job.get('job')}: {exc}")
def _tail_text(text: str) -> str:
lines = text.splitlines()[-_MAX_JENKINS_LOG_LINES:]
tail = "\n".join(lines)
if len(tail) <= _MAX_JENKINS_LOG_CHARS:
return tail
return tail[-_MAX_JENKINS_LOG_CHARS:]
def _summary(
cluster: dict[str, Any],
quality: dict[str, Any],
jenkins: dict[str, Any],
errors: list[str],
) -> dict[str, Any]:
failed_suites = sorted(_failed_suites(quality))
problem_count = (
len(cluster.get("flux_not_ready") or [])
+ len(cluster.get("pod_issues") or [])
+ len(cluster.get("jobs_failing") or [])
+ len(quality.get("failed_runs_24h", {}).get("items") or [])
+ len(quality.get("failing_checks_24h", {}).get("items") or [])
+ len(jenkins.get("failed_builds") or [])
)
return {
"status": "needs_attention" if problem_count or errors else "ok",
"problem_count": problem_count,
"failed_suites": failed_suites,
"cluster_collected_at": cluster.get("collected_at") or "",
"unknown_count": len(errors),
}
def _failed_suites(quality: dict[str, Any]) -> set[str]:
suites: set[str] = set()
for bucket in quality.values():
if not isinstance(bucket, dict):
continue
for item in bucket.get("items") or []:
labels = item.get("labels") if isinstance(item, dict) else {}
suite = labels.get("suite") if isinstance(labels, dict) else None
if isinstance(suite, str) and suite:
suites.add(suite)
return suites
def _render_markdown(bundle: dict[str, Any]) -> str:
summary = bundle.get("summary") if isinstance(bundle.get("summary"), dict) else {}
evidence = bundle.get("evidence") if isinstance(bundle.get("evidence"), dict) else {}
cluster = evidence.get("cluster") if isinstance(evidence.get("cluster"), dict) else {}
quality = evidence.get("quality") if isinstance(evidence.get("quality"), dict) else {}
jenkins = evidence.get("jenkins") if isinstance(evidence.get("jenkins"), dict) else {}
lines = [
"# Testing Triage Evidence",
"",
f"- Generated: {bundle.get('generated_at')}",
f"- Status: {summary.get('status')}",
f"- Problem count: {summary.get('problem_count')}",
f"- Failed suites: {', '.join(summary.get('failed_suites') or []) or 'none'}",
"",
"## Cluster",
*_markdown_items(cluster.get("health_bullets")),
*_markdown_named_items("Flux", cluster.get("flux_not_ready"), "name"),
*_markdown_named_items("Pods", cluster.get("pod_issues"), "pod"),
"",
"## Quality",
*_markdown_quality(quality),
"",
"## Jenkins",
*_markdown_named_items("Failed builds", jenkins.get("failed_builds"), "job"),
]
unknowns = bundle.get("unknowns") if isinstance(bundle.get("unknowns"), list) else []
if unknowns:
lines.extend(["", "## Unknowns", *_markdown_items(unknowns)])
return "\n".join(lines).strip() + "\n"
def _markdown_items(items: Any) -> list[str]:
values = items if isinstance(items, list) else []
if not values:
return ["- none"]
return [f"- {item}" for item in values[:_MAX_EVIDENCE_ITEMS]]
def _markdown_named_items(title: str, items: Any, key: str) -> list[str]:
values = items if isinstance(items, list) else []
if not values:
return [f"- {title}: none"]
output = []
for item in values[:_MAX_EVIDENCE_ITEMS]:
if not isinstance(item, dict):
continue
name = item.get(key) or item.get("name") or item.get("job") or "unknown"
namespace = item.get("namespace")
prefix = f"{namespace}/" if namespace else ""
output.append(f"- {title}: {prefix}{name}")
return output or [f"- {title}: none"]
def _markdown_quality(quality: dict[str, Any]) -> list[str]:
lines: list[str] = []
for name, bucket in quality.items():
items = bucket.get("items") if isinstance(bucket, dict) else []
if not items:
lines.append(f"- {name}: none")
continue
for item in items[:5]:
labels = item.get("labels") if isinstance(item, dict) else {}
lines.append(f"- {name}: {labels} value={item.get('value')}")
return lines
def _limit(items: Any) -> list[Any]:
return items[:_MAX_EVIDENCE_ITEMS] if isinstance(items, list) else []
def _float_value(value: Any) -> float:
try:
return float(value)
except (TypeError, ValueError):
return 0.0
def _millis_to_seconds(value: Any) -> float:
return _float_value(value) / 1000.0
def _jenkins_auth() -> tuple[str, str] | None:
username = settings.jenkins_api_user.strip()
token = settings.jenkins_api_token.strip()
if username and token:
return username, token
return None

View File

@ -240,6 +240,7 @@ class Settings:
platform_quality_suite_probe_cron: str
jenkins_build_weather_cron: str
jenkins_workspace_cleanup_cron: str
testing_triage_cron: str
opensearch_url: str
opensearch_limit_bytes: int

View File

@ -295,6 +295,10 @@ def _schedule_config() -> dict[str, Any]:
"ARIADNE_SCHEDULE_JENKINS_WORKSPACE_CLEANUP",
"45 */6 * * *",
),
"testing_triage_cron": _env(
"ARIADNE_SCHEDULE_TESTING_TRIAGE",
"*/15 * * * *",
),
}

View File

@ -33,6 +33,7 @@ def test_from_env_includes_jenkins_weather_settings(monkeypatch) -> None:
monkeypatch.setenv("JENKINS_API_TOKEN", "token")
monkeypatch.setenv("JENKINS_API_TIMEOUT_SEC", "8.5")
monkeypatch.setenv("ARIADNE_SCHEDULE_JENKINS_BUILD_WEATHER", "*/9 * * * *")
monkeypatch.setenv("ARIADNE_SCHEDULE_TESTING_TRIAGE", "*/11 * * * *")
cfg = Settings.from_env()
assert cfg.jenkins_base_url == "https://ci.bstein.dev"
@ -40,3 +41,4 @@ def test_from_env_includes_jenkins_weather_settings(monkeypatch) -> None:
assert cfg.jenkins_api_token == "token"
assert cfg.jenkins_api_timeout_sec == 8.5
assert cfg.jenkins_build_weather_cron == "*/9 * * * *"
assert cfg.testing_triage_cron == "*/11 * * * *"

View File

@ -0,0 +1,74 @@
from __future__ import annotations
from ariadne.services import testing_triage
class DummyStorage:
def __init__(self) -> None:
self.events: list[tuple[str, dict]] = []
def latest_cluster_state(self): # type: ignore[no-untyped-def]
return {
"collected_at": "2026-05-20T00:00:00+00:00",
"summary": {
"health_bullets": ["Pods pending: 1"],
"attention_ranked": [{"kind": "pod_pending"}],
},
"nodes_summary": {"total": 3, "ready": 3, "not_ready": 0},
"flux": {"items": [{"namespace": "flux-system", "name": "monitoring"}]},
"pod_issues": {
"items": [{"namespace": "jenkins", "pod": "agent-1", "phase": "Pending"}],
"pending_oldest": [{"namespace": "jenkins", "pod": "agent-1"}],
},
"jobs": {"failing": [], "active_oldest": []},
"events": {"warnings_recent": []},
}
def record_event(self, event_type: str, detail: dict) -> None:
self.events.append((event_type, detail))
def list_events(self, limit: int = 1, event_type: str | None = None): # type: ignore[no-untyped-def]
matching = [
{"detail": detail}
for stored_type, detail in self.events
if event_type is None or stored_type == event_type
]
return matching[-limit:][::-1]
def test_collect_testing_triage_builds_bundle(monkeypatch) -> None:
storage = DummyStorage()
monkeypatch.setattr(
testing_triage,
"_vm_items",
lambda query, errors: [{"labels": {"suite": "ariadne"}, "value": 1.0}]
if "platform_quality_gate_runs_total" in query
else [],
)
monkeypatch.setattr(testing_triage, "_jenkins_signals", lambda errors: {"failed_builds": []})
bundle = testing_triage.collect_testing_triage(storage)
assert bundle["kind"] == "testing_triage_bundle"
assert bundle["summary"]["status"] == "needs_attention"
assert bundle["summary"]["failed_suites"] == ["ariadne"]
assert "Testing Triage Evidence" in bundle["markdown"]
assert bundle["openclaw"]["ariadne_latest_url"].endswith("/api/internal/testing/triage/latest")
def test_run_testing_triage_stores_latest(monkeypatch) -> None:
storage = DummyStorage()
monkeypatch.setattr(
testing_triage,
"collect_testing_triage",
lambda _storage: {
"summary": {"status": "ok", "problem_count": 0, "failed_suites": []},
},
)
summary = testing_triage.run_testing_triage(storage)
latest = testing_triage.latest_testing_triage_bundle(storage)
assert summary.status == "ok"
assert storage.events[0][0] == testing_triage.TRIAGE_EVENT_TYPE
assert latest["summary"]["status"] == "ok"

View File

@ -213,6 +213,47 @@ def test_cluster_state_routes_report_unavailable(monkeypatch) -> None:
assert admin_resp.status_code == 404
assert internal_resp.status_code == 404
def test_testing_triage_routes(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
bundle = {"kind": "testing_triage_bundle", "summary": {"status": "ok"}}
recorded = []
monkeypatch.setattr(app_module, "latest_testing_triage_bundle", lambda _storage: bundle)
monkeypatch.setattr(app_module, "collect_testing_triage", lambda _storage: bundle)
monkeypatch.setattr(app_module.storage, "record_event", lambda event, detail: recorded.append((event, detail)))
admin_latest = client.get(
"/api/admin/testing/triage/latest",
headers={"Authorization": "Bearer token"},
)
internal_latest = client.get("/api/internal/testing/triage/latest")
admin_collect = client.post(
"/api/admin/testing/triage/collect",
headers={"Authorization": "Bearer token"},
)
internal_collect = client.post("/api/internal/testing/triage/collect")
assert admin_latest.status_code == 200
assert internal_latest.status_code == 200
assert admin_collect.status_code == 200
assert internal_collect.status_code == 200
assert recorded[0][0] == app_module.TRIAGE_EVENT_TYPE
def test_testing_triage_latest_unavailable(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)
monkeypatch.setattr(app_module, "latest_testing_triage_bundle", lambda _storage: None)
admin_resp = client.get(
"/api/admin/testing/triage/latest",
headers={"Authorization": "Bearer token"},
)
internal_resp = client.get("/api/internal/testing/triage/latest")
assert admin_resp.status_code == 404
assert internal_resp.status_code == 404
def test_access_request_approve(monkeypatch) -> None:
ctx = AuthContext(username="bstein", email="", groups=["admin"], claims={})
client = _client(monkeypatch, ctx)

View File

@ -42,6 +42,7 @@ def test_startup_registers_metis_watch(monkeypatch) -> None:
assert any(name == "schedule.platform_quality_suite_probe" for name, _cron in tasks)
assert any(name == "schedule.jenkins_build_weather" for name, _cron in tasks)
assert any(name == "schedule.jenkins_workspace_cleanup" for name, _cron in tasks)
assert any(name == "schedule.testing_triage" for name, _cron in tasks)
def test_record_event_handles_exception(monkeypatch) -> None:
monkeypatch.setattr(app_module.storage, "record_event", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")))