blumeops/mise-tasks/review-compliance-reports

695 lines
23 KiB
Text
Raw Normal View History

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["rich==15.0.0", "typer==0.26.2", "pyyaml==6.0.3"]
# ///
#MISE description="Summarize the latest Prowler and Kingfisher compliance reports from sifaka"
#USAGE flag "--full" help="Show all unmuted failures, not just new ones"
#USAGE flag "--show-muted" help="Also show muted failures"
"""Fetch and summarize compliance reports from sifaka.
Covers:
- Prowler K8s CIS (in-cluster): per-finding detail
- Prowler container image scans: grouped by check + resource
- Prowler IaC manifest scans: grouped by check + resource
- Kingfisher secret scanning: TODO — pending upstream JSON/CSV output
support (currently HTML-only; contribute from spork)
For each Prowler scan, copies the two most recent CSV reports, parses
them, and displays:
1. Overall status (pass/fail/manual/muted counts)
2. Unmuted failures by severity
3. Delta from the previous report (new vs resolved)
4. Actionable unmuted failures (per-finding for in-cluster; grouped
by check ID and resource for image/IaC because they have far too
many findings to list individually)
This is the primary tool for the weekly compliance report review.
"""
import csv
import subprocess
import tempfile
from collections import Counter
from pathlib import Path
from typing import Annotated
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
# Prowler scans to summarize; main() walks this list in order and passes each
# entry to summarize_report().
PROWLER_SCANS: list[tuple[str, str, bool]] = [
    # (label, sifaka base path, group_findings)
    ("K8s CIS (In-Cluster)", "/volume1/reports/prowler", False),
    ("Container Images", "/volume1/reports/prowler-images", True),
    ("IaC (manifests)", "/volume1/reports/prowler-iac", True),
]
# Module-wide Rich console used by the printing helpers in this script.
console = Console()
def scp(remote: str, local: str) -> bool:
    """Copy a file from sifaka (requires scp -O for Synology).

    Args:
        remote: Source passed verbatim to scp (callers use ``sifaka:<path>``).
        local: Destination path on this machine.

    Returns:
        True when scp exits with status 0; False when it exits non-zero or
        does not finish within 30 seconds.
    """
    try:
        result = subprocess.run(
            ["scp", "-O", remote, local],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        # A hung transfer is just another failed copy: report it through the
        # boolean contract instead of aborting the whole review with a
        # traceback.
        return False
    return result.returncode == 0
def list_reports(base: str) -> list[str]:
    """List Prowler CSV reports under `base` on sifaka, sorted by timestamp.

    Runs ``find`` over SSH, skipping anything under a ``compliance/``
    directory and any file whose name starts with ``@``.

    Args:
        base: Directory on sifaka to search.

    Returns:
        Remote paths in ascending order of the 14-digit timestamp embedded in
        the filename (e.g. 20260405030007), so the newest report is last.
        Files without such a timestamp sort by their whole filename. Returns
        an empty list (after printing an error) when the listing fails or
        times out.
    """
    import re

    find_cmd = (
        f"find {base}/ -name '*.csv' "
        "-not -path '*/compliance/*' -not -name '@*'"
    )
    try:
        result = subprocess.run(
            ["ssh", "sifaka", find_cmd],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except subprocess.TimeoutExpired:
        # Treat an unresponsive host like any other listing failure so the
        # remaining scans can still be summarized.
        console.print(f"[bold red]Failed to list reports under {base}[/bold red]")
        return []
    if result.returncode != 0:
        console.print(f"[bold red]Failed to list reports under {base}[/bold red]")
        return []

    paths = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    # Compiled once rather than on every sort-key call.
    timestamp = re.compile(r"(\d{14})")

    def sort_key(path: str) -> str:
        filename = Path(path).name
        match = timestamp.search(filename)
        return match.group(1) if match else filename

    return sorted(paths, key=sort_key)
def load_csv(path: str) -> list[dict]:
    """Load a Prowler CSV report.

    Args:
        path: Local path of a semicolon-delimited CSV file with a header row.

    Returns:
        One dict per data row, keyed by the header names.
    """
    # newline="" is what the csv module requires so that line breaks inside
    # quoted fields reach the parser untouched; the explicit encoding keeps
    # parsing independent of the machine's locale.
    with open(path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f, delimiter=";"))
def parse_findings(rows: list[dict]) -> dict:
    """Bucket report rows by status and mute state.

    Returns a dict with the row count (``total``), a Counter of ``STATUS``
    values (``statuses``), every FAIL row (``fails``), and the FAIL rows
    split into ``unmuted`` and ``muted`` according to whether their
    ``MUTED`` column equals the string "True".
    """
    status_counts: Counter = Counter()
    failed: list[dict] = []
    open_failures: list[dict] = []
    silenced: list[dict] = []

    for row in rows:
        status = row["STATUS"]
        status_counts[status] += 1
        if status != "FAIL":
            continue
        failed.append(row)
        if row.get("MUTED", "") == "True":
            silenced.append(row)
        else:
            open_failures.append(row)

    return {
        "total": len(rows),
        "statuses": status_counts,
        "fails": failed,
        "unmuted": open_failures,
        "muted": silenced,
    }
def finding_key(r: dict) -> tuple[str, str]:
    """Identify a finding by check ID and resource name (deliberately not UID).

    A row without a ``RESOURCE_NAME`` column gets an empty resource name.
    """
    check_id = r["CHECK_ID"]
    resource_name = r.get("RESOURCE_NAME", "")
    return check_id, resource_name
# Severity labels from most to least severe; list position is the sort rank.
SEVERITY_ORDER = ["critical", "high", "medium", "low", "informational"]


def severity_sort(r: dict) -> int:
    """Sort key that ranks a finding by its ``SEVERITY`` column.

    Matching is case-insensitive. Rows with a missing or unrecognized
    severity get rank 99, so they sort after every known level.
    """
    label = r.get("SEVERITY", "").lower()
    try:
        return SEVERITY_ORDER.index(label)
    except ValueError:
        return 99
def _ssh_minikube(cmd: str, timeout: int = 15) -> subprocess.CompletedProcess:
    """Execute `cmd` inside the minikube node by way of ``ssh indri``.

    The command is appended to ``minikube ssh --`` and sent as a single
    remote command string. Output is captured as text; `timeout` is in
    seconds and is handed straight to ``subprocess.run``.
    """
    argv = ["ssh", "indri", f"minikube ssh -- {cmd}"]
    return subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
def _kubectl(args: str, timeout: int = 15) -> subprocess.CompletedProcess:
    """Invoke kubectl against the ``minikube-indri`` context.

    `args` is split on whitespace into individual arguments, so it cannot
    carry an argument that itself contains spaces. Output is captured as
    text; `timeout` is in seconds.
    """
    argv = ["kubectl", "--context=minikube-indri", *args.split()]
    return subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
def run_node_verification(console: Console) -> None:
    """Verify node-level conditions that Prowler reports as MANUAL.

    Prowler runs inside a pod and can't evaluate kubelet file permissions,
    kubelet config arguments, etcd CA separation, or cluster-admin RBAC
    bindings. We SSH into the minikube node and check each condition here,
    then print a results table and a verdict panel that turns red if any
    check deviates from its expected value.

    Args:
        console: Rich console that receives the table and the verdict.
    """
    groups = [
        ("kubelet file checks", _node_file_checks),
        ("kubelet config", _node_kubelet_config_checks),
        ("etcd CA separation", _node_etcd_ca_checks),
        ("cluster-admin bindings", _node_rbac_checks),
    ]
    checks: list[tuple[str, str, bool]] = []  # (name, detail, passed)
    for group_name, gather in groups:
        try:
            checks.extend(gather())
        except subprocess.TimeoutExpired as exc:
            # An unresponsive node is a failed verification, not a reason to
            # lose the results of the other groups. Rows the group gathered
            # before the timeout are replaced by this single failure row.
            checks.append((group_name, f"timed out after {exc.timeout}s", False))

    # --- Display results ---
    table = Table(
        show_header=True,
        header_style="bold",
        title="Node Verification (out-of-band checks for MANUAL findings)",
    )
    table.add_column("Check")
    table.add_column("Detail")
    table.add_column("Result", justify="center")
    for name, detail, passed in checks:
        status = "[green]PASS[/green]" if passed else "[bold red]FAIL[/bold red]"
        table.add_row(name, detail, status)
    console.print(table)
    console.print()

    failed_count = sum(1 for _, _, passed in checks if not passed)
    if failed_count == 0:
        console.print(
            Panel(
                "[bold green]All node-level checks passed.[/bold green] "
                "Muted MANUAL findings are verified.",
                title="Node Verification Verdict",
                border_style="green",
            )
        )
    else:
        console.print(
            Panel(
                f"[bold red]{failed_count} node-level check(s) FAILED.[/bold red]\n"
                "Review the failures above — muted MANUAL findings may no longer "
                "be valid.",
                title="Node Verification Verdict",
                border_style="red",
            )
        )
    console.print()


def _node_file_checks() -> list[tuple[str, str, bool]]:
    """Compare owner and mode of the kubelet files against expected values."""
    kubelet_conf = "/etc/kubernetes/kubelet.conf"
    config_yaml = "/var/lib/kubelet/config.yaml"
    service_conf = "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf"
    # (name, path, stat format, expected output)
    expectations = [
        ("kubelet.conf ownership", kubelet_conf, "%U:%G", "root:root"),
        ("kubelet.conf permissions", kubelet_conf, "%a", "600"),
        ("config.yaml ownership", config_yaml, "%U:%G", "root:root"),
        ("config.yaml permissions", config_yaml, "%a", "644"),
        ("kubelet service ownership", service_conf, "%U:%G", "root:root"),
        ("kubelet service permissions", service_conf, "%a", "644"),
    ]
    results: list[tuple[str, str, bool]] = []
    for name, path, stat_format, expected in expectations:
        result = _ssh_minikube(f'"sudo stat -c {stat_format} {path}"')
        if result.returncode != 0:
            results.append((name, f"could not stat {path}", False))
            continue
        actual = result.stdout.strip()
        results.append((name, f"{actual} (expected {expected})", actual == expected))
    return results


def _node_kubelet_config_checks() -> list[tuple[str, str, bool]]:
    """Read the kubelet config.yaml from the node and check selected settings."""
    import yaml as _yaml

    result = _ssh_minikube('"sudo cat /var/lib/kubelet/config.yaml"')
    if result.returncode != 0:
        return [("kubelet config", "could not read config.yaml", False)]
    try:
        # An empty document loads as None; treat it as "nothing configured".
        cfg = _yaml.safe_load(result.stdout) or {}
    except _yaml.YAMLError:
        # Stop here: evaluating the settings against an empty dict would
        # print misleading PASS rows for values that were never read.
        return [("kubelet config parse", "failed to parse config.yaml", False)]
    if not isinstance(cfg, dict):
        return [("kubelet config parse", "config.yaml is not a YAML mapping", False)]

    rop = cfg.get("readOnlyPort")
    miu = cfg.get("makeIPTablesUtilChains")
    erq = cfg.get("eventRecordQPS")
    tcs = cfg.get("tlsCipherSuites")
    return [
        # readOnlyPort: absent or 0 is safe
        ("readOnlyPort", f"{rop!r} (absent or 0 is safe)", rop is None or rop == 0),
        # makeIPTablesUtilChains: absent (defaults true) or true
        (
            "makeIPTablesUtilChains",
            f"{miu!r} (absent or true is safe)",
            miu is None or miu is True,
        ),
        # eventRecordQPS: absent (defaults 5) or > 0
        (
            "eventRecordQPS",
            f"{erq!r} (absent or > 0 is safe)",
            erq is None or (isinstance(erq, (int, float)) and erq > 0),
        ),
        # tlsCipherSuites: informational only — Go defaults are acceptable
        # and explicit suites are also fine, so this row always passes.
        ("tlsCipherSuites", "Go defaults" if tcs is None else f"{tcs!r}", True),
    ]


def _node_etcd_ca_checks() -> list[tuple[str, str, bool]]:
    """Check that the etcd CA and the cluster CA have different fingerprints."""
    etcd_fp = _ssh_minikube(
        '"sudo openssl x509 -in /var/lib/minikube/certs/etcd/ca.crt -noout -fingerprint -sha256"'
    )
    cluster_fp = _ssh_minikube(
        '"sudo openssl x509 -in /var/lib/minikube/certs/ca.crt -noout -fingerprint -sha256"'
    )
    if etcd_fp.returncode != 0 or cluster_fp.returncode != 0:
        return [("etcd CA separation", "could not read certificates", False)]
    different = etcd_fp.stdout.strip() != cluster_fp.stdout.strip()
    return [
        (
            "etcd CA separation",
            "different CAs" if different else "SAME CA (unexpected)",
            different,
        )
    ]


def _node_rbac_checks() -> list[tuple[str, str, bool]]:
    """Flag ClusterRoleBindings to cluster-admin beyond the expected set."""
    expected_bindings = {"cluster-admin", "kubeadm:cluster-admins", "minikube-rbac"}
    # Use a jsonpath that emits "name\troleRef" pairs to avoid N+1 queries.
    # Tab-separated because binding names can contain colons
    # (e.g. kubeadm:cluster-admins).
    result = subprocess.run(
        [
            "kubectl", "--context=minikube-indri",
            "get", "clusterrolebindings",
            "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\t'}{.roleRef.name}{'\\n'}{end}",
        ],
        capture_output=True,
        text=True,
        timeout=15,
    )
    if result.returncode != 0:
        return [("cluster-admin bindings", "kubectl failed", False)]

    admin_bindings: set[str] = set()
    for line in result.stdout.strip().splitlines():
        name, tab, role = line.partition("\t")
        if tab and role == "cluster-admin":
            admin_bindings.add(name)

    unexpected = admin_bindings - expected_bindings
    if unexpected:
        detail = f"unexpected: {', '.join(sorted(unexpected))}"
        return [("cluster-admin bindings", detail, False)]
    detail = f"only expected: {', '.join(sorted(admin_bindings))}"
    return [("cluster-admin bindings", detail, True)]
SEVERITY_STYLE = {
"critical": "bold red",
"high": "red",
"medium": "yellow",
}
def _sev_style(sev: str) -> str:
return SEVERITY_STYLE.get(sev.lower(), "")
def summarize_report(
    label: str,
    base: str,
    tmpdir: str,
    *,
    show_muted: bool = False,
    group_findings: bool = False,
) -> None:
    """Fetch and summarize the latest Prowler report under `base`.

    Copies the newest report (and the one before it, when present) from
    sifaka into `tmpdir`, then prints in order: overall status counts,
    unmuted failures by severity, the delta against the previous report,
    the unmuted failure details, optionally the muted failures, and a
    verdict panel.

    When `group_findings` is True, top-N CHECK_ID and RESOURCE_NAME tables
    are shown instead of a per-finding detail table — appropriate for
    image and IaC scans that produce thousands of findings.

    Args:
        label: Human-readable scan name used in headings and temp filenames.
        base: Directory on sifaka that holds this scan's reports.
        tmpdir: Existing local directory to copy the CSV files into.
        show_muted: Also print a table of muted failures.
        group_findings: Print grouped tables and suppress the per-finding
            new/resolved listings.
    """
    console.rule(f"[bold]{label}[/bold]")
    csvs = list_reports(base)
    if not csvs:
        console.print(
            f"[bold yellow]{label}: no Prowler CSV reports found "
            f"under {base}[/bold yellow]"
        )
        console.print()
        return

    # Filesystem-safe prefix so each scan gets its own temp files.
    safe = "".join(c if c.isalnum() else "_" for c in label.lower())
    latest_remote = csvs[-1]
    latest_local = Path(tmpdir) / f"{safe}_latest.csv"
    console.print(f"[dim]Fetching {latest_remote}...[/dim]")
    if not scp(f"sifaka:{latest_remote}", str(latest_local)):
        console.print(f"[bold red]Failed to copy {latest_remote}[/bold red]")
        return

    prev_local: Path | None = None
    if len(csvs) >= 2:
        prev_remote = csvs[-2]
        prev_path = Path(tmpdir) / f"{safe}_prev.csv"
        console.print(f"[dim]Fetching {prev_remote}...[/dim]")
        if scp(f"sifaka:{prev_remote}", str(prev_path)):
            prev_local = prev_path
        else:
            # Say why the delta section is missing instead of omitting it
            # silently.
            console.print(
                f"[yellow]Could not copy {prev_remote}; "
                "skipping week-over-week delta.[/yellow]"
            )

    latest = parse_findings(load_csv(str(latest_local)))
    unmuted = latest["unmuted"]
    console.print()

    _print_status_overview(latest, Path(latest_remote).stem)
    if unmuted:
        _print_severity_breakdown(unmuted)
    if prev_local:
        prev = parse_findings(load_csv(str(prev_local)))
        # For grouped scans the new/resolved listings are too noisy
        # (potentially thousands of lines). Skip the listings; the count
        # is in the delta panel and detail is in the grouped tables.
        _print_delta(
            prev["unmuted"],
            unmuted,
            Path(csvs[-2]).stem,
            list_changes=not group_findings,
        )
    if unmuted:
        if group_findings:
            _print_grouped_findings(unmuted)
        else:
            _print_findings_detail(unmuted)
    if show_muted and latest["muted"]:
        _print_muted_summary(latest["muted"])
    _print_verdict(label, len(unmuted))


def _print_status_overview(latest: dict, report_name: str) -> None:
    """Print PASS/FAIL/MANUAL counts plus the muted/unmuted split and total."""
    status_table = Table(
        show_header=True, header_style="bold", title=f"Report: {report_name}"
    )
    status_table.add_column("Status")
    status_table.add_column("Count", justify="right")
    for status in ["PASS", "FAIL", "MANUAL"]:
        count = latest["statuses"].get(status, 0)
        style = "red" if status == "FAIL" and count > 0 else ""
        status_table.add_row(
            f"[{style}]{status}[/{style}]" if style else status,
            f"[{style}]{count}[/{style}]" if style else str(count),
        )
    muted_count = len(latest["muted"])
    unmuted_count = len(latest["unmuted"])
    status_table.add_row("", "")
    status_table.add_row("[dim]↳ muted[/dim]", f"[dim]{muted_count}[/dim]")
    status_table.add_row(
        "[bold]↳ unmuted (action needed)[/bold]",
        f"[bold red]{unmuted_count}[/bold red]"
        if unmuted_count > 0
        else "[bold green]0[/bold green]",
    )
    status_table.add_row("", "")
    status_table.add_row("[bold]Total[/bold]", f"[bold]{latest['total']}[/bold]")
    console.print(status_table)
    console.print()


def _print_severity_breakdown(unmuted: list[dict]) -> None:
    """Print a count of unmuted failures per severity, most severe first."""
    sev_table = Table(
        show_header=True,
        header_style="bold",
        title="Unmuted Failures by Severity",
    )
    sev_table.add_column("Severity")
    sev_table.add_column("Count", justify="right")
    for sev, count in sorted(
        Counter(r["SEVERITY"] for r in unmuted).items(),
        key=lambda kv: severity_sort({"SEVERITY": kv[0]}),
    ):
        style = _sev_style(sev)
        sev_table.add_row(
            f"[{style}]{sev}[/{style}]" if style else sev,
            f"[{style}]{count}[/{style}]" if style else str(count),
        )
    console.print(sev_table)
    console.print()


def _print_delta(
    prev_unmuted: list[dict],
    curr_unmuted: list[dict],
    prev_name: str,
    *,
    list_changes: bool,
) -> None:
    """Print new/resolved counts versus the previous report, optionally itemized."""
    from rich.markup import escape

    prev_by_key = {finding_key(r): r for r in prev_unmuted}
    curr_by_key = {finding_key(r): r for r in curr_unmuted}
    new_keys = curr_by_key.keys() - prev_by_key.keys()
    resolved_keys = prev_by_key.keys() - curr_by_key.keys()

    delta_lines = [
        f"Compared against: [dim]{prev_name}[/dim]",
        "",
        f"Previous unmuted FAILs: {len(prev_unmuted)}",
        f"Current unmuted FAILs: {len(curr_unmuted)}",
        f"[green]Resolved: {len(resolved_keys)}[/green]",
        f"[red]New: {len(new_keys)}[/red]"
        if new_keys
        else "[green]New: 0[/green]",
    ]
    console.print(
        Panel(
            "\n".join(delta_lines),
            title="[bold]Week-over-Week Delta (unmuted only)[/bold]",
            border_style="cyan",
        )
    )
    console.print()
    if not list_changes:
        return

    def describe(r: dict) -> str:
        # Escape the whole line: left as-is, "[high]" would be parsed as a
        # Rich markup tag and vanish from the output, and bracketed text in
        # the report could raise a MarkupError.
        return escape(
            f"[{r['SEVERITY']}] {r['CHECK_ID']}: {r['STATUS_EXTENDED'][:120]}"
        )

    if new_keys:
        console.print("[bold red]New Unmuted Failures:[/bold red]")
        for key in sorted(new_keys):
            console.print(f"  {describe(curr_by_key[key])}")
        console.print()
    if resolved_keys:
        console.print("[bold green]Resolved:[/bold green]")
        for key in sorted(resolved_keys):
            console.print(f"  [dim]{describe(prev_by_key[key])}[/dim]")
        console.print()


def _print_muted_summary(muted: list[dict]) -> None:
    """Print muted failures counted per (severity, check), most severe first."""
    from rich.markup import escape

    muted_table = Table(
        show_header=True,
        header_style="bold",
        title="Muted Failures (for reference)",
    )
    muted_table.add_column("Severity")
    muted_table.add_column("Check")
    muted_table.add_column("Count", justify="right")
    muted_groups = Counter((r["SEVERITY"], r["CHECK_ID"]) for r in muted)
    for (sev, check), count in sorted(
        muted_groups.items(),
        key=lambda item: severity_sort({"SEVERITY": item[0][0]}),
    ):
        muted_table.add_row(
            f"[dim]{escape(sev)}[/dim]",
            f"[dim]{escape(check)}[/dim]",
            f"[dim]{count}[/dim]",
        )
    console.print(muted_table)
    console.print()


def _print_verdict(label: str, unmuted_count: int) -> None:
    """Print the closing panel: green when nothing is unmuted, else yellow."""
    if unmuted_count == 0:
        console.print(
            Panel(
                "[bold green]All clear.[/bold green] No unmuted failures.",
                title=f"{label} Verdict",
                border_style="green",
            )
        )
    else:
        console.print(
            Panel(
                f"[bold yellow]{unmuted_count} unmuted failure(s) "
                f"need triage.[/bold yellow]\n\n"
                "For each: remediate, or add a Resource entry to the "
                "matching check in argocd/manifests/prowler/mutelist/.",
                title=f"{label} Verdict",
                border_style="yellow",
            )
        )
    console.print()
def _print_findings_detail(unmuted: list[dict]) -> None:
    """Per-finding detail table — appropriate when finding count is small.

    Rows are ordered most severe first. The detail text is cut to 60
    characters.

    Args:
        unmuted: Unmuted FAIL rows from a parsed report.
    """
    from rich.markup import escape

    detail_table = Table(
        show_header=True,
        header_style="bold",
        title="Unmuted Failures — Action Needed",
    )
    detail_table.add_column("Severity")
    detail_table.add_column("Check")
    detail_table.add_column("Resource")
    detail_table.add_column("Detail", max_width=60)
    for r in sorted(unmuted, key=severity_sort):
        sev = r["SEVERITY"]
        style = _sev_style(sev)
        # Table cells are parsed as Rich markup, so text taken from the
        # report is escaped: bracketed fragments would otherwise be swallowed
        # as style tags or raise a MarkupError. Truncate before escaping so
        # an escape sequence is never cut in half.
        detail_table.add_row(
            f"[{style}]{sev}[/{style}]" if style else escape(sev),
            escape(r["CHECK_ID"]),
            escape(r.get("RESOURCE_NAME") or ""),
            escape(r["STATUS_EXTENDED"][:60]),
        )
    console.print(detail_table)
    console.print()
def _worst_severity(rows: list[dict]) -> str:
    """Pick the most severe ``SEVERITY`` label found in `rows`.

    The label is returned exactly as it appears in the row. When several
    rows tie, the first one wins. An empty list yields an empty string.
    """
    worst_label = ""
    worst_rank: int | None = None
    for row in rows:
        label = row["SEVERITY"]
        rank = severity_sort({"SEVERITY": label})
        # Strict comparison keeps the earliest row on ties.
        if worst_rank is None or rank < worst_rank:
            worst_label, worst_rank = label, rank
    return worst_label
def _print_grouped_findings(unmuted: list[dict], top_n: int = 15) -> None:
    """Top-N tables grouped by CHECK_ID and RESOURCE_NAME.

    Used for image and IaC scans where per-finding tables would be too
    large to be useful. Shows count and worst severity for each group.

    Args:
        unmuted: Unmuted FAIL rows from a parsed report.
        top_n: Maximum number of groups listed in each table.
    """
    by_check: dict[str, list[dict]] = {}
    by_resource: dict[str, list[dict]] = {}
    for r in unmuted:
        by_check.setdefault(r["CHECK_ID"], []).append(r)
        # Rows with a missing or empty resource share one placeholder group.
        resource = r.get("RESOURCE_NAME", "") or "(no resource)"
        by_resource.setdefault(resource, []).append(r)

    _print_top_groups(
        f"Top {top_n} Checks by Unmuted Finding Count", "Check ID", by_check, top_n
    )
    _print_top_groups(
        f"Top {top_n} Resources by Unmuted Finding Count",
        "Resource",
        by_resource,
        top_n,
        max_label=80,
    )


def _print_top_groups(
    title: str,
    column: str,
    groups: dict[str, list[dict]],
    top_n: int,
    max_label: int | None = None,
) -> None:
    """Print the `top_n` largest groups with their worst severity and size."""
    from rich.markup import escape

    table = Table(show_header=True, header_style="bold", title=title)
    table.add_column("Worst Sev")
    table.add_column(column)
    table.add_column("Count", justify="right")
    # Largest group first; the stable sort keeps first-seen order among ties.
    ranked = sorted(groups.items(), key=lambda kv: -len(kv[1]))[:top_n]
    for group_label, rows in ranked:
        worst = _worst_severity(rows)
        style = _sev_style(worst)
        shown = group_label if max_label is None else group_label[:max_label]
        table.add_row(
            f"[{style}]{worst}[/{style}]" if style else escape(worst),
            # Labels come from the report; escape them so bracketed text is
            # shown literally instead of being parsed as Rich markup.
            escape(shown),
            str(len(rows)),
        )
    console.print(table)
    console.print()
def _report_kingfisher_status() -> None:
    # --- Kingfisher secret scanning ---
    # TODO: Kingfisher currently only outputs HTML. Once JSON or CSV output
    # is supported upstream (contribute from our spork), add parsing here:
    #
    # KINGFISHER_BASE = "/volume1/reports/kingfisher"
    # - Fetch latest JSON/CSV from sifaka:{KINGFISHER_BASE}/
    # - Parse findings: active vs inactive vs skipped validations
    # - Flag any "Active Credential" findings as critical
    # - Compare against previous scan for delta
    # - Show summary panel similar to Prowler
    #
    # Until then, only confirm that a report directory exists and warn if
    # it does not.
    listing = subprocess.run(
        ["ssh", "sifaka", "ls -1t /volume1/reports/kingfisher/ | head -1"],
        capture_output=True,
        text=True,
        timeout=15,
    )
    newest = ""
    if listing.returncode == 0:
        newest = listing.stdout.strip()

    # The newest entry is accepted only when its name begins with "202".
    if not newest.startswith("202"):
        console.print(
            "[bold yellow]Warning: No recent Kingfisher report found on "
            "sifaka. Check the CronJob on ringtail.[/bold yellow]"
        )
        return
    console.print(
        f"[dim]Kingfisher: latest report directory is {newest} "
        f"(HTML only — JSON/CSV pending upstream)[/dim]"
    )


def main(
    full: Annotated[
        bool, typer.Option(help="(reserved) currently a no-op; all unmuted failures already shown")
    ] = False,
    show_muted: Annotated[
        bool, typer.Option(help="Also show muted failures")
    ] = False,
) -> None:
    # Deliberately no docstring here: typer would surface it as the command's
    # --help text.
    del full  # historical flag, kept for backwards compatibility

    # One scratch directory is shared by every scan's downloaded CSVs.
    with tempfile.TemporaryDirectory() as scratch:
        for scan_label, scan_base, grouped in PROWLER_SCANS:
            summarize_report(
                scan_label,
                scan_base,
                scratch,
                show_muted=show_muted,
                group_findings=grouped,
            )

    # --- Node-level MANUAL check verification ---
    # These checks verify conditions Prowler reports as MANUAL because it
    # runs inside a pod and cannot evaluate them directly.
    run_node_verification(console)

    _report_kingfisher_status()


if __name__ == "__main__":
    typer.run(main)