kingfisher/src/scanner/validation.rs
2026-04-01 10:20:52 -07:00

1360 lines
57 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use anyhow::Result;
use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use futures::{stream, StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use liquid::Parser;
use reqwest::StatusCode;
use rustc_hash::FxHashMap;
use tokio::{sync::Notify, time::timeout};
use tracing::trace;
use crate::{
access_map::AccessMapRequest,
blob::BlobId,
findings_store::{FindingsStore, FindingsStoreMessage},
location::OffsetSpan,
matcher::{Match, OwnedBlobMatch},
rules::rule::Validation,
validation::{
collect_variables_and_dependencies, utils, validate_single_match, CachedResponse,
},
validation_body,
validation_rate_limit::ValidationRateLimiter,
};
#[derive(Clone, Default)]
pub struct AccessMapCollector {
inner: Arc<DashMap<u64, AccessMapRequest>>,
}
impl AccessMapCollector {
pub fn record_aws(&self, access_key: &str, secret_key: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("aws|{access_key}|{secret_key}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Aws {
access_key: access_key.to_string(),
secret_key: secret_key.to_string(),
session_token: None,
fingerprint,
});
}
pub fn record_gcp(&self, credential_json: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(credential_json.as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Gcp {
credential_json: credential_json.to_string(),
fingerprint,
});
}
pub fn record_azure(
&self,
credential_json: &str,
containers: Option<Vec<String>>,
fingerprint: String,
) {
let key = xxhash_rust::xxh3::xxh3_64(credential_json.as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Azure {
credential_json: credential_json.to_string(),
containers,
fingerprint,
});
}
pub fn record_azure_devops(&self, token: &str, organization: &str, fingerprint: String) {
let key =
xxhash_rust::xxh3::xxh3_64(format!("azure_devops|{organization}|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::AzureDevops {
token: token.to_string(),
organization: organization.to_string(),
fingerprint,
});
}
pub fn record_github(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("github|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Github { token: token.to_string(), fingerprint });
}
pub fn record_gitlab(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("gitlab|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Gitlab { token: token.to_string(), fingerprint });
}
pub fn record_slack(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("slack|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Slack { token: token.to_string(), fingerprint });
}
pub fn record_postgres(&self, uri: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("postgres|{uri}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Postgres { uri: uri.to_string(), fingerprint });
}
pub fn record_mongodb(&self, uri: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("mongodb|{uri}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::MongoDB { uri: uri.to_string(), fingerprint });
}
pub fn record_huggingface(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("huggingface|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::HuggingFace {
token: token.to_string(),
fingerprint,
});
}
pub fn record_gitea(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("gitea|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Gitea { token: token.to_string(), fingerprint });
}
pub fn record_bitbucket(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("bitbucket|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Bitbucket {
token: token.to_string(),
fingerprint,
});
}
pub fn record_buildkite(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("buildkite|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Buildkite {
token: token.to_string(),
fingerprint,
});
}
pub fn record_harness(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("harness|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Harness { token: token.to_string(), fingerprint });
}
pub fn record_openai(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("openai|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::OpenAI { token: token.to_string(), fingerprint });
}
pub fn record_anthropic(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("anthropic|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Anthropic {
token: token.to_string(),
fingerprint,
});
}
pub fn record_salesforce(&self, token: &str, instance: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("salesforce|{instance}|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Salesforce {
token: token.to_string(),
instance: instance.to_string(),
fingerprint,
});
}
pub fn record_weightsandbiases(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("weightsandbiases|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::WeightsAndBiases {
token: token.to_string(),
fingerprint,
});
}
pub fn record_microsoft_teams(&self, webhook_url: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("microsoft_teams|{webhook_url}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::MicrosoftTeams {
webhook_url: webhook_url.to_string(),
fingerprint,
});
}
pub fn record_airtable(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("airtable|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Airtable {
token: token.to_string(),
fingerprint,
});
}
pub fn record_algolia(&self, app_id: &str, api_key: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("algolia|{app_id}|{api_key}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Algolia {
app_id: app_id.to_string(),
api_key: api_key.to_string(),
fingerprint,
});
}
pub fn record_artifactory(&self, token: &str, base_url: Option<&str>, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("artifactory|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Artifactory {
token: token.to_string(),
base_url: base_url.map(|s| s.to_string()),
fingerprint,
});
}
pub fn record_auth0(
&self,
client_id: &str,
client_secret: &str,
domain: &str,
fingerprint: String,
) {
let key = xxhash_rust::xxh3::xxh3_64(
format!("auth0|{domain}|{client_id}|{client_secret}").as_bytes(),
);
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Auth0 {
client_id: client_id.to_string(),
client_secret: client_secret.to_string(),
domain: domain.to_string(),
fingerprint,
});
}
pub fn record_circleci(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("circleci|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::CircleCI {
token: token.to_string(),
fingerprint,
});
}
pub fn record_digitalocean(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("digitalocean|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::DigitalOcean {
token: token.to_string(),
fingerprint,
});
}
pub fn record_fastly(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("fastly|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Fastly { token: token.to_string(), fingerprint });
}
pub fn record_hubspot(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("hubspot|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::HubSpot { token: token.to_string(), fingerprint });
}
pub fn record_ibm_cloud(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("ibm_cloud|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::IbmCloud {
token: token.to_string(),
fingerprint,
});
}
pub fn record_jira(&self, token: &str, base_url: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("jira|{base_url}|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Jira {
token: token.to_string(),
base_url: base_url.to_string(),
fingerprint,
});
}
pub fn record_mysql(&self, uri: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("mysql|{uri}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::MySQL { uri: uri.to_string(), fingerprint });
}
pub fn record_paypal(&self, client_id: &str, client_secret: &str, fingerprint: String) {
let key =
xxhash_rust::xxh3::xxh3_64(format!("paypal|{client_id}|{client_secret}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::PayPal {
client_id: client_id.to_string(),
client_secret: client_secret.to_string(),
fingerprint,
});
}
pub fn record_plaid(&self, client_id: &str, secret: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("plaid|{client_id}|{secret}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Plaid {
client_id: client_id.to_string(),
secret: secret.to_string(),
fingerprint,
});
}
pub fn record_sendgrid(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("sendgrid|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::SendGrid {
token: token.to_string(),
fingerprint,
});
}
pub fn record_sendinblue(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("sendinblue|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Sendinblue {
token: token.to_string(),
fingerprint,
});
}
pub fn record_shopify(&self, token: &str, subdomain: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("shopify|{subdomain}|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Shopify {
token: token.to_string(),
subdomain: subdomain.to_string(),
fingerprint,
});
}
pub fn record_square(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("square|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Square { token: token.to_string(), fingerprint });
}
pub fn record_stripe(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("stripe|{token}").as_bytes());
self.inner
.entry(key)
.or_insert_with(|| AccessMapRequest::Stripe { token: token.to_string(), fingerprint });
}
pub fn record_terraform(&self, token: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("terraform|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Terraform {
token: token.to_string(),
fingerprint,
});
}
pub fn record_xray(&self, token: &str, base_url: Option<&str>, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("xray|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Xray {
token: token.to_string(),
base_url: base_url.map(|s| s.to_string()),
fingerprint,
});
}
pub fn record_zendesk(&self, token: &str, subdomain: &str, fingerprint: String) {
let key = xxhash_rust::xxh3::xxh3_64(format!("zendesk|{subdomain}|{token}").as_bytes());
self.inner.entry(key).or_insert_with(|| AccessMapRequest::Zendesk {
token: token.to_string(),
subdomain: subdomain.to_string(),
fingerprint,
});
}
pub fn into_requests(self) -> Vec<AccessMapRequest> {
self.inner.iter().map(|entry| entry.value().clone()).collect()
}
}
#[allow(clippy::too_many_arguments)]
pub async fn run_secret_validation(
datastore: Arc<Mutex<FindingsStore>>,
parser: &Parser,
clients: &crate::validation::ValidationClients,
cache: &Arc<SkipMap<String, CachedResponse>>,
num_jobs: usize,
range: Option<std::ops::Range<usize>>,
access_map: Option<AccessMapCollector>,
rate_limiter: Option<Arc<ValidationRateLimiter>>,
validation_timeout: Duration,
validation_retries: u32,
max_body_len: usize,
) -> Result<()> {
// ── 1. Concurrency & counters ───────────────────────────────────────────
let concurrency = if num_jobs > 0 { num_jobs } else { num_cpus::get() };
let chunk_size = std::cmp::max(concurrency * 50, 200);
let success_count = Arc::new(AtomicUsize::new(0));
let fail_count = Arc::new(AtomicUsize::new(0));
// ── 2. Fetch rules + matches ────────────────────────────────────────────
let (_all_rules, all_matches_by_blob) = {
let ds = datastore.lock().unwrap();
let rules = ds.get_rules()?;
let mut map: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
let matches = if let Some(r) = range.clone() {
ds.get_matches()[r].to_vec()
} else {
ds.get_matches().to_vec()
};
for arc_msg in matches.into_iter() {
map.entry(arc_msg.1.id).or_default().push(arc_msg);
}
(rules, map)
};
// ── 3. Partition blobs ──────────────────────────────────────────────────
let mut simple_matches = Vec::new();
let mut dependent_blobs = FxHashMap::default(); // blob_id -- Vec<Arc<…>>
for (blob_id, matches) in all_matches_by_blob {
if matches.iter().any(|m| !m.2.rule.syntax().depends_on_rule.is_empty()) {
dependent_blobs.insert(blob_id, matches);
} else {
simple_matches.extend(matches);
}
}
// Result accumulator
let mut updated_arcs: Vec<Arc<FindingsStoreMessage>> = Vec::new();
// ── Phase 1: simple, global de-dupe ──────────────────────────────────────
if !simple_matches.is_empty() {
let mut groups: FxHashMap<String, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
for arc_msg in simple_matches {
// VALIDATION DEDUP: Use get(0) to get the first/primary capture for grouping.
//
// This differs from fingerprint/reporting code (which uses get(1).or_else(get(0)))
// for backward compatibility reasons - changing fingerprint calculation would break
// historical baselines and dedup entries.
//
// For validation deduplication, we need the PRIMARY secret value to ensure each
// unique secret triggers a separate validation request. Using get(1) first would
// incorrectly pick up inner unnamed groups when patterns have nested captures
// like (?<REGEX>...(ABC|DEF)...), causing all matches to share the same
// validation result.
let secret = arc_msg.2.groups.captures.get(0).map_or("", |c| c.raw_value());
let group_key = format!("{}|{}", arc_msg.2.rule.id(), secret);
trace!(
rule_id = %arc_msg.2.rule.id(),
secret_value = %secret,
external_fingerprint = arc_msg.2.finding_fingerprint,
validation_group_key = %group_key,
"Grouping finding for validation"
);
groups.entry(group_key).or_default().push(arc_msg);
}
trace!(
total_findings = groups.values().map(|v| v.len()).sum::<usize>(),
unique_validation_groups = groups.len(),
"Validation grouping complete (internal dedup)"
);
let validation_results = DashMap::<String, CachedResponse>::new();
let pb = ProgressBar::new(groups.len() as u64).with_message("Validating secrets…");
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} {msg} [{bar:40.green/blue}] {pos}/{len} ({percent}%) \
[{elapsed_precise}]",
)?
.progress_chars("=>-")
.tick_chars("|/-\\"),
);
pb.enable_steady_tick(Duration::from_millis(100));
stream::iter(
groups.values().map(|v| v[0].clone()), // one representative
)
.for_each_concurrent(concurrency, |rep_arc| {
// clones into task
let parser = parser.clone();
let clients = clients.clone();
let cache_glob = cache.clone();
let val_res = &validation_results;
let success = success_count.clone();
let fail = fail_count.clone();
// *** FIX: Clone the progress bar for each concurrent task ***
let pb = pb.clone();
let access_map = access_map.clone();
let rate_limiter = rate_limiter.clone();
async move {
// VALIDATION DEDUP: Use get(0) for the primary secret value.
// See comment above for why this differs from fingerprint/reporting code.
let secret = rep_arc.2.groups.captures.get(0).map_or("", |c| c.raw_value());
let key = format!("{}|{}", rep_arc.2.rule.id(), secret);
match val_res.entry(key.clone()) {
dashmap::mapref::entry::Entry::Occupied(_) => return,
dashmap::mapref::entry::Entry::Vacant(entry) => {
// *** FIX: Corrected placeholder to match struct definition ***
entry.insert(CachedResponse {
body: validation_body::from_string(String::new()),
status: StatusCode::ACCEPTED,
is_valid: false,
timestamp: Instant::now(),
});
}
}
let mut om = OwnedBlobMatch::convert_match_to_owned_blobmatch(
&rep_arc.2,
rep_arc.2.rule.clone(),
);
validate_single(
&mut om,
&parser,
&clients,
&FxHashMap::default(),
&FxHashMap::default(),
&Arc::new(DashMap::new()),
&Arc::new(DashMap::new()),
&success,
&fail,
&cache_glob,
access_map.as_ref(),
rate_limiter.as_deref(),
validation_timeout,
validation_retries,
max_body_len,
)
.await;
let cr = CachedResponse {
body: om.validation_response_body.clone(),
status: om.validation_response_status,
is_valid: om.validation_success,
timestamp: Instant::now(),
};
val_res.insert(key, cr);
// Now we use the cloned `pb`
pb.inc(1);
}
})
.await;
// This is now valid because the original `pb` was never moved
pb.finish();
for (key, group) in groups {
let cr = validation_results.get(&key).expect("missing cached result");
for arc_msg in group {
let (origin, blob_md, old_match) = &*arc_msg;
updated_arcs.push(Arc::new((
origin.clone(),
blob_md.clone(),
Match {
validation_success: cr.is_valid,
validation_response_status: cr.status.as_u16(),
validation_response_body: cr.body.clone(),
..old_match.clone()
},
)));
}
}
}
// ── Phase 2: blobs with dependencies (original logic) ───────────────────
if !dependent_blobs.is_empty() {
let blob_ids: Vec<_> = {
let mut v: Vec<_> = dependent_blobs.keys().cloned().collect();
v.sort_unstable();
v
};
let total = blob_ids.len();
let pb = ProgressBar::new(total as u64).with_message("Validating dependent secrets…");
pb.set_style(
ProgressStyle::with_template(
"{spinner:.yellow} {msg} [{bar:40.yellow/blue}] {pos}/{len} ({percent}%) \
[{elapsed_precise}]",
)?
.progress_chars("=>-")
.tick_chars("|/-\\"),
);
pb.enable_steady_tick(Duration::from_millis(100));
let val_cache = Arc::new(DashMap::<String, CachedResponse>::new());
let in_flight = Arc::new(DashMap::<String, ()>::new());
for chunk in blob_ids.chunks(chunk_size) {
let tasks: Vec<_> = chunk
.iter()
.map(|blob_id| {
let matches_for_blob = dependent_blobs.get(blob_id).unwrap().clone();
let parser = parser.clone();
let clients = clients.clone();
let val_cache = val_cache.clone();
let in_flight = in_flight.clone();
let success = success_count.clone();
let fail = fail_count.clone();
let cache_glob = cache.clone();
let access_map = access_map.clone();
let rate_limiter = rate_limiter.clone();
let validation_timeout = validation_timeout;
let validation_retries = validation_retries;
async move {
let owned = matches_for_blob
.iter()
.map(|arc_msg| {
OwnedBlobMatch::convert_match_to_owned_blobmatch(
&arc_msg.2,
arc_msg.2.rule.clone(),
)
})
.collect::<Vec<_>>();
let (dep_vars, missing_deps) = collect_variables_and_dependencies(&owned);
let mut by_key: FxHashMap<String, Vec<OwnedBlobMatch>> =
FxHashMap::default();
for om in owned {
by_key.entry(build_cache_key(&om, &dep_vars)).or_default().push(om);
}
let reps: Vec<_> =
by_key.into_iter().map(|(_k, mut v)| (v.remove(0), v)).collect();
let validated: Vec<_> =
stream::iter(reps.into_iter().map(|(mut rep, mut dups)| {
let parser = parser.clone();
let clients = clients.clone();
let dep_vars = dep_vars.clone();
let miss_deps = missing_deps.clone();
let val_cache = val_cache.clone();
let in_flight = in_flight.clone();
let success = success.clone();
let fail = fail.clone();
let cache_glob = cache_glob.clone();
let access_map = access_map.clone();
let rate_limiter = rate_limiter.clone();
async move {
validate_single(
&mut rep,
&parser,
&clients,
&dep_vars,
&miss_deps,
&val_cache,
&in_flight,
&success,
&fail,
&cache_glob,
access_map.as_ref(),
rate_limiter.as_deref(),
validation_timeout,
validation_retries,
max_body_len,
)
.await;
for d in &mut dups {
d.validation_success = rep.validation_success;
d.validation_response_body =
rep.validation_response_body.clone();
d.validation_response_status =
rep.validation_response_status;
}
let mut out = vec![rep];
out.extend(dups);
out
}
}))
.buffer_unordered(concurrency)
.collect()
.await;
validated.into_iter().flatten().collect::<Vec<_>>()
}
})
.collect();
let validated_blobs: Vec<Vec<OwnedBlobMatch>> =
stream::iter(tasks).buffer_unordered(concurrency).collect().await;
for blob_vec in validated_blobs {
if blob_vec.is_empty() {
continue;
}
let map_original: FxHashMap<u64, _> = dependent_blobs
.get(&blob_vec[0].blob_id)
.unwrap()
.iter()
.map(|arc_msg| (arc_msg.2.finding_fingerprint, arc_msg.clone()))
.collect();
for om in blob_vec {
let orig = map_original.get(&om.finding_fingerprint).unwrap();
updated_arcs.push(Arc::new((
orig.0.clone(),
orig.1.clone(),
Match {
validation_success: om.validation_success,
validation_response_body: om.validation_response_body.clone(),
validation_response_status: om.validation_response_status.as_u16(),
// Copy dependent_captures from validated OwnedBlobMatch
// so they're available for building validate/revoke commands
dependent_captures: om.dependent_captures.clone(),
..orig.2.clone()
},
)));
}
}
pb.inc(chunk.len() as u64);
}
pb.finish();
}
// ── 4. Persist all updates ──────────────────────────────────────────────
{
let mut ds = datastore.lock().unwrap();
ds.replace_matches(updated_arcs);
}
Ok(())
}
// ---------------------------------------------------
// The core validation logic, used in an async pipeline
// ---------------------------------------------------
async fn validate_single(
om: &mut OwnedBlobMatch,
parser: &Parser,
clients: &crate::validation::ValidationClients,
dep_vars: &FxHashMap<String, Vec<(String, OffsetSpan)>>,
missing_deps: &FxHashMap<String, Vec<String>>,
cache: &DashMap<String, CachedResponse>,
in_progress: &DashMap<String, ()>,
success_count: &AtomicUsize,
fail_count: &AtomicUsize,
cache2: &Arc<SkipMap<String, CachedResponse>>,
access_map: Option<&AccessMapCollector>,
rate_limiter: Option<&ValidationRateLimiter>,
validation_timeout: Duration,
validation_retries: u32,
max_body_len: usize,
) {
// Build key
let dep_vars_str = dep_vars
.get(om.rule.id())
.map(|hm| {
let mut sorted: Vec<_> = hm.iter().collect();
sorted.sort_by(|(k, _), (k2, _)| k.cmp(k2));
sorted.into_iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("|")
})
.unwrap_or_default();
let capture0 = om.captures.captures.get(0).map_or(String::new(), |c| c.raw_value().to_string());
let cache_key = format!("{}|{}|{}", om.rule.name(), capture0, dep_vars_str);
// Check cache first
if let Some(cached) = cache.get(&cache_key) {
om.validation_success = cached.is_valid;
om.validation_response_body = cached.body.clone();
om.validation_response_status = cached.status;
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
maybe_record_access_map(om, access_map);
return;
}
static NOTIFY: once_cell::sync::Lazy<DashMap<String, Arc<Notify>>> =
once_cell::sync::Lazy::new(DashMap::new);
let notify = NOTIFY.entry(cache_key.clone()).or_insert_with(|| Arc::new(Notify::new())).clone();
let first = in_progress.insert(cache_key.clone(), ()).is_none();
if !first {
notify.notified().await; // suspend with zero polling
// cached result now present
if let Some(cached) = cache.get(&cache_key) {
om.validation_success = cached.is_valid;
om.validation_response_body = cached.body.clone();
om.validation_response_status = cached.status;
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
maybe_record_access_map(om, access_map);
return; // Exit early if cached result is found
}
return;
}
// If we reach here, we're the first task to validate this key
// Perform validation
let outcome = timeout(validation_timeout, async {
validate_single_match(
om,
parser,
clients,
dep_vars,
missing_deps,
cache2,
validation_timeout,
validation_retries,
rate_limiter,
max_body_len,
)
.await
})
.await;
// Store result in cache
match outcome {
Ok(_) => {
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
cache.insert(
cache_key.clone(),
CachedResponse {
is_valid: om.validation_success,
status: om.validation_response_status,
body: om.validation_response_body.clone(),
timestamp: Instant::now(),
},
);
}
Err(_) => {
om.validation_success = false;
om.validation_response_body = validation_body::from_string("Validation timed out");
om.validation_response_status = http::StatusCode::REQUEST_TIMEOUT;
fail_count.fetch_add(1, Ordering::Relaxed);
}
}
maybe_record_access_map(om, access_map);
// Remove from `in_progress`
// in_progress.remove(&cache_key);
in_progress.remove(&cache_key);
if let Some(n) = NOTIFY.remove(&cache_key) {
n.1.notify_waiters(); // wake everyone
}
}
// Helper to compute the cache key for an OwnedBlobMatch
fn build_cache_key(
om: &OwnedBlobMatch,
dep_vars: &FxHashMap<String, Vec<(String, OffsetSpan)>>,
) -> String {
// Build key
let dep_vars_str = dep_vars
.get(om.rule.id())
.map(|hm| {
let mut sorted: Vec<_> = hm.iter().collect();
sorted.sort_by(|(k, _), (k2, _)| k.cmp(k2));
sorted.into_iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("|")
})
.unwrap_or_default();
// For demonstration, well do a simplistic approach
// You can adapt from your existing logic
let capture0 = om.captures.captures.get(0).map_or(String::new(), |c| c.raw_value().to_string());
format!("{}|{}|{}", om.rule.name(), capture0, dep_vars_str)
}
fn maybe_record_access_map(om: &OwnedBlobMatch, collector: Option<&AccessMapCollector>) {
let is_gitlab_rule = om.rule.id().starts_with("kingfisher.gitlab.");
let validation_ok =
om.validation_success || (is_gitlab_rule && om.validation_response_status.is_success());
let collector = match collector {
Some(c) if validation_ok => c,
_ => return,
};
let captures = utils::process_captures(&om.captures);
let fp = om.finding_fingerprint.to_string();
match om.rule.syntax().validation {
Some(Validation::AWS) => {
let secret = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let mut akid = utils::find_closest_variable(&captures, &secret, "TOKEN", "AKID")
.unwrap_or_default();
if akid.is_empty() {
akid = extract_akid_from_body(&om.validation_response_body).unwrap_or_default();
}
if !akid.is_empty() && !secret.is_empty() {
collector.record_aws(&akid, &secret, fp.clone());
}
}
Some(Validation::GCP) => {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_gcp(value, fp.clone());
}
}
}
Some(Validation::AzureStorage) => {
let storage_key = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let storage_account =
utils::find_closest_variable(&captures, &storage_key, "TOKEN", "AZURENAME")
.unwrap_or_default();
let mut storage_account = storage_account;
if storage_account.is_empty() {
storage_account =
extract_azure_storage_account_from_body(&om.validation_response_body)
.unwrap_or_default();
}
let containers_hint =
extract_azure_storage_containers_from_body(&om.validation_response_body);
if !storage_account.is_empty() && !storage_key.is_empty() {
let creds_json = format!(
r#"{{"storage_account":"{}","storage_key":"{}"}}"#,
storage_account, storage_key
);
collector.record_azure(&creds_json, containers_hint, fp.clone());
}
}
Some(Validation::Postgres) => {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_postgres(value, fp.clone());
}
}
}
Some(Validation::MongoDB) => {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_mongodb(value, fp.clone());
}
}
}
Some(Validation::MySQL) => {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_mysql(value, fp.clone());
}
}
}
_ => {
if om.rule.id().starts_with("kingfisher.github.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_github(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.azure.devops.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let mut organization =
utils::find_closest_variable(&captures, &token, "TOKEN", "AZURE_DEVOPS_ORG")
.unwrap_or_default();
if organization.is_empty() {
organization = extract_azure_devops_org_from_body(&om.validation_response_body)
.unwrap_or_default();
}
if !token.is_empty() && !organization.is_empty() {
collector.record_azure_devops(&token, &organization, fp.clone());
}
}
if is_gitlab_rule {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_gitlab(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.slack.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_slack(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.huggingface.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_huggingface(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.gitea.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_gitea(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.bitbucket.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_bitbucket(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.buildkite.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_buildkite(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.harness.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_harness(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.openai.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_openai(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.anthropic.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_anthropic(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.salesforce.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let instance = captures
.iter()
.find(|(name, ..)| name == "INSTANCE")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("INSTANCE").cloned())
.unwrap_or_default();
if !token.is_empty() && !instance.is_empty() {
collector.record_salesforce(&token, &instance, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.wandb.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_weightsandbiases(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.msteams.")
|| om.rule.id().starts_with("kingfisher.microsoftteamswebhook.")
{
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_microsoft_teams(value, fp.clone());
}
}
}
// --- New providers ---
if om.rule.id().starts_with("kingfisher.airtable.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_airtable(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.algolia.") {
let api_key = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let app_id = captures
.iter()
.find(|(name, ..)| name == "APPID")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("APPID").cloned())
.unwrap_or_default();
if !api_key.is_empty() && !app_id.is_empty() {
collector.record_algolia(&app_id, &api_key, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.artifactory.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
let base_url = captures
.iter()
.find(|(name, ..)| name == "HOST" || name == "URL")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("HOST").cloned());
collector.record_artifactory(value, base_url.as_deref(), fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.auth0.") {
let client_secret = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let client_id = captures
.iter()
.find(|(name, ..)| name == "CLIENTID")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("CLIENTID").cloned())
.unwrap_or_default();
let domain = captures
.iter()
.find(|(name, ..)| name == "DOMAIN")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("DOMAIN").cloned())
.unwrap_or_default();
if !client_secret.is_empty() && !client_id.is_empty() && !domain.is_empty() {
collector.record_auth0(&client_id, &client_secret, &domain, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.circleci.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_circleci(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.digitalocean.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_digitalocean(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.fastly.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_fastly(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.hubspot.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_hubspot(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.ibm.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_ibm_cloud(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.jira.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let base_url = captures
.iter()
.find(|(name, ..)| name == "DOMAIN" || name == "URL")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("DOMAIN").cloned())
.unwrap_or_default();
if !token.is_empty() && !base_url.is_empty() {
let url = if base_url.starts_with("http") {
base_url
} else {
format!("https://{base_url}")
};
collector.record_jira(&token, &url, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.paypal.") {
let client_secret = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let client_id = captures
.iter()
.find(|(name, ..)| name == "CLIENTID")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("CLIENTID").cloned())
.unwrap_or_default();
if !client_secret.is_empty() && !client_id.is_empty() {
collector.record_paypal(&client_id, &client_secret, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.plaid.") {
let secret = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let client_id = captures
.iter()
.find(|(name, ..)| name == "CLIENTID")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("CLIENTID").cloned())
.unwrap_or_default();
if !secret.is_empty() && !client_id.is_empty() {
collector.record_plaid(&client_id, &secret, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.sendgrid.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_sendgrid(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.sendinblue.")
|| om.rule.id().starts_with("kingfisher.brevo.")
{
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_sendinblue(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.shopify.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let subdomain = captures
.iter()
.find(|(name, ..)| name == "DOMAIN" || name == "SUBDOMAIN")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("DOMAIN").cloned())
.unwrap_or_default();
if !token.is_empty() && !subdomain.is_empty() {
collector.record_shopify(&token, &subdomain, fp.clone());
}
}
if om.rule.id().starts_with("kingfisher.square.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_square(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.stripe.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_stripe(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.terraform.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_terraform(value, fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.jfrog.")
|| om.rule.id().starts_with("kingfisher.xray.")
{
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
let base_url = captures
.iter()
.find(|(name, ..)| name == "HOST" || name == "URL")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("HOST").cloned());
collector.record_xray(value, base_url.as_deref(), fp.clone());
}
}
}
if om.rule.id().starts_with("kingfisher.zendesk.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let subdomain = captures
.iter()
.find(|(name, ..)| name == "SUBDOMAIN" || name == "DOMAIN")
.map(|(_, value, ..)| value.clone())
.or_else(|| om.dependent_captures.get("SUBDOMAIN").cloned())
.unwrap_or_default();
if !token.is_empty() && !subdomain.is_empty() {
collector.record_zendesk(&token, &subdomain, fp.clone());
}
}
}
}
}
fn extract_akid_from_body(body: &validation_body::ValidationResponseBody) -> Option<String> {
static AKID_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
regex::Regex::new(
r"(?xi)\b(?:A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[0-9A-Z]{16}\b",
)
.expect("valid regex")
});
let text = validation_body::clone_as_string(body);
AKID_RE.find(&text).map(|m| m.as_str().to_string())
}
fn extract_azure_storage_account_from_body(
body: &validation_body::ValidationResponseBody,
) -> Option<String> {
static ACCOUNT_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
regex::Regex::new(r"(?i)Account:\s*([a-z0-9]{3,24})").expect("valid regex")
});
let text = validation_body::clone_as_string(body);
ACCOUNT_RE.captures(&text).and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
}
fn extract_azure_storage_containers_from_body(
body: &validation_body::ValidationResponseBody,
) -> Option<Vec<String>> {
static CONTAINERS_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
regex::Regex::new(r"(?i)Containers:\s*(\\[[^\\]]*\\])").expect("valid regex")
});
let text = validation_body::clone_as_string(body);
let capture = CONTAINERS_RE
.captures(&text)
.and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))?;
serde_json::from_str::<Vec<String>>(&capture).ok()
}
fn extract_azure_devops_org_from_body(
body: &validation_body::ValidationResponseBody,
) -> Option<String> {
static ORG_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
regex::Regex::new(r#"(?i)https?://dev\.azure\.com/([a-z0-9][a-z0-9-]{0,61}[a-z0-9])"#)
.expect("valid regex")
});
let text = validation_body::clone_as_string(body);
ORG_RE.captures(&text).and_then(|caps| caps.get(1).map(|m| m.as_str().to_string()))
}