From 6fb119d50180c1eeb719b37c372dc89d7da448a3 Mon Sep 17 00:00:00 2001 From: Mick Grove Date: Sat, 16 Aug 2025 07:33:36 -0700 Subject: [PATCH] removed serde_utils and added Authress rule --- data/rules/authress.yml | 30 ++ src/git_commit_metadata.rs | 9 +- src/git_repo_enumerator.rs | 7 +- src/lib.rs | 1 - src/origin.rs | 20 +- src/reporter.rs | 10 +- src/safe_list.rs | 236 ++++++----- src/scanner/enumerate.rs | 2 +- src/scanner/enumerate.rs.orig | 712 ++++++++++++++++++++++++++++++++++ src/serde_utils.rs | 60 --- tests/fingerprint_dedup.rs | 7 +- 11 files changed, 901 insertions(+), 193 deletions(-) create mode 100644 data/rules/authress.yml create mode 100644 src/scanner/enumerate.rs.orig delete mode 100644 src/serde_utils.rs diff --git a/data/rules/authress.yml b/data/rules/authress.yml new file mode 100644 index 0000000..46f4ffc --- /dev/null +++ b/data/rules/authress.yml @@ -0,0 +1,30 @@ +rules: + - name: Authress Service Client Access Key + id: kingfisher.authress.1 + pattern: | + (?xi) + ( + (?:sc|ext|scauth|authress)_[a-z0-9]{5,30}\.[a-z0-9]{4,6}\.acc[_-][a-z0-9-]{10,32}\.[a-z0-9+/_=-]{30,120} + ) + confidence: medium + min_entropy: 4.0 + validation: + type: Http + content: + request: + method: GET + url: "https://api.authress.io/v1/users/me" + headers: + Authorization: "Bearer {{TOKEN}}" + response_matcher: + - report_response: true + - type: JsonValid + - type: WordMatch + words: + - '"Unauthorized"' + negative: true + references: + - https://authress.io/knowledge-base/docs/authorization/service-clients/access-keys/ + - https://authress.io/knowledge-base/docs/usage-guides/api-keys-as-a-service-setup/ + examples: + - "sc_V2.access-key-id.acc_my-account-id.secret-key-material-that-is-long" diff --git a/src/git_commit_metadata.rs b/src/git_commit_metadata.rs index 2588d73..db519cb 100644 --- a/src/git_commit_metadata.rs +++ b/src/git_commit_metadata.rs @@ -1,10 +1,7 @@ -use bstr::BString; use gix::{date::Time, ObjectId}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::serde_utils::BStringLossyUtf8; - #[repr(transparent)] #[derive(Serialize, Deserialize, Copy, Clone)] #[serde(remote = "Time")] @@ -133,11 +130,9 @@ pub struct CommitMetadata { #[serde(with = "HexObjectId")] pub commit_id: ObjectId, - #[serde(with = "BStringLossyUtf8")] - pub committer_name: BString, + pub committer_name: String, - #[serde(with = "BStringLossyUtf8")] - pub committer_email: BString, + pub committer_email: String, #[serde(with = "TextTime")] pub committer_timestamp: Time, diff --git a/src/git_repo_enumerator.rs b/src/git_repo_enumerator.rs index 3f3d93d..9c55c2f 100644 --- a/src/git_repo_enumerator.rs +++ b/src/git_repo_enumerator.rs @@ -107,12 +107,9 @@ impl<'a> GitRepoWithMetadataEnumerator<'a> { *commit_oid, Arc::new(CommitMetadata { commit_id: *commit_oid, - committer_name: committer.name.to_owned(), - committer_email: committer.email.to_owned(), + committer_name: String::from_utf8_lossy(&committer.name).into_owned(), + committer_email: String::from_utf8_lossy(&committer.email).into_owned(), committer_timestamp: parse_sig_time(committer.time), - // author_name: author.name.to_owned(), - // author_email: author.email.to_owned(), - // author_timestamp: parse_sig_time(author.time), }), ); } diff --git a/src/lib.rs b/src/lib.rs index 4a95419..fa4c35b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,6 @@ pub mod s3; pub mod safe_list; pub mod scanner; pub mod scanner_pool; -pub mod serde_utils; pub mod slack; pub mod snippet; pub mod update; diff --git a/src/origin.rs b/src/origin.rs index 390c3f6..0dcd207 100644 --- a/src/origin.rs +++ b/src/origin.rs @@ -4,7 +4,7 @@ use std::{ }; use anyhow::{anyhow, Result}; -use bstr::{BString, ByteSlice}; +use bstr::ByteSlice; use dashmap::DashMap; use once_cell::sync::Lazy; use rustc_hash::FxHashSet; @@ -12,7 +12,7 @@ use schemars::JsonSchema; use serde::{ser::SerializeSeq, Deserialize, Serialize}; use smallvec::SmallVec; -use crate::{git_commit_metadata::CommitMetadata, serde_utils::BStringLossyUtf8}; +use crate::git_commit_metadata::CommitMetadata; static URL_CACHE: Lazy>> = Lazy::new(DashMap::default); fn compute_url(repo_path: &Path) -> Result { @@ -93,7 +93,7 @@ impl Origin { pub fn from_git_repo_with_first_commit( repo_path: Arc, commit_metadata: Arc, - blob_path: BString, + blob_path: String, ) -> Self { let first_commit = Some(CommitOrigin { commit_metadata, blob_path }); Origin::GitRepo(GitRepoOrigin { repo_path, first_commit }) @@ -106,10 +106,9 @@ impl Origin { /// Get the path for the blob from this `Origin` entry, if one is specified. pub fn blob_path(&self) -> Option<&Path> { - use bstr::ByteSlice; match self { Self::File(e) => Some(&e.path), - Self::GitRepo(e) => e.first_commit.as_ref().and_then(|c| c.blob_path.to_path().ok()), + Self::GitRepo(e) => e.first_commit.as_ref().map(|c| Path::new(&c.blob_path)), Self::Extended(e) => e.path(), } } @@ -117,11 +116,7 @@ impl Origin { pub fn full_path(&self) -> Option { match self { Self::File(e) => Some((*e.path).clone()), - Self::GitRepo(e) => e - .first_commit - .as_ref() - .and_then(|c| c.blob_path.to_path().ok()) - .map(|p| e.repo_path.join(p)), + Self::GitRepo(e) => e.first_commit.as_ref().map(|c| e.repo_path.join(&c.blob_path)), Self::Extended(e) => e.path().map(PathBuf::from), } } @@ -136,7 +131,7 @@ impl std::fmt::Display for Origin { "git repo {}: first seen in commit {} as {}", e.repo_path.display(), md.commit_metadata.commit_id, - md.blob_path, + &md.blob_path, ), None => write!(f, "git repo {}", e.repo_path.display()), }, @@ -170,8 +165,7 @@ pub struct GitRepoOrigin { pub struct CommitOrigin { pub commit_metadata: Arc, - #[serde(with = "BStringLossyUtf8")] - pub blob_path: BString, + pub blob_path: String, } // ------------------------------------------------------------------------------------------------- // ExtendedOrigin diff --git a/src/reporter.rs b/src/reporter.rs index 13ca84c..a42e682 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -81,8 +81,8 @@ impl DetailsReporter { "url": format!("{}/commit/{}", repo_url, cmd.commit_id), "date": atime, "committer": { - "name": String::from_utf8_lossy(&cmd.committer_name), - "email": String::from_utf8_lossy(&cmd.committer_email), + "name": &cmd.committer_name, + "email": &cmd.committer_email, }, // "author": { // "name": String::from_utf8_lossy(&cmd.author_name), @@ -91,19 +91,19 @@ impl DetailsReporter { // "message": msg, }, "file": { - "path": String::from_utf8_lossy(&cs.blob_path), + "path": &cs.blob_path, "url": format!( "{}/blob/{}/{}#L{}", repo_url, cmd.commit_id, - String::from_utf8_lossy(&cs.blob_path), + &cs.blob_path, source_span.start.line ), "git_command": format!( "git -C {} show {}:{}", prov.repo_path.display(), cmd.commit_id, - String::from_utf8_lossy(&cs.blob_path) + &cs.blob_path ) } }); diff --git a/src/safe_list.rs b/src/safe_list.rs index bbea073..60d88aa 100644 --- a/src/safe_list.rs +++ b/src/safe_list.rs @@ -1,110 +1,152 @@ +//! Safe-match filters: identify *benign* placeholder/example/redacted strings +//! so they don't get treated as real secrets. When a rule matches, we log +//! which rule/explanation fired at `debug!` level. +// +// Usage: +// if is_safe_match(bytes) { /* skip finding */ } +// +// If you also want the specific reason: +// if let Some(reason) = is_safe_match_reason(bytes) { +// // reason contains the rule description +// } + use once_cell::sync::Lazy; use regex::bytes::Regex; use tracing::debug; +/// A rule that describes *why* a match is considered safe/benign. +#[derive(Debug)] +struct SafeRule { + /// Human-friendly reason that will be logged when this rule fires. + description: &'static str, + /// Compiled regex to detect the benign pattern. + regex: Regex, +} + +/// Compile a bytes regex and panic on failure (at init time). +fn compile(pattern: &'static str) -> Regex { + Regex::new(pattern).unwrap_or_else(|e| { + // Compile happens once at startup, so panic is acceptable here. + // We still emit a debug line to aid troubleshooting in non-panic logs. + debug!("Failed to compile safe-list regex: {pattern}\nError: {e}"); + panic!("invalid safe-list regex: {pattern}: {e}"); + }) +} + /// Case-insensitive patterns that indicate a *benign* match (placeholders, examples, redactions, etc.). -/// `is_safe_match()` returns true if any of these are present. -static SAFE_LIST_FILTER_REGEX: Lazy>> = Lazy::new(|| { +/// `is_safe_match()` returns true if any of these are present and logs which rule fired. +/// `is_safe_match_reason()` returns the matching rule's description instead of logging. +static SAFE_LIST_FILTER_RULES: Lazy> = Lazy::new(|| { vec![ - // Assignment-like value that ends with "EXAMPLEKEY" (common placeholder) - // e.g., "KEY=ABC_EXAMPLEKEY" or "key: fooEXAMPLEKEY" - compile_regex(r"(?i)[:=][^:=]{0,64}EXAMPLEKEY"), - - // AWS-style AKIA keys explicitly marked as example/fake/test/sample - // e.g., "AKIA...EXAMPLE", "AKIA...FAKE", "AKIA...SAMPLE" - compile_regex(r"(?i)\b(AKIA(?:.*?EXAMPLE|.*?FAKE|TEST|.*?SAMPLE))\b"), - - // Secret-y key name followed by short value and then "&&" / "||" or a run of asterisks - // e.g., "password=foo &&", "secret: *****" (redacted/masked) - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\s(&&|\|\||\*{5,50})", - ), - - // Secret-y key name with short value, then *another* short assignment on the same line - // Typical of docs/examples rather than hardcoded secrets - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b\w{4,12}\s{0,6}=\s{0,6}\D{0,3}\w{1,12}", - ), - - // Secret-y key assigned to a shell variable reference (e.g., "$FOO") — not a literal secret - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\$\w{4,30}", - ), - - // Secret-y key set via command that *generates* randomness, not a literal value - // e.g., "password = openssl rand -base64 32" - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,16}[=:?][^=:?]{0,8}\bopenssl\s{0,4}rand\b", - ), - - // Secret-y key assigned a value containing "encrypted" (marker/metadata, not a secret) - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}encrypted", - ), - - // Secret-y key assigned boolean literals — not secrets - // e.g., "auth=false" - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b(?:false|true)\b", - ), - - // Secret-y key assigned to null-ish or self-referential placeholders — not secrets - // e.g., "password: null", "secret = none" - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b(null|nil|none|password|pass|pwd|passwd|secret|cred|key|auth|authorization).{1,6}$", - ), - - // The classic xkcd "hunter2" fake password - compile_regex( - r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}hunter2", - ), - - // Obvious placeholder sequences - // (Consider grouping like (?i)(?:123456789|abcdefghij) for clarity.) - compile_regex(r"(?i)123456789|abcdefghij"), - - // Literal placeholder tag often used in docs/config - compile_regex(r"(?i)"), - - // OpenAPI schema references in assignment/query contexts — not secrets - // e.g., "password?ref=#/components/schemas/Credential" - compile_regex(r"(?i)[=:?][^=:?]{0,8}#/components/schemas/"), - - // Example MongoDB URIs with placeholder user/pass like "user:pass" or "foo:bar" - compile_regex( - r"(?i)\b(mongodb(?:\+srv)?://(?:user|foo)[^:@]+:(?:pass|bar)[^@]+@[-\w.%+/:]{3,64}(?:/\w+)?)", - ), - - // "classpath://" URIs — configuration references, not secrets - compile_regex(r"(?i)\b(classpath://)"), - - // Assignment where the value dereferences a placeholder/property like ${env_var} - // e.g., "password=${db_password}" - compile_regex(r"(?i)(\b[^\s\t]{0,16}[=:][^$]*\$\{[a-z_-]{5,30}\})"), - - // URLs with basic auth to hosts ending in "example" or "test" — placeholders - // e.g., "https://user:pass@example" - compile_regex(r"(?i)\b((?:https?:)?//[^:@]{3,50}:[^:@]{3,50}@[\w.]{0,16}(?:example|test))"), - - // Assignment ending with "SECRETMANAGER" — explicit placeholder - compile_regex(r"(?i)[:=][^:=]{0,32}\bSECRETMANAGER"), + SafeRule { + description: "Assignment ending with EXAMPLEKEY (placeholder)", + regex: compile(r"(?i)[:=][^:=]{0,64}EXAMPLEKEY"), + }, + SafeRule { + description: "AWS AKIA key explicitly marked as example/fake/test/sample", + regex: compile(r"(?i)\b(AKIA(?:.*?EXAMPLE|.*?FAKE|TEST|.*?SAMPLE))\b"), + }, + SafeRule { + description: "Secret-like key followed by redaction marker (&&, ||, or ***** run)", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\s(&&|\|\||\*{5,50})", + ), + }, + SafeRule { + description: "Secret-like key + short value followed by another short assignment on same line (example-y)", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b\w{4,12}\s{0,6}=\s{0,6}\D{0,3}\w{1,12}", + ), + }, + SafeRule { + description: "Secret-like key assigned from a shell variable reference (e.g., $FOO), not a literal", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\$\w{4,30}", + ), + }, + SafeRule { + description: "Secret-like key set via randomness generator command (openssl rand ...), not a literal", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,16}[=:?][^=:?]{0,8}\bopenssl\s{0,4}rand\b", + ), + }, + SafeRule { + description: "Secret-like key assigned a value containing 'encrypted' (metadata/marker)", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}encrypted", + ), + }, + SafeRule { + description: "Secret-like key assigned boolean literal (true/false)", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b(?:false|true)\b", + ), + }, + SafeRule { + description: "Secret-like key assigned to null-ish or self-referential placeholders", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}\b(null|nil|none|password|pass|pwd|passwd|secret|cred|key|auth|authorization).{1,6}$", + ), + }, + SafeRule { + description: "Classic xkcd fake password 'hunter2'", + regex: compile( + r"(?i)(password|pass|pwd|passwd|secret|cred|key|auth|authorization)[^=:?]{0,8}[=:?][^=:?]{0,8}hunter2", + ), + }, + SafeRule { + description: "Obvious placeholder sequences (123456789 or abcdefghij)", + regex: compile(r"(?i)123456789|abcdefghij"), + }, + SafeRule { + description: "Literal placeholder tag ''", + regex: compile(r"(?i)"), + }, + SafeRule { + description: "OpenAPI schema references near assignment/query (not a secret)", + regex: compile(r"(?i)[=:?][^=:?]{0,8}#/components/schemas/"), + }, + SafeRule { + description: "Example MongoDB URI with placeholder user/pass like user:pass or foo:bar", + regex: compile( + r"(?i)\b(mongodb(?:\+srv)?://(?:user|foo)[^:@]+:(?:pass|bar)[^@]+@[-\w.%+/:]{3,64}(?:/\w+)?)", + ), + }, + SafeRule { + description: "Classpath URI (configuration reference, not a secret)", + regex: compile(r"(?i)\b(classpath://)"), + }, + SafeRule { + description: "Assignment using property placeholder like ${ENV_VAR}", + regex: compile(r"(?i)(\b[^\s\t]{0,16}[=:][^$]*\$\{[a-z_-]{5,30}\})"), + }, + SafeRule { + description: "URL with basic auth to host ending in example/test (placeholder)", + regex: compile(r"(?i)\b((?:https?:)?//[^:@]{3,50}:[^:@]{3,50}@[\w.]{0,16}(?:example|test))"), + }, + SafeRule { + description: "Assignment ending with SECRETMANAGER (explicit placeholder)", + regex: compile(r"(?i)[:=][^:=]{0,32}\bSECRETMANAGER"), + }, ] }); -fn compile_regex(pattern: &str) -> Option { - match Regex::new(pattern) { - Ok(regex) => Some(regex), - Err(e) => { - debug!("Failed to compile regex '{}': {}", pattern, e); - None - } - } +/// Returns `Some(&'static str)` with the rule description if the input likely +/// contains *benign* placeholder/test strings; otherwise `None`. +pub fn is_safe_match_reason(input: &[u8]) -> Option<&'static str> { + SAFE_LIST_FILTER_RULES + .iter() + .find(|rule| rule.regex.is_match(input)) + .map(|rule| rule.description) } -/// Returns true if the input likely contains *benign* placeholder/test strings. +/// Returns true if the input likely contains *benign* placeholder/test strings, +/// and logs which rule triggered at `debug!` level. pub fn is_safe_match(input: &[u8]) -> bool { - SAFE_LIST_FILTER_REGEX - .iter() - .filter_map(|regex_option| regex_option.as_ref()) - .any(|regex| regex.is_match(input)) + if let Some(reason) = is_safe_match_reason(input) { + debug!("Safe match: {reason}"); + true + } else { + false + } } diff --git a/src/scanner/enumerate.rs b/src/scanner/enumerate.rs index 2b9f958..f4bac9d 100644 --- a/src/scanner/enumerate.rs +++ b/src/scanner/enumerate.rs @@ -456,7 +456,7 @@ impl<'a> rayon::iter::ParallelIterator for GitRepoResultIter<'a> { Origin::from_git_repo_with_first_commit( Arc::clone(&repo_path), Arc::clone(&e.commit_metadata), // ← clone Arc - e.path.clone(), // ← clone path + String::from_utf8_lossy(&e.path).into_owned(), ) })) .unwrap_or_else(|| Origin::from_git_repo(Arc::clone(&repo_path)).into()); diff --git a/src/scanner/enumerate.rs.orig b/src/scanner/enumerate.rs.orig new file mode 100644 index 0000000..2b9f958 --- /dev/null +++ b/src/scanner/enumerate.rs.orig @@ -0,0 +1,712 @@ +use std::{ + marker::PhantomData, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant as StdInstant, Instant}, +}; + +use anyhow::{bail, Context, Result}; +use base64::{engine::general_purpose::STANDARD, Engine}; +use bstr::BString; +use gix::Repository as GixRepo; +use indicatif::{ProgressBar, ProgressStyle}; +use rayon::{ + iter::plumbing::Folder, + prelude::{ParallelIterator, *}, +}; +use serde::{Deserialize, Deserializer}; +use tracing::{debug, error}; + +use crate::{ + binary::is_binary, + blob::{Blob, BlobId, BlobIdMap}, + cli::commands::{github::GitHistoryMode, scan}, + decompress::{decompress_file_to_temp, CompressedContent}, + findings_store, + matcher::{Matcher, MatcherStats}, + open_git_repo, + origin::{Origin, OriginSet}, + rule_profiling::ConcurrentRuleProfiler, + rules_database::RulesDatabase, + scanner::{ + processing::BlobProcessor, + runner::{create_datastore_channel, spawn_datastore_writer_thread}, + util::is_compressed_file, + }, + scanner_pool::ScannerPool, + EnumeratorConfig, EnumeratorFileResult, FileResult, FilesystemEnumerator, FoundInput, + GitRepoEnumerator, GitRepoResult, GitRepoWithMetadataEnumerator, PathBuf, +}; + +type OwnedBlob = Blob<'static>; + +pub fn enumerate_filesystem_inputs( + args: &scan::ScanArgs, + datastore: Arc>, + input_roots: &[PathBuf], + progress_enabled: bool, + rules_db: &RulesDatabase, + enable_profiling: bool, + shared_profiler: Arc, + matcher_stats: &Mutex, +) -> Result<()> { + let repo_scan_timeout = Duration::from_secs(args.git_repo_timeout); + + let progress = if progress_enabled { + let style = + ProgressStyle::with_template("{spinner} {msg} {total_bytes} [{elapsed_precise}]") + .expect("progress bar style template should compile"); + let pb = ProgressBar::new_spinner() + .with_style(style) + .with_message("Scanning files and git repository content..."); + pb.enable_steady_tick(Duration::from_millis(500)); + pb + } else { + ProgressBar::hidden() + }; + let _input_enumerator = || -> Result { + let mut ie = FilesystemEnumerator::new(input_roots, &args)?; + ie.threads(args.num_jobs); + ie.max_filesize(args.content_filtering_args.max_file_size_bytes()); + if args.input_specifier_args.git_history == GitHistoryMode::None { + ie.enumerate_git_history(false); + } + + let collect_git_metadata = true; + ie.collect_git_metadata(collect_git_metadata); + Ok(ie) + }() + .context("Failed to initialize filesystem enumerator")?; + + let (enum_thread, input_recv, exclude_globset) = { + let fs_enumerator = make_fs_enumerator(args, input_roots.into()) + .context("Failed to initialize filesystem enumerator")?; + let exclude_globset = fs_enumerator.as_ref().and_then(|ie| ie.exclude_globset()); + let channel_size = std::cmp::max(args.num_jobs * 128, 1024); + + let (input_send, input_recv) = crossbeam_channel::bounded(channel_size); + let input_enumerator_thread = std::thread::Builder::new() + .name("input_enumerator".to_string()) + .spawn(move || -> Result<_> { + if let Some(fs_enumerator) = fs_enumerator { + fs_enumerator.run(input_send.clone())?; + } + Ok(()) + }) + .context("Failed to enumerate filesystem inputs")?; + (input_enumerator_thread, input_recv, exclude_globset) + }; + + let enum_cfg = EnumeratorConfig { + enumerate_git_history: match args.input_specifier_args.git_history { + GitHistoryMode::Full => true, + GitHistoryMode::None => false, + }, + collect_git_metadata: args.input_specifier_args.commit_metadata, + repo_scan_timeout, + exclude_globset, + }; + let (send_ds, recv_ds) = create_datastore_channel(args.num_jobs); + let datastore_writer_thread = + spawn_datastore_writer_thread(datastore, recv_ds, !args.no_dedup)?; + + let t1 = Instant::now(); + let num_blob_processors = Mutex::new(0u64); + let seen_blobs = BlobIdMap::new(); + let scanner_pool = Arc::new(ScannerPool::new(Arc::new(rules_db.vsdb.clone()))); + + let matcher = Matcher::new( + &rules_db, + scanner_pool.clone(), + &seen_blobs, + Some(&matcher_stats), + enable_profiling, + Some(shared_profiler), + )?; + let blob_processor_init_time = Mutex::new(t1.elapsed()); + let make_blob_processor = || -> BlobProcessor { + let t1 = Instant::now(); + *num_blob_processors.lock().unwrap() += 1; + { + let mut init_time = blob_processor_init_time.lock().unwrap(); + *init_time += t1.elapsed(); + } + BlobProcessor { matcher } + }; + let scan_res: Result<()> = input_recv + .into_iter() + .par_bridge() + .filter_map(|input| match (&enum_cfg, input).into_blob_iter() { + Err(e) => { + debug!("Error enumerating input: {e:#}"); + None + } + Ok(blob_iter) => blob_iter, + }) + .flatten() + .try_for_each_init( + || (make_blob_processor.clone()(), progress.clone()), + move |(processor, progress), entry| { + let (origin, blob) = match entry { + Err(e) => { + error!("Error loading input: {e:#}"); + return Ok(()); + } + Ok(entry) => entry, + }; + // Check if this is an archive file + let is_archive = if let Origin::File(file_origin) = &origin.first() { + is_compressed_file(&file_origin.path) + } else { + false + }; + let is_binary = is_binary(&blob.bytes()); + let should_skip = if is_archive { + // For archives: skip only if --no_extract_archives is true + args.content_filtering_args.no_extract_archives + } else { + // For non-archives: skip if it's binary and --no_binary is true + is_binary && args.content_filtering_args.no_binary + }; + if should_skip { + progress.suspend(|| { + if is_archive { + debug!("Skipping archive: {}", blob.id); + } else { + debug!("Skipping binary blob: {}", blob.id); + } + }); + return Ok(()); + } + progress.inc(blob.len().try_into().unwrap()); + match processor.run(origin, blob, args.no_dedup, args.redact) { + Ok(None) => { + // nothing to record + } + Ok(Some((origin_set, blob_metadata, vec_of_matches))) => { + for (_, single_match) in vec_of_matches { + // Send each match + send_ds.send(( + Arc::new(origin_set.clone()), + Arc::new(blob_metadata.clone()), + single_match, + ))?; + } + } + Err(e) => { + debug!("Error scanning input: {e:#}"); + } + } + Ok(()) + }, + ); + + enum_thread.join().unwrap().context("Failed to enumerate inputs")?; + let (..) = datastore_writer_thread + .join() + .unwrap() + .context("Failed to save results to the datastore")?; + scan_res.context("Failed to scan inputs")?; + progress.finish(); + Ok(()) +} + +/// Initialize a `FilesystemEnumerator` based on the command-line arguments and +/// datastore. Also initialize a `Gitignore` that is the same as that used by +/// the filesystem enumerator. +fn make_fs_enumerator( + args: &scan::ScanArgs, + input_roots: Vec, +) -> Result> { + if input_roots.is_empty() { + Ok(None) + } else { + let mut ie = FilesystemEnumerator::new(&input_roots, &args)?; + ie.threads(args.num_jobs); + ie.max_filesize(args.content_filtering_args.max_file_size_bytes()); + if args.input_specifier_args.git_history == GitHistoryMode::None { + ie.enumerate_git_history(false); + } + + // Pass no_dedup when enumerating git history + ie.no_dedup(args.no_dedup); + + ie.set_exclude_patterns(&args.content_filtering_args.exclude)?; + // Determine whether to collect git metadata or not + let collect_git_metadata = false; + ie.collect_git_metadata(collect_git_metadata); + Ok(Some(ie)) + } +} + +// Rest of the file remains the same... +/// Implements parallel iteration for either a single blob or a list of blobs. +struct FileResultIter<'a> { + iter_kind: FileResultIterKind, + _marker: PhantomData<&'a ()>, +} + +impl<'a> ParallelIterator for FileResultIter<'a> { + type Item = Result<(OriginSet, Blob<'a>)>; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: rayon::iter::plumbing::UnindexedConsumer, + { + match self.iter_kind { + FileResultIterKind::Single(maybe_one) => { + let mut folder = consumer.into_folder(); + if let Some(one) = maybe_one { + folder = folder.consume(Ok(one)); + } + folder.complete() + } + FileResultIterKind::Archive(items) => { + items.into_par_iter().map(Ok).drive_unindexed(consumer) + } + } + } +} + +impl ParallelBlobIterator for FileResult { + type Iter<'a> = FileResultIter<'a>; + + fn into_blob_iter<'a>(self) -> Result>> { + let extraction_enabled = self.extract_archives; + let max_extraction_depth = self.extraction_depth; + + if extraction_enabled && is_compressed_file(&self.path) { + match decompress_file_to_temp(&self.path) { + Ok((content, _temp_dir)) => match content { + // Single-file decompression fully in memory. + CompressedContent::Raw(ref data) => { + let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]); + let blob = Blob::from_bytes(data.to_vec()); + Ok(Some(FileResultIter { + iter_kind: FileResultIterKind::Single(Some((origin, blob))), + _marker: PhantomData, + })) + } + + // Single-file decompression streamed to a file. We read it back into memory + // here. + CompressedContent::RawFile(path) => { + let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]); + let blob = Blob::from_file(&path)?; + Ok(Some(FileResultIter { + iter_kind: FileResultIterKind::Single(Some((origin, blob))), + _marker: PhantomData, + })) + } + + // Multi‑file archive (in‑memory). + CompressedContent::Archive(ref files) => { + if max_extraction_depth == 0 { + debug!( + "Skipping nested archive (max depth reached): {}", + self.path.display() + ); + return Ok(None); + } + let items = files + .iter() + .map(|(filename, data)| { + let full_path = PathBuf::from(filename); + let nested_origin = + OriginSet::new(Origin::from_file(full_path), vec![]); + // Construct a FileResult for deeper extraction if needed (not used + // directly here) + let _ = FileResult { + path: self.path.join(filename), + num_bytes: data.len() as u64, + extract_archives: self.extract_archives, + extraction_depth: max_extraction_depth - 1, + }; + (nested_origin, Blob::from_bytes(data.to_vec())) + }) + .collect(); + Ok(Some(FileResultIter { + iter_kind: FileResultIterKind::Archive(items), + _marker: PhantomData, + })) + } + + // Multi‑file archive (files on disk). + CompressedContent::ArchiveFiles(ref entries) => { + if max_extraction_depth == 0 { + debug!( + "Skipping nested archive (max depth reached): {}", + self.path.display() + ); + return Ok(None); + } + // Read each extracted file from disk and create a Blob. + let mut items = Vec::new(); + for (filename, disk_path) in entries { + let blob = match Blob::from_file(disk_path) { + Ok(b) => b, + Err(e) => { + debug!( + "Failed to mmap extracted file {}: {}", + disk_path.display(), + e + ); + continue; // skip unreadable / unmappable file + } + }; + let full_path = PathBuf::from(filename); + let nested_origin = + OriginSet::new(Origin::from_file(full_path), vec![]); + + // Construct a FileResult for deeper extraction if needed (not used + // directly here) + let _ = FileResult { + path: self.path.join(filename), + num_bytes: blob.len() as u64, + extract_archives: self.extract_archives, + extraction_depth: max_extraction_depth - 1, + }; + items.push((nested_origin, blob)); + } + Ok(Some(FileResultIter { + iter_kind: FileResultIterKind::Archive(items), + _marker: PhantomData, + })) + } + }, + Err(e) => { + debug!("Failed to decompress {}: {}", self.path.display(), e); + Ok(None) // Skip on decompression failure + } + } + } else { + // Not compressed or extraction disabled: read file as a single blob. + let blob = Blob::from_file(&self.path) + .with_context(|| format!("Failed to load blob from {}", self.path.display()))?; + let origin = OriginSet::new(Origin::from_file(self.path.clone()), vec![]); + Ok(Some(FileResultIter { + iter_kind: FileResultIterKind::Single(Some((origin, blob))), + _marker: PhantomData, + })) + } + } +} + +// A marker so the struct itself carries the lifetime. +struct GitRepoResultIter<'a> { + inner: GitRepoResult, + deadline: std::time::Instant, + _marker: std::marker::PhantomData<&'a ()>, +} + +impl ParallelBlobIterator for GitRepoResult { + type Iter<'a> = GitRepoResultIter<'a>; + + fn into_blob_iter<'a>(self) -> Result>> { + // placeholder 1 h deadline; will be overwritten immediately + const PLACEHOLDER: Duration = Duration::from_secs(3600); + + Ok(Some(GitRepoResultIter { + inner: self, + deadline: Instant::now() + PLACEHOLDER, + _marker: std::marker::PhantomData, + })) + } +} + +impl<'a> rayon::iter::ParallelIterator for GitRepoResultIter<'a> { + type Item = Result<(OriginSet, Blob<'a>)>; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: rayon::iter::plumbing::UnindexedConsumer, + { + // ── shared state ────────────────────────────────────────────── + let repo_sync = self.inner.repository.into_sync(); + let repo_path = Arc::new(self.inner.path.clone()); + let deadline = self.deadline; + let flag = Arc::new(AtomicBool::new(false)); // first-timeout gate + + self.inner + .blobs + .into_par_iter() + .with_min_len(1024) + .map_init(|| repo_sync.to_thread_local(), { + let repo_path = Arc::clone(&repo_path); + let flag = Arc::clone(&flag); + + move |repo: &mut GixRepo, md| -> Result<(OriginSet, Blob)> { + // ── 10-minute guard ────────────────────────── + if StdInstant::now() > deadline { + if flag.swap(true, Ordering::Relaxed) { + bail!("__timeout_silenced__"); + } + bail!("blob-read timeout (repo: {})", repo_path.display()); + } + + // ── load blob ──────────────────────────────── + let blob_id = md.blob_oid; + let mut raw = repo.find_object(blob_id)?.try_into_blob()?; + let blob = Blob::new(BlobId::from(&blob_id), std::mem::take(&mut raw.data)); + + // ── build Origin — CLONE Arc & PathBuf ────── + let origin = OriginSet::try_from_iter(md.first_seen.iter().map(|e| { + Origin::from_git_repo_with_first_commit( + Arc::clone(&repo_path), + Arc::clone(&e.commit_metadata), // ← clone Arc + e.path.clone(), // ← clone path + ) + })) + .unwrap_or_else(|| Origin::from_git_repo(Arc::clone(&repo_path)).into()); + + Ok((origin, blob)) + } + }) + .filter(|res| { + !matches!(res, + Err(e) if e.to_string() == "__timeout_silenced__" + ) + }) + .drive_unindexed(consumer) + } +} + +struct EnumeratorFileIter<'a> { + inner: EnumeratorFileResult, + reader: std::io::BufReader, + _marker: PhantomData<&'a ()>, +} + +impl ParallelBlobIterator for EnumeratorFileResult { + type Iter<'a> = EnumeratorFileIter<'a>; + + fn into_blob_iter<'a>(self) -> Result>> { + let file = std::fs::File::open(&self.path)?; + let reader = std::io::BufReader::new(file); + Ok(Some(EnumeratorFileIter { inner: self, reader, _marker: PhantomData })) + } +} +enum FoundInputIter<'a> { + File(FileResultIter<'a>), + GitRepo(GitRepoResultIter<'a>), + EnumeratorFile(EnumeratorFileIter<'a>), +} + +// Enumerator file parallelism approach: +// +// - Split into lines sequentially +// - Parallelize JSON deserialization (JSON is an expensive serialization format, but easy to sling +// around, hence used here -- another format like Arrow or msgpack would be much more efficient) + +impl<'a> ParallelIterator for EnumeratorFileIter<'a> { + type Item = Result<(OriginSet, Blob<'a>)>; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: rayon::iter::plumbing::UnindexedConsumer, + { + use std::io::BufRead; + (1usize..) + .zip(self.reader.lines()) + .filter_map(|(line_num, line)| line.map(|line| (line_num, line)).ok()) + .par_bridge() + .map(|(line_num, line)| { + let e: EnumeratorBlobResult = serde_json::from_str(&line).with_context(|| { + format!("Error in enumerator {}:{line_num}", self.inner.path.display()) + })?; + // let origin = Origin::from_extended(e.origin).into(); + let origin = OriginSet::new(Origin::from_extended(e.origin), Vec::new()); + let blob = Blob::from_bytes(e.content.as_bytes().to_owned()); + Ok((origin, blob)) + }) + .drive_unindexed(consumer) + } +} + +trait ParallelBlobIterator { + /// The concrete parallel iterator returned by `into_blob_iter`. + /// It is generic over the lifetime `'a` that the produced `Blob<'a>` carries. + type Iter<'a>: ParallelIterator)>> + 'a + where + Self: 'a; + /// Convert the input into an *optional* parallel iterator of `(Origin, Blob)` tuples. + fn into_blob_iter<'a>(self) -> Result>> + where + Self: 'a; +} + +impl<'a> ParallelIterator for FoundInputIter<'a> { + type Item = Result<(OriginSet, Blob<'a>)>; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: rayon::iter::plumbing::UnindexedConsumer, + { + match self { + FoundInputIter::File(i) => i.drive_unindexed(consumer), + FoundInputIter::GitRepo(i) => i.drive_unindexed(consumer), + FoundInputIter::EnumeratorFile(i) => i.drive_unindexed(consumer), + } + } +} +impl<'cfg> ParallelBlobIterator for (&'cfg EnumeratorConfig, FoundInput) { + type Iter<'a> + = FoundInputIter<'a> + where + Self: 'a; + + fn into_blob_iter<'a>(self) -> Result>> + where + 'cfg: 'a, + { + use std::time::Instant; + + let (cfg, input) = self; + + match input { + // ───────────── regular file ───────────── + FoundInput::File(i) => Ok(i.into_blob_iter()?.map(FoundInputIter::File)), + + // ───────────── directory (possible Git repo) ───────────── + FoundInput::Directory(i) => { + let path = &i.path; + + if !cfg.enumerate_git_history { + return Ok(None); + } + + // Try to open a Git repository at that path + let repository = match open_git_repo(path)? { + Some(r) => r, + None => return Ok(None), + }; + + debug!("Found Git repository at {}", path.display()); + let t_start = Instant::now(); + let collect_git_metadata = cfg.collect_git_metadata; + let timeout = cfg.repo_scan_timeout; + + // Spawn an enumerator thread so we can time-out cleanly + let path_clone = path.to_path_buf(); + let (tx, rx) = std::sync::mpsc::channel(); + let exclude_globset = cfg.exclude_globset.clone(); + let handle = std::thread::spawn(move || { + let res = if collect_git_metadata { + GitRepoWithMetadataEnumerator::new( + &path_clone, + repository, + exclude_globset.clone(), + ) + .run() + } else { + GitRepoEnumerator::new(&path_clone, repository).run() + }; + let _ = tx.send(res); + }); + + // Wait for enumeration, polling every 100 ms + let git_result = loop { + if t_start.elapsed() > timeout { + debug!( + "Git repo enumeration at {} timed-out after {:.1}s (> {} s)", + path.display(), + t_start.elapsed().as_secs_f64(), + timeout.as_secs() + ); + // Abandon the worker thread and skip this repo + return Ok(None); + } + + match rx.try_recv() { + Ok(res) => break res, + Err(std::sync::mpsc::TryRecvError::Empty) => { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => { + debug!("Enumerator thread disconnected for {}", path.display()); + return Ok(None); + } + } + }; + + let _ = handle.join(); // avoid leak + + match git_result { + Err(e) => { + debug!("Failed to enumerate Git repo at {}: {e}", path.display()); + Ok(None) + } + Ok(repo_result) => { + debug!( + "Enumerated Git repo at {} in {:.2}s", + path.display(), + t_start.elapsed().as_secs_f64() + ); + + // Convert to a blob iterator, then patch the deadline + repo_result + .into_blob_iter() // Option + .map(|iter| { + iter.map(|mut gri| { + gri.deadline = Instant::now() + timeout; + FoundInputIter::GitRepo(gri) + }) + }) + } + } + } + + // ───────────── pre-enumerated JSON file list ───────────── + FoundInput::EnumeratorFile(i) => { + Ok(i.into_blob_iter()?.map(FoundInputIter::EnumeratorFile)) + } + } + } +} + +/// A simple enum describing how we yield file content: +/// - Single: one `(origin, blob)` +/// - Archive: multiple `(origin, blob)` items from a decompressed archive +enum FileResultIterKind { + Single(Option<(OriginSet, OwnedBlob)>), + Archive(Vec<(OriginSet, OwnedBlob)>), +} + +#[derive(Deserialize)] +pub enum Content { + #[serde(rename = "content_base64")] + Base64(#[serde(deserialize_with = "deserialize_b64_bstring")] BString), + + #[serde(rename = "content")] + Utf8(String), +} + +impl Content { + pub fn as_bytes(&self) -> &[u8] { + match self { + Content::Base64(s) => s.as_slice(), + Content::Utf8(s) => s.as_bytes(), + } + } +} + +fn deserialize_b64_bstring<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let encoded = String::deserialize(deserializer)?; + let decoded = STANDARD.decode(&encoded).map_err(serde::de::Error::custom)?; + Ok(decoded.into()) +} + +// ------------------------------------------------------------------------------------------------- +/// An entry deserialized from an extensible enumerator +#[derive(serde::Deserialize)] +struct EnumeratorBlobResult { + #[serde(flatten)] + pub content: Content, + + pub origin: serde_json::Value, +} diff --git a/src/serde_utils.rs b/src/serde_utils.rs deleted file mode 100644 index 06286e2..0000000 --- a/src/serde_utils.rs +++ /dev/null @@ -1,60 +0,0 @@ -use bstr::BString; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Serialize)] -#[serde(remote = "BString")] -pub struct BStringLossyUtf8( - #[serde( - getter = "bstring_as_vec", - serialize_with = "serialize_bytes_string_lossy", - deserialize_with = "deserialize_bytes_string" - )] - pub Vec, -); -#[inline] -fn bstring_as_vec(b: &BString) -> &Vec { - b -} -impl From for BString { - fn from(b: BStringLossyUtf8) -> BString { - BString::new(b.0) - } -} -fn serialize_bytes_string_lossy( - bytes: &[u8], - s: S, -) -> Result { - s.serialize_str(&String::from_utf8_lossy(bytes)) -} -fn deserialize_bytes_string<'de, D: serde::Deserializer<'de>>(d: D) -> Result, D::Error> { - struct Vis; - impl serde::de::Visitor<'_> for Vis { - type Value = Vec; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a string") - } - - fn visit_str(self, v: &str) -> Result { - Ok(v.into()) - } - } - d.deserialize_str(Vis) -} -impl JsonSchema for BStringLossyUtf8 { - fn is_referenceable() -> bool { - false - } - - fn schema_id() -> std::borrow::Cow<'static, str> { - ::schema_id() - } - - fn schema_name() -> String { - ::schema_name() - } - - fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { - String::json_schema(gen) - } -} diff --git a/tests/fingerprint_dedup.rs b/tests/fingerprint_dedup.rs index f51f2ed..bfcbee0 100644 --- a/tests/fingerprint_dedup.rs +++ b/tests/fingerprint_dedup.rs @@ -5,7 +5,6 @@ use std::{ }; use anyhow::Result; -use bstr::BString; use gix::{date, ObjectId}; use kingfisher::{ blob::{BlobId, BlobMetadata}, @@ -62,8 +61,8 @@ fn dummy_commit(commit_id: &str) -> CommitMetadata { CommitMetadata { commit_id: oid, - committer_name: BString::from("tester"), - committer_email: BString::from("tester@example.com"), + committer_name: "tester".into(), + committer_email: "tester@example.com".into(), committer_timestamp: ts, } } @@ -76,7 +75,7 @@ fn git_origin(commit_id: &str) -> OriginSet { OriginSet::single(Origin::from_git_repo_with_first_commit( Arc::new(PathBuf::from("/tmp/repo")), Arc::new(md), - BString::from("dummy.txt"), + String::from("dummy.txt"), )) }