"""Build a titan-iac supply-chain compliance report from Trivy evidence.""" from __future__ import annotations import argparse import datetime as dt import json from pathlib import Path from typing import Any FAIL_SEVERITIES = {"HIGH", "CRITICAL"} def _read_json(path: Path) -> dict[str, Any]: """Read a JSON object from disk for use as pipeline evidence.""" payload = json.loads(path.read_text(encoding="utf-8")) if not isinstance(payload, dict): raise ValueError(f"{path} must contain a JSON object") return payload def _parse_day(raw: str | None) -> dt.date | None: """Parse an ISO day while letting optional waiver dates stay optional.""" if not raw: return None return dt.date.fromisoformat(raw) def _today(override: str | None = None) -> dt.date: """Return the policy day so tests can pin expiry behavior.""" return _parse_day(override) or dt.date.today() def _load_waiver_pairs(path: Path | None, policy_day: dt.date) -> tuple[set[tuple[str, str]], int]: """Return active ``(misconfiguration id, target)`` waivers and expired count.""" if path is None or not path.exists(): return set(), 0 payload = _read_json(path) default_expires_at = payload.get("default_expires_at") active: set[tuple[str, str]] = set() expired = 0 for entry in payload.get("misconfigurations", []): if not isinstance(entry, dict): continue misconfiguration_id = str(entry.get("id") or "").strip() if not misconfiguration_id: continue expires_at = _parse_day(str(entry.get("expires_at") or default_expires_at or "")) targets = entry.get("targets", []) if not isinstance(targets, list): continue if expires_at and expires_at < policy_day: expired += len(targets) continue # Waivers are target-specific so a new unsafe manifest fails until it is # either fixed or deliberately accepted with a fresh expiration. for target in targets: if isinstance(target, str) and target: active.add((misconfiguration_id, target)) return active, expired def _iter_failed_misconfigurations(payload: dict[str, Any]): """Yield failed high/critical Trivy misconfiguration records.""" for result in payload.get("Results", []): if not isinstance(result, dict): continue target = str(result.get("Target") or "") for item in result.get("Misconfigurations") or []: if not isinstance(item, dict): continue if item.get("Status") != "FAIL": continue if str(item.get("Severity") or "").upper() not in FAIL_SEVERITIES: continue yield target, item def _count_vulnerabilities(payload: dict[str, Any], severity: str) -> int: """Count Trivy vulnerabilities at a specific severity.""" count = 0 for result in payload.get("Results", []): if not isinstance(result, dict): continue for item in result.get("Vulnerabilities") or []: if isinstance(item, dict) and str(item.get("Severity") or "").upper() == severity: count += 1 return count def _count_secrets(payload: dict[str, Any]) -> int: """Count detected secrets in the Trivy filesystem report.""" count = 0 for result in payload.get("Results", []): if isinstance(result, dict): count += len(result.get("Secrets") or []) return count def build_report( trivy_payload: dict[str, Any], waiver_path: Path | None = None, today_override: str | None = None, ) -> dict[str, Any]: """Build the compliance summary consumed by the quality gate.""" policy_day = _today(today_override) active_waivers, expired_waivers = _load_waiver_pairs(waiver_path, policy_day) open_misconfigs: list[dict[str, str]] = [] waived_misconfigs = 0 for target, item in _iter_failed_misconfigurations(trivy_payload): misconfiguration_id = str(item.get("ID") or "") if (misconfiguration_id, target) in active_waivers: waived_misconfigs += 1 continue open_misconfigs.append( { "id": misconfiguration_id, "target": target, "severity": str(item.get("Severity") or ""), "title": str(item.get("Title") or ""), } ) critical = _count_vulnerabilities(trivy_payload, "CRITICAL") high = _count_vulnerabilities(trivy_payload, "HIGH") secrets = _count_secrets(trivy_payload) status = "ok" if critical == 0 and secrets == 0 and not open_misconfigs else "failed" return { "status": status, "compliant": status == "ok", "category": "artifact_security", "scan_type": "filesystem", "scanner": "trivy", "critical_vulnerabilities": critical, "high_vulnerabilities": high, "high_vulnerability_policy": "observe", "secrets": secrets, "high_or_critical_misconfigurations": len(open_misconfigs), "waived_misconfigurations": waived_misconfigs, "expired_waivers": expired_waivers, "waiver_file": str(waiver_path) if waiver_path else "", "open_misconfiguration_examples": open_misconfigs[:20], } def main(argv: list[str] | None = None) -> int: """CLI entrypoint used by Jenkins after the Trivy scan completes.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--trivy-json", required=True) parser.add_argument("--waivers") parser.add_argument("--output", required=True) parser.add_argument("--today") args = parser.parse_args(argv) trivy_payload = _read_json(Path(args.trivy_json)) waiver_path = Path(args.waivers) if args.waivers else None report = build_report(trivy_payload, waiver_path=waiver_path, today_override=args.today) output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8") return 0 if __name__ == "__main__": # pragma: no cover raise SystemExit(main())