performance improvements and access map viewer improvements

This commit is contained in:
Mick Grove 2026-04-16 09:56:56 -07:00
commit c3d686cfac
5 changed files with 1858 additions and 1003 deletions

File diff suppressed because it is too large Load diff

View file

@ -44,14 +44,67 @@ fn origin_fp(os: &OriginSet) -> u64 {
h.finish()
}
fn dedup_origin_kind(origin: &OriginSet) -> &'static str {
if origin.iter().any(|o| matches!(o, Origin::Extended(_))) {
"ext"
} else {
"file_git"
}
}
const DEDUP_BLOOM_FP_RATE: f64 = 0.001;
const INITIAL_BLOOM_CAPACITY: usize = 5_000_000;
const MAX_BLOOM_CAPACITY: usize = 10_000_000;
struct DedupBloomSet {
filters: Vec<Bloom<u64>>,
active_items: usize,
active_capacity: usize,
}
impl DedupBloomSet {
fn new() -> Self {
Self::with_capacity(INITIAL_BLOOM_CAPACITY)
}
fn with_capacity(initial_capacity: usize) -> Self {
let capacity = initial_capacity.max(1);
let first = Bloom::new_for_fp_rate(capacity, DEDUP_BLOOM_FP_RATE)
.expect("Bloom filter size params are valid");
Self { filters: vec![first], active_items: 0, active_capacity: capacity }
}
fn contains_or_insert(&mut self, key: u64) -> bool {
if self.filters.iter().any(|filter| filter.check(&key)) {
return true;
}
if self.active_items >= self.active_capacity {
self.grow();
}
let active = self.filters.last_mut().expect("at least one Bloom filter exists");
active.set(&key);
self.active_items += 1;
false
}
fn grow(&mut self) {
self.active_capacity = std::cmp::min(self.active_capacity * 2, MAX_BLOOM_CAPACITY);
let next = Bloom::new_for_fp_rate(self.active_capacity, DEDUP_BLOOM_FP_RATE)
.expect("Bloom filter size params are valid");
self.filters.push(next);
self.active_items = 0;
}
}
pub struct FindingsStore {
rules: Vec<Arc<Rule>>,
matches: Vec<Arc<FindingsStoreMessage>>,
index_map: FxHashMap<(BlobId, OffsetSpan), usize>,
blobs: FxHashSet<BlobId>,
clone_dir: PathBuf,
seen_bloom: Bloom<u64>,
bloom_items: usize,
dedup_filter: DedupBloomSet,
dependent_rule_ids: FxHashSet<String>,
blob_meta: FxHashMap<BlobId, Arc<BlobMetadata>>,
origin_meta: FxHashMap<u64, Arc<OriginSet>>,
@ -66,10 +119,6 @@ pub struct FindingsStore {
impl FindingsStore {
pub fn new(clone_dir: PathBuf) -> Self {
let expected_items = 10_000_000; // tune to your largest scan
let fp_rate = 0.001; // 0.1 % false-positive rate
let seen_bloom = Bloom::new_for_fp_rate(expected_items, fp_rate)
.expect("Bloom filter size params are valid");
Self {
rules: Vec::new(),
matches: Vec::new(),
@ -78,8 +127,7 @@ impl FindingsStore {
blob_meta: FxHashMap::default(),
origin_meta: FxHashMap::default(),
clone_dir,
seen_bloom,
bloom_items: 0,
dedup_filter: DedupBloomSet::new(),
dependent_rule_ids: FxHashSet::default(),
docker_images: FxHashMap::default(),
slack_links: FxHashMap::default(),
@ -187,11 +235,7 @@ impl FindingsStore {
.or_else(|| m.groups.captures.get(0).map(|c| c.raw_value()))
.unwrap_or("");
let origin_kind = match origin.first() {
Origin::GitRepo(_) => "git",
Origin::File(_) => "file",
Origin::Extended(_) => "ext",
};
let origin_kind = dedup_origin_kind(&origin);
let rule_id = m.rule.id().to_uppercase();
let key_string = if self.dependent_rule_ids.contains(&rule_id) {
@ -201,11 +245,9 @@ impl FindingsStore {
};
let key = xxh3_64(key_string.as_bytes());
if self.seen_bloom.check(&key) {
if self.dedup_filter.contains_or_insert(key) {
continue; // very likely a duplicate
}
self.seen_bloom.set(&key);
self.bloom_items += 1;
}
/*───────────────────────────────────────────────────────────────┐
@ -235,13 +277,6 @@ impl FindingsStore {
self.index_map.insert((blob_id, offset_span), idx);
}
/* ─────────────────────────────────────────────────────────────────── */
// Periodically rebuild Bloom filter to bound the FP rate
if dedup && self.bloom_items > 5_000_000 {
self.seen_bloom = Bloom::new_for_fp_rate(5_000_000, 0.001).unwrap();
self.bloom_items = 0;
}
added
}
@ -473,3 +508,21 @@ impl FindingsStore {
self.matches.chunks(chunk_size).map(|slice| slice.to_vec()) // keep Arc pointers
}
}
#[cfg(test)]
mod tests {
use super::DedupBloomSet;
#[test]
fn dedup_filter_remains_monotonic_across_growth() {
let mut filter = DedupBloomSet::with_capacity(2);
assert!(!filter.contains_or_insert(11));
assert!(!filter.contains_or_insert(22));
assert!(!filter.contains_or_insert(33));
assert!(filter.contains_or_insert(11));
assert!(filter.contains_or_insert(22));
assert!(filter.contains_or_insert(33));
}
}

View file

@ -108,7 +108,7 @@ fn main() -> anyhow::Result<()> {
// Run the real entry point on a thread with an explicit, larger stack so that
// deeply-nested async state machines (validation pipeline) cannot overflow the
// default main-thread stack.
const STACK_SIZE: usize = 64 * 1024 * 1024; // 64 MiB
const STACK_SIZE: usize = 32 * 1024 * 1024; // 32 MiB
let builder =
std::thread::Builder::new().name("kingfisher-main".to_string()).stack_size(STACK_SIZE);
@ -146,12 +146,12 @@ fn run() -> anyhow::Result<()> {
};
// Set up the Tokio runtime with the specified number of threads.
// Worker threads also need larger stacks because timed_validate_single_match
// compiles to an async state machine whose poll function has a very large
// stack frame (known LLVM limitation with big async fns).
// Worker threads need larger stacks because async state machines (validation
// pipeline) can produce large poll stack frames. 8 MiB is sufficient now that
// the validators are split into separate async fns.
let runtime = Builder::new_multi_thread()
.worker_threads(num_jobs)
.thread_stack_size(16 * 1024 * 1024) // 16 MiB per worker
.thread_stack_size(8 * 1024 * 1024) // 8 MiB per worker
.enable_all()
.build()
.context("Failed to create Tokio runtime")?;

View file

@ -13,7 +13,7 @@ use futures::{stream, FutureExt, StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use liquid::Parser;
use reqwest::StatusCode;
use rustc_hash::FxHashMap;
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::{sync::Notify, time::timeout};
use tracing::trace;
@ -22,7 +22,7 @@ use crate::{
blob::BlobId,
findings_store::{FindingsStore, FindingsStoreMessage},
location::OffsetSpan,
matcher::{Match, OwnedBlobMatch},
matcher::OwnedBlobMatch,
rules::rule::Validation,
validation::{
collect_variables_and_dependencies, utils, validate_single_match, CachedResponse,
@ -421,40 +421,44 @@ pub async fn run_secret_validation(
let success_count = Arc::new(AtomicUsize::new(0));
let fail_count = Arc::new(AtomicUsize::new(0));
// ── 2. Fetch rules + matches ────────────────────────────────────────────
let (_all_rules, all_matches_by_blob) = {
// ── 2. Fetch matches & partition ──────────────────────────────────────
// • simple_matches: Vec of Arcs for rules without dependencies
// • dependent_blob_ids: just the blob IDs — we re-fetch in Phase 2
// so we don't hold two full copies of the match set simultaneously
let (simple_matches, dependent_blob_ids) = {
let ds = datastore.lock().unwrap();
let rules = ds.get_rules()?;
let mut map: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
let matches = if let Some(r) = range.clone() {
ds.get_matches()[r].to_vec()
} else {
ds.get_matches().to_vec()
};
for arc_msg in matches.into_iter() {
map.entry(arc_msg.1.id).or_default().push(arc_msg);
let mut by_blob: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
for arc_msg in matches {
by_blob.entry(arc_msg.1.id).or_default().push(arc_msg);
}
(rules, map)
let mut simple = Vec::new();
let mut dep_ids = FxHashSet::default();
for (blob_id, blob_matches) in by_blob {
if blob_matches.iter().any(|m| !m.2.rule.syntax().depends_on_rule.is_empty()) {
dep_ids.insert(blob_id);
// Arcs dropped here — not held during Phase 1
} else {
simple.extend(blob_matches);
}
}
(simple, dep_ids)
};
// ── 3. Partition blobs ──────────────────────────────────────────────────
let mut simple_matches = Vec::new();
let mut dependent_blobs = FxHashMap::default(); // blob_id -- Vec<Arc<…>>
for (blob_id, matches) in all_matches_by_blob {
if matches.iter().any(|m| !m.2.rule.syntax().depends_on_rule.is_empty()) {
dependent_blobs.insert(blob_id, matches);
} else {
simple_matches.extend(matches);
}
}
// Result accumulator
let mut updated_arcs: Vec<Arc<FindingsStoreMessage>> = Vec::new();
// ── Phase 1: simple, global de-dupe ──────────────────────────────────────
if !simple_matches.is_empty() {
let mut groups: FxHashMap<String, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
// Keep only ONE representative per (rule_id, secret) group.
// Previous code stored ALL matches per group — holding thousands of
// Arc clones alive for the entire duration of the concurrent stream.
let total_simple = simple_matches.len();
let mut representatives: FxHashMap<String, Arc<FindingsStoreMessage>> =
FxHashMap::default();
for arc_msg in simple_matches {
// VALIDATION DEDUP: Use get(0) to get the first/primary capture for grouping.
//
@ -476,18 +480,19 @@ pub async fn run_secret_validation(
validation_group_key = %group_key,
"Grouping finding for validation"
);
groups.entry(group_key).or_default().push(arc_msg);
// Only keep the first representative — extra Arcs are dropped immediately
representatives.entry(group_key).or_insert(arc_msg);
}
trace!(
total_findings = groups.values().map(|v| v.len()).sum::<usize>(),
unique_validation_groups = groups.len(),
total_findings = total_simple,
unique_validation_groups = representatives.len(),
"Validation grouping complete (internal dedup)"
);
let validation_results = DashMap::<String, CachedResponse>::new();
let pb = ProgressBar::new(groups.len() as u64).with_message("Validating secrets…");
let pb = ProgressBar::new(representatives.len() as u64).with_message("Validating secrets…");
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} {msg} [{bar:40.green/blue}] {pos}/{len} ({percent}%) \
@ -498,21 +503,29 @@ pub async fn run_secret_validation(
);
pb.enable_steady_tick(Duration::from_millis(100));
// Shared empty maps — avoids allocating throwaway DashMaps per task
let empty_dep_vars: FxHashMap<String, Vec<(String, OffsetSpan)>> = FxHashMap::default();
let empty_missing: FxHashMap<String, Vec<String>> = FxHashMap::default();
let empty_cache: Arc<DashMap<String, CachedResponse>> = Arc::new(DashMap::new());
let empty_inflight: Arc<DashMap<String, ()>> = Arc::new(DashMap::new());
stream::iter(
groups.values().map(|v| v[0].clone()), // one representative
representatives.into_values(), // consumes map, dropping keys
)
.for_each_concurrent(concurrency, |rep_arc| {
// clones into task
let parser = parser.clone();
let clients = clients.clone();
let cache_glob = cache.clone();
let val_res = &validation_results;
let success = success_count.clone();
let fail = fail_count.clone();
// *** FIX: Clone the progress bar for each concurrent task ***
let pb = pb.clone();
let access_map = access_map.clone();
let rate_limiter = rate_limiter.clone();
let empty_dep_vars = &empty_dep_vars;
let empty_missing = &empty_missing;
let empty_cache = empty_cache.clone();
let empty_inflight = empty_inflight.clone();
async move {
// VALIDATION DEDUP: Use get(0) for the primary secret value.
@ -523,7 +536,6 @@ pub async fn run_secret_validation(
match val_res.entry(key.clone()) {
dashmap::mapref::entry::Entry::Occupied(_) => return,
dashmap::mapref::entry::Entry::Vacant(entry) => {
// *** FIX: Corrected placeholder to match struct definition ***
entry.insert(CachedResponse {
body: validation_body::from_string(String::new()),
status: StatusCode::ACCEPTED,
@ -542,10 +554,10 @@ pub async fn run_secret_validation(
&mut om,
&parser,
&clients,
&FxHashMap::default(),
&FxHashMap::default(),
&Arc::new(DashMap::new()),
&Arc::new(DashMap::new()),
empty_dep_vars,
empty_missing,
&empty_cache,
&empty_inflight,
&success,
&fail,
&cache_glob,
@ -565,35 +577,59 @@ pub async fn run_secret_validation(
};
val_res.insert(key, cr);
// Now we use the cloned `pb`
pb.inc(1);
}
.boxed()
})
.await;
// This is now valid because the original `pb` was never moved
pb.finish();
for (key, group) in groups {
let cr = validation_results.get(&key).expect("missing cached result");
for arc_msg in group {
let (origin, blob_md, old_match) = &*arc_msg;
updated_arcs.push(Arc::new((
origin.clone(),
blob_md.clone(),
Match {
validation_success: cr.is_valid,
validation_response_status: cr.status.as_u16(),
validation_response_body: cr.body.clone(),
..old_match.clone()
},
)));
// Apply Phase 1 results in-place — avoids cloning every Match
{
let mut ds = datastore.lock().unwrap();
let matches = ds.get_matches_mut();
let slice: &mut [Arc<FindingsStoreMessage>] = if let Some(ref r) = range {
&mut matches[r.clone()]
} else {
matches.as_mut_slice()
};
for match_arc in slice.iter_mut() {
// Skip dependent matches — handled in Phase 2
if !match_arc.2.rule.syntax().depends_on_rule.is_empty() {
continue;
}
let secret = match_arc.2.groups.captures.get(0).map_or("", |c| c.raw_value());
let key = format!("{}|{}", match_arc.2.rule.id(), secret);
if let Some(cr) = validation_results.get(&key) {
let (_, _, existing) = Arc::make_mut(match_arc);
existing.validation_success = cr.is_valid;
existing.validation_response_status = cr.status.as_u16();
existing.validation_response_body = cr.body.clone();
}
}
}
}
// ── Phase 2: blobs with dependencies (original logic) ───────────────────
if !dependent_blobs.is_empty() {
// ── Phase 2: blobs with dependencies ─────────────────────────────────────
// Re-fetch dependent matches from the datastore so we don't hold two
// copies of the full match set in memory simultaneously.
if !dependent_blob_ids.is_empty() {
let dependent_blobs: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = {
let ds = datastore.lock().unwrap();
let slice = if let Some(ref r) = range {
&ds.get_matches()[r.clone()]
} else {
ds.get_matches()
};
let mut map: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
for arc_msg in slice {
if dependent_blob_ids.contains(&arc_msg.1.id) {
map.entry(arc_msg.1.id).or_default().push(arc_msg.clone());
}
}
map
};
let blob_ids: Vec<_> = {
let mut v: Vec<_> = dependent_blobs.keys().cloned().collect();
v.sort_unstable();
@ -615,10 +651,21 @@ pub async fn run_secret_validation(
let val_cache = Arc::new(DashMap::<String, CachedResponse>::new());
let in_flight = Arc::new(DashMap::<String, ()>::new());
// Collect validation results keyed by finding_fingerprint:
// (validation_success, response_body, response_status_u16, dependent_captures)
type DepUpdate = (
bool,
crate::validation_body::ValidationResponseBody,
u16,
std::collections::BTreeMap<String, String>,
);
let mut dep_updates: FxHashMap<u64, DepUpdate> = FxHashMap::default();
for chunk in blob_ids.chunks(chunk_size) {
let tasks: Vec<_> = chunk
.iter()
.map(|blob_id| {
// Lazy iterator — futures are created on-demand by buffer_unordered,
// not all at once via .collect().
let validated_blobs: Vec<Vec<OwnedBlobMatch>> =
stream::iter(chunk.iter().map(|blob_id| {
let matches_for_blob = dependent_blobs.get(blob_id).unwrap().clone();
let parser = parser.clone();
let clients = clients.clone();
@ -643,6 +690,9 @@ pub async fn run_secret_validation(
})
.collect::<Vec<_>>();
// Drop Arc clones early — we only need OwnedBlobMatch from here
drop(matches_for_blob);
let (dep_vars, missing_deps) = collect_variables_and_dependencies(&owned);
let mut by_key: FxHashMap<String, Vec<OwnedBlobMatch>> =
@ -705,52 +755,56 @@ pub async fn run_secret_validation(
validated.into_iter().flatten().collect::<Vec<_>>()
}
.boxed()
})
.collect();
let validated_blobs: Vec<Vec<OwnedBlobMatch>> =
stream::iter(tasks).buffer_unordered(concurrency).collect().await;
}))
.buffer_unordered(concurrency)
.collect()
.await;
for blob_vec in validated_blobs {
if blob_vec.is_empty() {
continue;
}
let map_original: FxHashMap<u64, _> = dependent_blobs
.get(&blob_vec[0].blob_id)
.unwrap()
.iter()
.map(|arc_msg| (arc_msg.2.finding_fingerprint, arc_msg.clone()))
.collect();
for om in blob_vec {
let orig = map_original.get(&om.finding_fingerprint).unwrap();
updated_arcs.push(Arc::new((
orig.0.clone(),
orig.1.clone(),
Match {
validation_success: om.validation_success,
validation_response_body: om.validation_response_body.clone(),
validation_response_status: om.validation_response_status.as_u16(),
// Copy dependent_captures from validated OwnedBlobMatch
// so they're available for building validate/revoke commands
dependent_captures: om.dependent_captures.clone(),
..orig.2.clone()
},
)));
dep_updates.insert(
om.finding_fingerprint,
(
om.validation_success,
om.validation_response_body.clone(),
om.validation_response_status.as_u16(),
om.dependent_captures.clone(),
),
);
}
}
pb.inc(chunk.len() as u64);
}
pb.finish();
// Drop dependent blob Arc clones so datastore Arcs reach refcount == 1
drop(dependent_blobs);
// Apply Phase 2 results in-place
if !dep_updates.is_empty() {
let mut ds = datastore.lock().unwrap();
let matches = ds.get_matches_mut();
let slice: &mut [Arc<FindingsStoreMessage>] = if let Some(ref r) = range {
&mut matches[r.clone()]
} else {
matches.as_mut_slice()
};
for match_arc in slice.iter_mut() {
if let Some((success, body, status, dep_caps)) =
dep_updates.remove(&match_arc.2.finding_fingerprint)
{
let (_, _, existing) = Arc::make_mut(match_arc);
existing.validation_success = success;
existing.validation_response_status = status;
existing.validation_response_body = body;
existing.dependent_captures = dep_caps;
}
}
}
}
// ── 4. Persist all updates ──────────────────────────────────────────────
{
let mut ds = datastore.lock().unwrap();
ds.replace_matches(updated_arcs);
}
// Reclaim memory from static caches that accumulated during validation
crate::validation::clear_validation_caches();
Ok(())
}

File diff suppressed because it is too large Load diff