#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["rich==15.0.0", "typer==0.25.0", "pyyaml==6.0.3"]
# ///
#MISE description="Summarize the latest Prowler and Kingfisher compliance reports from sifaka"
#USAGE flag "--full" help="Show all unmuted failures, not just new ones"
#USAGE flag "--show-muted" help="Also show muted failures"
"""Fetch and summarize compliance reports from sifaka.

Covers:
  - Prowler K8s CIS (in-cluster): per-finding detail
  - Prowler container image scans: grouped by check + resource
  - Prowler IaC manifest scans: grouped by check + resource
  - Kingfisher secret scanning: TODO — pending upstream JSON/CSV output
    support (currently HTML-only; contribute from spork)

For each Prowler scan, copies the two most recent CSV reports, parses them,
and displays:
  1. Overall status (pass/fail/manual/muted counts)
  2. Unmuted failures by severity
  3. Delta from the previous report (new vs resolved)
  4. Actionable unmuted failures (per-finding for in-cluster; grouped by
     check ID and resource for image/IaC because they have far too many
     findings to list individually)

This is the primary tool for the weekly compliance report review.
"""

import csv
import re
import subprocess
import tempfile
from collections import Counter
from pathlib import Path
from typing import Annotated

import typer
import yaml
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.table import Table

PROWLER_SCANS: list[tuple[str, str, bool]] = [
    # (label, sifaka base path, group_findings)
    ("K8s CIS (In-Cluster)", "/volume1/reports/prowler", False),
    ("Container Images", "/volume1/reports/prowler-images", True),
    ("IaC (manifests)", "/volume1/reports/prowler-iac", True),
]

# Severities from most to least severe; anything else sorts last.
SEVERITY_ORDER = ["critical", "high", "medium", "low", "informational"]

# Rich styles per severity; severities not listed are rendered unstyled.
SEVERITY_STYLE = {
    "critical": "bold red",
    "high": "red",
    "medium": "yellow",
}

# Timestamp embedded in report filenames (e.g. 20260405030007).
_TIMESTAMP_RE = re.compile(r"(\d{14})")

# One node-verification result: (name, detail, passed).
Check = tuple[str, str, bool]

console = Console()


# --------------------------------------------------------------------------
# Subprocess helpers
# --------------------------------------------------------------------------


def _run(argv: list[str], timeout: int) -> subprocess.CompletedProcess:
    """Run `argv` with captured text output, never raising on failure.

    A timeout or an unlaunchable binary is turned into a CompletedProcess
    with a non-zero return code (124 / 127) and empty stdout, so every
    caller's existing `returncode != 0` branch handles it instead of the
    whole report aborting with a traceback.
    """
    try:
        return subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(
            argv, 124, stdout="", stderr=f"timed out after {timeout}s"
        )
    except OSError as err:
        return subprocess.CompletedProcess(argv, 127, stdout="", stderr=str(err))


def scp(remote: str, local: str) -> bool:
    """Copy a file from sifaka (requires scp -O for Synology).

    Returns True when scp exits with status 0, False otherwise (including
    on timeout).
    """
    return _run(["scp", "-O", remote, local], timeout=30).returncode == 0


def list_reports(base: str) -> list[str]:
    """List Prowler CSV reports under `base` on sifaka, sorted by timestamp.

    Files under a `compliance/` directory and files whose name starts with
    `@` are excluded. Returns an empty list (after printing an error) when
    the remote listing fails.
    """
    result = _run(
        [
            "ssh",
            "sifaka",
            f"find {base}/ -name '*.csv' "
            "-not -path '*/compliance/*' -not -name '@*'",
        ],
        timeout=15,
    )
    if result.returncode != 0:
        console.print(f"[bold red]Failed to list reports under {base}[/bold red]")
        return []

    csvs = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()]

    # Sort by the timestamp embedded in the filename (e.g. 20260405030007),
    # falling back to the bare filename when there is none.
    def sort_key(path: str) -> str:
        name = Path(path).name
        m = _TIMESTAMP_RE.search(name)
        return m.group(1) if m else name

    return sorted(csvs, key=sort_key)


def _ssh_minikube(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess:
    """Run a command inside the minikube node via SSH."""
    return _run(["ssh", "indri", f"minikube ssh -- {cmd}"], timeout=timeout)


def _kubectl(args: str, timeout: int = 15) -> subprocess.CompletedProcess:
    """Run a kubectl command against minikube-indri.

    `args` is split on whitespace, so it cannot carry an argument that
    itself contains spaces.
    """
    return _run(["kubectl", "--context=minikube-indri"] + args.split(), timeout=timeout)


# --------------------------------------------------------------------------
# Report parsing
# --------------------------------------------------------------------------


def load_csv(path: str) -> list[dict]:
    """Load a Prowler CSV report (semicolon-delimited) as a list of row dicts."""
    # newline="" lets the csv module handle quoted embedded newlines itself.
    with open(path, encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f, delimiter=";"))


def parse_findings(rows: list[dict]) -> dict:
    """Categorize findings from a report.

    Returns a dict with:
      total    -- number of rows
      statuses -- Counter of the STATUS column
      fails    -- rows whose STATUS is "FAIL"
      unmuted  -- FAIL rows whose MUTED column is not "True"
      muted    -- FAIL rows whose MUTED column is "True"
    """
    statuses = Counter(r["STATUS"] for r in rows)
    fails = [r for r in rows if r["STATUS"] == "FAIL"]
    unmuted = [r for r in fails if r.get("MUTED", "") != "True"]
    muted = [r for r in fails if r.get("MUTED", "") == "True"]
    return {
        "total": len(rows),
        "statuses": statuses,
        "fails": fails,
        "unmuted": unmuted,
        "muted": muted,
    }


def finding_key(r: dict) -> tuple[str, str]:
    """Stable identity for a finding (check + resource name, not UID)."""
    # `or ""` covers short rows, where DictReader fills the cell with None;
    # a None in the key would break sorting of the delta listings.
    return (r["CHECK_ID"], r.get("RESOURCE_NAME") or "")


def _severity_rank(sev: str | None) -> int:
    """Position of `sev` in SEVERITY_ORDER (case-insensitive); 99 if unknown."""
    normalized = (sev or "").lower()
    return SEVERITY_ORDER.index(normalized) if normalized in SEVERITY_ORDER else 99


def severity_sort(r: dict) -> int:
    """Sort key ordering findings from most to least severe."""
    return _severity_rank(r.get("SEVERITY"))


def _worst_severity(rows: list[dict]) -> str:
    """Return the most severe severity label across `rows` ("" if empty)."""
    if not rows:
        return ""
    return min((r.get("SEVERITY") or "" for r in rows), key=_severity_rank)


# --------------------------------------------------------------------------
# Rendering helpers
# --------------------------------------------------------------------------


def _sev_style(sev: str) -> str:
    """Rich style for a severity label, or "" when it has none."""
    return SEVERITY_STYLE.get(sev.lower(), "")


def _styled(text: str, style: str) -> str:
    """Wrap `text` in a Rich style tag; return it unchanged for an empty style."""
    return f"[{style}]{text}[/{style}]" if style else text


def _cell(row: dict, key: str, limit: int | None = None) -> str:
    """Return `row[key]` as markup-safe text, truncated to `limit` characters.

    Report content is untrusted as far as Rich markup goes: anything shaped
    like `[tag]` would otherwise be parsed as a style tag rather than
    printed. Truncation happens before escaping so an escape sequence is
    never cut in half.
    """
    value = row.get(key) or ""
    if limit is not None:
        value = value[:limit]
    return escape(value)


def _finding_line(r: dict) -> str:
    """One-line "[severity] check: detail" summary, safe to print as markup."""
    # The severity is deliberately shown in literal square brackets, so the
    # brackets themselves must be escaped or Rich treats them as a tag.
    severity = escape(f"[{r.get('SEVERITY') or ''}]")
    return f"{severity} {_cell(r, 'CHECK_ID')}: {_cell(r, 'STATUS_EXTENDED', 120)}"


# --------------------------------------------------------------------------
# Node verification
# --------------------------------------------------------------------------


def _check_file_modes() -> list[Check]:
    """Check owner and permission bits of the kubelet files on the node."""
    file_expectations = [
        ("kubelet.conf ownership", "/etc/kubernetes/kubelet.conf", "root:root", None),
        ("kubelet.conf permissions", "/etc/kubernetes/kubelet.conf", None, "600"),
        ("config.yaml ownership", "/var/lib/kubelet/config.yaml", "root:root", None),
        ("config.yaml permissions", "/var/lib/kubelet/config.yaml", None, "644"),
        (
            "kubelet service ownership",
            "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf",
            "root:root",
            None,
        ),
        (
            "kubelet service permissions",
            "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf",
            None,
            "644",
        ),
    ]
    checks: list[Check] = []
    for name, path, expected_owner, expected_perms in file_expectations:
        # Each expectation sets exactly one of owner / permissions.
        if expected_owner:
            result = _ssh_minikube(f'"sudo stat -c %U:%G {path}"')
        else:
            result = _ssh_minikube(f'"sudo stat -c %a {path}"')
        if result.returncode != 0:
            checks.append((name, f"could not stat {path}", False))
            continue
        actual = result.stdout.strip()
        expected = expected_owner or expected_perms
        checks.append((name, f"{actual} (expected {expected})", actual == expected))
    return checks


def _check_kubelet_config() -> list[Check]:
    """Check the kubelet config arguments that Prowler cannot evaluate.

    When config.yaml cannot be read or is not a YAML mapping, only that
    failure is reported. The per-argument checks all treat "absent" as
    safe, so running them against an empty fallback would report PASS for
    settings that were never actually inspected.
    """
    result = _ssh_minikube('"sudo cat /var/lib/kubelet/config.yaml"')
    if result.returncode != 0:
        return [("kubelet config", "could not read config.yaml", False)]

    try:
        kubelet_cfg = yaml.safe_load(result.stdout)
    except yaml.YAMLError:
        return [("kubelet config parse", "failed to parse config.yaml", False)]
    if kubelet_cfg is None:
        # Empty document: every argument is absent, i.e. at its default.
        kubelet_cfg = {}
    if not isinstance(kubelet_cfg, dict):
        return [("kubelet config parse", "failed to parse config.yaml", False)]

    rop = kubelet_cfg.get("readOnlyPort")
    miu = kubelet_cfg.get("makeIPTablesUtilChains")
    erq = kubelet_cfg.get("eventRecordQPS")
    tcs = kubelet_cfg.get("tlsCipherSuites")
    return [
        # readOnlyPort: absent or 0 is safe
        ("readOnlyPort", f"{rop!r} (absent or 0 is safe)", rop is None or rop == 0),
        # makeIPTablesUtilChains: absent (defaults true) or true
        (
            "makeIPTablesUtilChains",
            f"{miu!r} (absent or true is safe)",
            miu is None or miu is True,
        ),
        # eventRecordQPS: absent (defaults 5) or > 0
        (
            "eventRecordQPS",
            f"{erq!r} (absent or > 0 is safe)",
            erq is None or (isinstance(erq, (int, float)) and erq > 0),
        ),
        # tlsCipherSuites: absent uses Go defaults (acceptable); explicit
        # suites are also fine, so this row is informational only.
        ("tlsCipherSuites", "Go defaults" if tcs is None else f"{tcs!r}", True),
    ]


def _check_etcd_ca() -> list[Check]:
    """Check that etcd and the cluster are signed by different CAs."""
    etcd_fp = _ssh_minikube(
        '"sudo openssl x509 -in /var/lib/minikube/certs/etcd/ca.crt -noout -fingerprint -sha256"'
    )
    cluster_fp = _ssh_minikube(
        '"sudo openssl x509 -in /var/lib/minikube/certs/ca.crt -noout -fingerprint -sha256"'
    )
    if etcd_fp.returncode != 0 or cluster_fp.returncode != 0:
        return [("etcd CA separation", "could not read certificates", False)]
    different = etcd_fp.stdout.strip() != cluster_fp.stdout.strip()
    return [
        (
            "etcd CA separation",
            "different CAs" if different else "SAME CA (unexpected)",
            different,
        )
    ]


def _check_cluster_admin_bindings() -> list[Check]:
    """Check that only the expected bindings grant the cluster-admin role."""
    expected_bindings = {"cluster-admin", "kubeadm:cluster-admins", "minikube-rbac"}
    # Use a jsonpath that emits "name\troleRef" pairs to avoid N+1 queries.
    # Tab-separated because binding names can contain colons
    # (e.g. kubeadm:cluster-admins). Not routed through _kubectl(): its
    # whitespace split would break the jsonpath argument at "range .items".
    rb_result = _run(
        [
            "kubectl",
            "--context=minikube-indri",
            "get",
            "clusterrolebindings",
            "-o",
            "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{.roleRef.name}{'\\n'}{end}",
        ],
        timeout=15,
    )
    if rb_result.returncode != 0:
        return [("cluster-admin bindings", "kubectl failed", False)]

    admin_bindings: set[str] = set()
    for line in rb_result.stdout.strip().splitlines():
        if "\t" not in line:
            continue
        name, role = line.split("\t", 1)
        if role == "cluster-admin":
            admin_bindings.add(name)

    unexpected = admin_bindings - expected_bindings
    if unexpected:
        return [
            (
                "cluster-admin bindings",
                f"unexpected: {', '.join(sorted(unexpected))}",
                False,
            )
        ]
    return [
        (
            "cluster-admin bindings",
            f"only expected: {', '.join(sorted(admin_bindings))}",
            True,
        )
    ]


def run_node_verification(console: Console) -> None:
    """Verify node-level conditions that Prowler reports as MANUAL.

    Prowler runs inside a pod and can't evaluate kubelet file permissions,
    kubelet config arguments, etcd CA separation, or cluster-admin RBAC
    bindings. We SSH into the minikube node and check each condition here,
    printing a results table and a red verdict panel if any deviates from
    expected values.
    """
    checks: list[Check] = [
        *_check_file_modes(),
        *_check_kubelet_config(),
        *_check_etcd_ca(),
        *_check_cluster_admin_bindings(),
    ]

    table = Table(
        show_header=True,
        header_style="bold",
        title="Node Verification (out-of-band checks for MANUAL findings)",
    )
    table.add_column("Check")
    table.add_column("Detail")
    table.add_column("Result", justify="center")
    for name, detail, passed in checks:
        status = "[green]PASS[/green]" if passed else "[bold red]FAIL[/bold red]"
        # Details echo values read from the node; keep them literal.
        table.add_row(name, escape(detail), status)
    console.print(table)
    console.print()

    failed_count = sum(1 for _, _, passed in checks if not passed)
    if failed_count == 0:
        console.print(
            Panel(
                "[bold green]All node-level checks passed.[/bold green] "
                "Muted MANUAL findings are verified.",
                title="Node Verification Verdict",
                border_style="green",
            )
        )
    else:
        console.print(
            Panel(
                f"[bold red]{failed_count} node-level check(s) FAILED.[/bold red]\n"
                "Review the failures above — muted MANUAL findings may no longer "
                "be valid.",
                title="Node Verification Verdict",
                border_style="red",
            )
        )
    console.print()


# --------------------------------------------------------------------------
# Prowler report summary
# --------------------------------------------------------------------------


def _print_status_table(report_name: str, latest: dict) -> None:
    """Print PASS/FAIL/MANUAL counts plus the muted/unmuted split and total."""
    status_table = Table(
        show_header=True,
        header_style="bold",
        title=f"Report: {escape(report_name)}",
    )
    status_table.add_column("Status")
    status_table.add_column("Count", justify="right")
    for status in ["PASS", "FAIL", "MANUAL"]:
        count = latest["statuses"].get(status, 0)
        # Only a non-zero FAIL count is highlighted.
        style = "red" if status == "FAIL" and count > 0 else ""
        status_table.add_row(_styled(status, style), _styled(str(count), style))

    muted_count = len(latest["muted"])
    unmuted_count = len(latest["unmuted"])
    status_table.add_row("", "")
    status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
    status_table.add_row(
        "[bold]↳ unmuted (action needed)[/bold]",
        f"[bold red]{unmuted_count}[/bold red]"
        if unmuted_count > 0
        else "[bold green]0[/bold green]",
    )
    status_table.add_row("", "")
    status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
    console.print(status_table)
    console.print()


def _print_severity_table(unmuted: list[dict]) -> None:
    """Print unmuted failure counts per severity, most severe first."""
    sev_table = Table(
        show_header=True,
        header_style="bold",
        title="Unmuted Failures by Severity",
    )
    sev_table.add_column("Severity")
    sev_table.add_column("Count", justify="right")
    counts = Counter(r.get("SEVERITY") or "" for r in unmuted)
    for sev, count in sorted(counts.items(), key=lambda kv: _severity_rank(kv[0])):
        style = _sev_style(sev)
        sev_table.add_row(_styled(escape(sev), style), _styled(str(count), style))
    console.print(sev_table)
    console.print()


def _print_delta(
    prev_name: str, prev: dict, latest: dict, *, list_changes: bool
) -> None:
    """Print the new/resolved comparison against the previous report.

    Findings are matched on `finding_key`. When `list_changes` is False only
    the summary panel is printed, without the per-finding listings.
    """
    prev_keys = {finding_key(r): r for r in prev["unmuted"]}
    curr_keys = {finding_key(r): r for r in latest["unmuted"]}
    new_keys = curr_keys.keys() - prev_keys.keys()
    resolved_keys = prev_keys.keys() - curr_keys.keys()

    delta_lines = [
        f"Compared against: [dim]{escape(prev_name)}[/dim]",
        "",
        f"Previous unmuted FAILs: {len(prev['unmuted'])}",
        f"Current unmuted FAILs: {len(latest['unmuted'])}",
        f"[green]Resolved: {len(resolved_keys)}[/green]",
        f"[red]New: {len(new_keys)}[/red]" if new_keys else "[green]New: 0[/green]",
    ]
    console.print(
        Panel(
            "\n".join(delta_lines),
            title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
            border_style="cyan",
        )
    )
    console.print()

    if not list_changes:
        return

    if new_keys:
        console.print("[bold red]New Unmuted Failures:[/bold red]")
        for k in sorted(new_keys):
            console.print(f" {_finding_line(curr_keys[k])}")
        console.print()

    if resolved_keys:
        console.print("[bold green]Resolved:[/bold green]")
        for k in sorted(resolved_keys):
            console.print(f" [dim]{_finding_line(prev_keys[k])}[/dim]")
        console.print()


def _print_muted_table(muted: list[dict]) -> None:
    """Print muted failures grouped by (severity, check), most severe first."""
    muted_table = Table(
        show_header=True,
        header_style="bold",
        title="Muted Failures (for reference)",
    )
    muted_table.add_column("Severity")
    muted_table.add_column("Check")
    muted_table.add_column("Count", justify="right")

    muted_groups: Counter[tuple[str, str]] = Counter(
        (r.get("SEVERITY") or "", r.get("CHECK_ID") or "") for r in muted
    )
    for (sev, check), count in sorted(
        muted_groups.items(),
        key=lambda item: _severity_rank(item[0][0]),
    ):
        muted_table.add_row(
            f"[dim]{escape(sev)}[/dim]",
            f"[dim]{escape(check)}[/dim]",
            f"[dim]{count}[/dim]",
        )
    console.print(muted_table)
    console.print()


def _print_verdict(label: str, unmuted_count: int) -> None:
    """Print the closing green/yellow verdict panel for one scan."""
    if unmuted_count == 0:
        console.print(
            Panel(
                "[bold green]All clear.[/bold green] No unmuted failures.",
                title=f"{label} Verdict",
                border_style="green",
            )
        )
    else:
        console.print(
            Panel(
                f"[bold yellow]{unmuted_count} unmuted failure(s) "
                f"need triage.[/bold yellow]\n\n"
                "For each: remediate, or add a Resource entry to the "
                "matching check in argocd/manifests/prowler/mutelist/.",
                title=f"{label} Verdict",
                border_style="yellow",
            )
        )
    console.print()


def summarize_report(
    label: str,
    base: str,
    tmpdir: str,
    *,
    show_muted: bool = False,
    group_findings: bool = False,
) -> None:
    """Fetch and summarize the latest Prowler report under `base`.

    The latest report (and the one before it, when present) is copied into
    `tmpdir`. When `group_findings` is True, top-N CHECK_ID and
    RESOURCE_NAME tables are shown instead of a per-finding detail table —
    appropriate for image and IaC scans that produce thousands of findings.
    When `show_muted` is True a table of muted failures is added.
    """
    console.rule(f"[bold]{label}[/bold]")

    csvs = list_reports(base)
    if not csvs:
        console.print(
            f"[bold yellow]{label}: no Prowler CSV reports found "
            f"under {base}[/bold yellow]"
        )
        console.print()
        return

    # Filesystem-safe prefix so each scan gets its own files in tmpdir.
    safe = "".join(c if c.isalnum() else "_" for c in label.lower())

    latest_remote = csvs[-1]
    latest_local = Path(tmpdir) / f"{safe}_latest.csv"
    console.print(f"[dim]Fetching {latest_remote}...[/dim]")
    if not scp(f"sifaka:{latest_remote}", str(latest_local)):
        console.print(f"[bold red]Failed to copy {latest_remote}[/bold red]")
        return

    prev_local: Path | None = None
    if len(csvs) >= 2:
        prev_remote = csvs[-2]
        prev_path = Path(tmpdir) / f"{safe}_prev.csv"
        console.print(f"[dim]Fetching {prev_remote}...[/dim]")
        if scp(f"sifaka:{prev_remote}", str(prev_path)):
            prev_local = prev_path
        else:
            # The delta is optional, but say why it is missing.
            console.print(
                f"[yellow]Could not copy {prev_remote}; skipping delta.[/yellow]"
            )

    latest = parse_findings(load_csv(str(latest_local)))
    unmuted = latest["unmuted"]
    console.print()

    # --- Overall status ---
    _print_status_table(Path(latest_remote).stem, latest)

    # --- Unmuted failures by severity ---
    if unmuted:
        _print_severity_table(unmuted)

    # --- Delta from previous report ---
    if prev_local:
        prev = parse_findings(load_csv(str(prev_local)))
        # For grouped scans the new/resolved listings are too noisy
        # (potentially thousands of lines). Skip the listings; the count
        # is in the panel and detail is in the grouped tables.
        _print_delta(
            Path(csvs[-2]).stem, prev, latest, list_changes=not group_findings
        )

    # --- Unmuted failure details (grouped or per-finding) ---
    if unmuted:
        if group_findings:
            _print_grouped_findings(unmuted)
        else:
            _print_findings_detail(unmuted)

    # --- Muted findings summary ---
    if show_muted and latest["muted"]:
        _print_muted_table(latest["muted"])

    # --- Verdict ---
    _print_verdict(label, len(unmuted))


def _print_findings_detail(unmuted: list[dict]) -> None:
    """Per-finding detail table — appropriate when finding count is small."""
    detail_table = Table(
        show_header=True,
        header_style="bold",
        title="Unmuted Failures — Action Needed",
    )
    detail_table.add_column("Severity")
    detail_table.add_column("Check")
    detail_table.add_column("Resource")
    detail_table.add_column("Detail", max_width=60)
    for r in sorted(unmuted, key=severity_sort):
        sev = r.get("SEVERITY") or ""
        detail_table.add_row(
            _styled(escape(sev), _sev_style(sev)),
            _cell(r, "CHECK_ID"),
            _cell(r, "RESOURCE_NAME"),
            _cell(r, "STATUS_EXTENDED", 60),
        )
    console.print(detail_table)
    console.print()


def _print_top_groups(
    title: str,
    column: str,
    groups: dict[str, list[dict]],
    top_n: int,
    limit: int | None = None,
) -> None:
    """Print the `top_n` largest groups with their count and worst severity."""
    table = Table(show_header=True, header_style="bold", title=title)
    table.add_column("Worst Sev")
    table.add_column(column)
    table.add_column("Count", justify="right")
    # sorted() is stable, so equally-sized groups keep first-seen order.
    ranked = sorted(groups.items(), key=lambda kv: -len(kv[1]))[:top_n]
    for group_name, rows in ranked:
        worst = _worst_severity(rows)
        shown = group_name if limit is None else group_name[:limit]
        table.add_row(
            _styled(escape(worst), _sev_style(worst)),
            escape(shown),
            str(len(rows)),
        )
    console.print(table)
    console.print()


def _print_grouped_findings(unmuted: list[dict], top_n: int = 15) -> None:
    """Top-N tables grouped by CHECK_ID and RESOURCE_NAME.

    Used for image and IaC scans where per-finding tables would be too large
    to be useful. Shows count and worst severity for each group.
    """
    by_check: dict[str, list[dict]] = {}
    by_resource: dict[str, list[dict]] = {}
    for r in unmuted:
        by_check.setdefault(r["CHECK_ID"], []).append(r)
        resource = r.get("RESOURCE_NAME", "") or "(no resource)"
        by_resource.setdefault(resource, []).append(r)

    _print_top_groups(
        f"Top {top_n} Checks by Unmuted Finding Count",
        "Check ID",
        by_check,
        top_n,
    )
    _print_top_groups(
        f"Top {top_n} Resources by Unmuted Finding Count",
        "Resource",
        by_resource,
        top_n,
        limit=80,
    )


def main(
    full: Annotated[
        bool,
        typer.Option(
            help="(reserved) currently a no-op; all unmuted failures already shown"
        ),
    ] = False,
    show_muted: Annotated[
        bool, typer.Option(help="Also show muted failures")
    ] = False,
) -> None:
    """Summarize every Prowler scan, verify the node, then check Kingfisher."""
    del full  # historical flag, kept for backwards compatibility

    with tempfile.TemporaryDirectory() as tmpdir:
        for label, base, group in PROWLER_SCANS:
            summarize_report(
                label,
                base,
                tmpdir,
                show_muted=show_muted,
                group_findings=group,
            )

    # --- Node-level MANUAL check verification ---
    # These checks verify conditions Prowler reports as MANUAL because it
    # runs inside a pod and cannot evaluate them directly.
    run_node_verification(console)

    # --- Kingfisher secret scanning ---
    # TODO: Kingfisher currently only outputs HTML. Once JSON or CSV output
    # is supported upstream (contribute from our spork), add parsing here:
    #
    #   KINGFISHER_BASE = "/volume1/reports/kingfisher"
    #   - Fetch latest JSON/CSV from sifaka:{KINGFISHER_BASE}/
    #   - Parse findings: active vs inactive vs skipped validations
    #   - Flag any "Active Credential" findings as critical
    #   - Compare against previous scan for delta
    #   - Show summary panel similar to Prowler
    #
    # For now, check that a recent report exists and warn if missing.
    kf_check = _run(
        ["ssh", "sifaka", "ls -1t /volume1/reports/kingfisher/ | head -1"],
        timeout=15,
    )
    kf_latest = kf_check.stdout.strip() if kf_check.returncode == 0 else ""
    # Only names starting with "202" are accepted as report directories.
    if kf_latest and kf_latest.startswith("202"):
        console.print(
            f"[dim]Kingfisher: latest report directory is {kf_latest} "
            f"(HTML only — JSON/CSV pending upstream)[/dim]"
        )
    else:
        console.print(
            "[bold yellow]Warning: No recent Kingfisher report found on "
            "sifaka. Check the CronJob on ringtail.[/bold yellow]"
        )


if __name__ == "__main__":
    typer.run(main)