kingfisher/src/scanner/enumerate.rs

712 lines
28 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::{
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::{Duration, Instant as StdInstant, Instant},
};
use anyhow::{bail, Context, Result};
use base64::{engine::general_purpose::STANDARD, Engine};
use bstr::BString;
use gix::Repository as GixRepo;
use indicatif::{ProgressBar, ProgressStyle};
use rayon::{
iter::plumbing::Folder,
prelude::{ParallelIterator, *},
};
use serde::{Deserialize, Deserializer};
use tracing::{debug, error};
use crate::{
binary::is_binary,
blob::{Blob, BlobId, BlobIdMap},
cli::commands::{github::GitHistoryMode, scan},
decompress::{decompress_file_to_temp, CompressedContent},
findings_store,
matcher::{Matcher, MatcherStats},
open_git_repo,
origin::{Origin, OriginSet},
rule_profiling::ConcurrentRuleProfiler,
rules_database::RulesDatabase,
scanner::{
processing::BlobProcessor,
runner::{create_datastore_channel, spawn_datastore_writer_thread},
util::is_compressed_file,
},
scanner_pool::ScannerPool,
EnumeratorConfig, EnumeratorFileResult, FileResult, FilesystemEnumerator, FoundInput,
GitRepoEnumerator, GitRepoResult, GitRepoWithMetadataEnumerator, PathBuf,
};
type OwnedBlob = Blob<'static>;
pub fn enumerate_filesystem_inputs(
args: &scan::ScanArgs,
datastore: Arc<Mutex<findings_store::FindingsStore>>,
input_roots: &[PathBuf],
progress_enabled: bool,
rules_db: &RulesDatabase,
enable_profiling: bool,
shared_profiler: Arc<ConcurrentRuleProfiler>,
matcher_stats: &Mutex<MatcherStats>,
) -> Result<()> {
let repo_scan_timeout = Duration::from_secs(args.git_repo_timeout);
let progress = if progress_enabled {
let style =
ProgressStyle::with_template("{spinner} {msg} {total_bytes} [{elapsed_precise}]")
.expect("progress bar style template should compile");
let pb = ProgressBar::new_spinner()
.with_style(style)
.with_message("Scanning files and git repository content...");
pb.enable_steady_tick(Duration::from_millis(500));
pb
} else {
ProgressBar::hidden()
};
let _input_enumerator = || -> Result<FilesystemEnumerator> {
let mut ie = FilesystemEnumerator::new(input_roots, &args)?;
ie.threads(args.num_jobs);
ie.max_filesize(args.content_filtering_args.max_file_size_bytes());
if args.input_specifier_args.git_history == GitHistoryMode::None {
ie.enumerate_git_history(false);
}
let collect_git_metadata = true;
ie.collect_git_metadata(collect_git_metadata);
Ok(ie)
}()
.context("Failed to initialize filesystem enumerator")?;
let (enum_thread, input_recv, exclude_globset) = {
let fs_enumerator = make_fs_enumerator(args, input_roots.into())
.context("Failed to initialize filesystem enumerator")?;
let exclude_globset = fs_enumerator.as_ref().and_then(|ie| ie.exclude_globset());
let channel_size = std::cmp::max(args.num_jobs * 128, 1024);
let (input_send, input_recv) = crossbeam_channel::bounded(channel_size);
let input_enumerator_thread = std::thread::Builder::new()
.name("input_enumerator".to_string())
.spawn(move || -> Result<_> {
if let Some(fs_enumerator) = fs_enumerator {
fs_enumerator.run(input_send.clone())?;
}
Ok(())
})
.context("Failed to enumerate filesystem inputs")?;
(input_enumerator_thread, input_recv, exclude_globset)
};
let enum_cfg = EnumeratorConfig {
enumerate_git_history: match args.input_specifier_args.git_history {
GitHistoryMode::Full => true,
GitHistoryMode::None => false,
},
collect_git_metadata: args.input_specifier_args.commit_metadata,
repo_scan_timeout,
exclude_globset,
};
let (send_ds, recv_ds) = create_datastore_channel(args.num_jobs);
let datastore_writer_thread =
spawn_datastore_writer_thread(datastore, recv_ds, !args.no_dedup)?;
let t1 = Instant::now();
let num_blob_processors = Mutex::new(0u64);
let seen_blobs = BlobIdMap::new();
let scanner_pool = Arc::new(ScannerPool::new(Arc::new(rules_db.vsdb.clone())));
let matcher = Matcher::new(
&rules_db,
scanner_pool.clone(),
&seen_blobs,
Some(&matcher_stats),
enable_profiling,
Some(shared_profiler),
)?;
let blob_processor_init_time = Mutex::new(t1.elapsed());
let make_blob_processor = || -> BlobProcessor {
let t1 = Instant::now();
*num_blob_processors.lock().unwrap() += 1;
{
let mut init_time = blob_processor_init_time.lock().unwrap();
*init_time += t1.elapsed();
}
BlobProcessor { matcher }
};
let scan_res: Result<()> = input_recv
.into_iter()
.par_bridge()
.filter_map(|input| match (&enum_cfg, input).into_blob_iter() {
Err(e) => {
debug!("Error enumerating input: {e:#}");
None
}
Ok(blob_iter) => blob_iter,
})
.flatten()
.try_for_each_init(
|| (make_blob_processor.clone()(), progress.clone()),
move |(processor, progress), entry| {
let (origin, blob) = match entry {
Err(e) => {
error!("Error loading input: {e:#}");
return Ok(());
}
Ok(entry) => entry,
};
// Check if this is an archive file
let is_archive = if let Origin::File(file_origin) = &origin.first() {
is_compressed_file(&file_origin.path)
} else {
false
};
let is_binary = is_binary(&blob.bytes());
let should_skip = if is_archive {
// For archives: skip only if --no_extract_archives is true
args.content_filtering_args.no_extract_archives
} else {
// For non-archives: skip if it's binary and --no_binary is true
is_binary && args.content_filtering_args.no_binary
};
if should_skip {
progress.suspend(|| {
if is_archive {
debug!("Skipping archive: {}", blob.id);
} else {
debug!("Skipping binary blob: {}", blob.id);
}
});
return Ok(());
}
progress.inc(blob.len().try_into().unwrap());
match processor.run(origin, blob, args.no_dedup, args.redact, args.no_base64) {
Ok(None) => {
// nothing to record
}
Ok(Some((origin_set, blob_metadata, vec_of_matches))) => {
for (_, single_match) in vec_of_matches {
// Send each match
send_ds.send((
Arc::new(origin_set.clone()),
Arc::new(blob_metadata.clone()),
single_match,
))?;
}
}
Err(e) => {
debug!("Error scanning input: {e:#}");
}
}
Ok(())
},
);
enum_thread.join().unwrap().context("Failed to enumerate inputs")?;
let (..) = datastore_writer_thread
.join()
.unwrap()
.context("Failed to save results to the datastore")?;
scan_res.context("Failed to scan inputs")?;
progress.finish();
Ok(())
}
/// Initialize a `FilesystemEnumerator` based on the command-line arguments and
/// datastore. Also initialize a `Gitignore` that is the same as that used by
/// the filesystem enumerator.
fn make_fs_enumerator(
args: &scan::ScanArgs,
input_roots: Vec<PathBuf>,
) -> Result<Option<FilesystemEnumerator>> {
if input_roots.is_empty() {
Ok(None)
} else {
let mut ie = FilesystemEnumerator::new(&input_roots, &args)?;
ie.threads(args.num_jobs);
ie.max_filesize(args.content_filtering_args.max_file_size_bytes());
if args.input_specifier_args.git_history == GitHistoryMode::None {
ie.enumerate_git_history(false);
}
// Pass no_dedup when enumerating git history
ie.no_dedup(args.no_dedup);
ie.set_exclude_patterns(&args.content_filtering_args.exclude)?;
// Determine whether to collect git metadata or not
let collect_git_metadata = false;
ie.collect_git_metadata(collect_git_metadata);
Ok(Some(ie))
}
}
// Rest of the file remains the same...
/// Implements parallel iteration for either a single blob or a list of blobs.
struct FileResultIter<'a> {
iter_kind: FileResultIterKind,
_marker: PhantomData<&'a ()>,
}
impl<'a> ParallelIterator for FileResultIter<'a> {
type Item = Result<(OriginSet, Blob<'a>)>;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
{
match self.iter_kind {
FileResultIterKind::Single(maybe_one) => {
let mut folder = consumer.into_folder();
if let Some(one) = maybe_one {
folder = folder.consume(Ok(one));
}
folder.complete()
}
FileResultIterKind::Archive(items) => {
items.into_par_iter().map(Ok).drive_unindexed(consumer)
}
}
}
}
impl ParallelBlobIterator for FileResult {
type Iter<'a> = FileResultIter<'a>;
fn into_blob_iter<'a>(self) -> Result<Option<Self::Iter<'a>>> {
let extraction_enabled = self.extract_archives;
let max_extraction_depth = self.extraction_depth;
if extraction_enabled && is_compressed_file(&self.path) {
match decompress_file_to_temp(&self.path) {
Ok((content, _temp_dir)) => match content {
// Single-file decompression fully in memory.
CompressedContent::Raw(ref data) => {
let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]);
let blob = Blob::from_bytes(data.to_vec());
Ok(Some(FileResultIter {
iter_kind: FileResultIterKind::Single(Some((origin, blob))),
_marker: PhantomData,
}))
}
// Single-file decompression streamed to a file. We read it back into memory
// here.
CompressedContent::RawFile(path) => {
let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]);
let blob = Blob::from_file(&path)?;
Ok(Some(FileResultIter {
iter_kind: FileResultIterKind::Single(Some((origin, blob))),
_marker: PhantomData,
}))
}
// Multifile archive (inmemory).
CompressedContent::Archive(ref files) => {
if max_extraction_depth == 0 {
debug!(
"Skipping nested archive (max depth reached): {}",
self.path.display()
);
return Ok(None);
}
let items = files
.iter()
.map(|(filename, data)| {
let full_path = PathBuf::from(filename);
let nested_origin =
OriginSet::new(Origin::from_file(full_path), vec![]);
// Construct a FileResult for deeper extraction if needed (not used
// directly here)
let _ = FileResult {
path: self.path.join(filename),
num_bytes: data.len() as u64,
extract_archives: self.extract_archives,
extraction_depth: max_extraction_depth - 1,
};
(nested_origin, Blob::from_bytes(data.to_vec()))
})
.collect();
Ok(Some(FileResultIter {
iter_kind: FileResultIterKind::Archive(items),
_marker: PhantomData,
}))
}
// Multifile archive (files on disk).
CompressedContent::ArchiveFiles(ref entries) => {
if max_extraction_depth == 0 {
debug!(
"Skipping nested archive (max depth reached): {}",
self.path.display()
);
return Ok(None);
}
// Read each extracted file from disk and create a Blob.
let mut items = Vec::new();
for (filename, disk_path) in entries {
let blob = match Blob::from_file(disk_path) {
Ok(b) => b,
Err(e) => {
debug!(
"Failed to mmap extracted file {}: {}",
disk_path.display(),
e
);
continue; // skip unreadable / unmappable file
}
};
let full_path = PathBuf::from(filename);
let nested_origin =
OriginSet::new(Origin::from_file(full_path), vec![]);
// Construct a FileResult for deeper extraction if needed (not used
// directly here)
let _ = FileResult {
path: self.path.join(filename),
num_bytes: blob.len() as u64,
extract_archives: self.extract_archives,
extraction_depth: max_extraction_depth - 1,
};
items.push((nested_origin, blob));
}
Ok(Some(FileResultIter {
iter_kind: FileResultIterKind::Archive(items),
_marker: PhantomData,
}))
}
},
Err(e) => {
debug!("Failed to decompress {}: {}", self.path.display(), e);
Ok(None) // Skip on decompression failure
}
}
} else {
// Not compressed or extraction disabled: read file as a single blob.
let blob = Blob::from_file(&self.path)
.with_context(|| format!("Failed to load blob from {}", self.path.display()))?;
let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]);
Ok(Some(FileResultIter {
iter_kind: FileResultIterKind::Single(Some((origin, blob))),
_marker: PhantomData,
}))
}
}
}
// A marker so the struct itself carries the lifetime.
struct GitRepoResultIter<'a> {
inner: GitRepoResult,
deadline: std::time::Instant,
_marker: std::marker::PhantomData<&'a ()>,
}
impl ParallelBlobIterator for GitRepoResult {
type Iter<'a> = GitRepoResultIter<'a>;
fn into_blob_iter<'a>(self) -> Result<Option<Self::Iter<'a>>> {
// placeholder 1 h deadline; will be overwritten immediately
const PLACEHOLDER: Duration = Duration::from_secs(3600);
Ok(Some(GitRepoResultIter {
inner: self,
deadline: Instant::now() + PLACEHOLDER,
_marker: std::marker::PhantomData,
}))
}
}
impl<'a> rayon::iter::ParallelIterator for GitRepoResultIter<'a> {
type Item = Result<(OriginSet, Blob<'a>)>;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
{
// ── shared state ──────────────────────────────────────────────
let repo_sync = self.inner.repository.into_sync();
let repo_path = Arc::new(self.inner.path.clone());
let deadline = self.deadline;
let flag = Arc::new(AtomicBool::new(false)); // first-timeout gate
self.inner
.blobs
.into_par_iter()
.with_min_len(1024)
.map_init(|| repo_sync.to_thread_local(), {
let repo_path = Arc::clone(&repo_path);
let flag = Arc::clone(&flag);
move |repo: &mut GixRepo, md| -> Result<(OriginSet, Blob)> {
// ── 10-minute guard ──────────────────────────
if StdInstant::now() > deadline {
if flag.swap(true, Ordering::Relaxed) {
bail!("__timeout_silenced__");
}
bail!("blob-read timeout (repo: {})", repo_path.display());
}
// ── load blob ────────────────────────────────
let blob_id = md.blob_oid;
let mut raw = repo.find_object(blob_id)?.try_into_blob()?;
let blob = Blob::new(BlobId::from(&blob_id), std::mem::take(&mut raw.data));
// ── build Origin — CLONE Arc & PathBuf ──────
let origin = OriginSet::try_from_iter(md.first_seen.iter().map(|e| {
Origin::from_git_repo_with_first_commit(
Arc::clone(&repo_path),
Arc::clone(&e.commit_metadata),
String::from_utf8_lossy(&e.path).to_string(),
)
}))
.unwrap_or_else(|| Origin::from_git_repo(Arc::clone(&repo_path)).into());
Ok((origin, blob))
}
})
.filter(|res| {
!matches!(res,
Err(e) if e.to_string() == "__timeout_silenced__"
)
})
.drive_unindexed(consumer)
}
}
struct EnumeratorFileIter<'a> {
inner: EnumeratorFileResult,
reader: std::io::BufReader<std::fs::File>,
_marker: PhantomData<&'a ()>,
}
impl ParallelBlobIterator for EnumeratorFileResult {
type Iter<'a> = EnumeratorFileIter<'a>;
fn into_blob_iter<'a>(self) -> Result<Option<Self::Iter<'a>>> {
let file = std::fs::File::open(&self.path)?;
let reader = std::io::BufReader::new(file);
Ok(Some(EnumeratorFileIter { inner: self, reader, _marker: PhantomData }))
}
}
enum FoundInputIter<'a> {
File(FileResultIter<'a>),
GitRepo(GitRepoResultIter<'a>),
EnumeratorFile(EnumeratorFileIter<'a>),
}
// Enumerator file parallelism approach:
//
// - Split into lines sequentially
// - Parallelize JSON deserialization (JSON is an expensive serialization format, but easy to sling
// around, hence used here -- another format like Arrow or msgpack would be much more efficient)
impl<'a> ParallelIterator for EnumeratorFileIter<'a> {
type Item = Result<(OriginSet, Blob<'a>)>;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
{
use std::io::BufRead;
(1usize..)
.zip(self.reader.lines())
.filter_map(|(line_num, line)| line.map(|line| (line_num, line)).ok())
.par_bridge()
.map(|(line_num, line)| {
let e: EnumeratorBlobResult = serde_json::from_str(&line).with_context(|| {
format!("Error in enumerator {}:{line_num}", self.inner.path.display())
})?;
// let origin = Origin::from_extended(e.origin).into();
let origin = OriginSet::new(Origin::from_extended(e.origin), Vec::new());
let blob = Blob::from_bytes(e.content.as_bytes().to_owned());
Ok((origin, blob))
})
.drive_unindexed(consumer)
}
}
trait ParallelBlobIterator {
/// The concrete parallel iterator returned by `into_blob_iter`.
/// It is generic over the lifetime `'a` that the produced `Blob<'a>` carries.
type Iter<'a>: ParallelIterator<Item = Result<(OriginSet, Blob<'a>)>> + 'a
where
Self: 'a;
/// Convert the input into an *optional* parallel iterator of `(Origin, Blob)` tuples.
fn into_blob_iter<'a>(self) -> Result<Option<Self::Iter<'a>>>
where
Self: 'a;
}
impl<'a> ParallelIterator for FoundInputIter<'a> {
type Item = Result<(OriginSet, Blob<'a>)>;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: rayon::iter::plumbing::UnindexedConsumer<Self::Item>,
{
match self {
FoundInputIter::File(i) => i.drive_unindexed(consumer),
FoundInputIter::GitRepo(i) => i.drive_unindexed(consumer),
FoundInputIter::EnumeratorFile(i) => i.drive_unindexed(consumer),
}
}
}
impl<'cfg> ParallelBlobIterator for (&'cfg EnumeratorConfig, FoundInput) {
type Iter<'a>
= FoundInputIter<'a>
where
Self: 'a;
fn into_blob_iter<'a>(self) -> Result<Option<Self::Iter<'a>>>
where
'cfg: 'a,
{
use std::time::Instant;
let (cfg, input) = self;
match input {
// ───────────── regular file ─────────────
FoundInput::File(i) => Ok(i.into_blob_iter()?.map(FoundInputIter::File)),
// ───────────── directory (possible Git repo) ─────────────
FoundInput::Directory(i) => {
let path = &i.path;
if !cfg.enumerate_git_history {
return Ok(None);
}
// Try to open a Git repository at that path
let repository = match open_git_repo(path)? {
Some(r) => r,
None => return Ok(None),
};
debug!("Found Git repository at {}", path.display());
let t_start = Instant::now();
let collect_git_metadata = cfg.collect_git_metadata;
let timeout = cfg.repo_scan_timeout;
// Spawn an enumerator thread so we can time-out cleanly
let path_clone = path.to_path_buf();
let (tx, rx) = std::sync::mpsc::channel();
let exclude_globset = cfg.exclude_globset.clone();
let handle = std::thread::spawn(move || {
let res = if collect_git_metadata {
GitRepoWithMetadataEnumerator::new(
&path_clone,
repository,
exclude_globset.clone(),
)
.run()
} else {
GitRepoEnumerator::new(&path_clone, repository).run()
};
let _ = tx.send(res);
});
// Wait for enumeration, polling every 100 ms
let git_result = loop {
if t_start.elapsed() > timeout {
debug!(
"Git repo enumeration at {} timed-out after {:.1}s (> {} s)",
path.display(),
t_start.elapsed().as_secs_f64(),
timeout.as_secs()
);
// Abandon the worker thread and skip this repo
return Ok(None);
}
match rx.try_recv() {
Ok(res) => break res,
Err(std::sync::mpsc::TryRecvError::Empty) => {
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
debug!("Enumerator thread disconnected for {}", path.display());
return Ok(None);
}
}
};
let _ = handle.join(); // avoid leak
match git_result {
Err(e) => {
debug!("Failed to enumerate Git repo at {}: {e}", path.display());
Ok(None)
}
Ok(repo_result) => {
debug!(
"Enumerated Git repo at {} in {:.2}s",
path.display(),
t_start.elapsed().as_secs_f64()
);
// Convert to a blob iterator, then patch the deadline
repo_result
.into_blob_iter() // Option<GitRepoResultIter>
.map(|iter| {
iter.map(|mut gri| {
gri.deadline = Instant::now() + timeout;
FoundInputIter::GitRepo(gri)
})
})
}
}
}
// ───────────── pre-enumerated JSON file list ─────────────
FoundInput::EnumeratorFile(i) => {
Ok(i.into_blob_iter()?.map(FoundInputIter::EnumeratorFile))
}
}
}
}
/// A simple enum describing how we yield file content:
/// - Single: one `(origin, blob)`
/// - Archive: multiple `(origin, blob)` items from a decompressed archive
enum FileResultIterKind {
Single(Option<(OriginSet, OwnedBlob)>),
Archive(Vec<(OriginSet, OwnedBlob)>),
}
#[derive(Deserialize)]
pub enum Content {
#[serde(rename = "content_base64")]
Base64(#[serde(deserialize_with = "deserialize_b64_bstring")] BString),
#[serde(rename = "content")]
Utf8(String),
}
impl Content {
pub fn as_bytes(&self) -> &[u8] {
match self {
Content::Base64(s) => s.as_slice(),
Content::Utf8(s) => s.as_bytes(),
}
}
}
fn deserialize_b64_bstring<'de, D>(deserializer: D) -> Result<BString, D::Error>
where
D: Deserializer<'de>,
{
let encoded = String::deserialize(deserializer)?;
let decoded = STANDARD.decode(&encoded).map_err(serde::de::Error::custom)?;
Ok(decoded.into())
}
// -------------------------------------------------------------------------------------------------
/// An entry deserialized from an extensible enumerator
#[derive(serde::Deserialize)]
struct EnumeratorBlobResult {
#[serde(flatten)]
pub content: Content,
pub origin: serde_json::Value,
}