blumeops/mise-tasks/review-compliance-reports

578 lines
20 KiB
Text
Raw Normal View History

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["rich>=14.0.0", "typer>=0.24.0", "pyyaml>=6.0"]
# ///
#MISE description="Summarize the latest Prowler and Kingfisher compliance reports from sifaka"
#USAGE flag "--full" help="Show all unmuted failures, not just new ones"
#USAGE flag "--show-muted" help="Also show muted failures"
"""Fetch and summarize compliance reports from sifaka.
Covers:
- Prowler K8s CIS: CSV-based, full analysis with delta tracking
- Kingfisher secret scanning: TODO — pending upstream JSON/CSV output
support (currently HTML-only; contribute from spork)
For Prowler, copies the two most recent K8s CIS reports, parses them,
and displays:
1. Overall status (pass/fail/manual/muted counts)
2. Unmuted failures by severity
3. Delta from the previous report (new vs resolved)
4. Actionable unmuted failures with details
This is the primary tool for the weekly compliance report review.
"""
import csv
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
REPORT_BASE = "sifaka:/volume1/reports/prowler"
console = Console()
def scp(remote: str, local: str) -> bool:
"""Copy a file from sifaka (requires scp -O for Synology)."""
result = subprocess.run(
["scp", "-O", remote, local],
capture_output=True,
text=True,
timeout=30,
)
return result.returncode == 0
def list_reports() -> list[str]:
"""List Prowler CSV reports on sifaka, sorted by embedded timestamp."""
result = subprocess.run(
["ssh", "sifaka", "find /volume1/reports/prowler/ -name '*.csv' "
"-not -path '*/compliance/*' -not -name '@*'"],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
console.print("[bold red]Failed to list reports on sifaka[/bold red]")
raise typer.Exit(code=1)
csvs = [p.strip() for p in result.stdout.strip().splitlines() if p.strip()]
# Sort by the timestamp embedded in the filename (e.g. 20260405030007)
import re
def sort_key(path: str) -> str:
m = re.search(r"(\d{14})", Path(path).name)
return m.group(1) if m else Path(path).name
return sorted(csvs, key=sort_key)
def load_csv(path: str) -> list[dict]:
"""Load a Prowler CSV report."""
with open(path) as f:
return list(csv.DictReader(f, delimiter=";"))
def parse_findings(rows: list[dict]) -> dict:
"""Categorize findings from a report."""
statuses = Counter(r["STATUS"] for r in rows)
fails = [r for r in rows if r["STATUS"] == "FAIL"]
unmuted = [r for r in fails if r.get("MUTED", "") != "True"]
muted = [r for r in fails if r.get("MUTED", "") == "True"]
return {
"total": len(rows),
"statuses": statuses,
"fails": fails,
"unmuted": unmuted,
"muted": muted,
}
def finding_key(r: dict) -> tuple[str, str]:
"""Stable identity for a finding (check + resource name, not UID)."""
return (r["CHECK_ID"], r.get("RESOURCE_NAME", ""))
SEVERITY_ORDER = ["critical", "high", "medium", "low", "informational"]
def severity_sort(r: dict) -> int:
sev = r.get("SEVERITY", "").lower()
return SEVERITY_ORDER.index(sev) if sev in SEVERITY_ORDER else 99
def _ssh_minikube(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess:
"""Run a command inside the minikube node via SSH."""
return subprocess.run(
["ssh", "indri", f"minikube ssh -- {cmd}"],
capture_output=True,
text=True,
timeout=timeout,
)
def _kubectl(args: str, timeout: int = 15) -> subprocess.CompletedProcess:
"""Run a kubectl command against minikube-indri."""
return subprocess.run(
["kubectl", "--context=minikube-indri"] + args.split(),
capture_output=True,
text=True,
timeout=timeout,
)
def run_node_verification(console: Console) -> None:
"""Verify node-level conditions that Prowler reports as MANUAL.
Compensating control: node-config-automated-verification
"""
checks: list[tuple[str, str, bool]] = [] # (name, detail, passed)
# --- File ownership and permissions ---
file_expectations = [
("kubelet.conf ownership", "/etc/kubernetes/kubelet.conf", "root:root", None),
("kubelet.conf permissions", "/etc/kubernetes/kubelet.conf", None, "600"),
("config.yaml ownership", "/var/lib/kubelet/config.yaml", "root:root", None),
("config.yaml permissions", "/var/lib/kubelet/config.yaml", None, "644"),
("kubelet service ownership", "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf", "root:root", None),
("kubelet service permissions", "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf", None, "644"),
]
for name, path, expected_owner, expected_perms in file_expectations:
if expected_owner:
result = _ssh_minikube(f'"sudo stat -c %U:%G {path}"')
else:
result = _ssh_minikube(f'"sudo stat -c %a {path}"')
if result.returncode != 0:
checks.append((name, f"could not stat {path}", False))
else:
actual = result.stdout.strip()
expected = expected_owner or expected_perms
passed = actual == expected
checks.append((name, f"{actual} (expected {expected})", passed))
# --- Kubelet config arguments ---
kubelet_result = _ssh_minikube('"sudo cat /var/lib/kubelet/config.yaml"')
if kubelet_result.returncode != 0:
checks.append(("kubelet config", "could not read config.yaml", False))
else:
import yaml as _yaml
try:
kubelet_cfg = _yaml.safe_load(kubelet_result.stdout) or {}
except Exception:
kubelet_cfg = {}
checks.append(("kubelet config parse", "failed to parse config.yaml", False))
# readOnlyPort: absent or 0 is safe
rop = kubelet_cfg.get("readOnlyPort")
checks.append((
"readOnlyPort",
f"{rop!r} (absent or 0 is safe)",
rop is None or rop == 0,
))
# makeIPTablesUtilChains: absent (defaults true) or true
miu = kubelet_cfg.get("makeIPTablesUtilChains")
checks.append((
"makeIPTablesUtilChains",
f"{miu!r} (absent or true is safe)",
miu is None or miu is True,
))
# eventRecordQPS: absent (defaults 5) or > 0
erq = kubelet_cfg.get("eventRecordQPS")
checks.append((
"eventRecordQPS",
f"{erq!r} (absent or > 0 is safe)",
erq is None or (isinstance(erq, (int, float)) and erq > 0),
))
# tlsCipherSuites: absent uses Go defaults (acceptable)
tcs = kubelet_cfg.get("tlsCipherSuites")
checks.append((
"tlsCipherSuites",
"Go defaults" if tcs is None else f"{tcs!r}",
True, # Go defaults are acceptable; explicit suites also fine
))
# --- Etcd CA separation ---
etcd_fp = _ssh_minikube(
'"sudo openssl x509 -in /var/lib/minikube/certs/etcd/ca.crt -noout -fingerprint -sha256"'
)
cluster_fp = _ssh_minikube(
'"sudo openssl x509 -in /var/lib/minikube/certs/ca.crt -noout -fingerprint -sha256"'
)
if etcd_fp.returncode != 0 or cluster_fp.returncode != 0:
checks.append(("etcd CA separation", "could not read certificates", False))
else:
etcd_hash = etcd_fp.stdout.strip()
cluster_hash = cluster_fp.stdout.strip()
different = etcd_hash != cluster_hash
checks.append((
"etcd CA separation",
"different CAs" if different else "SAME CA (unexpected)",
different,
))
# --- RBAC cluster-admin bindings ---
expected_bindings = {"cluster-admin", "kubeadm:cluster-admins", "minikube-rbac"}
# Use a jsonpath that emits "name\troleRef" pairs to avoid N+1 queries
# Tab-separated because binding names can contain colons (e.g. kubeadm:cluster-admins)
rb_result = subprocess.run(
[
"kubectl", "--context=minikube-indri",
"get", "clusterrolebindings",
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{.roleRef.name}{'\\n'}{end}",
],
capture_output=True,
text=True,
timeout=15,
)
if rb_result.returncode != 0:
checks.append(("cluster-admin bindings", "kubectl failed", False))
else:
admin_bindings: set[str] = set()
for line in rb_result.stdout.strip().splitlines():
if "\t" in line:
name, role = line.split("\t", 1)
if role == "cluster-admin":
admin_bindings.add(name)
unexpected = admin_bindings - expected_bindings
if unexpected:
checks.append((
"cluster-admin bindings",
f"unexpected: {', '.join(sorted(unexpected))}",
False,
))
else:
checks.append((
"cluster-admin bindings",
f"only expected: {', '.join(sorted(admin_bindings))}",
True,
))
# --- Display results ---
all_passed = all(passed for _, _, passed in checks)
table = Table(
show_header=True,
header_style="bold",
title="Node Verification (CC: node-config-automated-verification)",
)
table.add_column("Check")
table.add_column("Detail")
table.add_column("Result", justify="center")
for name, detail, passed in checks:
status = "[green]PASS[/green]" if passed else "[bold red]FAIL[/bold red]"
table.add_row(name, detail, status)
console.print(table)
console.print()
if all_passed:
console.print(
Panel(
"[bold green]All node-level checks passed.[/bold green] "
"Muted MANUAL findings are verified.",
title="Node Verification Verdict",
border_style="green",
)
)
else:
failed = [(n, d) for n, d, p in checks if not p]
console.print(
Panel(
f"[bold red]{len(failed)} node-level check(s) FAILED.[/bold red]\n"
"Review the failures above — muted MANUAL findings may no longer "
"be valid.",
title="Node Verification Verdict",
border_style="red",
)
)
console.print()
def main(
full: Annotated[
bool, typer.Option(help="Show all unmuted failures, not just new ones")
] = False,
show_muted: Annotated[
bool, typer.Option(help="Also show muted failures")
] = False,
) -> None:
csvs = list_reports()
if not csvs:
console.print("[bold red]No Prowler CSV reports found on sifaka[/bold red]")
raise typer.Exit(code=1)
with tempfile.TemporaryDirectory() as tmpdir:
# Fetch the two most recent reports
latest_remote = csvs[-1]
latest_local = Path(tmpdir) / "latest.csv"
console.print(f"[dim]Fetching {latest_remote}...[/dim]")
if not scp(f"sifaka:{latest_remote}", str(latest_local)):
console.print("[bold red]Failed to copy latest report[/bold red]")
raise typer.Exit(code=1)
prev_local = None
if len(csvs) >= 2:
prev_remote = csvs[-2]
prev_local = Path(tmpdir) / "prev.csv"
console.print(f"[dim]Fetching {prev_remote}...[/dim]")
if not scp(f"sifaka:{prev_remote}", str(prev_local)):
prev_local = None
latest = parse_findings(load_csv(str(latest_local)))
# Extract report date from filename
report_name = Path(latest_remote).stem
console.print()
# --- Overall status ---
status_table = Table(
show_header=True, header_style="bold", title=f"Report: {report_name}"
)
status_table.add_column("Status")
status_table.add_column("Count", justify="right")
for status in ["PASS", "FAIL", "MANUAL"]:
count = latest["statuses"].get(status, 0)
style = "red" if status == "FAIL" and count > 0 else ""
status_table.add_row(
f"[{style}]{status}[/{style}]" if style else status,
f"[{style}]{count}[/{style}]" if style else str(count),
)
fail_count = len(latest["fails"])
muted_count = len(latest["muted"])
unmuted_count = len(latest["unmuted"])
status_table.add_row("", "")
status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
status_table.add_row(
"[bold]↳ unmuted (action needed)[/bold]",
f"[bold red]{unmuted_count}[/bold red]"
if unmuted_count > 0
else "[bold green]0[/bold green]",
)
status_table.add_row("", "")
status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
console.print(status_table)
console.print()
# --- Unmuted failures by severity ---
if latest["unmuted"]:
sev_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures by Severity",
)
sev_table.add_column("Severity")
sev_table.add_column("Count", justify="right")
for sev, count in Counter(
r["SEVERITY"] for r in latest["unmuted"]
).most_common():
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
sev_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
f"[{style}]{count}[/{style}]" if style else str(count),
)
console.print(sev_table)
console.print()
# --- Delta from previous report ---
if prev_local:
prev = parse_findings(load_csv(str(prev_local)))
prev_keys = {finding_key(r): r for r in prev["unmuted"]}
curr_keys = {finding_key(r): r for r in latest["unmuted"]}
new_keys = set(curr_keys.keys()) - set(prev_keys.keys())
resolved_keys = set(prev_keys.keys()) - set(curr_keys.keys())
prev_name = Path(csvs[-2]).stem
delta_lines = [
f"Compared against: [dim]{prev_name}[/dim]",
"",
f"Previous unmuted FAILs: {len(prev['unmuted'])}",
f"Current unmuted FAILs: {len(latest['unmuted'])}",
f"[green]Resolved: {len(resolved_keys)}[/green]",
f"[red]New: {len(new_keys)}[/red]"
if new_keys
else f"[green]New: 0[/green]",
]
console.print(
Panel(
"\n".join(delta_lines),
title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
border_style="cyan",
)
)
console.print()
if new_keys:
console.print("[bold red]New Unmuted Failures:[/bold red]")
for k in sorted(new_keys):
r = curr_keys[k]
console.print(
f" [{r['SEVERITY']}] {r['CHECK_ID']}: "
f"{r['STATUS_EXTENDED'][:120]}"
)
console.print()
if resolved_keys:
console.print("[bold green]Resolved:[/bold green]")
for k in sorted(resolved_keys):
r = prev_keys[k]
console.print(
f" [dim][{r['SEVERITY']}] {r['CHECK_ID']}: "
f"{r['STATUS_EXTENDED'][:120]}[/dim]"
)
console.print()
# --- Unmuted failure details ---
findings_to_show = latest["unmuted"] if full else []
if not full and latest["unmuted"]:
findings_to_show = latest["unmuted"]
if findings_to_show:
detail_table = Table(
show_header=True,
header_style="bold",
title="Unmuted Failures — Action Needed",
)
detail_table.add_column("Severity")
detail_table.add_column("Check")
detail_table.add_column("Resource")
detail_table.add_column("Detail", max_width=60)
for r in sorted(findings_to_show, key=severity_sort):
sev = r["SEVERITY"]
style = (
"bold red"
if sev == "critical"
else "red"
if sev == "high"
else "yellow"
if sev == "medium"
else ""
)
detail_table.add_row(
f"[{style}]{sev}[/{style}]" if style else sev,
r["CHECK_ID"],
r.get("RESOURCE_NAME", ""),
r["STATUS_EXTENDED"][:60],
)
console.print(detail_table)
console.print()
# --- Muted findings summary ---
if show_muted and latest["muted"]:
muted_table = Table(
show_header=True,
header_style="bold",
title="Muted Failures (for reference)",
)
muted_table.add_column("Severity")
muted_table.add_column("Check")
muted_table.add_column("Count", justify="right")
muted_groups: dict[tuple[str, str], int] = Counter()
for r in latest["muted"]:
muted_groups[(r["SEVERITY"], r["CHECK_ID"])] += 1
for (sev, check), count in sorted(
muted_groups.items(), key=lambda x: severity_sort({"SEVERITY": x[0][0]})
):
muted_table.add_row(f"[dim]{sev}[/dim]", f"[dim]{check}[/dim]", f"[dim]{count}[/dim]")
console.print(muted_table)
console.print()
# --- Verdict ---
if not latest["unmuted"]:
console.print(
Panel(
"[bold green]All clear.[/bold green] No unmuted failures.",
title="Prowler Verdict",
border_style="green",
)
)
else:
console.print(
Panel(
f"[bold yellow]{len(latest['unmuted'])} unmuted failure(s) "
f"need triage.[/bold yellow]\n\n"
"For each: remediate (fix the pod spec) or mute "
"(add to mutelist + compensating control).",
title="Prowler Verdict",
border_style="yellow",
)
)
# --- Node-level MANUAL check verification ---
# Compensating control: node-config-automated-verification
# These checks verify conditions Prowler reports as MANUAL because it
# runs inside a pod and cannot evaluate them directly.
run_node_verification(console)
# --- Kingfisher secret scanning ---
# TODO: Kingfisher currently only outputs HTML. Once JSON or CSV output
# is supported upstream (contribute from our spork), add parsing here:
#
# KINGFISHER_BASE = "/volume1/reports/kingfisher"
# - Fetch latest JSON/CSV from sifaka:{KINGFISHER_BASE}/
# - Parse findings: active vs inactive vs skipped validations
# - Flag any "Active Credential" findings as critical
# - Compare against previous scan for delta
# - Show summary panel similar to Prowler
#
# For now, check that a recent report exists and warn if missing.
kf_check = subprocess.run(
["ssh", "sifaka", "ls -1t /volume1/reports/kingfisher/ | head -1"],
capture_output=True,
text=True,
timeout=15,
)
kf_latest = kf_check.stdout.strip() if kf_check.returncode == 0 else ""
if kf_latest and kf_latest.startswith("202"):
console.print(
f"[dim]Kingfisher: latest report directory is {kf_latest} "
f"(HTML only — JSON/CSV pending upstream)[/dim]"
)
else:
console.print(
"[bold yellow]Warning: No recent Kingfisher report found on "
"sifaka. Check the CronJob on ringtail.[/bold yellow]"
)
if __name__ == "__main__":
typer.run(main)