#!/usr/bin/env -S uv run --script # /// script # requires-python = ">=3.12" # dependencies = ["rich>=14.0.0", "typer>=0.24.0", "pyyaml>=6.0"] # /// #MISE description="Summarize the latest Prowler and Kingfisher compliance reports from sifaka" #USAGE flag "--full" help="Show all unmuted failures, not just new ones" #USAGE flag "--show-muted" help="Also show muted failures" """Fetch and summarize compliance reports from sifaka. Covers: - Prowler K8s CIS: CSV-based, full analysis with delta tracking - Kingfisher secret scanning: TODO — pending upstream JSON/CSV output support (currently HTML-only; contribute from spork) For Prowler, copies the two most recent K8s CIS reports, parses them, and displays: 1. Overall status (pass/fail/manual/muted counts) 2. Unmuted failures by severity 3. Delta from the previous report (new vs resolved) 4. Actionable unmuted failures with details This is the primary tool for the weekly compliance report review. """ import csv import subprocess import sys import tempfile from collections import Counter from pathlib import Path from typing import Annotated import typer from rich.console import Console from rich.panel import Panel from rich.table import Table REPORT_BASE = "sifaka:/volume1/reports/prowler" console = Console() def scp(remote: str, local: str) -> bool: """Copy a file from sifaka (requires scp -O for Synology).""" result = subprocess.run( ["scp", "-O", remote, local], capture_output=True, text=True, timeout=30, ) return result.returncode == 0 def list_reports() -> list[str]: """List Prowler CSV reports on sifaka, sorted by embedded timestamp.""" result = subprocess.run( ["ssh", "sifaka", "find /volume1/reports/prowler/ -name '*.csv' " "-not -path '*/compliance/*' -not -name '@*'"], capture_output=True, text=True, timeout=15, ) if result.returncode != 0: console.print("[bold red]Failed to list reports on sifaka[/bold red]") raise typer.Exit(code=1) csvs = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()] # Sort by the timestamp embedded in the filename (e.g. 20260405030007) import re def sort_key(path: str) -> str: m = re.search(r"(\d{14})", Path(path).name) return m.group(1) if m else Path(path).name return sorted(csvs, key=sort_key) def load_csv(path: str) -> list[dict]: """Load a Prowler CSV report.""" with open(path) as f: return list(csv.DictReader(f, delimiter=";")) def parse_findings(rows: list[dict]) -> dict: """Categorize findings from a report.""" statuses = Counter(r["STATUS"] for r in rows) fails = [r for r in rows if r["STATUS"] == "FAIL"] unmuted = [r for r in fails if r.get("MUTED", "") != "True"] muted = [r for r in fails if r.get("MUTED", "") == "True"] return { "total": len(rows), "statuses": statuses, "fails": fails, "unmuted": unmuted, "muted": muted, } def finding_key(r: dict) -> tuple[str, str]: """Stable identity for a finding (check + resource name, not UID).""" return (r["CHECK_ID"], r.get("RESOURCE_NAME", "")) SEVERITY_ORDER = ["critical", "high", "medium", "low", "informational"] def severity_sort(r: dict) -> int: sev = r.get("SEVERITY", "").lower() return SEVERITY_ORDER.index(sev) if sev in SEVERITY_ORDER else 99 def _ssh_minikube(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess: """Run a command inside the minikube node via SSH.""" return subprocess.run( ["ssh", "indri", f"minikube ssh -- {cmd}"], capture_output=True, text=True, timeout=timeout, ) def _kubectl(args: str, timeout: int = 15) -> subprocess.CompletedProcess: """Run a kubectl command against minikube-indri.""" return subprocess.run( ["kubectl", "--context=minikube-indri"] + args.split(), capture_output=True, text=True, timeout=timeout, ) def run_node_verification(console: Console) -> None: """Verify node-level conditions that Prowler reports as MANUAL. Compensating control: node-config-automated-verification """ checks: list[tuple[str, str, bool]] = [] # (name, detail, passed) # --- File ownership and permissions --- file_expectations = [ ("kubelet.conf ownership", "/etc/kubernetes/kubelet.conf", "root:root", None), ("kubelet.conf permissions", "/etc/kubernetes/kubelet.conf", None, "600"), ("config.yaml ownership", "/var/lib/kubelet/config.yaml", "root:root", None), ("config.yaml permissions", "/var/lib/kubelet/config.yaml", None, "644"), ("kubelet service ownership", "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf", "root:root", None), ("kubelet service permissions", "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf", None, "644"), ] for name, path, expected_owner, expected_perms in file_expectations: if expected_owner: result = _ssh_minikube(f'"sudo stat -c %U:%G {path}"') else: result = _ssh_minikube(f'"sudo stat -c %a {path}"') if result.returncode != 0: checks.append((name, f"could not stat {path}", False)) else: actual = result.stdout.strip() expected = expected_owner or expected_perms passed = actual == expected checks.append((name, f"{actual} (expected {expected})", passed)) # --- Kubelet config arguments --- kubelet_result = _ssh_minikube('"sudo cat /var/lib/kubelet/config.yaml"') if kubelet_result.returncode != 0: checks.append(("kubelet config", "could not read config.yaml", False)) else: import yaml as _yaml try: kubelet_cfg = _yaml.safe_load(kubelet_result.stdout) or {} except Exception: kubelet_cfg = {} checks.append(("kubelet config parse", "failed to parse config.yaml", False)) # readOnlyPort: absent or 0 is safe rop = kubelet_cfg.get("readOnlyPort") checks.append(( "readOnlyPort", f"{rop!r} (absent or 0 is safe)", rop is None or rop == 0, )) # makeIPTablesUtilChains: absent (defaults true) or true miu = kubelet_cfg.get("makeIPTablesUtilChains") checks.append(( "makeIPTablesUtilChains", f"{miu!r} (absent or true is safe)", miu is None or miu is True, )) # eventRecordQPS: absent (defaults 5) or > 0 erq = kubelet_cfg.get("eventRecordQPS") checks.append(( "eventRecordQPS", f"{erq!r} (absent or > 0 is safe)", erq is None or (isinstance(erq, (int, float)) and erq > 0), )) # tlsCipherSuites: absent uses Go defaults (acceptable) tcs = kubelet_cfg.get("tlsCipherSuites") checks.append(( "tlsCipherSuites", "Go defaults" if tcs is None else f"{tcs!r}", True, # Go defaults are acceptable; explicit suites also fine )) # --- Etcd CA separation --- etcd_fp = _ssh_minikube( '"sudo openssl x509 -in /var/lib/minikube/certs/etcd/ca.crt -noout -fingerprint -sha256"' ) cluster_fp = _ssh_minikube( '"sudo openssl x509 -in /var/lib/minikube/certs/ca.crt -noout -fingerprint -sha256"' ) if etcd_fp.returncode != 0 or cluster_fp.returncode != 0: checks.append(("etcd CA separation", "could not read certificates", False)) else: etcd_hash = etcd_fp.stdout.strip() cluster_hash = cluster_fp.stdout.strip() different = etcd_hash != cluster_hash checks.append(( "etcd CA separation", "different CAs" if different else "SAME CA (unexpected)", different, )) # --- RBAC cluster-admin bindings --- expected_bindings = {"cluster-admin", "kubeadm:cluster-admins", "minikube-rbac"} # Use a jsonpath that emits "name\troleRef" pairs to avoid N+1 queries # Tab-separated because binding names can contain colons (e.g. kubeadm:cluster-admins) rb_result = subprocess.run( [ "kubectl", "--context=minikube-indri", "get", "clusterrolebindings", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{.roleRef.name}{'\\n'}{end}", ], capture_output=True, text=True, timeout=15, ) if rb_result.returncode != 0: checks.append(("cluster-admin bindings", "kubectl failed", False)) else: admin_bindings: set[str] = set() for line in rb_result.stdout.strip().splitlines(): if "\t" in line: name, role = line.split("\t", 1) if role == "cluster-admin": admin_bindings.add(name) unexpected = admin_bindings - expected_bindings if unexpected: checks.append(( "cluster-admin bindings", f"unexpected: {', '.join(sorted(unexpected))}", False, )) else: checks.append(( "cluster-admin bindings", f"only expected: {', '.join(sorted(admin_bindings))}", True, )) # --- Display results --- all_passed = all(passed for _, _, passed in checks) table = Table( show_header=True, header_style="bold", title="Node Verification (CC: node-config-automated-verification)", ) table.add_column("Check") table.add_column("Detail") table.add_column("Result", justify="center") for name, detail, passed in checks: status = "[green]PASS[/green]" if passed else "[bold red]FAIL[/bold red]" table.add_row(name, detail, status) console.print(table) console.print() if all_passed: console.print( Panel( "[bold green]All node-level checks passed.[/bold green] " "Muted MANUAL findings are verified.", title="Node Verification Verdict", border_style="green", ) ) else: failed = [(n, d) for n, d, p in checks if not p] console.print( Panel( f"[bold red]{len(failed)} node-level check(s) FAILED.[/bold red]\n" "Review the failures above — muted MANUAL findings may no longer " "be valid.", title="Node Verification Verdict", border_style="red", ) ) console.print() def main( full: Annotated[ bool, typer.Option(help="Show all unmuted failures, not just new ones") ] = False, show_muted: Annotated[ bool, typer.Option(help="Also show muted failures") ] = False, ) -> None: csvs = list_reports() if not csvs: console.print("[bold red]No Prowler CSV reports found on sifaka[/bold red]") raise typer.Exit(code=1) with tempfile.TemporaryDirectory() as tmpdir: # Fetch the two most recent reports latest_remote = csvs[-1] latest_local = Path(tmpdir) / "latest.csv" console.print(f"[dim]Fetching {latest_remote}...[/dim]") if not scp(f"sifaka:{latest_remote}", str(latest_local)): console.print("[bold red]Failed to copy latest report[/bold red]") raise typer.Exit(code=1) prev_local = None if len(csvs) >= 2: prev_remote = csvs[-2] prev_local = Path(tmpdir) / "prev.csv" console.print(f"[dim]Fetching {prev_remote}...[/dim]") if not scp(f"sifaka:{prev_remote}", str(prev_local)): prev_local = None latest = parse_findings(load_csv(str(latest_local))) # Extract report date from filename report_name = Path(latest_remote).stem console.print() # --- Overall status --- status_table = Table( show_header=True, header_style="bold", title=f"Report: {report_name}" ) status_table.add_column("Status") status_table.add_column("Count", justify="right") for status in ["PASS", "FAIL", "MANUAL"]: count = latest["statuses"].get(status, 0) style = "red" if status == "FAIL" and count > 0 else "" status_table.add_row( f"[{style}]{status}[/{style}]" if style else status, f"[{style}]{count}[/{style}]" if style else str(count), ) fail_count = len(latest["fails"]) muted_count = len(latest["muted"]) unmuted_count = len(latest["unmuted"]) status_table.add_row("", "") status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]") status_table.add_row( "[bold]↳ unmuted (action needed)[/bold]", f"[bold red]{unmuted_count}[/bold red]" if unmuted_count > 0 else "[bold green]0[/bold green]", ) status_table.add_row("", "") status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]") console.print(status_table) console.print() # --- Unmuted failures by severity --- if latest["unmuted"]: sev_table = Table( show_header=True, header_style="bold", title="Unmuted Failures by Severity", ) sev_table.add_column("Severity") sev_table.add_column("Count", justify="right") for sev, count in Counter( r["SEVERITY"] for r in latest["unmuted"] ).most_common(): style = ( "bold red" if sev == "critical" else "red" if sev == "high" else "yellow" if sev == "medium" else "" ) sev_table.add_row( f"[{style}]{sev}[/{style}]" if style else sev, f"[{style}]{count}[/{style}]" if style else str(count), ) console.print(sev_table) console.print() # --- Delta from previous report --- if prev_local: prev = parse_findings(load_csv(str(prev_local))) prev_keys = {finding_key(r): r for r in prev["unmuted"]} curr_keys = {finding_key(r): r for r in latest["unmuted"]} new_keys = set(curr_keys.keys()) - set(prev_keys.keys()) resolved_keys = set(prev_keys.keys()) - set(curr_keys.keys()) prev_name = Path(csvs[-2]).stem delta_lines = [ f"Compared against: [dim]{prev_name}[/dim]", "", f"Previous unmuted FAILs: {len(prev['unmuted'])}", f"Current unmuted FAILs: {len(latest['unmuted'])}", f"[green]Resolved: {len(resolved_keys)}[/green]", f"[red]New: {len(new_keys)}[/red]" if new_keys else f"[green]New: 0[/green]", ] console.print( Panel( "\n".join(delta_lines), title="[bold]Week-over-Week Delta (unmuted only)[/bold]", border_style="cyan", ) ) console.print() if new_keys: console.print("[bold red]New Unmuted Failures:[/bold red]") for k in sorted(new_keys): r = curr_keys[k] console.print( f" [{r['SEVERITY']}] {r['CHECK_ID']}: " f"{r['STATUS_EXTENDED'][:120]}" ) console.print() if resolved_keys: console.print("[bold green]Resolved:[/bold green]") for k in sorted(resolved_keys): r = prev_keys[k] console.print( f" [dim][{r['SEVERITY']}] {r['CHECK_ID']}: " f"{r['STATUS_EXTENDED'][:120]}[/dim]" ) console.print() # --- Unmuted failure details --- findings_to_show = latest["unmuted"] if full else [] if not full and latest["unmuted"]: findings_to_show = latest["unmuted"] if findings_to_show: detail_table = Table( show_header=True, header_style="bold", title="Unmuted Failures — Action Needed", ) detail_table.add_column("Severity") detail_table.add_column("Check") detail_table.add_column("Resource") detail_table.add_column("Detail", max_width=60) for r in sorted(findings_to_show, key=severity_sort): sev = r["SEVERITY"] style = ( "bold red" if sev == "critical" else "red" if sev == "high" else "yellow" if sev == "medium" else "" ) detail_table.add_row( f"[{style}]{sev}[/{style}]" if style else sev, r["CHECK_ID"], r.get("RESOURCE_NAME", ""), r["STATUS_EXTENDED"][:60], ) console.print(detail_table) console.print() # --- Muted findings summary --- if show_muted and latest["muted"]: muted_table = Table( show_header=True, header_style="bold", title="Muted Failures (for reference)", ) muted_table.add_column("Severity") muted_table.add_column("Check") muted_table.add_column("Count", justify="right") muted_groups: dict[tuple[str, str], int] = Counter() for r in latest["muted"]: muted_groups[(r["SEVERITY"], r["CHECK_ID"])] += 1 for (sev, check), count in sorted( muted_groups.items(), key=lambda x: severity_sort({"SEVERITY": x[0][0]}) ): muted_table.add_row(f"[dim]{sev}[/dim]", f"[dim]{check}[/dim]", f"[dim]{count}[/dim]") console.print(muted_table) console.print() # --- Verdict --- if not latest["unmuted"]: console.print( Panel( "[bold green]All clear.[/bold green] No unmuted failures.", title="Prowler Verdict", border_style="green", ) ) else: console.print( Panel( f"[bold yellow]{len(latest['unmuted'])} unmuted failure(s) " f"need triage.[/bold yellow]\n\n" "For each: remediate (fix the pod spec) or mute " "(add to mutelist + compensating control).", title="Prowler Verdict", border_style="yellow", ) ) # --- Node-level MANUAL check verification --- # Compensating control: node-config-automated-verification # These checks verify conditions Prowler reports as MANUAL because it # runs inside a pod and cannot evaluate them directly. run_node_verification(console) # --- Kingfisher secret scanning --- # TODO: Kingfisher currently only outputs HTML. Once JSON or CSV output # is supported upstream (contribute from our spork), add parsing here: # # KINGFISHER_BASE = "/volume1/reports/kingfisher" # - Fetch latest JSON/CSV from sifaka:{KINGFISHER_BASE}/ # - Parse findings: active vs inactive vs skipped validations # - Flag any "Active Credential" findings as critical # - Compare against previous scan for delta # - Show summary panel similar to Prowler # # For now, check that a recent report exists and warn if missing. kf_check = subprocess.run( ["ssh", "sifaka", "ls -1t /volume1/reports/kingfisher/ | head -1"], capture_output=True, text=True, timeout=15, ) kf_latest = kf_check.stdout.strip() if kf_check.returncode == 0 else "" if kf_latest and kf_latest.startswith("202"): console.print( f"[dim]Kingfisher: latest report directory is {kf_latest} " f"(HTML only — JSON/CSV pending upstream)[/dim]" ) else: console.print( "[bold yellow]Warning: No recent Kingfisher report found on " "sifaka. Check the CronJob on ringtail.[/bold yellow]" ) if __name__ == "__main__": typer.run(main)