174 lines
6.1 KiB
Python
174 lines
6.1 KiB
Python
"""Build a titan-iac supply-chain compliance report from Trivy evidence."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
FAIL_SEVERITIES = {"HIGH", "CRITICAL"}
|
|
|
|
|
|
def _read_json(path: Path) -> dict[str, Any]:
|
|
"""Read a JSON object from disk for use as pipeline evidence."""
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
if not isinstance(payload, dict):
|
|
raise ValueError(f"{path} must contain a JSON object")
|
|
return payload
|
|
|
|
|
|
def _parse_day(raw: str | None) -> dt.date | None:
|
|
"""Parse an ISO day while letting optional waiver dates stay optional."""
|
|
if not raw:
|
|
return None
|
|
return dt.date.fromisoformat(raw)
|
|
|
|
|
|
def _today(override: str | None = None) -> dt.date:
|
|
"""Return the policy day so tests can pin expiry behavior."""
|
|
return _parse_day(override) or dt.date.today()
|
|
|
|
|
|
def _load_waiver_pairs(path: Path | None, policy_day: dt.date) -> tuple[set[tuple[str, str]], int]:
|
|
"""Return active ``(misconfiguration id, target)`` waivers and expired count."""
|
|
if path is None or not path.exists():
|
|
return set(), 0
|
|
|
|
payload = _read_json(path)
|
|
default_expires_at = payload.get("default_expires_at")
|
|
active: set[tuple[str, str]] = set()
|
|
expired = 0
|
|
|
|
for entry in payload.get("misconfigurations", []):
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
misconfiguration_id = str(entry.get("id") or "").strip()
|
|
if not misconfiguration_id:
|
|
continue
|
|
expires_at = _parse_day(str(entry.get("expires_at") or default_expires_at or ""))
|
|
targets = entry.get("targets", [])
|
|
if not isinstance(targets, list):
|
|
continue
|
|
|
|
if expires_at and expires_at < policy_day:
|
|
expired += len(targets)
|
|
continue
|
|
|
|
# Waivers are target-specific so a new unsafe manifest fails until it is
|
|
# either fixed or deliberately accepted with a fresh expiration.
|
|
for target in targets:
|
|
if isinstance(target, str) and target:
|
|
active.add((misconfiguration_id, target))
|
|
|
|
return active, expired
|
|
|
|
|
|
def _iter_failed_misconfigurations(payload: dict[str, Any]):
|
|
"""Yield failed high/critical Trivy misconfiguration records."""
|
|
for result in payload.get("Results", []):
|
|
if not isinstance(result, dict):
|
|
continue
|
|
target = str(result.get("Target") or "")
|
|
for item in result.get("Misconfigurations") or []:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
if item.get("Status") != "FAIL":
|
|
continue
|
|
if str(item.get("Severity") or "").upper() not in FAIL_SEVERITIES:
|
|
continue
|
|
yield target, item
|
|
|
|
|
|
def _count_vulnerabilities(payload: dict[str, Any], severity: str) -> int:
|
|
"""Count Trivy vulnerabilities at a specific severity."""
|
|
count = 0
|
|
for result in payload.get("Results", []):
|
|
if not isinstance(result, dict):
|
|
continue
|
|
for item in result.get("Vulnerabilities") or []:
|
|
if isinstance(item, dict) and str(item.get("Severity") or "").upper() == severity:
|
|
count += 1
|
|
return count
|
|
|
|
|
|
def _count_secrets(payload: dict[str, Any]) -> int:
|
|
"""Count detected secrets in the Trivy filesystem report."""
|
|
count = 0
|
|
for result in payload.get("Results", []):
|
|
if isinstance(result, dict):
|
|
count += len(result.get("Secrets") or [])
|
|
return count
|
|
|
|
|
|
def build_report(
|
|
trivy_payload: dict[str, Any],
|
|
waiver_path: Path | None = None,
|
|
today_override: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Build the compliance summary consumed by the quality gate."""
|
|
policy_day = _today(today_override)
|
|
active_waivers, expired_waivers = _load_waiver_pairs(waiver_path, policy_day)
|
|
|
|
open_misconfigs: list[dict[str, str]] = []
|
|
waived_misconfigs = 0
|
|
for target, item in _iter_failed_misconfigurations(trivy_payload):
|
|
misconfiguration_id = str(item.get("ID") or "")
|
|
if (misconfiguration_id, target) in active_waivers:
|
|
waived_misconfigs += 1
|
|
continue
|
|
open_misconfigs.append(
|
|
{
|
|
"id": misconfiguration_id,
|
|
"target": target,
|
|
"severity": str(item.get("Severity") or ""),
|
|
"title": str(item.get("Title") or ""),
|
|
}
|
|
)
|
|
|
|
critical = _count_vulnerabilities(trivy_payload, "CRITICAL")
|
|
high = _count_vulnerabilities(trivy_payload, "HIGH")
|
|
secrets = _count_secrets(trivy_payload)
|
|
status = "ok" if critical == 0 and secrets == 0 and not open_misconfigs else "failed"
|
|
|
|
return {
|
|
"status": status,
|
|
"compliant": status == "ok",
|
|
"category": "artifact_security",
|
|
"scan_type": "filesystem",
|
|
"scanner": "trivy",
|
|
"critical_vulnerabilities": critical,
|
|
"high_vulnerabilities": high,
|
|
"high_vulnerability_policy": "observe",
|
|
"secrets": secrets,
|
|
"high_or_critical_misconfigurations": len(open_misconfigs),
|
|
"waived_misconfigurations": waived_misconfigs,
|
|
"expired_waivers": expired_waivers,
|
|
"waiver_file": str(waiver_path) if waiver_path else "",
|
|
"open_misconfiguration_examples": open_misconfigs[:20],
|
|
}
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
"""CLI entrypoint used by Jenkins after the Trivy scan completes."""
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--trivy-json", required=True)
|
|
parser.add_argument("--waivers")
|
|
parser.add_argument("--output", required=True)
|
|
parser.add_argument("--today")
|
|
args = parser.parse_args(argv)
|
|
|
|
trivy_payload = _read_json(Path(args.trivy_json))
|
|
waiver_path = Path(args.waivers) if args.waivers else None
|
|
report = build_report(trivy_payload, waiver_path=waiver_path, today_override=args.today)
|
|
output_path = Path(args.output)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
raise SystemExit(main())
|