C0: review-compliance-reports — summarize image and IaC scans

Previously only the K8s CIS in-cluster scan was processed; the weekly
container-image and IaC Prowler scans were running on schedule but never
reviewed. Now each scan gets its own status / severity / week-over-week
delta, with top-N grouped tables (by check ID and resource) for the
high-volume image and IaC outputs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erich Blume 2026-04-27 12:18:06 -07:00
commit 718e0a0043
2 changed files with 321 additions and 205 deletions

View file

@ -0,0 +1 @@
`review-compliance-reports` now also fetches and summarizes the weekly Prowler container-image and IaC scans (previously only the K8s CIS in-cluster scan was processed). For each scan it shows status counts, severity breakdown, week-over-week delta, and — for the high-volume image/IaC scans — top-N tables grouped by check ID and resource instead of per-finding listings.

View file

@ -9,23 +9,26 @@
"""Fetch and summarize compliance reports from sifaka.
Covers:
- Prowler K8s CIS: CSV-based, full analysis with delta tracking
- Prowler K8s CIS (in-cluster): per-finding detail
- Prowler container image scans: grouped by check + resource
- Prowler IaC manifest scans: grouped by check + resource
- Kingfisher secret scanning: TODO — pending upstream JSON/CSV output
support (currently HTML-only; contribute from spork)
For Prowler, copies the two most recent K8s CIS reports, parses them,
and displays:
For each Prowler scan, copies the two most recent CSV reports, parses
them, and displays:
1. Overall status (pass/fail/manual/muted counts)
2. Unmuted failures by severity
3. Delta from the previous report (new vs resolved)
4. Actionable unmuted failures with details
4. Actionable unmuted failures (per-finding for in-cluster; grouped
by check ID and resource for image/IaC because they have far too
many findings to list individually)
This is the primary tool for the weekly compliance report review.
"""
import csv
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
@ -36,7 +39,12 @@ from rich.console import Console
from rich.panel import Panel
from rich.table import Table
REPORT_BASE = "sifaka:/volume1/reports/prowler"
PROWLER_SCANS: list[tuple[str, str, bool]] = [
# (label, sifaka base path, group_findings)
("K8s CIS (In-Cluster)", "/volume1/reports/prowler", False),
("Container Images", "/volume1/reports/prowler-images", True),
("IaC (manifests)", "/volume1/reports/prowler-iac", True),
]
console = Console()
@ -52,18 +60,18 @@ def scp(remote: str, local: str) -> bool:
return result.returncode == 0
def list_reports() -> list[str]:
"""List Prowler CSV reports on sifaka, sorted by embedded timestamp."""
def list_reports(base: str) -> list[str]:
"""List Prowler CSV reports under `base` on sifaka, sorted by timestamp."""
result = subprocess.run(
["ssh", "sifaka", "find /volume1/reports/prowler/ -name '*.csv' "
["ssh", "sifaka", f"find {base}/ -name '*.csv' "
"-not -path '*/compliance/*' -not -name '@*'"],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
console.print("[bold red]Failed to list reports on sifaka[/bold red]")
raise typer.Exit(code=1)
console.print(f"[bold red]Failed to list reports under {base}[/bold red]")
return []
csvs = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()]
# Sort by the timestamp embedded in the filename (e.g. 20260405030007)
@ -306,136 +314,151 @@ def run_node_verification(console: Console) -> None:
console.print()
def main(
full: Annotated[
bool, typer.Option(help="Show all unmuted failures, not just new ones")
] = False,
show_muted: Annotated[
bool, typer.Option(help="Also show muted failures")
] = False,
SEVERITY_STYLE = {
"critical": "bold red",
"high": "red",
"medium": "yellow",
}
def _sev_style(sev: str) -> str:
return SEVERITY_STYLE.get(sev.lower(), "")
def summarize_report(
label: str,
base: str,
tmpdir: str,
*,
show_muted: bool = False,
group_findings: bool = False,
) -> None:
csvs = list_reports()
"""Fetch and summarize the latest Prowler report under `base`.
When `group_findings` is True, top-N CHECK_ID and RESOURCE_NAME tables
are shown instead of a per-finding detail table — appropriate for
image and IaC scans that produce thousands of findings.
"""
console.rule(f"[bold]{label}[/bold]")
csvs = list_reports(base)
if not csvs:
console.print("[bold red]No Prowler CSV reports found on sifaka[/bold red]")
raise typer.Exit(code=1)
with tempfile.TemporaryDirectory() as tmpdir:
# Fetch the two most recent reports
latest_remote = csvs[-1]
latest_local = Path(tmpdir) / "latest.csv"
console.print(f"[dim]Fetching {latest_remote}...[/dim]")
if not scp(f"sifaka:{latest_remote}", str(latest_local)):
console.print("[bold red]Failed to copy latest report[/bold red]")
raise typer.Exit(code=1)
prev_local = None
if len(csvs) >= 2:
prev_remote = csvs[-2]
prev_local = Path(tmpdir) / "prev.csv"
console.print(f"[dim]Fetching {prev_remote}...[/dim]")
if not scp(f"sifaka:{prev_remote}", str(prev_local)):
prev_local = None
latest = parse_findings(load_csv(str(latest_local)))
# Extract report date from filename
report_name = Path(latest_remote).stem
console.print()
# --- Overall status ---
status_table = Table(
show_header=True, header_style="bold", title=f"Report: {report_name}"
console.print(
f"[bold yellow]{label}: no Prowler CSV reports found "
f"under {base}[/bold yellow]"
)
status_table.add_column("Status")
status_table.add_column("Count", justify="right")
console.print()
return
for status in ["PASS", "FAIL", "MANUAL"]:
count = latest["statuses"].get(status, 0)
style = "red" if status == "FAIL" and count > 0 else ""
status_table.add_row(
f"[{style}]{status}[/{style}]" if style else status,
safe = "".join(c if c.isalnum() else "_" for c in label.lower())
latest_remote = csvs[-1]
latest_local = Path(tmpdir) / f"{safe}_latest.csv"
console.print(f"[dim]Fetching {latest_remote}...[/dim]")
if not scp(f"sifaka:{latest_remote}", str(latest_local)):
console.print(f"[bold red]Failed to copy {latest_remote}[/bold red]")
return
prev_local: Path | None = None
if len(csvs) >= 2:
prev_remote = csvs[-2]
prev_path = Path(tmpdir) / f"{safe}_prev.csv"
console.print(f"[dim]Fetching {prev_remote}...[/dim]")
if scp(f"sifaka:{prev_remote}", str(prev_path)):
prev_local = prev_path
latest = parse_findings(load_csv(str(latest_local)))
report_name = Path(latest_remote).stem
console.print()
# --- Overall status ---
status_table = Table(
show_header=True, header_style="bold", title=f"Report: {report_name}"
)
status_table.add_column("Status")
status_table.add_column("Count", justify="right")
for status in ["PASS", "FAIL", "MANUAL"]:
count = latest["statuses"].get(status, 0)
style = "red" if status == "FAIL" and count > 0 else ""
status_table.add_row(
f"[{style}]{status}[/{style}]" if style else status,
f"[{style}]{count}[/{style}]" if style else str(count),
)
muted_count = len(latest["muted"])
unmuted_count = len(latest["unmuted"])
status_table.add_row("", "")
status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
status_table.add_row(
"[bold]↳ unmuted (action needed)[/bold]",
f"[bold red]{unmuted_count}[/bold red]"
if unmuted_count > 0
else "[bold green]0[/bold green]",
)
status_table.add_row("", "")
status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
console.print(status_table)
console.print()
# --- Unmuted failures by severity ---
if latest["unmuted"]:
sev_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures by Severity",
)
sev_table.add_column("Severity")
sev_table.add_column("Count", justify="right")
for sev, count in sorted(
Counter(r["SEVERITY"] for r in latest["unmuted"]).items(),
key=lambda kv: severity_sort({"SEVERITY": kv[0]}),
):
style = _sev_style(sev)
sev_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
f"[{style}]{count}[/{style}]" if style else str(count),
)
fail_count = len(latest["fails"])
muted_count = len(latest["muted"])
unmuted_count = len(latest["unmuted"])
status_table.add_row("", "")
status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
status_table.add_row(
"[bold]↳ unmuted (action needed)[/bold]",
f"[bold red]{unmuted_count}[/bold red]"
if unmuted_count > 0
else "[bold green]0[/bold green]",
)
status_table.add_row("", "")
status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
console.print(status_table)
console.print(sev_table)
console.print()
# --- Unmuted failures by severity ---
if latest["unmuted"]:
sev_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures by Severity",
# --- Delta from previous report ---
if prev_local:
prev = parse_findings(load_csv(str(prev_local)))
prev_keys = {finding_key(r): r for r in prev["unmuted"]}
curr_keys = {finding_key(r): r for r in latest["unmuted"]}
new_keys = set(curr_keys.keys()) - set(prev_keys.keys())
resolved_keys = set(prev_keys.keys()) - set(curr_keys.keys())
prev_name = Path(csvs[-2]).stem
delta_lines = [
f"Compared against: [dim]{prev_name}[/dim]",
"",
f"Previous unmuted FAILs: {len(prev['unmuted'])}",
f"Current unmuted FAILs: {len(latest['unmuted'])}",
f"[green]Resolved: {len(resolved_keys)}[/green]",
f"[red]New: {len(new_keys)}[/red]"
if new_keys
else "[green]New: 0[/green]",
]
console.print(
Panel(
"\n".join(delta_lines),
title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
border_style="cyan",
)
sev_table.add_column("Severity")
sev_table.add_column("Count", justify="right")
for sev, count in Counter(
r["SEVERITY"] for r in latest["unmuted"]
).most_common():
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
sev_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
f"[{style}]{count}[/{style}]" if style else str(count),
)
console.print(sev_table)
console.print()
# --- Delta from previous report ---
if prev_local:
prev = parse_findings(load_csv(str(prev_local)))
prev_keys = {finding_key(r): r for r in prev["unmuted"]}
curr_keys = {finding_key(r): r for r in latest["unmuted"]}
new_keys = set(curr_keys.keys()) - set(prev_keys.keys())
resolved_keys = set(prev_keys.keys()) - set(curr_keys.keys())
prev_name = Path(csvs[-2]).stem
delta_lines = [
f"Compared against: [dim]{prev_name}[/dim]",
"",
f"Previous unmuted FAILs: {len(prev['unmuted'])}",
f"Current unmuted FAILs: {len(latest['unmuted'])}",
f"[green]Resolved: {len(resolved_keys)}[/green]",
f"[red]New: {len(new_keys)}[/red]"
if new_keys
else f"[green]New: 0[/green]",
]
console.print(
Panel(
"\n".join(delta_lines),
title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
border_style="cyan",
)
)
console.print()
)
console.print()
# For grouped scans the new/resolved listings are too noisy
# (potentially thousands of lines). Skip the listings; the count
# is in the panel above and detail is in the grouped tables.
if not group_findings:
if new_keys:
console.print("[bold red]New Unmuted Failures:[/bold red]")
for k in sorted(new_keys):
@ -456,85 +479,177 @@ def main(
)
console.print()
# --- Unmuted failure details ---
findings_to_show = latest["unmuted"] if full else []
if not full and latest["unmuted"]:
findings_to_show = latest["unmuted"]
if findings_to_show:
detail_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures — Action Needed",
)
detail_table.add_column("Severity")
detail_table.add_column("Check")
detail_table.add_column("Resource")
detail_table.add_column("Detail", max_width=60)
for r in sorted(findings_to_show, key=severity_sort):
sev = r["SEVERITY"]
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
detail_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
r["CHECK_ID"],
r.get("RESOURCE_NAME", ""),
r["STATUS_EXTENDED"][:60],
)
console.print(detail_table)
console.print()
# --- Muted findings summary ---
if show_muted and latest["muted"]:
muted_table = Table(
show_header=True,
header_style="bold",
title="Muted Failures (for reference)",
)
muted_table.add_column("Severity")
muted_table.add_column("Check")
muted_table.add_column("Count", justify="right")
muted_groups: dict[tuple[str, str], int] = Counter()
for r in latest["muted"]:
muted_groups[(r["SEVERITY"], r["CHECK_ID"])] += 1
for (sev, check), count in sorted(
muted_groups.items(), key=lambda x: severity_sort({"SEVERITY": x[0][0]})
):
muted_table.add_row(f"[dim]{sev}[/dim]", f"[dim]{check}[/dim]", f"[dim]{count}[/dim]")
console.print(muted_table)
console.print()
# --- Verdict ---
if not latest["unmuted"]:
console.print(
Panel(
"[bold green]All clear.[/bold green] No unmuted failures.",
title="Prowler Verdict",
border_style="green",
)
)
# --- Unmuted failure details (grouped or per-finding) ---
if latest["unmuted"]:
if group_findings:
_print_grouped_findings(latest["unmuted"])
else:
console.print(
Panel(
f"[bold yellow]{len(latest['unmuted'])} unmuted failure(s) "
f"need triage.[/bold yellow]\n\n"
"For each: remediate (fix the pod spec) or mute "
"(add to mutelist + compensating control).",
title="Prowler Verdict",
border_style="yellow",
)
_print_findings_detail(latest["unmuted"])
# --- Muted findings summary ---
if show_muted and latest["muted"]:
muted_table = Table(
show_header=True,
header_style="bold",
title="Muted Failures (for reference)",
)
muted_table.add_column("Severity")
muted_table.add_column("Check")
muted_table.add_column("Count", justify="right")
muted_groups: dict[tuple[str, str], int] = Counter()
for r in latest["muted"]:
muted_groups[(r["SEVERITY"], r["CHECK_ID"])] += 1
for (sev, check), count in sorted(
muted_groups.items(),
key=lambda x: severity_sort({"SEVERITY": x[0][0]}),
):
muted_table.add_row(
f"[dim]{sev}[/dim]",
f"[dim]{check}[/dim]",
f"[dim]{count}[/dim]",
)
console.print(muted_table)
console.print()
# --- Verdict ---
if not latest["unmuted"]:
console.print(
Panel(
"[bold green]All clear.[/bold green] No unmuted failures.",
title=f"{label} Verdict",
border_style="green",
)
)
else:
console.print(
Panel(
f"[bold yellow]{len(latest['unmuted'])} unmuted failure(s) "
f"need triage.[/bold yellow]\n\n"
"For each: remediate or mute "
"(add to mutelist + compensating control).",
title=f"{label} Verdict",
border_style="yellow",
)
)
console.print()
def _print_findings_detail(unmuted: list[dict]) -> None:
"""Per-finding detail table — appropriate when finding count is small."""
detail_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures — Action Needed",
)
detail_table.add_column("Severity")
detail_table.add_column("Check")
detail_table.add_column("Resource")
detail_table.add_column("Detail", max_width=60)
for r in sorted(unmuted, key=severity_sort):
sev = r["SEVERITY"]
style = _sev_style(sev)
detail_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
r["CHECK_ID"],
r.get("RESOURCE_NAME", ""),
r["STATUS_EXTENDED"][:60],
)
console.print(detail_table)
console.print()
def _worst_severity(rows: list[dict]) -> str:
"""Return the most severe severity label across `rows`."""
if not rows:
return ""
return min(
(r["SEVERITY"] for r in rows),
key=lambda s: severity_sort({"SEVERITY": s}),
)
def _print_grouped_findings(unmuted: list[dict], top_n: int = 15) -> None:
"""Top-N tables grouped by CHECK_ID and RESOURCE_NAME.
Used for image and IaC scans where per-finding tables would be too
large to be useful. Shows count and worst severity for each group.
"""
by_check: dict[str, list[dict]] = {}
by_resource: dict[str, list[dict]] = {}
for r in unmuted:
by_check.setdefault(r["CHECK_ID"], []).append(r)
by_resource.setdefault(r.get("RESOURCE_NAME", "") or "(no resource)", []).append(r)
check_table = Table(
show_header=True,
header_style="bold",
title=f"Top {top_n} Checks by Unmuted Finding Count",
)
check_table.add_column("Worst Sev")
check_table.add_column("Check ID")
check_table.add_column("Count", justify="right")
for check, rows in sorted(
by_check.items(), key=lambda kv: -len(kv[1])
)[:top_n]:
worst = _worst_severity(rows)
style = _sev_style(worst)
check_table.add_row(
f"[{style}]{worst}[/{style}]" if style else worst,
check,
str(len(rows)),
)
console.print(check_table)
console.print()
res_table = Table(
show_header=True,
header_style="bold",
title=f"Top {top_n} Resources by Unmuted Finding Count",
)
res_table.add_column("Worst Sev")
res_table.add_column("Resource")
res_table.add_column("Count", justify="right")
for resource, rows in sorted(
by_resource.items(), key=lambda kv: -len(kv[1])
)[:top_n]:
worst = _worst_severity(rows)
style = _sev_style(worst)
res_table.add_row(
f"[{style}]{worst}[/{style}]" if style else worst,
resource[:80],
str(len(rows)),
)
console.print(res_table)
console.print()
def main(
full: Annotated[
bool, typer.Option(help="(reserved) currently a no-op; all unmuted failures already shown")
] = False,
show_muted: Annotated[
bool, typer.Option(help="Also show muted failures")
] = False,
) -> None:
del full # historical flag, kept for backwards compatibility
with tempfile.TemporaryDirectory() as tmpdir:
for label, base, group in PROWLER_SCANS:
summarize_report(
label,
base,
tmpdir,
show_muted=show_muted,
group_findings=group,
)
# --- Node-level MANUAL check verification ---