titan-iac/ci/scripts/supply_chain_report.py

174 lines
6.1 KiB
Python
Raw Permalink Normal View History

"""Build a titan-iac supply-chain compliance report from Trivy evidence."""
from __future__ import annotations
import argparse
import datetime as dt
import json
from pathlib import Path
from typing import Any
FAIL_SEVERITIES = {"HIGH", "CRITICAL"}
def _read_json(path: Path) -> dict[str, Any]:
"""Read a JSON object from disk for use as pipeline evidence."""
payload = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
raise ValueError(f"{path} must contain a JSON object")
return payload
def _parse_day(raw: str | None) -> dt.date | None:
"""Parse an ISO day while letting optional waiver dates stay optional."""
if not raw:
return None
return dt.date.fromisoformat(raw)
def _today(override: str | None = None) -> dt.date:
"""Return the policy day so tests can pin expiry behavior."""
return _parse_day(override) or dt.date.today()
def _load_waiver_pairs(path: Path | None, policy_day: dt.date) -> tuple[set[tuple[str, str]], int]:
"""Return active ``(misconfiguration id, target)`` waivers and expired count."""
if path is None or not path.exists():
return set(), 0
payload = _read_json(path)
default_expires_at = payload.get("default_expires_at")
active: set[tuple[str, str]] = set()
expired = 0
for entry in payload.get("misconfigurations", []):
if not isinstance(entry, dict):
continue
misconfiguration_id = str(entry.get("id") or "").strip()
if not misconfiguration_id:
continue
expires_at = _parse_day(str(entry.get("expires_at") or default_expires_at or ""))
targets = entry.get("targets", [])
if not isinstance(targets, list):
continue
if expires_at and expires_at < policy_day:
expired += len(targets)
continue
# Waivers are target-specific so a new unsafe manifest fails until it is
# either fixed or deliberately accepted with a fresh expiration.
for target in targets:
if isinstance(target, str) and target:
active.add((misconfiguration_id, target))
return active, expired
def _iter_failed_misconfigurations(payload: dict[str, Any]):
"""Yield failed high/critical Trivy misconfiguration records."""
for result in payload.get("Results", []):
if not isinstance(result, dict):
continue
target = str(result.get("Target") or "")
for item in result.get("Misconfigurations") or []:
if not isinstance(item, dict):
continue
if item.get("Status") != "FAIL":
continue
if str(item.get("Severity") or "").upper() not in FAIL_SEVERITIES:
continue
yield target, item
def _count_vulnerabilities(payload: dict[str, Any], severity: str) -> int:
"""Count Trivy vulnerabilities at a specific severity."""
count = 0
for result in payload.get("Results", []):
if not isinstance(result, dict):
continue
for item in result.get("Vulnerabilities") or []:
if isinstance(item, dict) and str(item.get("Severity") or "").upper() == severity:
count += 1
return count
def _count_secrets(payload: dict[str, Any]) -> int:
"""Count detected secrets in the Trivy filesystem report."""
count = 0
for result in payload.get("Results", []):
if isinstance(result, dict):
count += len(result.get("Secrets") or [])
return count
def build_report(
trivy_payload: dict[str, Any],
waiver_path: Path | None = None,
today_override: str | None = None,
) -> dict[str, Any]:
"""Build the compliance summary consumed by the quality gate."""
policy_day = _today(today_override)
active_waivers, expired_waivers = _load_waiver_pairs(waiver_path, policy_day)
open_misconfigs: list[dict[str, str]] = []
waived_misconfigs = 0
for target, item in _iter_failed_misconfigurations(trivy_payload):
misconfiguration_id = str(item.get("ID") or "")
if (misconfiguration_id, target) in active_waivers:
waived_misconfigs += 1
continue
open_misconfigs.append(
{
"id": misconfiguration_id,
"target": target,
"severity": str(item.get("Severity") or ""),
"title": str(item.get("Title") or ""),
}
)
critical = _count_vulnerabilities(trivy_payload, "CRITICAL")
high = _count_vulnerabilities(trivy_payload, "HIGH")
secrets = _count_secrets(trivy_payload)
status = "ok" if critical == 0 and secrets == 0 and not open_misconfigs else "failed"
return {
"status": status,
"compliant": status == "ok",
"category": "artifact_security",
"scan_type": "filesystem",
"scanner": "trivy",
"critical_vulnerabilities": critical,
"high_vulnerabilities": high,
"high_vulnerability_policy": "observe",
"secrets": secrets,
"high_or_critical_misconfigurations": len(open_misconfigs),
"waived_misconfigurations": waived_misconfigs,
"expired_waivers": expired_waivers,
"waiver_file": str(waiver_path) if waiver_path else "",
"open_misconfiguration_examples": open_misconfigs[:20],
}
def main(argv: list[str] | None = None) -> int:
"""CLI entrypoint used by Jenkins after the Trivy scan completes."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--trivy-json", required=True)
parser.add_argument("--waivers")
parser.add_argument("--output", required=True)
parser.add_argument("--today")
args = parser.parse_args(argv)
trivy_payload = _read_json(Path(args.trivy_json))
waiver_path = Path(args.waivers) if args.waivers else None
report = build_report(trivy_payload, waiver_path=waiver_path, today_override=args.today)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())