kingfisher/src/scanner/validation.rs
Mick Grove 7237a931d5 v1.73.0
2026-01-01 22:24:57 -08:00

697 lines
28 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use anyhow::Result;
use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use futures::{stream, StreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use liquid::Parser;
use reqwest::{Client, StatusCode};
use rustc_hash::FxHashMap;
use tokio::{sync::Notify, time::timeout};
use crate::{
access_map::AccessMapRequest,
blob::BlobId,
findings_store::{FindingsStore, FindingsStoreMessage},
location::OffsetSpan,
matcher::{Match, OwnedBlobMatch},
rules::rule::Validation,
validation::{
collect_variables_and_dependencies, utils, validate_single_match, CachedResponse,
},
validation_body,
};
/// Thread-safe, de-duplicating collector of credentials for access-map lookups.
///
/// Each `record_*` method hashes a provider-prefixed key with xxh3-64. It stores an
/// [`AccessMapRequest`] only the first time that key is seen; later calls with the same
/// credentials leave the existing entry untouched. Clones share the same underlying map.
#[derive(Clone, Default)]
pub struct AccessMapCollector {
    inner: Arc<DashMap<u64, AccessMapRequest>>,
}

impl AccessMapCollector {
    /// Stores the request built by `make` under the xxh3-64 hash of `dedup_key`, unless an
    /// entry with that hash already exists (in which case `make` is never called).
    fn insert_once(&self, dedup_key: &str, make: impl FnOnce() -> AccessMapRequest) {
        let key = xxhash_rust::xxh3::xxh3_64(dedup_key.as_bytes());
        self.inner.entry(key).or_insert_with(make);
    }

    /// Records an AWS access-key-id / secret-key pair (without a session token).
    pub fn record_aws(&self, access_key: &str, secret_key: &str) {
        self.insert_once(&format!("aws|{access_key}|{secret_key}"), || AccessMapRequest::Aws {
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            session_token: None,
        });
    }

    /// Records a GCP credential JSON document.
    pub fn record_gcp(&self, credential_json: &str) {
        // Namespaced like every other provider so that GCP and Azure entries (both keyed on a
        // JSON string) can never share a key.
        self.insert_once(&format!("gcp|{credential_json}"), || AccessMapRequest::Gcp {
            credential_json: credential_json.to_string(),
        });
    }

    /// Records an Azure Storage credential JSON document plus an optional container hint.
    ///
    /// If the same credentials were recorded before, the earlier entry (including its
    /// container hint) is kept.
    pub fn record_azure(&self, credential_json: &str, containers: Option<Vec<String>>) {
        self.insert_once(&format!("azure|{credential_json}"), || AccessMapRequest::Azure {
            credential_json: credential_json.to_string(),
            containers,
        });
    }

    /// Records an Azure DevOps token together with the organization it belongs to.
    pub fn record_azure_devops(&self, token: &str, organization: &str) {
        self.insert_once(&format!("azure_devops|{organization}|{token}"), || {
            AccessMapRequest::AzureDevops {
                token: token.to_string(),
                organization: organization.to_string(),
            }
        });
    }

    /// Records a GitHub token.
    pub fn record_github(&self, token: &str) {
        self.insert_once(&format!("github|{token}"), || AccessMapRequest::Github {
            token: token.to_string(),
        });
    }

    /// Records a GitLab token.
    pub fn record_gitlab(&self, token: &str) {
        self.insert_once(&format!("gitlab|{token}"), || AccessMapRequest::Gitlab {
            token: token.to_string(),
        });
    }

    /// Returns a copy of every recorded request.
    ///
    /// The order follows the underlying `DashMap` iteration order and is not specified.
    pub fn into_requests(self) -> Vec<AccessMapRequest> {
        self.inner.iter().map(|entry| entry.value().clone()).collect()
    }
}
/// Validates the stored matches (all of them, or only the `range` slice of
/// `get_matches()`) and writes the updated matches back into `datastore`.
///
/// Matches are grouped by blob id and handled in two phases:
///
/// * Phase 1 covers blobs in which no match's rule declares `depends_on_rule`.
///   - These matches are grouped across all blobs by `"<rule id>|<secret>"`.
///   - The secret is the raw value of capture 1, or capture 0 when there is no capture 1.
///   - One representative per group is validated, and its response is copied onto every
///     match in the group.
/// * Phase 2 covers blobs containing at least one match whose rule declares `depends_on_rule`.
///   - Blob ids are sorted and processed in chunks of `max(concurrency * 50, 200)`.
///   - Within a blob, matches are grouped by `build_cache_key`, using the variables returned
///     by `collect_variables_and_dependencies` for that blob.
///   - One representative per group is validated and its outcome is copied to the rest.
///
/// `concurrency` is `num_jobs`, or `num_cpus::get()` when `num_jobs` is 0. `cache` is passed
/// through to `validate_single_match`. When `access_map` is `Some`, validated credentials may
/// be recorded into it (see `maybe_record_access_map`).
///
/// # Errors
///
/// Returns an error if `get_rules()` fails or a progress-bar template fails to parse.
///
/// # Panics
///
/// Panics if the `datastore` mutex is poisoned or if `range` is out of bounds for the stored
/// matches.
///
/// NOTE(review): only the processed matches are passed to `replace_matches`. When `range` is
/// `Some`, whether matches outside the range survive depends on `replace_matches`; confirm.
// Parameters mirror the CLI / scan configuration threaded through from the caller.
#[allow(clippy::too_many_arguments)]
pub async fn run_secret_validation(
datastore: Arc<Mutex<FindingsStore>>,
parser: &Parser,
client: &Client,
cache: &Arc<SkipMap<String, CachedResponse>>,
num_jobs: usize,
range: Option<std::ops::Range<usize>>,
access_map: Option<AccessMapCollector>,
validation_timeout: Duration,
validation_retries: u32,
) -> Result<()> {
// ── 1. Concurrency & counters ───────────────────────────────────────────
let concurrency = if num_jobs > 0 { num_jobs } else { num_cpus::get() };
// Phase 2 processes this many blob ids per chunk.
let chunk_size = std::cmp::max(concurrency * 50, 200);
// NOTE(review): incremented by `validate_single`, but never read in this function.
let success_count = Arc::new(AtomicUsize::new(0));
let fail_count = Arc::new(AtomicUsize::new(0));
// ── 2. Fetch rules + matches ────────────────────────────────────────────
// Snapshot the matches under the lock, grouped by blob id. The guard is dropped at the end
// of this block. The rules value is otherwise unused; `?` still propagates `get_rules()`
// errors.
let (_all_rules, all_matches_by_blob) = {
let ds = datastore.lock().unwrap();
let rules = ds.get_rules()?;
let mut map: FxHashMap<BlobId, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
let matches = if let Some(r) = range.clone() {
ds.get_matches()[r].to_vec()
} else {
ds.get_matches().to_vec()
};
for arc_msg in matches.into_iter() {
map.entry(arc_msg.1.id).or_default().push(arc_msg);
}
(rules, map)
};
// ── 3. Partition blobs ──────────────────────────────────────────────────
// A blob goes to Phase 2 if ANY of its matches belongs to a rule with dependencies.
let mut simple_matches = Vec::new();
let mut dependent_blobs = FxHashMap::default(); // blob_id -> Vec<Arc<FindingsStoreMessage>>
for (blob_id, matches) in all_matches_by_blob {
if matches.iter().any(|m| !m.2.rule.syntax().depends_on_rule.is_empty()) {
dependent_blobs.insert(blob_id, matches);
} else {
simple_matches.extend(matches);
}
}
// Result accumulator
let mut updated_arcs: Vec<Arc<FindingsStoreMessage>> = Vec::new();
// ── Phase 1: simple, global de-dupe ──────────────────────────────────────
if !simple_matches.is_empty() {
// Group key: "<rule id>|<capture 1, else capture 0, raw value>".
let mut groups: FxHashMap<String, Vec<Arc<FindingsStoreMessage>>> = FxHashMap::default();
for arc_msg in simple_matches {
let secret = arc_msg
.2
.groups
.captures
.get(1)
.or_else(|| arc_msg.2.groups.captures.get(0))
.map_or("", |c| c.raw_value());
groups.entry(format!("{}|{}", arc_msg.2.rule.id(), secret)).or_default().push(arc_msg);
}
// Final response per group key. A placeholder is inserted before validating so that each
// key is validated at most once.
let validation_results = DashMap::<String, CachedResponse>::new();
let pb = ProgressBar::new(groups.len() as u64).with_message("Validating secrets…");
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} {msg} [{bar:40.green/blue}] {pos}/{len} ({percent}%) \
[{elapsed_precise}]",
)?
.progress_chars("=>-")
.tick_chars("|/-\\"),
);
pb.enable_steady_tick(Duration::from_millis(100));
stream::iter(
groups.values().map(|v| v[0].clone()), // the first match of each group represents it
)
.for_each_concurrent(concurrency, |rep_arc| {
// Per-task handles moved into the async block below.
let parser = parser.clone();
let client = client.clone();
let cache_glob = cache.clone();
let val_res = &validation_results;
let success = success_count.clone();
let fail = fail_count.clone();
// Each task gets its own progress-bar handle; indicatif clones share one bar state.
let pb = pb.clone();
let access_map = access_map.clone();
async move {
// Recompute the same group key used to build `groups`.
let secret = rep_arc
.2
.groups
.captures
.get(1)
.or_else(|| rep_arc.2.groups.captures.get(0))
.map_or("", |c| c.raw_value());
let key = format!("{}|{}", rep_arc.2.rule.id(), secret);
match val_res.entry(key.clone()) {
dashmap::mapref::entry::Entry::Occupied(_) => return,
dashmap::mapref::entry::Entry::Vacant(entry) => {
// Placeholder reservation; overwritten with the real response below.
entry.insert(CachedResponse {
body: validation_body::from_string(String::new()),
status: StatusCode::ACCEPTED,
is_valid: false,
timestamp: Instant::now(),
});
}
}
let mut om = OwnedBlobMatch::convert_match_to_owned_blobmatch(
&rep_arc.2,
rep_arc.2.rule.clone(),
);
// Empty dependency maps and fresh per-call caches: these rules have no dependencies,
// and de-duplication already happened through `groups`.
validate_single(
&mut om,
&parser,
&client,
&FxHashMap::default(),
&FxHashMap::default(),
&Arc::new(DashMap::new()),
&Arc::new(DashMap::new()),
&success,
&fail,
&cache_glob,
access_map.as_ref(),
validation_timeout,
validation_retries,
)
.await;
let cr = CachedResponse {
body: om.validation_response_body.clone(),
status: om.validation_response_status,
is_valid: om.validation_success,
timestamp: Instant::now(),
};
val_res.insert(key, cr);
// One group finished.
pb.inc(1);
}
})
.await;
// The tasks only used clones, so the original handle is still available here.
pb.finish();
// Fan each group's response out to every match in that group.
for (key, group) in groups {
let cr = validation_results.get(&key).expect("missing cached result");
for arc_msg in group {
let (origin, blob_md, old_match) = &*arc_msg;
updated_arcs.push(Arc::new((
origin.clone(),
blob_md.clone(),
Match {
validation_success: cr.is_valid,
validation_response_status: cr.status.as_u16(),
validation_response_body: cr.body.clone(),
..old_match.clone()
},
)));
}
}
}
// ── Phase 2: blobs with dependencies (original logic) ───────────────────
if !dependent_blobs.is_empty() {
// Sorted so that blobs are processed in a deterministic order.
let blob_ids: Vec<_> = {
let mut v: Vec<_> = dependent_blobs.keys().cloned().collect();
v.sort_unstable();
v
};
let total = blob_ids.len();
let pb = ProgressBar::new(total as u64).with_message("Validating dependent secrets…");
pb.set_style(
ProgressStyle::with_template(
"{spinner:.yellow} {msg} [{bar:40.yellow/blue}] {pos}/{len} ({percent}%) \
[{elapsed_precise}]",
)?
.progress_chars("=>-")
.tick_chars("|/-\\"),
);
pb.enable_steady_tick(Duration::from_millis(100));
// Shared by every blob in this phase: `validate_single`'s result cache and in-flight
// markers, so identical keys in different blobs are validated once.
let val_cache = Arc::new(DashMap::<String, CachedResponse>::new());
let in_flight = Arc::new(DashMap::<String, ()>::new());
for chunk in blob_ids.chunks(chunk_size) {
// One future per blob in this chunk; each validates that blob's matches.
let tasks: Vec<_> = chunk
.iter()
.map(|blob_id| {
let matches_for_blob = dependent_blobs.get(blob_id).unwrap().clone();
let parser = parser.clone();
let client = client.clone();
let val_cache = val_cache.clone();
let in_flight = in_flight.clone();
let success = success_count.clone();
let fail = fail_count.clone();
let cache_glob = cache.clone();
let access_map = access_map.clone();
let validation_timeout = validation_timeout;
let validation_retries = validation_retries;
async move {
let owned = matches_for_blob
.iter()
.map(|arc_msg| {
OwnedBlobMatch::convert_match_to_owned_blobmatch(
&arc_msg.2,
arc_msg.2.rule.clone(),
)
})
.collect::<Vec<_>>();
// Dependency variables / missing dependencies derived from all matches in this blob.
let (dep_vars, missing_deps) = collect_variables_and_dependencies(&owned);
let mut by_key: FxHashMap<String, Vec<OwnedBlobMatch>> =
FxHashMap::default();
for om in owned {
by_key.entry(build_cache_key(&om, &dep_vars)).or_default().push(om);
}
// (representative, duplicates): only the representative is validated.
let reps: Vec<_> =
by_key.into_iter().map(|(_k, mut v)| (v.remove(0), v)).collect();
let validated: Vec<_> =
stream::iter(reps.into_iter().map(|(mut rep, mut dups)| {
let parser = parser.clone();
let client = client.clone();
let dep_vars = dep_vars.clone();
let miss_deps = missing_deps.clone();
let val_cache = val_cache.clone();
let in_flight = in_flight.clone();
let success = success.clone();
let fail = fail.clone();
let cache_glob = cache_glob.clone();
let access_map = access_map.clone();
async move {
validate_single(
&mut rep,
&parser,
&client,
&dep_vars,
&miss_deps,
&val_cache,
&in_flight,
&success,
&fail,
&cache_glob,
access_map.as_ref(),
validation_timeout,
validation_retries,
)
.await;
// Copy the representative's outcome onto its duplicates.
for d in &mut dups {
d.validation_success = rep.validation_success;
d.validation_response_body =
rep.validation_response_body.clone();
d.validation_response_status =
rep.validation_response_status;
}
let mut out = vec![rep];
out.extend(dups);
out
}
}))
.buffer_unordered(concurrency)
.collect()
.await;
validated.into_iter().flatten().collect::<Vec<_>>()
}
})
.collect();
// Run this chunk's blob futures with at most `concurrency` in flight.
let validated_blobs: Vec<Vec<OwnedBlobMatch>> =
stream::iter(tasks).buffer_unordered(concurrency).collect().await;
for blob_vec in validated_blobs {
if blob_vec.is_empty() {
continue;
}
// Find each validated match's stored message by finding fingerprint, so the
// original origin / blob metadata / match fields can be reused.
let map_original: FxHashMap<u64, _> = dependent_blobs
.get(&blob_vec[0].blob_id)
.unwrap()
.iter()
.map(|arc_msg| (arc_msg.2.finding_fingerprint, arc_msg.clone()))
.collect();
for om in blob_vec {
let orig = map_original.get(&om.finding_fingerprint).unwrap();
updated_arcs.push(Arc::new((
orig.0.clone(),
orig.1.clone(),
Match {
validation_success: om.validation_success,
validation_response_body: om.validation_response_body.clone(),
validation_response_status: om.validation_response_status.as_u16(),
..orig.2.clone()
},
)));
}
}
// Progress is tracked per blob, one chunk at a time.
pb.inc(chunk.len() as u64);
}
pb.finish();
}
// ── 4. Persist all updates ──────────────────────────────────────────────
{
let mut ds = datastore.lock().unwrap();
ds.replace_matches(updated_arcs);
}
Ok(())
}
// ---------------------------------------------------
// The core validation logic, used in an async pipeline
// ---------------------------------------------------
async fn validate_single(
om: &mut OwnedBlobMatch,
parser: &Parser,
client: &Client,
dep_vars: &FxHashMap<String, Vec<(String, OffsetSpan)>>,
missing_deps: &FxHashMap<String, Vec<String>>,
cache: &DashMap<String, CachedResponse>,
in_progress: &DashMap<String, ()>,
success_count: &AtomicUsize,
fail_count: &AtomicUsize,
cache2: &Arc<SkipMap<String, CachedResponse>>,
access_map: Option<&AccessMapCollector>,
validation_timeout: Duration,
validation_retries: u32,
) {
// Build key
let dep_vars_str = dep_vars
.get(om.rule.id())
.map(|hm| {
let mut sorted: Vec<_> = hm.iter().collect();
sorted.sort_by(|(k, _), (k2, _)| k.cmp(k2));
sorted.into_iter().map(|(k, v)| format!("{}={}", k, v)).collect::<Vec<_>>().join("|")
})
.unwrap_or_default();
let capture0 = om.captures.captures.get(0).map_or(String::new(), |c| c.raw_value().to_string());
let cache_key = format!("{}|{}|{}", om.rule.name(), capture0, dep_vars_str);
// Check cache first
if let Some(cached) = cache.get(&cache_key) {
om.validation_success = cached.is_valid;
om.validation_response_body = cached.body.clone();
om.validation_response_status = cached.status;
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
maybe_record_access_map(om, access_map);
return;
}
static NOTIFY: once_cell::sync::Lazy<DashMap<String, Arc<Notify>>> =
once_cell::sync::Lazy::new(DashMap::new);
let notify = NOTIFY.entry(cache_key.clone()).or_insert_with(|| Arc::new(Notify::new())).clone();
let first = in_progress.insert(cache_key.clone(), ()).is_none();
if !first {
notify.notified().await; // suspend with zero polling
// cached result now present
if let Some(cached) = cache.get(&cache_key) {
om.validation_success = cached.is_valid;
om.validation_response_body = cached.body.clone();
om.validation_response_status = cached.status;
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
maybe_record_access_map(om, access_map);
return; // Exit early if cached result is found
}
return;
}
// If we reach here, we're the first task to validate this key
// Perform validation
let outcome = timeout(validation_timeout, async {
validate_single_match(
om,
parser,
client,
dep_vars,
missing_deps,
cache2,
validation_timeout,
validation_retries,
)
.await
})
.await;
// Store result in cache
match outcome {
Ok(_) => {
if om.validation_success {
success_count.fetch_add(1, Ordering::Relaxed);
} else if om.validation_response_status != http::StatusCode::CONTINUE {
fail_count.fetch_add(1, Ordering::Relaxed);
}
cache.insert(
cache_key.clone(),
CachedResponse {
is_valid: om.validation_success,
status: om.validation_response_status,
body: om.validation_response_body.clone(),
timestamp: Instant::now(),
},
);
}
Err(_) => {
om.validation_success = false;
om.validation_response_body = validation_body::from_string("Validation timed out");
om.validation_response_status = http::StatusCode::REQUEST_TIMEOUT;
fail_count.fetch_add(1, Ordering::Relaxed);
}
}
maybe_record_access_map(om, access_map);
// Remove from `in_progress`
// in_progress.remove(&cache_key);
in_progress.remove(&cache_key);
if let Some(n) = NOTIFY.remove(&cache_key) {
n.1.notify_waiters(); // wake everyone
}
}
// Helper to compute the cache key for an OwnedBlobMatch
/// Builds the de-duplication key for an `OwnedBlobMatch`.
///
/// The key has the form `"<rule name>|<capture 0 raw value>|<dependency segment>"`:
/// - Capture 0 is empty when the match has no captures.
/// - The dependency segment comes from the `dep_vars` entry for this rule's id. Its
///   `(string, span)` pairs are stably sorted by the string, rendered as `string=span`, and
///   joined with `|`.
/// - The segment is empty when `dep_vars` has no entry for the rule.
///
/// Matches that produce the same key are validated once and share the outcome.
fn build_cache_key(
    om: &OwnedBlobMatch,
    dep_vars: &FxHashMap<String, Vec<(String, OffsetSpan)>>,
) -> String {
    let dependency_segment = match dep_vars.get(om.rule.id()) {
        Some(pairs) => {
            let mut ordered: Vec<&(String, OffsetSpan)> = pairs.iter().collect();
            // Stable sort: pairs with equal strings keep their original relative order.
            ordered.sort_by(|a, b| a.0.cmp(&b.0));
            let rendered: Vec<String> = ordered
                .into_iter()
                .map(|(label, span)| format!("{}={}", label, span))
                .collect();
            rendered.join("|")
        }
        None => String::new(),
    };
    let primary_capture = match om.captures.captures.get(0) {
        Some(capture) => capture.raw_value().to_string(),
        None => String::new(),
    };
    format!("{}|{}|{}", om.rule.name(), primary_capture, dependency_segment)
}
fn maybe_record_access_map(om: &OwnedBlobMatch, collector: Option<&AccessMapCollector>) {
let is_gitlab_rule = om.rule.id().starts_with("kingfisher.gitlab.");
let validation_ok =
om.validation_success || (is_gitlab_rule && om.validation_response_status.is_success());
let collector = match collector {
Some(c) if validation_ok => c,
_ => return,
};
let captures = utils::process_captures(&om.captures);
match om.rule.syntax().validation {
Some(Validation::AWS) => {
let secret = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let mut akid = utils::find_closest_variable(&captures, &secret, "TOKEN", "AKID")
.unwrap_or_default();
if akid.is_empty() {
akid = extract_akid_from_body(&om.validation_response_body).unwrap_or_default();
}
if !akid.is_empty() && !secret.is_empty() {
collector.record_aws(&akid, &secret);
}
}
Some(Validation::GCP) => {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_gcp(value);
}
}
}
Some(Validation::AzureStorage) => {
let storage_key = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let storage_account =
utils::find_closest_variable(&captures, &storage_key, "TOKEN", "AZURENAME")
.unwrap_or_default();
let mut storage_account = storage_account;
if storage_account.is_empty() {
storage_account =
extract_azure_storage_account_from_body(&om.validation_response_body)
.unwrap_or_default();
}
let containers_hint =
extract_azure_storage_containers_from_body(&om.validation_response_body);
if !storage_account.is_empty() && !storage_key.is_empty() {
let creds_json = format!(
r#"{{"storage_account":"{}","storage_key":"{}"}}"#,
storage_account, storage_key
);
collector.record_azure(&creds_json, containers_hint);
}
}
_ => {
if om.rule.id().starts_with("kingfisher.github.") {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_github(value);
}
}
}
if om.rule.id().starts_with("kingfisher.azure.devops.") {
let token = captures
.iter()
.find(|(name, ..)| name == "TOKEN")
.map(|(_, value, ..)| value.clone())
.unwrap_or_default();
let mut organization =
utils::find_closest_variable(&captures, &token, "TOKEN", "AZURE_DEVOPS_ORG")
.unwrap_or_default();
if organization.is_empty() {
organization = extract_azure_devops_org_from_body(&om.validation_response_body)
.unwrap_or_default();
}
if !token.is_empty() && !organization.is_empty() {
collector.record_azure_devops(&token, &organization);
}
}
if is_gitlab_rule {
if let Some((_, value, ..)) = captures.iter().find(|(name, ..)| name == "TOKEN") {
if !value.is_empty() {
collector.record_gitlab(value);
}
}
}
}
}
}
/// Returns the first AWS access-key id found in a validation response body, if any.
///
/// The pattern is a prefix followed by 16 characters from `[0-9A-Z]`, bounded by word
/// boundaries. The prefix is `A3T` plus one `[A-Z0-9]`, or one of `AKIA`, `AGPA`, `AIDA`,
/// `AROA`, `AIPA`, `ANPA`, `ANVA`, `ASIA`. The `i` flag makes the match case-insensitive.
fn extract_akid_from_body(body: &validation_body::ValidationResponseBody) -> Option<String> {
    static AKID_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
        let pattern =
            r"(?xi)\b(?:A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[0-9A-Z]{16}\b";
        regex::Regex::new(pattern).expect("valid regex")
    });
    let haystack = validation_body::clone_as_string(body);
    let hit = AKID_RE.find(&haystack)?;
    Some(hit.as_str().to_owned())
}
/// Returns the storage-account name reported after `Account:` in a validation response body.
///
/// The label is matched case-insensitively. The name is the first 3 to 24 characters from
/// `[a-z0-9]` (case-insensitive) after the label and optional whitespace.
fn extract_azure_storage_account_from_body(
    body: &validation_body::ValidationResponseBody,
) -> Option<String> {
    static ACCOUNT_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
        regex::Regex::new(r"(?i)Account:\s*([a-z0-9]{3,24})").expect("valid regex")
    });
    let haystack = validation_body::clone_as_string(body);
    match ACCOUNT_RE.captures(&haystack) {
        Some(groups) => groups.get(1).map(|account| account.as_str().to_owned()),
        None => None,
    }
}
/// Extracts the container list reported in an Azure Storage validation response body.
///
/// Looks for `Containers:` (case-insensitive) followed by a bracketed list, e.g.
/// `Containers: ["logs","backups"]`, and parses the bracketed text as a JSON array of
/// strings. Returns `None` when no list is present or the list is not valid JSON.
fn extract_azure_storage_containers_from_body(
    body: &validation_body::ValidationResponseBody,
) -> Option<Vec<String>> {
    // Inside a raw string, `\[` and `\]` reach the regex engine as escaped brackets. Doubling
    // the backslashes would instead demand a literal `\` before the list, so a JSON array
    // could never match.
    static CONTAINERS_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
        regex::Regex::new(r"(?i)Containers:\s*(\[[^\]]*\])").expect("valid regex")
    });
    let text = validation_body::clone_as_string(body);
    let list = CONTAINERS_RE.captures(&text)?.get(1)?.as_str();
    serde_json::from_str::<Vec<String>>(list).ok()
}
/// Returns the Azure DevOps organization named in a `http(s)://dev.azure.com/<org>` URL
/// found in a validation response body.
///
/// The organization is 2 to 63 characters from `[a-z0-9-]`, matched case-insensitively. It
/// must start and end with an alphanumeric character. Only the first URL is considered.
fn extract_azure_devops_org_from_body(
    body: &validation_body::ValidationResponseBody,
) -> Option<String> {
    static ORG_RE: once_cell::sync::Lazy<regex::Regex> = once_cell::sync::Lazy::new(|| {
        let pattern = r#"(?i)https?://dev\.azure\.com/([a-z0-9][a-z0-9-]{0,61}[a-z0-9])"#;
        regex::Regex::new(pattern).expect("valid regex")
    });
    let haystack = validation_body::clone_as_string(body);
    let groups = ORG_RE.captures(&haystack)?;
    let org = groups.get(1)?;
    Some(org.as_str().to_owned())
}