Add review-compliance-reports task and reorganize report storage

New mise task fetches Prowler reports from sifaka, parses with proper
muted/unmuted distinction, shows week-over-week delta, and includes
a scaffold for Kingfisher once JSON/CSV output is available upstream.

Moved all legacy top-level reports on sifaka into date subdirectories
to match the current CronJob output structure. Updated
read-compliance-reports doc with task reference and links.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Erich Blume 2026-04-06 10:16:46 -07:00
commit a059d81314
3 changed files with 391 additions and 3 deletions

View file

@ -0,0 +1,378 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["rich>=14.0.0", "typer>=0.24.0"]
# ///
#MISE description="Summarize the latest Prowler and Kingfisher compliance reports from sifaka"
#USAGE flag "--full" help="Show all unmuted failures, not just new ones"
#USAGE flag "--show-muted" help="Also show muted failures"
"""Fetch and summarize compliance reports from sifaka.
Covers:
- Prowler K8s CIS: CSV-based, full analysis with delta tracking
- Kingfisher secret scanning: TODO — pending upstream JSON/CSV output
support (currently HTML-only; contribute from spork)
For Prowler, copies the two most recent K8s CIS reports, parses them,
and displays:
1. Overall status (pass/fail/manual/muted counts)
2. Unmuted failures by severity
3. Delta from the previous report (new vs resolved)
4. Actionable unmuted failures with details
This is the primary tool for the weekly compliance report review.
"""
import csv
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
REPORT_BASE = "sifaka:/volume1/reports/prowler"
console = Console()
def scp(remote: str, local: str) -> bool:
"""Copy a file from sifaka (requires scp -O for Synology)."""
result = subprocess.run(
["scp", "-O", remote, local],
capture_output=True,
text=True,
timeout=30,
)
return result.returncode == 0
def list_reports() -> list[str]:
"""List Prowler CSV reports on sifaka, sorted by embedded timestamp."""
result = subprocess.run(
["ssh", "sifaka", "find /volume1/reports/prowler/ -name '*.csv' "
"-not -path '*/compliance/*' -not -name '@*'"],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
console.print("[bold red]Failed to list reports on sifaka[/bold red]")
raise typer.Exit(code=1)
csvs = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()]
# Sort by the timestamp embedded in the filename (e.g. 20260405030007)
import re
def sort_key(path: str) -> str:
m = re.search(r"(\d{14})", Path(path).name)
return m.group(1) if m else Path(path).name
return sorted(csvs, key=sort_key)
def load_csv(path: str) -> list[dict]:
"""Load a Prowler CSV report."""
with open(path) as f:
return list(csv.DictReader(f, delimiter=";"))
def parse_findings(rows: list[dict]) -> dict:
"""Categorize findings from a report."""
statuses = Counter(r["STATUS"] for r in rows)
fails = [r for r in rows if r["STATUS"] == "FAIL"]
unmuted = [r for r in fails if r.get("MUTED", "") != "True"]
muted = [r for r in fails if r.get("MUTED", "") == "True"]
return {
"total": len(rows),
"statuses": statuses,
"fails": fails,
"unmuted": unmuted,
"muted": muted,
}
def finding_key(r: dict) -> tuple[str, str]:
"""Stable identity for a finding (check + resource name, not UID)."""
return (r["CHECK_ID"], r.get("RESOURCE_NAME", ""))
SEVERITY_ORDER = ["critical", "high", "medium", "low", "informational"]
def severity_sort(r: dict) -> int:
sev = r.get("SEVERITY", "").lower()
return SEVERITY_ORDER.index(sev) if sev in SEVERITY_ORDER else 99
def main(
full: Annotated[
bool, typer.Option(help="Show all unmuted failures, not just new ones")
] = False,
show_muted: Annotated[
bool, typer.Option(help="Also show muted failures")
] = False,
) -> None:
csvs = list_reports()
if not csvs:
console.print("[bold red]No Prowler CSV reports found on sifaka[/bold red]")
raise typer.Exit(code=1)
with tempfile.TemporaryDirectory() as tmpdir:
# Fetch the two most recent reports
latest_remote = csvs[-1]
latest_local = Path(tmpdir) / "latest.csv"
console.print(f"[dim]Fetching {latest_remote}...[/dim]")
if not scp(f"sifaka:{latest_remote}", str(latest_local)):
console.print("[bold red]Failed to copy latest report[/bold red]")
raise typer.Exit(code=1)
prev_local = None
if len(csvs) >= 2:
prev_remote = csvs[-2]
prev_local = Path(tmpdir) / "prev.csv"
console.print(f"[dim]Fetching {prev_remote}...[/dim]")
if not scp(f"sifaka:{prev_remote}", str(prev_local)):
prev_local = None
latest = parse_findings(load_csv(str(latest_local)))
# Extract report date from filename
report_name = Path(latest_remote).stem
console.print()
# --- Overall status ---
status_table = Table(
show_header=True, header_style="bold", title=f"Report: {report_name}"
)
status_table.add_column("Status")
status_table.add_column("Count", justify="right")
for status in ["PASS", "FAIL", "MANUAL"]:
count = latest["statuses"].get(status, 0)
style = "red" if status == "FAIL" and count > 0 else ""
status_table.add_row(
f"[{style}]{status}[/{style}]" if style else status,
f"[{style}]{count}[/{style}]" if style else str(count),
)
fail_count = len(latest["fails"])
muted_count = len(latest["muted"])
unmuted_count = len(latest["unmuted"])
status_table.add_row("", "")
status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
status_table.add_row(
"[bold]↳ unmuted (action needed)[/bold]",
f"[bold red]{unmuted_count}[/bold red]"
if unmuted_count > 0
else "[bold green]0[/bold green]",
)
status_table.add_row("", "")
status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
console.print(status_table)
console.print()
# --- Unmuted failures by severity ---
if latest["unmuted"]:
sev_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures by Severity",
)
sev_table.add_column("Severity")
sev_table.add_column("Count", justify="right")
for sev, count in Counter(
r["SEVERITY"] for r in latest["unmuted"]
).most_common():
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
sev_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
f"[{style}]{count}[/{style}]" if style else str(count),
)
console.print(sev_table)
console.print()
# --- Delta from previous report ---
if prev_local:
prev = parse_findings(load_csv(str(prev_local)))
prev_keys = {finding_key(r): r for r in prev["unmuted"]}
curr_keys = {finding_key(r): r for r in latest["unmuted"]}
new_keys = set(curr_keys.keys()) - set(prev_keys.keys())
resolved_keys = set(prev_keys.keys()) - set(curr_keys.keys())
prev_name = Path(csvs[-2]).stem
delta_lines = [
f"Compared against: [dim]{prev_name}[/dim]",
"",
f"Previous unmuted FAILs: {len(prev['unmuted'])}",
f"Current unmuted FAILs: {len(latest['unmuted'])}",
f"[green]Resolved: {len(resolved_keys)}[/green]",
f"[red]New: {len(new_keys)}[/red]"
if new_keys
else f"[green]New: 0[/green]",
]
console.print(
Panel(
"\n".join(delta_lines),
title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
border_style="cyan",
)
)
console.print()
if new_keys:
console.print("[bold red]New Unmuted Failures:[/bold red]")
for k in sorted(new_keys):
r = curr_keys[k]
console.print(
f" [{r['SEVERITY']}] {r['CHECK_ID']}: "
f"{r['STATUS_EXTENDED'][:120]}"
)
console.print()
if resolved_keys:
console.print("[bold green]Resolved:[/bold green]")
for k in sorted(resolved_keys):
r = prev_keys[k]
console.print(
f" [dim][{r['SEVERITY']}] {r['CHECK_ID']}: "
f"{r['STATUS_EXTENDED'][:120]}[/dim]"
)
console.print()
# --- Unmuted failure details ---
findings_to_show = latest["unmuted"] if full else []
if not full and latest["unmuted"]:
findings_to_show = latest["unmuted"]
if findings_to_show:
detail_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures — Action Needed",
)
detail_table.add_column("Severity")
detail_table.add_column("Check")
detail_table.add_column("Resource")
detail_table.add_column("Detail", max_width=60)
for r in sorted(findings_to_show, key=severity_sort):
sev = r["SEVERITY"]
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
detail_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
r["CHECK_ID"],
r.get("RESOURCE_NAME", ""),
r["STATUS_EXTENDED"][:60],
)
console.print(detail_table)
console.print()
# --- Muted findings summary ---
if show_muted and latest["muted"]:
muted_table = Table(
show_header=True,
header_style="bold",
title="Muted Failures (for reference)",
)
muted_table.add_column("Severity")
muted_table.add_column("Check")
muted_table.add_column("Count", justify="right")
muted_groups: dict[tuple[str, str], int] = Counter()
for r in latest["muted"]:
muted_groups[(r["SEVERITY"], r["CHECK_ID"])] += 1
for (sev, check), count in sorted(
muted_groups.items(), key=lambda x: severity_sort({"SEVERITY": x[0][0]})
):
muted_table.add_row(f"[dim]{sev}[/dim]", f"[dim]{check}[/dim]", f"[dim]{count}[/dim]")
console.print(muted_table)
console.print()
# --- Verdict ---
if not latest["unmuted"]:
console.print(
Panel(
"[bold green]All clear.[/bold green] No unmuted failures.",
title="Prowler Verdict",
border_style="green",
)
)
else:
console.print(
Panel(
f"[bold yellow]{len(latest['unmuted'])} unmuted failure(s) "
f"need triage.[/bold yellow]\n\n"
"For each: remediate (fix the pod spec) or mute "
"(add to mutelist + compensating control).",
title="Prowler Verdict",
border_style="yellow",
)
)
# --- Kingfisher secret scanning ---
# TODO: Kingfisher currently only outputs HTML. Once JSON or CSV output
# is supported upstream (contribute from our spork), add parsing here:
#
# KINGFISHER_BASE = "/volume1/reports/kingfisher"
# - Fetch latest JSON/CSV from sifaka:{KINGFISHER_BASE}/
# - Parse findings: active vs inactive vs skipped validations
# - Flag any "Active Credential" findings as critical
# - Compare against previous scan for delta
# - Show summary panel similar to Prowler
#
# For now, check that a recent report exists and warn if missing.
kf_check = subprocess.run(
["ssh", "sifaka", "ls -1t /volume1/reports/kingfisher/ | head -1"],
capture_output=True,
text=True,
timeout=15,
)
kf_latest = kf_check.stdout.strip() if kf_check.returncode == 0 else ""
if kf_latest and kf_latest.startswith("202"):
console.print(
f"[dim]Kingfisher: latest report directory is {kf_latest} "
f"(HTML only — JSON/CSV pending upstream)[/dim]"
)
else:
console.print(
"[bold yellow]Warning: No recent Kingfisher report found on "
"sifaka. Check the CronJob on ringtail.[/bold yellow]"
)
if __name__ == "__main__":
typer.run(main)